Coverage for src / sparkle / CLI / check.py: 17%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 15:31 +0000

1"""Command to help users check if their input solvers, datasets etc. are correct.""" 

2 

3import sys 

4import os 

5import argparse 

6 

7from runrunner import Runner 

8 

9from sparkle.solver import Solver 

10from sparkle.instance import Instance_Set, InstanceSet 

11from sparkle.selector.extractor import Extractor 

12from sparkle.platform import Settings 

13from sparkle.types import SolverStatus 

14 

15from sparkle.CLI.help import logging as sl 

16from sparkle.CLI.help import argparse_custom as ac 

17from sparkle.CLI.help import global_variables as gv 

18 

19 

20def parser_function() -> argparse.ArgumentParser: 

21 """Define the command line arguments.""" 

22 parser = argparse.ArgumentParser( 

23 description="Command to help users check if their input solvers, instance sets " 

24 "or feature extractors are readable by Sparkle. Specify a path to " 

25 "an instance to test calling the wrapper." 

26 ) 

27 parser.add_argument(*ac.CheckTypeArgument.names, **ac.CheckTypeArgument.kwargs) 

28 parser.add_argument(*ac.CheckPathArgument.names, **ac.CheckPathArgument.kwargs) 

29 parser.add_argument(*ac.InstancePathOptional.names, **ac.InstancePathOptional.kwargs) 

30 parser.add_argument(*ac.SeedArgument.names, **ac.SeedArgument.kwargs) 

31 # Settings arguments 

32 parser.add_argument( 

33 *Settings.OPTION_solver_cutoff_time.args, 

34 **Settings.OPTION_solver_cutoff_time.kwargs, 

35 ) 

36 return parser 

37 

38 

39def main(argv: list[str]) -> None: 

40 """Main entry point of the Check command.""" 

41 parser = parser_function() 

42 args = parser.parse_args(argv) 

43 settings = gv.settings(args) 

44 

45 # Log command call 

46 sl.log_command(sys.argv, settings.random_state) 

47 

48 type_map = { 

49 "extractor": Extractor, 

50 "feature-extractor": Extractor, 

51 "solver": Solver, 

52 "instance-set": Instance_Set, 

53 "Extractor": Extractor, 

54 "Feature-Extractor": Extractor, 

55 "Instance-Set": Instance_Set, 

56 "Solver": Solver, 

57 "FeatureExtractor": Extractor, 

58 "InstanceSet": Instance_Set, 

59 } 

60 if args.type not in type_map: 

61 options_text = "\n".join([f"\t- {value}" for value in type_map.keys()]) 

62 raise ValueError( 

63 f"Unknown type {args.type}. Please choose from:\n{options_text}" 

64 ) 

65 type = type_map[args.type] 

66 print(f"Checking {type.__name__} in directory {args.path} ...") 

67 object = type(args.path) 

68 print("Resolved to:") 

69 print(object.__repr__()) 

70 

71 # Conduct object specific tests 

72 if isinstance(object, Solver): 

73 if object.pcs_file: 

74 print() 

75 print(object.get_configuration_space()) 

76 if not os.access(object.wrapper_file, os.X_OK): 

77 print( 

78 f"Wrapper {object.wrapper_file} is not executable! " 

79 f"Check that wrapper execution rights are set for all." 

80 ) 

81 sys.exit(-1) 

82 if args.instance_path: # Instance to test with 

83 object._runsolver_exec = settings.DEFAULT_runsolver_exec 

84 if False and not object.runsolver_exec.exists(): 

85 print( 

86 f"Runsolver {object.runsolver_exec} not found. " 

87 "Checking without Runsolver currently not supported." 

88 ) 

89 else: 

90 # Test the wrapper with a dummy call 

91 print(f"\nTesting Wrapper {object.wrapper} ...") 

92 # Patchfix runsolver 

93 objectives = settings.objectives if settings.objectives else [] 

94 cutoff = ( 

95 settings.solver_cutoff_time if settings.solver_cutoff_time else 5 

96 ) 

97 configuration = {} 

98 if object.pcs_file: 

99 print("\nSample Configuration:") 

100 sample_conf = object.get_configuration_space().sample_configuration() 

101 print(sample_conf) 

102 configuration = dict(sample_conf) 

103 result = object.run( 

104 instances=args.instance_path, 

105 objectives=objectives, 

106 seed=42, # Dummy seed 

107 cutoff_time=cutoff, 

108 configuration=configuration, 

109 log_dir=sl.caller_log_dir, 

110 run_on=Runner.LOCAL, 

111 ) 

112 log = [p for p in sl.caller_log_dir.iterdir() if p.suffix == ".rawres"] 

113 if len(log) > 0: 

114 print("\nSolver Output:") 

115 print(log[0].open().read()) 

116 else: 

117 print("\nERROR: Solver Log output not found!\n") 

118 print("\nResult:") 

119 for obj in objectives: # Check objectives 

120 if obj.name not in result: 

121 print(f"\tSolver output is missing objective {obj.name}") 

122 else: 

123 print(f"\t{obj.name}: {result[obj.name]}") 

124 status_key = [key for key in result.keys() if key.startswith("status")][ 

125 0 

126 ] 

127 if result[status_key] == SolverStatus.UNKNOWN: 

128 print( 

129 f"Sparkle was unable to process {obj.name} output. " 

130 "Check that your wrapper is able to handle KILL SIGNALS. " 

131 "It is important to always communicate back to Sparkle " 

132 "on regular exit and termination signals for stability." 

133 ) 

134 sys.exit(-1) 

135 elif isinstance(object, Extractor): 

136 if args.instance_path: # Test on an instance 

137 # Patchfix runsolver 

138 object.runsolver_exec = settings.DEFAULT_runsolver_exec 

139 if not object.runsolver_exec.exists(): 

140 print( 

141 f"Runsolver {object.runsolver_exec} not found. " 

142 "Checking without Runsolver currently not supported." 

143 ) 

144 else: 

145 print(f"\nTesting Wrapper {object.wrapper} ...") 

146 # cutoff = args.cutoff_time if args.cutoff_time else 20 # Short call 

147 result = object.run( 

148 instance=args.instance_path, log_dir=sl.caller_log_dir 

149 ) 

150 # Print feature results 

151 print("Feature values:") 

152 for f_group, f_name, f_value in result: 

153 print(f"\t[{f_group}] {f_name}: {f_value}") 

154 else: 

155 print("Feature names:") 

156 for f_group, fname in object.features: 

157 print(f"\t{f_group}: {fname}") 

158 elif isinstance(object, InstanceSet): 

159 print("\nList of instances:") 

160 for i_name, i_path in zip(object.instance_names, object.instance_paths): 

161 print(f"\t{i_name}: {i_path}") 

162 

163 sys.exit(0) 

164 

165 

166if __name__ == "__main__": 

167 main(sys.argv[1:])