Coverage for sparkle/CLI/compute_marginal_contribution.py: 89%

96 statements  

coverage.py v7.8.0, created at 2025-04-03 10:42 +0000

#!/usr/bin/env python3
"""Sparkle command for the computation of the marginal contributions."""
import sys
import argparse
from pathlib import Path
import operator

import tabulate

from sparkle.solver import Selector
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import logging as sl
from sparkle.platform.settings_objects import SettingState
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.initialise import check_for_initialise
from sparkle.structures import PerformanceDataFrame, FeatureDataFrame
from sparkle.types import SparkleObjective


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Command to compute the marginal contribution of solvers to the "
                    "portfolio.")
    parser.add_argument(*ac.PerfectSelectorMarginalContributionArgument.names,
                        **ac.PerfectSelectorMarginalContributionArgument.kwargs)
    parser.add_argument(*ac.ActualMarginalContributionArgument.names,
                        **ac.ActualMarginalContributionArgument.kwargs)
    parser.add_argument(*ac.ObjectivesArgument.names,
                        **ac.ObjectivesArgument.kwargs)
    parser.add_argument(*ac.SettingsFileArgument.names,
                        **ac.SettingsFileArgument.kwargs)
    return parser

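# The argument definitions are centralised in sparkle.CLI.help.argparse_custom. Based
# on the attributes read in main() below, the parsed namespace carries at least
# args.perfect, args.actual, args.objectives and args.settings_file. A minimal sketch
# of how the parser is consumed (the "--actual" flag spelling is an assumption, not
# taken from argparse_custom):
#
#     parser = parser_function()
#     args = parser.parse_args(["--actual"])
#     assert args.actual and not args.perfect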

def compute_selector_performance(
        actual_portfolio_selector: Path,
        performance_data: PerformanceDataFrame,
        feature_data: FeatureDataFrame,
        objective: SparkleObjective) -> float:
    """Return the performance of a selector over all instances.

    Args:
        actual_portfolio_selector: Path to the portfolio selector.
        performance_data: The performance data.
        feature_data: The feature data.
        objective: Objective to compute the performance for.

    Returns:
        The selector performance as a single floating point number.
    """
    performance_path = actual_portfolio_selector.parent / "performance.csv"
    if performance_path.exists():
        # Reuse previously computed selector performance stored next to the selector
        selector_performance_data = PerformanceDataFrame(performance_path)
        return objective.instance_aggregator(
            selector_performance_data.get_values("portfolio_selector",
                                                 objective=str(objective)))
    selector_performance_data = performance_data.clone()

    selector_performance_data.add_solver("portfolio_selector")
    selector_performance_data.csv_filepath =\
        actual_portfolio_selector.parent / "performance.csv"
    selector = Selector(gv.settings().get_selection_class(),
                        gv.settings().get_selection_model())

    schedule = {}
    for instance in performance_data.instances:
        # We get the performance for an instance by inferring the model prediction
        # for the instance.
        schedule[instance] = selector.run(actual_portfolio_selector,
                                          instance,
                                          feature_data)

    schedule_performance = selector_performance_data.schedule_performance(
        schedule, target_solver="portfolio_selector", objective=objective)
    # Remove the original solvers from the dataframe, keeping only the selector
    selector_performance_data.remove_solver(performance_data.solvers)
    selector_performance_data.save_csv()  # Save the results to disk
    return objective.instance_aggregator(schedule_performance)

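# compute_selector_performance() caches its result: if a performance.csv already sits
# next to the selector, the stored "portfolio_selector" values are aggregated directly;
# otherwise the selector is run per instance and the resulting schedule performance is
# written back to that CSV. A minimal sketch of the final aggregation step, assuming
# the objective's instance_aggregator is a plain mean (the actual aggregator is
# determined by the SparkleObjective):
#
#     per_instance = [12.0, 45.0, 3.0]   # schedule performance per instance
#     selector_performance = sum(per_instance) / len(per_instance)  # 20.0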

def compute_selector_marginal_contribution(
        performance_data: PerformanceDataFrame,
        feature_data: FeatureDataFrame,
        selector_scenario: Path,
        objective: SparkleObjective) -> list[tuple[str, float, float]]:
    """Compute the marginal contributions of solvers in the selector.

    Args:
        performance_data: Performance data object.
        feature_data: The feature data.
        selector_scenario: Path to the selector scenario for which to compute
            the marginal contribution.
        objective: Objective to compute the marginal contribution for.

    Returns:
        A list of 3-tuples of the form
        (solver name, marginal contribution, ablated selector performance).
    """
    portfolio_selector_path = selector_scenario / "portfolio_selector"

    if not portfolio_selector_path.exists():
        print(f"ERROR: Selector {portfolio_selector_path} does not exist! "
              "Cannot compute marginal contribution.")
        sys.exit(-1)

    selector_performance = compute_selector_performance(
        portfolio_selector_path, performance_data,
        feature_data, objective)

    rank_list = []
    compare = operator.lt if objective.minimise else operator.gt
    # Compute the contribution per solver
    # NOTE: This could be parallelised
    for solver in performance_data.solvers:
        solver_name = Path(solver).name
        # 1. Copy the original dataframe
        tmp_performance_df = performance_data.clone()
        # 2. Remove the solver from this copy
        tmp_performance_df.remove_solver(solver)
        ablated_actual_portfolio_selector =\
            selector_scenario / f"ablate_{solver_name}" / "portfolio_selector"
        if not ablated_actual_portfolio_selector.exists():
            print(f"WARNING: Selector without {solver_name} does not exist! "
                  f"Cannot compute marginal contribution of {solver_name}.")
            continue

        ablated_selector_performance = compute_selector_performance(
            ablated_actual_portfolio_selector, tmp_performance_df,
            feature_data, objective)

        # 1. If the performance remains equal, this solver did not contribute
        # 2. If the performance degrades without this solver, it does contribute
        # 3. If the performance improves, we have a bad portfolio selector
        if ablated_selector_performance == selector_performance:
            marginal_contribution = 0.0
        elif not compare(ablated_selector_performance, selector_performance):
            # The performance degrades without this solver, so it contributes
            marginal_contribution = ablated_selector_performance / selector_performance
        else:
            print("****** WARNING DUBIOUS SELECTOR/SOLVER: "
                  f"The omission of solver {solver_name} yields an improvement. "
                  "The selector performs better without this solver. It may be useful "
                  "to construct a portfolio without this solver.")
            marginal_contribution = 0.0

        rank_list.append((solver, marginal_contribution, ablated_selector_performance))

    rank_list.sort(key=lambda contribution: contribution[1], reverse=True)
    return rank_list

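# The marginal contribution of a solver is the ratio between the selector's performance
# without that solver (ablated) and with it. Worked example for a minimisation
# objective: with full-selector performance 100.0 and ablated performance 120.0, the
# contribution is 120.0 / 100.0 = 1.2, i.e. removing the solver makes the selector 20%
# worse. Equal performance yields a contribution of 0.0, and an ablated selector that
# improves is flagged as dubious and also scored 0.0.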

def compute_marginal_contribution(
        scenario: Path, compute_perfect: bool, compute_actual: bool) -> None:
    """Compute the marginal contribution.

    Args:
        scenario: Path to the selector scenario for which to compute
            the marginal contribution.
        compute_perfect: Bool indicating if the contribution for the perfect
            portfolio selector should be computed.
        compute_actual: Bool indicating if the contribution for the actual portfolio
            selector should be computed.
    """
    performance_data = PerformanceDataFrame(gv.settings().DEFAULT_performance_data_path)
    feature_data = FeatureDataFrame(gv.settings().DEFAULT_feature_data_path)
    objective = gv.settings().get_general_sparkle_objectives()[0]

    if compute_perfect:
        # The perfect selector uses the best performance per instance
        print("Computing each solver's marginal contribution to perfect selector ...")
        contribution_data = performance_data.marginal_contribution(
            objective=objective.name, sort=True)
        table = tabulate.tabulate(
            contribution_data,
            headers=["Solver", "Marginal Contribution", "Best Performance"])
        print(table, "\n")
        print("Marginal contribution (perfect selector) computing done!")

    if compute_actual:
        print("Start computing marginal contribution per solver to actual selector...")
        contribution_data = compute_selector_marginal_contribution(
            performance_data,
            feature_data,
            scenario,
            objective
        )
        table = tabulate.tabulate(
            contribution_data,
            headers=["Solver", "Marginal Contribution", "Best Performance"])
        print(table, "\n")
        print("Marginal contribution (actual selector) computing done!")

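# Note the split above: the "perfect" contribution is delegated entirely to
# PerformanceDataFrame.marginal_contribution (best performance per instance), while
# the "actual" contribution re-evaluates the trained selector with each solver ablated
# via compute_selector_marginal_contribution(). Both results are printed as a
# Solver / Marginal Contribution / Best Performance table via tabulate.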

def main(argv: list[str]) -> None:
    """Main function of the marginal contribution command."""
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    if ac.set_by_user(args, "settings_file"):
        # Do this first, so other command line options can override the file settings
        gv.settings().read_settings_ini(
            args.settings_file, SettingState.CMD_LINE)
    if ac.set_by_user(args, "objectives"):
        gv.settings().set_general_sparkle_objectives(
            args.objectives, SettingState.CMD_LINE)
    selection_scenario = gv.latest_scenario().get_selection_scenario_path()

    if not (args.perfect or args.actual):
        print("ERROR: compute_marginal_contribution called without a flag set to"
              " True, stopping execution")
        sys.exit(-1)

    compute_marginal_contribution(selection_scenario, args.perfect, args.actual)

    # Write used settings to file
    gv.settings().write_used_settings()
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
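# Hypothetical invocation of this executable CLI command (the exact flag spellings are
# defined in sparkle.CLI.help.argparse_custom and are assumed here):
#
#     python3 sparkle/CLI/compute_marginal_contribution.py --actual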