"""File to handle a solver and its directories."""
from __future__ import annotations
import sys
from typing import Any
import shlex
import ast
import json
from pathlib import Path
import runrunner as rrr
from runrunner.local import LocalRun
from runrunner.slurm import SlurmRun
from runrunner.base import Status, Runner
from sparkle.tools import runsolver_parsing, general as tg
from sparkle.tools import pcsparser
from sparkle.types import SparkleCallable, SolverStatus
from sparkle.solver.verifier import SolutionVerifier
from sparkle.instance import InstanceSet
from sparkle.types import resolve_objective, SparkleObjective, UseTime
[docs]
class Solver(SparkleCallable):
"""Class to handle a solver and its directories."""
# File name of the optional meta data file, looked up inside the solver directory.
meta_data = "solver_meta.txt"
# File name of the wrapper script inside the solver directory; it is the
# executable that build_cmd places in the solver call.
wrapper = "sparkle_solver_wrapper.py"
def __init__(self: Solver,
             directory: Path,
             raw_output_directory: Path = None,
             runsolver_exec: Path = None,
             deterministic: bool = None,
             verifier: SolutionVerifier = None) -> None:
    """Initialize solver.

    Args:
        directory: Directory of the solver.
        raw_output_directory: Directory where solver will write its raw output.
            Defaults to directory / tmp, which is created if it is missing.
        runsolver_exec: Path to the runsolver executable.
            By default, runsolver in directory.
        deterministic: Bool indicating determinism of the algorithm.
            If None, the value is read from the "deterministic" entry of the
            meta data file in the solver directory; if that file does not
            exist, it defaults to False.
        verifier: The solution verifier to use. If None, no verifier is used.
    """
    super().__init__(directory, runsolver_exec, raw_output_directory)
    self.deterministic = deterministic
    self.verifier = verifier
    self.meta_data_file = self.directory / Solver.meta_data

    if self.raw_output_directory is None:
        self.raw_output_directory = self.directory / "tmp"
        self.raw_output_directory.mkdir(exist_ok=True)
    if self.runsolver_exec is None:
        self.runsolver_exec = self.directory / "runsolver"
    # A missing meta data file is represented as None from here on
    if not self.meta_data_file.exists():
        self.meta_data_file = None

    if self.deterministic is None:
        if self.meta_data_file is not None:
            # Read the parameter from file; read_text closes the file handle,
            # unlike a bare open().read()
            meta_dict = ast.literal_eval(self.meta_data_file.read_text())
            self.deterministic = meta_dict["deterministic"]
        else:
            self.deterministic = False
def _get_pcs_file(self: Solver) -> Path | bool:
    """Locate the PCS (parameter) file in the solver directory.

    Returns:
        The path of the file with the ``.pcs`` suffix if the solver directory
        contains exactly one such file, False otherwise.
    """
    candidates = []
    for entry in self.directory.iterdir():
        if entry.suffix == ".pcs":
            candidates.append(entry)
    # Exactly one PCS file per solver is accepted; zero or several is rejected
    if len(candidates) == 1:
        return candidates[0]
    return False
[docs]
def get_pcs_file(self: Solver) -> Path:
    """Return the path of the solver's parameter (PCS) file.

    Returns:
        The path reported by _get_pcs_file, or None when that lookup
        yields a falsy result (i.e. the file can not be resolved).
    """
    pcs_path = self._get_pcs_file()
    # Translate the internal "False" marker into None for callers
    return pcs_path if pcs_path else None
[docs]
def read_pcs_file(self: Solver) -> bool:
    """Check whether the solver's PCS file can be read.

    Returns:
        True if a unique PCS file was found and the parser loaded it (using the
        "smac" convention) without raising a SyntaxError, False otherwise.
    """
    pcs_file = self._get_pcs_file()
    if not pcs_file:
        # No unique PCS file available. Without this guard the literal string
        # "False" would be handed to the parser as a file name.
        return False
    try:
        parser = pcsparser.PCSParser()
        parser.load(str(pcs_file), convention="smac")
    except SyntaxError:
        return False
    return True
[docs]
def get_pcs(self: Solver) -> list[dict[str, Any]] | None:
    """Get the parameter content of the PCS file.

    Returns:
        The entries of the parsed PCS file whose "type" equals "parameter",
        as a list. None if the PCS file can not be resolved.
    """
    pcs_file = self.get_pcs_file()
    if not pcs_file:
        return None
    parser = pcsparser.PCSParser()
    parser.load(str(pcs_file), convention="smac")
    # Keep only actual parameter definitions, dropping any other entry types
    return [p for p in parser.pcs.params if p["type"] == "parameter"]
