Coverage for src / sparkle / CLI / add_feature_extractor.py: 82%

65 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 15:31 +0000

1#!/usr/bin/env python3 

2"""Sparkle command to add a feature extractor to the Sparkle platform.""" 

3 

4import os 

5import stat 

6import sys 

7import shutil 

8import argparse 

9from pathlib import Path 

10 

11from sparkle.platform import file_help as sfh 

12from sparkle.CLI.help import global_variables as gv 

13from sparkle.structures import FeatureDataFrame 

14from sparkle.CLI.help import logging as sl 

15from sparkle.CLI.initialise import check_for_initialise 

16from sparkle.CLI.help import argparse_custom as ac 

17from sparkle.selector import Extractor 

18 

19 

def parser_function() -> argparse.ArgumentParser:
    """Build the argument parser for the add feature extractor command.

    Returns:
        A parser with the extractor path, feature extractor nickname and
        no-copy arguments registered, as defined in ``argparse_custom``.
    """
    cli_parser = argparse.ArgumentParser(
        description="Add a feature extractor to the platform."
    )
    # Every argument definition exposes its flag names and its keyword options.
    argument_definitions = (
        ac.ExtractorPathArgument,
        ac.NicknameFeatureExtractorArgument,
        ac.NoCopyArgument,
    )
    for definition in argument_definitions:
        cli_parser.add_argument(*definition.names, **definition.kwargs)
    return cli_parser

35 

36 

def main(argv: list[str]) -> None:
    """Main function of the add feature extractor command.

    Validates the given extractor directory and its wrapper, adds the
    extractor's features to the feature data frame, copies (or symlinks) the
    extractor into the platform's extractor directory and, when available,
    places Sparkle's RunSolver executable in the extractor directory.

    Args:
        argv: Command line arguments, excluding the program name.

    Raises:
        SystemExit: With code 0 on success, and with code -1 when the source
            path does not exist, the target already exists, or the wrapper is
            missing or not executable.
    """
    # Log command call
    sl.log_command(sys.argv, gv.settings().random_state)
    check_for_initialise()

    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)

    extractor_source_path = Path(args.extractor_path)
    if not extractor_source_path.exists():
        print(f'Feature extractor path "{extractor_source_path}" does not exist!')
        sys.exit(-1)

    # Start add feature extractor
    extractor_target_path = (
        gv.settings().DEFAULT_extractor_dir / extractor_source_path.name
    )

    if extractor_target_path.exists():
        print(
            f"Feature extractor {extractor_source_path.name} already exists! "
            "Can not add feature extractor."
        )
        sys.exit(-1)

    # Check execution permissions for wrapper
    extractor_source = Extractor(extractor_source_path)
    if extractor_source.wrapper is None:
        print(
            "The Extractor has no wrapper in its directory; please check that "
            f"the directory {extractor_source_path} contains a file with the "
            f"name '{Extractor.wrapper_file_name}'!"
        )
        sys.exit(-1)
    if not extractor_source.wrapper.is_file() or not os.access(
        extractor_source.wrapper, os.X_OK
    ):
        # Adjacent literals instead of a backslash continuation inside the
        # string: the continuation leaked the next line's indentation into
        # the printed message.
        print(
            f"The file {extractor_source.wrapper} does not exist or is "
            "not executable."
        )
        sys.exit(-1)

    # Get the extractor features groups and names from the wrapper, try to add to FDF.
    # The data frame is only saved after the copy/symlink below has succeeded.
    feature_dataframe = FeatureDataFrame(gv.settings().DEFAULT_feature_data_path)
    feature_dataframe.add_extractor(extractor_source.name, extractor_source.features)

    if args.no_copy:
        print(
            f"Creating symbolic link from {extractor_source_path} "
            f"to {extractor_target_path}..."
        )
        extractor_target_path.symlink_to(extractor_source_path.absolute())
    else:
        print(f"Copying feature extractor {extractor_source_path.name} ...")
        extractor_target_path.mkdir()
        shutil.copytree(extractor_source_path, extractor_target_path, dirs_exist_ok=True)

    extractor = Extractor(extractor_target_path)
    # Everything passed, can save FDF
    feature_dataframe.save_csv()

    # Add RunSolver executable to the extractor
    runsolver_path = gv.settings().DEFAULT_runsolver_exec
    if any(
        entry.name == runsolver_path.name for entry in extractor_target_path.iterdir()
    ):
        print(
            "Warning! RunSolver executable detected in Extractor "
            f"{extractor.name}. This will be replaced with "
            f"Sparkle's version of RunSolver. ({runsolver_path})"
        )

    if runsolver_path.exists():
        runsolver_target = extractor.directory / runsolver_path.name
        shutil.copyfile(runsolver_path, runsolver_target)
        # copyfile does not carry over permission bits; set the owner-execute bit.
        runsolver_target.chmod(runsolver_target.stat().st_mode | stat.S_IEXEC)
    else:
        print("Warning! RunSolver does not exists. Falling back to PyRunSolver.")

    print(f"Adding feature extractor {extractor_target_path.name} done!")

    # Record the nickname for the extractor only when one was given
    if args.nickname is not None:
        sfh.add_remove_platform_item(
            extractor_target_path,
            gv.extractor_nickname_list_path,
            gv.file_storage_data_mapping[gv.extractor_nickname_list_path],
            key=args.nickname,
        )

    # Write used settings to file
    gv.settings().write_used_settings()
    sys.exit(0)

131 

132 

# Script entry point: pass the command line arguments (without the program
# name) on to main.
if __name__ == "__main__":
    cli_arguments = sys.argv[1:]
    main(cli_arguments)