Coverage for sparkle/CLI/compute_features.py: 91%

88 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 13:21 +0000

1#!/usr/bin/env python3 

2"""Sparkle command to compute features for instances.""" 

3from __future__ import annotations 

4import sys 

5import argparse 

6from pathlib import Path 

7 

8import runrunner as rrr 

9from runrunner.base import Runner, Status, Run 

10 

11from sparkle.selector import Extractor 

12from sparkle.CLI.help import global_variables as gv 

13from sparkle.CLI.help import logging as sl 

14from sparkle.platform.settings_objects import SettingState 

15from sparkle.CLI.help import argparse_custom as ac 

16from sparkle.CLI.initialise import check_for_initialise 

17from sparkle.structures import FeatureDataFrame 

18from sparkle.instance import Instance_Set, InstanceSet 

19from sparkle.CLI.help.nicknames import resolve_instance_name 

20 

21 

def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Sparkle command to Compute features "
                    "for instances using added extractors "
                    "and instances.")
    # All three options share the project convention of (names, kwargs) pairs,
    # so register them in one loop instead of three repeated calls.
    for argument in (ac.RecomputeFeaturesArgument,
                     ac.SettingsFileArgument,
                     ac.RunOnArgument):
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser

34 

35 

def compute_features(
        feature_data: Path | FeatureDataFrame,
        recompute: bool,
        run_on: Runner = Runner.SLURM) -> Run:
    """Compute features for all instance and feature extractor combinations.

    A RunRunner run is submitted for the computation of the features.
    The results are then stored in the csv file specified by feature_data_csv_path.

    Args:
        feature_data: Feature Data Frame to use, or path to read it from.
        recompute: Specifies if features should be recomputed.
        run_on: Runner
            On which computer or cluster environment to run the solvers.
            Available: Runner.LOCAL, Runner.SLURM. Default: Runner.SLURM

    Returns:
        The Slurm job or Local job, or None when there are no remaining jobs.
    """
    # Accept either an already-loaded DataFrame or a path to load it from.
    if isinstance(feature_data, Path):
        feature_data = FeatureDataFrame(feature_data)
    if recompute:
        # Wipe existing feature values so every job is considered "remaining".
        feature_data.reset_dataframe()
    jobs = feature_data.remaining_jobs()

    # Lookup all instances to resolve the instance paths later
    instances: list[InstanceSet] = []
    for instance_dir in gv.settings().DEFAULT_instance_dir.iterdir():
        if instance_dir.is_dir():
            instances.append(Instance_Set(instance_dir))

    # If there are no jobs, stop
    if not jobs:
        print("No feature computation jobs to run; stopping execution! To recompute "
              "feature values use the --recompute flag.")
        return None
    cutoff = gv.settings().get_general_extractor_cutoff_time()
    cmd_list = []
    # Cache of extractor_name -> Extractor so each extractor is loaded only once.
    extractors = {}
    instance_paths = set()
    # Worker script that performs one instance/extractor computation per process.
    features_core = Path(__file__).parent.resolve() / "core" / "compute_features.py"
    # We create a job for each instance/extractor combination
    for instance_name, extractor_name, feature_group in jobs:
        extractor_path = gv.settings().DEFAULT_extractor_dir / extractor_name
        # Pass instances to avoid looking it up for every iteration
        instance_path = resolve_instance_name(str(instance_name), instances)
        instance_paths.add(instance_path)

        cmd = (f"python3 {features_core} "
               f"--instance {instance_path} "
               f"--extractor {extractor_path} "
               f"--feature-csv {feature_data.csv_filepath} "
               f"--cutoff {cutoff} "
               f"--log-dir {sl.caller_log_dir}")
        if extractor_name in extractors:
            extractor = extractors[extractor_name]
        else:
            extractor = Extractor(extractor_path)
            extractors[extractor_name] = extractor
        if extractor.groupwise_computation:
            # Extractor job can be parallelised, thus creating i * e * g jobs
            cmd_list.append(cmd + f" --feature-group {feature_group}")
        else:
            cmd_list.append(cmd)

    print(f"The number of compute jobs: {len(cmd_list)}")

    # Never request more parallel slots than there are jobs to run.
    parallel_jobs = min(
        len(cmd_list), gv.settings().get_number_of_jobs_in_parallel())
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    # Each srun step runs on a single node with a single task, plus user extras.
    srun_options = ["-N1", "-n1"] + sbatch_options
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=f"Compute Features: {len(extractors)} Extractors on "
             f"{len(instance_paths)} instances",
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        sbatch_options=sbatch_options,
        srun_options=srun_options,
        prepend=gv.settings().get_slurm_job_prepend())

    if run_on == Runner.SLURM:
        # Slurm runs asynchronously: report the job id and return immediately.
        print(f"Running the extractors through Slurm with Job IDs: {run.run_id}")
    elif run_on == Runner.LOCAL:
        print("Waiting for the local calculations to finish.")
        run.wait()
        # Print a progress line per job, stopping early once every job reports
        # COMPLETED (jobs_done is recomputed over all jobs each iteration).
        for job in run.jobs:
            jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
            print(f"Executing Progress: {jobs_done} out of {len(run.jobs)}")
            if jobs_done == len(run.jobs):
                break
            job.wait()
        print("Computing features done!")

    return run

132 

133 

def main(argv: list[str]) -> None:
    """Main function of the compute features command."""
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Build the parser and process the command line arguments in one step.
    args = parser_function().parse_args(argv)

    # Read the settings file first, so other command line options can
    # override the settings it contains.
    if args.settings_file is not None:
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)
    if args.run_on is not None:
        gv.settings().set_run_on(args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    # Without at least one registered feature extractor there is nothing to do.
    has_extractors = any(
        p.is_dir() for p in gv.settings().DEFAULT_extractor_dir.iterdir())
    if not has_extractors:
        print("No feature extractors present! Add feature extractors to Sparkle "
              "by using the add_feature_extractor command.")
        sys.exit()

    # Start compute features
    print("Start computing features ...")
    compute_features(gv.settings().DEFAULT_feature_data_path, args.recompute, run_on)

    # Write used settings to file
    gv.settings().write_used_settings()
    sys.exit(0)

167 

168 

# Script entry point: forward the CLI arguments (minus the program name) to main.
if __name__ == "__main__":
    main(sys.argv[1:])