Coverage for sparkle/tools/runsolver.py: 77%

115 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

1"""Class for handling the Runsolver Wrapper.""" 

2 

3from __future__ import annotations 

4import sys 

5import ast 

6import re 

7import warnings 

8from pathlib import Path 

9 

10from sparkle.types import SolverStatus 

11from sparkle.tools.general import get_time_pid_random_string 

12 

13 

14class RunSolver: 

15 """Class representation of RunSolver. 

16 

17 For more information see: http://www.cril.univ-artois.fr/~roussel/runsolver/ 

18 """ 

19 

20 def __init__(self: RunSolver) -> None: 

21 """Currently RunSolver has no instance specific methods or properties.""" 

22 pass 

23 

24 @staticmethod 

25 def wrap_command( 

26 runsolver_executable: Path, 

27 command: list[str], 

28 cutoff_time: int, 

29 log_directory: Path, 

30 log_name_base: str = None, 

31 raw_results_file: bool = True, 

32 ) -> list[str]: 

33 """Wrap a command with the RunSolver call and arguments. 

34 

35 Args: 

36 runsolver_executable: The Path to the runsolver executable. 

37 Is returned as an *absolute* path in the output. 

38 command: The command to wrap. 

39 cutoff_time: The cutoff CPU time for the solver. 

40 log_directory: The directory where to write the solver output. 

41 log_name_base: A user defined name to easily identify the logs. 

42 Defaults to "runsolver". 

43 raw_results_file: Whether to use the raw results file. 

44 

45 Returns: 

46 List of commands and arguments to execute the solver. 

47 """ 

48 # Create RunSolver Logs 

49 # --timestamp 

50 # instructs to timestamp each line of the solver standard output and 

51 # error files (which are then redirected to stdout) 

52 

53 # --use-pty 

54 # use a pseudo-terminal to collect the solver output. Currently only 

55 # available when lines are timestamped. Some I/O libraries (including 

56 # the C library) automatically flushes the output after each line when 

57 # the standard output is a terminal. There's no automatic flush when 

58 # the standard output is a pipe or a plain file. See setlinebuf() for 

59 # some details. This option instructs runsolver to use a 

60 # pseudo-terminal instead of a pipe/file to collect the solver 

61 # output. This fools the solver which will line-buffer its output. 

62 

63 # -w filename or --watcher-data filename 

64 # sends the watcher informations to filename 

65 

66 # -v filename or --var filename 

67 # save the most relevant information (times,...) 

68 # in an easy to parse VAR=VALUE file 

69 

70 # -o filename or --solver-data filename 

71 # redirects the solver output (both stdout and stderr) to filename 

72 log_name_base = "runsolver" if log_name_base is None else log_name_base 

73 unique_stamp = get_time_pid_random_string() 

74 raw_result_path = log_directory / Path(f"{log_name_base}_{unique_stamp}.rawres") 

75 watcher_data_path = raw_result_path.with_suffix(".log") 

76 var_values_path = raw_result_path.with_suffix(".val") 

77 

78 return ( 

79 [ 

80 str(runsolver_executable.absolute()), 

81 "--timestamp", 

82 "--use-pty", 

83 "--cpu-limit", 

84 str(cutoff_time), 

85 "-w", 

86 str(watcher_data_path), 

87 "-v", 

88 str(var_values_path), 

89 ] 

90 + (["-o", str(raw_result_path)] if raw_results_file else []) 

91 + command 

92 ) 

93 

94 @staticmethod 

95 def get_measurements( 

96 runsolver_values_path: Path, not_found: float = -1.0 

97 ) -> tuple[float, float, float]: 

98 """Return the CPU and wallclock time reported by runsolver in values log.""" 

99 cpu_time, wall_time, memory = not_found, not_found, not_found 

100 if runsolver_values_path.exists(): 

101 with runsolver_values_path.open("r") as infile: 

102 lines = [ 

103 line.strip().split("=") 

104 for line in infile.readlines() 

105 if line.count("=") == 1 

106 ] 

107 for keyword, value in lines: 

108 if keyword == "WCTIME": 

109 wall_time = float(value) 

110 elif keyword == "CPUTIME": 

111 cpu_time = float(value) 

112 elif keyword == "MAXVM": 

113 memory = float(int(value) / 1024.0) # MB 

114 # Order is fixed, CPU is the last thing we want to read, so break 

115 break 

116 return cpu_time, wall_time, memory 

117 

118 @staticmethod 

119 def get_status( 

120 runsolver_values_path: Path, runsolver_raw_path: Path 

121 ) -> SolverStatus: 

122 """Get run status from runsolver logs.""" 

123 if not runsolver_values_path.exists() and ( 

124 runsolver_raw_path is not None and not runsolver_raw_path.exists() 

125 ): 

126 # Runsolver logs were not created, job was stopped ''incorrectly'' 

127 return SolverStatus.CRASHED 

128 # First check if runsolver reported time out 

129 if runsolver_values_path.exists(): 

130 for line in reversed(runsolver_values_path.open("r").readlines()): 

