Coverage for sparkle/selector/extractor.py: 55%
78 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 13:21 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 13:21 +0000
1"""Methods regarding feature extractors."""
2from __future__ import annotations
3from typing import Any
4from pathlib import Path
5import ast
6import subprocess
8import runrunner as rrr
9from runrunner.base import Status, Runner
10from runrunner.local import LocalRun
12from sparkle.types import SparkleCallable, SolverStatus
13from sparkle.structures import FeatureDataFrame
14from sparkle.tools import RunSolver
class Extractor(SparkleCallable):
    """Extractor base class for extracting features from instances.

    The extractor is driven through a wrapper script (see ``wrapper``) that is
    looked up inside the extractor directory. The wrapper is queried for the
    feature list (``-features``) and its help text (``-h``), and is invoked to
    compute the features of one or more instance files.
    """

    # File name of the wrapper script, resolved relative to ``self.directory``.
    wrapper = "sparkle_extractor_wrapper.py"

    def __init__(self: Extractor,
                 directory: Path,
                 runsolver_exec: Path | None = None) -> None:
        """Initialize extractor.

        Args:
            directory: Directory of the extractor.
            runsolver_exec: Path to the runsolver executable.
                By default, runsolver in directory.
        """
        super().__init__(directory, runsolver_exec)
        # Caches for the lazily evaluated properties below. None means
        # "not determined yet".
        self._features = None
        self._feature_groups = None
        self._groupwise_computation = None

    def __str__(self: Extractor) -> str:
        """Return the string representation of the extractor."""
        return self.name

    def __repr__(self: Extractor) -> str:
        """Return detailed representation of the extractor.

        Note that this evaluates the ``feature_groups``, ``output_dimension``
        and ``groupwise_computation`` properties, which may call the wrapper.
        """
        return (
            f"{self.name}:\n"
            f"\t- Directory: {self.directory}\n"
            f"\t- Wrapper: {self.wrapper}\n"
            f"\t- # Feature Groups: {len(self.feature_groups)}\n"
            f"\t- Output Dimension (# Features): {self.output_dimension}\n"
            f"\t- Groupwise Computation Enabled: {self.groupwise_computation}"
        )

    @property
    def features(self: Extractor) -> list[tuple[str, str]]:
        """Determines the features of the extractor.

        The wrapper is called once with ``-features``; its stdout is parsed as
        a Python literal and cached for later calls.

        Returns:
            A list of tuples whose first element is the feature group. The
            second element is presumably the feature name (to be confirmed
            against the wrapper).
        """
        if self._features is None:
            extractor_process = subprocess.run(
                [self.directory / Extractor.wrapper, "-features"],
                capture_output=True)
            self._features = ast.literal_eval(extractor_process.stdout.decode())
        return self._features

    @property
    def feature_groups(self: Extractor) -> list[str]:
        """Returns the various feature groups the Extractor has.

        Returns:
            The unique feature groups, in order of first appearance in
            ``features``.
        """
        if self._feature_groups is None:
            # dict.fromkeys removes duplicates while keeping insertion order,
            # so the result is identical across interpreter runs (a set of
            # strings is not, due to hash randomisation).
            self._feature_groups = list(
                dict.fromkeys(group for group, _ in self.features))
        return self._feature_groups

    @property
    def output_dimension(self: Extractor) -> int:
        """The size of the output vector of the extractor."""
        return len(self.features)

    @property
    def groupwise_computation(self: Extractor) -> bool:
        """Determines if you can call the extractor per group for parallelisation.

        The result is cached after the first call.
        """
        if self._groupwise_computation is None:
            extractor_help = subprocess.run(
                [self.directory / Extractor.wrapper, "-h"],
                capture_output=True)
            # Not the cleanest / most precise way to determine this: the
            # wrapper's help text is searched for the -feature_group option.
            self._groupwise_computation =\
                "-feature_group" in extractor_help.stdout.decode()
        return self._groupwise_computation

    def build_cmd(self: Extractor,
                  instance: Path | list[Path],
                  feature_group: str | None = None,
                  output_file: Path | None = None,
                  cutoff_time: int | None = None,
                  log_dir: Path | None = None,
                  ) -> list[str]:
        """Builds the command line call of the extractor as a list of arguments.

        Args:
            instance: The instance file (or list of files) to run on.
            feature_group: The optional feature group to run the extractor for.
            output_file: Optional file to write the output to.
            cutoff_time: Optional cutoff time. If given, the command is wrapped
                with RunSolver; otherwise the extractor is called directly.
            log_dir: Directory passed to RunSolver for its logs. Only used when
                cutoff_time is given.

        Returns:
            The command separated per item in the list.
        """
        instance_files = instance if isinstance(instance, list) else [instance]
        cmd_list_extractor = [f"{self.directory / Extractor.wrapper}",
                              "-extractor_dir", f"{self.directory}/",
                              "-instance_file"]
        cmd_list_extractor += [str(file) for file in instance_files]
        if feature_group is not None:
            cmd_list_extractor += ["-feature_group", feature_group]
        if output_file is not None:
            cmd_list_extractor += ["-output_file", str(output_file)]
        if cutoff_time is None:
            return cmd_list_extractor
        # Extractor handles output file itself
        return RunSolver.wrap_command(self.runsolver_exec,
                                      cmd_list_extractor,
                                      cutoff_time,
                                      log_dir,
                                      log_name_base=self.name,
                                      raw_results_file=False)

    def run(self: Extractor,
            instance: Path | list[Path],
            feature_group: str | None = None,
            output_file: Path | None = None,
            cutoff_time: int | None = None,
            log_dir: Path | None = None) -> list[list[Any]] | list[Any] | None:
        """Runs an extractor job with Runrunner.

        Args:
            instance: Path to the instance to run on
            feature_group: The feature group to compute. Must be supported by the
                extractor to use.
            output_file: Target output. If None, piped to the RunRunner job.
            cutoff_time: CPU cutoff time in seconds
            log_dir: Directory to write logs. Defaults to CWD.

        Returns:
            The features or None if an output file is used, or features can not be found.
        """
        log_dir = Path() if log_dir is None else log_dir
        if feature_group is not None and not self.groupwise_computation:
            # This extractor cannot handle groups, compute all features
            feature_group = None
        cmd_extractor = self.build_cmd(
            instance, feature_group, output_file, cutoff_time, log_dir)
        run_on = Runner.LOCAL  # TODO: Let this function also handle Slurm runs
        extractor_run = rrr.add_to_queue(runner=run_on,
                                         cmd=" ".join(cmd_extractor))
        if not isinstance(extractor_run, LocalRun):
            return None
        extractor_run.wait()
        if extractor_run.status == Status.ERROR:
            print(f"{self.name} failed to compute features for {instance}.")
            for i, job in enumerate(extractor_run.jobs):
                print(f"Job {i} error yielded was:\n"
                      f"\t-stdout: '{job.stdout}'\n"
                      f"\t-stderr: '{job.stderr}'\n")
            return None
        try:
            # RunRunner adds a stamp before the statement
            output = [ast.literal_eval(job.stdout.split("\t", maxsplit=1)[-1])
                      for job in extractor_run.jobs]
        except (ValueError, TypeError, SyntaxError):
            # stdout holds no parsable feature literal (e.g. it is empty).
            # Honour the documented contract and return None instead of raising.
            if output_file is None:
                print(f"{self.name} did not yield parsable features "
                      f"for {instance}.")
            return None
        if len(output) == 1:
            return output[0]
        return output

    def get_feature_vector(self: Extractor,
                           result: Path,
                           runsolver_values: Path | None = None) -> list[str]:
        """Extracts feature vector from an output file.

        Args:
            result: The raw output of the extractor
            runsolver_values: The output of runsolver.

        Returns:
            A list of features. Vector of missing values upon failure.
        """
        if result.exists() and RunSolver.get_status(
                runsolver_values, None) != SolverStatus.TIMEOUT:
            try:
                # The file is parsed as a Python literal holding triples, of
                # which the third element is the feature value.
                feature_values = ast.literal_eval(result.read_text())
                return [str(value) for _, _, value in feature_values]
            except (ValueError, TypeError, SyntaxError):
                # Malformed or empty result file: treated as a failure, fall
                # through to the missing-value vector below.
                pass
        return [FeatureDataFrame.missing_value] * self.output_dimension