Coverage for sparkle/CLI/compute_marginal_contribution.py: 89%
96 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
1#!/usr/bin/env python3
2"""Sparkle command for the computation of the marginal contributions."""
3import sys
4import argparse
5from pathlib import Path
6import operator
8import tabulate
10from sparkle.CLI.help import global_variables as gv
11from sparkle.CLI.help import logging as sl
12from sparkle.platform.settings_objects import SettingState
13from sparkle.CLI.help import argparse_custom as ac
14from sparkle.CLI.initialise import check_for_initialise
15from sparkle.structures import PerformanceDataFrame, FeatureDataFrame
16from sparkle.types import SparkleObjective
def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser of the marginal contribution command.

    Returns:
        An ArgumentParser with the perfect selector, actual selector,
        objectives and settings file arguments registered, in that order.
    """
    cli_parser = argparse.ArgumentParser(
        description="Command to compute the marginal contribution of solvers to the "
                    "portfolio.")
    # Each descriptor carries the flag names and the add_argument keyword
    # arguments; the registration order is the one shown in the help output.
    supported_arguments = (
        ac.PerfectSelectorMarginalContributionArgument,
        ac.ActualMarginalContributionArgument,
        ac.ObjectivesArgument,
        ac.SettingsFileArgument,
    )
    for argument in supported_arguments:
        cli_parser.add_argument(*argument.names, **argument.kwargs)
    return cli_parser
def compute_selector_performance(
        actual_portfolio_selector: Path,
        performance_data: PerformanceDataFrame,
        feature_data: FeatureDataFrame,
        objective: SparkleObjective) -> float:
    """Return the aggregated performance of a selector over all instances.

    When a "performance.csv" file already exists next to the selector, the
    values stored there for "portfolio_selector" are aggregated and returned
    without running the selector. Otherwise the selector is run once per
    instance of ``performance_data`` and the result is written to that file.

    Args:
        actual_portfolio_selector: Path to portfolio selector.
        performance_data: The performance data.
        feature_data: The feature data.
        objective: Objective to compute the performance for.

    Returns:
        The selector performance as a single floating point number.
    """
    selector_label = "portfolio_selector"
    results_csv = actual_portfolio_selector.parent / "performance.csv"

    # Earlier results on disk take precedence over recomputing them.
    if results_csv.exists():
        stored_results = PerformanceDataFrame(results_csv)
        stored_values = stored_results.get_values(selector_label,
                                                  objective=str(objective))
        return objective.instance_aggregator(stored_values)

    # Work on a copy so the caller's performance data is left untouched.
    selector_results = performance_data.clone()
    selector_results.add_solver(selector_label)
    selector_results.csv_filepath = results_csv
    selector = gv.settings().get_general_sparkle_selector()

    # The performance for an instance is obtained by inferring the model
    # prediction from the feature vector of that instance.
    schedule = {
        instance: selector.run(actual_portfolio_selector,
                               feature_data.get_instance(instance))
        for instance in performance_data.instances
    }

    per_instance_performance = selector_results.schedule_performance(
        schedule, target_solver=selector_label, objective=objective)
    # Remove the original solvers from the dataframe before saving it.
    selector_results.remove_solver(performance_data.solvers)
    selector_results.save_csv()  # Save the results to disk
    return objective.instance_aggregator(per_instance_performance)
def compute_selector_marginal_contribution(
        performance_data: PerformanceDataFrame,
        feature_data: FeatureDataFrame,
        selector_scenario: Path,
        objective: SparkleObjective) -> list[tuple[str, float, float]]:
    """Compute the marginal contributions of solvers in the selector.

    For every solver the performance of the selector built without that solver
    (expected at ``selector_scenario / "ablate_<solver name>"``) is compared
    with the performance of the full selector. Exits the program with status
    -1 when the full selector does not exist.

    Args:
        performance_data: Performance data object.
        feature_data: Feature data object.
        selector_scenario: Path to the selector scenario for which to compute
            marginal contribution.
        objective: Objective to compute the marginal contribution for.

    Returns:
        A list of 3-tuples of the form
        (solver, marginal contribution, performance of the ablated selector),
        sorted by marginal contribution in descending order. Solvers for which
        no ablated selector exists are reported with a warning and left out.
    """
    portfolio_selector_path = selector_scenario / "portfolio_selector"

    if not portfolio_selector_path.exists():
        print(f"ERROR: Selector {portfolio_selector_path} does not exist! "
              "Cannot compute marginal contribution.")
        sys.exit(-1)

    selector_performance = compute_selector_performance(
        portfolio_selector_path, performance_data,
        feature_data, objective)

    rank_list = []
    # compare(a, b) is True when a is strictly better than b for the objective
    compare = operator.lt if objective.minimise else operator.gt
    # Compute contribution per solver
    # NOTE: This could be parallelised
    for solver in performance_data.solvers:
        solver_name = Path(solver).name
        ablated_actual_portfolio_selector =\
            selector_scenario / f"ablate_{solver_name}" / "portfolio_selector"
        # Check for the ablated selector first, so the performance data is not
        # copied for solvers that are skipped anyway.
        if not ablated_actual_portfolio_selector.exists():
            print(f"WARNING: Selector without {solver_name} does not exist! "
                  f"Cannot compute marginal contribution of {solver_name}.")
            continue

        # Evaluate the ablated selector on a copy of the data without the solver
        tmp_performance_df = performance_data.clone()
        tmp_performance_df.remove_solver(solver)
        ablated_selector_performance = compute_selector_performance(
            ablated_actual_portfolio_selector, tmp_performance_df,
            feature_data, objective)

        # 1. If the performance remains equal, this solver did not contribute
        # 2. If there is a performance decay without this solver, it does contribute
        # 3. If there is a performance improvement, we have a bad portfolio selector
        if ablated_selector_performance == selector_performance:
            marginal_contribution = 0.0
        elif not compare(ablated_selector_performance, selector_performance):
            # In the case that the performance decreases, we have a contributing solver
            # NOTE(review): a selector_performance of exactly zero is not handled
            # here; confirm whether that value can occur for the objectives used.
            marginal_contribution = ablated_selector_performance / selector_performance
        else:
            print("****** WARNING DUBIOUS SELECTOR/SOLVER: "
                  f"The omission of solver {solver_name} yields an improvement. "
                  "The selector improves better without this solver. It may be usefull "
                  "to construct a portfolio without this solver.")
            marginal_contribution = 0.0

        rank_list.append((solver, marginal_contribution, ablated_selector_performance))

    # Largest marginal contribution first
    rank_list.sort(key=operator.itemgetter(1), reverse=True)
    return rank_list
def compute_marginal_contribution(
        scenario: Path, compute_perfect: bool, compute_actual: bool) -> None:
    """Compute and print the requested marginal contribution tables.

    The performance and feature data are loaded from the default paths in the
    settings, and the first configured objective is used for both tables.

    Args:
        scenario: Path to the selector scenario for which to compute the
            marginal contribution to the actual selector.
        compute_perfect: Bool indicating if the contribution for the perfect
            portfolio selector should be computed.
        compute_actual: Bool indicating if the contribution for the actual portfolio
            selector should be computed.
    """
    performance_data = PerformanceDataFrame(gv.settings().DEFAULT_performance_data_path)
    feature_data = FeatureDataFrame(gv.settings().DEFAULT_feature_data_path)
    objective = gv.settings().get_general_sparkle_objectives()[0]
    # Both tables are printed with the same column titles.
    column_titles = ("Solver", "Marginal Contribution", "Best Performance")

    if compute_perfect:
        # Perfect selector is the computation of the best performance per instance
        print("Computing each solver's marginal contribution to perfect selector ...")
        perfect_rows = performance_data.marginal_contribution(
            objective=objective.name, sort=True)
        perfect_table = tabulate.tabulate(perfect_rows, headers=list(column_titles))
        print(perfect_table, "\n")
        print("Marginal contribution (perfect selector) computing done!")

    if compute_actual:
        print("Start computing marginal contribution per Solver to actual selector...")
        actual_rows = compute_selector_marginal_contribution(
            performance_data, feature_data, scenario, objective)
        actual_table = tabulate.tabulate(actual_rows, headers=list(column_titles))
        print(actual_table, "\n")
        print("Marginal contribution (actual selector) computing done!")
def main(argv: list[str]) -> None:
    """Run the marginal contribution command and exit the process.

    Args:
        argv: Command line arguments to parse, without the program name.
    """
    # Log command call; the full sys.argv is logged rather than argv
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define and process the command line arguments
    cli_args = parser_function().parse_args(argv)

    # The settings file is read first, so other command line options can
    # override settings from the file
    if ac.set_by_user(cli_args, "settings_file"):
        gv.settings().read_settings_ini(cli_args.settings_file, SettingState.CMD_LINE)
    if ac.set_by_user(cli_args, "objectives"):
        gv.settings().set_general_sparkle_objectives(
            cli_args.objectives, SettingState.CMD_LINE)
    scenario_path = gv.latest_scenario().get_selection_scenario_path()

    # At least one of the two computations has to be requested
    run_perfect, run_actual = cli_args.perfect, cli_args.actual
    if not (run_perfect | run_actual):
        print("ERROR: compute_marginal_contribution called without a flag set to"
              " True, stopping execution")
        sys.exit(-1)

    compute_marginal_contribution(scenario_path, run_perfect, run_actual)

    # Write used settings to file
    gv.settings().write_used_settings()
    sys.exit(0)
# Run the command with the arguments following the program name, but only
# when this file is executed as a script.
if __name__ == "__main__":
    main(argv=sys.argv[1:])