Coverage for sparkle/CLI/jobs.py: 24%

165 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

1#!/usr/bin/env python3 

2"""Command to interact with async jobs.""" 

3 

4import sys 

5import time 

6import argparse 

7 

8import pytermgui as ptg 

9from tabulate import tabulate 

10 

11from runrunner.base import Status, Run 

12from runrunner.slurm import SlurmRun 

13 

14from sparkle.platform.cli_types import TEXT 

15from sparkle.CLI.help import logging 

16from sparkle.CLI.help import argparse_custom as ac 

17from sparkle.CLI.help import jobs as jobs_help 

18from sparkle.CLI.help import global_variables as gv 

19 

20 

def parser_function() -> argparse.ArgumentParser:
    """Create the argument parser for the jobs command.

    Returns:
        A parser that accepts the cancel, job-ids and all-jobs arguments
        defined in ``sparkle.CLI.help.argparse_custom``.
    """
    parser = argparse.ArgumentParser(
        description="Command to interact with async jobs. "
        "The command starts an interactive "
        "table when no flags are given. Jobs "
        "can be selected for cancelling in the "
        "table and non-active jobs can be "
        "flushed by pressing the spacebar."
    )
    # Each custom argument bundles its flag names and add_argument kwargs
    for argument in (ac.CancelJobsArgument, ac.JobIDsArgument, ac.AllJobsArgument):
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser

35 

36 

def create_jobs_table(
    jobs: list[SlurmRun], markup: bool = True, format: str = "grid"
) -> str | list[list]:
    """Create a table of jobs.

    Args:
        jobs: List of SlurmRun objects.
        markup: If True (default), the status of running jobs is shown in bold,
            that of completed jobs in italics, and the rows are rendered with
            tabulate into a single string. If False, no formatting is applied
            and the raw rows are returned instead.
        format: The tabulate table format, used only when markup is True.

    Returns:
        The rendered table as a string when markup is True; otherwise a list
        of rows (each a list of cell values) whose first row is the header.
    """
    header = [
        "RunId",
        "Name",
        "Quality of Service",
        "Partition",
        "Status",
        "Dependencies",
        "Finished Jobs",
        "Run Time",
    ]
    job_table = [header]
    for job in jobs:
        # Count the entries of job.all_status that have completed
        finished_jobs_count = sum(
            status == Status.COMPLETED for status in job.all_status
        )
        # Always pass the plain status string to the formatter, matching the
        # unformatted branch (format_text expects text, not an enum member)
        status_text = job.status.value
        if markup:
            if job.status == Status.RUNNING:
                status_text = TEXT.format_text([TEXT.BOLD], status_text)
            elif job.status == Status.COMPLETED:
                status_text = TEXT.format_text([TEXT.ITALIC], status_text)
        dependencies = ", ".join(job.dependencies) if job.dependencies else "None"
        job_table.append(
            [
                job.run_id,
                job.name,
                job.qos,
                job.partition,
                status_text,
                dependencies,
                f"{finished_jobs_count}/{len(job.all_status)}",
                job.runtime,
            ]
        )
    if not markup:
        return job_table
    return tabulate(
        job_table,
        headers="firstrow",
        tablefmt=format,
        # One maximum width per column of the header above
        maxcolwidths=[12, 32, 14, 12, 16, 16, 16, 10],
    )

100 

101 

