Coverage for src/sparkle/tools/runsolver/py_runsolver.py: 79%
121 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 14:11 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 14:11 +0000
1"""Python Runsolver class."""
3import argparse
4import os
5import pty
6import select
7import shlex
8import subprocess
9import sys
10import time
11from typing import Optional
12from pathlib import Path
13import psutil
15from sparkle.tools.general import get_time_pid_random_string
16from sparkle.tools.runsolver.runsolver import RunSolver
17from sparkle.__about__ import __version__ as sparkle_version
class PyRunSolver(RunSolver):
    """Pure-Python counterpart of the RunSolver command wrapper."""

    @staticmethod
    def wrap_command(
        command: list[str],
        cutoff_time: int,
        log_directory: Path,
        log_name_base: str = None,
        raw_results_file: bool = True,
    ) -> list[str]:
        """Prefix a solver command with a call to this module's monitor script.

        The resulting command starts the current Python interpreter on this
        very file, which in turn launches ``command`` under monitoring.

        Args:
            command: The solver command (and its arguments) to wrap.
            cutoff_time: Value passed to the ``-C`` (CPU time limit) option.
            log_directory: Directory in which the log files are placed.
            log_name_base: Prefix of the generated log file names.
                Falls back to "runsolver" when None.
            raw_results_file: When True, ``-o <file>.rawres`` is added so the
                solver output is redirected to a raw results file.

        Returns:
            The wrapped command as a list of strings, ending with ``command``.
        """
        if log_name_base is None:
            log_name_base = "runsolver"

        # A unique stamp keeps the log files of separate runs apart.
        stamp = get_time_pid_random_string()
        rawres_file = log_directory / Path(f"{log_name_base}_{stamp}.rawres")
        script_location = Path(__file__).absolute()

        wrapped = [
            sys.executable,
            str(script_location),
            "--timestamp",
            "-C",
            str(cutoff_time),
            "-w",
            str(rawres_file.with_suffix(".log")),  # Watcher log
            "-v",
            str(rawres_file.with_suffix(".val")),  # Final resource values
        ]
        if raw_results_file:
            wrapped += ["-o", str(rawres_file)]
        return wrapped + command
