Coverage for sparkle/CLI/wait.py: 0%
67 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-27 09:10 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-27 09:10 +0000
1#!/usr/bin/env python3
2"""Sparkle command to wait for one or more other commands to complete execution."""
3import sys
4import signal
5import time
6import argparse
7from pathlib import Path
9from runrunner.slurm import SlurmRun
10from runrunner.base import Status
11from tabulate import tabulate
13from sparkle.platform.cli_types import VerbosityLevel, TEXT
14from sparkle.CLI.help import logging
15from sparkle.CLI.help import argparse_custom as ac
16from sparkle.CLI.help import global_variables as gv
def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser of the wait command.

    Returns:
        An ArgumentParser with a single argument registered, configured from
        the names and keyword arguments of ``ac.JobIDsArgument``.
    """
    job_ids_argument = ac.JobIDsArgument
    wait_parser = argparse.ArgumentParser()
    wait_parser.add_argument(*job_ids_argument.names, **job_ids_argument.kwargs)
    return wait_parser
def get_runs_from_file(path: Path, print_error: bool = False) -> list[SlurmRun]:
    """Collect the runs stored as JSON files anywhere below a directory.

    Args:
        path: Path object that is searched recursively for "*.json" files.
        print_error: If True, print a warning for every file that could not
            be loaded. If False (the default), such files are skipped silently.

    Returns:
        List of the SlurmRun objects that were loaded successfully.
    """
    found_runs = []
    for file in path.rglob("*.json"):
        # TODO: RunRunner should be adapted to have more general methods for runs
        # So this method can work for both local and slurm
        try:
            loaded_run = SlurmRun.from_file(file)
        except Exception as ex:
            # Not a (correct) RunRunner JSON file: skip it, optionally reporting why
            if print_error:
                print(f"[WARNING] Could not load file: {file}. Exception: {ex}")
            continue
        found_runs.append(loaded_run)
    return found_runs
def wait_for_jobs(path: Path,
                  check_interval: int,
                  verbosity: VerbosityLevel = VerbosityLevel.STANDARD,
                  filter: list[str] = None) -> None:
    """Wait for all active jobs to finish executing.

    Loads the stored runs found under ``path``, keeps those that are waiting
    or running, and blocks until none of them is in either state any more.
    While waiting, CTRL + C (SIGINT) ends the program via ``sys.exit(0)``. The
    SIGINT handler that was active before the call is put back on exit.

    Args:
        path: The Path where to look for the stored jobs.
        check_interval: The time in seconds between updating the jobs.
        verbosity: Amount of information shown. QUIET prints one line each
            time the number of active jobs drops; every other level prints a
            table of the jobs that is redrawn after each interval.
            The lower verbosity means lower computational load.
        filter: If present, only wait for (and show) the jobs whose run id is
            in this list.

    Raises:
        SystemExit: When SIGINT is received while waiting.
    """
    active_states = (Status.WAITING, Status.RUNNING)

    def still_active(runs: list) -> list:
        """Return the runs that are still waiting or running."""
        # One read of ``status`` per run and pass, instead of one per comparison
        return [run for run in runs if run.status in active_states]

    # Filter jobs on relevant status
    jobs = still_active(get_runs_from_file(path))

    if filter is not None:
        jobs = [job for job in jobs if job.run_id in filter]

    # ``jobs`` stays fixed (it is what the table shows, finished jobs included);
    # ``running_jobs`` shrinks as jobs finish and drives the loops below.
    running_jobs = jobs

    def signal_handler(num: int, _: object) -> None:
        """Create clean exit for CTRL + C."""
        if num == signal.SIGINT:
            sys.exit(0)

    previous_handler = signal.signal(signal.SIGINT, signal_handler)
    try:
        # If verbosity is quiet there is no need for further information
        if verbosity == VerbosityLevel.QUIET:
            # Start one above the real count so the first pass always prints
            prev_jobs = len(running_jobs) + 1
            while len(running_jobs) > 0:
                if len(running_jobs) < prev_jobs:
                    print(f"Waiting for {len(running_jobs)} jobs...", flush=True)
                time.sleep(check_interval)
                prev_jobs = len(running_jobs)
                running_jobs = still_active(running_jobs)

        # Every other verbosity prints a table with relevant information, so
        # that no verbosity level can skip the waiting altogether
        else:
            # Order in which to display the jobs
            status_order = {Status.COMPLETED: 0, Status.RUNNING: 1, Status.WAITING: 2}
            while len(running_jobs) > 0:
                # Information to be printed to the table
                information = [["RunId", "Name", "Partition", "Status",
                                "Dependencies", "Finished Jobs", "Run Time"]]
                running_jobs = still_active(running_jobs)
                # Read each status once, so the sort position and the text
                # shown in a row come from the same value
                snapshot = sorted(
                    ((job, job.status) for job in jobs),
                    key=lambda entry: (status_order.get(entry[1], 4),
                                       entry[0].run_id))
                for job, status in snapshot:
                    # Count number of jobs that have finished
                    all_status = job.all_status
                    finished_jobs_count = sum(1 for sub_status in all_status
                                              if sub_status == Status.COMPLETED)
                    # Format the status: bold when running, italic when completed
                    if status == Status.RUNNING:
                        status_text = TEXT.format_text([TEXT.BOLD], status)
                    elif status == Status.COMPLETED:
                        status_text = TEXT.format_text([TEXT.ITALIC], status)
                    else:
                        status_text = status
                    dependencies = job.dependencies
                    information.append(
                        [job.run_id,
                         job.name,
                         job.partition,
                         status_text,
                         "None" if len(dependencies) == 0
                         else ", ".join(dependencies),
                         f"{finished_jobs_count}/{len(all_status)}",
                         job.runtime])
                # Print the table
                table = tabulate(information, headers="firstrow", tablefmt="grid")
                print(table)
                time.sleep(check_interval)

                # Clears the table for the new table to be printed
                lines = table.count("\n") + 1
                # \033 is the escape character (ESC),
                # [{lines}A is the escape sequence that moves the cursor up.
                print(f"\033[{lines}A", end="")
                # [J is the escape sequence that clears the console from the cursor down
                print("\033[J", end="")
    finally:
        # Put the caller's SIGINT handling back. signal.signal() returns None
        # when the previous handler was not installed from Python, and None
        # cannot be passed back in.
        if previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)

    print("All jobs done!")
if __name__ == "__main__":
    # Log command call
    logging.log_command(sys.argv)

    # Define and process the command line arguments in one go
    cli_args = parser_function().parse_args()

    # The settings are read in this order: check interval, verbosity and
    # finally the log output path that is searched for stored jobs.
    wait_for_jobs(check_interval=gv.settings().get_general_check_interval(),
                  verbosity=gv.settings().get_general_verbosity(),
                  path=gv.settings().DEFAULT_log_output,
                  filter=cli_args.job_ids)