Source code for sparkle.selector.selector

"""File to handle a Selector for selecting Solvers."""
from __future__ import annotations
from pathlib import Path

from sklearn.base import ClassifierMixin, RegressorMixin
from asf.cli import cli_train as asf_cli
from asf.scenario.scenario_metadata import ScenarioMetadata
from asf.predictors import AbstractPredictor
from asf.selectors.abstract_model_based_selector import AbstractModelBasedSelector

import runrunner as rrr
from runrunner import Runner, Run
import pandas as pd

from sparkle.types import SparkleObjective, resolve_objective
from sparkle.structures import FeatureDataFrame, PerformanceDataFrame


class Selector:
    """The Selector class for handling Algorithm Selection."""

    def __init__(
            self: Selector,
            selector_class: AbstractModelBasedSelector,
            model_class: AbstractPredictor | ClassifierMixin | RegressorMixin) -> None:
        """Initialize the Selector object.

        Args:
            selector_class: The Selector class to construct.
            model_class: The model class the selector will use.
        """
        self.selector_class = selector_class
        self.model_class = model_class

    @property
    def name(self: Selector) -> str:
        """Return the name of the selector."""
        return f"{self.selector_class.__name__}_{self.model_class.__name__}"

    def construct(self: Selector,
                  selection_scenario: SelectionScenario,
                  run_on: Runner = Runner.SLURM,
                  job_name: str = None,
                  sbatch_options: list[str] = None,
                  slurm_prepend: str | list[str] | Path = None,
                  base_dir: Path = Path()) -> Run:
        """Construct the Selector.

        Args:
            selection_scenario: The scenario to construct the Selector for.
            run_on: Which runner to use. Defaults to Slurm.
            job_name: Name to give the construction job when submitting.
            sbatch_options: Additional options to pass to sbatch.
            slurm_prepend: Slurm script to prepend to the sbatch script.
            base_dir: The base directory to run the Selector in.

        Returns:
            The construction Run.
        """
        selection_scenario.create_scenario()
        selector = self.selector_class(
            self.model_class, ScenarioMetadata(
                algorithms=selection_scenario.performance_data.columns.to_list(),
                features=selection_scenario.feature_data.columns.to_list(),
                performance_metric=selection_scenario.objective.name,
                maximize=not selection_scenario.objective.minimise,
                budget=selection_scenario.solver_cutoff
            )
        )
        cmd = asf_cli.build_cli_command(selector,
                                        selection_scenario.feature_target_path,
                                        selection_scenario.performance_target_path,
                                        selection_scenario.selector_file_path)
        cmd = [" ".join([str(c) for c in cmd])]
        job_name = job_name or f"Selector Construction: {selection_scenario.name}"
        construct = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=job_name,
            base_dir=base_dir,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend)
        if run_on == Runner.LOCAL:
            construct.wait()
            if not selection_scenario.selector_file_path.is_file():
                print(f"Selector construction of {self.name} failed!")
        return construct

    def run(self: Selector,
            selector_path: Path,
            instance: str,
            feature_data: FeatureDataFrame) -> list:
        """Run the Selector, returning the prediction schedule upon success."""
        instance_features = feature_data.dataframe[[instance, ]]
        instance_features.index = instance_features.index.map("_".join)  # Reduce index
        instance_features = instance_features.T  # ASF dataframe structure
        selector = self.selector_class.load(selector_path)
        schedule = selector.predict(instance_features)
        if schedule is None:
            print(f"ERROR: Selector {self.name} failed to predict a schedule!")
            return None
        # ASF presents the result as a schedule per instance; we only use one here
        schedule = schedule[instance]
        for index, (solver, time) in enumerate(schedule):
            # Split the solver name back into solver and configuration id
            solver_name, conf_index = solver.split("_", maxsplit=1)
            schedule[index] = (solver_name, conf_index, time)
        return schedule
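

# Usage sketch (illustrative only, not part of the module API): build a Selector and
# construct it locally. Assumptions: `PairwiseClassifier` is an available class in
# `asf.selectors` (any AbstractModelBasedSelector subclass would do), the caller
# already holds a PerformanceDataFrame and FeatureDataFrame, and the output directory
# and objective name below are hypothetical placeholders.
def _example_construct_selector(performance_data: PerformanceDataFrame,
                                feature_data: FeatureDataFrame) -> Run:
    from sklearn.ensemble import RandomForestClassifier
    from asf.selectors import PairwiseClassifier  # Assumed ASF selector class

    selector = Selector(PairwiseClassifier, RandomForestClassifier)
    scenario = SelectionScenario(
        parent_directory=Path("Output/Selection"),  # Hypothetical output directory
        selector=selector,
        objective=resolve_objective("PAR10"),  # Hypothetical objective name
        performance_data=performance_data,
        feature_data=feature_data,
        solver_cutoff=60)
    # Construct synchronously on the local machine and return the finished Run
    return selector.construct(scenario, run_on=Runner.LOCAL)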


class SelectionScenario:
    """A scenario for a Selector."""
    __selector_solver_name__ = "portfolio_selector"

    def __init__(self: SelectionScenario,
                 parent_directory: Path,
                 selector: Selector,
                 objective: SparkleObjective,
                 performance_data: PerformanceDataFrame | Path,
                 feature_data: FeatureDataFrame | Path,
                 feature_extractors: list[str] = None,
                 solver_cutoff: int | float = None,
                 extractor_cutoff: int | float = None,
                 ablate: bool = False,
                 subdir_path: Path = None
                 ) -> None:
        """Initialize a scenario for a selector."""
        self.selector: Selector = selector
        self.objective: SparkleObjective = objective
        self.solver_cutoff: float = solver_cutoff
        self.extractor_cutoff: float = extractor_cutoff
        if subdir_path is not None:
            self.directory = parent_directory / subdir_path
        elif isinstance(performance_data, PerformanceDataFrame):
            self.directory: Path =\
                parent_directory / selector.name / "_".join(
                    [Path(s).name for s in performance_data.solvers])
        else:
            self.directory = performance_data.parent
        self.name = f"{selector.name} on {self.directory.name}"
        self.selector_file_path: Path = self.directory / "portfolio_selector"
        self.scenario_file: Path = self.directory / "scenario.txt"
        self.selector_performance_path: Path =\
            self.directory / "selector_performance.csv"
        if self.selector_performance_path.exists():
            self.selector_performance_data = PerformanceDataFrame(
                self.selector_performance_path)
        else:
            # Create a new performance data frame for the selector, write to file later
            self.selector_performance_data = performance_data.clone()
            self.selector_performance_data.add_solver(
                SelectionScenario.__selector_solver_name__)
        if isinstance(performance_data, PerformanceDataFrame):
            # Convert the dataframe to the Selector (ASF) format
            new_column_names: list[str] = []
            for solver, config_id, _ in performance_data.columns:
                if f"{solver}_{config_id}" not in new_column_names:
                    new_column_names.append(f"{solver}_{config_id}")
            self.performance_data = performance_data.drop(
                [PerformanceDataFrame.column_seed], axis=1, level=2)
            self.performance_data = self.performance_data.droplevel([
                PerformanceDataFrame.column_configuration,
                PerformanceDataFrame.column_meta], axis=1)
            self.performance_data = self.performance_data.droplevel(
                PerformanceDataFrame.index_objective, axis=0)
            self.performance_data.columns = new_column_names
            # Both require instances as index, columns as features / solvers
            # TODO: This should be an aggregation instead?
            self.performance_data.index =\
                self.performance_data.index.droplevel("Run")
            # Enforce numeric data type
            self.performance_data = self.performance_data.astype(float)
            self.performance_target_path = self.directory / "performance_data.csv"
        else:  # Read from Path
            self.performance_data: pd.DataFrame = pd.read_csv(performance_data,
                                                              index_col=0)
            self.performance_target_path: Path = performance_data
        if isinstance(feature_data, FeatureDataFrame):  # Convert
            self.feature_extractors = feature_data.extractors
            # Features require instances as index, columns as feature names
            feature_target = feature_data.dataframe.copy()
            feature_target.index = feature_target.index.map("_".join)  # Reduce index
            # ASF -> feature columns, instance rows
            self.feature_data: pd.DataFrame = feature_target.T.astype(float)
            self.feature_target_path: Path = self.directory / "feature_data.csv"
        else:  # Read from Path
            self.feature_extractors = feature_extractors
            self.feature_data: pd.DataFrame = pd.read_csv(feature_data)
            self.feature_target_path: Path = feature_data
        self.ablation_scenarios: list[SelectionScenario] = []
        if ablate and len(self.performance_data.columns) > 2:
            for solver in self.performance_data.columns:
                solver_key, conf_id = solver.split("_", maxsplit=1)
                ablate_subdir = Path(f"ablated_{Path(solver).name}")
                ablated_directory = self.directory / ablate_subdir
                if (ablated_directory / "performance_data.csv").exists():
                    ablated_pd = ablated_directory / "performance_data.csv"
                elif isinstance(performance_data, PerformanceDataFrame):
                    ablated_pd = performance_data.clone()
                    ablated_pd.remove_configuration(solver_key, conf_id)
                else:
                    # Note: we could reconstruct this here, but it would be hacky
                    raise ValueError("Cannot ablate scenario after loading from file! "
                                     "Requires original PerformanceDataFrame.")
                self.ablation_scenarios.append(SelectionScenario(
                    parent_directory=self.directory,
                    selector=selector,
                    objective=objective,
                    performance_data=ablated_pd,
                    feature_data=feature_data,
                    solver_cutoff=solver_cutoff,
                    ablate=False,  # Setting this to True here would recurse
                    subdir_path=ablate_subdir)
                )

    @property
    def training_instances(self: SelectionScenario) -> list[str]:
        """Get the training instances."""
        return self.performance_data.index.to_list()

    @property
    def test_instances(self: SelectionScenario) -> list[str]:
        """Get the test instances."""
        instances = self.selector_performance_data.instances
        return [i for i in instances if i not in self.training_instances]

    @property
    def training_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the training instance sets."""
        return list(set(Path(i).parent.name for i in self.training_instances))

    @property
    def test_instance_sets(self: SelectionScenario) -> list[str]:
        """Get the test instance sets."""
        return list(set(Path(i).parent.name for i in self.test_instances))

    @property
    def instance_sets(self: SelectionScenario) -> list[str]:
        """Get all the instance sets used in this scenario."""
        return list(set(Path(i).parent.name
                        for i in self.selector_performance_data.instances))

    @property
    def solvers(self: SelectionScenario) -> list[str]:
        """Get the solvers used for the selector."""
        return self.performance_data.columns.to_list()

    def create_scenario(self: SelectionScenario) -> None:
        """Prepare the scenario directories."""
        self.directory.mkdir(parents=True, exist_ok=True)
        self.performance_data.to_csv(self.performance_target_path)
        self.feature_data.to_csv(self.feature_target_path)
        self.selector_performance_data.save_csv(self.selector_performance_path)
        self.create_scenario_file()

    def create_scenario_file(self: SelectionScenario) -> None:
        """Create the scenario file, writing the serialised scenario to disk."""
        with self.scenario_file.open("w") as fout:
            fout.write(self.serialise())

    def serialise(self: SelectionScenario) -> str:
        """Serialise the scenario to a string."""
        return f"selector: {self.selector.name}\n"\
               f"solver_cutoff: {self.solver_cutoff}\n"\
               f"extractor_cutoff: {self.extractor_cutoff}\n"\
               f"ablate: {len(self.ablation_scenarios) > 0}\n"\
               f"objective: {self.objective}\n"\
               f"selector_performance_data: {self.selector_performance_path}\n"\
               f"performance_data: {self.performance_target_path}\n"\
               f"feature_data: {self.feature_target_path}\n"\
               f"feature_extractors: {','.join(self.feature_extractors)}\n"

    @staticmethod
    def from_file(scenario_file: Path) -> SelectionScenario:
        """Read a scenario file and initialise a SelectionScenario."""
        if not scenario_file.is_file() and (scenario_file / "scenario.txt").is_file():
            scenario_file = scenario_file / "scenario.txt"  # Resolve from directory
        values = {key: value.strip()
                  for key, value in [line.split(": ", maxsplit=1)
                                     for line in scenario_file.open()]}
        selector_class, selector_model = values["selector"].split("_", maxsplit=1)
        # Evaluate strings to classes
        from sklearn import ensemble
        from asf import selectors
        selector_class = getattr(selectors, selector_class)
        selector_model = getattr(ensemble, selector_model)
        selector = Selector(selector_class, selector_model)
        return SelectionScenario(
            parent_directory=scenario_file.parent,
            selector=selector,
            objective=resolve_objective(values["objective"]),
            performance_data=Path(values["performance_data"]),
            feature_data=Path(values["feature_data"]),
            feature_extractors=values["feature_extractors"].split(","),
            solver_cutoff=float(values["solver_cutoff"]),
            ablate=values["ablate"] == "True")
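

# Usage sketch (illustrative only): reload a constructed scenario from disk and
# predict a solver schedule for one instance. The scenario directory and instance
# path are hypothetical placeholders; `feature_data` must contain features for the
# given instance.
def _example_predict_schedule(feature_data: FeatureDataFrame) -> list:
    scenario = SelectionScenario.from_file(
        Path("Output/Selection/MultiClassClassifier_RandomForestClassifier"))
    instance = "Instances/PTN/example_instance.cnf"  # Hypothetical instance path
    # Returns a list of (solver, config_id, budget) tuples, or None on failure
    return scenario.selector.run(scenario.selector_file_path, instance, feature_data)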