Coverage for sparkle/selector/extractor.py: 57%

100 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

1"""Methods regarding feature extractors.""" 

2 

3from __future__ import annotations 

4from typing import Any 

5from pathlib import Path 

6import ast 

7import subprocess 

8 

9import runrunner as rrr 

10from runrunner.base import Status, Runner 

11from runrunner.local import Run, LocalRun 

12 

13from sparkle.types import SparkleCallable, SolverStatus 

14from sparkle.structures import FeatureDataFrame 

15from sparkle.tools import RunSolver 

16from sparkle.instance import InstanceSet 

17 

18 

class Extractor(SparkleCallable):
    """Extractor base class for extracting features from instances."""

    # File name of the wrapper script, resolved relative to the extractor directory
    wrapper = "sparkle_extractor_wrapper.py"
    # CLI entry point used by run_cli, located next to this module
    extractor_cli = Path(__file__).parent / "extractor_cli.py"

    def __init__(self: Extractor, directory: Path) -> None:
        """Initialize the extractor.

        Args:
            directory: Directory of the extractor. The wrapper script is looked up
                inside this directory.
        """
        super().__init__(directory)
        # Lazily filled caches, see the properties of the same (public) name
        self._features = None
        self._feature_groups = None
        self._groupwise_computation = None

    def __str__(self: Extractor) -> str:
        """Return the string representation of the extractor."""
        return self.name

    def __repr__(self: Extractor) -> str:
        """Return detailed representation of the extractor."""
        return (
            f"{self.name}:\n"
            f"\t- Directory: {self.directory}\n"
            f"\t- Wrapper: {self.wrapper}\n"
            f"\t- # Feature Groups: {len(self.feature_groups)}\n"
            f"\t- Output Dimension (# Features): {self.output_dimension}\n"
            f"\t- Groupwise Computation Enabled: {self.groupwise_computation}"
        )

    @property
    def features(self: Extractor) -> list[tuple[str, str]]:
        """Determines the features of the extractor.

        The wrapper is called once with ``-features`` and its stdout is parsed as a
        Python literal; the result is cached for subsequent accesses.
        """
        if self._features is None:
            extractor_process = subprocess.run(
                [self.directory / Extractor.wrapper, "-features"], capture_output=True
            )
            self._features = ast.literal_eval(extractor_process.stdout.decode())
        return self._features

    @property
    def feature_groups(self: Extractor) -> list[str]:
        """Returns the various feature groups the Extractor has.

        Groups are de-duplicated and listed in order of first appearance in
        ``features``, so the result is stable between interpreter runs.
        """
        if self._feature_groups is None:
            # dict.fromkeys de-duplicates while preserving insertion order
            self._feature_groups = list(
                dict.fromkeys(group for group, _ in self.features)
            )
        return self._feature_groups

    @property
    def output_dimension(self: Extractor) -> int:
        """The size of the output vector of the extractor."""
        return len(self.features)

    @property
    def groupwise_computation(self: Extractor) -> bool:
        """Determines if you can call the extractor per group for parallelisation."""
        if self._groupwise_computation is None:
            extractor_help = subprocess.run(
                [self.directory / Extractor.wrapper, "-h"], capture_output=True
            )
            # Not the cleanest / most precise way to determine this
            self._groupwise_computation = (
                "-feature_group" in extractor_help.stdout.decode()
            )
        return self._groupwise_computation

    def build_cmd(
        self: Extractor,
        instance: Path | list[Path],
        feature_group: str | None = None,
        output_file: Path | None = None,
        cutoff_time: int | None = None,
        log_dir: Path | None = None,
    ) -> list[str]:
        """Builds the extractor command as a list of arguments.

        Args:
            instance: The instance to run on. A list of paths is passed to the
                wrapper as multiple values of ``-instance_file``.
            feature_group: The optional feature group to run the extractor for.
            output_file: Optional file to write the output to.
            cutoff_time: The maximum runtime. If given, the command is wrapped with
                RunSolver; otherwise the extractor is called without it.
            log_dir: Directory path for logs. Only used when wrapping with RunSolver.

        Returns:
            The command separated per item in the list.
        """
        instance_files = instance if isinstance(instance, list) else [instance]
        cmd_list_extractor = [
            f"{self.directory / Extractor.wrapper}",
            "-extractor_dir",
            f"{self.directory}/",
            "-instance_file",
        ] + [str(file) for file in instance_files]
        if feature_group is not None:
            cmd_list_extractor += ["-feature_group", feature_group]
        if output_file is not None:
            cmd_list_extractor += ["-output_file", str(output_file)]
        if cutoff_time is None:
            return cmd_list_extractor
        # Extractor handles output file itself
        return RunSolver.wrap_command(
            self.runsolver_exec,
            cmd_list_extractor,
            cutoff_time,
            log_dir,
            log_name_base=self.name,
            raw_results_file=False,
        )

    def run(
        self: Extractor,
        instance: Path | list[Path],
        feature_group: str | None = None,
        output_file: Path | None = None,
        cutoff_time: int | None = None,
        log_dir: Path | None = None,
    ) -> list[list[Any]] | list[Any] | None:
        """Runs an extractor job with Runrunner.

        Args:
            instance: Path to the instance to run on
            feature_group: The feature group to compute. Must be supported by the
                extractor to use; otherwise all features are computed.
            output_file: Target output. If None, piped to the RunRunner job.
            cutoff_time: CPU cutoff time in seconds
            log_dir: Directory to write logs. Defaults to CWD.

        Returns:
            The parsed features of the job (a list with one entry per job if there
            are several jobs). None if the run failed or if no features could be
            parsed from the job output, e.g. because they went to an output file.
        """
        log_dir = Path() if log_dir is None else log_dir
        if feature_group is not None and not self.groupwise_computation:
            # This extractor cannot handle groups, compute all features
            feature_group = None
        cmd_extractor = self.build_cmd(
            instance, feature_group, output_file, cutoff_time, log_dir
        )
        run_on = Runner.LOCAL  # TODO: Let this function also handle Slurm runs
        extractor_run = rrr.add_to_queue(runner=run_on, cmd=" ".join(cmd_extractor))
        if not isinstance(extractor_run, LocalRun):
            return None
        extractor_run.wait()
        if extractor_run.status == Status.ERROR:
            print(f"{self.name} failed to compute features for {instance}.")
            for i, job in enumerate(extractor_run.jobs):
                print(
                    f"Job {i} error yielded was:\n"
                    f"\t-stdout: '{job.stdout}'\n"
                    f"\t-stderr: '{job.stderr}'\n"
                )
            return None
        try:
            # RunRunner adds a stamp before the statement
            output = [
                ast.literal_eval(job.stdout.split("\t", maxsplit=1)[-1])
                for job in extractor_run.jobs
            ]
        except (ValueError, TypeError, SyntaxError):
            # Nothing parseable on stdout: honour the documented None return
            # instead of propagating the parser error to the caller
            return None
        if len(output) == 1:
            return output[0]
        return output

    def run_cli(
        self: Extractor,
        instance_set: InstanceSet | list[Path],
        feature_dataframe: FeatureDataFrame,
        cutoff_time: int,
        feature_group: str | None = None,
        run_on: Runner = Runner.SLURM,
        sbatch_options: list[str] | None = None,
        srun_options: list[str] | None = None,
        parallel_jobs: int | None = None,
        slurm_prepend: str | list[str] | Path | None = None,
        dependencies: list[Run] | None = None,
        log_dir: Path | None = None,
    ) -> Run:
        """Run the Extractor CLI and write result to the FeatureDataFrame.

        Args:
            instance_set: The instance set to run the Extractor on.
            feature_dataframe: The feature dataframe to write to.
            cutoff_time: CPU cutoff time in seconds
            feature_group: The feature group to compute. If left empty,
                will run on all feature groups.
            run_on: The runner to use.
            sbatch_options: Additional options to pass to sbatch.
            srun_options: Additional options to pass to srun.
            parallel_jobs: Number of parallel jobs to run.
            slurm_prepend: Slurm script to prepend to the sbatch
            dependencies: List of dependencies to add to the job.
            log_dir: The directory to write logs to. Defaults to CWD.

        Returns:
            The RunRunner run object of the queued extractor jobs. Local runs are
            waited for before returning.
        """
        instances = (
            instance_set
            if isinstance(instance_set, list)
            else instance_set.instance_paths
        )
        log_dir = Path() if log_dir is None else log_dir
        # Optional CLI fragment; kept separate from the parameter it is built from
        group_argument = f"--feature-group {feature_group} " if feature_group else ""
        commands = [
            f"python3 {Extractor.extractor_cli} "
            f"--extractor {self.directory} "
            f"--instance {instance_path} "
            f"--feature-csv {feature_dataframe.csv_filepath} "
            f"{group_argument}"
            f"--cutoff {cutoff_time} "
            f"--log-dir {log_dir}"
            for instance_path in instances
        ]

        job_name = f"Run Extractor: {self.name} on {len(instances)} instances"
        # Local runs print straight to the terminal, other runners get piped output
        output_target = None if run_on == Runner.LOCAL else subprocess.PIPE
        run = rrr.add_to_queue(
            runner=run_on,
            cmd=commands,
            name=job_name,
            stdout=output_target,
            stderr=output_target,
            base_dir=log_dir,
            sbatch_options=sbatch_options,
            srun_options=srun_options,
            parallel_jobs=parallel_jobs,
            prepend=slurm_prepend,
            dependencies=dependencies,
        )
        if isinstance(run, LocalRun):
            print("Waiting for the local calculations to finish.")
            run.wait()
            for job in run.jobs:
                jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
                print(f"Executing Progress: {jobs_done} out of {len(run.jobs)}")
                if jobs_done == len(run.jobs):
                    break
                job.wait()
            print("Computing features done!")
        else:
            print(f"Running {self.name} through Slurm with Job IDs: {run.run_id}")
        return run

    def get_feature_vector(
        self: Extractor, result: Path, runsolver_values: Path | None = None
    ) -> list[str]:
        """Extracts feature vector from an output file.

        Args:
            result: The raw output of the extractor
            runsolver_values: The output of runsolver.

        Returns:
            A list of features. Vector of missing values upon failure, i.e. when
            the result file does not exist or RunSolver reports a timeout.
        """
        if (
            result.exists()
            and RunSolver.get_status(runsolver_values, None) != SolverStatus.TIMEOUT
        ):
            feature_values = ast.literal_eval(result.read_text())
            # Each entry is a triple of which only the last element (value) is kept
            return [str(value) for _, _, value in feature_values]
        return [FeatureDataFrame.missing_value] * self.output_dimension