Coverage for sparkle/platform/output/parallel_portfolio_output.py: 98%
47 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-05 14:48 +0000
1#!/usr/bin/env python3
2"""Sparkle class to organise configuration output."""
4from __future__ import annotations
6from sparkle.platform import generate_report_for_parallel_portfolio as sgrfpp
7from sparkle.instance import InstanceSet
8from sparkle.platform.output.structures import ParallelPortfolioResults
9from sparkle.types import SparkleObjective
11import json
12from pathlib import Path
13import csv
class ParallelPortfolioOutput:
    """Class that collects parallel portfolio data and outputs it in JSON format."""

    def __init__(self: ParallelPortfolioOutput, parallel_portfolio_path: Path,
                 instance_set: InstanceSet,
                 objective: SparkleObjective,
                 output: Path) -> None:
        """Initialize ParallelPortfolioOutput class.

        Args:
            parallel_portfolio_path: Path to parallel portfolio output directory
            instance_set: List of instances
            objective: The objective of the portfolio
            output: Path to the output directory, or an existing output file
        """
        if not output.is_file():
            # Treat the path as a directory and append the default file name
            self.output = output / "parallel_portfolio.json"
        else:
            self.output = output

        self.instance_set = instance_set
        # Use a context manager so the results file is closed after parsing
        # (the previous implementation leaked the open file handle)
        with (parallel_portfolio_path / "results.csv").open("r") as results_file:
            csv_data = list(csv.reader(results_file))
        header = csv_data[0]
        csv_data = csv_data[1:]
        # Resolve columns by header name so parsing is robust to reordering
        solver_column = header.index("Solver")
        instance_column = header.index("Instance")
        status_column = header.index("status")
        objective_column = header.index(objective.name)
        self.solver_list = list({line[solver_column] for line in csv_data})

        # Collect solver performance for each instance
        instance_results = {name: [] for name in instance_set._instance_names}
        for row in csv_data:
            if row[instance_column] in instance_results:
                instance_results[row[instance_column]].append(
                    [row[solver_column], row[status_column], row[objective_column]])

        # Pass the resolved column indices instead of relying on the
        # hard-coded default column order
        solvers_solutions = self.get_solver_solutions(
            self.solver_list, csv_data,
            instance_column=instance_column,
            solver_column=solver_column,
            status_column=status_column)
        unsolved_instances = self.instance_set.size - sum(solvers_solutions.values())
        # sbs_runtime is redundant, the same information is available in
        # instance_results
        _, sbs, runtime_all_solvers, _ =\
            sgrfpp.get_portfolio_metrics(self.solver_list,
                                         instance_set,
                                         instance_results,
                                         objective)

        self.results = ParallelPortfolioResults(unsolved_instances,
                                                sbs, runtime_all_solvers,
                                                instance_results)

    def get_solver_solutions(self: ParallelPortfolioOutput,
                             solver_list: list[str],
                             csv_data: list[list[str]],
                             instance_column: int = 0,
                             solver_column: int = 1,
                             status_column: int = 2) -> dict:
        """Return dictionary with solution count for each solver.

        Each instance is credited to at most one solver: the first row with
        a "success" status (case-insensitive) encountered for that instance.

        Args:
            solver_list: Names of all solvers in the portfolio
            csv_data: Parsed result rows (header row excluded)
            instance_column: Index of the instance name column
                (defaults to the historical hard-coded position 0)
            solver_column: Index of the solver name column (default 1)
            status_column: Index of the status column (default 2)

        Returns:
            Mapping of solver name to number of instances solved.
        """
        # Default initialisation, increase solution counter for each
        # successful evaluation
        solvers_solutions = {solver: 0 for solver in solver_list}
        unsolved_instance_names = self.instance_set._instance_names.copy()
        for line in csv_data:
            if (line[instance_column] in unsolved_instance_names
                    and line[status_column].lower() == "success"):
                solvers_solutions[line[solver_column]] += 1
                # Remove the instance so it cannot be counted twice
                unsolved_instance_names.remove(line[instance_column])

        return solvers_solutions

    def serialize_instances(self: ParallelPortfolioOutput,
                            instances: list[InstanceSet]) -> dict:
        """Transform Instances to dictionary format.

        Args:
            instances: The instance sets to serialize.

        Returns:
            Dictionary with the number of sets and per-set name/size entries.
        """
        # Even though parallel portfolio currently doesn't support multi sets,
        # this function is already ready to support multiple sets
        return {
            "number_of_instance_sets": len(instances),
            "instance_sets": [
                {
                    "name": instance.name,
                    "number_of_instances": instance.size
                }
                for instance in instances
            ]
        }

    def serialize_results(self: ParallelPortfolioOutput,
                          pr: ParallelPortfolioResults) -> dict:
        """Transform results to dictionary format.

        Args:
            pr: The parallel portfolio results to serialize.

        Returns:
            Dictionary with the key figures of the portfolio run.
        """
        return {
            "sbs": pr.sbs,
            "unsolved_instances": pr.unsolved_instances,
            "runtime_solvers": pr.runtime_solvers,
            "solvers_performance": pr.solver_performance,
            "instance_results": pr.instance_results,
        }

    def write_output(self: ParallelPortfolioOutput) -> None:
        """Write data into a JSON file."""
        output_data = {
            "number_of_solvers": len(self.solver_list),
            "solvers": self.solver_list,
            "instances": self.serialize_instances([self.instance_set]),
            "results": self.serialize_results(self.results),
        }
        # Ensure the target directory exists before writing
        self.output.parent.mkdir(parents=True, exist_ok=True)
        with self.output.open("w") as f:
            json.dump(output_data, f, indent=4)