Coverage for sparkle/CLI/run_parallel_portfolio.py: 73%

244 statements  

coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Sparkle command to execute a parallel algorithm portfolio."""

import sys
import argparse
import random
import time
import shutil
import itertools
from operator import mod
from pathlib import Path

from tqdm import tqdm

import runrunner as rrr
from runrunner.base import Runner, Run
from runrunner.slurm import Status, SlurmRun

from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.initialise import check_for_initialise
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help.nicknames import resolve_object_name
from sparkle.platform.settings_objects import Settings
from sparkle.solver import Solver
from sparkle.instance import Instance_Set, InstanceSet
from sparkle.types import SolverStatus, resolve_objective, UseTime
from sparkle.structures import PerformanceDataFrame


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments.

    Returns:
        parser: The parser with the defined command line arguments.
    """
    parser = argparse.ArgumentParser(
        description="Run a portfolio of solvers on an instance set in parallel."
    )
    parser.add_argument(
        *ac.InstanceSetPathsArgument.names, **ac.InstanceSetPathsArgument.kwargs
    )
    parser.add_argument(
        *ac.NicknamePortfolioArgument.names, **ac.NicknamePortfolioArgument.kwargs
    )
    parser.add_argument(*ac.SolversArgument.names, **ac.SolversArgument.kwargs)
    # Settings arguments
    parser.add_argument(*ac.SettingsFileArgument.names, **ac.SettingsFileArgument.kwargs)
    parser.add_argument(
        *Settings.OPTION_objectives.args, **Settings.OPTION_objectives.kwargs
    )
    parser.add_argument(
        *Settings.OPTION_solver_cutoff_time.args,
        **Settings.OPTION_solver_cutoff_time.kwargs,
    )
    parser.add_argument(
        *Settings.OPTION_parallel_portfolio_number_of_seeds_per_solver.args,
        **Settings.OPTION_parallel_portfolio_number_of_seeds_per_solver.kwargs,
    )
    parser.add_argument(*Settings.OPTION_run_on.args, **Settings.OPTION_run_on.kwargs)
    return parser


def create_performance_dataframe(
    solvers: list[Solver], instances_set: InstanceSet, portfolio_path: Path
) -> PerformanceDataFrame:
    """Create a PerformanceDataFrame for the given solvers and instances.

    Args:
        solvers: List of solvers to include in the PerformanceDataFrame.
        instances_set: Set of instances to include in the PerformanceDataFrame.
        portfolio_path: Path to save the CSV file.

    Returns:
        pdf: PerformanceDataFrame object initialized with solvers and instances.
    """
    instances = instances_set.instance_names
    solvers = [str(s.directory) for s in solvers]
    objectives = gv.settings().objectives
    csv_path = portfolio_path / "results.csv"
    return PerformanceDataFrame(
        csv_filepath=csv_path,
        solvers=solvers,
        objectives=objectives,
        instances=instances,
    )


def init_default_objectives() -> tuple[dict, str, str, str]:
    """Initialize default objective values and key names.

    Returns:
        default_objective_values: Dictionary with default values for each objective.
        cpu_time_key: Key for CPU time in the default values.
        status_key: Key for status in the default values.
        wall_time_key: Key for wall clock time in the default values.
    """
    # We record the 'best' of all seed results per solver-instance,
    # setting start values for objectives that are always present
    objectives = gv.settings().objectives
    cutoff = gv.settings().solver_cutoff_time
    cpu_time_key = [o.name for o in objectives if o.name.startswith("cpu_time")][0]
    status_key = [o.name for o in objectives if o.name.startswith("status")][0]
    wall_time_key = [o.name for o in objectives if o.name.startswith("wall_time")][0]
    default_objective_values = {}

    for o in objectives:
        default_value = float(sys.maxsize) if o.minimise else 0
        # Default values for time objectives can be linked to cutoff time
        if o.time and o.post_process:
            default_value = o.post_process(default_value, cutoff, SolverStatus.KILLED)
        default_objective_values[o.name] = default_value
    default_objective_values[status_key] = SolverStatus.UNKNOWN  # Overwrite status
    return default_objective_values, cpu_time_key, status_key, wall_time_key
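
# Illustrative sketch of the return value (the objective names are assumptions for
# this example, not necessarily the configured ones): with objectives "cpu_time",
# "wall_time" and "status", the function might return
#     ({"cpu_time": <post-processed default>, "wall_time": <post-processed default>,
#       "status": SolverStatus.UNKNOWN},
#      "cpu_time", "status", "wall_time")
# where time objectives with a post_process get post_process(default, cutoff,
# SolverStatus.KILLED) instead of the raw maxsize/0 start value.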


def monitor_jobs(
    run: Run,
    instances_set: InstanceSet,
    solvers: list[Solver],
    default_objective_values: dict,
    run_on: Runner = Runner.SLURM,
) -> dict:
    """Monitor job progress and update job output dictionary.

    Args:
        run: The run object containing the submitted jobs.
        instances_set: Set of instances to run on.
        solvers: List of solvers to run on the instances.
        default_objective_values: Default objective values for each solver-instance.
        run_on: Unused

    Returns:
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
    """
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    seeds_per_solver = gv.settings().parallel_portfolio_num_seeds_per_solver
    n_instance_jobs = num_solvers * seeds_per_solver

    job_output_dict = {
        instance_name: {
            solver.name: default_objective_values.copy() for solver in solvers
        }
        for instance_name in instances_set._instance_names
    }

    check_interval = gv.settings().parallel_portfolio_check_interval
    instances_done = [False] * num_instances

    with tqdm(total=len(instances_done)) as pbar:
        pbar.set_description("Instances done")
        while not all(instances_done):
            prev_done = sum(instances_done)
            time.sleep(check_interval)
            job_status_list = [r.status for r in run.jobs]
            job_status_completed = [
                status == Status.COMPLETED for status in job_status_list
            ]
            # The jobs are sorted by instance
            for i, instance in enumerate(instances_set._instance_paths):
                if instances_done[i]:
                    continue
                instance_job_slice = slice(
                    i * n_instance_jobs, (i + 1) * n_instance_jobs
                )
                if any(job_status_completed[instance_job_slice]):
                    instances_done[i] = True
                    # Kill remaining jobs for this instance.
                    solver_kills = [0] * num_solvers
                    for job_index in range(
                        i * n_instance_jobs, (i + 1) * n_instance_jobs
                    ):
                        if not job_status_completed[job_index]:
                            run.jobs[job_index].kill()
                            solver_index = int(
                                (mod(job_index, n_instance_jobs)) // seeds_per_solver
                            )
                            solver_kills[solver_index] += 1
                    for solver_index in range(num_solvers):
                        # All seeds of a solver were killed on instance, set status kill
                        if solver_kills[solver_index] == seeds_per_solver:
                            solver_name = solvers[solver_index].name
                            job_output_dict[instance.stem][solver_name]["status"] = (
                                SolverStatus.KILLED
                            )
            pbar.update(sum(instances_done) - prev_done)
    return job_output_dict
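
# Worked example of the index arithmetic above (sizes are hypothetical): with
# num_solvers = 2 and seeds_per_solver = 3, n_instance_jobs = 6 and the flat job
# list is ordered instance-major:
#     jobs 0-5  -> instance 0 (solver 0: jobs 0-2, solver 1: jobs 3-5)
#     jobs 6-11 -> instance 1 (solver 0: jobs 6-8, solver 1: jobs 9-11)
# so job_index 10 belongs to instance 10 // 6 = 1 and solver mod(10, 6) // 3 = 1.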


def wait_for_logs(cmd_list: list[str]) -> None:
    """Wait for all log files to be written.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
    """
    # Attempt to verify that all logs have been written (Slurm I/O latency)
    check_interval = gv.settings().parallel_portfolio_check_interval
    for cmd in cmd_list:
        runsolver_configuration = cmd.split(" ")[:11]
        logs = [
            Path(p)
            for p in runsolver_configuration
            if Path(p).suffix in [".log", ".val", ".rawres"]
        ]
        if not all(p.exists() for p in logs):
            time.sleep(check_interval)
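
# Note on the slice above (an assumption about the command layout, not verified
# here): the first tokens of each command are expected to be the runsolver wrapper
# call, whose option values include the watcher/variable/raw output files; only
# the paths ending in .log, .val or .rawres are awaited.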


def update_results_from_logs(
    cmd_list: list[str],
    run: Run,
    solvers: list[Solver],
    job_output_dict: dict,
    cpu_time_key: str,
) -> dict:
    """Parse logs to update job output dictionary with best objective values.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
        run: The run object containing the submitted jobs.
        solvers: List of solvers to run on the instances.
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
        cpu_time_key: Key for CPU time in the job output dictionary.

    Returns:
        job_output_dict: Updated job output dictionary with best objective values.
    """
    seeds_per_solver = gv.settings().parallel_portfolio_num_seeds_per_solver
    num_solvers = len(solvers)
    n_instance_jobs = num_solvers * seeds_per_solver
    objectives = gv.settings().objectives

    for index, cmd in enumerate(cmd_list):
        solver_index = (mod(index, n_instance_jobs)) // seeds_per_solver
        solver_obj = solvers[solver_index]
        solver_output = Solver.parse_solver_output(
            run.jobs[index].stdout,
            cmd.split(" "),
            objectives=objectives,
            verifier=solver_obj.verifier,
        )
        instance_name = list(job_output_dict.keys())[index // n_instance_jobs]
        cpu_time = solver_output[cpu_time_key]
        cmd_output = job_output_dict[instance_name][solver_obj.name]
        if cpu_time > 0.0 and cpu_time < cmd_output[cpu_time_key]:
            for key, value in solver_output.items():
                if key in [o.name for o in objectives]:
                    job_output_dict[instance_name][solver_obj.name][key] = value
            if cmd_output.get("status") != SolverStatus.KILLED:
                cmd_output["status"] = solver_output.get("status")
    return job_output_dict
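
# Worked example of the best-of-seeds rule above (values are hypothetical): if the
# stored cpu_time for an (instance, solver) pair is 42.0 and its three seed runs
# report 55.1, -1.0 and 37.8, only the 37.8 run overwrites the stored objective
# values; the -1.0 run (no usable log) and the slower 55.1 run are ignored.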


def fix_missing_times(
    job_output_dict: dict, status_key: str, cpu_time_key: str, wall_time_key: str
) -> dict:
    """Fix CPU and wall clock times for solvers that did not produce logs.

    Args:
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
        status_key: Key for status in the job output dictionary.
        cpu_time_key: Key for CPU time in the job output dictionary.
        wall_time_key: Key for wall clock time in the job output dictionary.

    Returns:
        job_output_dict: Updated job output dictionary with fixed CPU and wall clock
            times.
    """
    cutoff = gv.settings().solver_cutoff_time
    check_interval = gv.settings().parallel_portfolio_check_interval

    # Fix the CPU/WC time for non-existent logs to instance min time + check_interval
    for instance in job_output_dict.keys():
        no_log_solvers = []
        min_time = cutoff
        for solver in job_output_dict[instance].keys():
            cpu_time = job_output_dict[instance][solver][cpu_time_key]
            if cpu_time == -1.0 or cpu_time == float(sys.maxsize):
                no_log_solvers.append(solver)
            elif cpu_time < min_time:
                min_time = cpu_time
        for solver in no_log_solvers:
            job_output_dict[instance][solver][cpu_time_key] = min_time + check_interval
            job_output_dict[instance][solver][wall_time_key] = min_time + check_interval
            # Fix runtime objectives with resolved CPU/Wall times
            for key, value in job_output_dict[instance][solver].items():
                objective = resolve_objective(key)
                if objective is not None and objective.time:
                    value = (
                        job_output_dict[instance][solver][cpu_time_key]
                        if objective.use_time == UseTime.CPU_TIME
                        else job_output_dict[instance][solver][wall_time_key]
                    )
                    if objective.post_process is not None:
                        status = job_output_dict[instance][solver][status_key]
                        value = objective.post_process(value, cutoff, status)
                    job_output_dict[instance][solver][key] = value
    return job_output_dict
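
# Example of the repair above (numbers are hypothetical): if the fastest completed
# solver on an instance took 12.3s and the check interval is 4s, every solver on
# that instance whose cpu_time is -1.0 or the maxsize default is assigned
# 12.3 + 4 = 16.3s CPU and wall clock time, after which its time objectives are
# re-derived (and post-processed, if applicable) from those values.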


def print_and_write_results(
    job_output_dict: dict,
    solvers: list[Solver],
    instances_set: InstanceSet,
    portfolio_path: Path,
    status_key: str,
    cpu_time_key: str,
    wall_time_key: str,
    pdf: PerformanceDataFrame,
) -> None:
    """Print results to console and write the CSV file."""
    num_instances = len(job_output_dict)
    num_solvers = len(solvers)
    objectives = gv.settings().objectives
    for index, instance_name in enumerate(job_output_dict.keys()):
        index_str = f"[{index + 1}/{num_instances}] "
        instance_output = job_output_dict[instance_name]
        if all(
            instance_output[k][status_key] == SolverStatus.TIMEOUT
            for k in instance_output
        ):
            print(f"\n{index_str}{instance_name} was not solved within the cutoff-time.")
            continue
        print(f"\n{index_str}{instance_name} yielded the following Solver results:")
        for sindex in range(index * num_solvers, (index + 1) * num_solvers):
            solver_name = solvers[mod(sindex, num_solvers)].name
            job_info = job_output_dict[instance_name][solver_name]
            print(
                f"\t- {solver_name} ended with status {job_info[status_key]} in "
                f"{job_info[cpu_time_key]}s CPU-Time ({job_info[wall_time_key]}s "
                "Wall clock time)"
            )

    instance_map = {Path(p).name: p for p in pdf.instances}
    solver_map = {Path(s).name: s for s in pdf.solvers}
    for instance, instance_dict in job_output_dict.items():
        instance_name = Path(instance).name
        instance_full_path = instance_map.get(instance_name, instance)
        for solver, objective_dict in instance_dict.items():
            solver_name = Path(solver).name
            solver_full_path = solver_map.get(solver_name, solver)
            for objective in objectives:
                obj_name = objective.name
                obj_val = objective_dict.get(
                    obj_name, PerformanceDataFrame.missing_value
                )
                pdf.set_value(
                    value=obj_val,
                    solver=solver_full_path,
                    instance=instance_full_path,
                    objective=obj_name,
                )
    pdf.save_csv()
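
# Note on the two maps above: job_output_dict is keyed by instance stem and solver
# name, while the PerformanceDataFrame is keyed by full instance/solver paths; the
# name-to-path maps bridge the two, falling back to the raw key if no match exists.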


def build_command_list(
    instances_set: InstanceSet,
    solvers: list[Solver],
    portfolio_path: Path,
    performance_data: PerformanceDataFrame,
) -> list[str]:
    """Build the list of command strings for all instance-solver-seed combinations.

    Args:
        instances_set: Set of instances to run on.
        solvers: List of solvers to run on the instances.
        portfolio_path: Path to the parallel portfolio.
        performance_data: PerformanceDataFrame object.

    Returns:
        cmd_list: List of command strings for all instance-solver-seed combinations.
    """
    cutoff = gv.settings().solver_cutoff_time
    objectives = gv.settings().objectives
    seeds_per_solver = gv.settings().parallel_portfolio_num_seeds_per_solver
    cmd_list = []

    # Create a command for each instance-solver-seed combination
    for instance, solver in itertools.product(instances_set._instance_paths, solvers):
        for _ in range(seeds_per_solver):
            seed = int(random.getrandbits(32))
            solver_call_list = solver.build_cmd(
                instance.absolute(),
                objectives=objectives,
                seed=seed,
                cutoff_time=cutoff,
                log_dir=portfolio_path,
            )

            cmd_list.append(" ".join(solver_call_list))
            for objective in objectives:
                performance_data.set_value(
                    value=seed,
                    solver=str(solver.directory),
                    instance=instance.stem,
                    objective=objective.name,
                    solver_fields=["Seed"],
                )
    return cmd_list
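
# The nesting above fixes the command order that monitor_jobs and
# update_results_from_logs rely on. For instances [i0, i1], solvers [sA, sB] and
# 2 seeds per solver (all hypothetical), cmd_list is ordered
#     [i0/sA, i0/sA, i0/sB, i0/sB, i1/sA, i1/sA, i1/sB, i1/sB]
# i.e. instance-major, then solver, then seed.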


def submit_jobs(
    cmd_list: list[str],
    solvers: list[Solver],
    instances_set: InstanceSet,
    run_on: Runner = Runner.SLURM,
) -> SlurmRun:
    """Submit jobs to the runner and return the run object.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
        solvers: List of solvers to run on the instances.
        instances_set: Set of instances to run on.
        run_on: Runner to use for submitting the jobs.

    Returns:
        run: The run object containing the submitted jobs.
    """
    seeds_per_solver = gv.settings().parallel_portfolio_num_seeds_per_solver
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    num_jobs = num_solvers * num_instances * seeds_per_solver
    parallel_jobs = min(gv.settings().slurm_jobs_in_parallel, num_jobs)
    if parallel_jobs < num_jobs:
        print(
            "WARNING: Not all jobs will be started at the same time due to the "
            "limitation of number of Slurm jobs that can be run in parallel. Check"
            " your Sparkle Slurm Settings."
        )
    print(
        f"Sparkle parallel portfolio is running {seeds_per_solver} seed(s) per solver "
        f"on {num_solvers} solvers for {num_instances} instances ..."
    )

    sbatch_options = gv.settings().sbatch_settings
    solver_names = ", ".join([s.name for s in solvers])
    # Jobs are added to the runrunner queue in the same order they are provided
    return rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=f"Parallel Portfolio: {solver_names}",
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        srun_options=["-N1", "-n1"] + sbatch_options,
        sbatch_options=sbatch_options,
        prepend=gv.settings().slurm_job_prepend,
    )


def main(argv: list[str]) -> None:
    """Main method of the run parallel portfolio command."""
    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    settings = gv.settings(args)

    # Log command call
    sl.log_command(sys.argv, settings.random_state)
    check_for_initialise()

    # Compare current settings to latest.ini
    prev_settings = Settings(Settings.DEFAULT_previous_settings_path)
    Settings.check_settings_changes(settings, prev_settings)

    if args.solvers is not None:
        solver_paths = [
            resolve_object_name("".join(s), target_dir=settings.DEFAULT_solver_dir)
            for s in args.solvers
        ]
        if None in solver_paths:
            print("Some solvers were not recognised! Check the solver names:")
            for name, path in zip(args.solvers, solver_paths):
                if path is None:
                    print(f'\t- "{"".join(name)}"')
            sys.exit(-1)
        solvers = [Solver(p) for p in solver_paths]
    else:
        solvers = [
            Solver(p) for p in settings.DEFAULT_solver_dir.iterdir() if p.is_dir()
        ]

    portfolio_path = args.portfolio_name

    run_on = settings.run_on
    if run_on == Runner.LOCAL:
        print("Parallel Portfolio is not fully supported yet for Local runs. Exiting.")
        sys.exit(-1)

    # Retrieve instance sets
    instances = [
        resolve_object_name(
            instance_path,
            gv.file_storage_data_mapping[gv.instances_nickname_path],
            gv.settings().DEFAULT_instance_dir,
            Instance_Set,
        )
        for instance_path in args.instance_path
    ]
    # Join them into one
    if len(instances) > 1:
        print(
            "WARNING: More than one instance set specified. "
            "Currently only one is supported."
        )
    instances = instances[0]

    print(f"Running on {instances.size} instance(s)...")

    if not settings.objectives[0].time:
        print(
            "ERROR: Parallel Portfolio is currently only relevant for "
            "RunTime objectives. In all other cases, use validation."
        )
        sys.exit(-1)

    if args.portfolio_name is not None:  # Use a nickname
        portfolio_path = settings.DEFAULT_parallel_portfolio_output / args.portfolio_name
    else:  # Generate a timestamped nickname
        timestamp = time.strftime("%Y-%m-%d-%H:%M:%S", time.gmtime(time.time()))
        randintstamp = int(random.getrandbits(32))
        portfolio_path = (
            settings.DEFAULT_parallel_portfolio_output / f"{timestamp}_{randintstamp}"
        )
    if portfolio_path.exists():
        print(
            f"[WARNING] Portfolio path {portfolio_path} already exists! "
            "Overwrite? [y/n] ",
            end="",
        )
        user_input = input()
        if user_input != "y":
            sys.exit()
        shutil.rmtree(portfolio_path)

    portfolio_path.mkdir(parents=True)
    pdf = create_performance_dataframe(solvers, instances, portfolio_path)
    returned_cmd = build_command_list(instances, solvers, portfolio_path, pdf)
    default_objective_values, cpu_time_key, status_key, wall_time_key = (
        init_default_objectives()
    )
    returned_run = submit_jobs(returned_cmd, solvers, instances, Runner.SLURM)
    job_output_dict = monitor_jobs(
        returned_run, instances, solvers, default_objective_values
    )
    wait_for_logs(returned_cmd)
    job_output_dict = update_results_from_logs(
        returned_cmd, returned_run, solvers, job_output_dict, cpu_time_key
    )
    job_output_dict = fix_missing_times(
        job_output_dict, status_key, cpu_time_key, wall_time_key
    )
    print_and_write_results(
        job_output_dict,
        solvers,
        instances,
        portfolio_path,
        status_key,
        cpu_time_key,
        wall_time_key,
        pdf,
    )

    # Write used settings to file
    settings.write_used_settings()
    print("Running Sparkle parallel portfolio is done!")
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])