Coverage for sparkle/structures/feature_dataframe.py: 88%

90 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-05 14:48 +0000

1#!/usr/bin/env python3 

2# -*- coding: UTF-8 -*- 

3"""Module to manage feature data files and common operations on them.""" 

4from __future__ import annotations 

5import pandas as pd 

6import math 

7from pathlib import Path 

8 

9 

class FeatureDataFrame:
    """Class to manage feature data CSV files and common operations on them.

    The data is held in ``self.dataframe``: rows are indexed by a three-level
    MultiIndex (FeatureGroup, FeatureName, Extractor) and every column is an
    instance. Cells that have not been computed hold
    ``FeatureDataFrame.missing_value``.
    """

    missing_value = math.nan
    multi_dim_names = ["FeatureGroup", "FeatureName", "Extractor"]

    def __init__(self: FeatureDataFrame,
                 csv_filepath: Path,
                 instances: list[str] | None = None,
                 extractor_data: dict[str, list[tuple[str, str]]] | None = None
                 ) -> None:
        """Initialise a FeatureDataFrame object.

        Arguments:
            csv_filepath: The Path for the CSV storage. If it exists, the data is
                loaded from it and the other arguments are ignored. If it does not
                exist, a new DataFrame is initialised and stored here.
            instances: The list of instances (columns) to be added to the
                DataFrame. Defaults to no instances.
            extractor_data: A dictionary with extractor names as key, and a list of
                tuples ordered as [(feature_group, feature_name), ...] as value.
                Defaults to no extractors.
        """
        self.csv_filepath = csv_filepath
        if self.csv_filepath.exists():
            # Read from file
            self.dataframe = pd.read_csv(self.csv_filepath,
                                         index_col=FeatureDataFrame.multi_dim_names)
            return
        # None instead of mutable literals as defaults, resolved per call
        instances = [] if instances is None else instances
        extractor_data = {} if extractor_data is None else extractor_data
        # Unfold the extractor_data into one (group, feature, extractor) row each
        rows = [(group, feature_name, extractor)
                for extractor, features in extractor_data.items()
                for group, feature_name in features]
        row_index = pd.MultiIndex.from_tuples(
            rows, names=FeatureDataFrame.multi_dim_names)
        # Initialise new dataframe filled with the missing value marker
        self.dataframe = pd.DataFrame(FeatureDataFrame.missing_value,
                                      index=row_index,
                                      columns=instances)
        self.save_csv()

    def add_extractor(self: FeatureDataFrame,
                      extractor: str,
                      extractor_features: list[tuple[str, str]],
                      values: list[list[float]] | None = None) -> None:
        """Add an extractor and its feature names to the dataframe.

        Arguments:
            extractor: Name of the extractor
            extractor_features: Tuples of [FeatureGroup, FeatureName]
            values: Initial values of the Extractor per instance in the dataframe,
                one entry per element of extractor_features.
                Defaults to FeatureDataFrame.missing_value.
        """
        if values is None:
            values = [FeatureDataFrame.missing_value] * len(extractor_features)
        # One row per (group, feature) pair, written across all instance columns
        for index, (feature_group, feature) in enumerate(extractor_features):
            self.dataframe.loc[(feature_group, feature, extractor), :] = values[index]

    def add_instances(self: FeatureDataFrame,
                      instance: str | list[str],
                      values: list[float] | None = None) -> None:
        """Add one or more instances (columns) to the dataframe.

        Arguments:
            instance: Name, or list of names, of the instance(s) to add.
            values: Values for the new column(s).
                Defaults to FeatureDataFrame.missing_value.
        """
        if values is None:
            values = FeatureDataFrame.missing_value
        self.dataframe[instance] = values

    def remove_extractor(self: FeatureDataFrame,
                         extractor: str) -> None:
        """Remove all rows of an extractor from the dataframe."""
        self.dataframe.drop(extractor, axis=0, level="Extractor", inplace=True)

    def remove_instances(self: FeatureDataFrame,
                         instances: str | list[str]) -> None:
        """Remove one or more instances (columns) from the dataframe."""
        self.dataframe.drop(instances, axis=1, inplace=True)

    def get_feature_groups(self: FeatureDataFrame,
                           extractor: str | list[str] | None = None) -> list[str]:
        """Retrieve the feature groups in the dataframe.

        Args:
            extractor: Optional. If extractor(s) are given,
                yields only feature groups of that extractor.

        Returns:
            A list of the unique feature groups.
        """
        indices = self.dataframe.index
        if extractor is not None:
            if isinstance(extractor, str):
                extractor = [extractor]
            # Level 2 of the MultiIndex is the Extractor
            indices = indices[indices.isin(extractor, level=2)]
        return indices.get_level_values(level=0).unique().to_list()

    def get_value(self: FeatureDataFrame,
                  instance: str,
                  extractor: str,
                  feature_group: str,
                  feature_name: str) -> float:
        """Return the value of one feature of one instance in the dataframe."""
        return self.dataframe.loc[(feature_group, feature_name, extractor), instance]

    def set_value(self: FeatureDataFrame,
                  instance: str,
                  extractor: str,
                  feature_group: str,
                  feature_name: str,
                  value: float) -> None:
        """Set the value of one feature of one instance in the dataframe."""
        self.dataframe.loc[(feature_group, feature_name, extractor), instance] = value

    def has_missing_vectors(self: FeatureDataFrame) -> bool:
        """Returns True if there are any Extractors still to be run on any instance.

        An extractor counts as not yet run on an instance when all of that
        extractor's features are missing for that instance.
        """
        for extractor in self.extractors:
            # The extractor's rows do not depend on the instance: slice them once
            extractor_features = self.dataframe.xs(extractor, level=2,
                                                   drop_level=False)
            # Per instance: are all features missing? Any such instance suffices
            if extractor_features.isnull().all(axis=0).any():
                return True
        return False

    def remaining_jobs(self: FeatureDataFrame) -> list[tuple[str, str, str]]:
        """Determines needed feature computations per instance/extractor/group.

        A computation is needed when all features of a feature group of an
        extractor are missing for an instance.

        Returns:
            list: A list of tuples representing (Instance, Extractor, Feature Group)
                that need to be computed.
        """
        remaining_jobs = []
        for extractor in self.extractors:
            for group in self.get_feature_groups(extractor):
                subset = self.dataframe.xs((group, extractor), level=(0, 2))
                # Boolean per instance column: True if every feature is missing
                all_missing = subset.isnull().all(axis=0)
                for instance, is_missing in all_missing.items():
                    if is_missing:
                        remaining_jobs.append((instance, extractor, group))
        return remaining_jobs

    def get_instance(self: FeatureDataFrame, instance: str) -> list[float]:
        """Return the feature vector (the full column) of an instance."""
        return self.dataframe[instance].tolist()

    def impute_missing_values(self: FeatureDataFrame) -> None:
        """Imputes all NaN values by taking the average feature value.

        The average is taken per feature (row) over all instances.
        """
        # fillna with a Series fills per column, so transpose to make the
        # features the columns, fill, and transpose back
        self.dataframe = self.dataframe.T.fillna(self.dataframe.mean(axis=1)).T

    def has_missing_value(self: FeatureDataFrame) -> bool:
        """Return whether there are missing values in the feature data."""
        # Cast so that a plain bool is returned, as annotated
        return bool(self.dataframe.isnull().any().any())

    def reset_dataframe(self: FeatureDataFrame) -> None:
        """Resets all values to FeatureDataFrame.missing_value."""
        self.dataframe.loc[:, :] = FeatureDataFrame.missing_value

    def sort(self: FeatureDataFrame) -> None:
        """Sorts the DataFrame in place by Multi-Index for readability."""
        # sort_index returns a new frame by default: inplace is required here
        self.dataframe.sort_index(level=FeatureDataFrame.multi_dim_names,
                                  inplace=True)

    @property
    def instances(self: FeatureDataFrame) -> pd.Index:
        """Return the instances in the dataframe (its column labels)."""
        return self.dataframe.columns

    @property
    def extractors(self: FeatureDataFrame) -> list[str]:
        """Returns all unique extractors in the DataFrame."""
        return self.dataframe.index.get_level_values("Extractor").unique().to_list()

    def save_csv(self: FeatureDataFrame, csv_filepath: Path | None = None) -> None:
        """Write a CSV to the given path.

        Args:
            csv_filepath: Path to the csv file. Defaults to self.csv_filepath.
        """
        csv_filepath = self.csv_filepath if csv_filepath is None else csv_filepath
        self.dataframe.to_csv(csv_filepath)

    def to_autofolio(self: FeatureDataFrame,
                     target: Path | None = None) -> Path:
        """Port the data to a format acceptable for AutoFolio.

        Args:
            target: Directory to write the file to.
                Defaults to the directory of self.csv_filepath.

        Returns:
            Path to the written file, named "autofolio_" + name of the csv file.
        """
        autofolio_df = self.dataframe.copy()
        autofolio_df.index = autofolio_df.index.map("_".join)  # Reduce Multi-Index
        autofolio_df = autofolio_df.T  # Autofolio has feature columns and instance rows
        directory = self.csv_filepath.parent if target is None else target
        path = directory / f"autofolio_{self.csv_filepath.name}"
        autofolio_df.to_csv(path)
        return path

198 return path