"""Configurator class to use different algorithm configurators."""
from __future__ import annotations
import re
import shutil
import decimal
from pathlib import Path
import runrunner as rrr
from runrunner import Runner, Run
from sparkle.solver import Solver
from sparkle.instance import InstanceSet, Instance_Set
from sparkle.structures import PerformanceDataFrame
from sparkle.types import SparkleObjective
[docs]
class Configurator:
    """Abstract base class for algorithm configurators such as SMAC."""

    configurator_cli_path = Path(__file__).parent.resolve() / "configurator_cli.py"
    full_name = "Configurator Abstract Class"
    version = "NaN"

    def __init__(self: Configurator,
                 multi_objective_support: bool = False) -> None:
        """Set up the configurator.

        Args:
            multi_objective_support: Whether the configurator supports
                multi objective optimization for solvers.
        """
        self.multiobjective = multi_objective_support

    @property
    def name(self: Configurator) -> str:
        """Name of the configurator, i.e. the name of the concrete class."""
        return type(self).__name__

    @staticmethod
    def scenario_class() -> ConfigurationScenario:
        """Return the scenario class that belongs to this configurator."""
        return ConfigurationScenario

    @staticmethod
    def check_requirements(verbose: bool = False) -> bool:
        """Check whether the configurator is installed (abstract)."""
        raise NotImplementedError

    @staticmethod
    def download_requirements() -> None:
        """Download the configurator (abstract)."""
        raise NotImplementedError

    @staticmethod
    def organise_output(output_source: Path,
                        output_target: Path,
                        scenario: ConfigurationScenario,
                        configuration_id: str) -> None | str:
        """Restructure and clean up after a single configurator call (abstract).

        Args:
            output_source: Path to the output file of the configurator run.
            output_target: Path to the Performance DataFrame to store result.
            scenario: ConfigurationScenario of the configuration.
            configuration_id: ID (of the run) of the configuration.
        """
        raise NotImplementedError

    @staticmethod
    def save_configuration(scenario: ConfigurationScenario,
                           configuration_id: str,
                           configuration: dict,
                           output_target: Path) -> dict | None:
        """Store a configuration in the Performance DataFrame on disk.

        When there is no usable target (``None`` or a path that does not
        exist) nothing is written and the configuration is handed back.

        Args:
            scenario: ConfigurationScenario of the configuration; only its
                solver name is used to locate the solver column.
            configuration_id: ID (of the run) of the configuration.
            configuration: Configuration to save.
            output_target: Path to the Performance DataFrame to store result.

        Returns:
            The configuration if it was not written to disk, otherwise None.
        """
        nothing_to_write_to = output_target is None or not output_target.exists()
        if nothing_to_write_to:
            return configuration

        # Imported lazily so the early-return path does not require filelock
        from filelock import FileLock

        # Guard the read-modify-write cycle on the shared data frame file
        with FileLock(f"{output_target}.lock").acquire(timeout=600):
            frame = PerformanceDataFrame(output_target)
            # The frame stores solvers by path; match on the final component
            matching_columns = [column for column in frame.solvers
                                if Path(column).name == scenario.solver.name]
            frame.add_configuration(
                solver=matching_columns[0],
                configuration_id=configuration_id,
                configuration=configuration)
            frame.save_csv()
        return None

    def get_status_from_logs(self: Configurator) -> None:
        """Scan the configurator log files for warnings (abstract)."""
        raise NotImplementedError
