Coverage for sparkle/CLI/compute_features.py: 0% (77 statements)
#!/usr/bin/env python3
"""Sparkle command to compute features for instances."""
from __future__ import annotations
import sys
import argparse
from pathlib import Path

import runrunner as rrr
from runrunner.base import Runner, Status, Run

from sparkle.solver import Extractor
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import logging as sl
from sparkle.platform.settings_objects import SettingState
from sparkle.CLI.help import argparse_custom as ac
from sparkle.platform import COMMAND_DEPENDENCIES, CommandName
from sparkle.CLI.initialise import check_for_initialise
from sparkle.structures import FeatureDataFrame


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument(*ac.RecomputeFeaturesArgument.names,
                        **ac.RecomputeFeaturesArgument.kwargs)
    parser.add_argument(*ac.SettingsFileArgument.names,
                        **ac.SettingsFileArgument.kwargs)
    parser.add_argument(*ac.RunOnArgument.names,
                        **ac.RunOnArgument.kwargs)
    return parser
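
# A hedged sketch of the resulting command line (flag spellings assumed from
# the argparse_custom argument names; --recompute is confirmed by the hint
# printed in compute_features below, the others are assumptions):
#   python compute_features.py --recompute --run-on slurm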


def compute_features(
        feature_data: Path | FeatureDataFrame,
        recompute: bool,
        run_on: Runner = Runner.SLURM) -> Run:
    """Compute features for all instance and feature extractor combinations.

    A RunRunner run is submitted for the computation of the features.
    The results are then stored in the CSV file of the given FeatureDataFrame.

    Args:
        feature_data: FeatureDataFrame to use, or path to read it from.
        recompute: Specifies if features should be recomputed.
        run_on: On which computer or cluster environment to run the extractors.
            Available: Runner.LOCAL, Runner.SLURM. Default: Runner.SLURM.

    Returns:
        The Slurm or local Run, or None if there were no jobs to run.
    """
    if isinstance(feature_data, Path):
        feature_data = FeatureDataFrame(feature_data)
    if recompute:
        feature_data.reset_dataframe()
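    # remaining_jobs() yields (instance_path, extractor_name, feature_group)
    # tuples; see the unpacking in the loop below.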
    jobs = feature_data.remaining_jobs()

    # If there are no jobs, stop
    if not jobs:
        print("No feature computation jobs to run; stopping execution! To recompute "
              "feature values use the --recompute flag.")
        return None
    cutoff = gv.settings().get_general_extractor_cutoff_time()
    cmd_list = []
    extractors = {}
    features_core = Path(__file__).parent.resolve() / "core" / "compute_features.py"
    # We create a job for each instance/extractor combination
    for instance_path, extractor_name, feature_group in jobs:
        extractor_path = gv.settings().DEFAULT_extractor_dir / extractor_name
        cmd = (f"{features_core} "
               f"--instance {instance_path} "
               f"--extractor {extractor_path} "
               f"--feature-csv {feature_data.csv_filepath} "
               f"--cutoff {cutoff}")
        if extractor_name in extractors:
            extractor = extractors[extractor_name]
        else:
            extractor = Extractor(extractor_path)
            extractors[extractor_name] = extractor
        if extractor.groupwise_computation:
            # Extractor job can be parallelised, thus creating i * e * g jobs
            cmd_list.append(cmd + f" --feature-group {feature_group}")
        else:
            cmd_list.append(cmd)

    print(f"The number of compute jobs: {len(cmd_list)}")
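    # Cap concurrency at the parallel-job limit configured in the settings.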
    parallel_jobs = min(len(cmd_list), gv.settings().get_number_of_jobs_in_parallel())
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
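    # Each job step gets one node and one task ("-N1", "-n1"); the extra Slurm
    # options from the settings are passed at both the sbatch and srun level.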
    srun_options = ["-N1", "-n1"] + sbatch_options
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=CommandName.COMPUTE_FEATURES,
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        sbatch_options=sbatch_options,
        srun_options=srun_options)

    if run_on == Runner.SLURM:
        print(f"Running the extractors through Slurm with Job IDs: {run.run_id}")
    elif run_on == Runner.LOCAL:
        print("Waiting for the local calculations to finish.")
        run.wait()
        # Report progress per job until all jobs have completed
        for job in run.jobs:
            jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
            print(f"Execution progress: {jobs_done} out of {len(run.jobs)}")
            if jobs_done == len(run.jobs):
                break
            job.wait()
        print("Computing features done!")

    return run
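

# A minimal programmatic sketch (hypothetical CSV path; assumes an initialised
# Sparkle platform so that gv.settings() resolves):
#   run = compute_features(Path("feature_data.csv"), recompute=False,
#                          run_on=Runner.LOCAL)
#   if run is not None:  # None means there were no remaining jobs
#       run.wait()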

if __name__ == "__main__":
    # Log command call
    sl.log_command(sys.argv)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args()

    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    check_for_initialise(COMMAND_DEPENDENCIES[CommandName.COMPUTE_FEATURES])

    if ac.set_by_user(args, "settings_file"):
        gv.settings().read_settings_ini(
            args.settings_file, SettingState.CMD_LINE
        )  # Do first, so other command line options can override settings from the file

    # Check if there are any feature extractors registered
    if not any(p.is_dir() for p in gv.settings().DEFAULT_extractor_dir.iterdir()):
        print("No feature extractors present! Add feature extractors to Sparkle "
              "by using the add_feature_extractor command.")
        sys.exit()

    # Start computing features
    print("Start computing features ...")
    compute_features(gv.settings().DEFAULT_feature_data_path, args.recompute, run_on)

    # Write used settings to file
    gv.settings().write_used_settings()