Coverage for sparkle/configurator/configurator.py: 71%
333 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 10:17 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 10:17 +0000
1"""Configurator class to use different algorithm configurators."""
3from __future__ import annotations
4import re
5import shutil
6import decimal
7from pathlib import Path
8from datetime import datetime
9from typing import Optional
10import random
12import runrunner as rrr
13from runrunner import Runner, Run
15from sparkle.solver import Solver
16from sparkle.instance import InstanceSet, Instance_Set
17from sparkle.structures import PerformanceDataFrame
18from sparkle.types import SparkleObjective
class Configurator:
    """Abstract class to use different configurators like SMAC."""

    configurator_cli_path = Path(__file__).parent.resolve() / "configurator_cli.py"

    full_name = "Configurator Abstract Class"
    version = "NaN"

    def __init__(self: Configurator, multi_objective_support: bool = False) -> None:
        """Initialize Configurator.

        Args:
            multi_objective_support: Whether the configurator supports
                multi objective optimization for solvers.
        """
        self.multiobjective = multi_objective_support

    @property
    def name(self: Configurator) -> str:
        """Return the name of the configurator (the name of its class)."""
        return self.__class__.__name__

    @staticmethod
    def scenario_class() -> type[ConfigurationScenario]:
        """Return the scenario class of the configurator."""
        return ConfigurationScenario

    @staticmethod
    def check_requirements(verbose: bool = False) -> bool:
        """Check if the configurator is installed.

        Args:
            verbose: Flag forwarded by callers; its handling is up to the subclass.

        Raises:
            NotImplementedError: Always; subclasses must provide the check.
        """
        raise NotImplementedError

    @staticmethod
    def download_requirements() -> None:
        """Download the configurator.

        Raises:
            NotImplementedError: Always; subclasses must provide the download.
        """
        raise NotImplementedError

    def configure(
        self: Configurator,
        configuration_commands: list[str],
        data_target: PerformanceDataFrame,
        output: Path,
        scenario: ConfigurationScenario,
        configuration_ids: list[str] = None,
        validate_after: bool = True,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        num_parallel_jobs: int = None,
        base_dir: Path = None,
        run_on: Runner = Runner.SLURM,
    ) -> list[Run]:
        """Start configuration job.

        This method is shared by the configurators and should be called by the
        implementation/subclass of the configurator.

        Args:
            configuration_commands: List of configurator commands to execute
            data_target: Performance data to store the results.
            output: Output directory.
            scenario: ConfigurationScenario to execute.
            configuration_ids: List of configuration ids that are to be created.
                When None, the ids of the scenario are used.
            validate_after: Whether the configurations should be validated
            sbatch_options: List of slurm batch options to use
            slurm_prepend: Slurm script to prepend to the sbatch
            num_parallel_jobs: The maximum number of jobs to run in parallel
            base_dir: The base_dir of RunRunner where the sbatch scripts will be placed
            run_on: On which platform to run the jobs. Default: Slurm.

        Returns:
            The list of RunRunner Run objects: the configuration run, followed
            by the validation run when validate_after is set.

        Raises:
            RuntimeError: If check_requirements reports the configurator as missing.
        """
        if not self.check_requirements(verbose=True):
            raise RuntimeError(
                f"{self.name} is not installed. Please install {self.name} and try again."
            )
        if configuration_ids is None:
            # The signature allows None, but every use below needs an actual list
            configuration_ids = scenario.configuration_ids
        # Add the configuration IDs to the dataframe with empty configurations.
        # Each ID gets its own dict so that they can never alias one another.
        data_target.add_configuration(
            str(scenario.solver.directory),
            configuration_ids,
            [{} for _ in configuration_ids],
        )
        data_target.save_csv()
        # Submit the configuration job
        runs = [
            rrr.add_to_queue(
                runner=run_on,
                cmd=configuration_commands,
                name=f"{self.name}: {scenario.solver.name} on {scenario.instance_set.name}",
                base_dir=base_dir,
                output_path=output,
                parallel_jobs=num_parallel_jobs,
                sbatch_options=sbatch_options,
                prepend=slurm_prepend,
            )
        ]

        if validate_after:
            # The validation depends on the configuration job submitted above
            validate = scenario.solver.run_performance_dataframe(
                scenario.instance_set,
                config_ids=configuration_ids,
                performance_dataframe=data_target,
                cutoff_time=scenario.solver_cutoff_time,
                sbatch_options=sbatch_options,
                slurm_prepend=slurm_prepend,
                log_dir=scenario.validation,
                base_dir=base_dir,
                dependencies=runs,
                job_name=f"{self.name}: Validating {len(configuration_ids)} "
                f"{scenario.solver.name} Configurations on "
                f"{scenario.instance_set.name}",
                run_on=run_on,
            )
            runs.append(validate)

        if run_on == Runner.LOCAL:
            print(f"[{self.name}] Running {len(runs)} jobs locally...")
            for run in runs:
                run.wait()
            print(f"[{self.name}] Finished running {len(runs)} jobs locally.")
        return runs

    @staticmethod
    def organise_output(
        output_source: Path,
        output_target: Path,
        scenario: ConfigurationScenario,
        configuration_id: str,
    ) -> None | str:
        """Method to restructure and clean up after a single configurator call.

        Args:
            output_source: Path to the output file of the configurator run.
            output_target: Path to the Performance DataFrame to store result.
            scenario: ConfigurationScenario of the configuration.
            configuration_id: ID (of the run) of the configuration.

        Raises:
            NotImplementedError: Always; subclasses must provide the implementation.
        """
        raise NotImplementedError

    @staticmethod
    def save_configuration(
        scenario: ConfigurationScenario,
        configuration_id: str,
        configuration: dict,
        output_target: Path,
    ) -> dict | None:
        """Method to save a configuration to a file.

        If the output_target is None or does not exist, nothing is written and
        the configuration is returned instead.

        Args:
            scenario: ConfigurationScenario of the configuration. Should be removed.
            configuration_id: ID (of the run) of the configuration.
            configuration: Configuration to save.
            output_target: Path to the Performance DataFrame to store result.

        Returns:
            The configuration when it could not be stored, None after storing it.

        Raises:
            IndexError: If the solver of the scenario is not in the dataframe.
        """
        if output_target is None or not output_target.exists():
            return configuration
        # Save result to Performance DataFrame
        from filelock import FileLock

        # The lock file sits next to the target; wait at most 600 seconds for it
        lock = FileLock(f"{output_target}.lock")
        with lock.acquire(timeout=600):
            performance_data = PerformanceDataFrame(output_target)
            # Resolve absolute path to Solver column
            matching_solvers = [
                s
                for s in performance_data.solvers
                if Path(s).name == scenario.solver.name
            ]
            if not matching_solvers:
                raise IndexError(
                    f"Solver '{scenario.solver.name}' not found in {output_target}"
                )
            # Update the configuration ID by adding the configuration
            performance_data.add_configuration(
                solver=matching_solvers[0],
                configuration_id=configuration_id,
                configuration=configuration,
            )
            performance_data.save_csv()
        return None

    def get_status_from_logs(self: Configurator) -> None:
        """Method to scan the log files of the configurator for warnings.

        Raises:
            NotImplementedError: Always; subclasses must provide the implementation.
        """
        raise NotImplementedError
