Coverage for src/sparkle/solver/solver.py: 92% (214 statements)

coverage.py v7.10.7, created at 2025-10-15 14:11 +0000

1"""File to handle a solver and its directories.""" 

2 

3from __future__ import annotations 

4import sys 

5from typing import Any 

6import shlex 

7import ast 

8import json 

9import random 

10from pathlib import Path 

11 

12from ConfigSpace import ConfigurationSpace 

13 

14import runrunner as rrr 

15from runrunner.local import LocalRun 

16from runrunner.slurm import Run, SlurmRun 

17from runrunner.base import Status, Runner 

18 

19from sparkle.tools.parameters import PCSConverter, PCSConvention 

20from sparkle.tools import RunSolver 

21from sparkle.types import SparkleCallable, SolverStatus 

22from sparkle.solver import verifiers 

23from sparkle.instance import InstanceSet 

24from sparkle.structures import PerformanceDataFrame 

25from sparkle.types import resolve_objective, SparkleObjective, UseTime 

26 

27 

28class Solver(SparkleCallable): 

29 """Class to handle a solver and its directories.""" 

30 

31 meta_data = "solver_meta.txt" 

32 _wrapper_file = "sparkle_solver_wrapper" 

33 solver_cli = Path(__file__).parent / "solver_cli.py" 

34 

35 def __init__( 

36 self: Solver, 

37 directory: Path, 

38 runsolver_exec: Path = None, 

39 deterministic: bool = None, 

40 verifier: verifiers.SolutionVerifier = None, 

41 ) -> None: 

42 """Initialize solver. 

43 

44 Args: 

45 directory: Directory of the solver. 

46 runsolver_exec: Path to the runsolver executable. 

47 By default, runsolver in directory. 

48 deterministic: Bool indicating determinism of the algorithm. 

49 Defaults to False. 

50 verifier: The solution verifier to use. If None, no verifier is used. 

51 """ 

52 super().__init__(directory, runsolver_exec) 

53 self.deterministic = deterministic 

54 self.verifier = verifier 

55 self._pcs_file: Path = None 

56 self._interpreter: str = None 

57 self._wrapper_extension: str = None 

58 

59 meta_data_file = self.directory / Solver.meta_data 

60 if meta_data_file.exists(): 

61 meta_data = ast.literal_eval(meta_data_file.open().read()) 

62 # We only override the deterministic and verifier from file if not set 

63 if self.deterministic is None: 

64 if ( 

65 "deterministic" in meta_data 

66 and meta_data["deterministic"] is not None 

67 ): 

68 self.deterministic = meta_data["deterministic"] 

69 if self.verifier is None and "verifier" in meta_data: 

70 if isinstance(meta_data["verifier"], tuple): # File verifier 

71 self.verifier = verifiers.mapping[meta_data["verifier"][0]]( 

72 Path(meta_data["verifier"][1]) 

73 ) 

74 elif meta_data["verifier"] in verifiers.mapping: 

75 self.verifier = verifiers.mapping[meta_data["verifier"]] 

76 if self.deterministic is None: # Default to False 

77 self.deterministic = False 

78 
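    # A minimal solver_meta.txt sketch (hypothetical values); the file holds a
    # Python dict literal read via ast.literal_eval above, and the verifier
    # entry may also be a ("<verifier name>", "<file path>") tuple for
    # file-based verifiers resolved through verifiers.mapping:
    #   {"deterministic": False, "verifier": None}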

    def __str__(self: Solver) -> str:
        """Return the string representation of the solver."""
        return self.name

    def __repr__(self: Solver) -> str:
        """Return detailed representation of the solver."""
        return (
            f"{self.name}:\n"
            f"\t- Directory: {self.directory}\n"
            f"\t- Deterministic: {self.deterministic}\n"
            f"\t- Verifier: {self.verifier}\n"
            f"\t- PCS File: {self.pcs_file}\n"
            f"\t- Wrapper: {self.wrapper}"
        )

    @property
    def pcs_file(self: Solver) -> Path:
        """Get path of the parameter file."""
        if self._pcs_file is None:
            files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"])
            if len(files) == 0:
                return None
            self._pcs_file = files[0]
        return self._pcs_file

    @property
    def wrapper_extension(self: Solver) -> str:
        """Get the extension of the wrapper file."""
        if self._wrapper_extension is None:
            # Determine which file is the wrapper by sorting alphabetically
            wrapper = sorted(
                [p for p in self.directory.iterdir() if p.stem == Solver._wrapper_file]
            )[0]
            self._wrapper_extension = wrapper.suffix
        return self._wrapper_extension

    @property
    def wrapper(self: Solver) -> str:
        """Get name of the wrapper file."""
        return f"{Solver._wrapper_file}{self.wrapper_extension}"

    @property
    def wrapper_file(self: Solver) -> Path:
        """Get path of the wrapper file."""
        return self.directory / self.wrapper
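    # Sketch of the solver directory layout these properties assume (file
    # names other than the wrapper stem are illustrative):
    #   Solvers/MySolver/
    #       sparkle_solver_wrapper.py   # Solver._wrapper_file + extension
    #       parameters.pcs              # optional, found by the pcs_file property
    #       solver_meta.txt             # optional, read in __init__
    #       runsolver                   # optional runsolver executable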

    def get_pcs_file(self: Solver, port_type: PCSConvention) -> Path:
        """Get path of the parameter file of a specific convention.

        Args:
            port_type: Port type of the parameter file. If None, will return the
                alphabetically first parameter file.

        Returns:
            Path to the parameter file. None if it cannot be resolved.
        """
        pcs_files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"])
        if port_type is None:
            return pcs_files[0]
        for file in pcs_files:
            if port_type == PCSConverter.get_convention(file):
                return file
        return None

    def read_pcs_file(self: Solver) -> bool:
        """Check whether the pcs file can be read."""
        # TODO: Should be a .validate method instead
        return PCSConverter.get_convention(self.pcs_file) is not None

    def get_configuration_space(self: Solver) -> ConfigurationSpace:
        """Get the ConfigurationSpace of the PCS file."""
        if not self.pcs_file:
            return None
        return PCSConverter.parse(self.pcs_file)

    def port_pcs(self: Solver, port_type: PCSConvention) -> None:
        """Port the parameter file to the given port type."""
        target_pcs_file = (
            self.pcs_file.parent / f"{self.pcs_file.stem}_{port_type.name}.pcs"
        )
        if target_pcs_file.exists():  # Already exists, possibly user defined
            return
        PCSConverter.export(self.get_configuration_space(), port_type, target_pcs_file)
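    # For example, porting "parameters.pcs" with a port_type whose .name is
    # "IRACE" would write "parameters_IRACE.pcs" next to the original file
    # (hypothetical convention name; the pattern follows port_type.name above).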

    def build_cmd(
        self: Solver,
        instance: str | list[str],
        objectives: list[SparkleObjective],
        seed: int,
        cutoff_time: int = None,
        configuration: dict = None,
        log_dir: Path = None,
    ) -> list[str]:
        """Build the solver call on an instance with a configuration.

        Args:
            instance: Path to the instance.
            objectives: List of sparkle objectives.
            seed: Seed of the solver.
            cutoff_time: Cutoff time for the solver.
            configuration: Configuration of the solver.
            log_dir: Directory path for logs.

        Returns:
            List of commands and arguments to execute the solver.
        """
        if configuration is None:
            configuration = {}
        # Ensure configuration contains required entries for each wrapper
        configuration["solver_dir"] = str(self.directory.absolute())
        configuration["instance"] = instance
        configuration["seed"] = seed
        configuration["objectives"] = ",".join([str(obj) for obj in objectives])
        configuration["cutoff_time"] = (
            cutoff_time if cutoff_time is not None else sys.maxsize
        )
        if "configuration_id" in configuration:
            del configuration["configuration_id"]
        # Ensure stringification of dictionary will go correctly for key value pairs
        configuration = {key: str(configuration[key]) for key in configuration}
        solver_cmd = [
            str(self.directory / self.wrapper),
            f"'{json.dumps(configuration)}'",
        ]
        if log_dir is None:
            log_dir = Path()
        if cutoff_time is not None:  # Use RunSolver
            log_path_str = instance[0] if isinstance(instance, list) else instance
            log_name_base = f"{Path(log_path_str).name}_{self.name}"
            return RunSolver.wrap_command(
                self.runsolver_exec,
                solver_cmd,
                cutoff_time,
                log_dir,
                log_name_base=log_name_base,
            )
        return solver_cmd
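    # The resulting command is the wrapper path followed by a single-quoted
    # JSON dict (a sketch with hypothetical values):
    #   ['/abs/path/MySolver/sparkle_solver_wrapper.py',
    #    '\'{"solver_dir": "...", "instance": "inst.cnf", "seed": "42",
    #    "objectives": "PAR10", "cutoff_time": "60"}\'']
    # With cutoff_time set, RunSolver.wrap_command prefixes this command with
    # the runsolver executable and its timing/log arguments.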

    def run(
        self: Solver,
        instances: str | list[str] | InstanceSet | list[InstanceSet],
        objectives: list[SparkleObjective],
        seed: int,
        cutoff_time: int = None,
        configuration: dict = None,
        run_on: Runner = Runner.LOCAL,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        log_dir: Path = None,
    ) -> SlurmRun | list[dict[str, Any]] | dict[str, Any]:
        """Run the solver on an instance with a certain configuration.

        Args:
            instances: The instance(s) to run the solver on; a list in case of
                multi-file instances. In case of an instance set, will run on all
                instances in the set.
            objectives: List of sparkle objectives.
            seed: Seed to run the solver with. Fill with an arbitrary int in case
                of a deterministic solver.
            cutoff_time: The cutoff time for the solver, measured through RunSolver.
                If None, will be executed without RunSolver.
            configuration: The solver configuration to use. Can be empty.
            run_on: Whether to run on Slurm or locally.
            sbatch_options: The sbatch options to use.
            slurm_prepend: The script to prepend to a Slurm script.
            log_dir: The log directory to use.

        Returns:
            Solver output dict, possibly with RunSolver values.
        """
        cmds = []
        set_label = instances.name if isinstance(instances, InstanceSet) else "instances"
        instances = [instances] if not isinstance(instances, list) else instances
        log_dir = Path() if log_dir is None else log_dir

        for instance in instances:
            paths = (
                instance.instance_paths
                if isinstance(instance, InstanceSet)
                else [instance]
            )
            for instance_path in paths:
                instance_path = (
                    [str(p) for p in instance_path]
                    if isinstance(instance_path, list)
                    else instance_path
                )
                solver_cmd = self.build_cmd(
                    instance_path,
                    objectives=objectives,
                    seed=seed,
                    cutoff_time=cutoff_time,
                    configuration=configuration,
                    log_dir=log_dir,
                )
                cmds.append(" ".join(solver_cmd))

        commandname = f"Run Solver: {self.name} on {set_label}"
        run = rrr.add_to_queue(
            runner=run_on,
            cmd=cmds,
            name=commandname,
            base_dir=log_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
        )

        if isinstance(run, LocalRun):
            run.wait()
            if run.status == Status.ERROR:  # Subprocess resulted in error
                print(f"WARNING: Solver {self.name} execution seems to have failed!\n")
                for i, job in enumerate(run.jobs):
                    print(
                        f"[Job {i}] The used command was: {cmds[i]}\n"
                        "The error yielded was:\n"
                        f"\t-stdout: '{job.stdout}'\n"
                        f"\t-stderr: '{job.stderr}'\n"
                    )
                return {
                    "status": SolverStatus.ERROR,
                }

            solver_outputs = []
            for i, job in enumerate(run.jobs):
                solver_cmd = cmds[i].split(" ")
                solver_output = Solver.parse_solver_output(
                    run.jobs[i].stdout,
                    solver_call=solver_cmd,
                    objectives=objectives,
                    verifier=self.verifier,
                )
                solver_outputs.append(solver_output)
            return solver_outputs if len(solver_outputs) > 1 else solver_output
        return run
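    # Minimal local usage sketch (hypothetical paths and objective name):
    #   solver = Solver(Path("Solvers/MySolver"))
    #   output = solver.run("Instances/example.cnf",
    #                       objectives=[resolve_objective("PAR10")],
    #                       seed=42, cutoff_time=60, run_on=Runner.LOCAL)
    #   print(output["status"])  # e.g. SolverStatus.SUCCESS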

    def run_performance_dataframe(
        self: Solver,
        instances: str | list[str] | InstanceSet,
        performance_dataframe: PerformanceDataFrame,
        config_ids: str | list[str] = None,
        run_ids: list[int] | list[list[int]] = None,
        cutoff_time: int = None,
        objective: SparkleObjective = None,
        train_set: InstanceSet = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        dependencies: list[SlurmRun] = None,
        log_dir: Path = None,
        base_dir: Path = None,
        job_name: str = None,
        run_on: Runner = Runner.SLURM,
    ) -> Run:
        """Run the solver and place the results in the performance dataframe.

        In practice this runs Solver.run, but with a small script before/after
        to read from and write to the performance dataframe.

        Args:
            instances: The instance(s) to run the solver on. In case of an instance
                set or list, will create a job for all instances in the set/list.
            performance_dataframe: The performance dataframe to use.
            config_ids: The config indices to use in the performance dataframe.
            run_ids: List of run ids to use. If a list of lists, a list of runs is
                given per instance. Otherwise, all runs are used for each instance.
            cutoff_time: The cutoff time for the solver, measured through RunSolver.
            objective: The objective to use, only relevant when determining the best
                configuration.
            train_set: The training set to use. If present, will determine the best
                configuration of the solver using these instances and run with it on
                all instances in the instances argument.
            sbatch_options: List of Slurm batch options to use.
            slurm_prepend: Slurm script to prepend to the sbatch.
            dependencies: List of Slurm runs to use as dependencies.
            log_dir: Path where to place output files. Defaults to CWD.
            base_dir: Path where to place output files.
            job_name: Name of the job.
                If None, will generate a name based on the solver and instances.
            run_on: On which platform to run the jobs. Default: Slurm.

        Returns:
            SlurmRun or LocalRun of the job.
        """
        instances = [instances] if isinstance(instances, str) else instances
        set_name = "instances"
        if isinstance(instances, InstanceSet):
            set_name = instances.name
            instances = [str(i) for i in instances.instance_paths]
        if not isinstance(config_ids, list):
            config_ids = [config_ids]
        configurations = [
            performance_dataframe.get_full_configuration(str(self.directory), config_id)
            if config_id
            else None
            for config_id in config_ids
        ]
        if run_ids is None:
            run_ids = performance_dataframe.run_ids
        if isinstance(run_ids[0], list):  # Runs per instance
            combinations = []
            for index, instance in enumerate(instances):
                for run_id in run_ids[index]:
                    combinations.extend(
                        [
                            (instance, config_id, config, run_id)
                            for config_id, config in zip(config_ids, configurations)
                        ]
                    )
        else:  # Runs for all instances
            import itertools

            combinations = [
                (instance, config_data[0], config_data[1], run_id)
                for instance, config_data, run_id in itertools.product(
                    instances,
                    zip(config_ids, configurations),
                    performance_dataframe.run_ids,
                )
            ]
        objective_arg = f"--target-objective {objective.name}" if objective else ""
        train_arg = (
            "--best-configuration-instances "
            + " ".join([str(i) for i in train_set.instance_paths])
            if train_set
            else ""
        )
        configuration_args = [
            ""
            if not config_id and not config
            else f"--configuration-id {config_id}"
            if not config
            else f"--configuration '{json.dumps(config)}'"
            for _, config_id, config, _ in combinations
        ]

        # We run all instance/config/run combinations.
        # Each value is resolved from the PerformanceDataFrame up front,
        # to avoid high read loads during execution.
        cmds = [
            f"python3 {Solver.solver_cli} "
            f"--solver {self.directory} "
            f"--instance {instance} "
            f"{config_arg} "
            f"--run-index {run_id} "
            f"--objectives {' '.join([obj.name for obj in performance_dataframe.objectives])} "
            f"--performance-dataframe {performance_dataframe.csv_filepath} "
            f"--cutoff-time {cutoff_time} "
            f"--log-dir {log_dir} "
            f"--seed {random.randint(0, 2**32 - 1)} "
            f"{objective_arg} "
            f"{train_arg}"
            for (instance, _, _, run_id), config_arg in zip(
                combinations, configuration_args
            )
        ]
        job_name = f"Run: {self.name} on {set_name}" if job_name is None else job_name
        r = rrr.add_to_queue(
            runner=run_on,
            cmd=cmds,
            name=job_name,
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if run_on == Runner.LOCAL:
            r.wait()
        return r
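    # Each queued job is a solver_cli.py call of roughly this shape
    # (hypothetical values):
    #   python3 .../solver_cli.py --solver Solvers/MySolver \
    #       --instance Instances/example.cnf --run-index 1 \
    #       --objectives PAR10 --performance-dataframe performance_data.csv \
    #       --cutoff-time 60 --log-dir Output/Logs --seed 1234567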

    @staticmethod
    def config_str_to_dict(config_str: str) -> dict[str, str]:
        """Parse a configuration string to a dictionary."""
        # First we filter the configuration of unwanted characters
        config_str = config_str.strip().replace("-", "")
        # Then we split the string by spaces, but preserve quoted substrings
        config_list = shlex.split(config_str)
        # We return empty for empty input OR an uneven number of tokens
        if config_str == "" or config_str == r"{}" or len(config_list) & 1:
            return {}
        config_dict = {}
        for index in range(0, len(config_list), 2):
            # As the value will already be a string object, no quotes are allowed in it
            value = config_list[index + 1].strip('"').strip("'")
            config_dict[config_list[index]] = value
        return config_dict
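    # Example (hypothetical parameter names): dashes are stripped and the
    # remaining space-separated tokens are paired up as key/value entries:
    #   config_str_to_dict("-init_solution '1' -p_swt '0.3'")
    #   -> {"init_solution": "1", "p_swt": "0.3"}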

    @staticmethod
    def parse_solver_output(
        solver_output: str,
        solver_call: list[str | Path] = None,
        objectives: list[SparkleObjective] = None,
        verifier: verifiers.SolutionVerifier = None,
    ) -> dict[str, Any]:
        """Parse the output of the solver.

        Args:
            solver_output: The output of the solver run which needs to be parsed.
            solver_call: The solver call used to run the solver.
            objectives: The objectives to apply to the solver output.
            verifier: The verifier to check the solver output.

        Returns:
            Dictionary representing the parsed solver output.
        """
        used_runsolver = False
        if solver_call is not None and len(solver_call) > 2:
            used_runsolver = True
            parsed_output = RunSolver.get_solver_output(solver_call, solver_output)
        else:
            parsed_output = ast.literal_eval(solver_output)
        # Cast status attribute from str to Enum
        parsed_output["status"] = SolverStatus(parsed_output["status"])
        if verifier is not None and used_runsolver:
            # Horrible hack to get the instance from the solver input
            solver_call_str: str = " ".join(solver_call)
            solver_input_str = solver_call_str.split(Solver._wrapper_file, maxsplit=1)[1]
            solver_input_str = solver_input_str.split(" ", maxsplit=1)[1]
            solver_input_str = solver_input_str[
                solver_input_str.index("{") : solver_input_str.index("}") + 1
            ]
            solver_input = ast.literal_eval(solver_input_str)
            target_instance = Path(solver_input["instance"])
            parsed_output["status"] = verifier.verify(
                target_instance, parsed_output, solver_call
            )

        # Create objective map
        objectives = {o.stem: o for o in objectives} if objectives else {}
        removable_keys = ["cutoff_time"]  # Keys to remove

        # Apply objectives to the parsed output; runtime-based objectives are added here
        for key, value in parsed_output.items():
            if objectives and key in objectives:
                objective = objectives[key]
                removable_keys.append(key)  # We translate it into the full name
            else:
                # If not found in objectives, resolve to which objective the output belongs
                objective = resolve_objective(key)
            if objective is None:  # Could not parse, skip
                continue
            if objective.use_time == UseTime.NO:
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(value)
            else:
                if not used_runsolver:
                    continue
                if objective.use_time == UseTime.CPU_TIME:
                    parsed_output[key] = parsed_output["cpu_time"]
                else:
                    parsed_output[key] = parsed_output["wall_time"]
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(
                        parsed_output[key],
                        parsed_output["cutoff_time"],
                        parsed_output["status"],
                    )

        # Replace or remove keys based on the objective names
        for key in removable_keys:
            if key in parsed_output:
                if key in objectives:
                    # Map the result to the objective
                    parsed_output[objectives[key].name] = parsed_output[key]
                    if key != objectives[key].name:  # Only delete actual mappings
                        del parsed_output[key]
                else:
                    del parsed_output[key]
        return parsed_output
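    # A raw wrapper output is expected to be a Python dict literal, e.g.
    # (hypothetical objective key): "{'status': 'SUCCESS', 'quality': 42.0}".
    # When RunSolver was used, the parsed output additionally carries values
    # such as cpu_time, wall_time and cutoff_time, which feed the
    # runtime-based objectives above.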