"""Basic benchmarking job and result representation"""
import logging
import itertools
from enum import Enum
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from DikeBenchmarker.benchmarkingmethods.benchmarker import AbstractBenchmarker
logger = logging.getLogger(__name__)
[docs]
class JobState(Enum):
    """Lifecycle states of a Job, numbered 1 through 6."""

    # Initial state assigned by the Job constructor.
    CREATED = 1
    # Entered through Job.mark_submitted().
    SUBMITTED = 2
    # Entered through Job.mark_running().
    RUNNING = 3
    # Entered through Job.set_finished().
    FINISHED = 4
    # Entered through Job.set_failed().
    FAILED = 5
    # Entered through Job.cancel_local().
    CANCELLED = 6
[docs]
class JobStateError(Exception):
    """Error signalling that a Job was asked to perform a disallowed state transition."""
[docs]
class Job:
    """
    Benchmarking Job that behaves like a future.

    Identity: benchmark_id, solver_id, created_at (ctor time). Every instance
    additionally receives a sequential ``uid`` drawn from a class-level counter.

    Lifecycle:
        CREATED (initial)
          --[put into JobLog]-->    SUBMITTED
          --[start working on]-->   RUNNING
          --[finish working on]-->  FINISHED | FAILED
        CREATED/SUBMITTED -> CANCELLED

    All timestamps recorded on the job are timezone-aware UTC datetimes.
    """

    # Shared across all instances; hands out the next uid on construction.
    _id_counter = itertools.count()

    def __init__(
        self,
        job_producer: "AbstractBenchmarker",
        benchmark_id: str,
        solver_id: str,
        checker_id: str,
        logroot: str,
        retries: int = 3,
    ) -> None:
        """
        Create a job in the CREATED state.

        Args:
            job_producer: The benchmarker that produced this job.
            benchmark_id: Identifier of the benchmark instance.
            solver_id: Identifier of the solver.
            checker_id: Identifier of the checker.
            logroot: Root directory used to build the log prefix.
            retries: Retry budget carried by the job (see ``clone_retry``).
        """
        self.uid = next(Job._id_counter)
        self.job_producer: "AbstractBenchmarker" = job_producer
        self.benchmark_id: str = benchmark_id
        self.solver_id: str = solver_id
        self.checker_id: str = checker_id
        self.logroot: str = logroot
        self.retries: int = retries

        # timestamps
        self.created_at: datetime = datetime.now(timezone.utc)
        self.submitted_at: Optional[datetime] = None
        self.started_at: Optional[datetime] = None
        self.finished_at: Optional[datetime] = None

        # state data
        self.state: JobState = JobState.CREATED
        self.result: Optional["Result"] = None
        self.error: Optional[str] = None

    def clone_retry(self, decrement: int = 1) -> "Job":
        """
        Create a clone of this job with identical job_producer, benchmark_id,
        solver_id, checker_id, and logroot.

        The clone is a brand-new Job: it gets its own uid and created_at
        timestamp and starts in the CREATED state.

        Args:
            decrement: Amount subtracted from this job's ``retries`` to obtain
                the clone's ``retries``. No lower bound is enforced here, so
                the resulting value may be zero or negative.

        Returns:
            The newly constructed Job.
        """
        return Job(
            job_producer=self.job_producer,
            benchmark_id=self.benchmark_id,
            solver_id=self.solver_id,
            checker_id=self.checker_id,
            logroot=self.logroot,
            retries=self.retries - decrement,
        )

    def get_log_prefix(self) -> str:
        """
        Get the logfile prefix for this job.

        Returns:
            ``<logroot>/<solver_id>/<solver_id>.<benchmark_id>``
        """
        return f"{self.logroot}/{self.solver_id}/{self.solver_id}.{self.benchmark_id}"

    def mark_submitted(self) -> None:
        """
        Mark the job as submitted.

        Called by the infrastructure adaptor upon receiving the job.
        A repeated call on an already SUBMITTED job only logs a warning
        (and fills in ``submitted_at`` if it is still unset).

        Raises:
            JobStateError: If the job is neither CREATED nor SUBMITTED.
        """
        if self.state == JobState.SUBMITTED:
            # Lazy %-style arguments: the message is only rendered if a
            # handler actually emits the record.
            logger.warning(
                "job %s wants to be marked as %s but it already is %s",
                self,
                self.state.name,
                self.state.name,
            )
            if self.submitted_at is None:
                self.submitted_at = datetime.now(timezone.utc)
            return
        if self.state != JobState.CREATED:
            raise JobStateError(f"Cannot mark job as SUBMITTED from state {self.state.name}")
        self.state = JobState.SUBMITTED
        self.submitted_at = datetime.now(timezone.utc)

    def mark_running(self) -> None:
        """
        Mark the job as running.

        Called by the infrastructure adaptor once the job started to run.
        A repeated call on an already RUNNING job only logs a warning.

        Raises:
            JobStateError: If the job is neither SUBMITTED nor RUNNING.
        """
        if self.state == JobState.RUNNING:
            logger.warning(
                "job %s wants to be marked as %s but it already is %s",
                self,
                self.state.name,
                self.state.name,
            )
            return
        if self.state != JobState.SUBMITTED:
            raise JobStateError(f"Cannot mark job as RUNNING from state {self.state.name}")
        self.state = JobState.RUNNING
        self.started_at = datetime.now(timezone.utc)

    def set_finished(self) -> None:
        """
        Mark the job as finished and record ``finished_at``.

        Called by the infrastructure adaptor when the job has completed successfully.

        Raises:
            JobStateError: If the job is not RUNNING.
        """
        if self.state != JobState.RUNNING:
            raise JobStateError(f"Cannot mark job as FINISHED from state {self.state.name}")
        self.state = JobState.FINISHED
        self.finished_at = datetime.now(timezone.utc)

    def set_failed(self, error: str) -> None:
        """
        Mark the job as failed, storing the error and recording ``finished_at``.

        Called by the infrastructure adaptor when the job has completed unsuccessfully.

        Args:
            error: Description of the failure; stored in ``self.error``.

        Raises:
            JobStateError: If the job is not RUNNING.
        """
        if self.state != JobState.RUNNING:
            raise JobStateError(f"Cannot mark job as FAILED from state {self.state.name}")
        self.error = error
        self.state = JobState.FAILED
        self.finished_at = datetime.now(timezone.utc)

    def cancel_local(self) -> bool:
        """
        Mark the job as cancelled.

        Called by the benchmarker to prevent the job from being submitted to the external system.

        Returns:
            True if the job was CREATED or SUBMITTED and is now CANCELLED
            (``finished_at`` is recorded); False otherwise, in which case the
            job is left untouched.
        """
        if self.state in (JobState.CREATED, JobState.SUBMITTED):
            self.state = JobState.CANCELLED
            self.finished_at = datetime.now(timezone.utc)
            return True
        return False

    def __repr__(self) -> str:
        """Return a short representation with benchmark id, solver id and state name."""
        return f"Job({self.benchmark_id!r}, {self.solver_id!r}, {self.state.name})"
[docs]
class Result:
    """
    Outcome of a single benchmark job.

    Keeps a back-reference to the job it belongs to together with the
    recorded resource usage (runtime, memory) and a failure flag.
    """

    def __init__(self, job: "Job", runtime=None, memory=None, failed: bool = False):
        """
        Store the job reference and its measurements.

        Args:
            job: The job this result belongs to.
            runtime: Recorded runtime; None when not supplied.
            memory: Recorded memory usage; None when not supplied.
            failed: Failure flag reported by ``has_failed``.
        """
        self.job = job
        self.runtime = runtime
        self.memory = memory
        self.failed = failed

    def has_failed(self) -> bool:
        """Return the current value of the ``failed`` flag."""
        return self.failed

    def get_job(self) -> "Job":
        """Return the job this result refers to."""
        return self.job

    def __repr__(self):
        # NOTE(review): the label reads "BenchmarkResult" although the class is
        # named Result; the text is runtime output and is kept unchanged.
        owner = self.job
        return f"BenchmarkResult(inst_id={owner.benchmark_id}, solver_id={owner.solver_id}, perf={self.runtime})"