class ConfigurationScenario:
    """Template class to handle configuration scenarios."""

    def __init__(
        self: ConfigurationScenario,
        solver: Solver,
        instance_set: InstanceSet,
        sparkle_objectives: list[SparkleObjective],
        number_of_runs: int,
        parent_directory: Path,
        timestamp: str = None,
    ) -> None:
        """Initialize scenario paths and names.

        Args:
            solver: Solver that should be configured.
            instance_set: Instances object for the scenario.
            sparkle_objectives: Sparkle Objectives to optimize.
            number_of_runs: The number of configurator runs to perform.
            parent_directory: Directory in which the scenario should be placed.
            timestamp: The timestamp of the scenario directory/file creation.
                Only set when read from file, otherwise generated at time of creation.
        """
        self.solver = solver
        self.instance_set = instance_set
        self.sparkle_objectives = sparkle_objectives
        self.number_of_runs = number_of_runs
        self.parent_directory = parent_directory
        self._timestamp = timestamp
        # Filled in on first successful access of the ablation_scenario property
        self._ablation_scenario: AblationScenario = None

    @property
    def configurator(self: ConfigurationScenario) -> type[Configurator]:
        """Return the type of configurator the scenario belongs to."""
        return Configurator

    @property
    def name(self: ConfigurationScenario) -> str:
        """Return the name of the scenario."""
        return f"{self.solver.name}_{self.instance_set.name}_{self.timestamp}"

    @property
    def timestamp(self: ConfigurationScenario) -> str:
        """Return the timestamp (None until set or created)."""
        return self._timestamp

    @property
    def directory(self: ConfigurationScenario) -> Path:
        """Return the path of the scenario directory, None without a timestamp."""
        return None if self.timestamp is None else self.parent_directory / self.name

    @property
    def scenario_file_path(self: ConfigurationScenario) -> Path:
        """Return the path of the scenario file, None without a directory."""
        if self.directory:
            return self.directory / "scenario.txt"
        return None

    @property
    def validation(self: ConfigurationScenario) -> Path:
        """Return the path of the validation directory, None without a directory."""
        if self.directory:
            return self.directory / "validation"
        return None

    @property
    def tmp(self: ConfigurationScenario) -> Path:
        """Return the path of the tmp directory, None without a directory."""
        if self.directory:
            return self.directory / "tmp"
        return None

    @property
    def results_directory(self: ConfigurationScenario) -> Path:
        """Return the path of the results directory, None without a directory."""
        if self.directory:
            return self.directory / "results"
        return None

    @property
    def configuration_ids(self: ConfigurationScenario) -> list[str]:
        """Return the IDs of the configurations for the scenario.

        Only exists after the scenario has been created.

        Returns:
            List of configuration IDs, one for each run.
        """
        return [
            f"{self.configurator.__name__}_{self.timestamp}_{i}"
            for i in range(self.number_of_runs)
        ]

    @property
    def ablation_scenario(self: ConfigurationScenario) -> AblationScenario:
        """Return the ablation scenario for the scenario if it exists.

        The result is cached after the first successful lookup.

        Returns:
            The AblationScenario read from the first (sorted) ablation_config.txt
            found one level below the scenario directory, or None when the
            scenario has no directory yet or no such file exists.
        """
        if self._ablation_scenario is not None:
            return self._ablation_scenario
        if self.directory is None:
            # Without a timestamp there is no directory to search in
            return None
        # Sorted so that the choice does not depend on the file system order
        for scenario in sorted(self.directory.glob("*/ablation_config.txt")):
            self._ablation_scenario = AblationScenario.from_file(scenario, self)
            return self._ablation_scenario
        return None

    def create_scenario(self: ConfigurationScenario) -> None:
        """Create scenario with solver and instances in the parent directory.

        This prepares all the necessary subdirectories related to configuration.
        The timestamp of the scenario is set to the current time, and an already
        existing directory with the resulting name is removed first.
        """
        self._timestamp = datetime.now().strftime("%Y%m%d-%H%M")
        # Prepare scenario directory
        shutil.rmtree(self.directory, ignore_errors=True)
        self.directory.mkdir(parents=True)
        # Create empty directories as needed
        self.tmp.mkdir(exist_ok=True)
        self.validation.mkdir(exist_ok=True)
        self.results_directory.mkdir(exist_ok=True)

    def create_scenario_file(self: ConfigurationScenario) -> Path:
        """Create a file with the configuration scenario.

        Raises:
            NotImplementedError: Always; subclasses must provide the implementation.
        """
        raise NotImplementedError

    def serialise(self: ConfigurationScenario) -> dict:
        """Serialize the configuration scenario.

        Raises:
            NotImplementedError: Always; subclasses must provide the implementation.
        """
        raise NotImplementedError

    @classmethod
    def find_scenario(
        cls: type[ConfigurationScenario],
        directory: Path,
        solver: Solver,
        instance_set: InstanceSet,
        timestamp: str = None,
    ) -> ConfigurationScenario:
        """Resolve a scenario from a directory and Solver / Training set.

        Args:
            directory: Directory containing the scenario directories.
            solver: Solver of the scenario.
            instance_set: Instance set of the scenario.
            timestamp: Timestamp of the scenario. When None, the newest
                scenario of this solver and instance set is used.

        Returns:
            The scenario as read by from_file, or None if no matching scenario
            file exists.
        """
        scenario_prefix = f"{solver.name}_{instance_set.name}"
        if timestamp is None:
            # Get the newest timestamp among the scenarios of this solver and
            # instance set only: a newer scenario of another solver or instance
            # set must not hide an existing one.
            timestamp_list: list[datetime] = []
            for subdir in directory.iterdir():
                if not subdir.is_dir():
                    continue
                prefix, _, dir_timestamp = subdir.name.rpartition("_")
                if prefix != scenario_prefix:
                    continue
                try:
                    timestamp_list.append(
                        datetime.strptime(dir_timestamp, "%Y%m%d-%H%M")
                    )
                except ValueError:
                    # Not a scenario directory
                    continue

            if not timestamp_list:
                return None
            timestamp = max(timestamp_list).strftime("%Y%m%d-%H%M")

        path = directory / f"{scenario_prefix}_{timestamp}" / "scenario.txt"
        if not path.exists():
            return None
        return cls.from_file(path)

    @staticmethod
    def from_file(scenario_file: Path) -> ConfigurationScenario:
        """Reads scenario file and initialises ConfigurationScenario.

        Raises:
            NotImplementedError: Always; subclasses must provide the implementation.
        """
        raise NotImplementedError
