Coverage for sparkle/CLI/jobs.py: 31%

128 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-07 15:22 +0000

1#!/usr/bin/env python3 

2"""Command to interact with async jobs.""" 

3import sys 

4import time 

5import argparse 

6 

7import pytermgui as ptg 

8from tabulate import tabulate 

9 

10from runrunner.base import Status, Run 

11from runrunner.slurm import SlurmRun 

12 

13from sparkle.platform.cli_types import TEXT 

14from sparkle.CLI.help import logging 

15from sparkle.CLI.help import argparse_custom as ac 

16from sparkle.CLI.help import jobs as jobs_help 

17from sparkle.CLI.help import global_variables as gv 

18 

19 

def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser used by the jobs command.

    Returns:
        An ArgumentParser carrying the cancel, job-ids and all-jobs arguments.
    """
    jobs_parser = argparse.ArgumentParser(
        description="Command to interact with async jobs.")
    # Register the shared argument specifications in a fixed order.
    for argument_spec in (ac.CancelJobsArgument,
                          ac.JobIDsArgument,
                          ac.AllJobsArgument):
        jobs_parser.add_argument(*argument_spec.names, **argument_spec.kwargs)
    return jobs_parser

27 

28 

def create_jobs_table(jobs: list[SlurmRun],
                      markup: bool = True,
                      format: str = "grid") -> str | list[list]:
    """Create a table of jobs.

    Args:
        jobs: List of SlurmRun objects; one table row is created per job.
        markup: If true (default), the status of running jobs is made bold,
            the status of completed jobs italic, and the rows are rendered
            into a single string with ``tabulate``. If false, no styling or
            rendering is applied and the raw rows are returned.
        format: The tabulate format to use. Only used when ``markup`` is true.

    Returns:
        If ``markup`` is true, the rendered table as a string. Otherwise a
        list of rows, where the first row holds the column headers.
    """
    job_table = [["RunId", "Name", "Quality of Service", "Partition", "Status",
                  "Dependencies", "Finished Jobs", "Run Time"]]
    for job in jobs:
        # Count how many entries of job.all_status are COMPLETED
        finished_jobs_count = sum(1 for status in job.all_status
                                  if status == Status.COMPLETED)
        # Always work with the plain status value, so the styled and the
        # unstyled cell show the same text instead of depending on how the
        # Status enum member itself is formatted.
        status_text = job.status.value
        if markup:
            if job.status == Status.RUNNING:
                status_text = TEXT.format_text([TEXT.BOLD], status_text)
            elif job.status == Status.COMPLETED:
                status_text = TEXT.format_text([TEXT.ITALIC], status_text)
        dependencies = (", ".join(job.dependencies) if job.dependencies
                        else "None")
        job_table.append([job.run_id,
                          job.name,
                          job.qos,
                          job.partition,
                          status_text,
                          dependencies,
                          f"{finished_jobs_count}/{len(job.all_status)}",
                          job.runtime])
    if not markup:
        # Plain mode: hand back the raw rows (header row first).
        return job_table
    # Cap column widths so long cells wrap instead of stretching the table.
    return tabulate(job_table, headers="firstrow", tablefmt=format,
                    maxcolwidths=[12, 32, 14, 12, 16, 16, 16, 10])

72 

73 

def table_gui(jobs: list[Run]) -> None:
    """Display an interactive, self-refreshing table of jobs.

    Job rows are rendered as buttons; clicking one opens a confirmation popup
    that can kill that job. Roughly every 10 seconds the details of waiting
    and running jobs are fetched and the table is redrawn; on a redraw only
    rows of waiting/running jobs stay clickable. The GUI stops once no job
    is waiting or running anymore. If at that point every job is finished,
    the final table is printed.

    Args:
        jobs: The jobs to display. They are shown sorted by run id.
    """
    jobs = sorted(jobs, key=lambda j: j.run_id)
    ptg.Button.chars = {"delimiter": ["", ""]}  # Disable padding around buttons
    active_states = [Status.WAITING, Status.RUNNING]
    reload_interval = 10.0  # Seconds between two refreshes of the job data
    # Timestamp of the last refresh. Shared with macro_reload via `nonlocal`
    # instead of leaking a global into the module namespace.
    last_reload = time.time()
    job_id_map = {job.run_id: job for job in jobs}

    def cancel_jobs(self: ptg.Button) -> None:
        """Ask for confirmation to cancel the job of the clicked row."""
        # The first table column of a row holds the run id.
        job_id = self.label.split("|")[1].strip()
        job = job_id_map[job_id]

        def kill_exit(self: ptg.Button) -> None:
            """Two step protocol of killing the job and removing the popup."""
            job.kill()
            manager.remove(popup)

        button_yes = ptg.Button("Yes", kill_exit)
        button_no = ptg.Button("No", lambda *_: manager.remove(popup))

        popup = manager.alert(ptg.Label(f"Cancel job {job_id}?"),
                              button_no, button_yes)

        refresh_data(self.parent)

    def refresh_data(self: ptg.Window | ptg.WindowManager,
                     key: str | None = None) -> None:
        """Fetch the latest details of active jobs and redraw the table rows."""
        # Resolve window
        window = self._windows[-1] if isinstance(self, ptg.WindowManager) else self
        # Fetch latest data
        for job in jobs:
            if job.status in active_states:
                job.get_latest_job_details()
        job_table = create_jobs_table(jobs, markup=True).splitlines()

        if window.width != len(job_table[0]):  # Resize window
            window.width = len(job_table[0])

        # Widget 0 is the window title, so table line i lives at widget i + 1.
        # NOTE(review): assumes the number of table lines does not change
        # between refreshes (wrapped cells could add lines) — confirm.
        for index, row in enumerate(job_table):
            if row.startswith("|"):
                row_id = row.split("|")[1].strip()
                if (row_id in job_id_map
                        and job_id_map[row_id].status in active_states):
                    window._widgets[index + 1] = ptg.Button(row, cancel_jobs)
                else:
                    window._widgets[index + 1] = ptg.Label(row)
            else:
                window._widgets[index + 1] = ptg.Label(row)
            window._widgets[index + 1].parent = window

    table = create_jobs_table(jobs, markup=True).splitlines()
    with ptg.WindowManager() as manager:
        def macro_reload(fmt: str) -> str:
            """Refresh the table every interval and render a progress bar."""
            nonlocal last_reload
            diff = time.time() - last_reload
            if diff > reload_interval:
                # refresh_data already fetches the details of every active
                # job, so no separate fetch is needed here. Deciding whether
                # to stop *after* the refresh uses the up-to-date statuses.
                refresh_data(manager)
                if not any(job.status in active_states for job in jobs):
                    manager.stop()
                last_reload = time.time()
            # Four-slot bar (one slot per 2 seconds); clamped so the title
            # keeps a constant width even when diff exceeds the interval.
            n_bars = min(int(diff / 2), 4)
            return "|" + "█" * n_bars + " " * (4 - n_bars) + "|"

        ptg.tim.define("!reload", macro_reload)
        window = ptg.Window(
            "[bold]Sparkle Jobs [!reload]%c",
            width=len(table[0]),
            box="EMPTY",
        )
        for row in table:
            # Rows whose first cell is a numeric run id become clickable.
            if "|" not in row or not row.split("|")[1].strip().isnumeric():
                window._add_widget(ptg.Label(row))
            else:
                window._add_widget(ptg.Button(label=row, onclick=cancel_jobs))

        manager.add(window)

    # If all jobs were finished, print final table.
    if all(j.status not in active_states for j in jobs):
        print(create_jobs_table(jobs, format="fancy_grid"))

168 

169 

def main(argv: list[str]) -> None:
    """Main function of the jobs command.

    Only jobs that are still waiting or running are considered, optionally
    restricted to the requested job ids. Without the cancel option an
    interactive table of these jobs is shown. With the cancel option the
    selected jobs (all of them, or the requested ids) are killed. Every path
    ends the process through ``sys.exit``.

    Args:
        argv: The command line arguments, excluding the program name.
    """
    # Log command call
    logging.log_command(sys.argv)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    # Filter jobs on relevant status
    path = gv.settings().DEFAULT_log_output
    active_states = (Status.WAITING, Status.RUNNING)
    jobs = [run for run in jobs_help.get_runs_from_file(path)
            if run.status in active_states]
    if args.job_ids:  # Filter
        jobs = [job for job in jobs if job.run_id in args.job_ids]
        found_ids = {job.run_id for job in jobs}
        for job_id in args.job_ids:
            if job_id not in found_ids:
                print(f"WARNING: Job ID {job_id} was not found ")

    if len(jobs) == 0:
        if args.job_ids:
            print(f"None of the specified jobs are running: {args.job_ids}")
            sys.exit(-1)
        print("No jobs running.")
        if args.cancel:
            sys.exit(-1)
        sys.exit(0)

    if not args.cancel:
        table_gui(jobs)
        sys.exit(0)

    if not (args.all or args.job_ids):
        # Cancelling requires an explicit selection; previously this case
        # silently did nothing.
        print("ERROR: No jobs selected to cancel. Specify either all jobs or "
              "a list of job IDs.")
        sys.exit(-1)

    # At this point `jobs` is non-empty and already restricted to the
    # selection (all active jobs, or only the requested ids), so every
    # job in it is cancelled.
    for job in jobs:
        job.kill()
    print(f"Canceled {len(jobs)} jobs with IDs: "
          f"{', '.join([j.run_id for j in jobs])}.")
    sys.exit(0)

221 

222 

if __name__ == "__main__":
    # Hand everything after the program name to the command entry point.
    cli_arguments = sys.argv[1:]
    main(cli_arguments)