Coverage for sparkle/CLI/wait.py: 0%
55 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-05 14:48 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-05 14:48 +0000
1#!/usr/bin/env python3
2"""Sparkle command to wait for one or more other commands to complete execution."""
3import sys
4import signal
5import time
6import argparse
7from pathlib import Path
9from runrunner.base import Status
11from sparkle.platform.cli_types import VerbosityLevel
12from sparkle.CLI.help import jobs as jobs_help
13from sparkle.CLI.help import logging
14from sparkle.CLI.help import argparse_custom as ac
15from sparkle.CLI.help import global_variables as gv
def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser of the wait command.

    Returns:
        The argument parser, with the job ids argument registered.
    """
    description = ("Wait for async jobs to finish. "
                   "Gives periodic updates in table  format about each job.")
    wait_parser = argparse.ArgumentParser(description=description)
    job_ids_argument = ac.JobIDsArgument
    wait_parser.add_argument(*job_ids_argument.names, **job_ids_argument.kwargs)
    return wait_parser
def _active_runs(runs: list) -> list:
    """Select the runs that have not finished yet.

    Args:
        runs: The runs to inspect.

    Returns:
        The runs whose status is WAITING or RUNNING, in their original order.
    """
    return [run for run in runs
            if run.status in (Status.WAITING, Status.RUNNING)]


def wait_for_jobs(path: Path,
                  check_interval: int,
                  verbosity: VerbosityLevel = VerbosityLevel.STANDARD,
                  filter: list[str] = None) -> None:
    """Wait for all active jobs to finish executing.

    While waiting, CTRL + C (SIGINT) exits the program cleanly. The SIGINT
    handler that was active before the call is put back once waiting ends.

    Args:
        path: The Path where to look for the stored jobs.
        check_interval: The time in seconds between updating the jobs.
        verbosity: Amount of information shown.
            The lower verbosity means lower computational load.
        filter: If present, only consider the given job ids.
    """
    # Only jobs that are still waiting or running are relevant
    jobs = _active_runs(jobs_help.get_runs_from_file(path))

    if filter is not None:
        jobs = [job for job in jobs if job.run_id in filter]

    running_jobs = jobs

    def signal_handler(num: int, _: object) -> None:
        """Create clean exit for CTRL + C."""
        if num == signal.SIGINT:
            sys.exit()

    # Keep the handler that was active so it can be restored afterwards
    previous_handler = signal.signal(signal.SIGINT, signal_handler)
    try:
        # If verbosity is quiet there is no need for further information
        if verbosity == VerbosityLevel.QUIET:
            prev_jobs = len(running_jobs) + 1
            while running_jobs:
                # Only report when the number of remaining jobs went down
                if len(running_jobs) < prev_jobs:
                    print(f"Waiting for {len(running_jobs)} jobs...", flush=False)
                time.sleep(check_interval)
                prev_jobs = len(running_jobs)
                running_jobs = _active_runs(running_jobs)

        # If verbosity is standard the command prints a table with job information
        elif verbosity == VerbosityLevel.STANDARD:
            # Order in which to display the jobs
            status_order = {Status.COMPLETED: 0, Status.RUNNING: 1, Status.WAITING: 2}
            while running_jobs:
                for job in running_jobs:
                    job.get_latest_job_details()
                running_jobs = _active_runs(running_jobs)

                # The table shows every selected job, including finished ones
                sorted_jobs = sorted(
                    jobs,
                    key=lambda job: (status_order.get(job.status, 4), job.run_id))
                table = jobs_help.create_jobs_table(sorted_jobs)
                print(table)
                time.sleep(check_interval)

                # Clears the table for the new table to be printed
                lines = table.count("\n") + 1
                # \033 is the escape character (ESC),
                # [{lines}A is the escape sequence that moves the cursor up.
                print(f"\033[{lines}A", end="")
                # [J is the escape sequence that clears the console from the cursor down
                print("\033[J", end="")
    finally:
        # signal.signal returns None if the previous handler was not installed
        # from Python; such a handler cannot be passed back to signal.signal.
        if previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)

    print("All jobs done!")
def main(argv: list[str]) -> None:
    """Run the wait command.

    Args:
        argv: The command line arguments to parse.
    """
    # Log the command call
    logging.log_command(sys.argv)

    # Build the parser and process the command line arguments
    parsed_args = parser_function().parse_args(argv)

    # Read the general settings that control the waiting behaviour
    interval = gv.settings().get_general_check_interval()
    verbosity_level = gv.settings().get_general_verbosity()
    log_path = gv.settings().DEFAULT_log_output

    wait_for_jobs(path=log_path,
                  check_interval=interval,
                  verbosity=verbosity_level,
                  filter=parsed_args.job_ids)
# Run the command only when executed as a script; the program name is dropped
if __name__ == "__main__":
    main(argv=sys.argv[1:])