[docs]
def build_cmd(self: Solver,
              instance: str | list[str],
              objectives: list[SparkleObjective],
              seed: int,
              cutoff_time: int = None,
              configuration: dict = None) -> list[str]:
    """Build the solver call on an instance with a configuration.

    Args:
        instance: Path to the instance, or a list of paths for a multi-file
            instance.
        objectives: The objectives handed to the wrapper, passed on as a
            comma separated string of their string representations.
        seed: Seed of the solver.
        cutoff_time: Cutoff time for the solver. If None, the command is built
            without RunSolver and the wrapper receives sys.maxsize as cutoff.
        configuration: Configuration of the solver. The given dictionary is
            not modified.

    Returns:
        List of commands and arguments to execute the solver.
    """
    # Work on a copy: the entries added below must not leak into the caller's dict
    configuration = {} if configuration is None else dict(configuration)
    # Ensure configuration contains required entries for each wrapper
    configuration["solver_dir"] = str(self.directory.absolute())
    configuration["instance"] = instance
    configuration["seed"] = seed
    configuration["objectives"] = ",".join([str(obj) for obj in objectives])
    if cutoff_time is not None:  # Use RunSolver
        configuration["cutoff_time"] = cutoff_time
        # Create RunSolver Logs
        # --timestamp
        #   timestamp each line of the solver standard output and error files
        #   (which are then redirected to stdout)
        # --use-pty
        #   use a pseudo-terminal to collect the solver output, so the solver
        #   line-buffers its output instead of buffering as for a pipe/file.
        #   Currently only available when lines are timestamped.
        # -w filename or --watcher-data filename
        #   sends the watcher informations to filename
        # -v filename or --var filename
        #   save the most relevant information (times,...)
        #   in an easy to parse VAR=VALUE file
        # -o filename or --solver-data filename
        #   redirects the solver output (both stdout and stderr) to filename
        # A multi-file instance is a list; name the log files after its first file
        first_file = instance[0] if isinstance(instance, (list, tuple)) else instance
        inst_name = Path(first_file).name
        raw_result_path =\
            Path(f"{self.name}_{inst_name}_{tg.get_time_pid_random_string()}.rawres")
        runsolver_watch_data_path = raw_result_path.with_suffix(".log")
        runsolver_values_path = raw_result_path.with_suffix(".val")
        solver_cmd = [str(self.runsolver_exec.absolute()),
                      "--timestamp", "--use-pty",
                      "--cpu-limit", str(cutoff_time),
                      "-w", str(runsolver_watch_data_path),
                      "-v", str(runsolver_values_path),
                      "-o", str(raw_result_path)]
    else:
        configuration["cutoff_time"] = sys.maxsize
        solver_cmd = []

    # Ensure stringification of dictionary will go correctly for key value pairs
    configuration = {key: str(value) for key, value in configuration.items()}
    # The wrapper receives the whole configuration as one single-quoted JSON argument
    solver_cmd += [str((self.directory / self.wrapper).absolute()),
                   f"'{json.dumps(configuration)}'"]
    return solver_cmd
[docs]
def run(self: Solver,
        instance: str | list[str] | InstanceSet,
        objectives: list[SparkleObjective],
        seed: int,
        cutoff_time: int = None,
        configuration: dict = None,
        run_on: Runner = Runner.LOCAL,
        commandname: str = "run_solver",
        sbatch_options: list[str] = None,
        cwd: Path = None) -> SlurmRun | list[dict[str, Any]] | dict[str, Any]:
    """Run the solver on an instance with a certain configuration.

    Args:
        instance: The instance(s) to run the solver on, list in case of multi-file.
            In case of an instance set, will run on all instances in the set.
        objectives: The objectives passed on to the solver call.
        seed: Seed to run the solver with. Fill with arbitrary int in case of
            deterministic solver.
        cutoff_time: The cutoff time for the solver, measured through RunSolver.
            If None, will be executed without RunSolver.
        configuration: The solver configuration to use. Can be empty.
        run_on: The runner to submit the command(s) to.
        commandname: Name given to the submitted run.
        sbatch_options: Options passed on to the queueing call as sbatch_options.
        cwd: Path where to execute. Defaults to self.raw_output_directory.

    Returns:
        For a local run: the parsed solver output dict (possibly with runsolver
        values) for a single job, a list of such dicts for several jobs, or a
        dict holding only an ERROR status if the run failed.
        For any other runner the run object itself is returned.
    """
    if cwd is None:
        cwd = self.raw_output_directory
    # An instance set results in one command per instance path
    if isinstance(instance, InstanceSet):
        targets = [inst.absolute() for inst in instance.instance_paths]
    else:
        targets = [instance]
    cmds = [" ".join(self.build_cmd(target,
                                    objectives=objectives,
                                    seed=seed,
                                    cutoff_time=cutoff_time,
                                    configuration=configuration))
            for target in targets]
    run = rrr.add_to_queue(runner=run_on,
                           cmd=cmds,
                           name=commandname,
                           base_dir=cwd,
                           path=cwd,
                           sbatch_options=sbatch_options)

    if not isinstance(run, LocalRun):
        # Only local runs are awaited and parsed here
        return run

    run.wait()
    # Subprocess resulted in error
    if run.status == Status.ERROR:
        print(f"WARNING: Solver {self.name} execution seems to have failed!\n")
        for i, job in enumerate(run.jobs):
            # Report the output of the job at hand, not always that of the first job
            print(f"[Job {i}] The used command was: {cmds[i]}\n"
                  "The error yielded was:\n"
                  f"\t-stdout: '{job._process.stdout}'\n"
                  f"\t-stderr: '{job._process.stderr}'\n")
        return {"status": SolverStatus.ERROR, }

    runsolver_path = str(self.runsolver_exec.absolute())
    solver_outputs = []
    for i, job in enumerate(run.jobs):
        solver_cmd = cmds[i].split(" ")
        runsolver_configuration = None
        if solver_cmd[0] == runsolver_path:
            # RunSolver prefix as built by build_cmd: the executable, two flags
            # and four option/value pairs make up the first 11 tokens
            runsolver_configuration = solver_cmd[:11]
        solver_output = Solver.parse_solver_output(job.stdout,
                                                   runsolver_configuration,
                                                   cwd)
        if self.verifier is not None:
            # NOTE(review): this needs the RunSolver raw output path and thus fails
            # when no cutoff_time was given; for an InstanceSet the whole set is
            # passed as instance. Confirm the intended verifier contract.
            solver_output["status"] = self.verifier.verifiy(
                instance, Path(runsolver_configuration[-1]))
        solver_outputs.append(solver_output)
    # A single job yields its dict directly; otherwise the list (empty if no jobs)
    if len(solver_outputs) == 1:
        return solver_outputs[0]
    return solver_outputs
