Coverage for sparkle/CLI/run_solvers.py: 91%
135 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-03 10:42 +0000
1#!/usr/bin/env python3
2"""Sparkle command to run solvers to get their performance data."""
3from __future__ import annotations
4import random
5import sys
6import ast
7import argparse
8from pathlib import PurePath, Path
10from runrunner.base import Runner, Run
12from sparkle.solver import Solver
13from sparkle.instance import Instance_Set
14from sparkle.structures import PerformanceDataFrame
15from sparkle.types import SparkleObjective, resolve_objective
16from sparkle.platform.settings_objects import Settings, SettingState
17from sparkle.CLI.help import global_variables as gv
18from sparkle.CLI.help import logging as sl
19from sparkle.CLI.help import argparse_custom as ac
20from sparkle.CLI.help.nicknames import resolve_object_name
21from sparkle.CLI.initialise import check_for_initialise
def parser_function() -> argparse.ArgumentParser:
    """Build and return the argument parser for the run-solvers command."""
    parser = argparse.ArgumentParser(
        description="Run solvers on instances to get their performance data.")
    for argument in (ac.SolversArgument,
                     ac.InstanceSetPathsArgument):
        parser.add_argument(*argument.names, **argument.kwargs)
    # Mutually exclusive: specific configuration or best configuration
    configuration_group = parser.add_mutually_exclusive_group()
    for argument in (ac.ConfigurationArgument,
                     ac.BestConfigurationArgument):
        configuration_group.add_argument(*argument.names, **argument.kwargs)
    for argument in (ac.ObjectiveArgument,
                     ac.PerformanceDataJobsArgument,
                     # Recompute is only relevant when the jobs flag is given
                     ac.RecomputeRunSolversArgument,
                     ac.TargetCutOffTimeArgument,
                     ac.RunOnArgument,
                     ac.SettingsFileArgument):
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser
def run_solvers(
        solvers: list[Solver],
        instances: list[str],
        objectives: list[SparkleObjective],
        seed: int,
        cutoff_time: int,
        configuration: list[dict] = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        log_dir: Path = None,
        run_on: Runner = Runner.SLURM,) -> list[Run]:
    """Run the solvers.

    Parameters
    ----------
    solvers: list[Solver]
        The solvers to run
    instances: list[str]
        The instances to run the solvers on
    objectives: list[SparkleObjective]
        The objective values to retrieve from the solvers
    seed: int
        The seed to use
    cutoff_time: int
        The cut off time for the solvers
    configuration: list[dict]
        Per-solver configurations, paired positionally with ``solvers``.
        If None, every solver runs with its default configuration.
    sbatch_options: list[str]
        The sbatch options to use for the solvers
    slurm_prepend: str | list[str] | Path
        The script to prepend to a slurm script
    log_dir: Path
        The directory to use for the logs
    run_on: Runner
        Where to execute the solvers.

    Returns
    -------
    runs: list[Run]
        The submitted runs. For local execution each run is handled and
        reported synchronously instead, so the list stays empty.
    """
    # BUGFIX: the default configuration is None, but zip(solvers, None) raises
    # a TypeError. Substitute one "no configuration" entry per solver.
    if configuration is None:
        configuration = [None] * len(solvers)
    runs = []
    # Run each solver paired with its own configuration. The loop variable is
    # deliberately NOT named `configuration` to avoid shadowing the parameter.
    for solver, solver_config in zip(solvers, configuration):
        run = solver.run(instances=instances,
                         objectives=objectives,
                         seed=seed,
                         configuration=solver_config,
                         cutoff_time=cutoff_time,
                         run_on=run_on,
                         sbatch_options=sbatch_options,
                         slurm_prepend=slurm_prepend,
                         log_dir=log_dir)
        if run_on == Runner.LOCAL:
            # A single-instance local run yields one dict; normalise to a list.
            if isinstance(run, dict):
                run = [run]
            # Resolve objective keys by prefix: the exact key names may carry
            # objective-specific suffixes, so match case-insensitively.
            status_key = [key for key in run[0] if key.lower().startswith("status")][0]
            time_key = [key for key in run[0] if key.lower().startswith("cpu_time")][0]
            for i, solver_output in enumerate(run):
                print(f"Execution of {solver.name} on instance {instances[i]} "
                      f"completed with status {solver_output[status_key]} "
                      f"in {solver_output[time_key]} seconds.")
            print("Running configured solver done!")
        else:
            runs.append(run)
    return runs
