Coverage for sparkle/solver/solver.py: 36%

146 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-27 09:10 +0000

1"""File to handle a solver and its directories.""" 

2 

3from __future__ import annotations 

4import sys 

5from typing import Any 

6import shlex 

7import ast 

8import json 

9from pathlib import Path 

10 

11import runrunner as rrr 

12from runrunner.local import LocalRun 

13from runrunner.slurm import SlurmRun 

14from runrunner.base import Status, Runner 

15 

16from sparkle.tools import runsolver_parsing, general as tg 

17from sparkle.tools import pcsparser 

18from sparkle.types import SparkleCallable, SolverStatus 

19from sparkle.solver.verifier import SolutionVerifier 

20from sparkle.instance import InstanceSet 

21from sparkle.types import resolve_objective, SparkleObjective, UseTime 

22 

23 

24class Solver(SparkleCallable): 

25 """Class to handle a solver and its directories.""" 

26 meta_data = "solver_meta.txt" 

27 wrapper = "sparkle_solver_wrapper.py" 

28 

def __init__(self: Solver,
             directory: Path,
             raw_output_directory: Path = None,
             runsolver_exec: Path = None,
             deterministic: bool = None,
             verifier: SolutionVerifier = None) -> None:
    """Initialize solver.

    Args:
        directory: Directory of the solver.
        raw_output_directory: Directory where solver will write its raw output.
            Defaults to directory / tmp, which is created if it is missing.
        runsolver_exec: Path to the runsolver executable.
            By default, runsolver in directory.
        deterministic: Bool indicating determinism of the algorithm.
            If None, the value is read from the "deterministic" entry of the
            meta data file in the solver directory. Defaults to False when the
            file or the entry does not exist.
        verifier: The solution verifier to use. If None, no verifier is used.
    """
    super().__init__(directory, runsolver_exec, raw_output_directory)
    self.deterministic = deterministic
    self.verifier = verifier
    self.meta_data_file = self.directory / Solver.meta_data

    if self.raw_output_directory is None:
        self.raw_output_directory = self.directory / "tmp"
        self.raw_output_directory.mkdir(exist_ok=True)
    if self.runsolver_exec is None:
        self.runsolver_exec = self.directory / "runsolver"
    if not self.meta_data_file.exists():
        # Signal to the rest of the class that there is no meta data
        self.meta_data_file = None
    if self.deterministic is None:
        if self.meta_data_file is not None:
            # Read the parameter from file. read_text() closes the file
            # handle again, unlike a bare open().read().
            meta_dict = ast.literal_eval(self.meta_data_file.read_text())
            self.deterministic = meta_dict.get("deterministic", False)
        else:
            self.deterministic = False

66 

def _get_pcs_file(self: Solver) -> Path | bool:
    """Locate the parameter (PCS) file in the solver directory.

    Returns:
        The path of the single ".pcs" file found directly in the solver
        directory, or False if there is no such file or more than one.
    """
    candidates = []
    for entry in self.directory.iterdir():
        if entry.suffix == ".pcs":
            candidates.append(entry)
    # Only an unambiguous, single PCS file per solver is accepted
    return candidates[0] if len(candidates) == 1 else False

78 

def get_pcs_file(self: Solver) -> Path:
    """Return the parameter (PCS) file of the solver.

    Returns:
        Whatever truthy value _get_pcs_file() yields (the path to the
        parameter file), or None if it yields a falsy value.
    """
    resolved = self._get_pcs_file()
    if resolved:
        return resolved
    # Translate the internal "False" marker into None for callers
    return None

88 

def read_pcs_file(self: Solver) -> bool:
    """Check if the pcs file can be read.

    Returns:
        True if a single PCS file was found and the PCS parser loaded it
        without raising a SyntaxError. False if no unique PCS file exists or
        the parser raised a SyntaxError.
    """
    pcs_file = self._get_pcs_file()
    if not pcs_file:
        # Without this guard str(False) == "False" would be handed to the
        # parser as if it were a file name.
        return False
    try:
        parser = pcsparser.PCSParser()
        parser.load(str(pcs_file), convention="smac")
    except SyntaxError:
        return False
    return True

99 

def get_pcs(self: Solver) -> list[dict[str, Any]] | None:
    """Get the parameter content of the PCS file.

    Returns:
        A list with the entries of the parsed PCS file whose "type" field
        equals "parameter", or None if the PCS file can not be resolved.
        NOTE(review): the entries are assumed to be dictionaries because they
        are indexed with the string key "type" - confirm against PCSParser.
    """
    if not (pcs_file := self.get_pcs_file()):
        return None
    parser = pcsparser.PCSParser()
    parser.load(str(pcs_file), convention="smac")
    # The parsed params are filtered down to actual parameter entries
    return [p for p in parser.pcs.params if p["type"] == "parameter"]

107 

