Source code for sparkle.structures.performance_dataframe

"""Module to manage performance data files and common operations on them."""
from __future__ import annotations
import ast
from typing import Any
import itertools
from pathlib import Path
import math
import numpy as np
import pandas as pd

from sparkle.types import SparkleObjective, resolve_objective


class PerformanceDataFrame(pd.DataFrame):
    """Class to manage performance data and common operations on them."""
    missing_value = math.nan
    missing_objective = "UNKNOWN"

    index_objective = "Objective"
    index_instance = "Instance"
    index_run = "Run"
    multi_index_names = [index_objective, index_instance, index_run]

    column_value = "Value"
    column_seed = "Seed"
    column_configuration = "Configuration"
    multi_column_names = [column_value, column_seed, column_configuration]
    multi_column_dtypes = [float, int, str]

    def __init__(self: PerformanceDataFrame,
                 csv_filepath: Path,
                 solvers: list[str] = None,
                 objectives: list[str | SparkleObjective] = None,
                 instances: list[str] = None,
                 n_runs: int = 1,
                 ) -> None:
        """Initialise a PerformanceDataFrame.

        Consists of:
            - Columns representing the Solvers
            - Rows representing the result by multi-index in order of:
                * Objective (Static, given in constructor or read from file)
                * Instance
                * Runs (Static, given in constructor or read from file)

        Args:
            csv_filepath: If path exists, load from Path.
                Otherwise create new and save to this path.
            solvers: List of solver names to be added into the Dataframe
            objectives: List of SparkleObjectives or objective names. By default
                None, then the objectives will be derived from Sparkle Settings
                if possible.
            instances: List of instance names to be added into the Dataframe
            n_runs: The number of runs to consider per Solver/Objective/Instance comb.
        """
        if csv_filepath.exists():
            dtypes = {key: value for key, value in zip(
                PerformanceDataFrame.multi_column_names,
                PerformanceDataFrame.multi_column_dtypes)}
            df = pd.read_csv(csv_filepath,
                             header=[0, 1], index_col=[0, 1, 2],
                             dtype=dtypes,
                             on_bad_lines="skip")
            super().__init__(df)
            self.csv_filepath = csv_filepath
        else:
            # Initialize empty DataFrame
            run_ids = list(range(1, n_runs + 1))  # We count runs from 1
            # We always need objectives to maintain the dimensions
            if objectives is None:
                objectives = [PerformanceDataFrame.missing_objective]
            else:
                objectives = [str(o) for o in objectives]
            # We always need an instance to maintain the dimensions
            if instances is None:
                instances = [PerformanceDataFrame.missing_value]
            # We always need a solver to maintain the dimensions
            if solvers is None:
                solvers = [PerformanceDataFrame.missing_value]
            midx = pd.MultiIndex.from_product(
                [objectives, instances, run_ids],
                names=PerformanceDataFrame.multi_index_names)
            mcolumns = pd.MultiIndex.from_product(
                [solvers, PerformanceDataFrame.multi_column_names],
                names=["Solver", "Meta"])
            super().__init__(PerformanceDataFrame.missing_value,
                             index=midx, columns=mcolumns)
            self.csv_filepath = csv_filepath
            self.save_csv()

        if self.index.duplicated().any():  # Combine duplicate indices
            combined = self.groupby(level=[0, 1, 2]).first()
            duplicates = self.index[self.index.duplicated(keep="first")]
            # Remove all duplicate entries from self
            self.drop(duplicates, inplace=True)
            for d in duplicates:  # Place combined duplicates in self
                self.loc[d, :] = combined.loc[d, :]

        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)

    # Properties
    @property
    def num_objectives(self: PerformanceDataFrame) -> int:
        """Retrieve the number of objectives in the DataFrame."""
        return self.index.get_level_values(0).unique().size

    @property
    def num_instances(self: PerformanceDataFrame) -> int:
        """Return the number of instances."""
        return self.index.get_level_values(1).unique().size

    @property
    def num_runs(self: PerformanceDataFrame) -> int:
        """Return the maximum number of runs of each instance."""
        return self.index.get_level_values(2).unique().size

    @property
    def num_solvers(self: PerformanceDataFrame) -> int:
        """Return the number of solvers."""
        return self.columns.get_level_values(0).unique().size

    @property
    def multi_objective(self: PerformanceDataFrame) -> bool:
        """Return whether the DataFrame is multi-objective or not."""
        return self.num_objectives > 1

    @property
    def solvers(self: PerformanceDataFrame) -> list[str]:
        """Return the solvers present as a list of strings."""
        return self.columns.get_level_values(0).unique().to_list()

    @property
    def objective_names(self: PerformanceDataFrame) -> list[str]:
        """Return the objective names as a list of strings."""
        return self.index.get_level_values(0).unique().to_list()

    @property
    def objectives(self: PerformanceDataFrame) -> list[SparkleObjective]:
        """Return the objectives as a list of SparkleObjectives."""
        return [resolve_objective(o) for o in self.objective_names]

    @property
    def instances(self: PerformanceDataFrame) -> list[str]:
        """Return the instances as a list of strings."""
        return self.index.get_level_values(1).unique().to_list()

    @property
    def run_ids(self: PerformanceDataFrame) -> list[int]:
        """Return the run ids as a list of integers."""
        return self.index.get_level_values(2).unique().to_list()

    @property
    def has_missing_values(self: PerformanceDataFrame) -> bool:
        """Return True if there are any missing values in the DataFrame."""
        return self.isnull().any().drop(
            [PerformanceDataFrame.column_seed,
             PerformanceDataFrame.column_configuration],
            level=1).any()

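    # Usage sketch for constructing a frame (illustrative; the path, solver,
    # objective and instance names below are hypothetical, not defined here):
    #
    #   pdf = PerformanceDataFrame(Path("Output/performance_data.csv"),
    #                              solvers=["Solvers/PbO-CCSAT"],
    #                              objectives=["PAR10"],
    #                              instances=["train_1.cnf", "train_2.cnf"],
    #                              n_runs=2)
    #   pdf.num_solvers, pdf.num_instances, pdf.num_runs  # (1, 2, 2)
    #   pdf.objective_names  # ["PAR10"]
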
    def verify_objective(self: PerformanceDataFrame,
                         objective: str) -> str:
        """Method to check whether the specified objective is valid.

        Users are allowed to index the dataframe without specifying all dimensions.
        However, when dealing with multiple objectives this is not allowed and this
        is verified here. If we have only one objective this is returned. Otherwise,
        if an objective is specified by the user this is returned.

        Args:
            objective: The objective given by the user
        """
        if objective is None:
            if self.multi_objective:
                raise ValueError("Error: MO Data, but objective not specified.")
            elif self.num_objectives == 1:
                return self.objective_names[0]
            else:
                return PerformanceDataFrame.missing_objective
        return objective

    def verify_run_id(self: PerformanceDataFrame,
                      run_id: int) -> int:
        """Method to check whether run id is valid.

        Similar to verify_objective but here we check the dimensionality of runs.

        Args:
            run_id: the run as specified by the user.
        """
        if run_id is None:
            if self.num_runs > 1:
                raise ValueError("Error: Multiple run performance data, "
                                 "but run not specified")
            else:
                run_id = self.run_ids[0]
        return run_id

    def verify_indexing(self: PerformanceDataFrame,
                        objective: str,
                        run_id: int) -> tuple[str, int]:
        """Method to check whether data indexing is correct.

        Users are allowed to use the Performance Dataframe without the second and
        fourth dimension (Objective and Run respectively) in the case they only
        have one objective or only do one run. This method adjusts the indexing
        for those cases accordingly.

        Args:
            objective: The given objective name
            run_id: The given run index

        Returns:
            A tuple representing the (possibly adjusted) Objective and Run index.
        """
        objective = self.verify_objective(objective)
        run_id = self.verify_run_id(run_id)
        return objective, run_id

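    # Indexing shorthand sketch: for a frame with one objective and one run, both
    # dimensions may be omitted and are filled in here (values illustrative):
    #
    #   pdf.verify_indexing(None, None)  # e.g. ("PAR10", 1)
    #   # On a multi-objective frame the same call raises ValueError.
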
    # Getters and Setters
    def add_solver(self: PerformanceDataFrame,
                   solver_name: str,
                   initial_value: float | list[str | float] = None) -> None:
        """Add a new solver to the dataframe. Initializes value to None by default.

        Args:
            solver_name: The name of the solver to be added.
            initial_value: The value assigned for each index of the new solver.
                If not None, must match the index dimension (n_obj * n_inst * n_runs).
        """
        if solver_name in self.solvers:
            print(f"WARNING: Tried adding already existing solver {solver_name} to "
                  f"Performance DataFrame: {self.csv_filepath}")
            return
        initial_value =\
            [initial_value] if not isinstance(initial_value, list) else initial_value
        column_dim_size = len(PerformanceDataFrame.multi_column_names)
        if len(initial_value) < column_dim_size:
            initial_value.extend([None] * (column_dim_size - len(initial_value)))
        for field, value in zip(PerformanceDataFrame.multi_column_names,
                                initial_value):
            self[solver_name, field] = value
        if self.num_solvers == 2:  # Remove nan solver
            for solver in self.solvers:
                if str(solver) == str(PerformanceDataFrame.missing_value):
                    self.remove_solver(solver)
                    break

    def add_objective(self: PerformanceDataFrame,
                      objective_name: str,
                      initial_value: float = None) -> None:
        """Add an objective to the DataFrame."""
        initial_value = initial_value or self.missing_value
        if objective_name in self.objective_names:
            print(f"WARNING: Tried adding already existing objective {objective_name} "
                  f"to Performance DataFrame: {self.csv_filepath}")
            return
        for instance, run in itertools.product(self.instances, self.run_ids):
            self.loc[(objective_name, instance, run)] = initial_value
        self.sort_index(axis=0, inplace=True)

    def add_instance(self: PerformanceDataFrame,
                     instance_name: str,
                     initial_value: float = None) -> None:
        """Add an instance to the DataFrame."""
        initial_value = initial_value or self.missing_value
        if instance_name in self.instances:
            print(f"WARNING: Tried adding already existing instance {instance_name} "
                  f"to Performance DataFrame: {self.csv_filepath}")
            return
        # Add rows for all combinations
        for objective, run in itertools.product(self.objective_names, self.run_ids):
            self.loc[(objective, instance_name, run)] = initial_value
        if self.num_instances == 2:  # Remove nan instance
            for instance in self.instances:
                if not isinstance(instance, str) and math.isnan(instance):
                    self.remove_instance(instance)
                    break
        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)

    def add_runs(self: PerformanceDataFrame,
                 num_extra_runs: int,
                 instance_names: list[str] = None) -> None:
        """Add runs to the DataFrame.

        Args:
            num_extra_runs: The number of runs to be added.
            instance_names: The instances for which runs are to be added.
                By default None, which means runs are added to all instances.
        """
        instance_names = self.instances if instance_names is None else instance_names
        for instance in instance_names:
            for objective in self.objective_names:
                index_runs_start = len(self.loc[(objective, instance)]) + 1
                for run in range(index_runs_start,
                                 index_runs_start + num_extra_runs):
                    self.loc[(objective, instance, run)] = self.missing_value
                # Sort the index to optimize lookup speed
                # NOTE: It would be better to do this at the end, but that results in
                # PerformanceWarning: indexing past lexsort depth may impact
                # performance.
                self.sort_index(axis=0, inplace=True)

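    # Sketch of growing an existing frame (solver/instance names are hypothetical):
    #
    #   pdf.add_solver("Solvers/MiniSAT")   # adds Value/Seed/Configuration columns
    #   pdf.add_instance("train_3.cnf")     # adds rows for every objective/run
    #   pdf.add_runs(1)                     # one extra run for all instances
    #   pdf.add_runs(2, instance_names=["train_3.cnf"])  # only for one instance
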
    def remove_solver(self: PerformanceDataFrame,
                      solver_name: str | list[str]) -> None:
        """Drop one or more solvers from the Dataframe."""
        # To make sure objectives / runs are saved when no solvers are present
        if self.num_solvers == 1:
            for field in PerformanceDataFrame.multi_column_names:
                self[PerformanceDataFrame.missing_value, field] =\
                    PerformanceDataFrame.missing_value
        self.drop(columns=solver_name, level=0, axis=1, inplace=True)

    def remove_instance(self: PerformanceDataFrame,
                        instance_name: str) -> None:
        """Drop an instance from the Dataframe."""
        # To make sure objectives / runs are saved when no instances are present
        if self.num_instances == 1:
            for objective, run in itertools.product(self.objective_names,
                                                    self.run_ids):
                self.loc[(objective, PerformanceDataFrame.missing_value, run)] =\
                    PerformanceDataFrame.missing_value
        self.drop(instance_name,
                  axis=0,
                  level=PerformanceDataFrame.index_instance,
                  inplace=True)
        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)

    def remove_runs(self: PerformanceDataFrame,
                    runs: int | list[int],
                    instance_names: list[str] = None) -> None:
        """Drop one or more runs from the Dataframe.

        Args:
            runs: The run indices to be removed. If it is an int, the last n runs
                are removed. NOTE: If each instance has a different number of runs,
                the number of removed runs is not uniform.
            instance_names: The instances for which runs are to be removed.
                By default None, which means runs are removed from all instances.
        """
        instance_names = self.instances if instance_names is None else instance_names
        runs = list(range((self.num_runs + 1) - runs, (self.num_runs + 1)))\
            if isinstance(runs, int) else runs
        self.drop(runs,
                  axis=0,
                  level=PerformanceDataFrame.index_run,
                  inplace=True)
        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)

    def remove_empty_runs(self: PerformanceDataFrame) -> None:
        """Remove runs that contain no data, except for the first."""
        for row_index in self.index:
            if row_index[2] == 1:  # First run, never delete
                continue
            if self.loc[row_index].isna().all():
                self.drop(row_index, inplace=True)

    def reset_value(self: PerformanceDataFrame,
                    solver: str,
                    instance: str,
                    objective: str = None,
                    run: int = None) -> None:
        """Reset a value in the dataframe."""
        self.set_value(PerformanceDataFrame.missing_value,
                       solver, instance, objective, run)

    def set_value(self: PerformanceDataFrame,
                  value: float | str | list[float | str] | list[list[float | str]],
                  solver: str | list[str],
                  instance: str | list[str],
                  objective: str | list[str] = None,
                  run: int | list[int] = None,
                  solver_fields: list[str] = ["Value"],
                  append_write_csv: bool = False) -> None:
        """Setter method to assign a value to the Dataframe.

        Allows for setting the same value to multiple indices.

        Args:
            value: Value(s) to be assigned. If value is a list, first dimension is
                the solver field, second dimension is if multiple different values
                are to be assigned. Must be the same shape as target.
            solver: The solver(s) for which the value should be set.
                If solver is a list, multiple solvers are set. If None, all
                solvers are set.
            instance: The instance(s) for which the value should be set.
                If instance is a list, multiple instances are set. If None, all
                instances are set.
            objective: The objectives for which the value should be set.
                When left None, set for all objectives.
            run: The run index for which the value should be set.
                If left None, set for all runs.
            solver_fields: The level to which each value should be assigned.
                Defaults to ["Value"].
            append_write_csv: For concurrent writing to the PerformanceDataFrame.
                If True, the value is directly appended to the CSV file.
                This will create duplicate entries in the file, but these are
                combined when loading the file.
        """
        # Convert indices to slices for None values
        solver = slice(solver) if solver is None else solver
        instance = slice(instance) if instance is None else instance
        objective = slice(objective) if objective is None else objective
        run = slice(run) if run is None else run
        # Convert column indices to slices for setting multiple columns
        value = [value] if not isinstance(value, list) else value
        # NOTE: We currently forloop levels here, as it allows us to set the same
        # sequence of values to the indices
        for item, level in zip(value, solver_fields):
            self.loc[(objective, instance, run), (solver, level)] = item

        if append_write_csv:
            writeable = self.loc[(objective, instance, run), :]
            if isinstance(writeable, pd.Series):  # Single row, convert to DataFrame
                writeable = self.loc[[(objective, instance, run)], :]
            # Append the new rows to the dataframe csv file
            writeable.to_csv(self.csv_filepath, mode="a", header=False)

    def get_value(self: PerformanceDataFrame,
                  solver: str | list[str],
                  instance: str | list[str],
                  objective: str = None,
                  run: int = None,
                  solver_fields: list[str] = ["Value"]
                  ) -> float | str | list[Any]:
        """Index a value of the DataFrame and return it."""
        # Convert indices to slices for None values
        solver = slice(solver) if solver is None else solver
        instance = slice(instance) if instance is None else instance
        objective = slice(objective) if objective is None else objective
        run = slice(run) if run is None else run
        target = self.loc[(objective, instance, run), (solver, solver_fields)].values
        # Reduce dimensions when relevant
        if isinstance(target[0], np.ndarray) and len(target[0]) == 1:
            target = target.flatten()
        target = target.tolist()
        if len(target) == 1:
            return target[0]
        return target

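    # Setting and reading cells (sketch; names and values are hypothetical).
    # A list value maps one item per entry of solver_fields:
    #
    #   pdf.set_value([42.0, 1234, "{'alpha': 0.5}"],
    #                 "Solvers/MiniSAT", "train_1.cnf",
    #                 objective="PAR10", run=1,
    #                 solver_fields=["Value", "Seed", "Configuration"])
    #   pdf.get_value("Solvers/MiniSAT", "train_1.cnf",
    #                 objective="PAR10", run=1)  # 42.0
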
    # This method can be removed now that the above method does its job
    def get_values(self: PerformanceDataFrame,
                   solver: str,
                   instance: str = None,
                   objective: str = None,
                   run: int = None,
                   solver_fields: list[str] = ["Value"]
                   ) -> list[float | str] | list[list[float | str]]:
        """Return a list of solver values."""
        subdf = self[solver][solver_fields]
        if objective is not None:
            objective = self.verify_objective(objective)
            subdf = subdf.xs(objective, level=0, drop_level=False)
        if instance is not None:
            subdf = subdf.xs(instance, level=1, drop_level=False)
        if run is not None:
            run = self.verify_run_id(run)
            subdf = subdf.xs(run, level=2, drop_level=False)
        # Convert dict to list
        result = [subdf[field].to_list() for field in solver_fields]
        if len(result) == 1:
            return result[0]
        return result

    def get_instance_num_runs(self: PerformanceDataFrame,
                              instance: str) -> int:
        """Return the number of runs for an instance."""
        # We assume each objective has the same index for Instance/Runs
        return len(self.loc[(self.objective_names[0], instance)].index)

    # Calculables
    def mean(self: PerformanceDataFrame,
             objective: str = None,
             solver: str = None,
             instance: str = None) -> float:
        """Return the mean value of a slice of the dataframe."""
        objective = self.verify_objective(objective)
        subset = self.xs(objective, level=0)
        if solver is not None:
            subset = subset.xs(solver, axis=1, drop_level=False)
        if instance is not None:
            subset = subset.xs(instance, axis=0, drop_level=False)
        value = subset.astype(float).mean()
        if isinstance(value, pd.Series):
            return value.mean()
        return value

    # TODO: This method should be refactored or not exist
    def get_job_list(self: PerformanceDataFrame, rerun: bool = False) \
            -> list[tuple[str, int, str]]:
        """Return a list of performance computation jobs that are to be done.

        Get a list of (instance, run, solver) tuples to run from the performance
        data. If rerun is False (default), get only the tuples that don't have a
        value, else (True) get all the tuples.

        Args:
            rerun: Boolean indicating if we want to rerun all jobs

        Returns:
            A list of (instance, run, solver) combinations
        """
        # Format the dataframe such that only the values remain
        df = self.stack(future_stack=True)
        df.drop([PerformanceDataFrame.column_seed,
                 PerformanceDataFrame.column_configuration],
                level=-1, inplace=True)
        df.index.droplevel()
        if not rerun:  # Filter the nan values
            df = df.isnull()
        # Count the number of missing objective values for each
        # Instance/Run/Algorithm
        df.index = df.index.droplevel(PerformanceDataFrame.index_objective)
        df.index = df.index.droplevel(-1)
        index_names = df.index.names
        df = df.groupby(df.index).agg({cname: "sum" for cname in df.columns})
        df.index = pd.MultiIndex.from_tuples(df.index, names=index_names)
        # Return the Instance, Run, Solver combinations
        return [index + (column, )
                for index, column in itertools.product(df.index, df.columns)
                if rerun or df[column][index] > 0]

    # TODO: This method should be refactored or not exist
    def remaining_jobs(self: PerformanceDataFrame) -> dict[str, list[str]]:
        """Return a dictionary mapping each instance with empty values to its solvers."""
        remaining_jobs = {}
        jobs = self.get_job_list(rerun=False)
        for instance, _, solver in jobs:
            if instance not in remaining_jobs:
                remaining_jobs[instance] = [solver]
            else:
                remaining_jobs[instance].append(solver)
        return remaining_jobs

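    # Job bookkeeping sketch: if only the cell for ("train_2.cnf", run 1,
    # "Solvers/MiniSAT") is still empty (names hypothetical):
    #
    #   pdf.get_job_list()   # [("train_2.cnf", 1, "Solvers/MiniSAT")]
    #   pdf.remaining_jobs() # {"train_2.cnf": ["Solvers/MiniSAT"]}
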
    def configuration_performance(
            self: PerformanceDataFrame,
            solver: str,
            configuration: dict,
            objective: str | SparkleObjective = None,
            instances: list[str] = None,
            per_instance: bool = False) -> tuple[dict, float]:
        """Return the configuration performance for objective over the instances.

        Args:
            solver: The solver for which we evaluate the configuration
            configuration: The configuration to evaluate
            objective: The objective for which we find the best value
            instances: The instances which should be selected for the evaluation
            per_instance: Whether to return the performance per instance,
                or aggregated.

        Returns:
            The best configuration and its aggregated performance.
        """
        objective = self.verify_objective(objective)
        instances = instances or slice(instances)  # Convert None to slice
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        # Filter objective
        subdf = self.xs(objective.name, level=0, drop_level=True)

        if configuration:  # Filter configuration
            if not isinstance(configuration, dict):  # Get empty configuration
                subdf = subdf[subdf[solver][
                    PerformanceDataFrame.column_configuration].isna()]
            else:
                subdf = subdf[subdf[solver][
                    PerformanceDataFrame.column_configuration]
                    == str(configuration)]
        # Filter solver
        subdf = subdf.xs(solver, axis=1, drop_level=True)

        # Drop the seed, filter instances
        subdf = subdf.drop(PerformanceDataFrame.column_seed, axis=1).loc[instances, :]

        # Aggregate the runs per instance/configuration
        try:  # Can only aggregate numerical values
            subdf[PerformanceDataFrame.column_value] =\
                pd.to_numeric(subdf[PerformanceDataFrame.column_value])  # Ensure type
            subdf = subdf.groupby([PerformanceDataFrame.index_instance,
                                   PerformanceDataFrame.column_configuration],
                                  dropna=False).agg(
                objective.run_aggregator.__name__)
        except ValueError:
            subdf.drop(PerformanceDataFrame.column_configuration,
                       axis=1, inplace=True)
            return configuration, subdf.values.flatten().tolist()

        if per_instance:  # No instance aggregation
            # NOTE: How do we select the best configuration now if conf == None?
            return configuration, subdf.values.flatten().tolist()

        # Aggregate the instances per configuration
        subdf = subdf.droplevel(level=0).reset_index()  # Drop instance column
        subdf = subdf.groupby(PerformanceDataFrame.column_configuration,
                              dropna=False).agg(
            func=objective.instance_aggregator.__name__)

        if configuration:
            return configuration, subdf.values[0][0]
        # In case of no configuration given, select the one with best objective value
        best_index = subdf.idxmin() if objective.minimise else subdf.idxmax()
        try:
            best_configuration = ast.literal_eval(best_index.values[0])
        except Exception:  # Configuration is not a dictionary
            best_value = subdf.min() if objective.minimise else subdf.max()
            return {}, best_value.values[0]
        return (best_configuration,
                subdf.loc[best_index, PerformanceDataFrame.column_value].values[0])

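    # Configuration evaluation sketch (configuration contents are hypothetical):
    #
    #   conf = {"alpha": 0.5}
    #   _, score = pdf.configuration_performance("Solvers/MiniSAT", conf,
    #                                            objective="PAR10")
    #   # Per-instance values instead of one aggregated score:
    #   _, values = pdf.configuration_performance("Solvers/MiniSAT", conf,
    #                                             objective="PAR10",
    #                                             per_instance=True)
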
    def best_configuration(self: PerformanceDataFrame,
                           solver: str,
                           objective: SparkleObjective = None,
                           instances: list[str] = None) -> tuple[dict, float]:
        """Return the best configuration for the given objective over the instances.

        Args:
            solver: The solver for which we determine the best configuration
            objective: The objective for which we calculate the best configuration
            instances: The instances which should be selected for the evaluation

        Returns:
            The best configuration and its aggregated performance.
        """
        return self.configuration_performance(solver, None, objective, instances)

    def best_instance_performance(
            self: PerformanceDataFrame,
            objective: str | SparkleObjective = None,
            run_id: int = None,
            exclude_solvers: list[str] = None) -> pd.Series:
        """Return the best performance for each instance in the portfolio.

        Args:
            objective: The objective for which we calculate the best performance
            run_id: The run for which we calculate the best performance.
                If None, we consider all runs.
            exclude_solvers: List of solvers to exclude in the calculation.

        Returns:
            The best performance for each instance in the portfolio.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        # Drop Seed/Configuration
        subdf = self.drop(
            [PerformanceDataFrame.column_seed,
             PerformanceDataFrame.column_configuration],
            axis=1, level=1)
        subdf = subdf.xs(objective.name, level=0)
        if exclude_solvers is not None:
            subdf = subdf.drop(exclude_solvers, axis=1, level=0)
        if run_id is not None:
            run_id = self.verify_run_id(run_id)
            subdf = subdf.xs(run_id, level=1)
        else:
            # Drop the run level
            subdf = subdf.droplevel(level=1)
        if objective.minimise:
            series = subdf.min(axis=1)
        else:
            series = subdf.max(axis=1)
        # Ensure we always return the best for each run
        series = series.sort_values(ascending=objective.minimise)
        return series.groupby(series.index).first().astype(float)

    def best_performance(
            self: PerformanceDataFrame,
            exclude_solvers: list[str] = [],
            objective: str | SparkleObjective = None) -> float:
        """Return the overall best performance of the portfolio.

        Args:
            exclude_solvers: List of solvers to exclude in the calculation.
                Defaults to an empty list.
            objective: The objective for which we calculate the best performance

        Returns:
            The aggregated best performance of the portfolio over all instances.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        instance_best = self.best_instance_performance(
            objective, exclude_solvers=exclude_solvers).to_numpy(dtype=float)
        return objective.instance_aggregator(instance_best)

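    # Portfolio bound sketch (assumes "PAR10" resolves to a known objective):
    #
    #   pdf.best_instance_performance("PAR10")   # pd.Series: best value per instance
    #   pdf.best_performance(objective="PAR10")  # single aggregated float
    #   pdf.best_performance(exclude_solvers=["Solvers/MiniSAT"],
    #                        objective="PAR10")
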
    def schedule_performance(
            self: PerformanceDataFrame,
            schedule: dict[str, list[tuple[str, float | None]]],
            target_solver: str = None,
            objective: str | SparkleObjective = None) -> list[float]:
        """Return the performance of a selection schedule on the portfolio.

        Args:
            schedule: Compute the best performance according to a selection
                schedule. A dictionary with instances as keys and a list of tuples
                consisting of (solver, max_runtime), or solvers if no runtime
                prediction should be used.
            target_solver: If not None, store the values in this solver of the DF.
            objective: The objective for which we calculate the best performance

        Returns:
            The performance of the schedule over the instances in the dictionary.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        select = min if objective.minimise else max
        performances = [0.0] * len(schedule.keys())
        for ix, instance in enumerate(schedule.keys()):
            for iy, (solver, max_runtime) in enumerate(schedule[instance]):
                performance = float(self.get_value(solver, instance, objective.name))
                if max_runtime is not None:  # We are dealing with runtime
                    performances[ix] += performance
                    if performance < max_runtime:
                        break  # Solver finished in time
                else:  # Quality, we take the best found performance
                    if iy == 0:  # First solver, set initial value
                        performances[ix] = performance
                        continue
                    performances[ix] = select(performances[ix], performance)
            if target_solver is not None:
                self.set_value(performances[ix], target_solver,
                               instance, objective.name)
        return performances

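    # Schedule sketch: per instance, solvers run in order within a runtime budget
    # (solver names and budgets are hypothetical):
    #
    #   schedule = {"train_1.cnf": [("Solvers/MiniSAT", 5.0),
    #                               ("Solvers/PbO-CCSAT", 25.0)],
    #               "train_2.cnf": [("Solvers/PbO-CCSAT", 30.0)]}
    #   pdf.schedule_performance(schedule, objective="PAR10")  # one value/instance
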
    def marginal_contribution(
            self: PerformanceDataFrame,
            objective: str | SparkleObjective = None,
            sort: bool = False) -> list[tuple[str, float, float]]:
        """Return the marginal contribution of the solvers on the instances.

        Args:
            objective: The objective for which we calculate the marginal
                contribution.
            sort: Whether to sort the results afterwards

        Returns:
            The marginal contribution of each solver.
        """
        output = []
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        best_performance = self.best_performance(objective=objective)
        for solver in self.solvers:
            # By calculating the best performance excluding this Solver,
            # we can determine its relative impact on the portfolio.
            missing_solver_best = self.best_performance(
                exclude_solvers=[solver],
                objective=objective)
            # Now we need to see how much the portfolio's best performance
            # decreases without this solver.
            marginal_contribution = missing_solver_best / best_performance
            if missing_solver_best == best_performance:
                # No change, no contribution
                marginal_contribution = 0.0
            output.append((solver, marginal_contribution, missing_solver_best))
        if sort:
            output.sort(key=lambda x: x[1], reverse=objective.minimise)
        return output

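    # Marginal contribution sketch: each tuple holds the solver, its contribution
    # ratio and the portfolio's best performance without it (values illustrative):
    #
    #   pdf.marginal_contribution("PAR10", sort=True)
    #   # [("Solvers/PbO-CCSAT", 1.8, 90.0), ("Solvers/MiniSAT", 0.0, 50.0)]
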
    def get_solver_ranking(self: PerformanceDataFrame,
                           objective: str | SparkleObjective = None
                           ) -> list[tuple[str, float]]:
        """Return a list with solvers ranked by average performance."""
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        # Drop Seed/Configuration
        subdf = self.drop(
            [PerformanceDataFrame.column_seed,
             PerformanceDataFrame.column_configuration],
            axis=1, level=1)
        sub_df = subdf.loc(axis=0)[objective.name, :, :]
        # Reduce Runs Dimension
        sub_df = sub_df.droplevel("Run").astype(float)
        # By using .__name__, pandas converts it to a Pandas Aggregator function
        sub_df = sub_df.groupby(sub_df.index).agg(
            func=objective.run_aggregator.__name__)
        solver_ranking = [(solver, objective.instance_aggregator(
            sub_df[solver].astype(float))) for solver in self.solvers]
        # Sort the list by second value (the performance)
        solver_ranking.sort(key=lambda performance: performance[1],
                            reverse=(not objective.minimise))
        return solver_ranking

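    # Ranking sketch: solvers sorted best-first by aggregated performance
    # (values illustrative):
    #
    #   pdf.get_solver_ranking("PAR10")
    #   # [("Solvers/PbO-CCSAT", 12.3), ("Solvers/MiniSAT", 45.6)]
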
    def save_csv(self: PerformanceDataFrame, csv_filepath: Path = None) -> None:
        """Write a CSV to the given path.

        Args:
            csv_filepath: String path to the csv file. Defaults to self.csv_filepath.
        """
        csv_filepath = self.csv_filepath if csv_filepath is None else csv_filepath
        self.to_csv(csv_filepath)

    def clone(self: PerformanceDataFrame,
              csv_filepath: Path = None) -> PerformanceDataFrame:
        """Create a copy of this object.

        Args:
            csv_filepath: The new filepath to use for saving the object to.
                Warning: If the original path is used, it could lead to data loss!
        """
        csv_filepath = csv_filepath or self.csv_filepath
        if self.csv_filepath.exists():
            pd_copy = PerformanceDataFrame(csv_filepath)
        else:
            pd_copy = PerformanceDataFrame(
                csv_filepath=csv_filepath,
                solvers=self.solvers,
                objectives=self.objectives,
                instances=self.instances,
                n_runs=self.num_runs)
            for solver in self.solvers:
                for index in self.index:
                    for field in PerformanceDataFrame.multi_column_names:
                        pd_copy.at[index, (solver, field)] =\
                            self.loc[index, solver][field]
        return pd_copy

    def clean_csv(self: PerformanceDataFrame) -> None:
        """Set all values in Performance Data to None."""
        self[:] = PerformanceDataFrame.missing_value
        self.save_csv()

    def to_autofolio(self: PerformanceDataFrame,
                     objective: SparkleObjective = None,
                     target: Path = None) -> Path:
        """Port the data to a format acceptable for AutoFolio."""
        if (objective is None and self.multi_objective or self.num_runs > 1):
            print(f"ERROR: Currently no porting available for {self.csv_filepath} "
                  "to Autofolio due to multi objective or number of runs.")
            return
        autofolio_df = super().copy()
        # Drop Seed/Configuration, then drop the level
        autofolio_df = autofolio_df.drop(
            [PerformanceDataFrame.column_seed,
             PerformanceDataFrame.column_configuration],
            axis=1, level=1).droplevel(level=1, axis=1)
        if objective is not None:
            autofolio_df = autofolio_df.loc[objective.name]
            autofolio_df.index = autofolio_df.index.droplevel("Run")
        else:
            autofolio_df.index = autofolio_df.index.droplevel(["Objective", "Run"])
        if target is None:
            path = self.csv_filepath.parent / f"autofolio_{self.csv_filepath.name}"
        else:
            path = target / f"autofolio_{self.csv_filepath.name}"
        autofolio_df.to_csv(path)
        return path

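    # Export sketch: a single-objective, single-run frame can be ported for
    # AutoFolio (target directory is hypothetical):
    #
    #   pdf.to_autofolio(target=Path("Output"))
    #   # Path("Output/autofolio_performance_data.csv")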