Coverage for src/sparkle/solver/solver.py: 92%
214 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 14:11 +0000
1"""File to handle a solver and its directories."""
3from __future__ import annotations
4import sys
5from typing import Any
6import shlex
7import ast
8import json
9import random
10from pathlib import Path
12from ConfigSpace import ConfigurationSpace
14import runrunner as rrr
15from runrunner.local import LocalRun
16from runrunner.slurm import Run, SlurmRun
17from runrunner.base import Status, Runner
19from sparkle.tools.parameters import PCSConverter, PCSConvention
20from sparkle.tools import RunSolver
21from sparkle.types import SparkleCallable, SolverStatus
22from sparkle.solver import verifiers
23from sparkle.instance import InstanceSet
24from sparkle.structures import PerformanceDataFrame
25from sparkle.types import resolve_objective, SparkleObjective, UseTime
class Solver(SparkleCallable):
    """Class to handle a solver and its directories."""

    # File name holding optional solver metadata (deterministic flag, verifier)
    meta_data = "solver_meta.txt"
    # Stem of the wrapper executable expected inside the solver directory
    _wrapper_file = "sparkle_solver_wrapper"
    # CLI script used by run_performance_dataframe to run a single job
    solver_cli = Path(__file__).parent / "solver_cli.py"

    def __init__(
        self: Solver,
        directory: Path,
        runsolver_exec: Path = None,
        deterministic: bool = None,
        verifier: verifiers.SolutionVerifier = None,
    ) -> None:
        """Initialize solver.

        Args:
            directory: Directory of the solver.
            runsolver_exec: Path to the runsolver executable.
                By default, runsolver in directory.
            deterministic: Bool indicating determinism of the algorithm.
                Defaults to False.
            verifier: The solution verifier to use. If None, no verifier is used.
        """
        super().__init__(directory, runsolver_exec)
        self.deterministic = deterministic
        self.verifier = verifier
        self._pcs_file: Path = None
        self._interpreter: str = None
        self._wrapper_extension: str = None

        meta_data_file = self.directory / Solver.meta_data
        if meta_data_file.exists():
            # read_text() closes the file; the previous .open().read() left the
            # handle open until garbage collection
            meta_data = ast.literal_eval(meta_data_file.read_text())
            # We only override the deterministic and verifier from file if not set
            if (self.deterministic is None
                    and meta_data.get("deterministic") is not None):
                self.deterministic = meta_data["deterministic"]
            if self.verifier is None and "verifier" in meta_data:
                if isinstance(meta_data["verifier"], tuple):  # File verifier
                    # Tuple encodes (verifier class name, path to solution file)
                    self.verifier = verifiers.mapping[meta_data["verifier"][0]](
                        Path(meta_data["verifier"][1])
                    )
                elif meta_data["verifier"] in verifiers.mapping:
                    self.verifier = verifiers.mapping[meta_data["verifier"]]
        if self.deterministic is None:  # Default to False
            self.deterministic = False
    def __str__(self: Solver) -> str:
        """Return the string representation of the solver (its name)."""
        return self.name
