
1"""Sparkle class to organise configuration output.""" 

2 

3from __future__ import annotations 

4import operator 

5import json 

6from pathlib import Path 

7 

8from sparkle.selector import SelectionScenario 

9from sparkle.structures import PerformanceDataFrame 

10from sparkle.platform.output.structures import ( 

11 SelectionPerformance, 

12 SelectionSolverData, 

13) 

14 

15 

def compute_selector_marginal_contribution(
    selection_scenario: SelectionScenario,
) -> list[tuple[str, str, float, float]]:
    """Compute the marginal contributions of solvers in the selector.

    Args:
        selection_scenario: The selection scenario for which to compute
            the marginal contributions.

    Returns:
        A list of 4-tuples where every 4-tuple is of the form
        (solver_name, config_id, marginal_contribution, best_performance).
    """

    selector_performance = selection_scenario.objective.instance_aggregator(
        selection_scenario.selector_performance_data.get_value(
            SelectionScenario.__selector_solver_name__,
            instance=selection_scenario.training_instances,
            objective=selection_scenario.objective.name,
        )
    )
    rank_list = []
    compare = operator.lt if selection_scenario.objective.minimise else operator.gt
    # Compute contribution per solver
    for ablation_scenario in selection_scenario.ablation_scenarios:
        # Hacky way of getting the needed data on the ablation
        _, solver_name, config = ablation_scenario.directory.name.split("_", maxsplit=2)
        # Hacky way of reconstructing the solver id in the PerformanceDataFrame
        solver = f"Solvers/{solver_name}"
        ablated_selector_performance = ablation_scenario.objective.instance_aggregator(
            ablation_scenario.selector_performance_data.get_value(
                SelectionScenario.__selector_solver_name__,
                instance=ablation_scenario.training_instances,
                objective=ablation_scenario.objective.name,
            )
        )


        # 1. If the performance remains equal, this solver did not contribute
        # 2. If there is a performance decay without this solver, it does contribute
        # 3. If there is a performance improvement, we have a bad portfolio selector
        if ablated_selector_performance == selector_performance:
            marginal_contribution = 0.0
        elif not compare(ablated_selector_performance, selector_performance):
            # The performance degrades without this solver, so it does contribute
            marginal_contribution = ablated_selector_performance / selector_performance
        else:
            print(
                "****** WARNING DUBIOUS SELECTOR/SOLVER: "
                f"The omission of solver {solver_name} ({config}) yields an "
                "improvement. The selector performs better without this solver. "
                "It may be useful to construct a portfolio without this solver."
            )
            marginal_contribution = 0.0


        rank_list.append(
            (solver, config, marginal_contribution, ablated_selector_performance)
        )

    rank_list.sort(key=lambda contribution: contribution[2], reverse=True)
    return rank_list
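
# Illustrative example (hypothetical numbers, assuming a minimisation objective
# such as runtime): if the full selector averages 40s over the training instances
# and the selector rebuilt without solver "A" averages 50s, solver "A" receives a
# marginal contribution of 50 / 40 = 1.25. A contribution of 0.0 means omitting
# the solver left the selector unchanged, or even improved it, which triggers the
# warning above.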



class SelectionOutput:
    """Class that collects selection data and outputs it in JSON format."""

    def __init__(
        self: SelectionOutput,
        selection_scenario: SelectionScenario,
    ) -> None:
        """Initialise SelectionOutput class.

        Args:
            selection_scenario: The selection scenario to collect and serialise
                output data for.
        """

        self.training_instances = selection_scenario.training_instances
        training_instance_sets = selection_scenario.training_instance_sets
        # Pair each instance set with the number of training instances it contains
        self.training_instance_sets = [
            (instance_set, sum(instance_set in s for s in self.training_instances))
            for instance_set in training_instance_sets
        ]
        self.test_instances = selection_scenario.test_instances
        test_sets = selection_scenario.test_instance_sets
        self.test_sets = [
            (instance_set, sum(instance_set in s for s in self.test_instances))
            for instance_set in test_sets
        ]
        self.cutoff_time = selection_scenario.solver_cutoff
        self.objective = selection_scenario.objective


        # Performance data without the selector column itself
        solver_performance_data = selection_scenario.selector_performance_data.clone()
        solver_performance_data.remove_solver(SelectionScenario.__selector_solver_name__)

        self.solver_performance_ranking = solver_performance_data.get_solver_ranking(
            instances=self.training_instances, objective=self.objective
        )

        self.solver_data = self.get_solver_data(solver_performance_data)
        # Group configuration ids per solver
        self.solvers = {}
        for solver_conf in selection_scenario.performance_data.columns:
            solver, conf = solver_conf.split("_", maxsplit=1)
            if solver not in self.solvers:
                self.solvers[solver] = []
            self.solvers[solver].append(conf)

        # Single best solver (SBS) performance: the top-ranked solver/configuration
        self.sbs_performance = solver_performance_data.get_value(
            solver=self.solver_performance_ranking[0][0],
            configuration=self.solver_performance_ranking[0][1],
            instance=self.training_instances,
            objective=self.objective.name,
        )


        # Collect marginal contribution data
        self.marginal_contribution_perfect = (
            solver_performance_data.marginal_contribution(
                selection_scenario.objective,
                instances=self.training_instances,
                sort=True,
            )
        )

        self.marginal_contribution_actual = compute_selector_marginal_contribution(
            selection_scenario
        )
        # Collect performance data
        self.vbs_performance_data = solver_performance_data.best_instance_performance(
            instances=self.training_instances, objective=selection_scenario.objective
        )
        self.vbs_performance = selection_scenario.objective.instance_aggregator(
            self.vbs_performance_data
        )


        self.test_set_performance = {} if self.test_sets else None
        for test_set, _ in self.test_sets:
            test_set_instances = [
                instance for instance in self.test_instances if test_set in instance
            ]
            # Exclude every solver except the selector itself, so best_performance
            # yields the selector's performance on this test set
            test_perf = selection_scenario.selector_performance_data.best_performance(
                exclude_solvers=[
                    s
                    for s in selection_scenario.selector_performance_data.solvers
                    if s != SelectionScenario.__selector_solver_name__
                ],
                instances=test_set_instances,
                objective=selection_scenario.objective,
            )
            self.test_set_performance[test_set] = test_perf
        # Actual selector performance on the training instances
        self.actual_performance_data = (
            selection_scenario.selector_performance_data.get_value(
                solver=SelectionScenario.__selector_solver_name__,
                instance=self.training_instances,
                objective=self.objective.name,
            )
        )
        self.actual_performance = self.objective.instance_aggregator(
            self.actual_performance_data
        )


    def get_solver_data(
        self: SelectionOutput, train_data: PerformanceDataFrame
    ) -> SelectionSolverData:
        """Initialise SelectionSolverData object."""
        num_solvers = train_data.num_solvers
        return SelectionSolverData(self.solver_performance_ranking, num_solvers)


    def serialise_solvers(self: SelectionOutput, sd: SelectionSolverData) -> dict:
        """Transform SelectionSolverData to dictionary format."""
        return {
            "number_of_solvers": sd.num_solvers,
            "single_best_solver": sd.single_best_solver,
            "solver_ranking": [
                {"solver_name": solver[0], "performance": solver[1]}
                for solver in sd.solver_performance_ranking
            ],
        }

    def serialise_performance(self: SelectionOutput, sp: SelectionPerformance) -> dict:
        """Transform SelectionPerformance to dictionary format."""
        return {
            "vbs_performance": sp.vbs_performance,
            "actual_performance": sp.actual_performance,
            "objective": self.objective.name,
            "metric": sp.metric,
        }

    def serialise_instances(self: SelectionOutput, instances: list[str]) -> dict:
        """Transform instances to dictionary format."""
        instance_sets = set(Path(instance).parent.name for instance in instances)
        return {
            "number_of_instance_sets": len(instance_sets),
            "instance_sets": [
                {
                    "name": instance_set,
                    "number_of_instances": sum(
                        1 for instance in instances if instance_set in instance
                    ),
                }
                for instance_set in instance_sets
            ],
        }
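
    # Sketch of the dictionary serialise_instances produces; the instance set
    # names and counts below are hypothetical:
    #     {
    #         "number_of_instance_sets": 2,
    #         "instance_sets": [
    #             {"name": "SetA", "number_of_instances": 12},
    #             {"name": "SetB", "number_of_instances": 8},
    #         ],
    #     }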


    def serialise_marginal_contribution(self: SelectionOutput) -> dict:
        """Transform the marginal contribution rankings to dictionary format."""
        return {
            "marginal_contribution_actual": [
                {
                    "solver_name": ranking[0],
                    "marginal_contribution": ranking[1],
                    "best_performance": ranking[2],
                }
                for ranking in self.marginal_contribution_actual
            ],
            "marginal_contribution_perfect": [
                {
                    "solver_name": ranking[0],
                    "marginal_contribution": ranking[1],
                    "best_performance": ranking[2],
                }
                for ranking in self.marginal_contribution_perfect
            ],
        }


    def serialise(self: SelectionOutput) -> dict:
        """Serialise the selection output."""
        test_data = (
            self.serialise_instances(self.test_instances)
            if self.test_instances
            else None
        )
        return {
            "solvers": self.serialise_solvers(self.solver_data),
            "training_instances": self.serialise_instances(self.training_instances),
            "test_instances": test_data,
            "settings": {"cutoff_time": self.cutoff_time},
            "marginal_contribution": self.serialise_marginal_contribution(),
        }
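
    # Sketch of the top-level structure that serialise returns and write_output
    # dumps to JSON; the values shown are placeholders:
    #     {
    #         "solvers": {...},
    #         "training_instances": {...},
    #         "test_instances": null,
    #         "settings": {"cutoff_time": 60},
    #         "marginal_contribution": {...}
    #     }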


    def write_output(self: SelectionOutput, output: Path) -> None:
        """Write data into a JSON file."""
        output = output / "configuration.json" if output.is_dir() else output
        with output.open("w") as f:
            json.dump(self.serialise(), f, indent=4)
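

# Minimal usage sketch (the SelectionScenario construction is elided; the output
# path below is hypothetical):
#
#     scenario = SelectionScenario(...)  # built elsewhere by the platform
#     selection_output = SelectionOutput(scenario)
#     selection_output.write_output(Path("Output/Selection"))
#     # When given a directory, write_output creates <directory>/configuration.json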