Coverage for src/sparkle/selector/selector.py: 89%
157 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 14:11 +0000
1"""File to handle a Selector for selecting Solvers."""
3from __future__ import annotations
4import random
5from pathlib import Path
7from sklearn.base import ClassifierMixin, RegressorMixin
8from asf.cli import cli_train as asf_cli
9from asf.predictors import AbstractPredictor
10from asf.selectors.abstract_model_based_selector import AbstractModelBasedSelector
12import runrunner as rrr
13from runrunner import Runner, Run
14import pandas as pd
16from sparkle.types import SparkleObjective, resolve_objective
17from sparkle.structures import FeatureDataFrame, PerformanceDataFrame
18from sparkle.instance import InstanceSet
class Selector:
    """The Selector class for handling Algorithm Selection.

    Wraps an ASF model-based selector class together with the model class it
    uses, and offers selector construction and schedule prediction, both
    in-process (``run``) and through the Sparkle selector CLI (``run_cli``).
    """

    # CLI script used by run_cli to run the selector on a single instance
    selector_cli = Path(__file__).parent / "selector_cli.py"

    def __init__(
        self: Selector,
        selector_class: AbstractModelBasedSelector,
        model_class: AbstractPredictor | ClassifierMixin | RegressorMixin,
    ) -> None:
        """Initialize the Selector object.

        Args:
            selector_class: The (name of) Selector class to construct.
                String names are resolved against ``asf.selectors``.
            model_class: The (name of) model class the selector will use.
                String names are resolved against ``sklearn.ensemble``.
        """
        if isinstance(selector_class, str):  # Resolve class name
            from asf import selectors

            selector_class = getattr(selectors, selector_class)
        if isinstance(model_class, str):  # Resolve class name
            from sklearn import ensemble

            model_class = getattr(ensemble, model_class)
        self.selector_class = selector_class
        self.model_class = model_class

    @property
    def name(self: Selector) -> str:
        """Return the name of the selector."""
        return f"{self.selector_class.__name__}_{self.model_class.__name__}"

    def construct(
        self: Selector,
        selection_scenario: SelectionScenario,
        run_on: Runner = Runner.SLURM,
        job_name: str = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        base_dir: Path = Path(),
    ) -> Run:
        """Construct the Selector.

        Args:
            selection_scenario: The scenario to construct the Selector for.
            run_on: Which runner to use. Defaults to slurm.
            job_name: Name to give the construction job when submitting.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch
            base_dir: The base directory to run the Selector in.

        Returns:
            The construction Run
        """
        selection_scenario.create_scenario()
        selector = self.selector_class(
            model_class=self.model_class,
            budget=selection_scenario.solver_cutoff,
            # ASF expects a maximisation flag, Sparkle objectives a minimise flag
            maximize=not selection_scenario.objective.minimise,
        )
        cmd = asf_cli.build_cli_command(
            selector,
            selection_scenario.feature_target_path,
            selection_scenario.performance_target_path,
            selection_scenario.selector_file_path,
        )
        # Submit as a single shell command string
        cmd = [" ".join([str(c) for c in cmd])]

        job_name = job_name or f"Selector Construction: {selection_scenario.name}"
        construct = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=job_name,
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
        )

        if run_on == Runner.LOCAL:
            construct.wait()
            # BUGFIX: only check for the output file after the job has finished;
            # for asynchronous runners (e.g. Slurm) the selector file cannot
            # exist yet at submit time, so the old unconditional check always
            # reported a (spurious) failure there.
            if not selection_scenario.selector_file_path.is_file():
                print(f"Selector construction of {self.name} failed!")
        return construct

    def run(
        self: Selector,
        selector_path: Path,
        instance: str,
        feature_data: FeatureDataFrame,
    ) -> list | None:
        """Run the Selector, returning the prediction schedule upon success.

        Args:
            selector_path: Path to the serialised selector model to load.
            instance: The instance (column) to predict a schedule for.
            feature_data: Feature data frame containing the instance's features.

        Returns:
            A list of (solver_name, config_id, time) tuples, or None when the
            selector failed to predict a schedule.
        """
        instance_features = feature_data[
            [
                instance,
            ]
        ]
        instance_features.index = instance_features.index.map("_".join)  # Reduce
        instance_features = instance_features.T  # ASF dataframe structure
        selector = self.selector_class.load(selector_path)
        schedule = selector.predict(instance_features)
        if schedule is None:
            print(f"ERROR: Selector {self.name} failed predict schedule!")
            return None
        # ASF presents result as schedule per instance, we only use one in this setting
        schedule = schedule[instance]
        for index, (solver, time) in enumerate(schedule):
            # Split solver name back into solver and config id
            solver_name, conf_index = solver.split("_", maxsplit=1)
            schedule[index] = (solver_name, conf_index, time)
        return schedule

    def run_cli(
        self: Selector,
        scenario_path: Path,
        instance_set: InstanceSet | list[Path],
        feature_data: Path,
        run_on: Runner = Runner.LOCAL,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        job_name: str = None,
        dependencies: list[Run] = None,
        log_dir: Path = None,
    ) -> Run:
        """Run the Selector CLI and write result to the Scenario PerformanceDataFrame.

        Args:
            scenario_path: The path to the scenario with the Selector to run.
            instance_set: The instance set to run the Selector on.
            feature_data: The instance feature data to use.
            run_on: Which runner to use. Defaults to slurm.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch
            job_name: Name to give the Slurm job when submitting.
            dependencies: List of dependencies to add to the job.
            log_dir: The directory to write logs to.

        Returns:
            The Run object.
        """
        import subprocess

        # NOTE: The selector object and the scenario selector could differ which could
        # cause unintended behaviour (e.g. running a different selector than desired)
        instances = (
            instance_set
            if isinstance(instance_set, list)
            else instance_set.instance_paths
        )
        # One CLI invocation per instance, each with its own random seed
        commands = [
            f"python3 {Selector.selector_cli} "
            f"--selector-scenario {scenario_path} "
            f"--instance {instance_path} "
            f"--feature-data {feature_data} "
            f"--log-dir {log_dir} "
            f"--seed {random.randint(0, 2**32 - 1)}"
            for instance_path in instances
        ]

        # Consistent with construct: fall back to a descriptive default name
        job_name = job_name or f"Run Selector: {self.name} on {len(instances)} instances"

        r = rrr.add_to_queue(
            cmd=commands,
            name=job_name,
            stdout=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print
            stderr=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print
            base_dir=log_dir,
            runner=run_on,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if run_on == Runner.LOCAL:
            r.wait()
        return r
class SelectionScenario:
    """A scenario for a Selector."""

    # Pseudo-solver name under which the selector's own performance is recorded
    __selector_solver_name__ = "portfolio_selector"

    def __init__(
        self: SelectionScenario,
        parent_directory: Path,
        selector: Selector,
        objective: SparkleObjective,
        performance_data: PerformanceDataFrame | Path,
        feature_data: FeatureDataFrame | Path,
        feature_extractors: list[str] | None = None,
        solver_cutoff: int | float | None = None,
        extractor_cutoff: int | float | None = None,
        ablate: bool = False,
        subdir_path: Path | None = None,
    ) -> None:
        """Initialize a scenario for a selector.

        Args:
            parent_directory: Directory under which the scenario directory is placed.
            selector: The Selector this scenario belongs to.
            objective: The objective the selector optimises.
            performance_data: Either an original PerformanceDataFrame (converted
                to the ASF selector format here), or a Path to a previously
                written performance CSV.
            feature_data: Either an original FeatureDataFrame, or a Path to a
                previously written feature CSV.
            feature_extractors: Names of the feature extractors used; only
                needed when ``feature_data`` is given as a Path.
            solver_cutoff: Cutoff (budget) per solver.
            extractor_cutoff: Cutoff for feature extraction.
            ablate: Whether to create leave-one-solver-out sub-scenarios.
            subdir_path: Optional subdirectory overriding the directory layout
                (used by ablation sub-scenarios).
        """
        self.selector: Selector = selector
        self.objective: SparkleObjective = objective
        self.solver_cutoff: float | None = solver_cutoff
        self.extractor_cutoff: float | None = extractor_cutoff
        # Resolve the scenario directory: explicit subdir > derived from the
        # solvers in the dataframe > parent of a previously-written CSV
        if subdir_path is not None:
            self.directory = parent_directory / subdir_path
        elif isinstance(performance_data, PerformanceDataFrame):
            self.directory: Path = (
                parent_directory
                / selector.name
                / "_".join([Path(s).name for s in performance_data.solvers])
            )
        else:
            self.directory = performance_data.parent
        self.name = f"{selector.name} on {self.directory.name}"
        self.selector_file_path: Path = self.directory / "portfolio_selector"
        self.scenario_file: Path = self.directory / "scenario.txt"
        self.selector_performance_path: Path = (
            self.directory / "selector_performance.csv"
        )
        # Reload previously saved selector performance when it exists
        if self.selector_performance_path.exists():
            self.selector_performance_data = PerformanceDataFrame(
                self.selector_performance_path
            )
        else:  # Create new performance data frame for selector, write to file later
            self.selector_performance_data = performance_data.clone()
            self.selector_performance_data.add_solver(
                SelectionScenario.__selector_solver_name__
            )

        if isinstance(performance_data, PerformanceDataFrame):  # Convert
            # Convert the dataframes to Selector Format
            # Each (solver, config) pair becomes one flat "solver_config" column
            new_column_names: list[str] = []
            for solver, config_id, _ in performance_data.columns:
                if f"{solver}_{config_id}" not in new_column_names:
                    new_column_names.append(f"{solver}_{config_id}")
            # Drop the seed column, then flatten the multi-index columns/rows
            self.performance_data = performance_data.drop(
                [PerformanceDataFrame.column_seed], axis=1, level=2
            )
            self.performance_data = self.performance_data.droplevel(
                [
                    PerformanceDataFrame.column_configuration,
                    PerformanceDataFrame.column_meta,
                ],
                axis=1,
            )
            self.performance_data = self.performance_data.droplevel(
                PerformanceDataFrame.index_objective, axis=0
            )
            self.performance_data.columns = new_column_names
            # Requires instances as index for both, columns as features / solvers
            # TODO: This should be an aggregation instead?
            self.performance_data.index = self.performance_data.index.droplevel("Run")
            # Enforce data type to be numeric
            self.performance_data = self.performance_data.astype(float)
            self.performance_target_path = self.directory / "performance_data.csv"
        else:  # Read from Path
            self.performance_data: pd.DataFrame = pd.read_csv(
                performance_data, index_col=0
            )
            self.performance_target_path: Path = performance_data

        if isinstance(feature_data, FeatureDataFrame):  # Convert
            self.feature_extractors = feature_data.extractors
            # Features requires instances as index, columns as feature names
            feature_target = feature_data.copy()
            feature_target.index = feature_target.index.map("_".join)  # Reduce Index
            # ASF -> feature columns, instance rows
            self.feature_data: pd.DataFrame = feature_target.T.astype(float)
            self.feature_target_path: Path = self.directory / "feature_data.csv"
        else:  # Read from Path
            self.feature_extractors = feature_extractors
            # NOTE(review): unlike the performance CSV above, this read does not
            # pass index_col=0 — confirm the feature CSV round-trips correctly
            self.feature_data: pd.DataFrame = pd.read_csv(feature_data)
            self.feature_target_path: Path = feature_data

        # Leave-one-solver-out ablation sub-scenarios (only meaningful with >2 solvers)
        self.ablation_scenarios: list[SelectionScenario] = []
        if ablate and len(self.performance_data.columns) > 2:
            for solver in self.performance_data.columns:
                solver_key, conf_id = solver.split("_", maxsplit=1)
                ablate_subdir = Path(f"ablated_{Path(solver).name}")
                ablated_directory = self.directory / ablate_subdir
                # Reuse an already-written ablated CSV (as a Path) when present,
                # otherwise derive a clone with this solver's config removed
                if (ablated_directory / "performance_data.csv").exists():
                    ablated_pd = ablated_directory / "performance_data.csv"
                elif isinstance(performance_data, PerformanceDataFrame):
                    ablated_pd = performance_data.clone()
                    ablated_pd.remove_configuration(solver_key, conf_id)
                else:  # Note we could do this but it would be hacky?
                    raise ValueError(
                        "Cannot ablate scenario after loading from file! "
                        "Requires original PerformanceDataFrame."
                    )

                self.ablation_scenarios.append(
                    SelectionScenario(
                        parent_directory=self.directory,
                        selector=selector,
                        objective=objective,
                        performance_data=ablated_pd,
                        feature_data=feature_data,
                        solver_cutoff=solver_cutoff,
                        ablate=False,  # If we set to true here, recursion would happen
                        subdir_path=ablate_subdir,
                    )
                )

    @property
    def training_instances(self: SelectionScenario) -> list[str]:
        """Get the training instances."""
        return self.performance_data.index.to_list()

    @property
    def test_instances(self: SelectionScenario) -> list[str]:
        """Get the test instances (all instances not used for training)."""
        instances = self.selector_performance_data.instances
        return [i for i in instances if i not in self.training_instances]

    @property
    def training_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the training instance sets."""
        # NOTE: This no longer works as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.training_instances))

    @property
    def test_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the test instance sets."""
        # NOTE: This no longer works as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.test_instances))

    @property
    def instance_sets(self: SelectionScenario) -> list[str]:
        """Get all the instance sets used in this scenario."""
        return list(
            set(Path(i).parent.name for i in self.selector_performance_data.instances)
        )

    @property
    def solvers(self: SelectionScenario) -> list[str]:
        """Get the solvers used for the selector."""
        return self.performance_data.columns.to_list()

    def create_scenario(self: SelectionScenario) -> None:
        """Prepare the scenario directories and write all data files."""
        self.directory.mkdir(parents=True, exist_ok=True)
        self.performance_data.to_csv(self.performance_target_path)
        self.feature_data.to_csv(self.feature_target_path)
        self.selector_performance_data.save_csv(self.selector_performance_path)
        self.create_scenario_file()

    def create_scenario_file(self: SelectionScenario) -> None:
        """Create the scenario file.

        Write the scenario to file.
        """
        with self.scenario_file.open("w") as fout:
            fout.write(self.serialise())

    def serialise(self: SelectionScenario) -> str:
        """Serialize the scenario to its key-value scenario-file string."""
        return (
            f"selector: {self.selector.name}\n"
            f"solver_cutoff: {self.solver_cutoff}\n"
            f"extractor_cutoff: {self.extractor_cutoff}\n"
            f"ablate: {len(self.ablation_scenarios) > 0}\n"
            f"objective: {self.objective}\n"
            f"selector_performance_data: {self.selector_performance_path}\n"
            f"performance_data: {self.performance_target_path}\n"
            f"feature_data: {self.feature_target_path}\n"
            f"feature_extractors: {','.join(self.feature_extractors)}\n"
        )

    @staticmethod
    def from_file(scenario_file: Path) -> SelectionScenario:
        """Read a scenario file and initialise a SelectionScenario from it."""
        # Accept a scenario directory as well as the scenario file itself
        if not scenario_file.is_file() and (scenario_file / "scenario.txt").is_file():
            scenario_file = scenario_file / "scenario.txt"  # Resolve from directory
        # Parse "key: value" lines as written by serialise()
        values = {
            key: value.strip()
            for key, value in [
                line.split(": ", maxsplit=1) for line in scenario_file.open()
            ]
        }
        # Selector name is "<SelectorClass>_<ModelClass>"
        selector_class, selector_model = values["selector"].split("_", maxsplit=1)
        import ast

        selector = Selector(selector_class, selector_model)
        return SelectionScenario(
            parent_directory=scenario_file.parent,
            selector=selector,
            objective=resolve_objective(values["objective"]),
            performance_data=Path(values["performance_data"]),
            feature_data=Path(values["feature_data"]),
            feature_extractors=values["feature_extractors"].split(","),
            solver_cutoff=float(values["solver_cutoff"]),
            # literal_eval turns "True"/"False" back into a bool
            ablate=ast.literal_eval(values["ablate"]),
        )