[docs]
@staticmethod
def config_str_to_dict(config_str: str) -> dict[str, str]:
    """Parse a configuration string to a dictionary.

    The string is expected to consist of alternating keys and values, e.g.
    ``-alpha '0.5' -beta 3``. Leading dashes are removed from the keys only,
    so values such as ``-0.5`` or ``1e-05`` keep their hyphen.

    Args:
        config_str: The configuration string to parse.

    Returns:
        Dictionary mapping parameter names to their (string) values. Empty for
        empty input, for ``{}`` and for an uneven number of tokens.
    """
    config_str = config_str.strip()
    if config_str == "" or config_str == r"{}":
        return {}
    # Split the string by spaces, but conserve quoted substrings
    tokens = shlex.split(config_str)
    # Keys and values must pair up, uneven input can not be interpreted
    if len(tokens) % 2:
        return {}
    config_dict = {}
    for key, raw_value in zip(tokens[::2], tokens[1::2]):
        # As the value will already be a string object, no quotes are allowed in it
        value = raw_value.strip('"').strip("'")
        config_dict[key.lstrip("-")] = value
    return config_dict
[docs]
@staticmethod
def parse_solver_output(solver_output: str,
                        runsolver_configuration: list[str] = None,
                        cwd: Path = None) -> dict[str, Any]:
    """Parse the output of the solver.

    Args:
        solver_output: The output of the solver run which needs to be parsed
        runsolver_configuration: The runsolver configuration to wrap the solver
            with. If runsolver was not used this should be None.
        cwd: Directory handed to the runsolver output parser together with the
            runsolver configuration. Unused if runsolver was not used.

    Returns:
        Dictionary representing the parsed solver output, with the status cast
        to SolverStatus and the "cutoff_time" entry removed.
    """
    if runsolver_configuration is not None:
        parsed_output = runsolver_parsing.get_solver_output(runsolver_configuration,
                                                            solver_output,
                                                            cwd)
    else:
        parsed_output = ast.literal_eval(solver_output)
    # cast status attribute from str to Enum
    parsed_output["status"] = SolverStatus(parsed_output["status"])
    # apply objectives to parsed output, runtime based objectives added here
    # Iterate over a snapshot, as entries are overwritten during the loop
    for key, value in list(parsed_output.items()):
        if key == "status":
            continue
        objective = resolve_objective(key)
        if objective is None:
            continue
        if objective.use_time == UseTime.NO:
            if objective.post_process is not None:
                # Store under the original key (not the objective object), so
                # no additional entry is created in the output
                parsed_output[key] = objective.post_process(value)
            continue
        # Time based objectives need the values measured by runsolver
        if runsolver_configuration is None:
            continue
        if objective.use_time == UseTime.CPU_TIME:
            parsed_output[key] = parsed_output["cpu_time"]
        else:
            parsed_output[key] = parsed_output["wall_time"]
        if objective.post_process is not None:
            parsed_output[key] = objective.post_process(
                parsed_output[key], parsed_output["cutoff_time"])
    # The cutoff time is only needed for post processing, not part of the result
    parsed_output.pop("cutoff_time", None)
    return parsed_output