Coverage for sparkle/platform/output/parallel_portfolio_output.py: 98%

47 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-07 15:22 +0000

1#!/usr/bin/env python3 

2"""Sparkle class to organise parallel portfolio output.""" 

3 

4from __future__ import annotations 

5 

6from sparkle.platform import generate_report_for_parallel_portfolio as sgrfpp 

7from sparkle.instance import InstanceSet 

8from sparkle.platform.output.structures import ParallelPortfolioResults 

9from sparkle.types import SparkleObjective 

10 

11import json 

12from pathlib import Path 

13import csv 

14 

15 

class ParallelPortfolioOutput:
    """Class that collects parallel portfolio data and outputs it in JSON format."""

    def __init__(self: ParallelPortfolioOutput, parallel_portfolio_path: Path,
                 instance_set: InstanceSet,
                 objective: SparkleObjective,
                 output: Path) -> None:
        """Initialize ParallelPortfolioOutput class.

        Args:
            parallel_portfolio_path: Path to parallel portfolio output directory,
                expected to contain a ``results.csv`` file
            instance_set: List of instances
            objective: The objective of the portfolio
            output: Path to the output directory, or the output file itself
        """
        # Accept either a target file or a directory (default file name appended)
        if not output.is_file():
            self.output = output / "parallel_portfolio.json"
        else:
            self.output = output

        self.instance_set = instance_set
        # Use a context manager so the CSV file handle is closed deterministically
        with (parallel_portfolio_path / "results.csv").open("r") as csv_file:
            csv_data = list(csv.reader(csv_file))
        header = csv_data[0]
        csv_data = csv_data[1:]
        solver_column = header.index("Solver")
        instance_column = header.index("Instance")
        # The status column name carries a suffix, so match on its prefix
        status_column = next(i for i, v in enumerate(header)
                             if v.startswith("status"))
        objective_column = header.index(objective.name)
        # Deduplicate solver names; order of the resulting list is arbitrary
        self.solver_list = list({row[solver_column] for row in csv_data})

        # Collect solver performance for each instance
        instance_results = {name: [] for name in instance_set._instance_names}
        for row in csv_data:
            if row[instance_column] in instance_results:
                instance_results[row[instance_column]].append(
                    [row[solver_column], row[status_column], row[objective_column]])

        solvers_solutions = self.get_solver_solutions(self.solver_list, csv_data)
        unsolved_instances = self.instance_set.size - sum(
            solvers_solutions[key] for key in solvers_solutions)
        # sbs_runtime is redundant, the same information is available in
        # instance_results
        _, sbs, runtime_all_solvers, _ =\
            sgrfpp.get_portfolio_metrics(self.solver_list,
                                         instance_set,
                                         instance_results,
                                         objective)

        self.results = ParallelPortfolioResults(unsolved_instances,
                                                sbs, runtime_all_solvers,
                                                instance_results)

    def get_solver_solutions(self: ParallelPortfolioOutput,
                             solver_list: list[str],
                             csv_data: list[list[str]]) -> dict:
        """Return dictionary with solution count for each solver.

        Each instance is credited at most once: the first successful row
        encountered claims it for that row's solver.

        Args:
            solver_list: Names of the solvers in the portfolio
            csv_data: Result rows (header excluded)

        Returns:
            Mapping from solver name to number of instances it solved first.
        """
        # Default initialisation, increase solution counter for each successful
        # evaluation.
        # NOTE(review): columns are addressed by fixed position here
        # (0=instance, 1=solver, 2=status) whereas __init__ resolves them from
        # the header -- verify the CSV layout is guaranteed to match.
        solvers_solutions = {solver: 0 for solver in solver_list}
        unsolved_instances = self.instance_set._instance_names.copy()

        for row in csv_data:
            if row[0] in unsolved_instances and row[2].lower() == "success":
                solvers_solutions[row[1]] += 1
                unsolved_instances.remove(row[0])

        return solvers_solutions

    def serialize_instances(self: ParallelPortfolioOutput,
                            instances: list[InstanceSet]) -> dict:
        """Transform Instances to dictionary format.

        Args:
            instances: Instance sets to serialize

        Returns:
            Dictionary with the set count and per-set name/size entries.
        """
        # Even though parallel portfolio currently doesn't support multi sets,
        # this function is already ready to support multiple sets
        return {
            "number_of_instance_sets": len(instances),
            "instance_sets": [
                {
                    "name": instance.name,
                    "number_of_instances": instance.size
                }
                for instance in instances
            ]
        }

    def serialize_results(self: ParallelPortfolioOutput,
                          pr: ParallelPortfolioResults) -> dict:
        """Transform results to dictionary format.

        Args:
            pr: The portfolio results to serialize

        Returns:
            Dictionary exposing the result fields under their JSON key names.
        """
        return {
            "sbs": pr.sbs,
            "unsolved_instances": pr.unsolved_instances,
            "runtime_solvers": pr.runtime_solvers,
            "solvers_performance": pr.solver_performance,
            "instance_results": pr.instance_results,
        }

    def write_output(self: ParallelPortfolioOutput) -> None:
        """Write data into a JSON file at ``self.output``."""
        output_data = {
            "number_of_solvers": len(self.solver_list),
            "solvers": self.solver_list,
            "instances": self.serialize_instances([self.instance_set]),
            "results": self.serialize_results(self.results),
        }

        # Ensure the target directory exists before writing
        self.output.parent.mkdir(parents=True, exist_ok=True)
        with self.output.open("w") as f:
            json.dump(output_data, f, indent=4)