Coverage for sparkle/CLI/jobs.py: 24%
165 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-05 13:48 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-05 13:48 +0000
1#!/usr/bin/env python3
2"""Command to interact with async jobs."""
3import sys
4import time
5import argparse
7import pytermgui as ptg
8from tabulate import tabulate
10from runrunner.base import Status, Run
11from runrunner.slurm import SlurmRun
13from sparkle.platform.cli_types import TEXT
14from sparkle.CLI.help import logging
15from sparkle.CLI.help import argparse_custom as ac
16from sparkle.CLI.help import jobs as jobs_help
17from sparkle.CLI.help import global_variables as gv
def parser_function() -> argparse.ArgumentParser:
    """Create the argument parser for the jobs command.

    Returns:
        A parser that accepts the cancel, job IDs and all-jobs arguments
        as they are defined in the custom argparse help module.
    """
    parser = argparse.ArgumentParser(
        description="Command to interact with async jobs. "
                    "The command starts an interactive "
                    "table when no flags are given. Jobs "
                    "can be selected for cancelling in the "
                    "table and non-active jobs can be "
                    "flushed by pressing the spacebar.")
    # Every argument definition bundles its flag names and add_argument kwargs
    for argument in (ac.CancelJobsArgument,
                     ac.JobIDsArgument,
                     ac.AllJobsArgument):
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser
def create_jobs_table(jobs: list[SlurmRun],
                      markup: bool = True,
                      format: str = "grid") -> str | list[list]:
    """Create a table of jobs.

    Args:
        jobs: List of SlurmRun objects, one table row is created per job.
        markup: By default some mark up will be applied to the table: the status
            of running jobs is formatted bold, that of completed jobs italic,
            and the rows are rendered with tabulate.
            If false, plain status values are used and the rows are NOT rendered.
        format: The tabulate format to use. Only used when markup is true.

    Returns:
        The rendered table as a string when markup is true. Otherwise the raw
        rows (header row first) as a list of lists.
    """
    job_table = [["RunId", "Name", "Quality of Service", "Partition", "Status",
                  "Dependencies", "Finished Jobs", "Run Time"]]
    for job in jobs:
        # Read the job state once per row
        status = job.status
        all_status = job.all_status
        # Count number of jobs that have finished
        finished_jobs_count = sum(1 for sub_status in all_status
                                  if sub_status == Status.COMPLETED)
        if markup and status == Status.RUNNING:
            status_text = TEXT.format_text([TEXT.BOLD], status)
        elif markup and status == Status.COMPLETED:
            status_text = TEXT.format_text([TEXT.ITALIC], status)
        else:  # No markup requested, or a status without special formatting
            status_text = status.value
        dependencies = ", ".join(job.dependencies) if job.dependencies else "None"
        job_table.append(
            [job.run_id,
             job.name,
             job.qos,
             job.partition,
             status_text,
             dependencies,
             f"{finished_jobs_count}/{len(all_status)}",
             job.runtime])
    if markup:
        # One maximum width per column, in the order of the header row
        job_table = tabulate(job_table, headers="firstrow", tablefmt=format,
                             maxcolwidths=[12, 32, 14, 12, 16, 16, 16, 10])
    return job_table
