Coverage for src/sparkle/CLI/cleanup.py: 34%
86 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 14:11 +0000
1#!/usr/bin/env python3
2"""Command to remove temporary files not affecting the platform state."""
import re
import sys
import math
import shutil
import argparse

from sparkle.structures import PerformanceDataFrame

from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help import snapshot_help as snh
from sparkle.CLI.help import jobs as jobs_help
def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments.

    Returns:
        The configured argument parser for the cleanup command.
    """
    parser = argparse.ArgumentParser(
        description="Command to clean files from the platform."
    )
    # All cleanup flags share the same registration shape (names + kwargs),
    # so register them from one tuple instead of four repeated calls.
    argument_definitions = (
        ac.CleanupArgumentAll,
        ac.CleanupArgumentLogs,
        ac.CleanupArgumentRemove,
        ac.CleanUpPerformanceDataArgument,
    )
    for definition in argument_definitions:
        parser.add_argument(*definition.names, **definition.kwargs)
    return parser
def check_logs_performance_data(performance_data: PerformanceDataFrame) -> int:
    """Check if the performance data is missing values recoverable from the logs.

    Scans every Slurm ``.out`` log file under the platform log directory for
    result lines of the form
    ``objective, instance, run_id | solver, config_id: target_value`` and
    writes each found value into the DataFrame, but only when the current
    entry is missing (a float NaN or the literal string ``"nan"``).

    Args:
        performance_data (PerformanceDataFrame): The performance data,
            updated in place. Saved to CSV only if at least one value changed.

    Returns:
        int: The number of updated values.
    """
    pattern = re.compile(
        r"^(?P<objective>\S+)\s*,\s*"
        r"(?P<instance>\S+)\s*,\s*"
        r"(?P<run_id>\S+)\s*\|\s*"
        r"(?P<solver>\S+)\s*,\s*"
        r"(?P<config_id>\S+)\s*:\s*"
        r"(?P<target_value>\S+)$"
    )

    def _is_missing(value: object) -> bool:
        # A DataFrame cell counts as missing when it holds a numeric NaN or
        # the string "nan". Spelled out explicitly: the original mixed
        # `or`/`and` expression relied on operator precedence.
        if isinstance(value, str):
            return value == "nan"
        return isinstance(value, (int, float)) and math.isnan(value)

    # Only iterate over Slurm log files
    log_files = [
        f
        for f in gv.settings().DEFAULT_log_output.glob("**/*")
        if f.is_file() and f.suffix == ".out"
    ]
    count = 0
    for log in log_files:
        for line in log.read_text().splitlines():
            match = pattern.match(line)
            if match is None:
                continue
            objective = match.group("objective")
            instance = match.group("instance")
            run_id = int(match.group("run_id"))
            solver = match.group("solver")
            config_id = match.group("config_id")
            target_value = match.group("target_value")
            current_value = performance_data.get_value(
                solver, instance, config_id, objective, run_id
            )
            # TODO: Would be better to extract all nan indices from the
            # PerformanceDataFrame and check against those instead.
            if _is_missing(current_value):
                performance_data.set_value(
                    target_value, solver, instance, config_id, objective, run_id
                )
                count += 1
    if count:
        # Persist only when something actually changed.
        performance_data.save_csv()
    return count
def remove_temporary_files() -> None:
    """Remove temporary files. Only removes files not affecting the sparkle state."""
    log_directory = gv.settings().DEFAULT_log_output
    # Wipe the whole log tree, then recreate it empty.
    shutil.rmtree(log_directory, ignore_errors=True)
    log_directory.mkdir()
def main(argv: list[str]) -> None:
    """Main function of the cleanup command.

    Args:
        argv: Command line arguments (without the program name), parsed
            by the parser from ``parser_function``.
    """
    # Log command call
    sl.log_command(sys.argv, gv.settings().random_state)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    if args.performance_data:
        # Check if we can cleanup the PerformanceDataFrame if necessary
        # Imported here so runrunner is only required for this flag.
        from runrunner.base import Status

        # Waiting/running jobs may still write results; ask before cleaning.
        running_jobs = jobs_help.get_runs_from_file(
            gv.settings().DEFAULT_log_output, filter=[Status.WAITING, Status.RUNNING]
        )
        if len(running_jobs) > 0:
            print("WARNING: There are still running jobs! Continue cleaning? [y/n]")
            a = input()
            if a != "y":
                sys.exit(0)

        # Recover any values still present in the logs before pruning rows.
        performance_data = PerformanceDataFrame(
            gv.settings().DEFAULT_performance_data_path
        )
        count = check_logs_performance_data(performance_data)
        print(
            f"Extracted {count} values from the logs and placed them in the PerformanceDataFrame."
        )

        # Remove empty configurations
        removed_configurations = 0
        for solver, configurations in performance_data.configurations.items():
            for config_id, config in configurations.items():
                # The default configuration is never removed.
                if config_id == PerformanceDataFrame.default_configuration:
                    continue
                if not config:  # Empty configuration, remove
                    performance_data.remove_configuration(solver, config_id)
                    removed_configurations += 1
        if removed_configurations:
            performance_data.save_csv()
            print(
                f"Removed {removed_configurations} empty configurations from the "
                "Performance DataFrame."
            )

        index_num = len(performance_data.index)
        # We only clean lines that are completely empty
        performance_data.remove_empty_runs()
        performance_data.save_csv()
        print(
            f"Removed {index_num - len(performance_data.index)} rows from the "
            f"Performance DataFrame, leaving {len(performance_data.index)} rows."
        )

    if args.all:
        # --all: wipe the entire output directory and recreate the skeleton.
        shutil.rmtree(gv.settings().DEFAULT_output, ignore_errors=True)
        snh.create_working_dirs()
        print("Removed all output files from the platform!")
    elif args.remove:
        # --remove: drop the current platform state and start fresh.
        snh.remove_current_platform()
        snh.create_working_dirs()
        print("Cleaned platform of all files!")
    elif args.logs:
        # --logs: only clear temporary log files.
        remove_temporary_files()
        print("Cleaned platform of log files!")
    sys.exit(0)
if __name__ == "__main__":
    # Script entry point: forward CLI arguments (without the program name).
    main(sys.argv[1:])