Coverage for sparkle/CLI/run_parallel_portfolio.py: 67%

217 statements  

coverage.py v7.8.0, created at 2025-04-03 10:42 +0000

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Sparkle command to execute a parallel algorithm portfolio."""

import sys
import argparse
import random
import time
import shutil
import csv
import itertools
from pathlib import Path, PurePath

from tqdm import tqdm

import runrunner as rrr
from runrunner.base import Runner
from runrunner.slurm import Status

from sparkle.CLI.help.reporting_scenario import Scenario
from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.initialise import check_for_initialise
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help.nicknames import resolve_object_name
from sparkle.platform.settings_objects import Settings, SettingState
from sparkle.solver import Solver
from sparkle.instance import Instance_Set, InstanceSet
from sparkle.types import SolverStatus, resolve_objective, UseTime


def run_parallel_portfolio(instances_set: InstanceSet,
                           portfolio_path: Path,
                           solvers: list[Solver],
                           run_on: Runner = Runner.SLURM) -> None:
    """Run the parallel algorithm portfolio.

    Args:
        instances_set: Set of instances to run on.
        portfolio_path: Path to the parallel portfolio.
        solvers: List of solvers to run on the instances.
        run_on: Currently only supports Slurm.
    """
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    num_jobs = num_solvers * num_instances * seeds_per_solver
    parallel_jobs = min(gv.settings().get_number_of_jobs_in_parallel(), num_jobs)
    if parallel_jobs < num_jobs:
        print("WARNING: Not all jobs will be started at the same time due to the "
              "limit on the number of Slurm jobs that can be run in parallel. Check"
              " your Sparkle Slurm Settings.")
    print(f"Sparkle parallel portfolio is running {seeds_per_solver} seed(s) per solver "
          f"on {num_solvers} solvers for {num_instances} instances ...")
    cmd_list = []
    cutoff = gv.settings().get_general_target_cutoff_time()
    objectives = gv.settings().get_general_sparkle_objectives()
    # Create a command for each instance-solver-seed combination
    for instance, solver in itertools.product(instances_set._instance_paths, solvers):
        for _ in range(seeds_per_solver):
            seed = int(random.getrandbits(32))
            solver_call_list = solver.build_cmd(
                instance.absolute(),
                objectives=objectives,
                seed=seed,
                cutoff_time=cutoff,
                log_dir=portfolio_path)
            cmd_list.append((" ".join(solver_call_list)).replace("'", '"'))
    # Jobs are added into the runrunner object in the same order they are provided
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    solver_names = ", ".join([s.name for s in solvers])
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=f"Parallel Portfolio: {solver_names}",
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        srun_options=["-N1", "-n1"] + sbatch_options,
        sbatch_options=sbatch_options,
        prepend=gv.settings().get_slurm_job_prepend(),
    )
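    # "-N1 -n1" restricts each solver run to a single node and a single task; the
    # extra Slurm options from the settings are passed to both sbatch and srun.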

    check_interval = gv.settings().get_parallel_portfolio_check_interval()
    instances_done = [False] * num_instances
    # We record the 'best' of all seed results per solver-instance,
    # setting start values for objectives that are always present
    cpu_time_key = [o.name for o in objectives if o.name.startswith("cpu_time")][0]
    status_key = [o.name for o in objectives if o.name.startswith("status")][0]
    wall_time_key = [o.name for o in objectives if o.name.startswith("wall_time")][0]
    default_objective_values = {}
    for o in objectives:
        default_value = float(sys.maxsize) if o.minimise else 0
        # Default values for time objectives can be linked to cutoff time
        if o.time and o.post_process:
            default_value = o.post_process(default_value, cutoff, SolverStatus.KILLED)
        default_objective_values[o.name] = default_value
    default_objective_values[status_key] = SolverStatus.UNKNOWN  # Overwrite Status
    job_output_dict = {instance_name: {solver.name: default_objective_values.copy()
                                       for solver in solvers}
                       for instance_name in instances_set._instance_names}
    n_instance_jobs = num_solvers * seeds_per_solver
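    # cmd_list is laid out instance-major: itertools.product keeps all jobs of one
    # instance contiguous (solvers vary faster, seeds fastest). Job k therefore maps to
    # instance k // n_instance_jobs and solver (k % n_instance_jobs) // seeds_per_solver.
    # The monitoring loop below relies on this layout: an instance counts as done as
    # soon as any of its jobs completes, and its remaining jobs are killed.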

    with tqdm(total=len(instances_done)) as pbar:
        pbar.set_description("Instances done")
        while not all(instances_done):
            prev_done = sum(instances_done)
            time.sleep(check_interval)
            job_status_list = [r.status for r in run.jobs]
            job_status_completed = [status == Status.COMPLETED
                                    for status in job_status_list]
            # The jobs are sorted by instance
            for i, instance in enumerate(instances_set._instance_paths):
                if instances_done[i]:
                    continue
                instance_job_slice = slice(i * n_instance_jobs,
                                           (i + 1) * n_instance_jobs)
                if any(job_status_completed[instance_job_slice]):
                    instances_done[i] = True
                    # Kill all running jobs for this instance
                    solver_kills = [0] * num_solvers
                    for job_index in range(i * n_instance_jobs,
                                           (i + 1) * n_instance_jobs):
                        if not job_status_completed[job_index]:
                            run.jobs[job_index].kill()
                            solver_index = int(
                                (job_index % n_instance_jobs) / seeds_per_solver)
                            solver_kills[solver_index] += 1
                    for solver_index in range(num_solvers):
                        # All seeds of a solver were killed on instance, set status kill
                        if solver_kills[solver_index] == seeds_per_solver:
                            solver_name = solvers[solver_index].name
                            job_output_dict[instance.name][solver_name]["status"] =\
                                SolverStatus.KILLED
            pbar.update(sum(instances_done) - prev_done)

    # Attempt to verify that all logs have been written (Slurm I/O latency)
    for index, cmd in enumerate(cmd_list):
        runsolver_configuration = cmd.split(" ")[:11]
        logs = [Path(p) for p in runsolver_configuration
                if Path(p).suffix in [".log", ".val", ".rawres"]]
        if not all([p.exists() for p in logs]):
            time.sleep(check_interval)

    # Now iterate over runsolver logs to get runtime, get the lowest value per seed
    for index, cmd in enumerate(cmd_list):
        solver_index = int((index % n_instance_jobs) / seeds_per_solver)
        runsolver_configuration = cmd.split(" ")[:11]
        solver_obj = solvers[solver_index]
        solver_output = Solver.parse_solver_output(run.jobs[index].stdout,
                                                   cmd.split(" "),
                                                   objectives=objectives,
                                                   verifier=solver_obj.verifier)
        instance_name = instances_set._instance_names[int(index / n_instance_jobs)]
        cpu_time = solver_output[cpu_time_key]
        cmd_output = job_output_dict[instance_name][solver_obj.name]
        if cpu_time > 0.0 and cpu_time < cmd_output[cpu_time_key]:
            for key, value in solver_output.items():
                if key in [o.name for o in objectives]:
                    job_output_dict[instance_name][solver_obj.name][key] = value
            if (status_key not in cmd_output
                    or cmd_output[status_key] != SolverStatus.KILLED):
                cmd_output[status_key] = solver_output[status_key]

    # Fix the CPU/WC time for non-existent logs to instance min time + check_interval
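    # Solvers without a usable log were typically killed once another solver finished
    # the instance, so their true runtime is unknown. As an approximation they are
    # charged the fastest observed time on that instance plus one check interval
    # (roughly the latest point at which the kill could have been issued).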

    for instance in job_output_dict.keys():
        no_log_solvers = []
        min_time = cutoff
        for solver in job_output_dict[instance].keys():
            cpu_time = job_output_dict[instance][solver][cpu_time_key]
            if cpu_time == -1.0 or cpu_time == float(sys.maxsize):
                no_log_solvers.append(solver)
            elif cpu_time < min_time:
                min_time = cpu_time
        for solver in no_log_solvers:
            job_output_dict[instance][solver][cpu_time_key] = min_time + check_interval
            job_output_dict[instance][solver][wall_time_key] = min_time + check_interval
            # Fix runtime objectives with resolved CPU/Wall times
            for key, value in job_output_dict[instance][solver].items():
                objective = resolve_objective(key)
                if objective is not None and objective.time:
                    print(key, value)
                    if objective.use_time == UseTime.CPU_TIME:
                        value = job_output_dict[instance][solver][cpu_time_key]
                    else:
                        value = job_output_dict[instance][solver][wall_time_key]
                    if objective.post_process is not None:
                        status = job_output_dict[instance][solver][status_key]
                        value = objective.post_process(value, cutoff, status)
                    job_output_dict[instance][solver][key] = value

    for index, instance_name in enumerate(instances_set._instance_names):
        index_str = f"[{index + 1}/{num_instances}] "
        instance_output = job_output_dict[instance_name]
        if all([instance_output[k][status_key] == SolverStatus.TIMEOUT
                for k in instance_output.keys()]):
            print(f"\n{index_str}{instance_name} was not solved within the cutoff-time.")
            continue
        print(f"\n{index_str}{instance_name} yielded the following Solver results:")
        for sindex in range(index * num_solvers, (index + 1) * num_solvers):
            solver_name = solvers[sindex % num_solvers].name
            job_info = job_output_dict[instance_name][solver_name]
            print(f"\t- {solver_name} ended with status {job_info[status_key]} in "
                  f"{job_info[cpu_time_key]}s CPU-Time ({job_info[wall_time_key]}s "
                  "Wall clock time)")

    # Write the results to a CSV
    csv_path = portfolio_path / "results.csv"
    values_header = [o.name for o in objectives]
    header = ["Instance", "Solver"] + values_header
    result_rows = [header]
    for instance_name in job_output_dict.keys():
        for solver_name in job_output_dict[instance_name].keys():
            job_o = job_output_dict[instance_name][solver_name]
            values = [instance_name, solver_name] + [
                job_o[key] if key in job_o else "None"
                for key in values_header]
            result_rows.append(values)
    with csv_path.open("w") as out:
        writer = csv.writer(out)
        writer.writerows(result_rows)
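    # results.csv thus holds one row per (instance, solver) pair with the best result
    # (lowest CPU time over that solver's seeds) for every configured objective.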

def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments.

    Returns:
        parser: The parser with the possible command line arguments.
    """
    parser = argparse.ArgumentParser(description="Run a portfolio of solvers on an "
                                                 "instance set in parallel.")
    parser.add_argument(*ac.InstanceSetPathsArgument.names,
                        **ac.InstanceSetPathsArgument.kwargs)
    parser.add_argument(*ac.NicknamePortfolioArgument.names,
                        **ac.NicknamePortfolioArgument.kwargs)
    parser.add_argument(*ac.SolversArgument.names,
                        **ac.SolversArgument.kwargs)
    parser.add_argument(*ac.ObjectivesArgument.names,
                        **ac.ObjectivesArgument.kwargs)
    parser.add_argument(*ac.CutOffTimeArgument.names,
                        **ac.CutOffTimeArgument.kwargs)
    parser.add_argument(*ac.SolverSeedsArgument.names,
                        **ac.SolverSeedsArgument.kwargs)
    parser.add_argument(*ac.RunOnArgument.names,
                        **ac.RunOnArgument.kwargs)
    parser.add_argument(*ac.SettingsFileArgument.names,
                        **ac.SettingsFileArgument.kwargs)
    return parser


def main(argv: list[str]) -> None:
    """Main method of run parallel portfolio command."""
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    if args.solvers is not None:
        solver_paths = [resolve_object_name("".join(s),
                                            target_dir=gv.settings().DEFAULT_solver_dir)
                        for s in args.solvers]
        if None in solver_paths:
            print("Some solvers not recognised! Check solver names:")
            for i, solver_path in enumerate(solver_paths):
                if solver_path is None:
                    print(f'\t- "{"".join(args.solvers[i])}"')
            sys.exit(-1)
        solvers = [Solver(p) for p in solver_paths]
    else:
        solvers = [Solver(p) for p in
                   gv.settings().DEFAULT_solver_dir.iterdir() if p.is_dir()]
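    # If no solvers were specified on the command line, every directory under the
    # default solver directory is loaded as a Solver.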

    # Compare current settings to latest.ini
    prev_settings = Settings(PurePath("Settings/latest.ini"))
    Settings.check_settings_changes(gv.settings(), prev_settings)

    # Do first, so other command line options can override settings from the file
    if args.settings_file is not None:
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)

    portfolio_path = args.portfolio_name

    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    if args.solver_seeds is not None:
        gv.settings().set_parallel_portfolio_number_of_seeds_per_solver(
            args.solver_seeds, SettingState.CMD_LINE)

    if run_on == Runner.LOCAL:
        print("Parallel Portfolio is not fully supported yet for Local runs. Exiting.")
        sys.exit(-1)

    # Retrieve instance sets
    instances = [resolve_object_name(instance_path,
                 gv.file_storage_data_mapping[gv.instances_nickname_path],
                 gv.settings().DEFAULT_instance_dir, Instance_Set)
                 for instance_path in args.instance_path]
    # Join them into one
    if len(instances) > 1:
        print("WARNING: More than one instance set specified. "
              "Currently only supporting one.")
    instances = instances[0]

    print(f"Running on {instances.size} instance(s)...")

    if args.cutoff_time is not None:
        gv.settings().set_general_target_cutoff_time(args.cutoff_time,
                                                     SettingState.CMD_LINE)

    if args.objectives is not None:
        gv.settings().set_general_sparkle_objectives(
            args.objectives, SettingState.CMD_LINE)
    if not gv.settings().get_general_sparkle_objectives()[0].time:
        print("ERROR: Parallel Portfolio is currently only relevant for "
              "RunTime objectives. In all other cases, use validation")
        sys.exit(-1)

    if args.portfolio_name is not None:  # Use a nickname
        portfolio_path = gv.settings().DEFAULT_parallel_portfolio_output_raw /\
            args.portfolio_name
    else:  # Generate a timestamped nickname
        timestamp = time.strftime("%Y-%m-%d-%H:%M:%S", time.gmtime(time.time()))
        randintstamp = int(random.getrandbits(32))
        portfolio_path = gv.settings().DEFAULT_parallel_portfolio_output_raw /\
            f"{timestamp}_{randintstamp}"
    if portfolio_path.exists():
        print(f"[WARNING] Portfolio path {portfolio_path} already exists! "
              "Overwrite? [y/n] ", end="")
        user_input = input()
        if user_input != "y":
            sys.exit()
        shutil.rmtree(portfolio_path)
    portfolio_path.mkdir(parents=True)
    run_parallel_portfolio(instances, portfolio_path, solvers, run_on=run_on)

    # Update latest scenario
    gv.latest_scenario().set_parallel_portfolio_path(portfolio_path)
    gv.latest_scenario().set_latest_scenario(Scenario.PARALLEL_PORTFOLIO)
    gv.latest_scenario().set_parallel_portfolio_instance_path(instances.directory)
    # Write used scenario to file
    gv.latest_scenario().write_scenario_ini()
    # Write used settings to file
    gv.settings().write_used_settings()
    print("Running Sparkle parallel portfolio is done!")
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
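# Example invocation (illustrative sketch: the exact flag spellings come from the
# argument classes in sparkle.CLI.help.argparse_custom, so consult --help; paths and
# names below are placeholders):
#
#   python sparkle/CLI/run_parallel_portfolio.py \
#       --instance-path Instances/<instance_set> \
#       --solvers Solvers/<solver_a> Solvers/<solver_b> \
#       --portfolio-name <portfolio_nickname> \
#       --cutoff-time 60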