Coverage for sparkle/CLI/compute_features.py: 90%
81 statements
« prev ^ index » next — coverage.py v7.8.0, created at 2025-04-03 10:42 +0000
1#!/usr/bin/env python3
2"""Sparkle command to compute features for instances."""
3from __future__ import annotations
4import sys
5import argparse
6from pathlib import Path
8import runrunner as rrr
9from runrunner.base import Runner, Status, Run
11from sparkle.solver import Extractor
12from sparkle.CLI.help import global_variables as gv
13from sparkle.CLI.help import logging as sl
14from sparkle.platform.settings_objects import SettingState
15from sparkle.CLI.help import argparse_custom as ac
16from sparkle.CLI.initialise import check_for_initialise
17from sparkle.structures import FeatureDataFrame
def parser_function() -> argparse.ArgumentParser:
    """Build and return the argument parser for this command."""
    parser = argparse.ArgumentParser(description="Sparkle command to Compute features "
                                                 "for instances using added extractors "
                                                 "and instances.")
    # All arguments share the same registration shape, so add them in one pass.
    for argument in (ac.RecomputeFeaturesArgument,
                     ac.SettingsFileArgument,
                     ac.RunOnArgument):
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser
def compute_features(
        feature_data: Path | FeatureDataFrame,
        recompute: bool,
        run_on: Runner = Runner.SLURM) -> Run:
    """Compute features for all instance and feature extractor combinations.

    A RunRunner run is submitted for the computation of the features.
    The results are then stored in the csv file specified by feature_data_csv_path.

    Args:
        feature_data: Feature Data Frame to use, or path to read it from.
        recompute: Specifies if features should be recomputed.
        run_on: Runner
            On which computer or cluster environment to run the solvers.
            Available: Runner.LOCAL, Runner.SLURM. Default: Runner.SLURM

    Returns:
        The Slurm job or Local job
    """
    if isinstance(feature_data, Path):
        feature_data = FeatureDataFrame(feature_data)
    if recompute:
        feature_data.reset_dataframe()
    pending = feature_data.remaining_jobs()
    # Nothing left to compute: inform the user and bail out early.
    if not pending:
        print("No feature computation jobs to run; stopping execution! To recompute "
              "feature values use the --recompute flag.")
        return None
    extractor_cutoff = gv.settings().get_general_extractor_cutoff_time()
    commands = []
    extractor_cache = {}
    seen_instances = set()
    core_script = Path(__file__).parent.resolve() / "core" / "compute_features.py"
    # One job per instance/extractor combination (per feature group when
    # the extractor supports groupwise computation).
    for instance_path, extractor_name, feature_group in pending:
        extractor_dir = gv.settings().DEFAULT_extractor_dir / extractor_name
        seen_instances.add(instance_path)
        base_cmd = (f"python3 {core_script} "
                    f"--instance {instance_path} "
                    f"--extractor {extractor_dir} "
                    f"--feature-csv {feature_data.csv_filepath} "
                    f"--cutoff {extractor_cutoff} "
                    f"--log-dir {sl.caller_log_dir}")
        # Instantiate each Extractor only once and reuse it across jobs.
        if extractor_name not in extractor_cache:
            extractor_cache[extractor_name] = Extractor(extractor_dir)
        if extractor_cache[extractor_name].groupwise_computation:
            # Extractor job can be parallelised, thus creating i * e * g jobs
            commands.append(base_cmd + f" --feature-group {feature_group}")
        else:
            commands.append(base_cmd)

    print(f"The number of compute jobs: {len(commands)}")

    n_parallel = min(len(commands), gv.settings().get_number_of_jobs_in_parallel())
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=commands,
        name=f"Compute Features: {len(extractor_cache)} Extractors on "
             f"{len(seen_instances)} instances",
        parallel_jobs=n_parallel,
        base_dir=sl.caller_log_dir,
        sbatch_options=sbatch_options,
        srun_options=["-N1", "-n1"] + sbatch_options,
        prepend=gv.settings().get_slurm_job_prepend())

    if run_on == Runner.SLURM:
        print(f"Running the extractors through Slurm with Job IDs: {run.run_id}")
    elif run_on == Runner.LOCAL:
        print("Waiting for the local calculations to finish.")
        run.wait()
        # Report progress after each job completes; stop once all are done.
        for job in run.jobs:
            jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
            print(f"Executing Progress: {jobs_done} out of {len(run.jobs)}")
            if jobs_done == len(run.jobs):
                break
            job.wait()
        print("Computing features done!")

    return run
def main(argv: list[str]) -> None:
    """Main function of the compute features command."""
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Build the parser and process the command line arguments
    args = parser_function().parse_args(argv)

    # Read the settings file first, so other command line options can
    # override the values it provides.
    if ac.set_by_user(args, "settings_file"):
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)
    if args.run_on is not None:
        gv.settings().set_run_on(args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    # Check if there are any feature extractors registered
    extractor_dir = gv.settings().DEFAULT_extractor_dir
    if not any([path.is_dir() for path in extractor_dir.iterdir()]):
        print("No feature extractors present! Add feature extractors to Sparkle "
              "by using the add_feature_extractor command.")
        sys.exit()

    # Start compute features
    print("Start computing features ...")
    compute_features(gv.settings().DEFAULT_feature_data_path, args.recompute, run_on)

    # Write used settings to file
    gv.settings().write_used_settings()
    sys.exit(0)
# Script entry point: forward the command-line arguments (minus the program
# name) to main().
if __name__ == "__main__":
    main(sys.argv[1:])