Source code for sparkle.structures.performance_dataframe

"""Module to manage performance data files and common operations on them."""
from __future__ import annotations
import ast
import copy
from typing import Any
import itertools
from pathlib import Path
import math
import numpy as np
import pandas as pd

from sparkle.types import SparkleObjective, resolve_objective


class PerformanceDataFrame(pd.DataFrame):
    """Class to manage performance data and common operations on them."""
    missing_value = math.nan
    missing_objective = "UNKNOWN"
    default_configuration = "Default"

    index_objective = "Objective"
    index_instance = "Instance"
    index_run = "Run"
    multi_index_names = [index_objective, index_instance, index_run]

    column_solver = "Solver"
    column_configuration = "Configuration"
    column_meta = "Meta"
    column_value = "Value"
    column_seed = "Seed"
    multi_column_names = [column_solver, column_configuration, column_meta]
    multi_column_value = [column_value, column_seed]
    multi_column_dtypes = [str, int]

    def __init__(self: PerformanceDataFrame,
                 csv_filepath: Path,
                 solvers: list[str] = None,
                 configurations: dict[str, dict[str, dict]] = None,
                 objectives: list[str | SparkleObjective] = None,
                 instances: list[str] = None,
                 n_runs: int = 1) -> None:
        """Initialise a PerformanceDataFrame.

        Consists of:
            - Columns representing the Solvers
            - Rows representing the results by multi-index in order of:
                * Objective (Static, given in constructor or read from file)
                * Instance
                * Runs (Static, given in constructor or read from file)

        Args:
            csv_filepath: If the path exists, load from it. Otherwise create a new
                DataFrame and save it to this path.
            solvers: List of solver names to be added to the DataFrame.
            configurations: The configuration keys per solver to add, structured as
                configurations[solver][config_key] = {"parameter": "value", ..}
            objectives: List of SparkleObjectives or objective names. By default None,
                in which case the objectives are derived from the Sparkle Settings
                if possible.
            instances: List of instance names to be added to the DataFrame.
            n_runs: The number of runs to consider per Solver/Objective/Instance
                combination.
        """
        if csv_filepath and csv_filepath.exists():  # Read from file
            df = pd.read_csv(csv_filepath,
                             header=[0, 1, 2], index_col=[0, 1, 2],
                             dtype={"Value": str, "Seed": int},
                             on_bad_lines="skip",
                             comment="$")  # $ For extra data lines
            super().__init__(df)
            self.csv_filepath = csv_filepath
            # Load configuration mapping
            with self.csv_filepath.open() as f:
                configuration_lines = [line.strip().strip("$").split(",", maxsplit=2)
                                       for line in f.readlines()
                                       if line.startswith("$")]
            configurations = {s: {} for s in self.solvers}
            for solver, config_key, config in configuration_lines[1:]:  # Skip header
                configurations[solver][config_key] = ast.literal_eval(config.strip('"'))
        else:  # New PerformanceDataFrame
            # Initialize empty DataFrame
            run_ids = list(range(1, n_runs + 1))  # We count runs from 1
            # We always need objectives to maintain the dimensions
            if objectives is None:
                objectives = [PerformanceDataFrame.missing_objective]
            else:
                objectives = [str(o) for o in objectives]
            # We always need an instance to maintain the dimensions
            if instances is None:
                instances = [PerformanceDataFrame.missing_value]
            # We always need a solver to maintain the dimensions
            if solvers is None:
                solvers = [PerformanceDataFrame.missing_value]
            midx = pd.MultiIndex.from_product(
                [objectives, instances, run_ids],
                names=PerformanceDataFrame.multi_index_names)
            # Create the multi index tuples
            if configurations is None:
                configurations = \
                    {solver: {PerformanceDataFrame.default_configuration: {}}
                     for solver in solvers}
            column_tuples = []
            # We cannot do .from_product here as config ids are per solver
            for solver in configurations.keys():
                for config_id in configurations[solver].keys():
                    column_tuples.extend([
                        (solver, config_id, PerformanceDataFrame.column_seed),
                        (solver, config_id, PerformanceDataFrame.column_value)])
            mcolumns = pd.MultiIndex.from_tuples(
                column_tuples,
                names=[PerformanceDataFrame.column_solver,
                       PerformanceDataFrame.column_configuration,
                       PerformanceDataFrame.column_meta])
            # Set dtype object to avoid inferring float for categorical objectives
            super().__init__(PerformanceDataFrame.missing_value,
                             index=midx, columns=mcolumns, dtype="object")
            self.csv_filepath = csv_filepath
        # Store configuration in global attributes dictionary, see Pandas Docs
        self.attrs = configurations

        if self.index.duplicated().any():  # Combine duplicate indices
            combined = self.groupby(level=[0, 1, 2]).first()
            # We keep the last to allow overwriting existing values
            duplicates = self.index[self.index.duplicated(keep="last")]
            # Remove all duplicate entries from self
            self.drop(duplicates, inplace=True)
            for d in duplicates:  # Place combined duplicates in self
                self.loc[d, :] = combined.loc[d, :]

        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)
        self.sort_index(axis=1, inplace=True)

        if csv_filepath and not self.csv_filepath.exists():  # New Performance DataFrame
            self.save_csv()

    # Properties

    @property
    def num_objectives(self: PerformanceDataFrame) -> int:
        """Retrieve the number of objectives in the DataFrame."""
        return self.index.get_level_values(0).unique().size

    @property
    def num_instances(self: PerformanceDataFrame) -> int:
        """Return the number of instances."""
        return self.index.get_level_values(1).unique().size

    @property
    def num_runs(self: PerformanceDataFrame) -> int:
        """Return the maximum number of runs of each instance."""
        return self.index.get_level_values(2).unique().size

    @property
    def num_solvers(self: PerformanceDataFrame) -> int:
        """Return the number of solvers."""
        return self.columns.get_level_values(0).unique().size

    @property
    def num_solver_configurations(self: PerformanceDataFrame) -> int:
        """Return the number of solver configurations."""
        return int(self.columns.get_level_values(  # Each config has a Seed & Value
            PerformanceDataFrame.column_configuration).size / 2)

    @property
    def multi_objective(self: PerformanceDataFrame) -> bool:
        """Return whether the DataFrame is multi-objective or not."""
        return self.num_objectives > 1

    @property
    def solvers(self: PerformanceDataFrame) -> list[str]:
        """Return the solvers present as a list of strings."""
        # Do not return the nan solver as it is not an actual solver
        return self.columns.get_level_values(
            PerformanceDataFrame.column_solver).dropna().unique().to_list()

    @property
    def configuration_ids(self: PerformanceDataFrame) -> list[str]:
        """Return the list of configuration keys."""
        return self.columns.get_level_values(
            PerformanceDataFrame.column_configuration).unique().to_list()

    @property
    def configurations(self: PerformanceDataFrame) -> dict[str, dict[str, dict]]:
        """Return a dictionary (copy) containing the configurations for each solver."""
        return copy.deepcopy(self.attrs)  # Deepcopy to avoid mutation of attribute

    @property
    def objective_names(self: PerformanceDataFrame) -> list[str]:
        """Return the objective names as a list of strings."""
        return self.index.get_level_values(0).unique().to_list()

    @property
    def objectives(self: PerformanceDataFrame) -> list[SparkleObjective]:
        """Return the objectives as a list of SparkleObjectives."""
        return [resolve_objective(o) for o in self.objective_names]

    @property
    def instances(self: PerformanceDataFrame) -> list[str]:
        """Return the instances as a list of strings."""
        return self.index.get_level_values(1).unique().to_list()

    @property
    def run_ids(self: PerformanceDataFrame) -> list[int]:
        """Return the run ids as a list of integers."""
        return self.index.get_level_values(2).unique().to_list()

    @property
    def has_missing_values(self: PerformanceDataFrame) -> bool:
        """Return True if there are any missing values in the DataFrame."""
        return self.drop(PerformanceDataFrame.column_seed,
                         level=PerformanceDataFrame.column_meta,
                         axis=1).isnull().any().any()

    def is_missing(self: PerformanceDataFrame,
                   solver: str,
                   instance: str) -> bool:
        """Check whether a solver/instance combination is missing values."""
        return self.xs(solver, axis=1).xs(
            instance, axis=0,
            level=PerformanceDataFrame.index_instance).drop(
            PerformanceDataFrame.column_seed,
            level=PerformanceDataFrame.column_meta,
            axis=1).isnull().any().any()

    def verify_objective(self: PerformanceDataFrame,
                         objective: str) -> str:
        """Method to check whether the specified objective is valid.

        Users are allowed to index the DataFrame without specifying all dimensions.
        However, when dealing with multiple objectives this is not allowed and this
        is verified here. If we have only one objective this is returned.
        Otherwise, if an objective is specified by the user this is returned.

        Args:
            objective: The objective given by the user
        """
        if objective is None:
            if self.multi_objective:
                raise ValueError("Error: MO Data, but objective not specified.")
            elif self.num_objectives == 1:
                return self.objective_names[0]
            else:
                return PerformanceDataFrame.missing_objective
        return objective

    def verify_run_id(self: PerformanceDataFrame,
                      run_id: int) -> int:
        """Method to check whether a run id is valid.

        Similar to verify_objective, but here we check the dimensionality of runs.

        Args:
            run_id: The run as specified by the user.
        """
        if run_id is None:
            if self.num_runs > 1:
                raise ValueError("Error: Multiple run performance data, "
                                 "but run not specified")
            else:
                run_id = self.run_ids[0]
        return run_id

    def verify_indexing(self: PerformanceDataFrame,
                        objective: str,
                        run_id: int) -> tuple[str, int]:
        """Method to check whether data indexing is correct.

        Users are allowed to use the Performance DataFrame without the second and
        fourth dimension (Objective and Run respectively) in case they only have
        one objective or only do one run. This method adjusts the indexing for
        those cases accordingly.

        Args:
            objective: The given objective name
            run_id: The given run index

        Returns:
            A tuple representing the (possibly adjusted) Objective and Run index.
        """
        objective = self.verify_objective(objective)
        run_id = self.verify_run_id(run_id)
        return objective, run_id

    # Getters and Setters

    def add_solver(self: PerformanceDataFrame,
                   solver_name: str,
                   configurations: list[tuple[str, dict]] = None,
                   initial_value: float | list[str | float] = None) -> None:
        """Add a new solver to the DataFrame. Initializes value to None by default.

        Args:
            solver_name: The name of the solver to be added.
            configurations: A list of (configuration key, configuration) pairs
                for the solver.
            initial_value: The value assigned for each index of the new solver.
                If not None, must match the index dimension (n_obj * n_inst * n_runs).
        """
        if solver_name in self.solvers:
            print(f"WARNING: Tried adding already existing solver {solver_name} to "
                  f"Performance DataFrame: {self.csv_filepath}")
            return
        if not isinstance(initial_value, list):  # Single value
            initial_value = [[initial_value, initial_value]]
        if configurations is None:
            configurations = [(PerformanceDataFrame.default_configuration, {})]
        self.attrs[solver_name] = {}
        for (config_key, config), (value, seed) in itertools.product(configurations,
                                                                     initial_value):
            self[(solver_name, config_key, PerformanceDataFrame.column_seed)] = seed
            self[(solver_name, config_key, PerformanceDataFrame.column_value)] = value
            self.attrs[solver_name][config_key] = config
        if self.num_solvers == 2:  # Remove nan solver
            for solver in self.solvers:
                if str(solver) == str(PerformanceDataFrame.missing_value):
                    self.remove_solver(solver)
                    break

    def add_configuration(
            self: PerformanceDataFrame,
            solver: str,
            configuration_id: str | list[str],
            configuration: dict[str, Any] | list[dict[str, Any]] = None) -> None:
        """Add new configurations for a solver to the DataFrame.

        If the key already exists, update the value.

        Args:
            solver: The name of the solver for which the configuration is added.
            configuration_id: The name of the configuration to be added.
            configuration: The configuration to be added.
        """
        if not isinstance(configuration_id, list):
            configuration_id = [configuration_id]
        if not isinstance(configuration, list):
            configuration = [configuration]
        for config_id, config in zip(configuration_id, configuration):
            if config_id not in self.get_configurations(solver):
                self[(solver, config_id, PerformanceDataFrame.column_value)] = None
                self[(solver, config_id, PerformanceDataFrame.column_seed)] = None
            self.attrs[solver][config_id] = config
        # Sort the index to optimize lookup speed
        self.sort_index(axis=1, inplace=True)

    def add_objective(self: PerformanceDataFrame,
                      objective_name: str,
                      initial_value: float = None) -> None:
        """Add an objective to the DataFrame."""
        initial_value = initial_value or self.missing_value
        if objective_name in self.objective_names:
            print(f"WARNING: Tried adding already existing objective {objective_name} "
                  f"to Performance DataFrame: {self.csv_filepath}")
            return
        for instance, run in itertools.product(self.instances, self.run_ids):
            self.loc[(objective_name, instance, run)] = initial_value
        self.sort_index(axis=0, inplace=True)

    def add_instance(self: PerformanceDataFrame,
                     instance_name: str,
                     initial_values: Any | list[Any] = None) -> None:
        """Add an instance to the DataFrame.

        Args:
            instance_name: The name of the instance to be added.
            initial_values: The values assigned for each index of the new instance.
                If a list, must match the column dimension (Value, Seed, Configuration).
        """
        initial_values = initial_values or self.missing_value
        if not isinstance(initial_values, list):
            initial_values = ([initial_values] * 2  # Value and Seed per target column
                              * self.num_solver_configurations)
        elif len(initial_values) == len(PerformanceDataFrame.multi_column_names):
            initial_values = initial_values * self.num_solvers

        if instance_name in self.instances:
            print(f"WARNING: Tried adding already existing instance {instance_name} "
                  f"to Performance DataFrame: {self.csv_filepath}")
            return
        # Add rows for all combinations
        for objective, run in itertools.product(self.objective_names, self.run_ids):
            self.loc[(objective, instance_name, run)] = initial_values
        if self.num_instances == 2:  # Remove nan instance
            for instance in self.instances:
                if not isinstance(instance, str) and math.isnan(instance):
                    self.remove_instances(instance)
                    break
        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)

    def add_runs(self: PerformanceDataFrame,
                 num_extra_runs: int,
                 instance_names: list[str] = None,
                 initial_values: Any | list[Any] = None) -> None:
        """Add runs to the DataFrame.

        Args:
            num_extra_runs: The number of runs to be added.
            instance_names: The instances for which runs are to be added.
                By default None, which means runs are added to all instances.
            initial_values: The initial value for each objective of each new run.
                If a list, needs to have a value for Value and Seed.
        """
        initial_values = initial_values or self.missing_value
        if not isinstance(initial_values, list):
            initial_values = \
                [initial_values] * self.num_solvers * 2  # Value and Seed
        elif len(initial_values) == 2:  # Value and Seed provided
            initial_values = initial_values * self.num_solvers
        instance_names = self.instances if instance_names is None else instance_names
        for objective, instance in itertools.product(self.objective_names,
                                                      instance_names):
            index_runs_start = len(self.loc[(objective, instance)]) + 1
            for run in range(index_runs_start, index_runs_start + num_extra_runs):
                self.loc[(objective, instance, run)] = initial_values
            # Sort the index to optimize lookup speed
            # NOTE: It would be better to do this at the end, but that results in
            # PerformanceWarning: indexing past lexsort depth may impact performance.
            self.sort_index(axis=0, inplace=True)

    def get_configurations(self: PerformanceDataFrame,
                           solver_name: str) -> list[str]:
        """Return the list of configuration keys for a solver."""
        return list(self[solver_name].columns.get_level_values(
            PerformanceDataFrame.column_configuration).unique())

    def get_full_configuration(self: PerformanceDataFrame,
                               solver: str,
                               configuration_id: str | list[str]) -> dict | list[dict]:
        """Return the actual configuration associated with the configuration key."""
        if isinstance(configuration_id, str):
            return self.attrs[solver][configuration_id]
        return [self.attrs[solver][cid] for cid in configuration_id]

    def remove_solver(self: PerformanceDataFrame,
                      solvers: str | list[str]) -> None:
        """Drop one or more solvers from the DataFrame."""
        if not solvers:  # Bugfix: when an empty list is passed, avoid adding nan
            return
        # To make sure objectives / runs are saved when no solvers are present
        solvers = [solvers] if isinstance(solvers, str) else solvers
        if self.num_solvers == 1:  # This would preferably be done after removing
            for field in PerformanceDataFrame.multi_column_value:
                self[PerformanceDataFrame.missing_value,
                     PerformanceDataFrame.missing_value,
                     field] = PerformanceDataFrame.missing_value
        self.drop(columns=solvers, level=0, axis=1, inplace=True)
        for solver in solvers:
            del self.attrs[solver]

    def remove_configuration(self: PerformanceDataFrame,
                             solver: str,
                             configuration: str | list[str]) -> None:
        """Drop one or more configurations from the DataFrame."""
        if isinstance(configuration, str):
            configuration = [configuration]
        for config in configuration:
            self.drop((solver, config), axis=1, inplace=True)
            del self.attrs[solver][config]
        # Sort the index to optimize lookup speed
        self.sort_index(axis=1, inplace=True)

    def remove_objective(self: PerformanceDataFrame,
                         objectives: str | list[str]) -> None:
        """Remove an objective from the DataFrame."""
        if len(self.objectives) < 2:
            raise Exception("Cannot remove last objective from PerformanceDataFrame")
        self.drop(objectives,
                  axis=0, level=PerformanceDataFrame.index_objective, inplace=True)

    def remove_instances(self: PerformanceDataFrame,
                         instances: str | list[str]) -> None:
        """Drop instances from the DataFrame."""
        # To make sure objectives / runs are saved when no instances are present
        num_instances = len(instances) if isinstance(instances, list) else 1
        if self.num_instances - num_instances == 0:
            for objective, run in itertools.product(self.objective_names,
                                                    self.run_ids):
                self.loc[(objective, PerformanceDataFrame.missing_value, run)] = \
                    PerformanceDataFrame.missing_value
        self.drop(instances,
                  axis=0, level=PerformanceDataFrame.index_instance, inplace=True)
        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)

    def remove_runs(self: PerformanceDataFrame,
                    runs: int | list[int],
                    instance_names: list[str] = None) -> None:
        """Drop one or more runs from the DataFrame.

        Args:
            runs: The run indices to be removed. If it is an int, the last n runs
                are removed. NOTE: If each instance has a different number of runs,
                the number of removed runs is not uniform.
            instance_names: The instances for which runs are to be removed.
                By default None, which means runs are removed from all instances.
        """
        instance_names = self.instances if instance_names is None else instance_names
        runs = list(range((self.num_runs + 1) - runs, (self.num_runs + 1))) \
            if isinstance(runs, int) else runs
        self.drop(runs,
                  axis=0, level=PerformanceDataFrame.index_run, inplace=True)
        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)

    def remove_empty_runs(self: PerformanceDataFrame) -> None:
        """Remove runs that contain no data, except for the first."""
        for row_index in self.index:
            if row_index[2] == 1:  # First run, never delete
                continue
            if self.loc[row_index].isna().all():
                self.drop(row_index, inplace=True)

    def filter_objective(self: PerformanceDataFrame,
                         objective: str | list[str]) -> None:
        """Filter the DataFrame to a subset of objectives."""
        if isinstance(objective, str):
            objective = [objective]
        self.drop(list(set(self.objective_names) - set(objective)),
                  axis=0, level=PerformanceDataFrame.index_objective, inplace=True)

    def reset_value(self: PerformanceDataFrame,
                    solver: str,
                    instance: str,
                    objective: str = None,
                    run: int = None) -> None:
        """Reset a value in the DataFrame."""
        self.set_value(PerformanceDataFrame.missing_value,
                       solver, instance, objective=objective, run=run)

    def set_value(self: PerformanceDataFrame,
                  value: float | str | list[float | str] | list[list[float | str]],
                  solver: str | list[str],
                  instance: str | list[str],
                  configuration: str = None,
                  objective: str | list[str] = None,
                  run: int | list[int] = None,
                  solver_fields: list[str] = ["Value"],
                  append_write_csv: bool = False) -> None:
        """Setter method to assign a value to the DataFrame.

        Allows for setting the same value to multiple indices.

        Args:
            value: Value(s) to be assigned. If value is a list, the first dimension
                is the solver field, the second dimension is for assigning multiple
                different values. Must be the same shape as the target.
            solver: The solver(s) for which the value should be set.
                If solver is a list, multiple solvers are set. If None,
                all solvers are set.
            instance: The instance(s) for which the value should be set.
                If instance is a list, multiple instances are set. If None,
                all instances are set.
            configuration: The configuration(s) for which the value should be set.
                When left None, set for all configurations.
            objective: The objectives for which the value should be set.
                When left None, set for all objectives.
            run: The run index for which the value should be set.
                If left None, set for all runs.
            solver_fields: The level to which each value should be assigned.
                Defaults to ["Value"].
            append_write_csv: For concurrent writing to the PerformanceDataFrame.
                If True, the value is directly appended to the CSV file.
                This will create duplicate entries in the file, but these are
                combined when loading the file.
        """
        # Convert indices to slices for None values
        solver = slice(solver) if solver is None else solver
        configuration = slice(configuration) if configuration is None else configuration
        instance = slice(instance) if instance is None else instance
        objective = slice(objective) if objective is None else objective
        run = slice(run) if run is None else run
        # Convert column indices to slices for setting multiple columns
        value = [value] if not isinstance(value, list) else value
        # NOTE: We currently for-loop the levels here, as it allows us to set the
        # same sequence of values to the indices
        for item, level in zip(value, solver_fields):
            self.loc[(objective, instance, run), (solver, configuration, level)] = item

        if append_write_csv:
            writeable = self.loc[(objective, instance, run), :]
            if isinstance(writeable, pd.Series):  # Single row, convert to pd.DataFrame
                writeable = self.loc[[(objective, instance, run)], :]
            # Append the new rows to the DataFrame csv file
            writeable.to_csv(self.csv_filepath, mode="a", header=False)

    def get_value(self: PerformanceDataFrame,
                  solver: str | list[str] = None,
                  instance: str | list[str] = None,
                  configuration: str = None,
                  objective: str = None,
                  run: int = None,
                  solver_fields: list[str] = ["Value"]
                  ) -> float | str | list[Any]:
        """Index a value of the DataFrame and return it."""
        # Convert indices to slices for None values
        solver = slice(solver) if solver is None else solver
        configuration = slice(configuration) if configuration is None else configuration
        instance = slice(instance) if instance is None else instance
        objective = slice(objective) if objective is None else objective
        solver_fields = slice(solver_fields) if solver_fields is None else solver_fields
        run = slice(run) if run is None else run
        target = self.loc[(objective, instance, run),
                          (solver, configuration, solver_fields)].values
        # Reduce dimensions when relevant
        if (len(target) > 0 and isinstance(target[0], np.ndarray)
                and len(target[0]) == 1):
            target = target.flatten()
        target = target.tolist()
        if len(target) == 1:
            return target[0]
        return target

    def get_instance_num_runs(self: PerformanceDataFrame,
                              instance: str) -> int:
        """Return the number of runs for an instance."""
        # We assume each objective has the same index for Instance/Runs
        return len(self.loc[(self.objective_names[0], instance)].index)

    # Calculables

    def mean(self: PerformanceDataFrame,
             objective: str = None,
             solver: str = None,
             instance: str = None) -> float:
        """Return the mean value of a slice of the DataFrame."""
        objective = self.verify_objective(objective)
        subset = self.xs(objective, level=0)
        if solver is not None:
            subset = subset.xs(solver, axis=1, drop_level=False)
        if instance is not None:
            subset = subset.xs(instance, axis=0, drop_level=False)
        value = subset.astype(float).mean()
        if isinstance(value, pd.Series):
            return value.mean()
        return value

    def get_job_list(self: PerformanceDataFrame, rerun: bool = False) \
            -> list[tuple[str, str, str, int]]:
        """Return a list of performance computation jobs that are to be done.

        Get a list of (solver, config, instance, run) tuples to run from the
        performance data. If rerun is False (default), get only the tuples that
        do not have a value yet, else (True) get all the tuples.

        Args:
            rerun: Boolean indicating if we want to rerun all jobs

        Returns:
            A list of (solver, config, instance, run) combinations.
        """
        # Drop the seed as we are looking for nan values, not seeds
        df = self.drop(PerformanceDataFrame.column_seed,
                       axis=1, level=PerformanceDataFrame.column_meta)
        df = df.droplevel(PerformanceDataFrame.column_meta, axis=1)
        if rerun:  # Return all combinations
            # Drop objective, not needed
            df = df.droplevel(PerformanceDataFrame.index_objective, axis=0)
            result = [tuple(column) + tuple(index)
                      for column, index in itertools.product(df.columns, df.index)]
        else:
            result = []
            for (solver, config), (objective, instance, run) in itertools.product(
                    df.columns, df.index):
                value = df.loc[(objective, instance, run), (solver, config)]
                if value is None or (
                        isinstance(value, (int, float)) and math.isnan(value)):
                    result.append(tuple([solver, config, instance, run]))
        # Filter duplicates
        result = list(set(result))
        return result

    def configuration_performance(
            self: PerformanceDataFrame,
            solver: str,
            configuration: str | list[str] = None,
            objective: str | SparkleObjective = None,
            instances: list[str] = None,
            per_instance: bool = False) -> tuple[str, float]:
        """Return the (best) configuration performance for an objective over instances.

        Args:
            solver: The solver for which we evaluate the configuration(s)
            configuration: The configuration (id) to evaluate
            objective: The objective for which we find the best value
            instances: The instances which should be selected for the evaluation
            per_instance: Whether to return the performance per instance,
                or aggregated.

        Returns:
            The (best) configuration id and its aggregated performance.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        # Filter objective
        subdf = self.xs(objective.name, level=0, drop_level=True)
        # Filter solver
        subdf = subdf.xs(solver, axis=1, drop_level=True)
        # Drop the seed, then drop the meta level as it is no longer needed
        subdf = subdf.drop(PerformanceDataFrame.column_seed,
                           axis=1, level=PerformanceDataFrame.column_meta)
        subdf = subdf.droplevel(PerformanceDataFrame.column_meta, axis=1)
        # Ensure the objective is numeric
        subdf = subdf.astype(float)
        if instances:  # Filter instances
            subdf = subdf.loc[instances, :]
        if configuration:  # Filter configuration
            if not isinstance(configuration, list):
                configuration = [configuration]
            subdf = subdf.filter(configuration, axis=1)
        # Aggregate the runs
        subdf = subdf.groupby(PerformanceDataFrame.index_instance).agg(
            func=objective.run_aggregator.__name__)
        # Aggregate the instances
        sub_series = subdf.agg(func=objective.instance_aggregator.__name__)
        # Select the best configuration
        best_conf = sub_series.idxmin() if objective.minimise else sub_series.idxmax()
        if per_instance:  # Return a list of instance results
            return best_conf, subdf[best_conf].to_list()
        return best_conf, sub_series[best_conf]

    def best_configuration(self: PerformanceDataFrame,
                           solver: str,
                           objective: SparkleObjective = None,
                           instances: list[str] = None) -> tuple[str, float]:
        """Return the best configuration for the given objective over the instances.

        Args:
            solver: The solver for which we determine the best configuration
            objective: The objective for which we calculate the best configuration
            instances: The instances which should be selected for the evaluation

        Returns:
            The best configuration id and its aggregated performance.
        """
        return self.configuration_performance(solver, None, objective, instances)

    def best_instance_performance(
            self: PerformanceDataFrame,
            objective: str | SparkleObjective = None,
            instances: list[str] = None,
            run_id: int = None,
            exclude_solvers: list[tuple[str, str]] = None) -> pd.Series:
        """Return the best performance for each instance in the portfolio.

        Args:
            objective: The objective for which we calculate the best performance
            instances: The instances which should be selected for the evaluation
            run_id: The run for which we calculate the best performance.
                If None, we consider all runs.
            exclude_solvers: List of (solver, config_id) to exclude in the calculation.

        Returns:
            The best performance for each instance in the portfolio.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        subdf = self.drop(  # Drop Seed, not needed
            [PerformanceDataFrame.column_seed],
            axis=1, level=PerformanceDataFrame.column_meta)
        subdf = subdf.xs(objective.name, level=0)  # Drop objective
        if exclude_solvers is not None:
            subdf = subdf.drop(exclude_solvers, axis=1)
        if instances is not None:
            subdf = subdf.loc[instances, :]
        if run_id is not None:
            run_id = self.verify_run_id(run_id)
            subdf = subdf.xs(run_id, level=1)
        else:
            # Drop the run level
            subdf = subdf.droplevel(level=1)
        # Ensure the objective is numeric
        subdf = subdf.astype(float)
        series = subdf.min(axis=1) if objective.minimise else subdf.max(axis=1)
        # Ensure we always return the best for each run
        series = series.sort_values(ascending=objective.minimise)
        return series.groupby(series.index).first().astype(float)

    def best_performance(
            self: PerformanceDataFrame,
            exclude_solvers: list[tuple[str, str]] = [],
            instances: list[str] = None,
            objective: str | SparkleObjective = None) -> float:
        """Return the overall best performance of the portfolio.

        Args:
            exclude_solvers: List of (solver, config_id) to exclude in the calculation.
                Defaults to an empty list.
            instances: The instances which should be selected for the evaluation.
                If None, use all instances.
            objective: The objective for which we calculate the best performance

        Returns:
            The aggregated best performance of the portfolio over all instances.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        instance_best = self.best_instance_performance(
            objective, instances=instances,
            exclude_solvers=exclude_solvers).to_numpy(dtype=float)
        return objective.instance_aggregator(instance_best)

    def schedule_performance(
            self: PerformanceDataFrame,
            schedule: dict[str, list[tuple[str, str, int]]],
            target_solver: str | tuple[str, str] = None,
            objective: str | SparkleObjective = None) -> list[float]:
        """Return the performance of a selection schedule on the portfolio.

        Args:
            schedule: Compute the best performance according to a selection schedule.
                A schedule is a dictionary of instances, with a schedule per instance,
                consisting of triples of solver, config_id and maximum runtime.
            target_solver: If not None, store the found values in this solver of the
                DataFrame.
            objective: The objective for which we calculate the best performance

        Returns:
            The performance of the schedule over the instances in the dictionary.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        select = min if objective.minimise else max
        performances = [0.0] * len(schedule.keys())
        if not isinstance(target_solver, tuple):
            target_conf = PerformanceDataFrame.default_configuration
        else:
            target_solver, target_conf = target_solver
        if target_solver and target_solver not in self.solvers:
            self.add_solver(target_solver)
        for ix, instance in enumerate(schedule.keys()):
            for iy, (solver, config, max_runtime) in enumerate(schedule[instance]):
                performance = float(self.get_value(
                    solver, instance, config, objective.name))
                if max_runtime is not None:  # We are dealing with runtime
                    performances[ix] += performance
                    if performance < max_runtime:
                        break  # Solver finished in time
                else:  # Quality, we take the best found performance
                    if iy == 0:  # First solver, set initial value
                        performances[ix] = performance
                        continue
                    performances[ix] = select(performances[ix], performance)
            if target_solver is not None:
                self.set_value(performances[ix], target_solver, instance,
                               target_conf, objective.name)
        return performances

    def marginal_contribution(
            self: PerformanceDataFrame,
            objective: str | SparkleObjective = None,
            instances: list[str] = None,
            sort: bool = False) -> list[tuple[str, str, float, float]]:
        """Return the marginal contribution of the solver configurations on the instances.

        Args:
            objective: The objective for which we calculate the marginal contribution.
            instances: The instances which should be selected for the evaluation
            sort: Whether to sort the results afterwards

        Returns:
            The marginal contribution of each solver configuration.
        """
        output = []
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        best_performance = self.best_performance(objective=objective,
                                                 instances=instances)
        for solver in self.solvers:
            for config_id in self.get_configurations(solver):
                # By calculating the best performance excluding this Solver,
                # we can determine its relative impact on the portfolio.
                missing_solver_config_best = self.best_performance(
                    exclude_solvers=[(solver, config_id)],
                    instances=instances,
                    objective=objective)
                # Now we need to see how much the portfolio's best performance
                # decreases without this solver.
                marginal_contribution = missing_solver_config_best / best_performance
                if missing_solver_config_best == best_performance:
                    # No change, no contribution
                    marginal_contribution = 0.0
                output.append((solver, config_id, marginal_contribution,
                               missing_solver_config_best))
        if sort:
            output.sort(key=lambda x: x[2], reverse=objective.minimise)
        return output

    def get_solver_ranking(self: PerformanceDataFrame,
                           objective: str | SparkleObjective = None,
                           instances: list[str] = None,
                           ) -> list[tuple[str, dict, float]]:
        """Return a list with solvers ranked by average performance."""
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        # Drop Seed
        sub_df = self.drop(
            [PerformanceDataFrame.column_seed],
            axis=1, level=PerformanceDataFrame.column_meta)
        # Reduce objective
        sub_df: pd.DataFrame = sub_df.loc(axis=0)[objective.name, :, :]
        # Drop Objective and Meta multi index levels
        sub_df = sub_df.droplevel(PerformanceDataFrame.index_objective).droplevel(
            PerformanceDataFrame.column_meta, axis=1)
        if instances is not None:  # Select instances
            sub_df = sub_df.loc(axis=0)[instances, ]
        # Ensure data is numeric
        sub_df = sub_df.astype(float)
        # Aggregate runs
        sub_df = sub_df.groupby(PerformanceDataFrame.index_instance).agg(
            func=objective.run_aggregator.__name__)
        # Aggregate instances
        sub_series = sub_df.aggregate(func=objective.instance_aggregator.__name__)
        # Sort by objective
        sub_series.sort_values(ascending=objective.minimise, inplace=True)
        return [(index[0], index[1], sub_series[index]) for index in sub_series.index]

    def save_csv(self: PerformanceDataFrame, csv_filepath: Path = None) -> None:
        """Write a CSV to the given path.

        Args:
            csv_filepath: String path to the csv file. Defaults to self.csv_filepath.
        """
        csv_filepath = self.csv_filepath if csv_filepath is None else csv_filepath
        self.to_csv(csv_filepath)
        # Append the configurations
        with csv_filepath.open("a") as fout:
            fout.write("\n$Solver,configuration_id,Configuration\n")
            for solver in self.solvers:
                for config_id in self.attrs[solver]:
                    configuration = self.attrs[solver][config_id]
                    fout.write(f"${solver},{config_id},{str(configuration)}\n")

    def clone(self: PerformanceDataFrame,
              csv_filepath: Path = None) -> PerformanceDataFrame:
        """Create a copy of this object.

        Args:
            csv_filepath: The new filepath to use for saving the object to.
                If None, will not be saved.
                Warning: If the original path is used, it could lead to data loss!
        """
        pd_copy = PerformanceDataFrame(
            csv_filepath=csv_filepath,
            solvers=self.solvers,
            configurations=self.configurations,
            objectives=self.objectives,
            instances=self.instances,
            n_runs=self.num_runs)
        # Copy values
        for column_index in self.columns:
            for index in self.index:
                pd_copy.at[index, column_index] = self.loc[index, column_index]
        # Ensure everything is sorted?
        return pd_copy

    def clean_csv(self: PerformanceDataFrame) -> None:
        """Set all values in the Performance Data to None."""
        self[:] = PerformanceDataFrame.missing_value
        self.save_csv()
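

if __name__ == "__main__":
    # Illustrative usage sketch (not part of the original module). The solver,
    # instance and objective names below are hypothetical; the objective name
    # only needs to be a valid index label here, but for the analysis helpers it
    # should resolve to a SparkleObjective via sparkle.types.resolve_objective.
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        pdf = PerformanceDataFrame(
            csv_filepath=Path(tmp_dir) / "performance_data.csv",
            solvers=["SolverA"],
            objectives=["PAR10"],
            instances=["instance_1.cnf", "instance_2.cnf"],
            n_runs=2)
        # Record a result for SolverA (default configuration) on one instance/run
        pdf.set_value(42.0, solver="SolverA", instance="instance_1.cnf",
                      configuration=PerformanceDataFrame.default_configuration,
                      objective="PAR10", run=1)
        value = pdf.get_value(solver="SolverA", instance="instance_1.cnf",
                              configuration=PerformanceDataFrame.default_configuration,
                              objective="PAR10", run=1)
        print(value)  # 42.0
        pdf.save_csv()  # Persist the data; configurations are appended as $-lines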
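
# Further analysis helpers (sketch, same assumptions as the example above; the
# objective must resolve to a SparkleObjective with run/instance aggregators):
#   ranking = pdf.get_solver_ranking(objective="PAR10")  # [(solver, config_id, score), ...]
#   best_id, score = pdf.best_configuration("SolverA", objective="PAR10")
#   contributions = pdf.marginal_contribution(objective="PAR10", sort=True)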