Coverage for sparkle/CLI/jobs.py: 31%
128 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
1#!/usr/bin/env python3
2"""Command to interact with async jobs."""
3import sys
4import time
5import argparse
7import pytermgui as ptg
8from tabulate import tabulate
10from runrunner.base import Status, Run
11from runrunner.slurm import SlurmRun
13from sparkle.platform.cli_types import TEXT
14from sparkle.CLI.help import logging
15from sparkle.CLI.help import argparse_custom as ac
16from sparkle.CLI.help import jobs as jobs_help
17from sparkle.CLI.help import global_variables as gv
def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser for the jobs command.

    The cancel flag, the job-ID selection and the all-jobs flag are registered
    (in that order) from their shared definitions in ``argparse_custom``.

    Returns:
        The configured argument parser.
    """
    parser = argparse.ArgumentParser(description="Command to interact with async jobs.")
    for argument in (ac.CancelJobsArgument,
                     ac.JobIDsArgument,
                     ac.AllJobsArgument):
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser
def create_jobs_table(jobs: list[SlurmRun],
                      markup: bool = True,
                      format: str = "grid") -> str | list[list]:
    """Create a table of jobs.

    Args:
        jobs: List of SlurmRun objects; each becomes one row of the table.
        markup: If True (default), the status of running jobs is shown in bold
            and of completed jobs in italics, and the rows are rendered into a
            single string with ``tabulate``. If False, the plain rows are
            returned without any formatting or rendering.
        format: The tabulate table format used when ``markup`` is True.

    Returns:
        With ``markup``, the rendered table as a string. Without ``markup``,
        the table as a list of rows whose first row is the header.
    """
    job_table = [["RunId", "Name", "Quality of Service", "Partition", "Status",
                  "Dependencies", "Finished Jobs", "Run Time"]]
    for job in jobs:
        # Count number of sub jobs that have finished
        finished_jobs_count = sum(1 for status in job.all_status
                                  if status == Status.COMPLETED)
        # Always start from the plain status string so both branches agree
        status_text = job.status.value
        if markup:  # Highlight running (bold) and completed (italic) jobs
            if job.status == Status.RUNNING:
                status_text = TEXT.format_text([TEXT.BOLD], status_text)
            elif job.status == Status.COMPLETED:
                status_text = TEXT.format_text([TEXT.ITALIC], status_text)
        job_table.append(
            [job.run_id,
             job.name,
             job.qos,
             job.partition,
             status_text,
             ", ".join(job.dependencies) if job.dependencies else "None",
             f"{finished_jobs_count}/{len(job.all_status)}",
             job.runtime])
    if markup:
        return tabulate(job_table, headers="firstrow", tablefmt=format,
                        maxcolwidths=[12, 32, 14, 12, 16, 16, 16, 10])
    return job_table
def table_gui(jobs: list[Run]) -> None:
    """Display an interactive, periodically refreshed table of jobs.

    Rows of waiting or running jobs are rendered as buttons. Clicking one opens
    a confirmation popup that kills the job on "Yes". The table is refreshed
    about every 10 seconds. The window manager is stopped once no job is
    waiting or running anymore, after which a final table is printed.

    Args:
        jobs: The runs to display.
    """
    jobs = sorted(jobs, key=lambda j: j.run_id)
    active_states = (Status.WAITING, Status.RUNNING)
    reload_interval = 10.0  # Seconds between two status refreshes
    progress_width = 4  # Width in characters of the reload progress bar
    last_reload = time.time()  # Closure state for macro_reload (no globals)
    job_id_map = {job.run_id: job for job in jobs}
    ptg.Button.chars = {"delimiter": ["", ""]}  # Disable padding around buttons

    def cancel_jobs(self: ptg.Button) -> None:
        """Ask for confirmation and cancel the job of the clicked row."""
        job_id = self.label.split("|")[1].strip()
        job = job_id_map[job_id]

        def kill_exit(self: ptg.Button) -> None:
            """Two step protocol of killing the job and removing the popup."""
            job.kill()
            manager.remove(popup)

        button_yes = ptg.Button("Yes", kill_exit)
        button_no = ptg.Button("No", lambda *_: manager.remove(popup))

        popup = manager.alert(ptg.Label(f"Cancel job {job_id}?"),
                              button_no, button_yes)

        refresh_data(self.parent)

    def refresh_data(self: ptg.Window | ptg.WindowManager,
                     key: str | None = None) -> None:
        """Fetch the latest details of active jobs and redraw the table."""
        # Resolve window
        window = self._windows[-1] if isinstance(self, ptg.WindowManager) else self
        # Fetch latest data
        for job in jobs:
            if job.status in active_states:
                job.get_latest_job_details()
        job_table = create_jobs_table(jobs, markup=True).splitlines()

        if window.width != len(job_table[0]):  # Resize window
            window.width = len(job_table[0])

        for index, row in enumerate(job_table):
            row_id = row.split("|")[1].strip() if row.startswith("|") else None
            job = job_id_map.get(row_id)
            if job is not None and job.status in active_states:
                widget = ptg.Button(row, cancel_jobs)
            else:
                widget = ptg.Label(row)
            # Widget 0 is the window title, so table row i lives at index i + 1.
            if index + 1 < len(window._widgets):
                window._widgets[index + 1] = widget
            else:  # The table grew (e.g. a cell wrapped): append the extra row
                window._add_widget(widget)
            widget.parent = window
        # The table shrank: drop rows left over from the previous draw
        del window._widgets[len(job_table) + 1:]

    table = create_jobs_table(jobs, markup=True).splitlines()
    with ptg.WindowManager() as manager:
        def macro_reload(fmt: str) -> str:
            """Refresh the jobs every interval and render a progress bar."""
            nonlocal last_reload
            elapsed = time.time() - last_reload
            if elapsed > reload_interval:
                # Pass the table window explicitly: the manager may also hold
                # a confirmation popup. refresh_data fetches the job details.
                refresh_data(main_window)
                if not any(job.status in active_states for job in jobs):
                    manager.stop()
                last_reload = time.time()
                elapsed = 0.0
            # Clamp so the bar never grows beyond its fixed width
            n_bars = min(progress_width,
                         int(elapsed / reload_interval * progress_width))
            return "|" + "█" * n_bars + " " * (progress_width - n_bars) + "|"

        ptg.tim.define("!reload", macro_reload)
        main_window = ptg.Window(
            "[bold]Sparkle Jobs [!reload]%c",
            width=len(table[0]),
            box="EMPTY",
        )
        for row in table:
            if "|" not in row or not row.split("|")[1].strip().isnumeric():
                main_window._add_widget(ptg.Label(row))
            else:
                main_window._add_widget(ptg.Button(label=row, onclick=cancel_jobs))

        manager.add(main_window)

    # If all jobs were finished, print final table.
    if not any(job.status in active_states for job in jobs):
        table = create_jobs_table(jobs, format="fancy_grid")
        print(table)
def main(argv: list[str]) -> None:
    """Main function of the jobs command.

    Shows the waiting/running jobs in an interactive table, or cancels them
    when the cancel option is combined with the all-jobs option or explicit
    job IDs. The process is always terminated via ``sys.exit``.

    Args:
        argv: The command line arguments, without the program name.
    """
    # Log command call
    logging.log_command(sys.argv)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    # Filter jobs on relevant status
    path = gv.settings().DEFAULT_log_output
    jobs = [run for run in jobs_help.get_runs_from_file(path)
            if run.status in (Status.WAITING, Status.RUNNING)]
    if args.job_ids:  # Filter
        jobs = [job for job in jobs if job.run_id in args.job_ids]
        found_ids = {job.run_id for job in jobs}
        for job_id in args.job_ids:
            if job_id not in found_ids:
                print(f"WARNING: Job ID {job_id} was not found ")

    if not jobs:
        if args.job_ids:
            print(f"None of the specified jobs are running: {args.job_ids}")
            sys.exit(-1)
        print("No jobs running.")
        sys.exit(-1 if args.cancel else 0)

    if not args.cancel:
        table_gui(jobs)
        sys.exit(0)

    if not (args.all or args.job_ids):
        # Cancelling needs an explicit selection; previously this was a silent no-op
        print("ERROR: Specify the IDs of the jobs to cancel, or cancel all jobs.")
        sys.exit(-1)

    # ``jobs`` is non-empty here and, when IDs were given, already filtered to
    # exactly those IDs, so every remaining job is selected for cancellation.
    for job in jobs:
        job.kill()
    print(f"Canceled {len(jobs)} jobs with IDs: "
          f"{', '.join(job.run_id for job in jobs)}.")
    sys.exit(0)
if __name__ == "__main__":
    cli_arguments = sys.argv[1:]  # Drop the program name
    main(cli_arguments)