Coverage for sparkle/CLI/compute_features.py: 0%


#!/usr/bin/env python3
"""Sparkle command to compute features for instances."""
from __future__ import annotations
import sys
import argparse
from pathlib import Path

import runrunner as rrr
from runrunner.base import Runner, Status, Run

from sparkle.solver import Extractor
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import logging as sl
from sparkle.platform.settings_objects import SettingState
from sparkle.CLI.help import argparse_custom as ac
from sparkle.platform import COMMAND_DEPENDENCIES, CommandName
from sparkle.CLI.initialise import check_for_initialise
from sparkle.structures import FeatureDataFrame


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument(*ac.RecomputeFeaturesArgument.names,
                        **ac.RecomputeFeaturesArgument.kwargs)
    parser.add_argument(*ac.SettingsFileArgument.names,
                        **ac.SettingsFileArgument.kwargs)
    parser.add_argument(*ac.RunOnArgument.names,
                        **ac.RunOnArgument.kwargs)

    return parser
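

# Example (a sketch, not part of the original module): the parser can be
# exercised programmatically. The flag spellings are assumptions derived from
# the argument names in sparkle.CLI.help.argparse_custom and may differ.
#
#     args = parser_function().parse_args(["--recompute", "--run-on", "local"])
#     print(args.recompute, args.run_on)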

def compute_features(
        feature_data: Path | FeatureDataFrame,
        recompute: bool,
        run_on: Runner = Runner.SLURM) -> Run | None:
    """Compute features for all instance and feature extractor combinations.

    A RunRunner run is submitted for the computation of the features.
    The results are then stored in the CSV file backing the given
    FeatureDataFrame (feature_data.csv_filepath).

    Args:
        feature_data: FeatureDataFrame to use, or a path to read it from.
        recompute: Whether existing feature values should be recomputed.
        run_on: On which computer or cluster environment to run the extractors.
            Available: Runner.LOCAL, Runner.SLURM. Default: Runner.SLURM.

    Returns:
        The Slurm or local Run, or None when there are no jobs to compute.
    """

    if isinstance(feature_data, Path):
        feature_data = FeatureDataFrame(feature_data)
    if recompute:
        feature_data.reset_dataframe()
    jobs = feature_data.remaining_jobs()

    # If there are no jobs, stop
    if not jobs:
        print("No feature computation jobs to run; stopping execution! To recompute "
              "feature values use the --recompute flag.")
        return None

    cutoff = gv.settings().get_general_extractor_cutoff_time()
    cmd_list = []
    extractors = {}
    features_core = Path(__file__).parent.resolve() / "core" / "compute_features.py"
    # We create a job for each instance/extractor combination
    for instance_path, extractor_name, feature_group in jobs:
        extractor_path = gv.settings().DEFAULT_extractor_dir / extractor_name
        cmd = (f"{features_core} "
               f"--instance {instance_path} "
               f"--extractor {extractor_path} "
               f"--feature-csv {feature_data.csv_filepath} "
               f"--cutoff {cutoff}")
        if extractor_name in extractors:
            extractor = extractors[extractor_name]
        else:
            extractor = Extractor(extractor_path)
            extractors[extractor_name] = extractor
        if extractor.groupwise_computation:
            # Extractor job can be parallelised, thus creating i * e * g jobs
            cmd_list.append(cmd + f" --feature-group {feature_group}")
        else:
            cmd_list.append(cmd)
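
    # Each generated command looks roughly like the following (paths and values
    # are illustrative; --feature-group is only added for groupwise extractors):
    #   .../CLI/core/compute_features.py --instance Instances/Set/i1.cnf \
    #       --extractor Extractors/SAT-features --feature-csv feature_data.csv \
    #       --cutoff 60 --feature-group base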

    print(f"The number of compute jobs: {len(cmd_list)}")

    parallel_jobs = min(len(cmd_list), gv.settings().get_number_of_jobs_in_parallel())
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    srun_options = ["-N1", "-n1"] + sbatch_options
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=CommandName.COMPUTE_FEATURES,
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        sbatch_options=sbatch_options,
        srun_options=srun_options)

    if run_on == Runner.SLURM:
        print(f"Running the extractors through Slurm with Job IDs: {run.run_id}")
    elif run_on == Runner.LOCAL:
        print("Waiting for the local calculations to finish.")
        run.wait()
        # Report overall progress while waiting for each job in turn
        for job in run.jobs:
            jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
            print(f"Execution progress: {jobs_done} out of {len(run.jobs)}")
            if jobs_done == len(run.jobs):
                break
            job.wait()
        print("Computing features done!")

    return run
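

# Example usage (a sketch, assuming an initialised Sparkle platform with at
# least one extractor and instance set added; the CSV path is illustrative):
#
#     run = compute_features(Path("Output/Feature_Data/feature_data.csv"),
#                            recompute=False, run_on=Runner.LOCAL)
#     if run is not None:
#         run.wait()  # block until all extractor jobs have finished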

if __name__ == "__main__":
    # Log command call
    sl.log_command(sys.argv)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args()

    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    check_for_initialise(COMMAND_DEPENDENCIES[CommandName.COMPUTE_FEATURES])

    if ac.set_by_user(args, "settings_file"):
        gv.settings().read_settings_ini(
            args.settings_file, SettingState.CMD_LINE
        )  # Do first, so other command line options can override settings from the file

    # Check if there are any feature extractors registered
    if not any(p.is_dir() for p in gv.settings().DEFAULT_extractor_dir.iterdir()):
        print("No feature extractors present! Add feature extractors to Sparkle "
              "by using the add_feature_extractor command.")
        sys.exit()

    # Start compute features
    print("Start computing features ...")
    compute_features(gv.settings().DEFAULT_feature_data_path, args.recompute, run_on)

    # Write used settings to file
    gv.settings().write_used_settings()
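

# Typical invocation (a sketch; --recompute is named in the messages above,
# the other flag spellings are assumptions):
#
#     python3 sparkle/CLI/compute_features.py --recompute --run-on local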