Coverage for sparkle/CLI/compute_features.py: 90% (81 statements)
#!/usr/bin/env python3
"""Sparkle command to compute features for instances."""
from __future__ import annotations
import sys
import argparse
from pathlib import Path

import runrunner as rrr
from runrunner.base import Runner, Status, Run

from sparkle.solver import Extractor
from sparkle.CLI.help import global_variables as gv
from sparkle.CLI.help import logging as sl
from sparkle.platform.settings_objects import SettingState
from sparkle.CLI.help import argparse_custom as ac
from sparkle.CLI.initialise import check_for_initialise
from sparkle.structures import FeatureDataFrame


def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments."""
    parser = argparse.ArgumentParser(
        description="Sparkle command to compute features for instances "
                    "using the added feature extractors.")
    parser.add_argument(*ac.RecomputeFeaturesArgument.names,
                        **ac.RecomputeFeaturesArgument.kwargs)
    parser.add_argument(*ac.SettingsFileArgument.names,
                        **ac.SettingsFileArgument.kwargs)
    parser.add_argument(*ac.RunOnArgument.names,
                        **ac.RunOnArgument.kwargs)
    return parser
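
# Illustrative parse sketch (not part of the command): it shows how the parser built
# above is consumed by main() below. "--recompute" is the flag referenced in the
# no-jobs message in compute_features(); the "--run-on" spelling and the Runner
# conversion are assumptions, since the actual argument definitions live in
# sparkle.CLI.help.argparse_custom.
#
#     args = parser_function().parse_args(["--recompute", "--run-on", "local"])
#     # args.recompute -> True; args.run_on -> a runrunner Runner value (or None)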


def compute_features(
        feature_data: Path | FeatureDataFrame,
        recompute: bool,
        run_on: Runner = Runner.SLURM) -> Run:
    """Compute features for all instance and feature extractor combinations.

    A RunRunner run is submitted for the computation of the features.
    The results are then stored in the CSV file behind the feature data frame.

    Args:
        feature_data: Feature data frame to use, or path to read it from.
        recompute: Specifies if features should be recomputed.
        run_on: Runner
            On which computer or cluster environment to run the feature extractors.
            Available: Runner.LOCAL, Runner.SLURM. Default: Runner.SLURM

    Returns:
        The Slurm or local run, or None if there are no jobs to compute.
    """
    if isinstance(feature_data, Path):
        feature_data = FeatureDataFrame(feature_data)
    if recompute:
        feature_data.reset_dataframe()
    jobs = feature_data.remaining_jobs()

    # If there are no jobs, stop
    if not jobs:
        print("No feature computation jobs to run; stopping execution! To recompute "
              "feature values use the --recompute flag.")
        return None
    cutoff = gv.settings().get_general_extractor_cutoff_time()
    cmd_list = []
    extractors = {}
    instance_paths = set()
    features_core = Path(__file__).parent.resolve() / "core" / "compute_features.py"
    # We create a job for each instance/extractor combination
    for instance_path, extractor_name, feature_group in jobs:
        extractor_path = gv.settings().DEFAULT_extractor_dir / extractor_name
        instance_paths.add(instance_path)
        cmd = (f"python3 {features_core} "
               f"--instance {instance_path} "
               f"--extractor {extractor_path} "
               f"--feature-csv {feature_data.csv_filepath} "
               f"--cutoff {cutoff} "
               f"--log-dir {sl.caller_log_dir}")
        if extractor_name in extractors:
            extractor = extractors[extractor_name]
        else:
            extractor = Extractor(extractor_path)
            extractors[extractor_name] = extractor
        if extractor.groupwise_computation:
            # Extractor job can be parallelised, thus creating i * e * g jobs
            cmd_list.append(cmd + f" --feature-group {feature_group}")
        else:
            cmd_list.append(cmd)

    print(f"The number of compute jobs: {len(cmd_list)}")

    parallel_jobs = min(len(cmd_list), gv.settings().get_number_of_jobs_in_parallel())
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    srun_options = ["-N1", "-n1"] + sbatch_options
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=f"Compute Features: {len(extractors)} Extractors on "
             f"{len(instance_paths)} instances",
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        sbatch_options=sbatch_options,
        srun_options=srun_options)

    if run_on == Runner.SLURM:
        print(f"Running the extractors through Slurm with Job IDs: {run.run_id}")
    elif run_on == Runner.LOCAL:
        print("Waiting for the local calculations to finish.")
        run.wait()
        for job in run.jobs:
            jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
            print(f"Executing Progress: {jobs_done} out of {len(run.jobs)}")
            if jobs_done == len(run.jobs):
                break
            job.wait()
        print("Computing features done!")

    return run
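
# Programmatic usage sketch (illustrative): mirrors the call made in main() below.
# The CSV path is hypothetical; in the CLI it comes from
# gv.settings().DEFAULT_feature_data_path.
#
#     run = compute_features(Path("Output/Feature_Data/feature_data.csv"),
#                            recompute=False, run_on=Runner.SLURM)
#     if run is not None:  # None means there was nothing left to compute
#         run.wait()  # block until the submitted jobs have finished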


def main(argv: list[str]) -> None:
    """Main function of the compute features command."""
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    if ac.set_by_user(args, "settings_file"):
        gv.settings().read_settings_ini(
            args.settings_file, SettingState.CMD_LINE
        )  # Do first, so other command line options can override settings from the file
    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    # Check if there are any feature extractors registered
    if not any(p.is_dir() for p in gv.settings().DEFAULT_extractor_dir.iterdir()):
        print("No feature extractors present! Add feature extractors to Sparkle "
              "by using the add_feature_extractor command.")
        sys.exit()

    # Start compute features
    print("Start computing features ...")
    compute_features(gv.settings().DEFAULT_feature_data_path, args.recompute, run_on)

    # Write used settings to file
    gv.settings().write_used_settings()
    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])
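
# Example invocations (sketch): assumes the platform has been initialised and that
# extractors and instances have been added (see check_for_initialise and the
# add_feature_extractor command referenced in main). "--recompute" is the flag named
# in the no-jobs message above; any other flag spellings are defined in
# sparkle.CLI.help.argparse_custom.
#
#     python3 sparkle/CLI/compute_features.py
#     python3 sparkle/CLI/compute_features.py --recompute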