Coverage for sparkle/structures/feature_dataframe.py: 87%

93 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-07 15:22 +0000

1"""Module to manage feature data files and common operations on them.""" 

2from __future__ import annotations 

3import pandas as pd 

4import math 

5from pathlib import Path 

6 

7 

class FeatureDataFrame:
    """Class to manage feature data CSV files and common operations on them.

    The data is held in ``self.dataframe``, a pandas DataFrame whose rows are
    indexed by a three-level MultiIndex (FeatureGroup, FeatureName, Extractor)
    and whose columns are the instances.
    """

    # Placeholder stored for every feature value that is not (yet) computed
    missing_value = math.nan
    # Names of the row MultiIndex levels, in level order (0, 1, 2)
    multi_dim_names = ["FeatureGroup", "FeatureName", "Extractor"]

    def __init__(self: FeatureDataFrame,
                 csv_filepath: Path,
                 instances: list[str] = None,
                 extractor_data: dict[str, list[tuple[str, str]]] = None
                 ) -> None:
        """Initialise a FeatureDataFrame object.

        Arguments:
            csv_filepath: The Path for the CSV storage. If it does not exist,
                a new DataFrame will be initialised and stored here.
            instances: The list of instances (Columns) to be added to the DataFrame.
                Defaults to no instances. Ignored if csv_filepath exists.
            extractor_data: A dictionary with extractor names as key, and a list of
                tuples ordered as [(feature_group, feature_name), ...] as value.
                Defaults to no extractors. Ignored if csv_filepath exists.
        """
        self.csv_filepath = csv_filepath
        if self.csv_filepath.exists():
            # Read from file
            self.dataframe = pd.read_csv(self.csv_filepath,
                                         index_col=FeatureDataFrame.multi_dim_names)
            return
        # None sentinels avoid sharing one mutable default between calls
        instances = [] if instances is None else instances
        extractor_data = {} if extractor_data is None else extractor_data
        # Unfold the extractor_data into one list per MultiIndex level
        multi_index_lists = [[], [], []]
        for extractor, features in extractor_data.items():
            for group, feature_name in features:
                multi_index_lists[0].append(group)
                multi_index_lists[1].append(feature_name)
                multi_index_lists[2].append(extractor)
        # Initialise new dataframe, filled with missing values
        self.dataframe = pd.DataFrame(FeatureDataFrame.missing_value,
                                      index=multi_index_lists,
                                      columns=instances)
        self.dataframe.index.names = FeatureDataFrame.multi_dim_names
        self.save_csv()

    def add_extractor(self: FeatureDataFrame,
                      extractor: str,
                      extractor_features: list[tuple[str, str]],
                      values: list[list[float]] = None) -> None:
        """Add an extractor and its feature names to the dataframe.

        Arguments:
            extractor: Name of the extractor
            extractor_features: Tuples of [FeatureGroup, FeatureName]
            values: Initial values of the Extractor per instance in the dataframe,
                one entry per tuple in extractor_features.
                Defaults to FeatureDataFrame.missing_value.
        """
        if values is None:
            values = [FeatureDataFrame.missing_value
                      for _ in range(len(extractor_features))]
        # Assign one row per (group, feature) pair; entry i of values fills row i
        for index, pair in enumerate(extractor_features):
            feature_group, feature = pair
            self.dataframe.loc[(feature_group, feature, extractor), :] = values[index]

    def add_instances(self: FeatureDataFrame,
                      instance: str | list[str],
                      values: list[float] = None) -> None:
        """Add one or more instances to the dataframe.

        Arguments:
            instance: Name(s) of the instance column(s) to add.
            values: Values to assign to the new column(s).
                Defaults to FeatureDataFrame.missing_value.
        """
        if values is None:
            values = FeatureDataFrame.missing_value
        self.dataframe[instance] = values

    def remove_extractor(self: FeatureDataFrame,
                         extractor: str) -> None:
        """Remove an extractor (all of its feature rows) from the dataframe."""
        self.dataframe.drop(extractor, axis=0, level="Extractor", inplace=True)

    def remove_instances(self: FeatureDataFrame,
                         instances: str | list[str]) -> None:
        """Remove one or more instances (columns) from the dataframe."""
        self.dataframe.drop(instances, axis=1, inplace=True)

    def get_feature_groups(self: FeatureDataFrame,
                           extractor: str | list[str] = None) -> list[str]:
        """Retrieve the feature groups in the dataframe.

        Args:
            extractor: Optional. If extractor(s) are given,
                yields only feature groups of that extractor.

        Returns:
            A list of the unique feature groups.
        """
        indices = self.dataframe.index
        if extractor is not None:
            if isinstance(extractor, str):
                extractor = [extractor]
            # Level 2 of the MultiIndex is the Extractor level
            indices = indices[indices.isin(extractor, level=2)]
        return indices.get_level_values(level=0).unique().to_list()

    def get_value(self: FeatureDataFrame,
                  instance: str,
                  extractor: str,
                  feature_group: str,
                  feature_name: str) -> float:
        """Return the value of one feature of one instance in the dataframe."""
        return self.dataframe.loc[(feature_group, feature_name, extractor), instance]

    def set_value(self: FeatureDataFrame,
                  instance: str,
                  extractor: str,
                  feature_group: str,
                  feature_name: str,
                  value: float) -> None:
        """Set the value of one feature of one instance in the dataframe."""
        self.dataframe.loc[(feature_group, feature_name, extractor), instance] = value

    def has_missing_vectors(self: FeatureDataFrame) -> bool:
        """Returns True if there are any Extractors still to be run on any instance.

        An extractor counts as still to be run on an instance when all of the
        extractor's feature values for that instance are missing.
        """
        for extractor in self.extractors:
            # Slice once per extractor, then check every instance column at once
            extractor_features = self.dataframe.xs(extractor, level=2,
                                                   drop_level=False)
            if extractor_features.isnull().all(axis=0).any():
                return True
        return False

    def remaining_jobs(self: FeatureDataFrame) -> list[tuple[str, str, str]]:
        """Determines needed feature computations per instance/extractor/group.

        A computation is needed when every value of a feature group of an
        extractor is missing for an instance.

        Returns:
            list: A list of tuples representing (Instance, Extractor, Feature Group)
                that need to be computed.
        """
        remaining_jobs = []
        for extractor in self.extractors:
            for group in self.get_feature_groups(extractor):
                subset = self.dataframe.xs((group, extractor), level=(0, 2))
                # Per instance column: True if all features in the group are missing
                all_missing = subset.isnull().all(axis=0)
                for instance, is_missing in all_missing.items():
                    if is_missing:
                        remaining_jobs.append((instance, extractor, group))
        return remaining_jobs

    def get_instance(self: FeatureDataFrame, instance: str) -> list[float]:
        """Return the feature vector of an instance."""
        return self.dataframe[instance].tolist()

    def impute_missing_values(self: FeatureDataFrame) -> None:
        """Imputes all NaN values by taking the average feature value.

        The average is taken per feature (row) over all instances.
        """
        # fillna aligns a Series on columns, so transpose to make features columns
        self.dataframe = self.dataframe.T.fillna(self.dataframe.mean(axis=1)).T

    def has_missing_value(self: FeatureDataFrame) -> bool:
        """Return whether there are missing values in the feature data."""
        return self.dataframe.isnull().any().any()

    def reset_dataframe(self: FeatureDataFrame) -> None:
        """Resets all values to FeatureDataFrame.missing_value."""
        self.dataframe.loc[:, :] = FeatureDataFrame.missing_value

    def sort(self: FeatureDataFrame) -> None:
        """Sorts the DataFrame by Multi-Index for readability."""
        # sort_index returns a new frame unless inplace=True is given
        self.dataframe.sort_index(level=FeatureDataFrame.multi_dim_names,
                                  inplace=True)

    @property
    def instances(self: FeatureDataFrame) -> pd.Index:
        """Return the instances in the dataframe (its column index)."""
        return self.dataframe.columns

    @property
    def extractors(self: FeatureDataFrame) -> list[str]:
        """Returns all unique extractors in the DataFrame."""
        return self.dataframe.index.get_level_values("Extractor").unique().to_list()

    @property
    def num_features(self: FeatureDataFrame) -> int:
        """Return the number of features (rows) in the dataframe."""
        return self.dataframe.shape[0]

    def save_csv(self: FeatureDataFrame, csv_filepath: Path = None) -> None:
        """Write a CSV to the given path.

        Args:
            csv_filepath: Path to the csv file. Defaults to self.csv_filepath.
        """
        csv_filepath = self.csv_filepath if csv_filepath is None else csv_filepath
        self.dataframe.to_csv(csv_filepath)

    def to_autofolio(self: FeatureDataFrame,
                     target: Path = None) -> Path:
        """Port the data to a format acceptable for AutoFolio.

        Args:
            target: Directory to write the file to.
                Defaults to the directory of self.csv_filepath.

        Returns:
            The path of the written CSV file, named
            "autofolio_" followed by the name of self.csv_filepath.
        """
        autofolio_df = self.dataframe.copy()
        autofolio_df.index = autofolio_df.index.map("_".join)  # Reduce Multi-Index
        autofolio_df = autofolio_df.T  # Autofolio has feature columns and instance rows
        if target is None:
            path = self.csv_filepath.parent / f"autofolio_{self.csv_filepath.name}"
        else:
            path = target / f"autofolio_{self.csv_filepath.name}"
        autofolio_df.to_csv(path)
        return path

199 path = target / f"autofolio_{self.csv_filepath.name}" 

200 autofolio_df.to_csv(path) 

201 return path