Coverage for sparkle/selector/extractor.py: 57%
100 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 10:17 +0000
1"""Methods regarding feature extractors."""
3from __future__ import annotations
4from typing import Any
5from pathlib import Path
6import ast
7import subprocess
9import runrunner as rrr
10from runrunner.base import Status, Runner
11from runrunner.local import Run, LocalRun
13from sparkle.types import SparkleCallable, SolverStatus
14from sparkle.structures import FeatureDataFrame
15from sparkle.tools import RunSolver
16from sparkle.instance import InstanceSet
class Extractor(SparkleCallable):
    """Extractor base class for extracting features from instances."""

    # Name of the wrapper script expected inside every extractor directory
    wrapper = "sparkle_extractor_wrapper.py"
    # CLI entry point used by run_cli to compute features per instance
    extractor_cli = Path(__file__).parent / "extractor_cli.py"

    def __init__(self: Extractor, directory: Path) -> None:
        """Initialize extractor.

        Args:
            directory: Directory of the extractor.
        """
        super().__init__(directory)
        # Lazily-computed caches, filled on first access of the
        # corresponding properties (each one shells out to the wrapper).
        self._features = None
        self._feature_groups = None
        self._groupwise_computation = None

    def __str__(self: Extractor) -> str:
        """Return the string representation of the extractor."""
        return self.name

    def __repr__(self: Extractor) -> str:
        """Return detailed representation of the extractor."""
        return (
            f"{self.name}:\n"
            f"\t- Directory: {self.directory}\n"
            f"\t- Wrapper: {self.wrapper}\n"
            f"\t- # Feature Groups: {len(self.feature_groups)}\n"
            f"\t- Output Dimension (# Features): {self.output_dimension}\n"
            f"\t- Groupwise Computation Enabled: {self.groupwise_computation}"
        )

    @property
    def features(self: Extractor) -> list[tuple[str, str]]:
        """Determines the features of the extractor.

        Queries the wrapper once with the ``-features`` flag and caches the
        parsed result for subsequent accesses.
        """
        if self._features is None:
            extractor_process = subprocess.run(
                [self.directory / Extractor.wrapper, "-features"],
                capture_output=True,
            )
            # The wrapper prints a Python literal (list of tuples) on stdout
            self._features = ast.literal_eval(extractor_process.stdout.decode())
        return self._features

    @property
    def feature_groups(self: Extractor) -> list[str]:
        """Returns the various feature groups the Extractor has."""
        if self._feature_groups is None:
            # dict.fromkeys deduplicates while keeping first-seen order,
            # unlike set() whose iteration order is non-deterministic.
            self._feature_groups = list(
                dict.fromkeys(group for group, _ in self.features)
            )
        return self._feature_groups

    @property
    def output_dimension(self: Extractor) -> int:
        """The size of the output vector of the extractor."""
        return len(self.features)

    @property
    def groupwise_computation(self: Extractor) -> bool:
        """Determines if you can call the extractor per group for parallelisation."""
        if self._groupwise_computation is None:
            extractor_help = subprocess.run(
                [self.directory / Extractor.wrapper, "-h"], capture_output=True
            )
            # Not the cleanest / most precise way to determine this:
            # assumes the wrapper advertises a -feature_group flag in its help.
            self._groupwise_computation = (
                "-feature_group" in extractor_help.stdout.decode()
            )
        return self._groupwise_computation

    def build_cmd(
        self: Extractor,
        instance: Path | list[Path],
        feature_group: str = None,
        output_file: Path = None,
        cutoff_time: int = None,
        log_dir: Path = None,
    ) -> list[str]:
        """Builds a command line string separated by space.

        Args:
            instance: The instance (or list of instances) to run on.
            feature_group: The optional feature group to run the extractor for.
            output_file: Optional file to write the output to.
            cutoff_time: The maximum runtime. If given, the command is wrapped
                with runsolver to enforce the cutoff.
            log_dir: Directory path for logs.

        Returns:
            The command separated per item in the list.
        """
        if not isinstance(instance, list):
            instance = [instance]
        cmd_list_extractor = [
            f"{self.directory / Extractor.wrapper}",
            "-extractor_dir",
            f"{self.directory}/",
            "-instance_file",
        ] + [str(file) for file in instance]
        if feature_group is not None:
            cmd_list_extractor += ["-feature_group", feature_group]
        if output_file is not None:
            cmd_list_extractor += ["-output_file", str(output_file)]
        if cutoff_time is not None:
            # Extractor handles output file itself
            return RunSolver.wrap_command(
                self.runsolver_exec,
                cmd_list_extractor,
                cutoff_time,
                log_dir,
                log_name_base=self.name,
                raw_results_file=False,
            )
        return cmd_list_extractor

    def run(
        self: Extractor,
        instance: Path | list[Path],
        feature_group: str = None,
        output_file: Path = None,
        cutoff_time: int = None,
        log_dir: Path = None,
    ) -> list[list[Any]] | list[Any] | None:
        """Runs an extractor job with Runrunner.

        Args:
            instance: Path to the instance to run on
            feature_group: The feature group to compute. Must be supported by the
                extractor to use.
            output_file: Target output. If None, piped to the RunRunner job.
            cutoff_time: CPU cutoff time in seconds
            log_dir: Directory to write logs. Defaults to CWD.

        Returns:
            The features or None if an output file is used, or features can not be found.
        """
        log_dir = Path() if log_dir is None else log_dir
        if feature_group is not None and not self.groupwise_computation:
            # This extractor cannot handle groups, compute all features
            feature_group = None
        cmd_extractor = self.build_cmd(
            instance, feature_group, output_file, cutoff_time, log_dir
        )
        run_on = Runner.LOCAL  # TODO: Let this function also handle Slurm runs
        # NOTE(review): joining with spaces breaks on paths containing spaces —
        # confirm against runrunner's cmd handling.
        extractor_run = rrr.add_to_queue(runner=run_on, cmd=" ".join(cmd_extractor))
        if isinstance(extractor_run, LocalRun):
            extractor_run.wait()
            if extractor_run.status == Status.ERROR:
                print(f"{self.name} failed to compute features for {instance}.")
                for i, job in enumerate(extractor_run.jobs):
                    print(
                        f"Job {i} error yielded was:\n"
                        f"\t-stdout: '{job.stdout}'\n"
                        f"\t-stderr: '{job.stderr}'\n"
                    )
                return None
            # RunRunner adds a stamp before the statement
            output = [
                ast.literal_eval(job.stdout.split("\t", maxsplit=1)[-1])
                for job in extractor_run.jobs
            ]
            if len(output) == 1:
                return output[0]
            return output
        return None

    def run_cli(
        self: Extractor,
        instance_set: InstanceSet | list[Path],
        feature_dataframe: FeatureDataFrame,
        cutoff_time: int,
        feature_group: str = None,
        run_on: Runner = Runner.SLURM,
        sbatch_options: list[str] = None,
        srun_options: list[str] = None,
        parallel_jobs: int = None,
        slurm_prepend: str | list[str] | Path = None,
        dependencies: list[Run] = None,
        log_dir: Path = None,
    ) -> Run:
        """Run the Extractor CLI and write result to the FeatureDataFrame.

        Args:
            instance_set: The instance set to run the Extractor on.
            feature_dataframe: The feature dataframe to write to.
            cutoff_time: CPU cutoff time in seconds
            feature_group: The feature group to compute. If left empty,
                will run on all feature groups.
            run_on: The runner to use.
            sbatch_options: Additional options to pass to sbatch.
            srun_options: Additional options to pass to srun.
            parallel_jobs: Number of parallel jobs to run.
            slurm_prepend: Slurm script to prepend to the sbatch
            dependencies: List of dependencies to add to the job.
            log_dir: The directory to write logs to.

        Returns:
            The RunRunner run object of the submitted jobs.
        """
        instances = (
            instance_set
            if isinstance(instance_set, list)
            else instance_set.instance_paths
        )
        log_dir = Path() if log_dir is None else log_dir
        # Pre-render the optional flag (note trailing space for concatenation)
        feature_group = f"--feature-group {feature_group} " if feature_group else ""
        commands = [
            f"python3 {Extractor.extractor_cli} "
            f"--extractor {self.directory} "
            f"--instance {instance_path} "
            f"--feature-csv {feature_dataframe.csv_filepath} "
            f"{feature_group}"
            f"--cutoff {cutoff_time} "
            f"--log-dir {log_dir}"
            for instance_path in instances
        ]
        job_name = f"Run Extractor: {self.name} on {len(instances)} instances"
        run = rrr.add_to_queue(
            runner=run_on,
            cmd=commands,
            name=job_name,
            # Print output directly when running locally; capture for Slurm
            stdout=None if run_on == Runner.LOCAL else subprocess.PIPE,
            stderr=None if run_on == Runner.LOCAL else subprocess.PIPE,
            base_dir=log_dir,
            sbatch_options=sbatch_options,
            srun_options=srun_options,
            parallel_jobs=parallel_jobs,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if isinstance(run, LocalRun):
            print("Waiting for the local calculations to finish.")
            run.wait()
            for job in run.jobs:
                jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
                print(f"Executing Progress: {jobs_done} out of {len(run.jobs)}")
                if jobs_done == len(run.jobs):
                    break
                job.wait()
            print("Computing features done!")
        else:
            print(f"Running {self.name} through Slurm with Job IDs: {run.run_id}")
        return run

    def get_feature_vector(
        self: Extractor, result: Path, runsolver_values: Path = None
    ) -> list[str]:
        """Extracts feature vector from an output file.

        Args:
            result: The raw output of the extractor
            runsolver_values: The output of runsolver.

        Returns:
            A list of features. Vector of missing values upon failure.
        """
        if (
            result.exists()
            and RunSolver.get_status(runsolver_values, None) != SolverStatus.TIMEOUT
        ):
            # Result file holds a Python literal of (group, name, value) tuples
            feature_values = ast.literal_eval(result.read_text())
            return [str(value) for _, _, value in feature_values]
        return [FeatureDataFrame.missing_value] * self.output_dimension