Coverage for src/sparkle/CLI/cleanup.py: 34%

86 statements  

coverage.py v7.10.7, created at 2025-10-15 14:11 +0000

#!/usr/bin/env python3
"""Command to remove temporary files not affecting the platform state."""

import re
import sys
import math
import argparse
import shutil

from sparkle.structures import PerformanceDataFrame

from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help import snapshot_help as snh
from sparkle.CLI.help import jobs as jobs_help

def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Command to clean files from the platform."
    )
    parser.add_argument(*ac.CleanupArgumentAll.names, **ac.CleanupArgumentAll.kwargs)
    parser.add_argument(*ac.CleanupArgumentLogs.names, **ac.CleanupArgumentLogs.kwargs)
    parser.add_argument(
        *ac.CleanupArgumentRemove.names, **ac.CleanupArgumentRemove.kwargs
    )
    parser.add_argument(
        *ac.CleanUpPerformanceDataArgument.names,
        **ac.CleanUpPerformanceDataArgument.kwargs,
    )
    return parser

def check_logs_performance_data(performance_data: PerformanceDataFrame) -> int:
    """Check if the performance data is missing values that can be extracted from the logs.

    Args:
        performance_data (PerformanceDataFrame): The performance data.

    Returns:
        int: The number of updated values.
    """
    # empty_indices = performance_data.empty_indices
    pattern = re.compile(
        r"^(?P<objective>\S+)\s*,\s*"
        r"(?P<instance>\S+)\s*,\s*"
        r"(?P<run_id>\S+)\s*\|\s*"
        r"(?P<solver>\S+)\s*,\s*"
        r"(?P<config_id>\S+)\s*:\s*"
        r"(?P<target_value>\S+)$"
    )
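    # A line this pattern should match, inferred from the regex itself
    # (the exact log format is an assumption, not confirmed by the source):
    #   PAR10,instance1.cnf,1 | MySolver,config_2: 3600.0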

    # Only iterate over Slurm log files
    log_files = [
        f
        for f in gv.settings().DEFAULT_log_output.glob("**/*")
        if f.is_file() and f.suffix == ".out"
    ]
    count = 0
    for log in log_files:
        for line in log.read_text().splitlines():
            match = pattern.match(line)
            if match:
                objective = match.group("objective")
                instance = match.group("instance")
                run_id = int(match.group("run_id"))
                solver = match.group("solver")
                config_id = match.group("config_id")
                target_value = match.group("target_value")
                current_value = performance_data.get_value(
                    solver, instance, config_id, objective, run_id
                )
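                # The stored cell may hold a float NaN or the literal string
                # "nan"; treat both as missing. (That the string form stems
                # from a CSV round-trip is an assumption.)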

                # TODO: Would it be better to extract all NaN indices from the
                # PerformanceDataFrame and check against this?
                is_missing = (
                    isinstance(current_value, (int, float))
                    and math.isnan(current_value)
                ) or (isinstance(current_value, str) and current_value == "nan")
                if is_missing:
                    performance_data.set_value(
                        target_value, solver, instance, config_id, objective, run_id
                    )
                    count += 1
    if count:
        performance_data.save_csv()
    return count

def remove_temporary_files() -> None:
    """Remove temporary files. Only removes files not affecting the sparkle state."""
    shutil.rmtree(gv.settings().DEFAULT_log_output, ignore_errors=True)
    gv.settings().DEFAULT_log_output.mkdir()

def main(argv: list[str]) -> None:
    """Main function of the cleanup command."""
    # Log command call
    sl.log_command(sys.argv, gv.settings().random_state)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    if args.performance_data:
        # Warn before cleaning if jobs are still waiting or running
        from runrunner.base import Status

        running_jobs = jobs_help.get_runs_from_file(
            gv.settings().DEFAULT_log_output, filter=[Status.WAITING, Status.RUNNING]
        )
        if len(running_jobs) > 0:
            print("WARNING: There are still running jobs! Continue cleaning? [y/n]")
            if input() != "y":
                sys.exit(0)

        performance_data = PerformanceDataFrame(
            gv.settings().DEFAULT_performance_data_path
        )
        count = check_logs_performance_data(performance_data)
        print(
            f"Extracted {count} values from the logs and placed them in the "
            "PerformanceDataFrame."
        )

        # Remove empty configurations
        removed_configurations = 0
        for solver, configurations in performance_data.configurations.items():
            for config_id, config in configurations.items():
                if config_id == PerformanceDataFrame.default_configuration:
                    continue
                if not config:  # Empty configuration, remove it
                    performance_data.remove_configuration(solver, config_id)
                    removed_configurations += 1
        if removed_configurations:
            performance_data.save_csv()
            print(
                f"Removed {removed_configurations} empty configurations from the "
                "PerformanceDataFrame."
            )

        index_num = len(performance_data.index)
        # Only remove rows that are completely empty
        performance_data.remove_empty_runs()
        performance_data.save_csv()
        print(
            f"Removed {index_num - len(performance_data.index)} rows from the "
            f"PerformanceDataFrame, leaving {len(performance_data.index)} rows."
        )

    if args.all:
        shutil.rmtree(gv.settings().DEFAULT_output, ignore_errors=True)
        snh.create_working_dirs()
        print("Removed all output files from the platform!")
    elif args.remove:
        snh.remove_current_platform()
        snh.create_working_dirs()
        print("Cleaned platform of all files!")
    elif args.logs:
        remove_temporary_files()
        print("Cleaned platform of log files!")
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
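# Example invocations, assuming the flag names defined in argparse_custom
# mirror the attributes used above (args.all, args.remove, args.logs,
# args.performance_data); the exact flag spellings are an assumption:
#   python -m sparkle.CLI.cleanup --logs
#   python -m sparkle.CLI.cleanup --performance-data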