[docs]
class ConfigurationScenario:
    """Template class to handle a configuration scenario."""

    def __init__(self: ConfigurationScenario,
                 solver: Solver,
                 instance_set: InstanceSet,
                 sparkle_objectives: list[SparkleObjective],
                 number_of_runs: int,
                 parent_directory: Path) -> None:
        """Initialize scenario paths and names.

        Args:
            solver: Solver that should be configured.
            instance_set: Instances object for the scenario.
            sparkle_objectives: Sparkle Objectives to optimize.
            number_of_runs: The number of configurator runs to perform.
            parent_directory: Directory in which the scenario should be placed.
        """
        self.solver = solver
        self.instance_set = instance_set
        self.sparkle_objectives = sparkle_objectives
        self.number_of_runs = number_of_runs

        # All scenario artefacts live below <parent_directory>/<scenario name>
        self.directory = parent_directory / self.name
        self.scenario_file_path = self.directory / "scenario.txt"
        self.timestamp_path = self.directory / "timestamp"
        self.validation: Path = self.directory / "validation"
        self.tmp: Path = self.directory / "tmp"
        self.results_directory: Path = self.directory / "results"

        # Lazily resolved caches, see the matching properties
        self._ablation_scenario: AblationScenario = None
        self._timestamp: str = None

    @property
    def configurator(self: ConfigurationScenario) -> type[Configurator]:
        """Return the type of configurator the scenario belongs to."""
        return Configurator

    @property
    def name(self: ConfigurationScenario) -> str:
        """Return the name of the scenario: ``<solver name>_<instance set name>``."""
        return f"{self.solver.name}_{self.instance_set.name}"

    @property
    def timestamp(self: ConfigurationScenario) -> str | None:
        """Return the timestamp of the scenario.

        The value is read from the timestamp file once and cached afterwards.

        Returns:
            The stored timestamp string, or None if no timestamp file exists.
        """
        if not self.timestamp_path.exists():
            return None
        if self._timestamp is None:
            self._timestamp = self.timestamp_path.read_text().strip()
        return self._timestamp

    @property
    def configuration_ids(self: ConfigurationScenario) -> list[str]:
        """Return the IDs of the configurations for the scenario.

        Only meaningful after the scenario has been created, since the IDs
        embed the scenario timestamp.

        Returns:
            List of configuration IDs, one for each run.
        """
        return [f"{self.configurator.__name__}_{self.timestamp}_{i}"
                for i in range(self.number_of_runs)]

    @property
    def ablation_scenario(self: ConfigurationScenario) -> AblationScenario | None:
        """Return the ablation scenario for the scenario if it exists.

        The first ``*/ablation_config.txt`` found below the scenario directory
        is loaded and cached.
        """
        if self._ablation_scenario is not None:
            return self._ablation_scenario
        for scenario in self.directory.glob("*/ablation_config.txt"):
            self._ablation_scenario = AblationScenario.from_file(scenario, self)
            return self._ablation_scenario
        return None

    def create_scenario(self: ConfigurationScenario, parent_directory: Path) -> None:
        """Create scenario with solver and instances in the parent directory.

        This prepares all the necessary subdirectories related to configuration.

        Args:
            parent_directory: Directory in which the scenario should be created.
        """
        raise NotImplementedError

    def create_scenario_file(self: ConfigurationScenario) -> Path | None:
        """Record the creation time of the scenario in the timestamp file.

        This base implementation only writes the timestamp file (the scenario
        directory must already exist) and returns nothing.
        """
        from datetime import datetime
        stamp = datetime.now().strftime("%Y%m%d-%H%M")
        self.timestamp_path.write_text(stamp)
        # Keep the cached value in sync with what is now on disk
        self._timestamp = stamp

    def serialise(self: ConfigurationScenario) -> dict:
        """Serialize the configuration scenario."""
        raise NotImplementedError

    @classmethod
    def find_scenario(cls: type[ConfigurationScenario],
                      directory: Path,
                      solver: Solver,
                      instance_set: InstanceSet) -> ConfigurationScenario | None:
        """Resolve a scenario from a directory and Solver / Training set.

        Returns:
            The scenario loaded through ``from_file``, or None when
            ``<directory>/<solver>_<instance set>/scenario.txt`` does not exist.
        """
        scenario_name = f"{solver.name}_{instance_set.name}"
        path = directory / scenario_name / "scenario.txt"
        if not path.exists():
            return None
        return cls.from_file(path)

    @staticmethod
    def from_file(scenario_file: Path) -> ConfigurationScenario:
        """Read a scenario file and initialise a ConfigurationScenario."""
        raise NotImplementedError
