Coverage for sparkle/solver/solver.py: 92%
214 statements
« prev ^ index » next — coverage.py v7.10.7, created at 2025-09-29 10:17 +0000
1"""File to handle a solver and its directories."""
3from __future__ import annotations
4import sys
5from typing import Any
6import shlex
7import ast
8import json
9import random
10from pathlib import Path
12from ConfigSpace import ConfigurationSpace
14import runrunner as rrr
15from runrunner.local import LocalRun
16from runrunner.slurm import Run, SlurmRun
17from runrunner.base import Status, Runner
19from sparkle.tools.parameters import PCSConverter, PCSConvention
20from sparkle.tools import RunSolver
21from sparkle.types import SparkleCallable, SolverStatus
22from sparkle.solver import verifiers
23from sparkle.instance import InstanceSet
24from sparkle.structures import PerformanceDataFrame
25from sparkle.types import resolve_objective, SparkleObjective, UseTime
class Solver(SparkleCallable):
    """Class to handle a solver and its directories."""

    # File name of the optional metadata file inside a solver directory.
    meta_data = "solver_meta.txt"
    # Stem of the solver wrapper file; its extension is resolved at runtime.
    _wrapper_file = "sparkle_solver_wrapper"
    # Helper CLI script used to launch solver jobs that write to a
    # PerformanceDataFrame (see run_performance_dataframe).
    solver_cli = Path(__file__).parent / "solver_cli.py"
35 def __init__(
36 self: Solver,
37 directory: Path,
38 runsolver_exec: Path = None,
39 deterministic: bool = None,
40 verifier: verifiers.SolutionVerifier = None,
41 ) -> None:
42 """Initialize solver.
44 Args:
45 directory: Directory of the solver.
46 runsolver_exec: Path to the runsolver executable.
47 By default, runsolver in directory.
48 deterministic: Bool indicating determinism of the algorithm.
49 Defaults to False.
50 verifier: The solution verifier to use. If None, no verifier is used.
51 """
52 super().__init__(directory, runsolver_exec)
53 self.deterministic = deterministic
54 self.verifier = verifier
55 self._pcs_file: Path = None
56 self._interpreter: str = None
57 self._wrapper_extension: str = None
59 meta_data_file = self.directory / Solver.meta_data
60 if meta_data_file.exists():
61 meta_data = ast.literal_eval(meta_data_file.open().read())
62 # We only override the deterministic and verifier from file if not set
63 if self.deterministic is None:
64 if (
65 "deterministic" in meta_data
66 and meta_data["deterministic"] is not None
67 ):
68 self.deterministic = meta_data["deterministic"]
69 if self.verifier is None and "verifier" in meta_data:
70 if isinstance(meta_data["verifier"], tuple): # File verifier
71 self.verifier = verifiers.mapping[meta_data["verifier"][0]](
72 Path(meta_data["verifier"][1])
73 )
74 elif meta_data["verifier"] in verifiers.mapping:
75 self.verifier = verifiers.mapping[meta_data["verifier"]]
76 if self.deterministic is None: # Default to False
77 self.deterministic = False
    def __str__(self: Solver) -> str:
        """Return the string representation of the solver (its name)."""
        return self.name
