Coverage for sparkle/CLI/run_solvers.py: 88% (160 statements)

coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

#!/usr/bin/env python3
"""Sparkle command to run solvers to get their performance data."""

from __future__ import annotations
import random
import sys
import argparse
from pathlib import Path

from runrunner.base import Runner, Run

from sparkle.solver import Solver
from sparkle.instance import Instance_Set, InstanceSet
from sparkle.structures import PerformanceDataFrame
from sparkle.types import SparkleObjective, resolve_objective
from sparkle.platform.settings_objects import Settings
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help.nicknames import resolve_object_name, resolve_instance_name
from sparkle.CLI.initialise import check_for_initialise


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Run solvers on instances to get their performance data."
    )
    parser.add_argument(*ac.SolversArgument.names, **ac.SolversArgument.kwargs)
    parser.add_argument(
        *ac.InstanceSetPathsArgument.names, **ac.InstanceSetPathsArgument.kwargs
    )

    # Mutually exclusive: a specific configuration, the best configuration,
    # or all known configurations
    configuration_group = parser.add_mutually_exclusive_group()
    configuration_group.add_argument(
        *ac.ConfigurationArgument.names, **ac.ConfigurationArgument.kwargs
    )
    configuration_group.add_argument(
        *ac.BestConfigurationArgument.names, **ac.BestConfigurationArgument.kwargs
    )
    configuration_group.add_argument(
        *ac.AllConfigurationArgument.names, **ac.AllConfigurationArgument.kwargs
    )
    parser.add_argument(*ac.ObjectiveArgument.names, **ac.ObjectiveArgument.kwargs)
    parser.add_argument(
        *ac.PerformanceDataJobsArgument.names, **ac.PerformanceDataJobsArgument.kwargs
    )
    # This one is only relevant if the argument above is given
    parser.add_argument(
        *ac.RecomputeRunSolversArgument.names, **ac.RecomputeRunSolversArgument.kwargs
    )
    # Settings arguments
    parser.add_argument(*ac.SettingsFileArgument.names, **ac.SettingsFileArgument.kwargs)
    parser.add_argument(
        *Settings.OPTION_solver_cutoff_time.args,
        **Settings.OPTION_solver_cutoff_time.kwargs,
    )
    parser.add_argument(*Settings.OPTION_run_on.args, **Settings.OPTION_run_on.kwargs)
    return parser

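# Illustrative note (not part of the original module): the exact command line flag
# spellings live in the argument objects of sparkle.CLI.help.argparse_custom and the
# Settings options, so they are not repeated here. The parser can be inspected without
# running any solver, for example:
#
#     parser = parser_function()
#     parser.print_help()
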

def run_solvers(
    solvers: list[Solver],
    instances: list[str] | list[InstanceSet],
    objectives: list[SparkleObjective],
    seed: int,
    cutoff_time: int,
    configurations: list[list[dict[str, str]]],
    sbatch_options: list[str] = None,
    slurm_prepend: str | list[str] | Path = None,
    log_dir: Path = None,
    run_on: Runner = Runner.SLURM,
) -> list[Run]:
    """Run the solvers.

    Parameters
    ----------
    solvers: list[Solver]
        The solvers to run
    instances: list[str] | list[InstanceSet]
        The instances to run the solvers on
    objectives: list[SparkleObjective]
        The objective values to retrieve from the solvers
    seed: int
        The seed to use
    cutoff_time: int
        The cutoff time for the solvers in seconds
    configurations: list[list[dict[str, str]]]
        The configurations to use, one list of configurations per solver
    sbatch_options: list[str]
        The sbatch options to use for the solvers
    slurm_prepend: str | list[str] | Path
        The script to prepend to a Slurm script
    log_dir: Path
        The directory to use for the logs
    run_on: Runner
        Where to execute the solvers.

    Returns
    -------
    runs: list[Run]
        The runs that were submitted. Empty when running locally, since local runs
        are executed and their results printed immediately.
    """
    runs = []
    # Run the solvers
    for solver, solver_confs in zip(solvers, configurations):
        for conf_index, conf in enumerate(solver_confs):
            if "configuration_id" in conf:
                conf_name = conf["configuration_id"]
            else:
                conf_name = conf_index
            run = solver.run(
                instances=instances,
                objectives=objectives,
                seed=seed,
                configuration=conf,
                cutoff_time=cutoff_time,
                run_on=run_on,
                sbatch_options=sbatch_options,
                slurm_prepend=slurm_prepend,
                log_dir=log_dir,
            )
            if run_on == Runner.LOCAL:
                if isinstance(run, dict):
                    run = [run]
                # TODO: Refactor resolving objective keys
                status_key = [key for key in run[0]
                              if key.lower().startswith("status")][0]
                time_key = [key for key in run[0]
                            if key.lower().startswith("cpu_time")][0]
                for i, solver_output in enumerate(run):
                    print(
                        f"Execution of {solver.name} ({conf_name}) on instance "
                        f"{instances[i]} completed with status "
                        f"{solver_output[status_key]} in {solver_output[time_key]} "
                        f"seconds."
                    )
                print("Running configured solver done!")
            else:
                runs.append(run)
    return runs

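# Example (illustrative sketch, not part of the original module): a minimal programmatic
# call of run_solvers for a single solver and instance set, run locally. The solver and
# instance paths are hypothetical placeholders, and the empty configuration dict stands
# in for the solver's default parameters.
#
#     solver = Solver(Path("Solvers/MySolver"))
#     instance_set = Instance_Set(Path("Instances/MyInstances"))
#     run_solvers(
#         solvers=[solver],
#         instances=[instance_set],
#         objectives=gv.settings().objectives,
#         seed=42,
#         cutoff_time=60,
#         configurations=[[{}]],  # one list of configurations per solver
#         log_dir=sl.caller_log_dir,
#         run_on=Runner.LOCAL,
#     )
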

def run_solvers_performance_data(
    performance_data: PerformanceDataFrame,
    cutoff_time: int,
    rerun: bool = False,
    solvers: list[Solver] = None,
    instances: list[str] = None,
    sbatch_options: list[str] = None,
    slurm_prepend: str | list[str] | Path = None,
    run_on: Runner = Runner.SLURM,
) -> list[Run]:
    """Run the solvers for the performance data.

    Parameters
    ----------
    performance_data: PerformanceDataFrame
        The performance data
    cutoff_time: int
        The cutoff time for the solvers in seconds
    rerun: bool
        Run only solvers for which no data is available yet (False) or (re)run all
        solvers to get (new) performance data for them (True)
    solvers: list[Solver]
        The solvers to run. If None, run all found solvers.
    instances: list[str]
        The instances to run the solvers on. If None, run all found instances.
    sbatch_options: list[str]
        The sbatch options to use
    slurm_prepend: str | list[str] | Path
        The script to prepend to a Slurm script
    run_on: Runner
        Where to execute the solvers. For available values see the
        runrunner.base.Runner enum. Default: Runner.SLURM.

    Returns
    -------
    runs: list[Run]
        The runs that were started, one per solver/configuration combination,
        or None if there were no jobs to run.
    """
    # List of jobs to do
    jobs = performance_data.get_job_list(rerun=rerun)

    # Edit jobs to incorporate file paths
    jobs_with_paths = []
    for solver, config, instance, run in jobs:
        instance_path = resolve_instance_name(
            instance, gv.settings().DEFAULT_instance_dir
        )
        jobs_with_paths.append((solver, config, instance_path, run))
    jobs = jobs_with_paths

    print(f"Total number of jobs to run: {len(jobs)}")
    # If there are no jobs, stop
    if len(jobs) == 0:
        return None

    if run_on == Runner.LOCAL:
        print("Running the solvers locally")
    elif run_on == Runner.SLURM:
        print("Running the solvers through Slurm")

    if solvers is None:
        solver_keys = performance_data.solvers
        solvers = [Solver(Path(s)) for s in solver_keys]
    else:  # Filter the solvers
        solver_keys = [str(s.directory) for s in solvers]
        jobs = [j for j in jobs if j[0] in solver_keys]
    # Filter the instances
    if instances is not None:
        jobs = [j for j in jobs if j[2] in instances]
    # Group the jobs per solver, configuration and instance
    solver_jobs = {p_solver: {} for p_solver, _, _, _ in jobs}
    for p_solver, p_config, p_instance, p_run in jobs:
        if p_config not in solver_jobs[p_solver]:
            solver_jobs[p_solver][p_config] = {}
        if p_instance not in solver_jobs[p_solver][p_config]:
            solver_jobs[p_solver][p_config][p_instance] = [p_run]
        else:
            solver_jobs[p_solver][p_config][p_instance].append(p_run)
    runrunner_runs = []
    if run_on == Runner.LOCAL:
        print(f"Cutoff time for each solver run: {cutoff_time} seconds")
    for solver, solver_key in zip(solvers, solver_keys):
        for solver_config in solver_jobs[solver_key]:
            solver_instances = solver_jobs[solver_key][solver_config].keys()
            run_ids = [
                solver_jobs[solver_key][solver_config][instance]
                for instance in solver_instances
            ]
            if not solver_instances:
                print(f"Warning: No jobs for instances found for solver {solver_key}")
                continue
            run = solver.run_performance_dataframe(
                solver_instances,
                performance_data,
                solver_config,
                run_ids=run_ids,
                cutoff_time=cutoff_time,
                sbatch_options=sbatch_options,
                slurm_prepend=slurm_prepend,
                log_dir=sl.caller_log_dir,
                base_dir=sl.caller_log_dir,
                run_on=run_on,
            )
            runrunner_runs.append(run)
    if run_on == Runner.LOCAL:
        # Do some printing?
        pass
    if run_on == Runner.SLURM:
        num_jobs = sum(len(r.jobs) for r in runrunner_runs)
        print(f"Total number of jobs submitted: {num_jobs}")

    return runrunner_runs

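# Example (illustrative sketch, not part of the original module): running every job for
# which the platform's performance data frame has no value yet, locally. The data frame
# path is taken from the platform settings, as in main() below.
#
#     performance_data = PerformanceDataFrame(
#         gv.settings().DEFAULT_performance_data_path
#     )
#     run_solvers_performance_data(
#         performance_data=performance_data,
#         cutoff_time=60,
#         run_on=Runner.LOCAL,
#     )
#
# Internally, the pending jobs are grouped into a nested mapping of the form
# {solver: {configuration: {instance: [run ids]}}} before being handed to
# Solver.run_performance_dataframe.
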

def main(argv: list[str]) -> None:
    """Main function of the run solvers command."""
    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    settings = gv.settings(args)

    # Log command call
    sl.log_command(sys.argv, seed=settings.random_state)
    check_for_initialise()

    if args.best_configuration:
        if not args.objective:
            objective = settings.objectives[0]
            print(
                "WARNING: Best configuration requested, but no objective specified. "
                f"Defaulting to first objective: {objective}"
            )
        else:
            objective = resolve_objective(args.objective)

    # Compare current settings to latest.ini
    prev_settings = Settings(Settings.DEFAULT_previous_settings_path)
    Settings.check_settings_changes(settings, prev_settings)

    if args.solvers:
        solvers = [
            resolve_object_name(
                solver_path,
                gv.file_storage_data_mapping[gv.solver_nickname_list_path],
                settings.DEFAULT_solver_dir,
                Solver,
            )
            for solver_path in args.solvers
        ]
    else:
        solvers = [
            Solver(p) for p in settings.DEFAULT_solver_dir.iterdir() if p.is_dir()
        ]

    if args.instance_path:
        instance_sets = [
            resolve_object_name(
                instance_path,
                gv.file_storage_data_mapping[gv.instances_nickname_path],
                settings.DEFAULT_instance_dir,
                Instance_Set,
            )
            for instance_path in args.instance_path
        ]
        # Unpack the sets into instance strings
        instances = [
            str(path)
            for instance_set in instance_sets
            for path in instance_set.instance_paths
        ]
    else:
        instances = None  # TODO: Fix? Or is it fine like this?

    sbatch_options = settings.sbatch_settings
    slurm_prepend = settings.slurm_job_prepend
    # Write settings to file before starting, since they are used in callback scripts
    settings.write_used_settings()
    run_on = settings.run_on
    cutoff_time = settings.solver_cutoff_time
    # Open the performance data csv file
    performance_dataframe = PerformanceDataFrame(settings.DEFAULT_performance_data_path)

    print("Start running solvers ...")
    if args.performance_data_jobs:
        runs = run_solvers_performance_data(
            performance_data=performance_dataframe,
            solvers=solvers,
            instances=instances,
            cutoff_time=cutoff_time,
            rerun=args.recompute,
            sbatch_options=sbatch_options,
            slurm_prepend=slurm_prepend,
            run_on=run_on,
        )
    else:
        if args.best_configuration:
            train_instances = None
            if isinstance(args.best_configuration, list):
                train_instances = [
                    resolve_object_name(
                        instance_path,
                        gv.file_storage_data_mapping[gv.instances_nickname_path],
                        settings.DEFAULT_instance_dir,
                        Instance_Set,
                    )
                    for instance_path in args.best_configuration
                ]
                # Unpack the sets into instance strings
                instances = [
                    str(path)
                    for instance_set in train_instances
                    for path in instance_set.instance_paths
                ]
            # Determine the best configuration per solver
            configurations = [
                [
                    performance_dataframe.best_configuration(
                        str(solver.directory), objective, train_instances
                    )[0]
                ]
                for solver in solvers
            ]
        elif args.configuration:
            # Sort the configurations to the solvers
            # TODO: Add a better check that the id could only match this solver
            configurations = []
            for solver in solvers:
                configurations.append([])
                for c in args.configuration:
                    if c not in performance_dataframe.configuration_ids:
                        raise ValueError(f"Configuration id {c} not found.")
                    if c in performance_dataframe.get_configurations(
                        str(solver.directory)
                    ):
                        configurations[-1].append(c)
        elif args.all_configurations:  # All known configurations
            configurations = [
                performance_dataframe.get_configurations(str(solver.directory))
                for solver in solvers
            ]
        else:  # Only default configurations
            configurations = [
                [PerformanceDataFrame.default_configuration] for _ in solvers
            ]
        # Look up and replace the configuration ids with the actual configurations
        for solver_index, configs in enumerate(configurations):
            for config_index, config in enumerate(configs):
                configurations[solver_index][config_index] = (
                    performance_dataframe.get_full_configuration(
                        str(solvers[solver_index].directory), config
                    )
                )
        if instances is None:
            instances = []
            for instance_dir in settings.DEFAULT_instance_dir.iterdir():
                if instance_dir.is_dir():
                    instances.append(Instance_Set(instance_dir))

        # TODO: Objective arg not used in the multi-file instances case?
        runs = run_solvers(
            solvers=solvers,
            configurations=configurations,
            instances=instances,
            objectives=settings.objectives,
            seed=random.randint(0, 2**32 - 1),
            cutoff_time=cutoff_time,
            sbatch_options=sbatch_options,
            slurm_prepend=slurm_prepend,
            log_dir=sl.caller_log_dir,
            run_on=run_on,
        )

    # If there were no jobs, we are done
    if runs is None or all(run is None for run in runs):
        print("Running solvers done!")
    elif run_on == Runner.SLURM:
        print(
            "Running solvers through Slurm with job id(s): "
            f"{','.join(r.run_id for r in runs if r is not None)}"
        )
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
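
# Example (illustrative, not part of the original module): calling the command
# programmatically with an empty argument list runs every solver found in the
# platform's solver directory on every known instance set with its default
# configuration, using the values from the platform settings:
#
#     main([])
#
# The usual entry point is the Sparkle CLI, which passes the command line arguments
# through to main() as in the __main__ guard above.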