Coverage for sparkle/solver/extractor.py: 52%

66 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-07 15:22 +0000

1"""Methods regarding feature extractors.""" 

2from __future__ import annotations 

3from pathlib import Path 

4import ast 

5import subprocess 

6from sparkle.types import SparkleCallable, SolverStatus 

7from sparkle.structures import FeatureDataFrame 

8from sparkle.tools import RunSolver 

9 

10 

class Extractor(SparkleCallable):
    """Extractor base class for extracting features from instances."""

    # File name of the wrapper script that is looked up inside the extractor directory.
    wrapper = "sparkle_extractor_wrapper.py"

    def __init__(self: Extractor,
                 directory: Path,
                 runsolver_exec: Path | None = None,
                 raw_output_directory: Path | None = None,
                 ) -> None:
        """Initialize extractor.

        Args:
            directory: Directory of the extractor.
            runsolver_exec: Path to the runsolver executable.
                By default, runsolver in directory.
            raw_output_directory: Directory where the extractor will write its raw
                output. Defaults to directory / tmp
        """
        super().__init__(directory, runsolver_exec, raw_output_directory)
        # Lazily filled caches, see the corresponding properties.
        self._features = None
        self._feature_groups = None
        self._output_dimension = None
        self._groupwise_computation = None

    @property
    def features(self: Extractor) -> list[tuple[str, str]]:
        """Determines the features of the extractor.

        On first access the wrapper is called with ``-features`` and its standard
        output is parsed as a Python literal. The result is cached afterwards.
        """
        if self._features is None:
            extractor_process = subprocess.run(
                [self.directory / Extractor.wrapper, "-features"], capture_output=True)
            self._features = ast.literal_eval(extractor_process.stdout.decode())
        return self._features

    @property
    def feature_groups(self: Extractor) -> list[str]:
        """Returns the various feature groups the Extractor has.

        Groups are unique and listed in the order in which they first appear in
        ``features``, so the result is stable between runs.
        """
        if self._feature_groups is None:
            # dict.fromkeys removes duplicates while keeping insertion order,
            # unlike a set whose iteration order for strings may vary per process.
            self._feature_groups = list(
                dict.fromkeys(group for group, _ in self.features))
        return self._feature_groups

    @property
    def output_dimension(self: Extractor) -> int:
        """The size of the output vector of the extractor."""
        return len(self.features)

    @property
    def groupwise_computation(self: Extractor) -> bool:
        """Determines if you can call the extractor per group for parallelisation."""
        if self._groupwise_computation is None:
            extractor_help = subprocess.run([self.directory / Extractor.wrapper, "-h"],
                                            capture_output=True)
            # Not the cleanest / most precise way to determine this:
            # the help text is searched for the "-feature_group" argument.
            self._groupwise_computation =\
                "-feature_group" in extractor_help.stdout.decode()
        return self._groupwise_computation

    def build_cmd(self: Extractor,
                  instance: Path | list[Path],
                  feature_group: str | None = None,
                  output_file: Path | None = None,
                  cutoff_time: int | None = None,
                  log_dir: Path | None = None,
                  ) -> list[str]:
        """Builds the command line call as a list of arguments.

        Args:
            instance: The instance to run on. A single path or a list of paths.
            feature_group: The optional feature group to run the extractor for.
            output_file: Optional file to write the output to.
            cutoff_time: Optional cutoff time. If given, the command is wrapped
                with runsolver; otherwise the extractor is called directly.
            log_dir: Directory passed on to runsolver for its logs. Only used
                when cutoff_time is given.

        Returns:
            The command separated per item in the list.
        """
        instances = instance if isinstance(instance, list) else [instance]
        cmd_list_extractor = [f"{self.directory / Extractor.wrapper}",
                              "-extractor_dir", f"{self.directory}/",
                              "-instance_file",
                              *(str(file) for file in instances)]
        if feature_group is not None:
            cmd_list_extractor += ["-feature_group", feature_group]
        if output_file is not None:
            cmd_list_extractor += ["-output_file", str(output_file)]
        if cutoff_time is not None:
            # Extractor handles output file itself
            return RunSolver.wrap_command(self.runsolver_exec,
                                          cmd_list_extractor,
                                          cutoff_time,
                                          log_dir,
                                          log_name_base=self.name,
                                          raw_results_file=False)
        return cmd_list_extractor

    def run(self: Extractor,
            instance: Path | list[Path],
            feature_group: str | None = None,
            output_file: Path | None = None,
            cutoff_time: int | None = None,
            log_dir: Path | None = None) -> list | None:
        """Runs the extractor as a subprocess and waits for it to finish.

        Args:
            instance: Path to the instance to run on
            feature_group: The feature group to compute. Must be supported by the
                extractor to use, otherwise all features are computed.
            output_file: Target output. If None, the features are read from the
                standard output of the process.
            cutoff_time: CPU cutoff time in seconds
            log_dir: Directory to write logs. Defaults to self.raw_output_directory.

        Returns:
            The features or None if an output file is used, or features can not be found.
        """
        if log_dir is None:
            log_dir = self.raw_output_directory
        if feature_group is not None and not self.groupwise_computation:
            # This extractor cannot handle groups, compute all features
            feature_group = None
        cmd_extractor = self.build_cmd(
            instance, feature_group, output_file, cutoff_time, log_dir)
        extractor = subprocess.run(cmd_extractor, capture_output=True)
        if output_file is not None:
            # The extractor wrote its results to the file, nothing to parse here.
            return None
        try:
            # The first whitespace separated token of the output is skipped,
            # the remainder is expected to be a Python literal.
            return ast.literal_eval(
                extractor.stdout.decode().split(maxsplit=1)[1])
        except (IndexError, ValueError, TypeError, SyntaxError,
                MemoryError, RecursionError):
            # Missing or malformed output: the features can not be found.
            return None

    def get_feature_vector(self: Extractor,
                           result: Path,
                           runsolver_values: Path | None = None) -> list[str]:
        """Extracts feature vector from an output file.

        Args:
            result: The raw output of the extractor
            runsolver_values: The output of runsolver.

        Returns:
            A list of features. Vector of missing values upon failure, i.e. when
            the result file does not exist, the run timed out, or the file content
            can not be parsed.
        """
        if result.exists() and RunSolver.get_status(runsolver_values,
                                                    None) != SolverStatus.TIMEOUT:
            try:
                feature_values = ast.literal_eval(result.read_text())
                # Each entry is a triple of which only the last item (the value)
                # ends up in the vector.
                return [str(value) for _, _, value in feature_values]
            except (SyntaxError, ValueError, TypeError):
                # Empty or malformed result file, fall through to missing values.
                pass
        return [FeatureDataFrame.missing_value] * self.output_dimension