def run_solvers_performance_data(
        performance_data: PerformanceDataFrame,
        cutoff_time: int,
        rerun: bool = False,
        solvers: list[Solver] = None,
        instances: list[str] = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        run_on: Runner = Runner.SLURM) -> list[Run]:
    """Run the solvers for the performance data.

    Parameters
    ----------
    performance_data: PerformanceDataFrame
        The performance data
    cutoff_time: int
        The cut off time for the solvers
    rerun: bool
        Run only solvers for which no data is available yet (False) or (re)run all
        solvers to get (new) performance data for them (True)
    solvers: list[Solver]
        The solvers to run. If None, run all found solvers.
    instances: list[str]
        The instances to run the solvers on. If None, run all found instances.
    sbatch_options: list[str]
        The sbatch options to use
    slurm_prepend: str | list[str] | Path
        The script to prepend to a slurm script
    run_on: Runner
        Where to execute the solvers. For available values see runrunner.base.Runner
        enum. Default: "Runner.SLURM".

    Returns
    -------
    runs: list[Run] or None
        The submitted runs, or None when there are no jobs to do.
    """
    # List of (instance, run, solver) jobs still missing from the dataframe
    jobs = performance_data.get_job_list(rerun=rerun)
    num_jobs = len(jobs)

    print(f"Total number of jobs to run: {num_jobs}")

    # If there are no jobs, stop
    if num_jobs == 0:
        return None

    if run_on == Runner.LOCAL:
        print("Running the solvers locally")
    elif run_on == Runner.SLURM:
        print("Running the solvers through Slurm")

    # Group the jobs per solver: {solver: {instance: [run ids]}}
    solver_jobs = {p_solver: {} for _, _, p_solver in jobs}
    for p_instance, p_run, p_solver in jobs:
        solver_jobs[p_solver].setdefault(p_instance, []).append(p_run)
    runrunner_runs = []
    solvers = [Solver(Path(p))
               for p in performance_data.solvers] if solvers is None else solvers
    if run_on == Runner.LOCAL:
        print(f"Cutoff time for each solver run: {cutoff_time} seconds")
    for solver in solvers:
        solver_key = str(solver.directory)
        # BUGFIX: use .get so an explicitly passed solver without pending jobs
        # hits the warning below instead of raising a KeyError, and materialise
        # a list so the emptiness check works (the previous comparison of a
        # dict_keys view against [] was always False).
        solver_instances = list(solver_jobs.get(solver_key, {}).keys())
        if instances:  # Filter
            solver_instances = [i for i in solver_instances if i in instances]
        if not solver_instances:
            print(f"Warning: No jobs for instances found for solver {solver_key}")
            continue
        runs = [solver_jobs[solver_key][i] for i in solver_instances]
        run = solver.run_performance_dataframe(
            solver_instances, runs, performance_data, cutoff_time=cutoff_time,
            sbatch_options=sbatch_options, slurm_prepend=slurm_prepend,
            log_dir=sl.caller_log_dir, base_dir=sl.caller_log_dir, run_on=run_on)
        runrunner_runs.append(run)
        if run_on == Runner.LOCAL:
            # Do some printing?
            pass
    if run_on == Runner.SLURM:
        num_jobs = sum(len(r.jobs) for r in runrunner_runs)
        print(f"Total number of jobs submitted: {num_jobs}")

    return runrunner_runs
