Coverage for sparkle/platform/output/selection_output.py: 23%
75 statements
coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

1"""Sparkle class to organise configuration output."""

from __future__ import annotations
import operator
import json
from pathlib import Path

from sparkle.selector import SelectionScenario
from sparkle.structures import PerformanceDataFrame
from sparkle.platform.output.structures import (
    SelectionPerformance,
    SelectionSolverData,
)


def compute_selector_marginal_contribution(
    selection_scenario: SelectionScenario,
) -> list[tuple[str, str, float, float]]:
    """Compute the marginal contributions of solvers in the selector.

    Args:
        selection_scenario: The selection scenario for which to compute
            the marginal contributions.

    Returns:
        A list of 4-tuples where every 4-tuple is of the form
        (solver_name, config_id, marginal_contribution, best_performance).
    """
    selector_performance = selection_scenario.objective.instance_aggregator(
        selection_scenario.selector_performance_data.get_value(
            SelectionScenario.__selector_solver_name__,
            instance=selection_scenario.training_instances,
            objective=selection_scenario.objective.name,
        )
    )
    rank_list = []
    compare = operator.lt if selection_scenario.objective.minimise else operator.gt
    # Compute contribution per solver
    for ablation_scenario in selection_scenario.ablation_scenarios:
        # Hacky way of getting the needed data on the ablation
        _, solver_name, config = ablation_scenario.directory.name.split("_", maxsplit=2)
        # Hacky way of reconstructing the solver id in the PerformanceDataFrame
        solver = f"Solvers/{solver_name}"
        ablated_selector_performance = ablation_scenario.objective.instance_aggregator(
            ablation_scenario.selector_performance_data.get_value(
                SelectionScenario.__selector_solver_name__,
                instance=ablation_scenario.training_instances,
                objective=ablation_scenario.objective.name,
            )
        )

        # 1. If the performance remains equal, this solver did not contribute
        # 2. If there is a performance decay without this solver, it does contribute
        # 3. If there is a performance improvement, we have a bad portfolio selector
        if ablated_selector_performance == selector_performance:
            marginal_contribution = 0.0
        elif not compare(ablated_selector_performance, selector_performance):
            # The performance decreases, so this solver does contribute
            marginal_contribution = ablated_selector_performance / selector_performance
        else:
            print(
                "****** WARNING DUBIOUS SELECTOR/SOLVER: "
                f"The omission of solver {solver_name} ({config}) yields an "
                "improvement. The selector performs better without this solver. "
                "It may be useful to construct a portfolio without this solver."
            )
            marginal_contribution = 0.0

        rank_list.append(
            (solver, config, marginal_contribution, ablated_selector_performance)
        )

    rank_list.sort(key=lambda contribution: contribution[2], reverse=True)
    return rank_list
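

# Illustrative use of the ranking returned above (a sketch; assumes a prepared
# SelectionScenario object named `scenario`, which this module does not create):
#
#     ranking = compute_selector_marginal_contribution(scenario)
#     for solver, config_id, contribution, performance in ranking:
#         print(f"{solver} ({config_id}): contribution {contribution:.2f}, "
#               f"selector performance without it: {performance}")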


class SelectionOutput:
    """Class that collects selection data and outputs it in JSON format."""

    def __init__(
        self: SelectionOutput,
        selection_scenario: SelectionScenario,
    ) -> None:
        """Initialise the SelectionOutput class.

        Args:
            selection_scenario: The selection scenario to collect output data from.
        """
        self.training_instances = selection_scenario.training_instances
        training_instance_sets = selection_scenario.training_instance_sets
        self.training_instance_sets = [
            (instance_set, sum(instance_set in s for s in self.training_instances))
            for instance_set in training_instance_sets
        ]
        self.test_instances = selection_scenario.test_instances
        test_sets = selection_scenario.test_instance_sets
        self.test_sets = [
            (instance_set, sum(instance_set in s for s in self.test_instances))
            for instance_set in test_sets
        ]
        self.cutoff_time = selection_scenario.solver_cutoff
        self.objective = selection_scenario.objective

        solver_performance_data = selection_scenario.selector_performance_data.clone()
        solver_performance_data.remove_solver(SelectionScenario.__selector_solver_name__)

        self.solver_performance_ranking = solver_performance_data.get_solver_ranking(
            instances=self.training_instances, objective=self.objective
        )

        self.solver_data = self.get_solver_data(solver_performance_data)
        self.solvers = {}
        for solver_conf in selection_scenario.performance_data.columns:
            solver, conf = solver_conf.split("_", maxsplit=1)
            if solver not in self.solvers:
                self.solvers[solver] = []
            self.solvers[solver].append(conf)

        self.sbs_performance = solver_performance_data.get_value(
            solver=self.solver_performance_ranking[0][0],
            configuration=self.solver_performance_ranking[0][1],
            instance=self.training_instances,
            objective=self.objective.name,
        )

        # Collect marginal contribution data
        self.marginal_contribution_perfect = (
            solver_performance_data.marginal_contribution(
                selection_scenario.objective,
                instances=self.training_instances,
                sort=True,
            )
        )

        self.marginal_contribution_actual = compute_selector_marginal_contribution(
            selection_scenario
        )
        # Collect performance data
        self.vbs_performance_data = solver_performance_data.best_instance_performance(
            instances=self.training_instances, objective=selection_scenario.objective
        )
        self.vbs_performance = selection_scenario.objective.instance_aggregator(
            self.vbs_performance_data
        )

        self.test_set_performance = {} if self.test_sets else None
        for test_set, _ in self.test_sets:
            test_set_instances = [
                instance for instance in self.test_instances if test_set in instance
            ]
            test_perf = selection_scenario.selector_performance_data.best_performance(
                exclude_solvers=[
                    s
                    for s in selection_scenario.selector_performance_data.solvers
                    if s != SelectionScenario.__selector_solver_name__
                ],
                instances=test_set_instances,
                objective=selection_scenario.objective,
            )
            self.test_set_performance[test_set] = test_perf
        self.actual_performance_data = (
            selection_scenario.selector_performance_data.get_value(
                solver=SelectionScenario.__selector_solver_name__,
                instance=self.training_instances,
                objective=self.objective.name,
            )
        )
        self.actual_performance = self.objective.instance_aggregator(
            self.actual_performance_data
        )

    def get_solver_data(
        self: SelectionOutput, train_data: PerformanceDataFrame
    ) -> SelectionSolverData:
        """Initialise the SelectionSolverData object."""
        num_solvers = train_data.num_solvers
        return SelectionSolverData(self.solver_performance_ranking, num_solvers)

    def serialise_solvers(self: SelectionOutput, sd: SelectionSolverData) -> dict:
        """Transform SelectionSolverData to dictionary format."""
        return {
            "number_of_solvers": sd.num_solvers,
            "single_best_solver": sd.single_best_solver,
            "solver_ranking": [
                {"solver_name": solver[0], "performance": solver[1]}
                for solver in sd.solver_performance_ranking
            ],
        }

    def serialise_performance(self: SelectionOutput, sp: SelectionPerformance) -> dict:
        """Transform SelectionPerformance to dictionary format."""
        return {
            "vbs_performance": sp.vbs_performance,
            "actual_performance": sp.actual_performance,
            "objective": self.objective.name,
            "metric": sp.metric,
        }

    def serialise_instances(self: SelectionOutput, instances: list[str]) -> dict:
        """Transform instances to dictionary format."""
        instance_sets = set(Path(instance).parent.name for instance in instances)
        return {
            "number_of_instance_sets": len(instance_sets),
            "instance_sets": [
                {
                    "name": instance_set,
                    "number_of_instances": sum(
                        1 if instance_set in instance else 0 for instance in instances
                    ),
                }
                for instance_set in instance_sets
            ],
        }

    def serialise_marginal_contribution(self: SelectionOutput) -> dict:
        """Transform performance ranking to dictionary format."""
        return {
            # Actual contributions are 4-tuples:
            # (solver_name, config_id, marginal_contribution, best_performance)
            "marginal_contribution_actual": [
                {
                    "solver_name": ranking[0],
                    "config_id": ranking[1],
                    "marginal_contribution": ranking[2],
                    "best_performance": ranking[3],
                }
                for ranking in self.marginal_contribution_actual
            ],
            "marginal_contribution_perfect": [
                {
                    "solver_name": ranking[0],
                    "marginal_contribution": ranking[1],
                    "best_performance": ranking[2],
                }
                for ranking in self.marginal_contribution_perfect
            ],
        }

    def serialise(self: SelectionOutput) -> dict:
        """Serialise the selection output."""
        test_data = (
            self.serialise_instances(self.test_instances)
            if self.test_instances
            else None
        )
        return {
            "solvers": self.serialise_solvers(self.solver_data),
            "training_instances": self.serialise_instances(self.training_instances),
            "test_instances": test_data,
            "settings": {"cutoff_time": self.cutoff_time},
            "marginal_contribution": self.serialise_marginal_contribution(),
        }
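
    # Rough shape of the dictionary produced by serialise() above (keys come from
    # the serialise_* methods; the values shown here are placeholders, not data):
    #     {
    #         "solvers": {"number_of_solvers": ..., "single_best_solver": ...,
    #                     "solver_ranking": [...]},
    #         "training_instances": {"number_of_instance_sets": ...,
    #                                "instance_sets": [...]},
    #         "test_instances": ...,  # None when there are no test instances
    #         "settings": {"cutoff_time": ...},
    #         "marginal_contribution": {"marginal_contribution_actual": [...],
    #                                   "marginal_contribution_perfect": [...]},
    #     }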

    def write_output(self: SelectionOutput, output: Path) -> None:
        """Write data into a JSON file."""
        output = output / "configuration.json" if output.is_dir() else output
        with output.open("w") as f:
            json.dump(self.serialise(), f, indent=4)
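

# Minimal usage sketch (illustrative only). It assumes an already constructed
# SelectionScenario named `scenario` and a hypothetical output directory;
# neither is provided by this module.
#
#     selection_output = SelectionOutput(scenario)
#     selection_output.write_output(Path("Output/Selection"))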