Coverage for sparkle/configurator/configurator.py: 71%

333 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

1"""Configurator class to use different algorithm configurators.""" 

2 

3from __future__ import annotations 

4import re 

5import shutil 

6import decimal 

7from pathlib import Path 

8from datetime import datetime 

9from typing import Optional 

10import random 

11 

12import runrunner as rrr 

13from runrunner import Runner, Run 

14 

15from sparkle.solver import Solver 

16from sparkle.instance import InstanceSet, Instance_Set 

17from sparkle.structures import PerformanceDataFrame 

18from sparkle.types import SparkleObjective 

19 

20 

class Configurator:
    """Abstract class to use different configurators like SMAC.

    Subclasses override the methods that raise ``NotImplementedError`` here and
    call :meth:`configure` to submit the actual configuration jobs.
    """

    # ``configurator_cli.py`` located in the directory of this module
    configurator_cli_path = Path(__file__).parent.resolve() / "configurator_cli.py"

    full_name = "Configurator Abstract Class"
    version = "NaN"

    def __init__(self: Configurator, multi_objective_support: bool = False) -> None:
        """Initialize Configurator.

        Args:
            multi_objective_support: Whether the configurator supports
                multi objective optimization for solvers.
        """
        self.multiobjective = multi_objective_support

    @property
    def name(self: Configurator) -> str:
        """Return the name of the configurator (the name of its class)."""
        return self.__class__.__name__

    @staticmethod
    def scenario_class() -> type[ConfigurationScenario]:
        """Return the scenario class of the configurator."""
        return ConfigurationScenario

    @staticmethod
    def check_requirements(verbose: bool = False) -> bool:
        """Check if the configurator is installed.

        Args:
            verbose: Whether the check may report what is missing.

        Raises:
            NotImplementedError: Always; must be implemented by the subclass.
        """
        raise NotImplementedError

    @staticmethod
    def download_requirements() -> None:
        """Download the configurator.

        Raises:
            NotImplementedError: Always; must be implemented by the subclass.
        """
        raise NotImplementedError

    def configure(
        self: Configurator,
        configuration_commands: list[str],
        data_target: PerformanceDataFrame,
        output: Path,
        scenario: ConfigurationScenario,
        configuration_ids: list[str] = None,
        validate_after: bool = True,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        num_parallel_jobs: int = None,
        base_dir: Path = None,
        run_on: Runner = Runner.SLURM,
    ) -> list[Run]:
        """Start configuration job.

        This method is shared by the configurators and should be called by the
        implementation/subclass of the configurator.

        Args:
            configuration_commands: List of configurator commands to execute
            data_target: Performance data to store the results.
            output: Output directory.
            scenario: ConfigurationScenario to execute.
            configuration_ids: List of configuration ids that are to be created.
                Defaults to ``scenario.configuration_ids`` when not given.
            validate_after: Whether the configurations should be validated
            sbatch_options: List of slurm batch options to use
            slurm_prepend: Slurm script to prepend to the sbatch
            num_parallel_jobs: The maximum number of jobs to run in parallel
            base_dir: The base_dir of RunRunner where the sbatch scripts will be placed
            run_on: On which platform to run the jobs. Default: Slurm.

        Returns:
            A list of RunRunner Run objects: the configuration run, followed by
            the validation run when ``validate_after`` is set.

        Raises:
            RuntimeError: If ``check_requirements`` reports the configurator
                as not installed.
        """
        if not self.check_requirements(verbose=True):
            raise RuntimeError(
                f"{self.name} is not installed. Please install {self.name} and try again."
            )
        if configuration_ids is None:
            # The ids are needed below (len / validation), so None cannot be used
            configuration_ids = scenario.configuration_ids
        # Add the configuration IDs to the dataframe with empty configurations.
        # Each ID gets its own dict so the placeholders never share state.
        data_target.add_configuration(
            str(scenario.solver.directory),
            configuration_ids,
            [{} for _ in configuration_ids],
        )
        data_target.save_csv()
        # Submit the configuration job
        configuration_run = rrr.add_to_queue(
            runner=run_on,
            cmd=configuration_commands,
            name=f"{self.name}: {scenario.solver.name} on {scenario.instance_set.name}",
            base_dir=base_dir,
            output_path=output,
            parallel_jobs=num_parallel_jobs,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
        )
        runs = [configuration_run]

        if validate_after:
            # The validation depends on the configuration run(s) submitted so far
            validate = scenario.solver.run_performance_dataframe(
                scenario.instance_set,
                config_ids=configuration_ids,
                performance_dataframe=data_target,
                cutoff_time=scenario.solver_cutoff_time,
                sbatch_options=sbatch_options,
                slurm_prepend=slurm_prepend,
                log_dir=scenario.validation,
                base_dir=base_dir,
                dependencies=runs,
                job_name=f"{self.name}: Validating {len(configuration_ids)} "
                f"{scenario.solver.name} Configurations on "
                f"{scenario.instance_set.name}",
                run_on=run_on,
            )
            runs.append(validate)

        if run_on == Runner.LOCAL:
            print(f"[{self.name}] Running {len(runs)} jobs locally...")
            for run in runs:
                run.wait()
            print(f"[{self.name}] Finished running {len(runs)} jobs locally.")
        return runs

    @staticmethod
    def organise_output(
        output_source: Path,
        output_target: Path,
        scenario: ConfigurationScenario,
        configuration_id: str,
    ) -> None | str:
        """Method to restructure and clean up after a single configurator call.

        Args:
            output_source: Path to the output file of the configurator run.
            output_target: Path to the Performance DataFrame to store result.
            scenario: ConfigurationScenario of the configuration.
            configuration_id: ID (of the run) of the configuration.

        Raises:
            NotImplementedError: Always; must be implemented by the subclass.
        """
        raise NotImplementedError

    @staticmethod
    def save_configuration(
        scenario: ConfigurationScenario,
        configuration_id: str,
        configuration: dict,
        output_target: Path,
    ) -> dict | None:
        """Method to save a configuration to a file.

        If the output_target is None or does not exist, the configuration is
        returned instead of being stored.

        Args:
            scenario: ConfigurationScenario of the configuration. Should be removed.
            configuration_id: ID (of the run) of the configuration.
            configuration: Configuration to save.
            output_target: Path to the Performance DataFrame to store result.

        Returns:
            The configuration when nothing was written, otherwise None.

        Raises:
            IndexError: If no solver in the Performance DataFrame matches the
                name of the scenario's solver.
        """
        if output_target is None or not output_target.exists():
            return configuration
        # Save result to Performance DataFrame
        from filelock import FileLock

        lock = FileLock(f"{output_target}.lock")
        # The load-modify-save cycle happens entirely under the file lock
        with lock.acquire(timeout=600):
            performance_data = PerformanceDataFrame(output_target)
            # Resolve absolute path to Solver column
            matching_solvers = [
                s
                for s in performance_data.solvers
                if Path(s).name == scenario.solver.name
            ]
            if not matching_solvers:
                raise IndexError(
                    f"Solver '{scenario.solver.name}' not found in {output_target}"
                )
            # Update the configuration ID by adding the configuration
            performance_data.add_configuration(
                solver=matching_solvers[0],
                configuration_id=configuration_id,
                configuration=configuration,
            )
            performance_data.save_csv()
        return None

    def get_status_from_logs(self: Configurator) -> None:
        """Method to scan the log files of the configurator for warnings.

        Raises:
            NotImplementedError: Always; must be implemented by the subclass.
        """
        raise NotImplementedError

