Coverage for sparkle/tools/runsolver.py: 74%
111 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
1"""Class for handling the Runsolver Wrapper."""
2from __future__ import annotations
3import sys
4import ast
5import re
6import warnings
7from pathlib import Path
9from sparkle.types import SolverStatus
10from sparkle.tools.general import get_time_pid_random_string
class RunSolver:
    """Class representation of RunSolver.

    All functionality is exposed through static methods: building the RunSolver
    command line and decoding the log files RunSolver writes.

    For more information see: http://www.cril.univ-artois.fr/~roussel/runsolver/
    """

    def __init__(self: RunSolver) -> None:
        """Currently RunSolver has no instance specific methods or properties."""

    @staticmethod
    def wrap_command(
            runsolver_executable: Path,
            command: list[str],
            cutoff_time: int,
            log_directory: Path,
            log_name_base: str | None = None,
            raw_results_file: bool = True) -> list[str]:
        """Wrap a command with the RunSolver call and arguments.

        Args:
            runsolver_executable: The Path to the runsolver executable.
                Is returned as an *absolute* path in the output.
            command: The command to wrap.
            cutoff_time: The cutoff CPU time for the solver.
            log_directory: The directory where to write the solver output.
            log_name_base: A user defined name to easily identify the logs.
                Defaults to "runsolver".
            raw_results_file: Whether to use the raw results file.

        Returns:
            List of commands and arguments to execute the solver.
        """
        # RunSolver options used below:
        # --timestamp
        #   instructs to timestamp each line of the solver standard output and
        #   error files (which are then redirected to stdout)
        #
        # --use-pty
        #   use a pseudo-terminal to collect the solver output. Currently only
        #   available when lines are timestamped. Some I/O libraries (including
        #   the C library) automatically flush the output after each line when
        #   the standard output is a terminal. There's no automatic flush when
        #   the standard output is a pipe or a plain file. See setlinebuf() for
        #   some details. This option instructs runsolver to use a
        #   pseudo-terminal instead of a pipe/file to collect the solver
        #   output. This fools the solver which will line-buffer its output.
        #
        # -w filename or --watcher-data filename
        #   sends the watcher information to filename
        #
        # -v filename or --var filename
        #   save the most relevant information (times,...)
        #   in an easy to parse VAR=VALUE file
        #
        # -o filename or --solver-data filename
        #   redirects the solver output (both stdout and stderr) to filename
        if log_name_base is None:
            log_name_base = "runsolver"
        unique_stamp = get_time_pid_random_string()
        # All three logs share one unique stem and differ only in their suffix
        raw_result_path = log_directory / Path(f"{log_name_base}_{unique_stamp}.rawres")
        watcher_data_path = raw_result_path.with_suffix(".log")
        var_values_path = raw_result_path.with_suffix(".val")

        wrapped = [str(runsolver_executable.absolute()),
                   "--timestamp", "--use-pty",
                   "--cpu-limit", str(cutoff_time),
                   "-w", str(watcher_data_path),
                   "-v", str(var_values_path)]
        if raw_results_file:
            wrapped += ["-o", str(raw_result_path)]
        return wrapped + command

    @staticmethod
    def get_measurements(runsolver_values_path: Path,
                         not_found: float = -1.0) -> tuple[float, float, float]:
        """Return the CPU time, wallclock time and memory from the values log.

        Args:
            runsolver_values_path: Path to the VAR=VALUE file written by RunSolver.
            not_found: Value to report for every measurement that is not found.

        Returns:
            Tuple of (cpu_time, wall_time, memory). Memory is the MAXVM value
            divided by 1024.
        """
        cpu_time, wall_time, memory = not_found, not_found, not_found
        if not runsolver_values_path.exists():
            return cpu_time, wall_time, memory
        with runsolver_values_path.open("r") as infile:
            for line in infile:
                # Only lines with exactly one "=" are VAR=VALUE entries
                if line.count("=") != 1:
                    continue
                keyword, value = line.strip().split("=")
                if keyword == "WCTIME":
                    wall_time = float(value)
                elif keyword == "CPUTIME":
                    cpu_time = float(value)
                elif keyword == "MAXVM":
                    memory = float(int(value) / 1024.0)  # MB
        return cpu_time, wall_time, memory

    @staticmethod
    def get_status(runsolver_values_path: Path,
                   runsolver_raw_path: Path | None) -> SolverStatus:
        """Get run status from runsolver logs.

        Args:
            runsolver_values_path: Path to the VAR=VALUE file written by RunSolver.
            runsolver_raw_path: Path to the raw solver output file, or None if
                the solver output was not redirected to a file.

        Returns:
            CRASHED if neither log exists, TIMEOUT if the values log reports
            "TIMEOUT=true", UNKNOWN if there is no raw path or the raw log does
            not contain an output dictionary, KILLED if the raw log is missing,
            and otherwise the status found in the last line of the raw log.
        """
        values_exist = runsolver_values_path.exists()
        raw_exists = runsolver_raw_path is not None and runsolver_raw_path.exists()
        if not values_exist and runsolver_raw_path is not None and not raw_exists:
            # Runsolver logs were not created, job was stopped ''incorrectly''
            return SolverStatus.CRASHED
        # First check if runsolver reported time out
        if values_exist:
            with runsolver_values_path.open("r") as values_file:
                values_lines = values_file.readlines()
            # Only the last TIMEOUT entry in the file is considered
            for line in reversed(values_lines):
                entry = line.strip()
                if entry.startswith("TIMEOUT="):
                    if entry == "TIMEOUT=true":
                        return SolverStatus.TIMEOUT
                    break
        if runsolver_raw_path is None:
            return SolverStatus.UNKNOWN
        if not raw_exists:
            # Runsolver log was not created, job was stopped ''incorrectly''
            return SolverStatus.KILLED
        # Last line of runsolver log should contain the raw sparkle solver wrapper output
        raw_lines = runsolver_raw_path.read_text().strip().splitlines()
        if not raw_lines:
            # Nothing was written to the raw log, so no status can be decoded
            return SolverStatus.UNKNOWN
        dict_matches = re.findall(r"{.*}", raw_lines[-1])
        if not dict_matches:
            return SolverStatus.UNKNOWN
        output_dict = ast.literal_eval(dict_matches[0])
        return SolverStatus(output_dict["status"])

    @staticmethod
    def get_solver_args(runsolver_log_path: Path) -> str:
        """Retrieves solver arguments dict from runsolver log.

        Args:
            runsolver_log_path: Path to the RunSolver watcher log.

        Returns:
            The text following "sparkle_solver_wrapper.py" on the log's
            "command line:" entry, stripped of whitespace and single quotes.
            Empty string if the log or such an entry does not exist.
        """
        if not runsolver_log_path.exists():
            return ""
        with runsolver_log_path.open("r") as log_file:
            for line in log_file:
                if not line.startswith("command line:"):
                    continue
                _, wrapper, arguments = line.partition("sparkle_solver_wrapper.py")
                if wrapper:
                    return arguments.strip().strip("'")
        return ""

    @staticmethod
    def get_solver_output(runsolver_configuration: list[str | Path],
                          process_output: str) -> dict[str, str | object]:
        """Decode solver output dictionary when called with runsolver.

        Args:
            runsolver_configuration: The RunSolver call (as created by
                wrap_command), used to locate the log files and the cutoff time.
            process_output: The output of the process, used when the solver
                output was not redirected to an existing file.

        Returns:
            The solver output dictionary, extended with "cutoff_time",
            "cpu_time", "wall_time" and "memory" (-1.0 when not measured), and
            with None for every objective in the solver input that the solver
            did not report. If decoding fails, a RuntimeWarning is issued and
            the status is set to UNKNOWN.
        """
        solver_input = None
        solver_output = None
        value_data_file = None
        cutoff_time = sys.maxsize
        # Every option of interest is followed by its value, so the last element
        # can never be such an option. Options are matched exactly: a substring
        # match would also hit paths or solver arguments that merely contain
        # e.g. "-v" or "-w".
        for idx, conf in enumerate(runsolver_configuration[:-1]):
            if not isinstance(conf, str):
                # Looking for arg names
                continue
            conf = conf.strip()
            value = runsolver_configuration[idx + 1]
            if conf in ("-o", "--solver-data"):
                # solver output was redirected
                solver_data_file = Path(value)
                if solver_data_file.exists():
                    solver_output = solver_data_file.read_text()
            elif conf in ("-v", "--var"):
                value_data_file = Path(value)
            elif conf == "--cpu-limit":
                cutoff_time = float(value)
            elif conf in ("-w", "--watcher-data"):
                args_str = RunSolver.get_solver_args(Path(value))
                if args_str == "":  # Could not find log file or args
                    continue
                input_matches = re.findall(r"{.*}", args_str)
                if not input_matches:  # Arguments do not contain a dictionary
                    continue
                solver_input = ast.literal_eval(input_matches[0])
                cutoff_time = float(solver_input["cutoff_time"])

        if solver_output is None:
            # Still empty, try to read from subprocess
            solver_output = process_output
        # Format output to only the brackets (dict)
        # NOTE: It should have only one match, do we want some error logging here?
        try:
            output_dict = ast.literal_eval(re.findall(r"{.*}", solver_output)[0])
            if not isinstance(output_dict, dict):
                # E.g. a set literal: cannot be extended with measurements below
                raise TypeError("Solver output is not a dictionary.")
        except Exception:
            config_str = " ".join([str(c) for c in runsolver_configuration])
            warnings.warn("Solver output decoding failed from RunSolver configuration: "
                          f"'{config_str}'. Setting status to 'UNKNOWN'.",
                          category=RuntimeWarning)
            output_dict = {"status": SolverStatus.UNKNOWN}

        output_dict["cutoff_time"] = cutoff_time
        if value_data_file is not None:
            cpu_time, wall_time, memory = RunSolver.get_measurements(value_data_file)
        else:  # Could not retrieve measurements (no values log configured)
            cpu_time, wall_time, memory = -1.0, -1.0, -1.0
        output_dict["cpu_time"] = cpu_time
        output_dict["wall_time"] = wall_time
        output_dict["memory"] = memory
        if output_dict["cpu_time"] > cutoff_time:
            output_dict["status"] = SolverStatus.TIMEOUT
        # Add the missing objectives (runtime based)
        if solver_input is not None and "objectives" in solver_input:
            for o_name in solver_input["objectives"].split(","):
                if o_name not in output_dict:
                    output_dict[o_name] = None
        return output_dict