Coverage for src/sparkle/selector/selector.py: 89%

157 statements  

coverage.py v7.10.7, created at 2025-10-15 14:11 +0000

1"""File to handle a Selector for selecting Solvers.""" 

2 

3from __future__ import annotations 

4import random 

5from pathlib import Path 

6 

7from sklearn.base import ClassifierMixin, RegressorMixin 

8from asf.cli import cli_train as asf_cli 

9from asf.predictors import AbstractPredictor 

10from asf.selectors.abstract_model_based_selector import AbstractModelBasedSelector 

11 

12import runrunner as rrr 

13from runrunner import Runner, Run 

14import pandas as pd 

15 

16from sparkle.types import SparkleObjective, resolve_objective 

17from sparkle.structures import FeatureDataFrame, PerformanceDataFrame 

18from sparkle.instance import InstanceSet 

19 

20 

21class Selector: 

22 """The Selector class for handling Algorithm Selection.""" 

23 

24 selector_cli = Path(__file__).parent / "selector_cli.py" 

25 

26 def __init__( 

27 self: Selector, 

28 selector_class: AbstractModelBasedSelector, 

29 model_class: AbstractPredictor | ClassifierMixin | RegressorMixin, 

30 ) -> None: 

31 """Initialize the Selector object. 

32 

33 Args: 

34 selector_class: The (name of) Selector class to construct. 

35 model_class: The (name of) model class the selector will use. 

36 """ 

37 if isinstance(selector_class, str): # Resolve class name 

38 from asf import selectors 

39 

40 selector_class = getattr(selectors, selector_class) 

41 if isinstance(model_class, str): # Resolve class name 

42 from sklearn import ensemble 

43 

44 model_class = getattr(ensemble, model_class) 

45 self.selector_class = selector_class 

46 self.model_class = model_class 

47 

48 @property 

49 def name(self: Selector) -> str: 

50 """Return the name of the selector.""" 

51 return f"{self.selector_class.__name__}_{self.model_class.__name__}" 

52 
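    # Illustrative usage sketch (the class names below are assumptions, not taken
    # from this file): both arguments may be given as strings, which are resolved
    # against asf.selectors and sklearn.ensemble respectively.
    #   selector = Selector("PairwiseClassifier", "RandomForestClassifier")
    #   selector.name  # -> "PairwiseClassifier_RandomForestClassifier"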

    def construct(
        self: Selector,
        selection_scenario: SelectionScenario,
        run_on: Runner = Runner.SLURM,
        job_name: str = None,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        base_dir: Path = Path(),
    ) -> Run:
        """Construct the Selector.

        Args:
            selection_scenario: The scenario to construct the Selector for.
            run_on: Which runner to use. Defaults to slurm.
            job_name: Name to give the construction job when submitting.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch script.
            base_dir: The base directory to run the Selector in.

        Returns:
            The construction Run.
        """
        selection_scenario.create_scenario()
        selector = self.selector_class(
            model_class=self.model_class,
            budget=selection_scenario.solver_cutoff,
            maximize=not selection_scenario.objective.minimise,
        )
        cmd = asf_cli.build_cli_command(
            selector,
            selection_scenario.feature_target_path,
            selection_scenario.performance_target_path,
            selection_scenario.selector_file_path,
        )
        cmd = [" ".join([str(c) for c in cmd])]

        job_name = job_name or f"Selector Construction: {selection_scenario.name}"
        construct = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=job_name,
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
        )

        if run_on == Runner.LOCAL:
            construct.wait()
            if not selection_scenario.selector_file_path.is_file():
                print(f"Selector construction of {self.name} failed!")
        return construct
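    # Illustrative sketch (assumes `scenario` is a SelectionScenario built from an
    # existing PerformanceDataFrame and FeatureDataFrame): construction trains the
    # selector via the asf CLI and writes it to scenario.selector_file_path.
    #   construction_run = selector.construct(scenario, run_on=Runner.LOCAL)
    # On Runner.LOCAL the call waits for the job; otherwise the returned Run can be
    # passed as a dependency to follow-up jobs.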

    def run(
        self: Selector,
        selector_path: Path,
        instance: str,
        feature_data: FeatureDataFrame,
    ) -> list:
        """Run the Selector, returning the prediction schedule upon success."""
        instance_features = feature_data[
            [
                instance,
            ]
        ]
        instance_features.index = instance_features.index.map("_".join)  # Reduce
        instance_features = instance_features.T  # ASF dataframe structure
        selector = self.selector_class.load(selector_path)
        schedule = selector.predict(instance_features)
        if schedule is None:
            print(f"ERROR: Selector {self.name} failed to predict a schedule!")
            return None
        # ASF presents result as schedule per instance, we only use one in this setting
        schedule = schedule[instance]
        for index, (solver, time) in enumerate(schedule):
            # Split solver name back into solver and config id
            solver_name, conf_index = solver.split("_", maxsplit=1)
            schedule[index] = (solver_name, conf_index, time)
        return schedule
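    # Illustrative sketch of the returned schedule (values are made up): after the
    # post-processing above, each entry is a (solver, config_id, budget) tuple, e.g.
    #   [("Solvers/SolverA", "config_1", 60.0)]
    # following the "{solver}_{config_id}" column convention used by
    # SelectionScenario below.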

    def run_cli(
        self: Selector,
        scenario_path: Path,
        instance_set: InstanceSet | list[Path],
        feature_data: Path,
        run_on: Runner = Runner.LOCAL,
        sbatch_options: list[str] = None,
        slurm_prepend: str | list[str] | Path = None,
        job_name: str = None,
        dependencies: list[Run] = None,
        log_dir: Path = None,
    ) -> Run:
        """Run the Selector CLI and write result to the Scenario PerformanceDataFrame.

        Args:
            scenario_path: The path to the scenario with the Selector to run.
            instance_set: The instance set to run the Selector on.
            feature_data: The instance feature data to use.
            run_on: Which runner to use. Defaults to local.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch script.
            job_name: Name to give the Slurm job when submitting.
            dependencies: List of dependencies to add to the job.
            log_dir: The directory to write logs to.

        Returns:
            The Run object.
        """
        # NOTE: The selector object and the scenario selector could differ which could
        # cause unintended behaviour (e.g. running a different selector than desired)
        instances = (
            instance_set
            if isinstance(instance_set, list)
            else instance_set.instance_paths
        )
        commands = [
            f"python3 {Selector.selector_cli} "
            f"--selector-scenario {scenario_path} "
            f"--instance {instance_path} "
            f"--feature-data {feature_data} "
            f"--log-dir {log_dir} "
            f"--seed {random.randint(0, 2**32 - 1)}"
            for instance_path in instances
        ]

        job_name = (
            f"Run Selector: {self.name} on {len(instances)} instances"
            if not job_name
            else job_name
        )
        import subprocess

        r = rrr.add_to_queue(
            cmd=commands,
            name=job_name,
            stdout=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print
            stderr=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print
            base_dir=log_dir,
            runner=run_on,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if run_on == Runner.LOCAL:
            r.wait()
        return r
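
    # Illustrative sketch of one generated command (paths are placeholders):
    #   python3 <path to selector_cli.py> --selector-scenario <scenario_path>
    #       --instance <instance_path> --feature-data <feature_data.csv>
    #       --log-dir <log_dir> --seed <32-bit random seed>
    # One command is queued per instance; on Runner.LOCAL the call blocks until all
    # of them have finished.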


class SelectionScenario:
    """A scenario for a Selector."""

    __selector_solver_name__ = "portfolio_selector"

    def __init__(
        self: SelectionScenario,
        parent_directory: Path,
        selector: Selector,
        objective: SparkleObjective,
        performance_data: PerformanceDataFrame | Path,
        feature_data: FeatureDataFrame | Path,
        feature_extractors: list[str] = None,
        solver_cutoff: int | float = None,
        extractor_cutoff: int | float = None,
        ablate: bool = False,
        subdir_path: Path = None,
    ) -> None:

218 """Initialize a scenario for a selector.""" 

        self.selector: Selector = selector
        self.objective: SparkleObjective = objective
        self.solver_cutoff: float = solver_cutoff
        self.extractor_cutoff: float = extractor_cutoff
        if subdir_path is not None:
            self.directory = parent_directory / subdir_path
        elif isinstance(performance_data, PerformanceDataFrame):
            self.directory: Path = (
                parent_directory
                / selector.name
                / "_".join([Path(s).name for s in performance_data.solvers])
            )
        else:
            self.directory = performance_data.parent
        self.name = f"{selector.name} on {self.directory.name}"
        self.selector_file_path: Path = self.directory / "portfolio_selector"
        self.scenario_file: Path = self.directory / "scenario.txt"
        self.selector_performance_path: Path = (
            self.directory / "selector_performance.csv"
        )
        if self.selector_performance_path.exists():
            self.selector_performance_data = PerformanceDataFrame(
                self.selector_performance_path
            )
        else:  # Create new performance data frame for selector, write to file later
            self.selector_performance_data = performance_data.clone()
            self.selector_performance_data.add_solver(
                SelectionScenario.__selector_solver_name__
            )

        if isinstance(performance_data, PerformanceDataFrame):  # Convert
            # Convert the dataframes to Selector Format
            new_column_names: list[str] = []
            for solver, config_id, _ in performance_data.columns:
                if f"{solver}_{config_id}" not in new_column_names:
                    new_column_names.append(f"{solver}_{config_id}")
            self.performance_data = performance_data.drop(
                [PerformanceDataFrame.column_seed], axis=1, level=2
            )
            self.performance_data = self.performance_data.droplevel(
                [
                    PerformanceDataFrame.column_configuration,
                    PerformanceDataFrame.column_meta,
                ],
                axis=1,
            )
            self.performance_data = self.performance_data.droplevel(
                PerformanceDataFrame.index_objective, axis=0
            )
            self.performance_data.columns = new_column_names
            # Requires instances as index for both, columns as features / solvers
            # TODO: This should be an aggregation instead?
            self.performance_data.index = self.performance_data.index.droplevel("Run")
            # Enforce data type to be numeric
            self.performance_data = self.performance_data.astype(float)
            self.performance_target_path = self.directory / "performance_data.csv"
        else:  # Read from Path
            self.performance_data: pd.DataFrame = pd.read_csv(
                performance_data, index_col=0
            )
            self.performance_target_path: Path = performance_data

        if isinstance(feature_data, FeatureDataFrame):  # Convert
            self.feature_extractors = feature_data.extractors
            # Features requires instances as index, columns as feature names
            feature_target = feature_data.copy()
            feature_target.index = feature_target.index.map("_".join)  # Reduce Index
            # ASF -> feature columns, instance rows
            self.feature_data: pd.DataFrame = feature_target.T.astype(float)
            self.feature_target_path: Path = self.directory / "feature_data.csv"
        else:  # Read from Path
            self.feature_extractors = feature_extractors
            self.feature_data: pd.DataFrame = pd.read_csv(feature_data)
            self.feature_target_path: Path = feature_data

        self.ablation_scenarios: list[SelectionScenario] = []
        if ablate and len(self.performance_data.columns) > 2:
            for solver in self.performance_data.columns:
                solver_key, conf_id = solver.split("_", maxsplit=1)
                ablate_subdir = Path(f"ablated_{Path(solver).name}")
                ablated_directory = self.directory / ablate_subdir
                if (ablated_directory / "performance_data.csv").exists():
                    ablated_pd = ablated_directory / "performance_data.csv"
                elif isinstance(performance_data, PerformanceDataFrame):
                    ablated_pd = performance_data.clone()
                    ablated_pd.remove_configuration(solver_key, conf_id)
                else:  # Note we could do this but it would be hacky?
                    raise ValueError(
                        "Cannot ablate scenario after loading from file! "
                        "Requires original PerformanceDataFrame."
                    )

                self.ablation_scenarios.append(
                    SelectionScenario(
                        parent_directory=self.directory,
                        selector=selector,
                        objective=objective,
                        performance_data=ablated_pd,
                        feature_data=feature_data,
                        solver_cutoff=solver_cutoff,
                        ablate=False,  # If we set to true here, recursion would happen
                        subdir_path=ablate_subdir,
                    )
                )
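    # Illustrative sketch of the conversion above (names are made up): the
    # PerformanceDataFrame column for solver "Solvers/SolverA" with configuration
    # "config_1" becomes the flat column "Solvers/SolverA_config_1", with one row
    # per instance, so ASF trains on a plain instances x solvers table; the feature
    # data is transposed into a matching instances x features table.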

    @property
    def training_instances(self: SelectionScenario) -> list[str]:
        """Get the training instances."""
        return self.performance_data.index.to_list()

    @property
    def test_instances(self: SelectionScenario) -> list[str]:
        """Get the test instances."""
        instances = self.selector_performance_data.instances
        return [i for i in instances if i not in self.training_instances]

    @property
    def training_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the training instance sets."""
        # NOTE: This no longer works as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.training_instances))

    @property
    def test_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the test instance sets."""
        # NOTE: This no longer works as instances no longer have their set in the name
        return list(set(Path(i).parent.name for i in self.test_instances))

    @property
    def instance_sets(self: SelectionScenario) -> list[str]:
        """Get all the instance sets used in this scenario."""
        return list(
            set(Path(i).parent.name for i in self.selector_performance_data.instances)
        )

    @property
    def solvers(self: SelectionScenario) -> list[str]:
        """Get the solvers used for the selector."""
        return self.performance_data.columns.to_list()

    def create_scenario(self: SelectionScenario) -> None:
        """Prepare the scenario directories."""
        self.directory.mkdir(parents=True, exist_ok=True)
        self.performance_data.to_csv(self.performance_target_path)
        self.feature_data.to_csv(self.feature_target_path)
        self.selector_performance_data.save_csv(self.selector_performance_path)
        self.create_scenario_file()

    def create_scenario_file(self: SelectionScenario) -> None:
        """Create the scenario file.

        Writes the serialised scenario to the scenario file.
        """
        with self.scenario_file.open("w") as fout:
            fout.write(self.serialise())

    def serialise(self: SelectionScenario) -> str:
        """Serialise the scenario to a string."""
        return (
            f"selector: {self.selector.name}\n"
            f"solver_cutoff: {self.solver_cutoff}\n"
            f"extractor_cutoff: {self.extractor_cutoff}\n"
            f"ablate: {len(self.ablation_scenarios) > 0}\n"
            f"objective: {self.objective}\n"
            f"selector_performance_data: {self.selector_performance_path}\n"
            f"performance_data: {self.performance_target_path}\n"
            f"feature_data: {self.feature_target_path}\n"
            f"feature_extractors: {','.join(self.feature_extractors)}\n"
        )
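    # Illustrative sketch of a resulting scenario.txt (values are placeholders):
    #   selector: MultiClassClassifier_RandomForestClassifier
    #   solver_cutoff: 60.0
    #   extractor_cutoff: 60.0
    #   ablate: False
    #   objective: PAR10
    #   selector_performance_data: <scenario dir>/selector_performance.csv
    #   performance_data: <scenario dir>/performance_data.csv
    #   feature_data: <scenario dir>/feature_data.csv
    #   feature_extractors: <extractor names, comma separated>
    # from_file() below parses exactly these "key: value" lines back into a scenario.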

    @staticmethod
    def from_file(scenario_file: Path) -> SelectionScenario:
        """Reads a scenario file and initialises a SelectionScenario."""
        if not scenario_file.is_file() and (scenario_file / "scenario.txt").is_file():
            scenario_file = scenario_file / "scenario.txt"  # Resolve from directory
        values = {
            key: value.strip()
            for key, value in [
                line.split(": ", maxsplit=1) for line in scenario_file.open()
            ]
        }
        selector_class, selector_model = values["selector"].split("_", maxsplit=1)
        import ast

        selector = Selector(selector_class, selector_model)
        return SelectionScenario(
            parent_directory=scenario_file.parent,
            selector=selector,
            objective=resolve_objective(values["objective"]),
            performance_data=Path(values["performance_data"]),
            feature_data=Path(values["feature_data"]),
            feature_extractors=values["feature_extractors"].split(","),
            solver_cutoff=float(values["solver_cutoff"]),
            ablate=ast.literal_eval(values["ablate"]),
        )
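
# Illustrative end-to-end sketch (paths and argument choices are assumptions): build
# a scenario from existing data frames, train the selector, then run it on a set of
# instances via the CLI.
#   scenario = SelectionScenario(Path("Output/Selection"), selector, objective,
#                                performance_data, feature_data, solver_cutoff=60)
#   selector.construct(scenario, run_on=Runner.LOCAL)
#   selector.run_cli(scenario.directory, instance_set, scenario.feature_target_path,
#                    log_dir=Path("Logs"))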