Source code for DikeBenchmarker.benchmarkatoms

"""Basic benchmarking job and result representation"""

import logging
import itertools
from enum import Enum
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from DikeBenchmarker.benchmarkingmethods.benchmarker import AbstractBenchmarker

logger = logging.getLogger(__name__)


[docs] class JobState(Enum): """Possible states of a Job.""" CREATED = 1 SUBMITTED = 2 RUNNING = 3 FINISHED = 4 FAILED = 5 CANCELLED = 6
[docs] class JobStateError(Exception): """Raised when an invalid state transition is attempted on a Job."""
[docs] class Job: """ Benchmarking Job that behaves like a future. Identity: benchmark_id, solver_id, created_at (ctor time). Lifecycle: CREATED (initial) --[put into JobLog]--> SUBMITTED --[start working on]--> RUNNING --[finish working on]--> FINISHED | FAILED CREATED/SUBMITTED -> CANCELLED """ _id_counter = itertools.count() def __init__( self, job_producer: "AbstractBenchmarker", benchmark_id: str, solver_id: str, checker_id: str, logroot: str, retries: int = 3, ) -> None: self.uid = next(Job._id_counter) self.job_producer: "AbstractBenchmarker" = job_producer self.benchmark_id: str = benchmark_id self.solver_id: str = solver_id self.checker_id: str = checker_id self.logroot: str = logroot self.retries: int = retries # timestamps self.created_at: datetime = datetime.now(timezone.utc) self.submitted_at: Optional[datetime] = None self.started_at: Optional[datetime] = None self.finished_at: Optional[datetime] = None # state data self.state: JobState = JobState.CREATED self.result: Optional["Result"] = None self.error: Optional[str] = None
[docs] def clone_retry(self, decrement: int = 1) -> "Job": """ Create a clone of this job with identical benchmark_id, solver_id, checker_id, and logroot. The cloned job will have a new created_at timestamp and will be in the CREATED state. The retries count will be decremented by the specified amount. """ return Job( job_producer=self.job_producer, benchmark_id=self.benchmark_id, solver_id=self.solver_id, checker_id=self.checker_id, logroot=self.logroot, retries=self.retries - decrement, )
[docs] def get_log_prefix(self) -> str: """ Get the logfile prefix for this job. """ return f"{self.logroot}/{self.solver_id}/{self.solver_id}.{self.benchmark_id}"
[docs] def mark_submitted(self) -> None: """ Mark the job as submitted. Called by the infrastructure adaptor upon receiving the job. """ if self.state == JobState.SUBMITTED: logger.warning(f"job {self} wants to be marked as {self.state.name} but it already is {self.state.name}") if self.submitted_at is None: self.submitted_at = datetime.now(timezone.utc) return if self.state != JobState.CREATED: raise JobStateError(f"Cannot mark job as SUBMITTED from state {self.state.name}") self.state = JobState.SUBMITTED self.submitted_at = datetime.now(timezone.utc)
[docs] def mark_running(self) -> None: """ Mark the job as running. Called by the infrastructure adaptor once the job started to run. """ if self.state == JobState.RUNNING: logger.warning(f"job {self} wants to be marked as {self.state.name} but it already is {self.state.name}") return if self.state != JobState.SUBMITTED: raise JobStateError(f"Cannot mark job as RUNNING from state {self.state.name}") self.state = JobState.RUNNING self.started_at = datetime.now(timezone.utc)
[docs] def set_finished(self) -> None: """ Mark the job as finished. Called by the infrastructure adaptor when the job has completed successfully. """ if self.state != JobState.RUNNING: raise JobStateError(f"Cannot mark job as FINISHED from state {self.state.name}") self.state = JobState.FINISHED self.finished_at = datetime.now(timezone.utc)
[docs] def set_failed(self, error: str) -> None: """ Mark the job as failed. Called by the infrastructure adaptor when the job has completed unsuccessfully. """ if self.state != JobState.RUNNING: raise JobStateError(f"Cannot mark job as FAILED from state {self.state.name}") self.error = error self.state = JobState.FAILED self.finished_at = datetime.now(timezone.utc)
[docs] def cancel_local(self) -> bool: """ Mark the job as cancelled. Called by the benchmarker to prevent the job from being submitted to the external system. """ if self.state in (JobState.CREATED, JobState.SUBMITTED): self.state = JobState.CANCELLED self.finished_at = datetime.now(timezone.utc) return True return False
def __repr__(self) -> str: return f"Job({self.benchmark_id!r}, {self.solver_id!r}, {self.state.name})"
[docs] class Result: """ Represents the result of a benchmark job. Contains a reference to the job and its resource usage. """ def __init__(self, job: "Job", runtime=None, memory=None, failed: bool = False): self.job = job self.runtime = runtime self.memory = memory self.failed = failed
[docs] def has_failed(self) -> bool: return self.failed
[docs] def get_job(self) -> "Job": return self.job
def __repr__(self): return f"BenchmarkResult(inst_id={self.job.benchmark_id}, solver_id={self.job.solver_id}, perf={self.runtime})"