Coverage for sparkle/tools/pcsparser.py: 50%

276 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-05 14:48 +0000

1"""The Parameter Configuration Space Parser class.""" 

2from __future__ import annotations 

3import re 

4import sys 

5import numpy as np 

6from enum import Enum 

7from abc import ABC 

8from pathlib import Path 

9 

10import tabulate 

11 

12 

class PCSObject(ABC):
    """General data structure to keep the pcs file in.

    All entries live in ``self.params`` in insertion order. Each entry is a
    dict tagged with a ``"type"`` key whose value is one of ``"parameter"``,
    ``"constraint"``, ``"forbidden"`` or ``"comment"``.

    Fields are added by functions, such that checks can be conducted.
    """

    def __init__(self: PCSObject) -> None:
        """Initialize the PCSObject with an empty entry list."""
        self.params = []

    def add_param(self: PCSObject,
                  name: str,
                  structure: str = "integer",
                  domain: list | None = None,
                  scale: str = "linear",
                  default: str = "0",
                  comment: str | None = None) -> None:
        """Add a parameter to the PCSObject.

        Args:
            name: Name of the parameter.
            structure: One of "integer", "real", "categorical" or "ordinal".
            domain: Two bounds for integer/real parameters, otherwise the list
                of allowed values. When omitted, a fresh
                ``[-sys.maxsize, sys.maxsize]`` list is created for this entry.
            scale: Scale of the parameter; reset to None for categoricals.
            default: Default value of the parameter.
            comment: Optional comment belonging to the parameter.

        Raises:
            ValueError: If the structure is unknown, or if an integer/real
                parameter does not have exactly two domain entries.
        """
        if structure not in ["integer", "real", "categorical", "ordinal"]:
            raise ValueError(f"Parameter structure {structure} not supported.")

        # Build the default per call: a list literal in the signature would be
        # one shared object stored in (and mutable through) every entry.
        if domain is None:
            domain = [-sys.maxsize, sys.maxsize]

        # Domain check
        if structure in ["integer", "real"]:
            if len(domain) != 2:
                raise ValueError(f"Parameter domain {domain} not supported.")
        elif structure == "categorical":
            # TODO: check categories
            scale = None

        self.params.append({
            "name": name,
            "structure": structure,
            "domain": domain,
            "scale": scale,
            "default": default,
            "comment": comment,
            "type": "parameter",
        })

    def add_constraint(self: PCSObject, **kwargs: object) -> None:
        """Add a constraint to the PCSObject; kwargs become the entry fields."""
        # TODO add checks
        self.params.append({**kwargs, "type": "constraint"})

    def add_forbidden(self: PCSObject, **kwargs: object) -> None:
        """Add a forbidden clause to the PCSObject; kwargs become the fields."""
        # TODO add checks
        self.params.append({**kwargs, "type": "forbidden"})

    def add_comment(self: PCSObject, **kwargs: object) -> None:
        """Add a comment to the PCSObject; kwargs become the entry fields."""
        # TODO add checks
        self.params.append({**kwargs, "type": "comment"})

    def clear(self: PCSObject) -> None:
        """Clear the PCSObject by dropping all stored entries."""
        self.params = []

    def get(self: PCSObject, name: str) -> dict | None:
        """Get an entry from the PCSObject based on the name.

        Returns:
            The most recently added entry that has a "name" field equal to
            ``name``, or None if there is no such entry.
        """
        # Walk backwards so that the last entry added under a name wins.
        for entry in reversed(self.params):
            if "name" in entry and entry["name"] == name:
                return entry
        return None

77 

78 

class PCSConvention(Enum):
    """Internal pcs convention enum.

    The member values are the lower-case strings that are compared against
    the user supplied convention name.
    """

    # The empty string stands for "no / unrecognised convention".
    unknown = ""
    SMAC = "smac"
    ParamILS = "paramils"
    IRACE = "irace"

86 

