Coverage for sparkle/tools/runsolver.py: 76%

111 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-05 14:48 +0000

1"""Class for handling the Runsolver Wrapper.""" 

2from __future__ import annotations 

3import sys 

4import ast 

5import re 

6import warnings 

7from pathlib import Path 

8 

9from sparkle.types import SolverStatus 

10from sparkle.tools.general import get_time_pid_random_string 

11 

12 

class RunSolver:
    """Class representation of RunSolver.

    All functionality is exposed through static methods that either build a
    RunSolver command line or parse the log files RunSolver produces.

    For more information see: http://www.cril.univ-artois.fr/~roussel/runsolver/
    """

    def __init__(self: RunSolver) -> None:
        """Currently RunSolver has no instance specific methods or properties."""
        pass

    @staticmethod
    def _extract_dict(text: str) -> dict | None:
        """Parse the first ``{...}`` literal found in a text into a dictionary.

        Args:
            text: The text to search. The match never spans multiple lines.

        Returns:
            The parsed dictionary, or None if text is not a string, contains no
            ``{...}`` section, or that section is not a valid dict literal.
        """
        if not isinstance(text, str):
            return None
        match = re.search("{.*}", text)
        if match is None:
            return None
        try:
            parsed = ast.literal_eval(match.group(0))
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            # The exceptions ast.literal_eval documents for malformed input
            return None
        return parsed if isinstance(parsed, dict) else None

    @staticmethod
    def wrap_command(
            runsolver_executable: Path,
            command: list[str],
            cutoff_time: int,
            log_directory: Path,
            log_name_base: str | None = None,
            raw_results_file: bool = True) -> list[str]:
        """Wrap a command with the RunSolver call and arguments.

        Args:
            runsolver_executable: The Path to the runsolver executable.
                Is returned as an *absolute* path in the output.
            command: The command to wrap.
            cutoff_time: The cutoff CPU time for the solver.
            log_directory: The directory where to write the solver output.
            log_name_base: A user defined name to easily identify the logs.
                Defaults to "runsolver".
            raw_results_file: Whether to use the raw results file.

        Returns:
            List of commands and arguments to execute the solver.
        """
        # Create RunSolver Logs
        # --timestamp
        #   instructs to timestamp each line of the solver standard output and
        #   error files (which are then redirected to stdout)

        # --use-pty
        #   use a pseudo-terminal to collect the solver output. Currently only
        #   available when lines are timestamped. Some I/O libraries (including
        #   the C library) automatically flushes the output after each line when
        #   the standard output is a terminal. There's no automatic flush when
        #   the standard output is a pipe or a plain file. See setlinebuf() for
        #   some details. This option instructs runsolver to use a
        #   pseudo-terminal instead of a pipe/file to collect the solver
        #   output. This fools the solver which will line-buffer its output.

        # -w filename or --watcher-data filename
        #   sends the watcher informations to filename

        # -v filename or --var filename
        #   save the most relevant information (times,...)
        #   in an easy to parse VAR=VALUE file

        # -o filename or --solver-data filename
        #   redirects the solver output (both stdout and stderr) to filename
        log_name_base = "runsolver" if log_name_base is None else log_name_base
        unique_stamp = get_time_pid_random_string()
        # All three log files share one unique stem and differ only in suffix
        raw_result_path = log_directory / Path(f"{log_name_base}_{unique_stamp}.rawres")
        watcher_data_path = raw_result_path.with_suffix(".log")
        var_values_path = raw_result_path.with_suffix(".val")

        wrapped = [str(runsolver_executable.absolute()),
                   "--timestamp", "--use-pty",
                   "--cpu-limit", str(cutoff_time),
                   "-w", str(watcher_data_path),
                   "-v", str(var_values_path)]
        if raw_results_file:
            wrapped += ["-o", str(raw_result_path)]
        return wrapped + command

    @staticmethod
    def get_measurements(runsolver_values_path: Path,
                         not_found: float = -1.0) -> tuple[float, float, float]:
        """Return the CPU time, wallclock time and memory from the values log.

        Args:
            runsolver_values_path: Path to the VAR=VALUE file written by runsolver.
            not_found: Value to use for every measurement that could not be read.

        Returns:
            Tuple of (cpu_time, wall_time, memory). Memory is the MAXVM value
            divided by 1024 (MB).
        """
        cpu_time, wall_time, memory = not_found, not_found, not_found
        if not runsolver_values_path.exists():
            return cpu_time, wall_time, memory
        with runsolver_values_path.open("r") as infile:
            for line in infile:
                if line.count("=") != 1:
                    # Not a VAR=VALUE line (e.g. a comment)
                    continue
                keyword, value = line.strip().split("=")
                if keyword == "WCTIME":
                    wall_time = float(value)
                elif keyword == "CPUTIME":
                    cpu_time = float(value)
                elif keyword == "MAXVM":
                    memory = float(int(value) / 1024.0)  # MB
                    # Order is fixed, MAXVM is the last thing we want to read
                    break
        return cpu_time, wall_time, memory

    @staticmethod
    def get_status(runsolver_values_path: Path,
                   runsolver_raw_path: Path) -> SolverStatus:
        """Get run status from runsolver logs.

        Args:
            runsolver_values_path: Path to the VAR=VALUE file written by runsolver.
            runsolver_raw_path: Path to the raw solver output file. May be None.

        Returns:
            CRASHED if neither log exists, TIMEOUT if runsolver reported a time
            out, UNKNOWN if there is no raw path or no status dictionary can be
            found on the last line of the raw file, KILLED if the raw file does
            not exist, otherwise the status reported in the raw file.
        """
        values_exist = runsolver_values_path.exists()
        if not values_exist and (runsolver_raw_path is not None
                                 and not runsolver_raw_path.exists()):
            # Runsolver logs were not created, job was stopped ''incorrectly''
            return SolverStatus.CRASHED
        # First check if runsolver reported time out
        if values_exist:
            with runsolver_values_path.open("r") as values_file:
                value_lines = values_file.readlines()
            # Scan from the end: only the last TIMEOUT entry is considered
            for line in reversed(value_lines):
                stripped = line.strip()
                if stripped.startswith("TIMEOUT="):
                    if stripped == "TIMEOUT=true":
                        return SolverStatus.TIMEOUT
                    break
        if runsolver_raw_path is None:
            return SolverStatus.UNKNOWN
        if not runsolver_raw_path.exists():
            # Runsolver log was not created, job was stopped ''incorrectly''
            return SolverStatus.KILLED
        # Last line of runsolver log should contain the raw sparkle solver wrapper output
        raw_lines = runsolver_raw_path.read_text().strip().splitlines()
        output_dict = RunSolver._extract_dict(raw_lines[-1]) if raw_lines else None
        if output_dict is None:
            # Empty file or no parsable dictionary on the last line
            return SolverStatus.UNKNOWN
        return SolverStatus(output_dict["status"])

    @staticmethod
    def get_solver_args(runsolver_log_path: Path) -> str:
        """Retrieves solver arguments dict from runsolver log.

        Args:
            runsolver_log_path: Path to the runsolver watcher log.

        Returns:
            Everything after "sparkle_solver_wrapper.py" on the first
            "command line:" entry that contains it, stripped of whitespace and
            single quotes. Empty string if the log or such an entry is missing.
        """
        if not runsolver_log_path.exists():
            return ""
        with runsolver_log_path.open("r") as log_file:
            for line in log_file:
                if not line.startswith("command line:"):
                    continue
                _, marker, arguments = line.partition("sparkle_solver_wrapper.py")
                if marker:
                    return arguments.strip().strip("'")
        return ""

    @staticmethod
    def get_solver_output(runsolver_configuration: list[str | Path],
                          process_output: str) -> dict[str, str | object]:
        """Decode solver output dictionary when called with runsolver.

        Args:
            runsolver_configuration: The command that was executed, i.e. the
                runsolver call with its options followed by the solver call.
            process_output: The captured output of the process. Only used when
                no solver output could be read from a "-o" file.

        Returns:
            The dictionary printed by the solver, extended with "cutoff_time",
            "cpu_time", "wall_time" and "memory" (-1.0 when not measurable) and
            a None entry for each objective the solver did not report. If the
            output can not be decoded, "status" is set to UNKNOWN.
        """
        solver_input = None
        solver_output = None
        value_data_file = None
        cutoff_time = sys.maxsize
        last_index = len(runsolver_configuration) - 1
        for idx, conf in enumerate(runsolver_configuration):
            if not isinstance(conf, str):
                # Looking for arg names
                continue
            if idx >= last_index:
                # Every option of interest needs a value after it
                break
            conf = conf.strip()
            value = runsolver_configuration[idx + 1]
            # Options are matched exactly: a substring test would also fire on
            # solver arguments or paths that merely contain e.g. "-v" or "-w"
            if conf in ("-o", "--solver-data"):
                # solver output was redirected
                solver_data_file = Path(value)
                if solver_data_file.exists():
                    solver_output = solver_data_file.read_text()
            elif conf in ("-v", "--var"):
                value_data_file = Path(value)
            elif conf == "--cpu-limit":
                cutoff_time = float(value)
            elif conf in ("-w", "--watcher-data"):
                args_str = RunSolver.get_solver_args(Path(value))
                parsed_input = RunSolver._extract_dict(args_str)
                if parsed_input is None:  # Could not find log file or args
                    continue
                solver_input = parsed_input
                if "cutoff_time" in solver_input:
                    cutoff_time = float(solver_input["cutoff_time"])

        if solver_output is None:
            # Still empty, try to read from subprocess
            solver_output = process_output
        # Format output to only the brackets (dict)
        output_dict = RunSolver._extract_dict(solver_output)
        if output_dict is None:
            config_str = " ".join([str(c) for c in runsolver_configuration])
            warnings.warn("Solver output decoding failed from RunSolver configuration: "
                          f"'{config_str}'. Setting status to 'UNKNOWN'.",
                          category=RuntimeWarning)
            output_dict = {"status": SolverStatus.UNKNOWN}

        output_dict["cutoff_time"] = cutoff_time
        if value_data_file is not None:
            cpu_time, wall_time, memory = RunSolver.get_measurements(value_data_file)
        else:  # Could not retrieve measurements (no values log configured)
            cpu_time, wall_time, memory = -1.0, -1.0, -1.0
        output_dict["cpu_time"] = cpu_time
        output_dict["wall_time"] = wall_time
        output_dict["memory"] = memory
        if output_dict["cpu_time"] > cutoff_time:
            output_dict["status"] = SolverStatus.TIMEOUT
        # Add the missing objectives (runtime based)
        if solver_input is not None and "objectives" in solver_input:
            for o_name in solver_input["objectives"].split(","):
                output_dict.setdefault(o_name, None)
        return output_dict