Coverage for src / sparkle / selector / selector.py: 82%

155 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 15:31 +0000

1"""File to handle a Selector for selecting Solvers.""" 

2 

3from __future__ import annotations 

4import random 

5from pathlib import Path 

6 

7 

8from sklearn.base import ClassifierMixin, RegressorMixin 

9from asf.cli import cli_train as asf_cli 

10from asf.predictors import AbstractPredictor 

11from asf.selectors.abstract_model_based_selector import AbstractModelBasedSelector 

12 

13 

14import runrunner as rrr 

15from runrunner import Runner, Run 

16import pandas as pd 

17 

18from sparkle.types import SparkleObjective, resolve_objective 

19from sparkle.structures import FeatureDataFrame, PerformanceDataFrame 

20from sparkle.instance import InstanceSet 

21 

22 

class Selector:
    """The Selector class for handling Algorithm Selection."""

    # Companion CLI script (next to this file) executed by ``run_cli``.
    selector_cli = Path(__file__).parent / "selector_cli.py"

    def __init__(
        self: Selector,
        selector_class: AbstractModelBasedSelector,
        model_class: AbstractPredictor | ClassifierMixin | RegressorMixin,
    ) -> None:
        """Initialize the Selector object.

        Args:
            selector_class: The (name of) Selector class to construct.
                String names are resolved against ``asf.selectors``.
            model_class: The (name of) model class the selector will use.
                String names are resolved against ``sklearn.ensemble``.
        """
        if isinstance(selector_class, str):  # Resolve class name
            from asf import selectors

            selector_class = getattr(selectors, selector_class)
        if isinstance(model_class, str):  # Resolve class name
            from sklearn import ensemble

            model_class = getattr(ensemble, model_class)

        self.selector_class = selector_class
        self.model_class = model_class

    @property
    def name(self: Selector) -> str:
        """Return the name of the selector: '<selector class>_<model class>'."""
        return f"{self.selector_class.__name__}_{self.model_class.__name__}"

    def construct(
        self: Selector,
        selection_scenario: SelectionScenario,
        run_on: Runner = Runner.SLURM,
        job_name: str = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        base_dir: Path = Path(),
    ) -> Run:
        """Construct the Selector.

        Args:
            selection_scenario: The scenario to construct the Selector for.
            run_on: Which runner to use. Defaults to slurm.
            job_name: Name to give the construction job when submitting.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch
            base_dir: The base directory to run the Selector in.

        Returns:
            The construction Run
        """
        selection_scenario.create_scenario()
        selector = self.selector_class(
            model_class=self.model_class,
            budget=selection_scenario.solver_cutoff,
            # ASF expects a maximisation flag; Sparkle objectives carry minimise
            maximize=not selection_scenario.objective.minimise,
        )
        cmd = asf_cli.build_cli_command(
            selector,
            selection_scenario.feature_target_path,
            selection_scenario.performance_target_path,
            selection_scenario.selector_file_path,
        )
        # Collapse the argument list into a single shell command string
        cmd = [" ".join([str(c) for c in cmd])]

        job_name = job_name or f"Selector Construction {selection_scenario.name}"
        construct = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=job_name,
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
        )

        if run_on == Runner.LOCAL:
            construct.wait()
            # The trained selector is written to selector_file_path on success
            if not selection_scenario.selector_file_path.is_file():
                print(f"Selector construction of {self.name} failed!")
        return construct

    def run(
        self: Selector,
        selector_path: Path,
        instance: str,
        feature_data: FeatureDataFrame,
    ) -> list:
        """Run the Selector, returning the prediction schedule upon success.

        Args:
            selector_path: Path to the trained selector to load.
            instance: The instance to predict a schedule for.
            feature_data: Feature data containing the instance's features.

        Returns:
            A list of (solver_name, config_id, time) tuples, or None on failure.
        """
        instance_features = feature_data.get_instance(instance, as_dataframe=True)
        selector = self.selector_class.load(selector_path)
        schedule = selector.predict(instance_features)
        if schedule is None:
            print(f"ERROR: Selector {self.name} failed predict schedule!")
            return None
        # ASF presents result as schedule per instance, we only use one in this setting
        schedule = schedule[instance]
        for index, (solver, time) in enumerate(schedule):
            # Split solver name back into solver and config id
            # NOTE: There is an issue with this incase the Solver name has an "_" in its name... We need to change the delimiter to different character(s)
            solver_name, conf_index = solver.split("_", maxsplit=1)
            schedule[index] = (solver_name, conf_index, time)
        return schedule

    def run_cli(
        self: Selector,
        scenario_path: Path,
        instance_set: InstanceSet | list[Path],
        feature_data: Path,
        run_on: Runner = Runner.LOCAL,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        job_name: str = None,
        dependencies: list[Run] = None,
        log_dir: Path = None,
    ) -> Run:
        """Run the Selector CLI and write result to the Scenario PerformanceDataFrame.

        Args:
            scenario_path: The path to the scenario with the Selector to run.
            instance_set: The instance set to run the Selector on.
            feature_data: The instance feature data to use.
            run_on: Which runner to use. Defaults to local.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch
            job_name: Name to give the Slurm job when submitting.
            dependencies: List of dependencies to add to the job.
            log_dir: The directory to write logs to.

        Returns:
            The Run object.
        """
        # NOTE: The selector object and the scenario selector could differ which could
        # cause unintended behaviour (e.g. running a different selector than desired)
        instances = (
            instance_set
            if isinstance(instance_set, list)
            else instance_set.instance_paths
        )
        # One CLI invocation per instance, each with a fresh random seed
        commands = [
            f"python3 {Selector.selector_cli} "
            f"--selector-scenario {scenario_path} "
            f"--instance {instance_path} "
            f"--feature-data {feature_data} "
            f"--log-dir {log_dir} "
            f"--seed {random.randint(0, 2**32 - 1)}"
            for instance_path in instances
        ]

        job_name = (
            f"Run Selector {self.name} on {len(instances)} instances"
            if not job_name
            else job_name
        )
        import subprocess

        r = rrr.add_to_queue(
            cmd=commands,
            name=job_name,
            stdout=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print
            stderr=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print
            base_dir=log_dir,
            runner=run_on,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if run_on == Runner.LOCAL:
            r.wait()
        return r

202 

203 

class SelectionScenario:
    """A scenario for a Selector."""

    # Column name under which the selector itself is recorded as a "solver"
    __selector_solver_name__ = "portfolio_selector"

    def __init__(
        self: SelectionScenario,
        parent_directory: Path,
        selector: Selector,
        objective: SparkleObjective,
        performance_data: PerformanceDataFrame | Path,
        feature_data: FeatureDataFrame | Path,
        feature_extractors: list[str] = None,
        solver_cutoff: int | float = None,
        extractor_cutoff: int | float = None,
        ablate: bool = False,
        subdir_path: Path = None,
    ) -> None:
        """Initialize a scenario for a selector.

        Args:
            parent_directory: Directory under which the scenario lives.
            selector: The Selector this scenario is for.
            objective: The objective to optimise.
            performance_data: Either a PerformanceDataFrame (converted to
                selector format) or a Path to an already-converted CSV.
            feature_data: Either a FeatureDataFrame (converted) or a Path
                to an already-converted CSV.
            feature_extractors: Extractor names; only used when feature_data
                is given as a Path (otherwise taken from the dataframe).
            solver_cutoff: Cutoff time budget per solver.
            extractor_cutoff: Cutoff time budget per feature extractor.
            ablate: Whether to create ablation sub-scenarios, one per solver.
            subdir_path: Explicit subdirectory to place the scenario in.
        """
        self.selector: Selector = selector
        self.objective: SparkleObjective = objective
        self.solver_cutoff: float = solver_cutoff
        self.extractor_cutoff: float = extractor_cutoff
        if subdir_path is not None:
            self.directory = parent_directory / subdir_path
        elif isinstance(performance_data, PerformanceDataFrame):
            self.directory: Path = (
                parent_directory
                / selector.name
                / "_".join([Path(s).name for s in performance_data.solvers])
            )
        else:
            self.directory = performance_data.parent
        self.name = f"{selector.name} on {self.directory.name}"
        self.selector_file_path: Path = self.directory / "portfolio_selector"
        self.scenario_file: Path = self.directory / "scenario.txt"
        self.selector_performance_path: Path = (
            self.directory / "selector_performance.csv"
        )
        if self.selector_performance_path.exists():
            self.selector_performance_data = PerformanceDataFrame(
                self.selector_performance_path
            )
        else:  # Create new performance data frame for selector, write to file later
            # NOTE(review): this assumes performance_data is a
            # PerformanceDataFrame here; a Path without a cached
            # selector_performance.csv would fail on .clone() — confirm callers
            self.selector_performance_data = performance_data.clone()
            self.selector_performance_data.add_solver(
                SelectionScenario.__selector_solver_name__
            )

        if isinstance(performance_data, PerformanceDataFrame):  # Convert
            # Convert the dataframes to Selector Format
            new_column_names: list[str] = []
            for solver, config_id, _ in performance_data.columns:
                if f"{solver}_{config_id}" not in new_column_names:
                    new_column_names.append(f"{solver}_{config_id}")
            self.performance_data = performance_data.drop(
                [PerformanceDataFrame.column_seed], axis=1, level=2
            )
            self.performance_data = self.performance_data.droplevel(
                [
                    PerformanceDataFrame.column_configuration,
                    PerformanceDataFrame.column_meta,
                ],
                axis=1,
            )
            self.performance_data = self.performance_data.droplevel(
                PerformanceDataFrame.index_objective, axis=0
            )
            self.performance_data.columns = new_column_names
            # Requires instances as index for both, columns as features / solvers
            # TODO: This should be an aggregation instead?
            self.performance_data.index = self.performance_data.index.droplevel("Run")
            # Enforce data type to be numeric
            self.performance_data = self.performance_data.astype(float)
            self.performance_target_path = self.directory / "performance_data.csv"
        else:  # Read from Path
            self.performance_data: pd.DataFrame = pd.read_csv(
                performance_data, index_col=0
            )
            self.performance_target_path: Path = performance_data

        if isinstance(feature_data, FeatureDataFrame):  # Convert
            self.feature_extractors = feature_data.extractors
            # Features requires instances as index, columns as feature names
            feature_target = feature_data.copy()
            feature_target.columns = feature_target.columns.map(
                "_".join
            )  # Reduce Column Multi Index to single
            # ASF -> feature columns, instance rows
            self.feature_data: pd.DataFrame = feature_target.astype(float)
            self.feature_target_path: Path = self.directory / "feature_data.csv"
        else:  # Read from Path
            self.feature_extractors = feature_extractors
            self.feature_data: pd.DataFrame = pd.read_csv(feature_data)
            self.feature_target_path: Path = feature_data

        self.ablation_scenarios: list[SelectionScenario] = []
        if ablate and len(self.performance_data.columns) > 2:
            # One ablated scenario per solver: the portfolio without that solver
            for solver in self.performance_data.columns:
                solver_key, conf_id = solver.split("_", maxsplit=1)
                ablate_subdir = Path(f"ablated_{Path(solver).name}")
                ablated_directory = self.directory / ablate_subdir
                if (ablated_directory / "performance_data.csv").exists():
                    ablated_pd = ablated_directory / "performance_data.csv"
                elif isinstance(performance_data, PerformanceDataFrame):
                    ablated_pd = performance_data.clone()
                    ablated_pd.remove_configuration(solver_key, conf_id)
                else:  # Note we could do this but it would be hacky?
                    raise ValueError(
                        "Cannot ablate scenario after loading from file! "
                        "Requires original PerformanceDataFrame."
                    )

                self.ablation_scenarios.append(
                    SelectionScenario(
                        parent_directory=self.directory,
                        selector=selector,
                        objective=objective,
                        performance_data=ablated_pd,
                        feature_data=feature_data,
                        solver_cutoff=solver_cutoff,
                        ablate=False,  # If we set to true here, recursion would happen
                        subdir_path=ablate_subdir,
                    )
                )

    @property
    def training_instances(self: SelectionScenario) -> list[str]:
        """Get the training instances."""
        return self.performance_data.index.to_list()

    @property
    def test_instances(self: SelectionScenario) -> list[str]:
        """Get the test instances."""
        instances = self.selector_performance_data.instances
        return [i for i in instances if i not in self.training_instances]

    @property
    def training_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the training instance sets."""
        # NOTE: This no longer works as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.training_instances))

    @property
    def test_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the test instance sets."""
        # NOTE: This no longer works as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.test_instances))

    @property
    def instance_sets(self: SelectionScenario) -> list[str]:
        """Get all the instance sets used in this scenario."""
        return list(
            set(Path(i).parent.name for i in self.selector_performance_data.instances)
        )

    @property
    def solvers(self: SelectionScenario) -> list[str]:
        """Get the solvers used for the selector."""
        return self.performance_data.columns.to_list()

    def create_scenario(self: SelectionScenario) -> None:
        """Prepare the scenario directories and write all data files."""
        self.directory.mkdir(parents=True, exist_ok=True)
        self.performance_data.to_csv(self.performance_target_path)
        self.feature_data.to_csv(self.feature_target_path)
        self.selector_performance_data.save_csv(self.selector_performance_path)
        self.create_scenario_file()

    def create_scenario_file(self: SelectionScenario) -> None:
        """Create the scenario file.

        Write the scenario to file.
        """
        with self.scenario_file.open("w") as fout:
            fout.write(self.serialise())

    def serialise(self: SelectionScenario) -> str:
        """Serialise the scenario to its scenario-file text representation."""
        # NOTE: key/value format must stay in sync with from_file's parser
        return (
            f"selector: {self.selector.name}\n"
            f"solver_cutoff: {self.solver_cutoff}\n"
            f"extractor_cutoff: {self.extractor_cutoff}\n"
            f"ablate: {len(self.ablation_scenarios) > 0}\n"
            f"objective: {self.objective}\n"
            f"selector_performance_data: {self.selector_performance_path}\n"
            f"performance_data: {self.performance_target_path}\n"
            f"feature_data: {self.feature_target_path}\n"
            f"feature_extractors: {','.join(self.feature_extractors)}\n"
        )

    @staticmethod
    def from_file(scenario_file: Path) -> SelectionScenario:
        """Reads scenario file and initialises SelectorScenario."""
        import ast

        if not scenario_file.is_file() and (scenario_file / "scenario.txt").is_file():
            scenario_file = scenario_file / "scenario.txt"  # Resolve from directory
        # Parse "key: value" lines as written by serialise
        values = {
            key: value.strip()
            for key, value in [
                line.split(": ", maxsplit=1) for line in scenario_file.open()
            ]
        }
        # Selector name is "<selector class>_<model class>"
        selector_class, selector_model = values["selector"].split("_", maxsplit=1)
        selector = Selector(selector_class, selector_model)
        return SelectionScenario(
            parent_directory=scenario_file.parent,
            selector=selector,
            objective=resolve_objective(values["objective"]),
            performance_data=Path(values["performance_data"]),
            feature_data=Path(values["feature_data"]),
            feature_extractors=values["feature_extractors"].split(","),
            solver_cutoff=float(values["solver_cutoff"]),
            extractor_cutoff=float(values["extractor_cutoff"]),
            ablate=ast.literal_eval(values["ablate"]),
        )