Source code for sparkle.tools.runsolver

"""Class for handling the Runsolver Wrapper."""
from __future__ import annotations
import sys
import ast
import re
import warnings
from pathlib import Path

from sparkle.types import SolverStatus
from sparkle.tools.general import get_time_pid_random_string


class RunSolver:
    """Class representation of RunSolver.

    For more information see: http://www.cril.univ-artois.fr/~roussel/runsolver/
    """

    def __init__(self: RunSolver) -> None:
        """Currently RunSolver has no instance specific methods or properties."""
        pass

    @staticmethod
    def wrap_command(runsolver_executable: Path,
                     command: list[str],
                     cutoff_time: int,
                     log_directory: Path,
                     log_name_base: str = None,
                     raw_results_file: bool = True) -> list[str]:
        """Wrap a command with the RunSolver call and arguments.

        Args:
            runsolver_executable: The Path to the runsolver executable.
                Is returned as an *absolute* path in the output.
            command: The command to wrap.
            cutoff_time: The cutoff CPU time for the solver.
            log_directory: The directory where to write the solver output.
            log_name_base: A user defined name to easily identify the logs.
                Defaults to "runsolver".
            raw_results_file: Whether to use the raw results file.

        Returns:
            List of commands and arguments to execute the solver.
        """
        # Create RunSolver logs
        # --timestamp
        #   instructs to timestamp each line of the solver standard output and
        #   error files (which are then redirected to stdout)
        # --use-pty
        #   use a pseudo-terminal to collect the solver output. Currently only
        #   available when lines are timestamped. Some I/O libraries (including
        #   the C library) automatically flush the output after each line when
        #   the standard output is a terminal. There is no automatic flush when
        #   the standard output is a pipe or a plain file; see setlinebuf() for
        #   details. This option instructs runsolver to use a pseudo-terminal
        #   instead of a pipe/file to collect the solver output, which fools
        #   the solver into line-buffering its output.
        # -w filename or --watcher-data filename
        #   sends the watcher information to filename
        # -v filename or --var filename
        #   saves the most relevant information (times, ...) in an easy to
        #   parse VAR=VALUE file
        # -o filename or --solver-data filename
        #   redirects the solver output (both stdout and stderr) to filename
        log_name_base = "runsolver" if log_name_base is None else log_name_base
        unique_stamp = get_time_pid_random_string()
        raw_result_path = (
            log_directory / Path(f"{log_name_base}_{unique_stamp}.rawres"))
        watcher_data_path = raw_result_path.with_suffix(".log")
        var_values_path = raw_result_path.with_suffix(".val")
        return [str(runsolver_executable.absolute()),
                "--timestamp", "--use-pty",
                "--cpu-limit", str(cutoff_time),
                "-w", str(watcher_data_path),
                "-v", str(var_values_path)] + (
            ["-o", str(raw_result_path)] if raw_results_file else []) + command

    @staticmethod
    def get_measurements(runsolver_values_path: Path,
                         not_found: float = -1.0) -> tuple[float, float, float]:
        """Return the CPU time, wall time and memory from the runsolver values log."""
        cpu_time, wall_time, memory = not_found, not_found, not_found
        if runsolver_values_path.exists():
            with runsolver_values_path.open("r") as infile:
                lines = [line.strip().split("=") for line in infile.readlines()
                         if line.count("=") == 1]
                for keyword, value in lines:
                    if keyword == "WCTIME":
                        wall_time = float(value)
                    elif keyword == "CPUTIME":
                        cpu_time = float(value)
                    elif keyword == "MAXVM":
                        memory = float(int(value) / 1024.0)  # MB
                        # Order is fixed, MAXVM is the last value we need, so break
                        break
        return cpu_time, wall_time, memory

    @staticmethod
    def get_status(runsolver_values_path: Path,
                   runsolver_raw_path: Path) -> SolverStatus:
        """Get run status from runsolver logs."""
        if not runsolver_values_path.exists() and (
                runsolver_raw_path is not None and not runsolver_raw_path.exists()):
            # Runsolver logs were not created, job was stopped incorrectly
            return SolverStatus.CRASHED
        # First check if runsolver reported a time out
        if runsolver_values_path.exists():
            for line in reversed(runsolver_values_path.open("r").readlines()):
                if line.strip().startswith("TIMEOUT="):
                    if line.strip() == "TIMEOUT=true":
                        return SolverStatus.TIMEOUT
                    break
        if runsolver_raw_path is None:
            return SolverStatus.UNKNOWN
        if not runsolver_raw_path.exists():
            # Runsolver log was not created, job was stopped incorrectly
            return SolverStatus.KILLED
        # The last line of the runsolver log should contain the raw Sparkle solver
        # wrapper output
        runsolver_raw_contents = runsolver_raw_path.open("r").read().strip()
        sparkle_wrapper_dict_str = runsolver_raw_contents.splitlines()[-1]
        solver_regex_filter = re.findall("{.*}", sparkle_wrapper_dict_str)[0]
        output_dict = ast.literal_eval(solver_regex_filter)
        status = SolverStatus(output_dict["status"])
        return status

    @staticmethod
    def get_solver_args(runsolver_log_path: Path) -> str:
        """Retrieves solver arguments dict from runsolver log."""
        if runsolver_log_path.exists():
            for line in runsolver_log_path.open("r").readlines():
                if line.startswith("command line:"):
                    return (line.split("sparkle_solver_wrapper.py", 1)[1]
                            .strip().strip("'"))
        return ""

    @staticmethod
    def get_solver_output(runsolver_configuration: list[str | Path],
                          process_output: str) -> dict[str, str | object]:
        """Decode solver output dictionary when called with runsolver."""
        solver_input = None
        solver_output = None
        value_data_file = None
        cutoff_time = sys.maxsize
        for idx, conf in enumerate(runsolver_configuration):
            if not isinstance(conf, str):
                # Looking for arg names
                continue
            conf = conf.strip()
            if conf == "-o" or conf == "--solver-data":
                # Solver output was redirected
                solver_data_file = Path(runsolver_configuration[idx + 1])
                if solver_data_file.exists():
                    solver_output = solver_data_file.open("r").read()
            if "-v" in conf or "--var" in conf:
                value_data_file = Path(runsolver_configuration[idx + 1])
            if "--cpu-limit" in conf:
                cutoff_time = float(runsolver_configuration[idx + 1])
            if "-w" in conf or "--watcher-data" in conf:
                watch_file = Path(runsolver_configuration[idx + 1])
                args_str = RunSolver.get_solver_args(watch_file)
                if args_str == "":
                    # Could not find log file or args
                    continue
                solver_input = re.findall("{.*}", args_str)[0]
                solver_input = ast.literal_eval(solver_input)
                cutoff_time = float(solver_input["cutoff_time"])
        if solver_output is None:
            # Still empty, try to read from subprocess
            solver_output = process_output
        # Format output to only the brackets (dict)
        # NOTE: It should have only one match, do we want some error logging here?
        try:
            solver_regex_filter = re.findall("{.*}", solver_output)[0]
            output_dict = ast.literal_eval(solver_regex_filter)
        except Exception:
            config_str = " ".join([str(c) for c in runsolver_configuration])
            warnings.warn("Solver output decoding failed from RunSolver "
                          f"configuration: '{config_str}'. "
                          "Setting status to 'UNKNOWN'.",
                          category=RuntimeWarning)
            output_dict = {"status": SolverStatus.UNKNOWN}
        output_dict["cutoff_time"] = cutoff_time
        if value_data_file is not None:
            cpu_time, wall_time, memory = RunSolver.get_measurements(value_data_file)
            output_dict["cpu_time"] = cpu_time
            output_dict["wall_time"] = wall_time
            output_dict["memory"] = memory
        else:
            # Could not retrieve cpu and wall time (log does not exist)
            output_dict["cpu_time"], output_dict["wall_time"] = -1.0, -1.0
        if output_dict["cpu_time"] > cutoff_time:
            output_dict["status"] = SolverStatus.TIMEOUT
        # Add the missing objectives (runtime based)
        if solver_input is not None and "objectives" in solver_input:
            objectives = solver_input["objectives"].split(",")
            for o_name in objectives:
                if o_name not in output_dict:
                    output_dict[o_name] = None
        return output_dict
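

# A minimal usage sketch, not part of the original module: it assumes a compiled
# runsolver binary at ./runsolver, an executable solver wrapper script
# (sparkle_solver_wrapper.py) that prints its result dictionary, and an instance
# file instance.cnf. These names are hypothetical and only illustrate how
# wrap_command and get_solver_output fit together.
if __name__ == "__main__":
    import subprocess

    runsolver_binary = Path("./runsolver")  # hypothetical path to the binary
    wrapper_script = Path("sparkle_solver_wrapper.py")  # hypothetical wrapper
    log_dir = Path("Logs")
    log_dir.mkdir(exist_ok=True)

    # Build the full runsolver invocation around the solver wrapper call.
    cmd = RunSolver.wrap_command(runsolver_binary,
                                 [str(wrapper_script), "instance.cnf"],
                                 cutoff_time=60,
                                 log_directory=log_dir)
    run = subprocess.run(cmd, capture_output=True, text=True)

    # Decode the wrapper output and the runsolver measurements from the logs.
    result = RunSolver.get_solver_output(cmd, run.stdout)
    print(result.get("status"), result.get("cpu_time"), result.get("wall_time"))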