[docs]
class AblationScenario:
"""Class for ablation analysis.

The class attributes below hold the file-system locations, relative to this
module, of the target algorithm wrapper and the ablation analysis executables.
"""
# We use the SMAC2 target algorithm for solver output handling
configurator_target = Path(__file__).parent.resolve() /\
"implementations" / "SMAC2" / "smac2_target_algorithm.py"
# Directory in which the ablation analysis tool (version 0.9.4) is expected
ablation_dir = Path(__file__).parent / "implementations" / "ablationAnalysis-0.9.4"
# Executable for the ablation analysis itself
ablation_executable = ablation_dir / "ablationAnalysis"
# Executable for validating an ablation run
ablation_validation_executable = ablation_dir / "ablationValidation"
def __init__(self: AblationScenario,
             configuration_scenario: ConfigurationScenario,
             test_set: InstanceSet,
             cutoff_length: str,
             concurrent_clis: int,
             best_configuration: dict,
             ablation_racing: bool = False) -> None:
    """Initialize ablation scenario.

    The solver, the training set and the cutoff time are taken from the
    configuration scenario (``solver``, ``instance_set`` and
    ``solver_cutoff_time`` respectively).

    Args:
        configuration_scenario: Configuration scenario the ablation is based on.
        test_set: The test instance set, or None to only use the training set.
        cutoff_length: The cutoff length for ablation analysis.
        concurrent_clis: The maximum number of concurrent jobs on a single node.
        best_configuration: The configuration to ablate from.
        ablation_racing: Whether to use ablation racing.
    """
    self.config_scenario = configuration_scenario
    self.solver = configuration_scenario.solver
    self.train_set = configuration_scenario.instance_set
    self.test_set = test_set
    self.cutoff_time = configuration_scenario.solver_cutoff_time
    self.cutoff_length = cutoff_length
    self.concurrent_clis = concurrent_clis
    self.best_configuration = best_configuration
    self.ablation_racing = ablation_racing

    # The scenario name only carries the test set when one is given
    self.scenario_name = f"ablation_{configuration_scenario.name}"
    if self.test_set is not None:
        self.scenario_name += f"_{self.test_set.name}"
    self.scenario_dir = configuration_scenario.directory / self.scenario_name

    # Create required scenario Paths
    self.tmp_dir = self.scenario_dir / "tmp"
    self.validation_dir = self.scenario_dir / "validation"
    self.validation_dir_tmp = self.validation_dir / "tmp"
    # The "1234" matches the fixed seed written to the ablation config file
    self.table_file = self.validation_dir / "log" / "ablation-validation-run1234.txt"
[docs]
@staticmethod
def check_requirements(verbose: bool = False) -> bool:
    """Check whether everything needed for Ablation Analysis is available.

    Three things are checked, in order: a ``java`` binary on the PATH, the
    ablation analysis executable and the ablation validation executable.

    Args:
        verbose: If True, emit a warning for every missing requirement.

    Returns:
        True only if none of the three requirements is missing.
    """
    import warnings

    java_missing = shutil.which("java") is None
    if java_missing and verbose:
        warnings.warn(
            "AblationAnalysis requires Java 1.8.0_402, but Java is not installed"
            ". Please ensure Java is installed."
        )

    analysis_missing = not AblationScenario.ablation_executable.exists()
    if analysis_missing and verbose:
        warnings.warn(
            "AblationAnalysis executable not found. Please ensure Ablation"
            " Analysis is installed in the expected Path "
            f"({AblationScenario.ablation_executable}).")

    validation_missing = not AblationScenario.ablation_validation_executable.exists()
    if validation_missing and verbose:
        warnings.warn(
            "AblationAnalysis Validation executable not found. Please ensure "
            "Ablation Analysis is installed in the expected Path "
            f"({AblationScenario.ablation_validation_executable}).")

    return not java_missing and not analysis_missing and not validation_missing
