Coverage for src/sparkle/tools/runsolver/py_runsolver.py: 79%

121 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 14:11 +0000

1"""Python Runsolver class.""" 

2 

3import argparse 

4import os 

5import pty 

6import select 

7import shlex 

8import subprocess 

9import sys 

10import time 

11from typing import Optional 

12from pathlib import Path 

13import psutil 

14 

15from sparkle.tools.general import get_time_pid_random_string 

16from sparkle.tools.runsolver.runsolver import RunSolver 

17from sparkle.__about__ import __version__ as sparkle_version 

18 

19 

class PyRunSolver(RunSolver):
    """RunSolver variant that is implemented in Python (this very module)."""

    @staticmethod
    def wrap_command(
        command: list[str],
        cutoff_time: int,
        log_directory: Path,
        log_name_base: str = None,
        raw_results_file: bool = True,
    ) -> list[str]:
        """Prefix a command with the call to this module and its arguments.

        The resulting command line runs this file with the current Python
        interpreter, which in turn executes and monitors ``command``.

        Args:
            command: The command to wrap.
            cutoff_time: The cutoff CPU time for the solver (passed as ``-C``).
            log_directory: The directory where the log files are placed.
            log_name_base: A user defined name to easily identify the logs.
                Defaults to "runsolver".
            raw_results_file: Whether to add the raw results file (``-o``).

        Returns:
            The wrapper call, its arguments and finally the original command.
        """
        if log_name_base is None:
            log_name_base = "runsolver"
        stamp = get_time_pid_random_string()

        # All log files share one unique stem and differ only in their suffix
        raw_result_path = log_directory / Path(f"{log_name_base}_{stamp}.rawres")

        wrapper = [
            sys.executable,
            str(Path(__file__).absolute()),
            "--timestamp",
            "-C",
            str(cutoff_time),
            "-w",
            str(raw_result_path.with_suffix(".log")),
            "-v",
            str(raw_result_path.with_suffix(".val")),
        ]
        if raw_results_file:
            wrapper = wrapper + ["-o", str(raw_result_path)]
        return wrapper + command

66 

67 