202 

203 

class ConfigurationScenario:
    """Template class to handle a configuration scenario.

    All path properties are derived from ``parent_directory`` and the scenario
    name; they are None as long as the scenario has no timestamp.
    """

    def __init__(
        self: ConfigurationScenario,
        solver: Solver,
        instance_set: InstanceSet,
        sparkle_objectives: list[SparkleObjective],
        number_of_runs: int,
        parent_directory: Path,
        timestamp: str = None,
    ) -> None:
        """Initialize scenario paths and names.

        Args:
            solver: Solver that should be configured.
            instance_set: Instances object for the scenario.
            sparkle_objectives: Sparkle Objectives to optimize.
            number_of_runs: The number of configurator runs to perform.
            parent_directory: Directory in which the scenario should be placed.
            timestamp: The timestamp of the scenario directory/file creation.
                Only set when read from file, otherwise generated at time of creation.
        """
        self.solver = solver
        self.instance_set = instance_set
        self.sparkle_objectives = sparkle_objectives
        self.number_of_runs = number_of_runs
        self.parent_directory = parent_directory
        self._timestamp = timestamp
        # Cache for the ablation_scenario property
        self._ablation_scenario: AblationScenario = None

    @property
    def configurator(self: ConfigurationScenario) -> type[Configurator]:
        """Return the type of configurator the scenario belongs to."""
        return Configurator

    @property
    def name(self: ConfigurationScenario) -> str:
        """Return the name of the scenario: solver, instance set and timestamp."""
        return f"{self.solver.name}_{self.instance_set.name}_{self.timestamp}"

    @property
    def timestamp(self: ConfigurationScenario) -> str | None:
        """Return the timestamp, or None if the scenario was not created yet."""
        return self._timestamp

    @property
    def directory(self: ConfigurationScenario) -> Path | None:
        """Return the path of the scenario directory (None without timestamp)."""
        if self.timestamp is None:
            return None
        return self.parent_directory / self.name

    @property
    def scenario_file_path(self: ConfigurationScenario) -> Path | None:
        """Return the path of the scenario file (None without directory)."""
        directory = self.directory
        return None if directory is None else directory / "scenario.txt"

    @property
    def validation(self: ConfigurationScenario) -> Path | None:
        """Return the path of the validation directory (None without directory)."""
        directory = self.directory
        return None if directory is None else directory / "validation"

    @property
    def tmp(self: ConfigurationScenario) -> Path | None:
        """Return the path of the tmp directory (None without directory)."""
        directory = self.directory
        return None if directory is None else directory / "tmp"

    @property
    def results_directory(self: ConfigurationScenario) -> Path | None:
        """Return the path of the results directory (None without directory)."""
        directory = self.directory
        return None if directory is None else directory / "results"

    @property
    def configuration_ids(self: ConfigurationScenario) -> list[str]:
        """Return the IDs of the configurations for the scenario.

        Only exists after the scenario has been created.

        Returns:
            List of configuration IDs, one for each run.
        """
        return [
            f"{self.configurator.__name__}_{self.timestamp}_{i}"
            for i in range(self.number_of_runs)
        ]

    @property
    def ablation_scenario(self: ConfigurationScenario) -> AblationScenario | None:
        """Return the ablation scenario for the scenario if it exists.

        The first ``ablation_config.txt`` found in a direct subdirectory of the
        scenario directory is loaded and cached.
        """
        if self._ablation_scenario is not None:
            return self._ablation_scenario
        directory = self.directory
        if directory is None:
            # Scenario has not been created yet, so nothing can exist on disk
            return None
        for config_file in directory.glob("*/ablation_config.txt"):
            self._ablation_scenario = AblationScenario.from_file(config_file, self)
            return self._ablation_scenario
        return None

    def create_scenario(self: ConfigurationScenario) -> None:
        """Create scenario with solver and instances in the parent directory.

        This sets the timestamp of the scenario to the current time and prepares
        all the necessary subdirectories related to configuration. An existing
        directory with the same name is removed first.
        """
        self._timestamp = datetime.now().strftime("%Y%m%d-%H%M")
        # Prepare scenario directory
        shutil.rmtree(self.directory, ignore_errors=True)
        self.directory.mkdir(parents=True)
        # Create empty directories as needed
        self.tmp.mkdir(exist_ok=True)
        self.validation.mkdir(exist_ok=True)
        self.results_directory.mkdir(exist_ok=True)

    def create_scenario_file(self: ConfigurationScenario) -> Path:
        """Create a file with the configuration scenario.

        Raises:
            NotImplementedError: Always; must be implemented by the subclass.
        """
        raise NotImplementedError

    def serialise(self: ConfigurationScenario) -> dict:
        """Serialize the configuration scenario.

        Raises:
            NotImplementedError: Always; must be implemented by the subclass.
        """
        raise NotImplementedError

    @classmethod
    def find_scenario(
        cls: type[ConfigurationScenario],
        directory: Path,
        solver: Solver,
        instance_set: InstanceSet,
        timestamp: str = None,
    ) -> ConfigurationScenario | None:
        """Resolve a scenario from a directory and Solver / Training set.

        Args:
            directory: Directory containing the scenario directories.
            solver: Solver of the scenario.
            instance_set: Training set of the scenario.
            timestamp: Timestamp of the scenario. When None, the newest scenario
                directory of this solver / instance set combination is used.

        Returns:
            The scenario read through ``from_file``, or None when no matching
            scenario file exists.
        """
        scenario_prefix = f"{solver.name}_{instance_set.name}"
        if timestamp is None:
            if not directory.is_dir():
                return None
            # Get the newest timestamp among the directories of *this* scenario;
            # directories of other solvers / instance sets must not win.
            candidates: list[tuple[datetime, str]] = []
            for subdir in directory.iterdir():
                if not subdir.is_dir():
                    continue
                prefix, _, dir_timestamp = subdir.name.rpartition("_")
                if prefix != scenario_prefix:
                    continue
                try:
                    parsed = datetime.strptime(dir_timestamp, "%Y%m%d-%H%M")
                except ValueError:
                    continue
                # Keep the original spelling so the path below matches on disk
                candidates.append((parsed, dir_timestamp))

            if not candidates:
                return None
            timestamp = max(candidates)[1]

        path = directory / f"{scenario_prefix}_{timestamp}" / "scenario.txt"
        if not path.exists():
            return None
        return cls.from_file(path)

    @staticmethod
    def from_file(scenario_file: Path) -> ConfigurationScenario:
        """Reads scenario file and initialises ConfigurationScenario.

        Raises:
            NotImplementedError: Always; must be implemented by the subclass.
        """
        raise NotImplementedError

