Coverage for sparkle/CLI/run_parallel_portfolio.py: 0%
207 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-27 09:10 +0000
1#!/usr/bin/env python3
2# -*- coding: UTF-8 -*-
3"""Sparkle command to execute a parallel algorithm portfolio."""
5import sys
6import argparse
7import random
8import time
9import shutil
10import csv
11import itertools
12from pathlib import Path, PurePath
14from tqdm import tqdm
16import runrunner as rrr
17from runrunner.base import Runner
18from runrunner.slurm import Status
20from sparkle.CLI.help.reporting_scenario import Scenario
21from sparkle.CLI.help import logging as sl
22from sparkle.CLI.help import global_variables as gv
23from sparkle.platform import CommandName, COMMAND_DEPENDENCIES
24from sparkle.CLI.initialise import check_for_initialise
25from sparkle.CLI.help import argparse_custom as ac
26from sparkle.CLI.help.nicknames import resolve_object_name
27from sparkle.platform.settings_objects import Settings, SettingState
28from sparkle.solver import Solver
29from sparkle.instance import instance_set, InstanceSet
30from sparkle.types import SolverStatus, resolve_objective, UseTime
def run_parallel_portfolio(instances_set: InstanceSet,
                           portfolio_path: Path,
                           solvers: list[Solver],
                           run_on: Runner = Runner.SLURM) -> None:
    """Run the parallel algorithm portfolio.

    Submits one job per instance-solver-seed combination, monitors them until
    each instance has at least one completed job (killing the remaining jobs
    for that instance), then collects per-solver results and writes them to
    ``portfolio_path / "results.csv"``.

    Args:
        instances_set: Set of instances to run on.
        portfolio_path: Path to the parallel portfolio.
        solvers: List of solvers to run on the instances.
        run_on: Currently only supports Slurm.
    """
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    num_jobs = num_solvers * num_instances * seeds_per_solver
    parallel_jobs = min(gv.settings().get_number_of_jobs_in_parallel(), num_jobs)
    # BUG FIX: parallel_jobs is min(settings, num_jobs), so the original
    # `parallel_jobs > num_jobs` check was unreachable. Warn when there are
    # fewer parallel slots than jobs to schedule.
    if parallel_jobs < num_jobs:
        print("WARNING: Not all jobs will be started at the same time due to the "
              "limitation of number of Slurm jobs that can be run in parallel. Check"
              " your Sparkle Slurm Settings.")
    print(f"Sparkle parallel portfolio is running {seeds_per_solver} seed(s) per solver "
          f"on {num_solvers} solvers for {num_instances} instances ...")
    cmd_list = []
    cutoff = gv.settings().get_general_target_cutoff_time()
    objectives = gv.settings().get_general_sparkle_objectives()
    # Create a command for each instance-solver-seed combination
    for instance, solver in itertools.product(instances_set._instance_paths, solvers):
        for _ in range(seeds_per_solver):
            seed = int(random.getrandbits(32))
            solver_call_list = solver.build_cmd(
                instance.absolute(),
                objectives=objectives,
                seed=seed,
                cutoff_time=cutoff)
            cmd_list.append((" ".join(solver_call_list)).replace("'", '"'))
    # Jobs are added in to the runrunner object in the same order they are provided
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=CommandName.RUN_PARALLEL_PORTFOLIO,
        parallel_jobs=parallel_jobs,
        path=portfolio_path,
        base_dir=sl.caller_log_dir,
        srun_options=["-N1", "-n1"] + sbatch_options,
        sbatch_options=sbatch_options
    )
    check_interval = gv.settings().get_parallel_portfolio_check_interval()
    instances_done = [False] * num_instances
    # We record the 'best' of all seed results per solver-instance,
    # setting start values for objectives that are always present
    default_objective_values = {}
    for o in objectives:
        default_value = float(sys.maxsize) if o.minimise else 0
        # Default values for time objectives can be linked to cutoff time
        if o.time:
            default_value = cutoff + 1.0
        if o.post_process is not None:
            default_value = o.post_process(default_value, cutoff)
        default_objective_values[o.name] = default_value
    job_output_dict = {instance_name: {solver.name: default_objective_values.copy()
                                       for solver in solvers}
                       for instance_name in instances_set._instance_names}
    # Number of consecutive jobs in cmd_list that belong to one instance
    n_instance_jobs = num_solvers * seeds_per_solver

    with tqdm(total=len(instances_done)) as pbar:
        pbar.set_description("Instances done")
        while not all(instances_done):
            prev_done = sum(instances_done)
            time.sleep(check_interval)
            job_status_list = [r.status for r in run.jobs]
            job_status_completed = [status == Status.COMPLETED
                                    for status in job_status_list]
            # The jobs are sorted by instance
            for i, instance in enumerate(instances_set._instance_paths):
                if instances_done[i]:
                    continue
                instance_job_slice = slice(i * n_instance_jobs,
                                           (i + 1) * n_instance_jobs)
                if any(job_status_completed[instance_job_slice]):
                    instances_done[i] = True
                    # Kill all running jobs for this instance
                    solver_kills = [0] * num_solvers
                    for job_index in range(i * n_instance_jobs,
                                           (i + 1) * n_instance_jobs):
                        if not job_status_completed[job_index]:
                            run.jobs[job_index].kill()
                            solver_index = int(
                                (job_index % n_instance_jobs) / seeds_per_solver)
                            solver_kills[solver_index] += 1
                    for solver_index in range(num_solvers):
                        # All seeds of a solver were killed on instance, set status kill
                        if solver_kills[solver_index] == seeds_per_solver:
                            solver_name = solvers[solver_index].name
                            # NOTE(review): keyed on instance.name (Path name);
                            # assumes it matches _instance_names — confirm
                            job_output_dict[instance.name][solver_name]["status"] =\
                                SolverStatus.KILLED
            pbar.update(sum(instances_done) - prev_done)
    # Attempt to verify that all logs have been written (Slurm I/O latency)
    for index, cmd in enumerate(cmd_list):
        runsolver_configuration = cmd.split(" ")[:11]
        logs = [portfolio_path / p for p in runsolver_configuration
                if Path(p).suffix in [".log", ".val", ".rawres"]]
        if not all([p.exists() for p in logs]):
            time.sleep(check_interval)
    # Now iterate over runsolver logs to get runtime, get the lowest value per seed
    for index, cmd in enumerate(cmd_list):
        runsolver_configuration = cmd.split(" ")[:11]
        # BUG FIX: the original read run.jobs[i].stdout with `i`, a stale loop
        # variable from the monitoring loop above; each command must parse the
        # stdout of its own job at position `index`.
        solver_output = Solver.parse_solver_output(run.jobs[index].stdout,
                                                   runsolver_configuration,
                                                   portfolio_path)
        solver_index = int((index % n_instance_jobs) / seeds_per_solver)
        solver_name = solvers[solver_index].name
        instance_name = instances_set._instance_names[int(index / n_instance_jobs)]
        cpu_time = solver_output["cpu_time"]
        cmd_output = job_output_dict[instance_name][solver_name]
        # Keep only the best (lowest positive) CPU time over all seeds
        if cpu_time > 0.0 and cpu_time < cmd_output["cpu_time"]:
            for key, value in solver_output.items():
                if key in [o.name for o in objectives]:
                    job_output_dict[instance_name][solver_name][key] = value
            # Do not overwrite an explicit KILLED status from the monitor loop
            if "status" not in cmd_output or cmd_output["status"] != SolverStatus.KILLED:
                cmd_output["status"] = solver_output["status"]
    # Fix the CPU/WC time for non existent logs to instance min time + check_interval
    for instance in job_output_dict.keys():
        no_log_solvers = []
        min_time = cutoff
        for solver in job_output_dict[instance].keys():
            cpu_time = job_output_dict[instance][solver]["cpu_time"]
            if cpu_time == -1.0 or cpu_time == float(sys.maxsize):
                no_log_solvers.append(solver)
            elif cpu_time < min_time:
                min_time = cpu_time
        for solver in no_log_solvers:
            job_output_dict[instance][solver]["cpu_time"] = min_time + check_interval
            job_output_dict[instance][solver]["wall_time"] = min_time + check_interval
        # Fix runtime objectives with resolved CPU/Wall times
        for key, value in job_output_dict[instance][solver].items():
            objective = resolve_objective(key)
            if objective is not None and objective.time:
                if objective.use_time == UseTime.CPU_TIME:
                    value = job_output_dict[instance][solver]["cpu_time"]
                else:
                    value = job_output_dict[instance][solver]["wall_time"]
                if objective.post_process is not None:
                    value = objective.post_process(value, cutoff)
                job_output_dict[instance][solver][key] = value

    for index, instance_name in enumerate(instances_set._instance_names):
        index_str = f"[{index + 1}/{num_instances}] "
        instance_output = job_output_dict[instance_name]
        if all([instance_output[k]["status"] == SolverStatus.TIMEOUT
                for k in instance_output.keys()]):
            print(f"\n{index_str}{instance_name} was not solved within the cutoff-time.")
            continue
        print(f"\n{index_str}{instance_name} yielded the following Solver results:")
        for sindex in range(index * num_solvers, (index + 1) * num_solvers):
            solver_name = solvers[sindex % num_solvers].name
            job_info = job_output_dict[instance_name][solver_name]
            print(f"\t- {solver_name} ended with status {job_info['status']} in "
                  f"{job_info['cpu_time']}s CPU-Time ({job_info['wall_time']}s WC-Time)")

    # Write the results to a CSV
    csv_path = portfolio_path / "results.csv"
    values_header = ["status"] + [o.name for o in objectives]
    header = ["Instance", "Solver"] + values_header
    result_rows = [header]
    for instance_name in job_output_dict.keys():
        for solver_name in job_output_dict[instance_name].keys():
            job_o = job_output_dict[instance_name][solver_name]
            values = [instance_name, solver_name] + [
                job_o[key] if key in job_o else "None"
                for key in values_header]
            result_rows.append(values)
    # newline="" is required by the csv module to avoid blank lines on Windows
    with csv_path.open("w", newline="") as out:
        writer = csv.writer(out)
        writer.writerows(result_rows)
