1#!/usr/bin/env python3 

2"""Sparkle command to wait for one or more other commands to complete execution.""" 

3import sys 

4import signal 

5import time 

6import argparse 

7from pathlib import Path 


9from runrunner.slurm import SlurmRun 

10from runrunner.base import Status 

11from tabulate import tabulate 


13from sparkle.platform.cli_types import VerbosityLevel, TEXT 

14from import logging 

15from import argparse_custom as ac 

16from import global_variables as gv 



def parser_function() -> argparse.ArgumentParser:
    """Build the command line parser for this command.

    Returns:
        An ArgumentParser that accepts the job IDs argument.
    """
    cli_parser = argparse.ArgumentParser()
    job_ids_argument = ac.JobIDsArgument
    cli_parser.add_argument(*job_ids_argument.names, **job_ids_argument.kwargs)
    return cli_parser



def get_runs_from_file(path: Path, print_error: bool = False) -> list[SlurmRun]:
    """Load every run object that can be read from JSON files below a path.

    Args:
        path: Path object that is searched recursively for ``*.json`` files.
        print_error: If True, print a warning for every file that fails to load.

    Returns:
        List of the SlurmRun objects that were loaded successfully.
    """
    loaded_runs = []
    # TODO: RunRunner should be adapted to have more general methods for runs
    # So this method can work for both local and slurm
    for json_file in path.rglob("*.json"):
        try:
            loaded_runs.append(SlurmRun.from_file(json_file))
        except Exception as ex:
            # Not a (correct) RunRunner JSON file: skip it, optionally reporting why
            if not print_error:
                continue
            print(f"[WARNING] Could not load file: {json_file}. Exception: {ex}")
    return loaded_runs



def wait_for_jobs(path: Path,
                  check_interval: int,
                  verbosity: VerbosityLevel = VerbosityLevel.STANDARD,
                  filter: list[str] = None) -> None:
    """Wait for all active jobs to finish executing.

    The runs stored under ``path`` are loaded once; only those that are WAITING
    or RUNNING at that moment are tracked. The function then polls the tracked
    runs every ``check_interval`` seconds until none of them is WAITING or
    RUNNING any more, and finally prints "All jobs done!".

    A SIGINT handler that exits the process with status 0 is installed and is
    not restored afterwards. For a verbosity other than QUIET or STANDARD no
    waiting is performed.

    Args:
        path: The Path where to look for the stored jobs.
        check_interval: The time in seconds between updating the jobs.
        verbosity: Amount of information shown. QUIET only prints a message
            when the number of active jobs drops; STANDARD redraws a status
            table on every update.
            The lower verbosity means lower computational load.
        filter: If present, only wait for (and show) the given job ids.
    """
    active_states = (Status.WAITING, Status.RUNNING)

    def still_active(runs: list) -> list:
        """Return the runs that are still waiting or running."""
        # Membership test reads run.status once per run instead of twice
        return [run for run in runs if run.status in active_states]

    # Filter jobs on relevant status
    jobs = still_active(get_runs_from_file(path))

    if filter is not None:
        jobs = [job for job in jobs if job.run_id in filter]

    # 'jobs' stays fixed (it is what the table shows); 'running_jobs' shrinks
    running_jobs = jobs

    def signal_handler(num: int, _: object) -> None:
        """Create clean exit for CTRL + C."""
        if num == signal.SIGINT:
            sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    # If verbosity is quiet there is no need for further information
    if verbosity == VerbosityLevel.QUIET:
        # Start one above the real count so the first iteration always reports
        prev_jobs = len(running_jobs) + 1
        while len(running_jobs) > 0:
            # Only report when the number of active jobs has dropped
            if len(running_jobs) < prev_jobs:
                print(f"Waiting for {len(running_jobs)} jobs...", flush=True)
            time.sleep(check_interval)
            prev_jobs = len(running_jobs)
            running_jobs = still_active(running_jobs)

    # If verbosity is standard the command will print a table with relevant information
    elif verbosity == VerbosityLevel.STANDARD:
        # Order in which to display the jobs (unknown statuses are listed last)
        status_order = {Status.COMPLETED: 0, Status.RUNNING: 1, Status.WAITING: 2}
        header = ["RunId", "Name", "Partition", "Status",
                  "Dependencies", "Finished Jobs", "Run Time"]
        while len(running_jobs) > 0:
            running_jobs = still_active(running_jobs)
            # Information to be printed to the table
            information = [header]
            sorted_jobs = sorted(
                jobs, key=lambda job: (status_order.get(job.status, 4), job.run_id))
            for job in sorted_jobs:
                # Read the status once so the whole row reflects a single value
                status = job.status
                # Count number of jobs that have finished
                finished_jobs_count = sum(1 for sub_status in job.all_status
                                          if sub_status == Status.COMPLETED)
                # Format the status: bold when running, italic when completed
                if status == Status.RUNNING:
                    status_text = TEXT.format_text([TEXT.BOLD], status)
                elif status == Status.COMPLETED:
                    status_text = TEXT.format_text([TEXT.ITALIC], status)
                else:
                    status_text = status
                # One value per header column, in header order.
                # NOTE(review): 'job.name' fills the "Name" column, which had no
                # value before (rows were one cell short and shifted left);
                # confirm the run object exposes 'name'.
                information.append(
                    [job.run_id,
                     job.name,
                     job.partition,
                     status_text,
                     "None" if len(job.dependencies) == 0
                     else ", ".join(job.dependencies),
                     f"{finished_jobs_count}/{len(job.all_status)}",
                     job.runtime])
            # Print the table
            table = tabulate(information, headers="firstrow", tablefmt="grid")
            print(table)
            time.sleep(check_interval)

            # Clears the table for the new table to be printed
            lines = table.count("\n") + 1
            # \033 is the escape character (ESC),
            # [{lines}A is the escape sequence that moves the cursor up.
            print(f"\033[{lines}A", end="")
            # [J is the escape sequence that clears the console from the cursor down
            print("\033[J", end="")

    print("All jobs done!")



if __name__ == "__main__":
    # Log command call
    logging.log_command(sys.argv)

    # Define the command line arguments and process them in one step
    cli_args = parser_function().parse_args()

    # Read the polling interval and the verbosity from the global settings
    poll_interval = gv.settings().get_general_check_interval()
    output_level = gv.settings().get_general_verbosity()

    # Wait for the stored jobs, restricted to the requested job ids if given
    wait_for_jobs(gv.settings().DEFAULT_log_output,
                  poll_interval,
                  verbosity=output_level,
                  filter=cli_args.job_ids)