Coverage for sparkle/selector/selector.py: 89%
143 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 13:21 +0000
1"""File to handle a Selector for selecting Solvers."""
2from __future__ import annotations
3from pathlib import Path
5from sklearn.base import ClassifierMixin, RegressorMixin
6from asf.cli import cli_train as asf_cli
7from asf.scenario.scenario_metadata import ScenarioMetadata
8from asf.predictors import AbstractPredictor
9from asf.selectors.abstract_model_based_selector import AbstractModelBasedSelector
11import runrunner as rrr
12from runrunner import Runner, Run
13import pandas as pd
15from sparkle.types import SparkleObjective, resolve_objective
16from sparkle.structures import FeatureDataFrame, PerformanceDataFrame
class Selector:
    """The Selector class for handling Algorithm Selection."""

    def __init__(
            self: Selector,
            selector_class: AbstractModelBasedSelector,
            model_class: AbstractPredictor | ClassifierMixin | RegressorMixin) -> None:
        """Initialise the Selector object.

        Args:
            selector_class: The Selector class to construct.
            model_class: The model class the selector will use.
        """
        self.selector_class = selector_class
        self.model_class = model_class

    @property
    def name(self: Selector) -> str:
        """Return the name of the selector (selector class + model class)."""
        return f"{self.selector_class.__name__}_{self.model_class.__name__}"

    def construct(self: Selector,
                  selection_scenario: SelectionScenario,
                  run_on: Runner = Runner.SLURM,
                  job_name: str = None,
                  sbatch_options: list[str] = None,
                  slurm_prepend: str | list[str] | Path = None,
                  base_dir: Path = Path()) -> Run:
        """Construct (train) the Selector for a given scenario.

        Args:
            selection_scenario: The scenario to construct the Selector for.
            run_on: Which runner to use. Defaults to slurm.
            job_name: Name to give the construction job when submitting.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch
            base_dir: The base directory to run the Selector in.

        Returns:
            The construction Run
        """
        # Make sure the scenario files exist on disk before training
        selection_scenario.create_scenario()
        metadata = ScenarioMetadata(
            algorithms=selection_scenario.performance_data.columns.to_list(),
            features=selection_scenario.feature_data.columns.to_list(),
            performance_metric=selection_scenario.objective.name,
            # ASF expects a maximisation flag, Sparkle objectives a minimise flag
            maximize=not selection_scenario.objective.minimise,
            budget=selection_scenario.solver_cutoff)
        untrained_selector = self.selector_class(self.model_class, metadata)
        raw_cmd = asf_cli.build_cli_command(
            untrained_selector,
            selection_scenario.feature_target_path,
            selection_scenario.performance_target_path,
            selection_scenario.selector_file_path)
        # Runrunner takes the command as a single shell string
        command = [" ".join(map(str, raw_cmd))]
        submission = rrr.add_to_queue(
            runner=run_on,
            cmd=command,
            name=job_name or f"Selector Construction: {selection_scenario.name}",
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend)
        if run_on == Runner.LOCAL:
            # Local runs are synchronous: wait and verify the model was written
            submission.wait()
            if not selection_scenario.selector_file_path.is_file():
                print(f"Selector construction of {self.name} failed!")
        return submission

    def run(self: Selector,
            selector_path: Path,
            instance: str,
            feature_data: FeatureDataFrame) -> list:
        """Run the Selector, returning the prediction schedule upon success."""
        features = feature_data.dataframe[[instance, ]]
        features.index = features.index.map("_".join)  # Reduce
        features = features.T  # ASF dataframe structure
        trained_selector = self.selector_class.load(selector_path)
        prediction = trained_selector.predict(features)
        if prediction is None:
            print(f"ERROR: Selector {self.name} failed predict schedule!")
            return None
        # ASF presents result as schedule per instance, we only use one in this setting
        schedule = []
        for solver_id, time in prediction[instance]:
            # Split solver name back into solver and config id
            solver_name, conf_index = solver_id.split("_", maxsplit=1)
            schedule.append((solver_name, conf_index, time))
        return schedule