[docs]
@staticmethod
def download_requirements(
    ablation_url: str =
    "https://github.com/ADA-research/Sparkle/raw/refs/heads/development"
    "/Resources/Other/ablationAnalysis-0.9.4.zip"
) -> None:
    """Download and unpack the Ablation Analysis executables.

    Does nothing when the ablation executable already exists.

    Args:
        ablation_url: URL of the zip archive containing Ablation Analysis.
    """
    if AblationScenario.ablation_executable.exists():
        return  # Already installed
    import io
    import zipfile
    from urllib.request import urlopen

    AblationScenario.ablation_dir.mkdir(parents=True, exist_ok=True)
    # Close the HTTP response as soon as the archive has been read
    with urlopen(ablation_url, timeout=60) as response:
        archive_bytes = response.read()
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
        archive.extractall(AblationScenario.ablation_dir)
    # Ensure execution rights
    AblationScenario.ablation_executable.chmod(0o755)
    AblationScenario.ablation_validation_executable.chmod(0o755)
[docs]
def create_configuration_file(self: AblationScenario) -> Path:
    """Create the configuration files for ablation analysis.

    The file is written to the scenario directory, and a copy with the
    validation ``execdir`` is written to the validation directory. Both
    directories must already exist.

    Returns:
        Path to the configuration file in the validation directory.
    """
    objective = self.config_scenario.sparkle_objectives[0]
    parameters = list(self.solver.get_configuration_space().values())
    parameter_names = {p.name for p in parameters}

    # Drop redundant keys that are not in the PCS
    target_configuration = {key: value
                            for key, value in self.best_configuration.items()
                            if key in parameter_names}
    # Supplement missing parameters with their defaults. Compare on exact
    # names: a substring test would skip e.g. "alpha" if "alphabeta" is set.
    for p in parameters:
        if p.name not in target_configuration:
            target_configuration[p.name] = p.default_value

    # Ablation cannot deal with E scientific notation in floats
    ctx = decimal.Context(prec=16)

    def without_exponent(raw_value: object) -> str:
        """Return the value as text, expanding scientific notation if present."""
        text = str(raw_value)
        number = text.strip("'")
        if "e" not in number.lower():
            return text
        try:
            expanded = format(ctx.create_decimal(float(number)), "f")
        except ValueError:
            # Not a number, e.g. a categorical value containing the letter e
            return text
        return text.replace(number, expanded)

    opt_config_str = " ".join(f"-{name} {without_exponent(value)}"
                              for name, value in target_configuration.items())

    smac_run_obj = "RUNTIME" if objective.time else "QUALITY"
    objective_str = "MEAN10" if objective.time else "MEAN"
    pcs_file_path = f"{self.config_scenario.solver.pcs_file.absolute()}"

    # Create config file
    config_file = self.scenario_dir / "ablation_config.txt"
    config = (f'algo = "{self.configurator_target.absolute()} '
              f"{self.config_scenario.solver.directory.absolute()} "
              f'{self.tmp_dir.absolute()} {objective}"\n'
              f"execdir = {self.tmp_dir.absolute()}\n"
              "experimentDir = ./\n"
              f"deterministic = {1 if self.solver.deterministic else 0}\n"
              f"run_obj = {smac_run_obj}\n"
              f"overall_obj = {objective_str}\n"
              f"cutoffTime = {self.cutoff_time}\n"
              f"cutoff_length = {self.cutoff_length}\n"
              f"cli-cores = {self.concurrent_clis}\n"
              f"useRacing = {self.ablation_racing}\n"
              "seed = 1234\n"  # NOTE: This does not seem right
              f"paramfile = {pcs_file_path}\n"
              "instance_file = instances_train.txt\n"
              "test_instance_file = instances_test.txt\n"
              "sourceConfiguration = DEFAULT\n"
              f'targetConfiguration = "{opt_config_str}"')
    config_file.write_text(config)

    # Write config to validation directory, pointing execdir at its own tmp
    conf_valid = config.replace(f"execdir = {self.tmp_dir.absolute()}\n",
                                f"execdir = {self.validation_dir_tmp.absolute()}\n")
    validation_config_file = self.validation_dir / config_file.name
    validation_config_file.write_text(conf_valid)
    return validation_config_file
