Coverage for sparkle/selector/selector.py: 89%

143 statements  

coverage.py v7.9.1, created at 2025-07-01 13:21 +0000

1"""File to handle a Selector for selecting Solvers.""" 

2from __future__ import annotations 

3from pathlib import Path 

4 

5from sklearn.base import ClassifierMixin, RegressorMixin 

6from asf.cli import cli_train as asf_cli 

7from asf.scenario.scenario_metadata import ScenarioMetadata 

8from asf.predictors import AbstractPredictor 

9from asf.selectors.abstract_model_based_selector import AbstractModelBasedSelector 

10 

11import runrunner as rrr 

12from runrunner import Runner, Run 

13import pandas as pd 

14 

15from sparkle.types import SparkleObjective, resolve_objective 

16from sparkle.structures import FeatureDataFrame, PerformanceDataFrame 

17 

18 

class Selector:
    """The Selector class for handling Algorithm Selection."""

    def __init__(
            self: Selector,
            selector_class: AbstractModelBasedSelector,
            model_class: AbstractPredictor | ClassifierMixin | RegressorMixin) -> None:
        """Initialize the Selector object.

        Args:
            selector_class: The Selector class to construct.
            model_class: The model class the selector will use.
        """
        self.selector_class = selector_class
        self.model_class = model_class

    @property
    def name(self: Selector) -> str:
        """Return the name of the selector."""
        return f"{self.selector_class.__name__}_{self.model_class.__name__}"

    def construct(self: Selector,
                  selection_scenario: SelectionScenario,
                  run_on: Runner = Runner.SLURM,
                  job_name: str = None,
                  sbatch_options: list[str] = None,
                  slurm_prepend: str | list[str] | Path = None,
                  base_dir: Path = Path()) -> Run:
        """Construct the Selector.

        Args:
            selection_scenario: The scenario to construct the Selector for.
            run_on: Which runner to use. Defaults to Slurm.
            job_name: Name to give the construction job when submitting.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script(s) to prepend to the sbatch script.
            base_dir: The base directory to run the Selector in.

        Returns:
            The construction Run.
        """
        selection_scenario.create_scenario()
        selector = self.selector_class(
            self.model_class, ScenarioMetadata(
                algorithms=selection_scenario.performance_data.columns.to_list(),
                features=selection_scenario.feature_data.columns.to_list(),
                performance_metric=selection_scenario.objective.name,
                maximize=not selection_scenario.objective.minimise,
                budget=selection_scenario.solver_cutoff
            )
        )
        cmd = asf_cli.build_cli_command(selector,
                                        selection_scenario.feature_target_path,
                                        selection_scenario.performance_target_path,
                                        selection_scenario.selector_file_path)
        cmd = [" ".join([str(c) for c in cmd])]

        job_name = job_name or f"Selector Construction: {selection_scenario.name}"
        construct = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=job_name,
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend)

        if run_on == Runner.LOCAL:
            construct.wait()
            if not selection_scenario.selector_file_path.is_file():
                print(f"Selector construction of {self.name} failed!")
        return construct

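    # Note: construction is not done in-process; the training command is built
    # with the ASF CLI (asf_cli.build_cli_command) and submitted through
    # runrunner, either locally or as a Slurm job.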

    def run(self: Selector,
            selector_path: Path,
            instance: str,
            feature_data: FeatureDataFrame) -> list:
        """Run the Selector, returning the prediction schedule upon success."""
        instance_features = feature_data.dataframe[[instance, ]]
        instance_features.index = instance_features.index.map("_".join)  # Reduce index
        instance_features = instance_features.T  # ASF dataframe structure
        selector = self.selector_class.load(selector_path)
        schedule = selector.predict(instance_features)
        if schedule is None:
            print(f"ERROR: Selector {self.name} failed to predict a schedule!")
            return None
        # ASF presents the result as a schedule per instance; we only use one in this setting
        schedule = schedule[instance]
        for index, (solver, time) in enumerate(schedule):
            # Split the solver name back into solver name and configuration id
            solver_name, conf_index = solver.split("_", maxsplit=1)
            schedule[index] = (solver_name, conf_index, time)
        return schedule

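# Illustrative usage sketch (the ASF selector and scikit-learn model class names
# below are assumptions, not prescribed by this module):
#
#     from asf.selectors import PairwiseClassifier
#     from sklearn.ensemble import RandomForestClassifier
#
#     selector = Selector(PairwiseClassifier, RandomForestClassifier)
#     construction_run = selector.construct(scenario, run_on=Runner.LOCAL)
#
# where `scenario` is a SelectionScenario built for the solvers and features at hand.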

class SelectionScenario:
    """A scenario for a Selector."""

    __selector_solver_name__ = "portfolio_selector"

    def __init__(self: SelectionScenario,
                 parent_directory: Path,
                 selector: Selector,
                 objective: SparkleObjective,
                 performance_data: PerformanceDataFrame | Path,
                 feature_data: FeatureDataFrame | Path,
                 feature_extractors: list[str] = None,
                 solver_cutoff: int | float = None,
                 extractor_cutoff: int | float = None,
                 ablate: bool = False,
                 subdir_path: Path = None
                 ) -> None:
        """Initialize a scenario for a selector."""
        self.selector: Selector = selector
        self.objective: SparkleObjective = objective
        self.solver_cutoff: float = solver_cutoff
        self.extractor_cutoff: float = extractor_cutoff
        if subdir_path is not None:
            self.directory = parent_directory / subdir_path
        elif isinstance(performance_data, PerformanceDataFrame):
            self.directory: Path =\
                parent_directory / selector.name / "_".join(
                    [Path(s).name for s in performance_data.solvers])
        else:
            self.directory = performance_data.parent
        self.name = f"{selector.name} on {self.directory.name}"
        self.selector_file_path: Path = self.directory / "portfolio_selector"
        self.scenario_file: Path = self.directory / "scenario.txt"
        self.selector_performance_path: Path =\
            self.directory / "selector_performance.csv"
        if self.selector_performance_path.exists():
            self.selector_performance_data = PerformanceDataFrame(
                self.selector_performance_path)
        else:  # Create new performance data frame for selector, write to file later
            self.selector_performance_data = performance_data.clone()
            self.selector_performance_data.add_solver(
                SelectionScenario.__selector_solver_name__)

        if isinstance(performance_data, PerformanceDataFrame):  # Convert
            # Convert the dataframes to the Selector format
            new_column_names: list[str] = []
            for solver, config_id, _ in performance_data.columns:
                if f"{solver}_{config_id}" not in new_column_names:
                    new_column_names.append(f"{solver}_{config_id}")
            self.performance_data = performance_data.drop(
                [PerformanceDataFrame.column_seed],
                axis=1, level=2)
            self.performance_data = self.performance_data.droplevel([
                PerformanceDataFrame.column_configuration,
                PerformanceDataFrame.column_meta], axis=1)
            self.performance_data = self.performance_data.droplevel(
                PerformanceDataFrame.index_objective, axis=0)
            self.performance_data.columns = new_column_names
            # Requires instances as index for both, columns as features / solvers
            # TODO: This should be an aggregation instead?
            self.performance_data.index = self.performance_data.index.droplevel("Run")
            # Enforce the data type to be numeric
            self.performance_data = self.performance_data.astype(float)
            self.performance_target_path = self.directory / "performance_data.csv"
        else:  # Read from Path
            self.performance_data: pd.DataFrame = pd.read_csv(performance_data,
                                                              index_col=0)
            self.performance_target_path: Path = performance_data
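
        # At this point self.performance_data is a plain pandas DataFrame with
        # instances as rows and "<solver>_<config_id>" columns holding float
        # performance values, which is the layout passed on to the ASF selector.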

        if isinstance(feature_data, FeatureDataFrame):  # Convert
            self.feature_extractors = feature_data.extractors
            # Features require instances as index, columns as feature names
            feature_target = feature_data.dataframe.copy()
            feature_target.index = feature_target.index.map("_".join)  # Reduce index
            # ASF -> feature columns, instance rows
            self.feature_data: pd.DataFrame = feature_target.T.astype(float)
            self.feature_target_path: Path = self.directory / "feature_data.csv"
        else:  # Read from Path
            self.feature_extractors = feature_extractors
            self.feature_data: pd.DataFrame = pd.read_csv(feature_data)
            self.feature_target_path: Path = feature_data

        self.ablation_scenarios: list[SelectionScenario] = []
        if ablate and len(self.performance_data.columns) > 2:
            for solver in self.performance_data.columns:
                solver_key, conf_id = solver.split("_", maxsplit=1)
                ablate_subdir = Path(f"ablated_{Path(solver).name}")
                ablated_directory = self.directory / ablate_subdir
                if (ablated_directory / "performance_data.csv").exists():
                    ablated_pd = ablated_directory / "performance_data.csv"
                elif isinstance(performance_data, PerformanceDataFrame):
                    ablated_pd = performance_data.clone()
                    ablated_pd.remove_configuration(solver_key, conf_id)
                else:  # Note: we could support this, but it would be hacky
                    raise ValueError("Cannot ablate scenario after loading from file! "
                                     "Requires original PerformanceDataFrame.")

                self.ablation_scenarios.append(SelectionScenario(
                    parent_directory=self.directory,
                    selector=selector,
                    objective=objective,
                    performance_data=ablated_pd,
                    feature_data=feature_data,
                    solver_cutoff=solver_cutoff,
                    ablate=False,  # Setting this to True here would cause recursion
                    subdir_path=ablate_subdir)
                )

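        # Each ablation scenario mirrors this scenario with one solver
        # configuration removed; it is stored under ablated_<solver>/ within
        # this scenario's directory.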

    @property
    def training_instances(self: SelectionScenario) -> list[str]:
        """Get the training instances."""
        return self.performance_data.index.to_list()

    @property
    def test_instances(self: SelectionScenario) -> list[str]:
        """Get the test instances."""
        instances = self.selector_performance_data.instances
        return [i for i in instances if i not in self.training_instances]

    @property
    def training_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the training instance sets."""
        return list(set(Path(i).parent.name for i in self.training_instances))

    @property
    def test_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the test instance sets."""
        return list(set(Path(i).parent.name for i in self.test_instances))

    @property
    def instance_sets(self: SelectionScenario) -> list[str]:
        """Get all the instance sets used in this scenario."""
        return list(set(Path(i).parent.name
                        for i in self.selector_performance_data.instances))

    @property
    def solvers(self: SelectionScenario) -> list[str]:
        """Get the solvers used for the selector."""
        return self.performance_data.columns.to_list()

    def create_scenario(self: SelectionScenario) -> None:
        """Prepare the scenario directory and write the scenario data to disk."""
        self.directory.mkdir(parents=True, exist_ok=True)
        self.performance_data.to_csv(self.performance_target_path)
        self.feature_data.to_csv(self.feature_target_path)
        self.selector_performance_data.save_csv(self.selector_performance_path)
        self.create_scenario_file()

    def create_scenario_file(self: SelectionScenario) -> None:
        """Write the scenario to its scenario file."""
        with self.scenario_file.open("w") as fout:
            fout.write(self.serialise())

    def serialise(self: SelectionScenario) -> str:
        """Serialise the scenario to its scenario file format."""
        return f"selector: {self.selector.name}\n"\
               f"solver_cutoff: {self.solver_cutoff}\n"\
               f"extractor_cutoff: {self.extractor_cutoff}\n"\
               f"ablate: {self.ablation_scenarios is not None}\n"\
               f"objective: {self.objective}\n"\
               f"selector_performance_data: {self.selector_performance_path}\n"\
               f"performance_data: {self.performance_target_path}\n"\
               f"feature_data: {self.feature_target_path}\n"\
               f"feature_extractors: {','.join(self.feature_extractors)}\n"

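    # For reference, a serialised scenario file holds one `key: value` pair per
    # line; the values below are purely illustrative:
    #
    #   selector: PairwiseClassifier_RandomForestClassifier
    #   solver_cutoff: 60.0
    #   extractor_cutoff: 90.0
    #   ablate: True
    #   objective: PAR10
    #   selector_performance_data: <directory>/selector_performance.csv
    #   performance_data: <directory>/performance_data.csv
    #   feature_data: <directory>/feature_data.csv
    #   feature_extractors: extractor_a,extractor_b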

    @staticmethod
    def from_file(scenario_file: Path) -> SelectionScenario:
        """Read a scenario file and initialise a SelectionScenario from it."""
        if not scenario_file.is_file() and (scenario_file / "scenario.txt").is_file():
            scenario_file = scenario_file / "scenario.txt"  # Resolve from directory
        values = {key: value.strip() for key, value in
                  [line.split(": ", maxsplit=1) for line in scenario_file.open()]}
        selector_class, selector_model = values["selector"].split("_", maxsplit=1)
        # Evaluate the strings back into classes
        from sklearn import ensemble
        from asf import selectors
        selector_class = getattr(selectors, selector_class)
        selector_model = getattr(ensemble, selector_model)
        selector = Selector(selector_class, selector_model)
        return SelectionScenario(
            parent_directory=scenario_file.parent,
            selector=selector,
            objective=resolve_objective(values["objective"]),
            performance_data=Path(values["performance_data"]),
            feature_data=Path(values["feature_data"]),
            feature_extractors=values["feature_extractors"].split(","),
            solver_cutoff=float(values["solver_cutoff"]),
            ablate=values["ablate"] == "True")  # bool("False") would be truthy
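
# Illustrative sketch of reloading a constructed scenario (the path and variable
# names are assumptions):
#
#     scenario = SelectionScenario.from_file(Path("Output/Selection/scenario.txt"))
#     schedule = scenario.selector.run(scenario.selector_file_path,
#                                      instance, feature_data)
#
# where `instance` is an instance name present in `feature_data`, a FeatureDataFrame.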