def run_with_monitoring(
    command: list[str],
    watcher_file: Path,
    value_file: Path,
    output_file: Path,
    timestamp: bool = True,
    cpu_limit: Optional[int] = None,
    wall_clock_limit: Optional[int] = None,
    vm_limit: Optional[int] = None,
) -> None:
    """Run a command while monitoring its CPU time, wall-clock time and memory.

    The command is started with stdout and stderr attached to a pseudo-terminal.
    While it runs, its output is streamed to ``output_file`` (or printed) and the
    resource usage of the process and its descendants is sampled with psutil.
    The process is killed as soon as one of the given limits is exceeded.
    Afterwards the collected statistics are written to ``value_file``.

    Args:
        command: The command to execute as a list of strings.
        watcher_file: File to which the banner and the command line are written.
        value_file: File to which the final resource usage metrics are written.
        output_file: File to which the command's output is appended. When None
            (or otherwise falsy) the output is printed instead.
        timestamp: Whether to prefix each raw output line with
            ``<user CPU time>/<wall-clock time>``.
        cpu_limit: CPU time limit in seconds (user + system, process tree).
            A value of None or 0 disables the check.
        wall_clock_limit: Wall-clock time limit in seconds.
            A value of None or 0 disables the check.
        vm_limit: Virtual memory limit in KiB. None or 0 disables the check.

    Raises:
        BaseException: Any exception raised while starting or monitoring the
            command (including KeyboardInterrupt) is re-raised after the child
            process has been killed and its output stream has been closed.
    """
    start_time = time.time()
    cpu_time = 0.0
    user_time = 0.0
    system_time = 0.0
    max_memory_kib = 0

    with watcher_file.open("w") as f:
        f.write(
            f"PyRunSolver from Sparkle v{sparkle_version}, a Python mirror of RunSolver. Copyright (C) 2025, ADA Research Group\n"
        )
        f.write(f"command line: {shlex.join(sys.argv)}")

    def process_raw_output(fd: int, target_file: Path = None) -> int | None:
        """Drain all currently available output from ``fd`` without blocking.

        Args:
            fd: File descriptor to read from, or None if it is already closed.
            target_file: File to append the output to. If not given, the
                output is printed.

        Returns:
            ``fd`` if the stream is still open, or None once it has been
            closed (end of stream or read error).
        """
        if fd is None:  # Closed file stream, nothing to do
            return None
        # Poll with a zero timeout: only read when data is ready right now
        while select.select([fd], [], [], 0)[0]:
            try:
                raw = os.read(fd, 8192)
            except OSError:  # Stream no longer readable
                # Close here: the caller only gets None back and therefore
                # can no longer close the descriptor itself.
                os.close(fd)
                return None
            if not raw:  # End of stream
                os.close(fd)
                return None
            # The command may emit non UTF-8 bytes, or a multi-byte character
            # may be split over two reads; never crash the monitor over that.
            data = raw.decode(errors="replace")
            if timestamp:
                ends_with_newline = data.endswith("\n")
                stamp = f"{user_time:.2f}/{time.time() - start_time:.2f}"
                # Add stamp at the beginning of each line
                data = "\n".join(f"{stamp}\t{line}" for line in data.splitlines())
                if ends_with_newline:  # Splitlines removes last \n
                    data += "\n"
            if target_file:
                # Reopen and close per chunk to stream output
                with target_file.open("a") as f:
                    f.write(data)
            else:  # No output log, print to 'terminal' (Or slurm log etc.)
                # The data carries its own newlines; do not add another one
                print(data, end="")
        return fd

    if output_file:  # Create raw output log
        output_file.open("w+").close()

    # Bind before the try block so the cleanup code below never hits an
    # unbound name when opening the pty or starting the process fails.
    process = None
    master_fd = None
    try:
        # Open new pseudo-terminal pair for Master (us) and Slave (subprocess)
        master_fd, slave_fd = pty.openpty()
        try:
            process = subprocess.Popen(
                command, stdout=slave_fd, stderr=slave_fd, close_fds=True
            )
        finally:
            # The child holds its own copy; our copy must be closed so that
            # the master side sees end-of-stream when the child exits.
            os.close(slave_fd)
        ps_process = psutil.Process(process.pid)

        # Main monitoring loop
        # NOTE(review): this loop polls without sleeping, so the monitor itself
        # keeps a core busy — consider a short wait per iteration.
        while process.poll() is None:
            # Check if process has exceeded the wall clock limit
            if wall_clock_limit and (time.time() - start_time) > wall_clock_limit:
                process.kill()
                break

            # Read process output and write to out_log
            master_fd = process_raw_output(master_fd, output_file)

            try:  # Try to update statistics
                children = ps_process.children(recursive=True)

                # Sum CPU times and memory for the entire process tree.
                # Sample cpu_times once so all fields stem from one snapshot.
                own_times = ps_process.cpu_times()
                current_user_time = own_times.user + own_times.children_user
                current_system_time = own_times.system + own_times.children_system
                current_vms = ps_process.memory_info().vms

                for child in children:
                    try:
                        child_times = child.cpu_times()
                        current_user_time += child_times.user
                        current_system_time += child_times.system
                        current_vms += child.memory_info().vms
                    except psutil.NoSuchProcess:  # Child ended in the meantime
                        continue

                user_time = current_user_time
                system_time = current_system_time
                cpu_time = user_time + system_time
                max_memory_kib = max(max_memory_kib, current_vms / 1024)
                if cpu_limit and cpu_time > cpu_limit:
                    process.kill()
                    break
                if vm_limit and max_memory_kib > vm_limit:
                    process.kill()
                    break

            except psutil.NoSuchProcess:
                break
        # Final read from stream; keep the result so that a descriptor closed
        # by this call is not closed a second time below.
        master_fd = process_raw_output(master_fd, output_file)
    except BaseException:
        # Covers CTRL-C as well as any unexpected failure: never leave the
        # child running unmonitored while we wait for it below.
        if process is not None and process.poll() is None:
            process.kill()
        raise
    finally:
        if process is not None:
            process.wait()
        if master_fd is not None:  # Explicit None check: 0 is a valid fd
            os.close(master_fd)  # Close child's output stream

    wall_time = time.time() - start_time
    timeout = (cpu_limit is not None and cpu_time > cpu_limit) or (
        wall_clock_limit is not None and wall_time > wall_clock_limit
    )
    memout = vm_limit is not None and max_memory_kib > vm_limit

    # Maps each reported key to its (value, explanatory comment) pair
    stats = {
        "WCTIME": (f"{wall_time}", "wall clock time in seconds"),
        "CPUTIME": (f"{cpu_time}", "CPU time in seconds (USERTIME+SYSTEMTIME)"),
        "USERTIME": (f"{user_time}", "CPU time spent in user mode in seconds"),
        "SYSTEMTIME": (f"{system_time}", "CPU time spent in system mode in seconds"),
        "CPUUSAGE": (
            f"{((cpu_time / wall_time) * 100 if wall_time > 0 else 0):.2f}",
            "CPUTIME/WCTIME in percent",
        ),
        "MAXVM": (f"{max_memory_kib:.0f}", "maximum virtual memory used in KiB"),
        "TIMEOUT": (str(timeout).lower(), "did the solver exceed a time limit?"),
        "MEMOUT": (str(memout).lower(), "did the solver exceed the memory limit?"),
    }

    with Path.open(value_file, "w") as f:
        for key, (value, comment) in stats.items():
            f.write(f"# {key}: {comment}\n")
            f.write(f"{key}={value}\n")
