Coverage for src / sparkle / CLI / check.py: 17%
88 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 15:31 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 15:31 +0000
1"""Command to help users check if their input solvers, datasets etc. are correct."""
3import sys
4import os
5import argparse
7from runrunner import Runner
9from sparkle.solver import Solver
10from sparkle.instance import Instance_Set, InstanceSet
11from sparkle.selector.extractor import Extractor
12from sparkle.platform import Settings
13from sparkle.types import SolverStatus
15from sparkle.CLI.help import logging as sl
16from sparkle.CLI.help import argparse_custom as ac
17from sparkle.CLI.help import global_variables as gv
20def parser_function() -> argparse.ArgumentParser:
21 """Define the command line arguments."""
22 parser = argparse.ArgumentParser(
23 description="Command to help users check if their input solvers, instance sets "
24 "or feature extractors are readable by Sparkle. Specify a path to "
25 "an instance to test calling the wrapper."
26 )
27 parser.add_argument(*ac.CheckTypeArgument.names, **ac.CheckTypeArgument.kwargs)
28 parser.add_argument(*ac.CheckPathArgument.names, **ac.CheckPathArgument.kwargs)
29 parser.add_argument(*ac.InstancePathOptional.names, **ac.InstancePathOptional.kwargs)
30 parser.add_argument(*ac.SeedArgument.names, **ac.SeedArgument.kwargs)
31 # Settings arguments
32 parser.add_argument(
33 *Settings.OPTION_solver_cutoff_time.args,
34 **Settings.OPTION_solver_cutoff_time.kwargs,
35 )
36 return parser
39def main(argv: list[str]) -> None:
40 """Main entry point of the Check command."""
41 parser = parser_function()
42 args = parser.parse_args(argv)
43 settings = gv.settings(args)
45 # Log command call
46 sl.log_command(sys.argv, settings.random_state)
48 type_map = {
49 "extractor": Extractor,
50 "feature-extractor": Extractor,
51 "solver": Solver,
52 "instance-set": Instance_Set,
53 "Extractor": Extractor,
54 "Feature-Extractor": Extractor,
55 "Instance-Set": Instance_Set,
56 "Solver": Solver,
57 "FeatureExtractor": Extractor,
58 "InstanceSet": Instance_Set,
59 }
60 if args.type not in type_map:
61 options_text = "\n".join([f"\t- {value}" for value in type_map.keys()])
62 raise ValueError(
63 f"Unknown type {args.type}. Please choose from:\n{options_text}"
64 )
65 type = type_map[args.type]
66 print(f"Checking {type.__name__} in directory {args.path} ...")
67 object = type(args.path)
68 print("Resolved to:")
69 print(object.__repr__())
71 # Conduct object specific tests
72 if isinstance(object, Solver):
73 if object.pcs_file:
74 print()
75 print(object.get_configuration_space())
76 if not os.access(object.wrapper_file, os.X_OK):
77 print(
78 f"Wrapper {object.wrapper_file} is not executable! "
79 f"Check that wrapper execution rights are set for all."
80 )
81 sys.exit(-1)
82 if args.instance_path: # Instance to test with
83 object._runsolver_exec = settings.DEFAULT_runsolver_exec
84 if False and not object.runsolver_exec.exists():
85 print(
86 f"Runsolver {object.runsolver_exec} not found. "
87 "Checking without Runsolver currently not supported."
88 )
89 else:
90 # Test the wrapper with a dummy call
91 print(f"\nTesting Wrapper {object.wrapper} ...")
92 # Patchfix runsolver
93 objectives = settings.objectives if settings.objectives else []
94 cutoff = (
95 settings.solver_cutoff_time if settings.solver_cutoff_time else 5
96 )
97 configuration = {}
98 if object.pcs_file:
99 print("\nSample Configuration:")
100 sample_conf = object.get_configuration_space().sample_configuration()
101 print(sample_conf)
102 configuration = dict(sample_conf)
103 result = object.run(
104 instances=args.instance_path,
105 objectives=objectives,
106 seed=42, # Dummy seed
107 cutoff_time=cutoff,
108 configuration=configuration,
109 log_dir=sl.caller_log_dir,
110 run_on=Runner.LOCAL,
111 )
112 log = [p for p in sl.caller_log_dir.iterdir() if p.suffix == ".rawres"]
113 if len(log) > 0:
114 print("\nSolver Output:")
115 print(log[0].open().read())
116 else:
117 print("\nERROR: Solver Log output not found!\n")
118 print("\nResult:")
119 for obj in objectives: # Check objectives
120 if obj.name not in result:
121 print(f"\tSolver output is missing objective {obj.name}")
122 else:
123 print(f"\t{obj.name}: {result[obj.name]}")
124 status_key = [key for key in result.keys() if key.startswith("status")][
125 0
126 ]
127 if result[status_key] == SolverStatus.UNKNOWN:
128 print(
129 f"Sparkle was unable to process {obj.name} output. "
130 "Check that your wrapper is able to handle KILL SIGNALS. "
131 "It is important to always communicate back to Sparkle "
132 "on regular exit and termination signals for stability."
133 )
134 sys.exit(-1)
135 elif isinstance(object, Extractor):
136 if args.instance_path: # Test on an instance
137 # Patchfix runsolver
138 object.runsolver_exec = settings.DEFAULT_runsolver_exec
139 if not object.runsolver_exec.exists():
140 print(
141 f"Runsolver {object.runsolver_exec} not found. "
142 "Checking without Runsolver currently not supported."
143 )
144 else:
145 print(f"\nTesting Wrapper {object.wrapper} ...")
146 # cutoff = args.cutoff_time if args.cutoff_time else 20 # Short call
147 result = object.run(
148 instance=args.instance_path, log_dir=sl.caller_log_dir
149 )
150 # Print feature results
151 print("Feature values:")
152 for f_group, f_name, f_value in result:
153 print(f"\t[{f_group}] {f_name}: {f_value}")
154 else:
155 print("Feature names:")
156 for f_group, fname in object.features:
157 print(f"\t{f_group}: {fname}")
158 elif isinstance(object, InstanceSet):
159 print("\nList of instances:")
160 for i_name, i_path in zip(object.instance_names, object.instance_paths):
161 print(f"\t{i_name}: {i_path}")
163 sys.exit(0)
166if __name__ == "__main__":
167 main(sys.argv[1:])