131 if line.strip().startswith("TIMEOUT="): 

132 if line.strip() == "TIMEOUT=true": 

133 return SolverStatus.TIMEOUT 

134 break 

135 if runsolver_raw_path is None: 

136 return SolverStatus.UNKNOWN 

137 if not runsolver_raw_path.exists(): 

138 # Runsolver log was not created, job was stopped ''incorrectly'' 

139 return SolverStatus.KILLED 

140 # Last line of runsolver log should contain the raw sparkle solver wrapper output 

141 runsolver_raw_contents = runsolver_raw_path.open("r").read().strip() 

142 # cutoff_time = 

143 sparkle_wrapper_dict_str = runsolver_raw_contents.splitlines()[-1] 

144 solver_regex_filter = re.findall("{.*}", sparkle_wrapper_dict_str)[0] 

145 output_dict = ast.literal_eval(solver_regex_filter) 

146 status = SolverStatus(output_dict["status"]) 

147 # if status == SolverStatus.CRASHED and cpu_time > cutoff_time 

148 return status 

149 

150 @staticmethod 

151 def get_solver_args(runsolver_log_path: Path) -> str: 

152 """Retrieves solver arguments dict from runsolver log.""" 

153 if runsolver_log_path.exists(): 

154 for line in runsolver_log_path.open("r").readlines(): 

155 if line.startswith("command line:"): 

156 call = line.split("sparkle_solver_wrapper", 1)[1] 

157 # Suffix is still there 

158 return call.split(" ", 1)[1].strip() 

159 return "" 

160 

161 @staticmethod 

162 def get_solver_output( 

163 runsolver_configuration: list[str | Path], process_output: str 

164 ) -> dict[str, str | object]: 

165 """Decode solver output dictionary when called with runsolver.""" 

166 solver_input = None 

167 solver_output = None 

168 value_data_file = None 

169 cutoff_time = sys.maxsize 

170 for idx, conf in enumerate(runsolver_configuration): 

171 if not isinstance(conf, str): 

172 # Looking for arg names 

173 continue 

174 conf = conf.strip() 

175 if conf == "-o" or conf == "--solver-data": 

176 # solver output was redirected 

177 solver_data_file = Path(runsolver_configuration[idx + 1]) 

178 if solver_data_file.exists(): 

179 solver_output = solver_data_file.open("r").read() 

180 elif process_output is None: 

181 warnings.warn( 

182 "[RunSolver] Could not find Solver output file: " 

183 f"{solver_data_file}" 

184 ) 

185 if "-v" in conf or "--var" in conf: 

186 value_data_file = Path(runsolver_configuration[idx + 1]) 

187 if "--cpu-limit" in conf: 

188 cutoff_time = float(runsolver_configuration[idx + 1]) 

189 if "-w" in conf or "--watcher-data" in conf: 

190 watch_file = Path(runsolver_configuration[idx + 1]) 

191 args_str = RunSolver.get_solver_args(watch_file) 

192 if args_str == "": # Could not find log file or args 

193 continue 

194 solver_input = re.findall("{.*}", args_str)[0] 

195 solver_input = ast.literal_eval(solver_input) 

196 cutoff_time = float(solver_input["cutoff_time"]) 

197 

198 if solver_output is None: 

199 # Still empty, try to read from subprocess 

200 solver_output = process_output 

201 # Format output to only the brackets (dict) 

202 try: 

203 # The solver output can be found on the last non empty line 

204 solver_output = [s for s in solver_output.splitlines() if s.strip()][-1] 

205 solver_regex_filter = re.findall("{.*}", solver_output)[0] 

206 output_dict = ast.literal_eval(solver_regex_filter) 

207 except Exception as ex: 

208 config_str = " ".join([str(c) for c in runsolver_configuration]) 

209 warnings.warn( 

210 "Solver output decoding failed from RunSolver configuration:\n" 

211 f"'{config_str}'\n" 

212 f"Output: {solver_output}\n" 

213 f"Exception: {ex}\n" 

214 "Setting status to 'UNKNOWN'.", 

215 category=RuntimeWarning, 

216 ) 

217 output_dict = {"status": SolverStatus.UNKNOWN} 

218 

219 output_dict["cutoff_time"] = cutoff_time 

220 if value_data_file is not None: 

221 cpu_time, wall_time, memory = RunSolver.get_measurements(value_data_file) 

222 output_dict["cpu_time"] = cpu_time 

223 output_dict["wall_time"] = wall_time 

224 output_dict["memory"] = memory 

225 else: # Could not retrieve cpu and wall time (log does not exist) 

226 output_dict["cpu_time"], output_dict["wall_time"] = -1.0, -1.0 

227 if output_dict["cpu_time"] > cutoff_time: 

228 output_dict["status"] = SolverStatus.TIMEOUT 

229 # Add the missing objectives (runtime based) 

230 if solver_input is not None and "objectives" in solver_input: 

231 objectives = solver_input["objectives"].split(",") 

232 for o_name in objectives: 

233 if o_name not in output_dict: 

234 output_dict[o_name] = None 

235 return output_dict