if __name__ == "__main__":
    # Command line entry point: parse the limits and log locations, then run
    # the remaining arguments as the monitored command.
    parser = argparse.ArgumentParser(description="Run and monitor a command.")

    # (flags, keyword arguments) per option, in the order they are registered
    option_table = [
        (
            ("--timestamp",),
            {"action": "store_true", "help": "Include a timestamp in the output logs."},
        ),
        (
            ("-C", "--cpu-limit"),
            {"type": int, "help": "CPU time limit in seconds."},
        ),
        (
            ("-W", "--wall-clock-limit"),
            {"type": int, "help": "Wall clock time limit in seconds."},
        ),
        (
            ("-V", "--vm-limit"),
            {"type": int, "help": "Virtual memory limit in KiB."},
        ),
        (
            ("-w", "--watcher-data"),
            {
                "type": Path,
                "required": True,
                "help": "File to write watcher info to.",
            },
        ),
        (
            ("-v", "--var"),
            {
                "type": Path,
                "required": True,
                "help": "File to write final resource values to.",
            },
        ),
        (
            ("-o", "--solver-data"),
            {
                "required": False,
                "type": Path,
                "help": "File to redirect command stdout and stderr to.",
            },
        ),
        (
            # Everything after the options is taken verbatim as the command
            ("command",),
            {"nargs": argparse.REMAINDER, "help": "The command to run."},
        ),
    ]
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)

    args = parser.parse_args()

    if not args.command:
        parser.error("You must specify a command to run.")

    run_with_monitoring(
        command=args.command,
        watcher_file=args.watcher_data,
        value_file=args.var,
        output_file=args.solver_data,
        timestamp=args.timestamp,
        cpu_limit=args.cpu_limit,
        wall_clock_limit=args.wall_clock_limit,
        vm_limit=args.vm_limit,
    )