Coverage for sparkle/CLI/cleanup.py: 37%
84 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 10:17 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-29 10:17 +0000
1#!/usr/bin/env python3
2"""Command to remove temporary files not affecting the platform state."""
4import re
5import sys
6import argparse
7import shutil
9from sparkle.structures import PerformanceDataFrame
11from sparkle.CLI.help import logging as sl
12from sparkle.CLI.help import global_variables as gv
13from sparkle.CLI.help import argparse_custom as ac
14from sparkle.CLI.help import snapshot_help as snh
15from sparkle.CLI.help import jobs as jobs_help
def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Command to clean files from the platform."
    )
    # All cleanup arguments share the same (names, kwargs) descriptor shape.
    for argument in (
        ac.CleanupArgumentAll,
        ac.CleanupArgumentRemove,
        ac.CleanUpPerformanceDataArgument,
    ):
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser
def check_logs_performance_data(performance_data: PerformanceDataFrame) -> int:
    """Check if the performance data is missing values extractable from the logs.

    Scans the Slurm ``.out`` log files for lines of the form
    ``objective, instance, run_id | solver, config_id: target_value`` and
    fills in any entry whose current value in the data frame is NaN.
    Saves the data frame when at least one value was updated.

    Args:
        performance_data (PerformanceDataFrame): The performance data.

    Returns:
        int: The number of updated values.
    """
    pattern = re.compile(
        r"^(?P<objective>\S+)\s*,\s*"
        r"(?P<instance>\S+)\s*,\s*"
        r"(?P<run_id>\S+)\s*\|\s*"
        r"(?P<solver>\S+)\s*,\s*"
        r"(?P<config_id>\S+)\s*:\s*"
        r"(?P<target_value>\S+)$"
    )

    def _is_missing(value: object) -> bool:
        """Return True iff the stored value represents a missing (NaN) entry."""
        if isinstance(value, (int, float)):
            return math.isnan(value)
        return isinstance(value, str) and value == "nan"

    count = 0
    # Only iterate over slurm log files (glob the suffix directly instead of
    # listing every file and filtering afterwards)
    for log in gv.settings().DEFAULT_log_output.glob("**/*.out"):
        if not log.is_file():
            continue
        for line in log.read_text().splitlines():
            match = pattern.match(line)
            if not match:
                continue
            objective = match.group("objective")
            instance = match.group("instance")
            run_id = int(match.group("run_id"))
            solver = match.group("solver")
            config_id = match.group("config_id")
            current_value = performance_data.get_value(
                solver, instance, config_id, objective, run_id
            )
            # TODO: Would be better to extract all nan indices from PDF
            # and check against this?
            if _is_missing(current_value):
                performance_data.set_value(
                    match.group("target_value"),
                    solver, instance, config_id, objective, run_id,
                )
                count += 1
    if count:
        performance_data.save_csv()
    return count
def remove_temporary_files() -> None:
    """Remove temporary files. Only removes files not affecting the sparkle state."""
    # Wipe the log output directory entirely, then recreate it empty.
    log_dir = gv.settings().DEFAULT_log_output
    shutil.rmtree(log_dir, ignore_errors=True)
    log_dir.mkdir()
def main(argv: list[str]) -> None:
    """Main function of the cleanup command."""
    # Log command call
    sl.log_command(sys.argv, gv.settings().random_state)

    # Define and process command line arguments in one step
    args = parser_function().parse_args(argv)

    if args.performance_data:
        # Check if we can cleanup the PerformanceDataFrame if necessary
        from runrunner.base import Status

        running_jobs = jobs_help.get_runs_from_file(
            gv.settings().DEFAULT_log_output,
            filter=[Status.WAITING, Status.RUNNING],
        )
        if running_jobs:
            print("WARNING: There are still running jobs! Continue cleaning? [y/n]")
            if input() != "y":
                sys.exit(0)

        performance_data = PerformanceDataFrame(
            gv.settings().DEFAULT_performance_data_path
        )
        count = check_logs_performance_data(performance_data)
        print(
            f"Extracted {count} values from the logs and placed them in the PerformanceDataFrame."
        )

        # Remove empty configurations
        removed_configurations = 0
        for solver, configurations in performance_data.configurations.items():
            for config_id, config in configurations.items():
                if config_id == PerformanceDataFrame.default_configuration:
                    continue
                if not config:  # Empty configuration, remove
                    performance_data.remove_configuration(solver, config_id)
                    removed_configurations += 1
        if removed_configurations:
            performance_data.save_csv()
            print(
                f"Removed {removed_configurations} empty configurations from the "
                "Performance DataFrame."
            )

        rows_before = len(performance_data.index)
        # We only clean lines that are completely empty
        performance_data.remove_empty_runs()
        performance_data.save_csv()
        print(
            f"Removed {rows_before - len(performance_data.index)} rows from the "
            f"Performance DataFrame, leaving {len(performance_data.index)} rows."
        )

    if args.all:
        shutil.rmtree(gv.settings().DEFAULT_output, ignore_errors=True)
        snh.create_working_dirs()
        print("Removed all output files from the platform!")
    elif args.remove:
        snh.remove_current_platform()
        snh.create_working_dirs()
        print("Cleaned platform of all files!")
    else:
        remove_temporary_files()
        print("Cleaned platform of temporary files!")
    sys.exit(0)
# Script entry point: forward the CLI arguments (excluding the program name).
if __name__ == "__main__":
    main(sys.argv[1:])