Coverage for src / sparkle / CLI / run_solvers.py: 88%

155 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 15:31 +0000

1#!/usr/bin/env python3 

2"""Sparkle command to run solvers to get their performance data.""" 

3 

4from __future__ import annotations 

5import random 

6import sys 

7import argparse 

8from pathlib import Path 

9 

10from runrunner.base import Runner, Run 

11 

12from sparkle.solver import Solver 

13from sparkle.instance import Instance_Set 

14from sparkle.structures import PerformanceDataFrame 

15from sparkle.types import SparkleObjective, resolve_objective 

16from sparkle.instance import InstanceSet 

17from sparkle.platform.settings_objects import Settings 

18from sparkle.CLI.help import global_variables as gv 

19from sparkle.CLI.help import logging as sl 

20from sparkle.CLI.help import argparse_custom as ac 

21from sparkle.CLI.help.nicknames import resolve_object_name, resolve_instance_name 

22from sparkle.CLI.initialise import check_for_initialise 

23 

24 

def parser_function() -> argparse.ArgumentParser:
    """Build and return the argument parser for the run-solvers command."""
    parser = argparse.ArgumentParser(
        description="Run solvers on instances to get their performance data."
    )
    parser.add_argument(*ac.SolversArgument.names, **ac.SolversArgument.kwargs)
    parser.add_argument(
        *ac.InstanceSetPathsArgument.names, **ac.InstanceSetPathsArgument.kwargs
    )

    # Mutually exclusive: specific configuration, best configuration, or all
    configuration_group = parser.add_mutually_exclusive_group()
    for conf_argument in (
        ac.ConfigurationArgument,
        ac.BestConfigurationArgument,
        ac.AllConfigurationArgument,
    ):
        configuration_group.add_argument(*conf_argument.names, **conf_argument.kwargs)

    parser.add_argument(*ac.ObjectiveArgument.names, **ac.ObjectiveArgument.kwargs)
    parser.add_argument(
        *ac.PerformanceDataJobsArgument.names, **ac.PerformanceDataJobsArgument.kwargs
    )
    # Only relevant if the performance-data-jobs argument above is given
    parser.add_argument(
        *ac.RecomputeRunSolversArgument.names, **ac.RecomputeRunSolversArgument.kwargs
    )

    # Settings arguments
    parser.add_argument(*ac.SettingsFileArgument.names, **ac.SettingsFileArgument.kwargs)
    parser.add_argument(
        *Settings.OPTION_solver_cutoff_time.args,
        **Settings.OPTION_solver_cutoff_time.kwargs,
    )
    parser.add_argument(*Settings.OPTION_run_on.args, **Settings.OPTION_run_on.kwargs)
    return parser

62 

63 

def run_solvers(
    solvers: list[Solver],
    instances: list[str] | list[InstanceSet],
    objectives: list[SparkleObjective],
    seed: int,
    cutoff_time: int,
    configurations: list[list[dict[str, str]]],
    sbatch_options: list[str] = None,
    slurm_prepend: str | list[str] | Path = None,
    log_dir: Path = None,
    run_on: Runner = Runner.SLURM,
) -> list[Run]:
    """Run the solvers.

    Parameters
    ----------
    solvers: list[Solver]
        The solvers to run
    instances: list[str] | list[InstanceSet]
        The instances to run the solvers on
    objectives: list[SparkleObjective]
        The objective values to retrieve from the solvers
    seed: int
        The seed to use
    cutoff_time: int
        The cut off time for the solvers
    configurations: list[list[dict[str, str]]]
        The configurations to use for each solver (one inner list per solver,
        aligned with ``solvers`` by position)
    sbatch_options: list[str]
        The sbatch options to use for the solvers
    slurm_prepend: str | list[str] | Path
        The script to prepend to a slurm script
    log_dir: Path
        The directory to use for the logs
    run_on: Runner
        Where to execute the solvers.

    Returns
    -------
    runs: list[Run]
        The non-local runs (runrunner.SlurmRun objects). Local runs are
        reported on stdout and not collected.
    """
    runs = []
    # Run each configuration of each solver
    for solver, solver_confs in zip(solvers, configurations):
        for conf_index, conf in enumerate(solver_confs):
            # Prefer the configuration's own id for reporting; fall back to index
            conf_name = conf.get("configuration_id", conf_index)
            run = solver.run(
                instances=instances,
                objectives=objectives,
                seed=seed,
                configuration=conf,
                cutoff_time=cutoff_time,
                run_on=run_on,
                sbatch_options=sbatch_options,
                slurm_prepend=slurm_prepend,
                log_dir=log_dir,
            )
            if run_on == Runner.LOCAL:
                # A single-instance local run may yield a dict instead of a list
                if isinstance(run, dict):
                    run = [run]
                # TODO: Refactor resolving objective keys
                status_key = [key for key in run[0] if key.lower().startswith("status")][
                    0
                ]
                time_key = [key for key in run[0] if key.lower().startswith("cpu_time")][
                    0
                ]
                for i, solver_output in enumerate(run):
                    print(
                        f"Execution of {solver.name} ({conf_name}) on instance "
                        f"{instances[i]} completed with status "
                        f"{solver_output[status_key]} in {solver_output[time_key]} "
                        f"seconds."
                    )
                print("Running configured solver done!")
            else:
                runs.append(run)
    return runs

