Coverage for sparkle/solver/solver.py: 90%

213 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 13:21 +0000

1"""File to handle a solver and its directories.""" 

2from __future__ import annotations 

3import sys 

4from typing import Any 

5import shlex 

6import ast 

7import json 

8from pathlib import Path 

9 

10from ConfigSpace import ConfigurationSpace 

11 

12import runrunner as rrr 

13from runrunner.local import LocalRun 

14from runrunner.slurm import Run, SlurmRun 

15from runrunner.base import Status, Runner 

16 

17from sparkle.tools.parameters import PCSConverter, PCSConvention 

18from sparkle.tools import RunSolver 

19from sparkle.types import SparkleCallable, SolverStatus 

20from sparkle.solver import verifiers 

21from sparkle.instance import InstanceSet 

22from sparkle.structures import PerformanceDataFrame 

23from sparkle.types import resolve_objective, SparkleObjective, UseTime 

24 

25 

class Solver(SparkleCallable):
    """Class to handle a solver and its directories."""
    # File name of the optional meta data file inside a solver directory
    meta_data = "solver_meta.txt"
    # Stem of the wrapper script that launches the solver (extension varies)
    _wrapper_file = "sparkle_solver_wrapper"
    # CLI script used by run_performance_dataframe to run a single job
    solver_cli = Path(__file__).parent / "solver_cli.py"

31 

32 def __init__(self: Solver, 

33 directory: Path, 

34 runsolver_exec: Path = None, 

35 deterministic: bool = None, 

36 verifier: verifiers.SolutionVerifier = None) -> None: 

37 """Initialize solver. 

38 

39 Args: 

40 directory: Directory of the solver. 

41 runsolver_exec: Path to the runsolver executable. 

42 By default, runsolver in directory. 

43 deterministic: Bool indicating determinism of the algorithm. 

44 Defaults to False. 

45 verifier: The solution verifier to use. If None, no verifier is used. 

46 """ 

47 super().__init__(directory, runsolver_exec) 

48 self.deterministic = deterministic 

49 self.verifier = verifier 

50 self._pcs_file: Path = None 

51 self._interpreter: str = None 

52 self._wrapper_extension: str = None 

53 

54 meta_data_file = self.directory / Solver.meta_data 

55 if self.runsolver_exec is None: 

56 self.runsolver_exec = self.directory / "runsolver" 

57 if meta_data_file.exists(): 

58 meta_data = ast.literal_eval(meta_data_file.open().read()) 

59 # We only override the deterministic and verifier from file if not set 

60 if self.deterministic is None: 

61 if ("deterministic" in meta_data 

62 and meta_data["deterministic"] is not None): 

63 self.deterministic = meta_data["deterministic"] 

64 if self.verifier is None and "verifier" in meta_data: 

65 if isinstance(meta_data["verifier"], tuple): # File verifier 

66 self.verifier = verifiers.mapping[meta_data["verifier"][0]]( 

67 Path(meta_data["verifier"][1]) 

68 ) 

69 elif meta_data["verifier"] in verifiers.mapping: 

70 self.verifier = verifiers.mapping[meta_data["verifier"]] 

71 if self.deterministic is None: # Default to False 

72 self.deterministic = False 

73 

74 def __str__(self: Solver) -> str: 

75 """Return the string representation of the solver.""" 

76 return self.name 

77 

78 def __repr__(self: Solver) -> str: 

79 """Return detailed representation of the solver.""" 

80 return f"{self.name}:\n"\ 

81 f"\t- Directory: {self.directory}\n"\ 

82 f"\t- Deterministic: {self.deterministic}\n"\ 

83 f"\t- Verifier: {self.verifier}\n"\ 

84 f"\t- PCS File: {self.pcs_file}\n"\ 

85 f"\t- Wrapper: {self.wrapper}" 

86 

87 @property 

88 def pcs_file(self: Solver) -> Path: 

89 """Get path of the parameter file.""" 

90 if self._pcs_file is None: 

91 files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"]) 

92 if len(files) == 0: 

93 return None 

94 self._pcs_file = files[0] 

95 return self._pcs_file 

