Coverage for src / sparkle / structures / feature_dataframe.py: 86%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 15:31 +0000

1"""Module to manage feature data files and common operations on them.""" 

2 

3from __future__ import annotations 

4import math 

5from pathlib import Path 

6 

7import pandas as pd 

8 

9 

class FeatureDataFrame(pd.DataFrame):
    """Class to manage feature data CSV files and common operations on them.

    Rows are indexed by instance name. Columns form a three-level MultiIndex
    of (Extractor, FeatureGroup, FeatureName). When no extractor is known, a
    single placeholder column with a NaN extractor keeps the column index
    three levels deep.
    """

    missing_value = math.nan
    extractor_dim = "Extractor"
    feature_group_dim = "FeatureGroup"
    feature_name_dim = "FeatureName"
    instances_index_dim = "Instances"
    multi_dim_column_names = [extractor_dim, feature_group_dim, feature_name_dim]

    def __init__(
        self: FeatureDataFrame,
        csv_filepath: Path,
        instances: list[str] | None = None,
        extractor_data: dict[str, list[tuple[str, str]]] | None = None,
    ) -> None:
        """Initialise a FeatureDataFrame object.

        Arguments:
            csv_filepath: The Path for the CSV storage. If it does not exist,
                a new DataFrame will be initialised and stored here.
            instances: The list of instances (rows) to be added to the DataFrame.
                Only used when a new DataFrame is created. Defaults to no instances.
            extractor_data: A dictionary with extractor names as key, and a list of
                tuples ordered as [(feature_group, feature_name), ...] as value.
                Only used when a new DataFrame is created. Defaults to no extractors.
        """
        if csv_filepath.exists():
            # Initialise the dataframe from the existing file
            temp_df = pd.read_csv(
                csv_filepath,
                header=[0, 1, 2],
                index_col=[0],
                dtype={
                    FeatureDataFrame.extractor_dim: str,
                    FeatureDataFrame.feature_group_dim: str,
                    FeatureDataFrame.feature_name_dim: str,
                    FeatureDataFrame.instances_index_dim: str,
                },
                on_bad_lines="skip",
                skip_blank_lines=True,
            )
            super().__init__(temp_df)
            self.index.name = FeatureDataFrame.instances_index_dim
            self.csv_filepath = csv_filepath
        else:
            # Create a new dataframe
            if extractor_data:
                # Unfold the extractor_data into (extractor, group, feature) tuples
                multi_column_lists = [
                    (extractor, group, feature_name)
                    for extractor, features in extractor_data.items()
                    for group, feature_name in features
                ]
            else:
                # Placeholder column so the column index keeps three levels
                multi_column_lists = [
                    (
                        FeatureDataFrame.missing_value,
                        FeatureDataFrame.missing_value,
                        FeatureDataFrame.feature_name_dim,
                    )
                ]
            multi_columns = pd.MultiIndex.from_tuples(
                multi_column_lists, names=FeatureDataFrame.multi_dim_column_names
            )
            super().__init__(
                data=FeatureDataFrame.missing_value,
                index=[] if instances is None else instances,
                columns=multi_columns,
                dtype=float,
            )
            self.index.name = FeatureDataFrame.instances_index_dim
            self.csv_filepath = csv_filepath
            self.save_csv()

        if self.index.duplicated().any():  # Drop all duplicates except for last
            self.reset_index(inplace=True)  # Reset index to column
            self.drop_duplicates(
                subset=self.columns[0], keep="last", inplace=True
            )  # Filter duplicates from index column
            self.set_index(
                self.columns[0], inplace=True
            )  # Restore the Instance Index (in-place)
            self.index.name = FeatureDataFrame.instances_index_dim

        # Sort the index to optimize lookup speed
        self.sort_index(axis=0, inplace=True)
        self.sort_index(axis=1, inplace=True)

    def add_extractor(
        self: FeatureDataFrame,
        extractor: str,
        extractor_features: list[tuple[str, str]],
        values: list[list[float]] | None = None,
    ) -> None:
        """Add an extractor and its feature names to the dataframe.

        If the extractor is already present, a warning is printed and nothing
        is changed.

        Arguments:
            extractor: Name of the extractor
            extractor_features: Tuples of [FeatureGroup, FeatureName]
            values: Initial values of the Extractor per instance in the dataframe,
                one entry per feature in extractor_features.
                Defaults to FeatureDataFrame.missing_value.
        """
        if extractor in self.extractors:
            print(
                f"WARNING: Tried adding already existing extractor {extractor} to "
                f"Feature DataFrame: {self.csv_filepath}"
            )
            return
        if values is None:
            # Single missing value for each feature
            values = [self.missing_value] * len(extractor_features)
        # Snapshot of the extractor level taken before the new columns are added
        existing_extractors = self.columns.get_level_values(
            FeatureDataFrame.extractor_dim
        )
        for index, (feature_group, feature) in enumerate(extractor_features):
            self[(extractor, feature_group, feature)] = values[index]
        if self.num_extractors > 1:
            # Upon successful adding of the extractor, remove the nan extractor.
            # The placeholder can be the string "nan" or a float NaN.
            if str(math.nan) in existing_extractors:
                self.drop(
                    str(math.nan),
                    axis=1,
                    level=FeatureDataFrame.extractor_dim,
                    inplace=True,
                )
            elif math.nan in existing_extractors:
                self.drop(
                    math.nan, axis=1, level=FeatureDataFrame.extractor_dim, inplace=True
                )

    def add_instances(
        self: FeatureDataFrame,
        instance: str | list[str],
        values: list[float] | None = None,
    ) -> None:
        """Add one or more instances to the dataframe.

        Arguments:
            instance: Name of the instance, or a list of instance names.
            values: The row values assigned to every given instance.
                Defaults to FeatureDataFrame.missing_value.
        """
        if values is None:
            values = FeatureDataFrame.missing_value
        if isinstance(instance, str):
            instance = [instance]
        for i in instance:
            self.loc[i] = values

    def remove_extractor(self: FeatureDataFrame, extractor: str) -> None:
        """Remove an extractor from the dataframe.

        Arguments:
            extractor: Name of the extractor whose columns are dropped.
        """
        self.drop(extractor, axis=1, level=FeatureDataFrame.extractor_dim, inplace=True)
        if self.num_extractors == 0:  # Make sure we have at least one 'extractor'
            self.add_extractor(
                str(FeatureDataFrame.missing_value),
                [(FeatureDataFrame.missing_value, FeatureDataFrame.feature_name_dim)],
            )

    def remove_instances(self: FeatureDataFrame, instances: str | list[str]) -> None:
        """Remove one or more instances (rows) from the dataframe."""
        self.drop(instances, axis=0, inplace=True)

    def get_feature_groups(
        self: FeatureDataFrame, extractor: str | list[str] | None = None
    ) -> list[str]:
        """Retrieve the feature groups in the dataframe.

        Args:
            extractor: Optional. If extractor(s) are given,
                yields only feature groups of that extractor.

        Returns:
            A list of feature groups.
        """
        columns = self.columns
        if extractor is not None:
            if isinstance(extractor, str):
                extractor = [extractor]
            columns = columns[columns.isin(extractor, level=0)]
        return columns.get_level_values(level=1).unique().to_list()

    def get_value(
        self: FeatureDataFrame,
        instance: str,
        extractor: str,
        feature_group: str,
        feature_name: str,
    ) -> float:
        """Return the value stored for an instance and a single feature column."""
        return self.loc[instance, (extractor, feature_group, feature_name)]

    def set_value(
        self: FeatureDataFrame,
        instance: str,
        extractor: str,
        feature_group: str,
        feature_name: str | list[str],
        value: float | list[float],
        append_write_csv: bool = False,
    ) -> None:
        """Set a value in the dataframe.

        Arguments:
            instance: The instance (row) to write to.
            extractor: The extractor level of the target column(s).
            feature_group: The feature group level of the target column(s).
            feature_name: A feature name, or a list of feature names.
            value: A value, or a list of values matching feature_name in length.
            append_write_csv: If True, the updated row of the instance is also
                appended to the file at self.csv_filepath. The file is opened
                without a create flag, so it must already exist.

        Raises:
            ValueError: If only one of feature_name and value is a list, or if
                both are lists of different lengths.
        """
        if isinstance(feature_name, list) and isinstance(value, list):
            if len(feature_name) != len(value):
                raise ValueError(
                    "feature_name and values must be the same length "
                    f"({len(feature_name)}, {len(value)})."
                )
        elif isinstance(feature_name, list) or isinstance(value, list):
            raise ValueError(
                "feature_name parameter and value must be the same type "
                f"({type(feature_name)}, {type(value)})."
            )
        self.loc[instance, (extractor, feature_group, feature_name)] = value
        if append_write_csv:
            import os

            # Convert the row of this instance to csv lines (without header)
            csv_string = self.loc[[instance], :].to_csv(header=False)
            # Should be only one line, but is safe if we were to do multiple values
            for line in csv_string.splitlines():
                # Open and close for each line to minimise possibilities of conflict
                fd = os.open(f"{self.csv_filepath}", os.O_WRONLY | os.O_APPEND)
                try:
                    os.write(fd, f"{line}\n".encode("utf-8"))
                finally:
                    # Always release the descriptor, also when the write fails
                    os.close(fd)

    def has_missing_vectors(self: FeatureDataFrame) -> bool:
        """Returns True if there are any Extractors still to be run on any instance.

        An extractor still has to be run on an instance when every feature value
        of that extractor is missing for that instance.
        """
        for extractor in self.extractors:
            # all(axis=1): every feature of the extractor is null for an instance,
            # any(): this holds for at least one instance
            if self[extractor].isnull().all(axis=1).any():
                return True
        return False

    def remaining_jobs(self: FeatureDataFrame) -> list[tuple[str, str, str]]:
        """Determines needed feature computations per instance/extractor/group.

        Returns:
            list: A list of tuples representing (Instance, Extractor, Feature Group)
                that needs to be computed, without duplicates.
        """
        placeholder = str(FeatureDataFrame.missing_value)
        # Every (extractor, group) pair once, in column order
        pairs = dict.fromkeys(
            (extractor, group) for extractor, group, _ in self.columns
        )
        jobs = []
        for extractor, group in pairs:
            # NaN never compares equal to itself, so detect the placeholder
            # extractor through its string form (covers both "nan" and float NaN)
            if str(extractor) == placeholder:
                continue
            group_is_null = self.loc[:, (extractor, group, slice(None))].isnull()
            instance_missing = group_is_null.all(axis=1).to_numpy()
            jobs.extend(
                (instance, extractor, group)
                for instance in self.index[instance_missing]
            )
        return list(dict.fromkeys(jobs))  # Filter duplicates, keep order

    def get_instance(
        self: FeatureDataFrame, instance: str, as_dataframe: bool = False
    ) -> list[float] | pd.DataFrame:
        """Return the feature vector of an instance.

        Arguments:
            instance: The instance (row) to retrieve.
            as_dataframe: If True, return a single-row DataFrame instead of a list.

        Returns:
            The row values as a list, or a one-row DataFrame if as_dataframe is set.
        """
        if as_dataframe:
            return self.loc[[instance]]
        return self.loc[instance].tolist()

    def impute_missing_values(self: FeatureDataFrame) -> None:
        """Imputes all NaN values by taking the average feature value."""
        imputed_df = self.fillna(self.mean(axis=0))  # Column-wise mean per feature
        self[:] = imputed_df.values

    def has_missing_value(self: FeatureDataFrame) -> bool:
        """Return whether there are missing values in the feature data."""
        return bool(self.isnull().any().any())

    def reset_dataframe(self: FeatureDataFrame) -> None:
        """Resets all values to FeatureDataFrame.missing_value."""
        self.loc[:, :] = FeatureDataFrame.missing_value

    def sort(self: FeatureDataFrame) -> None:
        """Sorts the DataFrame rows by the instance index for readability."""
        self.sort_index(inplace=True)

    @property
    def instances(self: FeatureDataFrame) -> pd.Index:
        """Return the instances in the dataframe (the row index)."""
        return self.index

    @property
    def extractors(self: FeatureDataFrame) -> list[str]:
        """Returns all unique extractors in the DataFrame.

        The placeholder extractor (FeatureDataFrame.missing_value) is excluded.
        """
        placeholder = str(FeatureDataFrame.missing_value)
        unique_extractors = self.columns.get_level_values(
            FeatureDataFrame.extractor_dim
        ).unique()
        return [x for x in unique_extractors if str(x) != placeholder]

    @property
    def num_features(self: FeatureDataFrame) -> int:
        """Return the number of features (columns) in the dataframe."""
        return self.shape[1]

    @property
    def num_instances(self: FeatureDataFrame) -> int:
        """Return the number of instances (rows) in the dataframe."""
        return self.shape[0]

    @property
    def num_extractors(self: FeatureDataFrame) -> int:
        """Return the number of unique values in the extractor column level.

        Unlike the extractors property, the placeholder extractor is not
        filtered out here.
        """
        return (
            self.columns.get_level_values(FeatureDataFrame.extractor_dim).unique().size
        )

    @property
    def features(self: FeatureDataFrame) -> list[str]:
        """Return the unique feature names in the dataframe."""
        return (
            self.columns.get_level_values(FeatureDataFrame.feature_name_dim)
            .unique()
            .to_list()
        )

    def save_csv(self: FeatureDataFrame, csv_filepath: Path | None = None) -> None:
        """Write a CSV to the given path.

        The rows are sorted by index (in place) before writing.

        Args:
            csv_filepath: Path to the csv file. Defaults to self.csv_filepath.

        Raises:
            ValueError: If neither csv_filepath nor self.csv_filepath is set.
        """
        csv_filepath = self.csv_filepath if csv_filepath is None else csv_filepath
        if csv_filepath is None:
            raise ValueError("Cannot save DataFrame: no `csv_filepath` was provided.")
        self.sort_index(inplace=True)
        self.to_csv(csv_filepath)