Coverage for sparkle/CLI/cleanup.py: 37% (84 statements)


#!/usr/bin/env python3
"""Command to remove temporary files not affecting the platform state."""
import re
import sys
import math
import argparse
import shutil

from sparkle.structures import PerformanceDataFrame

from sparkle.CLI.help import logging as sl
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.help import snapshot_help as snh
from sparkle.CLI.help import jobs as jobs_help


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Command to clean files from the platform."
    )
    parser.add_argument(*ac.CleanupArgumentAll.names, **ac.CleanupArgumentAll.kwargs)
    parser.add_argument(
        *ac.CleanupArgumentRemove.names, **ac.CleanupArgumentRemove.kwargs
    )
    parser.add_argument(
        *ac.CleanUpPerformanceDataArgument.names,
        **ac.CleanUpPerformanceDataArgument.kwargs,
    )
    return parser
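
# The concrete flag spellings live in sparkle.CLI.help.argparse_custom; judging
# by the attribute accesses in main() below, they presumably parse into
# args.all, args.remove and args.performance_data, e.g. (hypothetical flag):
#   args = parser_function().parse_args(["--performance-data"])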

def check_logs_performance_data(performance_data: PerformanceDataFrame) -> int:
    """Check if the performance data is missing values that can be extracted from the logs.

    Args:
        performance_data (PerformanceDataFrame): The performance data.

    Returns:
        int: The number of updated values.
    """
    pattern = re.compile(
        r"^(?P<objective>\S+)\s*,\s*"
        r"(?P<instance>\S+)\s*,\s*"
        r"(?P<run_id>\S+)\s*\|\s*"
        r"(?P<solver>\S+)\s*,\s*"
        r"(?P<config_id>\S+)\s*:\s*"
        r"(?P<target_value>\S+)$"
    )
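    # Example of a log line this pattern is assumed to match (all values are
    # hypothetical): "PAR10,Instances/train/i1.cnf,0 | MiniSAT,config_1: 42.0"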

    # Only iterate over Slurm log files
    log_files = [
        f
        for f in gv.settings().DEFAULT_log_output.glob("**/*")
        if f.is_file() and f.suffix == ".out"
    ]
    count = 0
    for log in log_files:
        for line in log.read_text().splitlines():
            match = pattern.match(line)
            if match:
                objective = match.group("objective")
                instance = match.group("instance")
                run_id = int(match.group("run_id"))
                solver = match.group("solver")
                config_id = match.group("config_id")
                target_value = match.group("target_value")
                current_value = performance_data.get_value(
                    solver, instance, config_id, objective, run_id
                )
                # TODO: It would be better to extract all NaN indices from the
                # PerformanceDataFrame and check the logs against those.
                # A missing value is either a float NaN or the string "nan",
                # depending on how the CSV was parsed.
                if (
                    isinstance(current_value, (int, float))
                    and math.isnan(current_value)
                ) or (isinstance(current_value, str) and current_value == "nan"):
                    performance_data.set_value(
                        target_value, solver, instance, config_id, objective, run_id
                    )
                    count += 1
    if count:
        performance_data.save_csv()
    return count


def remove_temporary_files() -> None:
    """Remove temporary files. Only removes files not affecting the Sparkle state."""
    # Reset the log output directory: delete it entirely, then recreate it empty.
    shutil.rmtree(gv.settings().DEFAULT_log_output, ignore_errors=True)
    gv.settings().DEFAULT_log_output.mkdir()


def main(argv: list[str]) -> None:
    """Main function of the cleanup command."""
    # Log command call
    sl.log_command(sys.argv, gv.settings().random_state)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    if args.performance_data:
        # Check whether we can clean up the PerformanceDataFrame
        from runrunner.base import Status

        running_jobs = jobs_help.get_runs_from_file(
            gv.settings().DEFAULT_log_output, filter=[Status.WAITING, Status.RUNNING]
        )
        if len(running_jobs) > 0:
            print("WARNING: There are still running jobs! Continue cleaning? [y/n]")
            if input() != "y":
                sys.exit(0)

        performance_data = PerformanceDataFrame(
            gv.settings().DEFAULT_performance_data_path
        )
        count = check_logs_performance_data(performance_data)
        print(
            f"Extracted {count} values from the logs and placed them in the "
            "PerformanceDataFrame."
        )

        # Remove empty configurations
        removed_configurations = 0
        for solver, configurations in performance_data.configurations.items():
            for config_id, config in configurations.items():
                if config_id == PerformanceDataFrame.default_configuration:
                    continue
                if not config:  # Empty configuration, remove it
                    performance_data.remove_configuration(solver, config_id)
                    removed_configurations += 1
        if removed_configurations:
            performance_data.save_csv()
            print(
                f"Removed {removed_configurations} empty configurations from the "
                "Performance DataFrame."
            )

        index_num = len(performance_data.index)
        # Only remove rows that are completely empty
        performance_data.remove_empty_runs()
        performance_data.save_csv()
        print(
            f"Removed {index_num - len(performance_data.index)} rows from the "
            f"Performance DataFrame, leaving {len(performance_data.index)} rows."
        )

    if args.all:
        shutil.rmtree(gv.settings().DEFAULT_output, ignore_errors=True)
        snh.create_working_dirs()
        print("Removed all output files from the platform!")
    elif args.remove:
        snh.remove_current_platform()
        snh.create_working_dirs()
        print("Cleaned platform of all files!")
    else:
        remove_temporary_files()
        print("Cleaned platform of temporary files!")
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
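
# A sketch of typical shell usage, assuming this module is wired up as the
# "cleanup" command of the Sparkle CLI and the flag spellings assumed above:
#   sparkle cleanup                     # remove temporary files only
#   sparkle cleanup --performance-data  # also tidy the PerformanceDataFrame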