96 

97 @property 

98 def wrapper_extension(self: Solver) -> str: 

99 """Get the extension of the wrapper file.""" 

100 if self._wrapper_extension is None: 

101 # Determine which file is the wrapper by sorting alphabetically 

102 wrapper = sorted([p for p in self.directory.iterdir() 

103 if p.stem == Solver._wrapper_file])[0] 

104 self._wrapper_extension = wrapper.suffix 

105 return self._wrapper_extension 

106 

107 @property 

108 def wrapper(self: Solver) -> str: 

109 """Get name of the wrapper file.""" 

110 return f"{Solver._wrapper_file}{self.wrapper_extension}" 

111 

112 @property 

113 def wrapper_file(self: Solver) -> Path: 

114 """Get path of the wrapper file.""" 

115 return self.directory / self.wrapper 

116 

117 def get_pcs_file(self: Solver, port_type: PCSConvention) -> Path: 

118 """Get path of the parameter file of a specific convention. 

119 

120 Args: 

121 port_type: Port type of the parameter file. If None, will return the 

122 file with the shortest name. 

123 

124 Returns: 

125 Path to the parameter file. None if it can not be resolved. 

126 """ 

127 pcs_files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"]) 

128 if port_type is None: 

129 return pcs_files[0] 

130 for file in pcs_files: 

131 if port_type == PCSConverter.get_convention(file): 

132 return file 

133 return None 

134 

135 def read_pcs_file(self: Solver) -> bool: 

136 """Checks if the pcs file can be read.""" 

137 # TODO: Should be a .validate method instead 

138 return PCSConverter.get_convention(self.pcs_file) is not None 

139 

140 def get_configuration_space(self: Solver) -> ConfigurationSpace: 

141 """Get the ConfigurationSpace of the PCS file.""" 

142 if not self.pcs_file: 

143 return None 

144 return PCSConverter.parse(self.pcs_file) 

145 

146 def port_pcs(self: Solver, port_type: PCSConvention) -> None: 

147 """Port the parameter file to the given port type.""" 

148 target_pcs_file =\ 

149 self.pcs_file.parent / f"{self.pcs_file.stem}_{port_type.name}.pcs" 

150 if target_pcs_file.exists(): # Already exists, possibly user defined 

151 return 

152 PCSConverter.export(self.get_configuration_space(), port_type, target_pcs_file) 

153 

154 def build_cmd(self: Solver, 

155 instance: str | list[str], 

156 objectives: list[SparkleObjective], 

157 seed: int, 

158 cutoff_time: int = None, 

159 configuration: dict = None, 

160 log_dir: Path = None) -> list[str]: 

161 """Build the solver call on an instance with a configuration. 

162 

163 Args: 

164 instance: Path to the instance. 

165 seed: Seed of the solver. 

166 cutoff_time: Cutoff time for the solver. 

167 configuration: Configuration of the solver. 

168 

169 Returns: 

170 List of commands and arguments to execute the solver. 

171 """ 

172 if configuration is None: 

173 configuration = {} 

174 # Ensure configuration contains required entries for each wrapper 

175 configuration["solver_dir"] = str(self.directory.absolute()) 

176 configuration["instance"] = instance 

177 configuration["seed"] = seed 

178 configuration["objectives"] = ",".join([str(obj) for obj in objectives]) 

179 configuration["cutoff_time"] =\ 

180 cutoff_time if cutoff_time is not None else sys.maxsize 

181 if "configuration_id" in configuration: 

182 del configuration["configuration_id"] 

183 # Ensure stringification of dictionary will go correctly for key value pairs 

184 configuration = {key: str(configuration[key]) for key in configuration} 

185 solver_cmd = [str(self.directory / self.wrapper), 

186 f"'{json.dumps(configuration)}'"] 

187 if log_dir is None: 

188 log_dir = Path() 

189 if cutoff_time is not None: # Use RunSolver 

190 log_path_str = instance[0] if isinstance(instance, list) else instance 

191 log_name_base = f"{Path(log_path_str).name}_{self.name}" 

