Coverage for sparkle/CLI/compute_marginal_contribution.py: 89%

96 statements  

coverage.py v7.6.10, created at 2025-01-07 15:22 +0000

#!/usr/bin/env python3
"""Sparkle command for the computation of the marginal contributions."""
import sys
import argparse
from pathlib import Path
import operator

import tabulate

from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import logging as sl
from sparkle.platform.settings_objects import SettingState
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.initialise import check_for_initialise
from sparkle.structures import PerformanceDataFrame, FeatureDataFrame
from sparkle.types import SparkleObjective


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Command to compute the marginal contribution of solvers to the "
                    "portfolio.")
    parser.add_argument(*ac.PerfectSelectorMarginalContributionArgument.names,
                        **ac.PerfectSelectorMarginalContributionArgument.kwargs)
    parser.add_argument(*ac.ActualMarginalContributionArgument.names,
                        **ac.ActualMarginalContributionArgument.kwargs)
    parser.add_argument(*ac.ObjectivesArgument.names,
                        **ac.ObjectivesArgument.kwargs)
    parser.add_argument(*ac.SettingsFileArgument.names,
                        **ac.SettingsFileArgument.kwargs)

    return parser


def compute_selector_performance(
        actual_portfolio_selector: Path,
        performance_data: PerformanceDataFrame,
        feature_data: FeatureDataFrame,
        objective: SparkleObjective) -> float:
    """Return the performance of a selector over all instances.

    Args:
        actual_portfolio_selector: Path to the portfolio selector.
        performance_data: The performance data.
        feature_data: The feature data.
        objective: Objective to compute the performance for.

    Returns:
        The selector performance as a single floating point number.
    """
    performance_path = actual_portfolio_selector.parent / "performance.csv"
    if performance_path.exists():
        selector_performance_data = PerformanceDataFrame(performance_path)
        return objective.instance_aggregator(
            selector_performance_data.get_values("portfolio_selector",
                                                 objective=str(objective)))
    selector_performance_data = performance_data.clone()

    selector_performance_data.add_solver("portfolio_selector")
    selector_performance_data.csv_filepath =\
        actual_portfolio_selector.parent / "performance.csv"
    selector = gv.settings().get_general_sparkle_selector()

    schedule = {}
    for instance in performance_data.instances:
        # We get the performance for an instance by inferring the model prediction
        # for the instance.
        feature_vector = feature_data.get_instance(instance)
        schedule[instance] = selector.run(actual_portfolio_selector, feature_vector)

    schedule_performance = selector_performance_data.schedule_performance(
        schedule, target_solver="portfolio_selector", objective=objective)
    # Remove the original solvers from the dataframe
    selector_performance_data.remove_solver(performance_data.solvers)
    selector_performance_data.save_csv()  # Save the results to disk
    return objective.instance_aggregator(schedule_performance)

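# NOTE: Illustrative sketch, not part of the original module. Assuming a trained
# selector exists at a (hypothetical) path such as
# Output/Selection/scenario/portfolio_selector, the helper above could be
# exercised roughly as follows, reusing the same data frames and objective that
# the CLI loads further below:
#
#   performance_data = PerformanceDataFrame(gv.settings().DEFAULT_performance_data_path)
#   feature_data = FeatureDataFrame(gv.settings().DEFAULT_feature_data_path)
#   objective = gv.settings().get_general_sparkle_objectives()[0]
#   value = compute_selector_performance(
#       Path("Output/Selection/scenario/portfolio_selector"),  # hypothetical path
#       performance_data, feature_data, objective)
#   print(f"Aggregated selector performance: {value}")
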

def compute_selector_marginal_contribution(
        performance_data: PerformanceDataFrame,
        feature_data: FeatureDataFrame,
        selector_scenario: Path,
        objective: SparkleObjective) -> list[tuple[str, float, float]]:
    """Compute the marginal contributions of solvers in the selector.

    Args:
        performance_data: Performance data object.
        feature_data: The feature data.
        selector_scenario: Path to the selector scenario for which to compute
            the marginal contribution.
        objective: Objective to compute the marginal contribution for.

    Returns:
        A list of 3-tuples of the form
        (solver name, marginal contribution, selector performance without the solver).
    """
    portfolio_selector_path = selector_scenario / "portfolio_selector"

    if not portfolio_selector_path.exists():
        print(f"ERROR: Selector {portfolio_selector_path} does not exist! "
              "Cannot compute marginal contribution.")
        sys.exit(-1)

    selector_performance = compute_selector_performance(
        portfolio_selector_path, performance_data,
        feature_data, objective)

    rank_list = []
    compare = operator.lt if objective.minimise else operator.gt
    # Compute the contribution per solver
    # NOTE: This could be parallelised
    for solver in performance_data.solvers:
        solver_name = Path(solver).name
        # 1. Copy the original dataframe
        tmp_performance_df = performance_data.clone()
        # 2. Remove the solver from this copy
        tmp_performance_df.remove_solver(solver)
        ablated_actual_portfolio_selector =\
            selector_scenario / f"ablate_{solver_name}" / "portfolio_selector"
        if not ablated_actual_portfolio_selector.exists():
            print(f"WARNING: Selector without {solver_name} does not exist! "
                  f"Cannot compute marginal contribution of {solver_name}.")
            continue

        ablated_selector_performance = compute_selector_performance(
            ablated_actual_portfolio_selector, tmp_performance_df,
            feature_data, objective)

        # 1. If the performance remains equal, this solver did not contribute
        # 2. If the performance degrades without this solver, it does contribute
        # 3. If the performance improves, we have a bad portfolio selector
        if ablated_selector_performance == selector_performance:
            marginal_contribution = 0.0
        elif not compare(ablated_selector_performance, selector_performance):
            # The performance degrades without the solver, so it contributes
            marginal_contribution = ablated_selector_performance / selector_performance
        else:
            print("****** WARNING DUBIOUS SELECTOR/SOLVER: "
                  f"The omission of solver {solver_name} yields an improvement. "
                  "The selector performs better without this solver. It may be "
                  "useful to construct a portfolio without this solver.")
            marginal_contribution = 0.0

        rank_list.append((solver, marginal_contribution, ablated_selector_performance))

    rank_list.sort(key=lambda contribution: contribution[1], reverse=True)
    return rank_list

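# NOTE: Worked example, illustrative only (the numbers are hypothetical). For a
# minimisation objective such as PAR10, suppose the full selector scores 100.0
# and the selector rebuilt without solver "SolverA" scores 120.0. Removing the
# solver degrades the portfolio, so its marginal contribution is reported as the
# ratio 120.0 / 100.0 = 1.2. An unchanged score yields a contribution of 0.0,
# and an improvement triggers the dubious selector/solver warning above.
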

def compute_marginal_contribution(
        scenario: Path, compute_perfect: bool, compute_actual: bool) -> None:
    """Compute the marginal contribution.

    Args:
        scenario: Path to the selector scenario for which to compute the
            marginal contribution.
        compute_perfect: Bool indicating whether the contribution to the perfect
            portfolio selector should be computed.
        compute_actual: Bool indicating whether the contribution to the actual
            portfolio selector should be computed.
    """
    performance_data = PerformanceDataFrame(gv.settings().DEFAULT_performance_data_path)
    feature_data = FeatureDataFrame(gv.settings().DEFAULT_feature_data_path)
    objective = gv.settings().get_general_sparkle_objectives()[0]

    if compute_perfect:
        # The perfect selector picks the best performance per instance
        print("Computing each solver's marginal contribution to the perfect "
              "selector ...")
        contribution_data = performance_data.marginal_contribution(
            objective=objective.name, sort=True)
        table = tabulate.tabulate(
            contribution_data,
            headers=["Solver", "Marginal Contribution", "Best Performance"])
        print(table, "\n")
        print("Marginal contribution (perfect selector) computation done!")

    if compute_actual:
        print("Computing each solver's marginal contribution to the actual "
              "selector ...")
        contribution_data = compute_selector_marginal_contribution(
            performance_data,
            feature_data,
            scenario,
            objective
        )
        table = tabulate.tabulate(
            contribution_data,
            headers=["Solver", "Marginal Contribution", "Best Performance"])
        print(table, "\n")
        print("Marginal contribution (actual selector) computation done!")

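# NOTE: Illustrative sketch, not part of the original module. With an existing
# selection scenario on disk, the reporting entry point above could be called
# directly, e.g.:
#
#   compute_marginal_contribution(
#       Path("Output/Selection/scenario"),  # hypothetical scenario path
#       compute_perfect=True, compute_actual=True)
#
# which prints one tabulated ranking per requested selector variant.
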

def main(argv: list[str]) -> None:
    """Main function of the marginal contribution command."""
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    if ac.set_by_user(args, "settings_file"):
        gv.settings().read_settings_ini(
            args.settings_file, SettingState.CMD_LINE
        )  # Do first, so other command line options can override settings from the file
    if ac.set_by_user(args, "objectives"):
        gv.settings().set_general_sparkle_objectives(
            args.objectives, SettingState.CMD_LINE
        )
    selection_scenario = gv.latest_scenario().get_selection_scenario_path()

    if not (args.perfect or args.actual):
        print("ERROR: compute_marginal_contribution called without a flag set to"
              " True, stopping execution")
        sys.exit(-1)

    compute_marginal_contribution(selection_scenario, args.perfect, args.actual)

    # Write used settings to file
    gv.settings().write_used_settings()
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
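
# NOTE: Example invocation, illustrative only. Assuming the argument definitions
# in sparkle.CLI.help.argparse_custom expose --perfect and --actual flags
# (matching the args.perfect / args.actual attributes read in main above):
#
#   python -m sparkle.CLI.compute_marginal_contribution --actual
#   python -m sparkle.CLI.compute_marginal_contribution --perfect --actual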