Coverage for sparkle/selector/extractor.py: 55%

78 statements  

coverage.py v7.9.1, created at 2025-07-01 13:21 +0000

1"""Methods regarding feature extractors.""" 

2from __future__ import annotations 

3from typing import Any 

4from pathlib import Path 

5import ast 

6import subprocess 

7 

8import runrunner as rrr 

9from runrunner.base import Status, Runner 

10from runrunner.local import LocalRun 

11 

12from sparkle.types import SparkleCallable, SolverStatus 

13from sparkle.structures import FeatureDataFrame 

14from sparkle.tools import RunSolver 

15 

16 

17class Extractor(SparkleCallable): 

18 """Extractor base class for extracting features from instances.""" 

19 wrapper = "sparkle_extractor_wrapper.py" 

    def __init__(self: Extractor,
                 directory: Path,
                 runsolver_exec: Path = None) -> None:
        """Initialize the extractor.

        Args:
            directory: Directory of the extractor.
            runsolver_exec: Path to the runsolver executable.
                By default, the runsolver executable in the extractor directory is used.
        """
        super().__init__(directory, runsolver_exec)
        self._features = None
        self._feature_groups = None
        self._groupwise_computation = None

    def __str__(self: Extractor) -> str:
        """Return the string representation of the extractor."""
        return self.name

    def __repr__(self: Extractor) -> str:
        """Return detailed representation of the extractor."""
        return f"{self.name}:\n"\
               f"\t- Directory: {self.directory}\n"\
               f"\t- Wrapper: {self.wrapper}\n"\
               f"\t- # Feature Groups: {len(self.feature_groups)}\n"\
               f"\t- Output Dimension (# Features): {self.output_dimension}\n"\
               f"\t- Groupwise Computation Enabled: {self.groupwise_computation}"

    @property
    def features(self: Extractor) -> list[tuple[str, str]]:
        """Determines the features of the extractor."""
        if self._features is None:
            extractor_process = subprocess.run(
                [self.directory / Extractor.wrapper, "-features"], capture_output=True)
            self._features = ast.literal_eval(extractor_process.stdout.decode())
        return self._features
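
    # Illustrative note (not part of the original module): the wrapper's
    # "-features" output is parsed with ast.literal_eval, so it is expected to be
    # a Python literal, here a list of (feature_group, feature_name) tuples,
    # e.g. (hypothetical names):
    #   [("base", "n_vars"), ("base", "n_clauses"), ("graph", "degree_mean")]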

    @property
    def feature_groups(self: Extractor) -> list[str]:
        """Returns the various feature groups the Extractor has."""
        if self._feature_groups is None:
            self._feature_groups = list(set([group for group, _ in self.features]))
        return self._feature_groups

    @property
    def output_dimension(self: Extractor) -> int:
        """The size of the output vector of the extractor."""
        return len(self.features)

    @property
    def groupwise_computation(self: Extractor) -> bool:
        """Determines if you can call the extractor per group for parallelisation."""
        if self._groupwise_computation is None:
            extractor_help = subprocess.run([self.directory / Extractor.wrapper, "-h"],
                                            capture_output=True)
            # Not the cleanest / most precise way to determine this
            self._groupwise_computation =\
                "-feature_group" in extractor_help.stdout.decode()
        return self._groupwise_computation
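
    # Illustrative note (not part of the original module): when this property is
    # True, features can be computed per group and parallelised, e.g. (sketch):
    #   results = [extractor.run(instance, feature_group=group)
    #              for group in extractor.feature_groups]
    # instead of one call that computes all feature groups at once.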

    def build_cmd(self: Extractor,
                  instance: Path | list[Path],
                  feature_group: str = None,
                  output_file: Path = None,
                  cutoff_time: int = None,
                  log_dir: Path = None,
                  ) -> list[str]:
        """Builds the command line as a list of arguments.

        Args:
            instance: The instance (or list of instances) to run on.
            feature_group: The optional feature group to run the extractor for.
            output_file: Optional file to write the output to.
            cutoff_time: Optional CPU cutoff time in seconds. If present, the
                command is wrapped with runsolver; otherwise the extractor
                runs without runsolver.
            log_dir: Directory for the runsolver logs (used when cutoff_time is given).

        Returns:
            The command, separated per item in the list.
        """
        if not isinstance(instance, list):
            instance = [instance]
        cmd_list_extractor = [f"{self.directory / Extractor.wrapper}",
                              "-extractor_dir", f"{self.directory}/",
                              "-instance_file"] + [str(file) for file in instance]
        if feature_group is not None:
            cmd_list_extractor += ["-feature_group", feature_group]
        if output_file is not None:
            cmd_list_extractor += ["-output_file", str(output_file)]
        if cutoff_time is not None:
            # Extractor handles output file itself
            return RunSolver.wrap_command(self.runsolver_exec,
                                          cmd_list_extractor,
                                          cutoff_time,
                                          log_dir,
                                          log_name_base=self.name,
                                          raw_results_file=False)
        return cmd_list_extractor
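
    # Illustrative example (not part of the original module), using a hypothetical
    # extractor directory and instance path: without cutoff_time,
    # build_cmd(Path("inst.cnf"), output_file=Path("out.rawres")) yields roughly
    #   ["Extractors/example/sparkle_extractor_wrapper.py",
    #    "-extractor_dir", "Extractors/example/",
    #    "-instance_file", "inst.cnf",
    #    "-output_file", "out.rawres"]
    # With cutoff_time set, this list is additionally wrapped by
    # RunSolver.wrap_command before being returned.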

    def run(self: Extractor,
            instance: Path | list[Path],
            feature_group: str = None,
            output_file: Path = None,
            cutoff_time: int = None,
            log_dir: Path = None) -> list[list[Any]] | list[Any] | None:
        """Runs an extractor job with RunRunner.

        Args:
            instance: Path to the instance (or list of instances) to run on.
            feature_group: The feature group to compute. Must be supported by the
                extractor to use.
            output_file: Target output file. If None, the output is piped through
                the RunRunner job.
            cutoff_time: CPU cutoff time in seconds.
            log_dir: Directory to write logs. Defaults to CWD.

        Returns:
            The features, or None if an output file is used or the features
            cannot be found.
        """
        log_dir = Path() if log_dir is None else log_dir
        if feature_group is not None and not self.groupwise_computation:
            # This extractor cannot handle groups, compute all features
            feature_group = None
        cmd_extractor = self.build_cmd(
            instance, feature_group, output_file, cutoff_time, log_dir)
        run_on = Runner.LOCAL  # TODO: Let this function also handle Slurm runs
        extractor_run = rrr.add_to_queue(runner=run_on,
                                         cmd=" ".join(cmd_extractor))
        if isinstance(extractor_run, LocalRun):
            extractor_run.wait()
            if extractor_run.status == Status.ERROR:
                print(f"{self.name} failed to compute features for {instance}.")
                for i, job in enumerate(extractor_run.jobs):
                    print(f"Job {i} error yielded was:\n"
                          f"\t-stdout: '{job.stdout}'\n"
                          f"\t-stderr: '{job.stderr}'\n")
                return None
            # RunRunner adds a stamp before the statement
            output = [ast.literal_eval(job.stdout.split("\t", maxsplit=1)[-1])
                      for job in extractor_run.jobs]
            if len(output) == 1:
                return output[0]
            return output
        return None
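
    # Illustrative note (not part of the original module): a local job's stdout is
    # expected to contain the RunRunner stamp, a tab, and then a Python literal
    # printed by the extractor wrapper, e.g. (hypothetical values):
    #   "00:00:03\t[('base', 'n_vars', 143), ('base', 'n_clauses', 301)]"
    # Splitting on the first tab and applying ast.literal_eval recovers the
    # feature data.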

    def get_feature_vector(self: Extractor,
                           result: Path,
                           runsolver_values: Path = None) -> list[str]:
        """Extracts feature vector from an output file.

        Args:
            result: Path to the raw output file of the extractor.
            runsolver_values: Path to the runsolver output (values) file.

        Returns:
            A list of features. Vector of missing values upon failure.
        """
        if result.exists() and RunSolver.get_status(runsolver_values,
                                                    None) != SolverStatus.TIMEOUT:
            feature_values = ast.literal_eval(result.read_text())
            return [str(value) for _, _, value in feature_values]
        return [FeatureDataFrame.missing_value] * self.output_dimension
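

# Illustrative usage sketch (not part of the original module); the extractor
# directory, instance path and cutoff below are hypothetical placeholders.
if __name__ == "__main__":
    extractor = Extractor(Path("Extractors/example-extractor"))
    print(extractor)                 # name of the extractor
    print(extractor.feature_groups)  # groups reported by the wrapper
    # Compute features for a single instance with a 60 second CPU cutoff.
    features = extractor.run(Path("Instances/example.cnf"), cutoff_time=60)
    print(features)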