Coverage for sparkle/tools/runsolver.py: 77%
115 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 10:17 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 10:17 +0000
1"""Class for handling the Runsolver Wrapper."""
3from __future__ import annotations
4import sys
5import ast
6import re
7import warnings
8from pathlib import Path
10from sparkle.types import SolverStatus
11from sparkle.tools.general import get_time_pid_random_string
class RunSolver:
    """Class representation of RunSolver.

    Bundles static helpers to build a runsolver command line and to parse the
    log files that runsolver writes (values, watcher and raw solver output).

    For more information see: http://www.cril.univ-artois.fr/~roussel/runsolver/
    """

    def __init__(self: RunSolver) -> None:
        """Currently RunSolver has no instance specific methods or properties."""

    @staticmethod
    def wrap_command(
        runsolver_executable: Path,
        command: list[str],
        cutoff_time: int,
        log_directory: Path,
        log_name_base: str | None = None,
        raw_results_file: bool = True,
    ) -> list[str]:
        """Wrap a command with the RunSolver call and arguments.

        Args:
            runsolver_executable: The Path to the runsolver executable.
                Is returned as an *absolute* path in the output.
            command: The command to wrap.
            cutoff_time: The cutoff CPU time for the solver.
            log_directory: The directory where to write the solver output.
            log_name_base: A user defined name to easily identify the logs.
                Defaults to "runsolver".
            raw_results_file: Whether to use the raw results file.

        Returns:
            List of commands and arguments to execute the solver.
        """
        # RunSolver options used below:
        #
        # --timestamp
        #   instructs to timestamp each line of the solver standard output and
        #   error files (which are then redirected to stdout)
        #
        # --use-pty
        #   use a pseudo-terminal to collect the solver output. Currently only
        #   available when lines are timestamped. Some I/O libraries (including
        #   the C library) automatically flush the output after each line when
        #   the standard output is a terminal. There's no automatic flush when
        #   the standard output is a pipe or a plain file. See setlinebuf() for
        #   some details. This option instructs runsolver to use a
        #   pseudo-terminal instead of a pipe/file to collect the solver
        #   output. This fools the solver which will line-buffer its output.
        #
        # -w filename or --watcher-data filename
        #   sends the watcher information to filename
        #
        # -v filename or --var filename
        #   save the most relevant information (times,...)
        #   in an easy to parse VAR=VALUE file
        #
        # -o filename or --solver-data filename
        #   redirects the solver output (both stdout and stderr) to filename
        if log_name_base is None:
            log_name_base = "runsolver"
        unique_stamp = get_time_pid_random_string()
        # All three log files share one stem and differ only in their suffix
        raw_result_path = log_directory / f"{log_name_base}_{unique_stamp}.rawres"
        watcher_data_path = raw_result_path.with_suffix(".log")
        var_values_path = raw_result_path.with_suffix(".val")

        wrapped = [
            str(runsolver_executable.absolute()),
            "--timestamp",
            "--use-pty",
            "--cpu-limit",
            str(cutoff_time),
            "-w",
            str(watcher_data_path),
            "-v",
            str(var_values_path),
        ]
        if raw_results_file:
            wrapped += ["-o", str(raw_result_path)]
        return wrapped + command

    @staticmethod
    def get_measurements(
        runsolver_values_path: Path, not_found: float = -1.0
    ) -> tuple[float, float, float]:
        """Return the CPU time, wallclock time and memory from the values log.

        Args:
            runsolver_values_path: Path to the VAR=VALUE file written by runsolver.
            not_found: Value used for every measurement that could not be read.

        Returns:
            Tuple of (cpu_time, wall_time, memory), where memory is MAXVM
            divided by 1024 (MB).
        """
        cpu_time, wall_time, memory = not_found, not_found, not_found
        if not runsolver_values_path.exists():
            return cpu_time, wall_time, memory
        with runsolver_values_path.open("r") as infile:
            # Only lines with exactly one "=" are VAR=VALUE entries
            entries = [
                line.strip().split("=")
                for line in infile.readlines()
                if line.count("=") == 1
            ]
        for keyword, value in entries:
            if keyword == "WCTIME":
                wall_time = float(value)
            elif keyword == "CPUTIME":
                cpu_time = float(value)
            elif keyword == "MAXVM":
                memory = float(int(value) / 1024.0)  # MB
                # Order is fixed, MAXVM is the last value we want to read, so break
                break
        return cpu_time, wall_time, memory

    @staticmethod
    def get_status(
        runsolver_values_path: Path, runsolver_raw_path: Path | None
    ) -> SolverStatus:
        """Get run status from runsolver logs.

        Args:
            runsolver_values_path: Path to the VAR=VALUE file written by runsolver.
            runsolver_raw_path: Path to the raw solver output file, or None if
                the solver output was not redirected to a file.

        Returns:
            CRASHED if neither log exists, TIMEOUT if the values file reports
            ``TIMEOUT=true``, UNKNOWN if there is no raw path or the raw file
            does not end with an output dictionary, KILLED if the raw file is
            missing, otherwise the status found in the last line of the raw file.
        """
        values_exist = runsolver_values_path.exists()
        if not values_exist and (
            runsolver_raw_path is not None and not runsolver_raw_path.exists()
        ):
            # Runsolver logs were not created, job was stopped ''incorrectly''
            return SolverStatus.CRASHED
        # First check if runsolver reported time out
        if values_exist:
            with runsolver_values_path.open("r") as values_file:
                values_lines = values_file.readlines()
            # Search from the end of the file; the last TIMEOUT entry decides
            for line in reversed(values_lines):
                entry = line.strip()
                if entry.startswith("TIMEOUT="):
                    if entry == "TIMEOUT=true":
                        return SolverStatus.TIMEOUT
                    break
        if runsolver_raw_path is None:
            return SolverStatus.UNKNOWN
        if not runsolver_raw_path.exists():
            # Runsolver log was not created, job was stopped ''incorrectly''
            return SolverStatus.KILLED
        # Last line of runsolver log should contain the raw sparkle solver wrapper output
        with runsolver_raw_path.open("r") as raw_file:
            raw_lines = raw_file.read().strip().splitlines()
        dict_candidates = re.findall("{.*}", raw_lines[-1]) if raw_lines else []
        if not dict_candidates:
            # Empty log or no dictionary on the last line: nothing to decode
            return SolverStatus.UNKNOWN
        output_dict = ast.literal_eval(dict_candidates[0])
        return SolverStatus(output_dict["status"])

    @staticmethod
    def get_solver_args(runsolver_log_path: Path) -> str:
        """Retrieves solver arguments dict from runsolver log.

        Args:
            runsolver_log_path: Path to the runsolver watcher log.

        Returns:
            The text following the ``sparkle_solver_wrapper`` call (without its
            file suffix) on the first ``command line:`` entry, or an empty
            string if the log or the call could not be found.
        """
        if not runsolver_log_path.exists():
            return ""
        with runsolver_log_path.open("r") as log_file:
            for line in log_file:
                if not line.startswith("command line:"):
                    continue
                _, marker, call = line.partition("sparkle_solver_wrapper")
                if not marker:
                    # Command line does not call the sparkle solver wrapper
                    return ""
                # Suffix is still there, the arguments start after the first space
                _, _, arguments = call.partition(" ")
                return arguments.strip()
        return ""

    @staticmethod
    def get_solver_output(
        runsolver_configuration: list[str | Path], process_output: str
    ) -> dict[str, str | object]:
        """Decode solver output dictionary when called with runsolver.

        Args:
            runsolver_configuration: The runsolver call, as a list of arguments.
                The ``-o``, ``-v``, ``-w`` and ``--cpu-limit`` options (and their
                long forms) are used to locate the log files and the cutoff time.
            process_output: Output of the subprocess, used when the solver
                output was not redirected to a file or that file is missing.

        Returns:
            The dictionary found on the last non empty line of the solver
            output, extended with ``cutoff_time``, ``cpu_time``, ``wall_time``
            and ``memory``. If decoding fails a RuntimeWarning is issued and
            the status is set to UNKNOWN. The status is set to TIMEOUT when the
            measured CPU time exceeds the cutoff time. Objectives requested in
            the solver input that are missing from the output are set to None.
        """
        solver_input = None
        solver_output = None
        value_data_file = None
        cutoff_time = sys.maxsize
        last_index = len(runsolver_configuration) - 1
        for idx, conf in enumerate(runsolver_configuration):
            if not isinstance(conf, str) or idx == last_index:
                # Looking for arg names, which are always followed by their value
                continue
            option = conf.strip()
            option_value = runsolver_configuration[idx + 1]
            # Options are compared exactly: substring checks would also match
            # file paths or solver arguments that merely contain e.g. "-v"
            if option in ("-o", "--solver-data"):
                # solver output was redirected
                solver_data_file = Path(option_value)
                if solver_data_file.exists():
                    with solver_data_file.open("r") as data_file:
                        solver_output = data_file.read()
                elif process_output is None:
                    warnings.warn(
                        "[RunSolver] Could not find Solver output file: "
                        f"{solver_data_file}"
                    )
            elif option in ("-v", "--var"):
                value_data_file = Path(option_value)
            elif option == "--cpu-limit":
                cutoff_time = float(option_value)
            elif option in ("-w", "--watcher-data"):
                args_str = RunSolver.get_solver_args(Path(option_value))
                dict_candidates = re.findall("{.*}", args_str)
                if not dict_candidates:  # Could not find log file or args
                    continue
                solver_input = ast.literal_eval(dict_candidates[0])
                if "cutoff_time" in solver_input:
                    # The cutoff passed to the solver overrules --cpu-limit
                    cutoff_time = float(solver_input["cutoff_time"])

        if solver_output is None:
            # Still empty, try to read from subprocess
            solver_output = process_output
        # Format output to only the brackets (dict)
        try:
            # The solver output can be found on the last non empty line
            solver_output = [s for s in solver_output.splitlines() if s.strip()][-1]
            solver_regex_filter = re.findall("{.*}", solver_output)[0]
            output_dict = ast.literal_eval(solver_regex_filter)
        except Exception as ex:
            # Deliberately broad: any decoding problem results in status UNKNOWN
            config_str = " ".join([str(c) for c in runsolver_configuration])
            warnings.warn(
                "Solver output decoding failed from RunSolver configuration:\n"
                f"'{config_str}'\n"
                f"Output: {solver_output}\n"
                f"Exception: {ex}\n"
                "Setting status to 'UNKNOWN'.",
                category=RuntimeWarning,
            )
            output_dict = {"status": SolverStatus.UNKNOWN}

        output_dict["cutoff_time"] = cutoff_time
        if value_data_file is not None:
            cpu_time, wall_time, memory = RunSolver.get_measurements(value_data_file)
        else:  # Could not retrieve measurements (no values log configured)
            cpu_time, wall_time, memory = -1.0, -1.0, -1.0
        output_dict["cpu_time"] = cpu_time
        output_dict["wall_time"] = wall_time
        output_dict["memory"] = memory
        if output_dict["cpu_time"] > cutoff_time:
            output_dict["status"] = SolverStatus.TIMEOUT
        # Add the missing objectives (runtime based)
        if solver_input is not None and "objectives" in solver_input:
            for o_name in solver_input["objectives"].split(","):
                if o_name not in output_dict:
                    output_dict[o_name] = None
        return output_dict