Coverage for sparkle/selector/selector_cli.py: 82%

76 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

1#!/usr/bin/env python3 

2# -*- coding: UTF-8 -*- 

3"""Execute Sparkle portfolio selector, read/write to SelectionScenario.""" 

4 

5import argparse 

6import sys 

7from filelock import FileLock, Timeout 

8from pathlib import Path 

9 

10from sparkle.structures import PerformanceDataFrame, FeatureDataFrame 

11from sparkle.solver import Solver 

12from sparkle.selector import SelectionScenario 

13from sparkle.instance import Instance_Set 

14 

15 

def main(argv: list[str]) -> None:
    """Run a Sparkle portfolio selector on one instance and store the result.

    Parses the command line, asks the selector for a solver schedule for the
    given instance, runs the scheduled solvers in order until one reports a
    positive status, and writes the resulting objective value into the
    scenario's selector performance data while holding a file lock.

    Args:
        argv: Command line arguments, excluding the program name.

    Raises:
        ValueError: If no feature entry can be matched to the instance.
        SystemExit: If the selector produces no prediction, or the predicted
            schedule contains no solver to run.
    """
    # Define command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--selector-scenario",
        required=True,
        type=Path,
        help="path to portfolio selector scenario",
    )
    parser.add_argument(
        "--instance", required=True, type=Path, help="path to instance to run on"
    )
    parser.add_argument(
        "--feature-data", required=True, type=Path, help="path to feature data"
    )
    parser.add_argument(
        "--seed",
        type=int,
        required=False,
        help="seed to use for the solver. If not provided, read from "
        "the PerformanceDataFrame or generate one.",
    )
    parser.add_argument(
        "--log-dir", type=Path, required=False, help="path to the log directory"
    )
    args = parser.parse_args(argv)

    # Process command line arguments
    selector_scenario = SelectionScenario.from_file(args.selector_scenario)
    # argparse already converted --feature-data to a Path (type=Path).
    feature_data = FeatureDataFrame(args.feature_data)
    instance_set = Instance_Set(args.instance)
    # Only the first instance of the set is processed by this CLI.
    instance = str(instance_set.instance_paths[0])
    instance_name = instance_set.instance_names[0]

    seed = args.seed
    if seed is None:
        # Try to read from PerformanceDataFrame
        # NOTE(review): assumes get_value yields None when no seed is stored
        # - confirm against PerformanceDataFrame.get_value.
        seed = selector_scenario.selector_performance_data.get_value(
            selector_scenario.__selector_solver_name__,
            instance_name,
            solver_fields=[PerformanceDataFrame.column_seed],
        )
    if seed is None:  # Still no value
        import random

        seed = random.randint(0, 2**32 - 1)

    # Note: Following code could be adjusted to run entire instance set
    # Run portfolio selector
    print(f"Sparkle portfolio selector predicting for instance {instance_name} ...")

    # The feature data may index the instance by its name, its file name, its
    # full path, or its path without suffix; the first match in that order wins.
    known_instances = feature_data.instances
    name_candidates = (
        instance_name,
        Path(instance_name).name,
        instance,
        str(Path(instance).with_suffix("")),
    )
    feature_instance_name = next(
        (name for name in name_candidates if name in known_instances), None
    )
    if feature_instance_name is None:
        raise ValueError(
            f"Could not resolve {instance} features in {feature_data.csv_filepath}"
        )

    predict_schedule = selector_scenario.selector.run(
        selector_scenario.selector_file_path, feature_instance_name, feature_data
    )

    if predict_schedule is None:  # Selector Failed to produce prediction
        sys.exit(-1)

    print("Predicting done! Running schedule...")
    performance_data = selector_scenario.selector_performance_data
    selector_output = {}
    # Pre-bind so an empty schedule is detected below instead of raising a
    # NameError on the first use after the loop.
    solver_output = None
    cutoff_time = None
    for solver_path, config_id, cutoff_time in predict_schedule:
        config = performance_data.get_full_configuration(solver_path, config_id)
        solver = Solver(Path(solver_path))
        print(
            f"\t- Calling {solver.name} ({config_id}) with time budget {cutoff_time} "
            f"on instance {instance}..."
        )
        solver_output = solver.run(  # Runs locally by default
            instance,
            objectives=[selector_scenario.objective],
            seed=seed,
            cutoff_time=cutoff_time,
            configuration=config,
            log_dir=args.log_dir,
        )
        print(solver_output)
        # Numeric outputs accumulate over the schedule; every other value
        # (and the first occurrence of each key) is taken from the latest run.
        for key, value in solver_output.items():
            if key in selector_output and isinstance(value, (int, float)):
                selector_output[key] += value
            else:
                selector_output[key] = value
        print(f"\t- Calling solver {solver.name} ({config_id}) done!")

        if solver_output["status"].positive:
            print(f"{instance} was solved by {solver.name} ({config_id})")
            break
        print(f"{instance} is not solved in this call")

    if solver_output is None:  # Schedule was empty, nothing has been run
        print(f"ERROR: Selector produced an empty schedule for {instance}.")
        sys.exit(-1)

    objective = selector_scenario.objective
    selector_value = selector_output[objective.name]
    if objective.post_process:
        # cutoff_time is the budget of the last solver that was run.
        selector_value = objective.post_process(
            selector_output[objective.name],
            cutoff_time,
            selector_output["status"],
        )
    if solver_output["status"].positive:
        print(
            f"Selector {selector_scenario.selector.name} solved {instance} "
            f"with a value of {selector_value} ({objective.name})."
        )
    else:
        print(f"Selector {selector_scenario.selector.name} did not solve {instance}.")

    csv_filepath = performance_data.csv_filepath
    print(f"Writing results to {csv_filepath} ...")
    try:
        # Creating a separate lock file for writing
        lock = FileLock(f"{csv_filepath}.lock")
        # The context manager releases the lock on exit, also on errors.
        with lock.acquire(timeout=60):
            # Reload the dataframe to latest version
            performance_data = PerformanceDataFrame(csv_filepath)
            performance_data.set_value(
                selector_value,
                selector_scenario.__selector_solver_name__,
                instance_name,
                objective=objective.name,
            )
            performance_data.save_csv()
    except Timeout:
        print(f"ERROR: Cannot acquire File Lock on {csv_filepath}.")

147 

148 

if __name__ == "__main__":
    # Script entry point: hand everything after the program name to the CLI.
    cli_arguments = sys.argv[1:]
    main(cli_arguments)