1"""Module to manage performance data files and common operations on them."""
3from __future__ import annotations
4import ast
5import copy
6from typing import Any
7import itertools
8from pathlib import Path
9import math
10import numpy as np
11import pandas as pd
13from sparkle.types import SparkleObjective, resolve_objective

class PerformanceDataFrame(pd.DataFrame):
    """Class to manage performance data and common operations on them."""

    missing_value = math.nan

    missing_objective = "UNKNOWN"
    default_configuration = "Default"

    index_objective = "Objective"
    index_instance = "Instance"
    index_run = "Run"
    multi_index_names = [index_objective, index_instance, index_run]

    column_solver = "Solver"
    column_configuration = "Configuration"
    column_meta = "Meta"
    column_value = "Value"
    column_seed = "Seed"
    multi_column_names = [column_solver, column_configuration, column_meta]
    multi_column_value = [column_value, column_seed]
    multi_column_dtypes = [str, int]

    def __init__(
        self: PerformanceDataFrame,
        csv_filepath: Path,
        solvers: list[str] = None,
        configurations: dict[str, dict[str, dict]] = None,
        objectives: list[str | SparkleObjective] = None,
        instances: list[str] = None,
        n_runs: int = 1,
    ) -> None:
        """Initialise a PerformanceDataFrame.

        Consists of:
            - Columns representing the Solvers
            - Rows representing the results by multi-index in order of:
                * Objective (static, given in constructor or read from file)
                * Instance
                * Run (static, given in constructor or read from file)

        Args:
            csv_filepath: If the path exists, load the DataFrame from it.
                Otherwise, create a new one and save it to this path.
            solvers: List of solver names to be added to the DataFrame.
            configurations: The configuration keys per solver to add, structured as
                configurations[solver][config_key] = {"parameter": "value", ...}
            objectives: List of SparkleObjectives or objective names. By default
                None, in which case the objectives are derived from the Sparkle
                Settings if possible.
            instances: List of instance names to be added to the DataFrame.
            n_runs: The number of runs to consider per Solver/Objective/Instance
                combination.
        """
        if csv_filepath and csv_filepath.exists():  # Read from file
            df = pd.read_csv(
                csv_filepath,
                header=[0, 1, 2],
                index_col=[0, 1, 2],
                on_bad_lines="skip",
                dtype={
                    PerformanceDataFrame.column_value: str,
                    PerformanceDataFrame.column_seed: int,
                    # PerformanceDataFrame.index_run: int,
                    # NOTE: Preferably the run dtype would be set as well, but it
                    # is not applied to error lines in the "on_bad_lines=skip" case.
                },
                comment="$",  # $ marks the extra configuration data lines
            )
            super().__init__(df)
            self.csv_filepath = csv_filepath
            # Load the configuration mapping from the $-prefixed lines
            with self.csv_filepath.open() as f:
                configuration_lines = [
                    line.strip().strip("$").split(",", maxsplit=2)
                    for line in f.readlines()
                    if line.startswith("$")
                ]
            configurations = {s: {} for s in self.solvers}
            for solver, config_key, config in configuration_lines[1:]:  # Skip header
                # Only add configurations of solvers already known from the columns
                if solver in configurations:
                    configurations[solver][config_key] = ast.literal_eval(
                        config.strip('"')
                    )
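            # For reference, the $-prefixed tail written by save_csv looks like
            # this (solver name and configuration are hypothetical examples):
            #   $Solver,configuration_id,Configuration
            #   $SolverA,Default,{}
            #   $SolverA,conf_1,{'beta': 0.5}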
        else:  # New PerformanceDataFrame
            # Initialize an empty DataFrame
            run_ids = list(range(1, n_runs + 1))  # We count runs from 1
            # We always need objectives to maintain the dimensions
            if objectives is None:
                objectives = [PerformanceDataFrame.missing_objective]
            else:
                objectives = [str(o) for o in objectives]
            # We always need an instance to maintain the dimensions
            if instances is None:
                instances = [PerformanceDataFrame.missing_value]
            # We always need a solver to maintain the dimensions
            if solvers is None:
                solvers = [PerformanceDataFrame.missing_value]
            midx = pd.MultiIndex.from_product(
                [objectives, instances, run_ids],
                names=PerformanceDataFrame.multi_index_names,
            )
            # Create the multi-index column tuples
            if configurations is None:
                configurations = {
                    solver: {PerformanceDataFrame.default_configuration: {}}
                    for solver in solvers
                }
            column_tuples = []
            # We cannot use .from_product here, as config ids are per solver
            for solver in configurations.keys():
                for config_id in configurations[solver].keys():
                    column_tuples.extend(
                        [
                            (solver, config_id, PerformanceDataFrame.column_seed),
                            (solver, config_id, PerformanceDataFrame.column_value),
                        ]
                    )
            mcolumns = pd.MultiIndex.from_tuples(
                column_tuples,
                names=[
                    PerformanceDataFrame.column_solver,
                    PerformanceDataFrame.column_configuration,
                    PerformanceDataFrame.column_meta,
                ],
            )
            # Set dtype object to avoid inferring float for categorical objectives
            super().__init__(
                PerformanceDataFrame.missing_value,
                index=midx,
                columns=mcolumns,
                dtype="object",
            )
            self.csv_filepath = csv_filepath
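            # Illustrative shape of the new frame, assuming one solver and one
            # instance (hypothetical names):
            #
            # Solver                        SolverA
            # Configuration                 Default
            # Meta                             Seed Value
            # Objective Instance      Run
            # UNKNOWN   instance1.cnf 1         NaN   NaN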

        # Store the configurations in the global attributes dictionary
        # (see the pandas documentation on DataFrame.attrs)
        self.attrs = configurations

        if self.index.duplicated().any():  # Drop all duplicates except the last
            # NOTE: This is rather convoluted (but fast!) because it must happen
            # in place to maintain our type (PerformanceDataFrame).
            # Make the index levels into columns (in place)
            self.reset_index(inplace=True)
            # The first nlevels columns are the index columns created by
            # reset_index; drop duplicates in those columns
            idx_cols = self.columns[
                : len(PerformanceDataFrame.multi_index_names)
            ].tolist()
            self.drop_duplicates(subset=idx_cols, keep="last", inplace=True)
            # Restore the MultiIndex and its level names (in place)
            self.set_index(idx_cols, inplace=True)
            self.index.rename(self.multi_index_names, inplace=True)

        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)
        self.sort_index(axis=1, inplace=True)

        if csv_filepath and not self.csv_filepath.exists():  # New PerformanceDataFrame
            self.save_csv()

    # Properties

    @property
    def num_objectives(self: PerformanceDataFrame) -> int:
        """Retrieve the number of objectives in the DataFrame."""
        return self.index.get_level_values(0).unique().size

    @property
    def num_instances(self: PerformanceDataFrame) -> int:
        """Return the number of instances."""
        return self.index.get_level_values(1).unique().size

    @property
    def num_runs(self: PerformanceDataFrame) -> int:
        """Return the maximum number of runs of each instance."""
        return self.index.get_level_values(2).unique().size

    @property
    def num_solvers(self: PerformanceDataFrame) -> int:
        """Return the number of solvers."""
        return self.columns.get_level_values(0).unique().size

    @property
    def num_solver_configurations(self: PerformanceDataFrame) -> int:
        """Return the number of solver configurations."""
        return int(
            self.columns.get_level_values(  # Each config has a Seed & Value column
                PerformanceDataFrame.column_configuration
            ).size
            / 2
        )

    @property
    def multi_objective(self: PerformanceDataFrame) -> bool:
        """Return whether the DataFrame is multi-objective."""
        return self.num_objectives > 1

    @property
    def solvers(self: PerformanceDataFrame) -> list[str]:
        """Return the solvers present as a list of strings."""
        # Do not return the nan solver, as it is not an actual solver
        return (
            self.columns.get_level_values(PerformanceDataFrame.column_solver)
            .dropna()
            .unique()
            .to_list()
        )

    @property
    def configuration_ids(self: PerformanceDataFrame) -> list[str]:
        """Return the list of configuration keys."""
        return (
            self.columns.get_level_values(PerformanceDataFrame.column_configuration)
            .unique()
            .to_list()
        )

    @property
    def configurations(self: PerformanceDataFrame) -> dict[str, dict[str, dict]]:
        """Return a dictionary (copy) containing the configurations for each solver."""
        return copy.deepcopy(self.attrs)  # Deepcopy to avoid mutation of the attribute
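    # The underlying mapping in DataFrame.attrs has the shape (hypothetical
    # solver and parameter names):
    #   {"SolverA": {"Default": {}, "conf_1": {"--restarts": 100}}}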

    @property
    def objective_names(self: PerformanceDataFrame) -> list[str]:
        """Return the objective names as a list of strings."""
        return self.index.get_level_values(0).unique().to_list()

    @property
    def objectives(self: PerformanceDataFrame) -> list[SparkleObjective]:
        """Return the objectives as a list of SparkleObjectives."""
        return [resolve_objective(o) for o in self.objective_names]

    @property
    def instances(self: PerformanceDataFrame) -> list[str]:
        """Return the instances as a list of strings."""
        return self.index.get_level_values(1).unique().to_list()

    @property
    def run_ids(self: PerformanceDataFrame) -> list[int]:
        """Return the run ids as a list of integers."""
        return self.index.get_level_values(2).unique().to_list()

    @property
    def has_missing_values(self: PerformanceDataFrame) -> bool:
        """Return True if there are any missing values in the DataFrame."""
        return (
            self.drop(
                PerformanceDataFrame.column_seed,
                level=PerformanceDataFrame.column_meta,
                axis=1,
            )
            .isnull()
            .any()
            .any()
        )

    def is_missing(
        self: PerformanceDataFrame,
        solver: str,
        instance: str,
    ) -> bool:
        """Check whether a solver/instance combination is missing values."""
        return (
            self.xs(solver, axis=1)
            .xs(instance, axis=0, level=PerformanceDataFrame.index_instance)
            .drop(
                PerformanceDataFrame.column_seed,
                level=PerformanceDataFrame.column_meta,
                axis=1,
            )
            .isnull()
            .any()
            .any()
        )

    def verify_objective(self: PerformanceDataFrame, objective: str) -> str:
        """Check whether the specified objective is valid.

        Users are allowed to index the DataFrame without specifying all dimensions.
        However, when dealing with multiple objectives this is not allowed, which
        is verified here. If we have only one objective, it is returned. Otherwise,
        the objective specified by the user is returned.

        Args:
            objective: The objective given by the user
        """
        if objective is None:
            if self.multi_objective:
                raise ValueError("Error: MO Data, but objective not specified.")
            elif self.num_objectives == 1:
                return self.objective_names[0]
            else:
                return PerformanceDataFrame.missing_objective
        return objective

    def verify_run_id(self: PerformanceDataFrame, run_id: int) -> int:
        """Check whether the run id is valid.

        Similar to verify_objective, but here we check the dimensionality of runs.

        Args:
            run_id: The run as specified by the user.
        """
        if run_id is None:
            if self.num_runs > 1:
                raise ValueError(
                    "Error: Multiple run performance data, but run not specified"
                )
            else:
                run_id = self.run_ids[0]
        return run_id

    def verify_indexing(
        self: PerformanceDataFrame, objective: str, run_id: int
    ) -> tuple[str, int]:
        """Check whether the data indexing is correct.

        Users are allowed to use the PerformanceDataFrame without the Objective
        and Run index dimensions when they only have one objective or do only one
        run. This method adjusts the indexing for those cases accordingly.

        Args:
            objective: The given objective name
            run_id: The given run index

        Returns:
            A tuple representing the (possibly adjusted) Objective and Run index.
        """
        objective = self.verify_objective(objective)
        run_id = self.verify_run_id(run_id)
        return objective, run_id

    # Getters and Setters

    def add_solver(
        self: PerformanceDataFrame,
        solver_name: str,
        configurations: list[tuple[str, dict]] = None,
        initial_value: float | list[str | float] = None,
    ) -> None:
        """Add a new solver to the DataFrame. Initializes value to None by default.

        Args:
            solver_name: The name of the solver to be added.
            configurations: A list of (configuration key, configuration) tuples
                for the solver.
            initial_value: The value assigned for each index of the new solver.
                If not None, must match the index dimension (n_obj * n_inst * n_runs).
        """
        if solver_name in self.solvers:
            print(
                f"WARNING: Tried adding already existing solver {solver_name} to "
                f"Performance DataFrame: {self.csv_filepath}"
            )
            return
        if not isinstance(initial_value, list):  # Single value
            initial_value = [[initial_value, initial_value]]
        if configurations is None:
            configurations = [(PerformanceDataFrame.default_configuration, {})]
        self.attrs[solver_name] = {}
        for (config_key, config), (value, seed) in itertools.product(
            configurations, initial_value
        ):
            self[(solver_name, config_key, PerformanceDataFrame.column_seed)] = seed
            self[(solver_name, config_key, PerformanceDataFrame.column_value)] = value
            self.attrs[solver_name][config_key] = config
        if self.num_solvers == 2:  # Remove the placeholder nan solver
            for solver in self.solvers:
                if str(solver) == str(PerformanceDataFrame.missing_value):
                    self.remove_solver(solver)
                    break
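    # Illustrative call (hypothetical solver and parameter names):
    #   pdf.add_solver("SolverB", configurations=[("conf_1", {"--luby": "true"})])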

    def add_configuration(
        self: PerformanceDataFrame,
        solver: str,
        configuration_id: str | list[str],
        configuration: dict[str, Any] | list[dict[str, Any]] = None,
    ) -> None:
        """Add new configurations for a solver to the DataFrame.

        If the key already exists, update its value.

        Args:
            solver: The name of the solver for which to add configurations.
            configuration_id: The name of the configuration to be added.
            configuration: The configuration to be added.
        """
        if not isinstance(configuration_id, list):
            configuration_id = [configuration_id]
        if not isinstance(configuration, list):
            configuration = [configuration]
        for config_id, config in zip(configuration_id, configuration):
            if config_id not in self.get_configurations(solver):
                self[(solver, config_id, PerformanceDataFrame.column_value)] = None
                self[(solver, config_id, PerformanceDataFrame.column_seed)] = None
            self.attrs[solver][config_id] = config
        # Sort the index to optimize lookup speed
        self.sort_index(axis=1, inplace=True)

    def add_objective(
        self: PerformanceDataFrame, objective_name: str, initial_value: float = None
    ) -> None:
        """Add an objective to the DataFrame."""
        initial_value = initial_value or self.missing_value
        if objective_name in self.objective_names:
            print(
                f"WARNING: Tried adding already existing objective {objective_name} "
                f"to Performance DataFrame: {self.csv_filepath}"
            )
            return
        for instance, run in itertools.product(self.instances, self.run_ids):
            self.loc[(objective_name, instance, run)] = initial_value
        self.sort_index(axis=0, inplace=True)

    def add_instance(
        self: PerformanceDataFrame,
        instance_name: str,
        initial_values: Any | list[Any] = None,
    ) -> None:
        """Add an instance to the DataFrame.

        Args:
            instance_name: The name of the instance to be added.
            initial_values: The values assigned for each index of the new instance.
                If a list, must match the column dimension (Value, Seed, Configuration).
        """
        initial_values = initial_values or self.missing_value
        if not isinstance(initial_values, list):
            initial_values = (
                [initial_values]
                * 2  # Value and Seed per target column
                * self.num_solver_configurations
            )
        elif len(initial_values) == len(PerformanceDataFrame.multi_column_names):
            initial_values = initial_values * self.num_solvers

        if instance_name in self.instances:
            print(
                f"WARNING: Tried adding already existing instance {instance_name} "
                f"to Performance DataFrame: {self.csv_filepath}"
            )
            return
        # Add rows for all combinations
        for objective, run in itertools.product(self.objective_names, self.run_ids):
            self.loc[(objective, instance_name, run)] = initial_values
        if self.num_instances == 2:  # Remove the placeholder nan instance
            for instance in self.instances:
                if not isinstance(instance, str) and math.isnan(instance):
                    self.remove_instances(instance)
                    break
        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)

    def add_runs(
        self: PerformanceDataFrame,
        num_extra_runs: int,
        instance_names: list[str] = None,
        initial_values: Any | list[Any] = None,
    ) -> None:
        """Add runs to the DataFrame.

        Args:
            num_extra_runs: The number of runs to be added.
            instance_names: The instances for which runs are to be added.
                By default None, which means runs are added to all instances.
            initial_values: The initial value for each objective of each new run.
                If a list, needs to have a value for Value, Seed and Configuration.
        """
        initial_values = initial_values or self.missing_value
        if not isinstance(initial_values, list):
            initial_values = [initial_values] * self.num_solvers * 2  # Value and Seed
        elif len(initial_values) == 2:  # Value and Seed provided
            initial_values = initial_values * self.num_solvers
        instance_names = self.instances if instance_names is None else instance_names
        for objective, instance in itertools.product(
            self.objective_names, instance_names
        ):
            index_runs_start = len(self.loc[(objective, instance)]) + 1
            for run in range(index_runs_start, index_runs_start + num_extra_runs):
                self.loc[(objective, instance, run)] = initial_values
            # Sort the index to optimize lookup speed
            # NOTE: It would be better to do this at the end, but that results in
            # "PerformanceWarning: indexing past lexsort depth may impact
            # performance."
            self.sort_index(axis=0, inplace=True)

    def get_configurations(self: PerformanceDataFrame, solver_name: str) -> list[str]:
        """Return the list of configuration keys for a solver."""
        return list(
            self[solver_name]
            .columns.get_level_values(PerformanceDataFrame.column_configuration)
            .unique()
        )

    def get_full_configuration(
        self: PerformanceDataFrame, solver: str, configuration_id: str | list[str]
    ) -> dict | list[dict]:
        """Return the actual configuration associated with the configuration key."""
        if isinstance(configuration_id, str):
            return self.attrs[solver][configuration_id]
        return [self.attrs[solver][cid] for cid in configuration_id]

    def remove_solver(self: PerformanceDataFrame, solvers: str | list[str]) -> None:
        """Drop one or more solvers from the DataFrame."""
        if not solvers:  # Bugfix: when an empty list is passed, avoid adding nan
            return
        # To make sure objectives/runs are preserved when no solvers are present
        solvers = [solvers] if isinstance(solvers, str) else solvers
        if self.num_solvers == 1:  # This would preferably be done after removing
            for field in PerformanceDataFrame.multi_column_value:
                self[
                    PerformanceDataFrame.missing_value,
                    PerformanceDataFrame.missing_value,
                    field,
                ] = PerformanceDataFrame.missing_value
        self.drop(columns=solvers, level=0, axis=1, inplace=True)
        for solver in solvers:
            del self.attrs[solver]

    def remove_configuration(
        self: PerformanceDataFrame, solver: str, configuration: str | list[str]
    ) -> None:
        """Drop one or more configurations from the DataFrame."""
        if isinstance(configuration, str):
            configuration = [configuration]
        for config in configuration:
            self.drop((solver, config), axis=1, inplace=True)
            del self.attrs[solver][config]
        # Sort the index to optimize lookup speed
        self.sort_index(axis=1, inplace=True)

    def remove_objective(
        self: PerformanceDataFrame, objectives: str | list[str]
    ) -> None:
        """Remove objectives from the DataFrame."""
        if len(self.objectives) < 2:
            raise Exception("Cannot remove last objective from PerformanceDataFrame")
        self.drop(
            objectives,
            axis=0,
            level=PerformanceDataFrame.index_objective,
            inplace=True,
        )

    def remove_instances(self: PerformanceDataFrame, instances: str | list[str]) -> None:
        """Drop instances from the DataFrame."""
        # To make sure objectives/runs are preserved when no instances are present
        num_instances = len(instances) if isinstance(instances, list) else 1
        if self.num_instances - num_instances == 0:
            for objective, run in itertools.product(self.objective_names, self.run_ids):
                self.loc[(objective, PerformanceDataFrame.missing_value, run)] = (
                    PerformanceDataFrame.missing_value
                )
        self.drop(
            instances, axis=0, level=PerformanceDataFrame.index_instance, inplace=True
        )
        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)

    def remove_runs(
        self: PerformanceDataFrame,
        runs: int | list[int],
        instance_names: list[str] = None,
    ) -> None:
        """Drop one or more runs from the DataFrame.

        Args:
            runs: The run indices to be removed. If it is an int, the last n runs
                are removed. NOTE: If each instance has a different number of runs,
                the number of removed runs is not uniform.
            instance_names: The instances for which runs are to be removed.
                By default None, which means runs are removed from all instances.
        """
        instance_names = self.instances if instance_names is None else instance_names
        runs = (
            list(range((self.num_runs + 1) - runs, (self.num_runs + 1)))
            if isinstance(runs, int)
            else runs
        )
        self.drop(runs, axis=0, level=PerformanceDataFrame.index_run, inplace=True)
        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)

    def remove_empty_runs(self: PerformanceDataFrame) -> None:
        """Remove runs that contain no data, except for the first."""
        for row_index in self.index:
            if row_index[2] == 1:  # First run, never delete
                continue
            if self.loc[row_index].isna().all():
                self.drop(row_index, inplace=True)

    def filter_objective(self: PerformanceDataFrame, objective: str | list[str]) -> None:
        """Filter the DataFrame to a subset of objectives."""
        if isinstance(objective, str):
            objective = [objective]
        self.drop(
            list(set(self.objective_names) - set(objective)),
            axis=0,
            level=PerformanceDataFrame.index_objective,
            inplace=True,
        )

    def reset_value(
        self: PerformanceDataFrame,
        solver: str,
        instance: str,
        objective: str = None,
        run: int = None,
    ) -> None:
        """Reset a value in the DataFrame."""
        # Pass objective and run as keywords, so they are not mistaken for the
        # configuration argument of set_value
        self.set_value(
            PerformanceDataFrame.missing_value,
            solver,
            instance,
            objective=objective,
            run=run,
        )

    def set_value(
        self: PerformanceDataFrame,
        value: float | str | list[float | str] | list[list[float | str]],
        solver: str | list[str],
        instance: str | list[str],
        configuration: str = None,
        objective: str | list[str] = None,
        run: int | list[int] = None,
        solver_fields: list[str] = ["Value"],
        append_write_csv: bool = False,
    ) -> None:
        """Setter method to assign a value to the DataFrame.

        Allows for setting the same value to multiple indices.

        Args:
            value: Value(s) to be assigned. If value is a list, the first dimension
                is the solver field, the second dimension is for assigning multiple
                different values. Must be the same shape as the target.
            solver: The solver(s) for which the value should be set.
                If solver is a list, multiple solvers are set. If None, all
                solvers are set.
            instance: The instance(s) for which the value should be set.
                If instance is a list, multiple instances are set. If None, all
                instances are set.
            configuration: The configuration(s) for which the value should be set.
                When left None, set for all configurations.
            objective: The objectives for which the value should be set.
                When left None, set for all objectives.
            run: The run index for which the value should be set.
                If left None, set for all runs.
            solver_fields: The level to which each value should be assigned.
                Defaults to ["Value"].
            append_write_csv: For concurrent writing to the PerformanceDataFrame.
                If True, the value is directly appended to the CSV file.
                This will create duplicate entries in the file, but these are
                combined when loading the file.
        """
        # Convert indices to slices for None values
        solver = slice(solver) if solver is None else solver
        configuration = slice(configuration) if configuration is None else configuration
        instance = slice(instance) if instance is None else instance
        objective = slice(objective) if objective is None else objective
        run = slice(run) if run is None else run
        # Convert column indices to slices for setting multiple columns
        value = [value] if not isinstance(value, list) else value
        # NOTE: We currently loop over the levels here, as that allows us to set
        # the same sequence of values to the indices
        for item, level in zip(value, solver_fields):
            self.loc[(objective, instance, run), (solver, configuration, level)] = item

        if append_write_csv:
            writeable = self.loc[(objective, instance, run), :]
            if isinstance(writeable, pd.Series):  # Single row, convert to pd.DataFrame
                writeable = self.loc[[(objective, instance, run)], :]
            # Append the new rows to the DataFrame's CSV file
            csv_string = writeable.to_csv(header=False)  # Convert to the CSV lines
            for line in csv_string.splitlines():
                fd = os.open(f"{self.csv_filepath}", os.O_WRONLY | os.O_APPEND)
                os.write(fd, f"{line}\n".encode("utf-8"))  # Encode to create buffer
                # Open and close for each line to minimise possibilities of conflict
                os.close(fd)
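    # The per-line append above relies on O_APPEND semantics: each os.write to a
    # file opened with O_APPEND atomically seeks to end-of-file before writing,
    # so concurrent writers do not overwrite each other. Lines from different
    # writers may still interleave, which loading tolerates via
    # on_bad_lines="skip" and the duplicate-index drop in __init__.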

    def get_value(
        self: PerformanceDataFrame,
        solver: str | list[str] = None,
        instance: str | list[str] = None,
        configuration: str = None,
        objective: str = None,
        run: int = None,
        solver_fields: list[str] = ["Value"],
    ) -> float | str | list[Any]:
        """Index a value of the DataFrame and return it."""
        # Convert indices to slices for None values
        solver = slice(solver) if solver is None else solver
        configuration = slice(configuration) if configuration is None else configuration
        instance = slice(instance) if instance is None else instance
        objective = slice(objective) if objective is None else objective
        solver_fields = slice(solver_fields) if solver_fields is None else solver_fields
        run = slice(run) if run is None else run
        target = self.loc[
            (objective, instance, run), (solver, configuration, solver_fields)
        ].values
        # Reduce dimensions when relevant
        if (
            len(target) > 0
            and isinstance(target[0], np.ndarray)
            and len(target[0]) == 1
        ):
            target = target.flatten()
        target = target.tolist()
        if len(target) == 1:
            return target[0]
        return target

    def get_instance_num_runs(self: PerformanceDataFrame, instance: str) -> int:
        """Return the number of runs for an instance."""
        # We assume each objective has the same index for Instance/Runs
        return len(self.loc[(self.objective_names[0], instance)].index)

    # Calculables

    def mean(
        self: PerformanceDataFrame,
        objective: str = None,
        solver: str = None,
        instance: str = None,
    ) -> float:
        """Return the mean value of a slice of the DataFrame."""
        objective = self.verify_objective(objective)
        subset = self.xs(objective, level=0)
        if solver is not None:
            subset = subset.xs(solver, axis=1, drop_level=False)
        if instance is not None:
            subset = subset.xs(instance, axis=0, drop_level=False)
        value = subset.astype(float).mean()
        if isinstance(value, pd.Series):
            return value.mean()
        return value

    def get_job_list(
        self: PerformanceDataFrame, rerun: bool = False
    ) -> list[tuple[str, str, str, int]]:
        """Return a list of performance computation jobs that are still to be done.

        Get a list of (solver, config, instance, run) tuples to run from the
        performance data. If rerun is False (default), get only the tuples that
        don't have a value yet; if True, get all tuples.

        Args:
            rerun: Boolean indicating if we want to rerun all jobs

        Returns:
            A list of (solver, config, instance, run) combinations
        """
        # Drop the Seed column, as we are looking for nan values, not seeds
        df = self.drop(
            PerformanceDataFrame.column_seed,
            axis=1,
            level=PerformanceDataFrame.column_meta,
        )
        df = df.droplevel(PerformanceDataFrame.column_meta, axis=1)
        if rerun:  # Return all combinations
            # Drop the objective level, not needed
            df = df.droplevel(PerformanceDataFrame.index_objective, axis=0)
            result = [
                tuple(column) + tuple(index)
                for column, index in itertools.product(df.columns, df.index)
            ]
        else:
            result = []
            for (solver, config), (objective, instance, run) in itertools.product(
                df.columns, df.index
            ):
                value = df.loc[(objective, instance, run), (solver, config)]
                if value is None or (
                    isinstance(value, (int, float)) and math.isnan(value)
                ):
                    # NOTE: Force Run to be an int, as it can be a float by accident
                    if math.isnan(run):
                        continue
                    run = int(run)
                    result.append(tuple([solver, config, instance, run]))
        # Filter duplicates while keeping the order consistent
        return list(dict.fromkeys(result))
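    # Example return value (hypothetical names):
    #   [("SolverA", "Default", "instance1.cnf", 1),
    #    ("SolverA", "conf_1", "instance1.cnf", 1)]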

    def configuration_performance(
        self: PerformanceDataFrame,
        solver: str,
        configuration: str | list[str] = None,
        objective: str | SparkleObjective = None,
        instances: list[str] = None,
        per_instance: bool = False,
    ) -> tuple[str, float]:
        """Return the (best) configuration performance for an objective over the instances.

        Args:
            solver: The solver for which we evaluate the configuration(s)
            configuration: The configuration (id) to evaluate
            objective: The objective for which we find the best value
            instances: The instances which should be selected for the evaluation
            per_instance: Whether to return the performance per instance,
                or aggregated.

        Returns:
            The (best) configuration id and its aggregated performance.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        # Filter objective
        subdf = self.xs(objective.name, level=0, drop_level=True)
        # Filter solver
        subdf = subdf.xs(solver, axis=1, drop_level=True)
        # Drop the Seed, then drop the Meta level as it is no longer needed
        subdf = subdf.drop(
            PerformanceDataFrame.column_seed,
            axis=1,
            level=PerformanceDataFrame.column_meta,
        )
        subdf = subdf.droplevel(PerformanceDataFrame.column_meta, axis=1)
        # Ensure the objective is numeric
        subdf = subdf.astype(float)

        if instances:  # Filter instances
            subdf = subdf.loc[instances, :]
        if configuration:  # Filter configuration
            if not isinstance(configuration, list):
                configuration = [configuration]
            subdf = subdf.filter(configuration, axis=1)
        # Aggregate the runs
        subdf = subdf.groupby(PerformanceDataFrame.index_instance).agg(
            func=objective.run_aggregator.__name__
        )
        # Aggregate the instances
        sub_series = subdf.agg(func=objective.instance_aggregator.__name__)
        sub_series = sub_series.dropna()
        if sub_series.empty:  # If all values are NaN, raise an error
            raise ValueError(
                f"No valid performance measurements for solver '{solver}' "
                f"(Configuration: '{configuration}') "
                f"and objective '{objective.name}'."
            )
        # Select the best configuration
        best_conf = sub_series.idxmin() if objective.minimise else sub_series.idxmax()
        if per_instance:  # Return a list of instance results
            return best_conf, subdf[best_conf].to_list()
        return best_conf, sub_series[best_conf]

    def best_configuration(
        self: PerformanceDataFrame,
        solver: str,
        objective: SparkleObjective = None,
        instances: list[str] = None,
    ) -> tuple[str, float]:
        """Return the best configuration for the given objective over the instances.

        Args:
            solver: The solver for which we determine the best configuration
            objective: The objective for which we calculate the best configuration
            instances: The instances which should be selected for the evaluation

        Returns:
            The best configuration id and its aggregated performance.
        """
        return self.configuration_performance(solver, None, objective, instances)

    def best_instance_performance(
        self: PerformanceDataFrame,
        objective: str | SparkleObjective = None,
        instances: list[str] = None,
        run_id: int = None,
        exclude_solvers: list[tuple[str, str]] = None,
    ) -> pd.Series:
        """Return the best performance for each instance in the portfolio.

        Args:
            objective: The objective for which we calculate the best performance
            instances: The instances which should be selected for the evaluation
            run_id: The run for which we calculate the best performance. If None,
                we consider all runs.
            exclude_solvers: List of (solver, config_id) to exclude in the calculation.

        Returns:
            The best performance for each instance in the portfolio.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        subdf = self.drop(  # Drop the Seed, not needed
            [PerformanceDataFrame.column_seed],
            axis=1,
            level=PerformanceDataFrame.column_meta,
        )
        subdf = subdf.xs(objective.name, level=0)  # Drop the objective
        if exclude_solvers is not None:
            subdf = subdf.drop(exclude_solvers, axis=1)
        if instances is not None:
            subdf = subdf.loc[instances, :]
        if run_id is not None:
            run_id = self.verify_run_id(run_id)
            subdf = subdf.xs(run_id, level=1)
        else:
            # Drop the run level
            subdf = subdf.droplevel(level=1)
        # Ensure the objective is numeric
        subdf = subdf.astype(float)
        series = subdf.min(axis=1) if objective.minimise else subdf.max(axis=1)
        # Ensure we always return the best for each run
        series = series.sort_values(ascending=objective.minimise)
        return series.groupby(series.index).first().astype(float)

    def best_performance(
        self: PerformanceDataFrame,
        exclude_solvers: list[tuple[str, str]] = [],
        instances: list[str] = None,
        objective: str | SparkleObjective = None,
    ) -> float:
        """Return the overall best performance of the portfolio.

        Args:
            exclude_solvers: List of (solver, config_id) to exclude in the calculation.
                Defaults to an empty list.
            instances: The instances which should be selected for the evaluation.
                If None, use all instances.
            objective: The objective for which we calculate the best performance

        Returns:
            The aggregated best performance of the portfolio over all instances.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        instance_best = self.best_instance_performance(
            objective, instances=instances, exclude_solvers=exclude_solvers
        ).to_numpy(dtype=float)
        return objective.instance_aggregator(instance_best)

    def schedule_performance(
        self: PerformanceDataFrame,
        schedule: dict[str, list[tuple[str, str, int]]],
        target_solver: str | tuple[str, str] = None,
        objective: str | SparkleObjective = None,
    ) -> list[float]:
        """Return the performance of a selection schedule on the portfolio.

        Args:
            schedule: Compute the best performance according to a selection schedule.
                A schedule is a dictionary of instances, with a schedule per instance
                consisting of (solver, config_id, maximum runtime) triples.
            target_solver: If not None, store the found values in this solver of the DF.
            objective: The objective for which we calculate the best performance

        Returns:
            The performance of the schedule over the instances in the dictionary.
        """
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        select = min if objective.minimise else max
        performances = [0.0] * len(schedule.keys())
        if not isinstance(target_solver, tuple):
            target_conf = PerformanceDataFrame.default_configuration
        else:
            target_solver, target_conf = target_solver
        if target_solver and target_solver not in self.solvers:
            self.add_solver(target_solver)
        for ix, instance in enumerate(schedule.keys()):
            for iy, (solver, config, max_runtime) in enumerate(schedule[instance]):
                performance = float(
                    self.get_value(solver, instance, config, objective.name)
                )
                if max_runtime is not None:  # We are dealing with runtime
                    performances[ix] += performance
                    if performance < max_runtime:
                        break  # Solver finished in time
                else:  # Quality, we take the best found performance
                    if iy == 0:  # First solver, set the initial value
                        performances[ix] = performance
                        continue
                    performances[ix] = select(performances[ix], performance)
            if target_solver is not None:
                self.set_value(
                    performances[ix],
                    target_solver,
                    instance,
                    target_conf,
                    objective.name,
                )
        return performances

    def marginal_contribution(
        self: PerformanceDataFrame,
        objective: str | SparkleObjective = None,
        instances: list[str] = None,
        sort: bool = False,
    ) -> list[tuple[str, str, float, float]]:
        """Return the marginal contribution of the solver configurations on the instances.

        Args:
            objective: The objective for which we calculate the marginal contribution.
            instances: The instances which should be selected for the evaluation
            sort: Whether to sort the results afterwards

        Returns:
            The marginal contribution of each solver (configuration) as:
            [(solver, config_id, marginal_contribution,
              portfolio_best_performance_without_solver)]
        """
        output = []
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        best_performance = self.best_performance(
            objective=objective, instances=instances
        )
        for solver in self.solvers:
            for config_id in self.get_configurations(solver):
                # By calculating the best performance excluding this solver,
                # we can determine its relative impact on the portfolio.
                missing_solver_config_best = self.best_performance(
                    exclude_solvers=[(solver, config_id)],
                    instances=instances,
                    objective=objective,
                )
                # Now we need to see how much the portfolio's best performance
                # decreases without this solver.
                marginal_contribution = missing_solver_config_best / best_performance
                if missing_solver_config_best == best_performance:
                    # No change, no contribution
                    marginal_contribution = 0.0
                output.append(
                    (
                        solver,
                        config_id,
                        marginal_contribution,
                        missing_solver_config_best,
                    )
                )
        if sort:
            output.sort(key=lambda x: x[2], reverse=objective.minimise)
        return output

    def get_solver_ranking(
        self: PerformanceDataFrame,
        objective: str | SparkleObjective = None,
        instances: list[str] = None,
    ) -> list[tuple[str, str, float]]:
        """Return a list with solvers ranked by average performance."""
        objective = self.verify_objective(objective)
        if isinstance(objective, str):
            objective = resolve_objective(objective)
        # Drop the Seed
        sub_df = self.drop(
            [PerformanceDataFrame.column_seed],
            axis=1,
            level=PerformanceDataFrame.column_meta,
        )
        # Reduce to the objective
        sub_df: pd.DataFrame = sub_df.loc(axis=0)[objective.name, :, :]
        # Drop the Objective and Meta multi-index levels
        sub_df = sub_df.droplevel(PerformanceDataFrame.index_objective).droplevel(
            PerformanceDataFrame.column_meta, axis=1
        )
        if instances is not None:  # Select instances
            sub_df = sub_df.loc(axis=0)[instances,]
        # Ensure the data is numeric
        sub_df = sub_df.astype(float)
        # Aggregate runs
        sub_df = sub_df.groupby(PerformanceDataFrame.index_instance).agg(
            func=objective.run_aggregator.__name__
        )
        # Aggregate instances
        sub_series = sub_df.aggregate(func=objective.instance_aggregator.__name__)
        # Sort by objective
        sub_series.sort_values(ascending=objective.minimise, inplace=True)
        return [(index[0], index[1], sub_series[index]) for index in sub_series.index]

    def save_csv(self: PerformanceDataFrame, csv_filepath: Path = None) -> None:
        """Write a CSV to the given path.

        Args:
            csv_filepath: Path to the CSV file. Defaults to self.csv_filepath.
        """
        csv_filepath = self.csv_filepath if csv_filepath is None else csv_filepath
        self.to_csv(csv_filepath)
        # Append the configurations
        with csv_filepath.open("a") as fout:
            fout.write("\n$Solver,configuration_id,Configuration\n")
            for solver in self.solvers:
                for config_id in self.attrs[solver]:
                    configuration = self.attrs[solver][config_id]
                    fout.write(f"${solver},{config_id},{str(configuration)}\n")

    def clone(
        self: PerformanceDataFrame, csv_filepath: Path = None
    ) -> PerformanceDataFrame:
        """Create a copy of this object.

        Args:
            csv_filepath: The new filepath to use for saving the object to.
                If None, the clone will not be saved.
                Warning: If the original path is used, it could lead to data loss!
        """
        pd_copy = PerformanceDataFrame(
            csv_filepath=csv_filepath,
            solvers=self.solvers,
            configurations=self.configurations,
            objectives=self.objectives,
            instances=self.instances,
            n_runs=self.num_runs,
        )
        # Copy the values
        for column_index in self.columns:
            for index in self.index:
                pd_copy.at[index, column_index] = self.loc[index, column_index]
        # Ensure everything is sorted?
        return pd_copy

    def clean_csv(self: PerformanceDataFrame) -> None:
        """Set all values in the Performance Data to None."""
        self[:] = PerformanceDataFrame.missing_value
        self.save_csv()
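

# A minimal usage sketch, assuming a fresh CSV path and hypothetical solver,
# objective and instance names; intended as illustration, not part of the API:
if __name__ == "__main__":
    pdf = PerformanceDataFrame(
        Path("performance_data.csv"),
        solvers=["SolverA"],
        objectives=["PAR10"],
        instances=["instance1.cnf"],
        n_runs=1,
    )
    # Record one result and read it back
    pdf.set_value(42.0, "SolverA", "instance1.cnf", run=1)
    print(pdf.get_value("SolverA", "instance1.cnf", run=1))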