Coverage for sparkle/CLI/run_solvers.py: 91%
134 statements
coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
#!/usr/bin/env python3
"""Sparkle command to run solvers to get their performance data."""
from __future__ import annotations
import random
import sys
import ast
import argparse
from pathlib import PurePath, Path

from runrunner.base import Runner, Run

from sparkle.solver import Solver
from sparkle.instance import Instance_Set
from sparkle.structures import PerformanceDataFrame
from sparkle.types import SparkleObjective, resolve_objective
from sparkle.platform.settings_objects import Settings, SettingState
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help.nicknames import resolve_object_name
from sparkle.CLI.initialise import check_for_initialise


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Run solvers on instances to get their performance data.")
    parser.add_argument(*ac.SolversArgument.names,
                        **ac.SolversArgument.kwargs)
    parser.add_argument(*ac.InstanceSetPathsArgument.names,
                        **ac.InstanceSetPathsArgument.kwargs)
    # Mutually exclusive: specific configuration or best configuration
    configuration_group = parser.add_mutually_exclusive_group()
    configuration_group.add_argument(*ac.ConfigurationArgument.names,
                                     **ac.ConfigurationArgument.kwargs)
    configuration_group.add_argument(*ac.BestConfigurationArgument.names,
                                     **ac.BestConfigurationArgument.kwargs)
    parser.add_argument(*ac.ObjectiveArgument.names,
                        **ac.ObjectiveArgument.kwargs)
    parser.add_argument(*ac.PerformanceDataJobsArgument.names,
                        **ac.PerformanceDataJobsArgument.kwargs)
    # This one is only relevant if the argument above is given
    parser.add_argument(*ac.RecomputeRunSolversArgument.names,
                        **ac.RecomputeRunSolversArgument.kwargs)
    parser.add_argument(*ac.TargetCutOffTimeArgument.names,
                        **ac.TargetCutOffTimeArgument.kwargs)
    parser.add_argument(*ac.RunOnArgument.names,
                        **ac.RunOnArgument.kwargs)
    parser.add_argument(*ac.SettingsFileArgument.names,
                        **ac.SettingsFileArgument.kwargs)
    return parser
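
# Illustrative note (assumption, not part of the original module): the parser can
# be built and inspected on its own; the actual flag spellings are defined by the
# argument objects in sparkle.CLI.help.argparse_custom.
#
#     parser = parser_function()
#     parser.print_help()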


def run_solvers(
        solvers: list[Solver],
        instances: list[str],
        objectives: list[SparkleObjective],
        seed: int,
        cutoff_time: int,
        configuration: list[dict] = None,
        sbatch_options: list[str] = None,
        log_dir: Path = None,
        run_on: Runner = Runner.SLURM) -> list[Run]:
    """Run the solvers.

    Parameters
    ----------
    solvers: list[Solver]
        The solvers to run
    instances: list[str]
        The instances to run the solvers on
    objectives: list[SparkleObjective]
        The objective values to retrieve from the solvers
    seed: int
        The seed to use
    cutoff_time: int
        The cut-off time for the solvers
    configuration: list[dict]
        The configuration to use per solver (one entry per solver)
    sbatch_options: list[str]
        The sbatch options to use for the solvers
    log_dir: Path
        The directory to use for the logs
    run_on: Runner
        Where to execute the solvers.

    Returns
    -------
    runs: list[Run]
        The runrunner Run objects; empty when the solvers were run locally.
    """
    runs = []
    # Run the solvers
    for solver, config in zip(solvers, configuration):
        run = solver.run(instances=instances,
                         objectives=objectives,
                         seed=seed,
                         configuration=config,
                         cutoff_time=cutoff_time,
                         run_on=run_on,
                         sbatch_options=sbatch_options,
                         log_dir=log_dir)
        if run_on == Runner.LOCAL:
            if isinstance(run, dict):
                run = [run]
            # Resolve objective keys
            status_key = [key for key in run[0] if key.lower().startswith("status")][0]
            time_key = [key for key in run[0] if key.lower().startswith("cpu_time")][0]
            for i, solver_output in enumerate(run):
                print(f"Execution of {solver.name} on instance {instances[i]} "
                      f"completed with status {solver_output[status_key]} "
                      f"in {solver_output[time_key]} seconds.")
            print("Running configured solver done!")
        else:
            runs.append(run)
    return runs
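
# Illustrative sketch (assumption, not part of the original module): invoking
# run_solvers directly for a single solver on two instances with its default
# configuration, executed locally. Paths and values below are hypothetical.
#
#     runs = run_solvers(
#         solvers=[Solver(Path("Solvers/MySolver"))],
#         instances=["Instances/PTN/instance_1.cnf",
#                    "Instances/PTN/instance_2.cnf"],
#         objectives=gv.settings().get_general_sparkle_objectives(),
#         seed=42,
#         cutoff_time=60,
#         configuration=[None],  # one entry per solver
#         run_on=Runner.LOCAL)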


def run_solvers_performance_data(
        performance_data: PerformanceDataFrame,
        cutoff_time: int,
        rerun: bool = False,
        solvers: list[Solver] = None,
        instances: list[str] = None,
        sbatch_options: list[str] = None,
        run_on: Runner = Runner.SLURM) -> list[Run]:
    """Run the solvers for the performance data.

    Parameters
    ----------
    performance_data: PerformanceDataFrame
        The performance data
    cutoff_time: int
        The cut-off time for the solvers
    rerun: bool
        Run only solvers for which no data is available yet (False) or (re)run all
        solvers to get (new) performance data for them (True)
    solvers: list[Solver]
        The solvers to run. If None, run all found solvers.
    instances: list[str]
        The instances to run the solvers on. If None, run all found instances.
    sbatch_options: list[str]
        The sbatch options to use
    run_on: Runner
        Where to execute the solvers. For available values see runrunner.base.Runner
        enum. Default: "Runner.SLURM".

    Returns
    -------
    runs: list[Run]
        One runrunner Run per solver with jobs to run, or None if there are no
        jobs left to run.
    """
    # List of jobs to do
    jobs = performance_data.get_job_list(rerun=rerun)
    num_jobs = len(jobs)

    print(f"Total number of jobs to run: {num_jobs}")

    # If there are no jobs, stop
    if num_jobs == 0:
        return None

    if run_on == Runner.LOCAL:
        print("Running the solvers locally")
    elif run_on == Runner.SLURM:
        print("Running the solvers through Slurm")

    # Sort the jobs per solver
    solver_jobs = {p_solver: {} for _, _, p_solver in jobs}
    for p_instance, p_run, p_solver in jobs:
        if p_instance not in solver_jobs[p_solver]:
            solver_jobs[p_solver][p_instance] = [p_run]
        else:
            solver_jobs[p_solver][p_instance].append(p_run)
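    # Illustrative shape of the mapping built above (hypothetical paths):
    #     {"Solvers/MySolver": {"Instances/PTN/i1.cnf": [0, 1], ...}, ...}
    # i.e. for every solver, the instances and run indices that still need data.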
    runrunner_runs = []
    solvers = [Solver(Path(p))
               for p in performance_data.solvers] if solvers is None else solvers
    if run_on == Runner.LOCAL:
        print(f"Cutoff time for each solver run: {cutoff_time} seconds")
    for solver in solvers:
        solver_key = str(solver.directory)
        solver_instances = solver_jobs.get(solver_key, {}).keys()
        if instances:  # Filter
            solver_instances = [i for i in solver_instances if i in instances]
        runs = list(solver_jobs[solver_key][i] for i in solver_instances)
        if not solver_instances:
            print(f"Warning: No instance jobs found for solver {solver_key}")
            continue
        run = solver.run_performance_dataframe(
            solver_instances, runs, performance_data, cutoff_time=cutoff_time,
            sbatch_options=sbatch_options, log_dir=sl.caller_log_dir,
            base_dir=sl.caller_log_dir, run_on=run_on)
        runrunner_runs.append(run)
    if run_on == Runner.LOCAL:
        # Do some printing?
        pass
    if run_on == Runner.SLURM:
        num_jobs = sum(len(r.jobs) for r in runrunner_runs)
        print(f"Total number of jobs submitted: {num_jobs}")

    return runrunner_runs
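
# Illustrative sketch (assumption, not part of the original module): computing
# missing performance data locally. The CSV path below is hypothetical; inside
# the platform the DataFrame comes from the configured performance data path.
#
#     performance_data = PerformanceDataFrame(Path("Performance_Data/data.csv"))
#     run_solvers_performance_data(performance_data=performance_data,
#                                  cutoff_time=60,
#                                  run_on=Runner.LOCAL)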


def main(argv: list[str]) -> None:
    """Main function of the run solvers command."""
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    if args.settings_file is not None:
        # Do first, so other command line options can override settings from the file
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)
    if args.target_cutoff_time is not None:
        gv.settings().set_general_target_cutoff_time(
            args.target_cutoff_time, SettingState.CMD_LINE)
    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    if args.best_configuration or args.configuration:
        if not args.objective:
            objective = gv.settings().get_general_sparkle_objectives()[0]
            print("WARNING: Best configuration requested, but no objective specified. "
                  f"Reverting to the first objective ({objective}).")
        else:
            objective = resolve_objective(args.objective)

    # Compare current settings to latest.ini
    prev_settings = Settings(PurePath("Settings/latest.ini"))
    Settings.check_settings_changes(gv.settings(), prev_settings)

    if args.solvers:
        solvers = [resolve_object_name(solver_path,
                   gv.file_storage_data_mapping[gv.solver_nickname_list_path],
                   gv.settings().DEFAULT_solver_dir, Solver)
                   for solver_path in args.solvers]
    else:
        solvers = [Solver(p) for p in
                   gv.settings().DEFAULT_solver_dir.iterdir() if p.is_dir()]

    if args.instance_path:
        instances = [resolve_object_name(instance_path,
                     gv.file_storage_data_mapping[gv.instances_nickname_path],
                     gv.settings().DEFAULT_instance_dir, Instance_Set)
                     for instance_path in args.instance_path]
        # Unpack the sets into instance strings
        instances = [str(path) for instance_set in instances
                     for path in instance_set.instance_paths]
    else:
        instances = None  # TODO: Fix? Or is it good like this?

    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    # Write settings to file before starting, since they are used in callback scripts
    gv.settings().write_used_settings()
    run_on = gv.settings().get_run_on()
    cutoff_time = gv.settings().get_general_target_cutoff_time()
    # Open the performance data csv file
    performance_dataframe = PerformanceDataFrame(
        gv.settings().DEFAULT_performance_data_path)

    print("Start running solvers ...")
    if args.performance_data_jobs:
        runs = run_solvers_performance_data(
            performance_data=performance_dataframe,
            solvers=solvers,
            instances=instances,
            cutoff_time=cutoff_time,
            rerun=args.recompute,
            sbatch_options=sbatch_options,
            run_on=run_on)
    else:
        configurations = [None] * len(solvers)
        if args.best_configuration:
            train_instances = None
            if isinstance(args.best_configuration, list):
                train_instances = [resolve_object_name(
                    instance_path,
                    gv.file_storage_data_mapping[gv.instances_nickname_path],
                    gv.settings().DEFAULT_instance_dir, Instance_Set)
                    for instance_path in args.best_configuration]
                # Unpack the sets into instance strings
                instances = [str(path) for instance_set in train_instances
                             for path in instance_set.instance_paths]
            # Determine best configuration
            configurations = [performance_dataframe.best_configuration(
                str(solver.directory), objective, train_instances)[0]
                for solver in solvers]
        elif args.configuration:
            # Use given configurations
            # Hotfix: We take the first instance in the DF. Might not work in some cases
            instance = performance_dataframe.instances[0]
            configurations = [ast.literal_eval(performance_dataframe.get_value(
                str(solver.directory), instance, objective.name, run=args.configuration,
                solver_fields=[PerformanceDataFrame.column_configuration]))
                for solver in solvers]
        if instances is None:
            instances = performance_dataframe.instances
        runs = run_solvers(
            solvers=solvers,
            configuration=configurations,
            instances=instances,
            objectives=gv.settings().get_general_sparkle_objectives(),
            seed=random.randint(0, sys.maxsize),
            cutoff_time=cutoff_time,
            sbatch_options=sbatch_options,
            log_dir=sl.caller_log_dir,
            run_on=gv.settings().get_run_on(),
        )

    # If there are no jobs return
    if runs is None or all(run is None for run in runs):
        print("Running solvers done!")
    elif run_on == Runner.SLURM:
        print("Running solvers. Waiting for Slurm job(s) with id(s): "
              f'{",".join(r.run_id for r in runs if r is not None)}')
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])