192 return RunSolver.wrap_command(self.runsolver_exec, 

193 solver_cmd, 

194 cutoff_time, 

195 log_dir, 

196 log_name_base=log_name_base) 

197 return solver_cmd 

198 

199 def run(self: Solver, 

200 instances: str | list[str] | InstanceSet | list[InstanceSet], 

201 objectives: list[SparkleObjective], 

202 seed: int, 

203 cutoff_time: int = None, 

204 configuration: dict = None, 

205 run_on: Runner = Runner.LOCAL, 

206 sbatch_options: list[str] = None, 

207 slurm_prepend: str | list[str] | Path = None, 

208 log_dir: Path = None, 

209 ) -> SlurmRun | list[dict[str, Any]] | dict[str, Any]: 

210 """Run the solver on an instance with a certain configuration. 

211 

212 Args: 

213 instance: The instance(s) to run the solver on, list in case of multi-file. 

214 In case of an instance set, will run on all instances in the set. 

215 seed: Seed to run the solver with. Fill with abitrary int in case of 

216 determnistic solver. 

217 cutoff_time: The cutoff time for the solver, measured through RunSolver. 

218 If None, will be executed without RunSolver. 

219 configuration: The solver configuration to use. Can be empty. 

220 run_on: Whether to run on slurm or locally. 

221 sbatch_options: The sbatch options to use. 

222 slurm_prepend: The script to prepend to a slurm script. 

223 log_dir: The log directory to use. 

224 

225 Returns: 

226 Solver output dict possibly with runsolver values. 

227 """ 

228 cmds = [] 

229 set_label = instances.name if isinstance( 

230 instances, InstanceSet) else "instances" 

231 instances = [instances] if not isinstance(instances, list) else instances 

232 log_dir = Path() if log_dir is None else log_dir 

233 for instance in instances: 

234 paths = instance.instance_paths if isinstance(instance, 

235 InstanceSet) else [instance] 

236 for instance_path in paths: 

237 instance_path = [str(p) for p in instance_path] if isinstance( 

238 instance_path, list) else instance_path 

239 solver_cmd = self.build_cmd(instance_path, 

240 objectives=objectives, 

241 seed=seed, 

242 cutoff_time=cutoff_time, 

243 configuration=configuration, 

244 log_dir=log_dir) 

245 cmds.append(" ".join(solver_cmd)) 

246 

247 commandname = f"Run Solver: {self.name} on {set_label}" 

248 run = rrr.add_to_queue(runner=run_on, 

249 cmd=cmds, 

250 name=commandname, 

251 base_dir=log_dir, 

252 sbatch_options=sbatch_options, 

253 prepend=slurm_prepend) 

254 

255 if isinstance(run, LocalRun): 

256 run.wait() 

257 if run.status == Status.ERROR: # Subprocess resulted in error 

258 print(f"WARNING: Solver {self.name} execution seems to have failed!\n") 

259 for i, job in enumerate(run.jobs): 

260 print(f"[Job {i}] The used command was: {cmds[i]}\n" 

261 "The error yielded was:\n" 

262 f"\t-stdout: '{job.stdout}'\n" 

263 f"\t-stderr: '{job.stderr}'\n") 

264 return {"status": SolverStatus.ERROR, } 

265 

266 solver_outputs = [] 

267 for i, job in enumerate(run.jobs): 

268 solver_cmd = cmds[i].split(" ") 

269 solver_output = Solver.parse_solver_output(run.jobs[i].stdout, 

270 solver_call=solver_cmd, 

271 objectives=objectives, 

272 verifier=self.verifier) 

273 solver_outputs.append(solver_output) 

274 return solver_outputs if len(solver_outputs) > 1 else solver_output 

275 return run 

276 

277 def run_performance_dataframe( 

278 self: Solver, 

279 instances: str | list[str] | InstanceSet, 

280 config_ids: str | list[str], 

281 performance_dataframe: PerformanceDataFrame, 

282 run_ids: list[int] | list[list[int]] = None, 

283 cutoff_time: int = None, 

284 objective: SparkleObjective = None, 

285 train_set: InstanceSet = None, 

286 sbatch_options: list[str] = None, 

287 slurm_prepend: str | list[str] | Path = None, 

288 dependencies: list[SlurmRun] = None, 

289 log_dir: Path = None, 

290 base_dir: Path = None, 

291 job_name: str = None, 

292 run_on: Runner = Runner.SLURM) -> Run: 

