Coverage for sparkle/CLI/run_parallel_portfolio.py: 93%
253 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 13:21 +0000
1#!/usr/bin/env python3
2# -*- coding: UTF-8 -*-
3"""Sparkle command to execute a parallel algorithm portfolio."""
5import sys
6import argparse
7import random
8import time
9import shutil
10import itertools
11from operator import mod
12from pathlib import Path, PurePath
14from tqdm import tqdm
16import runrunner as rrr
17from runrunner.base import Runner, Run
18from runrunner.slurm import Status, SlurmRun
20from sparkle.CLI.help import logging as sl
21from sparkle.CLI.help import global_variables as gv
22from sparkle.CLI.initialise import check_for_initialise
23from sparkle.CLI.help import argparse_custom as ac
24from sparkle.CLI.help.nicknames import resolve_object_name
25from sparkle.platform.settings_objects import Settings, SettingState
26from sparkle.solver import Solver
27from sparkle.instance import Instance_Set, InstanceSet
28from sparkle.types import SolverStatus, resolve_objective, UseTime
29from sparkle.structures import PerformanceDataFrame
def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments.

    Returns:
        parser: The parser with the parsed command line arguments
    """
    parser = argparse.ArgumentParser(description="Run a portfolio of solvers on an "
                                                 "instance set in parallel.")
    # Every argument follows the same registration pattern, so register them in bulk.
    portfolio_arguments = (ac.InstanceSetPathsArgument,
                           ac.NicknamePortfolioArgument,
                           ac.SolversArgument,
                           ac.ObjectivesArgument,
                           ac.CutOffTimeArgument,
                           ac.SolverSeedsArgument,
                           ac.RunOnArgument,
                           ac.SettingsFileArgument)
    for argument in portfolio_arguments:
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser
def create_performance_dataframe(solvers: list[Solver],
                                 instances_set: InstanceSet,
                                 portfolio_path: Path) -> PerformanceDataFrame:
    """Create a PerformanceDataFrame for the given solvers and instances.

    Args:
        solvers: List of solvers to include in the PerformanceDataFrame.
        instances_set: Set of instances to include in the PerformanceDataFrame.
        portfolio_path: Directory in which the results CSV file is placed.

    Returns:
        pdf: PerformanceDataFrame object initialized with solvers and instances.
    """
    # Solvers are keyed by their directory path in the dataframe.
    solver_keys = [str(solver.directory) for solver in solvers]
    return PerformanceDataFrame(
        csv_filepath=portfolio_path / "results.csv",
        solvers=solver_keys,
        objectives=gv.settings().get_general_sparkle_objectives(),
        instances=instances_set.instance_names)
def build_command_list(instances_set: InstanceSet,
                       solvers: list[Solver],
                       portfolio_path: Path,
                       pdf: PerformanceDataFrame) -> list[str]:
    """Build the list of command strings for all instance-solver-seed combinations.

    Side effect: the randomly generated seed of every run is registered in the
    performance dataframe (one entry per objective).

    Args:
        instances_set: Set of instances to run on.
        solvers: List of solvers to run on the instances.
        portfolio_path: Path to the parallel portfolio.
        pdf: Performance dataframe in which the seeds are recorded.

    Returns:
        cmd_list: List of command strings for all instance-solver-seed combinations.
    """
    settings = gv.settings()
    cutoff = settings.get_general_solver_cutoff_time()
    objectives = settings.get_general_sparkle_objectives()
    seeds_per_solver = settings.get_parallel_portfolio_number_of_seeds_per_solver()
    commands = []
    # Create a command for each instance-solver-seed combination
    for instance, solver in itertools.product(instances_set._instance_paths, solvers):
        for _ in range(seeds_per_solver):
            seed = int(random.getrandbits(32))
            call_arguments = solver.build_cmd(instance.absolute(),
                                              objectives=objectives,
                                              seed=seed,
                                              cutoff_time=cutoff,
                                              log_dir=portfolio_path)
            commands.append(" ".join(call_arguments))
            # Remember which seed was used for this solver/instance pair
            for objective in objectives:
                pdf.set_value(value=seed,
                              solver=str(solver.directory),
                              instance=instance.stem,
                              objective=objective.name,
                              solver_fields=["Seed"])
    return commands
def init_default_objectives() -> tuple[dict, str, str, str]:
    """Initialize default objective values and key names.

    Fix: the return annotation said ``-> list`` although the function returns a
    four-element tuple; the annotation now matches the actual return value.

    Returns:
        default_objective_values: Dictionary with default values for each objective.
        cpu_time_key: Key for CPU time in the default values.
        status_key: Key for status in the default values.
        wall_time_key: Key for wall clock time in the default values.
    """
    # We record the 'best' of all seed results per solver-instance,
    # setting start values for objectives that are always present
    objectives = gv.settings().get_general_sparkle_objectives()
    cutoff = gv.settings().get_general_solver_cutoff_time()
    # These three objectives are assumed to always be configured; an IndexError
    # here means the settings lack one of them.
    cpu_time_key = [o.name for o in objectives if o.name.startswith("cpu_time")][0]
    status_key = [o.name for o in objectives if o.name.startswith("status")][0]
    wall_time_key = [o.name for o in objectives if o.name.startswith("wall_time")][0]
    default_objective_values = {}
    for o in objectives:
        # Minimised objectives start at the worst (largest) representable value
        default_value = float(sys.maxsize) if o.minimise else 0
        # Default values for time objectives can be linked to cutoff time
        if o.time and o.post_process:
            default_value = o.post_process(default_value, cutoff, SolverStatus.KILLED)
        default_objective_values[o.name] = default_value
    default_objective_values[status_key] = SolverStatus.UNKNOWN  # Overwrite status
    return default_objective_values, cpu_time_key, status_key, wall_time_key
def submit_jobs(cmd_list: list[str],
                solvers: list[Solver],
                instances_set: InstanceSet,
                run_on: Runner = Runner.SLURM) -> SlurmRun:
    """Submit jobs to the runner and return the run object.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
        solvers: List of solvers to run on the instances.
        instances_set: Set of instances to run on.
        run_on: Runner to use for submitting the jobs.

    Returns:
        run: The run object containing the submitted jobs.
    """
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    num_jobs = num_solvers * num_instances * seeds_per_solver
    max_parallel = gv.settings().get_number_of_jobs_in_parallel()
    parallel_jobs = min(max_parallel, num_jobs)
    # BUG FIX: the warning previously compared parallel_jobs (already capped by
    # min(...)) against num_jobs and could therefore never trigger. Warn when
    # the configured parallel-job limit is smaller than the number of jobs.
    if num_jobs > max_parallel:
        print("WARNING: Not all jobs will be started at the same time due to the "
              "limitation of number of Slurm jobs that can be run in parallel. Check"
              " your Sparkle Slurm Settings.")
    print(f"Sparkle parallel portfolio is running {seeds_per_solver} seed(s) per solver "
          f"on {num_solvers} solvers for {num_instances} instances ...")

    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    solver_names = ", ".join([s.name for s in solvers])
    # Jobs are added in to the runrunner object in the same order they are provided
    return rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=f"Parallel Portfolio: {solver_names}",
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        srun_options=["-N1", "-n1"] + sbatch_options,
        sbatch_options=sbatch_options,
        prepend=gv.settings().get_slurm_job_prepend(),
    )
def monitor_jobs(run: Run,
                 instances_set: InstanceSet,
                 solvers: list[Solver],
                 default_objective_values: dict,
                 run_on: Runner = Runner.SLURM) -> dict:
    """Monitor job progress and update job output dictionary.

    Polls the run's job statuses until every instance has at least one
    completed job, then kills the remaining jobs for that instance (first
    finisher wins in a parallel portfolio).

    Args:
        run: The run object containing the submitted jobs.
        instances_set: Set of instances to run on.
        solvers: List of solvers to run on the instances.
        default_objective_values: Default objective values for each solver-instance.
        run_on: Runner used for the jobs.  NOTE(review): currently unused in
            the body — presumably kept for interface symmetry; confirm.

    Returns:
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
    """
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    # Number of jobs submitted per instance (one per solver per seed)
    n_instance_jobs = num_solvers * seeds_per_solver

    # Start every solver-instance cell from a copy of the default values
    job_output_dict = {
        instance_name: {solver.name:
                        default_objective_values.copy() for solver in solvers}
        for instance_name in instances_set._instance_names
    }

    check_interval = gv.settings().get_parallel_portfolio_check_interval()
    instances_done = [False] * num_instances

    with tqdm(total=len(instances_done)) as pbar:
        pbar.set_description("Instances done")
        while not all(instances_done):
            prev_done = sum(instances_done)
            # Sleep first: gives freshly submitted jobs time to report a status
            time.sleep(check_interval)
            job_status_list = [r.status for r in run.jobs]
            job_status_completed = [status == Status.COMPLETED
                                    for status in job_status_list]
            # The jobs are sorted by instance
            for i, instance in enumerate(instances_set._instance_paths):
                if instances_done[i]:
                    continue
                # Contiguous slice of this instance's jobs in the submission order
                instance_job_slice = slice(i * n_instance_jobs,
                                           (i + 1) * n_instance_jobs)
                if any(job_status_completed[instance_job_slice]):
                    instances_done[i] = True
                    # Kill remaining jobs for this instance.
                    solver_kills = [0] * num_solvers
                    for job_index in range(i * n_instance_jobs,
                                           (i + 1) * n_instance_jobs):
                        if not job_status_completed[job_index]:
                            run.jobs[job_index].kill()
                            # Map the flat job index back to its solver:
                            # jobs are grouped per solver, seeds_per_solver each
                            solver_index = int(
                                (mod(job_index, n_instance_jobs))
                                // seeds_per_solver)
                            solver_kills[solver_index] += 1
                    for solver_index in range(num_solvers):
                        # All seeds of a solver were killed on instance, set status kill
                        if solver_kills[solver_index] == seeds_per_solver:
                            solver_name = solvers[solver_index].name
                            job_output_dict[instance.stem][solver_name]["status"] =\
                                SolverStatus.KILLED
            pbar.update(sum(instances_done) - prev_done)
    return job_output_dict
def wait_for_logs(cmd_list: list[str]) -> None:
    """Wait for all log files to be written.

    Best-effort: sleeps at most once per command whose log files are not yet
    visible (Slurm I/O latency); it does not re-check after sleeping.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
    """
    check_interval = gv.settings().get_parallel_portfolio_check_interval()
    for cmd in cmd_list:
        # The leading eleven tokens are the runsolver invocation, which
        # names the .log/.val/.rawres output files.
        runsolver_tokens = cmd.split(" ")[:11]
        expected_logs = [Path(token) for token in runsolver_tokens
                         if Path(token).suffix in [".log", ".val", ".rawres"]]
        missing = [log for log in expected_logs if not log.exists()]
        if missing:
            time.sleep(check_interval)
def update_results_from_logs(cmd_list: list[str], run: Run, solvers: list[Solver],
                             job_output_dict: dict,
                             cpu_time_key: str) -> dict:
    """Parse logs to update job output dictionary with best objective values.

    Args:
        cmd_list: List of command strings for all instance-solver-seed combinations.
        run: The run object containing the submitted jobs.
        solvers: List of solvers to run on the instances.
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
        cpu_time_key: Key for CPU time in the job output dictionary.

    Returns:
        job_output_dict: Updated job output dictionary with best objective values.
    """
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    num_solvers = len(solvers)
    n_instance_jobs = num_solvers * seeds_per_solver
    objectives = gv.settings().get_general_sparkle_objectives()

    for index, cmd in enumerate(cmd_list):
        # Commands are ordered instance-major, then per solver, then per seed,
        # so the flat index maps back to solver and instance arithmetically.
        solver_index = (mod(index, n_instance_jobs)) // seeds_per_solver
        solver_obj = solvers[solver_index]
        solver_output = Solver.parse_solver_output(
            run.jobs[index].stdout,
            cmd.split(" "),
            objectives=objectives,
            verifier=solver_obj.verifier
        )
        # Relies on dict insertion order matching the submission order
        instance_name = list(job_output_dict.keys())[index // n_instance_jobs]
        cpu_time = solver_output[cpu_time_key]
        cmd_output = job_output_dict[instance_name][solver_obj.name]
        # Keep only the best (lowest positive) CPU time seen per solver-instance
        if cpu_time > 0.0 and cpu_time < cmd_output[cpu_time_key]:
            for key, value in solver_output.items():
                if key in [o.name for o in objectives]:
                    job_output_dict[instance_name][solver_obj.name][key] = value
            # Never overwrite a KILLED verdict set during monitoring
            if cmd_output.get("status") != SolverStatus.KILLED:
                cmd_output["status"] = solver_output.get("status")
    return job_output_dict
def fix_missing_times(job_output_dict: dict,
                      status_key: str,
                      cpu_time_key: str,
                      wall_time_key: str) -> dict:
    """Fix CPU and wall clock times for solvers that did not produce logs.

    Solvers whose runs were killed before writing results have no measured
    time; they are assigned the instance's fastest measured time plus one
    check interval (a lower bound on how long they actually ran).

    Args:
        job_output_dict: Dictionary containing the job output for each instance-solver
            combination.
        status_key: Key for status in the job output dictionary.
        cpu_time_key: Key for CPU time in the job output dictionary.
        wall_time_key: Key for wall clock time in the job output dictionary.

    Returns:
        job_output_dict: Updated job output dictionary with fixed CPU and wall clock
            times.
    """
    cutoff = gv.settings().get_general_solver_cutoff_time()
    check_interval = gv.settings().get_parallel_portfolio_check_interval()

    # Fix the CPU/WC time for non existent logs to instance min time + check_interval
    for instance in job_output_dict.keys():
        no_log_solvers = []
        min_time = cutoff  # Upper bound if no solver produced a valid time
        for solver in job_output_dict[instance].keys():
            cpu_time = job_output_dict[instance][solver][cpu_time_key]
            # -1.0 (parse failure) and sys.maxsize (untouched default) both
            # mean "no measured time for this solver"
            if cpu_time == -1.0 or cpu_time == float(sys.maxsize):
                no_log_solvers.append(solver)
            elif cpu_time < min_time:
                min_time = cpu_time
        for solver in no_log_solvers:
            job_output_dict[instance][solver][cpu_time_key] = min_time + check_interval
            job_output_dict[instance][solver][wall_time_key] = min_time + check_interval
            # Fix runtime objectives with resolved CPU/Wall times
            for key, value in job_output_dict[instance][solver].items():
                objective = resolve_objective(key)
                if objective is not None and objective.time:
                    value = (job_output_dict[instance][solver][cpu_time_key]
                             if objective.use_time == UseTime.CPU_TIME
                             else job_output_dict[instance][solver][wall_time_key])
                    if objective.post_process is not None:
                        status = job_output_dict[instance][solver][status_key]
                        value = objective.post_process(value, cutoff, status)
                    job_output_dict[instance][solver][key] = value
    return job_output_dict
def print_and_write_results(job_output_dict: dict,
                            solvers: list[Solver],
                            instances_set: InstanceSet,
                            portfolio_path: Path,
                            status_key: str,
                            cpu_time_key: str,
                            wall_time_key: str,
                            pdf: PerformanceDataFrame) -> None:
    """Print results to console and write the CSV file.

    Args:
        job_output_dict: Per-instance, per-solver objective results.
        solvers: List of solvers that were run.
        instances_set: Set of instances that was run on.
            NOTE(review): not referenced in the body — confirm it is needed.
        portfolio_path: Path to the parallel portfolio.
            NOTE(review): not referenced in the body; the CSV location was
            fixed when `pdf` was created — confirm it is needed.
        status_key: Key for status in the job output dictionary.
        cpu_time_key: Key for CPU time in the job output dictionary.
        wall_time_key: Key for wall clock time in the job output dictionary.
        pdf: Performance dataframe receiving the results; saved at the end.
    """
    num_instances = len(job_output_dict)
    num_solvers = len(solvers)
    objectives = gv.settings().get_general_sparkle_objectives()
    for index, instance_name in enumerate(job_output_dict.keys()):
        index_str = f"[{index + 1}/{num_instances}] "
        instance_output = job_output_dict[instance_name]
        # If every solver timed out, report that and skip the per-solver lines
        if all(instance_output[k][status_key] == SolverStatus.TIMEOUT
               for k in instance_output):
            print(f"\n{index_str}{instance_name} was not solved within the cutoff-time.")
            continue
        print(f"\n{index_str}{instance_name} yielded the following Solver results:")
        for sindex in range(index * num_solvers, (index + 1) * num_solvers):
            solver_name = solvers[mod(sindex, num_solvers)].name
            job_info = job_output_dict[instance_name][solver_name]
            print(f"\t- {solver_name} ended with status {job_info[status_key]} in "
                  f"{job_info[cpu_time_key]}s CPU-Time ({job_info[wall_time_key]}s "
                  "Wall clock time)")

    # The dataframe keys instances/solvers by full path while the job output
    # uses short names; map names back to full paths before writing.
    instance_map = {Path(p).name: p for p in pdf.instances}
    solver_map = {Path(s).name: s for s in pdf.solvers}
    for instance, instance_dict in job_output_dict.items():
        instance_name = Path(instance).name
        instance_full_path = instance_map.get(instance_name, instance)
        for solver, objective_dict in instance_dict.items():
            solver_name = Path(solver).name
            solver_full_path = solver_map.get(solver_name, solver)
            for objective in objectives:
                obj_name = objective.name
                obj_val = objective_dict.get(
                    obj_name,
                    PerformanceDataFrame.missing_value
                )
                pdf.set_value(
                    value=obj_val,
                    solver=solver_full_path,
                    instance=instance_full_path,
                    objective=obj_name
                )
    pdf.save_csv()
def main(argv: list[str]) -> None:
    """Main method of run parallel portfolio command.

    Parses command line arguments, resolves solvers and instances, submits
    the portfolio jobs, monitors them and records the results.

    Args:
        argv: Command line arguments (excluding the program name).
    """
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    if args.solvers is not None:
        solver_paths = [resolve_object_name("".join(s),
                                            target_dir=gv.settings().DEFAULT_solver_dir)
                        for s in args.solvers]
        if None in solver_paths:
            print("Some solvers not recognised! Check solver names:")
            # BUG FIX: previously printed the resolved path (always None in
            # this branch) instead of the user-provided solver name.
            for solver_arg, resolved in zip(args.solvers, solver_paths):
                if resolved is None:
                    print(f'\t- "{"".join(solver_arg)}" ')
            sys.exit(-1)
        solvers = [Solver(p) for p in solver_paths]
    else:
        # No solvers given: run the full portfolio of all known solvers
        solvers = [Solver(p) for p in
                   gv.settings().DEFAULT_solver_dir.iterdir() if p.is_dir()]

    # Compare current settings to latest.ini
    prev_settings = Settings(PurePath("Settings/latest.ini"))
    Settings.check_settings_changes(gv.settings(), prev_settings)

    # Do first, so other command line options can override settings from the file
    if args.settings_file is not None:
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)

    portfolio_path = args.portfolio_name

    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    if args.solver_seeds is not None:
        gv.settings().set_parallel_portfolio_number_of_seeds_per_solver(
            args.solver_seeds, SettingState.CMD_LINE)

    if run_on == Runner.LOCAL:
        print("Parallel Portfolio is not fully supported yet for Local runs. Exiting.")
        sys.exit(-1)

    # Retrieve instance sets
    instances = [resolve_object_name(instance_path,
                 gv.file_storage_data_mapping[gv.instances_nickname_path],
                 gv.settings().DEFAULT_instance_dir, Instance_Set)
                 for instance_path in args.instance_path]
    # Join them into one
    if len(instances) > 1:
        print("WARNING: More than one instance set specified. "
              "Currently only supporting one.")
    instances = instances[0]

    print(f"Running on {instances.size} instance(s)...")

    if args.cutoff_time is not None:
        gv.settings().set_general_solver_cutoff_time(args.cutoff_time,
                                                     SettingState.CMD_LINE)

    if args.objectives is not None:
        gv.settings().set_general_sparkle_objectives(
            args.objectives, SettingState.CMD_LINE)
    if not gv.settings().get_general_sparkle_objectives()[0].time:
        print("ERROR: Parallel Portfolio is currently only relevant for "
              "RunTime objectives. In all other cases, use validation")
        sys.exit(-1)

    if args.portfolio_name is not None:  # Use a nickname
        portfolio_path = gv.settings().DEFAULT_parallel_portfolio_output /\
            args.portfolio_name
    else:  # Generate a timestamped nickname
        timestamp = time.strftime("%Y-%m-%d-%H:%M:%S", time.gmtime(time.time()))
        randintstamp = int(random.getrandbits(32))
        portfolio_path = gv.settings().DEFAULT_parallel_portfolio_output /\
            f"{timestamp}_{randintstamp}"
    if portfolio_path.exists():
        print(f"[WARNING] Portfolio path {portfolio_path} already exists! "
              "Overwrite? [y/n] ", end="")
        user_input = input()
        if user_input != "y":
            sys.exit()
        shutil.rmtree(portfolio_path)

    portfolio_path.mkdir(parents=True)
    pdf = create_performance_dataframe(solvers, instances, portfolio_path)
    returned_cmd = build_command_list(instances, solvers, portfolio_path, pdf)
    default_objective_values, cpu_time_key, \
        status_key, wall_time_key = init_default_objectives()
    # BUG FIX: pass the configured runner instead of hard-coding Runner.SLURM.
    # (Behaviour is unchanged today because LOCAL runs exit above.)
    returned_run = submit_jobs(returned_cmd, solvers, instances, run_on)
    job_output_dict = monitor_jobs(returned_run, instances,
                                   solvers, default_objective_values)
    wait_for_logs(returned_cmd)
    job_output_dict = update_results_from_logs(returned_cmd, returned_run,
                                               solvers, job_output_dict, cpu_time_key)
    job_output_dict = fix_missing_times(job_output_dict,
                                        status_key, cpu_time_key, wall_time_key)
    print_and_write_results(job_output_dict, solvers, instances,
                            portfolio_path, status_key,
                            cpu_time_key, wall_time_key, pdf)

    # Write used settings to file
    gv.settings().write_used_settings()
    print("Running Sparkle parallel portfolio is done!")
    sys.exit(0)
# Allow the module to be executed directly as a CLI command.
if __name__ == "__main__":
    main(sys.argv[1:])