Coverage for sparkle/platform/output/parallel_portfolio_output.py: 0%

48 statements  

coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

#!/usr/bin/env python3
"""Sparkle class to organise parallel portfolio output."""

from __future__ import annotations

from sparkle.platform import generate_report_for_parallel_portfolio as sgrfpp
from sparkle.instance import InstanceSet
from sparkle.platform.output.structures import ParallelPortfolioResults
from sparkle.types import SparkleObjective

import json
from pathlib import Path
import csv


class ParallelPortfolioOutput:
    """Class that collects parallel portfolio data and outputs it in JSON format."""

    def __init__(
        self: ParallelPortfolioOutput,
        parallel_portfolio_path: Path,
        instance_set: InstanceSet,
        objective: SparkleObjective,
        output: Path,
    ) -> None:
        """Initialize the ParallelPortfolioOutput class.

        Args:
            parallel_portfolio_path: Path to the parallel portfolio output directory
            instance_set: Set of instances the portfolio was run on
            objective: The objective of the portfolio
            output: Path to the output directory or file
        """
        if not output.is_file():
            self.output = output / "parallel_portfolio.json"
        else:
            self.output = output

        self.instance_set = instance_set
        csv_data = [
            line
            for line in csv.reader((parallel_portfolio_path / "results.csv").open("r"))
        ]
        header = csv_data[0]
        csv_data = csv_data[1:]
        solver_column = header.index("Solver")
        instance_column = header.index("Instance")
        status_column = [i for i, v in enumerate(header) if v.startswith("status")][0]
        objective_column = header.index(objective.name)
        self.solver_list = list(set([line[solver_column] for line in csv_data]))

        # Collect solver performance for each instance
        instance_results = {name: [] for name in instance_set._instance_names}
        for row in csv_data:
            if row[instance_column] in instance_results.keys():
                instance_results[row[instance_column]].append(
                    [row[solver_column], row[status_column], row[objective_column]]
                )

        solvers_solutions = self.get_solver_solutions(self.solver_list, csv_data)
        unsolved_instances = self.instance_set.size - sum(
            [solvers_solutions[key] for key in solvers_solutions]
        )
        # sbs_runtime is redundant, the same information is available in instance_results
        _, sbs, runtime_all_solvers, _ = sgrfpp.get_portfolio_metrics(
            self.solver_list, instance_set, instance_results, objective
        )

        self.results = ParallelPortfolioResults(
            unsolved_instances, sbs, runtime_all_solvers, instance_results
        )
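
    # Illustrative sketch of the results.csv rows parsed in __init__ and reused
    # below (an assumption based on the columns accessed in this class; solver
    # and instance names and the objective column "PAR10" are hypothetical):
    #
    #   Instance,Solver,status,PAR10
    #   instance_a.cnf,SolverA,SUCCESS,12.1
    #   instance_a.cnf,SolverB,TIMEOUT,600.0
    #   instance_b.cnf,SolverB,TIMEOUT,600.0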

    def get_solver_solutions(
        self: ParallelPortfolioOutput,
        solver_list: list[str],
        csv_data: list[list[str]],
    ) -> dict:
        """Return dictionary with solution count for each solver."""
        # Default initialisation, increase solution counter for each successful evaluation
        solvers_solutions = {solver: 0 for solver in solver_list}
        instance_names_copy = self.instance_set._instance_names.copy()

        # Assumes the fixed column order instance, solver, status in csv_data
        for line in csv_data:
            if line[0] in instance_names_copy and line[2].lower() == "success":
                solvers_solutions[line[1]] += 1
                instance_names_copy.remove(line[0])

        return solvers_solutions
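
    # Worked example with hypothetical values, assuming the column order above:
    # for csv_data == [["i1", "s1", "SUCCESS", "1.0"],
    #                  ["i1", "s2", "SUCCESS", "2.0"],
    #                  ["i2", "s2", "TIMEOUT", "600.0"]]
    # and an instance set containing i1 and i2, the result is {"s1": 1, "s2": 0}:
    # each instance is credited only to the first successful row, because its
    # name is removed from instance_names_copy after the first match.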

    def serialise_instances(
        self: ParallelPortfolioOutput, instances: list[InstanceSet]
    ) -> dict:
        """Transform Instances to dictionary format."""
        # Even though parallel portfolio currently doesn't support multiple instance
        # sets, this function can handle them.
        return {
            "number_of_instance_sets": len(instances),
            "instance_sets": [
                {"name": instance.name, "number_of_instances": instance.size}
                for instance in instances
            ],
        }

    def serialise_results(
        self: ParallelPortfolioOutput, pr: ParallelPortfolioResults
    ) -> dict:
        """Transform results to dictionary format."""
        return {
            "sbs": pr.sbs,
            "unsolved_instances": pr.unsolved_instances,
            "runtime_solvers": pr.runtime_solvers,
            "solvers_performance": pr.solver_performance,
            "instance_results": pr.instance_results,
        }

    def serialise(self: ParallelPortfolioOutput) -> dict:
        """Transform to dictionary format."""
        return {
            "number_of_solvers": len(self.solver_list),
            "solvers": self.solver_list,
            "instances": self.serialise_instances([self.instance_set]),
            "results": self.serialise_results(self.results),
        }
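
    # Sketch of the dictionary produced by serialise() (values are hypothetical;
    # the exact contents of "solvers_performance" and "instance_results" depend
    # on ParallelPortfolioResults):
    #
    #   {
    #       "number_of_solvers": 2,
    #       "solvers": ["SolverA", "SolverB"],
    #       "instances": {"number_of_instance_sets": 1,
    #                     "instance_sets": [{"name": "SetA",
    #                                        "number_of_instances": 12}]},
    #       "results": {"sbs": "SolverA", "unsolved_instances": 3, ...}
    #   }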

    def write_output(self: ParallelPortfolioOutput, output: Path) -> None:
        """Write data into a JSON file."""
        # Use the same default file name as in __init__ when given a directory
        output = output / "parallel_portfolio.json" if output.is_dir() else output
        with output.open("w") as f:
            json.dump(self.serialise(), f, indent=4)
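

# Minimal usage sketch (an assumption, not part of the original module). The
# paths are hypothetical, and constructing `instance_set` / `objective` with
# the sparkle API is omitted because their constructors are not shown here.
#
#     portfolio_output = ParallelPortfolioOutput(
#         parallel_portfolio_path=Path("Output/Parallel_Portfolio/Raw_Data"),
#         instance_set=instance_set,
#         objective=objective,
#         output=Path("Output/Analysis"),
#     )
#     portfolio_output.write_output(portfolio_output.output)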