Coverage for sparkle/CLI/run_solvers.py: 91%

134 statements  

coverage.py v7.6.10, created at 2025-01-07 15:22 +0000

#!/usr/bin/env python3
"""Sparkle command to run solvers to get their performance data."""
from __future__ import annotations
import random
import sys
import ast
import argparse
from pathlib import PurePath, Path

from runrunner.base import Runner, Run

from sparkle.solver import Solver
from sparkle.instance import Instance_Set
from sparkle.structures import PerformanceDataFrame
from sparkle.types import SparkleObjective, resolve_objective
from sparkle.platform.settings_objects import Settings, SettingState
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help.nicknames import resolve_object_name
from sparkle.CLI.initialise import check_for_initialise


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Run solvers on instances to get their performance data.")
    parser.add_argument(*ac.SolversArgument.names,
                        **ac.SolversArgument.kwargs)
    parser.add_argument(*ac.InstanceSetPathsArgument.names,
                        **ac.InstanceSetPathsArgument.kwargs)

    # Mutually exclusive: specific configuration or best configuration
    configuration_group = parser.add_mutually_exclusive_group()
    configuration_group.add_argument(*ac.ConfigurationArgument.names,
                                     **ac.ConfigurationArgument.kwargs)
    configuration_group.add_argument(*ac.BestConfigurationArgument.names,
                                     **ac.BestConfigurationArgument.kwargs)

    parser.add_argument(*ac.ObjectiveArgument.names,
                        **ac.ObjectiveArgument.kwargs)
    parser.add_argument(*ac.PerformanceDataJobsArgument.names,
                        **ac.PerformanceDataJobsArgument.kwargs)
    # This one is only relevant if the argument above is given
    parser.add_argument(*ac.RecomputeRunSolversArgument.names,
                        **ac.RecomputeRunSolversArgument.kwargs)
    parser.add_argument(*ac.TargetCutOffTimeArgument.names,
                        **ac.TargetCutOffTimeArgument.kwargs)
    parser.add_argument(*ac.RunOnArgument.names,
                        **ac.RunOnArgument.kwargs)
    parser.add_argument(*ac.SettingsFileArgument.names,
                        **ac.SettingsFileArgument.kwargs)
    return parser


def run_solvers(
        solvers: list[Solver],
        instances: list[str],
        objectives: list[SparkleObjective],
        seed: int,
        cutoff_time: int,
        configuration: list[dict] = None,
        sbatch_options: list[str] = None,
        log_dir: Path = None,
        run_on: Runner = Runner.SLURM) -> list[Run]:
    """Run the solvers.

    Parameters
    ----------
    solvers: list[Solver]
        The solvers to run
    instances: list[str]
        The instances to run the solvers on
    objectives: list[SparkleObjective]
        The objective values to retrieve from the solvers
    seed: int
        The seed to use
    cutoff_time: int
        The cutoff time for the solvers
    configuration: list[dict]
        The configurations to use for the solvers, one entry per solver
    sbatch_options: list[str]
        The sbatch options to use for the solvers
    log_dir: Path
        The directory to use for the logs
    run_on: Runner
        Where to execute the solvers.

    Returns
    -------
    runs: list of runrunner.LocalRun or runrunner.SlurmRun
        The created runs. Empty when running locally, as local results are
        printed immediately instead of being collected.
    """
    if configuration is None:
        # Default to the solvers' own default configuration, one entry per solver
        configuration = [None] * len(solvers)
    runs = []
    # Run the solvers
    for solver, solver_configuration in zip(solvers, configuration):
        run = solver.run(instances=instances,
                         objectives=objectives,
                         seed=seed,
                         configuration=solver_configuration,
                         cutoff_time=cutoff_time,
                         run_on=run_on,
                         sbatch_options=sbatch_options,
                         log_dir=log_dir)
        if run_on == Runner.LOCAL:
            if isinstance(run, dict):
                run = [run]
            # Resolve objective keys
            status_key = [key for key in run[0] if key.lower().startswith("status")][0]
            time_key = [key for key in run[0] if key.lower().startswith("cpu_time")][0]
            for i, solver_output in enumerate(run):
                print(f"Execution of {solver.name} on instance {instances[i]} "
                      f"completed with status {solver_output[status_key]} "
                      f"in {solver_output[time_key]} seconds.")
            print("Running configured solver done!")
        else:
            runs.append(run)
    return runs
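# Usage sketch: a minimal, hypothetical call of run_solvers. The solver and
# instance paths are placeholders; `configuration` holds one entry per solver,
# mirroring how main() builds it below.
#
#   solvers = [Solver(Path("Solvers/MySolver"))]
#   runs = run_solvers(solvers=solvers,
#                      instances=["Instances/MySet/instance_1.cnf"],
#                      objectives=gv.settings().get_general_sparkle_objectives(),
#                      seed=42,
#                      cutoff_time=60,
#                      configuration=[None] * len(solvers),
#                      log_dir=sl.caller_log_dir,
#                      run_on=Runner.LOCAL)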

def run_solvers_performance_data(
        performance_data: PerformanceDataFrame,
        cutoff_time: int,
        rerun: bool = False,
        solvers: list[Solver] = None,
        instances: list[str] = None,
        sbatch_options: list[str] = None,
        run_on: Runner = Runner.SLURM) -> list[Run]:
    """Run the solvers for the performance data.

    Parameters
    ----------
    performance_data: PerformanceDataFrame
        The performance data
    cutoff_time: int
        The cutoff time for the solvers
    rerun: bool
        Run only solvers for which no data is available yet (False) or (re)run all
        solvers to get (new) performance data for them (True)
    solvers: list[Solver]
        The solvers to run. If None, run all found solvers.
    instances: list[str]
        The instances to run the solvers on. If None, run all found instances.
    sbatch_options: list[str]
        The sbatch options to use
    run_on: Runner
        Where to execute the solvers. For available values see the
        runrunner.base.Runner enum. Default: Runner.SLURM.

    Returns
    -------
    runs: list of runrunner.LocalRun or runrunner.SlurmRun
        One run per solver with outstanding jobs, or None if there are no jobs.
    """
    # List of jobs to do
    jobs = performance_data.get_job_list(rerun=rerun)
    num_jobs = len(jobs)

    print(f"Total number of jobs to run: {num_jobs}")

    # If there are no jobs, stop
    if num_jobs == 0:
        return None

    if run_on == Runner.LOCAL:
        print("Running the solvers locally")
    elif run_on == Runner.SLURM:
        print("Running the solvers through Slurm")

    # Sort the jobs per solver
    solver_jobs = {p_solver: {} for _, _, p_solver in jobs}
    for p_instance, p_run, p_solver in jobs:
        if p_instance not in solver_jobs[p_solver]:
            solver_jobs[p_solver][p_instance] = [p_run]
        else:
            solver_jobs[p_solver][p_instance].append(p_run)
    runrunner_runs = []
    solvers = [Solver(Path(p))
               for p in performance_data.solvers] if solvers is None else solvers
    if run_on == Runner.LOCAL:
        print(f"Cutoff time for each solver run: {cutoff_time} seconds")
    for solver in solvers:
        solver_key = str(solver.directory)
        solver_instances = solver_jobs[solver_key].keys()
        if instances:  # Filter
            solver_instances = [i for i in solver_instances if i in instances]
        runs = [solver_jobs[solver_key][i] for i in solver_instances]
        if not solver_instances:
            print(f"Warning: No jobs for instances found for solver {solver_key}")
            continue
        run = solver.run_performance_dataframe(
            solver_instances, runs, performance_data, cutoff_time=cutoff_time,
            sbatch_options=sbatch_options, log_dir=sl.caller_log_dir,
            base_dir=sl.caller_log_dir, run_on=run_on)
        runrunner_runs.append(run)
        if run_on == Runner.LOCAL:
            # Do some printing?
            pass
    if run_on == Runner.SLURM:
        num_jobs = sum(len(r.jobs) for r in runrunner_runs)
        print(f"Total number of jobs submitted: {num_jobs}")

    return runrunner_runs
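# Usage sketch: a minimal call that runs all outstanding jobs recorded in the
# platform's performance data frame, using the same settings accessors as
# main() below. Jobs are grouped per solver, so one runrunner run is created
# per solver rather than per job.
#
#   performance_data = PerformanceDataFrame(
#       gv.settings().DEFAULT_performance_data_path)
#   runs = run_solvers_performance_data(performance_data=performance_data,
#                                       cutoff_time=60,
#                                       rerun=False,
#                                       run_on=Runner.SLURM)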

def main(argv: list[str]) -> None:
    """Main function of the run solvers command."""
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    if args.settings_file is not None:
        # Do first, so other command line options can override settings from the file
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)
    if args.target_cutoff_time is not None:
        gv.settings().set_general_target_cutoff_time(
            args.target_cutoff_time, SettingState.CMD_LINE)
    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    if args.best_configuration or args.configuration:
        if not args.objective:
            objective = gv.settings().get_general_sparkle_objectives()[0]
            print("WARNING: Configuration requested, but no objective specified. "
                  f"Reverting to the first objective ({objective}).")
        else:
            objective = resolve_objective(args.objective)

    # Compare current settings to latest.ini
    prev_settings = Settings(PurePath("Settings/latest.ini"))
    Settings.check_settings_changes(gv.settings(), prev_settings)

    if args.solvers:
        solvers = [resolve_object_name(solver_path,
                   gv.file_storage_data_mapping[gv.solver_nickname_list_path],
                   gv.settings().DEFAULT_solver_dir, Solver)
                   for solver_path in args.solvers]
    else:
        solvers = [Solver(p) for p in
                   gv.settings().DEFAULT_solver_dir.iterdir() if p.is_dir()]

    if args.instance_path:
        instances = [resolve_object_name(instance_path,
                     gv.file_storage_data_mapping[gv.instances_nickname_path],
                     gv.settings().DEFAULT_instance_dir, Instance_Set)
                     for instance_path in args.instance_path]
        # Unpack the sets into instance strings
        instances = [str(path) for set in instances for path in set.instance_paths]
    else:
        instances = None  # TODO: Fix? Or is it fine like this?

    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    # Write settings to file before starting, since they are used in callback scripts
    gv.settings().write_used_settings()
    run_on = gv.settings().get_run_on()
    cutoff_time = gv.settings().get_general_target_cutoff_time()
    # Open the performance data csv file
    performance_dataframe = PerformanceDataFrame(
        gv.settings().DEFAULT_performance_data_path)

    print("Start running solvers ...")
    if args.performance_data_jobs:
        runs = run_solvers_performance_data(
            performance_data=performance_dataframe,
            solvers=solvers,
            instances=instances,
            cutoff_time=cutoff_time,
            rerun=args.recompute,
            sbatch_options=sbatch_options,
            run_on=run_on)
    else:
        configurations = [None] * len(solvers)
        if args.best_configuration:
            train_instances = None
            if isinstance(args.best_configuration, list):
                train_instances = [resolve_object_name(
                    instance_path,
                    gv.file_storage_data_mapping[gv.instances_nickname_path],
                    gv.settings().DEFAULT_instance_dir, Instance_Set)
                    for instance_path in args.best_configuration]
                # Unpack the sets into instance strings
                instances = [str(path) for set in train_instances
                             for path in set.instance_paths]
            # Determine best configuration
            configurations = [performance_dataframe.best_configuration(
                str(solver.directory), objective, train_instances)[0]
                for solver in solvers]
        elif args.configuration:
            # Use given configurations
            # Hotfix: We take the first instance in the DF. Might not work in some cases
            instance = performance_dataframe.instances[0]
            configurations = [ast.literal_eval(performance_dataframe.get_value(
                str(solver.directory), instance, objective.name, run=args.configuration,
                solver_fields=[PerformanceDataFrame.column_configuration]))
                for solver in solvers]
        if instances is None:
            instances = performance_dataframe.instances
        runs = run_solvers(
            solvers=solvers,
            configuration=configurations,
            instances=instances,
            objectives=gv.settings().get_general_sparkle_objectives(),
            seed=random.randint(0, sys.maxsize),
            cutoff_time=cutoff_time,
            sbatch_options=sbatch_options,
            log_dir=sl.caller_log_dir,
            run_on=gv.settings().get_run_on(),
        )

    # If there are no jobs left, we are done
    if runs is None or all(run is None for run in runs):
        print("Running solvers done!")
    elif run_on == Runner.SLURM:
        print("Running solvers. Waiting for Slurm job(s) with id(s): "
              f'{",".join(r.run_id for r in runs if r is not None)}')
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
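
The command is normally driven through its command line interface; the exact flag spellings are defined by the ac.*Argument classes in sparkle.CLI.help.argparse_custom and are not visible in this listing. As a minimal sketch, assuming the flags follow the argparse destination names used above (e.g. --performance-data-jobs, --recompute, --run-on), the command could also be exercised programmatically:

    from sparkle.CLI import run_solvers

    # Recompute all outstanding performance data jobs locally.
    # Flag spellings and the "local" value are assumptions; the authoritative
    # definitions live in sparkle.CLI.help.argparse_custom. Note that main()
    # ends with sys.exit(0), so a SystemExit is raised on completion.
    run_solvers.main(["--performance-data-jobs", "--recompute", "--run-on", "local"])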