Coverage for src/sparkle/CLI/cleanup.py: 26% (141 statements)

coverage.py v7.13.1, created at 2026-01-21 15:31 +0000

#!/usr/bin/env python3
"""Command to remove temporary files not affecting the platform state."""

import re
import math
import sys
import argparse
import shutil

from runrunner.base import Status

from sparkle.structures import PerformanceDataFrame, FeatureDataFrame

from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help import snapshot_help as snh
from sparkle.CLI.help import jobs as jobs_help
from sparkle.CLI.help import resolve_instance_name


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Command to clean files from the platform."
    )
    parser.add_argument(*ac.CleanupArgumentAll.names, **ac.CleanupArgumentAll.kwargs)
    parser.add_argument(*ac.CleanupArgumentLogs.names, **ac.CleanupArgumentLogs.kwargs)
    parser.add_argument(
        *ac.CleanupArgumentRemove.names, **ac.CleanupArgumentRemove.kwargs
    )
    parser.add_argument(
        *ac.CleanUpPerformanceDataArgument.names,
        **ac.CleanUpPerformanceDataArgument.kwargs,
    )
    parser.add_argument(
        *ac.CleanUpFeatureDataArgument.names,
        **ac.CleanUpFeatureDataArgument.kwargs,
    )
    return parser


def check_logs_performance_data(performance_data: PerformanceDataFrame) -> int:
    """Check if the performance data is missing values that can be extracted from the logs.

    Args:
        performance_data (PerformanceDataFrame): The performance data.

    Returns:
        int: The number of updated values.
    """
    pattern = re.compile(
        r"^(?P<objective>\S+)\s*,\s*"
        r"(?P<instance>\S+)\s*,\s*"
        r"(?P<run_id>\S+)\s*\|\s*"
        r"(?P<solver>\S+)\s*,\s*"
        r"(?P<config_id>\S+)\s*:\s*"
        r"(?P<target_value>\S+)$"
    )
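    # For illustration, a log line this pattern is intended to match could look
    # like the following (hypothetical values, not taken from a real log):
    #     PAR10 , Instances/train/example.cnf , 2 | PbO-CCSAT , config_1 : 42.05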


    # Only iterate over Slurm log files
    log_files = [
        f
        for f in gv.settings().DEFAULT_log_output.glob("**/*")
        if f.is_file() and f.suffix == ".out"
    ]
    count = 0
    for log in log_files:
        for line in log.read_text().splitlines():
            match = pattern.match(line)
            if match:
                objective = match.group("objective")
                instance = match.group("instance")
                run_id = int(match.group("run_id"))
                solver = match.group("solver")
                config_id = match.group("config_id")
                target_value = match.group("target_value")
                current_value = performance_data.get_value(
                    solver, instance, config_id, objective, run_id
                )
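                # NOTE (assumption): a missing value can show up either as
                # float NaN or as the literal string "nan" (e.g. after a CSV
                # round-trip), so the check below treats both as missing.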

                # TODO: It would be better to extract all NaN indices from the
                # PerformanceDataFrame and check this line against them.
                if (
                    (
                        isinstance(current_value, (int, float))
                        and math.isnan(current_value)
                    )
                    or (isinstance(current_value, str) and current_value == "nan")
                ):
                    performance_data.set_value(
                        target_value, solver, instance, config_id, objective, run_id
                    )
                    count += 1
    if count:
        performance_data.save_csv()
    return count


def check_logs_feature_data(feature_data: FeatureDataFrame) -> int:
    """Check if the feature data is missing values that can be extracted from the logs.

    Args:
        feature_data (FeatureDataFrame): The feature data.

    Returns:
        int: The number of updated values.
    """
    pattern = re.compile(
        r"^(?P<extractor>\S+)\s*"
        r"(?P<instance>\S+)\s*"
        r"(?P<feature_group>\S+)\s*"
        r"(?P<feature_name>\S+)\s*\|\s*"
        r"(?P<target_value>\S+)$"
    )
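    # For illustration, a log line this pattern is intended to match could look
    # like the following (hypothetical values, not taken from a real log):
    #     example_extractor Instances/train/example.cnf base n_vars | 1500.0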


    # Only iterate over Slurm log files
    log_files = [
        f
        for f in gv.settings().DEFAULT_log_output.glob("**/*")
        if f.is_file() and f.suffix == ".out"
    ]
    count = 0
    for log in log_files:
        for line in log.read_text().splitlines():
            match = pattern.match(line)
            if match:
                target_value = float(match.group("target_value"))  # Must be a float
                if math.isnan(target_value):
                    continue
                extractor = match.group("extractor")
                instance = match.group("instance")
                feature_group = match.group("feature_group")
                feature_name = match.group("feature_name")
                current_value = feature_data.get_value(
                    instance, extractor, feature_group, feature_name
                )
                if (
                    (
                        isinstance(current_value, (int, float))
                        and math.isnan(current_value)
                    )
                    or (isinstance(current_value, str) and current_value == "nan")
                ):
                    feature_data.set_value(
                        instance,
                        extractor,
                        feature_group,
                        feature_name,
                        target_value,
                    )
                    count += 1
    if count:
        feature_data.save_csv()
    return count


def remove_temporary_files() -> None:
    """Remove temporary files. Only removes files not affecting the sparkle state."""
    shutil.rmtree(gv.settings().DEFAULT_log_output, ignore_errors=True)
    gv.settings().DEFAULT_log_output.mkdir()


def main(argv: list[str]) -> None:
    """Main function of the cleanup command."""
    # Log command call
    sl.log_command(sys.argv, gv.settings().random_state)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)


    if args.performance_data:
        # Check if we can clean up the PerformanceDataFrame if necessary
        running_jobs = jobs_help.get_runs_from_file(
            gv.settings().DEFAULT_log_output, filter=[Status.WAITING, Status.RUNNING]
        )
        if len(running_jobs) > 0:
            print("WARNING: There are still running jobs! Continue cleaning? [y/n]")
            a = input()
            if a != "y":
                sys.exit(0)

        performance_data = PerformanceDataFrame(
            gv.settings().DEFAULT_performance_data_path
        )
        count = check_logs_performance_data(performance_data)
        print(
            f"Extracted {count} values from the logs and placed them in the "
            "PerformanceDataFrame."
        )

        # Remove empty configurations
        removed_configurations = 0
        for solver, configurations in performance_data.configurations.items():
            for config_id, config in configurations.items():
                if config_id == PerformanceDataFrame.default_configuration:
                    continue
                if not config:  # Empty configuration, remove
                    performance_data.remove_configuration(solver, config_id)
                    removed_configurations += 1
        if removed_configurations:
            print(
                f"Removed {removed_configurations} empty configurations from the "
                "PerformanceDataFrame."
            )

        index_num = len(performance_data.index)
        # We only clean rows that are completely empty
        performance_data.remove_empty_runs()
        print(
            f"Removed {index_num - len(performance_data.index)} rows from the "
            f"PerformanceDataFrame, leaving {len(performance_data.index)} rows."
        )

        # Sanity check all indices, clean rows that are broken
        # NOTE: This check is quite expensive
        objective_errors, instance_errors, run_id_errors = 0, 0, 0
        known_objectives = [o.name for o in gv.settings().objectives]
        wrong_indices = []
        for objective, instance, run_id in performance_data.index:
            if objective not in known_objectives:
                objective_errors += 1
                wrong_indices.append((objective, instance, run_id))
                # print("Objective issue:", objective)
            elif isinstance(run_id, str) and not run_id.isdigit():
                run_id_errors += 1
                wrong_indices.append((objective, instance, run_id))
                # print("Run id issue:", run_id)
            else:
                # NOTE: This check is very expensive; it would be better if we
                # could pass all the instances at once instead
                instance_path = resolve_instance_name(
                    instance, target=gv.settings().DEFAULT_instance_dir
                )
                if instance_path is None:
                    instance_errors += 1
                    wrong_indices.append((objective, instance, run_id))
        if wrong_indices:
            print(
                f"Found {len(wrong_indices)} faulty indices in the "
                f"PerformanceDataFrame ({objective_errors} objective errors, "
                f"{instance_errors} instance errors, {run_id_errors} run id "
                "errors).\nRemoving from PerformanceDataFrame..."
            )
            performance_data.drop(wrong_indices, inplace=True)
            print(
                f"Removed {len(wrong_indices)} rows from the PerformanceDataFrame, "
                f"leaving {len(performance_data.index)} rows."
            )
        performance_data.save_csv()


    if args.feature_data:
        running_jobs = jobs_help.get_runs_from_file(
            gv.settings().DEFAULT_log_output, filter=[Status.WAITING, Status.RUNNING]
        )
        if len(running_jobs) > 0:
            print("WARNING: There are still running jobs! Continue cleaning? [y/n]")
            a = input()
            if a != "y":
                sys.exit(0)
        feature_data = FeatureDataFrame(gv.settings().DEFAULT_feature_data_path)
        count = check_logs_feature_data(feature_data)
        print(
            f"Extracted {count} values from the logs and placed them in the "
            "FeatureDataFrame."
        )
        feature_data.save_csv()
        # TODO: We could do other cleanup here, such as index verification and
        # removal of empty rows. For example, check whether each index
        # references a valid instance and, if not, remove the row.

    if args.all:
        shutil.rmtree(gv.settings().DEFAULT_output, ignore_errors=True)
        snh.create_working_dirs()
        print("Removed all output files from the platform!")
    elif args.remove:
        snh.remove_current_platform()
        snh.create_working_dirs()
        print("Cleaned platform of all files!")
    elif args.logs:
        remove_temporary_files()
        print("Cleaned platform of log files!")
    elif not args.performance_data and not args.feature_data:
        # print_help writes directly to stdout and returns None,
        # so it must not be wrapped in print()
        parser.print_help()
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
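
# A minimal usage sketch, assuming flag names that mirror the argparse
# destinations above (the actual names are defined in
# sparkle.CLI.help.argparse_custom and are not shown in this file):
#     python cleanup.py --logs
#     python cleanup.py --performance-data --feature-data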