Coverage for sparkle/CLI/run_parallel_portfolio.py: 93%

253 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 13:21 +0000

1#!/usr/bin/env python3 

2# -*- coding: UTF-8 -*- 

3"""Sparkle command to execute a parallel algorithm portfolio.""" 

4 

5import sys 

6import argparse 

7import random 

8import time 

9import shutil 

10import itertools 

11from operator import mod 

12from pathlib import Path, PurePath 

13 

14from tqdm import tqdm 

15 

16import runrunner as rrr 

17from runrunner.base import Runner, Run 

18from runrunner.slurm import Status, SlurmRun 

19 

20from sparkle.CLI.help import logging as sl 

21from sparkle.CLI.help import global_variables as gv 

22from sparkle.CLI.initialise import check_for_initialise 

23from sparkle.CLI.help import argparse_custom as ac 

24from sparkle.CLI.help.nicknames import resolve_object_name 

25from sparkle.platform.settings_objects import Settings, SettingState 

26from sparkle.solver import Solver 

27from sparkle.instance import Instance_Set, InstanceSet 

28from sparkle.types import SolverStatus, resolve_objective, UseTime 

29from sparkle.structures import PerformanceDataFrame 

30 

31 

def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments.

    Returns:
        parser: The parser with the parsed command line arguments
    """
    parser = argparse.ArgumentParser(description="Run a portfolio of solvers on an "
                                     "instance set in parallel.")
    # Every argument follows the same registration pattern, so add them in bulk.
    argument_specs = (ac.InstanceSetPathsArgument,
                      ac.NicknamePortfolioArgument,
                      ac.SolversArgument,
                      ac.ObjectivesArgument,
                      ac.CutOffTimeArgument,
                      ac.SolverSeedsArgument,
                      ac.RunOnArgument,
                      ac.SettingsFileArgument)
    for spec in argument_specs:
        parser.add_argument(*spec.names, **spec.kwargs)
    return parser

57 

58 

def create_performance_dataframe(solvers: list[Solver],
                                 instances_set: InstanceSet,
                                 portfolio_path: Path) -> PerformanceDataFrame:
    """Create a PerformanceDataFrame for the given solvers and instances.

    Args:
        solvers: List of solvers to include in the PerformanceDataFrame.
        instances_set: Set of instances to include in the PerformanceDataFrame.
        portfolio_path: Directory of the parallel portfolio; the frame's CSV is
            placed there as ``results.csv``.

    Returns:
        pdf: PerformanceDataFrame object initialized with solvers and instances.
    """
    instances = instances_set.instance_names
    # The frame indexes solvers by their directory path, as a string.
    solvers = [str(s.directory) for s in solvers]
    objectives = gv.settings().get_general_sparkle_objectives()
    csv_path = portfolio_path / "results.csv"
    return PerformanceDataFrame(csv_filepath=csv_path,
                                solvers=solvers,
                                objectives=objectives,
                                instances=instances
                                )

81 

82 

def build_command_list(instances_set: InstanceSet,
                       solvers: list[Solver],
                       portfolio_path: Path,
                       pdf: PerformanceDataFrame) -> list[str]:
    """Build the list of command strings for all instance-solver-seed combinations.

    Also records the seed used for each run in the PerformanceDataFrame.

    Args:
        instances_set: Set of instances to run on.
        solvers: List of solvers to run on the instances.
        portfolio_path: Path to the parallel portfolio (used as solver log dir).
        pdf: PerformanceDataFrame in which the seed of each run is recorded.

    Returns:
        cmd_list: List of command strings for all instance-solver-seed combinations.
    """
    cutoff = gv.settings().get_general_solver_cutoff_time()
    objectives = gv.settings().get_general_sparkle_objectives()
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    cmd_list = []

    # Create a command for each instance-solver-seed combination
    for instance, solver in itertools.product(instances_set._instance_paths, solvers):
        for _ in range(seeds_per_solver):
            # Fresh 32-bit seed per run so repeated runs of a solver differ.
            seed = int(random.getrandbits(32))
            solver_call_list = solver.build_cmd(
                instance.absolute(),
                objectives=objectives,
                seed=seed,
                cutoff_time=cutoff,
                log_dir=portfolio_path
            )

            cmd_list.append(" ".join(solver_call_list))
            # Record the seed in every objective row of this solver-instance pair.
            # NOTE(review): with multiple seeds per solver, later seeds overwrite
            # earlier ones in the frame — confirm this is intended.
            for objective in objectives:
                pdf.set_value(
                    value=seed,
                    solver=str(solver.directory),
                    instance=instance.stem,
                    objective=objective.name,
                    solver_fields=["Seed"]
                )
    return cmd_list

124 

125 

def init_default_objectives() -> tuple[dict, str, str, str]:
    """Initialize default objective values and key names.

    BUGFIX: the return annotation claimed ``list`` while the function returns a
    4-tuple; the annotation now matches the actual return value.

    Returns:
        default_objective_values: Dictionary with default values for each objective.
        cpu_time_key: Key for CPU time in the default values.
        status_key: Key for status in the default values.
        wall_time_key: Key for wall clock time in the default values.
    """
    # We record the 'best' of all seed results per solver-instance,
    # setting start values for objectives that are always present
    objectives = gv.settings().get_general_sparkle_objectives()
    cutoff = gv.settings().get_general_solver_cutoff_time()
    # These three objectives are assumed to always be present; their exact names
    # may carry suffixes, so match on prefix.
    cpu_time_key = [o.name for o in objectives if o.name.startswith("cpu_time")][0]
    status_key = [o.name for o in objectives if o.name.startswith("status")][0]
    wall_time_key = [o.name for o in objectives if o.name.startswith("wall_time")][0]
    default_objective_values = {}

    for o in objectives:
        # Worst-case start value: effectively +inf when minimising, 0 otherwise.
        default_value = float(sys.maxsize) if o.minimise else 0
        # Default values for time objectives can be linked to cutoff time
        if o.time and o.post_process:
            default_value = o.post_process(default_value, cutoff, SolverStatus.KILLED)
        default_objective_values[o.name] = default_value
    default_objective_values[status_key] = SolverStatus.UNKNOWN  # Overwrite status
    return default_objective_values, cpu_time_key, status_key, wall_time_key

152 

153 

def submit_jobs(cmd_list: list[str],
                solvers: list[Solver],
                instances_set: InstanceSet,
                run_on: Runner = Runner.SLURM) -> SlurmRun:
    """Submit jobs to the runner and return the run object.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
        solvers: List of solvers to run on the instances.
        instances_set: Set of instances to run on.
        run_on: Runner to use for submitting the jobs.

    Returns:
        run: The run object containing the submitted jobs.
    """
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    num_jobs = num_solvers * num_instances * seeds_per_solver
    max_parallel = gv.settings().get_number_of_jobs_in_parallel()
    parallel_jobs = min(max_parallel, num_jobs)
    # BUGFIX: the warning previously tested `parallel_jobs > num_jobs`, which can
    # never hold after min(); warn when the portfolio exceeds the parallel limit.
    if num_jobs > max_parallel:
        print("WARNING: Not all jobs will be started at the same time due to the "
              "limitation of number of Slurm jobs that can be run in parallel. Check"
              " your Sparkle Slurm Settings.")
    print(f"Sparkle parallel portfolio is running {seeds_per_solver} seed(s) per solver "
          f"on {num_solvers} solvers for {num_instances} instances ...")

    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    solver_names = ", ".join([s.name for s in solvers])
    # Jobs are added in to the runrunner object in the same order they are provided
    return rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=f"Parallel Portfolio: {solver_names}",
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        srun_options=["-N1", "-n1"] + sbatch_options,
        sbatch_options=sbatch_options,
        prepend=gv.settings().get_slurm_job_prepend(),
    )

193 

194 

def monitor_jobs(run: Run,
                 instances_set: InstanceSet,
                 solvers: list[Solver],
                 default_objective_values: dict,
                 run_on: Runner = Runner.SLURM) -> dict:
    """Monitor job progress and update job output dictionary.

    Polls the runner until at least one job has completed for every instance;
    once an instance has a finisher, its remaining jobs are killed.

    Args:
        run: The run object containing the submitted jobs.
        instances_set: Set of instances to run on.
        solvers: List of solvers to run on the instances.
        default_objective_values: Default objective values for each solver-instance.
        run_on: Runner used for the submitted jobs.

    Returns:
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
    """
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    # Number of jobs submitted per instance (all solvers x all their seeds).
    n_instance_jobs = num_solvers * seeds_per_solver

    # Every solver-instance cell starts from a copy of the default (worst) values.
    job_output_dict = {
        instance_name: {solver.name:
                        default_objective_values.copy() for solver in solvers}
        for instance_name in instances_set._instance_names
    }

    check_interval = gv.settings().get_parallel_portfolio_check_interval()
    instances_done = [False] * num_instances

    with tqdm(total=len(instances_done)) as pbar:
        pbar.set_description("Instances done")
        while not all(instances_done):
            prev_done = sum(instances_done)
            time.sleep(check_interval)
            job_status_list = [r.status for r in run.jobs]
            job_status_completed = [status == Status.COMPLETED
                                    for status in job_status_list]
            # The jobs are sorted by instance
            for i, instance in enumerate(instances_set._instance_paths):
                if instances_done[i]:
                    continue
                instance_job_slice = slice(i * n_instance_jobs,
                                           (i + 1) * n_instance_jobs)
                # One completed job suffices: the portfolio takes the first finisher.
                if any(job_status_completed[instance_job_slice]):
                    instances_done[i] = True
                    # Kill remaining jobs for this instance.
                    solver_kills = [0] * num_solvers
                    for job_index in range(i * n_instance_jobs,
                                           (i + 1) * n_instance_jobs):
                        if not job_status_completed[job_index]:
                            run.jobs[job_index].kill()
                            # Within an instance slice, jobs are grouped per
                            # solver in blocks of seeds_per_solver.
                            solver_index = int(
                                (mod(job_index, n_instance_jobs))
                                // seeds_per_solver)
                            solver_kills[solver_index] += 1
                    for solver_index in range(num_solvers):
                        # All seeds of a solver were killed on instance, set status kill
                        if solver_kills[solver_index] == seeds_per_solver:
                            solver_name = solvers[solver_index].name
                            # NOTE(review): uses the literal key "status" while
                            # other code derives status_key from the objectives —
                            # confirm the objective is named exactly "status".
                            job_output_dict[instance.stem][solver_name]["status"] =\
                                SolverStatus.KILLED
            pbar.update(sum(instances_done) - prev_done)
    return job_output_dict

259 

260 

def wait_for_logs(cmd_list: list[str]) -> None:
    """Wait for all log files to be written.

    Best-effort mitigation for Slurm I/O latency: for each command whose log
    files are not all present yet, pause once for the configured interval.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
    """
    check_interval = gv.settings().get_parallel_portfolio_check_interval()
    log_suffixes = (".log", ".val", ".rawres")
    for cmd in cmd_list:
        # The first 11 tokens of each command are the runsolver configuration,
        # which contains the log file paths.
        config_tokens = cmd.split(" ")[:11]
        expected_logs = [candidate for candidate in map(Path, config_tokens)
                         if candidate.suffix in log_suffixes]
        if any(not log_file.exists() for log_file in expected_logs):
            time.sleep(check_interval)

275 

276 

def update_results_from_logs(cmd_list: list[str], run: Run, solvers: list[Solver],
                             job_output_dict: dict,
                             cpu_time_key: str) -> dict:
    """Parse logs to update job output dictionary with best objective values.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
        run: The run object containing the submitted jobs.
        solvers: List of solvers to run on the instances.
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
        cpu_time_key: Key for CPU time in the job output dictionary.

    Returns:
        job_output_dict: Updated job output dictionary with best objective values.
    """
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    num_solvers = len(solvers)
    # cmd_list is ordered instance-major, then solver, then seed.
    n_instance_jobs = num_solvers * seeds_per_solver
    objectives = gv.settings().get_general_sparkle_objectives()

    for index, cmd in enumerate(cmd_list):
        # Recover which solver this command belongs to from its position.
        solver_index = (mod(index, n_instance_jobs)) // seeds_per_solver
        solver_obj = solvers[solver_index]
        solver_output = Solver.parse_solver_output(
            run.jobs[index].stdout,
            cmd.split(" "),
            objectives=objectives,
            verifier=solver_obj.verifier
        )
        # Dicts preserve insertion order, so position maps back to the instance.
        instance_name = list(job_output_dict.keys())[index // n_instance_jobs]
        cpu_time = solver_output[cpu_time_key]
        cmd_output = job_output_dict[instance_name][solver_obj.name]
        # Keep only the best (lowest positive) CPU time over all seeds/solvers.
        if cpu_time > 0.0 and cpu_time < cmd_output[cpu_time_key]:
            for key, value in solver_output.items():
                if key in [o.name for o in objectives]:
                    job_output_dict[instance_name][solver_obj.name][key] = value
            # Do not overwrite a KILLED status set by the monitor.
            # NOTE(review): literal "status" key vs derived status_key — confirm
            # the status objective is named exactly "status".
            if cmd_output.get("status") != SolverStatus.KILLED:
                cmd_output["status"] = solver_output.get("status")
    return job_output_dict

317 

318 

def fix_missing_times(job_output_dict: dict,
                      status_key: str,
                      cpu_time_key: str,
                      wall_time_key: str) -> dict:
    """Fix CPU and wall clock times for solvers that did not produce logs.

    Args:
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
        status_key: Key for status in the job output dictionary.
        cpu_time_key: Key for CPU time in the job output dictionary.
        wall_time_key: Key for wall clock time in the job output dictionary.

    Returns:
        job_output_dict: Updated job output dictionary with fixed CPU and wall clock
            times.
    """
    cutoff = gv.settings().get_general_solver_cutoff_time()
    check_interval = gv.settings().get_parallel_portfolio_check_interval()

    # Fix the CPU/WC time for non existent logs to instance min time + check_interval
    for instance in job_output_dict.keys():
        no_log_solvers = []
        min_time = cutoff
        for solver in job_output_dict[instance].keys():
            cpu_time = job_output_dict[instance][solver][cpu_time_key]
            # -1.0 (unparsed) or sys.maxsize (untouched default) means no log.
            if cpu_time == -1.0 or cpu_time == float(sys.maxsize):
                no_log_solvers.append(solver)
            elif cpu_time < min_time:
                min_time = cpu_time
        for solver in no_log_solvers:
            job_output_dict[instance][solver][cpu_time_key] = min_time + check_interval
            job_output_dict[instance][solver][wall_time_key] = min_time + check_interval
            # Fix runtime objectives with resolved CPU/Wall times
            for key, value in job_output_dict[instance][solver].items():
                objective = resolve_objective(key)
                if objective is not None and objective.time:
                    # Pick the time base matching the objective's configuration.
                    value = (job_output_dict[instance][solver][cpu_time_key]
                             if objective.use_time == UseTime.CPU_TIME
                             else job_output_dict[instance][solver][wall_time_key])
                    if objective.post_process is not None:
                        status = job_output_dict[instance][solver][status_key]
                        value = objective.post_process(value, cutoff, status)
                    job_output_dict[instance][solver][key] = value
    return job_output_dict

364 

365 

def print_and_write_results(job_output_dict: dict,
                            solvers: list[Solver],
                            instances_set: InstanceSet,
                            portfolio_path: Path,
                            status_key: str,
                            cpu_time_key: str,
                            wall_time_key: str,
                            pdf: PerformanceDataFrame) -> None:
    """Print results to console and write the CSV file.

    Args:
        job_output_dict: Per-instance, per-solver dictionary of objective values.
        solvers: List of solvers that were run.
        instances_set: Set of instances that was run on.
            NOTE(review): not referenced in the body — confirm before removing.
        portfolio_path: Path to the parallel portfolio.
            NOTE(review): not referenced in the body — confirm before removing.
        status_key: Key for status in the job output dictionary.
        cpu_time_key: Key for CPU time in the job output dictionary.
        wall_time_key: Key for wall clock time in the job output dictionary.
        pdf: PerformanceDataFrame the results are written into and saved as CSV.
    """
    num_instances = len(job_output_dict)
    num_solvers = len(solvers)
    objectives = gv.settings().get_general_sparkle_objectives()
    for index, instance_name in enumerate(job_output_dict.keys()):
        index_str = f"[{index + 1}/{num_instances}] "
        instance_output = job_output_dict[instance_name]
        # An instance is unsolved if every solver timed out on it.
        if all(instance_output[k][status_key] == SolverStatus.TIMEOUT
               for k in instance_output):
            print(f"\n{index_str}{instance_name} was not solved within the cutoff-time.")
            continue
        print(f"\n{index_str}{instance_name} yielded the following Solver results:")
        for sindex in range(index * num_solvers, (index + 1) * num_solvers):
            solver_name = solvers[mod(sindex, num_solvers)].name
            job_info = job_output_dict[instance_name][solver_name]
            print(f"\t- {solver_name} ended with status {job_info[status_key]} in "
                  f"{job_info[cpu_time_key]}s CPU-Time ({job_info[wall_time_key]}s "
                  "Wall clock time)")

    # Map short names back to the full paths used as PerformanceDataFrame keys.
    instance_map = {Path(p).name: p for p in pdf.instances}
    solver_map = {Path(s).name: s for s in pdf.solvers}
    for instance, instance_dict in job_output_dict.items():
        instance_name = Path(instance).name
        instance_full_path = instance_map.get(instance_name, instance)
        for solver, objective_dict in instance_dict.items():
            solver_name = Path(solver).name
            solver_full_path = solver_map.get(solver_name, solver)
            for objective in objectives:
                obj_name = objective.name
                obj_val = objective_dict.get(
                    obj_name,
                    PerformanceDataFrame.missing_value
                )
                pdf.set_value(
                    value=obj_val,
                    solver=solver_full_path,
                    instance=instance_full_path,
                    objective=obj_name
                )
    pdf.save_csv()

414 

415 

def main(argv: list[str]) -> None:
    """Main method of run parallel portfolio command.

    Args:
        argv: Command line arguments (excluding the program name).
    """
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    if args.solvers is not None:
        solver_paths = [resolve_object_name("".join(s),
                                            target_dir=gv.settings().DEFAULT_solver_dir)
                        for s in args.solvers]
        if None in solver_paths:
            print("Some solvers not recognised! Check solver names:")
            # BUGFIX: report the given (unresolved) solver names; the previous
            # code printed the resolved path, which is None for unknown solvers.
            for given, resolved in zip(args.solvers, solver_paths):
                if resolved is None:
                    print(f'\t- "{"".join(given)}" ')
            sys.exit(-1)
        solvers = [Solver(p) for p in solver_paths]
    else:
        # No solvers specified: run every solver available in the platform.
        solvers = [Solver(p) for p in
                   gv.settings().DEFAULT_solver_dir.iterdir() if p.is_dir()]

    # Compare current settings to latest.ini
    prev_settings = Settings(PurePath("Settings/latest.ini"))
    Settings.check_settings_changes(gv.settings(), prev_settings)

    # Do first, so other command line options can override settings from the file
    if args.settings_file is not None:
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)

    portfolio_path = args.portfolio_name

    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    if args.solver_seeds is not None:
        gv.settings().set_parallel_portfolio_number_of_seeds_per_solver(
            args.solver_seeds, SettingState.CMD_LINE)

    if run_on == Runner.LOCAL:
        print("Parallel Portfolio is not fully supported yet for Local runs. Exiting.")
        sys.exit(-1)

    # Retrieve instance sets
    instances = [resolve_object_name(instance_path,
                 gv.file_storage_data_mapping[gv.instances_nickname_path],
                 gv.settings().DEFAULT_instance_dir, Instance_Set)
                 for instance_path in args.instance_path]
    # Join them into one
    if len(instances) > 1:
        print("WARNING: More than one instance set specified. "
              "Currently only supporting one.")
    instances = instances[0]

    print(f"Running on {instances.size} instance(s)...")

    if args.cutoff_time is not None:
        gv.settings().set_general_solver_cutoff_time(args.cutoff_time,
                                                     SettingState.CMD_LINE)

    if args.objectives is not None:
        gv.settings().set_general_sparkle_objectives(
            args.objectives, SettingState.CMD_LINE)
    # The portfolio races solvers, so the primary objective must be a time one.
    if not gv.settings().get_general_sparkle_objectives()[0].time:
        print("ERROR: Parallel Portfolio is currently only relevant for "
              "RunTime objectives. In all other cases, use validation")
        sys.exit(-1)

    if args.portfolio_name is not None:  # Use a nickname
        portfolio_path = gv.settings().DEFAULT_parallel_portfolio_output /\
            args.portfolio_name
    else:  # Generate a timestamped nickname
        timestamp = time.strftime("%Y-%m-%d-%H:%M:%S", time.gmtime(time.time()))
        randintstamp = int(random.getrandbits(32))
        portfolio_path = gv.settings().DEFAULT_parallel_portfolio_output /\
            f"{timestamp}_{randintstamp}"
    if portfolio_path.exists():
        print(f"[WARNING] Portfolio path {portfolio_path} already exists! "
              "Overwrite? [y/n] ", end="")
        user_input = input()
        if user_input != "y":
            sys.exit()
        shutil.rmtree(portfolio_path)

    portfolio_path.mkdir(parents=True)
    pdf = create_performance_dataframe(solvers, instances, portfolio_path)
    returned_cmd = build_command_list(instances, solvers, portfolio_path, pdf)
    default_objective_values, cpu_time_key, \
        status_key, wall_time_key = init_default_objectives()
    # BUGFIX: pass the configured runner instead of hard-coding Runner.SLURM
    # (run_on is guaranteed not to be LOCAL at this point).
    returned_run = submit_jobs(returned_cmd, solvers, instances, run_on)
    job_output_dict = monitor_jobs(returned_run, instances,
                                   solvers, default_objective_values, run_on)
    wait_for_logs(returned_cmd)
    job_output_dict = update_results_from_logs(returned_cmd, returned_run,
                                               solvers, job_output_dict, cpu_time_key)
    job_output_dict = fix_missing_times(job_output_dict,
                                        status_key, cpu_time_key, wall_time_key)
    print_and_write_results(job_output_dict, solvers, instances,
                            portfolio_path, status_key,
                            cpu_time_key, wall_time_key, pdf
                            )

    # Write used settings to file
    gv.settings().write_used_settings()
    print("Running Sparkle parallel portfolio is done!")
    sys.exit(0)

528 

529 

# Script entry point: forward CLI arguments (without the program name) to main.
if __name__ == "__main__":
    main(sys.argv[1:])