145 

146 

def run_solvers_performance_data(
    performance_data: PerformanceDataFrame,
    cutoff_time: int,
    rerun: bool = False,
    solvers: list[Solver] = None,
    instances: list[str] = None,
    sbatch_options: list[str] = None,
    slurm_prepend: str | list[str] | Path = None,
    run_on: Runner = Runner.SLURM,
) -> list[Run]:
    """Run the solvers for the performance data.

    Parameters
    ----------
    performance_data: PerformanceDataFrame
        The performance data
    cutoff_time: int
        The cut off time for the solvers
    rerun: bool
        Run only solvers for which no data is available yet (False) or (re)run all
        solvers to get (new) performance data for them (True)
    solvers: list[Solver]
        The solvers to run. If None, run all found solvers.
    instances: list[str]
        The instances to run the solvers on. If None, run all found instances.
    sbatch_options: list[str]
        The sbatch options to use
    slurm_prepend: str | list[str] | Path
        The script to prepend to a slurm script
    run_on: Runner
        Where to execute the solvers. For available values see runrunner.base.Runner
        enum. Default: "Runner.SLURM".

    Returns
    -------
    run: runrunner.LocalRun or runrunner.SlurmRun
        If the run is local return a QueuedRun object with the information concerning
        the run. Returns None when there are no jobs to run.
    """
    jobs = performance_data.get_job_list(rerun=rerun)  # List of jobs to do

    # Edit jobs to incorporate file paths
    for index, (solver, config, instance, run) in enumerate(jobs):
        instance_path = resolve_instance_name(
            instance, gv.settings().DEFAULT_instance_dir
        )
        jobs[index] = (solver, config, instance_path, run)

    print(f"Total number of jobs to run: {len(jobs)}")
    if len(jobs) == 0:  # If there are no jobs, stop
        return None

    if run_on == Runner.LOCAL:
        print("Running the solvers locally")
    elif run_on == Runner.SLURM:
        print("Running the solvers through Slurm")

    if solvers is None:
        solvers = [Solver(Path(s)) for s in performance_data.solvers]
    else:  # Filter the Solvers in remaining jobs
        jobs = [
            (solvers[solvers.index(s)], c, i, r) for (s, c, i, r) in jobs if s in solvers
        ]

    if instances is not None:  # Filter the instances
        jobs = [j for j in jobs if j[2] in instances]

    # Group the jobs per solver -> configuration -> instance -> [run ids]
    # NOTE(review): when ``solvers`` is None the jobs still carry the dataframe's
    # solver keys; this lookup assumes those keys match the Solver objects — verify.
    solver_jobs = {p_solver: {} for p_solver in solvers}
    for p_solver, p_config, p_instance, p_run in jobs:
        solver_jobs[p_solver].setdefault(p_config, {}).setdefault(
            p_instance, []
        ).append(p_run)

    runrunner_runs = []
    if run_on == Runner.LOCAL:
        print(f"Cutoff time for each solver run: {cutoff_time} seconds")
    for solver in solvers:
        for solver_config, config_jobs in solver_jobs[solver].items():
            solver_instances = list(config_jobs.keys())
            # BUG FIX: was ``solver_instances == []`` on a dict view, which is
            # never equal to a list, so the warning branch was unreachable.
            if not solver_instances:
                print(f"Warning: No jobs for instances found for solver {solver}")
                continue
            run_ids = [config_jobs[instance] for instance in solver_instances]
            run = solver.run_performance_dataframe(
                solver_instances,
                performance_data,
                solver_config,
                run_ids=run_ids,
                cutoff_time=cutoff_time,
                sbatch_options=sbatch_options,
                slurm_prepend=slurm_prepend,
                log_dir=sl.caller_log_dir,
                base_dir=sl.caller_log_dir,
                run_on=run_on,
            )
            runrunner_runs.append(run)
    if run_on == Runner.SLURM:
        num_jobs = sum(len(r.jobs) for r in runrunner_runs)
        print(f"Total number of jobs submitted: {num_jobs}")

    return runrunner_runs

258 

259 