def table_gui(jobs: list[Run]) -> None:
    """Display an interactive, self-refreshing table of jobs.

    Rows of waiting or running jobs are rendered as buttons; clicking one asks
    for confirmation and then kills that job. Roughly every 10 seconds the
    active jobs are re-queried and the table is rebuilt; when no job was
    waiting or running at such a refresh, the window manager is stopped.
    Pressing the spacebar offers to flush (remove from the table) all
    non-active jobs. If every job is finished when the interface closes, a
    final table is printed.

    Args:
        jobs: The jobs to display. A sorted copy of the list is used, so the
            caller's list is not modified (the job objects themselves are
            updated in place).
    """
    jobs = sorted(jobs, key=lambda j: j.run_id)
    active_states = (Status.WAITING, Status.RUNNING)
    reload_interval = 10.0  # Seconds between automatic refreshes
    max_bars = 4  # Width of the countdown bar shown in the window title
    # Monotonic time of the last automatic refresh. Kept in this closure rather
    # than a module global so repeated calls do not inherit a stale timestamp.
    last_reload: float | None = None
    ptg.Button.chars = {"delimiter": ["", ""]}  # Disable padding around buttons

    def make_row_widget(row: str) -> ptg.Label | ptg.Button:
        """Return a cancel button for a row of an active job, else a label."""
        row_id = row.split("|")[1].strip() if row.startswith("|") else None
        job = job_id_map.get(row_id)
        if job is not None and job.status in active_states:
            return ptg.Button(label=row, onclick=cancel_jobs)
        return ptg.Label(row)

    def row_text(widget: object) -> str | None:
        """Return the table text of a row widget, or None for other widgets."""
        if isinstance(widget, ptg.Button):
            return widget.label
        if isinstance(widget, ptg.Label):
            return widget.value
        return None

    def cancel_jobs(self: ptg.Button) -> None:
        """Ask for confirmation to cancel the job of the clicked row."""
        job_id = self.label.split("|")[1].strip()
        job = job_id_map[job_id]

        def kill_exit(self: ptg.Button) -> None:
            """Two step protocol of killing the job and removing the popup."""
            job.kill()
            manager.remove(popup)

        button_yes = ptg.Button("Yes", kill_exit)
        button_no = ptg.Button("No", lambda *_: manager.remove(popup))

        popup = manager.alert(ptg.Label(f"Cancel job {job_id}?"), button_no, button_yes)

        refresh_data(self.parent)

    def refresh_data(
        self: ptg.Window | ptg.WindowManager, key: str | None = None
    ) -> None:
        """Re-query active jobs and rebuild the table rows of the window.

        Args:
            self: The table window, or a manager whose ``_windows[-1]`` is the
                table window.
            key: Unused here; presumably kept for a key-binding callback
                signature.
        """
        window = self._windows[-1] if isinstance(self, ptg.WindowManager) else self
        # Fetch latest data
        for job in jobs:
            if job.status in active_states:
                job.get_latest_job_details()
        job_table = create_jobs_table(jobs, markup=True).splitlines()

        if window.width != len(job_table[0]):  # Resize window
            window.width = len(job_table[0])

        for index, row in enumerate(job_table):
            widget = make_row_widget(row)
            widget.parent = window
            position = index + 1  # Offset by the title label added first
            if position < len(window._widgets):
                window._widgets[position] = widget
            else:
                # The table gained rows (e.g. a cell wrapped onto a new line)
                window._add_widget(widget)

    table = create_jobs_table(jobs, markup=True).splitlines()
    with ptg.WindowManager() as manager:

        def macro_reload(fmt: str) -> str:
            """Refresh the table every interval and render a countdown bar.

            Registered below as the ``!reload`` TIM macro of the window title.
            """
            nonlocal last_reload
            now = time.monotonic()
            if last_reload is None:
                last_reload = now
            elapsed = now - last_reload
            if elapsed > reload_interval:
                # Decide on the statuses known before this refresh; the
                # refresh below queries the latest details of active jobs.
                if not any(job.status in active_states for job in jobs):
                    manager.stop()
                refresh_data(manager)
                last_reload = time.monotonic()
            # One bar per two seconds, clamped so the title width stays fixed
            n_bars = min(int(elapsed / 2), max_bars)
            return "|" + "█" * n_bars + " " * (max_bars - n_bars) + "|"

        def flush_popup(self: ptg.WindowManager, key: str) -> None:
            """Pop up for flushing completed jobs."""
            if len(jobs) <= 1:  # Cannot flush the last job
                return
            flushable_jobs = [job for job in jobs if job.status not in active_states]
            if not flushable_jobs:  # Nothing to flush
                return
            flushable_job_ids = {job.run_id for job in flushable_jobs}

            def flush(self: ptg.Button) -> None:
                """Drop the non-active jobs and remove their rows from the table."""
                for job in flushable_jobs:
                    jobs.remove(job)
                    del job_id_map[job.run_id]
                table_window = manager._windows[-1]
                widgets = table_window._widgets
                flushable_widgets = []
                for iw, widget in enumerate(widgets):
                    # Buttons are included too: a job can stop being active
                    # (e.g. after a kill) before its row is rebuilt as a label
                    text = row_text(widget)
                    if text is None or not text.startswith("|"):
                        continue
                    if text.split("|")[1].strip() not in flushable_job_ids:
                        continue
                    flushable_widgets.append(widget)
                    # Jobs can be multiple rows (labels) in the table window,
                    # and are underlined with a separator line starting with "+"
                    for follower in widgets[iw + 1 :]:
                        if not isinstance(follower, ptg.Label):
                            break  # This method only cleans labels
                        flushable_widgets.append(follower)
                        if follower.value.startswith("+"):
                            break  # Separation line, stop
                for widget in flushable_widgets:
                    table_window.remove(widget)
                manager.remove(popup)

            popup = manager.alert(
                ptg.Label("Flush non-active jobs?"),
                ptg.Button("Yes", flush),
                ptg.Button("No", lambda *_: manager.remove(popup)),
            )

        ptg.tim.define("!reload", macro_reload)
        window = ptg.Window(
            "[bold]Sparkle Jobs [!reload]%c",
            width=len(table[0]),
            box="EMPTY",
        )
        job_id_map = {job.run_id: job for job in jobs}
        for row in table:
            window._add_widget(make_row_widget(row))

        manager.add(window)
        manager.bind(" ", flush_popup, description="Flush finished jobs")

    # If all jobs were finished, print final table.
    if all(job.status not in active_states for job in jobs):
        table = create_jobs_table(jobs, format="fancy_grid")
        print(table)