class AblationScenario:
    """Class for ablation analysis."""

    # We use the SMAC2 target algorithm for solver output handling
    configurator_target = (
        Path(__file__).parent.resolve()
        / "implementations"
        / "SMAC2"
        / "smac2_target_algorithm.py"
    )

    ablation_dir = Path(__file__).parent / "implementations" / "ablationAnalysis-0.9.4"
    ablation_executable = ablation_dir / "ablationAnalysis"
    ablation_validation_executable = ablation_dir / "ablationValidation"

    # A number written in E scientific notation, e.g. 1e-05 or -2.5E+3
    _SCIENTIFIC_NOTATION = re.compile(r"[+-]?(?:\d+\.?\d*|\.\d+)[eE][+-]?\d+")
    # A dash that starts a parameter: at the start of the text or after
    # whitespace, and followed by a letter or underscore. Dashes of negative
    # values and dashes inside parameter names do not match.
    _PARAMETER_START = re.compile(r"(?:^|\s+)-(?=[^\W\d])")

    def __init__(
        self: AblationScenario,
        configuration_scenario: ConfigurationScenario,
        test_set: InstanceSet,
        cutoff_length: str,
        concurrent_clis: int,
        best_configuration: dict,
        ablation_racing: bool = False,
    ) -> None:
        """Initialize ablation scenario.

        The solver, training set and cutoff time are taken from the
        configuration scenario.

        Args:
            configuration_scenario: Configuration scenario
            test_set: The test instance set, may be None
            cutoff_length: The cutoff length for ablation analysis
            concurrent_clis: The maximum number of concurrent jobs on a single node
            best_configuration: The configuration to ablate from.
            ablation_racing: Whether to use ablation racing
        """
        self.config_scenario = configuration_scenario
        self.solver = configuration_scenario.solver
        self.train_set = configuration_scenario.instance_set
        self.test_set = test_set
        self.cutoff_time = configuration_scenario.solver_cutoff_time
        self.cutoff_length = cutoff_length
        self.concurrent_clis = concurrent_clis
        self.best_configuration = best_configuration
        self.ablation_racing = ablation_racing
        self.scenario_name = f"ablation_{configuration_scenario.name}"
        self._table_file: Optional[Path] = None
        if self.test_set is not None:
            self.scenario_name += f"_{self.test_set.name}"

    @property
    def scenario_dir(self: AblationScenario) -> Path:
        """Return the path of the scenario directory.

        None when the configuration scenario has no directory.
        """
        if self.config_scenario.directory:
            return self.config_scenario.directory / self.scenario_name
        return None

    @property
    def tmp_dir(self: AblationScenario) -> Path:
        """Return the path of the tmp directory, None without a scenario directory."""
        if self.scenario_dir:
            return self.scenario_dir / "tmp"
        return None

    @property
    def validation_dir(self: AblationScenario) -> Path:
        """Return the path of the validation directory.

        None without a scenario directory.
        """
        if self.scenario_dir:
            return self.scenario_dir / "validation"
        return None

    @property
    def validation_dir_tmp(self: AblationScenario) -> Path:
        """Return the path of the validation tmp directory.

        None without a validation directory.
        """
        if self.validation_dir:
            return self.validation_dir / "tmp"
        return None

    @property
    def table_file(self: AblationScenario) -> Path:
        """Return the path of the table file.

        An explicitly set _table_file takes precedence over the default
        location in the validation log directory. None when neither exists.
        """
        if self._table_file:
            return self._table_file
        if self.validation_dir:
            return self.validation_dir / "log" / "ablation-validation-run1234.txt"
        return None

    @staticmethod
    def check_requirements(verbose: bool = False) -> bool:
        """Check if Ablation Analysis is installed.

        All three requirements are always checked, so that with verbose set
        every missing one is reported.

        Args:
            verbose: Whether to emit a warning for each missing requirement.

        Returns:
            True if Java and both Ablation Analysis executables were found.
        """
        import warnings

        no_java = shutil.which("java") is None
        if no_java and verbose:
            warnings.warn(
                "AblationAnalysis requires Java 1.8.0_402, but Java is not installed"
                ". Please ensure Java is installed."
            )
        no_exec = not AblationScenario.ablation_executable.exists()
        if no_exec and verbose:
            warnings.warn(
                "AblationAnalysis executable not found. Please ensure Ablation"
                " Analysis is installed in the expected Path "
                f"({AblationScenario.ablation_executable})."
            )
        no_validation = not AblationScenario.ablation_validation_executable.exists()
        if no_validation and verbose:
            warnings.warn(
                "AblationAnalysis Validation executable not found. Please ensure "
                "Ablation Analysis is installed in the expected Path "
                f"({AblationScenario.ablation_validation_executable})."
            )
        return not (no_java or no_exec or no_validation)

    @staticmethod
    def download_requirements(
        ablation_url: str = "https://github.com/ADA-research/Sparkle/raw/refs/heads/development"
        "/Resources/Other/ablationAnalysis-0.9.4.zip",
    ) -> None:
        """Download Ablation Analysis executable.

        Does nothing if the executable already exists.

        Args:
            ablation_url: URL of the zip archive to download and extract.
        """
        if AblationScenario.ablation_executable.exists():
            return  # Already installed
        from urllib.request import urlopen
        import zipfile
        import io

        AblationScenario.ablation_dir.mkdir(parents=True, exist_ok=True)
        # Close the connection and the archive as soon as they are done with
        with urlopen(ablation_url, timeout=60) as response:
            archive = io.BytesIO(response.read())
        with zipfile.ZipFile(archive) as zip_file:
            zip_file.extractall(AblationScenario.ablation_dir)
        # Ensure execution rights
        AblationScenario.ablation_executable.chmod(0o755)
        AblationScenario.ablation_validation_executable.chmod(0o755)

    @staticmethod
    def _format_value(value: object) -> str:
        """Return value as text, with E scientific notation written out in full."""
        text = f"{value}"
        number = text.strip("'")
        if not AblationScenario._SCIENTIFIC_NOTATION.fullmatch(number):
            return text
        ctx = decimal.Context(prec=16)
        formatted = format(ctx.create_decimal(float(number)), "f")
        # Keep any surrounding quotes of the original text
        return text.replace(number, formatted)

    def create_configuration_file(self: AblationScenario) -> Path:
        """Create a configuration file for ablation analysis.

        The file is written to the scenario directory, and a copy with the
        execdir pointing to the validation tmp directory is written to the
        validation directory.

        Returns:
            Path to the copy of the configuration file in the validation directory.
        """
        objective = self.config_scenario.sparkle_objectives[0]
        pcs = self.solver.get_configuration_space()
        parameters = list(pcs.values())
        parameter_names = {p.name for p in parameters}
        # We need to remove any redundant keys that are not in PCS
        target_configuration = {
            key: value
            for key, value in self.best_configuration.items()
            if key in parameter_names
        }
        # We need to check which params are missing and supplement with default values
        for p in parameters:
            if p.name not in target_configuration:
                target_configuration[p.name] = p.default_value
        # Ablation cannot deal with E scientific notation in floats, hence each
        # value is formatted on its own before it is placed in the string
        opt_config_str = " ".join(
            f"-{key} {self._format_value(value)}"
            for key, value in target_configuration.items()
        )

        smac_run_obj = "RUNTIME" if objective.time else "QUALITY"
        objective_str = "MEAN10" if objective.time else "MEAN"
        pcs_file_path = f"{self.config_scenario.solver.pcs_file.absolute()}"

        # Create config file
        config_file = self.scenario_dir / "ablation_config.txt"
        config = (
            f'algo = "{AblationScenario.configurator_target.absolute()} '
            f"{self.config_scenario.solver.directory.absolute()} "
            f'{self.tmp_dir.absolute()} {objective}"\n'
            f"execdir = {self.tmp_dir.absolute()}\n"
            "experimentDir = ./\n"
            f"deterministic = {1 if self.solver.deterministic else 0}\n"
            f"run_obj = {smac_run_obj}\n"
            f"overall_obj = {objective_str}\n"
            f"cutoffTime = {self.cutoff_time}\n"
            f"cutoff_length = {self.cutoff_length}\n"
            f"cli-cores = {self.concurrent_clis}\n"
            f"useRacing = {self.ablation_racing}\n"
            f"seed = {random.randint(0, 2**32 - 1)}\n"
            f"paramfile = {pcs_file_path}\n"
            "instance_file = instances_train.txt\n"
            "test_instance_file = instances_test.txt\n"
            "sourceConfiguration = DEFAULT\n"
            f'targetConfiguration = "{opt_config_str}"'
        )
        config_file.write_text(config)
        # Write config to validation directory
        conf_valid = config.replace(
            f"execdir = {self.tmp_dir.absolute()}\n",
            f"execdir = {self.validation_dir_tmp.absolute()}\n",
        )
        validation_config_file = self.validation_dir / config_file.name
        validation_config_file.write_text(conf_valid)
        return validation_config_file

    def create_instance_file(self: AblationScenario, test: bool = False) -> Path:
        """Create an instance file for ablation analysis.

        The file is also copied to the validation directory.

        Args:
            test: Whether to create the test instance file instead of the
                train instance file. Without a test set, the train set is used.

        Returns:
            Path to the instance file in the scenario directory.
        """
        file_suffix = "_train.txt"
        instance_set = self.train_set
        if test:
            file_suffix = "_test.txt"
            instance_set = self.test_set if self.test_set is not None else self.train_set
        # We give the Ablation script the paths of the instances
        file_instance = self.scenario_dir / f"instances{file_suffix}"
        with file_instance.open("w") as fh:
            for instance in instance_set._instance_paths:
                # Multi file instances are written on one line, space separated
                if isinstance(instance, list):
                    joined_instances = " ".join(
                        [str(file.absolute()) for file in instance]
                    )
                    fh.write(f"{joined_instances}\n")
                else:
                    fh.write(f"{instance.absolute()}\n")
        # Copy to validation directory
        shutil.copyfile(file_instance, self.validation_dir / file_instance.name)
        return file_instance

    def create_scenario(self: AblationScenario, override_dirs: bool = False) -> None:
        """Create scenario directory and files.

        Args:
            override_dirs: Whether to replace an already existing scenario
                directory. If False and the directory exists, nothing is done.
        """
        if self.scenario_dir.exists():
            print("WARNING: Found existing ablation scenario.")
            if not override_dirs:
                print("Set override to True to overwrite existing scenario.")
                return
            print("Overwriting existing scenario...")
            shutil.rmtree(self.scenario_dir)
        self.tmp_dir.mkdir(parents=True, exist_ok=True)
        self.validation_dir_tmp.mkdir(parents=True, exist_ok=True)
        self.create_instance_file()
        self.create_instance_file(test=True)
        self.create_configuration_file()

    def check_for_ablation(self: AblationScenario) -> bool:
        """Checks if ablation has terminated successfully.

        Returns:
            True if the table file exists and starts with the completion message.
        """
        table_file = self.table_file
        if table_file is None or not table_file.is_file():
            return False
        # First line in the table file should be "Ablation analysis validation complete."
        with table_file.open() as fh:
            table_line = fh.readline().strip()
        return table_line == "Ablation analysis validation complete."

    def read_ablation_table(self: AblationScenario) -> list[list[str]]:
        """Read from ablation table of a scenario.

        Returns:
            A list of rows, the first being the header, each with five values.
            Empty if the ablation did not terminate successfully.
        """
        if not self.check_for_ablation():
            # No ablation table exists for this solver-instance pair
            return []
        results = [
            [
                "Round",
                "Flipped parameter",
                "Source value",
                "Target value",
                "Validation result",
            ]
        ]

        with self.table_file.open() as fh:
            for line in fh:
                # Sometimes ablation rounds switch multiple parameters at once.
                # EXAMPLE: 2 EDR, EDRalpha 0, 0.1 1, 0.1013241633106732 486.31691
                # To split the row correctly, we remove the space before the comma
                # separated parameters and add it back.
                values = re.sub(r"\s+", " ", line.strip())
                values = re.sub(r", ", ",", values)
                values = [val.replace(",", ", ") for val in values.split(" ")]
                # Lines that are not table rows do not have five columns
                if len(values) == 5:
                    results.append(values)
        return results

    def submit_ablation(
        self: AblationScenario,
        log_dir: Path,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        run_on: Runner = Runner.SLURM,
    ) -> list[Run]:
        """Submit an ablation job.

        Args:
            log_dir: Directory to store job logs
            sbatch_options: Options to pass to sbatch. The list is not modified.
            slurm_prepend: Script to prepend to sbatch script
            run_on: Determines to which RunRunner queue the job is added

        Returns:
            A list of Run objects: the ablation run, followed by the ablation
            validation run if the scenario has a test set.

        Raises:
            RuntimeError: If Java or Ablation Analysis is not available.
        """
        if not self.check_requirements(verbose=True):
            raise RuntimeError(
                "Ablation Analysis is not available. Please ensure Java and Ablation "
                "Analysis is installed and try again."
            )
        # 1. submit the ablation to the runrunner queue
        cmd = (
            f"{AblationScenario.ablation_executable.absolute()} "
            "--optionFile ablation_config.txt"
        )
        srun_options = ["-N1", "-n1", f"-c{self.concurrent_clis}"]
        # Build a new list: extending in place would alter the list of the
        # caller and make the option pile up over repeated calls
        sbatch_options = [
            *(sbatch_options or []),
            f"--cpus-per-task={self.concurrent_clis}",
        ]
        run_ablation = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=f"Ablation analysis: {self.solver.name} on {self.train_set.name}",
            base_dir=log_dir,
            path=self.scenario_dir,
            sbatch_options=sbatch_options,
            srun_options=srun_options,
            prepend=slurm_prepend,
        )

        runs = []
        if run_on == Runner.LOCAL:
            run_ablation.wait()
        runs.append(run_ablation)

        # 2. Run ablation validation run if we have a test set to run on
        if self.test_set is not None:
            # Validation dir should have a copy of all needed files, except for the
            # output of the ablation run, which is stored in ablation-run[seed].txt
            cmd = (
                f"{AblationScenario.ablation_validation_executable.absolute()} "
                "--optionFile ablation_config.txt "
                "--ablationLogFile ../log/ablation-run1234.txt"
            )

            run_ablation_validation = rrr.add_to_queue(
                runner=run_on,
                cmd=cmd,
                name=f"Ablation validation: Test set {self.test_set.name}",
                path=self.validation_dir,
                base_dir=log_dir,
                dependencies=run_ablation,
                sbatch_options=sbatch_options,
                prepend=slurm_prepend,
            )

            if run_on == Runner.LOCAL:
                run_ablation_validation.wait()
            runs.append(run_ablation_validation)
        return runs

    @staticmethod
    def from_file(
        path: Path, config_scenario: ConfigurationScenario
    ) -> AblationScenario:
        """Reads scenario file and initialises AblationScenario.

        Args:
            path: Path to the ablation configuration file.
            config_scenario: The configuration scenario the ablation belongs to.

        Returns:
            The AblationScenario described by the file.

        Raises:
            KeyError: If a required key is missing from the file.
        """
        variables = {}
        with path.open() as fh:
            for line in fh:
                line = line.strip()
                if " = " not in line:
                    # Empty line, or not a "key = value" line
                    continue
                key, value = line.split(" = ", maxsplit=1)
                variables[key] = value
        best_conf = {}
        target_configuration = variables["targetConfiguration"].replace('"', "")
        # Split on the dashes that start a parameter only, to keep negative
        # values and parameter names containing a dash intact
        for keyvalue in AblationScenario._PARAMETER_START.split(target_configuration):
            keyvalue = keyvalue.strip()
            if keyvalue:
                key, _, value = keyvalue.partition(" ")
                best_conf[key] = value.strip()
        test_set = None
        test_instance_file = path.parent / "instances_test.txt"
        if test_instance_file.exists():
            with test_instance_file.open() as fh:
                test_path = fh.readline().strip()
            # The directory of the first instance is taken as the instance set
            test_path = Path(test_path).parent
            if test_path != config_scenario.instance_set.directory:
                test_set = Instance_Set(test_path)
        # The file holds the text "True" or "False"; bool() of any non-empty
        # text is True, so the text itself has to be compared
        ablation_racing = variables["useRacing"].strip().lower() in ("true", "1")
        return AblationScenario(
            config_scenario,
            test_set,
            variables["cutoff_length"],
            int(variables["cli-cores"]),
            best_conf,
            ablation_racing=ablation_racing,
        )