Coverage for sparkle/CLI/wait.py: 0%

67 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-27 09:10 +0000

1#!/usr/bin/env python3 

2"""Sparkle command to wait for one or more other commands to complete execution.""" 

3import sys 

4import signal 

5import time 

6import argparse 

7from pathlib import Path 

8 

9from runrunner.slurm import SlurmRun 

10from runrunner.base import Status 

11from tabulate import tabulate 

12 

13from sparkle.platform.cli_types import VerbosityLevel, TEXT 

14from sparkle.CLI.help import logging 

15from sparkle.CLI.help import argparse_custom as ac 

16from sparkle.CLI.help import global_variables as gv 

17 

18 

def parser_function() -> argparse.ArgumentParser:
    """Build the command line parser for the wait command.

    Returns:
        An ArgumentParser with the job-ID argument (``ac.JobIDsArgument``)
        registered on it.
    """
    job_ids_arg = ac.JobIDsArgument
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(*job_ids_arg.names, **job_ids_arg.kwargs)
    return arg_parser

28 

29 

def get_runs_from_file(path: Path, print_error: bool = False) -> list[SlurmRun]:
    """Retrieve all run objects from file storage.

    Every ``*.json`` file under ``path`` (searched recursively) is loaded with
    ``SlurmRun.from_file``. Files that fail to load are skipped.

    Args:
        path: Path object where to look recursively for the files.
        print_error: If True, print a warning for every file that could not be
            loaded as a run. Defaults to False (skip silently).

    Returns:
        List of all found SlurmRun objects.
    """
    runs = []
    for file in path.rglob("*.json"):
        # TODO: RunRunner should be adapted to have more general methods for runs
        # So this method can work for both local and slurm
        try:
            run_obj = SlurmRun.from_file(file)
        except Exception as ex:
            # Not a (correct) RunRunner JSON file. Not every JSON file under
            # `path` is necessarily a run, so skipping is deliberate (best effort).
            if print_error:
                print(f"[WARNING] Could not load file: {file}. Exception: {ex}")
            continue
        runs.append(run_obj)
    return runs

51 

52 

def _is_active(status: Status) -> bool:
    """Return whether a job in ``status`` still has to be waited for."""
    return status == Status.WAITING or status == Status.RUNNING


def _format_status(status: Status):
    """Return the table cell for ``status``: bold when running, italic when completed.

    Any other status is returned unchanged and left to ``tabulate`` to render.
    """
    if status == Status.RUNNING:
        return TEXT.format_text([TEXT.BOLD], status)
    if status == Status.COMPLETED:
        return TEXT.format_text([TEXT.ITALIC], status)
    return status


def _render_table(snapshot: list[tuple[SlurmRun, Status]]) -> str:
    """Render one polling round of jobs as a grid table.

    Args:
        snapshot: (job, status) pairs. Each status was read once for this round,
            so the sort order and the displayed status agree.

    Returns:
        The table text produced by ``tabulate``.
    """
    # Order in which to display the jobs; any other status goes last
    status_order = {Status.COMPLETED: 0, Status.RUNNING: 1, Status.WAITING: 2}
    rows = [["RunId", "Name", "Partition", "Status",
             "Dependencies", "Finished Jobs", "Run Time"]]
    ordered = sorted(
        snapshot, key=lambda pair: (status_order.get(pair[1], 4), pair[0].run_id))
    for job, status in ordered:
        # Read all_status once so the count and the total come from the same value
        sub_statuses = job.all_status
        finished_jobs_count = sum(1 for sub in sub_statuses
                                  if sub == Status.COMPLETED)
        rows.append(
            [job.run_id,
             job.name,
             job.partition,
             _format_status(status),
             "None" if len(job.dependencies) == 0
             else ", ".join(job.dependencies),
             f"{finished_jobs_count}/{len(sub_statuses)}",
             job.runtime])
    return tabulate(rows, headers="firstrow", tablefmt="grid")


def _wait_quiet(jobs: list[SlurmRun], check_interval: int) -> None:
    """Poll until no job is active, printing the remaining count whenever it drops."""
    remaining = jobs
    last_reported = len(remaining) + 1
    while remaining:
        if len(remaining) < last_reported:
            print(f"Waiting for {len(remaining)} jobs...", flush=True)
            last_reported = len(remaining)
        time.sleep(check_interval)
        remaining = [run for run in remaining if _is_active(run.status)]


def _wait_with_table(jobs: list[SlurmRun], check_interval: int) -> None:
    """Redraw a job status table every ``check_interval`` seconds until none is active.

    The final table (no job waiting or running) is left on screen, and the
    function returns without sleeping another interval.
    """
    if not jobs:
        return
    while True:
        # Read every job's status exactly once per round
        snapshot = [(job, job.status) for job in jobs]
        table = _render_table(snapshot)
        print(table)
        if not any(_is_active(status) for _, status in snapshot):
            break
        time.sleep(check_interval)

        # Clear the table so the next one is drawn in its place.
        lines = table.count("\n") + 1
        # \033 is the escape character (ESC),
        # [{lines}A is the escape sequence that moves the cursor up.
        print(f"\033[{lines}A", end="")
        # [J is the escape sequence that clears the console from the cursor down
        print("\033[J", end="")


def wait_for_jobs(path: Path,
                  check_interval: int,
                  verbosity: VerbosityLevel = VerbosityLevel.STANDARD,
                  filter: list[str] = None) -> None:
    """Wait for all active jobs to finish executing.

    Only the jobs found under ``path`` that are WAITING or RUNNING at call time
    are tracked. While waiting, CTRL + C exits the process with status 0. The
    previously installed SIGINT handler is restored when this function returns.

    Args:
        path: The Path where to look for the stored jobs.
        check_interval: The time in seconds between updating the jobs.
        verbosity: Amount of information shown. QUIET prints the number of
            remaining jobs when it drops; STANDARD redraws a status table.
            The lower verbosity means lower computational load.
        filter: If present, only wait for (and show) jobs whose run_id is in
            this list. (The name shadows the builtin but is kept for callers.)
    """
    # NOTE(review): polling assumes each read of `.status` reflects the current
    # job state; otherwise the wait loops below could never terminate.
    jobs = [run for run in get_runs_from_file(path) if _is_active(run.status)]
    if filter is not None:
        jobs = [job for job in jobs if job.run_id in filter]

    def signal_handler(num: int, _: object) -> None:
        """Create clean exit for CTRL + C."""
        if num == signal.SIGINT:
            sys.exit(0)

    previous_handler = signal.signal(signal.SIGINT, signal_handler)
    try:
        # If verbosity is quiet there is no need for further information
        if verbosity == VerbosityLevel.QUIET:
            _wait_quiet(jobs, check_interval)
        # If verbosity is standard the command prints a table with relevant information
        elif verbosity == VerbosityLevel.STANDARD:
            _wait_with_table(jobs, check_interval)
    finally:
        # Don't leak our handler into the caller; None means the previous
        # handler was not installed from Python and cannot be re-set.
        if previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)

    print("All jobs done!")

139 

140 

if __name__ == "__main__":
    # Record how this command was invoked
    logging.log_command(sys.argv)

    # Build the parser and read the command line in one step
    args = parser_function().parse_args()

    # Polling interval and amount of output both come from the platform settings
    check_interval = gv.settings().get_general_check_interval()
    verbosity = gv.settings().get_general_verbosity()

    log_dir = gv.settings().DEFAULT_log_output
    wait_for_jobs(path=log_dir,
                  check_interval=check_interval,
                  verbosity=verbosity,
                  filter=args.job_ids)