class PCSParser(ABC):
    """Base interface object for the parser.

    It loads the pcs files into the generic pcs object. Once a parameter file is
    loaded, it can be exported to another file.
    """

    def __init__(self: PCSParser, inherit: PCSParser | None = None) -> None:
        """Initialize the PCSParser.

        Args:
            inherit: Optional parser whose PCSObject is shared (not copied)
                with the new parser. Without it a fresh PCSObject is created.
        """
        self.pcs = PCSObject() if inherit is None else inherit.pcs

    @staticmethod
    def _format_string_to_enum(string: str) -> PCSConvention:
        """Convert a (case-insensitive) string to a PCSConvention.

        Raises:
            Exception: If no convention has the given string as its value.
        """
        wanted = string.lower()
        for form in PCSConvention:
            if form.value == wanted:
                return form
        raise Exception("ERROR: parameter configuration space format is not supported.")

    def check_validity(self: PCSParser) -> bool:
        """Check the validity of the pcs. Currently always returns True."""
        # TODO implement

        # check if for all parameters in constraints and forbidden clauses exists
        # Check for conflict between default values and constraints and forbidden clauses
        return True

    def load(self: PCSParser, filepath: Path, convention: str = "smac") -> None:
        """Main import function.

        Args:
            filepath: Path (or string path) of the pcs file to read.
            convention: Name of the convention the file is written in.

        Raises:
            Exception: If the convention is unknown or cannot be imported yet.
        """
        if isinstance(filepath, str):
            filepath = Path(filepath)
        convention = self._format_string_to_enum(convention)

        # TODO check if file actually exists
        # The context manager closes the handle even when reading fails.
        with filepath.open() as pcs_file:
            lines = pcs_file.readlines()

        if convention == PCSConvention.SMAC:
            parser = SMACParser(self)
            parser.parse(lines)
            self.pcs = parser.pcs
        else:
            raise Exception(f"ERROR: Importing the pcs convention for {convention.value}"
                            " is not yet implemented.")

    def export(self: PCSParser,
               destination: Path,
               convention: str = "smac") -> None:
        """Main export function.

        For the IRACE convention the forbidden clauses are written to a second
        file named ``<destination stem>_forbidden.txt`` next to ``destination``.

        Args:
            destination: Path (or string path) of the file to write.
            convention: Name of the convention to write the file in.

        Raises:
            Exception: If the convention is unknown or cannot be exported yet.
        """
        # Accept plain strings as well, mirroring what load() does.
        if isinstance(destination, str):
            destination = Path(destination)
        convention = self._format_string_to_enum(convention)
        if convention == PCSConvention.ParamILS:
            pcs = ParamILSParser(self).compile()
        elif convention == PCSConvention.IRACE:
            pcs, forbidden = IRACEParser(self).compile()
            forbidden_file_name = destination.stem + "_forbidden.txt"
            # write_text opens, writes and closes the file in one go.
            (destination.parent / forbidden_file_name).write_text(forbidden)
        else:
            raise Exception(f"ERROR: Exporting the pcs convention for {convention.value}"
                            " is not yet implemented.")
        destination.write_text("### Parameter file generated by Sparkle\n"
                               f"{pcs}\n")

149 

150 

