Coverage for src/sparkle/selector/selector_cli.py: 81%
75 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 14:11 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 14:11 +0000
1#!/usr/bin/env python3
2# -*- coding: UTF-8 -*-
3"""Execute Sparkle portfolio selector, read/write to SelectionScenario."""
5import argparse
6import sys
7from filelock import FileLock, Timeout
8from pathlib import Path
10from sparkle.structures import PerformanceDataFrame, FeatureDataFrame
11from sparkle.solver import Solver
12from sparkle.selector import SelectionScenario
13from sparkle.instance import Instance_Set
def _resolve_feature_instance_name(
    instance_name: str, instance: str, feature_data: "FeatureDataFrame"
) -> str:
    """Find the key under which an instance is stored in the feature data.

    The feature data may index an instance by its name, by the final component
    of that name, by its path, or by its path without the file suffix. The
    candidates are tried in exactly that order.

    Args:
        instance_name: Name of the instance as reported by the instance set.
        instance: Path of the instance, as a string.
        feature_data: Feature data whose ``instances`` are searched.

    Returns:
        The first candidate that is present in ``feature_data.instances``.

    Raises:
        ValueError: If none of the candidates is present in the feature data.
    """
    known_instances = feature_data.instances
    if instance_name in known_instances:
        return instance_name
    candidates = (
        Path(instance_name).name,
        str(instance),
        str(Path(instance).with_suffix("")),
    )
    for candidate in candidates:
        if candidate in known_instances:
            return candidate
    raise ValueError(
        f"Could not resolve {instance} features in {feature_data.csv_filepath}"
    )


def main(argv: list[str]) -> None:
    """Run the portfolio selector on one instance and store the result.

    Loads the selection scenario and feature data, asks the selector for a
    schedule of (solver, configuration id, cutoff time) entries, runs that
    schedule until a solver reports a positive status, and writes the resulting
    objective value to the scenario's performance data under a file lock.

    Args:
        argv: Command line arguments, without the program name.

    Raises:
        ValueError: If the instance cannot be found in the feature data.
        SystemExit: With code 2 on invalid arguments (raised by argparse), or
            with code -1 if the selector yields no schedule to run.
    """
    # Define command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--selector-scenario",
        required=True,
        type=Path,
        help="path to portfolio selector scenario",
    )
    parser.add_argument(
        "--instance", required=True, type=Path, help="path to instance to run on"
    )
    parser.add_argument(
        "--feature-data", required=True, type=Path, help="path to feature data"
    )
    parser.add_argument(
        "--seed",
        type=int,
        required=False,
        help="seed to use for the solver. If not provided, read from "
        "the PerformanceDataFrame or generate one.",
    )
    parser.add_argument(
        "--log-dir", type=Path, required=False, help="path to the log directory"
    )
    args = parser.parse_args(argv)

    # Process command line arguments
    selector_scenario = SelectionScenario.from_file(args.selector_scenario)
    feature_data = FeatureDataFrame(Path(args.feature_data))
    instance_set = Instance_Set(args.instance)
    # Only the first instance of the set is used.
    instance = str(instance_set.instance_paths[0])
    instance_name = instance_set.instance_names[0]

    seed = args.seed
    if seed is None:
        # Try to read from PerformanceDataFrame
        seed = selector_scenario.selector_performance_data.get_value(
            selector_scenario.__selector_solver_name__,
            instance_name,
            solver_fields=[PerformanceDataFrame.column_seed],
        )
        import math

        # NOTE(review): assumes an unset seed may come back as a NaN placeholder
        # rather than None - confirm against PerformanceDataFrame.get_value.
        # NaN is never a usable seed, so treating it as missing is safe.
        if seed is None or (isinstance(seed, float) and math.isnan(seed)):
            import random

            seed = random.randint(0, 2**32 - 1)

    # Note: Following code could be adjusted to run entire instance set
    # Run portfolio selector
    print(f"Sparkle portfolio selector predicting for instance {instance_name} ...")
    feature_instance_name = _resolve_feature_instance_name(
        instance_name, instance, feature_data
    )
    predict_schedule = selector_scenario.selector.run(
        selector_scenario.selector_file_path, feature_instance_name, feature_data
    )

    if predict_schedule is None:  # Selector Failed to produce prediction
        sys.exit(-1)

    print("Predicting done! Running schedule...")
    performance_data = selector_scenario.selector_performance_data
    selector_output = {}
    solver_output = None  # Stays None when the schedule has no entries
    for solver_path, config_id, cutoff_time in predict_schedule:
        config = performance_data.get_full_configuration(solver_path, config_id)
        solver = Solver(Path(solver_path))
        print(
            f"\t- Calling {solver.name} ({config_id}) with time budget {cutoff_time} "
            f"on instance {instance}..."
        )
        solver_output = solver.run(  # Runs locally by default
            instance,
            objectives=[selector_scenario.objective],
            seed=seed,
            cutoff_time=cutoff_time,
            configuration=config,
            log_dir=args.log_dir,
        )
        # Numeric outputs are summed over the schedule; everything else
        # (e.g. the status) is overwritten by the most recent call.
        for key, value in solver_output.items():
            if key in selector_output and isinstance(value, (int, float)):
                selector_output[key] += value
            else:
                selector_output[key] = value
        print(f"\t- Calling solver {solver.name} ({config_id}) done!")

        if solver_output["status"].positive:
            print(f"{instance} was solved by {solver.name} ({config_id})")
            break
        print(f"{instance} is not solved in this call")

    if solver_output is None:
        # An empty schedule leaves nothing to report; fail like a missing
        # prediction instead of crashing on unbound loop variables below.
        print("ERROR: Selector produced an empty schedule.")
        sys.exit(-1)

    objective = selector_scenario.objective
    selector_value = selector_output[objective.name]
    if objective.post_process:
        # cutoff_time is the budget of the last schedule entry that was run.
        selector_value = objective.post_process(
            selector_output[objective.name],
            cutoff_time,
            selector_output["status"],
        )
    if solver_output["status"].positive:
        print(
            f"Selector {selector_scenario.selector.name} solved {instance} "
            f"with a value of {selector_value} ({objective.name})."
        )
    else:
        print(f"Selector {selector_scenario.selector.name} did not solve {instance}.")

    results_path = performance_data.csv_filepath
    print(f"Writing results to {results_path} ...")
    try:
        # Creating a separate locked file for writing
        lock = FileLock(f"{results_path}.lock")
        # The context manager releases the lock on exit, also on errors.
        with lock.acquire(timeout=60):
            # Reload the dataframe to latest version
            performance_data = PerformanceDataFrame(results_path)
            performance_data.set_value(
                selector_value,
                selector_scenario.__selector_solver_name__,
                instance_name,
                objective=objective.name,
            )
            performance_data.save_csv()
    except Timeout:
        print(f"ERROR: Cannot acquire File Lock on {results_path}.")
if __name__ == "__main__":
    # Script entry point: hand the command line arguments, without the
    # program name, to the CLI main function.
    cli_arguments = sys.argv[1:]
    main(cli_arguments)