Coverage for sparkle/CLI/jobs.py: 24%
165 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 10:17 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 10:17 +0000
1#!/usr/bin/env python3
2"""Command to interact with async jobs."""
4import sys
5import time
6import argparse
8import pytermgui as ptg
9from tabulate import tabulate
11from runrunner.base import Status, Run
12from runrunner.slurm import SlurmRun
14from sparkle.platform.cli_types import TEXT
15from sparkle.CLI.help import logging
16from sparkle.CLI.help import argparse_custom as ac
17from sparkle.CLI.help import jobs as jobs_help
18from sparkle.CLI.help import global_variables as gv
def parser_function() -> argparse.ArgumentParser:
    """Create parser for the jobs command.

    Returns:
        An argument parser with the cancel, job IDs and all-jobs arguments
        registered.
    """
    parser = argparse.ArgumentParser(
        description="Command to interact with async jobs. "
        "The command starts an interactive "
        "table when no flags are given. Jobs "
        "can be selected for cancelling in the "
        "table and non-active jobs can be "
        "flushed by pressing the spacebar."
    )
    # Every argument definition exposes its flag names and argparse kwargs.
    for argument in (
        ac.CancelJobsArgument,
        ac.JobIDsArgument,
        ac.AllJobsArgument,
    ):
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser
def create_jobs_table(
    jobs: list[SlurmRun], markup: bool = True, format: str = "grid"
) -> str | list[list]:
    """Create a table of jobs.

    Args:
        jobs: List of SlurmRun objects.
        markup: By default some mark up will be applied to the table.
            If false, a more plain version will be created.
        format: The tabulate format to use. Only used when markup is true.

    Returns:
        When markup is true, the tabulated table as a string. Otherwise the
        raw rows (header row first) as a list of lists.
    """
    job_table = [
        [
            "RunId",
            "Name",
            "Quality of Service",
            "Partition",
            "Status",
            "Dependencies",
            "Finished Jobs",
            "Run Time",
        ]
    ]
    for job in jobs:
        # Count number of jobs that have finished
        finished_jobs_count = sum(
            1 for status in job.all_status if status == Status.COMPLETED
        )
        if not markup:
            status_text = job.status.value
        elif job.status == Status.RUNNING:
            status_text = TEXT.format_text([TEXT.BOLD], job.status)
        elif job.status == Status.COMPLETED:
            status_text = TEXT.format_text([TEXT.ITALIC], job.status)
        else:
            status_text = job.status.value
        dependencies = ", ".join(job.dependencies) if job.dependencies else "None"
        job_table.append(
            [
                job.run_id,
                job.name,
                job.qos,
                job.partition,
                status_text,
                dependencies,
                f"{finished_jobs_count}/{len(job.all_status)}",
                job.runtime,
            ]
        )
    if not markup:
        # Plain mode hands back the untabulated rows.
        return job_table
    return tabulate(
        job_table,
        headers="firstrow",
        tablefmt=format,
        maxcolwidths=[12, 32, 14, 12, 16, 16, 16, 10],
    )