def run_with_monitoring(
    command: list[str],
    watcher_file: Path,
    value_file: Path,
    output_file: Optional[Path],
    timestamp: bool = True,
    cpu_limit: Optional[int] = None,
    wall_clock_limit: Optional[int] = None,
    vm_limit: Optional[int] = None,
) -> None:
    """Runs a command with CPU, wall-clock, and memory monitoring.

    The command is started with its stdout and stderr attached to a
    pseudo-terminal. While it runs, its output is streamed to ``output_file``
    (or printed) and the CPU time and virtual memory of the process and its
    descendants are sampled with psutil. The process is killed as soon as one
    of the limits is exceeded. Afterwards the final statistics are written to
    ``value_file`` as ``KEY=value`` lines, each preceded by a ``# KEY: comment``
    line.

    Args:
        command: The command to execute as a list of strings.
        watcher_file: File to log the command line.
        value_file: File to write final resource usage metrics.
        output_file: Optional file to redirect command's output. When falsy,
            the output is printed instead.
        timestamp: Whether to add timestamps to each raw output line as CPU TIME / WC Time.
        cpu_limit: CPU time limit in seconds.
        wall_clock_limit: Wall-clock time limit in seconds.
        vm_limit: Virtual memory limit in KiB.

    Raises:
        KeyboardInterrupt: Re-raised after the child process has been killed.
    """
    start_time = time.time()
    cpu_time = 0.0
    user_time = 0.0
    system_time = 0.0
    max_memory_kib = 0
    # Bound before the try block, so the except/finally clauses below can
    # always inspect them, even if opening the pty or spawning the child fails.
    process = None
    master_fd = None

    with watcher_file.open("w") as f:
        f.write(
            f"PyRunSolver from Sparkle v{sparkle_version}, a Python mirror of RunSolver. Copyright (C) 2025, ADA Research Group\n"
        )
        f.write(f"command line: {shlex.join(sys.argv)}")

    def process_raw_output(fd: int | None, target_file: Path = None) -> int | None:
        """Reads and writes 'raw' command output to a file.

        Returns the descriptor while it is still usable. Returns None once the
        stream has ended; in that case the descriptor has been closed here and
        the caller must not close it again.
        """
        if fd is None:  # Closed file stream, nothing to do
            return None
        # Only read while data is available without blocking/waiting
        while select.select([fd], [], [], 0)[0]:
            try:
                # The original author observed at most 4095 chars per read
                raw = os.read(fd, 8192)
            except OSError:  # Stream no longer readable
                try:
                    os.close(fd)  # Release the descriptor instead of leaking it
                except OSError:
                    pass  # Already invalid, nothing left to release
                return None
            if not raw:  # No data on stream: end of file
                os.close(fd)
                return None
            # errors="replace": a chunk may end in the middle of a multi-byte
            # character or contain non UTF-8 bytes; this must not stop the monitor
            data = raw.decode(errors="replace")
            if timestamp:
                ends_with_newline = data.endswith("\n")
                stamp = f"{user_time:.2f}/{time.time() - start_time:.2f}"
                # Add stamp at the beginning of each line
                data = "\n".join(f"{stamp}\t{line}" for line in data.splitlines())
                if ends_with_newline:  # Splitlines removes last \n
                    data += "\n"
            if target_file:
                # Reopen and close per chunk to stream output
                with target_file.open("a") as out:
                    out.write(data)
            else:  # No output log, print to 'terminal' (Or slurm log etc.)
                # The chunk carries its own line endings, so do not add one
                print(data, end="")
        return fd

    if output_file:  # Create raw output log
        output_file.open("w+").close()

    try:
        # Open new pseudo-terminal pair for Master (us) and Slave (subprocess)
        master_fd, slave_fd = pty.openpty()
        try:
            process = subprocess.Popen(
                command, stdout=slave_fd, stderr=slave_fd, close_fds=True
            )
        finally:
            # close_fds only affects the child. Our own copy of the slave end
            # has to be closed, otherwise the master never sees the stream end.
            os.close(slave_fd)
        ps_process = psutil.Process(process.pid)

        # Main monitoring loop
        while process.poll() is None:
            # Check if process has exceeded the wall clock limit
            if wall_clock_limit and (time.time() - start_time) > wall_clock_limit:
                process.kill()
                break

            # Read process output and write to out_log
            master_fd = process_raw_output(master_fd, output_file)

            try:  # Try to update statistics
                children = ps_process.children(recursive=True)

                # Sum CPU times and memory for the entire process tree,
                # using one consistent snapshot for the main process
                main_times = ps_process.cpu_times()
                current_user_time = main_times.user + main_times.children_user
                current_system_time = main_times.system + main_times.children_system
                current_vms = ps_process.memory_info().vms

                for child in children:
                    try:
                        child_times = child.cpu_times()
                        current_user_time += child_times.user
                        current_system_time += child_times.system
                        current_vms += child.memory_info().vms
                    except psutil.NoSuchProcess:  # Child ended in the meantime
                        continue

                user_time = current_user_time
                system_time = current_system_time
                cpu_time = user_time + system_time
                max_memory_kib = max(max_memory_kib, current_vms / 1024)
                if cpu_limit and cpu_time > cpu_limit:
                    process.kill()
                    break
                if vm_limit and max_memory_kib > vm_limit:
                    process.kill()
                    break

            except psutil.NoSuchProcess:
                break
        # Final read from stream. Keep the result: the helper may have closed
        # the descriptor, in which case it must not be closed again below.
        master_fd = process_raw_output(master_fd, output_file)
    except KeyboardInterrupt:
        # Ensure that we can catch CTRL-C and still wrap up properly
        if process is not None and process.poll() is None:
            process.kill()
        raise
    finally:
        if process is not None:
            process.wait()
        if master_fd is not None:  # 0 is a valid descriptor, so compare to None
            os.close(master_fd)  # Close child's output stream

    wall_time = time.time() - start_time
    timeout = (cpu_limit is not None and cpu_time > cpu_limit) or (
        wall_clock_limit is not None and wall_time > wall_clock_limit
    )
    memout = vm_limit is not None and max_memory_kib > vm_limit

    stats = {
        "WCTIME": (f"{wall_time}", "wall clock time in seconds"),
        "CPUTIME": (f"{cpu_time}", "CPU time in seconds (USERTIME+SYSTEMTIME)"),
        "USERTIME": (f"{user_time}", "CPU time spent in user mode in seconds"),
        "SYSTEMTIME": (f"{system_time}", "CPU time spent in system mode in seconds"),
        "CPUUSAGE": (
            f"{((cpu_time / wall_time) * 100 if wall_time > 0 else 0):.2f}",
            "CPUTIME/WCTIME in percent",
        ),
        "MAXVM": (f"{max_memory_kib:.0f}", "maximum virtual memory used in KiB"),
        "TIMEOUT": (str(timeout).lower(), "did the solver exceed a time limit?"),
        "MEMOUT": (str(memout).lower(), "did the solver exceed the memory limit?"),
    }

    with value_file.open("w") as f:
        for key, (value, comment) in stats.items():
            f.write(f"# {key}: {comment}\n")
            f.write(f"{key}={value}\n")

232 

233 

if __name__ == "__main__":
    # Command line entry point: parse the limits and log locations,
    # then run and monitor the command that follows the options.
    cli = argparse.ArgumentParser(description="Run and monitor a command.")

    # (flags, settings) per argument, registered in this order
    argument_table = [
        (
            ("--timestamp",),
            {
                "action": "store_true",
                "help": "Include a timestamp in the output logs.",
            },
        ),
        (
            ("-C", "--cpu-limit"),
            {"type": int, "help": "CPU time limit in seconds."},
        ),
        (
            ("-W", "--wall-clock-limit"),
            {"type": int, "help": "Wall clock time limit in seconds."},
        ),
        (
            ("-V", "--vm-limit"),
            {"type": int, "help": "Virtual memory limit in KiB."},
        ),
        (
            ("-w", "--watcher-data"),
            {
                "type": Path,
                "required": True,
                "help": "File to write watcher info to.",
            },
        ),
        (
            ("-v", "--var"),
            {
                "type": Path,
                "required": True,
                "help": "File to write final resource values to.",
            },
        ),
        (
            ("-o", "--solver-data"),
            {
                "required": False,
                "type": Path,
                "help": "File to redirect command stdout and stderr to.",
            },
        ),
        (
            ("command",),
            {"nargs": argparse.REMAINDER, "help": "The command to run."},
        ),
    ]
    for flags, settings in argument_table:
        cli.add_argument(*flags, **settings)

    parsed = cli.parse_args()

    # Everything after the options is the command; without it there is nothing to run
    if not parsed.command:
        cli.error("You must specify a command to run.")

    run_with_monitoring(
        command=parsed.command,
        watcher_file=parsed.watcher_data,
        value_file=parsed.var,
        output_file=parsed.solver_data,
        timestamp=parsed.timestamp,
        cpu_limit=parsed.cpu_limit,
        wall_clock_limit=parsed.wall_clock_limit,
        vm_limit=parsed.vm_limit,
    )

288 )