83 def __repr__(self: Solver) -> str:
84 """Return detailed representation of the solver."""
85 return (
86 f"{self.name}:\n"
87 f"\t- Directory: {self.directory}\n"
88 f"\t- Deterministic: {self.deterministic}\n"
89 f"\t- Verifier: {self.verifier}\n"
90 f"\t- PCS File: {self.pcs_file}\n"
91 f"\t- Wrapper: {self.wrapper}"
92 )
94 @property
95 def pcs_file(self: Solver) -> Path:
96 """Get path of the parameter file."""
97 if self._pcs_file is None:
98 files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"])
99 if len(files) == 0:
100 return None
101 self._pcs_file = files[0]
102 return self._pcs_file
104 @property
105 def wrapper_extension(self: Solver) -> str:
106 """Get the extension of the wrapper file."""
107 if self._wrapper_extension is None:
108 # Determine which file is the wrapper by sorting alphabetically
109 wrapper = sorted(
110 [p for p in self.directory.iterdir() if p.stem == Solver._wrapper_file]
111 )[0]
112 self._wrapper_extension = wrapper.suffix
113 return self._wrapper_extension
115 @property
116 def wrapper(self: Solver) -> str:
117 """Get name of the wrapper file."""
118 return f"{Solver._wrapper_file}{self.wrapper_extension}"
120 @property
121 def wrapper_file(self: Solver) -> Path:
122 """Get path of the wrapper file."""
123 return self.directory / self.wrapper
125 def get_pcs_file(self: Solver, port_type: PCSConvention) -> Path:
126 """Get path of the parameter file of a specific convention.
128 Args:
129 port_type: Port type of the parameter file. If None, will return the
130 file with the shortest name.
132 Returns:
133 Path to the parameter file. None if it can not be resolved.
134 """
135 pcs_files = sorted([p for p in self.directory.iterdir() if p.suffix == ".pcs"])
136 if port_type is None:
137 return pcs_files[0]
138 for file in pcs_files:
139 if port_type == PCSConverter.get_convention(file):
140 return file
141 return None
143 def read_pcs_file(self: Solver) -> bool:
144 """Checks if the pcs file can be read."""
145 # TODO: Should be a .validate method instead
146 return PCSConverter.get_convention(self.pcs_file) is not None
148 def get_configuration_space(self: Solver) -> ConfigurationSpace:
149 """Get the ConfigurationSpace of the PCS file."""
150 if not self.pcs_file:
151 return None
152 return PCSConverter.parse(self.pcs_file)
154 def port_pcs(self: Solver, port_type: PCSConvention) -> None:
155 """Port the parameter file to the given port type."""
156 target_pcs_file = (
157 self.pcs_file.parent / f"{self.pcs_file.stem}_{port_type.name}.pcs"
158 )
159 if target_pcs_file.exists(): # Already exists, possibly user defined
160 return
161 PCSConverter.export(self.get_configuration_space(), port_type, target_pcs_file)
163 def build_cmd(
164 self: Solver,
165 instance: str | list[str],
166 objectives: list[SparkleObjective],
167 seed: int,
168 cutoff_time: int = None,
169 configuration: dict = None,
170 log_dir: Path = None,
171 ) -> list[str]:
172 """Build the solver call on an instance with a configuration.
174 Args:
175 instance: Path to the instance.
176 objectives: List of sparkle objectives.
177 seed: Seed of the solver.
178 cutoff_time: Cutoff time for the solver.
179 configuration: Configuration of the solver.
180 log_dir: Directory path for logs.
182 Returns:
183 List of commands and arguments to execute the solver.
184 """
185 if configuration is None:
186 configuration = {}
187 # Ensure configuration contains required entries for each wrapper
188 configuration["solver_dir"] = str(self.directory.absolute())
189 configuration["instance"] = instance
190 configuration["seed"] = seed
191 configuration["objectives"] = ",".join([str(obj) for obj in objectives])
192 configuration["cutoff_time"] = (
193 cutoff_time if cutoff_time is not None else sys.maxsize
194 )
195 if "configuration_id" in configuration:
196 del configuration["configuration_id"]
197 # Ensure stringification of dictionary will go correctly for key value pairs
198 configuration = {key: str(configuration[key]) for key in configuration}
199 solver_cmd = [
200 str(self.directory / self.wrapper),
201 f"'{json.dumps(configuration)}'",
202 ]
203 if log_dir is None:
204 log_dir = Path()
205 if cutoff_time is not None: # Use RunSolver
206 log_path_str = instance[0] if isinstance(instance, list) else instance
207 log_name_base = f"{Path(log_path_str).name}_{self.name}"
208 return RunSolver.wrap_command(
209 self.runsolver_exec,
210 solver_cmd,
211 cutoff_time,
212 log_dir,
213 log_name_base=log_name_base,
214 )
215 return solver_cmd
217 def run(
218 self: Solver,
219 instances: str | list[str] | InstanceSet | list[InstanceSet],
220 objectives: list[SparkleObjective],
221 seed: int,
222 cutoff_time: int = None,
223 configuration: dict = None,
224 run_on: Runner = Runner.LOCAL,
225 sbatch_options: list[str] = None,
226 slurm_prepend: str | list[str] | Path = None,
227 log_dir: Path = None,
228 ) -> SlurmRun | list[dict[str, Any]] | dict[str, Any]:
229 """Run the solver on an instance with a certain configuration.
231 Args:
232 instances: The instance(s) to run the solver on, list in case of multi-file.
233 In case of an instance set, will run on all instances in the set.
234 objectives: List of sparkle objectives.
235 seed: Seed to run the solver with. Fill with abitrary int in case of
236 determnistic solver.
237 cutoff_time: The cutoff time for the solver, measured through RunSolver.
238 If None, will be executed without RunSolver.
239 configuration: The solver configuration to use. Can be empty.
240 run_on: Whether to run on slurm or locally.
241 sbatch_options: The sbatch options to use.
242 slurm_prepend: The script to prepend to a slurm script.
243 log_dir: The log directory to use.
245 Returns:
246 Solver output dict possibly with runsolver values.
247 """
248 cmds = []
249 set_label = instances.name if isinstance(instances, InstanceSet) else "instances"
250 instances = [instances] if not isinstance(instances, list) else instances
251 log_dir = Path() if log_dir is None else log_dir
252 for instance in instances:
253 paths = (
254 instance.instance_paths
255 if isinstance(instance, InstanceSet)
256 else [instance]
257 )
258 for instance_path in paths:
259 instance_path = (
260 [str(p) for p in instance_path]
261 if isinstance(instance_path, list)
262 else instance_path
263 )
264 solver_cmd = self.build_cmd(
265 instance_path,
266 objectives=objectives,
267 seed=seed,
268 cutoff_time=cutoff_time,
269 configuration=configuration,
270 log_dir=log_dir,
271 )
272 cmds.append(" ".join(solver_cmd))
274 commandname = f"Run Solver: {self.name} on {set_label}"
275 run = rrr.add_to_queue(
276 runner=run_on,
277 cmd=cmds,
278 name=commandname,
279 base_dir=log_dir,
280 sbatch_options=sbatch_options,
281 prepend=slurm_prepend,
282 )
284 if isinstance(run, LocalRun):
285 run.wait()
286 if run.status == Status.ERROR: # Subprocess resulted in error
287 print(f"WARNING: Solver {self.name} execution seems to have failed!\n")
288 for i, job in enumerate(run.jobs):
289 print(
290 f"[Job {i}] The used command was: {cmds[i]}\n"
291 "The error yielded was:\n"
292 f"\t-stdout: '{job.stdout}'\n"
293 f"\t-stderr: '{job.stderr}'\n"
294 )
295 return {
296 "status": SolverStatus.ERROR,
297 }
299 solver_outputs = []
300 for i, job in enumerate(run.jobs):
301 solver_cmd = cmds[i].split(" ")
302 solver_output = Solver.parse_solver_output(
303 run.jobs[i].stdout,
304 solver_call=solver_cmd,
305 objectives=objectives,
306 verifier=self.verifier,
307 )
308 solver_outputs.append(solver_output)
309 return solver_outputs if len(solver_outputs) > 1 else solver_output
310 return run
312 def run_performance_dataframe(
313 self: Solver,
314 instances: str | list[str] | InstanceSet,
315 performance_dataframe: PerformanceDataFrame,
316 config_ids: str | list[str] = None,
317 run_ids: list[int] | list[list[int]] = None,
318 cutoff_time: int = None,
319 objective: SparkleObjective = None,
320 train_set: InstanceSet = None,
321 sbatch_options: list[str] = None,
322 slurm_prepend: str | list[str] | Path = None,
323 dependencies: list[SlurmRun] = None,
324 log_dir: Path = None,
325 base_dir: Path = None,
326 job_name: str = None,
327 run_on: Runner = Runner.SLURM,
328 ) -> Run:
329 """Run the solver from and place the results in the performance dataframe.
331 This in practice actually runs Solver.run, but has a little script before/after,
332 to read and write to the performance dataframe.
334 Args:
335 instances: The instance(s) to run the solver on. In case of an instance set,
336 or list, will create a job for all instances in the set/list.
337 config_ids: The config indices to use in the performance dataframe.
338 performance_dataframe: The performance dataframe to use.
339 run_ids: List of run ids to use. If list of list, a list of runs is given
340 per instance. Otherwise, all runs are used for each instance.
341 cutoff_time: The cutoff time for the solver, measured through RunSolver.
342 objective: The objective to use, only relevant when determining the best
343 configuration.
344 train_set: The training set to use. If present, will determine the best
345 configuration of the solver using these instances and run with it on
346 all instances in the instance argument.
347 sbatch_options: List of slurm batch options to use
348 slurm_prepend: Slurm script to prepend to the sbatch
349 dependencies: List of slurm runs to use as dependencies
350 log_dir: Path where to place output files. Defaults to CWD.
351 base_dir: Path where to place output files.
352 job_name: Name of the job
353 If None, will generate a name based on Solver and Instances
354 run_on: On which platform to run the jobs. Default: Slurm.
356 Returns:
357 SlurmRun or Local run of the job.
358 """
359 instances = [instances] if isinstance(instances, str) else instances
360 set_name = "instances"
361 if isinstance(instances, InstanceSet):
362 set_name = instances.name
363 instances = [str(i) for i in instances.instance_paths]
364 if not isinstance(config_ids, list):
365 config_ids = [config_ids]
366 configurations = [
367 performance_dataframe.get_full_configuration(str(self.directory), config_id)
368 if config_id
369 else None
370 for config_id in config_ids
371 ]
372 if run_ids is None:
373 run_ids = performance_dataframe.run_ids
374 if isinstance(run_ids[0], list): # Runs per instance
375 combinations = []
376 for index, instance in enumerate(instances):
377 for run_id in run_ids[index]:
378 combinations.extend(
379 [
380 (instance, config_id, config, run_id)
381 for config_id, config in zip(config_ids, configurations)
382 ]
383 )
384 else: # Runs for all instances
385 import itertools
387 combinations = [
388 (instance, config_data[0], config_data[1], run_id)
389 for instance, config_data, run_id in itertools.product(
390 instances,
391 zip(config_ids, configurations),
392 performance_dataframe.run_ids,
393 )
394 ]
395 objective_arg = f"--target-objective {objective.name}" if objective else ""
396 train_arg = (
397 "--best-configuration-instances "
398 + " ".join([str(i) for i in train_set.instance_paths])
399 if train_set
400 else ""
401 )
402 configuration_args = [
403 ""
404 if not config_id and not config
405 else f"--configuration-id {config_id}"
406 if not config
407 else f"--configuration '{json.dumps(config)}'"
408 for _, config_id, config, _ in combinations
409 ]
411 # We run all instances/configs/runs combinations
412 # For each value we try to resolve from the PDF, to avoid high read loads during executions
413 cmds = [
414 f"python3 {Solver.solver_cli} "
415 f"--solver {self.directory} "
416 f"--instance {instance} "
417 f"{config_arg} "
418 # f"{'--configuration-id ' + config_id if not config else '--configuration"' + str(config) + '\"'} "
419 f"--run-index {run_id} "
420 f"--objectives {' '.join([obj.name for obj in performance_dataframe.objectives])} "
421 f"--performance-dataframe {performance_dataframe.csv_filepath} "
422 f"--cutoff-time {cutoff_time} "
423 f"--log-dir {log_dir} "
424 f"--seed {random.randint(0, 2**32 - 1)} "
425 f"{objective_arg} "
426 f"{train_arg}"
427 for (instance, _, _, run_id), config_arg in zip(
428 combinations, configuration_args
429 )
430 ]
431 job_name = f"Run: {self.name} on {set_name}" if job_name is None else job_name
432 r = rrr.add_to_queue(
433 runner=run_on,
434 cmd=cmds,
435 name=job_name,
436 base_dir=base_dir,
437 sbatch_options=sbatch_options,
438 prepend=slurm_prepend,
439 dependencies=dependencies,
440 )
441 if run_on == Runner.LOCAL:
442 r.wait()
443 return r
445 @staticmethod
446 def config_str_to_dict(config_str: str) -> dict[str, str]:
447 """Parse a configuration string to a dictionary."""
448 # First we filter the configuration of unwanted characters
449 config_str = config_str.strip().replace("-", "")
450 # Then we split the string by spaces, but conserve substrings
451 config_list = shlex.split(config_str)
452 # We return empty for empty input OR uneven input
453 if config_str == "" or config_str == r"{}" or len(config_list) & 1:
454 return {}
455 config_dict = {}
456 for index in range(0, len(config_list), 2):
457 # As the value will already be a string object, no quotes are allowed in it
458 value = config_list[index + 1].strip('"').strip("'")
459 config_dict[config_list[index]] = value
460 return config_dict
    @staticmethod
    def parse_solver_output(
        solver_output: str,
        solver_call: list[str | Path] = None,
        objectives: list[SparkleObjective] = None,
        verifier: verifiers.SolutionVerifier = None,
    ) -> dict[str, Any]:
        """Parse the output of the solver.

        Args:
            solver_output: The output of the solver run which needs to be parsed
            solver_call: The solver call used to run the solver
            objectives: The objectives to apply to the solver output
            verifier: The verifier to check the solver output

        Returns:
            Dictionary representing the parsed solver output
        """
        used_runsolver = False
        # More than two entries in solver_call indicates a RunSolver-wrapped
        # call (an unwrapped call from build_cmd is [wrapper, config-json])
        if solver_call is not None and len(solver_call) > 2:
            used_runsolver = True
            parsed_output = RunSolver.get_solver_output(solver_call, solver_output)
        else:
            parsed_output = ast.literal_eval(solver_output)
        # cast status attribute from str to Enum
        parsed_output["status"] = SolverStatus(parsed_output["status"])
        # Apply objectives to parsed output, runtime based objectives added here
        if verifier is not None and used_runsolver:
            # Horrible hack to get the instance from the solver input:
            # re-extract the JSON configuration dict from the flattened command
            # string and read its "instance" entry.
            # NOTE(review): assumes the dict contains no nested braces — verify.
            solver_call_str: str = " ".join(solver_call)
            solver_input_str = solver_call_str.split(Solver._wrapper_file, maxsplit=1)[1]
            solver_input_str = solver_input_str.split(" ", maxsplit=1)[1]
            solver_input_str = solver_input_str[
                solver_input_str.index("{") : solver_input_str.index("}") + 1
            ]
            solver_input = ast.literal_eval(solver_input_str)
            target_instance = Path(solver_input["instance"])
            # Verifier may overwrite the reported status (e.g. claimed SAT that
            # fails verification)
            parsed_output["status"] = verifier.verify(
                target_instance, parsed_output, solver_call
            )

        # Create objective map: output keys are matched against objective stems
        objectives = {o.stem: o for o in objectives} if objectives else {}
        removable_keys = ["cutoff_time"]  # Keys to remove

        # apply objectives to parsed output, runtime based objectives added here
        for key, value in parsed_output.items():
            if objectives and key in objectives:
                objective = objectives[key]
                removable_keys.append(key)  # We translate it into the full name
            else:
                objective = resolve_objective(key)
            # If not found in objectives, resolve to which objective the output belongs
            if objective is None:  # Could not parse, skip
                continue
            if objective.use_time == UseTime.NO:
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(value)
            else:
                # Time based objectives need RunSolver's measurements
                if not used_runsolver:
                    continue
                if objective.use_time == UseTime.CPU_TIME:
                    parsed_output[key] = parsed_output["cpu_time"]
                else:
                    parsed_output[key] = parsed_output["wall_time"]
                if objective.post_process is not None:
                    parsed_output[key] = objective.post_process(
                        parsed_output[key],
                        parsed_output["cutoff_time"],
                        parsed_output["status"],
                    )

        # Replace or remove keys based on the objective names
        for key in removable_keys:
            if key in parsed_output:
                if key in objectives:
                    # Map the result to the objective's full name
                    parsed_output[objectives[key].name] = parsed_output[key]
                    if key != objectives[key].name:  # Only delete actual mappings
                        del parsed_output[key]
                else:
                    del parsed_output[key]
        return parsed_output