83 def __repr__(self: Solver) -> str:
84 """Return detailed representation of the solver."""
85 return (
86 f"{self.name}:\n"
87 f"\t- Directory: {self.directory}\n"
88 f"\t- Deterministic: {self.deterministic}\n"
89 f"\t- Verifier: {self.verifier}\n"
90 f"\t- PCS File: {self.pcs_file}\n"
91 f"\t- Wrapper: {self.wrapper}"
92 )
94 @property
95 def pcs_file(self: Solver) -> Path:
96 """Get path of the parameter file."""
97 if self._pcs_file is None:
98 files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"])
99 if len(files) == 0:
100 return None
101 self._pcs_file = files[0]
102 return self._pcs_file
104 @property
105 def wrapper_extension(self: Solver) -> str:
106 """Get the extension of the wrapper file."""
107 if self._wrapper_extension is None:
108 # Determine which file is the wrapper by sorting alphabetically
109 wrapper = sorted(
110 [p for p in self.directory.iterdir() if p.stem == Solver._wrapper_file]
111 )[0]
112 self._wrapper_extension = wrapper.suffix
113 return self._wrapper_extension
115 @property
116 def wrapper(self: Solver) -> str:
117 """Get name of the wrapper file."""
118 return f"{Solver._wrapper_file}{self.wrapper_extension}"
120 @property
121 def wrapper_file(self: Solver) -> Path:
122 """Get path of the wrapper file."""
123 return self.directory / self.wrapper
125 def get_pcs_file(self: Solver, port_type: PCSConvention) -> Path:
126 """Get path of the parameter file of a specific convention.
128 Args:
129 port_type: Port type of the parameter file. If None, will return the
130 file with the shortest name.
132 Returns:
133 Path to the parameter file. None if it can not be resolved.
134 """
135 pcs_files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"])
136 if port_type is None:
137 return pcs_files[0]
138 for file in pcs_files:
139 if port_type == PCSConverter.get_convention(file):
140 return file
141 return None
143 def read_pcs_file(self: Solver) -> bool:
144 """Checks if the pcs file can be read."""
145 # TODO: Should be a .validate method instead
146 return PCSConverter.get_convention(self.pcs_file) is not None
148 def get_configuration_space(self: Solver) -> ConfigurationSpace:
149 """Get the ConfigurationSpace of the PCS file."""
150 if not self.pcs_file:
151 return None
152 return PCSConverter.parse(self.pcs_file)
154 def port_pcs(self: Solver, port_type: PCSConvention) -> None:
155 """Port the parameter file to the given port type."""
156 target_pcs_file = (
157 self.pcs_file.parent / f"{self.pcs_file.stem}_{port_type.name}.pcs"
158 )
159 if target_pcs_file.exists(): # Already exists, possibly user defined
160 return
161 PCSConverter.export(self.get_configuration_space(), port_type, target_pcs_file)
163 def build_cmd(
164 self: Solver,
165 instance: str | list[str],
166 objectives: list[SparkleObjective],
167 seed: int,
168 cutoff_time: int = None,
169 configuration: dict = None,
170 log_dir: Path = None,
171 ) -> list[str]:
172 """Build the solver call on an instance with a configuration.
174 Args:
175 instance: Path to the instance.
176 objectives: List of sparkle objectives.
177 seed: Seed of the solver.
178 cutoff_time: Cutoff time for the solver.
179 configuration: Configuration of the solver.
180 log_dir: Directory path for logs.
182 Returns:
183 List of commands and arguments to execute the solver.
184 """
185 if configuration is None:
186 configuration = {}
187 # Ensure configuration contains required entries for each wrapper
188 configuration["solver_dir"] = str(self.directory.absolute())
189 configuration["instance"] = instance
190 configuration["seed"] = seed
191 configuration["objectives"] = ",".join([str(obj) for obj in objectives])
192 configuration["cutoff_time"] = (
193 cutoff_time if cutoff_time is not None else sys.maxsize
194 )
195 if "configuration_id" in configuration:
196 del configuration["configuration_id"]
197 # Ensure stringification of dictionary will go correctly for key value pairs
198 configuration = {key: str(configuration[key]) for key in configuration}
199 solver_cmd = [
200 str(self.directory / self.wrapper),
201 f"'{json.dumps(configuration)}'",
202 ]
203 if log_dir is None:
204 log_dir = Path()
205 if cutoff_time is not None: # Use RunSolver
206 log_path_str = instance[0] if isinstance(instance, list) else instance
207 log_name_base = f"{Path(log_path_str).name}_{self.name}"
208 return RunSolver.wrap_command(
209 self.runsolver_exec,
210 solver_cmd,
211 cutoff_time,
212 log_dir,
213 log_name_base=log_name_base,
214 )
215 return solver_cmd
217 def run(
218 self: Solver,
219 instances: str | list[str] | InstanceSet | list[InstanceSet],
220 objectives: list[SparkleObjective],
221 seed: int,
222 cutoff_time: int = None,
223 configuration: dict = None,
224 run_on: Runner = Runner.LOCAL,
225 sbatch_options: list[str] = None,
226 slurm_prepend: str | list[str] | Path = None,
227 log_dir: Path = None,
228 ) -> SlurmRun | list[dict[str, Any]] | dict[str, Any]:
229 """Run the solver on an instance with a certain configuration.
231 Args:
232 instances: The instance(s) to run the solver on, list in case of multi-file.
233 In case of an instance set, will run on all instances in the set.
234 objectives: List of sparkle objectives.
235 seed: Seed to run the solver with. Fill with abitrary int in case of
236 determnistic solver.
237 cutoff_time: The cutoff time for the solver, measured through RunSolver.
238 If None, will be executed without RunSolver.
239 configuration: The solver configuration to use. Can be empty.
240 run_on: Whether to run on slurm or locally.
241 sbatch_options: The sbatch options to use.
242 slurm_prepend: The script to prepend to a slurm script.
243 log_dir: The log directory to use.
245 Returns:
246 Solver output dict possibly with runsolver values.
247 """
248 cmds = []
249 set_label = instances.name if isinstance(instances, InstanceSet) else "instances"
250 instances = [instances] if not isinstance(instances, list) else instances
251 log_dir = Path() if log_dir is None else log_dir
253 for instance in instances:
254 paths = (
255 instance.instance_paths
256 if isinstance(instance, InstanceSet)
257 else [instance]
258 )
259 for instance_path in paths:
260 instance_path = (
261 [str(p) for p in instance_path]
262 if isinstance(instance_path, list)
263 else instance_path
264 )
265 solver_cmd = self.build_cmd(
266 instance_path,
267 objectives=objectives,
268 seed=seed,
269 cutoff_time=cutoff_time,
270 configuration=configuration,
271 log_dir=log_dir,
272 )
273 cmds.append(" ".join(solver_cmd))
275 commandname = f"Run Solver: {self.name} on {set_label}"
276 run = rrr.add_to_queue(
277 runner=run_on,
278 cmd=cmds,
279 name=commandname,
280 base_dir=log_dir,
281 sbatch_options=sbatch_options,
282 prepend=slurm_prepend,
283 )
285 if isinstance(run, LocalRun):
286 run.wait()
287 if run.status == Status.ERROR: # Subprocess resulted in error
288 print(f"WARNING: Solver {self.name} execution seems to have failed!\n")
289 for i, job in enumerate(run.jobs):
290 print(
291 f"[Job {i}] The used command was: {cmds[i]}\n"
292 "The error yielded was:\n"
293 f"\t-stdout: '{job.stdout}'\n"
294 f"\t-stderr: '{job.stderr}'\n"
295 )
296 return {
297 "status": SolverStatus.ERROR,
298 }
300 solver_outputs = []
301 for i, job in enumerate(run.jobs):
302 solver_cmd = cmds[i].split(" ")
303 solver_output = Solver.parse_solver_output(
304 run.jobs[i].stdout,
305 solver_call=solver_cmd,
306 objectives=objectives,
307 verifier=self.verifier,
308 )
309 solver_outputs.append(solver_output)
310 return solver_outputs if len(solver_outputs) > 1 else solver_output
311 return run
313 def run_performance_dataframe(
314 self: Solver,
315 instances: str | list[str] | InstanceSet,
316 performance_dataframe: PerformanceDataFrame,
317 config_ids: str | list[str] = None,
318 run_ids: list[int] | list[list[int]] = None,
319 cutoff_time: int = None,
320 objective: SparkleObjective = None,
321 train_set: InstanceSet = None,
322 sbatch_options: list[str] = None,
323 slurm_prepend: str | list[str] | Path = None,
324 dependencies: list[SlurmRun] = None,
325 log_dir: Path = None,
326 base_dir: Path = None,
327 job_name: str = None,
328 run_on: Runner = Runner.SLURM,
329 ) -> Run:
330 """Run the solver from and place the results in the performance dataframe.
332 This in practice actually runs Solver.run, but has a little script before/after,
333 to read and write to the performance dataframe.
335 Args:
336 instances: The instance(s) to run the solver on. In case of an instance set,
337 or list, will create a job for all instances in the set/list.
338 config_ids: The config indices to use in the performance dataframe.
339 performance_dataframe: The performance dataframe to use.
340 run_ids: List of run ids to use. If list of list, a list of runs is given
341 per instance. Otherwise, all runs are used for each instance.
342 cutoff_time: The cutoff time for the solver, measured through RunSolver.
343 objective: The objective to use, only relevant when determining the best
344 configuration.
345 train_set: The training set to use. If present, will determine the best
346 configuration of the solver using these instances and run with it on
347 all instances in the instance argument.
348 sbatch_options: List of slurm batch options to use
349 slurm_prepend: Slurm script to prepend to the sbatch
350 dependencies: List of slurm runs to use as dependencies
351 log_dir: Path where to place output files. Defaults to CWD.
352 base_dir: Path where to place output files.
353 job_name: Name of the job
354 If None, will generate a name based on Solver and Instances
355 run_on: On which platform to run the jobs. Default: Slurm.
357 Returns:
358 SlurmRun or Local run of the job.
359 """
360 instances = [instances] if isinstance(instances, str) else instances
361 set_name = "instances"
362 if isinstance(instances, InstanceSet):
363 set_name = instances.name
364 instances = [str(i) for i in instances.instance_paths]
365 if not isinstance(config_ids, list):
366 config_ids = [config_ids]
367 configurations = [
368 performance_dataframe.get_full_configuration(str(self.directory), config_id)
369 if config_id
370 else None
371 for config_id in config_ids
372 ]
373 if run_ids is None:
374 run_ids = performance_dataframe.run_ids
375 if isinstance(run_ids[0], list): # Runs per instance
376 combinations = []
377 for index, instance in enumerate(instances):
378 for run_id in run_ids[index]:
379 combinations.extend(
380 [
381 (instance, config_id, config, run_id)
382 for config_id, config in zip(config_ids, configurations)
383 ]
384 )
385 else: # Runs for all instances
386 import itertools
388 combinations = [
389 (instance, config_data[0], config_data[1], run_id)
390 for instance, config_data, run_id in itertools.product(
391 instances,
392 zip(config_ids, configurations),
393 performance_dataframe.run_ids,
394 )
395 ]
396 objective_arg = f"--target-objective {objective.name}" if objective else ""
397 train_arg = (
398 "--best-configuration-instances "
399 + " ".join([str(i) for i in train_set.instance_paths])
400 if train_set
401 else ""
402 )
403 configuration_args = [
404 ""
405 if not config_id and not config
406 else f"--configuration-id {config_id}"
407 if not config
408 else f"--configuration '{json.dumps(config)}'"
409 for _, config_id, config, _ in combinations
410 ]
412 # We run all instances/configs/runs combinations
413 # For each value we try to resolve from the PDF, to avoid high read loads during executions
414 cmds = [
415 f"python3 {Solver.solver_cli} "
416 f"--solver {self.directory} "
417 f"--instance {instance} "
418 f"{config_arg} "
419 # f"{'--configuration-id ' + config_id if not config else '--configuration"' + str(config) + '\"'} "
420 f"--run-index {run_id} "
421 f"--objectives {' '.join([obj.name for obj in performance_dataframe.objectives])} "
422 f"--performance-dataframe {performance_dataframe.csv_filepath} "
423 f"--cutoff-time {cutoff_time} "
424 f"--log-dir {log_dir} "
425 f"--seed {random.randint(0, 2**32 - 1)} "
426 f"{objective_arg} "
427 f"{train_arg}"
428 for (instance, _, _, run_id), config_arg in zip(
429 combinations, configuration_args
430 )
431 ]
432 job_name = f"Run: {self.name} on {set_name}" if job_name is None else job_name
433 r = rrr.add_to_queue(
434 runner=run_on,
435 cmd=cmds,
436 name=job_name,
437 base_dir=base_dir,
438 sbatch_options=sbatch_options,
439 prepend=slurm_prepend,
440 dependencies=dependencies,
441 )
442 if run_on == Runner.LOCAL:
443 r.wait()
444 return r
446 @staticmethod
447 def config_str_to_dict(config_str: str) -> dict[str, str]:
448 """Parse a configuration string to a dictionary."""
449 # First we filter the configuration of unwanted characters
450 config_str = config_str.strip().replace("-", "")
451 # Then we split the string by spaces, but conserve substrings
452 config_list = shlex.split(config_str)
453 # We return empty for empty input OR uneven input
454 if config_str == "" or config_str == r"{}" or len(config_list) & 1:
455 return {}
456 config_dict = {}
457 for index in range(0, len(config_list), 2):
458 # As the value will already be a string object, no quotes are allowed in it
459 value = config_list[index + 1].strip('"').strip("'")
460 config_dict[config_list[index]] = value
461 return config_dict
    @staticmethod
    def parse_solver_output(
        solver_output: str,
        solver_call: list[str | Path] = None,
        objectives: list[SparkleObjective] = None,
        verifier: verifiers.SolutionVerifier = None,
    ) -> dict[str, Any]:
        """Parse the output of the solver.

        Args:
            solver_output: The output of the solver run which needs to be parsed
            solver_call: The solver call used to run the solver. More than two
                elements is taken to mean the call was wrapped with RunSolver.
            objectives: The objectives to apply to the solver output
            verifier: The verifier to check the solver output

        Returns:
            Dictionary representing the parsed solver output
        """
        used_runsolver = False
        # Heuristic: a bare wrapper call is [wrapper, json-args]; anything
        # longer is assumed to be a RunSolver-wrapped command
        if solver_call is not None and len(solver_call) > 2:
            used_runsolver = True
            parsed_output = RunSolver.get_solver_output(solver_call, solver_output)
        else:
            # Raw wrapper output is a Python dict literal
            parsed_output = ast.literal_eval(solver_output)
        # cast status attribute from str to Enum
        parsed_output["status"] = SolverStatus(parsed_output["status"])
        # Verification only possible with RunSolver, as the instance path is
        # recovered from the wrapped command line below
        if verifier is not None and used_runsolver:
            # Horrible hack to get the instance from the solver input:
            # extract the JSON argument that follows the wrapper file name
            # and read its "instance" entry
            solver_call_str: str = " ".join(solver_call)
            solver_input_str = solver_call_str.split(Solver._wrapper_file, maxsplit=1)[1]
            solver_input_str = solver_input_str.split(" ", maxsplit=1)[1]
            # NOTE(review): index("}") finds the FIRST closing brace — assumes
            # the serialised configuration contains no nested braces; confirm
            solver_input_str = solver_input_str[
                solver_input_str.index("{") : solver_input_str.index("}") + 1
            ]
            solver_input = ast.literal_eval(solver_input_str)
            target_instance = Path(solver_input["instance"])
            parsed_output["status"] = verifier.verify(
                target_instance, parsed_output, solver_call
            )

        # Create objective map from short name (stem) to objective object
        objectives = {o.stem: o for o in objectives} if objectives else {}
        removable_keys = ["cutoff_time"]  # Keys to remove

        # apply objectives to parsed output, runtime based objectives added here
        for key, value in parsed_output.items():
            if objectives and key in objectives:
                objective = objectives[key]
                removable_keys.append(key)  # We translate it into the full name
            else:
                # If not found in objectives, resolve to which objective the
                # output belongs
                objective = resolve_objective(key)
            if objective is None:  # Could not parse, skip
                continue
            if objective.use_time == UseTime.NO:
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(value)
            else:
                # Time-based objectives need RunSolver measurements
                if not used_runsolver:
                    continue
                if objective.use_time == UseTime.CPU_TIME:
                    parsed_output[key] = parsed_output["cpu_time"]
                else:
                    parsed_output[key] = parsed_output["wall_time"]
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(
                        parsed_output[key],
                        parsed_output["cutoff_time"],
                        parsed_output["status"],
                    )

        # Replace or remove keys based on the objective names
        for key in removable_keys:
            if key in parsed_output:
                if key in objectives:
                    # Map the result to the objective's full name
                    parsed_output[objectives[key].name] = parsed_output[key]
                    if key != objectives[key].name:  # Only delete actual mappings
                        del parsed_output[key]
                else:
                    # e.g. "cutoff_time" is internal and never reported
                    del parsed_output[key]
        return parsed_output