def build_cmd(self: Solver,
              instance: str | list[str],
              objectives: list[SparkleObjective],
              seed: int,
              cutoff_time: int = None,
              configuration: dict = None) -> list[str]:
    """Build the solver call on an instance with a configuration.

    Args:
        instance: Path to the instance, or a list of paths for a multi-file
            instance.
        objectives: Objectives handed to the wrapper as a comma separated
            string of their str() representations.
        seed: Seed of the solver.
        cutoff_time: Cutoff time for the solver. If given, the call is
            prefixed with a RunSolver call using it as the CPU limit.
            If None, sys.maxsize is passed to the wrapper as cutoff time.
        configuration: Configuration of the solver. The given dictionary is
            not modified.

    Returns:
        List of commands and arguments to execute the solver.
    """
    # Work on a copy: the same dictionary may be passed for many instances
    # and must not accumulate the wrapper entries added below.
    configuration = {} if configuration is None else dict(configuration)
    # Ensure configuration contains required entries for each wrapper
    configuration["solver_dir"] = str(self.directory.absolute())
    configuration["instance"] = instance
    configuration["seed"] = seed
    configuration["objectives"] = ",".join(str(obj) for obj in objectives)
    if cutoff_time is not None:  # Use RunSolver
        configuration["cutoff_time"] = cutoff_time
        # Create RunSolver Logs
        # --timestamp
        #   instructs to timestamp each line of the solver standard output and
        #   error files (which are then redirected to stdout)
        # --use-pty
        #   use a pseudo-terminal to collect the solver output. Currently only
        #   available when lines are timestamped. Some I/O libraries (including
        #   the C library) automatically flush the output after each line when
        #   the standard output is a terminal. There is no automatic flush when
        #   the standard output is a pipe or a plain file. See setlinebuf() for
        #   some details. This option instructs runsolver to use a
        #   pseudo-terminal instead of a pipe/file to collect the solver
        #   output. This fools the solver which will line-buffer its output.
        # -w filename or --watcher-data filename
        #   sends the watcher information to filename
        # -v filename or --var filename
        #   save the most relevant information (times,...)
        #   in an easy to parse VAR=VALUE file
        # -o filename or --solver-data filename
        #   redirects the solver output (both stdout and stderr) to filename

        # A multi-file instance is a list; name the log files after its
        # first file since Path() can not be built from a list.
        if isinstance(instance, (list, tuple)):
            name_source = instance[0]
        else:
            name_source = instance
        inst_name = Path(name_source).name
        raw_result_path = Path(
            f"{self.name}_{inst_name}_{tg.get_time_pid_random_string()}.rawres")
        runsolver_watch_data_path = raw_result_path.with_suffix(".log")
        runsolver_values_path = raw_result_path.with_suffix(".val")

        solver_cmd = [str(self.runsolver_exec.absolute()),
                      "--timestamp", "--use-pty",
                      "--cpu-limit", str(cutoff_time),
                      "-w", str(runsolver_watch_data_path),
                      "-v", str(runsolver_values_path),
                      "-o", str(raw_result_path)]
    else:
        configuration["cutoff_time"] = sys.maxsize
        solver_cmd = []

    # Ensure stringification of dictionary will go correctly for key value pairs
    configuration = {key: str(value) for key, value in configuration.items()}
    solver_cmd += [str((self.directory / Solver.wrapper).absolute()),
                   f"'{json.dumps(configuration)}'"]
    return solver_cmd

179 

