# Source code for sparkle.selector.extractor

"""Methods regarding feature extractors."""
from __future__ import annotations
from typing import Any
from pathlib import Path
import ast
import shlex
import subprocess

import runrunner as rrr
from runrunner.base import Status, Runner
from runrunner.local import LocalRun

from sparkle.types import SparkleCallable, SolverStatus
from sparkle.structures import FeatureDataFrame
from sparkle.tools import RunSolver


class Extractor(SparkleCallable):
    """Extractor base class for extracting features from instances.

    An extractor is a directory containing an executable wrapper script
    (see ``wrapper``). The wrapper is queried lazily for the features it can
    compute and for whether it supports computing a single feature group.
    """

    # File name of the wrapper script expected inside the extractor directory
    wrapper = "sparkle_extractor_wrapper.py"

    def __init__(self: Extractor,
                 directory: Path,
                 runsolver_exec: Path = None) -> None:
        """Initialize extractor.

        Args:
            directory: Directory of the extractor.
            runsolver_exec: Path to the runsolver executable.
                By default, runsolver in directory.
        """
        super().__init__(directory, runsolver_exec)
        # Lazily populated caches, filled on first property access
        self._features = None
        self._feature_groups = None
        self._groupwise_computation = None

    def __str__(self: Extractor) -> str:
        """Return the string representation of the extractor."""
        return self.name

    def __repr__(self: Extractor) -> str:
        """Return detailed representation of the extractor."""
        # NOTE: Accessing the properties below may invoke the wrapper script
        return (f"{self.name}:\n"
                f"\t- Directory: {self.directory}\n"
                f"\t- Wrapper: {self.wrapper}\n"
                f"\t- # Feature Groups: {len(self.feature_groups)}\n"
                f"\t- Output Dimension (# Features): {self.output_dimension}\n"
                f"\t- Groupwise Computation Enabled: {self.groupwise_computation}")

    @property
    def features(self: Extractor) -> list[tuple[str, str]]:
        """Determines the features of the extractor.

        The wrapper is called once with ``-features`` and its stdout is parsed
        as a Python literal; the result is cached for subsequent calls.
        """
        if self._features is None:
            extractor_process = subprocess.run(
                [self.directory / Extractor.wrapper, "-features"],
                capture_output=True)
            self._features = ast.literal_eval(extractor_process.stdout.decode())
        return self._features

    @property
    def feature_groups(self: Extractor) -> list[str]:
        """Returns the various feature groups the Extractor has.

        Groups are unique and listed in order of first appearance in
        ``features``, so the result is stable between interpreter runs.
        """
        if self._feature_groups is None:
            # dict.fromkeys deduplicates while keeping insertion order; a set
            # would yield a different order per run due to hash randomisation.
            self._feature_groups = list(
                dict.fromkeys(group for group, _ in self.features))
        return self._feature_groups

    @property
    def output_dimension(self: Extractor) -> int:
        """The size of the output vector of the extractor."""
        return len(self.features)

    @property
    def groupwise_computation(self: Extractor) -> bool:
        """Determines if you can call the extractor per group for parallelisation."""
        if self._groupwise_computation is None:
            extractor_help = subprocess.run(
                [self.directory / Extractor.wrapper, "-h"],
                capture_output=True)
            # Not the cleanest / most precise way to determine this
            self._groupwise_computation =\
                "-feature_group" in extractor_help.stdout.decode()
        return self._groupwise_computation

    def build_cmd(self: Extractor,
                  instance: Path | list[Path],
                  feature_group: str = None,
                  output_file: Path = None,
                  cutoff_time: int = None,
                  log_dir: Path = None,
                  ) -> list[str]:
        """Builds the command line call as a list of arguments.

        Args:
            instance: The instance to run on. Either a single path or a list
                of paths, which are all passed after ``-instance_file``.
            feature_group: The optional feature group to run the extractor for.
            output_file: Optional file to write the output to.
            cutoff_time: Optional cutoff time. If given, the command is wrapped
                with runsolver; if not present, the extractor is run without
                runsolver.
            log_dir: Directory passed to runsolver for its logs. Only used
                when cutoff_time is given.

        Returns:
            The command separated per item in the list.
        """
        if not isinstance(instance, list):
            instance = [instance]
        cmd_list_extractor = [f"{self.directory / Extractor.wrapper}",
                              "-extractor_dir", f"{self.directory}/",
                              "-instance_file"] + [str(file) for file in instance]
        if feature_group is not None:
            cmd_list_extractor += ["-feature_group", feature_group]
        if output_file is not None:
            cmd_list_extractor += ["-output_file", str(output_file)]
        if cutoff_time is not None:
            # Extractor handles output file itself
            return RunSolver.wrap_command(self.runsolver_exec,
                                          cmd_list_extractor,
                                          cutoff_time,
                                          log_dir,
                                          log_name_base=self.name,
                                          raw_results_file=False)
        return cmd_list_extractor

    def run(self: Extractor,
            instance: Path | list[Path],
            feature_group: str = None,
            output_file: Path = None,
            cutoff_time: int = None,
            log_dir: Path = None) -> list[list[Any]] | list[Any] | None:
        """Runs an extractor job with Runrunner.

        Args:
            instance: Path(s) to the instance to run on.
            feature_group: The feature group to compute. Must be supported by the
                extractor to use; otherwise it is ignored and all features are
                computed.
            output_file: Target output. If None, piped to the RunRunner job.
            cutoff_time: CPU cutoff time in seconds
            log_dir: Directory to write logs. Defaults to CWD.

        Returns:
            The features, or None if the run failed or the features can not be
            parsed from the job output.
        """
        log_dir = Path() if log_dir is None else log_dir
        if feature_group is not None and not self.groupwise_computation:
            # This extractor cannot handle groups, compute all features
            feature_group = None
        cmd_extractor = self.build_cmd(
            instance, feature_group, output_file, cutoff_time, log_dir)
        run_on = Runner.LOCAL  # TODO: Let this function also handle Slurm runs
        # shlex.join quotes each argument, so paths containing spaces or shell
        # metacharacters survive being flattened into a single command string.
        extractor_run = rrr.add_to_queue(runner=run_on,
                                         cmd=shlex.join(cmd_extractor))
        if not isinstance(extractor_run, LocalRun):
            return None

        extractor_run.wait()
        if extractor_run.status == Status.ERROR:
            print(f"{self.name} failed to compute features for {instance}.")
            for i, job in enumerate(extractor_run.jobs):
                print(f"Job {i} error yielded was:\n"
                      f"\t-stdout: '{job.stdout}'\n"
                      f"\t-stderr: '{job.stderr}'\n")
            return None
        try:
            # RunRunner adds a stamp before the statement
            output = [ast.literal_eval(job.stdout.split("\t", maxsplit=1)[-1])
                      for job in extractor_run.jobs]
        except (ValueError, TypeError, SyntaxError):
            # Job output is not a Python literal (e.g. nothing was printed):
            # the features can not be found in the job output.
            print(f"{self.name} produced no parsable features for {instance}.")
            return None
        if len(output) == 1:
            return output[0]
        return output

    def get_feature_vector(self: Extractor,
                           result: Path,
                           runsolver_values: Path = None) -> list[str]:
        """Extracts feature vector from an output file.

        Args:
            result: The raw output of the extractor. Expected to hold a Python
                literal of 3-item entries, of which the last item is the value.
            runsolver_values: The output of runsolver.

        Returns:
            A list of features. Vector of missing values upon failure, i.e.
            when the result file does not exist or runsolver reports a timeout.
        """
        if result.exists() and RunSolver.get_status(runsolver_values,
                                                    None) != SolverStatus.TIMEOUT:
            feature_values = ast.literal_eval(result.read_text())
            return [str(value) for _, _, value in feature_values]
        return [FeatureDataFrame.missing_value] * self.output_dimension