Coverage for sparkle/CLI/wait.py: 0%

55 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-05 14:48 +0000

1#!/usr/bin/env python3 

2"""Sparkle command to wait for one or more other commands to complete execution.""" 

3import sys 

4import signal 

5import time 

6import argparse 

7from pathlib import Path 

8 

9from runrunner.base import Status 

10 

11from sparkle.platform.cli_types import VerbosityLevel 

12from sparkle.CLI.help import jobs as jobs_help 

13from sparkle.CLI.help import logging 

14from sparkle.CLI.help import argparse_custom as ac 

15from sparkle.CLI.help import global_variables as gv 

16 

17 

18def parser_function() -> argparse.ArgumentParser: 

19 """Define the command line arguments. 

20 

21 Returns: 

22 The argument parser. 

23 """ 

24 parser = argparse.ArgumentParser( 

25 description="Wait for async jobs to finish. Gives periodic updates in table " 

26 " format about each job.") 

27 parser.add_argument(*ac.JobIDsArgument.names, **ac.JobIDsArgument.kwargs) 

28 return parser 

29 

30 

31def wait_for_jobs(path: Path, 

32 check_interval: int, 

33 verbosity: VerbosityLevel = VerbosityLevel.STANDARD, 

34 filter: list[str] = None) -> None: 

35 """Wait for all active jobs to finish executing. 

36 

37 Args: 

38 path: The Path where to look for the stored jobs. 

39 check_interval: The time in seconds between updating the jobs. 

40 verbosity: Amount of information shown. 

41 The lower verbosity means lower computational load. 

42 filter: If present, only show the given job ids. 

43 """ 

44 # Filter jobs on relevant status 

45 jobs = [run for run in jobs_help.get_runs_from_file(path) 

46 if run.status == Status.WAITING or run.status == Status.RUNNING] 

47 

48 if filter is not None: 

49 jobs = [job for job in jobs if job.run_id in filter] 

50 

51 running_jobs = jobs 

52 

53 def signal_handler(num: int, _: any) -> None: 

54 """Create clean exit for CTRL + C.""" 

55 if num == signal.SIGINT: 

56 sys.exit() 

57 signal.signal(signal.SIGINT, signal_handler) 

58 

59 # If verbosity is quiet there is no need for further information 

60 if verbosity == VerbosityLevel.QUIET: 

61 prev_jobs = len(running_jobs) + 1 

62 while len(running_jobs) > 0: 

63 if len(running_jobs) < prev_jobs: 

64 print(f"Waiting for {len(running_jobs)} jobs...", flush=False) 

65 time.sleep(check_interval) 

66 prev_jobs = len(running_jobs) 

67 running_jobs = [run for run in running_jobs 

68 if run.status == Status.WAITING 

69 or run.status == Status.RUNNING] 

70 

71 # If verbosity is standard the command will print a table with relevant information 

72 elif verbosity == VerbosityLevel.STANDARD: 

73 # Order in which to display the jobs 

74 status_order = {Status.COMPLETED: 0, Status.RUNNING: 1, Status.WAITING: 2} 

75 while len(running_jobs) > 0: 

76 for job in running_jobs: 

77 job.get_latest_job_details() 

78 running_jobs = [run for run in running_jobs 

79 if run.status == Status.WAITING 

80 or run.status == Status.RUNNING] 

81 sorted_jobs = sorted( 

82 jobs, key=lambda job: (status_order.get(job.status, 4), job.run_id)) 

83 table = jobs_help.create_jobs_table(sorted_jobs) 

84 print(table) 

85 time.sleep(check_interval) 

86 

87 # Clears the table for the new table to be printed 

88 lines = table.count("\n") + 1 

89 # \033 is the escape character (ESC), 

90 # [{lines}A is the escape sequence that moves the cursor up. 

91 print(f"\033[{lines}A", end="") 

92 # [J is the escape sequence that clears the console from the cursor down 

93 print("\033[J", end="") 

94 

95 print("All jobs done!") 

96 

97 

98def main(argv: list[str]) -> None: 

99 """Main function of the wait command.""" 

100 # Log command call 

101 logging.log_command(sys.argv) 

102 

103 # Define command line arguments 

104 parser = parser_function() 

105 

106 # Process command line arguments 

107 args = parser.parse_args(argv) 

108 

109 check_interval = gv.settings().get_general_check_interval() 

110 verbosity = gv.settings().get_general_verbosity() 

111 

112 wait_for_jobs(path=gv.settings().DEFAULT_log_output, 

113 check_interval=check_interval, 

114 verbosity=verbosity, 

115 filter=args.job_ids) 

116 

117 

118if __name__ == "__main__": 

119 main(sys.argv[1:])