Coverage for src / sparkle / configurator / configurator.py: 66%

339 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 15:31 +0000

1"""Configurator class to use different algorithm configurators.""" 

2 

3from __future__ import annotations 

4import re 

5import shutil 

6import decimal 

7from pathlib import Path 

8from datetime import datetime 

9from typing import Optional 

10import random 

11 

12import runrunner as rrr 

13from runrunner import Runner, Run 

14 

15from sparkle.solver import Solver 

16from sparkle.tools.parameters import PCSConvention 

17from sparkle.instance import InstanceSet, Instance_Set 

18from sparkle.structures import PerformanceDataFrame 

19from sparkle.types import SparkleObjective 

20 

21 

class Configurator:
    """Abstract base class to use different configurators like SMAC.

    Subclasses are expected to implement the methods that raise
    ``NotImplementedError`` here and to call :meth:`configure` to submit jobs.
    """

    # CLI script located next to this module
    configurator_cli_path = Path(__file__).parent.resolve() / "configurator_cli.py"

    full_name = "Configurator Abstract Class"
    version = "NaN"

    def __init__(self: Configurator, multi_objective_support: bool = False) -> None:
        """Initialize Configurator.

        Args:
            multi_objective_support: Whether the configurator supports
                multi objective optimization for solvers.
        """
        self.multiobjective = multi_objective_support

    @property
    def name(self: Configurator) -> str:
        """Return the name of the configurator (the class name)."""
        return self.__class__.__name__

    @staticmethod
    def scenario_class() -> type[ConfigurationScenario]:
        """Return the scenario class (not an instance) of the configurator."""
        return ConfigurationScenario

    @staticmethod
    def check_requirements(verbose: bool = False) -> bool:
        """Check if the configurator is installed."""
        raise NotImplementedError

    @staticmethod
    def download_requirements() -> None:
        """Download the configurator."""
        raise NotImplementedError

    def configure(
        self: Configurator,
        configuration_commands: list[str],
        data_target: PerformanceDataFrame,
        output: Path,
        scenario: ConfigurationScenario,
        configuration_ids: list[str] = None,
        validate_after: bool = True,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        num_parallel_jobs: int = None,
        base_dir: Path = None,
        run_on: Runner = Runner.SLURM,
    ) -> list[Run]:
        """Start configuration job.

        This method is shared by the configurators and should be called by the
        implementation/subclass of the configurator.

        Args:
            configuration_commands: List of configurator commands to execute
            data_target: Performance data to store the results.
            output: Output directory.
            scenario: ConfigurationScenario to execute.
            configuration_ids: List of configuration ids that are to be created.
                When None, the IDs of the scenario are used.
            validate_after: Whether the configurations should be validated
            sbatch_options: List of slurm batch options to use
            slurm_prepend: Slurm script to prepend to the sbatch
            num_parallel_jobs: The maximum number of jobs to run in parallel
            base_dir: The base_dir of RunRunner where the sbatch scripts will be placed
            run_on: On which platform to run the jobs. Default: Slurm.

        Returns:
            A list of RunRunner Run objects: the configuration job, followed by
            the validation job when validate_after is set.

        Raises:
            RuntimeError: If check_requirements reports the configurator as
                not installed.
        """
        if not self.check_requirements(verbose=True):
            raise RuntimeError(
                f"{self.name} is not installed. Please install {self.name} and try again."
            )
        # The parameter defaults to None; without a fallback len() would fail
        if configuration_ids is None:
            configuration_ids = scenario.configuration_ids
        # Add the configuration IDs to the dataframe with empty configurations.
        # Each ID gets its own dict so the placeholders are never aliased.
        data_target.add_configuration(
            str(scenario.solver.directory),
            configuration_ids,
            [{} for _ in configuration_ids],
        )
        data_target.save_csv()
        # Submit the configuration job
        runs = [
            rrr.add_to_queue(
                runner=run_on,
                cmd=configuration_commands,
                name=f"{self.name} {scenario.solver.name} on {scenario.instance_set.name}",
                base_dir=base_dir,
                output_path=output,
                parallel_jobs=num_parallel_jobs,
                sbatch_options=sbatch_options,
                prepend=slurm_prepend,
            )
        ]

        if validate_after:
            # The validation depends on the configuration job submitted above
            validate = scenario.solver.run_performance_dataframe(
                scenario.instance_set,
                config_ids=configuration_ids,
                performance_dataframe=data_target,
                cutoff_time=scenario.solver_cutoff_time,
                sbatch_options=sbatch_options,
                slurm_prepend=slurm_prepend,
                log_dir=scenario.validation,
                base_dir=base_dir,
                dependencies=runs,
                job_name=f"{self.name}: Validating {len(configuration_ids)} "
                f"{scenario.solver.name} Configurations on "
                f"{scenario.instance_set.name}",
                run_on=run_on,
            )
            runs.append(validate)

        if run_on == Runner.LOCAL:
            # Local jobs are awaited before returning
            print(f"[{self.name}] Running {len(runs)} jobs locally...")
            for run in runs:
                run.wait()
            print(f"[{self.name}] Finished running {len(runs)} jobs locally.")
        return runs

    @staticmethod
    def organise_output(
        output_source: Path,
        output_target: Path,
        scenario: ConfigurationScenario,
        configuration_id: str,
    ) -> None | str:
        """Method to restructure and clean up after a single configurator call.

        Args:
            output_source: Path to the output file of the configurator run.
            output_target: Path to the Performance DataFrame to store result.
            scenario: ConfigurationScenario of the configuration.
            configuration_id: ID (of the run) of the configuration.
        """
        raise NotImplementedError

    @staticmethod
    def save_configuration(
        scenario: ConfigurationScenario,
        configuration_id: str,
        configuration: dict,
        output_target: Path,
    ) -> dict | None:
        """Method to save a configuration to a file.

        If the output_target is None or does not exist, the configuration is
        returned instead of being saved.

        Args:
            scenario: ConfigurationScenario of the configuration. Should be removed.
            configuration_id: ID (of the run) of the configuration.
            configuration: Configuration to save.
            output_target: Path to the Performance DataFrame to store result.

        Returns:
            The configuration when nothing was written, otherwise None.

        Raises:
            IndexError: If no solver in the Performance DataFrame matches the
                name of the scenario's solver.
        """
        if output_target is None or not output_target.exists():
            return configuration
        # Save result to Performance DataFrame
        from filelock import FileLock

        lock = FileLock(f"{output_target}.lock")
        with lock.acquire(timeout=600):
            performance_data = PerformanceDataFrame(output_target)
            # Resolve absolute path to Solver column
            solver = next(
                (
                    s
                    for s in performance_data.solvers
                    if Path(s).name == scenario.solver.name
                ),
                None,
            )
            if solver is None:
                raise IndexError(
                    f"Solver '{scenario.solver.name}' not found in {output_target}"
                )
            # Update the configuration ID by adding the configuration
            performance_data.add_configuration(
                solver=solver,
                configuration_id=configuration_id,
                configuration=configuration,
            )
            performance_data.save_csv()
        return None

    def get_status_from_logs(self: Configurator) -> None:
        """Method to scan the log files of the configurator for warnings."""
        raise NotImplementedError