245 

246 

def main(argv: list[str]) -> None:
    """Main function of the jobs command.

    Loads the runs found at the settings' ``DEFAULT_log_output`` path and
    keeps only those that are waiting or running, optionally restricted to the
    requested job ids. With the cancel argument the selected jobs (all of them
    when the all-jobs argument is given) are killed; otherwise an interactive
    table is shown. This function always terminates via ``sys.exit``: 0 on
    success, -1 when nothing matching the request could be shown or cancelled.

    Args:
        argv: Command line arguments, without the program name.
    """
    # Log command call
    logging.log_command(sys.argv)

    # Define and process command line arguments
    parser = parser_function()
    args = parser.parse_args(argv)

    # Filter jobs on relevant status
    active_states = (Status.WAITING, Status.RUNNING)
    path = gv.settings().DEFAULT_log_output
    jobs = [
        run
        for run in jobs_help.get_runs_from_file(path)
        if run.status in active_states
    ]
    if args.job_ids:  # Restrict to the requested ids and report unknown ones
        jobs = [job for job in jobs if job.run_id in args.job_ids]
        found_ids = {job.run_id for job in jobs}
        for job_id in args.job_ids:
            if job_id not in found_ids:
                print(f"WARNING: Job ID {job_id} was not found ")

    if not jobs:
        if args.job_ids:
            print(f"None of the specified jobs are running: {args.job_ids}")
            sys.exit(-1)
        print("No jobs running.")
        sys.exit(-1 if args.cancel else 0)

    if not args.cancel:
        table_gui(jobs)
        sys.exit(0)

    if not (args.all or args.job_ids):
        # Previously a silent no-op; cancelling requires a selection
        print("ERROR: No jobs selected to cancel. Specify job IDs or select all jobs.")
        sys.exit(-1)

    # jobs is non-empty and, when ids were given, already restricted to those
    # ids, so every remaining job is selected for cancellation.
    for job in jobs:
        job.kill()
    print(
        f"Canceled {len(jobs)} jobs with IDs: "
        f"{', '.join([job.run_id for job in jobs])}."
    )
    sys.exit(0)

303 

304 

if __name__ == "__main__":
    # Run the command on the command line arguments, minus the program name.
    cli_arguments = sys.argv[1:]
    main(cli_arguments)