Coverage for sparkle/CLI/check.py: 63%

81 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-29 10:17 +0000

1"""Command to help users check if their input solvers, datasets etc. are correct.""" 

2 

3import sys 

4import os 

5import argparse 

6 

7from runrunner import Runner 

8 

9from sparkle.solver import Solver 

10from sparkle.instance import Instance_Set, InstanceSet 

11from sparkle.selector.extractor import Extractor 

12from sparkle.platform import Settings 

13from sparkle.types import SolverStatus 

14 

15from sparkle.CLI.help import logging as sl 

16from sparkle.CLI.help import argparse_custom as ac 

17from sparkle.CLI.help import global_variables as gv 

18 

19 

def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser of the check command.

    Returns:
        The parser with the check type, check path, optional instance path,
        seed and solver cutoff time arguments registered, in that order.
    """
    description = (
        "Command to help users check if their input solvers, instance sets "
        "or feature extractors are readable by Sparkle. Specify a path to "
        "an instance to test calling the wrapper."
    )
    parser = argparse.ArgumentParser(description=description)

    # Command specific arguments, registered in declaration order
    for argument in (
        ac.CheckTypeArgument,
        ac.CheckPathArgument,
        ac.InstancePathOptional,
        ac.SeedArgument,
    ):
        parser.add_argument(*argument.names, **argument.kwargs)

    # Settings arguments
    cutoff_option = Settings.OPTION_solver_cutoff_time
    parser.add_argument(*cutoff_option.args, **cutoff_option.kwargs)
    return parser

37 

38 

def _check_solver(solver: Solver, args: argparse.Namespace, settings) -> None:
    """Print solver details and, if an instance path is given, test its wrapper.

    Args:
        solver: The solver resolved from the user supplied path.
        args: The parsed command line arguments (``instance_path`` is read).
        settings: The settings object obtained from ``gv.settings``.

    Raises:
        SystemExit: With code -1 if the wrapper file is not executable, or if
            the test run yields no status or ``SolverStatus.UNKNOWN``.
    """
    if solver.pcs_file:
        print()
        print(solver.get_configuration_space())
    if not os.access(solver.wrapper_file, os.X_OK):
        print(
            f"Wrapper {solver.wrapper_file} is not executable! "
            "Check that wrapper execution rights are set for all."
        )
        sys.exit(-1)
    if not args.instance_path:  # No instance to test with
        return

    # Patchfix runsolver
    solver._runsolver_exec = settings.DEFAULT_runsolver_exec
    if not solver.runsolver_exec.exists():
        print(
            f"Runsolver {solver.runsolver_exec} not found. "
            "Checking without Runsolver currently not supported."
        )
        return

    # Test the wrapper with a dummy call
    print(f"\nTesting Wrapper {solver.wrapper} ...")
    objectives = settings.objectives if settings.objectives else []
    # Fall back to a short cutoff of 5 when none is configured
    cutoff = settings.solver_cutoff_time if settings.solver_cutoff_time else 5
    configuration = {}
    if solver.pcs_file:
        print("\nSample Configuration:")
        sample_conf = solver.get_configuration_space().sample_configuration()
        print(sample_conf)
        configuration = dict(sample_conf)
    # NOTE(review): the parser registers a seed argument, but a fixed dummy
    # seed is used for this call -- confirm this is intended.
    result = solver.run(
        instances=args.instance_path,
        objectives=objectives,
        seed=42,  # Dummy seed
        cutoff_time=cutoff,
        configuration=configuration,
        log_dir=sl.caller_log_dir,
        run_on=Runner.LOCAL,
    )
    print("Result:")
    for objective in objectives:  # Check objectives
        if objective.name not in result:
            print(f"\tSolver output is missing objective {objective.name}")
        else:
            print(f"\t{objective.name}: {result[objective.name]}")

    # A result without any status entry is treated like an UNKNOWN status
    # instead of crashing on an empty lookup.
    status_key = next(
        (key for key in result.keys() if key.startswith("status")), None
    )
    if status_key is None or result[status_key] == SolverStatus.UNKNOWN:
        # Refer to the wrapper here; the objectives may be empty, so no
        # objective name is guaranteed to exist at this point.
        print(
            f"Sparkle was unable to process {solver.wrapper} output. "
            "Check that your wrapper is able to handle KILL SIGNALS. "
            "It is important to always communicate back to Sparkle "
            "on regular exit and termination signals for stability."
        )
        sys.exit(-1)


def _check_extractor(
    extractor: Extractor, args: argparse.Namespace, settings
) -> None:
    """Print the extractor's feature names, or test it on the given instance.

    Args:
        extractor: The feature extractor resolved from the user supplied path.
        args: The parsed command line arguments (``instance_path`` is read).
        settings: The settings object obtained from ``gv.settings``.
    """
    if not args.instance_path:  # Nothing to run on, only list the features
        print("Feature names:")
        for f_group, f_name in extractor.features:
            print(f"\t{f_group}: {f_name}")
        return

    # Patchfix runsolver
    extractor.runsolver_exec = settings.DEFAULT_runsolver_exec
    if not extractor.runsolver_exec.exists():
        print(
            f"Runsolver {extractor.runsolver_exec} not found. "
            "Checking without Runsolver currently not supported."
        )
        return

    print(f"\nTesting Wrapper {extractor.wrapper} ...")
    # NOTE(review): no cutoff time is passed to the extractor test run.
    result = extractor.run(instance=args.instance_path, log_dir=sl.caller_log_dir)
    # Print feature results
    print("Feature values:")
    for f_group, f_name, f_value in result:
        print(f"\t[{f_group}] {f_name}: {f_value}")


def _print_instance_set(instance_set: InstanceSet) -> None:
    """Print every instance name of the set together with its path."""
    print("\nList of instances:")
    for i_name, i_path in zip(instance_set.instance_names, instance_set.instance_paths):
        print(f"\t{i_name}: {i_path}")


def main(argv: list[str]) -> None:
    """Main entry point of the Check command.

    Resolves the user supplied path to a solver, feature extractor or
    instance set, prints its representation and runs type specific checks.

    Args:
        argv: The command line arguments, without the program name.

    Raises:
        SystemExit: Always. Code 0 when the checks ran through, -1 when a
            solver wrapper is not executable or its output has no usable
            status.
    """
    parser = parser_function()
    args = parser.parse_args(argv)
    settings = gv.settings(args)

    # Log command call
    sl.log_command(sys.argv, settings.random_state)

    # Accepted spellings of the type argument and what they resolve to
    type_map = {
        "extractor": Extractor,
        "feature-extractor": Extractor,
        "solver": Solver,
        "instance-set": Instance_Set,
        "Extractor": Extractor,
        "Feature-Extractor": Extractor,
        "Instance-Set": Instance_Set,
        "Solver": Solver,
        "FeatureExtractor": Extractor,
        "InstanceSet": Instance_Set,
    }
    target_class = type_map[args.type]
    print(f"Checking {target_class.__name__} in directory {args.path} ...")
    target = target_class(args.path)
    print("Resolved to:")
    print(repr(target))

    # Conduct object specific tests
    if isinstance(target, Solver):
        _check_solver(target, args, settings)
    elif isinstance(target, Extractor):
        _check_extractor(target, args, settings)
    elif isinstance(target, InstanceSet):
        _print_instance_set(target)

    sys.exit(0)

153 

154 

if __name__ == "__main__":
    # Run the check command on the arguments following the program name
    _cli_args = sys.argv[1:]
    main(_cli_args)