Coverage for sparkle/platform/output/parallel_portfolio_output.py: 98%
47 statements
coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
#!/usr/bin/env python3
"""Sparkle class to organise parallel portfolio output."""

from __future__ import annotations

from sparkle.platform import generate_report_for_parallel_portfolio as sgrfpp
from sparkle.instance import InstanceSet
from sparkle.platform.output.structures import ParallelPortfolioResults
from sparkle.types import SparkleObjective

import json
from pathlib import Path
import csv


class ParallelPortfolioOutput:
    """Class that collects parallel portfolio data and outputs it in JSON format."""
    def __init__(self: ParallelPortfolioOutput, parallel_portfolio_path: Path,
                 instance_set: InstanceSet,
                 objective: SparkleObjective,
                 output: Path) -> None:
        """Initialize ParallelPortfolioOutput class.

        Args:
            parallel_portfolio_path: Path to parallel portfolio output directory
            instance_set: The instance set the portfolio was run on
            objective: The objective of the portfolio
            output: Path to the output directory
        """
        if not output.is_file():
            self.output = output / "parallel_portfolio.json"
        else:
            self.output = output

        self.instance_set = instance_set
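        # Parse results.csv from the portfolio run and locate the relevant
        # columns (solver, instance, status and the objective value) by header name.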
        csv_data = [line for line in
                    csv.reader((parallel_portfolio_path / "results.csv").open("r"))]
        header = csv_data[0]
        csv_data = csv_data[1:]
        solver_column = header.index("Solver")
        instance_column = header.index("Instance")
        status_column = [i for i, v in enumerate(header)
                         if v.startswith("status")][0]
        objective_column = header.index(objective.name)
        self.solver_list = list(set([line[solver_column] for line in csv_data]))

        # Collect solver performance for each instance
        instance_results = {name: [] for name in instance_set._instance_names}
        for row in csv_data:
            if row[instance_column] in instance_results.keys():
                instance_results[row[instance_column]].append(
                    [row[solver_column], row[status_column], row[objective_column]])

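        # Count how many instances each solver solved; the remainder of the
        # instance set is reported as unsolved.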
        solvers_solutions = self.get_solver_solutions(self.solver_list, csv_data)
        unsolved_instances = self.instance_set.size - sum([solvers_solutions[key]
                                                           for key in solvers_solutions])
        # sbs_runtime is redundant, the same information is available in instance_results
        _, sbs, runtime_all_solvers, _ =\
            sgrfpp.get_portfolio_metrics(self.solver_list,
                                         instance_set,
                                         instance_results,
                                         objective)

        self.results = ParallelPortfolioResults(unsolved_instances,
                                                sbs, runtime_all_solvers,
                                                instance_results)

    def get_solver_solutions(self: ParallelPortfolioOutput,
                             solver_list: list[str],
                             csv_data: list[list[str]]) -> dict:
        """Return dictionary with solution count for each solver."""
        # Default initialisation, increase solution counter for each successful evaluation
        solvers_solutions = {solver: 0 for solver in solver_list}
        instance_names_copy = self.instance_set._instance_names.copy()

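        # Each instance is credited to at most one solver: rows are assumed to be
        # ordered as (instance, solver, status), and once an instance is counted
        # as solved it is removed from the copy so later successes are ignored.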
        for line in csv_data:
            if line[0] in instance_names_copy and line[2].lower() == "success":
                solvers_solutions[line[1]] += 1
                instance_names_copy.remove(line[0])

        return solvers_solutions

    def serialize_instances(self: ParallelPortfolioOutput,
                            instances: list[InstanceSet]) -> dict:
        """Transform Instances to dictionary format."""
        # Even though parallel portfolio currently doesn't support multi sets,
        # this function is already prepared to support multiple sets
        return {
            "number_of_instance_sets": len(instances),
            "instance_sets": [
                {
                    "name": instance.name,
                    "number_of_instances": instance.size
                }
                for instance in instances
            ]
        }

    def serialize_results(self: ParallelPortfolioOutput,
                          pr: ParallelPortfolioResults) -> dict:
        """Transform results to dictionary format."""
        return {
            "sbs": pr.sbs,
            "unsolved_instances": pr.unsolved_instances,
            "runtime_solvers": pr.runtime_solvers,
            "solvers_performance": pr.solver_performance,
            "instance_results": pr.instance_results,
        }

    def write_output(self: ParallelPortfolioOutput) -> None:
        """Write data into a JSON file."""
        output_data = {
            "number_of_solvers": len(self.solver_list),
            "solvers": self.solver_list,
            "instances": self.serialize_instances([self.instance_set]),
            "results": self.serialize_results(self.results),
        }

        self.output.parent.mkdir(parents=True, exist_ok=True)
        with self.output.open("w") as f:
            json.dump(output_data, f, indent=4)
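A minimal usage sketch (not part of the measured module above): it assumes instance_set is an InstanceSet and objective a SparkleObjective created elsewhere by the platform, and that the portfolio run directory contains the results.csv read by the constructor. The two paths are illustrative placeholders.

from pathlib import Path

from sparkle.platform.output.parallel_portfolio_output import ParallelPortfolioOutput

portfolio_dir = Path("Output/Parallel_Portfolio/Raw_Data/run_1")  # hypothetical run directory
report_dir = Path("Output/Parallel_Portfolio/Analysis")  # hypothetical output directory

portfolio_output = ParallelPortfolioOutput(portfolio_dir, instance_set, objective, report_dir)
portfolio_output.write_output()  # writes report_dir / "parallel_portfolio.json"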