Coverage for src/sparkle/selector/selector_cli.py: 81% (75 statements)
coverage.py v7.13.1, created at 2026-01-21 15:31 +0000
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Execute Sparkle portfolio selector, read/write to SelectionScenario."""
import argparse
import sys
from filelock import FileLock, Timeout
from pathlib import Path

from sparkle.structures import PerformanceDataFrame, FeatureDataFrame
from sparkle.solver import Solver
from sparkle.selector import SelectionScenario
from sparkle.instance import Instance_Set


def main(argv: list[str]) -> None:
    """Main function of the Selector CLI."""
    # Define command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--selector-scenario",
        required=True,
        type=Path,
        help="path to portfolio selector scenario",
    )
    parser.add_argument(
        "--instance", required=True, type=Path, help="path to instance to run on"
    )
    parser.add_argument(
        "--feature-data", required=True, type=Path, help="path to feature data"
    )
    parser.add_argument(
        "--seed",
        type=int,
        required=False,
        help="seed to use for the solver. If not provided, read from "
        "the PerformanceDataFrame or generate one.",
    )
    parser.add_argument(
        "--log-dir", type=Path, required=False, help="path to the log directory"
    )
    args = parser.parse_args(argv)

    # Process command line arguments
    selector_scenario = SelectionScenario.from_file(args.selector_scenario)
    feature_data = FeatureDataFrame(Path(args.feature_data))
    instance_set = Instance_Set(args.instance)
    instance = str(instance_set.instance_paths[0])
    instance_name = instance_set.instance_names[0]
    seed = args.seed
    if seed is None:
        # Try to read from PerformanceDataFrame
        seed = selector_scenario.selector_performance_data.get_value(
            selector_scenario.__selector_solver_name__,
            instance_name,
            solver_fields=[PerformanceDataFrame.column_seed],
        )
        if seed is None:  # Still no value
            import random

            seed = random.randint(0, 2**32 - 1)

    # Note: Following code could be adjusted to run the entire instance set
    # Run portfolio selector
    print(f"Sparkle portfolio selector predicting for instance {instance_name} ...")
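    # Resolve how the instance is indexed in the feature data: fall back from the
    # instance name to its file name, full path, or path without extension.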
    feature_instance_name = instance_name
    if instance_name not in feature_data.instances:
        if Path(instance_name).name in feature_data.instances:
            feature_instance_name = Path(instance_name).name
        elif str(instance) in feature_data.instances:
            feature_instance_name = str(instance)
        elif str(Path(instance).with_suffix("")) in feature_data.instances:
            feature_instance_name = str(Path(instance).with_suffix(""))
        else:
            raise ValueError(
                f"Could not resolve {instance} features in {feature_data.csv_filepath}"
            )
    predict_schedule = selector_scenario.selector.run(
        selector_scenario.selector_file_path, feature_instance_name, feature_data
    )

    if predict_schedule is None:  # Selector failed to produce a prediction
        sys.exit(-1)

    print(
        f"Predicting done! Running schedule "
        f"[{', '.join(str(x) for x in predict_schedule)}] ..."
    )
    performance_data = selector_scenario.selector_performance_data
    selector_output = {}
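    # Run the predicted schedule: try each (solver, configuration, cutoff time)
    # in order until one call reports a positive status.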
    for solver, config_id, cutoff_time in predict_schedule:
        config = performance_data.get_full_configuration(solver, config_id)
        solver = Solver(Path(solver))
        print(
            f"\t- Calling {solver.name} ({config_id}) with time budget {cutoff_time} "
            f"on instance {instance}..."
        )
        solver_output = solver.run(  # Runs locally by default
            instance,
            objectives=[selector_scenario.objective],
            seed=seed,
            cutoff_time=cutoff_time,
            configuration=config,
            log_dir=args.log_dir,
        )
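        # Aggregate solver outputs across the schedule: numeric values are summed,
        # non-numeric values are overwritten by the most recent call.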
        for key in solver_output:
            if key in selector_output and isinstance(solver_output[key], (int, float)):
                selector_output[key] += solver_output[key]
            else:
                selector_output[key] = solver_output[key]
        print(f"\t- Calling solver {solver.name} ({config_id}) done!")
        solver_status = solver_output["status"]
        if solver_status.positive:
            print(
                f"[{solver_status}] {solver.name} ({config_id}) was successful "
                f"on {instance}"
            )
            break
        print(f"[{solver_status}] {solver.name} ({config_id}) failed on {instance}")

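    # Determine the selector's objective value, applying the objective's
    # post-processing (if defined) with the last cutoff time and status.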
    selector_value = selector_output[selector_scenario.objective.name]
    if selector_scenario.objective.post_process:
        selector_value = selector_scenario.objective.post_process(
            selector_output[selector_scenario.objective.name],
            cutoff_time,
            selector_output["status"],
        )
    if solver_output["status"].positive:
        print(
            f"Selector {selector_scenario.selector.name} solved {instance} "
            f"with a value of {selector_value} ({selector_scenario.objective.name})."
        )
    else:
        print(f"Selector {selector_scenario.selector.name} did not solve {instance}.")
    print(f"Writing results to {performance_data.csv_filepath} ...")
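    # Write the result back to the PerformanceDataFrame under a file lock,
    # guarding against concurrent writes to the CSV.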
    try:
        # Create a separate locked file for writing
        lock = FileLock(f"{performance_data.csv_filepath}.lock")
        with lock.acquire(timeout=60):
            # Reload the dataframe to the latest version
            performance_data = PerformanceDataFrame(performance_data.csv_filepath)
            performance_data.set_value(
                selector_value,
                selector_scenario.__selector_solver_name__,
                instance_name,
                objective=selector_scenario.objective.name,
                append_write_csv=True,
            )
        lock.release()
    except Timeout:
        print(f"ERROR: Cannot acquire File Lock on {performance_data}.")


if __name__ == "__main__":
    main(sys.argv[1:])