# Source code for sparkle.solver.extractor

"""Methods regarding feature extractors."""
from __future__ import annotations
from pathlib import Path
import ast
import subprocess
from sparkle.types import SparkleCallable, SolverStatus
from sparkle.structures import FeatureDataFrame
from sparkle.tools import RunSolver


class Extractor(SparkleCallable):
    """Extractor base class for extracting features from instances.

    All interaction with the underlying extractor goes through the wrapper
    script named by ``Extractor.wrapper``, which is looked up inside the
    extractor's directory.
    """

    wrapper = "sparkle_extractor_wrapper.py"

    def __init__(self: Extractor,
                 directory: Path,
                 runsolver_exec: Path | None = None,
                 raw_output_directory: Path | None = None,
                 ) -> None:
        """Initialize extractor.

        Args:
            directory: Directory of the extractor.
            runsolver_exec: Path to the runsolver executable.
                By default, runsolver in directory.
            raw_output_directory: Directory where the extractor will write its
                raw output. Defaults to directory / tmp
        """
        super().__init__(directory, runsolver_exec, raw_output_directory)
        # Lazily filled caches; each one is populated on first property access.
        self._features = None
        self._feature_groups = None
        self._output_dimension = None
        self._groupwise_computation = None

    @property
    def features(self: Extractor) -> list[tuple[str, str]]:
        """Determines the features of the extractor.

        The wrapper is called once with ``-features`` and its stdout is parsed
        as a Python literal; the parsed value is cached for later calls.

        Returns:
            The parsed feature list. The first element of each pair is used as
            the feature group (see ``feature_groups``).
        """
        if self._features is None:
            extractor_process = subprocess.run(
                [self.directory / Extractor.wrapper, "-features"],
                capture_output=True)
            self._features = ast.literal_eval(extractor_process.stdout.decode())
        return self._features

    @property
    def feature_groups(self: Extractor) -> list[str]:
        """Returns the various feature groups the Extractor has.

        Returns:
            The unique group names, in the order in which they first appear in
            ``features``.
        """
        if self._feature_groups is None:
            # dict.fromkeys de-duplicates while keeping first-seen order, so the
            # result is stable between runs (a set of str is not).
            self._feature_groups = list(
                dict.fromkeys(group for group, _ in self.features))
        return self._feature_groups

    @property
    def output_dimension(self: Extractor) -> int:
        """The size of the output vector of the extractor."""
        return len(self.features)

    @property
    def groupwise_computation(self: Extractor) -> bool:
        """Determines if you can call the extractor per group for parallelisation.

        Returns:
            True if the wrapper's ``-h`` output mentions ``-feature_group``.
        """
        if self._groupwise_computation is None:
            extractor_help = subprocess.run(
                [self.directory / Extractor.wrapper, "-h"],
                capture_output=True)
            # Not the cleanest / most precise way to determine this
            self._groupwise_computation = (
                "-feature_group" in extractor_help.stdout.decode())
        return self._groupwise_computation

    def build_cmd(self: Extractor,
                  instance: Path | list[Path],
                  feature_group: str | None = None,
                  output_file: Path | None = None,
                  cutoff_time: int | None = None,
                  log_dir: Path | None = None,
                  ) -> list[str]:
        """Builds the command line for an extractor call as a list of arguments.

        Args:
            instance: The instance to run on. A single path or a list of paths;
                all of them are passed after ``-instance_file``.
            feature_group: The optional feature group to run the extractor for.
            output_file: Optional file to write the output to.
            cutoff_time: Optional cutoff time. If given, the command is wrapped
                with RunSolver; otherwise the extractor command is returned
                as is.
            log_dir: Directory handed to RunSolver for its logs. Only used when
                cutoff_time is given.

        Returns:
            The command separated per item in the list.
        """
        if not isinstance(instance, list):
            instance = [instance]
        cmd_list_extractor = [f"{self.directory / Extractor.wrapper}",
                              "-extractor_dir", f"{self.directory}/",
                              "-instance_file"] + [str(file) for file in instance]
        if feature_group is not None:
            cmd_list_extractor += ["-feature_group", feature_group]
        if output_file is not None:
            cmd_list_extractor += ["-output_file", str(output_file)]
        if cutoff_time is not None:
            # Extractor handles output file itself
            return RunSolver.wrap_command(self.runsolver_exec,
                                          cmd_list_extractor,
                                          cutoff_time,
                                          log_dir,
                                          log_name_base=self.name,
                                          raw_results_file=False)
        return cmd_list_extractor

    def run(self: Extractor,
            instance: Path | list[Path],
            feature_group: str | None = None,
            output_file: Path | None = None,
            cutoff_time: int | None = None,
            log_dir: Path | None = None) -> list | None:
        """Runs the extractor as a subprocess and waits for it to finish.

        Args:
            instance: Path to the instance to run on
            feature_group: The feature group to compute. Ignored (all features
                are computed) when the extractor does not support groupwise
                computation.
            output_file: Target output. If None, the features are parsed from
                the captured stdout of the process.
            cutoff_time: CPU cutoff time in seconds
            log_dir: Directory to write logs. Defaults to
                self.raw_output_directory.

        Returns:
            The features or None if an output file is used, or features can not
            be found.
        """
        if log_dir is None:
            log_dir = self.raw_output_directory
        if feature_group is not None and not self.groupwise_computation:
            # This extractor cannot handle groups, compute all features
            feature_group = None
        cmd_extractor = self.build_cmd(
            instance, feature_group, output_file, cutoff_time, log_dir)
        extractor = subprocess.run(cmd_extractor, capture_output=True)
        if output_file is not None:
            # The extractor wrote its result to the file; nothing to parse here.
            return None
        try:
            # The first whitespace-separated token of stdout is skipped and the
            # remainder is parsed as a Python literal.
            # NOTE(review): presumably the wrapper prefixes its output with a
            # status/marker token - confirm against the wrapper.
            return ast.literal_eval(
                extractor.stdout.decode().split(maxsplit=1)[1])
        except (IndexError, ValueError, TypeError, SyntaxError,
                MemoryError, RecursionError):
            # No second token (IndexError), undecodable bytes
            # (UnicodeDecodeError is a ValueError) or a malformed literal:
            # treat all of them as "features can not be found".
            return None

    def get_feature_vector(self: Extractor,
                           result: Path,
                           runsolver_values: Path | None = None) -> list[str]:
        """Extracts feature vector from an output file.

        Args:
            result: The raw output of the extractor
            runsolver_values: The output of runsolver.

        Returns:
            A list of features as strings. A vector of missing values of length
            ``output_dimension`` when the result file does not exist or the
            RunSolver status is TIMEOUT.
        """
        timed_out = (RunSolver.get_status(runsolver_values, None)
                     == SolverStatus.TIMEOUT) if result.exists() else True
        if not timed_out:
            # Each entry unpacks as a triple; only the third element is kept.
            feature_values = ast.literal_eval(result.read_text())
            return [str(value) for _, _, value in feature_values]
        return [FeatureDataFrame.missing_value] * self.output_dimension