Coverage for src / sparkle / configurator / configurator.py: 66%
339 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 15:31 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 15:31 +0000
1"""Configurator class to use different algorithm configurators."""
3from __future__ import annotations
4import re
5import shutil
6import decimal
7from pathlib import Path
8from datetime import datetime
9from typing import Optional
10import random
12import runrunner as rrr
13from runrunner import Runner, Run
15from sparkle.solver import Solver
16from sparkle.tools.parameters import PCSConvention
17from sparkle.instance import InstanceSet, Instance_Set
18from sparkle.structures import PerformanceDataFrame
19from sparkle.types import SparkleObjective
22class Configurator:
23 """Abstact class to use different configurators like SMAC."""
25 configurator_cli_path = Path(__file__).parent.resolve() / "configurator_cli.py"
27 full_name = "Configurator Abstract Class"
28 version = "NaN"
30 def __init__(self: Configurator, multi_objective_support: bool = False) -> None:
31 """Initialize Configurator.
33 Args:
34 multi_objective_support: Whether the configurator supports
35 multi objective optimization for solvers.
36 """
37 self.multiobjective = multi_objective_support
39 @property
40 def name(self: Configurator) -> str:
41 """Return the name of the configurator."""
42 return self.__class__.__name__
44 @staticmethod
45 def scenario_class() -> ConfigurationScenario:
46 """Return the scenario class of the configurator."""
47 return ConfigurationScenario
49 @staticmethod
50 def check_requirements(verbose: bool = False) -> bool:
51 """Check if the configurator is installed."""
52 raise NotImplementedError
54 @staticmethod
55 def download_requirements() -> None:
56 """Download the configurator."""
57 raise NotImplementedError
59 def configure(
60 self: Configurator,
61 configuration_commands: list[str],
62 data_target: PerformanceDataFrame,
63 output: Path,
64 scenario: ConfigurationScenario,
65 configuration_ids: list[str] = None,
66 validate_after: bool = True,
67 sbatch_options: list[str] = None,
68 slurm_prepend: str | list[str] | Path = None,
69 num_parallel_jobs: int = None,
70 base_dir: Path = None,
71 run_on: Runner = Runner.SLURM,
72 ) -> Run:
73 """Start configuration job.
75 This method is shared by the configurators and should be called by the
76 implementation/subclass of the configurator.
78 Args:
79 configuration_commands: List of configurator commands to execute
80 data_target: Performance data to store the results.
81 output: Output directory.
82 scenario: ConfigurationScenario to execute.
83 configuration_ids: List of configuration ids that are to be created
84 validate_after: Whether the configurations should be validated
85 sbatch_options: List of slurm batch options to use
86 slurm_prepend: Slurm script to prepend to the sbatch
87 num_parallel_jobs: The maximum number of jobs to run in parallel
88 base_dir: The base_dir of RunRunner where the sbatch scripts will be placed
89 run_on: On which platform to run the jobs. Default: Slurm.
91 Returns:
92 A RunRunner Run object.
93 """
94 if not self.check_requirements(verbose=True):
95 raise RuntimeError(
96 f"{self.name} is not installed. Please install {self.name} and try again."
97 )
98 # Add the configuration IDs to the dataframe with empty configurations
99 data_target.add_configuration(
100 str(scenario.solver.directory),
101 configuration_ids,
102 [{}] * len(configuration_ids),
103 )
104 data_target.save_csv()
105 # Submit the configuration job
106 runs = [
107 rrr.add_to_queue(
108 runner=run_on,
109 cmd=configuration_commands,
110 name=f"{self.name} {scenario.solver.name} on {scenario.instance_set.name}",
111 base_dir=base_dir,
112 output_path=output,
113 parallel_jobs=num_parallel_jobs,
114 sbatch_options=sbatch_options,
115 prepend=slurm_prepend,
116 )
117 ]
119 if validate_after:
120 validate = scenario.solver.run_performance_dataframe(
121 scenario.instance_set,
122 config_ids=configuration_ids,
123 performance_dataframe=data_target,
124 cutoff_time=scenario.solver_cutoff_time,
125 sbatch_options=sbatch_options,
126 slurm_prepend=slurm_prepend,
127 log_dir=scenario.validation,
128 base_dir=base_dir,
129 dependencies=runs,
130 job_name=f"{self.name}: Validating {len(configuration_ids)} "
131 f"{scenario.solver.name} Configurations on "
132 f"{scenario.instance_set.name}",
133 run_on=run_on,
134 )
135 runs.append(validate)
137 if run_on == Runner.LOCAL:
138 print(f"[{self.name}] Running {len(runs)} jobs locally...")
139 for run in runs:
140 run.wait()
141 print(f"[{self.name}] Finished running {len(runs)} jobs locally.")
142 return runs
144 @staticmethod
145 def organise_output(
146 output_source: Path,
147 output_target: Path,
148 scenario: ConfigurationScenario,
149 configuration_id: str,
150 ) -> None | str:
151 """Method to restructure and clean up after a single configurator call.
153 Args:
154 output_source: Path to the output file of the configurator run.
155 output_target: Path to the Performance DataFrame to store result.
156 scenario: ConfigurationScenario of the configuration.
157 configuration_id: ID (of the run) of the configuration.
158 """
159 raise NotImplementedError
161 @staticmethod
162 def save_configuration(
163 scenario: ConfigurationScenario,
164 configuration_id: str,
165 configuration: dict,
166 output_target: Path,
167 ) -> dict | None:
168 """Method to save a configuration to a file.
170 If the output_target is None, return the configuration.
172 Args:
173 scenario: ConfigurationScenario of the configuration. Should be removed.
174 configuration_id: ID (of the run) of the configuration.
175 configuration: Configuration to save.
176 output_target: Path to the Performance DataFrame to store result.
177 """
178 if output_target is None or not output_target.exists():
179 return configuration
180 # Save result to Performance DataFrame
181 from filelock import FileLock
183 lock = FileLock(f"{output_target}.lock")
184 with lock.acquire(timeout=600):
185 performance_data = PerformanceDataFrame(output_target)
186 # Resolve absolute path to Solver column
187 solver = [
188 s
189 for s in performance_data.solvers
190 if Path(s).name == scenario.solver.name
191 ][0]
192 # Update the configuration ID by adding the configuration
193 performance_data.add_configuration(
194 solver=solver,
195 configuration_id=configuration_id,
196 configuration=configuration,
197 )
198 performance_data.save_csv()
200 def get_status_from_logs(self: Configurator) -> None:
201 """Method to scan the log files of the configurator for warnings."""
202 raise NotImplementedError
205class ConfigurationScenario:
206 """Template class to handle a configuration scenarios."""
208 def __init__(
209 self: ConfigurationScenario,
210 solver: Solver,
211 instance_set: InstanceSet,
212 sparkle_objectives: list[SparkleObjective],
213 number_of_runs: int,
214 parent_directory: Path,
215 timestamp: str = None,
216 ) -> None:
217 """Initialize scenario paths and names.
219 Args:
220 solver: Solver that should be configured.
221 instance_set: Instances object for the scenario.
222 sparkle_objectives: Sparkle Objectives to optimize.
223 number_of_runs: The number of configurator runs to perform.
224 parent_directory: Directory in which the scenario should be placed.
225 timestamp: The timestamp of the scenario directory/file creation.
226 Only set when read from file, otherwise generated at time of creation.
227 """
228 self.solver = solver
229 self.instance_set = instance_set
230 self.sparkle_objectives = sparkle_objectives
231 self.number_of_runs = number_of_runs
232 self.parent_directory = parent_directory
233 self._timestamp = timestamp
234 self._ablation_scenario: AblationScenario = None
236 @property
237 def configurator(self: ConfigurationScenario) -> Configurator:
238 """Return the type of configurator the scenario belongs to."""
239 return Configurator
241 @property
242 def name(self: ConfigurationScenario) -> str:
243 """Return the name of the scenario."""
244 return f"{self.solver.name}_{self.instance_set.name}_{self.timestamp}"
246 @property
247 def timestamp(self: ConfigurationScenario) -> str:
248 """Return the timestamp."""
249 return self._timestamp
251 @property
252 def directory(self: ConfigurationScenario) -> Path:
253 """Return the path of the scenario directory."""
254 return None if self.timestamp is None else self.parent_directory / self.name
256 @property
257 def scenario_file_path(self: ConfigurationScenario) -> Path:
258 """Return the path of the scenario file."""
259 if self.directory:
260 return self.directory / "scenario.txt"
261 return None
263 @property
264 def validation(self: ConfigurationScenario) -> Path:
265 """Return the path of the validation directory."""
266 if self.directory:
267 return self.directory / "validation"
268 return None
270 @property
271 def tmp(self: ConfigurationScenario) -> Path:
272 """Return the path of the tmp directory."""
273 if self.directory:
274 return self.directory / "tmp"
275 return None
277 @property
278 def results_directory(self: ConfigurationScenario) -> Path:
279 """Return the path of the results directory."""
280 if self.directory:
281 return self.directory / "results"
282 return None
284 @property
285 def configuration_ids(self: ConfigurationScenario) -> list[str]:
286 """Return the IDs of the configurations for the scenario.
288 Only exists after the scenario has been created.
290 Returns:
291 List of configuration IDs, one for each run.
292 """
293 return [
294 f"{self.configurator.__name__}_{self.timestamp}_{i}"
295 for i in range(self.number_of_runs)
296 ]
298 @property
299 def ablation_scenario(self: ConfigurationScenario) -> AblationScenario:
300 """Return the ablation scenario for the scenario if it exists."""
301 if self._ablation_scenario is not None:
302 return self._ablation_scenario
303 for scenario in self.directory.glob("*/ablation_config.txt"):
304 self._ablation_scenario = AblationScenario.from_file(scenario, self)
305 return self._ablation_scenario
306 return None
308 def create_scenario(self: ConfigurationScenario) -> None:
309 """Create scenario with solver and instances in the parent directory.
311 This prepares all the necessary subdirectories related to configuration.
313 Args:
314 parent_directory: Directory in which the scenario should be created.
315 """
316 self._timestamp = datetime.now().strftime("%Y%m%d-%H%M")
317 # Prepare scenario directory
318 shutil.rmtree(self.directory, ignore_errors=True)
319 self.directory.mkdir(parents=True)
320 # Create empty directories as needed
321 self.tmp.mkdir(exist_ok=True)
322 self.validation.mkdir(exist_ok=True)
323 self.results_directory.mkdir(exist_ok=True)
325 def create_scenario_file(self: ConfigurationScenario) -> Path:
326 """Create a file with the configuration scenario."""
327 raise NotImplementedError
329 def serialise(self: ConfigurationScenario) -> dict:
330 """Serialize the configuration scenario."""
331 raise NotImplementedError
333 @classmethod
334 def find_scenario(
335 cls: ConfigurationScenario,
336 directory: Path,
337 solver: Solver,
338 instance_set: InstanceSet,
339 timestamp: str = None,
340 ) -> ConfigurationScenario:
341 """Resolve a scenario from a directory and Solver / Training set."""
342 if timestamp is None:
343 # Get the newest timestamp
344 timestamp_list: list[datetime] = []
345 for subdir in directory.iterdir():
346 if subdir.is_dir():
347 dir_timestamp = subdir.name.split("_")[-1]
348 try:
349 dir_timestamp = datetime.strptime(dir_timestamp, "%Y%m%d-%H%M")
350 timestamp_list.append(dir_timestamp)
351 except ValueError:
352 continue
354 if timestamp_list == []:
355 return None
356 timestamp = max(timestamp_list).strftime("%Y%m%d-%H%M")
358 scenario_name = f"{solver.name}_{instance_set.name}_{timestamp}"
359 path = directory / f"{scenario_name}" / "scenario.txt"
360 if not path.exists():
361 return None
362 return cls.from_file(path)
364 @staticmethod
365 def from_file(scenario_file: Path) -> ConfigurationScenario:
366 """Reads scenario file and initalises ConfigurationScenario."""
367 raise NotImplementedError
370class AblationScenario:
371 """Class for ablation analysis."""
373 # We use the SMAC2 target algorithm for solver output handling
374 configurator_target = (
375 Path(__file__).parent.resolve()
376 / "implementations"
377 / "SMAC2"
378 / "smac2_target_algorithm.py"
379 )
381 ablation_dir = Path(__file__).parent / "implementations" / "ablationAnalysis-0.9.4"
382 ablation_executable = ablation_dir / "ablationAnalysis"
383 ablation_validation_executable = ablation_dir / "ablationValidation"
385 def __init__(
386 self: AblationScenario,
387 configuration_scenario: ConfigurationScenario,
388 test_set: InstanceSet,
389 cutoff_length: str,
390 concurrent_clis: int,
391 best_configuration: dict,
392 ablation_racing: bool = False,
393 ) -> None:
394 """Initialize ablation scenario.
396 Args:
397 solver: Solver object
398 configuration_scenario: Configuration scenario
399 train_set: The training instance
400 test_set: The test instance
401 cutoff_length: The cutoff length for ablation analysis
402 concurrent_clis: The maximum number of concurrent jobs on a single node
403 best_configuration: The configuration to ablate from.
404 ablation_racing: Whether to use ablation racing
405 """
406 self.config_scenario = configuration_scenario
407 self.solver = configuration_scenario.solver
408 self.train_set = configuration_scenario.instance_set
409 self.test_set = test_set
410 self.cutoff_time = configuration_scenario.solver_cutoff_time
411 self.cutoff_length = cutoff_length
412 self.concurrent_clis = concurrent_clis
413 self.best_configuration = best_configuration
414 self.ablation_racing = ablation_racing
415 self.scenario_name = f"ablation_{configuration_scenario.name}"
416 self._table_file: Optional[Path] = None
417 if self.test_set is not None:
418 self.scenario_name += f"_{self.test_set.name}"
420 @property
421 def scenario_dir(self: AblationScenario) -> Path:
422 """Return the path of the scenario directory."""
423 if self.config_scenario.directory:
424 return self.config_scenario.directory / self.scenario_name
425 return None
427 @property
428 def tmp_dir(self: AblationScenario) -> Path:
429 """Return the path of the tmp directory."""
430 if self.scenario_dir:
431 return self.scenario_dir / "tmp"
432 return None
434 @property
435 def validation_dir(self: AblationScenario) -> Path:
436 """Return the path of the validation directory."""
437 if self.scenario_dir:
438 return self.scenario_dir / "validation"
439 return None
441 @property
442 def validation_dir_tmp(self: AblationScenario) -> Path:
443 """Return the path of the validation tmp directory."""
444 if self.validation_dir:
445 return self.validation_dir / "tmp"
446 return None
448 @property
449 def table_file(self: AblationScenario) -> Path:
450 """Return the path of the table file."""
451 if self._table_file:
452 return self._table_file
453 elif self.validation_dir:
454 return self.validation_dir / "log" / "ablation-validation-run1234.txt"
455 else:
456 return None
458 @staticmethod
459 def check_requirements(verbose: bool = False) -> bool:
460 """Check if Ablation Analysis is installed."""
461 import warnings
463 if no_java := shutil.which("java") is None:
464 if verbose:
465 warnings.warn(
466 "AblationAnalysis requires Java 1.8.0_402, but Java is not installed"
467 ". Please ensure Java is installed."
468 )
469 if no_exec := not AblationScenario.ablation_executable.exists():
470 if verbose:
471 warnings.warn(
472 "AblationAnalysis executable not found. Please ensure Ablation"
473 " Analysis is installed in the expected Path "
474 f"({AblationScenario.ablation_executable})."
475 )
476 if no_validation := not AblationScenario.ablation_validation_executable.exists():
477 if verbose:
478 warnings.warn(
479 "AblationAnalysis Validation executable not found. Please ensure "
480 "Ablation Analysis is installed in the expected Path "
481 f"({AblationScenario.ablation_validation_executable})."
482 )
483 return not (no_java or no_exec or no_validation)
485 @staticmethod
486 def download_requirements(
487 ablation_url: str = "https://github.com/ADA-research/Sparkle/raw/refs/heads/development"
488 "/Resources/Other/ablationAnalysis-0.9.4.zip",
489 ) -> None:
490 """Download Ablation Analysis executable."""
491 if AblationScenario.ablation_executable.exists():
492 return # Already installed
493 from urllib.request import urlopen
494 import zipfile
495 import io
497 AblationScenario.ablation_dir.mkdir(parents=True, exist_ok=True)
498 r = urlopen(ablation_url, timeout=60)
499 z = zipfile.ZipFile(io.BytesIO(r.read()))
500 z.extractall(AblationScenario.ablation_dir)
501 # Ensure execution rights
502 AblationScenario.ablation_executable.chmod(0o755)
503 AblationScenario.ablation_validation_executable.chmod(0o755)
505 def create_configuration_file(self: AblationScenario) -> Path:
506 """Create a configuration file for ablation analysis.
508 Returns:
509 Path to the created configuration file.
510 """
511 objective = self.config_scenario.sparkle_objectives[0]
512 pcs = self.solver.get_configuration_space()
513 parameter_names = [p.name for p in pcs.values()]
514 # We need to remove any redundant keys that are not in PCS
515 best_configuration = self.best_configuration.copy()
516 removable_keys = [
517 key for key in best_configuration if key not in parameter_names
518 ]
519 for key in removable_keys:
520 del best_configuration[key]
521 opt_config_str = " ".join([f"-{k} {v}" for k, v in best_configuration.items()])
522 # We need to check which params are missing and supplement with default values
523 for p in list(pcs.values()):
524 if p.name not in opt_config_str:
525 opt_config_str += f" -{p.name} {p.default_value}"
527 # Ablation cannot deal with E scientific notation in floats
528 ctx = decimal.Context(prec=16)
529 for config in opt_config_str.split(" -"):
530 _, value = config.strip().split(" ")
531 if "e" in value.lower():
532 value = value.strip("'")
533 float_value = float(value.lower())
534 formatted = format(ctx.create_decimal(float_value), "f")
535 opt_config_str = opt_config_str.replace(value, formatted)
537 smac_run_obj = "RUNTIME" if objective.time else "QUALITY"
538 objective_str = "MEAN10" if objective.time else "MEAN"
539 # Fetch the SMAC2 PCS file path
540 pcs_file_path = self.config_scenario.solver.get_pcs_file_type(PCSConvention.SMAC)
541 if not pcs_file_path:
542 raise ValueError(
543 "Could not find SMAC2 PCS file, which is required for ablation analysis."
544 )
545 pcs_file_path = pcs_file_path.absolute()
547 # Create config file
548 config_file = self.scenario_dir / "ablation_config.txt"
549 config = (
550 f'algo = "{AblationScenario.configurator_target.absolute()} '
551 f"{self.config_scenario.solver.directory.absolute()} "
552 f'{self.tmp_dir.absolute()} {objective}"\n'
553 f"execdir = {self.tmp_dir.absolute()}\n"
554 "experimentDir = ./\n"
555 f"deterministic = {1 if self.solver.deterministic else 0}\n"
556 f"run_obj = {smac_run_obj}\n"
557 f"overall_obj = {objective_str}\n"
558 f"cutoffTime = {self.cutoff_time}\n"
559 f"cutoff_length = {self.cutoff_length}\n"
560 f"cli-cores = {self.concurrent_clis}\n"
561 f"useRacing = {self.ablation_racing}\n"
562 f"seed = {random.randint(0, 2**32 - 1)}\n"
563 f"paramfile = {pcs_file_path}\n"
564 "instance_file = instances_train.txt\n"
565 "test_instance_file = instances_test.txt\n"
566 "sourceConfiguration = DEFAULT\n"
567 f'targetConfiguration = "{opt_config_str}"'
568 )
569 with config_file.open("w") as file:
570 file.write(config)
571 # Write config to validation directory
572 conf_valid = config.replace(
573 f"execdir = {self.tmp_dir.absolute()}\n",
574 f"execdir = {self.validation_dir_tmp.absolute()}\n",
575 )
576 with (self.validation_dir / config_file.name).open("w") as file:
577 file.write(conf_valid)
578 return self.validation_dir / config_file.name
580 def create_instance_file(self: AblationScenario, test: bool = False) -> Path:
581 """Create an instance file for ablation analysis."""
582 file_suffix = "_train.txt"
583 instance_set = self.train_set
584 if test:
585 file_suffix = "_test.txt"
586 instance_set = self.test_set if self.test_set is not None else self.train_set
587 # We give the Ablation script the paths of the instances
588 file_instance = self.scenario_dir / f"instances{file_suffix}"
589 with file_instance.open("w") as fh:
590 for instance in instance_set._instance_paths:
591 # We need to unpack the multi instance file paths in quotes
592 if isinstance(instance, list):
593 joined_instances = " ".join(
594 [str(file.absolute()) for file in instance]
595 )
596 fh.write(f"{joined_instances}\n")
597 else:
598 fh.write(f"{instance.absolute()}\n")
599 # Copy to validation directory
600 shutil.copyfile(file_instance, self.validation_dir / file_instance.name)
601 return file_instance
603 def create_scenario(self: AblationScenario, override_dirs: bool = False) -> None:
604 """Create scenario directory and files."""
605 if self.scenario_dir.exists():
606 print("WARNING: Found existing ablation scenario.")
607 if not override_dirs:
608 print("Set override to True to overwrite existing scenario.")
609 return
610 print("Overwriting existing scenario...")
611 shutil.rmtree(self.scenario_dir)
612 self.tmp_dir.mkdir(parents=True, exist_ok=True)
613 self.validation_dir_tmp.mkdir(parents=True, exist_ok=True)
614 self.create_instance_file()
615 self.create_instance_file(test=True)
616 self.create_configuration_file()
618 def check_for_ablation(self: AblationScenario) -> bool:
619 """Checks if ablation has terminated successfully."""
620 if not self.table_file.is_file():
621 return False
622 # First line in the table file should be "Ablation analysis validation complete."
623 table_line = self.table_file.open().readline().strip()
624 return table_line == "Ablation analysis validation complete."
626 def read_ablation_table(self: AblationScenario) -> list[list[str]]:
627 """Read from ablation table of a scenario."""
628 if not self.check_for_ablation():
629 # No ablation table exists for this solver-instance pair
630 return []
631 results = [
632 [
633 "Round",
634 "Flipped parameter",
635 "Source value",
636 "Target value",
637 "Validation result",
638 ]
639 ]
641 for line in self.table_file.open().readlines():
642 # Pre-process lines from the ablation file and add to the results dictionary.
643 # Sometimes ablation rounds switch multiple parameters at once.
644 # EXAMPLE: 2 EDR, EDRalpha 0, 0.1 1, 0.1013241633106732 486.31691
645 # To split the row correctly, we remove the space before the comma separated
646 # parameters and add it back.
647 # T.S. 30-01-2024: the results object is a nested list not dictionary?
648 values = re.sub(r"\s+", " ", line.strip())
649 values = re.sub(r", ", ",", values)
650 values = [val.replace(",", ", ") for val in values.split(" ")]
651 if len(values) == 5:
652 results.append(values)
653 return results
655 def submit_ablation(
656 self: AblationScenario,
657 log_dir: Path,
658 sbatch_options: list[str] = [],
659 slurm_prepend: str | list[str] | Path = None,
660 run_on: Runner = Runner.SLURM,
661 ) -> list[Run]:
662 """Submit an ablation job.
664 Args:
665 log_dir: Directory to store job logs
666 sbatch_options: Options to pass to sbatch
667 slurm_prepend: Script to prepend to sbatch script
668 run_on: Determines to which RunRunner queue the job is added
670 Returns:
671 A list of Run objects. Empty when running locally.
672 """
673 if not self.check_requirements(verbose=True):
674 raise RuntimeError(
675 "Ablation Analysis is not available. Please ensure Java and Ablation "
676 "Analysis is installed and try again."
677 )
678 # 1. submit the ablation to the runrunner queue
679 cmd = (
680 f"{AblationScenario.ablation_executable.absolute()} "
681 "--optionFile ablation_config.txt"
682 )
683 srun_options = ["-N1", "-n1", f"-c{self.concurrent_clis}"]
684 sbatch_options += [f"--cpus-per-task={self.concurrent_clis}"]
685 run_ablation = rrr.add_to_queue(
686 runner=run_on,
687 cmd=cmd,
688 name=f"Ablation analysis {self.solver.name} on {self.train_set.name}",
689 base_dir=log_dir,
690 path=self.scenario_dir,
691 sbatch_options=sbatch_options,
692 srun_options=srun_options,
693 prepend=slurm_prepend,
694 )
696 runs = []
697 if run_on == Runner.LOCAL:
698 run_ablation.wait()
699 runs.append(run_ablation)
701 # 2. Run ablation validation run if we have a test set to run on
702 if self.test_set is not None:
703 # Validation dir should have a copy of all needed files, except for the
704 # output of the ablation run, which is stored in ablation-run[seed].txt
705 cmd = (
706 f"{AblationScenario.ablation_validation_executable.absolute()} "
707 "--optionFile ablation_config.txt "
708 "--ablationLogFile ../log/ablation-run1234.txt"
709 )
711 run_ablation_validation = rrr.add_to_queue(
712 runner=run_on,
713 cmd=cmd,
714 name=f"Ablation validation Test set {self.test_set.name}",
715 path=self.validation_dir,
716 base_dir=log_dir,
717 dependencies=run_ablation,
718 sbatch_options=sbatch_options,
719 prepend=slurm_prepend,
720 )
722 if run_on == Runner.LOCAL:
723 run_ablation_validation.wait()
724 runs.append(run_ablation_validation)
725 return runs
727 @staticmethod
728 def from_file(
729 path: Path, config_scenario: ConfigurationScenario
730 ) -> AblationScenario:
731 """Reads scenario file and initalises AblationScenario."""
732 variables = {}
733 for line in path.open().readlines():
734 if line.strip() == "":
735 continue
736 key, value = line.strip().split(" = ", maxsplit=1)
737 variables[key] = value
738 best_conf = {}
739 for keyvalue in variables["targetConfiguration"].replace('"', "").split("-"):
740 keyvalue = keyvalue.strip()
741 if keyvalue:
742 key, value = keyvalue.strip().split(" ", maxsplit=1)
743 best_conf[key] = value
744 test_set = None
745 if (path.parent / "instances_test.txt").exists():
746 test_path = (path.parent / "instances_test.txt").open().readline().strip()
747 test_path = Path(test_path).parent
748 if test_path != config_scenario.instance_set.directory:
749 test_set = Instance_Set(test_path)
750 return AblationScenario(
751 config_scenario,
752 test_set,
753 variables["cutoff_length"],
754 int(variables["cli-cores"]),
755 best_conf,
756 ablation_racing=bool(variables["useRacing"]),
757 )