Coverage for sparkle/configurator/configurator.py: 69%
278 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 13:21 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 13:21 +0000
1"""Configurator class to use different algorithm configurators."""
2from __future__ import annotations
3import re
4import shutil
5import decimal
6from pathlib import Path
8import runrunner as rrr
9from runrunner import Runner, Run
11from sparkle.solver import Solver
12from sparkle.instance import InstanceSet, Instance_Set
13from sparkle.structures import PerformanceDataFrame
14from sparkle.types import SparkleObjective
class Configurator:
    """Abstract class to use different configurators like SMAC."""

    # Location of the configurator CLI script, next to this module.
    configurator_cli_path = Path(__file__).parent.resolve() / "configurator_cli.py"

    full_name = "Configurator Abstract Class"
    version = "NaN"

    def __init__(self: Configurator,
                 multi_objective_support: bool = False) -> None:
        """Initialize Configurator.

        Args:
            multi_objective_support: Whether the configurator supports
                multi objective optimization for solvers.
        """
        self.multiobjective = multi_objective_support

    @property
    def name(self: Configurator) -> str:
        """Return the name of the configurator (the name of its class)."""
        return self.__class__.__name__

    @staticmethod
    def scenario_class() -> type[ConfigurationScenario]:
        """Return the scenario class of the configurator."""
        return ConfigurationScenario

    @staticmethod
    def check_requirements(verbose: bool = False) -> bool:
        """Check if the configurator is installed.

        Args:
            verbose: Flag passed by callers; its use is up to the implementation.

        Raises:
            NotImplementedError: Always; to be implemented by subclasses.
        """
        raise NotImplementedError

    @staticmethod
    def download_requirements() -> None:
        """Download the configurator.

        Raises:
            NotImplementedError: Always; to be implemented by subclasses.
        """
        raise NotImplementedError

    def configure(self: Configurator,
                  configuration_commands: list[str],
                  data_target: PerformanceDataFrame,
                  output: Path,
                  scenario: ConfigurationScenario,
                  configuration_ids: list[str] = None,
                  validate_after: bool = True,
                  sbatch_options: list[str] = None,
                  slurm_prepend: str | list[str] | Path = None,
                  num_parallel_jobs: int = None,
                  base_dir: Path = None,
                  run_on: Runner = Runner.SLURM) -> list[Run]:
        """Start configuration job.

        This method is shared by the configurators and should be called by the
        implementation/subclass of the configurator.

        Args:
            configuration_commands: List of configurator commands to execute
            data_target: Performance data to store the results.
            output: Output directory.
            scenario: ConfigurationScenario to execute.
            configuration_ids: List of configuration ids that are to be created.
                When None, the ids of the scenario are used.
            validate_after: Whether the configurations should be validated
            sbatch_options: List of slurm batch options to use
            slurm_prepend: Slurm script to prepend to the sbatch
            num_parallel_jobs: The maximum number of jobs to run in parallel
            base_dir: The base_dir of RunRunner where the sbatch scripts will be placed
            run_on: On which platform to run the jobs. Default: Slurm.

        Returns:
            A list of RunRunner Run objects: the configuration job, followed by
            the validation job when validate_after is set.

        Raises:
            RuntimeError: If the requirements check of the configurator fails.
        """
        if not self.check_requirements(verbose=True):
            raise RuntimeError(
                f"{self.name} is not installed. Please install {self.name} "
                "and try again.")
        # The default of None used to crash on len(); fall back to the scenario's ids
        if configuration_ids is None:
            configuration_ids = scenario.configuration_ids
        # Add the configuration IDs to the dataframe with empty configurations.
        # Each ID gets its own dict so the placeholders are not aliased.
        data_target.add_configuration(str(scenario.solver.directory),
                                      configuration_ids,
                                      [{} for _ in configuration_ids])
        data_target.save_csv()
        # Submit the configuration job
        job_name = f"{self.name}: {scenario.solver.name} on {scenario.instance_set.name}"
        runs = [rrr.add_to_queue(
            runner=run_on,
            cmd=configuration_commands,
            name=job_name,
            base_dir=base_dir,
            output_path=output,
            parallel_jobs=num_parallel_jobs,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend)]

        if validate_after:
            # The validation depends on the configuration job(s) submitted above
            validate = scenario.solver.run_performance_dataframe(
                scenario.instance_set,
                config_ids=configuration_ids,
                performance_dataframe=data_target,
                cutoff_time=scenario.solver_cutoff_time,
                sbatch_options=sbatch_options,
                slurm_prepend=slurm_prepend,
                log_dir=scenario.validation,
                base_dir=base_dir,
                dependencies=runs,
                job_name=f"{self.name}: Validating {len(configuration_ids)} "
                         f"{scenario.solver.name} Configurations on "
                         f"{scenario.instance_set.name}",
                run_on=run_on,
            )
            runs.append(validate)

        if run_on == Runner.LOCAL:
            print(f"[{self.name}] Running {len(runs)} jobs locally...")
            for run in runs:
                run.wait()
            print(f"[{self.name}] Finished running {len(runs)} jobs locally.")
        return runs

    @staticmethod
    def organise_output(output_source: Path,
                        output_target: Path,
                        scenario: ConfigurationScenario,
                        configuration_id: str) -> None | str:
        """Method to restructure and clean up after a single configurator call.

        Args:
            output_source: Path to the output file of the configurator run.
            output_target: Path to the Performance DataFrame to store result.
            scenario: ConfigurationScenario of the configuration.
            configuration_id: ID (of the run) of the configuration.

        Raises:
            NotImplementedError: Always; to be implemented by subclasses.
        """
        raise NotImplementedError

    @staticmethod
    def save_configuration(scenario: ConfigurationScenario,
                           configuration_id: str,
                           configuration: dict,
                           output_target: Path) -> dict | None:
        """Method to save a configuration to a file.

        If the output_target is None or does not exist, return the configuration.

        Args:
            scenario: ConfigurationScenario of the configuration. Should be removed.
            configuration_id: ID (of the run) of the configuration.
            configuration: Configuration to save.
            output_target: Path to the Performance DataFrame to store result.

        Returns:
            The configuration when there is no target to write to, otherwise None.

        Raises:
            IndexError: If the solver of the scenario is not in the dataframe.
        """
        if output_target is None or not output_target.exists():
            return configuration
        # Save result to Performance DataFrame, guarded by a lock file next to it
        from filelock import FileLock
        lock = FileLock(f"{output_target}.lock")
        with lock.acquire(timeout=600):
            performance_data = PerformanceDataFrame(output_target)
            # Resolve absolute path to Solver column
            matches = [s for s in performance_data.solvers
                       if Path(s).name == scenario.solver.name]
            if not matches:
                raise IndexError(
                    f"Solver '{scenario.solver.name}' not found in {output_target}")
            # Update the configuration ID by adding the configuration
            performance_data.add_configuration(
                solver=matches[0],
                configuration_id=configuration_id,
                configuration=configuration)
            performance_data.save_csv()
        return None

    def get_status_from_logs(self: Configurator) -> None:
        """Method to scan the log files of the configurator for warnings.

        Raises:
            NotImplementedError: Always; to be implemented by subclasses.
        """
        raise NotImplementedError
