Coverage for sparkle/solver/solver.py: 36%
146 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-27 09:10 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-27 09:10 +0000
1"""File to handle a solver and its directories."""
3from __future__ import annotations
4import sys
5from typing import Any
6import shlex
7import ast
8import json
9from pathlib import Path
11import runrunner as rrr
12from runrunner.local import LocalRun
13from runrunner.slurm import SlurmRun
14from runrunner.base import Status, Runner
16from sparkle.tools import runsolver_parsing, general as tg
17from sparkle.tools import pcsparser
18from sparkle.types import SparkleCallable, SolverStatus
19from sparkle.solver.verifier import SolutionVerifier
20from sparkle.instance import InstanceSet
21from sparkle.types import resolve_objective, SparkleObjective, UseTime
24class Solver(SparkleCallable):
25 """Class to handle a solver and its directories."""
26 meta_data = "solver_meta.txt"
27 wrapper = "sparkle_solver_wrapper.py"
29 def __init__(self: Solver,
30 directory: Path,
31 raw_output_directory: Path = None,
32 runsolver_exec: Path = None,
33 deterministic: bool = None,
34 verifier: SolutionVerifier = None) -> None:
35 """Initialize solver.
37 Args:
38 directory: Directory of the solver.
39 raw_output_directory: Directory where solver will write its raw output.
40 Defaults to directory / tmp
41 runsolver_exec: Path to the runsolver executable.
42 By default, runsolver in directory.
43 deterministic: Bool indicating determinism of the algorithm.
44 Defaults to False.
45 verifier: The solution verifier to use. If None, no verifier is used.
46 """
47 super().__init__(directory, runsolver_exec, raw_output_directory)
48 self.deterministic = deterministic
49 self.verifier = verifier
50 self.meta_data_file = self.directory / Solver.meta_data
52 if self.raw_output_directory is None:
53 self.raw_output_directory = self.directory / "tmp"
54 self.raw_output_directory.mkdir(exist_ok=True)
55 if self.runsolver_exec is None:
56 self.runsolver_exec = self.directory / "runsolver"
57 if not self.meta_data_file.exists():
58 self.meta_data_file = None
59 if self.deterministic is None:
60 if self.meta_data_file is not None:
61 # Read the parameter from file
62 meta_dict = ast.literal_eval(self.meta_data_file.open().read())
63 self.deterministic = meta_dict["deterministic"]
64 else:
65 self.deterministic = False
67 def _get_pcs_file(self: Solver) -> Path | bool:
68 """Get path of the parameter file.
70 Returns:
71 Path to the parameter file or False if the parameter file does not exist.
72 """
73 pcs_files = [p for p in self.directory.iterdir() if p.suffix == ".pcs"]
74 if len(pcs_files) != 1:
75 # We only consider one PCS file per solver
76 return False
77 return pcs_files[0]
79 def get_pcs_file(self: Solver) -> Path:
80 """Get path of the parameter file.
82 Returns:
83 Path to the parameter file. None if it can not be resolved.
84 """
85 if not (file_path := self._get_pcs_file()):
86 return None
87 return file_path
89 def read_pcs_file(self: Solver) -> bool:
90 """Checks if the pcs file can be read."""
91 pcs_file = self._get_pcs_file()
92 try:
93 parser = pcsparser.PCSParser()
94 parser.load(str(pcs_file), convention="smac")
95 return True
96 except SyntaxError:
97 pass
98 return False
100 def get_pcs(self: Solver) -> dict[str, tuple[str, str, str]]:
101 """Get the parameter content of the PCS file."""
102 if not (pcs_file := self.get_pcs_file()):
103 return None
104 parser = pcsparser.PCSParser()
105 parser.load(str(pcs_file), convention="smac")
106 return [p for p in parser.pcs.params if p["type"] == "parameter"]
108 def build_cmd(self: Solver,
109 instance: str | list[str],
110 objectives: list[SparkleObjective],
111 seed: int,
112 cutoff_time: int = None,
113 configuration: dict = None) -> list[str]:
114 """Build the solver call on an instance with a configuration.
116 Args:
117 instance: Path to the instance.
118 seed: Seed of the solver.
119 cutoff_time: Cutoff time for the solver.
120 configuration: Configuration of the solver.
122 Returns:
123 List of commands and arguments to execute the solver.
124 """
125 if configuration is None:
126 configuration = {}
127 # Ensure configuration contains required entries for each wrapper
128 configuration["solver_dir"] = str(self.directory.absolute())
129 configuration["instance"] = instance
130 configuration["seed"] = seed
131 configuration["objectives"] = ",".join([str(obj) for obj in objectives])
132 if cutoff_time is not None: # Use RunSolver
133 configuration["cutoff_time"] = cutoff_time
134 # Create RunSolver Logs
135 # --timestamp
136 # instructs to timestamp each line of the solver standard output and
137 # error files (which are then redirected to stdout)
139 # --use-pty
140 # use a pseudo-terminal to collect the solver output. Currently only
141 # available when lines are timestamped. Some I/O libraries (including
142 # the C library) automatically flushes the output after each line when
143 # the standard output is a terminal. There's no automatic flush when
144 # the standard output is a pipe or a plain file. See setlinebuf() for
145 # some details. This option instructs runsolver to use a
146 # pseudo-terminal instead of a pipe/file to collect the solver
147 # output. This fools the solver which will line-buffer its output.
149 # -w filename or --watcher-data filename
150 # sends the watcher informations to filename
152 # -v filename or --var filename
153 # save the most relevant information (times,...)
154 # in an easy to parse VAR=VALUE file
156 # -o filename or --solver-data filename
157 # redirects the solver output (both stdout and stderr) to filename
158 inst_name = Path(instance).name
159 raw_result_path =\
160 Path(f"{self.name}_{inst_name}_{tg.get_time_pid_random_string()}.rawres")
161 runsolver_watch_data_path = raw_result_path.with_suffix(".log")
162 runsolver_values_path = raw_result_path.with_suffix(".val")
164 solver_cmd = [str(self.runsolver_exec.absolute()),
165 "--timestamp", "--use-pty",
166 "--cpu-limit", str(cutoff_time),
167 "-w", str(runsolver_watch_data_path),
168 "-v", str(runsolver_values_path),
169 "-o", str(raw_result_path)]
170 else:
171 configuration["cutoff_time"] = sys.maxsize
172 solver_cmd = []
174 # Ensure stringification of dictionary will go correctly for key value pairs
175 configuration = {key: str(configuration[key]) for key in configuration}
176 solver_cmd += [str((self.directory / Solver.wrapper).absolute()),
177 f"'{json.dumps(configuration)}'"]
178 return solver_cmd
180 def run(self: Solver,
181 instance: str | list[str] | InstanceSet,
182 objectives: list[SparkleObjective],
183 seed: int,
184 cutoff_time: int = None,
185 configuration: dict = None,
186 run_on: Runner = Runner.LOCAL,
187 commandname: str = "run_solver",
188 sbatch_options: list[str] = None,
189 cwd: Path = None) -> SlurmRun | list[dict[str, Any]] | dict[str, Any]:
190 """Run the solver on an instance with a certain configuration.
192 Args:
193 instance: The instance(s) to run the solver on, list in case of multi-file.
194 In case of an instance set, will run on all instances in the set.
195 seed: Seed to run the solver with. Fill with abitrary int in case of
196 determnistic solver.
197 cutoff_time: The cutoff time for the solver, measured through RunSolver.
198 If None, will be executed without RunSolver.
199 configuration: The solver configuration to use. Can be empty.
200 cwd: Path where to execute. Defaults to self.raw_output_directory.
202 Returns:
203 Solver output dict possibly with runsolver values.
204 """
205 if cwd is None:
206 cwd = self.raw_output_directory
207 cmds = []
208 if isinstance(instance, InstanceSet):
209 for inst in instance.instance_paths:
210 solver_cmd = self.build_cmd(inst.absolute(),
211 objectives=objectives,
212 seed=seed,
213 cutoff_time=cutoff_time,
214 configuration=configuration)
215 cmds.append(" ".join(solver_cmd))
216 else:
217 solver_cmd = self.build_cmd(instance,
218 objectives=objectives,
219 seed=seed,
220 cutoff_time=cutoff_time,
221 configuration=configuration)
222 cmds.append(" ".join(solver_cmd))
223 run = rrr.add_to_queue(runner=run_on,
224 cmd=cmds,
225 name=commandname,
226 base_dir=cwd,
227 path=cwd,
228 sbatch_options=sbatch_options)
230 if isinstance(run, LocalRun):
231 run.wait()
232 # Subprocess resulted in error
233 if run.status == Status.ERROR:
234 print(f"WARNING: Solver {self.name} execution seems to have failed!\n")
235 for i, job in enumerate(run.jobs):
236 print(f"[Job {i}] The used command was: {cmds[i]}\n"
237 "The error yielded was:\n"
238 f"\t-stdout: '{run.jobs[0]._process.stdout}'\n"
239 f"\t-stderr: '{run.jobs[0]._process.stderr}'\n")
240 return {"status": SolverStatus.ERROR, }
242 solver_outputs = []
243 for i, job in enumerate(run.jobs):
244 solver_cmd = cmds[i].split(" ")
245 runsolver_configuration = None
246 if solver_cmd[0] == str(self.runsolver_exec.absolute()):
247 runsolver_configuration = solver_cmd[:11]
248 solver_output = Solver.parse_solver_output(run.jobs[i].stdout,
249 runsolver_configuration,
250 cwd)
251 if self.verifier is not None:
252 solver_output["status"] = self.verifier.verifiy(
253 instance, Path(runsolver_configuration[-1]))
254 solver_outputs.append(solver_output)
255 return solver_outputs if len(solver_outputs) > 1 else solver_output
256 return run
258 @staticmethod
259 def config_str_to_dict(config_str: str) -> dict[str, str]:
260 """Parse a configuration string to a dictionary."""
261 # First we filter the configuration of unwanted characters
262 config_str = config_str.strip().replace("-", "")
263 # Then we split the string by spaces, but conserve substrings
264 config_list = shlex.split(config_str)
265 # We return empty for empty input OR uneven input
266 if config_str == "" or config_str == r"{}" or len(config_list) & 1:
267 return {}
268 config_dict = {}
269 for index in range(0, len(config_list), 2):
270 # As the value will already be a string object, no quotes are allowed in it
271 value = config_list[index + 1].strip('"').strip("'")
272 config_dict[config_list[index]] = value
273 return config_dict
275 @staticmethod
276 def parse_solver_output(solver_output: str,
277 runsolver_configuration: list[str] = None,
278 cwd: Path = None) -> dict[str, Any]:
279 """Parse the output of the solver.
281 Args:
282 solver_output: The output of the solver run which needs to be parsed
283 runsolver_configuration: The runsolver configuration to wrap the solver
284 with. If runsolver was not used this should be None.
285 cwd: Path where to execute. Defaults to self.raw_output_directory.
287 Returns:
288 Dictionary representing the parsed solver output
289 """
290 if runsolver_configuration is not None:
291 parsed_output = runsolver_parsing.get_solver_output(runsolver_configuration,
292 solver_output,
293 cwd)
294 else:
295 parsed_output = ast.literal_eval(solver_output)
297 # cast status attribute from str to Enum
298 parsed_output["status"] = SolverStatus(parsed_output["status"])
299 # apply objectives to parsed output, runtime based objectives added here
300 for key, value in parsed_output.items():
301 if key == "status":
302 continue
303 objective = resolve_objective(key)
304 if objective is None:
305 continue
306 if objective.use_time == UseTime.NO:
307 if objective.post_process is not None:
308 parsed_output[objective] = objective.post_process(value)
309 else:
310 if runsolver_configuration is None:
311 continue
312 if objective.use_time == UseTime.CPU_TIME:
313 parsed_output[key] = parsed_output["cpu_time"]
314 else:
315 parsed_output[key] = parsed_output["wall_time"]
316 if objective.post_process is not None:
317 parsed_output[key] = objective.post_process(
318 parsed_output[key], parsed_output["cutoff_time"])
319 if "cutoff_time" in parsed_output:
320 del parsed_output["cutoff_time"]
321 return parsed_output