Coverage for sparkle/solver/extractor.py: 52%
66 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
1"""Methods regarding feature extractors."""
2from __future__ import annotations
3from pathlib import Path
4import ast
5import subprocess
6from sparkle.types import SparkleCallable, SolverStatus
7from sparkle.structures import FeatureDataFrame
8from sparkle.tools import RunSolver
class Extractor(SparkleCallable):
    """Extractor base class for extracting features from instances.

    All interaction with the actual extractor goes through the wrapper script
    (``Extractor.wrapper``) that is expected to live in the extractor directory.
    """

    # File name of the wrapper script, resolved relative to self.directory
    wrapper = "sparkle_extractor_wrapper.py"

    def __init__(self: Extractor,
                 directory: Path,
                 runsolver_exec: Path | None = None,
                 raw_output_directory: Path | None = None,
                 ) -> None:
        """Initialize extractor.

        Args:
            directory: Directory of the extractor.
            runsolver_exec: Path to the runsolver executable.
                By default, runsolver in directory.
            raw_output_directory: Directory where the extractor will write its
                raw output. Defaults to directory / tmp
        """
        super().__init__(directory, runsolver_exec, raw_output_directory)
        # Lazily filled caches, see the properties of the same name
        self._features = None
        self._feature_groups = None
        # Not used by output_dimension (derived from features); kept so that the
        # set of instance attributes stays the same.
        self._output_dimension = None
        self._groupwise_computation = None

    @property
    def features(self: Extractor) -> list[tuple[str, str]]:
        """Determines the features of the extractor.

        Calls the wrapper with ``-features`` on first access and caches the
        parsed result. The wrapper's stdout must be a Python literal; the first
        element of every entry is treated as the feature group.
        """
        if self._features is None:
            extractor_process = subprocess.run(
                [self.directory / Extractor.wrapper, "-features"],
                capture_output=True)
            self._features = ast.literal_eval(extractor_process.stdout.decode())
        return self._features

    @property
    def feature_groups(self: Extractor) -> list[str]:
        """Returns the various feature groups the Extractor has.

        Every group is listed once, in order of first appearance in
        ``self.features``, so the result is stable between runs.
        """
        if self._feature_groups is None:
            # dict keys are unique and keep insertion order, unlike a set
            self._feature_groups = list(
                dict.fromkeys(group for group, _ in self.features))
        return self._feature_groups

    @property
    def output_dimension(self: Extractor) -> int:
        """The size of the output vector of the extractor."""
        return len(self.features)

    @property
    def groupwise_computation(self: Extractor) -> bool:
        """Determines if you can call the extractor per group for parallelisation."""
        if self._groupwise_computation is None:
            extractor_help = subprocess.run(
                [self.directory / Extractor.wrapper, "-h"], capture_output=True)
            # Not the cleanest / most precise way to determine this:
            # the wrapper's help text is searched for the CLI flag.
            self._groupwise_computation =\
                "-feature_group" in extractor_help.stdout.decode()
        return self._groupwise_computation

    def build_cmd(self: Extractor,
                  instance: Path | list[Path],
                  feature_group: str | None = None,
                  output_file: Path | None = None,
                  cutoff_time: int | None = None,
                  log_dir: Path | None = None,
                  ) -> list[str]:
        """Builds the command line call for the extractor as a list of arguments.

        Args:
            instance: The instance file(s) to run on.
            feature_group: The optional feature group to run the extractor for.
            output_file: Optional file to write the output to.
            cutoff_time: Optional cutoff time handed to runsolver. If None,
                the command is not wrapped with runsolver.
            log_dir: Log directory handed to runsolver. Only used when
                cutoff_time is given.

        Returns:
            The command separated per item in the list.
        """
        if not isinstance(instance, list):
            instance = [instance]
        cmd_list_extractor = [f"{self.directory / Extractor.wrapper}",
                              "-extractor_dir", f"{self.directory}/",
                              "-instance_file"] + [str(file) for file in instance]
        if feature_group is not None:
            cmd_list_extractor += ["-feature_group", feature_group]
        if output_file is not None:
            cmd_list_extractor += ["-output_file", str(output_file)]
        if cutoff_time is not None:
            # Extractor handles output file itself
            return RunSolver.wrap_command(self.runsolver_exec,
                                          cmd_list_extractor,
                                          cutoff_time,
                                          log_dir,
                                          log_name_base=self.name,
                                          raw_results_file=False)
        return cmd_list_extractor

    def run(self: Extractor,
            instance: Path | list[Path],
            feature_group: str | None = None,
            output_file: Path | None = None,
            cutoff_time: int | None = None,
            log_dir: Path | None = None) -> list | None:
        """Runs the extractor on the instance(s) in a blocking subprocess.

        Args:
            instance: Path to the instance(s) to run on
            feature_group: The feature group to compute. Must be supported by the
                extractor to use; otherwise all features are computed.
            output_file: Target output. If None, the features are parsed from
                the stdout of the extractor process.
            cutoff_time: CPU cutoff time in seconds
            log_dir: Directory to write logs. Defaults to self.raw_output_directory.

        Returns:
            The features or None if an output file is used, or features can not be found.
        """
        if log_dir is None:
            log_dir = self.raw_output_directory
        if feature_group is not None and not self.groupwise_computation:
            # This extractor cannot handle groups, compute all features
            feature_group = None
        cmd_extractor = self.build_cmd(
            instance, feature_group, output_file, cutoff_time, log_dir)
        extractor = subprocess.run(cmd_extractor, capture_output=True)
        if output_file is not None:
            # The extractor wrote its result to file, nothing to parse here
            return None
        try:
            # The first whitespace separated token of stdout is skipped,
            # the remainder must be a Python literal.
            return ast.literal_eval(
                extractor.stdout.decode().split(maxsplit=1)[1])
        except (IndexError, ValueError, SyntaxError, TypeError,
                MemoryError, RecursionError):
            # Best effort: empty, undecodable or malformed output yields no features
            return None

    def get_feature_vector(self: Extractor,
                           result: Path,
                           runsolver_values: Path | None = None) -> list[str]:
        """Extracts feature vector from an output file.

        Args:
            result: The raw output of the extractor
            runsolver_values: The output of runsolver.

        Returns:
            A list of features. Vector of missing values upon failure, i.e. when
            the result file is absent, runsolver reports a timeout or the file
            content can not be parsed.
        """
        if result.exists() and RunSolver.get_status(runsolver_values,
                                                    None) != SolverStatus.TIMEOUT:
            try:
                feature_values = ast.literal_eval(result.read_text())
                # Every entry is a triple of which the last item is the value
                return [str(value) for _, _, value in feature_values]
            except (ValueError, SyntaxError, TypeError):
                # Empty or malformed result file (e.g. the extractor was killed
                # while writing): fall through to the missing value vector.
                pass
        return [FeatureDataFrame.missing_value] * self.output_dimension