Coverage for src/sparkle/selector/selector_cli.py: 81%

75 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 15:31 +0000

1#!/usr/bin/env python3 

2# -*- coding: UTF-8 -*- 

3"""Execute Sparkle portfolio selector, read/write to SelectionScenario.""" 

4 

5import argparse 

6import sys 

7from filelock import FileLock, Timeout 

8from pathlib import Path 

9 

10from sparkle.structures import PerformanceDataFrame, FeatureDataFrame 

11from sparkle.solver import Solver 

12from sparkle.selector import SelectionScenario 

13from sparkle.instance import Instance_Set 

14 

15 

def _resolve_feature_instance(instance: str, instance_name: str,
                              feature_data: FeatureDataFrame) -> str:
    """Resolve the key under which the instance appears in the feature data.

    Instance sets and feature extractors do not always agree on naming
    (full path vs. file name vs. path without suffix), so each known
    variant is tried in turn.

    Args:
        instance: Full path to the instance, as a string.
        instance_name: The instance's name within its instance set.
        feature_data: The FeatureDataFrame to resolve against.

    Returns:
        The name under which the instance is present in feature_data.

    Raises:
        ValueError: If no naming variant is present in the feature data.
    """
    if instance_name in feature_data.instances:
        return instance_name
    if Path(instance_name).name in feature_data.instances:
        return Path(instance_name).name
    if str(instance) in feature_data.instances:
        return str(instance)
    if str(Path(instance).with_suffix("")) in feature_data.instances:
        return str(Path(instance).with_suffix(""))
    raise ValueError(
        f"Could not resolve {instance} features in {feature_data.csv_filepath}"
    )


def main(argv: list[str]) -> None:
    """Main function of the Selector CLI.

    Predicts a solver schedule for a single instance, runs the scheduled
    solvers until one succeeds, and writes the (optionally post-processed)
    objective value back to the scenario's PerformanceDataFrame under a
    file lock so concurrent runs do not clobber each other.
    """
    # Define command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--selector-scenario",
        required=True,
        type=Path,
        help="path to portfolio selector scenario",
    )
    parser.add_argument(
        "--instance", required=True, type=Path, help="path to instance to run on"
    )
    parser.add_argument(
        "--feature-data", required=True, type=Path, help="path to feature data"
    )
    parser.add_argument(
        "--seed",
        type=int,
        required=False,
        help="seed to use for the solver. If not provided, read from "
        "the PerformanceDataFrame or generate one.",
    )
    parser.add_argument(
        "--log-dir", type=Path, required=False, help="path to the log directory"
    )
    args = parser.parse_args(argv)

    # Process command line arguments
    selector_scenario = SelectionScenario.from_file(args.selector_scenario)
    feature_data = FeatureDataFrame(Path(args.feature_data))
    instance_set = Instance_Set(args.instance)
    # Note: Following code could be adjusted to run the entire instance set;
    # for now only the first instance of the set is used.
    instance = str(instance_set.instance_paths[0])
    instance_name = instance_set.instance_names[0]
    seed = args.seed
    if seed is None:
        # Try to read a previously assigned seed from the PerformanceDataFrame
        seed = selector_scenario.selector_performance_data.get_value(
            selector_scenario.__selector_solver_name__,
            instance_name,
            solver_fields=[PerformanceDataFrame.column_seed],
        )
        if seed is None:  # Still no value, generate a fresh one
            import random

            seed = random.randint(0, 2**32 - 1)

    # Run portfolio selector
    print(f"Sparkle portfolio selector predicting for instance {instance_name} ...")
    feature_instance_name = _resolve_feature_instance(
        instance, instance_name, feature_data
    )
    predict_schedule = selector_scenario.selector.run(
        selector_scenario.selector_file_path, feature_instance_name, feature_data
    )

    # BUGFIX: an *empty* schedule previously fell through the loop below and
    # crashed on the post-loop lookups; treat it like a failed prediction.
    if not predict_schedule:  # Selector failed to produce a prediction
        sys.exit(-1)

    print(
        f"Predicting done! Running schedule [{', '.join(str(x) for x in predict_schedule)}] ..."
    )
    performance_data = selector_scenario.selector_performance_data
    selector_output = {}
    for solver, config_id, cutoff_time in predict_schedule:
        config = performance_data.get_full_configuration(solver, config_id)
        solver = Solver(Path(solver))
        print(
            f"\t- Calling {solver.name} ({config_id}) with time budget {cutoff_time} "
            f"on instance {instance}..."
        )
        solver_output = solver.run(  # Runs locally by default
            instance,
            objectives=[selector_scenario.objective],
            seed=seed,
            cutoff_time=cutoff_time,
            configuration=config,
            log_dir=args.log_dir,
        )
        # Accumulate numeric outputs (e.g. runtimes) over the schedule;
        # non-numeric fields keep the latest solver's value.
        for key in solver_output:
            if key in selector_output and isinstance(solver_output[key], (int, float)):
                selector_output[key] += solver_output[key]
            else:
                selector_output[key] = solver_output[key]
        print(f"\t- Calling solver {solver.name} ({config_id}) done!")
        solver_status = solver_output["status"]
        if solver_status.positive:
            print(
                f"[{solver_status}] {solver.name} ({config_id}) was successful on {instance}"
            )
            break
        print(f"[{solver_status}] {solver.name} ({config_id}) failed on {instance}")

    selector_value = selector_output[selector_scenario.objective.name]
    if selector_scenario.objective.post_process:
        # Post-process using the cutoff time of the last executed solver run
        selector_value = selector_scenario.objective.post_process(
            selector_output[selector_scenario.objective.name],
            cutoff_time,
            selector_output["status"],
        )
    if solver_output["status"].positive:
        print(
            f"Selector {selector_scenario.selector.name} solved {instance} "
            f"with a value of {selector_value} ({selector_scenario.objective.name})."
        )
    else:
        print(f"Selector {selector_scenario.selector.name} did not solve {instance}.")
    print(f"Writing results to {performance_data.csv_filepath} ...")
    try:
        # Creating a separate locked file for writing
        lock = FileLock(f"{performance_data.csv_filepath}.lock")
        with lock.acquire(timeout=60):
            # Reload the dataframe to latest version before writing
            performance_data = PerformanceDataFrame(performance_data.csv_filepath)
            performance_data.set_value(
                selector_value,
                selector_scenario.__selector_solver_name__,
                instance_name,
                objective=selector_scenario.objective.name,
                append_write_csv=True,
            )
        # BUGFIX: dropped the explicit lock.release() — the context manager
        # already releases the lock on exit of the `with` block.
    except Timeout:
        print(f"ERROR: Cannot acquire File Lock on {performance_data}.")

150 

151 

# Script entry point: forward all CLI arguments (excluding the program name).
if __name__ == "__main__":
    main(sys.argv[1:])