def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments.

    Returns:
        parser: The parser with the parsed command line arguments
    """
    parser = argparse.ArgumentParser()
    # Register each predefined argument descriptor (names + kwargs pairs)
    arguments = (ac.InstancePath,
                 ac.NicknamePortfolioArgument,
                 ac.SolversArgument,
                 ac.SparkleObjectiveArgument,
                 ac.CutOffTimeArgument,
                 ac.SolverSeedsArgument,
                 ac.RunOnArgument,
                 ac.SettingsFileArgument)
    for argument in arguments:
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser
if __name__ == "__main__":
    # Log command call
    sl.log_command(sys.argv)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args()
    if args.solvers is not None:
        solver_names = ["".join(s) for s in args.solvers]
        solver_paths = [resolve_object_name(
            name, target_dir=gv.settings().DEFAULT_solver_dir)
            for name in solver_names]
        if None in solver_paths:
            print("Some solvers not recognised! Check solver names:")
            # BUG FIX: report the unresolved solver *name*; the original
            # printed solver_paths[i], which is None for every failure.
            for name, path in zip(solver_names, solver_paths):
                if path is None:
                    print(f'\t- "{name}" ')
            sys.exit(-1)
        solvers = [Solver(p) for p in solver_paths]
    else:
        # No solvers specified: use every solver directory in the platform
        solvers = [Solver(p) for p in
                   gv.settings().DEFAULT_solver_dir.iterdir() if p.is_dir()]

    check_for_initialise(COMMAND_DEPENDENCIES[CommandName.RUN_PARALLEL_PORTFOLIO])

    # Compare current settings to latest.ini
    prev_settings = Settings(PurePath("Settings/latest.ini"))
    Settings.check_settings_changes(gv.settings(), prev_settings)

    # Do first, so other command line options can override settings from the file
    if args.settings_file is not None:
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)

    portfolio_path = args.portfolio_name

    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    if args.solver_seeds is not None:
        gv.settings().set_parallel_portfolio_number_of_seeds_per_solver(
            args.solver_seeds, SettingState.CMD_LINE)

    if run_on == Runner.LOCAL:
        print("Parallel Portfolio is not fully supported yet for Local runs. Exiting.")
        sys.exit(-1)

    # Retrieve instance set
    data_set = resolve_object_name(
        args.instance_path,
        gv.file_storage_data_mapping[gv.instances_nickname_path],
        gv.settings().DEFAULT_instance_dir,
        instance_set)
    print(f"Running on {data_set.size} instance(s)...")

    if args.cutoff_time is not None:
        gv.settings().set_general_target_cutoff_time(args.cutoff_time,
                                                     SettingState.CMD_LINE)

    if args.objectives is not None:
        gv.settings().set_general_sparkle_objectives(
            args.objectives, SettingState.CMD_LINE)
    # The portfolio minimises runtime; other objectives are not supported
    if not gv.settings().get_general_sparkle_objectives()[0].time:
        print("ERROR: Parallel Portfolio is currently only relevant for "
              "RunTime objectives. In all other cases, use validation")
        sys.exit(-1)

    if args.portfolio_name is not None:  # Use a nickname
        portfolio_path = gv.settings().DEFAULT_parallel_portfolio_output_raw /\
            args.portfolio_name
    else:  # Generate a timestamped nickname
        timestamp = time.strftime("%Y-%m-%d-%H:%M:%S", time.gmtime(time.time()))
        randintstamp = int(random.getrandbits(32))
        portfolio_path = gv.settings().DEFAULT_parallel_portfolio_output_raw /\
            f"{timestamp}_{randintstamp}"
    if portfolio_path.exists():
        print(f"[WARNING] Portfolio path {portfolio_path} already exists! "
              "Overwrite? [y/n] ", end="")
        user_input = input()
        if user_input != "y":
            sys.exit()
        shutil.rmtree(portfolio_path)
    portfolio_path.mkdir(parents=True)
    run_parallel_portfolio(data_set, portfolio_path, solvers, run_on=run_on)

    # Update latest scenario
    gv.latest_scenario().set_parallel_portfolio_path(portfolio_path)
    gv.latest_scenario().set_latest_scenario(Scenario.PARALLEL_PORTFOLIO)
    gv.latest_scenario().set_parallel_portfolio_instance_path(args.instance_path)
    # Write used scenario to file
    gv.latest_scenario().write_scenario_ini()
    # Write used settings to file
    gv.settings().write_used_settings()
    print("Running Sparkle parallel portfolio is done!")