367 

368 

class AblationScenario:
    """Class for ablation analysis.

    Bundles the files, directories and job submission needed to run the
    ablationAnalysis executables for a configuration scenario.
    """

    # We use the SMAC2 target algorithm for solver output handling
    configurator_target = (
        Path(__file__).parent.resolve()
        / "implementations"
        / "SMAC2"
        / "smac2_target_algorithm.py"
    )

    ablation_dir = Path(__file__).parent / "implementations" / "ablationAnalysis-0.9.4"
    ablation_executable = ablation_dir / "ablationAnalysis"
    ablation_validation_executable = ablation_dir / "ablationValidation"

    def __init__(
        self: AblationScenario,
        configuration_scenario: ConfigurationScenario,
        test_set: InstanceSet,
        cutoff_length: str,
        concurrent_clis: int,
        best_configuration: dict,
        ablation_racing: bool = False,
    ) -> None:
        """Initialize ablation scenario.

        The solver, training set and cutoff time are taken from the
        configuration scenario.

        Args:
            configuration_scenario: Configuration scenario
            test_set: The test instance set, may be None
            cutoff_length: The cutoff length for ablation analysis
            concurrent_clis: The maximum number of concurrent jobs on a single node
            best_configuration: The configuration to ablate from.
            ablation_racing: Whether to use ablation racing
        """
        self.config_scenario = configuration_scenario
        self.solver = configuration_scenario.solver
        self.train_set = configuration_scenario.instance_set
        self.test_set = test_set
        self.cutoff_time = configuration_scenario.solver_cutoff_time
        self.cutoff_length = cutoff_length
        self.concurrent_clis = concurrent_clis
        self.best_configuration = best_configuration
        self.ablation_racing = ablation_racing
        self.scenario_name = f"ablation_{configuration_scenario.name}"
        self._table_file: Optional[Path] = None
        if self.test_set is not None:
            self.scenario_name += f"_{self.test_set.name}"

    @property
    def scenario_dir(self: AblationScenario) -> Path | None:
        """Return the path of the scenario directory.

        None when the configuration scenario has no directory.
        """
        parent = self.config_scenario.directory
        return parent / self.scenario_name if parent else None

    @property
    def tmp_dir(self: AblationScenario) -> Path | None:
        """Return the path of the tmp directory (None without scenario dir)."""
        scenario_dir = self.scenario_dir
        return scenario_dir / "tmp" if scenario_dir else None

    @property
    def validation_dir(self: AblationScenario) -> Path | None:
        """Return the path of the validation directory (None without scenario dir)."""
        scenario_dir = self.scenario_dir
        return scenario_dir / "validation" if scenario_dir else None

    @property
    def validation_dir_tmp(self: AblationScenario) -> Path | None:
        """Return the path of the validation tmp directory."""
        validation_dir = self.validation_dir
        return validation_dir / "tmp" if validation_dir else None

    @property
    def table_file(self: AblationScenario) -> Path | None:
        """Return the path of the table file.

        An explicitly stored path takes precedence over the default location
        inside the validation directory.
        """
        if self._table_file:
            return self._table_file
        validation_dir = self.validation_dir
        if validation_dir:
            return validation_dir / "log" / "ablation-validation-run1234.txt"
        return None

    @staticmethod
    def check_requirements(verbose: bool = False) -> bool:
        """Check if Ablation Analysis is installed.

        Args:
            verbose: Whether to emit a warning for every missing requirement.

        Returns:
            True if Java and both ablation executables were found.
        """
        import warnings

        problems = []
        if shutil.which("java") is None:
            problems.append(
                "AblationAnalysis requires Java 1.8.0_402, but Java is not installed"
                ". Please ensure Java is installed."
            )
        if not AblationScenario.ablation_executable.exists():
            problems.append(
                "AblationAnalysis executable not found. Please ensure Ablation"
                " Analysis is installed in the expected Path "
                f"({AblationScenario.ablation_executable})."
            )
        if not AblationScenario.ablation_validation_executable.exists():
            problems.append(
                "AblationAnalysis Validation executable not found. Please ensure "
                "Ablation Analysis is installed in the expected Path "
                f"({AblationScenario.ablation_validation_executable})."
            )
        if verbose:
            for message in problems:
                warnings.warn(message)
        return not problems

    @staticmethod
    def download_requirements(
        ablation_url: str = "https://github.com/ADA-research/Sparkle/raw/refs/heads/development"
        "/Resources/Other/ablationAnalysis-0.9.4.zip",
    ) -> None:
        """Download Ablation Analysis executable.

        Does nothing when the ablation executable already exists.

        Args:
            ablation_url: URL of the zip archive to download and extract.
        """
        if AblationScenario.ablation_executable.exists():
            return  # Already installed
        from urllib.request import urlopen
        import zipfile
        import io

        AblationScenario.ablation_dir.mkdir(parents=True, exist_ok=True)
        # Close the connection and the archive as soon as they are consumed
        with urlopen(ablation_url, timeout=60) as response:
            archive_bytes = response.read()
        with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
            archive.extractall(AblationScenario.ablation_dir)
        # Ensure execution rights
        AblationScenario.ablation_executable.chmod(0o755)
        AblationScenario.ablation_validation_executable.chmod(0o755)

    @staticmethod
    def _without_exponent(value: str) -> str:
        """Rewrite a float in E scientific notation as a plain decimal string.

        Values without an exponent and values that are not numeric at all (for
        example the categorical value ``true``) are returned unchanged.
        Surrounding single quotes are preserved.
        """
        bare = value.strip("'")
        if "e" not in bare.lower():
            return value
        try:
            number = float(bare)
        except ValueError:
            # Contains an "e" but is not a number, e.g. a categorical value
            return value
        plain = format(decimal.Context(prec=16).create_decimal(number), "f")
        return value.replace(bare, plain)

    def create_configuration_file(self: AblationScenario) -> Path:
        """Create a configuration file for ablation analysis.

        The file is written to the scenario directory and, with the validation
        tmp directory as ``execdir``, to the validation directory.

        Returns:
            Path to the configuration file in the validation directory.
        """
        objective = self.config_scenario.sparkle_objectives[0]
        parameters = list(self.solver.get_configuration_space().values())
        known_names = {p.name for p in parameters}
        # We need to remove any redundant keys that are not in PCS
        settings = {
            key: value
            for key, value in self.best_configuration.items()
            if key in known_names
        }
        # Supplement missing params with default values. Membership is checked
        # per key: a substring test on the rendered string would e.g. treat
        # "alpha" as present whenever "EDRalpha" is.
        for p in parameters:
            settings.setdefault(p.name, p.default_value)
        # Ablation cannot deal with E scientific notation in floats, so each
        # value is converted individually before rendering.
        opt_config_str = " ".join(
            f"-{key} {AblationScenario._without_exponent(str(value))}"
            for key, value in settings.items()
        )

        smac_run_obj = "RUNTIME" if objective.time else "QUALITY"
        objective_str = "MEAN10" if objective.time else "MEAN"
        pcs_file_path = f"{self.config_scenario.solver.pcs_file.absolute()}"

        # Create config file
        config_file = self.scenario_dir / "ablation_config.txt"
        config = (
            f'algo = "{AblationScenario.configurator_target.absolute()} '
            f"{self.config_scenario.solver.directory.absolute()} "
            f'{self.tmp_dir.absolute()} {objective}"\n'
            f"execdir = {self.tmp_dir.absolute()}\n"
            "experimentDir = ./\n"
            f"deterministic = {1 if self.solver.deterministic else 0}\n"
            f"run_obj = {smac_run_obj}\n"
            f"overall_obj = {objective_str}\n"
            f"cutoffTime = {self.cutoff_time}\n"
            f"cutoff_length = {self.cutoff_length}\n"
            f"cli-cores = {self.concurrent_clis}\n"
            f"useRacing = {self.ablation_racing}\n"
            f"seed = {random.randint(0, 2**32 - 1)}\n"
            f"paramfile = {pcs_file_path}\n"
            "instance_file = instances_train.txt\n"
            "test_instance_file = instances_test.txt\n"
            "sourceConfiguration = DEFAULT\n"
            f'targetConfiguration = "{opt_config_str}"'
        )
        config_file.write_text(config)
        # Write config to validation directory
        conf_valid = config.replace(
            f"execdir = {self.tmp_dir.absolute()}\n",
            f"execdir = {self.validation_dir_tmp.absolute()}\n",
        )
        validation_config_file = self.validation_dir / config_file.name
        validation_config_file.write_text(conf_valid)
        return validation_config_file

    def create_instance_file(self: AblationScenario, test: bool = False) -> Path:
        """Create an instance file for ablation analysis.

        Args:
            test: Whether to write the test instance file. Without a test set
                the training set is written to the test file as well.

        Returns:
            Path to the instance file in the scenario directory. A copy is
            placed in the validation directory.
        """
        file_suffix = "_train.txt"
        instance_set = self.train_set
        if test:
            file_suffix = "_test.txt"
            instance_set = self.test_set if self.test_set is not None else self.train_set
        # We give the Ablation script the paths of the instances
        file_instance = self.scenario_dir / f"instances{file_suffix}"
        with file_instance.open("w") as fh:
            for instance in instance_set._instance_paths:
                # Multi-file instances are written space separated on one line
                if isinstance(instance, list):
                    joined_instances = " ".join(
                        [str(file.absolute()) for file in instance]
                    )
                    fh.write(f"{joined_instances}\n")
                else:
                    fh.write(f"{instance.absolute()}\n")
        # Copy to validation directory
        shutil.copyfile(file_instance, self.validation_dir / file_instance.name)
        return file_instance

    def create_scenario(self: AblationScenario, override_dirs: bool = False) -> None:
        """Create scenario directory and files.

        Args:
            override_dirs: Whether an existing scenario directory is removed and
                recreated. When False, an existing scenario is left untouched.
        """
        if self.scenario_dir.exists():
            print("WARNING: Found existing ablation scenario.")
            if not override_dirs:
                print("Set override to True to overwrite existing scenario.")
                return
            print("Overwriting existing scenario...")
            shutil.rmtree(self.scenario_dir)
        self.tmp_dir.mkdir(parents=True, exist_ok=True)
        self.validation_dir_tmp.mkdir(parents=True, exist_ok=True)
        self.create_instance_file()
        self.create_instance_file(test=True)
        self.create_configuration_file()

    def check_for_ablation(self: AblationScenario) -> bool:
        """Checks if ablation has terminated successfully."""
        table_file = self.table_file
        if table_file is None or not table_file.is_file():
            return False
        # First line in the table file should be "Ablation analysis validation complete."
        with table_file.open() as fh:
            table_line = fh.readline().strip()
        return table_line == "Ablation analysis validation complete."

    def read_ablation_table(self: AblationScenario) -> list[list[str]]:
        """Read from ablation table of a scenario.

        Returns:
            A list of rows, the first being the header. Empty when the ablation
            has not terminated successfully.
        """
        if not self.check_for_ablation():
            # No ablation table exists for this solver-instance pair
            return []
        results = [
            [
                "Round",
                "Flipped parameter",
                "Source value",
                "Target value",
                "Validation result",
            ]
        ]

        with self.table_file.open() as fh:
            for line in fh:
                # Pre-process lines from the ablation file and add to the results.
                # Sometimes ablation rounds switch multiple parameters at once.
                # EXAMPLE: 2 EDR, EDRalpha 0, 0.1 1, 0.1013241633106732 486.31691
                # To split the row correctly, we remove the space before the comma
                # separated parameters and add it back.
                values = re.sub(r"\s+", " ", line.strip())
                values = re.sub(r", ", ",", values)
                values = [val.replace(",", ", ") for val in values.split(" ")]
                # Only rows with exactly the five table columns are kept
                if len(values) == 5:
                    results.append(values)
        return results

    def submit_ablation(
        self: AblationScenario,
        log_dir: Path,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        run_on: Runner = Runner.SLURM,
    ) -> list[Run]:
        """Submit an ablation job.

        Args:
            log_dir: Directory to store job logs
            sbatch_options: Options to pass to sbatch. The list is not modified.
            slurm_prepend: Script to prepend to sbatch script
            run_on: Determines to which RunRunner queue the job is added

        Returns:
            A list of Run objects: the ablation run, followed by the validation
            run when a test set is present.

        Raises:
            RuntimeError: If the requirements for ablation analysis are missing.
        """
        if not self.check_requirements(verbose=True):
            raise RuntimeError(
                "Ablation Analysis is not available. Please ensure Java and Ablation "
                "Analysis is installed and try again."
            )
        # 1. submit the ablation to the runrunner queue
        cmd = (
            f"{AblationScenario.ablation_executable.absolute()} "
            "--optionFile ablation_config.txt"
        )
        srun_options = ["-N1", "-n1", f"-c{self.concurrent_clis}"]
        # Build a new list: extending the argument in place would alter the
        # caller's list and accumulate the option over repeated calls.
        job_sbatch_options = list(sbatch_options or []) + [
            f"--cpus-per-task={self.concurrent_clis}"
        ]
        run_ablation = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=f"Ablation analysis: {self.solver.name} on {self.train_set.name}",
            base_dir=log_dir,
            path=self.scenario_dir,
            sbatch_options=job_sbatch_options,
            srun_options=srun_options,
            prepend=slurm_prepend,
        )

        runs = []
        if run_on == Runner.LOCAL:
            run_ablation.wait()
        runs.append(run_ablation)

        # 2. Run ablation validation run if we have a test set to run on
        if self.test_set is not None:
            # Validation dir should have a copy of all needed files, except for the
            # output of the ablation run, which is stored in ablation-run[seed].txt
            cmd = (
                f"{AblationScenario.ablation_validation_executable.absolute()} "
                "--optionFile ablation_config.txt "
                "--ablationLogFile ../log/ablation-run1234.txt"
            )

            run_ablation_validation = rrr.add_to_queue(
                runner=run_on,
                cmd=cmd,
                name=f"Ablation validation: Test set {self.test_set.name}",
                path=self.validation_dir,
                base_dir=log_dir,
                dependencies=run_ablation,
                sbatch_options=job_sbatch_options,
                prepend=slurm_prepend,
            )

            if run_on == Runner.LOCAL:
                run_ablation_validation.wait()
            runs.append(run_ablation_validation)
        return runs

    @staticmethod
    def from_file(
        path: Path, config_scenario: ConfigurationScenario
    ) -> AblationScenario:
        """Reads scenario file and initialises AblationScenario.

        Args:
            path: Path to the ablation configuration file (``key = value`` lines).
            config_scenario: The configuration scenario the ablation belongs to.

        Returns:
            The AblationScenario described by the file.
        """
        variables = {}
        for line in path.read_text().splitlines():
            if line.strip() == "":
                continue
            key, value = line.strip().split(" = ", maxsplit=1)
            variables[key] = value
        best_conf = {}
        target_configuration = variables["targetConfiguration"].replace('"', "")
        # A parameter starts with "-" at the beginning of a token followed by a
        # letter or underscore; this keeps negative numbers ("-0.5") and hyphens
        # inside names or values ("max-flips") intact.
        for keyvalue in re.split(r"(?:^|\s)-(?=[^\W\d])", target_configuration):
            keyvalue = keyvalue.strip()
            if keyvalue:
                key, _, value = keyvalue.partition(" ")
                best_conf[key] = value.strip()
        test_set = None
        test_instance_file = path.parent / "instances_test.txt"
        if test_instance_file.exists():
            with test_instance_file.open() as fh:
                first_instance = fh.readline().strip()
            # An empty file gives no directory to derive the test set from
            if first_instance:
                test_path = Path(first_instance).parent
                if test_path != config_scenario.instance_set.directory:
                    test_set = Instance_Set(test_path)
        # The file stores the flag as text ("True"/"False"); bool() of any
        # non-empty string would always be True.
        ablation_racing = variables["useRacing"].strip().lower() in ("true", "1")
        return AblationScenario(
            config_scenario,
            test_set,
            variables["cutoff_length"],
            int(variables["cli-cores"]),
            best_conf,
            ablation_racing=ablation_racing,
        )

748 )