Coverage for src/sparkle/selector/extractor.py: 54%

106 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 14:11 +0000

"""Methods regarding feature extractors."""

2 

from __future__ import annotations

import ast
import re
import subprocess
from pathlib import Path
from typing import Any

import runrunner as rrr
from runrunner.base import Status, Runner
from runrunner.local import Run, LocalRun

from sparkle.types import SparkleCallable, SolverStatus
from sparkle.structures import FeatureDataFrame
from sparkle.tools import RunSolver
from sparkle.instance import InstanceSet

17 

18 

class Extractor(SparkleCallable):
    """Extractor base class for extracting features from instances."""

    # File name of the wrapper script expected inside every extractor directory
    wrapper = "sparkle_extractor_wrapper.py"
    # CLI entry point used by run_cli to compute features per instance
    extractor_cli = Path(__file__).parent / "extractor_cli.py"

    def __init__(self: Extractor, directory: Path) -> None:
        """Initialise the extractor.

        Args:
            directory: Directory of the extractor.
        """
        super().__init__(directory)
        # Lazily-computed caches backing the properties below
        self._features: list[tuple[str, str]] | None = None
        self._feature_groups: list[str] | None = None
        self._groupwise_computation: bool | None = None

    def __str__(self: Extractor) -> str:
        """Return the string representation of the extractor."""
        return self.name

    def __repr__(self: Extractor) -> str:
        """Return detailed representation of the extractor."""
        return (
            f"{self.name}:\n"
            f"\t- Directory: {self.directory}\n"
            f"\t- Wrapper: {self.wrapper}\n"
            f"\t- # Feature Groups: {len(self.feature_groups)}\n"
            f"\t- Output Dimension (# Features): {self.output_dimension}\n"
            f"\t- Groupwise Computation Enabled: {self.groupwise_computation}"
        )

    @property
    def features(self: Extractor) -> list[tuple[str, str]]:
        """Determines the features of the extractor.

        Returns:
            The (feature_group, feature_name) tuples reported by the wrapper's
            ``-features`` flag. Cached after the first call.
        """
        if self._features is None:
            extractor_process = subprocess.run(
                [self.directory / Extractor.wrapper, "-features"],
                capture_output=True,
            )
            # The wrapper prints a Python literal (list of tuples) on stdout
            self._features = ast.literal_eval(extractor_process.stdout.decode())
        return self._features

    @property
    def feature_groups(self: Extractor) -> list[str]:
        """Returns the various feature groups the Extractor has."""
        if self._feature_groups is None:
            # Deduplicate group names; order is not guaranteed
            self._feature_groups = list({group for group, _ in self.features})
        return self._feature_groups

    @property
    def output_dimension(self: Extractor) -> int:
        """The size of the output vector of the extractor."""
        return len(self.features)

    @property
    def groupwise_computation(self: Extractor) -> bool:
        """Determines if you can call the extractor per group for parallelisation."""
        if self._groupwise_computation is None:
            extractor_help = subprocess.run(
                [self.directory / Extractor.wrapper, "-h"], capture_output=True
            )
            # Not the cleanest / most precise way to determine this:
            # assume group support iff the help text mentions the flag
            self._groupwise_computation = (
                "-feature_group" in extractor_help.stdout.decode()
            )
        return self._groupwise_computation

    def build_cmd(
        self: Extractor,
        instance: Path | list[Path],
        feature_group: str | None = None,
        output_file: Path | None = None,
        cutoff_time: int | None = None,
        log_dir: Path | None = None,
    ) -> list[str]:
        """Builds a command line string seperated by space.

        Args:
            instance: The instance (or list of instances) to run on.
            feature_group: The optional feature group to run the extractor for.
            output_file: Optional file to write the output to.
            cutoff_time: The maximum runtime in seconds. If set, the command is
                wrapped with RunSolver to enforce the limit; otherwise the
                extractor runs without runsolver.
            log_dir: Directory path for RunSolver logs.

        Returns:
            The command seperated per item in the list.
        """
        if not isinstance(instance, list):
            instance = [instance]
        cmd_list_extractor = [
            f"{self.directory / Extractor.wrapper}",
            "-extractor_dir",
            f"{self.directory}/",
            "-instance_file",
        ] + [str(file) for file in instance]
        if feature_group is not None:
            cmd_list_extractor += ["-feature_group", feature_group]
        if output_file is not None:
            cmd_list_extractor += ["-output_file", str(output_file)]
        if cutoff_time is not None:
            # Extractor handles output file itself
            # NOTE(review): self.runsolver_exec is not assigned in __init__;
            # presumably set by SparkleCallable — verify against the base class.
            return RunSolver.wrap_command(
                self.runsolver_exec,
                cmd_list_extractor,
                cutoff_time,
                log_dir,
                log_name_base=self.name,
                raw_results_file=False,
            )
        return cmd_list_extractor

    def run(
        self: Extractor,
        instance: Path | list[Path],
        feature_group: str | None = None,
        output_file: Path | None = None,
        cutoff_time: int | None = None,
        log_dir: Path | None = None,
    ) -> list[list[Any]] | list[Any] | None:
        """Runs an extractor job with Runrunner.

        Args:
            instance: Path to the instance (or list of instances) to run on.
            feature_group: The feature group to compute. Must be supported by
                the extractor; silently ignored (all features computed) if not.
            output_file: Target output. If None, piped to the RunRunner job.
            cutoff_time: CPU cutoff time in seconds.
            log_dir: Directory to write logs. Defaults to CWD.

        Returns:
            The features, or None if an output file is used, the run failed,
            or the features can not be found in the job output.
        """
        log_dir = Path() if log_dir is None else log_dir
        if feature_group is not None and not self.groupwise_computation:
            # This extractor cannot handle groups, compute all features
            feature_group = None
        cmd_extractor = self.build_cmd(
            instance, feature_group, output_file, cutoff_time, log_dir
        )
        run_on = Runner.LOCAL  # TODO: Let this function also handle Slurm runs
        extractor_run = rrr.add_to_queue(runner=run_on, cmd=" ".join(cmd_extractor))
        if isinstance(extractor_run, LocalRun):
            extractor_run.wait()
            if extractor_run.status == Status.ERROR:
                print(f"{self.name} failed to compute features for {instance}.")
                for i, job in enumerate(extractor_run.jobs):
                    print(
                        f"Job {i} error yielded was:\n"
                        f"\t-stdout: '{job.stdout}'\n"
                        f"\t-stderr: '{job.stderr}'\n"
                    )
                return None
            # RunRunner adds a timestamp before the statement; strip it and
            # parse the remainder as a Python literal
            pattern = re.compile(
                r"^(?P<timestamp1>\d+\.\d+)\/(?P<timestamp2>\d+\.\d+)\s+(?P<output>.*)$"
            )
            output = []
            for job in extractor_run.jobs:
                match = pattern.match(job.stdout)
                if match:
                    output.append(ast.literal_eval(match.group("output")))
            if len(output) == 1:
                # Single job: return its feature vector directly
                return output[0]
            return output
        return None

    def run_cli(
        self: Extractor,
        instance_set: InstanceSet | list[Path],
        feature_dataframe: FeatureDataFrame,
        cutoff_time: int,
        feature_group: str | None = None,
        run_on: Runner = Runner.SLURM,
        sbatch_options: list[str] | None = None,
        srun_options: list[str] | None = None,
        parallel_jobs: int | None = None,
        slurm_prepend: str | list[str] | Path | None = None,
        dependencies: list[Run] | None = None,
        log_dir: Path | None = None,
    ) -> Run:
        """Run the Extractor CLI and write result to the FeatureDataFrame.

        Args:
            instance_set: The instance set to run the Extractor on.
            feature_dataframe: The feature dataframe to write to.
            cutoff_time: CPU cutoff time in seconds.
            feature_group: The feature group to compute. If left empty,
                will run on all feature groups.
            run_on: The runner to use.
            sbatch_options: Additional options to pass to sbatch.
            srun_options: Additional options to pass to srun.
            parallel_jobs: Number of parallel jobs to run.
            slurm_prepend: Slurm script to prepend to the sbatch.
            dependencies: List of dependencies to add to the job.
            log_dir: The directory to write logs to.

        Returns:
            The RunRunner Run object of the submitted (or completed) job.
        """
        instances = (
            instance_set
            if isinstance(instance_set, list)
            else instance_set.instance_paths
        )
        log_dir = Path() if log_dir is None else log_dir
        # Pre-render the optional flag so it can be spliced into each command
        feature_group = f"--feature-group {feature_group} " if feature_group else ""
        commands = [
            f"python3 {Extractor.extractor_cli} "
            f"--extractor {self.directory} "
            f"--instance {instance_path} "
            f"--feature-csv {feature_dataframe.csv_filepath} "
            f"{feature_group}"
            f"--cutoff {cutoff_time} "
            f"--log-dir {log_dir}"
            for instance_path in instances
        ]

        job_name = f"Run Extractor: {self.name} on {len(instances)} instances"
        run = rrr.add_to_queue(
            runner=run_on,
            cmd=commands,
            name=job_name,
            stdout=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print
            stderr=None if run_on == Runner.LOCAL else subprocess.PIPE,  # Print
            base_dir=log_dir,
            sbatch_options=sbatch_options,
            srun_options=srun_options,
            parallel_jobs=parallel_jobs,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if isinstance(run, LocalRun):
            print("Waiting for the local calculations to finish.")
            run.wait()
            # Report progress after each job completes
            for job in run.jobs:
                jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
                print(f"Executing Progress: {jobs_done} out of {len(run.jobs)}")
                if jobs_done == len(run.jobs):
                    break
                job.wait()
            print("Computing features done!")
        else:
            print(f"Running {self.name} through Slurm with Job IDs: {run.run_id}")
        return run

    def get_feature_vector(
        self: Extractor, result: Path, runsolver_values: Path | None = None
    ) -> list[str]:
        """Extracts feature vector from an output file.

        Args:
            result: The raw output of the extractor.
            runsolver_values: The output of runsolver.

        Returns:
            A list of features. Vector of missing values upon failure
            (missing result file or a runsolver-reported timeout).
        """
        if (
            result.exists()
            and RunSolver.get_status(runsolver_values, None) != SolverStatus.TIMEOUT
        ):
            # Result file holds a literal list of (group, name, value) tuples
            feature_values = ast.literal_eval(result.read_text())
            return [str(value) for _, _, value in feature_values]
        return [FeatureDataFrame.missing_value] * self.output_dimension