class ConfigurationScenario:
    """Template class to handle configuration scenarios."""

    def __init__(self: ConfigurationScenario,
                 solver: Solver,
                 instance_set: InstanceSet,
                 sparkle_objectives: list[SparkleObjective],
                 number_of_runs: int,
                 parent_directory: Path) -> None:
        """Initialize scenario paths and names.

        Args:
            solver: Solver that should be configured.
            instance_set: Instances object for the scenario.
            sparkle_objectives: Sparkle Objectives to optimize.
            number_of_runs: The number of configurator runs to perform.
            parent_directory: Directory in which the scenario should be placed.
        """
        self.solver = solver
        self.instance_set = instance_set
        self.sparkle_objectives = sparkle_objectives
        self.number_of_runs = number_of_runs

        # All scenario files live in <parent_directory>/<solver>_<instance set>
        self.directory = parent_directory / self.name
        self.scenario_file_path = self.directory / "scenario.txt"
        self.timestamp_path = self.directory / "timestamp"
        self.validation: Path = self.directory / "validation"
        self.tmp: Path = self.directory / "tmp"
        self.results_directory: Path = self.directory / "results"
        # Lazily filled caches, see the properties of the same name
        self._ablation_scenario: AblationScenario = None
        self._timestamp: str = None

    @property
    def configurator(self: ConfigurationScenario) -> Configurator:
        """Return the type of configurator the scenario belongs to."""
        return Configurator

    @property
    def name(self: ConfigurationScenario) -> str:
        """Return the name of the scenario."""
        return f"{self.solver.name}_{self.instance_set.name}"

    @property
    def timestamp(self: ConfigurationScenario) -> str | None:
        """Return the timestamp of the scenario.

        The value is read from the timestamp file once and cached afterwards.

        Returns:
            The stored timestamp, or None if the timestamp file does not exist.
        """
        if not self.timestamp_path.exists():
            return None
        if self._timestamp is None:
            self._timestamp = self.timestamp_path.read_text().strip()
        return self._timestamp

    @property
    def configuration_ids(self: ConfigurationScenario) -> list[str]:
        """Return the IDs of the configurations for the scenario.

        Only exists after the scenario has been created.

        Returns:
            List of configuration IDs, one for each run.
        """
        return [f"{self.configurator.__name__}_{self.timestamp}_{i}"
                for i in range(self.number_of_runs)]

    @property
    def ablation_scenario(self: ConfigurationScenario) -> AblationScenario | None:
        """Return the ablation scenario for the scenario if it exists.

        The first ablation config file found in a subdirectory of the scenario
        directory is loaded and cached.
        """
        if self._ablation_scenario is None:
            config_file = next(self.directory.glob("*/ablation_config.txt"), None)
            if config_file is not None:
                self._ablation_scenario = AblationScenario.from_file(config_file, self)
        return self._ablation_scenario

    def create_scenario(self: ConfigurationScenario, parent_directory: Path) -> None:
        """Create scenario with solver and instances in the parent directory.

        This prepares all the necessary subdirectories related to configuration.

        Args:
            parent_directory: Directory in which the scenario should be created.

        Raises:
            NotImplementedError: Always; to be implemented by subclasses.
        """
        raise NotImplementedError

    def create_scenario_file(self: ConfigurationScenario) -> Path:
        """Create a file with the configuration scenario.

        This base implementation only writes the current local time, formatted as
        YYYYmmdd-HHMM, to the timestamp file and returns None.
        NOTE(review): the annotation declares Path, presumably for overriding
        subclasses that return the scenario file; confirm.
        """
        from datetime import datetime
        self.timestamp_path.write_text(datetime.now().strftime("%Y%m%d-%H%M"))

    def serialise(self: ConfigurationScenario) -> dict:
        """Serialize the configuration scenario.

        Raises:
            NotImplementedError: Always; to be implemented by subclasses.
        """
        raise NotImplementedError

    @classmethod
    def find_scenario(cls: type[ConfigurationScenario],
                      directory: Path,
                      solver: Solver,
                      instance_set: InstanceSet) -> ConfigurationScenario | None:
        """Resolve a scenario from a directory and Solver / Training set.

        Args:
            directory: Directory that contains the scenario directories.
            solver: Solver of the scenario.
            instance_set: Instance set of the scenario.

        Returns:
            The scenario loaded from its scenario file, or None if that file
            does not exist.
        """
        scenario_file = directory / f"{solver.name}_{instance_set.name}" / "scenario.txt"
        if scenario_file.exists():
            return cls.from_file(scenario_file)
        return None

    @staticmethod
    def from_file(scenario_file: Path) -> ConfigurationScenario:
        """Reads scenario file and initialises ConfigurationScenario.

        Raises:
            NotImplementedError: Always; to be implemented by subclasses.
        """
        raise NotImplementedError