class SMACParser(PCSParser):
    """The SMAC parser class.

    The line patterns are compiled once on the class instead of once per line.
    """

    # The only forbidden characters in parameter names are:
    # spaces, commas, quotes, and parentheses
    _PARAM_RE = re.compile(
        r"(?P<name>[^\s\"',]*)\s+(?P<structure>\w*)\s+(?P<domain>(\[|\{)"
        r".*(\]|\}))\s*\[(?P<default>.*)\]\s*(?P<scale>log)"
        r"*\s*#*(?P<comment>.*)")
    # The conditions stop at the first '#', so that a trailing comment ends up
    # in the comment group instead of inside the conditions.
    _CONSTRAINT_RE = re.compile(
        r"(?P<parameter>[^\s\"',]+)\s*\|\s*"
        r"(?P<conditions>[^#\n]+)#*(?P<comment>.*)")
    _FORBIDDEN_RE = re.compile(r"\s*\{(?P<clauses>[^\}]+)\}\s*#*(?P<comment>.*)")
    _SIMPLE_CLAUSE_RE = re.compile(r"(?P<param>[^\s\"',=]+)\s*=\s*"
                                   r"(?P<value>[^\s\"',]+)")
    _COMMENT_RE = re.compile(r"\s*#(?P<comment>.*)")
    _EMPTY_RE = re.compile(r"^\s*$")
    # Alternation is tried left to right, so the two-character operators must
    # precede '<' and '>' or "a <= 5" is read as quantifier '<' with value '='.
    _NUMERICAL_CONDITION_RE = re.compile(
        r"\s*(?P<parameter>[^\s\"',]+)\s*(?P<quantifier>==|!=|<=|>=|<|>)"
        r"\s*(?P<value>[^\s\"',]+)\s*")
    _CATEGORICAL_CONDITION_RE = re.compile(
        r"\s*(?P<parameter>[^\s\"',]+)\s+"
        r"in\s*\{(?P<items>[^\}]+)\}\s*")

    def parse(self: SMACParser, lines: list[str]) -> None:
        """Parse the pcs file.

        The shared PCSObject is cleared first. Each line is then tried, in this
        order, as a parameter, a constraint, a forbidden clause, a comment and
        an empty line.

        Args:
            lines: The lines of the pcs file.

        Raises:
            Exception: If a line matches none of the known line types, or if
                one of its conditions cannot be parsed.
        """
        self.pcs.clear()

        for line in lines:
            # PARAMS
            m = self._PARAM_RE.match(line)
            if m is not None:
                fields = m.groupdict()
                domain = re.sub(r"(?:\[|\]|\{|\})", "", fields["domain"])
                fields["domain"] = re.split(r"\s*,\s*", domain)
                self.pcs.add_param(**fields)
                continue

            # CONSTRAINTS
            m = self._CONSTRAINT_RE.match(line)
            if m is not None:
                constraint = m.groupdict()
                constraint["conditions"] = self._parse_conditions(
                    constraint["conditions"])
                self.pcs.add_constraint(**constraint)
                continue

            # FORBIDDEN CLAUSES
            m = self._FORBIDDEN_RE.match(line)
            if m is not None:
                forbidden = m.groupdict()
                # Simple clauses
                # {<parameter name 1>=<value 1>, ..., <parameter name N>=<value N>}
                if "," in forbidden["clauses"]:
                    forbidden["clause_type"] = "simple"
                    conditions = []
                    for clause in re.split(r"\s*,\s*", forbidden["clauses"]):
                        clause_match = self._SIMPLE_CLAUSE_RE.match(clause)
                        if clause_match is not None:
                            conditions.append(clause_match.groupdict())
                        else:
                            print(clause, "ERROR")
                else:  # Advanced clauses
                    forbidden["clause_type"] = "advanced"
                    # TODO decide if we need to further parse this down
                    conditions = re.split(r"\s*(?:\|\||&&)\s*", forbidden["clauses"])

                if len(conditions) == 0:
                    raise Exception(f"ERROR: cannot parse the following line:\n'{line}'")

                forbidden["clauses"] = conditions
                self.pcs.add_forbidden(**forbidden)
                continue

            # COMMENTLINE
            m = self._COMMENT_RE.match(line)
            if m is not None:
                self.pcs.add_comment(**m.groupdict())
                continue

            # EMPTY LINE
            if self._EMPTY_RE.match(line) is not None:
                continue

            # RAISE ERROR
            raise Exception(f"ERROR: cannot parse the following line: \n'{line}'")

    def _parse_conditions(self: SMACParser, conditions: str) -> list[tuple]:
        """Parse a condition expression into a list of (operator, condition).

        The operator is None for the first element and "||" or "&&" for the
        following ones. A condition is either the dict produced by
        _parse_condition or, for a parenthesised group, a nested list of the
        same form.

        Args:
            conditions: The text following the '|' of a constraint line.

        Raises:
            Exception: If one of the single conditions cannot be parsed.
        """
        conditionlist = []
        operator = None
        depth = 0            # Current parenthesis nesting level
        group_start = 0      # Position of the '(' that opened the outer group
        segment_start = 0    # Start of the operand that is currently scanned
        # True while the current operand was a parenthesised group that has
        # already been appended. It is reset at every operator, so a group
        # can never hide the plain condition that follows it.
        group_added = False
        for pos, char in enumerate(conditions):
            if char == "(":
                if depth == 0:
                    group_start = pos
                depth += 1
            elif char == ")":
                depth -= 1
                if depth == 0:
                    nested = self._parse_conditions(conditions[group_start + 1:pos])
                    conditionlist.append((operator, nested))
                    group_added = True
            elif pos > 1 and depth == 0 and conditions[pos - 1:pos + 1] in ("||", "&&"):
                if not group_added:
                    condition = self._parse_condition(
                        conditions[segment_start:pos - 1])
                    conditionlist.append((operator, condition))
                operator = conditions[pos - 1:pos + 1]
                segment_start = pos + 1
                group_added = False

        # The last operand still needs parsing, unless it was a group (which
        # may be followed by trailing whitespace that is not a condition).
        if not group_added:
            condition = self._parse_condition(conditions[segment_start:])
            conditionlist.append((operator, condition))

        return conditionlist

    @staticmethod
    def _parse_condition(condition: str) -> dict:
        """Parse a single condition.

        Returns:
            For ``<parameter> <quantifier> <value>`` a dict with the keys
            parameter, quantifier, value and type "numerical". For
            ``<parameter> in {<items>}`` a dict with the keys parameter, items
            (a list of strings) and type "categorical".

        Raises:
            Exception: If the text matches neither form.
        """
        m = SMACParser._NUMERICAL_CONDITION_RE.match(condition)
        if m is not None:
            return {**m.groupdict(), "type": "numerical"}

        m = SMACParser._CATEGORICAL_CONDITION_RE.match(condition)
        if m is not None:
            parsed = {**m.groupdict(), "type": "categorical"}
            parsed["items"] = re.split(r",\s*", parsed["items"])
            return parsed

        raise Exception(f"ERROR: Couldn't parse '{condition}'")

    def compile(self: SMACParser) -> str:
        """Compile the PCS. Not implemented yet; currently returns None."""
        # TODO implement
        pass