class SelectionScenario:
    """A scenario for a Selector."""
    # Solver name under which the selector itself is recorded in the
    # selector performance data frame.
    __selector_solver_name__ = "portfolio_selector"

    def __init__(self: SelectionScenario,
                 parent_directory: Path,
                 selector: Selector,
                 objective: SparkleObjective,
                 performance_data: PerformanceDataFrame | Path,
                 feature_data: FeatureDataFrame | Path,
                 feature_extractors: list[str] = None,
                 solver_cutoff: int | float = None,
                 extractor_cutoff: int | float = None,
                 ablate: bool = False,
                 subdir_path: Path = None
                 ) -> None:
        """Initialize a scenario for a selector.

        Args:
            parent_directory: Directory under which the scenario directory lives.
            selector: The Selector this scenario is built for.
            objective: The objective the selector optimises.
            performance_data: Either the original PerformanceDataFrame, or the
                path to a previously converted (ASF format) performance CSV.
            feature_data: Either the original FeatureDataFrame, or the path to
                a previously converted (ASF format) feature CSV.
            feature_extractors: Names of the feature extractors used. Only
                required when feature_data is passed as a Path.
            solver_cutoff: Cutoff (budget) per solver run.
            extractor_cutoff: Cutoff per feature extractor run.
            ablate: Whether to create leave-one-solver-out ablation
                sub-scenarios. Requires the original PerformanceDataFrame.
            subdir_path: Optional subdirectory to place the scenario in.
        """
        self.selector: Selector = selector
        self.objective: SparkleObjective = objective
        self.solver_cutoff: float = solver_cutoff
        self.extractor_cutoff: float = extractor_cutoff
        if subdir_path is not None:
            self.directory = parent_directory / subdir_path
        elif isinstance(performance_data, PerformanceDataFrame):
            # Directory name encodes the selector and the solvers it covers
            self.directory: Path =\
                parent_directory / selector.name / "_".join(
                    [Path(s).name for s in performance_data.solvers])
        else:
            self.directory = performance_data.parent
        self.name = f"{selector.name} on {self.directory.name}"
        self.selector_file_path: Path = self.directory / "portfolio_selector"
        self.scenario_file: Path = self.directory / "scenario.txt"
        self.selector_performance_path: Path =\
            self.directory / "selector_performance.csv"
        if self.selector_performance_path.exists():
            self.selector_performance_data = PerformanceDataFrame(
                self.selector_performance_path)
        else:  # Create new performance data frame for selector, write to file later
            # NOTE(review): assumes performance_data is a PerformanceDataFrame
            # here; a Path without an existing selector_performance.csv would
            # fail on .clone() — TODO confirm callers guarantee this.
            self.selector_performance_data = performance_data.clone()
            self.selector_performance_data.add_solver(
                SelectionScenario.__selector_solver_name__)

        if isinstance(performance_data, PerformanceDataFrame):  # Convert
            # Convert the dataframes to Selector (ASF) format:
            # one column per (solver, configuration) pair
            new_column_names: list[str] = []
            for solver, config_id, _ in performance_data.columns:
                if f"{solver}_{config_id}" not in new_column_names:
                    new_column_names.append(f"{solver}_{config_id}")
            self.performance_data = performance_data.drop(
                [PerformanceDataFrame.column_seed],
                axis=1, level=2)
            self.performance_data = self.performance_data.droplevel([
                PerformanceDataFrame.column_configuration,
                PerformanceDataFrame.column_meta], axis=1)
            self.performance_data = self.performance_data.droplevel(
                PerformanceDataFrame.index_objective, axis=0)
            self.performance_data.columns = new_column_names
            # Requires instances as index for both, columns as features / solvers
            # TODO: This should be an aggregation instead?
            self.performance_data.index = self.performance_data.index.droplevel("Run")
            # Enforce data type to be numeric
            self.performance_data = self.performance_data.astype(float)
            self.performance_target_path = self.directory / "performance_data.csv"
        else:  # Read from Path
            self.performance_data: pd.DataFrame = pd.read_csv(performance_data,
                                                              index_col=0)
            self.performance_target_path: Path = performance_data

        if isinstance(feature_data, FeatureDataFrame):  # Convert
            self.feature_extractors = feature_data.extractors
            # Features requires instances as index, columns as feature names
            feature_target = feature_data.dataframe.copy()
            feature_target.index = feature_target.index.map("_".join)  # Reduce Index
            # ASF -> feature columns, instance rows
            self.feature_data: pd.DataFrame = feature_target.T.astype(float)
            self.feature_target_path: Path = self.directory / "feature_data.csv"
        else:  # Read from Path
            self.feature_extractors = feature_extractors
            self.feature_data: pd.DataFrame = pd.read_csv(feature_data)
            self.feature_target_path: Path = feature_data

        self.ablation_scenarios: list[SelectionScenario] = []
        # Ablation only makes sense with more than two solvers: removing one
        # from a two-solver portfolio leaves nothing to select between.
        if ablate and len(self.performance_data.columns) > 2:
            for solver in self.performance_data.columns:
                solver_key, conf_id = solver.split("_", maxsplit=1)
                ablate_subdir = Path(f"ablated_{Path(solver).name}")
                ablated_directory = self.directory / ablate_subdir
                if (ablated_directory / "performance_data.csv").exists():
                    ablated_pd = ablated_directory / "performance_data.csv"
                elif isinstance(performance_data, PerformanceDataFrame):
                    ablated_pd = performance_data.clone()
                    ablated_pd.remove_configuration(solver_key, conf_id)
                else:  # Note we could do this but it would be hacky?
                    raise ValueError("Cannot ablate scenario after loading from file! "
                                     "Requires original PerformanceDataFrame.")

                self.ablation_scenarios.append(SelectionScenario(
                    parent_directory=self.directory,
                    selector=selector,
                    objective=objective,
                    performance_data=ablated_pd,
                    feature_data=feature_data,
                    solver_cutoff=solver_cutoff,
                    ablate=False,  # If we set to true here, recursion would happen
                    subdir_path=ablate_subdir)
                )

    @property
    def training_instances(self: SelectionScenario) -> list[str]:
        """Get the training instances."""
        return self.performance_data.index.to_list()

    @property
    def test_instances(self: SelectionScenario) -> list[str]:
        """Get the test instances."""
        instances = self.selector_performance_data.instances
        return [i for i in instances if i not in self.training_instances]

    @property
    def training_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the training instance sets."""
        return list(set(Path(i).parent.name for i in self.training_instances))

    @property
    def test_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the test instance sets."""
        return list(set(Path(i).parent.name for i in self.test_instances))

    @property
    def instance_sets(self: SelectionScenario) -> list[str]:
        """Get all the instance sets used in this scenario."""
        return list(set(Path(i).parent.name
                        for i in self.selector_performance_data.instances))

    @property
    def solvers(self: SelectionScenario) -> list[str]:
        """Get the solvers used for the selector."""
        return self.performance_data.columns.to_list()

    def create_scenario(self: SelectionScenario) -> None:
        """Prepare the scenario directories and write the data files."""
        self.directory.mkdir(parents=True, exist_ok=True)
        self.performance_data.to_csv(self.performance_target_path)
        self.feature_data.to_csv(self.feature_target_path)
        self.selector_performance_data.save_csv(self.selector_performance_path)
        self.create_scenario_file()

    def create_scenario_file(self: SelectionScenario) -> None:
        """Create the scenario file.

        Write the scenario to file.
        """
        with self.scenario_file.open("w") as fout:
            fout.write(self.serialise())

    def serialise(self: SelectionScenario) -> str:
        """Serialise the scenario settings to the scenario file format.

        Returns:
            The scenario as a newline separated "key: value" string.
        """
        # BUGFIX: was `self.ablation_scenarios is not None`, which is always
        # True (the attribute is always a list). Use truthiness so the flag
        # actually records whether ablation scenarios exist.
        return f"selector: {self.selector.name}\n"\
               f"solver_cutoff: {self.solver_cutoff}\n"\
               f"extractor_cutoff: {self.extractor_cutoff}\n"\
               f"ablate: {bool(self.ablation_scenarios)}\n"\
               f"objective: {self.objective}\n"\
               f"selector_performance_data: {self.selector_performance_path}\n"\
               f"performance_data: {self.performance_target_path}\n"\
               f"feature_data: {self.feature_target_path}\n"\
               f"feature_extractors: {','.join(self.feature_extractors)}\n"

    @staticmethod
    def from_file(scenario_file: Path) -> SelectionScenario:
        """Reads scenario file and initialises SelectionScenario."""
        if not scenario_file.is_file() and (scenario_file / "scenario.txt").is_file():
            scenario_file = scenario_file / "scenario.txt"  # Resolve from directory
        values = {key: value.strip() for key, value in
                  [line.split(": ", maxsplit=1) for line in scenario_file.open()]}
        selector_class, selector_model = values["selector"].split("_", maxsplit=1)
        # Evaluate string to class
        from sklearn import ensemble
        from asf import selectors
        selector_class = getattr(selectors, selector_class)
        selector_model = getattr(ensemble, selector_model)
        selector = Selector(selector_class, selector_model)
        # Cutoffs default to None and are serialised as the string "None";
        # float("None") would raise, so map it back explicitly.
        solver_cutoff = (None if values["solver_cutoff"] == "None"
                         else float(values["solver_cutoff"]))
        extractor_cutoff = (None if values.get("extractor_cutoff", "None") == "None"
                            else float(values["extractor_cutoff"]))
        return SelectionScenario(
            parent_directory=scenario_file.parent,
            selector=selector,
            objective=resolve_objective(values["objective"]),
            performance_data=Path(values["performance_data"]),
            feature_data=Path(values["feature_data"]),
            feature_extractors=values["feature_extractors"].split(","),
            solver_cutoff=solver_cutoff,
            extractor_cutoff=extractor_cutoff,
            # BUGFIX: was bool(values["ablate"]) — bool of any non-empty
            # string (including "False") is True, so every reload ablated.
            ablate=values["ablate"] == "True")