Coverage for src/sparkle/CLI/cleanup.py: 26%
141 statements
coverage.py v7.13.1, created at 2026-01-21 15:31 +0000
#!/usr/bin/env python3
"""Command to remove temporary files not affecting the platform state."""

import re
import math
import sys
import argparse
import shutil

from runrunner.base import Status

from sparkle.structures import PerformanceDataFrame, FeatureDataFrame

from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help import snapshot_help as snh
from sparkle.CLI.help import jobs as jobs_help
from sparkle.CLI.help import resolve_instance_name


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Command to clean files from the platform."
    )
    parser.add_argument(*ac.CleanupArgumentAll.names, **ac.CleanupArgumentAll.kwargs)
    parser.add_argument(*ac.CleanupArgumentLogs.names, **ac.CleanupArgumentLogs.kwargs)
    parser.add_argument(
        *ac.CleanupArgumentRemove.names, **ac.CleanupArgumentRemove.kwargs
    )
    parser.add_argument(
        *ac.CleanUpPerformanceDataArgument.names,
        **ac.CleanUpPerformanceDataArgument.kwargs,
    )
    parser.add_argument(
        *ac.CleanUpFeatureDataArgument.names,
        **ac.CleanUpFeatureDataArgument.kwargs,
    )
    return parser


def check_logs_performance_data(performance_data: PerformanceDataFrame) -> int:
    """Check if the performance data is missing values that can be extracted from the logs.

    Args:
        performance_data (PerformanceDataFrame): The performance data.

    Returns:
        int: The number of updated values.
    """
    # empty_indices = performance_data.empty_indices
    pattern = re.compile(
        r"^(?P<objective>\S+)\s*,\s*"
        r"(?P<instance>\S+)\s*,\s*"
        r"(?P<run_id>\S+)\s*\|\s*"
        r"(?P<solver>\S+)\s*,\s*"
        r"(?P<config_id>\S+)\s*:\s*"
        r"(?P<target_value>\S+)$"
    )
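    # For reference, a hypothetical example of the log-line shape this pattern
    # matches (all values below are made up for illustration):
    #   PAR10,train_inst_1.cnf,0 | MySolver,config_1: 42.37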

    # Only iterate over slurm log files
    log_files = [
        f
        for f in gv.settings().DEFAULT_log_output.glob("**/*")
        if f.is_file() and f.suffix == ".out"
    ]
    count = 0
    for log in log_files:
        for line in log.read_text().splitlines():
            match = pattern.match(line)
            if match:
                objective = match.group("objective")
                instance = match.group("instance")
                run_id = int(match.group("run_id"))
                solver = match.group("solver")
                config_id = match.group("config_id")
                target_value = match.group("target_value")
                current_value = performance_data.get_value(
                    solver, instance, config_id, objective, run_id
                )
                # TODO: Would be better to extract all nan indices from the
                # PerformanceDataFrame and check against this?
                # A missing entry can be either a float NaN or the string "nan"
                if (
                    isinstance(current_value, (int, float))
                    and math.isnan(current_value)
                ) or (
                    isinstance(current_value, str) and current_value == "nan"
                ):
                    performance_data.set_value(
                        target_value, solver, instance, config_id, objective, run_id
                    )
                    count += 1
    if count:
        performance_data.save_csv()
    return count
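
# A minimal usage sketch (mirrors the call in main() below; the data path comes
# from the platform's global settings):
#   performance_data = PerformanceDataFrame(
#       gv.settings().DEFAULT_performance_data_path
#   )
#   updated = check_logs_performance_data(performance_data)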


def check_logs_feature_data(feature_data: FeatureDataFrame) -> int:
    """Check if the feature data is missing values that can be extracted from the logs.

    Args:
        feature_data (FeatureDataFrame): The feature data.

    Returns:
        int: The number of updated values.
    """
    # empty_indices = feature_data.empty_indices
    pattern = re.compile(
        r"^(?P<extractor>\S+)\s*"
        r"(?P<instance>\S+)\s*"
        r"(?P<feature_group>\S+)\s*"
        r"(?P<feature_name>\S+)\s*\|\s*"
        r"(?P<target_value>\S+)$"
    )
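    # For reference, a hypothetical example of the log-line shape this pattern
    # matches (all values below are made up for illustration):
    #   MyExtractor train_inst_1.cnf base n_vars | 300.0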

    # Only iterate over slurm log files
    log_files = [
        f
        for f in gv.settings().DEFAULT_log_output.glob("**/*")
        if f.is_file() and f.suffix == ".out"
    ]
    count = 0
    for log in log_files:
        for line in log.read_text().splitlines():
            match = pattern.match(line)
            if match:
                target_value = float(match.group("target_value"))  # Must be a float
                if math.isnan(target_value):
                    continue
                extractor = match.group("extractor")
                instance = match.group("instance")
                feature_group = match.group("feature_group")
                feature_name = match.group("feature_name")
                current_value = feature_data.get_value(
                    instance, extractor, feature_group, feature_name
                )
                if (
                    isinstance(current_value, (int, float))
                    and math.isnan(current_value)
                ) or (
                    isinstance(current_value, str) and current_value == "nan"
                ):
                    feature_data.set_value(
                        instance,
                        extractor,
                        feature_group,
                        feature_name,
                        target_value,
                    )
                    count += 1
    if count:
        feature_data.save_csv()
    return count


def remove_temporary_files() -> None:
    """Remove temporary files. Only removes files not affecting the sparkle state."""
    shutil.rmtree(gv.settings().DEFAULT_log_output, ignore_errors=True)
    gv.settings().DEFAULT_log_output.mkdir()


def main(argv: list[str]) -> None:
    """Main function of the cleanup command."""
    # Log command call
    sl.log_command(sys.argv, gv.settings().random_state)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    if args.performance_data:
        # Warn before cleaning the PerformanceDataFrame while jobs are running
        running_jobs = jobs_help.get_runs_from_file(
            gv.settings().DEFAULT_log_output, filter=[Status.WAITING, Status.RUNNING]
        )
        if len(running_jobs) > 0:
            print("WARNING: There are still running jobs! Continue cleaning? [y/n]")
            a = input()
            if a != "y":
                sys.exit(0)

        performance_data = PerformanceDataFrame(
            gv.settings().DEFAULT_performance_data_path
        )
        count = check_logs_performance_data(performance_data)
        print(
            f"Extracted {count} values from the logs and placed them in the "
            "PerformanceDataFrame."
        )

        # Remove empty configurations
        removed_configurations = 0
        for solver, configurations in performance_data.configurations.items():
            for config_id, config in configurations.items():
                if config_id == PerformanceDataFrame.default_configuration:
                    continue
                if not config:  # Empty configuration, remove
                    performance_data.remove_configuration(solver, config_id)
                    removed_configurations += 1
        if removed_configurations:
            print(
                f"Removed {removed_configurations} empty configurations from the "
                "Performance DataFrame."
            )

        index_num = len(performance_data.index)
        # We only clean lines that are completely empty
        performance_data.remove_empty_runs()
        print(
            f"Removed {index_num - len(performance_data.index)} rows from the "
            f"Performance DataFrame, leaving {len(performance_data.index)} rows."
        )

        # Sanity check all indices, clean lines that are broken
        # NOTE: This check is quite expensive
        objective_errors, instance_errors, run_id_errors = 0, 0, 0
        known_objectives = [o.name for o in gv.settings().objectives]
        wrong_indices = []
        for objective, instance, run_id in performance_data.index:
            if objective not in known_objectives:
                objective_errors += 1
                wrong_indices.append((objective, instance, run_id))
                # print("Objective issue:", objective)
            elif isinstance(run_id, str) and not run_id.isdigit():
                run_id_errors += 1
                wrong_indices.append((objective, instance, run_id))
                # print("Run id issue:", run_id)
            else:
                # NOTE: This check is very expensive, and it would be better
                # if we could pass all the instances at once instead
                instance_path = resolve_instance_name(
                    instance, target=gv.settings().DEFAULT_instance_dir
                )
                if instance_path is None:
                    instance_errors += 1
                    wrong_indices.append((objective, instance, run_id))
        if wrong_indices:
            print(
                f"Found {len(wrong_indices)} broken indices in the "
                f"PerformanceDataFrame ({objective_errors} objective errors, "
                f"{instance_errors} instance errors, "
                f"{run_id_errors} run id errors).\n"
                "Removing from PerformanceDataFrame..."
            )
            performance_data.drop(wrong_indices, inplace=True)
            print(
                f"Removed {len(wrong_indices)} rows from the PerformanceDataFrame, "
                f"leaving {len(performance_data.index)} rows."
            )
        performance_data.save_csv()

    if args.feature_data:
        running_jobs = jobs_help.get_runs_from_file(
            gv.settings().DEFAULT_log_output, filter=[Status.WAITING, Status.RUNNING]
        )
        if len(running_jobs) > 0:
            print("WARNING: There are still running jobs! Continue cleaning? [y/n]")
            a = input()
            if a != "y":
                sys.exit(0)
        feature_data = FeatureDataFrame(gv.settings().DEFAULT_feature_data_path)
        count = check_logs_feature_data(feature_data)
        print(
            f"Extracted {count} values from the logs and placed them in the "
            "FeatureDataFrame."
        )
        feature_data.save_csv()
        # TODO: Can do other cleanup like index verification and empty line
        # removal etc. For example, we can check if each index references a
        # valid instance; if not, remove the line

    if args.all:
        shutil.rmtree(gv.settings().DEFAULT_output, ignore_errors=True)
        snh.create_working_dirs()
        print("Removed all output files from the platform!")
    elif args.remove:
        snh.remove_current_platform()
        snh.create_working_dirs()
        print("Cleaned platform of all files!")
    elif args.logs:
        remove_temporary_files()
        print("Cleaned platform of log files!")
    elif not args.performance_data and not args.feature_data:
        # print_help() writes to stdout itself and returns None,
        # so it must not be wrapped in print()
        parser.print_help()
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
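
# Hypothetical invocation sketch. The actual flag spellings are defined in
# sparkle.CLI.help.argparse_custom (not shown here); the names below are
# assumptions based on the argument destinations used in main():
#   python cleanup.py --logs                # wipe the log output directory
#   python cleanup.py --performance-data    # recover values, prune broken rows
#   python cleanup.py --feature-data        # recover feature values from logs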