[docs]
def create_instance_file(self: AblationScenario, test: bool = False) -> Path:
    """Write the list of instance paths used by ablation analysis.

    The file is created in the scenario directory and copied to the
    validation directory.

    Args:
        test: Write the test instance file instead of the training one. The
            training set is used as a fallback when no test set is present.

    Returns:
        Path to the instance file in the scenario directory.
    """
    suffix = "_test.txt" if test else "_train.txt"
    use_test_set = test and self.test_set is not None
    chosen_set = self.test_set if use_test_set else self.train_set

    def as_line(entry: Path | list[Path]) -> str:
        """Render one instance; multi-file instances become one spaced line."""
        if isinstance(entry, list):
            return " ".join(str(part.absolute()) for part in entry) + "\n"
        return f"{entry.absolute()}\n"

    # We give the Ablation script the paths of the instances
    target_file = self.scenario_dir / f"instances{suffix}"
    with target_file.open("w") as handle:
        for entry in chosen_set._instance_paths:
            handle.write(as_line(entry))

    # The validation run needs its own copy of the instance list
    shutil.copyfile(target_file, self.validation_dir / target_file.name)
    return target_file
[docs]
def create_scenario(self: AblationScenario, override_dirs: bool = False) -> None:
    """Set up the scenario directories and write all scenario files.

    Args:
        override_dirs: Remove an already existing scenario directory first.
            If False and the directory exists, nothing is changed.
    """
    if self.scenario_dir.exists():
        print("WARNING: Found existing ablation scenario.")
        if override_dirs:
            print("Overwriting existing scenario...")
            shutil.rmtree(self.scenario_dir)
        else:
            print("Set override to True to overwrite existing scenario.")
            return

    # Creating the nested tmp directories also creates their parents
    for directory in (self.tmp_dir, self.validation_dir_tmp):
        directory.mkdir(parents=True, exist_ok=True)

    # Training instances, test instances, then the configuration itself
    self.create_instance_file()
    self.create_instance_file(test=True)
    self.create_configuration_file()
[docs]
def check_for_ablation(self: AblationScenario) -> bool:
    """Check whether ablation has terminated successfully.

    Returns:
        True if the validation table file exists and its first line reports
        that the ablation analysis validation is complete.
    """
    if not self.table_file.is_file():
        return False
    # First line in the table file should be "Ablation analysis validation complete."
    with self.table_file.open() as table:
        first_line = table.readline().strip()
    return first_line == "Ablation analysis validation complete."
[docs]
def read_ablation_table(self: AblationScenario) -> list[list[str]]:
    """Read the ablation table of a scenario.

    Returns:
        A list of rows, the first being the header row. Only lines of the
        table file that split into exactly five columns are included. The
        list is empty if ablation has not completed successfully.
    """
    if not self.check_for_ablation():
        # No ablation table exists for this solver-instance pair
        return []
    results = [["Round", "Flipped parameter", "Source value", "Target value",
                "Validation result"]]
    with self.table_file.open() as table:
        for line in table:
            # Sometimes ablation rounds switch multiple parameters at once.
            # EXAMPLE: 2 EDR, EDRalpha 0, 0.1 1, 0.1013241633106732 486.31691
            # To split the row correctly, we remove the space after the commas
            # of such lists before splitting on spaces, and add it back after.
            values = re.sub(r"\s+", " ", line.strip())
            values = re.sub(r", ", ",", values)
            row = [val.replace(",", ", ") for val in values.split(" ")]
            if len(row) == 5:
                results.append(row)
    return results
