Coverage for sparkle/CLI/run_parallel_portfolio.py: 73%
244 statements
coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Sparkle command to execute a parallel algorithm portfolio."""

import sys
import argparse
import random
import time
import shutil
import itertools
from operator import mod
from pathlib import Path

from tqdm import tqdm

import runrunner as rrr
from runrunner.base import Runner, Run
from runrunner.slurm import Status, SlurmRun

from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.initialise import check_for_initialise
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help.nicknames import resolve_object_name
from sparkle.platform.settings_objects import Settings
from sparkle.solver import Solver
from sparkle.instance import Instance_Set, InstanceSet
from sparkle.types import SolverStatus, resolve_objective, UseTime
from sparkle.structures import PerformanceDataFrame


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments.

    Returns:
        parser: The argument parser with the defined command line arguments.
    """
    parser = argparse.ArgumentParser(
        description="Run a portfolio of solvers on an instance set in parallel."
    )
    parser.add_argument(
        *ac.InstanceSetPathsArgument.names, **ac.InstanceSetPathsArgument.kwargs
    )
    parser.add_argument(
        *ac.NicknamePortfolioArgument.names, **ac.NicknamePortfolioArgument.kwargs
    )
    parser.add_argument(*ac.SolversArgument.names, **ac.SolversArgument.kwargs)
    # Settings arguments
    parser.add_argument(*ac.SettingsFileArgument.names, **ac.SettingsFileArgument.kwargs)
    parser.add_argument(
        *Settings.OPTION_objectives.args, **Settings.OPTION_objectives.kwargs
    )
    parser.add_argument(
        *Settings.OPTION_solver_cutoff_time.args,
        **Settings.OPTION_solver_cutoff_time.kwargs,
    )
    parser.add_argument(
        *Settings.OPTION_parallel_portfolio_number_of_seeds_per_solver.args,
        **Settings.OPTION_parallel_portfolio_number_of_seeds_per_solver.kwargs,
    )
    parser.add_argument(*Settings.OPTION_run_on.args, **Settings.OPTION_run_on.kwargs)
    return parser


def create_performance_dataframe(
    solvers: list[Solver], instances_set: InstanceSet, portfolio_path: Path
) -> PerformanceDataFrame:
    """Create a PerformanceDataFrame for the given solvers and instances.

    Args:
        solvers: List of solvers to include in the PerformanceDataFrame.
        instances_set: Set of instances to include in the PerformanceDataFrame.
        portfolio_path: Path to save the CSV file.

    Returns:
        pdf: PerformanceDataFrame object initialized with solvers and instances.
    """
    instances = instances_set.instance_names
    solvers = [str(s.directory) for s in solvers]
    objectives = gv.settings().objectives
    csv_path = portfolio_path / "results.csv"
    return PerformanceDataFrame(
        csv_filepath=csv_path,
        solvers=solvers,
        objectives=objectives,
        instances=instances,
    )


def init_default_objectives() -> tuple[dict, str, str, str]:
    """Initialize default objective values and key names.

    Returns:
        default_objective_values: Dictionary with default values for each objective.
        cpu_time_key: Key for CPU time in the default values.
        status_key: Key for status in the default values.
        wall_time_key: Key for wall clock time in the default values.
    """
    # We record the 'best' of all seed results per solver-instance,
    # setting start values for objectives that are always present
    objectives = gv.settings().objectives
    cutoff = gv.settings().solver_cutoff_time
    cpu_time_key = [o.name for o in objectives if o.name.startswith("cpu_time")][0]
    status_key = [o.name for o in objectives if o.name.startswith("status")][0]
    wall_time_key = [o.name for o in objectives if o.name.startswith("wall_time")][0]
    default_objective_values = {}

    for o in objectives:
        default_value = float(sys.maxsize) if o.minimise else 0
        # Default values for time objectives can be linked to cutoff time
        if o.time and o.post_process:
            default_value = o.post_process(default_value, cutoff, SolverStatus.KILLED)
        default_objective_values[o.name] = default_value
    default_objective_values[status_key] = SolverStatus.UNKNOWN  # Overwrite status
    return default_objective_values, cpu_time_key, status_key, wall_time_key


def monitor_jobs(
    run: Run,
    instances_set: InstanceSet,
    solvers: list[Solver],
    default_objective_values: dict,
    run_on: Runner = Runner.SLURM,
) -> dict:
    """Monitor job progress and update job output dictionary.

    Args:
        run: The run object containing the submitted jobs.
        instances_set: Set of instances to run on.
        solvers: List of solvers to run on the instances.
        default_objective_values: Default objective values for each solver-instance.
        run_on: Unused

    Returns:
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
    """
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    seeds_per_solver = gv.settings().parallel_portfolio_num_seeds_per_solver
    n_instance_jobs = num_solvers * seeds_per_solver
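
    # The submitted jobs are ordered instance-major: for every instance there are
    # num_solvers * seeds_per_solver consecutive jobs, grouped per solver with one
    # job per seed (see build_command_list and submit_jobs). The slice and index
    # arithmetic below relies on this ordering.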
    job_output_dict = {
        instance_name: {
            solver.name: default_objective_values.copy() for solver in solvers
        }
        for instance_name in instances_set._instance_names
    }

    check_interval = gv.settings().parallel_portfolio_check_interval
    instances_done = [False] * num_instances

    with tqdm(total=len(instances_done)) as pbar:
        pbar.set_description("Instances done")
        while not all(instances_done):
            prev_done = sum(instances_done)
            time.sleep(check_interval)
            job_status_list = [r.status for r in run.jobs]
            job_status_completed = [
                status == Status.COMPLETED for status in job_status_list
            ]
            # The jobs are sorted by instance
            for i, instance in enumerate(instances_set._instance_paths):
                if instances_done[i]:
                    continue
                instance_job_slice = slice(
                    i * n_instance_jobs, (i + 1) * n_instance_jobs
                )
                if any(job_status_completed[instance_job_slice]):
                    instances_done[i] = True
                    # Kill remaining jobs for this instance.
                    solver_kills = [0] * num_solvers
                    for job_index in range(
                        i * n_instance_jobs, (i + 1) * n_instance_jobs
                    ):
                        if not job_status_completed[job_index]:
                            run.jobs[job_index].kill()
                            solver_index = int(
                                (mod(job_index, n_instance_jobs)) // seeds_per_solver
                            )
                            solver_kills[solver_index] += 1
                    for solver_index in range(num_solvers):
                        # If all seeds of a solver were killed on this instance,
                        # set its status to killed
                        if solver_kills[solver_index] == seeds_per_solver:
                            solver_name = solvers[solver_index].name
                            job_output_dict[instance.stem][solver_name]["status"] = (
                                SolverStatus.KILLED
                            )
            pbar.update(sum(instances_done) - prev_done)
    return job_output_dict


def wait_for_logs(cmd_list: list[str]) -> None:
    """Wait for all log files to be written.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
    """
    # Attempt to verify that all logs have been written (Slurm I/O latency)
    check_interval = gv.settings().parallel_portfolio_check_interval
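    # The leading tokens of each command form the runsolver wrapper call, whose
    # arguments name the .log/.val/.rawres files the job is expected to write.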
    for cmd in cmd_list:
        runsolver_configuration = cmd.split(" ")[:11]
        logs = [
            Path(p)
            for p in runsolver_configuration
            if Path(p).suffix in [".log", ".val", ".rawres"]
        ]
        if not all(p.exists() for p in logs):
            time.sleep(check_interval)


def update_results_from_logs(
    cmd_list: list[str],
    run: Run,
    solvers: list[Solver],
    job_output_dict: dict,
    cpu_time_key: str,
) -> dict:
    """Parse logs to update job output dictionary with best objective values.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
        run: The run object containing the submitted jobs.
        solvers: List of solvers to run on the instances.
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
        cpu_time_key: Key for CPU time in the job output dictionary.

    Returns:
        job_output_dict: Updated job output dictionary with best objective values.
    """
    seeds_per_solver = gv.settings().parallel_portfolio_num_seeds_per_solver
    num_solvers = len(solvers)
    n_instance_jobs = num_solvers * seeds_per_solver
    objectives = gv.settings().objectives
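
    # cmd_list follows the same order as run.jobs (instance-major, then solver,
    # then seed), so the flat index identifies the instance (index //
    # n_instance_jobs) and the solver within that instance block
    # ((index % n_instance_jobs) // seeds_per_solver).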
    for index, cmd in enumerate(cmd_list):
        solver_index = (mod(index, n_instance_jobs)) // seeds_per_solver
        solver_obj = solvers[solver_index]
        solver_output = Solver.parse_solver_output(
            run.jobs[index].stdout,
            cmd.split(" "),
            objectives=objectives,
            verifier=solver_obj.verifier,
        )
        instance_name = list(job_output_dict.keys())[index // n_instance_jobs]
        cpu_time = solver_output[cpu_time_key]
        cmd_output = job_output_dict[instance_name][solver_obj.name]
        if cpu_time > 0.0 and cpu_time < cmd_output[cpu_time_key]:
            for key, value in solver_output.items():
                if key in [o.name for o in objectives]:
                    job_output_dict[instance_name][solver_obj.name][key] = value
        if cmd_output.get("status") != SolverStatus.KILLED:
            cmd_output["status"] = solver_output.get("status")
    return job_output_dict


def fix_missing_times(
    job_output_dict: dict, status_key: str, cpu_time_key: str, wall_time_key: str
) -> dict:
    """Fix CPU and wall clock times for solvers that did not produce logs.

    Args:
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
        status_key: Key for status in the job output dictionary.
        cpu_time_key: Key for CPU time in the job output dictionary.
        wall_time_key: Key for wall clock time in the job output dictionary.

    Returns:
        job_output_dict: Updated job output dictionary with fixed CPU and wall clock
            times.
    """
    cutoff = gv.settings().solver_cutoff_time
    check_interval = gv.settings().parallel_portfolio_check_interval

    # Fix the CPU/WC time for non-existent logs to instance min time + check_interval
    for instance in job_output_dict.keys():
        no_log_solvers = []
        min_time = cutoff
        for solver in job_output_dict[instance].keys():
            cpu_time = job_output_dict[instance][solver][cpu_time_key]
            if cpu_time == -1.0 or cpu_time == float(sys.maxsize):
                no_log_solvers.append(solver)
            elif cpu_time < min_time:
                min_time = cpu_time
        for solver in no_log_solvers:
            job_output_dict[instance][solver][cpu_time_key] = min_time + check_interval
            job_output_dict[instance][solver][wall_time_key] = min_time + check_interval
            # Fix runtime objectives with resolved CPU/Wall times
            for key, value in job_output_dict[instance][solver].items():
                objective = resolve_objective(key)
                if objective is not None and objective.time:
                    value = (
                        job_output_dict[instance][solver][cpu_time_key]
                        if objective.use_time == UseTime.CPU_TIME
                        else job_output_dict[instance][solver][wall_time_key]
                    )
                    if objective.post_process is not None:
                        status = job_output_dict[instance][solver][status_key]
                        value = objective.post_process(value, cutoff, status)
                    job_output_dict[instance][solver][key] = value
    return job_output_dict


def print_and_write_results(
    job_output_dict: dict,
    solvers: list[Solver],
    instances_set: InstanceSet,
    portfolio_path: Path,
    status_key: str,
    cpu_time_key: str,
    wall_time_key: str,
    pdf: PerformanceDataFrame,
) -> None:
    """Print results to console and write the CSV file."""
    num_instances = len(job_output_dict)
    num_solvers = len(solvers)
    objectives = gv.settings().objectives
    for index, instance_name in enumerate(job_output_dict.keys()):
        index_str = f"[{index + 1}/{num_instances}] "
        instance_output = job_output_dict[instance_name]
        if all(
            instance_output[k][status_key] == SolverStatus.TIMEOUT
            for k in instance_output
        ):
            print(f"\n{index_str}{instance_name} was not solved within the cutoff-time.")
            continue
        print(f"\n{index_str}{instance_name} yielded the following Solver results:")
        for sindex in range(index * num_solvers, (index + 1) * num_solvers):
            solver_name = solvers[mod(sindex, num_solvers)].name
            job_info = job_output_dict[instance_name][solver_name]
            print(
                f"\t- {solver_name} ended with status {job_info[status_key]} in "
                f"{job_info[cpu_time_key]}s CPU-Time ({job_info[wall_time_key]}s "
                "Wall clock time)"
            )
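
    # job_output_dict is keyed by instance and solver names, while the
    # PerformanceDataFrame may be keyed by full paths, so map the names back to
    # the identifiers the dataframe uses before writing values.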
    instance_map = {Path(p).name: p for p in pdf.instances}
    solver_map = {Path(s).name: s for s in pdf.solvers}
    for instance, instance_dict in job_output_dict.items():
        instance_name = Path(instance).name
        instance_full_path = instance_map.get(instance_name, instance)
        for solver, objective_dict in instance_dict.items():
            solver_name = Path(solver).name
            solver_full_path = solver_map.get(solver_name, solver)
            for objective in objectives:
                obj_name = objective.name
                obj_val = objective_dict.get(
                    obj_name, PerformanceDataFrame.missing_value
                )
                pdf.set_value(
                    value=obj_val,
                    solver=solver_full_path,
                    instance=instance_full_path,
                    objective=obj_name,
                )
    pdf.save_csv()


def build_command_list(
    instances_set: InstanceSet,
    solvers: list[Solver],
    portfolio_path: Path,
    performance_data: PerformanceDataFrame,
) -> list[str]:
    """Build the list of command strings for all instance-solver-seed combinations.

    Args:
        instances_set: Set of instances to run on.
        solvers: List of solvers to run on the instances.
        portfolio_path: Path to the parallel portfolio.
        performance_data: PerformanceDataFrame object.

    Returns:
        cmd_list: List of command strings for all instance-solver-seed combinations.
    """
    cutoff = gv.settings().solver_cutoff_time
    objectives = gv.settings().objectives
    seeds_per_solver = gv.settings().parallel_portfolio_num_seeds_per_solver
    cmd_list = []

    # Create a command for each instance-solver-seed combination
    for instance, solver in itertools.product(instances_set._instance_paths, solvers):
        for _ in range(seeds_per_solver):
            seed = int(random.getrandbits(32))
            solver_call_list = solver.build_cmd(
                instance.absolute(),
                objectives=objectives,
                seed=seed,
                cutoff_time=cutoff,
                log_dir=portfolio_path,
            )

            cmd_list.append(" ".join(solver_call_list))
            for objective in objectives:
                performance_data.set_value(
                    value=seed,
                    solver=str(solver.directory),
                    instance=instance.stem,
                    objective=objective.name,
                    solver_fields=["Seed"],
                )
    return cmd_list


def submit_jobs(
    cmd_list: list[str],
    solvers: list[Solver],
    instances_set: InstanceSet,
    run_on: Runner = Runner.SLURM,
) -> SlurmRun:
    """Submit jobs to the runner and return the run object.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
        solvers: List of solvers to run on the instances.
        instances_set: Set of instances to run on.
        run_on: Runner to use for submitting the jobs.

    Returns:
        run: The run object containing the submitted jobs.
    """
    seeds_per_solver = gv.settings().parallel_portfolio_num_seeds_per_solver
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    num_jobs = num_solvers * num_instances * seeds_per_solver
    parallel_jobs = min(gv.settings().slurm_jobs_in_parallel, num_jobs)
    if parallel_jobs < num_jobs:
        print(
            "WARNING: Not all jobs will be started at the same time due to the "
            "limit on the number of Slurm jobs that can run in parallel. Check"
            " your Sparkle Slurm settings."
        )
    print(
        f"Sparkle parallel portfolio is running {seeds_per_solver} seed(s) per solver "
        f"on {num_solvers} solvers for {num_instances} instances ..."
    )

    sbatch_options = gv.settings().sbatch_settings
    solver_names = ", ".join([s.name for s in solvers])
    # Jobs are added to the runrunner queue in the same order they are provided
    return rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=f"Parallel Portfolio: {solver_names}",
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        srun_options=["-N1", "-n1"] + sbatch_options,
        sbatch_options=sbatch_options,
        prepend=gv.settings().slurm_job_prepend,
    )


def main(argv: list[str]) -> None:
    """Main method of run parallel portfolio command."""
    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    settings = gv.settings(args)

    # Log command call
    sl.log_command(sys.argv, settings.random_state)
    check_for_initialise()

    # Compare current settings to latest.ini
    prev_settings = Settings(Settings.DEFAULT_previous_settings_path)
    Settings.check_settings_changes(settings, prev_settings)

    if args.solvers is not None:
        solver_names = ["".join(s) for s in args.solvers]
        solver_paths = [
            resolve_object_name(name, target_dir=settings.DEFAULT_solver_dir)
            for name in solver_names
        ]
        if None in solver_paths:
            print("Some solvers not recognised! Check solver names:")
            for name, path in zip(solver_names, solver_paths):
                if path is None:
                    print(f'\t- "{name}"')
            sys.exit(-1)
        solvers = [Solver(p) for p in solver_paths]
    else:
        solvers = [
            Solver(p) for p in settings.DEFAULT_solver_dir.iterdir() if p.is_dir()
        ]

    portfolio_path = args.portfolio_name

    run_on = settings.run_on
    if run_on == Runner.LOCAL:
        print("Parallel Portfolio is not fully supported yet for Local runs. Exiting.")
        sys.exit(-1)

    # Retrieve instance sets
    instances = [
        resolve_object_name(
            instance_path,
            gv.file_storage_data_mapping[gv.instances_nickname_path],
            gv.settings().DEFAULT_instance_dir,
            Instance_Set,
        )
        for instance_path in args.instance_path
    ]
    # Join them into one
    if len(instances) > 1:
        print(
            "WARNING: More than one instance set specified. "
            "Currently only supporting one."
        )
    instances = instances[0]

    print(f"Running on {instances.size} instance(s)...")

    if not settings.objectives[0].time:
        print(
            "ERROR: Parallel Portfolio is currently only relevant for "
            "RunTime objectives. In all other cases, use validation"
        )
        sys.exit(-1)

    if args.portfolio_name is not None:  # Use a nickname
        portfolio_path = settings.DEFAULT_parallel_portfolio_output / args.portfolio_name
    else:  # Generate a timestamped nickname
        timestamp = time.strftime("%Y-%m-%d-%H:%M:%S", time.gmtime(time.time()))
        randintstamp = int(random.getrandbits(32))
        portfolio_path = (
            settings.DEFAULT_parallel_portfolio_output / f"{timestamp}_{randintstamp}"
        )
    if portfolio_path.exists():
        print(
            f"[WARNING] Portfolio path {portfolio_path} already exists! "
            "Overwrite? [y/n] ",
            end="",
        )
        user_input = input()
        if user_input != "y":
            sys.exit()
        shutil.rmtree(portfolio_path)

    portfolio_path.mkdir(parents=True)
    pdf = create_performance_dataframe(solvers, instances, portfolio_path)
    returned_cmd = build_command_list(instances, solvers, portfolio_path, pdf)
    default_objective_values, cpu_time_key, status_key, wall_time_key = (
        init_default_objectives()
    )
    returned_run = submit_jobs(returned_cmd, solvers, instances, Runner.SLURM)
    job_output_dict = monitor_jobs(
        returned_run, instances, solvers, default_objective_values
    )
    wait_for_logs(returned_cmd)
    job_output_dict = update_results_from_logs(
        returned_cmd, returned_run, solvers, job_output_dict, cpu_time_key
    )
    job_output_dict = fix_missing_times(
        job_output_dict, status_key, cpu_time_key, wall_time_key
    )
    print_and_write_results(
        job_output_dict,
        solvers,
        instances,
        portfolio_path,
        status_key,
        cpu_time_key,
        wall_time_key,
        pdf,
    )

    # Write used settings to file
    settings.write_used_settings()
    print("Running Sparkle parallel portfolio is done!")
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])