Skip to content

API

Verifier

Base verifier class.

CompleteVerifier

Bases: Verifier

Abstract class for complete verifiers.

Source code in autoverify/verifier/verifier.py
class CompleteVerifier(Verifier):
    """Abstract class for complete verifiers.

    Provides the shared machinery to run a verification tool as a bash
    subprocess, enforce a timeout on it and turn its output into a
    `CompleteVerificationData` object. Tool-specific behaviour (building
    the command, parsing the result, preparing the configuration) is
    delegated to methods implemented by subclasses.
    """

    def verify_property(
        self,
        network: Path,
        property: Path,
        *,
        config: Configuration | Path | None = None,
        timeout: int = DEFAULT_VERIFICATION_TIMEOUT_SEC,
    ) -> CompleteVerificationResult:
        """Verify the property on the network.

        Runs the verifier and feeds the network/property instance as input.
        Complete verification will result in one of the following three
        possibilities: `SAT`, `UNSAT`, `TIMEOUT`.

        Args:
            network: The `Path` to the network in `.onnx` format.
            property: The `Path` to the property in `.vnnlib` format.
            config: The configuration of the verification tool to be used. If
                `None` is passed, the default configuration of the verification
                tool will be used.
            timeout: The maximum number of seconds that can be spent on the
                verification query.

        Returns:
            CompleteVerificationResult: `Ok(data)` wrapping the
                `CompleteVerificationData` of the run, or `Err(data)` when
                the run ended with the result `"ERR"`.

        Raises:
            ValueError: If the network or property file does not have the
                expected extension.
        """
        network, property = network.resolve(), property.resolve()
        self._check_instance(network, property)

        if config is None:
            config = self.default_config

        # Tools use different configuration formats and methods, so we let
        # them do some initialization here
        config = self._init_config(network, property, config)

        run_cmd, output_file = self._get_run_cmd(
            network, property, config=config, timeout=timeout
        )

        outcome = self._run_verification(
            run_cmd,
            result_file=output_file,
            timeout=timeout,
        )

        # Shutting down after timeout may take some time, so we set the took
        # value to the actual timeout
        if outcome.result == "TIMEOUT":
            outcome.took = timeout

        # TODO: What is the point of wrapping in Ok/Err here
        return Ok(outcome) if outcome.result != "ERR" else Err(outcome)

    def verify_instance(
        self,
        instance: VerificationInstance,
        *,
        config: Configuration | Path | None = None,
    ) -> CompleteVerificationResult:
        """Verify a single instance; see the `verify_property` docstring.

        The network, property and timeout are taken from `instance`.
        """
        return self.verify_property(
            instance.network,
            instance.property,
            timeout=instance.timeout,
            config=config,
        )

    def verify_batch(
        self,
        instances: Iterable[VerificationInstance],
        *,
        config: Configuration | Path | None = None,
    ) -> list[CompleteVerificationResult]:
        """Verify a batch. Not yet implemented.

        Args:
            instances: The instances to verify. Any iterable is accepted,
                including one-shot iterators such as generators.
            config: The configuration to use for every instance. If `None`
                is passed, the default configuration is used.

        Returns:
            Whatever the subclass' `_verify_batch` returns.

        Raises:
            ValueError: If an instance has a network or property file with
                an unexpected extension.
        """
        # Materialise once: the instances are walked here for validation and
        # again by `_verify_batch`; a generator would otherwise arrive there
        # already exhausted.
        instances = list(instances)

        for instance in instances:
            self._check_instance(instance.network, instance.property)

        if config is None:
            config = self.default_config

        return self._verify_batch(
            instances,
            config=config,
        )

    @abstractmethod
    def _verify_batch(
        self,
        instances: Iterable[VerificationInstance],
        *,
        config: Configuration | Path | None,
    ) -> list[CompleteVerificationResult]:
        """Tool-specific batch verification, implemented by subclasses."""
        raise NotImplementedError

    def _allocate_run_cmd(
        self,
        run_cmd: str,
        contexts: list[ContextManager[None]],
    ) -> str:
        """Pin the run command to the allocated CPUs and GPU.

        Every non-empty line of `run_cmd` is prefixed with a `taskset`
        command, except lines starting with `source` or `conda`. When a GPU
        index >= 0 is allocated, a context setting `CUDA_VISIBLE_DEVICES`
        is appended to `contexts` (the list is modified in place).

        Args:
            run_cmd: The (possibly multi-line) shell command.
            contexts: The contexts the command will be run in.

        Returns:
            The rewritten command, one statement per line.

        Raises:
            ValueError: If the allocated GPU index exceeds the number of
                GPUs that were found.
        """
        # TODO: GPU allocation
        assert self._cpu_gpu_allocation is not None

        taskset_cmd = taskset_cpu_range(self._cpu_gpu_allocation[0:2])

        gpu_dev = self._cpu_gpu_allocation[2]
        gpus = nvidia_gpu_count()

        if gpu_dev > gpus - 1:
            raise ValueError(
                f"Asked for GPU {gpu_dev} (0-indexed), "
                f"but only found {gpus} GPU(s)"
            )

        if gpu_dev >= 0:
            contexts.append(environment(CUDA_VISIBLE_DEVICES=str(gpu_dev)))

        lines = []
        for line in run_cmd.splitlines():
            line = line.lstrip()
            if not line:  # blank or whitespace-only line
                continue

            # HACK: Why does taskset not work with `source` and `conda`?
            # (presumably because they are handled by the shell itself
            # rather than being executables taskset can launch)
            if line.startswith(("source", "conda")):
                lines.append(line)
            else:
                lines.append(taskset_cmd + " " + line)

        return "\n".join(lines)

    def set_timeout_event(self):
        """Signal that the process has timed out.

        Wakes up the watchdog of a run in progress, which then terminates
        the verification process. Does nothing when no run is in progress.
        """
        try:
            self._timeout_event.set()  # type: ignore
        except AttributeError:
            # Attribute missing (never ran) or None (no run in progress).
            pass

    def _run_verification(
        self,
        run_cmd: str,
        *,
        result_file: Path | None = None,
        timeout: int = DEFAULT_VERIFICATION_TIMEOUT_SEC,
    ) -> CompleteVerificationData:
        """Run the command in bash and collect the verification outcome.

        The command runs inside the verifier's contexts, in its own session
        (so the whole process group can be signalled), with stderr merged
        into stdout. A watchdog thread sends SIGTERM to the process group
        once `timeout` seconds have passed or the timeout event is set.

        Args:
            run_cmd: The shell command to execute.
            result_file: File the tool writes its result to, forwarded to
                `_parse_result`.
            timeout: Seconds to wait before terminating the process.

        Returns:
            The data of the run. `result` is `"TIMEOUT"` when the watchdog
            timed out, `"ERR"` on a positive exit code and otherwise
            whatever `_parse_result` reports.
        """
        contexts = self.contexts or []
        output_lines: list[str] = []
        result: str = ""

        if self._cpu_gpu_allocation is not None:
            run_cmd = self._allocate_run_cmd(run_cmd, contexts)

        with ExitStack() as stack:
            for context in contexts:
                stack.enter_context(context)

            process = subprocess.Popen(
                run_cmd,
                executable="/bin/bash",
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                shell=True,
                universal_newlines=True,
                preexec_fn=os.setsid,
            )

            before_t = time.time()
            self._timeout_event: threading.Event | None = threading.Event()

            def _terminate(timeout_sec):
                # `result` lives in the enclosing function; `nonlocal` is
                # required for the timeout flag to be visible out there.
                nonlocal result

                assert self._timeout_event
                on_time = self._timeout_event.wait(timeout_sec)

                if not on_time:
                    result = "TIMEOUT"  # type: ignore

                if pid_exists(process.pid):
                    os.killpg(os.getpgid(process.pid), signal.SIGTERM)

            watchdog = threading.Thread(target=_terminate, args=[timeout])
            watchdog.start()

            assert process.stdout

            for line in iter(process.stdout.readline, ""):
                output_lines.append(line)

            process.stdout.close()
            return_code = process.wait()
            took_t = time.time() - before_t

            # Release the watchdog and wait for it, so a timeout flag it may
            # still be writing is settled before `result` is inspected.
            self._timeout_event.set()
            watchdog.join()

            output_str = "".join(output_lines)
            counter_example: str | None = None

            if result != "TIMEOUT":
                if return_code > 0:
                    result = "ERR"
                else:
                    result, counter_example = self._parse_result(
                        output_str, result_file
                    )

            self._timeout_event = None

            return CompleteVerificationData(
                result,  # type: ignore
                took_t,
                counter_example,
                "",  # TODO: Remove err field; it is piped to stdout
                output_str,
            )

        # Only reachable when a context suppressed an exception raised above.
        raise RuntimeError("Exception during handling of verification")

set_timeout_event()

Signal that the process has timed out.

Source code in autoverify/verifier/verifier.py
def set_timeout_event(self):
    """Signal that the process has timed out.

    Sets the timeout event of the run in progress. When there is no such
    event (attribute missing or `None`), the call is a no-op.
    """
    try:
        trigger = self._timeout_event.set  # type: ignore
    except AttributeError:
        # No event to signal: nothing is running right now.
        return

    trigger()

verify_batch(instances, *, config=None)

Verify a batch. Not yet implemented.

Source code in autoverify/verifier/verifier.py
def verify_batch(
    self,
    instances: Iterable[VerificationInstance],
    *,
    config: Configuration | Path | None = None,
) -> list[CompleteVerificationResult]:
    """Verify a batch. Not yet implemented.

    Args:
        instances: The instances to verify. Any iterable is accepted,
            including one-shot iterators such as generators.
        config: The configuration to use for every instance. If `None` is
            passed, the default configuration is used.

    Returns:
        Whatever the subclass' `_verify_batch` returns.

    Raises:
        ValueError: If an instance has a network or property file with an
            unexpected extension.
    """
    # Materialise once: the instances are walked here for validation and
    # again by `_verify_batch`; a generator would otherwise arrive there
    # already exhausted.
    instances = list(instances)

    for instance in instances:
        self._check_instance(instance.network, instance.property)

    if config is None:
        config = self.default_config

    return self._verify_batch(
        instances,
        config=config,
    )

verify_instance(instance, *, config=None)

See the verify_property docstring.

Source code in autoverify/verifier/verifier.py
def verify_instance(
    self,
    instance: VerificationInstance,
    *,
    config: Configuration | Path | None = None,
) -> CompleteVerificationResult:
    """Verify one instance; see the `verify_property` docstring.

    The network, property and timeout are read from `instance` and handed
    to `verify_property` together with `config`.
    """
    net, prop = instance.network, instance.property
    return self.verify_property(
        net, prop, config=config, timeout=instance.timeout
    )

verify_property(network, property, *, config=None, timeout=DEFAULT_VERIFICATION_TIMEOUT_SEC)

Verify the property on the network.

Runs the verifier and feeds the network/property instance as input. Complete verification will result in one of the following three possibilities: SAT, UNSAT, TIMEOUT.

Parameters:

Name Type Description Default
network Path

The Path to the network in .onnx format.

required
property Path

The Path to the property in .vnnlib format.

required
config Configuration | Path | None

The configuration of the verification tool to be used. If None is passed, the default configuration of the verification tool will be used.

None
timeout int

The maximum number of seconds that can be spent on the verification query.

DEFAULT_VERIFICATION_TIMEOUT_SEC

Returns:

Name Type Description
CompleteVerificationResult CompleteVerificationResult

A Result object containing information about the verification attempt. TODO: Link docs or something

Source code in autoverify/verifier/verifier.py
def verify_property(
    self,
    network: Path,
    property: Path,
    *,
    config: Configuration | Path | None = None,
    timeout: int = DEFAULT_VERIFICATION_TIMEOUT_SEC,
) -> CompleteVerificationResult:
    """Verify the property on the network.

    The verifier is run on the given network/property pair. A complete
    verification ends in one of three outcomes: `SAT`, `UNSAT` or
    `TIMEOUT`.

    Args:
        network: The `Path` to the network in `.onnx` format.
        property: The `Path` to the property in `.vnnlib` format.
        config: The configuration of the verification tool. When `None`,
            the tool's default configuration is used.
        timeout: The maximum number of seconds that can be spent on the
            verification query.

    Returns:
        CompleteVerificationResult: `Ok(data)` with the data of the run,
            or `Err(data)` when the run ended with the result `"ERR"`.
    """
    net_path = network.resolve()
    prop_path = property.resolve()
    self._check_instance(net_path, prop_path)

    chosen_config = self.default_config if config is None else config

    # Each tool has its own configuration format, so the tool gets the
    # chance to convert the configuration into what it needs.
    tool_config = self._init_config(net_path, prop_path, chosen_config)

    run_cmd, output_file = self._get_run_cmd(
        net_path, prop_path, config=tool_config, timeout=timeout
    )
    outcome = self._run_verification(
        run_cmd, result_file=output_file, timeout=timeout
    )

    # A timed out tool may need a while to shut down; report the timeout
    # itself as the time taken rather than the measured duration.
    if outcome.result == "TIMEOUT":
        outcome.took = timeout

    # TODO: What is the point of wrapping in Ok/Err here
    if outcome.result == "ERR":
        return Err(outcome)
    return Ok(outcome)

Verifier

Bases: ABC

Abstract class to represent a verifier tool.

Source code in autoverify/verifier/verifier.py
class Verifier(ABC):
    """Abstract class to represent a verifier tool."""

    # TODO: GPU Mode attribute
    def __init__(
        self,
        batch_size: int = 512,
        cpu_gpu_allocation: tuple[int, int, int] | None = None,
    ):
        """Store the shared settings. Meant to be called via `super()`.

        Args:
            batch_size: Batch size setting kept for the tool.
            cpu_gpu_allocation: Optional CPU/GPU allocation triple, or
                `None` for no explicit allocation.
        """
        self._cpu_gpu_allocation = cpu_gpu_allocation
        self._batch_size = batch_size

    def get_init_attributes(self) -> dict[str, Any]:
        """Return the attributes that were given at initialization."""
        return dict(batch_size=self._batch_size)

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique name of the verifier."""
        raise NotImplementedError

    @property
    @abstractmethod
    def config_space(self) -> ConfigurationSpace:
        """The configuration space configurations are sampled from."""
        raise NotImplementedError

    @property
    @abstractmethod
    def contexts(self) -> list[ContextManager[None]] | None:
        """The contexts in which the verification is run."""
        raise NotImplementedError

    @property
    def tool_path(self) -> Path:
        """The path where the verifier is installed.

        Raises:
            FileNotFoundError: If the installation directory is missing.
        """
        install_dir = VERIFIER_DIR / self.name / TOOL_DIR_NAME

        if install_dir.exists():
            return Path(install_dir)  # mypy complains tool_path is any

        raise FileNotFoundError(
            f"Could not find installation for tool {self.name}"
        )

    @property
    def conda_env_name(self) -> str:
        """Name of the conda environment that belongs to the verifier."""
        return get_verifier_conda_env_name(self.name)

    @property
    def conda_lib_path(self) -> Path:
        """Library path of the verifier's conda environment."""
        env_name = self.conda_env_name
        return get_conda_env_lib_path(env_name)

    @property
    def default_config(self) -> Configuration:
        """The default configuration of the configuration space."""
        space = self.config_space
        return space.get_default_configuration()

    @abstractmethod
    def _get_run_cmd(
        self,
        network: Path,
        property: Path,
        *,
        config: Any,
        timeout: int = DEFAULT_VERIFICATION_TIMEOUT_SEC,
    ) -> tuple[str, Path | None]:
        """Build the cli command that runs the verification.

        Returns:
            The command and, optionally, the file the result is written to.
        """
        raise NotImplementedError

    @abstractmethod
    def _parse_result(
        self,
        output: str,
        result_file: Path | None,
    ) -> tuple[VerificationResultString, str | None]:
        """Derive the result (and optional counter-example) of a run."""
        raise NotImplementedError

    def _init_config(
        self,
        network: Path,
        property: Path,
        config: Configuration | Path,
    ) -> Any:
        """Convert the config to the type the tool needs.

        The base implementation hands the config back untouched.
        """
        return config

    # TODO: Overload like in ConfigSpace to distinguish between return types
    def sample_configuration(
        self, *, size: int = 1
    ) -> Configuration | list[Configuration]:
        """Sample one or more configurations.

        Args:
            size: The number of configurations to sample.

        Returns:
            The sampled configuration(s).
        """
        space = self.config_space
        return space.sample_configuration(size=size)

    @staticmethod
    def is_same_config(config1: Any, config2: Any) -> bool:
        """Check if two configs are the same.

        Not implemented here; subclasses may provide an implementation.
        """
        raise NotImplementedError

    # TODO: Make this a function in util/
    @staticmethod
    def _check_instance(network: Path, property: Path):
        """Validate the file extensions of a network/property pair.

        Raises:
            ValueError: If the network is not `.onnx` or the property is
                not `.vnnlib`.
        """
        requirements = (
            (network, ".onnx", "Network should be in onnx format"),
            (property, ".vnnlib", "Property should be in vnnlib format"),
        )

        for file_path, extension, message in requirements:
            if not check_file_extension(file_path, extension):
                raise ValueError(message)

conda_env_name: str property

The conda environment name associated with the verifier.

config_space: ConfigurationSpace abstractmethod property

Configuration space to sample from.

contexts: list[ContextManager[None]] | None abstractmethod property

Contexts to run the verification in.

default_config: Configuration property

Return the default configuration.

name: str abstractmethod property

Unique verifier name.

tool_path: Path property

The path where the verifier is installed.

__init__(batch_size=512, cpu_gpu_allocation=None)

New instance. This is used with super calls.

Source code in autoverify/verifier/verifier.py
def __init__(
    self,
    batch_size: int = 512,
    cpu_gpu_allocation: tuple[int, int, int] | None = None,
):
    """New instance. This is used with super calls."""
    self._batch_size = batch_size
    self._cpu_gpu_allocation = cpu_gpu_allocation

get_init_attributes()

Get attributes provided during initialization of the verifier.

Source code in autoverify/verifier/verifier.py
def get_init_attributes(self) -> dict[str, Any]:
    """Return the attributes that were given at initialization.

    Only the batch size is reported, under the key `batch_size`.
    """
    return dict(batch_size=self._batch_size)

is_same_config(config1, config2) staticmethod

Check if two configs are the same.

Source code in autoverify/verifier/verifier.py
@staticmethod
def is_same_config(config1: Any, config2: Any) -> bool:
    """Check if two configs are the same."""
    raise NotImplementedError

sample_configuration(*, size=1)

Sample one or more configurations.

Parameters:

Name Type Description Default
size int

The number of configurations to sample.

1

Returns:

Type Description
Configuration | list[Configuration]

The sampled configuration(s).

Source code in autoverify/verifier/verifier.py
def sample_configuration(
    self, *, size: int = 1
) -> Configuration | list[Configuration]:
    """Sample one or more configurations.

    The request is forwarded to the verifier's configuration space.

    Args:
        size: The number of configurations to sample.

    Returns:
        The sampled configuration(s).
    """
    space = self.config_space
    return space.sample_configuration(size=size)

Classes for data about verification.

CompleteVerificationData dataclass

Class holding data about a verification run.

Attributes:

Name Type Description
result VerificationResultString

Outcome (e.g. SAT, UNSAT...)

took float

Walltime spent

counter_example str | None

Example that violates property (if SAT)

err str

stderr

stdout str

stdout

Source code in autoverify/verifier/verification_result.py
@dataclass
class CompleteVerificationData:
    """Class holding data about a verification run.

    Only `result` and `took` are required; the remaining fields have
    defaults.

    Attributes:
        result: Outcome of the run (e.g. SAT, UNSAT...).
        took: Walltime spent on the run.
        counter_example: Example that violates the property (if SAT),
            `None` otherwise.
        err: stderr of the run. Defaults to an empty string.
        stdout: stdout of the run. Defaults to an empty string.
    """

    # Required fields.
    result: VerificationResultString
    took: float  # presumably seconds; confirm against the code filling it

    # Optional fields.
    counter_example: str | None = None
    err: str = ""
    stdout: str = ""

AB-Crown

AbCrown

Bases: CompleteVerifier

AB-Crown.

Source code in autoverify/verifier/complete/abcrown/verifier.py
class AbCrown(CompleteVerifier):
    """AB-Crown."""

    name: str = "abcrown"
    config_space: ConfigurationSpace = AbCrownConfigspace

    def __init__(
        self,
        batch_size: int = 512,
        cpu_gpu_allocation: tuple[int, int, int] | None = None,
        yaml_override: dict[str, Any] | None = None,
    ):
        """New instance.

        Args:
            batch_size: Batch size forwarded to the generated YAML config.
            cpu_gpu_allocation: Optional CPU/GPU allocation triple; the
                third entry is the GPU index and must not be negative.
            yaml_override: Optional overrides forwarded to the generated
                YAML config.

        Raises:
            ValueError: If the allocation asks for CPU-only mode.
        """
        if cpu_gpu_allocation and cpu_gpu_allocation[2] < 0:
            raise ValueError("AB-Crown CPU only mode not yet supported")

        super().__init__(batch_size, cpu_gpu_allocation)
        self._yaml_override = yaml_override

    @property
    def contexts(self) -> list[ContextManager[None]]:
        """Contexts to run AB-Crown in (working directory and cleanup)."""
        # TODO: Narrow the pkill_match_list patterns further. People may be
        # running scripts called 'abcrown.py'
        # Ideally just keep track of PIDs rather than pkill name matching
        return [
            cwd(self.tool_path / "complete_verifier"),
            pkill_matches(["python abcrown.py"]),
        ]

    def _parse_result(
        self,
        output: str,
        result_file: Path | None,
    ) -> tuple[VerificationResultString, str | None]:
        """Derive the result from the tool output.

        For a `sat` result the counter-example is read from `result_file`.
        Output without a recognised result line is reported as `TIMEOUT`.
        """
        if find_substring("Result: sat", output):
            with open(str(result_file), "r") as f:
                counter_example = f.read()

            return "SAT", counter_example
        elif find_substring("Result: unsat", output):
            return "UNSAT", None
        elif find_substring("Result: timeout", output):
            return "TIMEOUT", None

        return "TIMEOUT", None

    def _get_run_cmd(
        self,
        network: Path,
        property: Path,
        *,
        config: Path,
        timeout: int = DEFAULT_VERIFICATION_TIMEOUT_SEC,
    ) -> tuple[str, Path | None]:
        """Build the shell command that runs AB-Crown.

        Args:
            network: Unused here; the network is part of the YAML config.
            property: Unused here; the property is part of the YAML config.
            config: Path to the YAML configuration file.
            timeout: Timeout in seconds passed to the tool.

        Returns:
            The command and the temporary file the tool writes results to.
        """
        import shlex

        result_file = Path(tmp_file(".txt").name)
        source_cmd = get_conda_source_cmd(get_conda_path())

        source_line = " ".join(source_cmd)
        # Quote the paths: the command is interpreted by bash, so a path
        # containing spaces or shell metacharacters would break it.
        config_arg = shlex.quote(str(config))
        result_arg = shlex.quote(str(result_file))

        # The trailing backslashes are string-literal line continuations,
        # so the python invocation ends up on a single line.
        run_cmd = f"""
        {source_line}
        conda activate {self.conda_env_name}
        python abcrown.py --config {config_arg} \
        --results_file {result_arg} \
        --timeout {str(timeout)}
        """

        return run_cmd, result_file

    def _verify_batch(
        self,
        instances: Iterable[Any],
        *,
        config: Configuration | Path | None,
    ) -> list[CompleteVerificationResult]:
        """Batch verification; not supported yet."""
        # source_cmd = get_conda_source_cmd()
        # TODO:
        raise NotImplementedError("Batch verification not supported yet")

    def _init_config(
        self,
        network: Path,
        property: Path,
        config: Configuration | Path,
    ) -> Path:
        """Create the YAML config file for this instance.

        Args:
            network: Path to the network.
            property: Path to the property.
            config: A `Configuration` or the `Path` to a YAML file.

        Returns:
            Path of the YAML file to pass to the tool.

        Raises:
            ValueError: If `config` is neither a `Configuration` nor a
                `Path`.
        """
        if isinstance(config, Configuration):
            yaml_config = AbcrownYamlConfig.from_config(
                config,
                network,
                property,
                batch_size=self._batch_size,
                yaml_override=self._yaml_override,
            )
        elif isinstance(config, Path):
            yaml_config = AbcrownYamlConfig.from_yaml(
                config,
                network,
                property,
                batch_size=self._batch_size,
                yaml_override=self._yaml_override,
            )
        else:
            raise ValueError("Config should be a Configuration or Path")

        return Path(yaml_config.get_yaml_file_path())

__init__(batch_size=512, cpu_gpu_allocation=None, yaml_override=None)

New instance.

Source code in autoverify/verifier/complete/abcrown/verifier.py
def __init__(
    self,
    batch_size: int = 512,
    cpu_gpu_allocation: tuple[int, int, int] | None = None,
    yaml_override: dict[str, Any] | None = None,
):
    """New instance."""
    if cpu_gpu_allocation and cpu_gpu_allocation[2] < 0:
        raise ValueError("AB-Crown CPU only mode not yet supported")

    super().__init__(batch_size, cpu_gpu_allocation)
    self._yaml_override = yaml_override

nnenum

Nnenum

Bases: CompleteVerifier

Nnenum.

Source code in autoverify/verifier/complete/nnenum/verifier.py
class Nnenum(CompleteVerifier):
    """Nnenum."""

    name: str = "nnenum"
    config_space: ConfigurationSpace = NnenumConfigspace

    # HACK: Should not need to instantiate a whole new instance just to
    # change `_use_auto_settings`.
    def __init__(
        self,
        batch_size: int = 512,
        cpu_gpu_allocation: tuple[int, int, int] | None = None,
        use_auto_settings: bool = True,
    ):
        """New instance.

        Args:
            batch_size: Batch size, handed to the base class.
            cpu_gpu_allocation: Optional CPU/GPU allocation triple; the
                third entry is the GPU index and must be negative.
            use_auto_settings: Whether nnenum should use its `auto`
                settings instead of the passed configuration.

        Raises:
            ValueError: If the allocation asks for a GPU.
        """
        if cpu_gpu_allocation and cpu_gpu_allocation[2] >= 0:
            raise ValueError("Nnenum does not use a GPU, please set it to -1.")

        super().__init__(batch_size, cpu_gpu_allocation)
        self._use_auto_settings = use_auto_settings

    @property
    def contexts(self) -> list[ContextManager[None]]:
        """Contexts to run nnenum in (cwd, environment and cleanup)."""
        return [
            cwd(self.tool_path / "src"),
            environment(OPENBLAS_NUM_THREADS="1", OMP_NUM_THREADS="1"),
            pkill_matches(["python -m nnenum.nnenum"]),
        ]

    def _parse_result(
        self, _: str, result_file: Path | None
    ) -> tuple[VerificationResultString, str | None]:
        """Derive the result from the first line of the result file.

        The tool output is ignored. An unrecognised first line is reported
        as `TIMEOUT`.
        """
        with open(str(result_file), "r") as f:
            result_txt = f.read()

        first_line = result_txt.split("\n", maxsplit=1)[0]

        if first_line == "unsat":
            return "UNSAT", None
        elif first_line == "sat":
            counter_example = self._parse_counter_example(result_txt)
            return "SAT", counter_example
        elif first_line == "timeout":
            return "TIMEOUT", None

        return "TIMEOUT", None

    def _parse_counter_example(self, result_txt: str) -> str:
        """Return everything after the first line of the result text.

        An empty string is returned when the text has a single line, so a
        `sat` verdict without a trailing counter-example does not crash.
        """
        return result_txt.partition("\n")[2]

    def _get_run_cmd(
        self,
        network: Path,
        property: Path,
        *,
        config: dict[str, Any],
        timeout: int = DEFAULT_VERIFICATION_TIMEOUT_SEC,
    ) -> tuple[str, Path | None]:
        """Build the shell command that runs nnenum.

        Args:
            network: Path to the network.
            property: Path to the property.
            config: Dict of exact settings; ignored in auto-settings mode.
            timeout: Timeout in seconds passed to the tool.

        Returns:
            The command and the temporary file the tool writes results to.
        """
        result_file = Path(tmp_file(".txt").name)
        source_cmd = get_conda_source_cmd(get_conda_path())

        # In nnenum, settings are normally passed as a one word string
        # over the CLI. This word then selects from some pre-defined settings.
        # We want some more control however, so we also make an option to pass
        # a stringified dict of exact settings.
        # The "none" value for settings_str is used as a flag that makes
        # nnenum use the dict of exact settings instead.
        settings_str = "none"
        if self._use_auto_settings:
            settings_str = "auto"  # "auto" is the default
            config = {}

        source_line = " ".join(source_cmd)
        # Quote every path: the command is interpreted by bash, so a path
        # containing spaces or shell metacharacters would break it.
        network_arg = shlex.quote(str(network))
        property_arg = shlex.quote(str(property))
        result_arg = shlex.quote(str(result_file))
        config_arg = shlex.quote(str(config))

        # The trailing backslashes are string-literal line continuations,
        # so the python invocation ends up on a single line.
        run_cmd = f"""
        {source_line}
        conda activate {self.conda_env_name}
        python -m nnenum.nnenum {network_arg} {property_arg} {str(timeout)} \
        {result_arg} \
        {str(cpu_count())} \
        {settings_str} \
        {config_arg} \
        """

        return run_cmd, result_file

    def _verify_batch(
        self,
        instances: Iterable[Any],
        *,
        config: Configuration | Path | None,
    ) -> list[CompleteVerificationResult]:
        """Batch verification; not supported yet."""
        # source_cmd = get_conda_source_cmd()
        # TODO:
        raise NotImplementedError("Batch verification not supported yet")

    def _init_config(
        self,
        network: Path,
        property: Path,
        config: Configuration | Path,
    ) -> dict[str, Any]:
        """Turn the configuration into the dict passed to nnenum.

        Raises:
            ValueError: If `config` is a `Path`; configuration files are
                not supported.
        """
        if isinstance(config, Path):
            raise ValueError("Configuration files for nnenum not supported yet")

        import copy

        dict_config = copy.deepcopy(dict(config))

        # HACK: Cant safely evaluate `np.inf`, instead we pass it as a string
        # that is converted back to `np.inf` in the nnenum code.
        # The INF_* flags only steer this conversion, so they are removed
        # from the dict while being checked.
        if dict_config.pop("INF_OVERAPPROX_MIN_GEN_LIMIT") is True:
            dict_config["OVERAPPROX_MIN_GEN_LIMIT"] = "_inf"

        if dict_config.pop("INF_OVERAPPROX_LP_TIMEOUT") is True:
            dict_config["OVERAPPROX_LP_TIMEOUT"] = "_inf"

        return dict_config

    @staticmethod
    def is_same_config(
        config1: Configuration | str, config2: Configuration | str
    ) -> bool:
        """Return if two configs are equivalent.

        A `Configuration` is compared by its `settings_mode` value, a
        string is compared as is.
        """
        if isinstance(config1, Configuration):
            config1 = str(config1["settings_mode"])  # type: ignore
        if isinstance(config2, Configuration):
            config2 = str(config2["settings_mode"])  # type: ignore

        return config1 == config2

__init__(batch_size=512, cpu_gpu_allocation=None, use_auto_settings=True)

New instance.

Source code in autoverify/verifier/complete/nnenum/verifier.py
def __init__(
    self,
    batch_size: int = 512,
    cpu_gpu_allocation: tuple[int, int, int] | None = None,
    use_auto_settings: bool = True,
):
    """New instance."""
    if cpu_gpu_allocation and cpu_gpu_allocation[2] >= 0:
        raise ValueError("Nnenum does not use a GPU, please set it to -1.")

    super().__init__(batch_size, cpu_gpu_allocation)
    self._use_auto_settings = use_auto_settings

is_same_config(config1, config2) staticmethod

Return if two configs are equivalent.

Source code in autoverify/verifier/complete/nnenum/verifier.py
@staticmethod
def is_same_config(
    config1: Configuration | str, config2: Configuration | str
) -> bool:
    """Return if two configs are equivalent.

    A `Configuration` is represented by its `settings_mode` value (as a
    string); any other argument is compared as is.
    """

    def _settings_key(cfg):
        # Reduce a Configuration to the one value that is compared.
        if isinstance(cfg, Configuration):
            return str(cfg["settings_mode"])  # type: ignore
        return cfg

    left = _settings_key(config1)
    right = _settings_key(config2)
    return left == right

Oval-BaB

OvalBab

Bases: CompleteVerifier

Oval-BaB.

Source code in autoverify/verifier/complete/ovalbab/verifier.py
class OvalBab(CompleteVerifier):
    """Oval-BaB."""

    name: str = "ovalbab"
    config_space: ConfigurationSpace = OvalBabConfigspace

    def __init__(
        self,
        batch_size: int = 512,
        cpu_gpu_allocation: tuple[int, int, int] | None = None,
    ):
        """New instance.

        Args:
            batch_size: Batch size, handed to the base class.
            cpu_gpu_allocation: Optional CPU/GPU allocation triple; the
                third entry is the GPU index and must not be negative.

        Raises:
            ValueError: If the allocation asks for CPU-only mode.
        """
        if cpu_gpu_allocation and cpu_gpu_allocation[2] < 0:
            raise ValueError("Oval-BaB CPU only mode not yet supported")
        super().__init__(batch_size, cpu_gpu_allocation)

    @property
    def contexts(self) -> list[ContextManager[None]]:
        """Contexts to run Oval-BaB in (cwd, environment and cleanup)."""
        return [
            cwd(self.tool_path),
            environment(
                LD_LIBRARY_PATH=str(
                    find_conda_lib(self.conda_env_name, "libcudart.so.11.0")
                )
            ),
            pkill_matches(["python tools/bab_tools/bab_from_vnnlib.py"]),
        ]

    def _parse_result(
        self,
        _: str,
        result_file: Path | None,
    ) -> tuple[VerificationResultString, str | None]:
        """Derive the result from the contents of the result file.

        The tool output is ignored. A file mentioning neither `violated`
        nor `holds` is reported as `TIMEOUT`.
        """
        with open(str(result_file), "r") as f:
            result_text = f.read()

        if find_substring("violated", result_text):
            # TODO: Counterexample (not sure if its saved at all by ovalbab?)
            return "SAT", None
        elif find_substring("holds", result_text):
            return "UNSAT", None

        return "TIMEOUT", None

    def _get_run_cmd(
        self,
        network: Path,
        property: Path,
        *,
        config: Path,
        timeout: int = DEFAULT_VERIFICATION_TIMEOUT_SEC,
    ) -> tuple[str, Path | None]:
        """Build the shell command that runs Oval-BaB.

        Args:
            network: Path to the network.
            property: Path to the property.
            config: Path to the JSON configuration file.
            timeout: Timeout in seconds passed to the tool.

        Returns:
            The command and the temporary file the tool writes results to.
        """
        import shlex

        result_file = Path(tmp_file(".txt").name)
        source_cmd = get_conda_source_cmd(get_conda_path())

        source_line = " ".join(source_cmd)
        # Quote every path: the command is interpreted by bash, so a path
        # containing spaces or shell metacharacters would break it.
        network_arg = shlex.quote(str(network))
        property_arg = shlex.quote(str(property))
        result_arg = shlex.quote(str(result_file))
        config_arg = shlex.quote(str(config))

        # The trailing backslashes are string-literal line continuations,
        # so the python invocation ends up on a single line.
        run_cmd = f"""
        {source_line}
        conda activate {self.conda_env_name}
        python tools/bab_tools/bab_from_vnnlib.py --mode run_instance \
        --onnx {network_arg} \
        --vnnlib {property_arg} \
        --result_file {result_arg} \
        --json {config_arg} \
        --instance_timeout {timeout}
        """

        return run_cmd, result_file

    def _init_config(
        self,
        network: Path,
        property: Path,
        config: Configuration | Path,
    ) -> Path:
        """Create the JSON config file for the tool.

        Args:
            network: Unused here.
            property: Unused here.
            config: A `Configuration` or the `Path` to a JSON file.

        Returns:
            Path of the JSON file to pass to the tool.

        Raises:
            ValueError: If `config` is neither a `Configuration` nor a
                `Path`.
        """
        if isinstance(config, Configuration):
            json_config = OvalbabJsonConfig.from_config(config)
        elif isinstance(config, Path):
            json_config = OvalbabJsonConfig.from_json(config)
        else:
            raise ValueError("Config should be a Configuration, Path or None")

        return Path(json_config.get_json_file_path())

    def _verify_batch(
        self,
        instances: Iterable[Any],
        *,
        config: Configuration | Path | None,
    ) -> list[CompleteVerificationResult]:
        """Batch verification; not supported yet."""
        # source_cmd = get_conda_source_cmd()
        # TODO:
        raise NotImplementedError("Batch verification not supported yet")

__init__(batch_size=512, cpu_gpu_allocation=None)

New instance.

Source code in autoverify/verifier/complete/ovalbab/verifier.py
def __init__(
    self,
    batch_size: int = 512,
    cpu_gpu_allocation: tuple[int, int, int] | None = None,
):
    """New instance."""
    if cpu_gpu_allocation and cpu_gpu_allocation[2] < 0:
        raise ValueError("Oval-BaB CPU only mode not yet supported")
    super().__init__(batch_size, cpu_gpu_allocation)

VeriNet

Verinet

Bases: CompleteVerifier

VeriNet.

Source code in autoverify/verifier/complete/verinet/verifier.py
class Verinet(CompleteVerifier):
    """VeriNet."""

    name: str = "verinet"
    config_space: ConfigurationSpace = VerinetConfigspace

    # HACK: Quick hack to add some attributes to a VeriNet instance.
    # Ideally, these could be passed when calling `verify_property/instance`
    # or inside the Configuration.
    def __init__(
        self,
        batch_size: int = 512,
        cpu_gpu_allocation: tuple[int, int, int] | None = None,
        gpu_mode: bool = True,
        input_shape: list[int] | None = None,
        dnnv_simplify: bool = False,
        transpose_matmul_weights: bool = False,
    ):
        """New instance.

        Args:
            batch_size: Forwarded unchanged to the parent initializer.
            cpu_gpu_allocation: Optional 3-tuple forwarded to the parent
                initializer. A negative third element is rejected as
                CPU-only mode.
            gpu_mode: Passed on the command line to `cli.py`.
            input_shape: Input shape passed to `cli.py`. When falsy, the
                shape is obtained from `get_input_shape(network)` instead.
            dnnv_simplify: Passed on the command line to `cli.py`.
            transpose_matmul_weights: Passed on the command line to `cli.py`.

        Raises:
            ValueError: If `cpu_gpu_allocation` is given and its third
                element is negative.
        """
        if cpu_gpu_allocation and cpu_gpu_allocation[2] < 0:
            raise ValueError("VeriNet CPU only mode not yet supported")

        super().__init__(batch_size, cpu_gpu_allocation)
        self._gpu_mode = gpu_mode
        self._input_shape = input_shape
        self._dnnv_simplify = dnnv_simplify
        self._transpose_matmul_weights = transpose_matmul_weights

    @property
    def contexts(self) -> list[ContextManager[None]]:
        """Context managers to use around a VeriNet run.

        Returns:
            A list with, in order: `cwd(self.tool_path)`, an `environment`
            context setting the thread-count variables and
            `LD_LIBRARY_PATH`, and a `pkill_matches` context for the listed
            process patterns.
        """
        return [
            cwd(self.tool_path),
            environment(
                OPENBLAS_NUM_THREADS="1",
                OMP_NUM_THREADS="1",
                LD_LIBRARY_PATH=str(
                    find_conda_lib(self.conda_env_name, "libcudart.so.11.0")
                ),
            ),
            pkill_matches(
                [
                    "multiprocessing.spawn",
                    "multiprocessing.forkserver",
                    "python cli.py",  # TODO: Too broad
                ]
            ),
        ]

    def _parse_result(
        self,
        output: str,
        _: Path | None,
    ) -> tuple[VerificationResultString, str | None]:
        """Map VeriNet's textual status onto a verification result string.

        Args:
            output: The captured tool output to search.
            _: Unused result-file argument.

        Returns:
            A `(result, counterexample)` pair; the second element is always
            `None` here. `Status.Safe` maps to `"UNSAT"`, `Status.Unsafe` to
            `"SAT"`, and everything else to `"TIMEOUT"`.
        """
        if find_substring("STATUS:  Status.Safe", output):
            return "UNSAT", None
        elif find_substring("STATUS:  Status.Unsafe", output):
            return "SAT", None
        elif find_substring("STATUS:  Status.Undecided", output):
            return "TIMEOUT", None

        # Output without any recognised status is also reported as a timeout.
        return "TIMEOUT", None

    def _get_run_cmd(
        self,
        network: Path,
        property: Path,
        *,
        config: Configuration,
        timeout: int = DEFAULT_VERIFICATION_TIMEOUT_SEC,
    ) -> tuple[str, Path | None]:
        """Build the shell command that runs VeriNet on one instance.

        Args:
            network: Path to the network file.
            property: Path to the property file.
            config: The configuration; passed to `cli.py` as `str(dict(config))`.
            timeout: Timeout value passed to `cli.py`.

        Returns:
            A `(command, result_file)` pair. No result file is used, so the
            second element is always `None`.
        """
        source_cmd = get_conda_source_cmd(get_conda_path())
        input_shape = self._input_shape or get_input_shape(network)

        # Every interpolated argument that can contain spaces or shell
        # metacharacters is quoted, including the two file paths.
        # NOTE: a backslash-newline inside a non-raw string literal is a line
        # continuation, so the `python cli.py` invocation is a single line.
        # params in order:
        # network, prop, timeout, config, input_shape, max_procs, gpu_mode,
        # dnnv_simplify
        run_cmd = f"""
        {" ".join(source_cmd)}
        conda activate {self.conda_env_name}
        python cli.py \
        {shlex.quote(str(network))} \
        {shlex.quote(str(property))} \
        {timeout} \
        {shlex.quote(str(dict(config)))} \
        {shlex.quote(str(input_shape))} \
        {-1} \
        {self._gpu_mode} \
        {self._dnnv_simplify} \
        {self._transpose_matmul_weights} \
        """

        return run_cmd, None

    def _verify_batch(
        self,
        instances: Iterable[Any],
        *,
        config: Configuration | Path | None,
    ) -> list[CompleteVerificationResult]:
        """Batch verification hook; not implemented for VeriNet yet.

        Raises:
            NotImplementedError: Always.
        """
        # source_cmd = get_conda_source_cmd()
        # TODO:
        raise NotImplementedError("Batch verification not supported yet")

__init__(batch_size=512, cpu_gpu_allocation=None, gpu_mode=True, input_shape=None, dnnv_simplify=False, transpose_matmul_weights=False)

New instance.

Source code in autoverify/verifier/complete/verinet/verifier.py
def __init__(
    self,
    batch_size: int = 512,
    cpu_gpu_allocation: tuple[int, int, int] | None = None,
    gpu_mode: bool = True,
    input_shape: list[int] | None = None,
    dnnv_simplify: bool = False,
    transpose_matmul_weights: bool = False,
):
    """Create a new VeriNet verifier instance.

    Args:
        batch_size: Forwarded unchanged to the parent initializer.
        cpu_gpu_allocation: Optional 3-tuple forwarded to the parent
            initializer. A negative third element is rejected as CPU-only
            mode.
        gpu_mode: Stored on the instance as `_gpu_mode`.
        input_shape: Stored on the instance as `_input_shape`.
        dnnv_simplify: Stored on the instance as `_dnnv_simplify`.
        transpose_matmul_weights: Stored on the instance as
            `_transpose_matmul_weights`.

    Raises:
        ValueError: If `cpu_gpu_allocation` is given and its third element
            is negative.
    """
    # Only inspect the allocation when one was actually supplied.
    if cpu_gpu_allocation:
        if cpu_gpu_allocation[2] < 0:
            raise ValueError("VeriNet CPU only mode not yet supported")

    super().__init__(batch_size, cpu_gpu_allocation)

    # VeriNet-specific switches, kept as private attributes.
    self._transpose_matmul_weights = transpose_matmul_weights
    self._dnnv_simplify = dnnv_simplify
    self._input_shape = input_shape
    self._gpu_mode = gpu_mode

Portfolio

Parallel portfolio.

ConfiguredVerifier dataclass

Class representing an internal configured verifier.

Attributes:

Name Type Description
verifier str

Name of the verifier.

configuration Configuration

Its configuration.

resources tuple[int, int] | None

Number of cores and GPUs.

Source code in autoverify/portfolio/portfolio.py
@dataclass(frozen=True)
class ConfiguredVerifier:
    """Immutable record pairing a verifier with a configuration.

    Instances are frozen and compare by value, so they are hashable and can
    be stored in a set.

    Attributes:
        verifier: Name of the verifier.
        configuration: Its configuration.
        resources: Number of cores and GPUs.
    """

    verifier: str
    configuration: Configuration
    # `None` when no resources have been assigned.
    resources: tuple[int, int] | None = None

Portfolio

Bases: MutableSet[ConfiguredVerifier]

Portfolio of ConfiguredVerifiers.

Source code in autoverify/portfolio/portfolio.py
class Portfolio(MutableSet[ConfiguredVerifier]):
    """Portfolio of ConfiguredVerifiers.

    A `MutableSet` backed by a plain `set`, plus a mapping from instance name
    to the lowest cost recorded for that instance.
    """

    def __init__(self, *cvs: ConfiguredVerifier):
        """Initialize a new PF with the passed verifiers."""
        self._pf_set: set[ConfiguredVerifier] = set(cvs)
        self._costs: dict[str, float] = {}

    def __contains__(self, cv: object):
        """Check if a CV is in the PF.

        Objects that are not a `ConfiguredVerifier` are reported as absent
        rather than raising, so membership tests stay usable for arbitrary
        values.
        """
        # cant type annotate the func arg or mypy gets mad
        return isinstance(cv, ConfiguredVerifier) and cv in self._pf_set

    def __iter__(self):
        """Iterate the contents of the PF."""
        return iter(self._pf_set)

    def __len__(self):
        """Number of CVs in the PF."""
        return len(self._pf_set)

    def __str__(self):
        """String representation of the PF: one CV per line."""
        return "".join(str(cv) + "\n" for cv in self)

    def get_set(self):
        """Get the underlying set (the live object, not a copy)."""
        return self._pf_set

    @property
    def configs(self) -> list[Configuration]:
        """All the configurations in the PF."""
        return [cv.configuration for cv in self._pf_set]

    def get_cost(self, instance: str):
        """Get the currently known cost of an instance.

        Raises:
            KeyError: If no cost has been recorded for `instance`.
        """
        return self._costs[instance]

    def get_costs(self, instances: Iterable[str]) -> dict[str, float]:
        """Get costs of more than one instance.

        Instances without a recorded cost are left out of the result.
        """
        return {
            inst: self._costs[inst] for inst in instances if inst in self._costs
        }

    def get_all_costs(self) -> dict[str, float]:
        """All the recorded costs (the live mapping, not a copy)."""
        return self._costs

    def get_mean_cost(self) -> float:
        """Get the mean cost."""
        return float(np.mean(list(self._costs.values())))

    def get_median_cost(self) -> float:
        """Get the median cost."""
        return float(np.median(list(self._costs.values())))

    def get_total_cost(self) -> float:
        """Get the total cost."""
        return float(np.sum(list(self._costs.values())))

    def update_costs(self, costs: Mapping[str, float]):
        """Update the costs based on the new costs mapping.

        For every instance the lowest cost seen so far is kept.
        """
        for instance, cost in costs.items():
            if instance not in self._costs:
                self._costs[instance] = cost
                continue

            self._costs[instance] = min(self._costs[instance], cost)

    def add(self, cv: ConfiguredVerifier):
        """Add a CV to the PF, no duplicates allowed.

        Raises:
            ValueError: If an equal CV is already in the portfolio.
        """
        if cv in self._pf_set:
            raise ValueError(f"{cv} is already in the portfolio")

        self._pf_set.add(cv)

    def discard(self, cv: ConfiguredVerifier):
        """Remove a CV from the PF.

        Raises:
            ValueError: If `cv` is not in the portfolio (unlike
                `set.discard`, a missing element is an error here).
        """
        if cv not in self._pf_set:
            raise ValueError(f"{cv} is not in the portfolio")

        self._pf_set.discard(cv)

    def reallocate_resources(
        self, strategy: ResourceStrategy = ResourceStrategy.Auto
    ):
        """Reallocate based on current contents and given strategy.

        The value of `cpu_count()` is split evenly over the configured
        verifiers; leftover cores go, one each, to the first verifiers
        visited. The second element of each `resources` tuple is kept, and
        entries whose `resources` is falsy get `None`.

        Args:
            strategy: Only `ResourceStrategy.Auto` is supported.

        Raises:
            NotImplementedError: For any other strategy.
        """
        if strategy != ResourceStrategy.Auto:
            raise NotImplementedError(
                "Given `ResourceStrategy` is not supported yet."
            )

        # Work on a snapshot: the loop replaces entries of the underlying
        # set, and mutating a set while iterating it can skip or revisit
        # elements.
        current = list(self._pf_set)
        if not current:
            # Nothing to allocate; also avoids dividing by zero below.
            return

        # NOTE: Should put this alloc stuff in a function
        cores_per, cores_remainder = divmod(cpu_count(), len(current))

        for idx, cv in enumerate(current):
            # The first `cores_remainder` entries absorb one leftover core.
            extra_core = 1 if idx < cores_remainder else 0
            resources = cv.resources

            new_resources = (
                (cores_per + extra_core, resources[1]) if resources else None
            )

            self.discard(cv)
            self.add(
                ConfiguredVerifier(cv.verifier, cv.configuration, new_resources)
            )

    def to_json(self, out_json_path: Path):
        """Write the portfolio in JSON format to the specified path."""
        json_list: list[dict[str, Any]] = []

        for cv in self._pf_set:
            cfg_dict = dict(cv.configuration)
            to_write = {
                "verifier": cv.verifier,
                "configuration": cfg_dict,
                "resources": cv.resources,
            }
            json_list.append(to_write)

        with open(out_json_path, "w") as f:
            # `default=str` stringifies any value json cannot encode natively.
            json.dump(json_list, f, indent=4, default=str)

    @classmethod
    def from_json(
        cls,
        json_file: Path,
        config_space_map: Mapping[str, ConfigurationSpace] | None = None,
    ) -> Portfolio:
        """Instantiate a new Portfolio class from a JSON file.

        Args:
            json_file: Path to a JSON list of objects with the keys
                `verifier`, `configuration` and, optionally, `resources`.
            config_space_map: Optional mapping from verifier name to its
                configuration space. When `None`, the space is looked up
                with `get_verifier_configspace`.

        Returns:
            A new portfolio (an instance of `cls`) holding the loaded CVs.
        """
        with open(json_file.expanduser().resolve()) as f:
            pf_json = json.load(f)

        # Build through `cls` so subclasses get an instance of their own type.
        pf = cls()

        for cv in pf_json:
            if config_space_map is None:
                cfg_space = get_verifier_configspace(cv["verifier"])
            else:
                cfg_space = config_space_map[cv["verifier"]]

            cv["configuration"] = Configuration(cfg_space, cv["configuration"])

            # JSON has no tuple type, and CVs must stay hashable: convert
            # lists to tuples and treat a missing/empty entry as "none".
            resources = cv.get("resources")
            cv["resources"] = tuple(resources) if resources else None

            pf.add(ConfiguredVerifier(**cv))

        return pf

    def str_compact(self) -> str:
        """Get the portfolio in string form in a compact way.

        Each line holds the verifier name, the hash of the configuration and
        the resources, separated by tabs.
        """
        cvs: list[str] = []

        for cv in self:
            cvs.append(
                "\t".join(
                    [
                        str(cv.verifier),
                        str(hash(cv.configuration)),
                        str(cv.resources),
                    ]
                )
            )

        return "\n".join(cvs)

    def dump_costs(self):
        """Print the costs for each instance."""
        for instance, cost in self._costs.items():
            print(instance, cost)

configs: list[Configuration] property

All the configurations in the PF.

__contains__(cv)

Check if a CV is in the PF.

Source code in autoverify/portfolio/portfolio.py
def __contains__(self, cv: object):
    """Check if a CV is in the PF.

    Objects that are not a `ConfiguredVerifier` are reported as absent
    rather than raising, so membership tests stay usable for arbitrary
    values (and do not depend on `assert`, which `-O` strips).

    Args:
        cv: The object to look up.

    Returns:
        `True` if `cv` is a `ConfiguredVerifier` stored in the portfolio,
        `False` otherwise.
    """
    # cant type annotate the func arg or mypy gets mad
    return isinstance(cv, ConfiguredVerifier) and cv in self._pf_set

__init__(*cvs)

Initialize a new PF with the passed verifiers.

Source code in autoverify/portfolio/portfolio.py
def __init__(self, *cvs: ConfiguredVerifier):
    """Create a portfolio holding the given configured verifiers.

    Args:
        *cvs: The initial members; equal entries collapse into one.
    """
    # A fresh portfolio has no recorded costs yet.
    self._costs: dict[str, float] = {}
    self._pf_set: set[ConfiguredVerifier] = {*cvs}

__iter__()

Iterate the contents of the PF.

Source code in autoverify/portfolio/portfolio.py
def __iter__(self):
    """Return an iterator over the configured verifiers in the portfolio."""
    members = self._pf_set
    return iter(members)

__len__()

Number of CVs in the PF.

Source code in autoverify/portfolio/portfolio.py
def __len__(self):
    """Return how many configured verifiers the portfolio holds."""
    members = self._pf_set
    return len(members)

__str__()

String representation of the PF.

Source code in autoverify/portfolio/portfolio.py
def __str__(self):
    """Render the portfolio as text, one configured verifier per line.

    Every entry, including the last, is followed by a newline; an empty
    portfolio yields the empty string.
    """
    return "".join(str(cv) + "\n" for cv in self)

add(cv)

Add a CV to the PF, no duplicates allowed.

Source code in autoverify/portfolio/portfolio.py
def add(self, cv: ConfiguredVerifier):
    """Insert `cv` into the portfolio, rejecting duplicates.

    Args:
        cv: The configured verifier to insert.

    Raises:
        ValueError: If an equal configured verifier is already present.
    """
    members = self._pf_set

    if cv not in members:
        members.add(cv)
        return

    raise ValueError(f"{cv} is already in the portfolio")

discard(cv)

Remove a CV from the PF.

Source code in autoverify/portfolio/portfolio.py
def discard(self, cv: ConfiguredVerifier):
    """Remove `cv` from the portfolio.

    Args:
        cv: The configured verifier to remove.

    Raises:
        ValueError: If `cv` is not in the portfolio (unlike `set.discard`,
            a missing element is an error here).
    """
    members = self._pf_set

    if cv in members:
        members.discard(cv)
        return

    raise ValueError(f"{cv} is not in the portfolio")

dump_costs()

Print the costs for each instance.

Source code in autoverify/portfolio/portfolio.py
def dump_costs(self):
    """Print each recorded instance and its cost, one pair per line."""
    known_costs = self._costs

    for instance in known_costs:
        print(instance, known_costs[instance])

from_json(json_file, config_space_map=None) classmethod

Instantiate a new Portfolio class from a JSON file.

Source code in autoverify/portfolio/portfolio.py
@classmethod
def from_json(
    cls,
    json_file: Path,
    config_space_map: Mapping[str, ConfigurationSpace] | None = None,
) -> Portfolio:
    """Instantiate a new Portfolio class from a JSON file.

    Args:
        json_file: Path to a JSON list of objects with the keys `verifier`,
            `configuration` and, optionally, `resources`.
        config_space_map: Optional mapping from verifier name to its
            configuration space. When `None`, the space is looked up with
            `get_verifier_configspace`.

    Returns:
        A new portfolio (an instance of `cls`) holding the loaded entries.
    """
    with open(json_file.expanduser().resolve()) as f:
        pf_json = json.load(f)

    # Build through `cls` so subclasses get an instance of their own type.
    pf = cls()

    for cv in pf_json:
        if config_space_map is None:
            cfg_space = get_verifier_configspace(cv["verifier"])
        else:
            cfg_space = config_space_map[cv["verifier"]]

        cv["configuration"] = Configuration(cfg_space, cv["configuration"])

        # JSON has no tuple type, and entries must stay hashable: convert
        # lists to tuples and treat a missing/empty entry as "none".
        resources = cv.get("resources")
        cv["resources"] = tuple(resources) if resources else None

        pf.add(ConfiguredVerifier(**cv))

    return pf

get_all_costs()

All the recorded costs.

Source code in autoverify/portfolio/portfolio.py
def get_all_costs(self) -> dict[str, float]:
    """Return the mapping of all recorded costs (the live dict, not a copy)."""
    recorded = self._costs
    return recorded

get_cost(instance)

Get the currently known costs of an instance.

Source code in autoverify/portfolio/portfolio.py
def get_cost(self, instance: str):
    """Return the cost currently recorded for `instance`.

    Raises:
        KeyError: If no cost has been recorded for `instance`.
    """
    recorded = self._costs
    return recorded[instance]

get_costs(instances)

Get costs of more than one instance.

Source code in autoverify/portfolio/portfolio.py
def get_costs(self, instances: Iterable[str]) -> dict[str, float]:
    """Return the recorded costs for the given instances.

    Args:
        instances: The instance names to look up.

    Returns:
        A new dict with one entry per requested instance that has a
        recorded cost; instances without one are left out.
    """
    recorded = self._costs
    return {inst: recorded[inst] for inst in instances if inst in recorded}

get_mean_cost()

Get the mean cost.

Source code in autoverify/portfolio/portfolio.py
def get_mean_cost(self) -> float:
    """Return the arithmetic mean of all recorded costs as a Python float."""
    recorded = list(self._costs.values())
    return float(np.mean(recorded))

get_median_cost()

Get the median cost.

Source code in autoverify/portfolio/portfolio.py
def get_median_cost(self) -> float:
    """Return the median of all recorded costs as a Python float."""
    recorded = list(self._costs.values())
    return float(np.median(recorded))

get_set()

Get the underlying set.

Source code in autoverify/portfolio/portfolio.py
def get_set(self):
    """Return the set backing the portfolio (the live object, not a copy)."""
    members = self._pf_set
    return members

get_total_cost()

Get the total cost.

Source code in autoverify/portfolio/portfolio.py
def get_total_cost(self) -> float:
    """Return the sum of all recorded costs as a Python float."""
    recorded = list(self._costs.values())
    return float(np.sum(recorded))

reallocate_resources(strategy=ResourceStrategy.Auto)

Reallocate based on current contents and given strategy.

Source code in autoverify/portfolio/portfolio.py
def reallocate_resources(
    self, strategy: ResourceStrategy = ResourceStrategy.Auto
):
    """Reallocate based on current contents and given strategy.

    The value of `cpu_count()` is split evenly over the configured
    verifiers; leftover cores go, one each, to the first verifiers visited.
    The second element of each `resources` tuple is kept, and entries whose
    `resources` is falsy get `None`. An empty portfolio is left untouched.

    Args:
        strategy: Only `ResourceStrategy.Auto` is supported.

    Raises:
        NotImplementedError: For any other strategy.
    """
    if strategy != ResourceStrategy.Auto:
        raise NotImplementedError(
            "Given `ResourceStrategy` is not supported yet."
        )

    # Work on a snapshot: the loop replaces entries of the underlying set,
    # and mutating a set while iterating it can skip or revisit elements.
    current = list(self)
    if not current:
        # Nothing to allocate; also avoids dividing by zero below.
        return

    # NOTE: Should put this alloc stuff in a function
    cores_per, cores_remainder = divmod(cpu_count(), len(current))

    for idx, cv in enumerate(current):
        # The first `cores_remainder` entries absorb one leftover core each.
        extra_core = 1 if idx < cores_remainder else 0
        resources = cv.resources

        new_resources = (
            (cores_per + extra_core, resources[1]) if resources else None
        )

        self.discard(cv)
        self.add(
            ConfiguredVerifier(cv.verifier, cv.configuration, new_resources)
        )

str_compact()

Get the portfolio in string form in a compact way.

Source code in autoverify/portfolio/portfolio.py
def str_compact(self) -> str:
    """Render the portfolio compactly, one configured verifier per line.

    Returns:
        Newline-separated rows; each row holds the verifier name, the hash
        of the configuration and the resources, separated by tabs. There is
        no trailing newline.
    """
    rows = (
        "\t".join(map(str, (cv.verifier, hash(cv.configuration), cv.resources)))
        for cv in self
    )
    return "\n".join(rows)

to_json(out_json_path)

Write the portfolio in JSON format to the specified path.

Source code in autoverify/portfolio/portfolio.py
def to_json(self, out_json_path: Path):
    """Serialize the portfolio to `out_json_path` as a JSON list.

    Each configured verifier becomes an object with the keys `verifier`,
    `configuration` (the configuration converted with `dict`) and
    `resources`.

    Args:
        out_json_path: The file to write; it is opened in `"w"` mode.
    """
    entries: list[dict[str, Any]] = [
        {
            "verifier": cv.verifier,
            "configuration": dict(cv.configuration),
            "resources": cv.resources,
        }
        for cv in self._pf_set
    ]

    with open(out_json_path, "w") as handle:
        # `default=str` stringifies any value json cannot encode natively.
        json.dump(entries, handle, indent=4, default=str)

update_costs(costs)

Update the costs based on the new costs mapping.

Source code in autoverify/portfolio/portfolio.py
def update_costs(self, costs: Mapping[str, float]):
    """Merge new costs into the recorded ones, keeping the lowest per instance.

    Args:
        costs: Mapping from instance name to a newly observed cost.
    """
    recorded = self._costs

    for instance, cost in costs.items():
        # Unseen instances are stored as-is; known ones only ever improve.
        if instance not in recorded or cost < recorded[instance]:
            recorded[instance] = cost

PortfolioScenario dataclass

Scenario for constructing a parallel portfolio.

Attributes:

Name Type Description
verifiers Sequence[str]

The name of the verifiers to consider.

resources list[tuple[str, int, int]]

How many cores and GPUs the verifiers need.

instances Sequence[VerificationInstance]

The instances on which the PF is constructed.

length int

The max length of the PF.

seconds_per_iter float

Number of seconds for each Hydra iteration.

configs_per_iter int

Number of configs each iteration.

alpha float

Tune/Pick time split.

added_per_iter int

Entries added to the PF per iter.

stop_early bool

Stop procedure if some early stop conditions are met.

resource_strategy ResourceStrategy

Strat to divide the resources.

output_dir Path | None

Dir where logs are stored.

vnn_compat_mode bool

Use vnn compatibility for some verifiers.

benchmark str | None

VNNCOMP benchmark if vnn_compat_mode is True.

verifier_kwargs dict[str, dict[str, Any]] | None

Kwargs passed to verifiers.

uses_simplified_network Iterable[str] | None

If the network uses the dnnv simplified nets.

Source code in autoverify/portfolio/portfolio.py
@dataclass
class PortfolioScenario:
    """Scenario for constructing a parallel portfolio.

    Besides the declared fields, `__post_init__` derives the attributes
    `tune_budget`, `pick_budget` and `n_iters`.

    Attributes:
        verifiers: The name of the verifiers to consider.
        resources: How many cores and GPUs the verifiers need.
        instances: The instances on which the PF is constructed.
        length: The max length of the PF.
        seconds_per_iter: Number of seconds for each Hydra iteration.
        configs_per_iter: Number of configs each iteration.
        alpha: Tune/Pick time split.
        added_per_iter: Entries added to the PF per iter.
        stop_early: Stop procedure if some early stop conditions are met.
        resource_strategy: Strat to divide the resources.
        output_dir: Dir where logs are stored.
        vnn_compat_mode: Use vnn compatibility for some verifiers.
        benchmark: VNNCOMP benchmark if vnn_compat_mode is `True`.
        verifier_kwargs: Kwargs passed to verifiers.
        uses_simplified_network: If the network uses the dnnv simplified nets.
    """

    verifiers: Sequence[str]
    resources: list[tuple[str, int, int]]
    instances: Sequence[VerificationInstance]
    length: int  # TODO: Rename to max_length?
    seconds_per_iter: float

    # Optional
    configs_per_iter: int = 1
    alpha: float = 0.5  # tune/pick split
    added_per_iter: int = 1
    stop_early: bool = True
    resource_strategy: ResourceStrategy = ResourceStrategy.Auto
    output_dir: Path | None = None
    vnn_compat_mode: bool = False
    benchmark: str | None = None
    verifier_kwargs: dict[str, dict[str, Any]] | None = None
    uses_simplified_network: Iterable[str] | None = None

    def __post_init__(self):
        """Validate the PF scenario and compute the derived attributes.

        Raises:
            ValueError: If any of the field combinations checked below is
                invalid, or if `_verify_resources` rejects the resources.
            NotImplementedError: If `_verify_resources` does not support the
                chosen resource strategy.
        """
        if self.added_per_iter > 1 or self.configs_per_iter > 1:
            raise ValueError(
                "Adding more than 1 config per iter not supported yet."
            )

        # `added_per_iter` is the divisor of the `n_iters` computation below;
        # zero would divide by zero and a negative value would give a
        # non-positive number of iterations.
        if self.added_per_iter < 1:
            raise ValueError("Entries added per iter should be >= 1")

        if not 0 <= self.alpha <= 1:
            raise ValueError(f"Alpha should be in [0.0, 1.0], got {self.alpha}")

        if self.output_dir is None:
            # Default to a timestamped directory below `hydra_out/`.
            current_time = datetime.datetime.now()
            formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
            self.output_dir = Path(f"hydra_out/{formatted_time}")

        self.tune_budget = self.alpha
        self.pick_budget = 1 - self.alpha

        if self.added_per_iter > self.length:
            raise ValueError("Entries added per iter should be <= length")

        if self.vnn_compat_mode:
            if not self.benchmark:
                raise ValueError("Use a benchmark name if vnn_compat_mode=True")

        if self.vnn_compat_mode and self.verifier_kwargs:
            raise ValueError(
                "Cannot use vnn_compat_mode and "
                "verifier_kwargs at the same time."
            )

        self.n_iters = math.ceil(self.length / self.added_per_iter)
        self._verify_resources()

    def _verify_resources(self):
        """Check that `resources` and `verifiers` describe the same names.

        Raises:
            ValueError: On a duplicate name in `resources`, a name present
                in only one of `resources`/`verifiers`, or a non-zero second
                element (the CPU count) under `ResourceStrategy.Auto`.
            NotImplementedError: For any strategy other than `Auto`.
        """
        # Check for duplicates
        seen = set()
        for r in self.resources:
            if r[0] in seen:
                raise ValueError(f"Duplicate name '{r[0]}' in resources")

            if r[0] not in self.verifiers:
                raise ValueError(f"{r[0]} in resources but not in verifiers.")

            seen.add(r[0])

        for v in self.verifiers:
            if v not in seen:
                raise ValueError(
                    f"Verifier '{v}' in verifiers but not in resources."
                )

        if self.resource_strategy == ResourceStrategy.Auto:
            for r in self.resources:
                if r[1] != 0:
                    raise ValueError(
                        "CPU resources should be 0 when using `Auto`"
                    )
        else:
            raise NotImplementedError(
                f"ResourceStrategy {self.resource_strategy} "
                f"is not implemented yet."
            )

    def get_smac_scenario_kwargs(self) -> dict[str, Any]:
        """Return the SMAC scenario kwargs as a dict.

        The output directory is created if it does not exist yet.

        Returns:
            dict[str, Any]: The SMAC scenario as a dict.
        """
        assert self.output_dir is not None  # This is set in `__post_init__`
        self.output_dir.mkdir(parents=True, exist_ok=True)

        return {
            "instances": verification_instances_to_smac_instances(
                self.instances
            ),
            "instance_features": index_features(self.instances),
            "output_directory": self.output_dir,
        }

    def get_smac_instances(self) -> list[str]:
        """Get the instances of the scenario as SMAC instances.

        Returns:
            list[str]: The SMAC instances.
        """
        return verification_instances_to_smac_instances(self.instances)

__post_init__()

Validate the PF scenario.

Source code in autoverify/portfolio/portfolio.py
def __post_init__(self):
    """Validate the PF scenario and compute the derived attributes.

    Sets `tune_budget`, `pick_budget` and `n_iters`, fills in a timestamped
    default for `output_dir` when it is `None`, and finally calls
    `_verify_resources`.

    Raises:
        ValueError: If any of the field combinations checked below is
            invalid.
    """
    if self.added_per_iter > 1 or self.configs_per_iter > 1:
        raise ValueError(
            "Adding more than 1 config per iter not supported yet."
        )

    # `added_per_iter` is the divisor of the `n_iters` computation below;
    # zero would divide by zero and a negative value would give a
    # non-positive number of iterations.
    if self.added_per_iter < 1:
        raise ValueError("Entries added per iter should be >= 1")

    if not 0 <= self.alpha <= 1:
        raise ValueError(f"Alpha should be in [0.0, 1.0], got {self.alpha}")

    if self.output_dir is None:
        # Default to a timestamped directory below `hydra_out/`.
        current_time = datetime.datetime.now()
        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
        self.output_dir = Path(f"hydra_out/{formatted_time}")

    self.tune_budget = self.alpha
    self.pick_budget = 1 - self.alpha

    if self.added_per_iter > self.length:
        raise ValueError("Entries added per iter should be <= length")

    if self.vnn_compat_mode:
        if not self.benchmark:
            raise ValueError("Use a benchmark name if vnn_compat_mode=True")

    if self.vnn_compat_mode and self.verifier_kwargs:
        raise ValueError(
            "Cannot use vnn_compat_mode and "
            "verifier_kwargs at the same time."
        )

    self.n_iters = math.ceil(self.length / self.added_per_iter)
    self._verify_resources()

get_smac_instances()

Get the instances of the scenario as SMAC instances.

Returns:

Type Description
list[str]

list[str]: The SMAC instances.

Source code in autoverify/portfolio/portfolio.py
def get_smac_instances(self) -> list[str]:
    """Convert the scenario's verification instances to SMAC instances.

    Returns:
        list[str]: The result of `verification_instances_to_smac_instances`
            applied to `self.instances`.
    """
    smac_instances = verification_instances_to_smac_instances(self.instances)
    return smac_instances

get_smac_scenario_kwargs()

Return the SMAC scenario kwargs as a dict.

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The SMAC scenario as a dict.

Source code in autoverify/portfolio/portfolio.py
def get_smac_scenario_kwargs(self) -> dict[str, Any]:
    """Build the keyword arguments for a SMAC scenario.

    The output directory is created (including parents) if it does not
    exist yet.

    Returns:
        dict[str, Any]: A dict with the keys `instances`,
            `instance_features` and `output_directory`.
    """
    out_dir = self.output_dir
    assert out_dir is not None  # This is set in `__post_init__`
    out_dir.mkdir(parents=True, exist_ok=True)

    smac_instances = verification_instances_to_smac_instances(self.instances)
    features = index_features(self.instances)

    return {
        "instances": smac_instances,
        "instance_features": features,
        "output_directory": out_dir,
    }

Class to run parallel portfolio.

PortfolioRunner

Class to run a portfolio in parallel.

Source code in autoverify/portfolio/portfolio_runner.py
class PortfolioRunner:
    """Class to run a portfolio in parallel."""

    def __init__(
        self,
        portfolio: Portfolio,
        vbs_mode: bool = False,
        *,
        n_cpu: int | None = None,
        n_gpu: int | None = None,
    ):
        """Initialize a new portfolio runner.

        Arguments:
            portfolio: The portfolio that will be run.
            vbs_mode: If the PF will be run in VBS mode.
            n_cpu: Override number of CPUs
            n_gpu: Override number of GPUs.
        """
        self._portfolio = portfolio
        self._vbs_mode = vbs_mode
        self._n_cpu = n_cpu
        self._n_gpu = n_gpu

        if not self._vbs_mode:
            self._init_resources()

        self._is_cleaning = False

        def _wrap_cleanup(*_):
            if self._is_cleaning:
                return

            self._is_cleaning = True
            self._cleanup()
            self._is_cleaning = False

        signal.signal(signal.SIGINT, _wrap_cleanup)
        signal.signal(signal.SIGTERM, _wrap_cleanup)

    def _init_resources(self):
        self._allocation: dict[ConfiguredVerifier, tuple[int, int, int]] = {}
        if self._n_cpu:
            cpu_left = self._n_cpu
        else:
            cpu_left = cpu_count()
        if self._n_gpu:
            gpu_left = self._n_gpu
        else:
            gpu_left = nvidia_gpu_count()

        for cv in self._portfolio:
            if cv.resources is None:
                raise ValueError(
                    "No resources for"
                    f"{cv.verifier} :: {cv.configuration} found."
                )

            # CPU (taskset) and GPU (CUDA_VISIBLE_DEVICES) index start at 0
            n_cpu, n_gpu = cv.resources[0], cv.resources[1]

            if n_gpu > gpu_left:
                raise RuntimeError("No more GPUs left")
            if n_cpu > cpu_left:
                raise RuntimeError("No more CPUs left")
            if n_cpu <= 0:
                raise RuntimeError("CPUs should be > 0")

            # Currently only support 1 GPU per verifier
            if n_gpu > 0:
                curr_gpu = gpu_left - 1
                gpu_left -= 1
            else:
                curr_gpu = -1

            cpu_high = cpu_left
            cpu_low = cpu_left - n_cpu
            cpu_left = cpu_low

            self._allocation[cv] = (cpu_low, cpu_high - 1, curr_gpu)

    def evaluate_vbs(
        self,
        instances: list[VerificationInstance],
        *,
        out_csv: Path | None = None,
        vnncompat: bool = False,
        benchmark: str | None = None,
    ) -> _VbsResult:
        """Evaluate the PF in vbs mode.

        Arguments:
            instances: Instances to evaluate.
            out_csv: File where the results are written to.
            vnncompat: Use some compat kwargs.
            benchmark: Only if vnncompat, benchmark name.
        """
        results: _CostDict = {}

        if vnncompat and benchmark is None:
            raise ValueError("Need a benchmark if vnncompat=True")
        elif not vnncompat and benchmark is not None:
            raise ValueError("Only use benchmark if vnncompat=True")

        for cv in self._portfolio:
            assert cv.resources is not None

            for instance in instances:
                verifier = _get_verifier(instance, cv, vnncompat, benchmark)
                logger.info(f"{cv.verifier} on {str(instance)}")
                result = verifier.verify_instance(instance)
                self._add_result(cv, instance, result, results)

                if out_csv:
                    self._csv_log_result(
                        out_csv,
                        result,
                        instance,
                        cv.verifier,
                        cv.configuration,
                    )

        return self._vbs_from_cost_dict(results)

    @staticmethod
    def _csv_log_result(
        out_csv: Path,
        result: CompleteVerificationResult,
        instance: VerificationInstance,
        verifier: str,
        configuration: Configuration,
    ):
        """Append one verification result to the CSV file at `out_csv`.

        The CSV file is initialized first if it does not exist yet.

        Arguments:
            out_csv: File the result row is appended to.
            result: The `Ok` or `Err` result of the verification run.
            instance: The instance that was verified.
            verifier: Name of the verifier that produced the result.
            configuration: Configuration the verifier ran with.
        """
        if isinstance(result, Ok):
            res_d, success = result.unwrap(), "OK"
        elif isinstance(result, Err):
            res_d, success = result.unwrap_err(), "ERR"

        inst_d = dict(
            network=instance.network,
            property=instance.property,
            timeout=instance.timeout,
            verifier=verifier,
            config=configuration,
            success=success,
        )

        csv_is_missing = not out_csv.exists()
        if csv_is_missing:
            init_verification_result_csv(out_csv)

        csv_append_verification_result(
            VerificationDataResult.from_verification_result(res_d, inst_d),
            out_csv,
        )

    @staticmethod
    def _vbs_from_cost_dict(cost_dict: _CostDict) -> _VbsResult:
        vbs: _VbsResult = {}

        for cv, instance_costs in cost_dict.items():
            for instance, cost in instance_costs.items():
                str_inst = str(instance)

                if str_inst not in vbs:
                    vbs[str_inst] = (cost, cv.verifier)
                    continue

                if cost < vbs[str_inst][0]:
                    vbs[str_inst] = (cost, cv.verifier)

        return vbs

    @staticmethod
    def _add_result(
        cv: ConfiguredVerifier,
        instance: VerificationInstance,
        result: CompleteVerificationResult,
        results: _CostDict,
    ):
        """Store the cost of one verification run in `results`.

        An `Ok` result costs the time it took; an `Err` result costs
        infinity.

        Arguments:
            cv: The configured verifier that was run.
            instance: The instance that was verified.
            result: The result of the verification run.
            results: Cost dict that is updated in place.
        """
        cost: float

        if isinstance(result, Ok):
            cost = result.unwrap().took
        elif isinstance(result, Err):
            cost = float("inf")

        logger.info(f"Cost: {cost}")

        results.setdefault(cv, {})[instance] = cost

    # TODO: Arg types
    def _get_verifiers(
        self,
        instance,
        vnncompat,
        benchmark,
        verifier_kwargs,
    ) -> dict[ConfiguredVerifier, CompleteVerifier]:
        verifiers: dict[ConfiguredVerifier, CompleteVerifier] = {}

        for cv in self._portfolio:
            v = _get_verifier(
                instance,
                cv,
                vnncompat,
                benchmark,
                verifier_kwargs,
                self._allocation,
            )
            assert isinstance(v, CompleteVerifier)
            verifiers[cv] = v

        return verifiers

    def verify_instances(
        self,
        instances: Iterable[VerificationInstance],
        *,
        out_csv: Path | None = None,
        vnncompat: bool = False,
        benchmark: str | None = None,
        verifier_kwargs: dict[str, dict[str, Any]] | None = None,
        uses_simplified_network: Iterable[str] | None = None,
    ) -> dict[VerificationInstance, VerificationDataResult]:
        """Run the PF in parallel.

        Arguments:
            instances: Instances to evaluate.
            out_csv: File where the results are written to.
            vnncompat: Use some compat kwargs.
            benchmark: Only if vnncompat, benchmark name.
            verifier_kwargs: Kwargs passed to verifiers.
            uses_simplified_network: Have some verifiers use simplified nets.
        """
        if self._vbs_mode:
            raise RuntimeError("Function not compatible with vbs_mode")

        if out_csv:
            out_csv = out_csv.expanduser().resolve()

        results: dict[VerificationInstance, VerificationDataResult] = {}

        with concurrent.futures.ThreadPoolExecutor() as executor:
            for instance in instances:
                logger.info(f"Running portfolio on {str(instance)}")

                futures: dict[
                    Future[CompleteVerificationResult], ConfiguredVerifier
                ] = {}
                self._verifiers = self._get_verifiers(
                    instance,
                    vnncompat,
                    benchmark,
                    verifier_kwargs,
                )
                is_solved = False

                for cv in self._portfolio:
                    if (
                        uses_simplified_network
                        and cv.verifier in uses_simplified_network
                    ):
                        target_instance = instance.as_simplified_network()
                    else:
                        target_instance = instance

                    future = executor.submit(
                        self._verifiers[cv].verify_instance, target_instance
                    )
                    futures[future] = cv

                for future in concurrent.futures.as_completed(futures):
                    fut_cv = futures[future]
                    result = future.result()

                    if not is_solved:
                        got_solved = self._process_result(
                            result, results, fut_cv, instance, self._verifiers
                        )

                        if got_solved and not is_solved:
                            is_solved = True

                        if out_csv:
                            self._csv_log_result(
                                out_csv,
                                result,
                                instance,
                                fut_cv.verifier,
                                fut_cv.configuration,
                            )

        return results

    def _process_result(
        self,
        result: CompleteVerificationResult,
        results: dict[VerificationInstance, VerificationDataResult],
        cv: ConfiguredVerifier,
        instance: VerificationInstance,
        verifiers: dict[ConfiguredVerifier, CompleteVerifier],
    ) -> bool:
        """Record the result of one verifier and report if it solved it.

        The result data is stored in `results` under `instance`. If an `Ok`
        result is anything other than a timeout, the other verifiers are
        signalled to stop.

        Arguments:
            result: The result returned by the verifier.
            results: Result dict that is updated in place.
            cv: The configured verifier that produced the result.
            instance: The instance that was verified.
            verifiers: The verifier objects of the current run.

        Returns:
            `True` if the result is `SAT` or `UNSAT`, else `False`.
        """
        instance_data: dict[str, Any] = dict(
            network=instance.network,
            property=instance.property,
            timeout=instance.timeout,
            verifier=cv.verifier,
            config=cv.configuration,
        )

        if isinstance(result, Ok):
            data = result.unwrap()

            if data.result == "TIMEOUT":
                log_string = f"{cv.verifier} timed out"
            else:
                self._cancel_running(cv, verifiers)
                log_string = (
                    f"Verified by {cv.verifier} in {data.took:.2f} sec, "
                    f"result = {data.result}"
                )

            instance_data["success"] = "OK"
            # Signal that the instance was solved
            was_solved = data.result in ("SAT", "UNSAT")
        elif isinstance(result, Err):
            log_string = f"{cv.verifier} errored."
            data = result.unwrap_err()
            instance_data["success"] = "ERR"
            # Instance was not solved
            was_solved = False

        results[instance] = VerificationDataResult.from_verification_result(
            data, instance_data
        )
        logger.info(log_string)

        return was_solved

    def _cancel_running(
        self,
        first_cv: ConfiguredVerifier,
        verifiers: dict[ConfiguredVerifier, CompleteVerifier],
    ):
        """Set the timeout event of every verifier except `first_cv`.

        Arguments:
            first_cv: The configured verifier that is left untouched.
            verifiers: The verifier objects, keyed by configured verifier.
        """
        portfolio_set = self._portfolio.get_set()

        for remaining_cv in set_iter_except(portfolio_set, first_cv):
            verifiers[remaining_cv].set_timeout_event()

    def _cleanup(self):
        """Kill all running verifiers processes."""
        if not self._verifiers:
            sys.exit(0)

        for verifier_inst in self._verifiers.values():
            try:
                verifier_inst.set_timeout_event()
            finally:
                pass

        sys.exit(0)

__init__(portfolio, vbs_mode=False, *, n_cpu=None, n_gpu=None)

Initialize a new portfolio runner.

Parameters:

Name Type Description Default
portfolio Portfolio

The portfolio that will be run.

required
vbs_mode bool

If the PF will be run in VBS mode.

False
n_cpu int | None

Override number of CPUs

None
n_gpu int | None

Override number of GPUs.

None
Source code in autoverify/portfolio/portfolio_runner.py
def __init__(
    self,
    portfolio: Portfolio,
    vbs_mode: bool = False,
    *,
    n_cpu: int | None = None,
    n_gpu: int | None = None,
):
    """Initialize a new portfolio runner.

    Besides storing the arguments, this installs SIGINT and SIGTERM handlers
    that run the cleanup routine of this runner.

    Arguments:
        portfolio: The portfolio that will be run.
        vbs_mode: If the PF will be run in VBS mode. Resources are only
            allocated to the verifiers when this is `False`.
        n_cpu: Override number of CPUs
        n_gpu: Override number of GPUs.
    """
    self._portfolio = portfolio
    self._vbs_mode = vbs_mode
    self._n_cpu = n_cpu
    self._n_gpu = n_gpu
    # `_cleanup` reads this attribute, and a signal can arrive before
    # `verify_instances` assigns it for the first time.
    self._verifiers: dict[ConfiguredVerifier, CompleteVerifier] = {}

    if not self._vbs_mode:
        self._init_resources()

    self._is_cleaning = False

    def _wrap_cleanup(*_):
        # Ignore signals that arrive while a cleanup is already running.
        if self._is_cleaning:
            return

        self._is_cleaning = True
        self._cleanup()
        self._is_cleaning = False

    signal.signal(signal.SIGINT, _wrap_cleanup)
    signal.signal(signal.SIGTERM, _wrap_cleanup)

evaluate_vbs(instances, *, out_csv=None, vnncompat=False, benchmark=None)

Evaluate the PF in vbs mode.

Parameters:

Name Type Description Default
instances list[VerificationInstance]

Instances to evaluate.

required
out_csv Path | None

File where the results are written to.

None
vnncompat bool

Use some compat kwargs.

False
benchmark str | None

Only if vnncompat, benchmark name.

None
Source code in autoverify/portfolio/portfolio_runner.py
def evaluate_vbs(
    self,
    instances: list[VerificationInstance],
    *,
    out_csv: Path | None = None,
    vnncompat: bool = False,
    benchmark: str | None = None,
) -> _VbsResult:
    """Run every verifier of the PF on every instance, one at a time.

    Arguments:
        instances: Instances to evaluate.
        out_csv: File where the results are written to.
        vnncompat: Use some compat kwargs.
        benchmark: Only if vnncompat, benchmark name.

    Raises:
        ValueError: If exactly one of `vnncompat` and `benchmark` is given.
    """
    has_benchmark = benchmark is not None

    if vnncompat and not has_benchmark:
        raise ValueError("Need a benchmark if vnncompat=True")
    if has_benchmark and not vnncompat:
        raise ValueError("Only use benchmark if vnncompat=True")

    costs: _CostDict = {}

    for cv in self._portfolio:
        assert cv.resources is not None

        for instance in instances:
            verifier = _get_verifier(instance, cv, vnncompat, benchmark)
            logger.info(f"{cv.verifier} on {str(instance)}")
            outcome = verifier.verify_instance(instance)
            self._add_result(cv, instance, outcome, costs)

            if not out_csv:
                continue

            self._csv_log_result(
                out_csv,
                outcome,
                instance,
                cv.verifier,
                cv.configuration,
            )

    return self._vbs_from_cost_dict(costs)

verify_instances(instances, *, out_csv=None, vnncompat=False, benchmark=None, verifier_kwargs=None, uses_simplified_network=None)

Run the PF in parallel.

Parameters:

Name Type Description Default
instances Iterable[VerificationInstance]

Instances to evaluate.

required
out_csv Path | None

File where the results are written to.

None
vnncompat bool

Use some compat kwargs.

False
benchmark str | None

Only if vnncompat, benchmark name.

None
verifier_kwargs dict[str, dict[str, Any]] | None

Kwargs passed to verifiers.

None
uses_simplified_network Iterable[str] | None

Have some verifiers use simplified nets.

None
Source code in autoverify/portfolio/portfolio_runner.py
def verify_instances(
    self,
    instances: Iterable[VerificationInstance],
    *,
    out_csv: Path | None = None,
    vnncompat: bool = False,
    benchmark: str | None = None,
    verifier_kwargs: dict[str, dict[str, Any]] | None = None,
    uses_simplified_network: Iterable[str] | None = None,
) -> dict[VerificationInstance, VerificationDataResult]:
    """Run all verifiers of the PF in parallel on each instance.

    Arguments:
        instances: Instances to evaluate.
        out_csv: File where the results are written to.
        vnncompat: Use some compat kwargs.
        benchmark: Only if vnncompat, benchmark name.
        verifier_kwargs: Kwargs passed to verifiers.
        uses_simplified_network: Have some verifiers use simplified nets.

    Raises:
        RuntimeError: If the runner was created in VBS mode.
    """
    if self._vbs_mode:
        raise RuntimeError("Function not compatible with vbs_mode")

    if out_csv:
        out_csv = out_csv.expanduser().resolve()

    results: dict[VerificationInstance, VerificationDataResult] = {}

    with concurrent.futures.ThreadPoolExecutor() as executor:
        for instance in instances:
            logger.info(f"Running portfolio on {str(instance)}")

            pending: dict[
                Future[CompleteVerificationResult], ConfiguredVerifier
            ] = {}
            self._verifiers = self._get_verifiers(
                instance,
                vnncompat,
                benchmark,
                verifier_kwargs,
            )
            is_solved = False

            # Start every verifier of the portfolio on this instance.
            for cv in self._portfolio:
                wants_simplified = (
                    uses_simplified_network
                    and cv.verifier in uses_simplified_network
                )
                target_instance = (
                    instance.as_simplified_network()
                    if wants_simplified
                    else instance
                )
                submitted = executor.submit(
                    self._verifiers[cv].verify_instance, target_instance
                )
                pending[submitted] = cv

            for done in concurrent.futures.as_completed(pending):
                fut_cv = pending[done]
                result = done.result()

                # Results arriving after the instance got solved are neither
                # processed nor logged.
                if is_solved:
                    continue

                if self._process_result(
                    result, results, fut_cv, instance, self._verifiers
                ):
                    is_solved = True

                if out_csv:
                    self._csv_log_result(
                        out_csv,
                        result,
                        instance,
                        fut_cv.verifier,
                        fut_cv.configuration,
                    )

    return results

Util

summary.

VerificationDataResult dataclass

Result data of a single verification run, as stored in a results CSV.

Source code in autoverify/util/instances.py
@dataclass
class VerificationDataResult:
    """Result data of a single verification run, as stored in a results CSV.

    Combines the description of the run (instance, verifier, configuration)
    with the outcome reported by the verifier.
    """

    network: str
    property: str
    timeout: int | None
    verifier: str
    config: str
    success: Literal["OK", "ERR"]
    result: VerificationResultString
    took: float
    counter_example: str | tuple[str, str] | None
    stderr: str | None
    stdout: str | None

    def __post_init__(self):
        """Store a config equal to the string `"None"` as `"default"`."""
        if self.config == "None":
            self.config = "default"

    def as_csv_row(self) -> list[str]:
        """Convert data to a csv row writeable.

        A counter example given as a tuple is joined with newlines into a
        single cell, and missing optional values become empty strings. The
        object itself is not modified.
        """
        counter_example = self.counter_example
        if isinstance(counter_example, tuple):
            counter_example = "\n".join(counter_example)

        return [
            self.network,
            self.property,
            str(self.timeout),
            self.verifier,
            self.config,
            self.success,
            self.result,
            str(self.took),
            counter_example or "",
            self.stderr or "",
            self.stdout or "",
        ]

    @classmethod
    def from_verification_result(
        cls,
        verif_res: CompleteVerificationData,
        instance_data: dict[str, Any],  # TODO: Narrow Any
    ) -> "VerificationDataResult":
        """Create from a `CompleteVerificationData`.

        Args:
            verif_res: The verification outcome; its `result`, `took`,
                `counter_example`, `err` and `stdout` attributes are used.
            instance_data: Must hold the keys `network`, `property`,
                `timeout`, `verifier`, `config` and `success`.
        """
        # TODO: Unpack dict
        return cls(
            instance_data["network"],
            instance_data["property"],
            instance_data["timeout"],
            instance_data["verifier"],
            instance_data["config"],
            instance_data["success"],
            verif_res.result,
            verif_res.took,
            verif_res.counter_example,
            verif_res.err,
            verif_res.stdout,
        )

__post_init__()

Store a config equal to the string "None" as "default".

Source code in autoverify/util/instances.py
def __post_init__(self):
    """Store a config equal to the string `"None"` as `"default"`."""
    config_is_unset = self.config == "None"

    if config_is_unset:
        self.config = "default"

as_csv_row()

Convert data to a csv row writeable.

Source code in autoverify/util/instances.py
def as_csv_row(self) -> list[str]:
    """Convert data to a csv row writeable.

    A counter example given as a tuple is joined with newlines into a single
    cell, and missing optional values become empty strings. The object itself
    is not modified.
    """
    counter_example = self.counter_example
    if isinstance(counter_example, tuple):
        counter_example = "\n".join(counter_example)

    return [
        self.network,
        self.property,
        str(self.timeout),
        self.verifier,
        self.config,
        self.success,
        self.result,
        str(self.took),
        counter_example or "",
        self.stderr or "",
        self.stdout or "",
    ]

from_verification_result(verif_res, instance_data) classmethod

Create from a CompleteVerificationData.

Source code in autoverify/util/instances.py
@classmethod
def from_verification_result(
    cls,
    verif_res: CompleteVerificationData,
    instance_data: dict[str, Any],  # TODO: Narrow Any
):
    """Build an object from run metadata and a `CompleteVerificationData`.

    The `network`, `property`, `timeout`, `verifier`, `config` and `success`
    entries are taken from `instance_data`; the remaining values come from
    `verif_res`.
    """
    # TODO: Unpack dict
    run_keys = (
        "network",
        "property",
        "timeout",
        "verifier",
        "config",
        "success",
    )
    run_values = [instance_data[key] for key in run_keys]

    return cls(
        *run_values,
        verif_res.result,
        verif_res.took,
        verif_res.counter_example,
        verif_res.err,
        verif_res.stdout,
    )

csv_append_verification_result(verification_result, csv_path)

Append a verification result as a row to a CSV file.

Source code in autoverify/util/instances.py
def csv_append_verification_result(
    verification_result: VerificationDataResult, csv_path: Path
):
    """Append a verification result as a single row to a CSV file.

    Args:
        verification_result: The result to append; its `as_csv_row` output
            is written.
        csv_path: The CSV file to append to. A leading `~` is expanded.
    """
    # The csv module asks for `newline=""` so that it controls the line
    # endings itself; cells can contain embedded newlines.
    with open(str(csv_path.expanduser()), "a", newline="") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(verification_result.as_csv_row())

init_verification_result_csv(csv_path)

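Create (or overwrite) a results CSV file and write the field names of VerificationDataResult as its first row.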

Source code in autoverify/util/instances.py
def init_verification_result_csv(csv_path: Path):
    """Create a results CSV file that only holds the header row.

    The header consists of the field names of `VerificationDataResult`. An
    existing file at `csv_path` is overwritten.

    Args:
        csv_path: The CSV file to create. A leading `~` is expanded.
    """
    # The csv module asks for `newline=""` so that it controls the line
    # endings itself.
    with open(str(csv_path.expanduser()), "w", newline="") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(get_dataclass_field_names(VerificationDataResult))

read_all_vnncomp_instances(vnncomp_path)

Reads all benchmarks, see the read_vnncomp_instances docstring.

Source code in autoverify/util/instances.py
def read_all_vnncomp_instances(
    vnncomp_path: Path,
) -> dict[str, list[VerificationInstance]]:
    """Read the instances of every benchmark directory in `vnncomp_path`.

    Each sub-directory is read with `read_vnncomp_instances`; entries that
    are not directories are skipped.

    Returns:
        The instances of each benchmark, keyed by benchmark directory name.
    """
    return {
        entry.name: read_vnncomp_instances(entry.name, vnncomp_path)
        for entry in vnncomp_path.iterdir()
        if entry.is_dir()
    }

read_verification_result_from_csv(csv_path)

Reads a verification results csv to a list of its dataclass.

Source code in autoverify/util/instances.py
def read_verification_result_from_csv(
    csv_path: Path,
) -> list[VerificationDataResult]:
    """Load a verification results csv as a list of result dataclasses.

    Every row of the csv is passed positionally to `VerificationDataResult`.
    """
    results_df = pd.read_csv(csv_path)

    return [
        VerificationDataResult(*row.to_list())
        for _, row in results_df.iterrows()
    ]

read_vnncomp_instances(benchmark, vnncomp_path, *, predicate=None, as_smac=False, resolve_paths=True, instance_file_name='instances.csv')

Read the instances of a VNNCOMP benchmark.

Reads the CSV file of a VNNCOMP benchmark, parsing the network, property and timeout values.

Parameters:

Name Type Description Default
benchmark str

The name of the benchmark directory.

required
vnncomp_path Path

The path to the VNNCOMP benchmark directory

required
predicate _InstancePredicate | Iterable[_InstancePredicate] | None

A function that, given a VerificationInstance returns either True or False. If False is returned, the VerificationInstance is dropped from the returned instances.

None
as_smac bool

Return the instances as smac instance strings.

False

Returns:

Type Description
list[VerificationInstance] | list[str]

list[VerificationInstance] | list[str]: A list of VerificationInstance or string objects that hold the network, property and timeout.

Source code in autoverify/util/instances.py
def read_vnncomp_instances(
    benchmark: str,
    vnncomp_path: Path,
    *,
    predicate: _InstancePredicate | Iterable[_InstancePredicate] | None = None,
    as_smac: bool = False,
    resolve_paths: bool = True,
    instance_file_name: str = "instances.csv",
) -> list[VerificationInstance] | list[str]:
    """Read the instances of a VNNCOMP benchmark.

    Reads the CSV file of a VNNCOMP benchmark, parsing the network, property and
    timeout values. Blank lines in the CSV file are skipped.

    Args:
        benchmark: The name of the benchmark directory.
        vnncomp_path: The path to the VNNCOMP benchmark directory
        predicate: A function that, given a `VerificationInstance` returns
            either True or False. If False is returned, the
            `VerificationInstance` is dropped from the returned instances.
            If several predicates are given, an instance is kept when it
            passes at least one of them.
        as_smac: Return the instances as smac instance strings.
        resolve_paths: Resolve the network and property paths.
        instance_file_name: Name of the CSV file inside the benchmark
            directory that lists the instances.

    Returns:
        list[VerificationInstance] | list[str]: A list of
            `VerificationInstance` or string objects
            that hold the network, property and timeout.

    Raises:
        ValueError: If `vnncomp_path` or the benchmark directory inside it
            is not a directory.
    """
    if not vnncomp_path.is_dir():
        raise ValueError("Could not find VNNCOMP directory")

    benchmark_dir = vnncomp_path / benchmark

    if not benchmark_dir.is_dir():
        raise ValueError(
            f"{benchmark} is not a valid benchmark in {str(vnncomp_path)}"
        )

    instances = benchmark_dir / instance_file_name
    verification_instances = []

    if predicate and not isinstance(predicate, Iterable):
        predicate = [predicate]

    # The csv module asks for `newline=""` when reading files.
    with open(str(instances), newline="") as csv_file:
        reader = csv.reader(csv_file)

        for row in reader:
            if not row:
                # The reader yields an empty list for a blank line, which
                # cannot be unpacked into the three expected fields.
                continue

            network, prop, timeout = row

            net_path = benchmark_dir / network
            prop_path = benchmark_dir / prop

            instance = VerificationInstance(
                net_path if not resolve_paths else net_path.resolve(),
                prop_path if not resolve_paths else prop_path.resolve(),
                int(float(timeout)),  # FIXME: Timeouts can be floats
            )

            if predicate and not _passes_at_least_1(predicate, instance):
                continue

            if as_smac:
                instance = instance.as_smac_instance()  # type: ignore

            verification_instances.append(instance)

    return verification_instances

unique_networks(instances)

Utility function to keep only unique networks.

Source code in autoverify/util/instances.py
def unique_networks(
    instances: Iterable[VerificationInstance],
) -> list[VerificationInstance]:
    """Keep the first instance of every distinct network file name.

    Networks are compared by the name of their file, not by their full path.
    """
    kept = []
    seen_names: set[str] = set()

    for candidate in instances:
        net_name = candidate.network.name

        if net_name in seen_names:
            continue

        seen_names.add(net_name)
        kept.append(candidate)

    return kept

verification_instances_to_smac_instances(instances)

Convert a list of VerificationInstance objects to SMAC instances.

See the as_smac_instance docstring of the VerificationInstance class for more details.

Parameters:

Name Type Description Default
instances Sequence[VerificationInstance]

The list of VerificationInstance objects to convert.

required

Returns:

Type Description
list[str]

list[str]: The SMAC instance strings.

Source code in autoverify/util/instances.py
def verification_instances_to_smac_instances(
    instances: Sequence[VerificationInstance],
) -> list[str]:
    """Convert a list of `VerificationInstance` objects to SMAC instances.

    Each object is converted with its `as_smac_instance` method; see the
    docstring of that method for more details.

    Args:
        instances: The list of `VerificationInstance` objects to convert.

    Returns:
        list[str]: The SMAC instance strings.
    """
    smac_instances = []

    for verification_instance in instances:
        smac_instances.append(verification_instance.as_smac_instance())

    return smac_instances

write_verification_results_to_csv(results, csv_path)

Writes a verification results df to a csv.

Source code in autoverify/util/instances.py
def write_verification_results_to_csv(results: pd.DataFrame, csv_path: Path):
    """Store a verification results df as a csv, without its index."""
    results.to_csv(path_or_buf=csv_path, index=False)

VerificationInstance.

VerificationInstance dataclass

VerificationInstance class.

Attributes:

Name Type Description
network Path

The Path to the network.

property Path

The Path to the property.

timeout int

Maximum wallclock time.

Source code in autoverify/util/verification_instance.py
@dataclass(frozen=True, eq=True)
class VerificationInstance:
    """A network and property pair with a verification time limit.

    Attributes:
        network: The `Path` to the network.
        property: The `Path` to the property.
        timeout: Maximum wallclock time.
    """

    network: Path
    property: Path
    timeout: int

    @classmethod
    def from_str(cls, str_instance: str):
        """Create from a comma separated `network,property,timeout` string."""
        parts = str_instance.split(",")
        network, property, timeout = parts
        return cls(Path(network), Path(property), int(timeout))

    def __hash__(self):
        """Hash the resolved network and property paths and the timeout."""
        resolved_network = self.network.expanduser().resolve()
        resolved_property = self.property.expanduser().resolve()
        return hash((resolved_network, resolved_property, self.timeout))

    def __str__(self):
        """Short form: the two file names and the timeout, `::` separated."""
        parts = (self.network.name, self.property.name, str(self.timeout))
        return " :: ".join(parts)

    def as_smac_instance(self) -> str:
        """Return the instance in a `f"{network},{property},{timeout}"` format.

        SMAC hands an instance to the target_function as one single string.
        The target_function splits that string on the comma again to get the
        network, property and timeout back.

        If no timeout is specified, the `DEFAULT_VERIFICATION_TIMEOUT_SEC`
        global is used.

        Returns:
            str: The smac instance string
        """
        timeout: int = self.timeout or DEFAULT_VERIFICATION_TIMEOUT_SEC
        fields = (self.network, self.property, timeout)

        return ",".join(str(field) for field in fields)

    def as_row(self, resolve_paths: bool = True) -> list[str]:
        """Return the network, property and timeout as a list of strings.

        With `resolve_paths`, both paths are user-expanded and resolved
        first.
        """
        if resolve_paths:
            net = self.network.expanduser().resolve()
            prop = self.property.expanduser().resolve()
        else:
            net, prop = self.network, self.property

        return [str(net), str(prop), str(self.timeout)]

    # HACK: Assuming some names and layouts here...
    def as_simplified_network(self) -> VerificationInstance:
        """Return a copy that points to the simplified network.

        Assumes an "onnx_simplified" dir exists next to the directory that
        holds the network, containing a network with the same file name.
        """
        original_net = self.network
        simplified_nets_dir = original_net.parent.parent / "onnx_simplified"
        simplified_net = simplified_nets_dir / original_net.name

        return VerificationInstance(simplified_net, self.property, self.timeout)

__hash__()

Hash the VI.

Source code in autoverify/util/verification_instance.py
def __hash__(self):
    """Hash the resolved network and property paths and the timeout."""
    resolved_network = self.network.expanduser().resolve()
    resolved_property = self.property.expanduser().resolve()

    return hash((resolved_network, resolved_property, self.timeout))

__str__()

Short string representation of the VerificationInstance.

Source code in autoverify/util/verification_instance.py
def __str__(self):
    """Short form: the two file names and the timeout, `::` separated."""
    parts = (self.network.name, self.property.name, str(self.timeout))
    return " :: ".join(parts)

as_row(resolve_paths=True)

Returns the instance as a list of strings.

Source code in autoverify/util/verification_instance.py
def as_row(self, resolve_paths: bool = True) -> list[str]:
    """Render the instance as `[network, property, timeout]` strings.

    Args:
        resolve_paths: When truthy, both paths are passed through
            `expanduser()` and `resolve()` before being stringified;
            otherwise they are stringified as stored.

    Returns:
        list[str]: The network path, the property path and the timeout,
            each converted with `str`.
    """

    def _cell(path) -> str:
        # Only touch the filesystem view of the path when asked to.
        if resolve_paths:
            return str(path.expanduser().resolve())
        return str(path)

    return [_cell(self.network), _cell(self.property), str(self.timeout)]

as_simplified_network()

Changes the network path.

Assumes a "onnx_simplified" dir is present at the same level.

Source code in autoverify/util/verification_instance.py
def as_simplified_network(self) -> VerificationInstance:
    """Build a copy of this instance that points at the simplified network.

    The new network path keeps the original file name but lives in an
    "onnx_simplified" directory that is a sibling of the directory currently
    holding the network. The property and timeout are reused unchanged.

    Returns:
        VerificationInstance: A new instance with the swapped network path.
    """
    benchmark_root = self.network.parent.parent
    simplified_network = benchmark_root / "onnx_simplified" / self.network.name

    return VerificationInstance(
        simplified_network, self.property, self.timeout
    )

as_smac_instance()

Return the instance in a f"{network},{property},{timeout}" format.

A SMAC instance has to be passed as a single string to the target_function, in which we split the instance string on the comma again to obtain the network, property and timeout.

If no timeout is specified, the DEFAULT_VERIFICATION_TIMEOUT_SEC global is used.

Returns:

Name Type Description
str str

The smac instance string

Source code in autoverify/util/verification_instance.py
def as_smac_instance(self) -> str:
    """Serialize the instance as a `"{network},{property},{timeout}"` string.

    SMAC hands an instance to the target_function as one string, so the
    three fields are joined with commas here and split apart again on the
    receiving side.

    A falsy timeout (e.g. `None`) is replaced by the
    `DEFAULT_VERIFICATION_TIMEOUT_SEC` global.

    Returns:
        str: The smac instance string
    """
    effective_timeout: int = (
        self.timeout if self.timeout else DEFAULT_VERIFICATION_TIMEOUT_SEC
    )
    fields = (str(self.network), str(self.property), str(effective_timeout))

    return ",".join(fields)

from_str(str_instance) classmethod

Create from a comma-separated string.

Source code in autoverify/util/verification_instance.py
@classmethod
def from_str(cls, str_instance: str):
    """Create an instance from a `"network,property,timeout"` string.

    The string must split on commas into exactly three fields; the first two
    become `Path` objects and the third is parsed with `int`.
    """
    fields = str_instance.split(",")
    net_field, prop_field, timeout_field = fields
    return cls(Path(net_field), Path(prop_field), int(timeout_field))

Verifier VNNCOMP compatibility.

Return verifier instances that should be compatible with the given benchmark + instance.

inst_bench_to_kwargs(benchmark, verifier, instance)

Get the kwargs for a benchmark.

Source code in autoverify/util/vnncomp.py
def inst_bench_to_kwargs(
    benchmark: str,
    verifier: str,
    instance: VerificationInstance,
) -> dict[str, Any]:
    """Get the verifier constructor kwargs for a benchmark.

    Args:
        benchmark: Name of the benchmark the instance belongs to.
        verifier: Name of the verifier; one of "nnenum", "abcrown",
            "ovalbab" or "verinet".
        instance: The instance being verified. Only its network file name is
            inspected, and only for some verinet benchmarks.

    Returns:
        dict[str, Any]: The extra keyword arguments, possibly empty.

    Raises:
        ValueError: If `verifier` is not one of the known names.
    """
    if verifier == "nnenum":
        return {"use_auto_settings": True}

    if verifier == "ovalbab":
        return {}

    if verifier == "abcrown":  # TODO: All benchmarks
        if benchmark == "acasxu":
            return {"yaml_override": {"data__num_outputs": 5}}
        if benchmark.startswith("sri_resnet_"):
            quirks = "{'Reshape': {'fix_batch_size': True}}"
            return {"yaml_override": {"model__onnx_quirks": quirks}}
        return {}

    if verifier == "verinet":
        if benchmark == "acasxu":
            return {"transpose_matmul_weights": True}

        # The remaining special cases all ask for the same simplification;
        # the network name is only looked at when the benchmark needs it.
        if benchmark == "cifar2020":
            wants_simplify = "convBigRELU" in instance.network.name
        elif benchmark == "cifar100_tinyimagenet_resnet":
            wants_simplify = True
        elif benchmark == "nn4sys":
            wants_simplify = instance.network.name == "lindex.onnx"
        else:
            wants_simplify = False

        return {"dnnv_simplify": True} if wants_simplify else {}

    raise ValueError("Invalid verifier")

inst_bench_to_verifier(benchmark, instance, verifier, allocation=None)

Get an instantiated verifier.

Source code in autoverify/util/vnncomp.py
def inst_bench_to_verifier(
    benchmark: str,
    instance: VerificationInstance,
    verifier: str,
    allocation: tuple[int, int, int] | None = None,
) -> CompleteVerifier:
    """Get an instantiated verifier for a benchmark instance.

    Args:
        benchmark: Name of the benchmark the instance belongs to.
        instance: The instance that will be verified.
        verifier: Name of the verifier to instantiate.
        allocation: Forwarded to the verifier as `cpu_gpu_allocation`.

    Returns:
        CompleteVerifier: The verifier, constructed with the kwargs from
            `inst_bench_to_kwargs`.
    """
    verifier_cls = verifier_from_name(verifier)
    extra_kwargs = inst_bench_to_kwargs(benchmark, verifier, instance)

    created = verifier_cls(**extra_kwargs, cpu_gpu_allocation=allocation)
    assert isinstance(created, CompleteVerifier)

    return created

inst_bench_verifier_config(benchmark, instance, verifier, configs_dir)

Return the VNNCOMP config for the given verifier.

Source code in autoverify/util/vnncomp.py
def inst_bench_verifier_config(
    benchmark: str,
    instance: VerificationInstance,
    verifier: str,
    configs_dir: Path,
) -> Configuration | Path | None:
    """Return the VNNCOMP config to use for a verifier on a benchmark.

    Args:
        benchmark: Name of the benchmark; "test_props" never has a config.
        instance: The instance being verified (used for the abcrown lookup).
        verifier: Name of the verifier; one of "nnenum", "abcrown",
            "ovalbab" or "verinet".
        configs_dir: Directory holding the per-verifier config folders.

    Returns:
        Configuration | Path | None: The config for the verifier, or `None`
            when there is none.

    Raises:
        ValueError: If `verifier` is not one of the known names.
    """
    if benchmark == "test_props" or verifier == "nnenum":
        return None

    if verifier == "abcrown":
        abcrown_file = _get_abcrown_config(benchmark, instance)
        return Path(configs_dir / "abcrown" / abcrown_file)

    if verifier == "ovalbab":
        ovalbab_file = _get_ovalbab_config(benchmark)
        if ovalbab_file is None:
            return None
        return Path(configs_dir / "ovalbab" / ovalbab_file)

    if verifier == "verinet":
        return _get_verinet_config(benchmark)

    raise ValueError("Invalid verifier")