Coverage for src/sparkle/selector/extractor.py: 54%
106 statements
1"""Methods regarding feature extractors."""
3from __future__ import annotations
4from typing import Any
5from pathlib import Path
6import ast
7import subprocess
9import runrunner as rrr
10from runrunner.base import Status, Runner
11from runrunner.local import Run, LocalRun
13from sparkle.types import SparkleCallable, SolverStatus
14from sparkle.structures import FeatureDataFrame
15from sparkle.tools import RunSolver
16from sparkle.instance import InstanceSet


class Extractor(SparkleCallable):
    """Extractor base class for extracting features from instances."""

    wrapper = "sparkle_extractor_wrapper.py"
    extractor_cli = Path(__file__).parent / "extractor_cli.py"

    def __init__(self: Extractor, directory: Path) -> None:
        """Initialize the extractor.

        Args:
            directory: Directory of the extractor.
        """
        super().__init__(directory)
        self._features = None
        self._feature_groups = None
        self._groupwise_computation = None

    def __str__(self: Extractor) -> str:
        """Return the string representation of the extractor."""
        return self.name

    def __repr__(self: Extractor) -> str:
        """Return detailed representation of the extractor."""
        return (
            f"{self.name}:\n"
            f"\t- Directory: {self.directory}\n"
            f"\t- Wrapper: {self.wrapper}\n"
            f"\t- # Feature Groups: {len(self.feature_groups)}\n"
            f"\t- Output Dimension (# Features): {self.output_dimension}\n"
            f"\t- Groupwise Computation Enabled: {self.groupwise_computation}"
        )

    @property
    def features(self: Extractor) -> list[tuple[str, str]]:
        """Determines the features of the extractor."""
        if self._features is None:
            extractor_process = subprocess.run(
                [self.directory / Extractor.wrapper, "-features"], capture_output=True
            )
            self._features = ast.literal_eval(extractor_process.stdout.decode())
        return self._features
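
    # Usage sketch (hypothetical extractor directory; assumes the wrapper
    # prints a Python literal list of (feature_group, feature_name) tuples):
    #   >>> extractor = Extractor(Path("Extractors/SAT-features"))
    #   >>> extractor.features[:2]
    #   [('base', 'n_vars'), ('base', 'n_clauses')]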

    @property
    def feature_groups(self: Extractor) -> list[str]:
        """Returns the various feature groups the Extractor has."""
        if self._feature_groups is None:
            self._feature_groups = list({group for group, _ in self.features})
        return self._feature_groups

    @property
    def output_dimension(self: Extractor) -> int:
        """The size of the output vector of the extractor."""
        return len(self.features)

    @property
    def groupwise_computation(self: Extractor) -> bool:
        """Determines if you can call the extractor per group for parallelisation."""
        if self._groupwise_computation is None:
            extractor_help = subprocess.run(
                [self.directory / Extractor.wrapper, "-h"], capture_output=True
            )
            # Not the cleanest / most precise way to determine this
            self._groupwise_computation = (
                "-feature_group" in extractor_help.stdout.decode()
            )
        return self._groupwise_computation
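
    # Illustrative check (assumed wrapper behaviour): a wrapper whose -h output
    # advertises a -feature_group flag is treated as groupwise, e.g.
    #   >>> extractor.groupwise_computation
    #   True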

    def build_cmd(
        self: Extractor,
        instance: Path | list[Path],
        feature_group: str = None,
        output_file: Path = None,
        cutoff_time: int = None,
        log_dir: Path = None,
    ) -> list[str]:
        """Builds a command line, separated by spaces.

        Args:
            instance: The instance to run on.
            feature_group: The optional feature group to run the extractor for.
            output_file: Optional file to write the output to.
            cutoff_time: The maximum runtime. If not present,
                will run the extractor without runsolver.
            log_dir: Directory path for logs.

        Returns:
            The command, separated per item in the list.
        """
        if not isinstance(instance, list):
            instance = [instance]
        cmd_list_extractor = [
            f"{self.directory / Extractor.wrapper}",
            "-extractor_dir",
            f"{self.directory}/",
            "-instance_file",
        ] + [str(file) for file in instance]
        if feature_group is not None:
            cmd_list_extractor += ["-feature_group", feature_group]
        if output_file is not None:
            cmd_list_extractor += ["-output_file", str(output_file)]
        if cutoff_time is not None:
            # The extractor handles the output file itself
            return RunSolver.wrap_command(
                self.runsolver_exec,
                cmd_list_extractor,
                cutoff_time,
                log_dir,
                log_name_base=self.name,
                raw_results_file=False,
            )
        return cmd_list_extractor
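
    # Usage sketch (hypothetical instance path): without a cutoff_time, the
    # plain wrapper command is returned; with one, RunSolver.wrap_command
    # prefixes the runsolver invocation:
    #   >>> extractor.build_cmd(Path("Instances/train/example.cnf"))
    #   ['.../sparkle_extractor_wrapper.py', '-extractor_dir', '.../',
    #    '-instance_file', 'Instances/train/example.cnf']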

    def run(
        self: Extractor,
        instance: Path | list[Path],
        feature_group: str = None,
        output_file: Path = None,
        cutoff_time: int = None,
        log_dir: Path = None,
    ) -> list[list[Any]] | list[Any] | None:
        """Runs an extractor job with RunRunner.

        Args:
            instance: Path to the instance to run on.
            feature_group: The feature group to compute. Must be supported by the
                extractor to use.
            output_file: Target output. If None, piped to the RunRunner job.
            cutoff_time: CPU cutoff time in seconds.
            log_dir: Directory to write logs. Defaults to CWD.

        Returns:
            The features, or None if an output file is used or the features
            cannot be found.
        """
        log_dir = Path() if log_dir is None else log_dir
        if feature_group is not None and not self.groupwise_computation:
            # This extractor cannot handle groups, compute all features
            feature_group = None
        cmd_extractor = self.build_cmd(
            instance, feature_group, output_file, cutoff_time, log_dir
        )
        run_on = Runner.LOCAL  # TODO: Let this function also handle Slurm runs
        extractor_run = rrr.add_to_queue(runner=run_on, cmd=" ".join(cmd_extractor))
        if isinstance(extractor_run, LocalRun):
            extractor_run.wait()
            if extractor_run.status == Status.ERROR:
                print(f"{self.name} failed to compute features for {instance}.")
                for i, job in enumerate(extractor_run.jobs):
                    print(
                        f"Job {i} error yielded was:\n"
                        f"\t-stdout: '{job.stdout}'\n"
                        f"\t-stderr: '{job.stderr}'\n"
                    )
                return None
            # RunRunner adds a timestamp before the statement
            pattern = re.compile(
                r"^(?P<timestamp1>\d+\.\d+)\/(?P<timestamp2>\d+\.\d+)\s+(?P<output>.*)$"
            )
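            # Example of a matched stdout line (illustrative values):
            #   "0.0123/0.4567  [('base', 'n_vars', 42)]"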
            output = []
            for job in extractor_run.jobs:
                match = pattern.match(job.stdout)
                if match:
                    output.append(ast.literal_eval(match.group("output")))
            if len(output) == 1:
                return output[0]
            return output
        return None
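
    # Usage sketch (hypothetical instance; assumes a runsolver executable is
    # available to the extractor):
    #   >>> extractor.run(Path("Instances/train/example.cnf"), cutoff_time=60)
    #   [('base', 'n_vars', 42), ('base', 'n_clauses', 100)]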

    def run_cli(
        self: Extractor,
        instance_set: InstanceSet | list[Path],
        feature_dataframe: FeatureDataFrame,
        cutoff_time: int,
        feature_group: str = None,
        run_on: Runner = Runner.SLURM,
        sbatch_options: list[str] = None,
        srun_options: list[str] = None,
        parallel_jobs: int = None,
        slurm_prepend: str | list[str] | Path = None,
        dependencies: list[Run] = None,
        log_dir: Path = None,
    ) -> Run:
        """Run the Extractor CLI and write the result to the FeatureDataFrame.

        Args:
            instance_set: The instance set to run the Extractor on.
            feature_dataframe: The feature dataframe to write to.
            cutoff_time: CPU cutoff time in seconds.
            feature_group: The feature group to compute. If left empty,
                will run on all feature groups.
            run_on: The runner to use.
            sbatch_options: Additional options to pass to sbatch.
            srun_options: Additional options to pass to srun.
            parallel_jobs: Number of parallel jobs to run.
            slurm_prepend: Slurm script to prepend to the sbatch script.
            dependencies: List of dependencies to add to the job.
            log_dir: The directory to write logs to.

        Returns:
            The RunRunner Run object of the submitted jobs.
        """
        instances = (
            instance_set
            if isinstance(instance_set, list)
            else instance_set.instance_paths
        )
        log_dir = Path() if log_dir is None else log_dir
        feature_group = f"--feature-group {feature_group} " if feature_group else ""
        commands = [
            f"python3 {Extractor.extractor_cli} "
            f"--extractor {self.directory} "
            f"--instance {instance_path} "
            f"--feature-csv {feature_dataframe.csv_filepath} "
            f"{feature_group}"
            f"--cutoff {cutoff_time} "
            f"--log-dir {log_dir}"
            for instance_path in instances
        ]

        job_name = f"Run Extractor: {self.name} on {len(instances)} instances"

        run = rrr.add_to_queue(
            runner=run_on,
            cmd=commands,
            name=job_name,
            stdout=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print
            stderr=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print
            base_dir=log_dir,
            sbatch_options=sbatch_options,
            srun_options=srun_options,
            parallel_jobs=parallel_jobs,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if isinstance(run, LocalRun):
            print("Waiting for the local calculations to finish.")
            run.wait()
            for job in run.jobs:
                jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
                print(f"Executing Progress: {jobs_done} out of {len(run.jobs)}")
                if jobs_done == len(run.jobs):
                    break
                job.wait()
            print("Computing features done!")
        else:
            print(f"Running {self.name} through Slurm with Job IDs: {run.run_id}")
        return run
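
    # Usage sketch (hypothetical objects; an InstanceSet and FeatureDataFrame
    # would come from sparkle.instance and sparkle.structures):
    #   >>> run = extractor.run_cli(instance_set, feature_dataframe,
    #   ...                         cutoff_time=60, run_on=Runner.LOCAL)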

    def get_feature_vector(
        self: Extractor, result: Path, runsolver_values: Path = None
    ) -> list[str]:
        """Extracts a feature vector from an output file.

        Args:
            result: The raw output of the extractor.
            runsolver_values: The output of runsolver.

        Returns:
            A list of features. A vector of missing values upon failure.
        """
        if (
            result.exists()
            and RunSolver.get_status(runsolver_values, None) != SolverStatus.TIMEOUT
        ):
            feature_values = ast.literal_eval(result.read_text())
            return [str(value) for _, _, value in feature_values]
        return [FeatureDataFrame.missing_value] * self.output_dimension
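
    # Usage sketch (hypothetical log file names as produced via
    # RunSolver.wrap_command; the raw output holds (group, name, value) tuples):
    #   >>> extractor.get_feature_vector(Path("Log/example.out"),
    #   ...                              Path("Log/example.val"))
    #   ['42', '100']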