Coverage for sparkle/CLI/run_parallel_portfolio.py: 12%

213 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-07 15:22 +0000

1#!/usr/bin/env python3 

2# -*- coding: UTF-8 -*- 

3"""Sparkle command to execute a parallel algorithm portfolio.""" 

4 

5import sys 

6import argparse 

7import random 

8import time 

9import shutil 

10import csv 

11import itertools 

12from pathlib import Path, PurePath 

13 

14from tqdm import tqdm 

15 

16import runrunner as rrr 

17from runrunner.base import Runner 

18from runrunner.slurm import Status 

19 

20from sparkle.CLI.help.reporting_scenario import Scenario 

21from sparkle.CLI.help import logging as sl 

22from sparkle.CLI.help import global_variables as gv 

23from sparkle.CLI.initialise import check_for_initialise 

24from sparkle.CLI.help import argparse_custom as ac 

25from sparkle.CLI.help.nicknames import resolve_object_name 

26from sparkle.platform.settings_objects import Settings, SettingState 

27from sparkle.solver import Solver 

28from sparkle.instance import Instance_Set, InstanceSet 

29from sparkle.types import SolverStatus, resolve_objective, UseTime 

30 

31 

def run_parallel_portfolio(instances_set: InstanceSet,
                           portfolio_path: Path,
                           solvers: list[Solver],
                           run_on: Runner = Runner.SLURM) -> None:
    """Run the parallel algorithm portfolio.

    Every solver is started with the configured number of random seeds on every
    instance. As soon as one job for an instance reports completion, all other
    jobs for that instance are killed. Afterwards the job outputs are parsed, the
    best (lowest CPU time) result per solver-instance pair is kept, a summary is
    printed and all results are written to ``portfolio_path / "results.csv"``.

    Args:
        instances_set: Set of instances to run on.
        portfolio_path: Path to the parallel portfolio (used as log dir and for
            the results CSV).
        solvers: List of solvers to run on the instances.
        run_on: Currently only supports Slurm.
    """
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    num_jobs = num_solvers * num_instances * seeds_per_solver
    parallel_jobs = min(gv.settings().get_number_of_jobs_in_parallel(), num_jobs)
    # parallel_jobs is capped at num_jobs, so the limit only bites when it is
    # strictly smaller than the total number of jobs.
    if parallel_jobs < num_jobs:
        print("WARNING: Not all jobs will be started at the same time due to the "
              "limitation of number of Slurm jobs that can be run in parallel. Check"
              " your Sparkle Slurm Settings.")
    print(f"Sparkle parallel portfolio is running {seeds_per_solver} seed(s) per solver "
          f"on {num_solvers} solvers for {num_instances} instances ...")
    cmd_list = []
    cutoff = gv.settings().get_general_target_cutoff_time()
    objectives = gv.settings().get_general_sparkle_objectives()
    # Create a command for each instance-solver-seed combination. The order
    # (instance-major, then solver, then seed) is relied upon below to map a
    # job index back to its instance and solver.
    for instance, solver in itertools.product(instances_set._instance_paths, solvers):
        for _ in range(seeds_per_solver):
            seed = int(random.getrandbits(32))
            solver_call_list = solver.build_cmd(
                instance.absolute(),
                objectives=objectives,
                seed=seed,
                cutoff_time=cutoff,
                log_dir=portfolio_path)
            cmd_list.append((" ".join(solver_call_list)).replace("'", '"'))
    # Jobs are added in to the runrunner object in the same order they are provided
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    solver_names = ", ".join([s.name for s in solvers])
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=f"Parallel Portfolio: {solver_names}",
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        srun_options=["-N1", "-n1"] + sbatch_options,
        sbatch_options=sbatch_options
    )
    check_interval = gv.settings().get_parallel_portfolio_check_interval()
    instances_done = [False] * num_instances
    # We record the 'best' of all seed results per solver-instance,
    # setting start values for objectives that are always present
    cpu_time_key = [o.name for o in objectives if o.name.startswith("cpu_time")][0]
    status_key = [o.name for o in objectives if o.name.startswith("status")][0]
    wall_time_key = [o.name for o in objectives if o.name.startswith("wall_time")][0]
    objective_names = [o.name for o in objectives]
    default_objective_values = {}
    for o in objectives:
        default_value = float(sys.maxsize) if o.minimise else 0
        # Default values for time objectives can be linked to cutoff time
        if o.time and o.post_process:
            default_value = o.post_process(default_value, cutoff, SolverStatus.KILLED)
        default_objective_values[o.name] = default_value
    job_output_dict = {instance_name: {solver.name: default_objective_values.copy()
                                       for solver in solvers}
                       for instance_name in instances_set._instance_names}
    n_instance_jobs = num_solvers * seeds_per_solver

    # NOTE(review): an instance is only marked done once one of its jobs reports
    # Status.COMPLETED; if all of its jobs end otherwise this loop keeps polling.
    with tqdm(total=len(instances_done)) as pbar:
        pbar.set_description("Instances done")
        while not all(instances_done):
            prev_done = sum(instances_done)
            time.sleep(check_interval)
            job_status_completed = [job.status == Status.COMPLETED for job in run.jobs]
            # The jobs are sorted by instance
            for i, instance_name in enumerate(instances_set._instance_names):
                if instances_done[i]:
                    continue
                first_job = i * n_instance_jobs
                end_job = first_job + n_instance_jobs
                if not any(job_status_completed[first_job:end_job]):
                    continue
                instances_done[i] = True
                # Kill all running jobs for this instance
                solver_kills = [0] * num_solvers
                for job_index in range(first_job, end_job):
                    if not job_status_completed[job_index]:
                        run.jobs[job_index].kill()
                        solver_index = (job_index % n_instance_jobs) // seeds_per_solver
                        solver_kills[solver_index] += 1
                for solver_index, kills in enumerate(solver_kills):
                    # All seeds of a solver were killed on instance, set status kill
                    if kills == seeds_per_solver:
                        solver_name = solvers[solver_index].name
                        job_output_dict[instance_name][solver_name][status_key] = \
                            SolverStatus.KILLED
            pbar.update(sum(instances_done) - prev_done)

    # Attempt to verify that all logs have been written (Slurm I/O latency)
    for cmd in cmd_list:
        runsolver_configuration = cmd.split(" ")[:11]
        logs = [Path(p) for p in runsolver_configuration
                if Path(p).suffix in [".log", ".val", ".rawres"]]
        if not all(p.exists() for p in logs):
            time.sleep(check_interval)

    # Now iterate over runsolver logs to get runtime, get the lowest value per seed
    for index, cmd in enumerate(cmd_list):
        solver_obj = solvers[(index % n_instance_jobs) // seeds_per_solver]
        # Parse the stdout of the job that belongs to this very command
        solver_output = Solver.parse_solver_output(run.jobs[index].stdout,
                                                   cmd.split(" "),
                                                   objectives=objectives,
                                                   verifier=solver_obj.verifier)
        instance_name = instances_set._instance_names[index // n_instance_jobs]
        print(solver_output)
        cpu_time = solver_output[cpu_time_key]
        cmd_output = job_output_dict[instance_name][solver_obj.name]
        if cpu_time > 0.0 and cpu_time < cmd_output[cpu_time_key]:
            for key, value in solver_output.items():
                # The status is handled separately below so that a KILLED
                # status recorded while monitoring is not overwritten here.
                if key in objective_names and key != status_key:
                    cmd_output[key] = value
            if (status_key not in cmd_output
                    or cmd_output[status_key] != SolverStatus.KILLED):
                cmd_output[status_key] = solver_output[status_key]

    # Fix the CPU/WC time for non existent logs to instance min time + check_interval
    for instance_results in job_output_dict.values():
        no_log_solvers = []
        min_time = cutoff
        for solver_name, solver_result in instance_results.items():
            cpu_time = solver_result[cpu_time_key]
            if cpu_time == -1.0 or cpu_time == float(sys.maxsize):
                no_log_solvers.append(solver_name)
            elif cpu_time < min_time:
                min_time = cpu_time
        for solver_name in no_log_solvers:
            solver_result = instance_results[solver_name]
            solver_result[cpu_time_key] = min_time + check_interval
            solver_result[wall_time_key] = min_time + check_interval
            # Fix runtime objectives with resolved CPU/Wall times
            for key in solver_result:
                objective = resolve_objective(key)
                if objective is None or not objective.time:
                    continue
                if objective.use_time == UseTime.CPU_TIME:
                    value = solver_result[cpu_time_key]
                else:
                    value = solver_result[wall_time_key]
                if objective.post_process is not None:
                    value = objective.post_process(value, cutoff,
                                                   solver_result[status_key])
                solver_result[key] = value

    for index, instance_name in enumerate(instances_set._instance_names):
        index_str = f"[{index + 1}/{num_instances}] "
        instance_output = job_output_dict[instance_name]
        if all(result[status_key] == SolverStatus.TIMEOUT
               for result in instance_output.values()):
            print(f"\n{index_str}{instance_name} was not solved within the cutoff-time.")
            continue
        print(f"\n{index_str}{instance_name} yielded the following Solver results:")
        for solver in solvers:
            job_info = instance_output[solver.name]
            print(f"\t- {solver.name} ended with status {job_info[status_key]} in "
                  f"{job_info[cpu_time_key]}s CPU-Time ({job_info[wall_time_key]}s "
                  "Wall clock time)")

    # Write the results to a CSV
    csv_path = portfolio_path / "results.csv"
    values_header = [o.name for o in objectives]
    header = ["Instance", "Solver"] + values_header
    result_rows = [header]
    for instance_name, instance_results in job_output_dict.items():
        for solver_name, job_o in instance_results.items():
            result_rows.append([instance_name, solver_name]
                               + [job_o.get(key, "None") for key in values_header])
    # newline="" lets the csv module control line endings itself
    with csv_path.open("w", newline="") as out:
        writer = csv.writer(out)
        writer.writerows(result_rows)

217 

218 

def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser for the run parallel portfolio command.

    Returns:
        parser: The parser with the parsed command line arguments
    """
    description = ("Run a portfolio of solvers on an "
                   "instance set in parallel.")
    parser = argparse.ArgumentParser(description=description)
    # Registration order determines the order shown in --help output.
    argument_specs = (
        ac.InstanceSetPathsArgument,
        ac.NicknamePortfolioArgument,
        ac.SolversArgument,
        ac.ObjectivesArgument,
        ac.CutOffTimeArgument,
        ac.SolverSeedsArgument,
        ac.RunOnArgument,
        ac.SettingsFileArgument,
    )
    for spec in argument_specs:
        parser.add_argument(*spec.names, **spec.kwargs)
    return parser

244 

245 

def main(argv: list[str]) -> None:
    """Main method of run parallel portfolio command.

    Parses the command line, applies command-line overrides to the global
    settings, resolves the solvers and the instance set, prepares the portfolio
    output directory and runs the parallel portfolio. Terminates the process via
    ``sys.exit``.

    Args:
        argv: Command line arguments, excluding the program name.
    """
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    if args.solvers is not None:
        # "".join is a no-op for plain strings and flattens a list of characters
        solver_names = ["".join(s) for s in args.solvers]
        solver_paths = [resolve_object_name(name,
                                            target_dir=gv.settings().DEFAULT_solver_dir)
                        for name in solver_names]
        if None in solver_paths:
            print("Some solvers not recognised! Check solver names:")
            # Report the names the user typed (the resolved path is None here)
            for name, path in zip(solver_names, solver_paths):
                if path is None:
                    print(f'\t- "{name}" ')
            sys.exit(-1)
        solvers = [Solver(p) for p in solver_paths]
    else:
        solvers = [Solver(p) for p in
                   gv.settings().DEFAULT_solver_dir.iterdir() if p.is_dir()]

    # Compare current settings to latest.ini
    prev_settings = Settings(PurePath("Settings/latest.ini"))
    Settings.check_settings_changes(gv.settings(), prev_settings)

    # Do first, so other command line options can override settings from the file
    if args.settings_file is not None:
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)

    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    if args.solver_seeds is not None:
        gv.settings().set_parallel_portfolio_number_of_seeds_per_solver(
            args.solver_seeds, SettingState.CMD_LINE)

    if run_on == Runner.LOCAL:
        print("Parallel Portfolio is not fully supported yet for Local runs. Exiting.")
        sys.exit(-1)

    # Retrieve instance set
    data_set = resolve_object_name(
        args.instance_path,
        gv.file_storage_data_mapping[gv.instances_nickname_path],
        gv.settings().DEFAULT_instance_dir,
        Instance_Set)
    if data_set is None:
        print(f"ERROR: Could not resolve instance set {args.instance_path}. Exiting.")
        sys.exit(-1)

    print(f"Running on {data_set.size} instance(s)...")

    if args.cutoff_time is not None:
        gv.settings().set_general_target_cutoff_time(args.cutoff_time,
                                                     SettingState.CMD_LINE)

    if args.objectives is not None:
        gv.settings().set_general_sparkle_objectives(
            args.objectives, SettingState.CMD_LINE)
    if not gv.settings().get_general_sparkle_objectives()[0].time:
        print("ERROR: Parallel Portfolio is currently only relevant for "
              "RunTime objectives. In all other cases, use validation")
        sys.exit(-1)

    if args.portfolio_name is not None:  # Use a nickname
        portfolio_path = (gv.settings().DEFAULT_parallel_portfolio_output_raw
                          / args.portfolio_name)
    else:  # Generate a timestamped nickname
        timestamp = time.strftime("%Y-%m-%d-%H:%M:%S", time.gmtime(time.time()))
        randintstamp = int(random.getrandbits(32))
        portfolio_path = (gv.settings().DEFAULT_parallel_portfolio_output_raw
                          / f"{timestamp}_{randintstamp}")
    if portfolio_path.exists():
        print(f"[WARNING] Portfolio path {portfolio_path} already exists! "
              "Overwrite? [y/n] ", end="")
        user_input = input()
        # Accept "y" regardless of surrounding whitespace or case
        if user_input.strip().lower() != "y":
            sys.exit()
        shutil.rmtree(portfolio_path)
    portfolio_path.mkdir(parents=True)
    run_parallel_portfolio(data_set, portfolio_path, solvers, run_on=run_on)

    # Update latest scenario
    gv.latest_scenario().set_parallel_portfolio_path(portfolio_path)
    gv.latest_scenario().set_latest_scenario(Scenario.PARALLEL_PORTFOLIO)
    gv.latest_scenario().set_parallel_portfolio_instance_path(args.instance_path)
    # Write used scenario to file
    gv.latest_scenario().write_scenario_ini()
    # Write used settings to file
    gv.settings().write_used_settings()
    print("Running Sparkle parallel portfolio is done!")
    sys.exit(0)

344 

345 

if __name__ == "__main__":
    # Forward everything after the program name to the command entry point.
    main(argv=sys.argv[1:])