Coverage for src/sparkle/selector/selector.py: 82% (155 statements)
1"""File to handle a Selector for selecting Solvers."""
3from __future__ import annotations
4import random
5from pathlib import Path
8from sklearn.base import ClassifierMixin, RegressorMixin
9from asf.cli import cli_train as asf_cli
10from asf.predictors import AbstractPredictor
11from asf.selectors.abstract_model_based_selector import AbstractModelBasedSelector
14import runrunner as rrr
15from runrunner import Runner, Run
16import pandas as pd
18from sparkle.types import SparkleObjective, resolve_objective
19from sparkle.structures import FeatureDataFrame, PerformanceDataFrame
20from sparkle.instance import InstanceSet


class Selector:
    """The Selector class for handling Algorithm Selection."""

    selector_cli = Path(__file__).parent / "selector_cli.py"

    def __init__(
        self: Selector,
        selector_class: AbstractModelBasedSelector,
        model_class: AbstractPredictor | ClassifierMixin | RegressorMixin,
    ) -> None:
        """Initialize the Selector object.

        Args:
            selector_class: The (name of the) Selector class to construct.
            model_class: The (name of the) model class the selector will use.
        """
        if isinstance(selector_class, str):  # Resolve class name
            from asf import selectors

            selector_class = getattr(selectors, selector_class)
        if isinstance(model_class, str):  # Resolve class name
            from sklearn import ensemble

            model_class = getattr(ensemble, model_class)

        self.selector_class = selector_class
        self.model_class = model_class

    @property
    def name(self: Selector) -> str:
        """Return the name of the selector."""
        return f"{self.selector_class.__name__}_{self.model_class.__name__}"

    def construct(
        self: Selector,
        selection_scenario: SelectionScenario,
        run_on: Runner = Runner.SLURM,
        job_name: str = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        base_dir: Path = Path(),
    ) -> Run:
        """Construct the Selector.

        Args:
            selection_scenario: The scenario to construct the Selector for.
            run_on: Which runner to use. Defaults to Slurm.
            job_name: Name to give the construction job when submitting.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch script.
            base_dir: The base directory to run the Selector in.

        Returns:
            The construction Run.
        """
        selection_scenario.create_scenario()
        selector = self.selector_class(
            model_class=self.model_class,
            budget=selection_scenario.solver_cutoff,
            maximize=not selection_scenario.objective.minimise,
        )
        cmd = asf_cli.build_cli_command(
            selector,
            selection_scenario.feature_target_path,
            selection_scenario.performance_target_path,
            selection_scenario.selector_file_path,
        )
        cmd = [" ".join([str(c) for c in cmd])]

        job_name = job_name or f"Selector Construction {selection_scenario.name}"
        construct = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=job_name,
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
        )

        if run_on == Runner.LOCAL:
            construct.wait()
            if not selection_scenario.selector_file_path.is_file():
                print(f"Selector construction of {self.name} failed!")
        return construct

    def run(
        self: Selector,
        selector_path: Path,
        instance: str,
        feature_data: FeatureDataFrame,
    ) -> list | None:
        """Run the Selector, returning the prediction schedule upon success."""
        instance_features = feature_data.get_instance(instance, as_dataframe=True)
        # instance_features = feature_data[[instance]]
        # instance_features.columns = instance_features.columns.map("_".join)  # Reduce column multi index
        selector = self.selector_class.load(selector_path)
        schedule = selector.predict(instance_features)
        if schedule is None:
            print(f"ERROR: Selector {self.name} failed to predict a schedule!")
            return None
        # ASF presents the result as a schedule per instance; we only use one instance here
        schedule = schedule[instance]
        for index, (solver, time) in enumerate(schedule):
            # Split the solver name back into solver and config id
            # NOTE: This breaks if the Solver name itself contains an "_";
            # the delimiter should be changed to different character(s)
            solver_name, conf_index = solver.split("_", maxsplit=1)
            schedule[index] = (solver_name, conf_index, time)
        return schedule
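
    # The schedule returned by run() is a list of
    # (solver_name, config_id, time_budget) tuples, for example (values are
    # hypothetical, not taken from the source):
    #   [("ExampleSolver", "config_1", 60.0)]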

    def run_cli(
        self: Selector,
        scenario_path: Path,
        instance_set: InstanceSet | list[Path],
        feature_data: Path,
        run_on: Runner = Runner.LOCAL,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        job_name: str = None,
        dependencies: list[Run] = None,
        log_dir: Path = None,
    ) -> Run:
        """Run the Selector CLI and write the result to the Scenario PerformanceDataFrame.

        Args:
            scenario_path: The path to the scenario with the Selector to run.
            instance_set: The instance set to run the Selector on.
            feature_data: The instance feature data to use.
            run_on: Which runner to use. Defaults to local.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch script.
            job_name: Name to give the Slurm job when submitting.
            dependencies: List of dependencies to add to the job.
            log_dir: The directory to write logs to.

        Returns:
            The Run object.
        """
        # NOTE: The selector object and the scenario selector could differ, which could
        # cause unintended behaviour (e.g. running a different selector than desired)
        instances = (
            instance_set
            if isinstance(instance_set, list)
            else instance_set.instance_paths
        )
        commands = [
            f"python3 {Selector.selector_cli} "
            f"--selector-scenario {scenario_path} "
            f"--instance {instance_path} "
            f"--feature-data {feature_data} "
            f"--log-dir {log_dir} "
            f"--seed {random.randint(0, 2**32 - 1)}"
            for instance_path in instances
        ]

        job_name = (
            f"Run Selector {self.name} on {len(instances)} instances"
            if not job_name
            else job_name
        )
        import subprocess

        r = rrr.add_to_queue(
            cmd=commands,
            name=job_name,
            stdout=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print to console when local
            stderr=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print to console when local
            base_dir=log_dir,
            runner=run_on,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if run_on == Runner.LOCAL:
            r.wait()
        return r
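
    # Each command generated by run_cli invokes selector_cli.py for a single
    # instance; an illustrative command (paths and seed values are hypothetical):
    #   python3 .../selector_cli.py --selector-scenario Output/Selection/scenario \
    #       --instance Instances/ExampleSet/instance_1.cnf --feature-data feature_data.csv \
    #       --log-dir Output/Log --seed 2894672341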


class SelectionScenario:
    """A scenario for a Selector."""

    __selector_solver_name__ = "portfolio_selector"

    def __init__(
        self: SelectionScenario,
        parent_directory: Path,
        selector: Selector,
        objective: SparkleObjective,
        performance_data: PerformanceDataFrame | Path,
        feature_data: FeatureDataFrame | Path,
        feature_extractors: list[str] = None,
        solver_cutoff: int | float = None,
        extractor_cutoff: int | float = None,
        ablate: bool = False,
        subdir_path: Path = None,
    ) -> None:
        """Initialize a scenario for a selector."""
        self.selector: Selector = selector
        self.objective: SparkleObjective = objective
        self.solver_cutoff: float = solver_cutoff
        self.extractor_cutoff: float = extractor_cutoff
        if subdir_path is not None:
            self.directory = parent_directory / subdir_path
        elif isinstance(performance_data, PerformanceDataFrame):
            self.directory: Path = (
                parent_directory
                / selector.name
                / "_".join([Path(s).name for s in performance_data.solvers])
            )
        else:
            self.directory = performance_data.parent
        self.name = f"{selector.name} on {self.directory.name}"
        self.selector_file_path: Path = self.directory / "portfolio_selector"
        self.scenario_file: Path = self.directory / "scenario.txt"
        self.selector_performance_path: Path = (
            self.directory / "selector_performance.csv"
        )
        if self.selector_performance_path.exists():
            self.selector_performance_data = PerformanceDataFrame(
                self.selector_performance_path
            )
        else:  # Create new performance data frame for selector, write to file later
            self.selector_performance_data = performance_data.clone()
            self.selector_performance_data.add_solver(
                SelectionScenario.__selector_solver_name__
            )

        if isinstance(performance_data, PerformanceDataFrame):  # Convert
            # Convert the dataframe to the Selector format
            new_column_names: list[str] = []
            for solver, config_id, _ in performance_data.columns:
                if f"{solver}_{config_id}" not in new_column_names:
                    new_column_names.append(f"{solver}_{config_id}")
            self.performance_data = performance_data.drop(
                [PerformanceDataFrame.column_seed], axis=1, level=2
            )
            self.performance_data = self.performance_data.droplevel(
                [
                    PerformanceDataFrame.column_configuration,
                    PerformanceDataFrame.column_meta,
                ],
                axis=1,
            )
            self.performance_data = self.performance_data.droplevel(
                PerformanceDataFrame.index_objective, axis=0
            )
            self.performance_data.columns = new_column_names
            # Both frames require instances as index, columns as features / solvers
            # TODO: This should be an aggregation instead?
            self.performance_data.index = self.performance_data.index.droplevel("Run")
            # Enforce numeric data type
            self.performance_data = self.performance_data.astype(float)
            self.performance_target_path = self.directory / "performance_data.csv"
        else:  # Read from Path
            self.performance_data: pd.DataFrame = pd.read_csv(
                performance_data, index_col=0
            )
            self.performance_target_path: Path = performance_data

        if isinstance(feature_data, FeatureDataFrame):  # Convert
            self.feature_extractors = feature_data.extractors
            # Features require instances as index, columns as feature names
            feature_target = feature_data.copy()
            feature_target.columns = feature_target.columns.map(
                "_".join
            )  # Reduce the column multi index to a single level
            # ASF -> feature columns, instance rows
            self.feature_data: pd.DataFrame = feature_target.astype(float)
            self.feature_target_path: Path = self.directory / "feature_data.csv"
        else:  # Read from Path
            self.feature_extractors = feature_extractors
            self.feature_data: pd.DataFrame = pd.read_csv(feature_data)
            self.feature_target_path: Path = feature_data

        self.ablation_scenarios: list[SelectionScenario] = []
        if ablate and len(self.performance_data.columns) > 2:
            for solver in self.performance_data.columns:
                solver_key, conf_id = solver.split("_", maxsplit=1)
                ablate_subdir = Path(f"ablated_{Path(solver).name}")
                ablated_directory = self.directory / ablate_subdir
                if (ablated_directory / "performance_data.csv").exists():
                    ablated_pd = ablated_directory / "performance_data.csv"
                elif isinstance(performance_data, PerformanceDataFrame):
                    ablated_pd = performance_data.clone()
                    ablated_pd.remove_configuration(solver_key, conf_id)
                else:  # Reconstructing the ablated data from file would be possible, but hacky
                    raise ValueError(
                        "Cannot ablate scenario after loading from file! "
                        "Requires original PerformanceDataFrame."
                    )

                self.ablation_scenarios.append(
                    SelectionScenario(
                        parent_directory=self.directory,
                        selector=selector,
                        objective=objective,
                        performance_data=ablated_pd,
                        feature_data=feature_data,
                        solver_cutoff=solver_cutoff,
                        ablate=False,  # Setting this to True here would recurse
                        subdir_path=ablate_subdir,
                    )
                )

    @property
    def training_instances(self: SelectionScenario) -> list[str]:
        """Get the training instances."""
        return self.performance_data.index.to_list()

    @property
    def test_instances(self: SelectionScenario) -> list[str]:
        """Get the test instances."""
        instances = self.selector_performance_data.instances
        return [i for i in instances if i not in self.training_instances]

    @property
    def training_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the training instance sets."""
        # NOTE: This no longer works, as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.training_instances))

    @property
    def test_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the test instance sets."""
        # NOTE: This no longer works, as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.test_instances))

    @property
    def instance_sets(self: SelectionScenario) -> list[str]:
        """Get all the instance sets used in this scenario."""
        return list(
            set(Path(i).parent.name for i in self.selector_performance_data.instances)
        )

    @property
    def solvers(self: SelectionScenario) -> list[str]:
        """Get the solvers used for the selector."""
        return self.performance_data.columns.to_list()

    def create_scenario(self: SelectionScenario) -> None:
        """Prepare the scenario directories and write the scenario data to disk."""
        self.directory.mkdir(parents=True, exist_ok=True)
        self.performance_data.to_csv(self.performance_target_path)
        self.feature_data.to_csv(self.feature_target_path)
        self.selector_performance_data.save_csv(self.selector_performance_path)
        self.create_scenario_file()

    def create_scenario_file(self: SelectionScenario) -> None:
        """Write the scenario to the scenario file."""
        with self.scenario_file.open("w") as fout:
            fout.write(self.serialise())

    def serialise(self: SelectionScenario) -> str:
        """Serialise the scenario to a string."""
        return (
            f"selector: {self.selector.name}\n"
            f"solver_cutoff: {self.solver_cutoff}\n"
            f"extractor_cutoff: {self.extractor_cutoff}\n"
            f"ablate: {len(self.ablation_scenarios) > 0}\n"
            f"objective: {self.objective}\n"
            f"selector_performance_data: {self.selector_performance_path}\n"
            f"performance_data: {self.performance_target_path}\n"
            f"feature_data: {self.feature_target_path}\n"
            f"feature_extractors: {','.join(self.feature_extractors)}\n"
        )
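
    # serialise() produces the scenario.txt that from_file() parses below; an
    # illustrative file (all values hypothetical):
    #   selector: PairwiseClassifier_RandomForestClassifier
    #   solver_cutoff: 60.0
    #   extractor_cutoff: 90.0
    #   ablate: False
    #   objective: PAR10
    #   selector_performance_data: .../selector_performance.csv
    #   performance_data: .../performance_data.csv
    #   feature_data: .../feature_data.csv
    #   feature_extractors: Extractor1,Extractor2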

    @staticmethod
    def from_file(scenario_file: Path) -> SelectionScenario:
        """Read a scenario file and initialise a SelectionScenario from it."""
        if not scenario_file.is_file() and (scenario_file / "scenario.txt").is_file():
            scenario_file = scenario_file / "scenario.txt"  # Resolve from directory
        values = {
            key: value.strip()
            for key, value in [
                line.split(": ", maxsplit=1) for line in scenario_file.open()
            ]
        }
        selector_class, selector_model = values["selector"].split("_", maxsplit=1)
        import ast

        selector = Selector(selector_class, selector_model)
        return SelectionScenario(
            parent_directory=scenario_file.parent,
            selector=selector,
            objective=resolve_objective(values["objective"]),
            performance_data=Path(values["performance_data"]),
            feature_data=Path(values["feature_data"]),
            feature_extractors=values["feature_extractors"].split(","),
            solver_cutoff=float(values["solver_cutoff"]),
            extractor_cutoff=float(values["extractor_cutoff"]),
            ablate=ast.literal_eval(values["ablate"]),
        )
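

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: load a previously
    # created scenario and rebuild its selector locally. The directory below is
    # hypothetical and must already contain a scenario.txt written by
    # SelectionScenario.create_scenario_file().
    example_dir = Path("Output/Selection/PairwiseClassifier_RandomForestClassifier")
    scenario = SelectionScenario.from_file(example_dir)
    construction_run = scenario.selector.construct(scenario, run_on=Runner.LOCAL)
    print(f"Constructed selector stored at: {scenario.selector_file_path}")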