Coverage for sparkle/CLI/core/compute_features.py: 0% (37 statements).
Report generated by coverage.py v7.9.1 at 2025-07-01 13:21 +0000.
1#!/usr/bin/env python3
2# -*- coding: UTF-8 -*-
3"""Compute features for an instance, only for internal calls from Sparkle."""
4import argparse
5from pathlib import Path
6from filelock import FileLock
8from sparkle.CLI.help import global_variables as gv
9from sparkle.structures import FeatureDataFrame
10from sparkle.selector import Extractor
if __name__ == "__main__":
    # Define command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--instance", required=True, type=Path, nargs="+",
                        help="path to instance file(s) to run on")
    parser.add_argument("--extractor", required=True, type=str,
                        help="path to feature extractor")
    parser.add_argument("--feature-csv", required=True, type=str,
                        help="path to feature data CSV file")
    parser.add_argument("--cutoff", required=True, type=str,
                        help="the maximum CPU time for the extractor.")
    parser.add_argument("--feature-group", required=False, type=str,
                        help="the group of features to compute, if available for the "
                             "extractor. If not available or provided, all groups will"
                             " be computed.")
    parser.add_argument("--log-dir", type=Path, required=False,
                        help="path to the log directory")
    args = parser.parse_args()

    # Process command line arguments; fall back to the default tmp output
    # directory when no log dir was given.
    log_dir: Path = (args.log_dir if args.log_dir is not None
                     else gv.settings().DEFAULT_tmp_output)

    # Instance argument is a list to allow for multifile instances
    instance_path: list[Path] = args.instance
    instance_name = instance_path[0].stem
    extractor_path = Path(args.extractor)
    feature_data_csv_path = Path(args.feature_csv)
    cutoff_extractor = args.cutoff

    # Ensure stringification of path objects.
    # NOTE(review): with nargs="+" argparse always produces a list, so the
    # else-branch is defensive dead code; kept for safety.
    if isinstance(instance_path, list):
        instance_list = [str(filepath) for filepath in instance_path]
    else:
        instance_list = [str(instance_path)]

    extractor = Extractor(extractor_path,
                          gv.settings().DEFAULT_runsolver_exec)
    features = extractor.run(instance_list,
                             feature_group=args.feature_group,
                             cutoff_time=cutoff_extractor,
                             log_dir=log_dir)

    # Now that we have our result, we write it to the FeatureDataCSV with a
    # FileLock so concurrent extractor jobs do not corrupt the shared CSV.
    lock = FileLock(f"{feature_data_csv_path}.lock")
    if features is not None:
        print(f"Writing features to CSV: {instance_name}, {extractor_path.name}")
        # The context manager releases the lock on exit; the original code
        # also called lock.release() afterwards, which was redundant.
        with lock.acquire(timeout=60):
            feature_data = FeatureDataFrame(feature_data_csv_path)
            # The data frame may index this instance by its stem name or by
            # its (suffix-stripped) path; resolve whichever key is present.
            instance_key = (instance_name
                            if instance_name in feature_data.instances
                            else str(instance_path[0].with_suffix("")))
            for feature_group, feature_name, value in features:
                feature_data.set_value(instance_key, extractor_path.name,
                                       feature_group, feature_name, float(value))
            feature_data.save_csv()
    else:
        print("EXCEPTION during retrieving extractor results.\n"
              f"****** WARNING: Feature vector computation on instance {instance_path}"
              " failed! ******")