Coverage for sparkle/tools/runsolver.py: 77%
115 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 13:21 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 13:21 +0000
1"""Class for handling the Runsolver Wrapper."""
2from __future__ import annotations
3import sys
4import ast
5import re
6import warnings
7from pathlib import Path
9from sparkle.types import SolverStatus
10from sparkle.tools.general import get_time_pid_random_string
class RunSolver:
    """Class representation of RunSolver.

    Groups static helpers to build a runsolver command line and to decode the
    log files runsolver writes (values file, watcher log, raw solver output).

    For more information see: http://www.cril.univ-artois.fr/~roussel/runsolver/
    """

    # Greedy match of a "{...}" dict literal embedded in a single output line.
    # Needed because --timestamp prefixes every solver output line.
    _DICT_PATTERN = re.compile(r"{.*}")

    def __init__(self: RunSolver) -> None:
        """Currently RunSolver has no instance specific methods or properties."""
        pass

    @staticmethod
    def _decode_last_dict(text: str | None) -> dict:
        """Decode the dict literal found on the last non-empty line of text.

        Args:
            text: The (possibly multi-line) text to search. May be None.

        Returns:
            The dictionary obtained with ast.literal_eval.

        Raises:
            ValueError: If there is no text, no dict literal on the last
                non-empty line, or the literal does not evaluate to a dict.
        """
        if text is None:
            raise ValueError("no output to decode")
        filled_lines = [line for line in text.splitlines() if line.strip()]
        if not filled_lines:
            raise ValueError("output is empty")
        matches = RunSolver._DICT_PATTERN.findall(filled_lines[-1])
        if not matches:
            raise ValueError("no dictionary found on last line of output")
        try:
            decoded = ast.literal_eval(matches[0])
        except (ValueError, SyntaxError, TypeError,
                MemoryError, RecursionError) as ex:
            raise ValueError(f"could not evaluate dictionary: {ex}") from ex
        if not isinstance(decoded, dict):
            raise ValueError("last line of output does not contain a dictionary")
        return decoded

    @staticmethod
    def wrap_command(
            runsolver_executable: Path,
            command: list[str],
            cutoff_time: int,
            log_directory: Path,
            log_name_base: str | None = None,
            raw_results_file: bool = True) -> list[str]:
        """Wrap a command with the RunSolver call and arguments.

        Args:
            runsolver_executable: The Path to the runsolver executable.
                Is returned as an *absolute* path in the output.
            command: The command to wrap.
            cutoff_time: The cutoff CPU time for the solver.
            log_directory: The directory where to write the solver output.
            log_name_base: A user defined name to easily identify the logs.
                Defaults to "runsolver".
            raw_results_file: Whether to use the raw results file.

        Returns:
            List of commands and arguments to execute the solver.
        """
        # Create RunSolver Logs
        # --timestamp
        #   instructs to timestamp each line of the solver standard output and
        #   error files (which are then redirected to stdout)
        #
        # --use-pty
        #   use a pseudo-terminal to collect the solver output. Currently only
        #   available when lines are timestamped. Some I/O libraries (including
        #   the C library) automatically flush the output after each line when
        #   the standard output is a terminal. There's no automatic flush when
        #   the standard output is a pipe or a plain file. See setlinebuf() for
        #   some details. This option instructs runsolver to use a
        #   pseudo-terminal instead of a pipe/file to collect the solver
        #   output. This fools the solver which will line-buffer its output.
        #
        # -w filename or --watcher-data filename
        #   sends the watcher information to filename
        #
        # -v filename or --var filename
        #   save the most relevant information (times,...)
        #   in an easy to parse VAR=VALUE file
        #
        # -o filename or --solver-data filename
        #   redirects the solver output (both stdout and stderr) to filename
        if log_name_base is None:
            log_name_base = "runsolver"
        unique_stamp = get_time_pid_random_string()
        # All three log files share one stem and differ only in their suffix
        raw_result_path = log_directory / Path(f"{log_name_base}_{unique_stamp}.rawres")
        watcher_data_path = raw_result_path.with_suffix(".log")
        var_values_path = raw_result_path.with_suffix(".val")

        wrapped = [str(runsolver_executable.absolute()),
                   "--timestamp", "--use-pty",
                   "--cpu-limit", str(cutoff_time),
                   "-w", str(watcher_data_path),
                   "-v", str(var_values_path)]
        if raw_results_file:
            wrapped += ["-o", str(raw_result_path)]
        return wrapped + command

    @staticmethod
    def get_measurements(runsolver_values_path: Path,
                         not_found: float = -1.0) -> tuple[float, float, float]:
        """Return CPU time, wallclock time and memory from a runsolver values log.

        Only lines of the form KEY=VALUE (exactly one "=") are considered.
        The keys CPUTIME, WCTIME and MAXVM are read in any order; reading stops
        as soon as all three were found. Values that cannot be parsed as a
        number (e.g. a truncated log) are skipped.

        Args:
            runsolver_values_path: Path to the VAR=VALUE file written by runsolver.
            not_found: Value to report for every measurement that is missing.

        Returns:
            Tuple (cpu_time, wall_time, memory), memory being MAXVM / 1024.
        """
        measurements = {"CPUTIME": not_found, "WCTIME": not_found, "MAXVM": not_found}
        if not runsolver_values_path.exists():
            return not_found, not_found, not_found
        pending = set(measurements)
        with runsolver_values_path.open("r") as infile:
            for line in infile:
                if line.count("=") != 1:
                    continue
                keyword, value = line.strip().split("=")
                if keyword not in measurements:
                    continue
                try:
                    number = float(value)
                except ValueError:
                    # Incomplete or corrupt entry, leave it as not found
                    continue
                if keyword == "MAXVM":
                    number /= 1024.0  # MB
                measurements[keyword] = number
                pending.discard(keyword)
                if not pending:
                    # Everything we need was read, skip the rest of the file
                    break
        return measurements["CPUTIME"], measurements["WCTIME"], measurements["MAXVM"]

    @staticmethod
    def get_status(runsolver_values_path: Path,
                   runsolver_raw_path: Path | None) -> SolverStatus:
        """Get run status from runsolver logs.

        Args:
            runsolver_values_path: Path to the VAR=VALUE file written by runsolver.
            runsolver_raw_path: Path to the raw solver output, or None if the
                solver output was not redirected to a file.

        Returns:
            CRASHED if neither log exists, TIMEOUT if runsolver reported a time
            out, UNKNOWN if there is no raw path or the raw output cannot be
            decoded, KILLED if the raw file is missing, and otherwise the status
            reported on the last line of the raw output.
        """
        values_exist = runsolver_values_path.exists()
        if (not values_exist and runsolver_raw_path is not None
                and not runsolver_raw_path.exists()):
            # Runsolver logs were not created, job was stopped ''incorrectly''
            return SolverStatus.CRASHED
        # First check if runsolver reported time out
        if values_exist:
            with runsolver_values_path.open("r") as infile:
                value_lines = infile.readlines()
            # Only the last TIMEOUT entry in the file is decisive
            for line in reversed(value_lines):
                stripped = line.strip()
                if stripped.startswith("TIMEOUT="):
                    if stripped == "TIMEOUT=true":
                        return SolverStatus.TIMEOUT
                    break
        if runsolver_raw_path is None:
            return SolverStatus.UNKNOWN
        if not runsolver_raw_path.exists():
            # Runsolver log was not created, job was stopped ''incorrectly''
            return SolverStatus.KILLED
        # Last line of runsolver log should contain the raw sparkle solver wrapper output
        with runsolver_raw_path.open("r") as infile:
            runsolver_raw_contents = infile.read()
        try:
            output_dict = RunSolver._decode_last_dict(runsolver_raw_contents)
            # NOTE(review): assumes SolverStatus(...) raises ValueError on an
            # unknown value (enum lookup by value) - confirm in sparkle.types
            return SolverStatus(output_dict["status"])
        except (ValueError, KeyError):
            # Empty or undecodable solver output: the status can not be determined
            return SolverStatus.UNKNOWN

    @staticmethod
    def get_solver_args(runsolver_log_path: Path) -> str:
        """Retrieves solver arguments dict from runsolver log.

        Args:
            runsolver_log_path: Path to the watcher log written by runsolver.

        Returns:
            Everything following the "sparkle_solver_wrapper" executable on the
            first matching "command line:" entry, or an empty string if the log
            or such an entry does not exist.
        """
        if not runsolver_log_path.exists():
            return ""
        with runsolver_log_path.open("r") as infile:
            for line in infile:
                if not line.startswith("command line:"):
                    continue
                _, marker, call = line.partition("sparkle_solver_wrapper")
                if not marker:
                    # Command line of something that is not the solver wrapper
                    continue
                # Suffix of the wrapper is still there, cut at the first space
                _, _, arguments = call.partition(" ")
                return arguments.strip()
        return ""

    @staticmethod
    def get_solver_output(runsolver_configuration: list[str | Path],
                          process_output: str) -> dict[str, str | object]:
        """Decode solver output dictionary when called with runsolver.

        Args:
            runsolver_configuration: The runsolver call (as created by
                wrap_command), used to locate the log files and the cutoff time.
            process_output: Output of the subprocess, used when the solver
                output was not redirected to a file or the file is missing.

        Returns:
            The solver output dictionary, extended with "cutoff_time",
            "cpu_time", "wall_time" and "memory" and with a None entry for every
            objective from the solver input that the solver did not report.
            "status" is UNKNOWN if the output could not be decoded and TIMEOUT
            if the measured CPU time exceeds the cutoff time.
        """
        solver_input = None
        solver_output = None
        value_data_file = None
        cutoff_time = sys.maxsize
        n_args = len(runsolver_configuration)
        for idx, conf in enumerate(runsolver_configuration):
            if not isinstance(conf, str) or idx + 1 >= n_args:
                # Looking for arg names, which are always followed by a value
                continue
            # Flags are matched exactly: a substring test would also trigger on
            # paths or solver arguments that merely contain e.g. "-v" or "-w"
            flag = conf.strip()
            argument = runsolver_configuration[idx + 1]
            if flag in ("-o", "--solver-data"):
                # solver output was redirected
                solver_data_file = Path(argument)
                if solver_data_file.exists():
                    with solver_data_file.open("r") as infile:
                        solver_output = infile.read()
                elif process_output is None:
                    warnings.warn("[RunSolver] Could not find Solver output file: "
                                  f"{solver_data_file}")
            elif flag in ("-v", "--var"):
                value_data_file = Path(argument)
            elif flag == "--cpu-limit":
                cutoff_time = float(argument)
            elif flag in ("-w", "--watcher-data"):
                args_str = RunSolver.get_solver_args(Path(argument))
                if args_str == "":  # Could not find log file or args
                    continue
                try:
                    solver_input = RunSolver._decode_last_dict(args_str)
                except ValueError:
                    # Arguments do not hold a readable dictionary
                    continue
                if "cutoff_time" in solver_input:
                    cutoff_time = float(solver_input["cutoff_time"])

        if solver_output is None:
            # Still empty, try to read from subprocess
            solver_output = process_output
        # Format output to only the brackets (dict)
        try:
            # The solver output can be found on the last non empty line
            output_dict = RunSolver._decode_last_dict(solver_output)
        except ValueError as ex:
            config_str = " ".join([str(c) for c in runsolver_configuration])
            warnings.warn("Solver output decoding failed from RunSolver configuration:\n"
                          f"'{config_str}'\n"
                          f"Output: {solver_output}\n"
                          f"Exception: {ex}\n"
                          "Setting status to 'UNKNOWN'.",
                          category=RuntimeWarning)
            output_dict = {"status": SolverStatus.UNKNOWN}

        output_dict["cutoff_time"] = cutoff_time
        if value_data_file is not None:
            cpu_time, wall_time, memory = RunSolver.get_measurements(value_data_file)
        else:  # Could not retrieve measurements (log does not exist)
            cpu_time, wall_time, memory = -1.0, -1.0, -1.0
        output_dict["cpu_time"] = cpu_time
        output_dict["wall_time"] = wall_time
        output_dict["memory"] = memory
        if output_dict["cpu_time"] > cutoff_time:
            output_dict["status"] = SolverStatus.TIMEOUT
        # Add the missing objectives (runtime based)
        if solver_input is not None and "objectives" in solver_input:
            for o_name in solver_input["objectives"].split(","):
                if o_name not in output_dict:
                    output_dict[o_name] = None
        return output_dict