Coverage for sparkle/solver/solver.py: 90% (213 statements)
1"""File to handle a solver and its directories."""
2from __future__ import annotations
3import sys
4from typing import Any
5import shlex
6import ast
7import json
8from pathlib import Path
10from ConfigSpace import ConfigurationSpace
12import runrunner as rrr
13from runrunner.local import LocalRun
14from runrunner.slurm import Run, SlurmRun
15from runrunner.base import Status, Runner
17from sparkle.tools.parameters import PCSConverter, PCSConvention
18from sparkle.tools import RunSolver
19from sparkle.types import SparkleCallable, SolverStatus
20from sparkle.solver import verifiers
21from sparkle.instance import InstanceSet
22from sparkle.structures import PerformanceDataFrame
23from sparkle.types import resolve_objective, SparkleObjective, UseTime


class Solver(SparkleCallable):
    """Class to handle a solver and its directories."""
    meta_data = "solver_meta.txt"
    _wrapper_file = "sparkle_solver_wrapper"
    solver_cli = Path(__file__).parent / "solver_cli.py"

    def __init__(self: Solver,
                 directory: Path,
                 runsolver_exec: Path = None,
                 deterministic: bool = None,
                 verifier: verifiers.SolutionVerifier = None) -> None:
        """Initialize solver.

        Args:
            directory: Directory of the solver.
            runsolver_exec: Path to the runsolver executable.
                By default, runsolver in directory.
            deterministic: Bool indicating determinism of the algorithm.
                Defaults to False.
            verifier: The solution verifier to use. If None, no verifier is used.
        """
        super().__init__(directory, runsolver_exec)
        self.deterministic = deterministic
        self.verifier = verifier
        self._pcs_file: Path = None
        self._interpreter: str = None
        self._wrapper_extension: str = None

        meta_data_file = self.directory / Solver.meta_data
        if self.runsolver_exec is None:
            self.runsolver_exec = self.directory / "runsolver"
        if meta_data_file.exists():
            meta_data = ast.literal_eval(meta_data_file.open().read())
            # We only override the deterministic and verifier from file if not set
            if self.deterministic is None:
                if ("deterministic" in meta_data
                        and meta_data["deterministic"] is not None):
                    self.deterministic = meta_data["deterministic"]
            if self.verifier is None and "verifier" in meta_data:
                if isinstance(meta_data["verifier"], tuple):  # File verifier
                    self.verifier = verifiers.mapping[meta_data["verifier"][0]](
                        Path(meta_data["verifier"][1])
                    )
                elif meta_data["verifier"] in verifiers.mapping:
                    self.verifier = verifiers.mapping[meta_data["verifier"]]
        if self.deterministic is None:  # Default to False
            self.deterministic = False
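
    # Usage sketch (hypothetical solver directory; assumes it contains a
    # sparkle_solver_wrapper file and, optionally, a .pcs file and a
    # runsolver binary):
    #   solver = Solver(Path("Solvers/MySolver"))
    #   print(solver.pcs_file)  # alphabetically first .pcs file, or None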

    def __str__(self: Solver) -> str:
        """Return the string representation of the solver."""
        return self.name

    def __repr__(self: Solver) -> str:
        """Return detailed representation of the solver."""
        return f"{self.name}:\n"\
               f"\t- Directory: {self.directory}\n"\
               f"\t- Deterministic: {self.deterministic}\n"\
               f"\t- Verifier: {self.verifier}\n"\
               f"\t- PCS File: {self.pcs_file}\n"\
               f"\t- Wrapper: {self.wrapper}"

    @property
    def pcs_file(self: Solver) -> Path:
        """Get path of the parameter file."""
        if self._pcs_file is None:
            files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"])
            if len(files) == 0:
                return None
            self._pcs_file = files[0]
        return self._pcs_file

    @property
    def wrapper_extension(self: Solver) -> str:
        """Get the extension of the wrapper file."""
        if self._wrapper_extension is None:
            # Determine which file is the wrapper by sorting alphabetically
            wrapper = sorted([p for p in self.directory.iterdir()
                              if p.stem == Solver._wrapper_file])[0]
            self._wrapper_extension = wrapper.suffix
        return self._wrapper_extension

    @property
    def wrapper(self: Solver) -> str:
        """Get name of the wrapper file."""
        return f"{Solver._wrapper_file}{self.wrapper_extension}"

    @property
    def wrapper_file(self: Solver) -> Path:
        """Get path of the wrapper file."""
        return self.directory / self.wrapper

    def get_pcs_file(self: Solver, port_type: PCSConvention) -> Path:
        """Get path of the parameter file of a specific convention.

        Args:
            port_type: Port type of the parameter file. If None, the
                alphabetically first parameter file is returned.

        Returns:
            Path to the parameter file. None if it cannot be resolved.
        """
        pcs_files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"])
        if port_type is None:
            return pcs_files[0]
        for file in pcs_files:
            if port_type == PCSConverter.get_convention(file):
                return file
        return None

    def read_pcs_file(self: Solver) -> bool:
        """Check whether the PCS file can be read."""
        # TODO: Should be a .validate method instead
        return PCSConverter.get_convention(self.pcs_file) is not None

    def get_configuration_space(self: Solver) -> ConfigurationSpace:
        """Get the ConfigurationSpace of the PCS file."""
        if not self.pcs_file:
            return None
        return PCSConverter.parse(self.pcs_file)
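
    # Sketch: sampling a configuration from the parsed parameter space
    # (assumes the solver directory contains a valid .pcs file; uses the
    # ConfigSpace API):
    #   cs = solver.get_configuration_space()
    #   config = dict(cs.sample_configuration())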

    def port_pcs(self: Solver, port_type: PCSConvention) -> None:
        """Port the parameter file to the given port type."""
        target_pcs_file =\
            self.pcs_file.parent / f"{self.pcs_file.stem}_{port_type.name}.pcs"
        if target_pcs_file.exists():  # Already exists, possibly user defined
            return
        PCSConverter.export(self.get_configuration_space(), port_type, target_pcs_file)

    def build_cmd(self: Solver,
                  instance: str | list[str],
                  objectives: list[SparkleObjective],
                  seed: int,
                  cutoff_time: int = None,
                  configuration: dict = None,
                  log_dir: Path = None) -> list[str]:
        """Build the solver call on an instance with a configuration.

        Args:
            instance: Path to the instance. A list of paths in case of a
                multi-file instance.
            objectives: The objectives the solver should report on.
            seed: Seed of the solver.
            cutoff_time: Cutoff time for the solver.
            configuration: Configuration of the solver.
            log_dir: Directory for the RunSolver logs. Defaults to CWD.

        Returns:
            List of commands and arguments to execute the solver.
        """
        if configuration is None:
            configuration = {}
        # Ensure configuration contains required entries for each wrapper
        configuration["solver_dir"] = str(self.directory.absolute())
        configuration["instance"] = instance
        configuration["seed"] = seed
        configuration["objectives"] = ",".join([str(obj) for obj in objectives])
        configuration["cutoff_time"] =\
            cutoff_time if cutoff_time is not None else sys.maxsize
        if "configuration_id" in configuration:
            del configuration["configuration_id"]
        # Ensure stringification of dictionary will go correctly for key value pairs
        configuration = {key: str(configuration[key]) for key in configuration}
        solver_cmd = [str(self.directory / self.wrapper),
                      f"'{json.dumps(configuration)}'"]
        if log_dir is None:
            log_dir = Path()
        if cutoff_time is not None:  # Use RunSolver
            log_path_str = instance[0] if isinstance(instance, list) else instance
            log_name_base = f"{Path(log_path_str).name}_{self.name}"
            return RunSolver.wrap_command(self.runsolver_exec,
                                          solver_cmd,
                                          cutoff_time,
                                          log_dir,
                                          log_name_base=log_name_base)
        return solver_cmd
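
    # Sketch of the result without a cutoff time (hypothetical paths/values;
    # with a cutoff time, the same command is wrapped by RunSolver instead):
    #   solver.build_cmd("train/inst1.cnf", objectives=[obj], seed=42)
    #   -> ['<solver_dir>/sparkle_solver_wrapper<ext>',
    #       '\'{"solver_dir": "...", "instance": "train/inst1.cnf", ...}\'']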

    def run(self: Solver,
            instances: str | list[str] | InstanceSet | list[InstanceSet],
            objectives: list[SparkleObjective],
            seed: int,
            cutoff_time: int = None,
            configuration: dict = None,
            run_on: Runner = Runner.LOCAL,
            sbatch_options: list[str] = None,
            slurm_prepend: str | list[str] | Path = None,
            log_dir: Path = None,
            ) -> SlurmRun | list[dict[str, Any]] | dict[str, Any]:
        """Run the solver on an instance with a certain configuration.

        Args:
            instances: The instance(s) to run the solver on, list in case of
                multi-file instances. In case of an instance set, will run on
                all instances in the set.
            objectives: The objectives the solver should report on.
            seed: Seed to run the solver with. Fill with an arbitrary int in
                case of a deterministic solver.
            cutoff_time: The cutoff time for the solver, measured through
                RunSolver. If None, will be executed without RunSolver.
            configuration: The solver configuration to use. Can be empty.
            run_on: Whether to run on Slurm or locally.
            sbatch_options: The sbatch options to use.
            slurm_prepend: The script to prepend to a Slurm script.
            log_dir: The log directory to use.

        Returns:
            The Slurm run when running on Slurm; otherwise the solver output
            dict(s), possibly with RunSolver values.
        """
        cmds = []
        set_label = instances.name if isinstance(
            instances, InstanceSet) else "instances"
        instances = [instances] if not isinstance(instances, list) else instances
        log_dir = Path() if log_dir is None else log_dir
        for instance in instances:
            paths = instance.instance_paths if isinstance(instance,
                                                          InstanceSet) else [instance]
            for instance_path in paths:
                instance_path = [str(p) for p in instance_path] if isinstance(
                    instance_path, list) else instance_path
                solver_cmd = self.build_cmd(instance_path,
                                            objectives=objectives,
                                            seed=seed,
                                            cutoff_time=cutoff_time,
                                            configuration=configuration,
                                            log_dir=log_dir)
                cmds.append(" ".join(solver_cmd))

        commandname = f"Run Solver: {self.name} on {set_label}"
        run = rrr.add_to_queue(runner=run_on,
                               cmd=cmds,
                               name=commandname,
                               base_dir=log_dir,
                               sbatch_options=sbatch_options,
                               prepend=slurm_prepend)

        if isinstance(run, LocalRun):
            run.wait()
            if run.status == Status.ERROR:  # Subprocess resulted in error
                print(f"WARNING: Solver {self.name} execution seems to have failed!\n")
                for i, job in enumerate(run.jobs):
                    print(f"[Job {i}] The used command was: {cmds[i]}\n"
                          "The error yielded was:\n"
                          f"\t-stdout: '{job.stdout}'\n"
                          f"\t-stderr: '{job.stderr}'\n")
                return {"status": SolverStatus.ERROR}

            solver_outputs = []
            for i, job in enumerate(run.jobs):
                solver_cmd = cmds[i].split(" ")
                solver_output = Solver.parse_solver_output(job.stdout,
                                                           solver_call=solver_cmd,
                                                           objectives=objectives,
                                                           verifier=self.verifier)
                solver_outputs.append(solver_output)
            return solver_outputs if len(solver_outputs) > 1 else solver_outputs[0]
        return run
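
    # Usage sketch (hypothetical instance path and objective name; runs
    # locally and blocks until done):
    #   output = solver.run("Instances/PTN/inst1.cnf",
    #                       objectives=[resolve_objective("PAR10")],
    #                       seed=42, cutoff_time=60)
    #   print(output["status"])  # e.g. SolverStatus.SUCCESS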

    def run_performance_dataframe(
            self: Solver,
            instances: str | list[str] | InstanceSet,
            config_ids: str | list[str],
            performance_dataframe: PerformanceDataFrame,
            run_ids: list[int] | list[list[int]] = None,
            cutoff_time: int = None,
            objective: SparkleObjective = None,
            train_set: InstanceSet = None,
            sbatch_options: list[str] = None,
            slurm_prepend: str | list[str] | Path = None,
            dependencies: list[SlurmRun] = None,
            log_dir: Path = None,
            base_dir: Path = None,
            job_name: str = None,
            run_on: Runner = Runner.SLURM) -> Run:
        """Run the solver and place the results in the performance dataframe.

        In practice this runs Solver.run, wrapped in a small script that reads
        from and writes back to the performance dataframe.

        Args:
            instances: The instance(s) to run the solver on. In case of an
                instance set or list, will create a job for all instances in
                the set/list.
            config_ids: The config indices to use in the performance dataframe.
            performance_dataframe: The performance dataframe to use.
            run_ids: List of run ids to use. If a list of lists, a list of
                runs is given per instance. Otherwise, all runs are used for
                each instance.
            cutoff_time: The cutoff time for the solver, measured through
                RunSolver.
            objective: The objective to use, only relevant for determining the
                best configuration on the train set.
            train_set: The training set to use. If present, will determine the
                best configuration of the solver using these instances and run
                with it on all instances in the instances argument.
            sbatch_options: List of Slurm batch options to use.
            slurm_prepend: Slurm script to prepend to the sbatch.
            dependencies: List of Slurm runs to use as dependencies.
            log_dir: Path where to place output files. Defaults to CWD.
            base_dir: Path where RunRunner places its output files.
            job_name: Name of the job. If None, will generate a name based on
                the solver and instances.
            run_on: On which platform to run the jobs. Default: Slurm.

        Returns:
            SlurmRun or LocalRun of the job.
        """
        instances = [instances] if isinstance(instances, str) else instances
        set_name = "instances"
        if isinstance(instances, InstanceSet):
            set_name = instances.name
            instances = [str(i) for i in instances.instance_paths]
        if not isinstance(config_ids, list):
            config_ids = [config_ids]
        if run_ids is None:
            run_ids = performance_dataframe.run_ids
        if isinstance(run_ids[0], list):  # Runs per instance
            combinations = []
            for index, instance in enumerate(instances):
                for run_id in run_ids[index]:
                    combinations.extend([(instance, config_id, run_id)
                                         for config_id in config_ids])
        else:  # Runs for all instances, honouring explicitly given run_ids
            combinations = [(instance, config_id, run_id)
                            for instance, config_id, run_id
                            in itertools.product(instances, config_ids, run_ids)]
        objective_arg = f"--target-objective {objective.name}" if objective else ""
        train_arg =\
            " ".join([str(i) for i in train_set.instance_paths]) if train_set else ""
        # We run all instances/configs/runs combinations
        cmds = [
            f"python3 {Solver.solver_cli} "
            f"--solver {self.directory} "
            f"--instance {instance} "
            f"--configuration-id {config_id} "
            f"--run-index {run_id} "
            f"--performance-dataframe {performance_dataframe.csv_filepath} "
            f"--cutoff-time {cutoff_time} "
            f"--log-dir {log_dir} "
            f"{objective_arg} "
            f"{'--best-configuration-instances' if train_set else ''} {train_arg}"
            for instance, config_id, run_id in combinations]
        job_name = f"Run: {self.name} on {set_name}" if job_name is None else job_name
        r = rrr.add_to_queue(
            runner=run_on,
            cmd=cmds,
            name=job_name,
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
            dependencies=dependencies
        )
        if run_on == Runner.LOCAL:
            r.wait()
        return r
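
    # Usage sketch (hypothetical ids and objects; submits one solver_cli.py
    # job per (instance, config, run) combination):
    #   run = solver.run_performance_dataframe(
    #       instance_set, config_ids="config_1", performance_dataframe=pdf,
    #       cutoff_time=60, run_on=Runner.SLURM)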

    @staticmethod
    def config_str_to_dict(config_str: str) -> dict[str, str]:
        """Parse a configuration string to a dictionary."""
        # First filter the configuration of unwanted characters.
        # NOTE: this removes *all* dashes, including any inside names or values.
        config_str = config_str.strip().replace("-", "")
        # Then split the string by spaces, but preserve quoted substrings
        config_list = shlex.split(config_str)
        # Return empty for empty input OR an uneven number of tokens
        if config_str == "" or config_str == r"{}" or len(config_list) & 1:
            return {}
        config_dict = {}
        for index in range(0, len(config_list), 2):
            # As the value will already be a string object, no quotes are allowed in it
            value = config_list[index + 1].strip('"').strip("'")
            config_dict[config_list[index]] = value
        return config_dict
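
    # Example, traced through the parsing above (note the dash removal):
    #   Solver.config_str_to_dict("-init_solution '1' -perform_pac '0'")
    #   -> {"init_solution": "1", "perform_pac": "0"}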

    @staticmethod
    def parse_solver_output(
            solver_output: str,
            solver_call: list[str | Path] = None,
            objectives: list[SparkleObjective] = None,
            verifier: verifiers.SolutionVerifier = None) -> dict[str, Any]:
        """Parse the output of the solver.

        Args:
            solver_output: The output of the solver run which needs to be parsed.
            solver_call: The solver call used to run the solver.
            objectives: The objectives to apply to the solver output.
            verifier: The verifier to check the solver output.

        Returns:
            Dictionary representing the parsed solver output.
        """
        used_runsolver = False
        if solver_call is not None and len(solver_call) > 2:
            # A call with more than two arguments was wrapped by RunSolver
            used_runsolver = True
            parsed_output = RunSolver.get_solver_output(solver_call,
                                                        solver_output)
        else:
            parsed_output = ast.literal_eval(solver_output)
        # Cast status attribute from str to Enum
        parsed_output["status"] = SolverStatus(parsed_output["status"])
        if verifier is not None and used_runsolver:
            # Horrible hack to get the instance from the solver input
            solver_call_str: str = " ".join(solver_call)
            solver_input_str = solver_call_str.split(Solver._wrapper_file,
                                                     maxsplit=1)[1]
            solver_input_str = solver_input_str.split(" ", maxsplit=1)[1]
            solver_input_str = solver_input_str[solver_input_str.index("{"):
                                                solver_input_str.index("}") + 1]
            solver_input = ast.literal_eval(solver_input_str)
            target_instance = Path(solver_input["instance"])
            parsed_output["status"] = verifier.verify(
                target_instance, parsed_output, solver_call)

        # Create objective map
        objectives = {o.stem: o for o in objectives} if objectives else {}
        removable_keys = ["cutoff_time"]  # Keys to remove

        # Apply objectives to parsed output, runtime based objectives added here
        for key, value in parsed_output.items():
            if objectives and key in objectives:
                objective = objectives[key]
                removable_keys.append(key)  # We translate it into the full name
            else:
                # If not found in objectives, resolve to which objective
                # the output belongs
                objective = resolve_objective(key)
            if objective is None:  # Could not parse, skip
                continue
            if objective.use_time == UseTime.NO:
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(value)
            else:
                if not used_runsolver:
                    continue
                if objective.use_time == UseTime.CPU_TIME:
                    parsed_output[key] = parsed_output["cpu_time"]
                else:
                    parsed_output[key] = parsed_output["wall_time"]
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(
                        parsed_output[key],
                        parsed_output["cutoff_time"],
                        parsed_output["status"])

        # Replace or remove keys based on the objective names
        for key in removable_keys:
            if key in parsed_output:
                if key in objectives:
                    # Map the result to the objective
                    parsed_output[objectives[key].name] = parsed_output[key]
                    if key != objectives[key].name:  # Only delete actual mappings
                        del parsed_output[key]
                else:
                    del parsed_output[key]
        return parsed_output
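
# Example, traced through parse_solver_output without RunSolver (hypothetical
# solver output; assumes neither "status" nor "quality" resolves to a known
# objective, so values pass through unchanged apart from the status cast):
#   Solver.parse_solver_output('{"status": "SUCCESS", "quality": 42}')
#   -> {"status": SolverStatus.SUCCESS, "quality": 42}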