203 

204 

class ConfigurationScenario:
    """Template class to handle a configuration scenario."""

    # Format of the timestamp suffix used in scenario directory names
    _TIMESTAMP_FORMAT = "%Y%m%d-%H%M"

    def __init__(
        self: ConfigurationScenario,
        solver: Solver,
        instance_set: InstanceSet,
        sparkle_objectives: list[SparkleObjective],
        number_of_runs: int,
        parent_directory: Path,
        timestamp: str = None,
    ) -> None:
        """Initialize scenario paths and names.

        Args:
            solver: Solver that should be configured.
            instance_set: Instances object for the scenario.
            sparkle_objectives: Sparkle Objectives to optimize.
            number_of_runs: The number of configurator runs to perform.
            parent_directory: Directory in which the scenario should be placed.
            timestamp: The timestamp of the scenario directory/file creation.
                Only set when read from file, otherwise generated at time of creation.
        """
        self.solver = solver
        self.instance_set = instance_set
        self.sparkle_objectives = sparkle_objectives
        self.number_of_runs = number_of_runs
        self.parent_directory = parent_directory
        self._timestamp = timestamp
        # Cache for the ablation_scenario property
        self._ablation_scenario: AblationScenario | None = None

    @property
    def configurator(self: ConfigurationScenario) -> type[Configurator]:
        """Return the type of configurator the scenario belongs to."""
        return Configurator

    @property
    def name(self: ConfigurationScenario) -> str:
        """Return the name of the scenario."""
        return f"{self.solver.name}_{self.instance_set.name}_{self.timestamp}"

    @property
    def timestamp(self: ConfigurationScenario) -> str | None:
        """Return the timestamp, or None if the scenario was not created yet."""
        return self._timestamp

    @property
    def directory(self: ConfigurationScenario) -> Path | None:
        """Return the path of the scenario directory (None without timestamp)."""
        if self.timestamp is None:
            return None
        return self.parent_directory / self.name

    @property
    def scenario_file_path(self: ConfigurationScenario) -> Path | None:
        """Return the path of the scenario file."""
        if self.directory:
            return self.directory / "scenario.txt"
        return None

    @property
    def validation(self: ConfigurationScenario) -> Path | None:
        """Return the path of the validation directory."""
        if self.directory:
            return self.directory / "validation"
        return None

    @property
    def tmp(self: ConfigurationScenario) -> Path | None:
        """Return the path of the tmp directory."""
        if self.directory:
            return self.directory / "tmp"
        return None

    @property
    def results_directory(self: ConfigurationScenario) -> Path | None:
        """Return the path of the results directory."""
        if self.directory:
            return self.directory / "results"
        return None

    @property
    def configuration_ids(self: ConfigurationScenario) -> list[str]:
        """Return the IDs of the configurations for the scenario.

        Only exists after the scenario has been created.

        Returns:
            List of configuration IDs, one for each run.
        """
        return [
            f"{self.configurator.__name__}_{self.timestamp}_{i}"
            for i in range(self.number_of_runs)
        ]

    @property
    def ablation_scenario(self: ConfigurationScenario) -> AblationScenario | None:
        """Return the ablation scenario for the scenario if it exists.

        The first ``ablation_config.txt`` found one level below the scenario
        directory is loaded and cached.
        """
        if self._ablation_scenario is not None:
            return self._ablation_scenario
        # Without a timestamp there is no directory to search in
        if self.directory is None:
            return None
        config_file = next(self.directory.glob("*/ablation_config.txt"), None)
        if config_file is None:
            return None
        self._ablation_scenario = AblationScenario.from_file(config_file, self)
        return self._ablation_scenario

    def create_scenario(self: ConfigurationScenario) -> None:
        """Create scenario with solver and instances in the parent directory.

        This sets the timestamp to the current time and prepares all the
        necessary subdirectories related to configuration. An already existing
        directory with the same name is removed first.
        """
        self._timestamp = datetime.now().strftime(self._TIMESTAMP_FORMAT)
        # Prepare scenario directory
        shutil.rmtree(self.directory, ignore_errors=True)
        self.directory.mkdir(parents=True)
        # Create empty directories as needed
        self.tmp.mkdir(exist_ok=True)
        self.validation.mkdir(exist_ok=True)
        self.results_directory.mkdir(exist_ok=True)

    def create_scenario_file(self: ConfigurationScenario) -> Path:
        """Create a file with the configuration scenario."""
        raise NotImplementedError

    def serialise(self: ConfigurationScenario) -> dict:
        """Serialize the configuration scenario."""
        raise NotImplementedError

    @classmethod
    def find_scenario(
        cls: type[ConfigurationScenario],
        directory: Path,
        solver: Solver,
        instance_set: InstanceSet,
        timestamp: str = None,
    ) -> ConfigurationScenario | None:
        """Resolve a scenario from a directory and Solver / Training set.

        Args:
            directory: Directory containing the scenario directories.
            solver: Solver of the scenario.
            instance_set: Training set of the scenario.
            timestamp: Timestamp of the scenario. When None, the newest
                scenario directory of this solver / instance set is used.

        Returns:
            The scenario read from its ``scenario.txt``, or None if no matching
            scenario file was found.
        """
        prefix = f"{solver.name}_{instance_set.name}_"
        if timestamp is None:
            if not directory.is_dir():
                return None
            # Get the newest timestamp, considering only directories that belong
            # to this solver / instance set. Scenarios of other pairs must not
            # shadow the one we are looking for.
            found: list[datetime] = []
            for subdir in directory.iterdir():
                if not subdir.is_dir() or not subdir.name.startswith(prefix):
                    continue
                try:
                    found.append(
                        datetime.strptime(
                            subdir.name[len(prefix):], cls._TIMESTAMP_FORMAT
                        )
                    )
                except ValueError:
                    # Remainder of the name is not a timestamp
                    continue
            if not found:
                return None
            timestamp = max(found).strftime(cls._TIMESTAMP_FORMAT)

        path = directory / f"{prefix}{timestamp}" / "scenario.txt"
        if not path.exists():
            return None
        return cls.from_file(path)

    @staticmethod
    def from_file(scenario_file: Path) -> ConfigurationScenario:
        """Reads scenario file and initialises ConfigurationScenario."""
        raise NotImplementedError

