Coverage report for sparkle/selector/selector.py: 89% of 158 statements covered (coverage.py v7.10.7, created at 2025-09-29 10:17 +0000).
1"""File to handle a Selector for selecting Solvers."""
3from __future__ import annotations
4import random
5from pathlib import Path
7from sklearn.base import ClassifierMixin, RegressorMixin
8from asf.cli import cli_train as asf_cli
9from asf.scenario.scenario_metadata import ScenarioMetadata
10from asf.predictors import AbstractPredictor
11from asf.selectors.abstract_model_based_selector import AbstractModelBasedSelector
13import runrunner as rrr
14from runrunner import Runner, Run
15import pandas as pd
17from sparkle.types import SparkleObjective, resolve_objective
18from sparkle.structures import FeatureDataFrame, PerformanceDataFrame
19from sparkle.instance import InstanceSet
22class Selector:
23 """The Selector class for handling Algorithm Selection."""
25 selector_cli = Path(__file__).parent / "selector_cli.py"
27 def __init__(
28 self: Selector,
29 selector_class: AbstractModelBasedSelector,
30 model_class: AbstractPredictor | ClassifierMixin | RegressorMixin,
31 ) -> None:
32 """Initialize the Selector object.
34 Args:
35 selector_class: The (name of) Selector class to construct.
36 model_class: The (name of) model class the selector will use.
37 """
38 if isinstance(selector_class, str): # Resolve class name
39 from asf import selectors
41 selector_class = getattr(selectors, selector_class)
42 if isinstance(model_class, str): # Resolve class name
43 from sklearn import ensemble
45 model_class = getattr(ensemble, model_class)
46 self.selector_class = selector_class
47 self.model_class = model_class
49 @property
50 def name(self: Selector) -> str:
51 """Return the name of the selector."""
52 return f"{self.selector_class.__name__}_{self.model_class.__name__}"
54 def construct(
55 self: Selector,
56 selection_scenario: SelectionScenario,
57 run_on: Runner = Runner.SLURM,
58 job_name: str = None,
59 sbatch_options: list[str] = None,
60 slurm_prepend: str | list[str] | Path = None,
61 base_dir: Path = Path(),
62 ) -> Run:
63 """Construct the Selector.
65 Args:
66 selection_scenario: The scenario to construct the Selector for.
67 run_on: Which runner to use. Defaults to slurm.
68 job_name: Name to give the construction job when submitting.
69 sbatch_options: Additional options to pass to sbatch.
70 slurm_prepend: Slurm script to prepend to the sbatch
71 base_dir: The base directory to run the Selector in.
73 Returns:
74 The construction Run
75 """
76 selection_scenario.create_scenario()
77 selector = self.selector_class(
78 self.model_class,
79 ScenarioMetadata(
80 algorithms=selection_scenario.performance_data.columns.to_list(),
81 features=selection_scenario.feature_data.columns.to_list(),
82 performance_metric=selection_scenario.objective.name,
83 maximize=not selection_scenario.objective.minimise,
84 budget=selection_scenario.solver_cutoff,
85 ),
86 )
87 cmd = asf_cli.build_cli_command(
88 selector,
89 selection_scenario.feature_target_path,
90 selection_scenario.performance_target_path,
91 selection_scenario.selector_file_path,
92 )
93 cmd = [" ".join([str(c) for c in cmd])]
95 job_name = job_name or f"Selector Construction: {selection_scenario.name}"
96 construct = rrr.add_to_queue(
97 runner=run_on,
98 cmd=cmd,
99 name=job_name,
100 base_dir=base_dir,
101 sbatch_options=sbatch_options,
102 prepend=slurm_prepend,
103 )
105 if run_on == Runner.LOCAL:
106 construct.wait()
107 if not selection_scenario.selector_file_path.is_file():
108 print(f"Selector construction of {self.name} failed!")
109 return construct
111 def run(
112 self: Selector,
113 selector_path: Path,
114 instance: str,
115 feature_data: FeatureDataFrame,
116 ) -> list:
117 """Run the Selector, returning the prediction schedule upon success."""
118 instance_features = feature_data[
119 [
120 instance,
121 ]
122 ]
123 instance_features.index = instance_features.index.map("_".join) # Reduce
124 instance_features = instance_features.T # ASF dataframe structure
125 selector = self.selector_class.load(selector_path)
126 schedule = selector.predict(instance_features)
127 if schedule is None:
128 print(f"ERROR: Selector {self.name} failed predict schedule!")
129 return None
130 # ASF presents result as schedule per instance, we only use one in this setting
131 schedule = schedule[instance]
132 for index, (solver, time) in enumerate(schedule):
133 # Split solver name back into solver and config id
134 solver_name, conf_index = solver.split("_", maxsplit=1)
135 schedule[index] = (solver_name, conf_index, time)
136 return schedule
138 def run_cli(
139 self: Selector,
140 scenario_path: Path,
141 instance_set: InstanceSet | list[Path],
142 feature_data: Path,
143 run_on: Runner = Runner.LOCAL,
144 sbatch_options: list[str] = None,
145 slurm_prepend: str | list[str] | Path = None,
146 job_name: str = None,
147 dependencies: list[Run] = None,
148 log_dir: Path = None,
149 ) -> Run:
150 """Run the Selector CLI and write result to the Scenario PerformanceDataFrame.
152 Args:
153 scenario_path: The path to the scenario with the Selector to run.
154 instance_set: The instance set to run the Selector on.
155 feature_data: The instance feature data to use.
156 run_on: Which runner to use. Defaults to slurm.
157 sbatch_options: Additional options to pass to sbatch.
158 slurm_prepend: Slurm script to prepend to the sbatch
159 job_name: Name to give the Slurm job when submitting.
160 dependencies: List of dependencies to add to the job.
161 log_dir: The directory to write logs to.
163 Returns:
164 The Run object.
165 """
166 # NOTE: The selector object and the scenario selector could differ which could
167 # cause unintended behaviour (e.g. running a different selector than desired)
168 instances = (
169 instance_set
170 if isinstance(instance_set, list)
171 else instance_set.instance_paths
172 )
173 commands = [
174 f"python3 {Selector.selector_cli} "
175 f"--selector-scenario {scenario_path} "
176 f"--instance {instance_path} "
177 f"--feature-data {feature_data} "
178 f"--log-dir {log_dir} "
179 f"--seed {random.randint(0, 2**32 - 1)}"
180 for instance_path in instances
181 ]
183 job_name = (
184 f"Run Selector: {self.name} on {len(instances)} instances"
185 if not job_name
186 else job_name
187 )
188 import subprocess
190 r = rrr.add_to_queue(
191 runner=run_on,
192 cmd=commands,
193 name=job_name,
194 stdout=None if run_on == Runner.LOCAL else subprocess.PIPE, # Print
195 stderr=None if run_on == Runner.LOCAL else subprocess.PIPE, # Print
196 base_dir=log_dir,
197 sbatch_options=sbatch_options,
198 prepend=slurm_prepend,
199 dependencies=dependencies,
200 )
201 if run_on == Runner.LOCAL:
202 r.wait()
203 return r
class SelectionScenario:
    """A scenario for a Selector."""

    # Pseudo-solver column name under which the selector's own performance is stored
    __selector_solver_name__ = "portfolio_selector"

    def __init__(
        self: SelectionScenario,
        parent_directory: Path,
        selector: Selector,
        objective: SparkleObjective,
        performance_data: PerformanceDataFrame | Path,
        feature_data: FeatureDataFrame | Path,
        feature_extractors: list[str] = None,
        solver_cutoff: int | float = None,
        extractor_cutoff: int | float = None,
        ablate: bool = False,
        subdir_path: Path = None,
    ) -> None:
        """Initialize a scenario for a selector.

        Args:
            parent_directory: Directory under which the scenario directory is placed.
            selector: The Selector this scenario belongs to.
            objective: The objective the selector optimises.
            performance_data: Either a PerformanceDataFrame to convert to selector
                format, or a Path to an already-converted CSV.
            feature_data: Either a FeatureDataFrame to convert, or a Path to CSV.
            feature_extractors: Extractor names, only used when feature_data is a Path.
            solver_cutoff: Cutoff time for solvers.
            extractor_cutoff: Cutoff time for feature extractors.
            ablate: Whether to create ablated sub-scenarios (one per removed solver).
            subdir_path: Optional explicit subdirectory for this scenario.
        """
        self.selector: Selector = selector
        self.objective: SparkleObjective = objective
        self.solver_cutoff: float = solver_cutoff
        self.extractor_cutoff: float = extractor_cutoff
        if subdir_path is not None:
            self.directory = parent_directory / subdir_path
        elif isinstance(performance_data, PerformanceDataFrame):
            # Directory name encodes selector and participating solvers
            self.directory: Path = (
                parent_directory
                / selector.name
                / "_".join([Path(s).name for s in performance_data.solvers])
            )
        else:
            self.directory = performance_data.parent
        self.name = f"{selector.name} on {self.directory.name}"
        self.selector_file_path: Path = self.directory / "portfolio_selector"
        self.scenario_file: Path = self.directory / "scenario.txt"
        self.selector_performance_path: Path = (
            self.directory / "selector_performance.csv"
        )
        if self.selector_performance_path.exists():
            self.selector_performance_data = PerformanceDataFrame(
                self.selector_performance_path
            )
        else:  # Create new performance data frame for selector, write to file later
            # NOTE(review): assumes performance_data is a PerformanceDataFrame here;
            # a Path without an existing selector_performance.csv would fail — confirm
            self.selector_performance_data = performance_data.clone()
            self.selector_performance_data.add_solver(
                SelectionScenario.__selector_solver_name__
            )

        if isinstance(performance_data, PerformanceDataFrame):  # Convert
            # Convert the dataframes to Selector Format: one column per
            # (solver, configuration) pair, named "<solver>_<config_id>"
            new_column_names: list[str] = []
            for solver, config_id, _ in performance_data.columns:
                if f"{solver}_{config_id}" not in new_column_names:
                    new_column_names.append(f"{solver}_{config_id}")
            self.performance_data = performance_data.drop(
                [PerformanceDataFrame.column_seed], axis=1, level=2
            )
            self.performance_data = self.performance_data.droplevel(
                [
                    PerformanceDataFrame.column_configuration,
                    PerformanceDataFrame.column_meta,
                ],
                axis=1,
            )
            self.performance_data = self.performance_data.droplevel(
                PerformanceDataFrame.index_objective, axis=0
            )
            self.performance_data.columns = new_column_names
            # Requires instances as index for both, columns as features / solvers
            # TODO: This should be an aggregation instead?
            self.performance_data.index = self.performance_data.index.droplevel("Run")
            # Enforce data type to be numeric
            self.performance_data = self.performance_data.astype(float)
            self.performance_target_path = self.directory / "performance_data.csv"
        else:  # Read from Path
            self.performance_data: pd.DataFrame = pd.read_csv(
                performance_data, index_col=0
            )
            self.performance_target_path: Path = performance_data

        if isinstance(feature_data, FeatureDataFrame):  # Convert
            self.feature_extractors = feature_data.extractors
            # Features requires instances as index, columns as feature names
            feature_target = feature_data.copy()
            feature_target.index = feature_target.index.map("_".join)  # Reduce Index
            # ASF -> feature columns, instance rows
            self.feature_data: pd.DataFrame = feature_target.T.astype(float)
            self.feature_target_path: Path = self.directory / "feature_data.csv"
        else:  # Read from Path
            self.feature_extractors = feature_extractors
            self.feature_data: pd.DataFrame = pd.read_csv(feature_data)
            self.feature_target_path: Path = feature_data

        self.ablation_scenarios: list[SelectionScenario] = []
        if ablate and len(self.performance_data.columns) > 2:
            for solver in self.performance_data.columns:
                solver_key, conf_id = solver.split("_", maxsplit=1)
                ablate_subdir = Path(f"ablated_{Path(solver).name}")
                ablated_directory = self.directory / ablate_subdir
                if (ablated_directory / "performance_data.csv").exists():
                    ablated_pd = ablated_directory / "performance_data.csv"
                elif isinstance(performance_data, PerformanceDataFrame):
                    ablated_pd = performance_data.clone()
                    ablated_pd.remove_configuration(solver_key, conf_id)
                else:  # Note we could do this but it would be hacky?
                    raise ValueError(
                        "Cannot ablate scenario after loading from file! "
                        "Requires original PerformanceDataFrame."
                    )

                self.ablation_scenarios.append(
                    SelectionScenario(
                        parent_directory=self.directory,
                        selector=selector,
                        objective=objective,
                        performance_data=ablated_pd,
                        feature_data=feature_data,
                        solver_cutoff=solver_cutoff,
                        ablate=False,  # If we set to true here, recursion would happen
                        subdir_path=ablate_subdir,
                    )
                )

    @property
    def training_instances(self: SelectionScenario) -> list[str]:
        """Get the training instances."""
        return self.performance_data.index.to_list()

    @property
    def test_instances(self: SelectionScenario) -> list[str]:
        """Get the test instances."""
        instances = self.selector_performance_data.instances
        return [i for i in instances if i not in self.training_instances]

    @property
    def training_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the training instance sets."""
        # NOTE: This no longer works as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.training_instances))

    @property
    def test_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the test instance sets."""
        # NOTE: This no longer works as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.test_instances))

    @property
    def instance_sets(self: SelectionScenario) -> list[str]:
        """Get all the instance sets used in this scenario."""
        return list(
            set(Path(i).parent.name for i in self.selector_performance_data.instances)
        )

    @property
    def solvers(self: SelectionScenario) -> list[str]:
        """Get the solvers used for the selector."""
        return self.performance_data.columns.to_list()

    def create_scenario(self: SelectionScenario) -> None:
        """Prepare the scenario directories and write all data files."""
        self.directory.mkdir(parents=True, exist_ok=True)
        self.performance_data.to_csv(self.performance_target_path)
        self.feature_data.to_csv(self.feature_target_path)
        self.selector_performance_data.save_csv(self.selector_performance_path)
        self.create_scenario_file()

    def create_scenario_file(self: SelectionScenario) -> None:
        """Create the scenario file.

        Write the scenario to file.
        """
        with self.scenario_file.open("w") as fout:
            fout.write(self.serialise())

    def serialise(self: SelectionScenario) -> str:
        """Serialise the scenario to its scenario-file string representation."""
        # BUGFIX: return annotation was `dict`, but a str is returned
        return (
            f"selector: {self.selector.name}\n"
            f"solver_cutoff: {self.solver_cutoff}\n"
            f"extractor_cutoff: {self.extractor_cutoff}\n"
            f"ablate: {len(self.ablation_scenarios) > 0}\n"
            f"objective: {self.objective}\n"
            f"selector_performance_data: {self.selector_performance_path}\n"
            f"performance_data: {self.performance_target_path}\n"
            f"feature_data: {self.feature_target_path}\n"
            f"feature_extractors: {','.join(self.feature_extractors)}\n"
        )

    @staticmethod
    def from_file(scenario_file: Path) -> SelectionScenario:
        """Reads scenario file and initialises SelectionScenario.

        Args:
            scenario_file: Path to the scenario file, or to its directory
                (resolved to `<directory>/scenario.txt`).

        Returns:
            The reconstructed SelectionScenario.
        """
        import ast

        if not scenario_file.is_file() and (scenario_file / "scenario.txt").is_file():
            scenario_file = scenario_file / "scenario.txt"  # Resolve from directory
        # read_text avoids leaking the file handle opened in a comprehension
        values = {
            key: value.strip()
            for key, value in (
                line.split(": ", maxsplit=1)
                for line in scenario_file.read_text().splitlines()
            )
        }
        selector_class, selector_model = values["selector"].split("_", maxsplit=1)
        selector = Selector(selector_class, selector_model)

        def _parse_cutoff(text: str) -> float | None:
            # Cutoffs are serialised as the literal "None" when unset;
            # float("None") would raise, so round-trip them explicitly.
            return None if text == "None" else float(text)

        return SelectionScenario(
            parent_directory=scenario_file.parent,
            selector=selector,
            objective=resolve_objective(values["objective"]),
            performance_data=Path(values["performance_data"]),
            feature_data=Path(values["feature_data"]),
            feature_extractors=values["feature_extractors"].split(","),
            solver_cutoff=_parse_cutoff(values["solver_cutoff"]),
            # BUGFIX: extractor_cutoff was serialised but never restored
            extractor_cutoff=_parse_cutoff(values.get("extractor_cutoff", "None")),
            ablate=ast.literal_eval(values["ablate"]),
        )