Coverage for src/sparkle/structures/feature_dataframe.py: 86% (134 statements)
1"""Module to manage feature data files and common operations on them."""
3from __future__ import annotations
4import math
5from pathlib import Path
7import pandas as pd

class FeatureDataFrame(pd.DataFrame):
    """Class to manage feature data CSV files and common operations on them."""

    missing_value = math.nan
    extractor_dim = "Extractor"
    feature_group_dim = "FeatureGroup"
    feature_name_dim = "FeatureName"
    instances_index_dim = "Instances"
    multi_dim_column_names = [extractor_dim, feature_group_dim, feature_name_dim]

    def __init__(
        self: FeatureDataFrame,
        csv_filepath: Path,
        instances: list[str] = [],
        extractor_data: dict[str, list[tuple[str, str]]] = {},
    ) -> None:
        """Initialise a FeatureDataFrame object.

        Arguments:
            csv_filepath: The Path for the CSV storage. If it does not exist,
                a new DataFrame will be initialised and stored here.
            instances: The list of instances (index rows) to be added to the DataFrame.
            extractor_data: A dictionary with extractor names as keys, and a list of
                tuples ordered as [(feature_group, feature_name), ...] as values.
        """
        # Initialise a dataframe from an existing file
        if csv_filepath.exists():
            # Read from file
            temp_df = pd.read_csv(
                csv_filepath,
                header=[0, 1, 2],
                index_col=[0],
                dtype={
                    FeatureDataFrame.extractor_dim: str,
                    FeatureDataFrame.feature_group_dim: str,
                    FeatureDataFrame.feature_name_dim: str,
                    FeatureDataFrame.instances_index_dim: str,
                },
                on_bad_lines="skip",
                skip_blank_lines=True,
            )
            super().__init__(temp_df)
            self.index.name = FeatureDataFrame.instances_index_dim
            self.csv_filepath = csv_filepath
        # Create a new dataframe
        else:
            # Unfold the extractor_data into a list of column tuples
            if extractor_data:
                multi_column_lists = [
                    (extractor, group, feature_name)
                    for extractor in extractor_data
                    for group, feature_name in extractor_data[extractor]
                ]
            else:
                # Placeholder column so the three-level MultiIndex always exists
                multi_column_lists = [
                    (
                        FeatureDataFrame.missing_value,
                        FeatureDataFrame.missing_value,
                        FeatureDataFrame.feature_name_dim,
                    )
                ]
            # Initialise a new dataframe
            multi_columns = pd.MultiIndex.from_tuples(
                multi_column_lists, names=self.multi_dim_column_names
            )
            super().__init__(
                data=self.missing_value,
                index=instances,
                columns=multi_columns,
                dtype=float,
            )
            self.index.name = FeatureDataFrame.instances_index_dim
            self.csv_filepath = csv_filepath
            self.save_csv()

        if self.index.duplicated().any():  # Drop all duplicates except the last
            self.reset_index(inplace=True)  # Turn the index into a regular column
            self.drop_duplicates(
                subset=self.columns[0], keep="last", inplace=True
            )  # Filter duplicates from the former index column
            self.set_index(
                self.columns[0], inplace=True
            )  # Restore the Instances index (in place)
            self.index.name = FeatureDataFrame.instances_index_dim

        # Sort the index to optimise lookup speed
        self.sort_index(axis=0, inplace=True)
        self.sort_index(axis=1, inplace=True)
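
    # A minimal usage sketch of the constructor (hypothetical file and instance
    # names, not taken from this repository):
    #
    #     fdf = FeatureDataFrame(
    #         Path("Output/feature_data.csv"),
    #         instances=["Instances/example_a.cnf", "Instances/example_b.cnf"],
    #         extractor_data={
    #             "example_extractor": [("base", "n_vars"), ("base", "n_clauses")],
    #         },
    #     )
    #     # All cells start as missing_value (NaN) and the frame is written to the
    #     # CSV path; if the file already exists, it is loaded instead.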

    def add_extractor(
        self: FeatureDataFrame,
        extractor: str,
        extractor_features: list[tuple[str, str]],
        values: list[list[float]] = None,
    ) -> None:
        """Add an extractor and its feature names to the dataframe.

        Arguments:
            extractor: Name of the extractor.
            extractor_features: Tuples of (FeatureGroup, FeatureName).
            values: Initial values of the extractor per instance in the dataframe.
                Defaults to FeatureDataFrame.missing_value.
        """
        if extractor in self.extractors:
            print(
                f"WARNING: Tried adding already existing extractor {extractor} to "
                f"Feature DataFrame: {self.csv_filepath}"
            )
            return
        if values is None:
            values = [self.missing_value] * len(
                extractor_features
            )  # A single missing value for each feature
        extractor_dim = self.columns.get_level_values(FeatureDataFrame.extractor_dim)
        # Add one column per (feature_group, feature_name) of the new extractor
        for index, (feature_group, feature) in enumerate(extractor_features):
            self[(extractor, feature_group, feature)] = values[index]
        if self.num_extractors > 1:
            # Upon successfully adding the extractor, remove the placeholder nan extractor
            if str(math.nan) in extractor_dim:
                self.drop(
                    str(math.nan),
                    axis=1,
                    level=FeatureDataFrame.extractor_dim,
                    inplace=True,
                )
            elif math.nan in extractor_dim:
                self.drop(
                    math.nan, axis=1, level=FeatureDataFrame.extractor_dim, inplace=True
                )

    def add_instances(
        self: FeatureDataFrame, instance: str | list[str], values: list[float] = None
    ) -> None:
        """Add one or more instances to the dataframe."""
        if values is None:
            values = FeatureDataFrame.missing_value
        if isinstance(instance, str):
            instance = [instance]
        for i in instance:
            self.loc[i] = values

    def remove_extractor(self: FeatureDataFrame, extractor: str) -> None:
        """Remove an extractor from the dataframe."""
        self.drop(extractor, axis=1, level=FeatureDataFrame.extractor_dim, inplace=True)
        if self.num_extractors == 0:  # Make sure we keep at least one 'extractor' column
            self.add_extractor(
                str(FeatureDataFrame.missing_value),
                [(FeatureDataFrame.missing_value, FeatureDataFrame.feature_name_dim)],
            )

    def remove_instances(self: FeatureDataFrame, instances: str | list[str]) -> None:
        """Remove one or more instances from the dataframe."""
        self.drop(instances, axis=0, inplace=True)

    def get_feature_groups(
        self: FeatureDataFrame, extractor: str | list[str] = None
    ) -> list[str]:
        """Retrieve the feature groups in the dataframe.

        Args:
            extractor: Optional. If one or more extractors are given,
                yields only the feature groups of those extractors.

        Returns:
            A list of feature groups.
        """
        columns = self.columns
        if extractor is not None:
            if isinstance(extractor, str):
                extractor = [extractor]
            columns = columns[columns.isin(extractor, level=0)]
        return columns.get_level_values(level=1).unique().to_list()

    def get_value(
        self: FeatureDataFrame,
        instance: str,
        extractor: str,
        feature_group: str,
        feature_name: str,
    ) -> float:
        """Return a value in the dataframe."""
        return self.loc[instance, (extractor, feature_group, feature_name)]

    def set_value(
        self: FeatureDataFrame,
        instance: str,
        extractor: str,
        feature_group: str,
        feature_name: str | list[str],
        value: float | list[float],
        append_write_csv: bool = False,
    ) -> None:
        """Set one or more values in the dataframe."""
        if isinstance(feature_name, list) and isinstance(value, list):
            if len(feature_name) != len(value):
                raise ValueError(
                    f"feature_name and value must have the same length "
                    f"({len(feature_name)}, {len(value)})."
                )
        elif isinstance(feature_name, list) or isinstance(value, list):
            raise ValueError(
                f"feature_name and value must be of the same type "
                f"({type(feature_name)}, {type(value)})."
            )
        self.loc[instance, (extractor, feature_group, feature_name)] = value
        if append_write_csv:
            import os

            writeable = self.loc[[instance], :]  # Take the instance row
            # Append the new row(s) to the dataframe CSV file
            csv_string = writeable.to_csv(header=False)  # Convert to CSV lines
            # Usually a single line, but iterate in case multiple values were set
            for line in csv_string.splitlines():
                # Open and close per line to minimise the chance of write conflicts
                fd = os.open(f"{self.csv_filepath}", os.O_WRONLY | os.O_APPEND)
                os.write(fd, f"{line}\n".encode("utf-8"))
                os.close(fd)
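
    # Sketch of recording a computed feature and appending the instance row to
    # the CSV in the same call (hypothetical names):
    #
    #     fdf.set_value(
    #         "Instances/example_a.cnf",
    #         "example_extractor",
    #         "base",
    #         "n_vars",
    #         1024.0,
    #         append_write_csv=True,
    #     )
    #
    # The appended line duplicates the instance row in the file; the duplicate
    # handling in __init__ keeps only the last occurrence when it is read back.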

    def has_missing_vectors(self: FeatureDataFrame) -> bool:
        """Return whether any extractor has not yet been run on any instance."""
        for extractor in self.extractors:
            # First .all() aggregates per feature column, the second over all columns
            if self[extractor].isnull().all().all():
                return True
        return False

    def remaining_jobs(self: FeatureDataFrame) -> list[tuple[str, str, str]]:
        """Determine the needed feature computations per instance/extractor/group.

        Returns:
            list: A list of tuples representing (Instance, Extractor, Feature Group)
                that need to be computed.
        """
        remaining_jobs = []
        for extractor, group, _ in self.columns:
            if (
                extractor == str(FeatureDataFrame.missing_value)
                or extractor == FeatureDataFrame.missing_value
            ):
                continue  # Skip the placeholder extractor
            for instance in self.index:
                if self.loc[instance, (extractor, group, slice(None))].isnull().all():
                    remaining_jobs.append((instance, extractor, group))
        return list(set(remaining_jobs))  # Filter duplicates
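
    # Sketch of consuming remaining_jobs to schedule feature extraction, where
    # `run_extractor` is a hypothetical helper and not part of this module:
    #
    #     for instance, extractor, feature_group in fdf.remaining_jobs():
    #         run_extractor(instance, extractor, feature_group)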

    def get_instance(
        self: FeatureDataFrame, instance: str, as_dataframe: bool = False
    ) -> list[float] | pd.DataFrame:
        """Return the feature vector of an instance."""
        if as_dataframe:
            return self.loc[[instance]]
        return self.loc[instance].tolist()

    def impute_missing_values(self: FeatureDataFrame) -> None:
        """Impute all NaN values with the mean value of the corresponding feature."""
        imputed_df = self.fillna(self.mean(axis=0))
        self[:] = imputed_df.values
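
    # Worked example of the column-mean imputation above: for a feature column
    # holding [2.0, NaN, 4.0] over three instances, fillna(self.mean(axis=0))
    # replaces the NaN with (2.0 + 4.0) / 2 = 3.0; columns that are entirely NaN
    # keep their NaN values, since their mean is itself NaN.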

    def has_missing_value(self: FeatureDataFrame) -> bool:
        """Return whether there are missing values in the feature data."""
        return self.isnull().any().any()

    def reset_dataframe(self: FeatureDataFrame) -> None:
        """Reset all values to FeatureDataFrame.missing_value."""
        self.loc[:, (slice(None), slice(None), slice(None))] = (
            FeatureDataFrame.missing_value
        )

    def sort(self: FeatureDataFrame) -> None:
        """Sort the DataFrame by its Multi-Index for readability."""
        self.sort_index(inplace=True)

    @property
    def instances(self: FeatureDataFrame) -> list[str]:
        """Return the instances in the dataframe."""
        return self.index

    @property
    def extractors(self: FeatureDataFrame) -> list[str]:
        """Returns all unique extractors in the DataFrame."""
        return [
            x
            for x in self.columns.get_level_values(
                FeatureDataFrame.extractor_dim
            ).unique()
            if str(x) != str(FeatureDataFrame.missing_value)
        ]

    @property
    def num_features(self: FeatureDataFrame) -> int:
        """Return the number of features in the dataframe."""
        return self.shape[1]

    @property
    def num_instances(self: FeatureDataFrame) -> int:
        """Return the number of instances in the dataframe."""
        return self.shape[0]

    @property
    def num_extractors(self: FeatureDataFrame) -> int:
        """Return the number of extractors in the dataframe."""
        return self.columns.get_level_values("Extractor").unique().size

    @property
    def features(self: FeatureDataFrame) -> list[str]:
        """Return the features in the dataframe."""
        return self.columns.get_level_values("FeatureName").unique().to_list()

    def save_csv(self: FeatureDataFrame, csv_filepath: Path = None) -> None:
        """Write a CSV to the given path.

        Args:
            csv_filepath: Path to the CSV file. Defaults to self.csv_filepath.
        """
        csv_filepath = self.csv_filepath if csv_filepath is None else csv_filepath
        if csv_filepath is None:
            raise ValueError("Cannot save DataFrame: no `csv_filepath` was provided.")
        self.sort_index(inplace=True)
        self.to_csv(csv_filepath)