def table_gui(jobs: list[Run]) -> None:
    """Display an interactive table of jobs.

    Rows of waiting or running jobs are buttons that open a cancel
    confirmation. Pressing the spacebar offers to flush the non-active jobs
    from the table. The table is refreshed on an interval and the window
    manager is stopped once no job is waiting or running. If every job has
    finished when the manager exits, a final table is printed.

    Args:
        jobs: The runs to display.
    """
    # sorted() yields a new list, so flushing does not mutate the caller's list
    jobs = sorted(jobs, key=lambda j: j.run_id)
    ptg.Button.chars = {"delimiter": ["", ""]}  # Disable padding around buttons
    active_states = (Status.WAITING, Status.RUNNING)
    # Time of the last table reload; closure state shared with macro_reload
    last_reload = time.time()

    def cancel_jobs(self: ptg.Button) -> None:
        """Cancel jobs based on a button click."""
        # The run id sits in the first column of the clicked table row
        job_id = self.label.split("|")[1].strip()
        job = job_id_map[job_id]

        def kill_exit(self: ptg.Button) -> None:
            """Two step protocol of killing the job and removing the popup."""
            job.kill()
            manager.remove(popup)

        button_yes = ptg.Button("Yes", kill_exit)
        button_no = ptg.Button("No", lambda *_: manager.remove(popup))

        popup = manager.alert(ptg.Label(f"Cancel job {job_id}?"), button_no, button_yes)

        refresh_data(self.parent)

    def refresh_data(
        self: ptg.Window | ptg.WindowManager, key: str | None = None
    ) -> None:
        """Refresh the table."""
        # Resolve window
        window = self._windows[-1] if isinstance(self, ptg.WindowManager) else self
        # Fetch latest data
        for job in jobs:
            if job.status in active_states:
                job.get_latest_job_details()
        job_table = create_jobs_table(jobs, markup=True).splitlines()

        if window.width != len(job_table[0]):  # Resize window
            window.width = len(job_table[0])

        for index, row in enumerate(job_table):
            # Widget 0 is the window title, so table rows start at index 1
            widget = ptg.Label(row)
            if row.startswith("|"):
                row_id = row.split("|")[1].strip()
                if row_id in job_id_map and job_id_map[row_id].status in active_states:
                    # Only active jobs stay clickable (cancellable)
                    widget = ptg.Button(row, cancel_jobs)
            window._widgets[index + 1] = widget
            window._widgets[index + 1].parent = window

    table = create_jobs_table(jobs, markup=True).splitlines()
    with ptg.WindowManager() as manager:

        def macro_reload(fmt: str) -> str:
            """Updates jobs in the table with an interval."""
            nonlocal last_reload
            diff = time.time() - last_reload
            interval = 10.0
            if diff > interval:  # Check every 10 seconds
                any_running = False
                for job in jobs:
                    if job.status in active_states:
                        any_running = True
                        job.get_latest_job_details()
                if not any_running:
                    manager.stop()
                refresh_data(manager)
                last_reload = time.time()
            # Progress bar towards the next reload: one block per two seconds
            n_bars = int(diff / 2)
            return "|" + "█" * n_bars + " " * (4 - n_bars) + "|"

        def flush_popup(self: ptg.WindowManager, key: str) -> None:
            """Pop up for flushing completed jobs."""
            if len(jobs) <= 1:  # Cannot flush the last job
                return
            flushable_jobs = [job for job in jobs if job.status not in active_states]
            if not flushable_jobs:  # Nothing to flush
                return
            flushable_job_ids = [job.run_id for job in flushable_jobs]

            def flush(self: ptg.Button) -> None:
                """Flush completed jobs."""
                for job in flushable_jobs:
                    jobs.remove(job)
                    del job_id_map[job.run_id]
                flushable_widgets = []
                table_window = manager._windows[-1]
                for iw, widget in enumerate(table_window._widgets):
                    if not isinstance(widget, ptg.Label) or "|" not in widget.value:
                        continue
                    job_id = widget.value.split("|")[1].strip()
                    if not job_id.isnumeric() or job_id not in flushable_job_ids:
                        continue
                    flushable_widgets.append(widget)
                    # Jobs can be multiple rows (labels) in the table window,
                    # are underlined with a horizontal line to separate jobs
                    offset = 1
                    while (len(table_window._widgets) - (iw + offset)) > 0:
                        current_widget = table_window._widgets[iw + offset]
                        if not isinstance(current_widget, ptg.Label):
                            break  # This method only cleans labels
                        flushable_widgets.append(current_widget)
                        if current_widget.value.startswith("+"):
                            break  # Separation line, stop
                        offset += 1
                for widget in flushable_widgets:
                    table_window.remove(widget)
                manager.remove(popup)

            popup = manager.alert(
                ptg.Label("Flush non-active jobs?"),
                ptg.Button("Yes", flush),
                ptg.Button("No", lambda *_: manager.remove(popup)),
            )

        ptg.tim.define("!reload", macro_reload)
        window = ptg.Window(
            "[bold]Sparkle Jobs [!reload]%c",
            width=len(table[0]),
            box="EMPTY",
        )
        job_id_map = {job.run_id: job for job in jobs}
        for row in table:
            # Rows whose first column is a numeric run id become cancel buttons
            if "|" not in row or not row.split("|")[1].strip().isnumeric():
                window._add_widget(ptg.Label(row))
            else:
                window._add_widget(ptg.Button(label=row, onclick=cancel_jobs))

        manager.add(window)
        manager.bind(" ", flush_popup, description="Flush finished jobs")

    # If all jobs were finished, print final table.
    if all(j.status not in active_states for j in jobs):
        table = create_jobs_table(jobs, format="fancy_grid")
        print(table)
def main(argv: list[str]) -> None:
    """Main function of the jobs command.

    Lists the waiting/running jobs found in the log output, optionally
    restricted to the given job IDs, and either cancels them or shows the
    interactive table. Always terminates through sys.exit.

    Args:
        argv: Command line arguments, without the program name.
    """
    # Log command call
    logging.log_command(sys.argv)

    # Define and process command line arguments
    parser = parser_function()
    args = parser.parse_args(argv)

    # Filter jobs on relevant status
    path = gv.settings().DEFAULT_log_output
    jobs = [
        run
        for run in jobs_help.get_runs_from_file(path)
        if run.status in (Status.WAITING, Status.RUNNING)
    ]
    if args.job_ids:  # Filter
        jobs = [job for job in jobs if job.run_id in args.job_ids]
        found_ids = {job.run_id for job in jobs}
        for job_id in args.job_ids:
            if job_id not in found_ids:
                print(f"WARNING: Job ID {job_id} was not found ")

    if not jobs:
        if args.job_ids:
            print(f"None of the specified jobs are running: {args.job_ids}")
            sys.exit(-1)
        print("No jobs running.")
        # Having nothing to cancel is reported as a failure
        sys.exit(-1 if args.cancel else 0)

    if not args.cancel:
        table_gui(jobs)
        sys.exit(0)

    if not (args.all or args.job_ids):
        # Cancelling needs an explicit selection; tell the user instead of
        # exiting silently.
        print(
            "No jobs were cancelled: specify job IDs or select all jobs "
            "when cancelling."
        )
        sys.exit(0)

    # jobs is non-empty here and already restricted to the requested IDs
    # (or all jobs were requested), so every remaining job is cancelled.
    for job in jobs:
        job.kill()
    print(
        f"Canceled {len(jobs)} jobs with IDs: "
        f"{', '.join([job.run_id for job in jobs])}."
    )
    sys.exit(0)
if __name__ == "__main__":
    # Script entry point: pass on the arguments after the program name.
    main(sys.argv[1:])