Coverage for sparkle/CLI/compute_features.py: 91%
88 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 13:21 +0000
1#!/usr/bin/env python3
2"""Sparkle command to compute features for instances."""
3from __future__ import annotations
4import sys
5import argparse
6from pathlib import Path
8import runrunner as rrr
9from runrunner.base import Runner, Status, Run
11from sparkle.selector import Extractor
12from sparkle.CLI.help import global_variables as gv
13from sparkle.CLI.help import logging as sl
14from sparkle.platform.settings_objects import SettingState
15from sparkle.CLI.help import argparse_custom as ac
16from sparkle.CLI.initialise import check_for_initialise
17from sparkle.structures import FeatureDataFrame
18from sparkle.instance import Instance_Set, InstanceSet
19from sparkle.CLI.help.nicknames import resolve_instance_name
def parser_function() -> argparse.ArgumentParser:
    """Build and return the argument parser for this command."""
    parser = argparse.ArgumentParser(
        description="Sparkle command to Compute features for instances "
                    "using added extractors and instances.")
    # Register all supported arguments from their shared definitions.
    for argument in (ac.RecomputeFeaturesArgument,
                     ac.SettingsFileArgument,
                     ac.RunOnArgument):
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser
def compute_features(
        feature_data: Path | FeatureDataFrame,
        recompute: bool,
        run_on: Runner = Runner.SLURM) -> Run | None:
    """Compute features for all instance and feature extractor combinations.

    A RunRunner run is submitted for the computation of the features.
    The results are then stored in the CSV file backing the feature data frame.

    Args:
        feature_data: Feature Data Frame to use, or path to read it from.
        recompute: Specifies if features should be recomputed.
        run_on: Runner
            On which computer or cluster environment to run the solvers.
            Available: Runner.LOCAL, Runner.SLURM. Default: Runner.SLURM

    Returns:
        The Slurm job or Local job, or None when there is nothing to compute.
    """
    if isinstance(feature_data, Path):
        feature_data = FeatureDataFrame(feature_data)
    if recompute:
        feature_data.reset_dataframe()
    jobs = feature_data.remaining_jobs()
    # Lookup all instance sets once so instance paths can be resolved per job
    instances: list[InstanceSet] = [
        Instance_Set(instance_dir)
        for instance_dir in gv.settings().DEFAULT_instance_dir.iterdir()
        if instance_dir.is_dir()]
    # If there are no jobs, stop
    if not jobs:
        print("No feature computation jobs to run; stopping execution! To recompute "
              "feature values use the --recompute flag.")
        return None
    cutoff = gv.settings().get_general_extractor_cutoff_time()
    # Hoist the loop-invariant extractor directory lookup out of the job loop
    extractor_dir = gv.settings().DEFAULT_extractor_dir
    cmd_list = []
    extractors: dict[str, Extractor] = {}  # Cache: one Extractor per name
    instance_paths = set()
    features_core = Path(__file__).parent.resolve() / "core" / "compute_features.py"
    # We create a job for each instance/extractor combination
    for instance_name, extractor_name, feature_group in jobs:
        extractor_path = extractor_dir / extractor_name
        # Pass instances to avoid looking it up for every iteration
        instance_path = resolve_instance_name(str(instance_name), instances)
        instance_paths.add(instance_path)
        cmd = (f"python3 {features_core} "
               f"--instance {instance_path} "
               f"--extractor {extractor_path} "
               f"--feature-csv {feature_data.csv_filepath} "
               f"--cutoff {cutoff} "
               f"--log-dir {sl.caller_log_dir}")
        if extractor_name not in extractors:
            extractors[extractor_name] = Extractor(extractor_path)
        extractor = extractors[extractor_name]
        if extractor.groupwise_computation:
            # Extractor job can be parallelised, thus creating i * e * g jobs
            cmd_list.append(cmd + f" --feature-group {feature_group}")
        else:
            cmd_list.append(cmd)

    print(f"The number of compute jobs: {len(cmd_list)}")

    parallel_jobs = min(
        len(cmd_list), gv.settings().get_number_of_jobs_in_parallel())
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    srun_options = ["-N1", "-n1"] + sbatch_options
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=f"Compute Features: {len(extractors)} Extractors on "
             f"{len(instance_paths)} instances",
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        sbatch_options=sbatch_options,
        srun_options=srun_options,
        prepend=gv.settings().get_slurm_job_prepend())

    if run_on == Runner.SLURM:
        print(f"Running the extractors through Slurm with Job IDs: {run.run_id}")
    elif run_on == Runner.LOCAL:
        print("Waiting for the local calculations to finish.")
        # Wait per job, reporting progress; stop early once all are completed
        run.wait()
        for job in run.jobs:
            jobs_done = sum(j.status == Status.COMPLETED for j in run.jobs)
            print(f"Executing Progress: {jobs_done} out of {len(run.jobs)}")
            if jobs_done == len(run.jobs):
                break
            job.wait()
        print("Computing features done!")

    return run
def main(argv: list[str]) -> None:
    """Main function of the compute features command.

    Args:
        argv: Command line arguments (excluding the program name).
    """
    # Log command call
    sl.log_command(sys.argv)
    check_for_initialise()

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    if args.settings_file is not None:
        gv.settings().read_settings_ini(
            args.settings_file, SettingState.CMD_LINE
        )  # Do first, so other command line options can override settings from the file
    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    # Check if there are any feature extractors registered
    # (generator instead of list so any() can short-circuit)
    if not any(p.is_dir() for p in gv.settings().DEFAULT_extractor_dir.iterdir()):
        print("No feature extractors present! Add feature extractors to Sparkle "
              "by using the add_feature_extractor command.")
        sys.exit()

    # Start compute features
    print("Start computing features ...")
    compute_features(gv.settings().DEFAULT_feature_data_path, args.recompute, run_on)

    # Write used settings to file
    gv.settings().write_used_settings()
    sys.exit(0)
# Script entry point: run the command with the CLI arguments (program name excluded).
if __name__ == "__main__":
    main(sys.argv[1:])