"""File to handle a solver and its directories."""
from __future__ import annotations
import sys
from typing import Any
import shlex
import ast
import json
from pathlib import Path
from ConfigSpace import ConfigurationSpace
import runrunner as rrr
from runrunner.local import LocalRun
from runrunner.slurm import Run, SlurmRun
from runrunner.base import Status, Runner
from sparkle.tools import pcsparser, RunSolver
from sparkle.types import SparkleCallable, SolverStatus
from sparkle.solver import verifiers
from sparkle.instance import InstanceSet
from sparkle.structures import PerformanceDataFrame
from sparkle.types import resolve_objective, SparkleObjective, UseTime
[docs]
class Solver(SparkleCallable):
"""Class to handle a solver and its directories."""
meta_data = "solver_meta.txt"
wrapper = "sparkle_solver_wrapper.py"
solver_cli = Path(__file__).parent / "solver_cli.py"
def __init__(self: Solver,
directory: Path,
raw_output_directory: Path = None,
runsolver_exec: Path = None,
deterministic: bool = None,
verifier: verifiers.SolutionVerifier = None) -> None:
"""Initialize solver.
Args:
directory: Directory of the solver.
raw_output_directory: Directory where solver will write its raw output.
runsolver_exec: Path to the runsolver executable.
By default, runsolver in directory.
deterministic: Bool indicating determinism of the algorithm.
Defaults to False.
verifier: The solution verifier to use. If None, no verifier is used.
"""
super().__init__(directory, runsolver_exec, raw_output_directory)
self.deterministic = deterministic
self.verifier = verifier
meta_data_file = self.directory / Solver.meta_data
if self.runsolver_exec is None:
self.runsolver_exec = self.directory / "runsolver"
if meta_data_file.exists():
meta_data = ast.literal_eval(meta_data_file.open().read())
# We only override the deterministic and verifier from file if not set
if self.deterministic is None:
if ("deterministic" in meta_data
and meta_data["deterministic"] is not None):
self.deterministic = meta_data["deterministic"]
if self.verifier is None and "verifier" in meta_data:
if isinstance(meta_data["verifier"], tuple): # File verifier
self.verifier = verifiers.mapping[meta_data["verifier"][0]](
Path(meta_data["verifier"][1])
)
elif meta_data["verifier"] in verifiers.mapping:
self.verifier = verifiers.mapping[meta_data["verifier"]]
if self.deterministic is None: # Default to False
self.deterministic = False
def __str__(self: Solver) -> str:
"""Return the sting representation of the solver."""
return self.name
def _get_pcs_file(self: Solver, port_type: str = None) -> Path | bool:
"""Get path of the parameter file.
Returns:
Path to the parameter file or False if the parameter file does not exist.
"""
pcs_files = [p for p in self.directory.iterdir() if p.suffix == ".pcs"
and (port_type is None or port_type in p.name)]
if len(pcs_files) == 0:
return False
if len(pcs_files) != 1:
# Generated PCS files present, this is a quick fix to take the original
pcs_files = sorted(pcs_files, key=lambda p: len(p.name))
return pcs_files[0]
[docs]
def get_pcs_file(self: Solver, port_type: str = None) -> Path:
"""Get path of the parameter file.
Returns:
Path to the parameter file. None if it can not be resolved.
"""
if not (file_path := self._get_pcs_file(port_type)):
return None
return file_path
[docs]
def read_pcs_file(self: Solver) -> bool:
"""Checks if the pcs file can be read."""
pcs_file = self._get_pcs_file()
try:
parser = pcsparser.PCSParser()
parser.load(str(pcs_file), convention="smac")
return True
except SyntaxError:
pass
return False
[docs]
def get_pcs(self: Solver) -> dict[str, tuple[str, str, str]]:
"""Get the parameter content of the PCS file."""
if not (pcs_file := self.get_pcs_file()):
return None
parser = pcsparser.PCSParser()
parser.load(str(pcs_file), convention="smac")
return [p for p in parser.pcs.params if p["type"] == "parameter"]
[docs]
def port_pcs(self: Solver, port_type: pcsparser.PCSConvention) -> None:
"""Port the parameter file to the given port type."""
pcs_file = self.get_pcs_file()
parser = pcsparser.PCSParser()
parser.load(str(pcs_file), convention="smac")
target_pcs_file = pcs_file.parent / f"{pcs_file.stem}_{port_type}.pcs"
if target_pcs_file.exists(): # Already exists, possibly user defined
return
parser.export(convention=port_type,
destination=target_pcs_file)
[docs]
def get_configspace(self: Solver) -> ConfigurationSpace:
"""Get the parameter content of the PCS file."""
if not (pcs_file := self.get_pcs_file()):
return None
parser = pcsparser.PCSParser()
parser.load(str(pcs_file), convention="smac")
return parser.get_configspace()
[docs]
def get_forbidden(self: Solver, port_type: pcsparser.PCSConvention) -> Path:
"""Get the path to the file containing forbidden parameter combinations."""
if port_type == "IRACE":
forbidden = [p for p in self.directory.iterdir()
if p.name.endswith("forbidden.txt")]
if len(forbidden) > 0:
return forbidden[0]
return None
[docs]
def build_cmd(self: Solver,
instance: str | list[str],
objectives: list[SparkleObjective],
seed: int,
cutoff_time: int = None,
configuration: dict = None,
log_dir: Path = None) -> list[str]:
"""Build the solver call on an instance with a configuration.
Args:
instance: Path to the instance.
seed: Seed of the solver.
cutoff_time: Cutoff time for the solver.
configuration: Configuration of the solver.
Returns:
List of commands and arguments to execute the solver.
"""
if configuration is None:
configuration = {}
# Ensure configuration contains required entries for each wrapper
configuration["solver_dir"] = str(self.directory.absolute())
configuration["instance"] = instance
configuration["seed"] = seed
configuration["objectives"] = ",".join([str(obj) for obj in objectives])
configuration["cutoff_time"] =\
cutoff_time if cutoff_time is not None else sys.maxsize
if "configuration_id" in configuration:
del configuration["configuration_id"]
# Ensure stringification of dictionary will go correctly for key value pairs
configuration = {key: str(configuration[key]) for key in configuration}
solver_cmd = [str((self.directory / Solver.wrapper)),
f"'{json.dumps(configuration)}'"]
if log_dir is None:
log_dir = Path()
if cutoff_time is not None: # Use RunSolver
log_name_base = f"{Path(instance).name}_{self.name}"
return RunSolver.wrap_command(self.runsolver_exec,
solver_cmd,
cutoff_time,
log_dir,
log_name_base=log_name_base)
return solver_cmd
[docs]
def run(self: Solver,
instances: str | list[str] | InstanceSet | list[InstanceSet],
objectives: list[SparkleObjective],
seed: int,
cutoff_time: int = None,
configuration: dict = None,
run_on: Runner = Runner.LOCAL,
sbatch_options: list[str] = None,
log_dir: Path = None,
) -> SlurmRun | list[dict[str, Any]] | dict[str, Any]:
"""Run the solver on an instance with a certain configuration.
Args:
instance: The instance(s) to run the solver on, list in case of multi-file.
In case of an instance set, will run on all instances in the set.
seed: Seed to run the solver with. Fill with abitrary int in case of
determnistic solver.
cutoff_time: The cutoff time for the solver, measured through RunSolver.
If None, will be executed without RunSolver.
configuration: The solver configuration to use. Can be empty.
log_dir: Path where to place output files. Defaults to
self.raw_output_directory.
Returns:
Solver output dict possibly with runsolver values.
"""
if log_dir is None:
log_dir = self.raw_output_directory
cmds = []
instances = [instances] if not isinstance(instances, list) else instances
set_label = instances.name if isinstance(instances, InstanceSet) else "instances"
for instance in instances:
paths = instance.instace_paths if isinstance(instance,
InstanceSet) else [instance]
for instance_path in paths:
solver_cmd = self.build_cmd(instance_path,
objectives=objectives,
seed=seed,
cutoff_time=cutoff_time,
configuration=configuration,
log_dir=log_dir)
cmds.append(" ".join(solver_cmd))
commandname = f"Run Solver: {self.name} on {set_label}"
run = rrr.add_to_queue(runner=run_on,
cmd=cmds,
name=commandname,
base_dir=log_dir,
sbatch_options=sbatch_options)
if isinstance(run, LocalRun):
run.wait()
import time
time.sleep(5)
# Subprocess resulted in error
if run.status == Status.ERROR:
print(f"WARNING: Solver {self.name} execution seems to have failed!\n")
for i, job in enumerate(run.jobs):
print(f"[Job {i}] The used command was: {cmds[i]}\n"
"The error yielded was:\n"
f"\t-stdout: '{run.jobs[0]._process.stdout}'\n"
f"\t-stderr: '{run.jobs[0]._process.stderr}'\n")
return {"status": SolverStatus.ERROR, }
solver_outputs = []
for i, job in enumerate(run.jobs):
solver_cmd = cmds[i].split(" ")
solver_output = Solver.parse_solver_output(run.jobs[i].stdout,
solver_call=solver_cmd,
objectives=objectives,
verifier=self.verifier)
solver_outputs.append(solver_output)
return solver_outputs if len(solver_outputs) > 1 else solver_output
return run
[docs]
@staticmethod
def config_str_to_dict(config_str: str) -> dict[str, str]:
"""Parse a configuration string to a dictionary."""
# First we filter the configuration of unwanted characters
config_str = config_str.strip().replace("-", "")
# Then we split the string by spaces, but conserve substrings
config_list = shlex.split(config_str)
# We return empty for empty input OR uneven input
if config_str == "" or config_str == r"{}" or len(config_list) & 1:
return {}
config_dict = {}
for index in range(0, len(config_list), 2):
# As the value will already be a string object, no quotes are allowed in it
value = config_list[index + 1].strip('"').strip("'")
config_dict[config_list[index]] = value
return config_dict
[docs]
@staticmethod
def parse_solver_output(
solver_output: str,
solver_call: list[str | Path] = None,
objectives: list[SparkleObjective] = None,
verifier: verifiers.SolutionVerifier = None) -> dict[str, Any]:
"""Parse the output of the solver.
Args:
solver_output: The output of the solver run which needs to be parsed
solver_call: The solver call used to run the solver
objectives: The objectives to apply to the solver output
verifier: The verifier to check the solver output
Returns:
Dictionary representing the parsed solver output
"""
used_runsolver = False
if solver_call is not None and len(solver_call) > 2:
used_runsolver = True
parsed_output = RunSolver.get_solver_output(solver_call,
solver_output)
else:
parsed_output = ast.literal_eval(solver_output)
# cast status attribute from str to Enum
parsed_output["status"] = SolverStatus(parsed_output["status"])
# Apply objectives to parsed output, runtime based objectives added here
if verifier is not None and used_runsolver:
# Horrible hack to get the instance from the solver input
solver_call_str: str = " ".join(solver_call)
solver_input_str = solver_call_str.split(Solver.wrapper, maxsplit=1)[1]
solver_input_str = solver_input_str[solver_input_str.index("{"):
solver_input_str.index("}") + 1]
solver_input = ast.literal_eval(solver_input_str)
target_instance = Path(solver_input["instance"])
parsed_output["status"] = verifier.verify(
target_instance, parsed_output, solver_call)
# Create objective map
objectives = {o.stem: o for o in objectives} if objectives else {}
removable_keys = ["cutoff_time"] # Keys to remove
# apply objectives to parsed output, runtime based objectives added here
for key, value in parsed_output.items():
if objectives and key in objectives:
objective = objectives[key]
removable_keys.append(key) # We translate it into the full name
else:
objective = resolve_objective(key)
# If not found in objectives, resolve to which objective the output belongs
if objective is None: # Could not parse, skip
continue
if objective.use_time == UseTime.NO:
if objective.post_process is not None:
parsed_output[key] = objective.post_process(value)
else:
if not used_runsolver:
continue
if objective.use_time == UseTime.CPU_TIME:
parsed_output[key] = parsed_output["cpu_time"]
else:
parsed_output[key] = parsed_output["wall_time"]
if objective.post_process is not None:
parsed_output[key] = objective.post_process(
parsed_output[key],
parsed_output["cutoff_time"],
parsed_output["status"])
# Replace or remove keys based on the objective names
for key in removable_keys:
if key in parsed_output:
if key in objectives:
# Map the result to the objective
parsed_output[objectives[key].name] = parsed_output[key]
if key != objectives[key].name: # Only delete actual mappings
del parsed_output[key]
else:
del parsed_output[key]
return parsed_output