Coverage for sparkle/platform/output/parallel_portfolio_output.py: 98%

47 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-05 14:48 +0000

1#!/usr/bin/env python3 

2"""Sparkle class to organise configuration output.""" 

3 

4from __future__ import annotations 

5 

6from sparkle.platform import generate_report_for_parallel_portfolio as sgrfpp 

7from sparkle.instance import InstanceSet 

8from sparkle.platform.output.structures import ParallelPortfolioResults 

9from sparkle.types import SparkleObjective 

10 

11import json 

12from pathlib import Path 

13import csv 

14 

15 

class ParallelPortfolioOutput:
    """Class that collects parallel portfolio data and outputs it in JSON format."""

    def __init__(self: ParallelPortfolioOutput, parallel_portfolio_path: Path,
                 instance_set: InstanceSet,
                 objective: SparkleObjective,
                 output: Path) -> None:
        """Initialize ParallelPortfolioOutput class.

        Reads ``results.csv`` from the parallel portfolio directory, groups the
        rows per instance and derives the portfolio metrics that are later
        written by ``write_output``.

        Args:
            parallel_portfolio_path: Path to parallel portfolio output directory
            instance_set: List of instances
            objective: The objective of the portfolio
            output: Path to the output directory. If it points to an existing
                file, that file is used as the output file instead.
        """
        # Only an existing file is used as-is; anything else is treated as the
        # directory in which the default file name is placed.
        if not output.is_file():
            self.output = output / "parallel_portfolio.json"
        else:
            self.output = output

        self.instance_set = instance_set

        # Read the whole CSV inside a context manager so the handle is closed
        results_path = parallel_portfolio_path / "results.csv"
        with results_path.open("r", newline="") as results_file:
            csv_data = list(csv.reader(results_file))
        header = csv_data[0]
        csv_data = csv_data[1:]

        solver_column = header.index("Solver")
        instance_column = header.index("Instance")
        status_column = header.index("status")
        objective_column = header.index(objective.name)

        # De-duplicate while keeping first-appearance order, so that the
        # generated JSON is identical between runs (set order is not stable).
        self.solver_list = list(dict.fromkeys(row[solver_column]
                                              for row in csv_data))

        # Collect solver performance for each instance
        instance_results = {name: [] for name in instance_set._instance_names}
        for row in csv_data:
            if row[instance_column] in instance_results:
                instance_results[row[instance_column]].append(
                    [row[solver_column], row[status_column], row[objective_column]])

        # Use the header-derived columns rather than assuming a fixed layout
        solvers_solutions = self.get_solver_solutions(
            self.solver_list, csv_data,
            instance_column=instance_column,
            solver_column=solver_column,
            status_column=status_column)
        unsolved_instances =\
            self.instance_set.size - sum(solvers_solutions.values())
        # sbs_runtime is redundant, the same information is available in instance_results
        _, sbs, runtime_all_solvers, _ =\
            sgrfpp.get_portfolio_metrics(self.solver_list,
                                         instance_set,
                                         instance_results,
                                         objective)

        self.results = ParallelPortfolioResults(unsolved_instances,
                                                sbs, runtime_all_solvers,
                                                instance_results)

    def get_solver_solutions(self: ParallelPortfolioOutput,
                             solver_list: list[str],
                             csv_data: list[list[str]],
                             *,
                             instance_column: int = 0,
                             solver_column: int = 1,
                             status_column: int = 2) -> dict:
        """Return dictionary with solution count for each solver.

        Each instance of the instance set is counted at most once: the first
        row reporting it with status "success" (case-insensitive) credits the
        solver of that row.

        Args:
            solver_list: Names of the solvers to initialise the counters for
            csv_data: Data rows of the results CSV (without the header)
            instance_column: Index of the instance name within a row
            solver_column: Index of the solver name within a row
            status_column: Index of the run status within a row

        Returns:
            Dictionary mapping each solver name to its number of solved instances.
        """
        # Default initialisation, increase solution counter for each successful evaluation
        solvers_solutions = {solver: 0 for solver in solver_list}
        # Work on a copy: solved instances are removed so they are counted once
        remaining_instances = list(self.instance_set._instance_names)

        for line in csv_data:
            instance_name = line[instance_column]
            if (instance_name in remaining_instances
                    and line[status_column].lower() == "success"):
                solvers_solutions[line[solver_column]] += 1
                remaining_instances.remove(instance_name)

        return solvers_solutions

    def serialize_instances(self: ParallelPortfolioOutput,
                            instances: list[InstanceSet]) -> dict:
        """Transform Instances to dictionary format.

        Args:
            instances: The instance sets to describe

        Returns:
            Dictionary with the number of sets and the name and size of each set.
        """
        # Even though parallel portfolio currently doesn't support multi sets,
        # this function is already ready to support multiple sets
        return {
            "number_of_instance_sets": len(instances),
            "instance_sets": [
                {
                    "name": instance.name,
                    "number_of_instances": instance.size
                }
                for instance in instances
            ]
        }

    def serialize_results(self: ParallelPortfolioOutput,
                          pr: ParallelPortfolioResults) -> dict:
        """Transform results to dictionary format.

        Args:
            pr: The results object whose attributes are copied into the dictionary

        Returns:
            Dictionary holding the attributes of the results object.
        """
        return {
            "sbs": pr.sbs,
            "unsolved_instances": pr.unsolved_instances,
            "runtime_solvers": pr.runtime_solvers,
            "solvers_performance": pr.solver_performance,
            "instance_results": pr.instance_results,
        }

    def write_output(self: ParallelPortfolioOutput) -> None:
        """Write data into a JSON file.

        Missing parent directories of the output file are created first.
        """
        output_data = {
            "number_of_solvers": len(self.solver_list),
            "solvers": self.solver_list,
            "instances": self.serialize_instances([self.instance_set]),
            "results": self.serialize_results(self.results),
        }

        self.output.parent.mkdir(parents=True, exist_ok=True)
        with self.output.open("w") as f:
            json.dump(output_data, f, indent=4)