Coverage for sparkle/selector/selector.py: 89%

158 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

1"""File to handle a Selector for selecting Solvers.""" 

2 

3from __future__ import annotations 

4import random 

5from pathlib import Path 

6 

7from sklearn.base import ClassifierMixin, RegressorMixin 

8from asf.cli import cli_train as asf_cli 

9from asf.scenario.scenario_metadata import ScenarioMetadata 

10from asf.predictors import AbstractPredictor 

11from asf.selectors.abstract_model_based_selector import AbstractModelBasedSelector 

12 

13import runrunner as rrr 

14from runrunner import Runner, Run 

15import pandas as pd 

16 

17from sparkle.types import SparkleObjective, resolve_objective 

18from sparkle.structures import FeatureDataFrame, PerformanceDataFrame 

19from sparkle.instance import InstanceSet 

20 

21 

class Selector:
    """Handles Algorithm Selection by pairing an ASF selector with a model class."""

    # CLI entry point invoked per instance by ``run_cli``.
    selector_cli = Path(__file__).parent / "selector_cli.py"

    def __init__(
        self: Selector,
        selector_class: AbstractModelBasedSelector,
        model_class: AbstractPredictor | ClassifierMixin | RegressorMixin,
    ) -> None:
        """Initialize the Selector object.

        Args:
            selector_class: The (name of) Selector class to construct.
            model_class: The (name of) model class the selector will use.
        """
        # Both arguments may arrive as plain strings (e.g. when restored from a
        # scenario file); resolve them lazily against the provider modules so
        # the heavyweight imports only happen when actually required.
        if isinstance(selector_class, str):
            from asf import selectors

            selector_class = getattr(selectors, selector_class)
        if isinstance(model_class, str):
            from sklearn import ensemble

            model_class = getattr(ensemble, model_class)
        self.selector_class = selector_class
        self.model_class = model_class

    @property
    def name(self: Selector) -> str:
        """Name of the selector, formatted as ``<SelectorClass>_<ModelClass>``."""
        selector_part = self.selector_class.__name__
        model_part = self.model_class.__name__
        return f"{selector_part}_{model_part}"

    def construct(
        self: Selector,
        selection_scenario: SelectionScenario,
        run_on: Runner = Runner.SLURM,
        job_name: str = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        base_dir: Path = Path(),
    ) -> Run:
        """Construct the Selector.

        Args:
            selection_scenario: The scenario to construct the Selector for.
            run_on: Which runner to use. Defaults to slurm.
            job_name: Name to give the construction job when submitting.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch
            base_dir: The base directory to run the Selector in.

        Returns:
            The construction Run
        """
        selection_scenario.create_scenario()
        # The ASF metadata mirrors the scenario: solver columns become
        # "algorithms", feature columns become "features".
        metadata = ScenarioMetadata(
            algorithms=selection_scenario.performance_data.columns.to_list(),
            features=selection_scenario.feature_data.columns.to_list(),
            performance_metric=selection_scenario.objective.name,
            maximize=not selection_scenario.objective.minimise,
            budget=selection_scenario.solver_cutoff,
        )
        selector_instance = self.selector_class(self.model_class, metadata)
        cli_tokens = asf_cli.build_cli_command(
            selector_instance,
            selection_scenario.feature_target_path,
            selection_scenario.performance_target_path,
            selection_scenario.selector_file_path,
        )
        # Collapse the token list into one shell command string for the runner.
        command = [" ".join(str(token) for token in cli_tokens)]

        if not job_name:
            job_name = f"Selector Construction: {selection_scenario.name}"
        construction_run = rrr.add_to_queue(
            runner=run_on,
            cmd=command,
            name=job_name,
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
        )

        if run_on == Runner.LOCAL:
            # Local runs are synchronous; verify the selector file was produced.
            construction_run.wait()
            if not selection_scenario.selector_file_path.is_file():
                print(f"Selector construction of {self.name} failed!")
        return construction_run

    def run(
        self: Selector,
        selector_path: Path,
        instance: str,
        feature_data: FeatureDataFrame,
    ) -> list:
        """Run the Selector, returning the prediction schedule upon success."""
        features = feature_data[[instance]]
        # Flatten the multi-level feature index into single joined names,
        # then transpose so instances are rows as the ASF structure expects.
        features.index = features.index.map("_".join)
        features = features.T
        trained_selector = self.selector_class.load(selector_path)
        schedule = trained_selector.predict(features)
        if schedule is None:
            print(f"ERROR: Selector {self.name} failed predict schedule!")
            return None
        # ASF yields one schedule per instance; only a single instance is queried.
        schedule = schedule[instance]
        for position, (solver, time) in enumerate(schedule):
            # Column names encode "<solver>_<config_id>"; split them back apart.
            solver_name, conf_index = solver.split("_", maxsplit=1)
            schedule[position] = (solver_name, conf_index, time)
        return schedule

    def run_cli(
        self: Selector,
        scenario_path: Path,
        instance_set: InstanceSet | list[Path],
        feature_data: Path,
        run_on: Runner = Runner.LOCAL,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        job_name: str = None,
        dependencies: list[Run] = None,
        log_dir: Path = None,
    ) -> Run:
        """Run the Selector CLI and write result to the Scenario PerformanceDataFrame.

        Args:
            scenario_path: The path to the scenario with the Selector to run.
            instance_set: The instance set to run the Selector on.
            feature_data: The instance feature data to use.
            run_on: Which runner to use. Defaults to slurm.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch
            job_name: Name to give the Slurm job when submitting.
            dependencies: List of dependencies to add to the job.
            log_dir: The directory to write logs to.

        Returns:
            The Run object.
        """
        # NOTE: The selector object and the scenario selector could differ which could
        # cause unintended behaviour (e.g. running a different selector than desired)
        if isinstance(instance_set, list):
            instances = instance_set
        else:
            instances = instance_set.instance_paths
        # One CLI invocation per instance, each with its own random seed.
        commands = [
            f"python3 {Selector.selector_cli} "
            f"--selector-scenario {scenario_path} "
            f"--instance {instance_path} "
            f"--feature-data {feature_data} "
            f"--log-dir {log_dir} "
            f"--seed {random.randint(0, 2**32 - 1)}"
            for instance_path in instances
        ]

        if not job_name:
            job_name = f"Run Selector: {self.name} on {len(instances)} instances"
        import subprocess

        # Stream output directly for local runs; capture it for Slurm jobs.
        output_pipe = None if run_on == Runner.LOCAL else subprocess.PIPE
        submitted = rrr.add_to_queue(
            runner=run_on,
            cmd=commands,
            name=job_name,
            stdout=output_pipe,
            stderr=output_pipe,
            base_dir=log_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if run_on == Runner.LOCAL:
            submitted.wait()
        return submitted

204 

205 

class SelectionScenario:
    """A scenario for a Selector.

    Bundles everything a selector construction/evaluation needs: the target
    directory layout, the (converted) performance and feature data in ASF
    format, and optional ablation sub-scenarios (one per left-out solver).
    """

    # Column name under which the selector itself is recorded in the
    # selector performance data frame.
    __selector_solver_name__ = "portfolio_selector"

    def __init__(
        self: SelectionScenario,
        parent_directory: Path,
        selector: Selector,
        objective: SparkleObjective,
        performance_data: PerformanceDataFrame | Path,
        feature_data: FeatureDataFrame | Path,
        feature_extractors: list[str] = None,
        solver_cutoff: int | float = None,
        extractor_cutoff: int | float = None,
        ablate: bool = False,
        subdir_path: Path = None,
    ) -> None:
        """Initialize a scenario for a selector.

        Args:
            parent_directory: Directory under which the scenario directory lives.
            selector: The Selector this scenario is built for.
            objective: The objective the selector optimises.
            performance_data: Either the original PerformanceDataFrame (converted
                to ASF format here) or a Path to an already-converted CSV.
            feature_data: Either a FeatureDataFrame (converted here) or a Path
                to an already-converted CSV.
            feature_extractors: Names of the feature extractors used; only
                needed when feature_data is given as a Path.
            solver_cutoff: Per-solver time budget.
            extractor_cutoff: Per-extractor time budget.
            ablate: Whether to create one ablated sub-scenario per solver.
            subdir_path: Optional explicit subdirectory (used for ablation).
        """
        self.selector: Selector = selector
        self.objective: SparkleObjective = objective
        self.solver_cutoff: float = solver_cutoff
        self.extractor_cutoff: float = extractor_cutoff
        # Resolve the scenario directory: explicit subdir > derived from the
        # solver set > parent of a previously written performance CSV.
        if subdir_path is not None:
            self.directory = parent_directory / subdir_path
        elif isinstance(performance_data, PerformanceDataFrame):
            self.directory: Path = (
                parent_directory
                / selector.name
                / "_".join([Path(s).name for s in performance_data.solvers])
            )
        else:
            self.directory = performance_data.parent
        self.name = f"{selector.name} on {self.directory.name}"
        self.selector_file_path: Path = self.directory / "portfolio_selector"
        self.scenario_file: Path = self.directory / "scenario.txt"
        self.selector_performance_path: Path = (
            self.directory / "selector_performance.csv"
        )
        if self.selector_performance_path.exists():
            self.selector_performance_data = PerformanceDataFrame(
                self.selector_performance_path
            )
        else:  # Create new performance data frame for selector, write to file later
            # NOTE(review): this branch calls .clone()/.add_solver() on
            # performance_data — if performance_data is a Path and no
            # selector_performance.csv exists yet, this would raise
            # AttributeError. Presumably loaded-from-file scenarios always
            # have that CSV; confirm against from_file() callers.
            self.selector_performance_data = performance_data.clone()
            self.selector_performance_data.add_solver(
                SelectionScenario.__selector_solver_name__
            )

        if isinstance(performance_data, PerformanceDataFrame):  # Convert
            # Convert the dataframes to Selector Format
            # Each (solver, config) pair becomes a single flat column name.
            new_column_names: list[str] = []
            for solver, config_id, _ in performance_data.columns:
                if f"{solver}_{config_id}" not in new_column_names:
                    new_column_names.append(f"{solver}_{config_id}")
            # Drop the seed column, then the configuration/meta column levels,
            # then the objective index level — leaving instance rows and one
            # value column per solver+config.
            self.performance_data = performance_data.drop(
                [PerformanceDataFrame.column_seed], axis=1, level=2
            )
            self.performance_data = self.performance_data.droplevel(
                [
                    PerformanceDataFrame.column_configuration,
                    PerformanceDataFrame.column_meta,
                ],
                axis=1,
            )
            self.performance_data = self.performance_data.droplevel(
                PerformanceDataFrame.index_objective, axis=0
            )
            self.performance_data.columns = new_column_names
            # Requires instances as index for both, columns as features / solvers
            # TODO: This should be an aggregation instead?
            self.performance_data.index = self.performance_data.index.droplevel("Run")
            # Enforce data type to be numeric
            self.performance_data = self.performance_data.astype(float)
            self.performance_target_path = self.directory / "performance_data.csv"
        else:  # Read from Path
            self.performance_data: pd.DataFrame = pd.read_csv(
                performance_data, index_col=0
            )
            self.performance_target_path: Path = performance_data

        if isinstance(feature_data, FeatureDataFrame):  # Convert
            self.feature_extractors = feature_data.extractors
            # Features requires instances as index, columns as feature names
            feature_target = feature_data.copy()
            feature_target.index = feature_target.index.map("_".join)  # Reduce Index
            # ASF -> feature columns, instance rows
            self.feature_data: pd.DataFrame = feature_target.T.astype(float)
            self.feature_target_path: Path = self.directory / "feature_data.csv"
        else:  # Read from Path
            self.feature_extractors = feature_extractors
            self.feature_data: pd.DataFrame = pd.read_csv(feature_data)
            self.feature_target_path: Path = feature_data

        self.ablation_scenarios: list[SelectionScenario] = []
        # Ablation only makes sense with more than two solvers (removing one
        # from a pair would leave a single-solver "selection" scenario).
        if ablate and len(self.performance_data.columns) > 2:
            for solver in self.performance_data.columns:
                solver_key, conf_id = solver.split("_", maxsplit=1)
                ablate_subdir = Path(f"ablated_{Path(solver).name}")
                ablated_directory = self.directory / ablate_subdir
                if (ablated_directory / "performance_data.csv").exists():
                    # Reuse the previously written ablated CSV.
                    ablated_pd = ablated_directory / "performance_data.csv"
                elif isinstance(performance_data, PerformanceDataFrame):
                    ablated_pd = performance_data.clone()
                    ablated_pd.remove_configuration(solver_key, conf_id)
                else:  # Note we could do this but it would be hacky?
                    raise ValueError(
                        "Cannot ablate scenario after loading from file! "
                        "Requires original PerformanceDataFrame."
                    )

                self.ablation_scenarios.append(
                    SelectionScenario(
                        parent_directory=self.directory,
                        selector=selector,
                        objective=objective,
                        performance_data=ablated_pd,
                        feature_data=feature_data,
                        solver_cutoff=solver_cutoff,
                        ablate=False,  # If we set to true here, recursion would happen
                        subdir_path=ablate_subdir,
                    )
                )

    @property
    def training_instances(self: SelectionScenario) -> list[str]:
        """Get the training instances (index of the converted performance data)."""
        return self.performance_data.index.to_list()

    @property
    def test_instances(self: SelectionScenario) -> list[str]:
        """Get the test instances: all known instances not used for training."""
        instances = self.selector_performance_data.instances
        return [i for i in instances if i not in self.training_instances]

    @property
    def training_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the training instance sets (parent directory names)."""
        # NOTE: This no longer works as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.training_instances))

    @property
    def test_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the test instance sets (parent directory names)."""
        # NOTE: This no longer works as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.test_instances))

    @property
    def instance_sets(self: SelectionScenario) -> list[str]:
        """Get all the instance sets used in this scenario."""
        return list(
            set(Path(i).parent.name for i in self.selector_performance_data.instances)
        )

    @property
    def solvers(self: SelectionScenario) -> list[str]:
        """Get the solvers used for the selector (flattened solver_config names)."""
        return self.performance_data.columns.to_list()

    def create_scenario(self: SelectionScenario) -> None:
        """Prepare the scenario directories and write all data files to disk."""
        self.directory.mkdir(parents=True, exist_ok=True)
        self.performance_data.to_csv(self.performance_target_path)
        self.feature_data.to_csv(self.feature_target_path)
        self.selector_performance_data.save_csv(self.selector_performance_path)
        self.create_scenario_file()

    def create_scenario_file(self: SelectionScenario) -> None:
        """Create the scenario file.

        Write the scenario to file.
        """
        with self.scenario_file.open("w") as fout:
            fout.write(self.serialise())

    def serialise(self: SelectionScenario) -> str:
        """Serialize the scenario to a ``key: value`` text block.

        Returns:
            The scenario as newline-separated ``key: value`` lines, in the
            format parsed back by ``from_file``.
        """
        return (
            f"selector: {self.selector.name}\n"
            f"solver_cutoff: {self.solver_cutoff}\n"
            f"extractor_cutoff: {self.extractor_cutoff}\n"
            f"ablate: {len(self.ablation_scenarios) > 0}\n"
            f"objective: {self.objective}\n"
            f"selector_performance_data: {self.selector_performance_path}\n"
            f"performance_data: {self.performance_target_path}\n"
            f"feature_data: {self.feature_target_path}\n"
            f"feature_extractors: {','.join(self.feature_extractors)}\n"
        )

    @staticmethod
    def from_file(scenario_file: Path) -> SelectionScenario:
        """Read a scenario file and initialise a SelectionScenario from it.

        Args:
            scenario_file: Path to a ``scenario.txt`` file, or to a directory
                containing one.

        Returns:
            The reconstructed SelectionScenario.
        """
        if not scenario_file.is_file() and (scenario_file / "scenario.txt").is_file():
            scenario_file = scenario_file / "scenario.txt"  # Resolve from directory
        # Parse the "key: value" lines written by serialise().
        values = {
            key: value.strip()
            for key, value in [
                line.split(": ", maxsplit=1) for line in scenario_file.open()
            ]
        }
        # The selector name encodes "<SelectorClass>_<ModelClass>".
        selector_class, selector_model = values["selector"].split("_", maxsplit=1)
        import ast

        selector = Selector(selector_class, selector_model)
        # NOTE(review): extractor_cutoff is serialised but not restored here;
        # presumably intentional, but verify against consumers of that field.
        return SelectionScenario(
            parent_directory=scenario_file.parent,
            selector=selector,
            objective=resolve_objective(values["objective"]),
            performance_data=Path(values["performance_data"]),
            feature_data=Path(values["feature_data"]),
            feature_extractors=values["feature_extractors"].split(","),
            solver_cutoff=float(values["solver_cutoff"]),
            ablate=ast.literal_eval(values["ablate"]),
        )