def main(argv: list[str]) -> None:
    """Main function of the run solvers command.

    Parses the CLI arguments, resolves solvers/instances, then either fills
    the performance dataframe (--performance-data-jobs) or runs the solvers
    directly with a given/best/default configuration. Exits with status 0.
    """
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    if args.settings_file is not None:
        # Do first, so other command line options can override settings from the file
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)
    if args.target_cutoff_time is not None:
        gv.settings().set_general_target_cutoff_time(
            args.target_cutoff_time, SettingState.CMD_LINE)
    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    if args.best_configuration or args.configuration:
        # An objective is needed to select/compare configurations.
        if not args.objective:
            objective = gv.settings().get_general_sparkle_objectives()[0]
            print("WARNING: Best configuration requested, but no objective specified. "
                  f"Revert to first objective ({objective}).")
        else:
            objective = resolve_objective(args.objective)

    # Compare current settings to latest.ini
    prev_settings = Settings(PurePath("Settings/latest.ini"))
    Settings.check_settings_changes(gv.settings(), prev_settings)

    if args.solvers:
        solvers = [resolve_object_name(solver_path,
                   gv.file_storage_data_mapping[gv.solver_nickname_list_path],
                   gv.settings().DEFAULT_solver_dir, Solver)
                   for solver_path in args.solvers]
    else:
        # No solvers given: run every solver found in the platform directory.
        solvers = [Solver(p) for p in
                   gv.settings().DEFAULT_solver_dir.iterdir() if p.is_dir()]

    if args.instance_path:
        instances = [resolve_object_name(instance_path,
                     gv.file_storage_data_mapping[gv.instances_nickname_path],
                     gv.settings().DEFAULT_instance_dir, Instance_Set)
                     for instance_path in args.instance_path]
        # Unpack the sets into instance strings
        # (renamed the loop variable: `set` shadowed the builtin)
        instances = [str(path) for instance_set in instances
                     for path in instance_set.instance_paths]
    else:
        instances = None  # TODO: Fix? Or its good like this

    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    slurm_prepend = gv.settings().get_slurm_job_prepend()
    # Write settings to file before starting, since they are used in callback scripts
    gv.settings().write_used_settings()
    run_on = gv.settings().get_run_on()
    cutoff_time = gv.settings().get_general_target_cutoff_time()
    # Open the performance data csv file
    performance_dataframe = PerformanceDataFrame(
        gv.settings().DEFAULT_performance_data_path)

    print("Start running solvers ...")
    if args.performance_data_jobs:
        runs = run_solvers_performance_data(
            performance_data=performance_dataframe,
            solvers=solvers,
            instances=instances,
            cutoff_time=cutoff_time,
            rerun=args.recompute,
            sbatch_options=sbatch_options,
            slurm_prepend=slurm_prepend,
            run_on=run_on)
    else:
        configurations = [None] * len(solvers)
        if args.best_configuration:
            train_instances = None
            if isinstance(args.best_configuration, list):
                train_instances = [resolve_object_name(
                    instance_path,
                    gv.file_storage_data_mapping[gv.instances_nickname_path],
                    gv.settings().DEFAULT_instance_dir, Instance_Set)
                    for instance_path in args.best_configuration]
                # Unpack the sets into instance strings
                # NOTE(review): this overwrites the run `instances` with the
                # training instances of --best-configuration — confirm intended.
                instances = [str(path) for instance_set in train_instances
                             for path in instance_set.instance_paths]
            # Determine best configuration
            configurations = [performance_dataframe.best_configuration(
                str(solver.directory), objective, train_instances)[0]
                for solver in solvers]
        elif args.configuration:
            # Use given configurations
            # Hotfix: We take the first instance in the DF. Might not work in some cases
            instance = performance_dataframe.instances[0]
            configurations = [ast.literal_eval(performance_dataframe.get_value(
                str(solver.directory), instance, objective.name, run=args.configuration,
                solver_fields=[PerformanceDataFrame.column_configuration]))
                for solver in solvers]
        if instances is None:
            instances = performance_dataframe.instances
        runs = run_solvers(
            solvers=solvers,
            configuration=configurations,
            instances=instances,
            objectives=gv.settings().get_general_sparkle_objectives(),
            seed=random.randint(0, sys.maxsize),
            cutoff_time=cutoff_time,
            sbatch_options=sbatch_options,
            slurm_prepend=slurm_prepend,
            log_dir=sl.caller_log_dir,
            run_on=gv.settings().get_run_on(),
        )

    # If there are no jobs return
    if runs is None or all(run is None for run in runs):
        print("Running solvers done!")
    elif run_on == Runner.SLURM:
        print("Running solvers. Waiting for Slurm job(s) with id(s): "
              f'{",".join(r.run_id for r in runs if r is not None)}')
    sys.exit(0)
if __name__ == "__main__":
    # Script entry point: forward the CLI arguments (without the program name).
    main(sys.argv[1:])