Coverage for sparkle/CLI/compute_marginal_contribution.py: 89%

79 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 13:21 +0000

1#!/usr/bin/env python3 

2"""Sparkle command for the computation of the marginal contributions.""" 

3import sys 

4import argparse 

5import operator 

6 

7import tabulate 

8 

9from sparkle.selector import SelectionScenario 

10from sparkle.CLI.help import global_variables as gv 

11from sparkle.CLI.help import logging as sl 

12from sparkle.CLI.help import argparse_custom as ac 

13from sparkle.CLI.initialise import check_for_initialise 

14from sparkle.structures import PerformanceDataFrame, FeatureDataFrame 

15 

16 

def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser of the marginal contribution command.

    Returns:
        The parser with the perfect selector, actual selector and selection
        scenario arguments registered, in that order.
    """
    parser = argparse.ArgumentParser(
        description="Command to compute the marginal contribution of solvers to the "
                    "portfolio.")
    # Each entry bundles the flag names and the add_argument keyword arguments.
    cli_arguments = (
        ac.PerfectSelectorMarginalContributionArgument,
        ac.ActualMarginalContributionArgument,
        ac.SelectionScenarioArgument,
    )
    for argument in cli_arguments:
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser

29 

30 

def compute_selector_performance(
        selector_scenario: SelectionScenario,
        feature_data: FeatureDataFrame) -> float:
    """Return the aggregated performance of a selector on its training instances.

    If the selector performance data has a value for every training instance,
    the stored values are aggregated directly. Otherwise the selector is run on
    the instances that are still missing, the resulting schedules are evaluated,
    and the selector performance data is written to disk.

    Args:
        selector_scenario: The Selector scenario to compute the performance for.
        feature_data: The feature data of the instances.

    Returns:
        The selector performance as a single floating point number, produced by
        the instance aggregator of the scenario's objective.
    """
    selector_name = SelectionScenario.__selector_solver_name__
    results = selector_scenario.selector_performance_data
    unsolved = [
        instance for instance in selector_scenario.training_instances
        if results.is_missing(selector_name, instance)
    ]

    if unsolved:
        # The performance on an instance is obtained by inferring the model
        # prediction (a solver schedule) for that instance.
        schedule = {
            instance: selector_scenario.selector.run(
                selector_scenario.selector_file_path,
                instance,
                feature_data)
            for instance in unsolved
        }
        schedule_performance = results.schedule_performance(
            schedule,
            target_solver=selector_name,
            objective=selector_scenario.objective)
        results.save_csv()  # Persist the results to disk
        return selector_scenario.objective.instance_aggregator(schedule_performance)

    # Nothing is missing: aggregate the values that are already stored.
    stored_values = selector_scenario.selector_performance_data.get_value(
        selector_name,
        instance=selector_scenario.training_instances,
        objective=selector_scenario.objective.name)
    return selector_scenario.objective.instance_aggregator(stored_values)

68 

69 

def compute_selector_marginal_contribution(
        feature_data: FeatureDataFrame,
        selection_scenario: SelectionScenario) -> list[tuple[str, str, float, float]]:
    """Compute the marginal contributions of solvers in the selector.

    The performance of the full selector is compared against the performance of
    each ablated selector (the selector trained without one solver).

    Args:
        feature_data: The feature data of the instances.
        selection_scenario: The selector scenario for which to compute
            marginal contribution.

    Returns:
        A list of 4-tuples where every 4-tuple is of the form
        (solver, configuration, marginal contribution, ablated selector
        performance), sorted by descending marginal contribution. Ablation
        scenarios whose selector file does not exist are skipped.

    Exits:
        Terminates the process with exit code -1 if the selector file of
        ``selection_scenario`` does not exist.
    """
    if not selection_scenario.selector_file_path.exists():
        print(f"ERROR: Selector {selection_scenario.selector_file_path} does not exist! "
              "Cannot compute marginal contribution.")
        sys.exit(-1)

    selector_performance = compute_selector_performance(
        selection_scenario, feature_data)

    rank_list = []
    # compare(a, b) is True when performance a is strictly better than b
    compare = operator.lt if selection_scenario.objective.minimise else operator.gt
    # Compute contribution per solver
    # NOTE: This could be parallelised
    for ablation_scenario in selection_scenario.ablation_scenarios:
        # Hacky way of getting the needed data on the ablation: the directory
        # name is expected to look like <prefix>_<solver name>_<configuration>
        _, solver_name, config = ablation_scenario.directory.name.split("_", maxsplit=2)
        # TODO: This should be fixed through SPRK-352
        # Hacky way of reconstructing the solver id in the PDF
        solver = f"Solvers/{solver_name}"
        if not ablation_scenario.selector_file_path.exists():
            print(f"WARNING: Selector without {solver_name} does not exist! "
                  f"Cannot compute marginal contribution of {solver_name}.")
            continue

        ablated_selector_performance = compute_selector_performance(
            ablation_scenario, feature_data)

        # 1. If the performance remains equal, this solver did not contribute
        # 2. If there is a performance decay without this solver, it does contribute
        # 3. If there is a performance improvement, we have a bad portfolio selector
        if ablated_selector_performance == selector_performance:
            marginal_contribution = 0.0
        elif not compare(ablated_selector_performance, selector_performance):
            # The performance decreases, we have a contributing solver
            if selector_performance == 0:
                # The ratio is undefined for a zero baseline; report an
                # unbounded contribution instead of raising ZeroDivisionError.
                marginal_contribution = float("inf")
            else:
                marginal_contribution =\
                    ablated_selector_performance / selector_performance
        else:
            print("****** WARNING DUBIOUS SELECTOR/SOLVER: "
                  f"The omission of solver {solver_name} ({config}) yields an "
                  "improvement. The selector improves better without this solver. "
                  "It may be usefull to construct a portfolio without this solver.")
            marginal_contribution = 0.0

        rank_list.append((solver, config,
                          marginal_contribution, ablated_selector_performance))

    # Largest marginal contribution first
    rank_list.sort(key=operator.itemgetter(2), reverse=True)
    return rank_list

133 

134 

def compute_marginal_contribution(
        scenario: SelectionScenario,
        performance_data: PerformanceDataFrame,
        feature_data: FeatureDataFrame,
        compute_perfect: bool, compute_actual: bool) -> None:
    """Compute and print the requested marginal contribution tables.

    Args:
        scenario: Selector scenario for which to compute marginal contribution.
        performance_data: The complete performance data object
        feature_data: Feature data object
        compute_perfect: Whether to compute the contribution to the perfect
            portfolio selector.
        compute_actual: Whether to compute the contribution to the actual
            portfolio selector.
    """
    # Both tables share the same column layout.
    column_names = ("Solver", "Configuration",
                    "Marginal Contribution", "Best Performance")

    if compute_perfect:
        # The perfect selector picks the best performance per instance
        print("Computing each solver's marginal contribution to perfect selector ...")
        perfect_rows = performance_data.marginal_contribution(
            objective=scenario.objective.name,
            instances=scenario.training_instances,
            sort=True)
        print(tabulate.tabulate(perfect_rows, headers=list(column_names)), "\n")
        print("Marginal contribution (perfect selector) computing done!")

    if compute_actual:
        print("Start computing marginal contribution per Solver to actual selector...")
        actual_rows = compute_selector_marginal_contribution(feature_data, scenario)
        print(tabulate.tabulate(actual_rows, headers=list(column_names)), "\n")
        print("Marginal contribution (actual selector) computing done!")

176 

177 

def main(argv: list[str]) -> None:
    """Main function of the marginal contribution command.

    Args:
        argv: The command line arguments, without the program name.

    Exits:
        With code -1 when neither the perfect nor the actual flag is set,
        with code 0 after the requested contributions have been computed.
    """
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define and process command line arguments
    parser = parser_function()
    args = parser.parse_args(argv)

    # Fail fast: validate the flags before loading the scenario and data frames
    if not (args.perfect or args.actual):
        print("ERROR: compute_marginal_contribution called without a flag set to"
              " True, stopping execution")
        sys.exit(-1)

    selection_scenario = SelectionScenario.from_file(args.selection_scenario)
    performance_data = PerformanceDataFrame(gv.settings().DEFAULT_performance_data_path)
    feature_data = FeatureDataFrame(gv.settings().DEFAULT_feature_data_path)

    compute_marginal_contribution(selection_scenario,
                                  performance_data,
                                  feature_data,
                                  args.perfect, args.actual)

    # Write used settings to file
    gv.settings().write_used_settings()
    sys.exit(0)

207 

208 

if __name__ == "__main__":
    # Script entry point: forward the arguments without the program name.
    cli_arguments = sys.argv[1:]
    main(cli_arguments)