368 

369 

class AblationScenario:
    """Class for ablation analysis."""

    # We use the SMAC2 target algorithm for solver output handling
    configurator_target = (
        Path(__file__).parent.resolve()
        / "implementations"
        / "SMAC2"
        / "smac2_target_algorithm.py"
    )

    ablation_dir = Path(__file__).parent / "implementations" / "ablationAnalysis-0.9.4"
    ablation_executable = ablation_dir / "ablationAnalysis"
    ablation_validation_executable = ablation_dir / "ablationValidation"

    def __init__(
        self: AblationScenario,
        configuration_scenario: ConfigurationScenario,
        test_set: InstanceSet,
        cutoff_length: str,
        concurrent_clis: int,
        best_configuration: dict,
        ablation_racing: bool = False,
    ) -> None:
        """Initialize ablation scenario.

        The solver, training set and cutoff time are taken from the
        configuration scenario.

        Args:
            configuration_scenario: Configuration scenario
            test_set: The test instance set, may be None
            cutoff_length: The cutoff length for ablation analysis
            concurrent_clis: The maximum number of concurrent jobs on a single node
            best_configuration: The configuration to ablate from.
            ablation_racing: Whether to use ablation racing
        """
        self.config_scenario = configuration_scenario
        self.solver = configuration_scenario.solver
        self.train_set = configuration_scenario.instance_set
        self.test_set = test_set
        self.cutoff_time = configuration_scenario.solver_cutoff_time
        self.cutoff_length = cutoff_length
        self.concurrent_clis = concurrent_clis
        self.best_configuration = best_configuration
        self.ablation_racing = ablation_racing
        self.scenario_name = f"ablation_{configuration_scenario.name}"
        self._table_file: Optional[Path] = None
        if self.test_set is not None:
            self.scenario_name += f"_{self.test_set.name}"

    @property
    def scenario_dir(self: AblationScenario) -> Path | None:
        """Return the path of the scenario directory."""
        if self.config_scenario.directory:
            return self.config_scenario.directory / self.scenario_name
        return None

    @property
    def tmp_dir(self: AblationScenario) -> Path | None:
        """Return the path of the tmp directory."""
        if self.scenario_dir:
            return self.scenario_dir / "tmp"
        return None

    @property
    def validation_dir(self: AblationScenario) -> Path | None:
        """Return the path of the validation directory."""
        if self.scenario_dir:
            return self.scenario_dir / "validation"
        return None

    @property
    def validation_dir_tmp(self: AblationScenario) -> Path | None:
        """Return the path of the validation tmp directory."""
        if self.validation_dir:
            return self.validation_dir / "tmp"
        return None

    @property
    def table_file(self: AblationScenario) -> Path | None:
        """Return the path of the table file."""
        if self._table_file:
            return self._table_file
        if self.validation_dir:
            return self.validation_dir / "log" / "ablation-validation-run1234.txt"
        return None

    @staticmethod
    def check_requirements(verbose: bool = False) -> bool:
        """Check if Ablation Analysis is installed.

        Args:
            verbose: Whether to emit a warning for each missing requirement.

        Returns:
            True if Java and both ablation executables were found.
        """
        import warnings

        no_java = shutil.which("java") is None
        no_exec = not AblationScenario.ablation_executable.exists()
        no_validation = not AblationScenario.ablation_validation_executable.exists()
        if verbose:
            if no_java:
                warnings.warn(
                    "AblationAnalysis requires Java 1.8.0_402, but Java is not installed"
                    ". Please ensure Java is installed."
                )
            if no_exec:
                warnings.warn(
                    "AblationAnalysis executable not found. Please ensure Ablation"
                    " Analysis is installed in the expected Path "
                    f"({AblationScenario.ablation_executable})."
                )
            if no_validation:
                warnings.warn(
                    "AblationAnalysis Validation executable not found. Please ensure "
                    "Ablation Analysis is installed in the expected Path "
                    f"({AblationScenario.ablation_validation_executable})."
                )
        return not (no_java or no_exec or no_validation)

    @staticmethod
    def download_requirements(
        ablation_url: str = "https://github.com/ADA-research/Sparkle/raw/refs/heads/development"
        "/Resources/Other/ablationAnalysis-0.9.4.zip",
    ) -> None:
        """Download Ablation Analysis executable.

        Does nothing if the ablation executable already exists.

        Args:
            ablation_url: URL of the zip archive to download and extract.
        """
        if AblationScenario.ablation_executable.exists():
            return  # Already installed
        from urllib.request import urlopen
        import zipfile
        import io

        AblationScenario.ablation_dir.mkdir(parents=True, exist_ok=True)
        # Context managers make sure the connection and the archive are closed
        with urlopen(ablation_url, timeout=60) as response:
            archive_bytes = response.read()
        with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
            archive.extractall(AblationScenario.ablation_dir)
        # Ensure execution rights
        AblationScenario.ablation_executable.chmod(0o755)
        AblationScenario.ablation_validation_executable.chmod(0o755)

    @staticmethod
    def _format_parameter_value(value: object) -> str:
        """Render a parameter value without E scientific notation.

        Values that contain an "e" but are not numbers (e.g. "true") are
        returned unchanged.
        """
        text = str(value)
        bare = text.strip("'")
        if "e" not in bare.lower():
            return text
        try:
            number = float(bare)
        except ValueError:
            return text
        formatted = format(decimal.Context(prec=16).create_decimal(number), "f")
        # Keep surrounding quotes, if any, around the reformatted number
        return text.replace(bare, formatted)

    @staticmethod
    def _parse_target_configuration(text: str) -> dict[str, str]:
        """Parse a '-name value -name value' string into a dictionary.

        A new parameter starts at a '-' that is at the start of the text or
        preceded by whitespace, and that is not followed by a digit, '.', '-'
        or whitespace. This keeps negative numbers, exponents such as 1e-05 and
        hyphens inside names or values intact.
        NOTE(review): assumes parameter names never start with a digit or '.'.
        """
        parsed = {}
        for chunk in re.split(r"(?:^|\s+)-(?=[^\d\s.\-])", text):
            chunk = chunk.strip()
            if chunk:
                key, value = chunk.split(maxsplit=1)
                parsed[key] = value
        return parsed

    def create_configuration_file(self: AblationScenario) -> Path:
        """Create a configuration file for ablation analysis.

        The file is written to the scenario directory and, with the validation
        tmp directory as execdir, to the validation directory.

        Returns:
            Path to the configuration file in the validation directory.

        Raises:
            ValueError: If the solver has no SMAC2 PCS file.
        """
        objective = self.config_scenario.sparkle_objectives[0]
        pcs = self.solver.get_configuration_space()
        parameters = list(pcs.values())
        parameter_names = {p.name for p in parameters}
        # We need to remove any redundant keys that are not in PCS
        target_configuration = {
            key: value
            for key, value in self.best_configuration.items()
            if key in parameter_names
        }
        # We need to check which params are missing and supplement with default
        # values. Compare against the keys, not the rendered string, so a name
        # that is a substring of another name or value is not skipped.
        for p in parameters:
            if p.name not in target_configuration:
                target_configuration[p.name] = p.default_value

        # Ablation cannot deal with E scientific notation in floats
        opt_config_str = " ".join(
            f"-{name} {self._format_parameter_value(value)}"
            for name, value in target_configuration.items()
        )

        smac_run_obj = "RUNTIME" if objective.time else "QUALITY"
        objective_str = "MEAN10" if objective.time else "MEAN"
        # Fetch the SMAC2 PCS file path
        pcs_file_path = self.config_scenario.solver.get_pcs_file_type(PCSConvention.SMAC)
        if not pcs_file_path:
            raise ValueError(
                "Could not find SMAC2 PCS file, which is required for ablation analysis."
            )
        pcs_file_path = pcs_file_path.absolute()

        # Create config file
        config_file = self.scenario_dir / "ablation_config.txt"
        execdir_line = f"execdir = {self.tmp_dir.absolute()}\n"
        config = (
            f'algo = "{AblationScenario.configurator_target.absolute()} '
            f"{self.config_scenario.solver.directory.absolute()} "
            f'{self.tmp_dir.absolute()} {objective}"\n'
            f"{execdir_line}"
            "experimentDir = ./\n"
            f"deterministic = {1 if self.solver.deterministic else 0}\n"
            f"run_obj = {smac_run_obj}\n"
            f"overall_obj = {objective_str}\n"
            f"cutoffTime = {self.cutoff_time}\n"
            f"cutoff_length = {self.cutoff_length}\n"
            f"cli-cores = {self.concurrent_clis}\n"
            f"useRacing = {self.ablation_racing}\n"
            f"seed = {random.randint(0, 2**32 - 1)}\n"
            f"paramfile = {pcs_file_path}\n"
            "instance_file = instances_train.txt\n"
            "test_instance_file = instances_test.txt\n"
            "sourceConfiguration = DEFAULT\n"
            f'targetConfiguration = "{opt_config_str}"'
        )
        with config_file.open("w") as file:
            file.write(config)
        # Write config to validation directory
        conf_valid = config.replace(
            execdir_line,
            f"execdir = {self.validation_dir_tmp.absolute()}\n",
        )
        validation_config_file = self.validation_dir / config_file.name
        with validation_config_file.open("w") as file:
            file.write(conf_valid)
        return validation_config_file

    def create_instance_file(self: AblationScenario, test: bool = False) -> Path:
        """Create an instance file for ablation analysis.

        Args:
            test: Whether to write the test instance file. Falls back to the
                training set if there is no test set.

        Returns:
            Path to the instance file in the scenario directory. A copy is
            placed in the validation directory.
        """
        file_suffix = "_train.txt"
        instance_set = self.train_set
        if test:
            file_suffix = "_test.txt"
            instance_set = self.test_set if self.test_set is not None else self.train_set
        # We give the Ablation script the paths of the instances
        file_instance = self.scenario_dir / f"instances{file_suffix}"
        with file_instance.open("w") as fh:
            for instance in instance_set._instance_paths:
                # Multi file instances are written space separated on one line
                if isinstance(instance, list):
                    joined_instances = " ".join(
                        [str(file.absolute()) for file in instance]
                    )
                    fh.write(f"{joined_instances}\n")
                else:
                    fh.write(f"{instance.absolute()}\n")
        # Copy to validation directory
        shutil.copyfile(file_instance, self.validation_dir / file_instance.name)
        return file_instance

    def create_scenario(self: AblationScenario, override_dirs: bool = False) -> None:
        """Create scenario directory and files.

        Args:
            override_dirs: Whether an existing scenario directory is removed
                and recreated. When False, an existing scenario is left as is.
        """
        if self.scenario_dir.exists():
            print("WARNING: Found existing ablation scenario.")
            if not override_dirs:
                print("Set override to True to overwrite existing scenario.")
                return
            print("Overwriting existing scenario...")
            shutil.rmtree(self.scenario_dir)
        self.tmp_dir.mkdir(parents=True, exist_ok=True)
        self.validation_dir_tmp.mkdir(parents=True, exist_ok=True)
        self.create_instance_file()
        self.create_instance_file(test=True)
        self.create_configuration_file()

    def check_for_ablation(self: AblationScenario) -> bool:
        """Checks if ablation has terminated successfully."""
        table_file = self.table_file
        if table_file is None or not table_file.is_file():
            return False
        # First line in the table file should be "Ablation analysis validation complete."
        with table_file.open() as fh:
            table_line = fh.readline().strip()
        return table_line == "Ablation analysis validation complete."

    def read_ablation_table(self: AblationScenario) -> list[list[str]]:
        """Read from ablation table of a scenario.

        Returns:
            A header row followed by one row of five values per table line.
            Empty if ablation has not completed.
        """
        if not self.check_for_ablation():
            # No ablation table exists for this solver-instance pair
            return []
        results = [
            [
                "Round",
                "Flipped parameter",
                "Source value",
                "Target value",
                "Validation result",
            ]
        ]

        with self.table_file.open() as fh:
            for line in fh:
                # Pre-process lines from the ablation file and add to the results.
                # Sometimes ablation rounds switch multiple parameters at once.
                # EXAMPLE: 2 EDR, EDRalpha 0, 0.1 1, 0.1013241633106732 486.31691
                # To split the row correctly, we remove the space after the comma
                # separating the parameters and add it back after splitting.
                values = re.sub(r"\s+", " ", line.strip())
                values = re.sub(r", ", ",", values)
                values = [val.replace(",", ", ") for val in values.split(" ")]
                # Lines that do not have exactly five columns are not table rows
                if len(values) == 5:
                    results.append(values)
        return results

    def submit_ablation(
        self: AblationScenario,
        log_dir: Path,
        sbatch_options: list[str] | None = None,
        slurm_prepend: str | list[str] | Path = None,
        run_on: Runner = Runner.SLURM,
    ) -> list[Run]:
        """Submit an ablation job.

        Args:
            log_dir: Directory to store job logs
            sbatch_options: Options to pass to sbatch. The list is not modified.
            slurm_prepend: Script to prepend to sbatch script
            run_on: Determines to which RunRunner queue the job is added

        Returns:
            A list of Run objects: the ablation run, followed by the validation
            run if there is a test set. Local runs are awaited before returning.

        Raises:
            RuntimeError: If the requirements for Ablation Analysis are not met.
        """
        if not self.check_requirements(verbose=True):
            raise RuntimeError(
                "Ablation Analysis is not available. Please ensure Java and Ablation "
                "Analysis is installed and try again."
            )
        # 1. submit the ablation to the runrunner queue
        cmd = (
            f"{AblationScenario.ablation_executable.absolute()} "
            "--optionFile ablation_config.txt"
        )
        srun_options = ["-N1", "-n1", f"-c{self.concurrent_clis}"]
        # Work on a copy so neither the caller's list nor a shared default
        # accumulates the cpus-per-task option over multiple calls
        sbatch_options = list(sbatch_options or []) + [
            f"--cpus-per-task={self.concurrent_clis}"
        ]
        run_ablation = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=f"Ablation analysis {self.solver.name} on {self.train_set.name}",
            base_dir=log_dir,
            path=self.scenario_dir,
            sbatch_options=sbatch_options,
            srun_options=srun_options,
            prepend=slurm_prepend,
        )

        runs = []
        if run_on == Runner.LOCAL:
            run_ablation.wait()
        runs.append(run_ablation)

        # 2. Run ablation validation run if we have a test set to run on
        if self.test_set is not None:
            # Validation dir should have a copy of all needed files, except for the
            # output of the ablation run, which is stored in ablation-run[seed].txt
            cmd = (
                f"{AblationScenario.ablation_validation_executable.absolute()} "
                "--optionFile ablation_config.txt "
                "--ablationLogFile ../log/ablation-run1234.txt"
            )

            run_ablation_validation = rrr.add_to_queue(
                runner=run_on,
                cmd=cmd,
                name=f"Ablation validation Test set {self.test_set.name}",
                path=self.validation_dir,
                base_dir=log_dir,
                dependencies=run_ablation,
                sbatch_options=sbatch_options,
                prepend=slurm_prepend,
            )

            if run_on == Runner.LOCAL:
                run_ablation_validation.wait()
            runs.append(run_ablation_validation)
        return runs

    @staticmethod
    def from_file(
        path: Path, config_scenario: ConfigurationScenario
    ) -> AblationScenario:
        """Reads scenario file and initialises AblationScenario.

        Args:
            path: Path to the ablation configuration file ("key = value" lines).
            config_scenario: The configuration scenario the ablation belongs to.

        Returns:
            The AblationScenario described by the file.
        """
        variables: dict[str, str] = {}
        with path.open() as fh:
            for raw_line in fh:
                line = raw_line.strip()
                if not line:
                    continue
                key, value = line.split(" = ", maxsplit=1)
                variables[key] = value
        best_conf = AblationScenario._parse_target_configuration(
            variables["targetConfiguration"].replace('"', "")
        )
        test_set = None
        test_file = path.parent / "instances_test.txt"
        if test_file.exists():
            with test_file.open() as fh:
                first_instance = fh.readline().strip()
            if first_instance:
                test_dir = Path(first_instance).parent
                train_dir = config_scenario.instance_set.directory
                # Instance files hold absolute paths, so compare absolute paths
                same_as_train = (
                    train_dir is not None
                    and Path(train_dir).absolute() == test_dir.absolute()
                )
                if not same_as_train:
                    test_set = Instance_Set(test_dir)
        # The file stores str(bool); bool() of any non-empty string is True,
        # so the text has to be compared explicitly
        ablation_racing = variables["useRacing"].strip().lower() in ("true", "1")
        return AblationScenario(
            config_scenario,
            test_set,
            variables["cutoff_length"],
            int(variables["cli-cores"]),
            best_conf,
            ablation_racing=ablation_racing,
        )

757 )