293 """Run the solver from and place the results in the performance dataframe. 

294 

295 This in practice actually runs Solver.run, but has a little script before/after, 

296 to read and write to the performance dataframe. 

297 

298 Args: 

299 instance: The instance(s) to run the solver on. In case of an instance set, 

300 or list, will create a job for all instances in the set/list. 

301 config_ids: The config indices to use in the performance dataframe. 

302 performance_dataframe: The performance dataframe to use. 

303 run_ids: List of run ids to use. If list of list, a list of runs is given 

304 per instance. Otherwise, all runs are used for each instance. 

305 cutoff_time: The cutoff time for the solver, measured through RunSolver. 

306 objective: The objective to use, only relevant for train set best config 

307 determining 

308 train_set: The training set to use. If present, will determine the best 

309 configuration of the solver using these instances and run with it on 

310 all instances in the instance argument. 

311 sbatch_options: List of slurm batch options to use 

312 slurm_prepend: Slurm script to prepend to the sbatch 

313 dependencies: List of slurm runs to use as dependencies 

314 log_dir: Path where to place output files. Defaults to CWD. 

315 base_dir: Path where to place output files. 

316 job_name: Name of the job 

317 If None, will generate a name based on Solver and Instances 

318 run_on: On which platform to run the jobs. Default: Slurm. 

319 

320 Returns: 

321 SlurmRun or Local run of the job. 

322 """ 

323 instances = [instances] if isinstance(instances, str) else instances 

324 set_name = "instances" 

325 if isinstance(instances, InstanceSet): 

326 set_name = instances.name 

327 instances = [str(i) for i in instances.instance_paths] 

328 if not isinstance(config_ids, list): 

329 config_ids = [config_ids] 

330 if run_ids is None: 

331 run_ids = performance_dataframe.run_ids 

332 if isinstance(run_ids[0], list): # Runs per instance 

333 combinations = [] 

334 for index, instance in enumerate(instances): 

335 for run_id in run_ids[index]: 

336 combinations.extend([(instance, config_id, run_id) 

337 for config_id in config_ids]) 

338 else: # Runs for all instances 

339 import itertools 

340 combinations = [(instance, config_id, run_id) for instance, config_id, run_id 

341 in itertools.product(instances, config_ids, 

342 performance_dataframe.run_ids)] 

343 objective_arg = f"--target-objective {objective.name}" if objective else "" 

344 train_arg =\ 

345 " ".join([str(i) for i in train_set.instance_paths]) if train_set else "" 

346 # We run all instances/configs/runs combinations 

347 cmds = [ 

348 f"python3 {Solver.solver_cli} " 

349 f"--solver {self.directory} " 

350 f"--instance {instance} " 

351 f"--configuration-id {config_id} " 

352 f"--run-index {run_id} " 

353 f"--performance-dataframe {performance_dataframe.csv_filepath} " 

354 f"--cutoff-time {cutoff_time} " 

355 f"--log-dir {log_dir} " 

356 f"{objective_arg} " 

357 f"{'--best-configuration-instances' if train_set else ''} {train_arg}" 

358 for instance, config_id, run_id in combinations] 

359 job_name = f"Run: {self.name} on {set_name}" if job_name is None else job_name 

360 r = rrr.add_to_queue( 

361 runner=run_on, 

362 cmd=cmds, 

363 name=job_name, 

364 base_dir=base_dir, 

365 sbatch_options=sbatch_options, 

366 prepend=slurm_prepend, 

367 dependencies=dependencies 

368 ) 

369 if run_on == Runner.LOCAL: 

370 r.wait() 

371 return r 

372 

373 @staticmethod 

374 def config_str_to_dict(config_str: str) -> dict[str, str]: 

375 """Parse a configuration string to a dictionary.""" 

376 # First we filter the configuration of unwanted characters 

377 config_str = config_str.strip().replace("-", "") 