class AblationScenario:
    """Class for ablation analysis."""

    # We use the SMAC2 target algorithm for solver output handling
    configurator_target = Path(__file__).parent.resolve() /\
        "implementations" / "SMAC2" / "smac2_target_algorithm.py"

    ablation_dir = Path(__file__).parent / "implementations" / "ablationAnalysis-0.9.4"
    ablation_executable = ablation_dir / "ablationAnalysis"
    ablation_validation_executable = ablation_dir / "ablationValidation"

    def __init__(self: AblationScenario,
                 configuration_scenario: ConfigurationScenario,
                 test_set: InstanceSet,
                 cutoff_length: str,
                 concurrent_clis: int,
                 best_configuration: dict,
                 ablation_racing: bool = False) -> None:
        """Initialize ablation scenario.

        The solver, training set and cutoff time are taken from the
        configuration scenario.

        Args:
            configuration_scenario: Configuration scenario
            test_set: The test instance set, may be None
            cutoff_length: The cutoff length for ablation analysis
            concurrent_clis: The maximum number of concurrent jobs on a single node
            best_configuration: The configuration to ablate from.
            ablation_racing: Whether to use ablation racing
        """
        self.config_scenario = configuration_scenario
        self.solver = configuration_scenario.solver
        self.train_set = configuration_scenario.instance_set
        self.test_set = test_set
        self.cutoff_time = configuration_scenario.solver_cutoff_time
        self.cutoff_length = cutoff_length
        self.concurrent_clis = concurrent_clis
        self.best_configuration = best_configuration
        self.ablation_racing = ablation_racing
        self.scenario_name = f"ablation_{configuration_scenario.name}"
        if self.test_set is not None:
            self.scenario_name += f"_{self.test_set.name}"
        self.scenario_dir = configuration_scenario.directory / self.scenario_name

        # Create required scenario Paths
        self.tmp_dir = self.scenario_dir / "tmp"
        self.validation_dir = self.scenario_dir / "validation"
        self.validation_dir_tmp = self.validation_dir / "tmp"
        self.table_file = self.validation_dir / "log" / "ablation-validation-run1234.txt"

    @staticmethod
    def check_requirements(verbose: bool = False) -> bool:
        """Check if Ablation Analysis is installed.

        Args:
            verbose: Whether to emit a warning for every missing requirement.

        Returns:
            True if Java is on the PATH and both ablation executables exist.
        """
        import warnings
        no_java = shutil.which("java") is None
        no_exec = not AblationScenario.ablation_executable.exists()
        no_validation = not AblationScenario.ablation_validation_executable.exists()
        if verbose:
            if no_java:
                warnings.warn(
                    "AblationAnalysis requires Java 1.8.0_402, but Java is not installed"
                    ". Please ensure Java is installed."
                )
            if no_exec:
                warnings.warn(
                    "AblationAnalysis executable not found. Please ensure Ablation"
                    " Analysis is installed in the expected Path "
                    f"({AblationScenario.ablation_executable}).")
            if no_validation:
                warnings.warn(
                    "AblationAnalysis Validation executable not found. Please ensure "
                    "Ablation Analysis is installed in the expected Path "
                    f"({AblationScenario.ablation_validation_executable}).")
        return not (no_java or no_exec or no_validation)

    @staticmethod
    def download_requirements(
        ablation_url: str =
        "https://github.com/ADA-research/Sparkle/raw/refs/heads/development"
        "/Resources/Other/ablationAnalysis-0.9.4.zip"
    ) -> None:
        """Download Ablation Analysis executable.

        Does nothing when the ablation executable already exists.

        Args:
            ablation_url: URL of the zip archive to download and extract.
        """
        if AblationScenario.ablation_executable.exists():
            return  # Already installed
        from urllib.request import urlopen
        import io
        import zipfile
        AblationScenario.ablation_dir.mkdir(parents=True, exist_ok=True)
        # Context managers make sure the connection and archive are closed
        with urlopen(ablation_url, timeout=60) as response:
            payload = io.BytesIO(response.read())
        with zipfile.ZipFile(payload) as archive:
            archive.extractall(AblationScenario.ablation_dir)
        # Ensure execution rights
        AblationScenario.ablation_executable.chmod(0o755)
        AblationScenario.ablation_validation_executable.chmod(0o755)

    @staticmethod
    def _format_value(value: object) -> str:
        """Render a parameter value without E scientific notation."""
        text = str(value)
        if "e" not in text.lower():
            return text
        stripped = text.strip("'")
        try:
            number = float(stripped.lower())
        except ValueError:
            # Not a number (e.g. a categorical value such as "true"): keep as is
            return text
        formatted = format(decimal.Context(prec=16).create_decimal(number), "f")
        # Only the number is replaced so surrounding quotes are preserved
        return text.replace(stripped, formatted)

    def create_configuration_file(self: AblationScenario) -> Path:
        """Create a configuration file for ablation analysis.

        The file is written to the scenario directory and a copy, with the
        execdir pointing to the validation tmp directory, to the validation
        directory. Both directories must already exist.

        Returns:
            Path to the configuration file in the validation directory.
        """
        objective = self.config_scenario.sparkle_objectives[0]
        pcs = self.solver.get_configuration_space()
        parameters = list(pcs.values())
        parameter_names = {p.name for p in parameters}
        # We need to remove any redundant keys that are not in PCS
        configuration = {key: value for key, value in self.best_configuration.items()
                         if key in parameter_names}
        # Supplement missing parameters with their default values. The lookup
        # is done on the keys: a substring search in the rendered string would
        # e.g. consider "alpha" present as soon as "EDRalpha" is.
        for p in parameters:
            if p.name not in configuration:
                configuration[p.name] = p.default_value
        # Ablation cannot deal with E scientific notation in floats, so every
        # value is formatted on its own before the string is assembled
        opt_config_str = " ".join(
            f"-{key} {AblationScenario._format_value(value)}"
            for key, value in configuration.items())

        smac_run_obj = "RUNTIME" if objective.time else "QUALITY"
        objective_str = "MEAN10" if objective.time else "MEAN"
        pcs_file_path = f"{self.config_scenario.solver.pcs_file.absolute()}"

        # Create config file
        config_file = self.scenario_dir / "ablation_config.txt"
        config = (f'algo = "{AblationScenario.configurator_target.absolute()} '
                  f"{self.config_scenario.solver.directory.absolute()} "
                  f'{self.tmp_dir.absolute()} {objective}"\n'
                  f"execdir = {self.tmp_dir.absolute()}\n"
                  "experimentDir = ./\n"
                  f"deterministic = {1 if self.solver.deterministic else 0}\n"
                  f"run_obj = {smac_run_obj}\n"
                  f"overall_obj = {objective_str}\n"
                  f"cutoffTime = {self.cutoff_time}\n"
                  f"cutoff_length = {self.cutoff_length}\n"
                  f"cli-cores = {self.concurrent_clis}\n"
                  f"useRacing = {self.ablation_racing}\n"
                  "seed = 1234\n"  # NOTE: This does not seem right
                  f"paramfile = {pcs_file_path}\n"
                  "instance_file = instances_train.txt\n"
                  "test_instance_file = instances_test.txt\n"
                  "sourceConfiguration = DEFAULT\n"
                  f'targetConfiguration = "{opt_config_str}"')
        config_file.write_text(config)
        # Write config to validation directory
        conf_valid = config.replace(f"execdir = {self.tmp_dir.absolute()}\n",
                                    f"execdir = {self.validation_dir_tmp.absolute()}\n")
        validation_config = self.validation_dir / config_file.name
        validation_config.write_text(conf_valid)
        return validation_config

    def create_instance_file(self: AblationScenario, test: bool = False) -> Path:
        """Create an instance file for ablation analysis.

        Args:
            test: Whether to write the test instance file. If there is no test
                set, the training instances are written to it instead.

        Returns:
            Path to the instance file in the scenario directory. A copy is placed
            in the validation directory.
        """
        file_suffix = "_train.txt"
        instance_set = self.train_set
        if test:
            file_suffix = "_test.txt"
            if self.test_set is not None:
                instance_set = self.test_set
        # We give the Ablation script the paths of the instances
        file_instance = self.scenario_dir / f"instances{file_suffix}"
        with file_instance.open("w") as fh:
            for instance in instance_set._instance_paths:
                # Multi file instances are written as one space separated line
                if isinstance(instance, list):
                    joined_instances = " ".join(
                        [str(file.absolute()) for file in instance])
                    fh.write(f"{joined_instances}\n")
                else:
                    fh.write(f"{instance.absolute()}\n")
        # Copy to validation directory
        shutil.copyfile(file_instance, self.validation_dir / file_instance.name)
        return file_instance

    def create_scenario(self: AblationScenario, override_dirs: bool = False) -> None:
        """Create scenario directory and files.

        Args:
            override_dirs: Whether an existing scenario directory is removed and
                recreated. If False and the directory exists, nothing is done.
        """
        if self.scenario_dir.exists():
            print("WARNING: Found existing ablation scenario.")
            if not override_dirs:
                print("Set override to True to overwrite existing scenario.")
                return
            print("Overwriting existing scenario...")
            shutil.rmtree(self.scenario_dir)
        self.tmp_dir.mkdir(parents=True, exist_ok=True)
        self.validation_dir_tmp.mkdir(parents=True, exist_ok=True)
        self.create_instance_file()
        self.create_instance_file(test=True)
        self.create_configuration_file()

    def check_for_ablation(self: AblationScenario) -> bool:
        """Checks if ablation has terminated successfully."""
        if not self.table_file.is_file():
            return False
        # First line in the table file should be "Ablation analysis validation complete."
        with self.table_file.open() as table:
            table_line = table.readline().strip()
        return table_line == "Ablation analysis validation complete."

    def read_ablation_table(self: AblationScenario) -> list[list[str]]:
        """Read from ablation table of a scenario.

        Returns:
            A header row followed by every row of the table file that splits
            into exactly five columns. Empty if ablation has not completed.
        """
        if not self.check_for_ablation():
            # No ablation table exists for this solver-instance pair
            return []
        results = [["Round", "Flipped parameter", "Source value", "Target value",
                    "Validation result"]]

        with self.table_file.open() as table:
            for line in table:
                # Sometimes ablation rounds switch multiple parameters at once.
                # EXAMPLE: 2 EDR, EDRalpha 0, 0.1 1, 0.1013241633106732 486.31691
                # To split the row correctly, we remove the space before the comma
                # separated parameters and add it back.
                values = re.sub(r"\s+", " ", line.strip())
                values = re.sub(r", ", ",", values)
                values = [val.replace(",", ", ") for val in values.split(" ")]
                if len(values) == 5:
                    results.append(values)
        return results

    def submit_ablation(self: AblationScenario,
                        log_dir: Path,
                        sbatch_options: list[str] = None,
                        slurm_prepend: str | list[str] | Path = None,
                        run_on: Runner = Runner.SLURM) -> list[Run]:
        """Submit an ablation job.

        Args:
            log_dir: Directory to store job logs
            sbatch_options: Options to pass to sbatch. The list is not modified.
            slurm_prepend: Script to prepend to sbatch script
            run_on: Determines to which RunRunner queue the job is added

        Returns:
            A list of the submitted Run objects: the ablation run, followed by
            the validation run if there is a test set.

        Raises:
            RuntimeError: If Java or the Ablation Analysis executables are missing.
        """
        if not self.check_requirements(verbose=True):
            raise RuntimeError(
                "Ablation Analysis is not available. Please ensure Java and Ablation "
                "Analysis is installed and try again."
            )
        # 1. submit the ablation to the runrunner queue
        cmd = (f"{AblationScenario.ablation_executable.absolute()} "
               "--optionFile ablation_config.txt")
        srun_options = ["-N1", "-n1", f"-c{self.concurrent_clis}"]
        # Build a new list: extending in place would alter the caller's list
        # and, with a mutable default, accumulate options across calls
        sbatch_options = list(sbatch_options or [])
        sbatch_options.append(f"--cpus-per-task={self.concurrent_clis}")
        run_ablation = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=f"Ablation analysis: {self.solver.name} on {self.train_set.name}",
            base_dir=log_dir,
            path=self.scenario_dir,
            sbatch_options=sbatch_options,
            srun_options=srun_options,
            prepend=slurm_prepend)

        if run_on == Runner.LOCAL:
            run_ablation.wait()
        runs = [run_ablation]

        # 2. Run ablation validation run if we have a test set to run on
        if self.test_set is not None:
            # Validation dir should have a copy of all needed files, except for the
            # output of the ablation run, which is stored in ablation-run[seed].txt
            cmd = f"{AblationScenario.ablation_validation_executable.absolute()} "\
                  "--optionFile ablation_config.txt "\
                  "--ablationLogFile ../log/ablation-run1234.txt"

            run_ablation_validation = rrr.add_to_queue(
                runner=run_on,
                cmd=cmd,
                name=f"Ablation validation: Test set {self.test_set.name}",
                path=self.validation_dir,
                base_dir=log_dir,
                dependencies=run_ablation,
                sbatch_options=sbatch_options,
                prepend=slurm_prepend)

            if run_on == Runner.LOCAL:
                run_ablation_validation.wait()
            runs.append(run_ablation_validation)
        return runs

    @staticmethod
    def _parse_target_configuration(target: str) -> dict[str, str]:
        """Parse a '-key value -key value' string into a dictionary."""
        configuration: dict[str, list[str]] = {}
        current = None
        for token in target.replace('"', "").split():
            is_key = token.startswith("-") and len(token) > 1
            if is_key:
                # A negative number is a value, not the start of a new key.
                # NOTE: a parameter literally named e.g. "inf" would be
                # mistaken for a number here.
                try:
                    float(token)
                    is_key = False
                except ValueError:
                    pass
            if is_key:
                current = token[1:]
                configuration[current] = []
            elif current is not None:
                configuration[current].append(token)
        return {key: " ".join(parts) for key, parts in configuration.items()}

    @staticmethod
    def from_file(path: Path,
                  config_scenario: ConfigurationScenario) -> AblationScenario:
        """Reads scenario file and initialises AblationScenario.

        Args:
            path: Path to the ablation configuration file with 'key = value' lines.
            config_scenario: The configuration scenario the ablation belongs to.

        Returns:
            The AblationScenario described by the file.

        Raises:
            ValueError: If a non empty line does not contain ' = '.
            KeyError: If a required key is missing from the file.
        """
        variables = {}
        with path.open() as config_file:
            for line in config_file:
                line = line.strip()
                if not line:
                    continue
                key, value = line.split(" = ", maxsplit=1)
                variables[key] = value
        best_conf = AblationScenario._parse_target_configuration(
            variables["targetConfiguration"])
        test_set = None
        test_file = path.parent / "instances_test.txt"
        if test_file.exists():
            with test_file.open() as instances:
                first_instance = instances.readline().strip()
            # NOTE(review): for multi file instances the line holds several
            # space separated paths; confirm the parent resolves correctly.
            if first_instance:
                test_path = Path(first_instance).parent
                if test_path != config_scenario.instance_set.directory:
                    test_set = Instance_Set(test_path)
        # The file stores str(bool); bool("False") would evaluate to True
        use_racing = variables["useRacing"].strip().lower() in ("true", "1")
        return AblationScenario(config_scenario,
                                test_set,
                                variables["cutoff_length"],
                                int(variables["cli-cores"]),
                                best_conf,
                                ablation_racing=use_racing)