Coverage for sparkle/CLI/run_solvers.py: 91%

135 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-03 10:42 +0000

1#!/usr/bin/env python3 

2"""Sparkle command to run solvers to get their performance data.""" 

3from __future__ import annotations 

4import random 

5import sys 

6import ast 

7import argparse 

8from pathlib import PurePath, Path 

9 

10from runrunner.base import Runner, Run 

11 

12from sparkle.solver import Solver 

13from sparkle.instance import Instance_Set 

14from sparkle.structures import PerformanceDataFrame 

15from sparkle.types import SparkleObjective, resolve_objective 

16from sparkle.platform.settings_objects import Settings, SettingState 

17from sparkle.CLI.help import global_variables as gv 

18from sparkle.CLI.help import logging as sl 

19from sparkle.CLI.help import argparse_custom as ac 

20from sparkle.CLI.help.nicknames import resolve_object_name 

21from sparkle.CLI.initialise import check_for_initialise 

22 

23 

def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser for the run-solvers command."""
    parser = argparse.ArgumentParser(
        description="Run solvers on instances to get their performance data.")
    for argument in (ac.SolversArgument, ac.InstanceSetPathsArgument):
        parser.add_argument(*argument.names, **argument.kwargs)

    # A run uses either an explicitly given configuration or the best known
    # configuration — never both.
    configuration_group = parser.add_mutually_exclusive_group()
    for argument in (ac.ConfigurationArgument, ac.BestConfigurationArgument):
        configuration_group.add_argument(*argument.names, **argument.kwargs)

    for argument in (ac.ObjectiveArgument,
                     ac.PerformanceDataJobsArgument,
                     # Only relevant when the performance-data-jobs flag is given
                     ac.RecomputeRunSolversArgument,
                     ac.TargetCutOffTimeArgument,
                     ac.RunOnArgument,
                     ac.SettingsFileArgument):
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser

54 

55 

def run_solvers(
        solvers: list[Solver],
        instances: list[str],
        objectives: list[SparkleObjective],
        seed: int,
        cutoff_time: int,
        configuration: list[dict] = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        log_dir: Path = None,
        run_on: Runner = Runner.SLURM,) -> list[Run]:
    """Run the solvers.

    Parameters
    ----------
    solvers: list[Solver]
        The solvers to run
    instances: list[str]
        The instances to run the solvers on
    objectives: list[SparkleObjective]
        The objective values to retrieve from the solvers
    seed: int
        The seed to use
    cutoff_time: int
        The cut off time for the solvers
    configuration: list[dict]
        Per-solver configuration, parallel to ``solvers``. If None, every
        solver runs with its default (unspecified) configuration.
    sbatch_options: list[str]
        The sbatch options to use for the solvers
    slurm_prepend: str | list[str] | Path
        The script to prepend to a slurm script
    log_dir: Path
        The directory to use for the logs
    run_on: Runner
        Where to execute the solvers.

    Returns
    -------
    run: list of runrunner.LocalRun or runrunner.SlurmRun
        Empty when run locally (local runs are reported to stdout instead).
    """
    # BUGFIX: the original passed configuration=None straight into zip(),
    # which raises TypeError. Default to "no configuration" per solver.
    if configuration is None:
        configuration = [None] * len(solvers)
    runs = []
    # Run each solver with its matching configuration.
    # (Loop variable renamed: the original rebound the parameter name
    # `configuration` while iterating over it.)
    for solver, solver_config in zip(solvers, configuration):
        run = solver.run(instances=instances,
                         objectives=objectives,
                         seed=seed,
                         configuration=solver_config,
                         cutoff_time=cutoff_time,
                         run_on=run_on,
                         sbatch_options=sbatch_options,
                         slurm_prepend=slurm_prepend,
                         log_dir=log_dir)
        if run_on == Runner.LOCAL:
            # Local runs return solver output dicts directly; normalise to list.
            if isinstance(run, dict):
                run = [run]
            # Resolve objective keys (names may carry objective suffixes)
            status_key = [key for key in run[0] if key.lower().startswith("status")][0]
            time_key = [key for key in run[0] if key.lower().startswith("cpu_time")][0]
            for i, solver_output in enumerate(run):
                print(f"Execution of {solver.name} on instance {instances[i]} "
                      f"completed with status {solver_output[status_key]} "
                      f"in {solver_output[time_key]} seconds.")
            print("Running configured solver done!")
        else:
            runs.append(run)
    return runs

122 

123 

def run_solvers_performance_data(
        performance_data: PerformanceDataFrame,
        cutoff_time: int,
        rerun: bool = False,
        solvers: list[Solver] = None,
        instances: list[str] = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        run_on: Runner = Runner.SLURM) -> list[Run]:
    """Run the solvers for the performance data.

    Parameters
    ----------
    performance_data: PerformanceDataFrame
        The performance data
    cutoff_time: int
        The cut off time for the solvers
    rerun: bool
        Run only solvers for which no data is available yet (False) or (re)run all
        solvers to get (new) performance data for them (True)
    solvers: list[Solver]
        The solvers to run. If None, run all found solvers.
    instances: list[str]
        The instances to run the solvers on. If None, run all found instances.
    sbatch_options: list[str]
        The sbatch options to use
    slurm_prepend: str | list[str] | Path
        The script to prepend to a slurm script
    run_on: Runner
        Where to execute the solvers. For available values see runrunner.base.Runner
        enum. Default: "Runner.SLURM".

    Returns
    -------
    run: list of runrunner.LocalRun or runrunner.SlurmRun, or None
        None when there are no jobs to run. If the run is local, return a
        QueuedRun object with the information concerning the run.
    """
    # List of (instance, run, solver) jobs still missing from the dataframe
    jobs = performance_data.get_job_list(rerun=rerun)
    num_jobs = len(jobs)

    print(f"Total number of jobs to run: {num_jobs}")

    # If there are no jobs, stop
    if num_jobs == 0:
        return None

    if run_on == Runner.LOCAL:
        print("Running the solvers locally")
    elif run_on == Runner.SLURM:
        print("Running the solvers through Slurm")

    # Group the jobs per solver, then per instance: {solver: {instance: [runs]}}
    solver_jobs = {}
    for p_instance, p_run, p_solver in jobs:
        solver_jobs.setdefault(p_solver, {}).setdefault(p_instance, []).append(p_run)

    runrunner_runs = []
    solvers = [Solver(Path(p))
               for p in performance_data.solvers] if solvers is None else solvers
    if run_on == Runner.LOCAL:
        print(f"Cutoff time for each solver run: {cutoff_time} seconds")
    for solver in solvers:
        solver_key = str(solver.directory)
        # BUGFIX: use .get — a solver without pending jobs used to raise KeyError
        solver_instances = solver_jobs.get(solver_key, {}).keys()
        if instances:  # Filter
            solver_instances = [i for i in solver_instances if i in instances]
        # BUGFIX: the original compared dict_keys to [] which is always False;
        # use truthiness so an empty selection is actually skipped.
        if not solver_instances:
            print(f"Warning: No jobs for instances found for solver {solver_key}")
            continue
        runs = [solver_jobs[solver_key][i] for i in solver_instances]
        run = solver.run_performance_dataframe(
            solver_instances, runs, performance_data, cutoff_time=cutoff_time,
            sbatch_options=sbatch_options, slurm_prepend=slurm_prepend,
            log_dir=sl.caller_log_dir, base_dir=sl.caller_log_dir, run_on=run_on)
        runrunner_runs.append(run)
    if run_on == Runner.SLURM:
        num_jobs = sum(len(r.jobs) for r in runrunner_runs)
        print(f"Total number of jobs submitted: {num_jobs}")

    return runrunner_runs

211 

212 

def main(argv: list[str]) -> None:
    """Main function of the run solvers command.

    Parameters
    ----------
    argv: list[str]
        Command line arguments (without the program name).
    """
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    if args.settings_file is not None:
        # Do first, so other command line options can override settings from the file
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)
    if args.target_cutoff_time is not None:
        gv.settings().set_general_target_cutoff_time(
            args.target_cutoff_time, SettingState.CMD_LINE)
    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    if args.best_configuration or args.configuration:
        # An objective is needed to select a configuration; fall back to the
        # first configured objective when none was given.
        if not args.objective:
            objective = gv.settings().get_general_sparkle_objectives()[0]
            print("WARNING: Best configuration requested, but no objective specified. "
                  f"Revert to first objective ({objective}).")
        else:
            objective = resolve_objective(args.objective)

    # Compare current settings to latest.ini
    prev_settings = Settings(PurePath("Settings/latest.ini"))
    Settings.check_settings_changes(gv.settings(), prev_settings)

    if args.solvers:
        solvers = [resolve_object_name(solver_path,
                   gv.file_storage_data_mapping[gv.solver_nickname_list_path],
                   gv.settings().DEFAULT_solver_dir, Solver)
                   for solver_path in args.solvers]
    else:
        # No solvers given: run every solver found in the platform directory
        solvers = [Solver(p) for p in
                   gv.settings().DEFAULT_solver_dir.iterdir() if p.is_dir()]

    if args.instance_path:
        instance_sets = [resolve_object_name(instance_path,
                         gv.file_storage_data_mapping[gv.instances_nickname_path],
                         gv.settings().DEFAULT_instance_dir, Instance_Set)
                         for instance_path in args.instance_path]
        # Unpack the sets into instance strings
        # (loop variable renamed: the original shadowed the builtin `set`)
        instances = [str(path) for instance_set in instance_sets
                     for path in instance_set.instance_paths]
    else:
        instances = None  # TODO: Fix? Or its good like this

    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    slurm_prepend = gv.settings().get_slurm_job_prepend()
    # Write settings to file before starting, since they are used in callback scripts
    gv.settings().write_used_settings()
    run_on = gv.settings().get_run_on()
    cutoff_time = gv.settings().get_general_target_cutoff_time()
    # Open the performance data csv file
    performance_dataframe = PerformanceDataFrame(
        gv.settings().DEFAULT_performance_data_path)

    print("Start running solvers ...")
    if args.performance_data_jobs:
        # Fill in the missing cells of the performance dataframe
        runs = run_solvers_performance_data(
            performance_data=performance_dataframe,
            solvers=solvers,
            instances=instances,
            cutoff_time=cutoff_time,
            rerun=args.recompute,
            sbatch_options=sbatch_options,
            slurm_prepend=slurm_prepend,
            run_on=run_on)
    else:
        configurations = [None] * len(solvers)
        if args.best_configuration:
            train_instances = None
            if isinstance(args.best_configuration, list):
                train_instances = [resolve_object_name(
                    instance_path,
                    gv.file_storage_data_mapping[gv.instances_nickname_path],
                    gv.settings().DEFAULT_instance_dir, Instance_Set)
                    for instance_path in args.best_configuration]
                # Unpack the sets into instance strings
                # (loop variable renamed: the original shadowed the builtin `set`)
                instances = [str(path) for instance_set in train_instances
                             for path in instance_set.instance_paths]
            # Determine best configuration
            configurations = [performance_dataframe.best_configuration(
                str(solver.directory), objective, train_instances)[0]
                for solver in solvers]
        elif args.configuration:
            # Use given configurations
            # Hotfix: We take the first instance in the DF. Might not work in some cases
            instance = performance_dataframe.instances[0]
            configurations = [ast.literal_eval(performance_dataframe.get_value(
                str(solver.directory), instance, objective.name, run=args.configuration,
                solver_fields=[PerformanceDataFrame.column_configuration]))
                for solver in solvers]
        if instances is None:
            instances = performance_dataframe.instances
        runs = run_solvers(
            solvers=solvers,
            configuration=configurations,
            instances=instances,
            objectives=gv.settings().get_general_sparkle_objectives(),
            seed=random.randint(0, sys.maxsize),
            cutoff_time=cutoff_time,
            sbatch_options=sbatch_options,
            slurm_prepend=slurm_prepend,
            log_dir=sl.caller_log_dir,
            run_on=gv.settings().get_run_on(),
        )

    # If there are no jobs return
    if runs is None or all(run is None for run in runs):
        print("Running solvers done!")
    elif run_on == Runner.SLURM:
        print("Running solvers. Waiting for Slurm job(s) with id(s): "
              f'{",".join(r.run_id for r in runs if r is not None)}')
    sys.exit(0)

332 

333 

# Script entry point: forward the CLI arguments (program name stripped).
if __name__ == "__main__":
    main(sys.argv[1:])