def table_gui(jobs: list[Run]) -> None:
    """Display an interactive, periodically refreshed table of jobs.

    Rows of waiting or running jobs are buttons that open a confirmation popup
    for cancelling that job. The space bar opens a popup to flush the jobs that
    are no longer waiting or running. The job details are polled every ten
    seconds; the GUI is stopped once no job was waiting or running at the start
    of a poll. After the GUI has closed, the final table is printed if none of
    the remaining jobs is waiting or running.

    Args:
        jobs: The runs to display. The list is copied (sorted by run id), the
            list of the caller is not modified.
    """
    active_states = (Status.WAITING, Status.RUNNING)
    reload_interval = 10.0  # Seconds between two polls of the job details
    bar_width = 4  # Number of cells in the reload progress bar
    jobs = sorted(jobs, key=lambda j: j.run_id)
    job_id_map = {job.run_id: job for job in jobs}
    # Monotonic clock so that system clock changes cannot distort the interval
    last_reload = time.monotonic()
    ptg.Button.chars = {"delimiter": ["", ""]}  # Disable padding around buttons

    def build_row_widget(row: str) -> ptg.Button | ptg.Label:
        """Create a cancel button for rows of active jobs, a label otherwise."""
        # Only table body rows start with "|", the first cell holds the run id
        row_id = row.split("|")[1].strip() if row.startswith("|") else None
        job = job_id_map.get(row_id)
        if job is not None and job.status in active_states:
            return ptg.Button(row, cancel_jobs)
        return ptg.Label(row)

    def cancel_jobs(self: ptg.Button) -> None:
        """Cancel jobs based on a button click."""
        job_id = self.label.split("|")[1].strip()
        job = job_id_map.get(job_id)
        if job is None:  # Stale button: the job has been flushed in the meantime
            return
        table_window = self.parent

        def kill_exit(self: ptg.Button) -> None:
            """Kill the job, remove the popup and show the new job state."""
            job.kill()
            manager.remove(popup)
            refresh_data(table_window)

        button_yes = ptg.Button("Yes", kill_exit)
        button_no = ptg.Button("No", lambda *_: manager.remove(popup))

        popup = manager.alert(ptg.Label(f"Cancel job {job_id}?"),
                              button_no, button_yes)
        refresh_data(table_window)

    def refresh_data(self: ptg.Window | ptg.WindowManager,
                     key: str | None = None) -> None:
        """Poll the active jobs and rebuild the table rows.

        Args:
            self: The table window, or the manager holding it.
            key: Unused, allows the function to be used as a key binding.
        """
        # Resolve window
        # NOTE(review): presumably the table window is the last entry because
        # it was added to the manager first - confirm against pytermgui.
        window = self._windows[-1] if isinstance(self, ptg.WindowManager) else self
        # Fetch latest data
        for job in jobs:
            if job.status in active_states:
                job.get_latest_job_details()
        rows = create_jobs_table(jobs, markup=True).splitlines()

        if window.width != len(rows[0]):  # Resize window
            window.width = len(rows[0])

        # Widget 0 is the window title, the table rows start at index 1
        for index, row in enumerate(rows, start=1):
            widget = build_row_widget(row)
            widget.parent = window
            if index < len(window._widgets):
                window._widgets[index] = widget
            else:  # The table has grown, e.g. a cell now wraps over more lines
                window._widgets.append(widget)
        # Drop left-over rows if the table has become shorter
        del window._widgets[len(rows) + 1:]

    table = create_jobs_table(jobs, markup=True).splitlines()
    with ptg.WindowManager() as manager:
        def macro_reload(fmt: str) -> str:
            """Update the jobs with an interval and return the progress bar."""
            nonlocal last_reload
            elapsed = time.monotonic() - last_reload
            if elapsed > reload_interval:
                # Decide on the states known before this poll: a job finishing
                # during the poll keeps the GUI open until the next interval.
                if not any(job.status in active_states for job in jobs):
                    manager.stop()
                refresh_data(manager)  # Polls the active jobs and redraws
                last_reload = time.monotonic()
                elapsed = 0.0  # Start the progress bar from empty again
            n_bars = min(int(elapsed / 2), bar_width)
            return "|" + "█" * n_bars + " " * (bar_width - n_bars) + "|"

        def flush_popup(self: ptg.WindowManager, key: str) -> None:
            """Pop up for flushing completed jobs."""
            if len(jobs) <= 1:  # Cannot flush the last job
                return
            flushable_jobs = [job for job in jobs
                              if job.status not in active_states]
            if not flushable_jobs:  # Nothing to flush
                return
            flushable_job_ids = {job.run_id for job in flushable_jobs}

            def flush(self: ptg.Button) -> None:
                """Flush completed jobs."""
                for job in flushable_jobs:
                    if job in jobs:  # May be gone if flushed by an earlier popup
                        jobs.remove(job)
                    job_id_map.pop(job.run_id, None)
                table_window = manager._windows[-1]
                widgets = table_window._widgets
                flushable_widgets = []
                for iw, widget in enumerate(widgets):
                    if not isinstance(widget, ptg.Label) or "|" not in widget.value:
                        continue
                    job_id = widget.value.split("|")[1].strip()
                    if not job_id.isnumeric() or job_id not in flushable_job_ids:
                        continue
                    flushable_widgets.append(widget)
                    # Jobs can be multiple rows (labels) in the table window,
                    # are underlined with a horizontal line to separate jobs
                    for follower in widgets[iw + 1:]:
                        if not isinstance(follower, ptg.Label):
                            break  # This method only cleans labels
                        flushable_widgets.append(follower)
                        if follower.value.startswith("+"):
                            break  # Separation line, stop
                for widget in flushable_widgets:
                    table_window.remove(widget)
                manager.remove(popup)

            popup = manager.alert(ptg.Label("Flush non-active jobs?"),
                                  ptg.Button("Yes", flush),
                                  ptg.Button("No", lambda *_: manager.remove(popup)))

        ptg.tim.define("!reload", macro_reload)
        window = ptg.Window(
            "[bold]Sparkle Jobs [!reload]%c",
            width=len(table[0]),
            box="EMPTY",
        )
        for row in table:
            window._add_widget(build_row_widget(row))

        manager.add(window)
        manager.bind(" ", flush_popup, description="Flush finished jobs")

    # If all jobs were finished, print final table.
    if all(j.status not in active_states for j in jobs):
        table = create_jobs_table(jobs, format="fancy_grid")
        print(table)
