Coverage for sparkle/solver/solver.py: 92%

214 statements  

coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

1"""File to handle a solver and its directories.""" 

2 

3from __future__ import annotations 

4import sys 

5from typing import Any 

6import shlex 

7import ast 

8import json 

9import random 

10from pathlib import Path 

11 

12from ConfigSpace import ConfigurationSpace 

13 

14import runrunner as rrr 

15from runrunner.local import LocalRun 

16from runrunner.slurm import Run, SlurmRun 

17from runrunner.base import Status, Runner 

18 

19from sparkle.tools.parameters import PCSConverter, PCSConvention 

20from sparkle.tools import RunSolver 

21from sparkle.types import SparkleCallable, SolverStatus 

22from sparkle.solver import verifiers 

23from sparkle.instance import InstanceSet 

24from sparkle.structures import PerformanceDataFrame 

25from sparkle.types import resolve_objective, SparkleObjective, UseTime 

26 

27 

class Solver(SparkleCallable):
    """Class to handle a solver and its directories."""

    meta_data = "solver_meta.txt"
    _wrapper_file = "sparkle_solver_wrapper"
    solver_cli = Path(__file__).parent / "solver_cli.py"

    def __init__(
        self: Solver,
        directory: Path,
        runsolver_exec: Path = None,
        deterministic: bool = None,
        verifier: verifiers.SolutionVerifier = None,
    ) -> None:
        """Initialize solver.

        Args:
            directory: Directory of the solver.
            runsolver_exec: Path to the runsolver executable.
                Defaults to the runsolver binary in the solver directory.
            deterministic: Bool indicating determinism of the algorithm.
                Defaults to False.
            verifier: The solution verifier to use. If None, no verifier is used.
        """
        super().__init__(directory, runsolver_exec)
        self.deterministic = deterministic
        self.verifier = verifier
        self._pcs_file: Path = None
        self._interpreter: str = None
        self._wrapper_extension: str = None

        meta_data_file = self.directory / Solver.meta_data
        if meta_data_file.exists():
            meta_data = ast.literal_eval(meta_data_file.open().read())
            # Only override deterministic and verifier from the file if not already set
            if self.deterministic is None:
                if (
                    "deterministic" in meta_data
                    and meta_data["deterministic"] is not None
                ):
                    self.deterministic = meta_data["deterministic"]
            if self.verifier is None and "verifier" in meta_data:
                if isinstance(meta_data["verifier"], tuple):  # File verifier
                    self.verifier = verifiers.mapping[meta_data["verifier"][0]](
                        Path(meta_data["verifier"][1])
                    )
                elif meta_data["verifier"] in verifiers.mapping:
                    self.verifier = verifiers.mapping[meta_data["verifier"]]
        if self.deterministic is None:  # Default to False
            self.deterministic = False
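
    # A minimal sketch of what ``solver_meta.txt`` could contain for the parsing
    # above (values hypothetical; the file holds a Python-literal dict that is
    # read via ``ast.literal_eval``):
    #
    #   {"deterministic": False, "verifier": "SATVerifier"}
    #
    # or, for a file-based verifier, a (verifier name, file path) tuple:
    #
    #   {"deterministic": True, "verifier": ("SolutionFileVerifier", "sols.csv")}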

    def __str__(self: Solver) -> str:
        """Return the string representation of the solver."""
        return self.name

    def __repr__(self: Solver) -> str:
        """Return detailed representation of the solver."""
        return (
            f"{self.name}:\n"
            f"\t- Directory: {self.directory}\n"
            f"\t- Deterministic: {self.deterministic}\n"
            f"\t- Verifier: {self.verifier}\n"
            f"\t- PCS File: {self.pcs_file}\n"
            f"\t- Wrapper: {self.wrapper}"
        )

    @property
    def pcs_file(self: Solver) -> Path:
        """Get path of the parameter file."""
        if self._pcs_file is None:
            files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"])
            if len(files) == 0:
                return None
            self._pcs_file = files[0]
        return self._pcs_file

    @property
    def wrapper_extension(self: Solver) -> str:
        """Get the extension of the wrapper file."""
        if self._wrapper_extension is None:
            # Determine the wrapper file by its stem, taking the alphabetically
            # first match
            wrapper = sorted(
                [p for p in self.directory.iterdir() if p.stem == Solver._wrapper_file]
            )[0]
            self._wrapper_extension = wrapper.suffix
        return self._wrapper_extension

    @property
    def wrapper(self: Solver) -> str:
        """Get name of the wrapper file."""
        return f"{Solver._wrapper_file}{self.wrapper_extension}"

    @property
    def wrapper_file(self: Solver) -> Path:
        """Get path of the wrapper file."""
        return self.directory / self.wrapper
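
    # The properties above rely on a naming convention inside the solver
    # directory; an illustrative layout (file names hypothetical):
    #
    #   Solvers/MySolver/
    #       sparkle_solver_wrapper.py  -> wrapper / wrapper_file
    #       my_solver.pcs              -> pcs_file
    #       runsolver                  -> runsolver executable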

    def get_pcs_file(self: Solver, port_type: PCSConvention) -> Path:
        """Get path of the parameter file of a specific convention.

        Args:
            port_type: Port type of the parameter file. If None, will return
                the alphabetically first PCS file.

        Returns:
            Path to the parameter file. None if it cannot be resolved.
        """
        pcs_files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"])
        if port_type is None:
            return pcs_files[0]
        for file in pcs_files:
            if port_type == PCSConverter.get_convention(file):
                return file
        return None

    def read_pcs_file(self: Solver) -> bool:
        """Check whether the PCS file can be read."""
        # TODO: Should be a .validate method instead
        return PCSConverter.get_convention(self.pcs_file) is not None
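
    # Usage sketch, assuming both a generic and a convention-specific PCS file
    # are present (file and member names illustrative):
    #
    #   solver.get_pcs_file(None)                 # -> "my_solver.pcs"
    #   solver.get_pcs_file(PCSConvention.IRACE)  # -> "my_solver_IRACE.pcs"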

    def get_configuration_space(self: Solver) -> ConfigurationSpace:
        """Get the ConfigurationSpace of the PCS file."""
        if not self.pcs_file:
            return None
        return PCSConverter.parse(self.pcs_file)

    def port_pcs(self: Solver, port_type: PCSConvention) -> None:
        """Port the parameter file to the given port type."""
        target_pcs_file = (
            self.pcs_file.parent / f"{self.pcs_file.stem}_{port_type.name}.pcs"
        )
        if target_pcs_file.exists():  # Already exists, possibly user defined
            return
        PCSConverter.export(self.get_configuration_space(), port_type, target_pcs_file)
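
    # Porting sketch: for a PCS file "my_solver.pcs" and a convention member
    # such as ``PCSConvention.SMAC`` (names illustrative), this writes
    # "my_solver_SMAC.pcs" next to the original unless it already exists:
    #
    #   solver.port_pcs(PCSConvention.SMAC)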

    def build_cmd(
        self: Solver,
        instance: str | list[str],
        objectives: list[SparkleObjective],
        seed: int,
        cutoff_time: int = None,
        configuration: dict = None,
        log_dir: Path = None,
    ) -> list[str]:
        """Build the solver call on an instance with a configuration.

        Args:
            instance: Path to the instance.
            objectives: List of Sparkle objectives.
            seed: Seed of the solver.
            cutoff_time: Cutoff time for the solver.
            configuration: Configuration of the solver.
            log_dir: Directory path for logs.

        Returns:
            List of commands and arguments to execute the solver.
        """
        if configuration is None:
            configuration = {}
        # Ensure configuration contains required entries for each wrapper
        configuration["solver_dir"] = str(self.directory.absolute())
        configuration["instance"] = instance
        configuration["seed"] = seed
        configuration["objectives"] = ",".join([str(obj) for obj in objectives])
        configuration["cutoff_time"] = (
            cutoff_time if cutoff_time is not None else sys.maxsize
        )
        if "configuration_id" in configuration:
            del configuration["configuration_id"]
        # Stringify all values so the dictionary serialises cleanly to JSON
        configuration = {key: str(configuration[key]) for key in configuration}
        solver_cmd = [
            str(self.directory / self.wrapper),
            f"'{json.dumps(configuration)}'",
        ]
        if log_dir is None:
            log_dir = Path()
        if cutoff_time is not None:  # Use RunSolver
            log_path_str = instance[0] if isinstance(instance, list) else instance
            log_name_base = f"{Path(log_path_str).name}_{self.name}"
            return RunSolver.wrap_command(
                self.runsolver_exec,
                solver_cmd,
                cutoff_time,
                log_dir,
                log_name_base=log_name_base,
            )
        return solver_cmd
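
    # Illustrative result (paths shortened; the exact RunSolver prefix depends
    # on ``RunSolver.wrap_command``):
    #
    #   solver.build_cmd("train/inst1.cnf", objectives=[obj], seed=42,
    #                    cutoff_time=60, log_dir=Path("Log"))
    #   -> [<runsolver invocation ...>,
    #       "Solvers/MySolver/sparkle_solver_wrapper.py",
    #       "'{...JSON-encoded configuration...}'"]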

    def run(
        self: Solver,
        instances: str | list[str] | InstanceSet | list[InstanceSet],
        objectives: list[SparkleObjective],
        seed: int,
        cutoff_time: int = None,
        configuration: dict = None,
        run_on: Runner = Runner.LOCAL,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        log_dir: Path = None,
    ) -> SlurmRun | list[dict[str, Any]] | dict[str, Any]:
        """Run the solver on an instance with a certain configuration.

        Args:
            instances: The instance(s) to run the solver on, list in case of
                multi-file instances. In case of an instance set, will run on
                all instances in the set.
            objectives: List of Sparkle objectives.
            seed: Seed to run the solver with. Fill with an arbitrary int in
                case of a deterministic solver.
            cutoff_time: The cutoff time for the solver, measured through
                RunSolver. If None, will be executed without RunSolver.
            configuration: The solver configuration to use. Can be empty.
            run_on: Whether to run on Slurm or locally.
            sbatch_options: The sbatch options to use.
            slurm_prepend: The script to prepend to a Slurm script.
            log_dir: The log directory to use.

        Returns:
            Solver output dict, possibly with RunSolver values.
        """
        cmds = []
        set_label = instances.name if isinstance(instances, InstanceSet) else "instances"
        instances = [instances] if not isinstance(instances, list) else instances
        log_dir = Path() if log_dir is None else log_dir
        for instance in instances:
            paths = (
                instance.instance_paths
                if isinstance(instance, InstanceSet)
                else [instance]
            )
            for instance_path in paths:
                instance_path = (
                    [str(p) for p in instance_path]
                    if isinstance(instance_path, list)
                    else instance_path
                )
                solver_cmd = self.build_cmd(
                    instance_path,
                    objectives=objectives,
                    seed=seed,
                    cutoff_time=cutoff_time,
                    configuration=configuration,
                    log_dir=log_dir,
                )
                cmds.append(" ".join(solver_cmd))

        commandname = f"Run Solver: {self.name} on {set_label}"
        run = rrr.add_to_queue(
            runner=run_on,
            cmd=cmds,
            name=commandname,
            base_dir=log_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
        )

        if isinstance(run, LocalRun):
            run.wait()
            if run.status == Status.ERROR:  # Subprocess resulted in error
                print(f"WARNING: Solver {self.name} execution seems to have failed!\n")
                for i, job in enumerate(run.jobs):
                    print(
                        f"[Job {i}] The used command was: {cmds[i]}\n"
                        "The error yielded was:\n"
                        f"\t-stdout: '{job.stdout}'\n"
                        f"\t-stderr: '{job.stderr}'\n"
                    )
                return {
                    "status": SolverStatus.ERROR,
                }

            solver_outputs = []
            for i, job in enumerate(run.jobs):
                solver_cmd = cmds[i].split(" ")
                solver_output = Solver.parse_solver_output(
                    job.stdout,
                    solver_call=solver_cmd,
                    objectives=objectives,
                    verifier=self.verifier,
                )
                solver_outputs.append(solver_output)
            return solver_outputs if len(solver_outputs) > 1 else solver_outputs[0]
        return run
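
    # Minimal local-run sketch (paths and objective name hypothetical):
    #
    #   solver = Solver(Path("Solvers/MySolver"))
    #   out = solver.run("train/inst1.cnf",
    #                    objectives=[resolve_objective("PAR10")],
    #                    seed=42, cutoff_time=60, run_on=Runner.LOCAL)
    #   out["status"]  # -> a SolverStatus member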

    def run_performance_dataframe(
        self: Solver,
        instances: str | list[str] | InstanceSet,
        performance_dataframe: PerformanceDataFrame,
        config_ids: str | list[str] = None,
        run_ids: list[int] | list[list[int]] = None,
        cutoff_time: int = None,
        objective: SparkleObjective = None,
        train_set: InstanceSet = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        dependencies: list[SlurmRun] = None,
        log_dir: Path = None,
        base_dir: Path = None,
        job_name: str = None,
        run_on: Runner = Runner.SLURM,
    ) -> Run:
        """Run the solver and place the results in the performance dataframe.

        In practice this runs Solver.run, but with a little script before and
        after to read from and write to the performance dataframe.

        Args:
            instances: The instance(s) to run the solver on. In case of an
                instance set or list, will create a job for all instances in
                the set/list.
            performance_dataframe: The performance dataframe to use.
            config_ids: The config indices to use in the performance dataframe.
            run_ids: List of run ids to use. If a list of lists, a list of runs
                is given per instance. Otherwise, all runs are used for each
                instance.
            cutoff_time: The cutoff time for the solver, measured through
                RunSolver.
            objective: The objective to use, only relevant when determining the
                best configuration.
            train_set: The training set to use. If present, will determine the
                best configuration of the solver using these instances and run
                with it on all instances in the instances argument.
            sbatch_options: List of Slurm batch options to use.
            slurm_prepend: Slurm script to prepend to the sbatch.
            dependencies: List of Slurm runs to use as dependencies.
            log_dir: Path where to place output files. Defaults to CWD.
            base_dir: Path where to place output files.
            job_name: Name of the job. If None, will generate a name based on
                the solver and instances.
            run_on: On which platform to run the jobs. Default: Slurm.

        Returns:
            SlurmRun or Local run of the job.
        """
        instances = [instances] if isinstance(instances, str) else instances
        set_name = "instances"
        if isinstance(instances, InstanceSet):
            set_name = instances.name
            instances = [str(i) for i in instances.instance_paths]
        if not isinstance(config_ids, list):
            config_ids = [config_ids]
        configurations = [
            performance_dataframe.get_full_configuration(str(self.directory), config_id)
            if config_id
            else None
            for config_id in config_ids
        ]
        if run_ids is None:
            run_ids = performance_dataframe.run_ids
        if isinstance(run_ids[0], list):  # Runs per instance
            combinations = []
            for index, instance in enumerate(instances):
                for run_id in run_ids[index]:
                    combinations.extend(
                        [
                            (instance, config_id, config, run_id)
                            for config_id, config in zip(config_ids, configurations)
                        ]
                    )
        else:  # Runs for all instances
            combinations = [
                (instance, config_data[0], config_data[1], run_id)
                for instance, config_data, run_id in itertools.product(
                    instances,
                    zip(config_ids, configurations),
                    performance_dataframe.run_ids,
                )
            ]
        objective_arg = f"--target-objective {objective.name}" if objective else ""
        train_arg = (
            "--best-configuration-instances "
            + " ".join([str(i) for i in train_set.instance_paths])
            if train_set
            else ""
        )
        configuration_args = [
            ""
            if not config_id and not config
            else f"--configuration-id {config_id}"
            if not config
            else f"--configuration '{json.dumps(config)}'"
            for _, config_id, config, _ in combinations
        ]

        # Run all instance/config/run combinations. Each value is resolved from
        # the PerformanceDataFrame up front, to avoid high read loads during
        # execution.
        cmds = [
            f"python3 {Solver.solver_cli} "
            f"--solver {self.directory} "
            f"--instance {instance} "
            f"{config_arg} "
            f"--run-index {run_id} "
            f"--objectives {' '.join([obj.name for obj in performance_dataframe.objectives])} "
            f"--performance-dataframe {performance_dataframe.csv_filepath} "
            f"--cutoff-time {cutoff_time} "
            f"--log-dir {log_dir} "
            f"--seed {random.randint(0, 2**32 - 1)} "
            f"{objective_arg} "
            f"{train_arg}"
            for (instance, _, _, run_id), config_arg in zip(
                combinations, configuration_args
            )
        ]
        job_name = f"Run: {self.name} on {set_name}" if job_name is None else job_name
        r = rrr.add_to_queue(
            runner=run_on,
            cmd=cmds,
            name=job_name,
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if run_on == Runner.LOCAL:
            r.wait()
        return r
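
    # Sketch of a typical call (CSV path and ids illustrative; assumes the
    # dataframe already contains rows for the given instances):
    #
    #   pdf = PerformanceDataFrame(Path("Output/performance_data.csv"))
    #   solver.run_performance_dataframe(instance_set, performance_dataframe=pdf,
    #                                    config_ids="default", cutoff_time=60,
    #                                    run_on=Runner.LOCAL)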

    @staticmethod
    def config_str_to_dict(config_str: str) -> dict[str, str]:
        """Parse a configuration string to a dictionary."""
        # First filter unwanted characters (all dashes) from the configuration
        config_str = config_str.strip().replace("-", "")
        # Then split the string by spaces, but conserve quoted substrings
        config_list = shlex.split(config_str)
        # Return empty for empty input OR an uneven number of tokens
        if config_str == "" or config_str == r"{}" or len(config_list) & 1:
            return {}
        config_dict = {}
        for index in range(0, len(config_list), 2):
            # As the value will already be a string object, no quotes are allowed in it
            value = config_list[index + 1].strip('"').strip("'")
            config_dict[config_list[index]] = value
        return config_dict
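
    # Example round-trip (note that *all* dashes are stripped before splitting,
    # so "-flag" and "--flag" parse identically):
    #
    #   Solver.config_str_to_dict("-init_solution '1' -p 'best'")
    #   -> {"init_solution": "1", "p": "best"}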

    @staticmethod
    def parse_solver_output(
        solver_output: str,
        solver_call: list[str | Path] = None,
        objectives: list[SparkleObjective] = None,
        verifier: verifiers.SolutionVerifier = None,
    ) -> dict[str, Any]:
        """Parse the output of the solver.

        Args:
            solver_output: The output of the solver run which needs to be parsed.
            solver_call: The solver call used to run the solver.
            objectives: The objectives to apply to the solver output.
            verifier: The verifier to check the solver output.

        Returns:
            Dictionary representing the parsed solver output.
        """
        used_runsolver = False
        if solver_call is not None and len(solver_call) > 2:
            used_runsolver = True
            parsed_output = RunSolver.get_solver_output(solver_call, solver_output)
        else:
            parsed_output = ast.literal_eval(solver_output)
        # Cast status attribute from str to Enum
        parsed_output["status"] = SolverStatus(parsed_output["status"])
        if verifier is not None and used_runsolver:
            # Horrible hack to get the instance from the solver input
            solver_call_str: str = " ".join(solver_call)
            solver_input_str = solver_call_str.split(Solver._wrapper_file, maxsplit=1)[1]
            solver_input_str = solver_input_str.split(" ", maxsplit=1)[1]
            solver_input_str = solver_input_str[
                solver_input_str.index("{") : solver_input_str.index("}") + 1
            ]
            solver_input = ast.literal_eval(solver_input_str)
            target_instance = Path(solver_input["instance"])
            parsed_output["status"] = verifier.verify(
                target_instance, parsed_output, solver_call
            )

        # Create objective map
        objectives = {o.stem: o for o in objectives} if objectives else {}
        removable_keys = ["cutoff_time"]  # Keys to remove

        # Apply objectives to parsed output; runtime-based objectives are added here
        for key, value in parsed_output.items():
            if objectives and key in objectives:
                objective = objectives[key]
                removable_keys.append(key)  # We translate it into the full name
            else:
                # Not found in objectives: resolve which objective the output
                # belongs to
                objective = resolve_objective(key)
            if objective is None:  # Could not parse, skip
                continue
            if objective.use_time == UseTime.NO:
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(value)
            else:
                if not used_runsolver:
                    continue
                if objective.use_time == UseTime.CPU_TIME:
                    parsed_output[key] = parsed_output["cpu_time"]
                else:
                    parsed_output[key] = parsed_output["wall_time"]
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(
                        parsed_output[key],
                        parsed_output["cutoff_time"],
                        parsed_output["status"],
                    )

        # Replace or remove keys based on the objective names
        for key in removable_keys:
            if key in parsed_output:
                if key in objectives:
                    # Map the result to the objective's full name
                    parsed_output[objectives[key].name] = parsed_output[key]
                    if key != objectives[key].name:  # Only delete actual mappings
                        del parsed_output[key]
                else:
                    del parsed_output[key]
        return parsed_output
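
    # Illustrative input/output for ``parse_solver_output`` without a RunSolver
    # call (the wrapper prints a Python-literal dict; key names hypothetical):
    #
    #   Solver.parse_solver_output('{"status": "SUCCESS", "quality": 42.0}')
    #   -> {"status": SolverStatus.SUCCESS, "quality": 42.0}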