Coverage for sparkle/configurator/ablation.py: 85%
108 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
1#!/usr/bin/env python3
2# -*- coding: UTF-8 -*-
3"""Helper functions for ablation analysis."""
4from __future__ import annotations
5import re
6import shutil
7import decimal
8from pathlib import Path
10import runrunner as rrr
11from runrunner.base import Runner, Run
13from sparkle.configurator import ConfigurationScenario
14from sparkle.instance import InstanceSet
class AblationScenario:
    """Scenario for an ablation analysis between a default and a target configuration.

    The scenario owns a directory (``output_dir / scenario_name``) containing the
    ablation option file, the instance lists and a ``validation`` sub directory
    holding copies of those files for the validation run on the test set.
    """

    # We use the SMAC2 target algorithm for solver output handling
    configurator_target = Path(__file__).parent.parent.resolve() /\
        "Components" / "smac2-v2.10.03-master-778" / "smac2_target_algorithm.py"

    ablation_dir = Path(__file__).parent.parent / "Components" /\
        "ablationAnalysis-0.9.4"
    ablation_executable = ablation_dir / "ablationAnalysis"
    ablation_validation_executable = ablation_dir / "ablationValidation"

    # Matches a complete float literal in E notation, e.g. "1e-05" or "-2.5E+3".
    # Used to avoid treating words that merely contain an "e" as numbers.
    _scientific_notation = re.compile(r"[+-]?(?:\d+\.?\d*|\.\d+)[eE][+-]?\d+")

    def __init__(self: AblationScenario,
                 configuration_scenario: ConfigurationScenario,
                 test_set: InstanceSet,
                 output_dir: Path,
                 override_dirs: bool = False) -> None:
        """Initialize ablation scenario.

        The solver and the training instance set are taken from the
        configuration scenario.

        Args:
            configuration_scenario: Configuration scenario
            test_set: The test instance set, may be None
            output_dir: The output directory in which the scenario directory
                is created
            override_dirs: Whether to clean the scenario directory if it already exists
        """
        self.config_scenario = configuration_scenario
        self.solver = configuration_scenario.solver
        self.train_set = configuration_scenario.instance_set
        self.test_set = test_set
        self.output_dir = output_dir
        self.scenario_name = configuration_scenario.name
        if self.test_set is not None:
            self.scenario_name += f"_{self.test_set.name}"
        self.scenario_dir = self.output_dir / self.scenario_name
        if override_dirs and self.scenario_dir.exists():
            print("Warning: found existing ablation scenario. This will be removed.")
            shutil.rmtree(self.scenario_dir)

        # Create required scenario directories
        self.tmp_dir = self.scenario_dir / "tmp"
        self.tmp_dir.mkdir(parents=True, exist_ok=True)

        self.validation_dir = self.scenario_dir / "validation"
        self.validation_dir_tmp = self.validation_dir / "tmp"
        self.validation_dir_tmp.mkdir(parents=True, exist_ok=True)
        # The "1234" in the file name is the seed written to the option file
        self.table_file = self.validation_dir / "log" / "ablation-validation-run1234.txt"

    @staticmethod
    def _plain_notation(value: object) -> str:
        """Return value as a string, expanding E notation floats to plain decimals.

        Values that are not a float literal in E notation (optionally wrapped in
        single quotes) are returned unchanged.
        """
        text = str(value)
        number = text.strip("'")
        if AblationScenario._scientific_notation.fullmatch(number) is None:
            return text
        ctx = decimal.Context(prec=16)
        plain = format(ctx.create_decimal(float(number)), "f")
        # Replace only the numeric part so surrounding quotes are preserved
        return text.replace(number, plain)

    def create_configuration_file(self: AblationScenario,
                                  cutoff_time: int,
                                  cutoff_length: str,
                                  concurrent_clis: int,
                                  best_configuration: dict,
                                  ablation_racing: bool = False) -> None:
        """Create a configuration file for ablation analysis.

        The file is written to the scenario directory and, with the execution
        directory adjusted, to the validation directory.

        Args:
            cutoff_time: The cutoff time for ablation analysis
            cutoff_length: The cutoff length for ablation analysis
            concurrent_clis: The maximum number of concurrent jobs on a single node
            best_configuration: The target configuration as parameter name to
                value mapping. Keys that are not in the solver's PCS are ignored,
                the dictionary itself is not modified.
            ablation_racing: Value written to the useRacing option

        Returns:
            None
        """
        self.concurrent_clis = concurrent_clis
        objective = self.config_scenario.sparkle_objective
        pcs = self.solver.get_pcs()
        parameter_names = {p["name"] for p in pcs}
        # We need to remove any redundant keys that are not in PCS
        target_configuration = {key: value
                                for key, value in best_configuration.items()
                                if key in parameter_names}
        # We need to check which params are missing and supplement with default values
        for p in pcs:
            target_configuration.setdefault(p["name"], p["default"])
        # Ablation cannot deal with E scientific notation in floats
        opt_config_str = " ".join(f"-{name} {self._plain_notation(value)}"
                                  for name, value in target_configuration.items())

        smac_run_obj = "RUNTIME" if objective.time else "QUALITY"
        objective_str = "MEAN10" if objective.time else "MEAN"
        pcs_file_path = f"{self.solver.get_pcs_file().absolute()}"

        # Create config file
        config_file = self.scenario_dir / "ablation_config.txt"
        config = (f'algo = "{AblationScenario.configurator_target.absolute()} '
                  f"{self.solver.directory.absolute()} "
                  f'{self.tmp_dir.absolute()} {objective}"\n'
                  f"execdir = {self.tmp_dir.absolute()}\n"
                  "experimentDir = ./\n"
                  f"deterministic = {1 if self.solver.deterministic else 0}\n"
                  f"run_obj = {smac_run_obj}\n"
                  f"overall_obj = {objective_str}\n"
                  f"cutoffTime = {cutoff_time}\n"
                  f"cutoff_length = {cutoff_length}\n"
                  f"cli-cores = {self.concurrent_clis}\n"
                  f"useRacing = {ablation_racing}\n"
                  "seed = 1234\n"
                  f"paramfile = {pcs_file_path}\n"
                  "instance_file = instances_train.txt\n"
                  "test_instance_file = instances_test.txt\n"
                  "sourceConfiguration=DEFAULT\n"
                  f'targetConfiguration="{opt_config_str}"')
        config_file.write_text(config)
        # Write config to validation directory
        conf_valid = config.replace(f"execdir = {self.tmp_dir.absolute()}\n",
                                    f"execdir = {self.validation_dir_tmp.absolute()}\n")
        (self.validation_dir / config_file.name).write_text(conf_valid)

    def create_instance_file(self: AblationScenario, test: bool = False) -> None:
        """Create an instance file for ablation analysis.

        Args:
            test: If True, write instances_test.txt from the test set (falling
                back to the train set when there is no test set), otherwise
                write instances_train.txt from the train set.
        """
        file_suffix = "_train.txt"
        instance_set = self.train_set
        if test:
            file_suffix = "_test.txt"
            instance_set = self.test_set if self.test_set is not None else self.train_set
        # We give the Ablation script the paths of the instances
        file_instance = self.scenario_dir / f"instances{file_suffix}"
        with file_instance.open("w") as fh:
            for instance in instance_set._instance_paths:
                # Multi file instances are written as one space separated line
                if isinstance(instance, list):
                    joined_instances = " ".join(
                        [str(file.absolute()) for file in instance])
                    fh.write(f"{joined_instances}\n")
                else:
                    fh.write(f"{instance.absolute()}\n")
        # Copy to validation directory
        shutil.copyfile(file_instance, self.validation_dir / file_instance.name)

    def check_for_ablation(self: AblationScenario) -> bool:
        """Checks if ablation has terminated successfully.

        Returns:
            True if the table file exists and its first line is the completion
            message, False otherwise.
        """
        if not self.table_file.is_file():
            return False
        # First line in the table file should be "Ablation analysis validation complete."
        with self.table_file.open() as table:
            table_line = table.readline().strip()
        return table_line == "Ablation analysis validation complete."

    def read_ablation_table(self: AblationScenario) -> list[list[str]]:
        """Read from ablation table of a scenario.

        Returns:
            A list of rows, the first being the header. Each row has five
            entries. Empty if the ablation has not completed successfully.
        """
        if not self.check_for_ablation():
            # No ablation table exists for this solver-instance pair
            return []
        results = [["Round", "Flipped parameter", "Source value", "Target value",
                    "Validation result"]]

        with self.table_file.open() as table:
            for line in table:
                # Sometimes ablation rounds switch multiple parameters at once.
                # EXAMPLE: 2 EDR, EDRalpha 0, 0.1 1, 0.1013241633106732 486.31691
                # To split the row correctly, we remove the space after the comma
                # of comma separated parameters and add it back after splitting.
                values = re.sub(r"\s+", " ", line.strip())
                values = re.sub(r", ", ",", values)
                values = [val.replace(",", ", ") for val in values.split(" ")]
                # Lines that do not form a complete table row are skipped
                if len(values) == 5:
                    results.append(values)
        return results

    def submit_ablation(self: AblationScenario,
                        log_dir: Path,
                        sbatch_options: list[str] = None,
                        run_on: Runner = Runner.SLURM) -> list[Run]:
        """Submit an ablation job.

        Requires create_configuration_file to have been called, as it sets the
        number of concurrent jobs used for the resource requests.

        Args:
            log_dir: Directory to store job logs
            sbatch_options: Options to pass to sbatch. The given list is not
                modified.
            run_on: Determines to which RunRunner queue the job is added

        Returns:
            A list of Run objects: the ablation run and, if a test set is
            present, the ablation validation run.
        """
        # Work on a copy so neither the caller's list nor a shared default grows
        sbatch_options = list(sbatch_options) if sbatch_options else []
        sbatch_options.append(f"--cpus-per-task={self.concurrent_clis}")

        # 1. submit the ablation to the runrunner queue
        cmd = (f"{AblationScenario.ablation_executable.absolute()} "
               "--optionFile ablation_config.txt")
        srun_options = ["-N1", "-n1", f"-c{self.concurrent_clis}"]
        run_ablation = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=f"Ablation analysis: {self.solver.name} on {self.train_set.name}",
            base_dir=log_dir,
            path=self.scenario_dir,
            sbatch_options=sbatch_options,
            srun_options=srun_options)

        runs = []
        if run_on == Runner.LOCAL:
            run_ablation.wait()
        runs.append(run_ablation)

        # 2. Run ablation validation run if we have a test set to run on
        if self.test_set is not None:
            # Validation dir should have a copy of all needed files, except for the
            # output of the ablation run, which is stored in ablation-run[seed].txt
            cmd = f"{AblationScenario.ablation_validation_executable.absolute()} "\
                  "--optionFile ablation_config.txt "\
                  "--ablationLogFile ../log/ablation-run1234.txt"

            run_ablation_validation = rrr.add_to_queue(
                runner=run_on,
                cmd=cmd,
                name=f"Ablation validation: Test set {self.test_set.name}",
                path=self.validation_dir,
                base_dir=log_dir,
                dependencies=run_ablation,
                sbatch_options=sbatch_options)

            if run_on == Runner.LOCAL:
                run_ablation_validation.wait()
            runs.append(run_ablation_validation)

        return runs