Coverage for sparkle/CLI/compute_features.py: 90%

81 statements  

coverage.py v7.6.10, created at 2025-01-07 15:22 +0000

#!/usr/bin/env python3
"""Sparkle command to compute features for instances."""
from __future__ import annotations
import sys
import argparse
from pathlib import Path

import runrunner as rrr
from runrunner.base import Runner, Status, Run

from sparkle.solver import Extractor
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import logging as sl
from sparkle.platform.settings_objects import SettingState
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.initialise import check_for_initialise
from sparkle.structures import FeatureDataFrame


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(description="Sparkle command to compute features "
                                                 "for instances using the added "
                                                 "feature extractors.")
    parser.add_argument(*ac.RecomputeFeaturesArgument.names,
                        **ac.RecomputeFeaturesArgument.kwargs)
    parser.add_argument(*ac.SettingsFileArgument.names,
                        **ac.SettingsFileArgument.kwargs)
    parser.add_argument(*ac.RunOnArgument.names,
                        **ac.RunOnArgument.kwargs)

    return parser


def compute_features(
        feature_data: Path | FeatureDataFrame,
        recompute: bool,
        run_on: Runner = Runner.SLURM) -> Run:
    """Compute features for all instance and feature extractor combinations.

    A RunRunner run is submitted for the computation of the features.
    The results are then stored in the CSV file underlying the feature data frame.

    Args:
        feature_data: Feature Data Frame to use, or path to read it from.
        recompute: Specifies if features should be recomputed.
        run_on: Runner
            On which computer or cluster environment to run the extractors.
            Available: Runner.LOCAL, Runner.SLURM. Default: Runner.SLURM

    Returns:
        The Slurm or local run, or None if there are no remaining jobs.
    """

    if isinstance(feature_data, Path):
        feature_data = FeatureDataFrame(feature_data)
    if recompute:
        feature_data.reset_dataframe()
    jobs = feature_data.remaining_jobs()

    # If there are no jobs, stop
    if not jobs:
        print("No feature computation jobs to run; stopping execution! To recompute "
              "feature values use the --recompute flag.")
        return None
    cutoff = gv.settings().get_general_extractor_cutoff_time()
    cmd_list = []
    extractors = {}
    instance_paths = set()
    features_core = Path(__file__).parent.resolve() / "core" / "compute_features.py"
    # We create a job for each instance/extractor combination
    for instance_path, extractor_name, feature_group in jobs:
        extractor_path = gv.settings().DEFAULT_extractor_dir / extractor_name
        instance_paths.add(instance_path)
        cmd = (f"python3 {features_core} "
               f"--instance {instance_path} "
               f"--extractor {extractor_path} "
               f"--feature-csv {feature_data.csv_filepath} "
               f"--cutoff {cutoff} "
               f"--log-dir {sl.caller_log_dir}")
        if extractor_name in extractors:
            extractor = extractors[extractor_name]
        else:
            extractor = Extractor(extractor_path)
            extractors[extractor_name] = extractor
        if extractor.groupwise_computation:
            # Extractor job can be parallelised, thus creating i * e * g jobs
            cmd_list.append(cmd + f" --feature-group {feature_group}")
        else:
            cmd_list.append(cmd)

    print(f"The number of compute jobs: {len(cmd_list)}")

    parallel_jobs = min(len(cmd_list), gv.settings().get_number_of_jobs_in_parallel())
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    srun_options = ["-N1", "-n1"] + sbatch_options
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=f"Compute Features: {len(extractors)} Extractors on "
             f"{len(instance_paths)} instances",
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        sbatch_options=sbatch_options,
        srun_options=srun_options)

    if run_on == Runner.SLURM:
        print(f"Running the extractors through Slurm with Job IDs: {run.run_id}")
    elif run_on == Runner.LOCAL:
        print("Waiting for the local calculations to finish.")
        run.wait()
        for job in run.jobs:
            jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
            print(f"Executing Progress: {jobs_done} out of {len(run.jobs)}")
            if jobs_done == len(run.jobs):
                break
            job.wait()
        print("Computing features done!")

    return run


def main(argv: list[str]) -> None:
    """Main function of the compute features command."""
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    if ac.set_by_user(args, "settings_file"):
        gv.settings().read_settings_ini(
            args.settings_file, SettingState.CMD_LINE
        )  # Do first, so other command line options can override settings from the file
    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    # Check if there are any feature extractors registered
    if not any([p.is_dir() for p in gv.settings().DEFAULT_extractor_dir.iterdir()]):
        print("No feature extractors present! Add feature extractors to Sparkle "
              "by using the add_feature_extractor command.")
        sys.exit()

    # Start compute features
    print("Start computing features ...")
    compute_features(gv.settings().DEFAULT_feature_data_path, args.recompute, run_on)

    # Write used settings to file
    gv.settings().write_used_settings()
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
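
For context, a minimal usage sketch (not part of the measured file above): the module can be run as a script, or compute_features can be imported and called directly. The local runner and the recompute=False choice below are illustrative assumptions; by default jobs are submitted through Slurm, and the call assumes an already initialised Sparkle platform.

# Run as a script; --recompute forces existing feature values to be recomputed:
#   python3 sparkle/CLI/compute_features.py --recompute
#
# Programmatic use (illustrative; assumes an initialised Sparkle platform):
from runrunner.base import Runner

from sparkle.CLI.compute_features import compute_features
from sparkle.CLI.help import global_variables as gv

run = compute_features(
    gv.settings().DEFAULT_feature_data_path,  # the platform's feature data CSV
    recompute=False,                           # only compute missing feature values
    run_on=Runner.LOCAL)                       # run and wait locally instead of via Slurm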