Coverage for sparkle/structures/performance_dataframe.py: 89%
253 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-05 14:48 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-05 14:48 +0000
1#!/usr/bin/env python3
2# -*- coding: UTF-8 -*-
3"""Module to manage performance data files and common operations on them."""
4from __future__ import annotations
5from pathlib import Path
6import sys
7import math
8import pandas as pd
10from sparkle.types import SparkleObjective, resolve_objective
class PerformanceDataFrame():
    """Class to manage performance data and common operations on them.

    The data is held in ``self.dataframe``: one column per solver and a three
    level row MultiIndex named (Objective, Instance, Run).
    """

    # Value stored for results that have not been computed (yet)
    missing_value = math.nan
    # Objective name used when no objective is known
    missing_objective = "UNKNOWN"
    # Names of the row index levels, in order
    multi_dim_names = ["Objective", "Instance", "Run"]

    def __init__(self: PerformanceDataFrame,
                 csv_filepath: Path,
                 solvers: list[str] = None,
                 objectives: list[str | SparkleObjective] = None,
                 instances: list[str] = None,
                 n_runs: int = 1,
                 init_df: bool = True) -> None:
        """Initialise a PerformanceDataFrame.

        Consists of:
            - Columns representing the Solvers
            - Rows representing the result by multi-index in order of:
                * Objective (Static, given in constructor or read from file)
                * Instance
                * Runs (Static, given in constructor or read from file)

        Args:
            csv_filepath: If path exists, load from Path.
                Otherwise create new and save to this path.
            solvers: List of solver names to be added into the Dataframe.
                Defaults to no solvers.
            objectives: List of SparkleObjectives or objective names. By default
                None, then a single objective named ``missing_objective`` is used
                unless the objectives can be read from the file.
            instances: List of instance names to be added into the Dataframe.
                Defaults to no instances.
            n_runs: The number of runs to consider per Solver/Objective/Instance comb.
            init_df: Whether the dataframe should be initialised. Set to false to
                reduce heavy IO loads.
        """
        # None instead of [] as default: avoids sharing one list between calls
        solvers = [] if solvers is None else solvers
        instances = [] if instances is None else instances
        self.csv_filepath = csv_filepath
        # Runs is a ``static'' dimension
        self.n_runs = n_runs
        self.run_ids = list(range(1, self.n_runs + 1))  # We count runs from 1
        if objectives is not None:
            self.objectives = [resolve_objective(o) if isinstance(o, str) else o
                               for o in objectives]
        else:
            self.objectives = [
                SparkleObjective(PerformanceDataFrame.missing_objective)]
        if not init_df:
            return
        if self.csv_filepath.exists():
            self._load_csv(objectives_given=objectives is not None)
        else:
            # Initialize empty DataFrame and persist it right away
            midx = pd.MultiIndex.from_product(
                [[o.name for o in self.objectives], instances, self.run_ids],
                names=PerformanceDataFrame.multi_dim_names)
            self.dataframe = pd.DataFrame(PerformanceDataFrame.missing_value,
                                          index=midx,
                                          columns=solvers)
            self.save_csv()
        # Sort the index to optimize lookup speed
        self.dataframe = self.dataframe.sort_index()

    def _load_csv(self: PerformanceDataFrame, objectives_given: bool) -> None:
        """Read self.csv_filepath and force it into the multi-index layout.

        Args:
            objectives_given: Whether the constructor received objectives. Only
                used when the file has no usable Objective column.
        """
        obj_col, inst_col, run_col = PerformanceDataFrame.multi_dim_names
        self.dataframe = pd.read_csv(self.csv_filepath)
        has_rows = len(self.dataframe.index) > 0
        if obj_col not in self.dataframe.columns or not has_rows:
            # No objective present, force into column
            if objectives_given:
                # NOTE(review): assigning a list requires its length to match
                # the number of rows in the file -- confirm intended usage.
                self.dataframe[obj_col] = [o.name for o in self.objectives]
            else:
                self.dataframe[obj_col] = PerformanceDataFrame.missing_objective
        else:
            # Objectives are present, determine which ones
            self.objectives = [resolve_objective(name)
                               for name in self.dataframe[obj_col].unique()]
        if run_col not in self.dataframe.columns or not has_rows:
            # No runs column present, force into column
            self.n_runs = 1
            self.dataframe[run_col] = self.n_runs
            self.run_ids = [self.n_runs]
        else:
            # Runs are present, determine run ids and keep n_runs in sync,
            # otherwise verify_run_id would treat multi-run files as single run
            self.run_ids = self.dataframe[run_col].unique().tolist()
            self.n_runs = len(self.run_ids)
        if inst_col not in self.dataframe.columns:
            # Instances are listed as rows, force into column
            self.dataframe = self.dataframe.reset_index().rename(
                columns={"index": inst_col})
        # Now we can cast the columns into multi dim
        self.dataframe = self.dataframe.set_index(
            PerformanceDataFrame.multi_dim_names)

    def __repr__(self: PerformanceDataFrame) -> str:
        """Return string representation of the DataFrame."""
        return self.dataframe.__repr__()

    # Properties

    @property
    def num_objectives(self: PerformanceDataFrame) -> int:
        """Retrieve the number of objectives in the DataFrame."""
        return self.dataframe.index.levels[0].size

    @property
    def num_instances(self: PerformanceDataFrame) -> int:
        """Return the number of instances."""
        return self.dataframe.index.levels[1].size

    @property
    def num_runs(self: PerformanceDataFrame) -> int:
        """Return the number of runs."""
        return self.dataframe.index.levels[2].size

    @property
    def num_solvers(self: PerformanceDataFrame) -> int:
        """Return the number of solvers."""
        return self.dataframe.columns.size

    @property
    def multi_objective(self: PerformanceDataFrame) -> bool:
        """Return whether the dataframe represent MO or not."""
        return self.num_objectives > 1

    @property
    def solvers(self: PerformanceDataFrame) -> list[str]:
        """Return the solver present as a list of strings."""
        return self.dataframe.columns.tolist()

    @property
    def objective_names(self: PerformanceDataFrame) -> list[str]:
        """Return the objective names as a list of strings."""
        if self.num_objectives == 0:
            return [PerformanceDataFrame.missing_objective]
        return self.dataframe.index.levels[0].tolist()

    @property
    def instances(self: PerformanceDataFrame) -> list[str]:
        """Return the instances as a list of strings."""
        return self.dataframe.index.levels[1].tolist()

    @property
    def has_missing_values(self: PerformanceDataFrame) -> bool:
        """Returns True if there are any missing values in the dataframe."""
        return self.dataframe.isnull().any().any()

    def verify_objective(self: PerformanceDataFrame,
                         objective: str) -> str:
        """Method to check whether the specified objective is valid.

        Users are allowed to index the dataframe without specifying all dimensions.
        However, when dealing with multiple objectives this is not allowed and this
        is verified here. If we have only one objective this is returned. Otherwise,
        if an objective is specified by the user this is returned.

        Args:
            objective: The objective given by the user

        Returns:
            The given objective, or the only/default objective name if None.

        Raises:
            ValueError: If no objective is given for multi objective data.
        """
        if objective is not None:
            return objective
        if self.multi_objective:
            raise ValueError("Error: MO Data, but objective not specified.")
        if self.num_objectives == 1:
            return self.objective_names[0]
        return PerformanceDataFrame.missing_objective

    def verify_run_id(self: PerformanceDataFrame,
                      run_id: int) -> int:
        """Method to check whether run id is valid.

        Similar to verify_objective but here we check the dimensionality of runs.

        Args:
            run_id: the run as specified by the user.

        Returns:
            The given run id, or the first known run id if None was given.

        Raises:
            ValueError: If no run is given for multi run data. Raised instead of
                exiting the interpreter, in line with verify_objective.
        """
        if run_id is not None:
            return run_id
        if self.n_runs > 1:
            raise ValueError(
                "Error: Multiple run performance data, but run not specified")
        return self.run_ids[0]

    def verify_indexing(self: PerformanceDataFrame,
                        objective: str,
                        run_id: int) -> tuple[str, int]:
        """Method to check whether data indexing is correct.

        Users are allowed to use the Performance Dataframe without the Objective
        and Run dimensions in the case they only have one objective or only do
        one run. This method adjusts the indexing for those cases accordingly.

        Args:
            objective: The given objective name
            run_id: The given run index

        Returns:
            A tuple representing the (possibly adjusted) Objective and Run index.
        """
        return self.verify_objective(objective), self.verify_run_id(run_id)

    # Getters and Setters

    def add_solver(self: PerformanceDataFrame,
                   solver_name: str,
                   initial_value: float | list[float] = None) -> None:
        """Add a new solver to the dataframe. Initializes value to None by default.

        Args:
            solver_name: The name of the solver to be added.
            initial_value: The value assigned for each index of the new solver.
                If not None, must match the index dimension (n_obj * n_inst * n_runs).
        """
        if solver_name in self.dataframe.columns:
            print(f"WARNING: Tried adding already existing solver {solver_name} to "
                  f"Performance DataFrame: {self.csv_filepath}")
            return
        self.dataframe[solver_name] = initial_value

    def add_instance(self: PerformanceDataFrame,
                     instance_name: str,
                     initial_value: float | list[float] = None) -> None:
        """Add an instance to the DataFrame.

        Args:
            instance_name: The name of the instance to be added.
            initial_value: The value assigned to every solver for the new rows.
                Defaults to ``missing_value``. It is handed to the pandas
                DataFrame constructor as data, so a scalar is the safe choice.
        """
        if instance_name in self.instances:
            print(f"WARNING: Tried adding already existing instance {instance_name} "
                  f"to Performance DataFrame: {self.csv_filepath}")
            return
        fill = (PerformanceDataFrame.missing_value
                if initial_value is None else initial_value)
        names = PerformanceDataFrame.multi_dim_names
        if self.num_instances == 0 or self.num_solvers == 0:
            # First instance or no Solvers yet: there is no data to preserve,
            # so the frame is simply rebuilt with the extended index.
            midx = pd.MultiIndex.from_product(
                [self.objective_names,
                 self.instances + [instance_name],
                 self.run_ids],
                names=names)
            self.dataframe = pd.DataFrame(fill, index=midx, columns=self.solvers)
            return
        # One row per (objective, run) pair for the new instance. from_product
        # yields every combination exactly once; repeating the level lists does
        # not when the number of objectives and runs share a common factor.
        new_index = pd.MultiIndex.from_product(
            [self.dataframe.index.levels[0].tolist(),
             [instance_name],
             self.dataframe.index.levels[2].tolist()],
            names=names)
        new_rows = pd.DataFrame(fill,
                                index=new_index,
                                columns=self.dataframe.columns)
        # Concatenate the original and new dataframe together
        self.dataframe = pd.concat([self.dataframe, new_rows])

    # Can we make this handle a sequence of inputs instead of just 1?
    def set_value(self: PerformanceDataFrame,
                  value: float,
                  solver: str,
                  instance: str,
                  objective: str = None,
                  run: int = None) -> None:
        """Setter method to assign a value to the Dataframe.

        Args:
            value: Float value to be assigned.
            solver: The solver that produced the value.
            instance: The instance that the value was produced on.
            objective: The objective for which the result was produced.
                Optional in case of using single objective.
            run: The run index for which the result was produced.
                Optional in case of doing single run results.
        """
        objective, run = self.verify_indexing(objective, run)
        self.dataframe.at[(objective, instance, run), solver] = value

    def remove_solver(self: PerformanceDataFrame,
                      solver_name: str | list[str]) -> None:
        """Drop one or more solvers from the Dataframe."""
        self.dataframe.drop(solver_name, axis=1, inplace=True)

    def remove_instance(self: PerformanceDataFrame, instance_name: str) -> None:
        """Drop an instance from the Dataframe."""
        self.dataframe.drop(instance_name, axis=0, level="Instance", inplace=True)
        # Dropping rows keeps the label in index.levels; prune it so that
        # instances / num_instances no longer report the removed instance.
        self.dataframe.index = self.dataframe.index.remove_unused_levels()

    def reset_value(self: PerformanceDataFrame,
                    solver: str,
                    instance: str,
                    objective: str = None,
                    run: int = None) -> None:
        """Reset a value in the dataframe to ``missing_value``."""
        self.set_value(PerformanceDataFrame.missing_value,
                       solver, instance, objective, run)

    # Can we unify get_value and get_values?
    def get_value(self: PerformanceDataFrame,
                  solver: str,
                  instance: str,
                  objective: str = None,
                  run: int = None) -> float:
        """Index a value of the DataFrame and return it."""
        objective, run = self.verify_indexing(objective, run)
        return float(self.dataframe.loc[(objective, instance, run), solver])

    def get_values(self: PerformanceDataFrame,
                   solver: str,
                   instance: str = None,
                   objective: str = None,
                   run: int = None) -> list[float]:
        """Return a list of solver values.

        Each of instance, objective and run narrows the selection when given;
        dimensions left as None are not filtered.
        """
        subdf = self.dataframe[solver]
        if objective is not None:
            objective = self.verify_objective(objective)
            subdf = subdf.xs(objective, level=0, drop_level=False)
        if instance is not None:
            subdf = subdf.xs(instance, level=1, drop_level=False)
        if run is not None:
            run = self.verify_run_id(run)
            subdf = subdf.xs(run, level=2, drop_level=False)
        return subdf.to_list()

    # Modifiers

    # NOTE: Disabled draft of a penalise modifier, kept for reference.
    # def penalise(self: PerformanceDataFrame,
    #              threshold: float,
    #              penalty: float,
    #              objective: str = None,
    #              lower_bound: bool = False) -> None:
    #     """Penalises the DataFrame values if crossing threshold by specified penalty.
    #
    #     Directly updates the DataFrame object held by this class.
    #
    #     Args:
    #         threshold: The threshold of performances to be met
    #         penalty: The values assigned for out of bounds performances
    #         objective: The objective that should be penalised.
    #         lower_bound: Whether the threshold is a lower_bound. By default,
    #             the threshold is treated as an upperbound for performance values.
    #     """
    #     objective = self.verify_objective(objective)
    #     comparison_op = operator.lt if lower_bound else operator.gt
    #     self.dataframe[comparison_op(self.dataframe.loc[(objective), :],
    #                                  threshold)] = penalty

    # Calculables

    def mean(self: PerformanceDataFrame,
             objective: str = None,
             solver: str = None,
             instance: str = None) -> float:
        """Return the mean value of a slice of the dataframe.

        Args:
            objective: The objective to slice on. Optional for single objective.
            solver: If given, only this solver column is considered.
            instance: If given, only rows of this instance are considered.
        """
        objective = self.verify_objective(objective)
        subset = self.dataframe.xs(objective, level=0)
        if solver is not None:
            subset = subset.xs(solver, axis=1, drop_level=False)
        if instance is not None:
            subset = subset.xs(instance, axis=0, drop_level=False)
        value = subset.astype(float).mean()
        if isinstance(value, pd.Series):
            # Several columns left: average the per-column means
            return value.mean()
        return value

    # TODO: This method should be refactored or not exist
    def get_job_list(self: PerformanceDataFrame, rerun: bool = False) \
            -> list[tuple[str, int, str]]:
        """Return a list of performance computation jobs there are to be done.

        The frame is stacked so that the solver becomes the innermost index
        level and the Objective level is dropped, which leaves tuples of
        (instance, run, solver). If rerun is False (default), get only the
        tuples that don't have a value in the table, else (True) get all.

        Args:
            rerun: Boolean indicating if we want to rerun all jobs
        """
        df = self.dataframe.stack(future_stack=True)
        if not rerun:
            df = df[df.isnull()]
        df.index = df.index.droplevel(["Objective"])
        return df.index.unique().tolist()

    # TODO: This method should be refactored or not exist
    def remaining_jobs(self: PerformanceDataFrame) -> dict[str, set[str]]:
        """Return a dictionary for empty values per instance and solver combination.

        Returns:
            A dict mapping each instance with at least one missing value to the
            set of solvers that miss a value for it.
        """
        remaining_jobs = {}
        null_df = self.dataframe.isnull()
        for row in self.dataframe.index:
            instance = row[1]
            for solver in self.dataframe.columns:
                if null_df.at[row, solver]:
                    remaining_jobs.setdefault(instance, set()).add(solver)
        return remaining_jobs

    def best_instance_performance(
            self: PerformanceDataFrame,
            objective: str | SparkleObjective = None,
            run_id: int = None,
            exclude_solvers: list[str] = None) -> pd.Series:
        """Return the best performance for each instance in the portfolio.

        Args:
            objective: The objective for which we calculate the best performance
            run_id: The run for which we calculate the best performance. If None,
                we consider all runs.
            exclude_solvers: List of solvers to exclude in the calculation.

        Returns:
            The best performance for each instance in the portfolio.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        subdf = self.dataframe.xs(objective.name, level=0)
        if exclude_solvers:
            subdf = subdf.drop(exclude_solvers, axis=1)
        if run_id is not None:
            run_id = self.verify_run_id(run_id)
            subdf = subdf.xs(run_id, level=1)
        else:
            # Drop the run level
            subdf = subdf.droplevel(level=1)
        # Best value over the solvers, per row
        series = subdf.min(axis=1) if objective.minimise else subdf.max(axis=1)
        # Sort best-first so that taking the first entry per instance
        # yields the best value over all runs of that instance
        series = series.sort_values(ascending=objective.minimise)
        return series.groupby(series.index).first().astype(float)

    def best_performance(
            self: PerformanceDataFrame,
            exclude_solvers: list[str] = None,
            objective: str | SparkleObjective = None) -> float:
        """Return the overall best performance of the portfolio.

        Args:
            exclude_solvers: List of solvers to exclude in the calculation.
                Defaults to none.
            objective: The objective for which we calculate the best performance

        Returns:
            The aggregated best performance of the portfolio over all instances.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        instance_best = self.best_instance_performance(
            objective, exclude_solvers=exclude_solvers).to_numpy(dtype=float)
        return objective.instance_aggregator(instance_best)

    def schedule_performance(
            self: PerformanceDataFrame,
            schedule: dict[str, list[tuple[str, float | None]]],
            target_solver: str = None,
            objective: str | SparkleObjective = None) -> list[float]:
        """Return the performance of a selection schedule on the portfolio.

        Args:
            schedule: Compute the best performance according to a selection schedule.
                A dictionary with instances as keys and a list of tuples
                (solver, max_runtime) as values. Use None as max_runtime if no
                runtime prediction should be used.
            target_solver: If not None, store the values in this solver of the DF.
            objective: The objective for which we calculate the best performance

        Returns:
            The performance of the schedule per instance, in the order of the
            dictionary keys.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        select = min if objective.minimise else max
        performances = [0.0] * len(schedule)
        for ix, (instance, solver_schedule) in enumerate(schedule.items()):
            for iy, (solver, max_runtime) in enumerate(solver_schedule):
                performance = self.get_value(solver, instance, objective.name)
                if max_runtime is not None:  # We are dealing with runtime
                    performances[ix] += performance
                    if performance < max_runtime:
                        break  # Solver finished in time
                elif iy == 0:  # Quality, first solver sets the initial value
                    performances[ix] = performance
                else:  # Quality, we take the best found performance
                    performances[ix] = select(performances[ix], performance)
            if target_solver is not None:
                self.set_value(performances[ix], target_solver,
                               instance, objective.name)
        return performances

    def marginal_contribution(
            self: PerformanceDataFrame,
            objective: str | SparkleObjective = None,
            sort: bool = False) -> list[tuple[str, float, float]]:
        """Return the marginal contribution of the solvers on the instances.

        Args:
            objective: The objective for which we calculate the marginal contribution.
            sort: Whether to sort the results afterwards

        Returns:
            A list of (solver, marginal contribution, best performance of the
            portfolio without that solver) tuples.
        """
        output = []
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        best_performance = self.best_performance(objective=objective)
        for solver in self.solvers:
            # By calculating the best performance excluding this Solver,
            # we can determine its relative impact on the portfolio.
            missing_solver_best = self.best_performance(
                exclude_solvers=[solver],
                objective=objective)
            if missing_solver_best == best_performance:
                # No change, no contribution. Checked before dividing so that
                # an unchanged best of zero does not trigger a 0 / 0.
                marginal_contribution = 0.0
            else:
                # How much the portfolio's best performance changes
                # without this solver.
                marginal_contribution = missing_solver_best / best_performance
            output.append((solver, marginal_contribution, missing_solver_best))
        if sort:
            output.sort(key=lambda x: x[1], reverse=objective.minimise)
        return output

    def get_solver_ranking(self: PerformanceDataFrame,
                           objective: str | SparkleObjective = None
                           ) -> list[tuple[str, float]]:
        """Return a list with solvers ranked by average performance."""
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        sub_df = self.dataframe.loc(axis=0)[objective.name, :, :]
        # Reduce Runs Dimension
        sub_df = sub_df.droplevel("Run").astype(float)
        # By using .__name__, pandas converts it to a Pandas Aggregator function
        sub_df = sub_df.groupby(sub_df.index).agg(
            func=objective.run_aggregator.__name__)
        solver_ranking = [(solver, objective.instance_aggregator(
            sub_df[solver].astype(float))) for solver in self.solvers]
        # Sort the list by second value (the performance)
        solver_ranking.sort(key=lambda performance: performance[1],
                            reverse=(not objective.minimise))
        return solver_ranking

    def save_csv(self: PerformanceDataFrame, csv_filepath: Path = None) -> None:
        """Write a CSV to the given path.

        Args:
            csv_filepath: Path to the csv file. Defaults to self.csv_filepath.
        """
        csv_filepath = self.csv_filepath if csv_filepath is None else csv_filepath
        self.dataframe.to_csv(csv_filepath)

    def clean_csv(self: PerformanceDataFrame) -> None:
        """Set all values in Performance Data to ``missing_value`` and save."""
        self.dataframe[:] = PerformanceDataFrame.missing_value
        self.save_csv()

    def copy(self: PerformanceDataFrame,
             csv_filepath: Path = None) -> PerformanceDataFrame:
        """Create a copy of this object.

        The objectives, run ids and number of runs are carried over together
        with a copy of the data, so indexing checks behave as on the original.

        Args:
            csv_filepath: The new filepath to use for saving the object to.
                Warning: If the original path is used, it could lead to dataloss!
        """
        csv_filepath = self.csv_filepath if csv_filepath is None else csv_filepath
        # init_df=False: nothing is read from or written to disk here
        pd_copy = PerformanceDataFrame(csv_filepath,
                                       objectives=list(self.objectives),
                                       n_runs=self.n_runs,
                                       init_df=False)
        # Run ids read from file are not necessarily 1..n_runs
        pd_copy.run_ids = list(self.run_ids)
        pd_copy.dataframe = self.dataframe.copy()
        return pd_copy

    def to_autofolio(self: PerformanceDataFrame,
                     objective: SparkleObjective = None,
                     target: Path = None) -> Path | None:
        """Port the data to a format acceptable for AutoFolio.

        Args:
            objective: The objective to export. Required for multi objective data.
            target: Directory to write to. Defaults to the directory of
                self.csv_filepath.

        Returns:
            The path of the written csv file, or None if the data has multiple
            runs, or multiple objectives without one being specified.
        """
        if (objective is None and self.multi_objective) or self.n_runs > 1:
            print(f"ERROR: Currently no porting available for {self.csv_filepath} "
                  "to Autofolio due to multi objective or number of runs.")
            return
        autofolio_df = self.dataframe.copy()
        if objective is not None:
            autofolio_df = autofolio_df.loc[objective.name]
            autofolio_df.index = autofolio_df.index.droplevel("Run")
        else:
            autofolio_df.index = autofolio_df.index.droplevel(["Objective", "Run"])
        directory = self.csv_filepath.parent if target is None else target
        path = directory / f"autofolio_{self.csv_filepath.name}"
        autofolio_df.to_csv(path)
        return path