def run(self: Solver,
        instance: str | list[str] | InstanceSet,
        objectives: list[SparkleObjective],
        seed: int,
        cutoff_time: int = None,
        configuration: dict = None,
        run_on: Runner = Runner.LOCAL,
        commandname: str = "run_solver",
        sbatch_options: list[str] = None,
        cwd: Path = None) -> SlurmRun | list[dict[str, Any]] | dict[str, Any]:
    """Run the solver on an instance with a certain configuration.

    Args:
        instance: The instance(s) to run the solver on, list in case of multi-file.
            In case of an instance set, will run on all instances in the set.
        objectives: The objectives passed on to the solver call.
        seed: Seed to run the solver with. Fill with arbitrary int in case of
            deterministic solver.
        cutoff_time: The cutoff time for the solver, measured through RunSolver.
            If None, will be executed without RunSolver.
        configuration: The solver configuration to use. Can be empty.
        run_on: The runner the commands are queued on.
        commandname: Name under which the commands are queued.
        sbatch_options: Options forwarded to the queue call.
        cwd: Path where to execute. Defaults to self.raw_output_directory.

    Returns:
        For a local run: the parsed solver output dict when exactly one job
        was run, otherwise a list with one dict per job; or a dict holding
        only the ERROR status when the run failed. For any other runner the
        run object returned by the queue call.
    """
    if cwd is None:
        cwd = self.raw_output_directory
    if isinstance(instance, InstanceSet):
        targets = [path.absolute() for path in instance.instance_paths]
    else:
        targets = [instance]
    # Keep the commands as token lists as well, so the RunSolver prefix can be
    # recovered later without re-splitting a joined string on spaces (which
    # breaks as soon as a path contains a space).
    cmd_tokens = [self.build_cmd(target,
                                 objectives=objectives,
                                 seed=seed,
                                 cutoff_time=cutoff_time,
                                 configuration=configuration)
                  for target in targets]
    cmds = [" ".join(tokens) for tokens in cmd_tokens]
    run = rrr.add_to_queue(runner=run_on,
                           cmd=cmds,
                           name=commandname,
                           base_dir=cwd,
                           path=cwd,
                           sbatch_options=sbatch_options)

    if not isinstance(run, LocalRun):
        return run

    run.wait()
    # Subprocess resulted in error
    if run.status == Status.ERROR:
        print(f"WARNING: Solver {self.name} execution seems to have failed!\n")
        for i, job in enumerate(run.jobs):
            # Report the output of this very job, not always the first one
            print(f"[Job {i}] The used command was: {cmds[i]}\n"
                  "The error yielded was:\n"
                  f"\t-stdout: '{job._process.stdout}'\n"
                  f"\t-stderr: '{job._process.stderr}'\n")
        return {"status": SolverStatus.ERROR, }

    # build_cmd emits 11 RunSolver tokens (executable, flags and log paths,
    # the last being the raw result path) in front of the wrapper call.
    runsolver_prefix_length = 11
    runsolver_exec = str(self.runsolver_exec.absolute())
    solver_outputs = []
    for job, tokens in zip(run.jobs, cmd_tokens):
        runsolver_configuration = None
        if tokens[0] == runsolver_exec:
            runsolver_configuration = tokens[:runsolver_prefix_length]
        solver_output = Solver.parse_solver_output(job.stdout,
                                                   runsolver_configuration,
                                                   cwd)
        # The verifier is given the raw result file, which only exists when
        # RunSolver was used; skip it otherwise instead of indexing None.
        # NOTE(review): the complete `instance` argument is passed, also for
        # an InstanceSet - confirm this is what the verifier expects.
        if self.verifier is not None and runsolver_configuration is not None:
            solver_output["status"] = self.verifier.verifiy(
                instance, Path(runsolver_configuration[-1]))
        solver_outputs.append(solver_output)
    if len(solver_outputs) == 1:
        return solver_outputs[0]
    return solver_outputs

257 

@staticmethod
def config_str_to_dict(config_str: str) -> dict[str, str]:
    """Parse a configuration string to a dictionary.

    The string is expected to consist of alternating parameter names and
    values, e.g. "-alpha '0.5' -beta 3". Leading dashes are removed from the
    names only, so negative values ("-0.5"), exponents ("1e-5") and hyphens
    inside names or values are kept.

    Args:
        config_str: The configuration string to parse.

    Returns:
        Dictionary mapping parameter names to their values as strings. Empty
        for empty input, for "{}" and for an uneven number of tokens.
    """
    config_str = config_str.strip()
    if config_str in ("", r"{}"):
        return {}
    # Split the string by spaces, but conserve quoted substrings
    config_list = shlex.split(config_str)
    # Names and values must pair up; uneven input can not be interpreted
    if len(config_list) % 2:
        return {}
    config_dict = {}
    for name, value in zip(config_list[::2], config_list[1::2]):
        # As the value will already be a string object, no quotes are allowed in it
        config_dict[name.lstrip("-")] = value.strip('"').strip("'")
    return config_dict

274 

@staticmethod
def parse_solver_output(solver_output: str,
                        runsolver_configuration: list[str] = None,
                        cwd: Path = None) -> dict[str, Any]:
    """Parse the output of the solver.

    Args:
        solver_output: The output of the solver run which needs to be parsed
        runsolver_configuration: The runsolver configuration to wrap the solver
            with. If runsolver was not used this should be None.
        cwd: Path handed to the runsolver output parser. Only used when
            runsolver_configuration is given.

    Returns:
        Dictionary representing the parsed solver output. The "status" entry
        is converted to a SolverStatus and a "cutoff_time" entry, if present,
        is removed.
    """
    if runsolver_configuration is not None:
        parsed_output = runsolver_parsing.get_solver_output(runsolver_configuration,
                                                            solver_output,
                                                            cwd)
    else:
        parsed_output = ast.literal_eval(solver_output)

    # cast status attribute from str to Enum
    parsed_output["status"] = SolverStatus(parsed_output["status"])
    # apply objectives to parsed output, runtime based objectives added here.
    # Iterate over a snapshot, as entries are reassigned inside the loop.
    for key, value in list(parsed_output.items()):
        if key == "status":
            continue
        objective = resolve_objective(key)
        if objective is None:
            continue
        if objective.use_time == UseTime.NO:
            if objective.post_process is not None:
                # Store under the original key, like the time based branch
                # below, instead of adding an entry keyed by the objective.
                parsed_output[key] = objective.post_process(value)
            continue
        # Time based objectives need the values measured by runsolver
        if runsolver_configuration is None:
            continue
        if objective.use_time == UseTime.CPU_TIME:
            parsed_output[key] = parsed_output["cpu_time"]
        else:
            parsed_output[key] = parsed_output["wall_time"]
        if objective.post_process is not None:
            parsed_output[key] = objective.post_process(
                parsed_output[key], parsed_output["cutoff_time"])
    # The cutoff time is only needed for post processing above
    parsed_output.pop("cutoff_time", None)
    return parsed_output