307 

308 

class ParamILSParser(PCSParser):
    """PCS parser for ParamILS format."""

    def parse(self: ParamILSParser, lines: list[str]) -> None:
        """Parse the PCS. Not implemented yet."""
        # TODO implement
        pass

    def compile(self: ParamILSParser) -> str:
        """Compile the PCS into ParamILS text.

        Parameters are written first, then constraints, then simple forbidden
        clauses. Numeric parameters are discretised, as every ParamILS domain
        is written here as an explicit set of values.

        Returns:
            The lines of the ParamILS file joined by newlines.

        Raises:
            ValueError: If a numeric parameter has no two-valued domain.
        """
        # TODO Produce warning if certain specifications cannot be kept in this format
        # TODO granularity parameter that sets how log and real ranges should be expanded
        granularity = 20

        lines = []
        for item in self.pcs.params:
            if item["type"] != "parameter":
                continue
            domain = "{" + self._discretise(item, granularity) + "}"
            line = f"{item['name']} {domain} [{item['default']}]"
            # Truthiness also covers a comment of None, not only "".
            if item["comment"]:
                line += f" #{item['comment']}"
            lines.append(line)

        for item in self.pcs.params:
            if item["type"] != "constraint":
                continue
            line = f"{item['parameter']} | "
            line += self._compile_conditions(item["conditions"])
            if item.get("comment"):
                line += f" #{item['comment']}"
            lines.append(line)

        for item in self.pcs.params:
            if item["type"] != "forbidden":
                continue
            if item["clause_type"] == "simple":
                clauses = [f"{cls['param']}={cls['value']}"
                           for cls in item["clauses"]]
                line = "{" + ",".join(clauses) + "}"
                if item.get("comment"):
                    line += f"#{item['comment']}"
                lines.append(line)
            else:
                print("WARNING: Advanced forbidden clauses "
                      "are not supported by ParamILS.")

        return "\n".join(lines)

    @staticmethod
    def _discretise(item: dict, granularity: int) -> str:
        """Return the comma separated ParamILS values of one parameter.

        Ordinal and categorical domains are written as they are. Integer and
        real ranges are sampled at ``granularity`` points (geometrically spaced
        for the "log" scale, linearly otherwise), and the default value is
        always part of the result.
        """
        structure = item["structure"]
        if structure in ["ordinal", "categorical"]:
            return ",".join(item["domain"])

        if structure not in ["integer", "real"]:
            raise ValueError(f"Parameter structure {structure} not supported.")
        if len(item["domain"]) != 2:
            raise ValueError(f"Domain {item['domain']} not supported.")
        log_scale = item["scale"] == "log"

        if structure == "integer":
            minval, maxval = (int(bound) for bound in item["domain"])
            if log_scale:
                grid = np.geomspace(minval, maxval, granularity, dtype=int)
            else:
                grid = np.round(np.linspace(minval, maxval, granularity))
            # A set removes the duplicates introduced by rounding; sorting it
            # unconditionally keeps the output order deterministic.
            values = {int(value) for value in grid}
            values.add(int(item["default"]))
            return ",".join(str(value) for value in sorted(values))

        minval, maxval = (float(bound) for bound in item["domain"])
        if log_scale:
            values = list(np.unique(np.geomspace(minval, maxval, granularity,
                                                 dtype=float)))
        else:
            values = list(np.linspace(minval, maxval, granularity))
        # add default value
        if float(item["default"]) not in values:
            values.append(float(item["default"]))
        # Filter duplicates in string format
        texts = sorted({f"{value}" for value in values}, key=float)
        return ",".join(texts)

    def _compile_conditions(self: ParamILSParser, conditions: list[tuple]) -> str:
        """Compile a list of (operator, condition) tuples into ParamILS text.

        Nested lists are compiled recursively and wrapped in parentheses.
        ParamILS only knows ``<parameter> in {...}``, so a numerical condition
        is written as ``<parameter> in {<value>}``. A warning is printed when
        that is not an exact translation, i.e. unless it is an equality test of
        a known categorical parameter against one of its domain values.
        """
        line = ""
        for operator, condition in conditions:
            if operator is not None:
                line += f" {operator} "

            if isinstance(condition, list):
                line += f"({self._compile_conditions(condition)})"
            elif condition["type"] == "numerical":
                name = condition["parameter"]
                value = condition["value"]
                quantifier = condition.get("quantifier", "==")
                param = self.pcs.get(name)
                exact = (quantifier == "=="
                         and param is not None
                         and param["structure"] == "categorical"
                         and value in param["domain"])
                if not exact:
                    print(f"WARNING: Condition '{name} {quantifier} {value}' "
                          "cannot be expressed exactly in ParamILS.")
                # Always emit a closed set, so the written line stays balanced.
                line += f"{name} in {{{value}}}"
            elif condition["type"] == "categorical":
                items = ", ".join(condition["items"])
                line += f"{condition['parameter']} in {{{items}}}"
        return line