378 # Then we split the string by spaces, but conserve substrings 

379 config_list = shlex.split(config_str) 

380 # We return empty for empty input OR uneven input 

381 if config_str == "" or config_str == r"{}" or len(config_list) & 1: 

382 return {} 

383 config_dict = {} 

384 for index in range(0, len(config_list), 2): 

385 # As the value will already be a string object, no quotes are allowed in it 

386 value = config_list[index + 1].strip('"').strip("'") 

387 config_dict[config_list[index]] = value 

388 return config_dict 

389 

    @staticmethod
    def parse_solver_output(
            solver_output: str,
            solver_call: list[str | Path] = None,
            objectives: list[SparkleObjective] = None,
            verifier: verifiers.SolutionVerifier = None) -> dict[str, Any]:
        """Parse the output of the solver.

        Args:
            solver_output: The output of the solver run which needs to be parsed
            solver_call: The solver call used to run the solver
            objectives: The objectives to apply to the solver output
            verifier: The verifier to check the solver output

        Returns:
            Dictionary representing the parsed solver output
        """
        used_runsolver = False
        # More than 2 arguments implies the command was wrapped by RunSolver
        # (a bare wrapper call is [wrapper, json-args]) — TODO confirm
        if solver_call is not None and len(solver_call) > 2:
            used_runsolver = True
            parsed_output = RunSolver.get_solver_output(solver_call,
                                                        solver_output)
        else:
            parsed_output = ast.literal_eval(solver_output)
        # cast status attribute from str to Enum
        parsed_output["status"] = SolverStatus(parsed_output["status"])
        # Verification needs the instance path, which is only recoverable
        # from the RunSolver-wrapped command line
        if verifier is not None and used_runsolver:
            # Horrible hack to get the instance from the solver input:
            # slice the JSON argument out of the joined command string
            solver_call_str: str = " ".join(solver_call)
            solver_input_str = solver_call_str.split(Solver._wrapper_file, maxsplit=1)[1]
            solver_input_str = solver_input_str.split(" ", maxsplit=1)[1]
            # NOTE(review): index("{")/index("}") assumes the JSON dict is
            # flat (no nested braces) — confirm against the wrapper format
            solver_input_str = solver_input_str[solver_input_str.index("{"):
                                                solver_input_str.index("}") + 1]
            solver_input = ast.literal_eval(solver_input_str)
            target_instance = Path(solver_input["instance"])
            parsed_output["status"] = verifier.verify(
                target_instance, parsed_output, solver_call)

        # Create objective map from output key (stem) to objective object;
        # rebinds the `objectives` parameter to a dict
        objectives = {o.stem: o for o in objectives} if objectives else {}
        removable_keys = ["cutoff_time"]  # Keys to remove

        # apply objectives to parsed output, runtime based objectives added here
        # (only values are rewritten in this loop, so iterating items() while
        # assigning to existing keys is safe)
        for key, value in parsed_output.items():
            if objectives and key in objectives:
                objective = objectives[key]
                removable_keys.append(key)  # We translate it into the full name
            else:
                objective = resolve_objective(key)
            # If not found in objectives, resolve to which objective the output belongs
            if objective is None:  # Could not parse, skip
                continue
            if objective.use_time == UseTime.NO:
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(value)
            else:
                # Time-based objectives require RunSolver measurements
                if not used_runsolver:
                    continue
                if objective.use_time == UseTime.CPU_TIME:
                    parsed_output[key] = parsed_output["cpu_time"]
                else:
                    parsed_output[key] = parsed_output["wall_time"]
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(
                        parsed_output[key],
                        parsed_output["cutoff_time"],
                        parsed_output["status"])

        # Replace or remove keys based on the objective names
        for key in removable_keys:
            if key in parsed_output:
                if key in objectives:
                    # Map the result to the objective's full name
                    parsed_output[objectives[key].name] = parsed_output[key]
                    if key != objectives[key].name:  # Only delete actual mappings
                        del parsed_output[key]
                else:
                    # e.g. "cutoff_time" is bookkeeping, not an objective
                    del parsed_output[key]
        return parsed_output