Coverage for sparkle/tools/runsolver.py: 77%

115 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 13:21 +0000

1"""Class for handling the Runsolver Wrapper.""" 

2from __future__ import annotations 

3import sys 

4import ast 

5import re 

6import warnings 

7from pathlib import Path 

8 

9from sparkle.types import SolverStatus 

10from sparkle.tools.general import get_time_pid_random_string 

11 

12 

class RunSolver:
    """Class representation of RunSolver.

    For more information see: http://www.cril.univ-artois.fr/~roussel/runsolver/

    All functionality is exposed as static methods: one builds the RunSolver
    command line, the others decode the log files that RunSolver writes.
    """

    def __init__(self: RunSolver) -> None:
        """Currently RunSolver has no instance specific methods or properties."""
        pass

    @staticmethod
    def wrap_command(
            runsolver_executable: Path,
            command: list[str],
            cutoff_time: int,
            log_directory: Path,
            log_name_base: str | None = None,
            raw_results_file: bool = True) -> list[str]:
        """Wrap a command with the RunSolver call and arguments.

        Args:
            runsolver_executable: The Path to the runsolver executable.
                Is returned as an *absolute* path in the output.
            command: The command to wrap.
            cutoff_time: The cutoff CPU time for the solver.
            log_directory: The directory where to write the solver output.
            log_name_base: A user defined name to easily identify the logs.
                Defaults to "runsolver".
            raw_results_file: Whether to use the raw results file.

        Returns:
            List of commands and arguments to execute the solver.
        """
        # RunSolver options used below:
        # --timestamp
        #   instructs to timestamp each line of the solver standard output and
        #   error files (which are then redirected to stdout)
        # --use-pty
        #   use a pseudo-terminal to collect the solver output. Currently only
        #   available when lines are timestamped. Some I/O libraries (including
        #   the C library) automatically flush the output after each line when
        #   the standard output is a terminal. There's no automatic flush when
        #   the standard output is a pipe or a plain file. See setlinebuf() for
        #   some details. This option instructs runsolver to use a
        #   pseudo-terminal instead of a pipe/file to collect the solver
        #   output. This fools the solver which will line-buffer its output.
        # -w filename or --watcher-data filename
        #   sends the watcher information to filename
        # -v filename or --var filename
        #   save the most relevant information (times,...)
        #   in an easy to parse VAR=VALUE file
        # -o filename or --solver-data filename
        #   redirects the solver output (both stdout and stderr) to filename
        log_name_base = "runsolver" if log_name_base is None else log_name_base
        unique_stamp = get_time_pid_random_string()
        # All three log files share one unique stem and differ only in suffix
        raw_result_path = log_directory / Path(f"{log_name_base}_{unique_stamp}.rawres")
        watcher_data_path = raw_result_path.with_suffix(".log")
        var_values_path = raw_result_path.with_suffix(".val")

        wrapped = [str(runsolver_executable.absolute()),
                   "--timestamp", "--use-pty",
                   "--cpu-limit", str(cutoff_time),
                   "-w", str(watcher_data_path),
                   "-v", str(var_values_path)]
        if raw_results_file:
            wrapped += ["-o", str(raw_result_path)]
        return wrapped + command

    @staticmethod
    def get_measurements(runsolver_values_path: Path,
                         not_found: float = -1.0) -> tuple[float, float, float]:
        """Return the CPU time, wallclock time and memory reported by runsolver.

        Args:
            runsolver_values_path: Path to the VAR=VALUE file written by runsolver.
            not_found: Value used for every measurement that could not be read.

        Returns:
            Tuple (cpu_time, wall_time, memory), read from the CPUTIME, WCTIME
            and MAXVM entries. MAXVM is divided by 1024 before being returned.
        """
        cpu_time, wall_time, memory = not_found, not_found, not_found
        if not runsolver_values_path.exists():
            return cpu_time, wall_time, memory
        with runsolver_values_path.open("r") as infile:
            for line in infile:
                # Only plain VAR=VALUE lines are of interest (skips comments)
                if line.count("=") != 1:
                    continue
                keyword, value = line.strip().split("=")
                if keyword == "WCTIME":
                    wall_time = float(value)
                elif keyword == "CPUTIME":
                    cpu_time = float(value)
                elif keyword == "MAXVM":
                    memory = float(int(value) / 1024.0)  # MB
                    # Order is fixed, MAXVM is the last thing we want to read
                    break
        return cpu_time, wall_time, memory

    @staticmethod
    def get_status(runsolver_values_path: Path,
                   runsolver_raw_path: Path) -> SolverStatus:
        """Get run status from runsolver logs.

        Args:
            runsolver_values_path: Path to the VAR=VALUE file written by runsolver.
            runsolver_raw_path: Path to the redirected solver output, or None
                when the solver output was not redirected to a file.

        Returns:
            CRASHED when neither log exists, TIMEOUT when the values log reports
            "TIMEOUT=true", UNKNOWN when there is no raw log to inspect or its
            last line holds no dictionary, KILLED when the raw log is missing,
            and otherwise the status found in the dictionary on the last line
            of the raw log.
        """
        values_exist = runsolver_values_path.exists()
        if not values_exist and (runsolver_raw_path is not None
                                 and not runsolver_raw_path.exists()):
            # Runsolver logs were not created, job was stopped ''incorrectly''
            return SolverStatus.CRASHED
        # First check if runsolver reported time out
        if values_exist:
            with runsolver_values_path.open("r") as values_file:
                value_lines = values_file.readlines()
            for line in reversed(value_lines):
                entry = line.strip()
                if entry.startswith("TIMEOUT="):
                    if entry == "TIMEOUT=true":
                        return SolverStatus.TIMEOUT
                    break
        if runsolver_raw_path is None:
            return SolverStatus.UNKNOWN
        if not runsolver_raw_path.exists():
            # Runsolver log was not created, job was stopped ''incorrectly''
            return SolverStatus.KILLED
        # Last line of runsolver log should contain the raw sparkle solver wrapper output
        with runsolver_raw_path.open("r") as raw_file:
            raw_lines = raw_file.read().strip().splitlines()
        if not raw_lines:
            # Nothing was written: report UNKNOWN, as get_solver_output does,
            # instead of failing on an opaque IndexError
            return SolverStatus.UNKNOWN
        dict_match = re.search("{.*}", raw_lines[-1])
        if dict_match is None:
            return SolverStatus.UNKNOWN
        output_dict = ast.literal_eval(dict_match.group(0))
        return SolverStatus(output_dict["status"])

    @staticmethod
    def get_solver_args(runsolver_log_path: Path) -> str:
        """Retrieve the solver wrapper arguments from the runsolver watcher log.

        Args:
            runsolver_log_path: Path to the watcher log written by runsolver.

        Returns:
            The stripped text that follows the wrapper script name on the first
            "command line:" entry. Empty string when the log does not exist,
            has no such entry, or the entry does not call the wrapper.
        """
        if not runsolver_log_path.exists():
            return ""
        with runsolver_log_path.open("r") as log_file:
            for line in log_file:
                if not line.startswith("command line:"):
                    continue
                _, found, call = line.partition("sparkle_solver_wrapper")
                # The script suffix is still there, drop it up to the first space
                _, separator, arguments = call.partition(" ")
                if not found or not separator:
                    return ""
                return arguments.strip()
        return ""

    @staticmethod
    def get_solver_output(runsolver_configuration: list[str | Path],
                          process_output: str) -> dict[str, str | object]:
        """Decode solver output dictionary when called with runsolver.

        Args:
            runsolver_configuration: The runsolver call as a list of arguments.
                Option names must be separate items, each followed by its value.
            process_output: Captured output of the process, used when the solver
                output was not redirected to a file or that file is missing.

        Returns:
            The dictionary found on the last non empty line of the solver
            output, extended with "cutoff_time", "cpu_time", "wall_time" and
            "memory". When decoding fails a RuntimeWarning is issued and the
            status is set to UNKNOWN. The status is set to TIMEOUT when the
            measured CPU time exceeds the cutoff time. Objectives named in the
            solver arguments but absent from the output are added as None.
        """
        solver_input = None
        solver_output = None
        value_data_file = None
        cutoff_time = sys.maxsize
        n_items = len(runsolver_configuration)
        for idx, conf in enumerate(runsolver_configuration):
            # Looking for arg names that are followed by a value
            if not isinstance(conf, str) or idx + 1 >= n_items:
                continue
            # Options are matched exactly: a path or solver argument that merely
            # contains "-v" or "-w" must not be mistaken for an option
            option = conf.strip()
            value = runsolver_configuration[idx + 1]
            if option in ("-o", "--solver-data"):
                # solver output was redirected
                solver_data_file = Path(value)
                if solver_data_file.exists():
                    with solver_data_file.open("r") as data_file:
                        solver_output = data_file.read()
                elif process_output is None:
                    warnings.warn("[RunSolver] Could not find Solver output file: "
                                  f"{solver_data_file}")
            elif option in ("-v", "--var"):
                value_data_file = Path(value)
            elif option == "--cpu-limit":
                cutoff_time = float(value)
            elif option in ("-w", "--watcher-data"):
                args_str = RunSolver.get_solver_args(Path(value))
                if args_str == "":  # Could not find log file or args
                    continue
                args_match = re.search("{.*}", args_str)
                if args_match is None:  # Arguments hold no dictionary
                    continue
                solver_input = ast.literal_eval(args_match.group(0))
                cutoff_time = float(solver_input["cutoff_time"])

        if solver_output is None:
            # Still empty, try to read from subprocess
            solver_output = process_output
        # Format output to only the brackets (dict)
        try:
            # The solver output can be found on the last non empty line
            solver_output = [s for s in solver_output.splitlines() if s.strip()][-1]
            solver_regex_filter = re.findall("{.*}", solver_output)[0]
            output_dict = ast.literal_eval(solver_regex_filter)
        except Exception as ex:
            # Deliberately broad: any decoding problem degrades to UNKNOWN
            config_str = " ".join([str(c) for c in runsolver_configuration])
            warnings.warn("Solver output decoding failed from RunSolver configuration:\n"
                          f"'{config_str}'\n"
                          f"Output: {solver_output}\n"
                          f"Exception: {ex}\n"
                          "Setting status to 'UNKNOWN'.",
                          category=RuntimeWarning)
            output_dict = {"status": SolverStatus.UNKNOWN}

        output_dict["cutoff_time"] = cutoff_time
        if value_data_file is not None:
            cpu_time, wall_time, memory = RunSolver.get_measurements(value_data_file)
        else:  # Could not retrieve measurements (no values log configured)
            cpu_time, wall_time, memory = -1.0, -1.0, -1.0
        output_dict["cpu_time"] = cpu_time
        output_dict["wall_time"] = wall_time
        output_dict["memory"] = memory
        if output_dict["cpu_time"] > cutoff_time:
            output_dict["status"] = SolverStatus.TIMEOUT
        # Add the missing objectives (runtime based)
        if solver_input is not None and "objectives" in solver_input:
            for o_name in solver_input["objectives"].split(","):
                if o_name not in output_dict:
                    output_dict[o_name] = None
        return output_dict

211 if o_name not in output_dict: 

212 output_dict[o_name] = None 

213 return output_dict