def main(argv: list[str]) -> None:
    """Main function of the jobs command.

    Collects the waiting and running jobs from the log output directory,
    optionally narrows them down to the requested job IDs, and then either
    cancels them or shows them in the interactive table. Always terminates
    the process through sys.exit.

    Args:
        argv: The command line arguments, without the program name.
    """
    # Log command call
    logging.log_command(sys.argv)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    # Filter jobs on relevant status
    path = gv.settings().DEFAULT_log_output
    jobs = [run for run in jobs_help.get_runs_from_file(path)
            if run.status in (Status.WAITING, Status.RUNNING)]
    if args.job_ids:  # Filter
        jobs = [job for job in jobs if job.run_id in args.job_ids]
        found_ids = {job.run_id for job in jobs}
        for job_id in args.job_ids:
            if job_id not in found_ids:
                print(f"WARNING: Job ID {job_id} was not found ")

    if not jobs:
        if args.job_ids:
            print(f"None of the specified jobs are running: {args.job_ids}")
            sys.exit(-1)
        print("No jobs running.")
        # A cancel request without anything to cancel counts as a failure
        sys.exit(-1 if args.cancel else 0)

    if not args.cancel:
        table_gui(jobs)
        sys.exit(0)

    if not (args.all or args.job_ids):
        # Nothing was selected, so no job is touched (exit code kept at 0)
        print("No jobs cancelled: specify job IDs or select all jobs to cancel.")
        sys.exit(0)

    # At this point `jobs` is non-empty, holds only waiting/running jobs and is
    # already narrowed to the requested IDs, so every remaining job is a target.
    for job in jobs:
        job.kill()
    print(f"Canceled {len(jobs)} jobs with IDs: "
          f"{', '.join([job.run_id for job in jobs])}.")
    sys.exit(0)
if __name__ == "__main__":
    # Script entry point: hand over everything after the program name
    main(argv=sys.argv[1:])