Coverage for sparkle/tools/runsolver_parsing.py: 10%

93 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-27 09:10 +0000

1"""Tools to parse runsolver I/O.""" 

2import sys 

3from pathlib import Path 

4import ast 

5import re 

6 

7from sparkle.types import SolverStatus 

8 

9 

10def get_measurements(runsolver_values_path: Path, 

11 not_found: float = -1.0) -> tuple[float, float, float]: 

12 """Return the CPU and wallclock time reported by runsolver in values log.""" 

13 cpu_time, wall_time, memory = not_found, not_found, not_found 

14 if runsolver_values_path.exists(): 

15 with runsolver_values_path.open("r") as infile: 

16 lines = [line.strip().split("=") for line in infile.readlines() 

17 if line.count("=") == 1] 

18 for keyword, value in lines: 

19 if keyword == "WCTIME": 

20 wall_time = float(value) 

21 elif keyword == "CPUTIME": 

22 cpu_time = float(value) 

23 elif keyword == "MAXVM": 

24 memory = float(int(value) / 1024.0) # MB 

25 # Order is fixed, CPU is the last thing we want to read, so break 

26 break 

27 return cpu_time, wall_time, memory 

28 

29 

30def get_status(runsolver_values_path: Path, runsolver_raw_path: Path) -> SolverStatus: 

31 """Get run status from runsolver logs.""" 

32 if not runsolver_values_path.exists() and (runsolver_raw_path is not None 

33 and not runsolver_raw_path.exists()): 

34 # Runsolver logs were not created, job was stopped ''incorrectly'' 

35 return SolverStatus.CRASHED 

36 # First check if runsolver reported time out 

37 if runsolver_values_path.exists(): 

38 for line in reversed(runsolver_values_path.open("r").readlines()): 

39 if line.strip().startswith("TIMEOUT="): 

40 if line.strip() == "TIMEOUT=true": 

41 return SolverStatus.TIMEOUT 

42 break 

43 if runsolver_raw_path is None: 

44 return SolverStatus.UNKNOWN 

45 if not runsolver_raw_path.exists(): 

46 # Runsolver log was not created, job was stopped ''incorrectly'' 

47 return SolverStatus.KILLED 

48 # Last line of runsolver log should contain the raw sparkle solver wrapper output 

49 runsolver_raw_contents = runsolver_raw_path.open("r").read().strip() 

50 # cutoff_time = 

51 sparkle_wrapper_dict_str = runsolver_raw_contents.splitlines()[-1] 

52 solver_regex_filter = re.findall("{.*}", sparkle_wrapper_dict_str)[0] 

53 output_dict = ast.literal_eval(solver_regex_filter) 

54 status = SolverStatus(output_dict["status"]) 

55 # if status == SolverStatus.CRASHED and cpu_time > cutoff_time 

56 return status 

57 

58 

59def get_solver_args(runsolver_log_path: Path) -> str: 

60 """Retrieves solver arguments dict from runsolver log.""" 

61 if runsolver_log_path.exists(): 

62 for line in runsolver_log_path.open("r").readlines(): 

63 if line.startswith("command line:"): 

64 return line.split("sparkle_solver_wrapper.py", 1)[1].strip().strip("'") 

65 return "" 

66 

67 

68def get_solver_output(runsolver_configuration: list[str], 

69 process_output: str, 

70 log_dir: Path) -> dict[str, str | object]: 

71 """Decode solver output dictionary when called with runsolver.""" 

72 solver_input = None 

73 solver_output = None 

74 value_data_file = None 

75 cutoff_time = sys.maxsize 

76 for idx, conf in enumerate(runsolver_configuration): 

77 if not isinstance(conf, str): 

78 # Looking for arg names 

79 continue 

80 conf = conf.strip() 

81 if conf == "-o" or conf == "--solver-data": 

82 # solver output was redirected 

83 solver_data_file = Path(runsolver_configuration[idx + 1]) 

84 if (log_dir / solver_data_file).exists(): 

85 solver_output = (log_dir / solver_data_file).open("r").read() 

86 if "-v" in conf or "--var" in conf: 

87 value_data_file = Path(runsolver_configuration[idx + 1]) 

88 if "--cpu-limit" in conf: 

89 cutoff_time = float(runsolver_configuration[idx + 1]) 

90 if "-w" in conf or "--watcher-data" in conf: 

91 watch_file = Path(runsolver_configuration[idx + 1]) 

92 args_str = get_solver_args(log_dir / watch_file) 

93 if args_str == "": # Could not find log file or args 

94 continue 

95 solver_input = re.findall("{.*}", args_str)[0] 

96 solver_input = ast.literal_eval(solver_input) 

97 cutoff_time = float(solver_input["cutoff_time"]) 

98 

99 if solver_output is None: 

100 # Still empty, try to read from subprocess 

101 solver_output = process_output 

102 # Format output to only the brackets (dict) 

103 # NOTE: It should have only one match, do we want some error logging here? 

104 try: 

105 solver_regex_filter = re.findall("{.*}", solver_output)[0] 

106 output_dict = ast.literal_eval(solver_regex_filter) 

107 except Exception: 

108 config_str = " ".join(runsolver_configuration) 

109 print("WARNING: Solver output decoding failed from RunSolver configuration: " 

110 f"'{config_str}'. Setting status to 'UNKNOWN'.") 

111 output_dict = {"status": SolverStatus.UNKNOWN} 

112 

113 output_dict["cutoff_time"] = cutoff_time 

114 if value_data_file is not None: 

115 cpu_time, wall_time, memory = get_measurements(log_dir / value_data_file) 

116 output_dict["cpu_time"] = cpu_time 

117 output_dict["wall_time"] = wall_time 

118 output_dict["memory"] = memory 

119 else: # Could not retrieve cpu and wall time (log does not exist) 

120 output_dict["cpu_time"], output_dict["wall_time"] = -1.0, -1.0 

121 if output_dict["cpu_time"] > cutoff_time: 

122 output_dict["status"] = SolverStatus.TIMEOUT 

123 # Add the missing objectives (runtime based) 

124 if solver_input is not None and "objectives" in solver_input: 

125 objectives = solver_input["objectives"].split(",") 

126 for o_name in objectives: 

127 if o_name not in output_dict: 

128 output_dict[o_name] = None 

129 return output_dict