425 

426 

class IRACEParser(PCSParser):
    """Base interface object for the parser.

    It loads the IRACE pcs files into the generic pcs object.
    Once a parameter file is loaded, it can be exported to another file.
    """

    def __init__(self: IRACEParser, inherit: PCSParser | None = None) -> None:
        """Initialize the IRACEParser, optionally sharing the pcs of inherit."""
        super().__init__(inherit)

    def parse(self: IRACEParser, lines: list[str]) -> None:
        """Parse the pcs file. Not implemented yet."""
        # TODO implement
        pass

    def compile(self: IRACEParser) -> tuple[str, str]:
        """Compile the PCS.

        Returns:
            A tuple of the parameter table as plain text and the forbidden
            clauses, one clause per line.
        """
        # Create pcs table
        header = ["# name", "switch", "type", "values",
                  "[conditions (using R syntax)]"]
        entries = self.pcs.params
        constraints = [c for c in entries if c["type"] == "constraint"]
        rows = []
        for param in entries:
            if param["type"] != "parameter":
                continue
            name = param["name"]
            # IRACE writes conditions on the same line as param definitions
            condition_str = "".join(self._compile_conditions(c["conditions"])
                                    for c in constraints if c["parameter"] == name)
            if condition_str:
                condition_str = "|" + condition_str
            rows.append([name,  # Parameter name
                         f'"--{name} "',  # Parameter argument name
                         param["structure"][0],  # Parameter type
                         f"({','.join(param['domain'])})",  # Parameter range/domain
                         condition_str])  # Parameter conditions

        forbidden_rows = []
        for entry in entries:
            if entry["type"] != "forbidden":
                continue
            clauses = entry["clauses"]
            # Advanced clauses are stored as plain strings and have no
            # param/value pair to write; skip them instead of crashing.
            if not all(isinstance(clause, dict) for clause in clauses):
                print("WARNING: Advanced forbidden clauses "
                      "are not supported by the IRACE export.")
                continue
            forbidden_rows.append(" & ".join(f"({c['param']} = {c['value']})"
                                             for c in clauses))
        return tabulate.tabulate(rows, headers=header, tablefmt="plain",
                                 numalign="left"), "\n".join(forbidden_rows)

    def _compile_conditions(self: IRACEParser, conditions: list[tuple]) -> str:
        """Compile a list of (operator, condition) tuples for one parameter.

        Every term is written as ``" <operator> <term>"`` with an empty
        operator for a leading term. Conditions holding an "items" list become
        ``<parameter> %in% c(<items>)``, other conditions are written as
        ``<parameter> <quantifier> <value>`` and nested lists are compiled
        recursively and wrapped in parentheses.
        """
        terms = []
        for operator, condition in conditions:
            operator = operator if operator is not None else ""
            if isinstance(condition, list):
                term = f"({self._compile_conditions(condition).strip()})"
            elif "items" in condition:
                term = (f"{condition['parameter']} %in% "
                        f"{condition['type'][0]}({','.join(condition['items'])})")
            else:
                term = (f"{condition['parameter']} {condition['quantifier']} "
                        f"{condition['value']}")
            terms.append(f" {operator} {term}")
        return "".join(terms)

477 numalign="left"), "\n".join(forbidden_rows)