def main(argv: list[str]) -> None:
    """Main function of the run solvers command.

    Parameters
    ----------
    argv: list[str]
        Command line arguments (excluding the program name).
    """
    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    settings = gv.settings(args)

    # Log command call
    sl.log_command(sys.argv, seed=settings.random_state)
    check_for_initialise()

    if args.best_configuration:
        if not args.objective:
            # No objective given: fall back to the first configured objective
            objective = settings.objectives[0]
            print(
                "WARNING: Best configuration requested, but no objective specified. "
                f"Defaulting to first objective: {objective}"
            )
        else:
            objective = resolve_objective(args.objective)

    # Compare current settings to latest.ini
    prev_settings = Settings(Settings.DEFAULT_previous_settings_path)
    Settings.check_settings_changes(settings, prev_settings)

    if args.solvers:
        solvers = [
            resolve_object_name(
                solver_path,
                gv.file_storage_data_mapping[gv.solver_nickname_list_path],
                settings.DEFAULT_solver_dir,
                Solver,
            )
            for solver_path in args.solvers
        ]
    else:
        solvers = [
            Solver(p) for p in settings.DEFAULT_solver_dir.iterdir() if p.is_dir()
        ]

    if args.instance_path:
        instance_sets = [
            resolve_object_name(
                instance_path,
                gv.file_storage_data_mapping[gv.instances_nickname_path],
                settings.DEFAULT_instance_dir,
                Instance_Set,
            )
            for instance_path in args.instance_path
        ]
        # Unpack the sets into instance strings
        # (loop variable renamed: original shadowed the builtin ``set``)
        instances = [
            str(path)
            for instance_set in instance_sets
            for path in instance_set.instance_paths
        ]
    else:
        instances = None  # TODO: Fix? Or its good like this

    sbatch_options = settings.sbatch_settings
    slurm_prepend = settings.slurm_job_prepend
    # Write settings to file before starting, since they are used in callback scripts
    settings.write_used_settings()
    run_on = settings.run_on
    cutoff_time = settings.solver_cutoff_time
    # Open the performance data csv file
    performance_dataframe = PerformanceDataFrame(settings.DEFAULT_performance_data_path)

    print("Start running solvers ...")
    if args.performance_data_jobs:
        runs = run_solvers_performance_data(
            performance_data=performance_dataframe,
            solvers=solvers,
            instances=instances,
            cutoff_time=cutoff_time,
            rerun=args.recompute,
            sbatch_options=sbatch_options,
            slurm_prepend=slurm_prepend,
            run_on=run_on,
        )
    else:
        if args.best_configuration:
            train_instances = None
            if isinstance(args.best_configuration, list):
                # Best configuration is determined on the given training sets
                train_instances = [
                    resolve_object_name(
                        instance_path,
                        gv.file_storage_data_mapping[gv.instances_nickname_path],
                        settings.DEFAULT_instance_dir,
                        Instance_Set,
                    )
                    for instance_path in args.best_configuration
                ]
                # Unpack the sets into instance strings
                # (loop variable renamed: original shadowed the builtin ``set``)
                instances = [
                    str(path)
                    for train_set in train_instances
                    for path in train_set.instance_paths
                ]
            # Determine best configuration
            configurations = [
                [
                    performance_dataframe.best_configuration(
                        str(solver.directory), objective, train_instances
                    )[0]
                ]
                for solver in solvers
            ]
        elif args.configuration:
            # Sort the configurations to the solvers
            # TODO: Add a better check that the id could only match this solver
            configurations = []
            for solver in solvers:
                configurations.append([])
                for c in args.configuration:
                    if c not in performance_dataframe.configuration_ids:
                        raise ValueError(f"Configuration id {c} not found.")
                    if c in performance_dataframe.get_configurations(
                        str(solver.directory)
                    ):
                        configurations[-1].append(c)
        elif args.all_configurations:  # All known configurations
            configurations = [
                performance_dataframe.get_configurations(str(solver.directory))
                for solver in solvers
            ]
        else:  # Only default configurations
            configurations = [
                [PerformanceDataFrame.default_configuration] for _ in solvers
            ]
        # Look up and replace with the actual configurations
        for solver_index, configs in enumerate(configurations):
            for config_index, config in enumerate(configs):
                configurations[solver_index][config_index] = (
                    performance_dataframe.get_full_configuration(
                        str(solvers[solver_index].directory), config
                    )
                )
        if instances is None:
            # No instances given: run on every instance set in the default dir
            instances = []
            for instance_dir in settings.DEFAULT_instance_dir.iterdir():
                if instance_dir.is_dir():
                    instances.append(Instance_Set(instance_dir))

        # TODO Objective arg not used in Multi-file-instances case?
        runs = run_solvers(
            solvers=solvers,
            configurations=configurations,
            instances=instances,
            objectives=settings.objectives,
            seed=random.randint(0, 2**32 - 1),
            cutoff_time=cutoff_time,
            sbatch_options=sbatch_options,
            slurm_prepend=slurm_prepend,
            log_dir=sl.caller_log_dir,
            run_on=run_on,
        )

    # If there are no jobs return
    if runs is None or all(run is None for run in runs):
        print("Running solvers done!")
    elif run_on == Runner.SLURM:
        print(
            "Running solvers through Slurm with job id(s): "
            f"{','.join(r.run_id for r in runs if r is not None)}"
        )
    sys.exit(0)

423 

424 

# Script entry point: forward the CLI arguments (without the program name) to main.
if __name__ == "__main__":
    main(sys.argv[1:])