Coverage for sparkle/CLI/jobs.py: 24%

165 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-05 13:48 +0000

1#!/usr/bin/env python3 

2"""Command to interact with async jobs.""" 

3import sys 

4import time 

5import argparse 

6 

7import pytermgui as ptg 

8from tabulate import tabulate 

9 

10from runrunner.base import Status, Run 

11from runrunner.slurm import SlurmRun 

12 

13from sparkle.platform.cli_types import TEXT 

14from sparkle.CLI.help import logging 

15from sparkle.CLI.help import argparse_custom as ac 

16from sparkle.CLI.help import jobs as jobs_help 

17from sparkle.CLI.help import global_variables as gv 

18 

19 

def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser of the jobs command.

    Returns:
        The parser with the cancel, job ids and all-jobs arguments registered.
    """
    description = ("Command to interact with async jobs. "
                   "The command starts an interactive "
                   "table when no flags are given. Jobs "
                   "can be selected for cancelling in the "
                   "table and non activate jobs can be "
                   "flushed by pressing the spacebar.")
    jobs_parser = argparse.ArgumentParser(description=description)
    # Registration order is kept: cancel flag, job ids, all-jobs flag
    for argument in (ac.CancelJobsArgument, ac.JobIDsArgument, ac.AllJobsArgument):
        jobs_parser.add_argument(*argument.names, **argument.kwargs)
    return jobs_parser

32 

33 

def create_jobs_table(jobs: list[SlurmRun],
                      markup: bool = True,
                      format: str = "grid") -> str | list[list]:
    """Create a table of jobs.

    Args:
        jobs: List of SlurmRun objects, each becomes one row of the table.
        markup: By default some mark up will be applied to the status column and
            the table is rendered to a string with tabulate. If false, no mark up
            is applied and the rows are returned without being rendered.
        format: The tabulate format to use. Only used when markup is true.

    Returns:
        The rendered table as a string when markup is true, otherwise the table
        as a list of rows of which the first row is the header.
    """
    job_table = [["RunId", "Name", "Quality of Service", "Partition", "Status",
                  "Dependencies", "Finished Jobs", "Run Time"]]
    for job in jobs:
        # Count number of jobs that have finished
        all_status = job.all_status
        finished_jobs_count = sum(1 for status in all_status
                                  if status == Status.COMPLETED)
        # Format job.status: running in bold, completed in italic, others plain
        if markup and job.status == Status.RUNNING:
            status_text = TEXT.format_text([TEXT.BOLD], job.status)
        elif markup and job.status == Status.COMPLETED:
            status_text = TEXT.format_text([TEXT.ITALIC], job.status)
        else:
            status_text = job.status.value
        dependencies = ", ".join(job.dependencies) if job.dependencies else "None"
        job_table.append(
            [job.run_id,
             job.name,
             job.qos,
             job.partition,
             status_text,
             dependencies,
             f"{finished_jobs_count}/{len(all_status)}",
             job.runtime])
    if markup:
        # One maximum column width per header column, longer cells are wrapped
        job_table = tabulate(job_table, headers="firstrow", tablefmt=format,
                             maxcolwidths=[12, 32, 14, 12, 16, 16, 16, 10])
    return job_table

77 

78 

def table_gui(jobs: list[Run]) -> None:
    """Display an interactive table of running jobs.

    Rows of waiting or running jobs are buttons that open a popup to cancel the
    job. The spacebar opens a popup to flush the jobs that are no longer waiting
    or running. The job details are reloaded every ten seconds and the window
    manager is stopped once a reload finds no waiting or running jobs.

    Args:
        jobs: The runs to display. They are shown sorted on run id.
    """
    jobs = sorted(jobs, key=lambda j: j.run_id)
    ptg.Button.chars = {"delimiter": ["", ""]}  # Disable padding around buttons

    reload_interval = 10.0  # Seconds between two reloads of the job details
    bar_width = 4  # Number of cells in the reload progress bar
    # Kept in this function's scope instead of the module globals, so the timer
    # starts fresh every time the table is opened.
    last_reload = time.time()
    job_id_map = {job.run_id: job for job in jobs}

    def is_active(job: Run) -> bool:
        """Whether the job is still waiting or running."""
        return job.status in (Status.WAITING, Status.RUNNING)

    def cancel_jobs(self: ptg.Button) -> None:
        """Cancel jobs based on a button click."""
        # The run id is the first cell of the table row used as button label
        job_id = self.label.split("|")[1].strip()
        job = job_id_map[job_id]

        def kill_exit(self: ptg.Button) -> None:
            """Two step protocol of killing the job and removing the popup."""
            job.kill()
            manager.remove(popup)

        button_yes = ptg.Button("Yes", kill_exit)
        button_no = ptg.Button("No", lambda *_: manager.remove(popup))

        popup = manager.alert(ptg.Label(f"Cancel job {job_id}?"),
                              button_no, button_yes)

        refresh_data(self.parent)

    def refresh_data(self: ptg.Window | ptg.WindowManager,
                     key: str | None = None) -> None:
        """Refresh the table.

        Args:
            self: The table window, or the window manager holding it.
            key: Not used.
        """
        # Resolve window
        # NOTE(review): assumes the last window of the manager is the table
        window = self._windows[-1] if isinstance(self, ptg.WindowManager) else self
        # Fetch latest data
        for job in jobs:
            if is_active(job):
                job.get_latest_job_details()
        job_table = create_jobs_table(jobs, markup=True).splitlines()

        if window.width != len(job_table[0]):  # Resize window
            window.width = len(job_table[0])

        for index, row in enumerate(job_table):
            clickable = False
            if row.startswith("|"):
                row_id = row.split("|")[1].strip()
                clickable = row_id in job_id_map and is_active(job_id_map[row_id])
            # Only rows of active jobs can be clicked to cancel the job
            widget = ptg.Button(row, cancel_jobs) if clickable else ptg.Label(row)
            widget.parent = window
            # The rows start at index 1, presumably index 0 is the window title
            window._widgets[index + 1] = widget

    table = create_jobs_table(jobs, markup=True).splitlines()
    with ptg.WindowManager() as manager:
        def macro_reload(fmt: str) -> str:
            """Updates jobs in the table with an interval.

            Returns:
                A progress bar of the time passed since the last reload.
            """
            nonlocal last_reload
            diff = time.time() - last_reload
            if diff > reload_interval:  # Check every 10 seconds
                # Based on the statuses of before this reload, so the final state
                # of the jobs is displayed for one more interval.
                if not any(is_active(job) for job in jobs):
                    manager.stop()
                # Also fetches the latest job details, once per active job
                refresh_data(manager)
                last_reload = time.time()
            # Clamped to keep the width constant when a reload was overdue
            n_bars = min(int(diff / 2), bar_width)
            return "|" + "█" * n_bars + " " * (bar_width - n_bars) + "|"

        def flush_popup(self: ptg.WindowManager, key: str) -> None:
            """Pop up for flushing completed jobs."""
            if len(jobs) <= 1:  # Cannot flush the last job
                return
            flushable_jobs = [job for job in jobs if not is_active(job)]
            if not flushable_jobs:  # Nothing to flush
                return
            flushable_job_ids = {job.run_id for job in flushable_jobs}

            def flush(self: ptg.Button) -> None:
                """Flush completed jobs."""
                for job in flushable_jobs:
                    jobs.remove(job)
                    del job_id_map[job.run_id]
                table_window = manager._windows[-1]
                widgets = table_window._widgets
                flushable_widgets = []
                for iw, widget in enumerate(widgets):
                    if not isinstance(widget, ptg.Label) or "|" not in widget.value:
                        continue
                    job_id = widget.value.split("|")[1].strip()
                    if not job_id.isnumeric() or job_id not in flushable_job_ids:
                        continue
                    flushable_widgets.append(widget)
                    # Jobs can be multiple rows (labels) in the table window,
                    # are underlined with a vertical line to separate jobs
                    for current_widget in widgets[iw + 1:]:
                        if not isinstance(current_widget, ptg.Label):
                            break  # This method only cleans labels
                        flushable_widgets.append(current_widget)
                        if current_widget.value.startswith("+"):
                            break  # Separation line, stop
                # Removed after the scan, as the scan iterates the widget list
                for widget in flushable_widgets:
                    table_window.remove(widget)
                manager.remove(popup)

            popup = manager.alert(ptg.Label("Flush non-active jobs?"),
                                  ptg.Button("Yes", flush),
                                  ptg.Button("No", lambda *_: manager.remove(popup)))

        ptg.tim.define("!reload", macro_reload)
        window = ptg.Window(
            "[bold]Sparkle Jobs [!reload]%c",
            width=len(table[0]),
            box="EMPTY",
        )
        for row in table:
            # Only rows starting with a numeric run id can be clicked
            if "|" not in row or not row.split("|")[1].strip().isnumeric():
                window._add_widget(ptg.Label(row))
            else:
                window._add_widget(ptg.Button(label=row, onclick=cancel_jobs))

        manager.add(window)
        manager.bind(" ", flush_popup, description="Flush finished jobs")

    # If all jobs were finished, print final table.
    if all(not is_active(j) for j in jobs):
        table = create_jobs_table(jobs, format="fancy_grid")
        print(table)

220 

221 

def main(argv: list[str]) -> None:
    """Main function of the jobs command.

    Collects the waiting and running jobs, optionally restricted to the given
    job ids. With the cancel flag the selected jobs are killed, otherwise the
    interactive jobs table is shown. Always ends by calling sys.exit.

    Args:
        argv: The command line arguments, without the program name.
    """
    # Log command call
    logging.log_command(sys.argv)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    # Filter jobs on relevant status
    path = gv.settings().DEFAULT_log_output
    jobs = [run for run in jobs_help.get_runs_from_file(path)
            if run.status in (Status.WAITING, Status.RUNNING)]
    if args.job_ids:  # Filter
        jobs = [job for job in jobs if job.run_id in args.job_ids]
        found_ids = {job.run_id for job in jobs}
        for job_id in args.job_ids:
            if job_id not in found_ids:
                print(f"WARNING: Job ID {job_id} was not found ")

    if not jobs:
        if args.job_ids:
            print(f"None of the specified jobs are running: {args.job_ids}")
            sys.exit(-1)
        print("No jobs running.")
        # Having nothing to cancel is an error, having nothing to show is not
        sys.exit(-1 if args.cancel else 0)

    if args.cancel:
        # Nothing is cancelled when neither the all flag nor job ids are given
        if args.all or args.job_ids:
            # The list is not empty at this point and was already restricted to
            # the given job ids (if any), so every remaining job is to be killed.
            for job in jobs:
                job.kill()
            print(f"Canceled {len(jobs)} jobs with IDs: "
                  f"{', '.join([job.run_id for job in jobs])}.")
        sys.exit(0)

    table_gui(jobs)
    sys.exit(0)

273 

274 

# When run as a script, pass the command line arguments without the program name
if __name__ == "__main__":
    main(sys.argv[1:])