[docs]
def submit_ablation(self: AblationScenario,
                    log_dir: Path,
                    sbatch_options: list[str] | None = None,
                    slurm_prepend: str | list[str] | Path = None,
                    run_on: Runner = Runner.SLURM) -> list[Run]:
    """Submit an ablation job.

    Args:
        log_dir: Directory to store job logs
        sbatch_options: Options to pass to sbatch. The list is not modified.
        slurm_prepend: Script to prepend to sbatch script
        run_on: Determines to which RunRunner queue the job is added

    Returns:
        A list with the ablation run, followed by the ablation validation
        run if the scenario has a test set. When running locally the runs
        have been waited for before they are returned.

    Raises:
        RuntimeError: If Java or the Ablation Analysis executables are missing.
    """
    if not self.check_requirements(verbose=True):
        raise RuntimeError(
            "Ablation Analysis is not available. Please ensure Java and Ablation "
            "Analysis is installed and try again."
        )
    # Work on a copy: appending to the argument would leak the extra option
    # into the caller's list (or into a shared default) on every call.
    sbatch_options = list(sbatch_options) if sbatch_options else []
    sbatch_options.append(f"--cpus-per-task={self.concurrent_clis}")

    # 1. submit the ablation to the runrunner queue
    cmd = (f"{AblationScenario.ablation_executable.absolute()} "
           "--optionFile ablation_config.txt")
    srun_options = ["-N1", "-n1", f"-c{self.concurrent_clis}"]
    run_ablation = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd,
        name=f"Ablation analysis: {self.solver.name} on {self.train_set.name}",
        base_dir=log_dir,
        path=self.scenario_dir,
        sbatch_options=sbatch_options,
        srun_options=srun_options,
        prepend=slurm_prepend)

    runs = []
    if run_on == Runner.LOCAL:
        run_ablation.wait()
    runs.append(run_ablation)

    # 2. Run ablation validation run if we have a test set to run on
    if self.test_set is not None:
        # Validation dir should have a copy of all needed files, except for the
        # output of the ablation run, which is stored in ablation-run[seed].txt
        cmd = f"{AblationScenario.ablation_validation_executable.absolute()} "\
              "--optionFile ablation_config.txt "\
              "--ablationLogFile ../log/ablation-run1234.txt"
        run_ablation_validation = rrr.add_to_queue(
            runner=run_on,
            cmd=cmd,
            name=f"Ablation validation: Test set {self.test_set.name}",
            path=self.validation_dir,
            base_dir=log_dir,
            dependencies=run_ablation,
            sbatch_options=sbatch_options,
            prepend=slurm_prepend)
        if run_on == Runner.LOCAL:
            run_ablation_validation.wait()
        runs.append(run_ablation_validation)
    return runs
[docs]
@staticmethod
def from_file(path: Path,
              config_scenario: ConfigurationScenario) -> AblationScenario:
    """Read an ablation configuration file and initialise an AblationScenario.

    Args:
        path: Path to the ablation configuration file (``key = value`` lines).
        config_scenario: The configuration scenario the ablation belongs to.

    Returns:
        The AblationScenario described by the file. A test set is only
        attached when ``instances_test.txt`` next to the file points to a
        directory other than that of the training instance set.
    """
    variables = {}
    with path.open() as scenario_file:
        for line in scenario_file:
            line = line.strip()
            if not line:
                continue
            key, value = line.split(" = ", maxsplit=1)
            variables[key] = value

    # Parameters are stored as "-name value" pairs. Only a hyphen at the start
    # or after whitespace that is not followed by a digit, dot or hyphen begins
    # a new parameter, so negative values and hyphenated names stay intact.
    target_configuration = variables["targetConfiguration"].replace('"', "")
    best_conf = {}
    for keyvalue in re.split(r"(?:^|\s)-(?=[^\d\s.-])", target_configuration):
        keyvalue = keyvalue.strip()
        if keyvalue:
            key, value = keyvalue.split(" ", maxsplit=1)
            best_conf[key] = value

    test_set = None
    test_instance_file = path.parent / "instances_test.txt"
    if test_instance_file.exists():
        with test_instance_file.open() as instance_file:
            first_instance = instance_file.readline().strip()
        test_path = Path(first_instance).parent
        if test_path != config_scenario.instance_set.directory:
            test_set = Instance_Set(test_path)

    # The flag is stored as text; bool("False") would wrongly yield True
    ablation_racing = variables["useRacing"].strip().lower() in ("true", "1")
    return AblationScenario(config_scenario,
                            test_set,
                            variables["cutoff_length"],
                            int(variables["cli-cores"]),
                            best_conf,
                            ablation_racing=ablation_racing)