Coverage for sparkle/tools/pcsparser.py: 56%

344 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-07 15:22 +0000

1"""The Parameter Configuration Space Parser class.""" 

2from __future__ import annotations 

3import re 

4import sys 

5import numpy as np 

6from enum import Enum 

7from abc import ABC 

8from pathlib import Path 

9 

10import tabulate 

11import ConfigSpace 

12 

13 

class PCSObject(ABC):
    """General data structure to keep the pcs file in.

    Fields are added by functions, such that checks can be conducted.
    Every entry is stored as a dict in ``self.params`` and carries a ``"type"``
    key with one of ``"parameter"``, ``"constraint"``, ``"forbidden"`` or
    ``"comment"``.
    """

    def __init__(self: PCSObject) -> None:
        """Initialize the PCSObject with an empty entry list."""
        self.params = []

    def add_param(self: PCSObject,
                  name: str,
                  structure: str = "integer",
                  domain: list = None,
                  scale: str = "linear",
                  default: str = "0",
                  comment: str = None) -> None:
        """Add a parameter to the PCSObject.

        Args:
            name: Name of the parameter.
            structure: One of "integer", "real", "categorical" or "ordinal".
            domain: Value domain. For "integer"/"real" this must hold exactly
                two entries (lower and upper bound). When omitted, a fresh
                ``[-sys.maxsize, sys.maxsize]`` list is used.
            scale: Scale of the parameter; reset to None for categoricals.
            default: Default value of the parameter.
            comment: Optional comment attached to the parameter.

        Raises:
            ValueError: If the structure is not supported or a numerical
                domain does not consist of exactly two entries.
        """
        if structure not in ("integer", "real", "categorical", "ordinal"):
            raise ValueError(f"Parameter structure {structure} not supported.")

        # A new list per call: the domain is stored in the entry itself, so a
        # shared default list would leak mutations between parameters.
        if domain is None:
            domain = [-sys.maxsize, sys.maxsize]

        # Domain check
        if structure in ("integer", "real"):
            if len(domain) != 2:
                raise ValueError(f"Parameter domain {domain} not supported.")
        elif structure == "categorical":
            # TODO: check categories
            scale = None

        self.params.append({
            "name": name,
            "structure": structure,
            "domain": domain,
            "scale": scale,
            "default": default,
            "comment": comment,
            "type": "parameter",
        })

    def add_constraint(self: PCSObject, **kwargs: any) -> None:
        """Add a constraint to the PCSObject; kwargs are stored as given."""
        # TODO add checks
        self.params.append({**kwargs, "type": "constraint"})

    def add_forbidden(self: PCSObject, **kwargs: any) -> None:
        """Add a forbidden clause to the PCSObject; kwargs are stored as given."""
        # TODO add checks
        self.params.append({**kwargs, "type": "forbidden"})

    def add_comment(self: PCSObject, **kwargs: any) -> None:
        """Add a comment to the PCSObject; kwargs are stored as given."""
        # TODO add checks
        self.params.append({**kwargs, "type": "comment"})

    def clear(self: PCSObject) -> None:
        """Clear the PCSObject by dropping all stored entries."""
        self.params = []

    def get(self: PCSObject, name: str) -> dict | None:
        """Get an entry from the PCSObject based on its name.

        Only entries that carry a "name" key are considered. When several
        entries share the name, the one added last is returned.

        Returns:
            The matching entry dict, or None if no entry has that name.
        """
        for entry in reversed(self.params):
            if "name" in entry and entry["name"] == name:
                return entry
        return None

78 

79 

class PCSConvention(Enum):
    """Internal enumeration of the known pcs conventions.

    Each member's value is the lower-case string used to select it.
    """

    # The empty string selects the "unknown" convention.
    unknown = ""
    SMAC = "smac"
    ParamILS = "paramils"
    IRACE = "irace"
    ConfigSpace = "configspace"

87 

88 

class PCSParser(ABC):
    """Base interface object for the parser.

    It loads the pcs files into the generic pcs object. Once a parameter file is
    loaded, it can be exported to another file.
    """

    def __init__(self: PCSParser, inherit: PCSParser = None) -> None:
        """Initialize the PCSParser.

        Args:
            inherit: Optional parser whose ``pcs`` object is shared with this
                one. When omitted, a new empty PCSObject is created.
        """
        self.pcs = PCSObject() if inherit is None else inherit.pcs

    @staticmethod
    def _format_string_to_enum(string: str) -> PCSConvention:
        """Convert a (case-insensitive) string to its PCSConvention member."""
        lowered = string.lower()
        for form in PCSConvention:
            if form.value == lowered:
                return form
        raise Exception("ERROR: parameter configuration space format is not supported.")

    def check_validity(self: PCSParser) -> bool:
        """Check the validity of the pcs (not implemented, always True)."""
        # TODO implement

        # check if for all parameters in constraints and forbidden clauses exists
        # Check for conflict between default values and constraints and forbidden clauses
        return True

    def load(self: PCSParser, filepath: Path, convention: str = "smac") -> None:
        """Main import function.

        Args:
            filepath: Path (or string path) of the file to read.
            convention: Name of the convention the file is written in.

        Raises:
            Exception: If the convention or the file type is not supported.
        """
        if isinstance(filepath, str):
            filepath = Path(filepath)
        convention = self._format_string_to_enum(convention)

        if convention == PCSConvention.SMAC:
            # Context manager so the file handle is closed deterministically.
            with filepath.open() as pcs_file:
                lines = pcs_file.readlines()
            parser = SMACParser(self)
            parser.parse(lines)
            self.pcs = parser.pcs
        elif convention == PCSConvention.ConfigSpace:
            if filepath.suffix == ".yaml":
                self.pcs = ConfigSpace.ConfigurationSpace.from_yaml(filepath)
            elif filepath.suffix == ".json":
                self.pcs = ConfigSpace.ConfigurationSpace.from_json(filepath)
            else:
                raise Exception(f"File type for {convention.value}: {filepath.suffix} "
                                "not in accepted types: {'.yaml', '.json'}")
        else:
            raise Exception(f"ERROR: Importing the pcs convention for {convention.value}"
                            " is not yet implemented.")

    def export(self: PCSParser,
               destination: Path,
               convention: str = "smac") -> None:
        """Main export function.

        Args:
            destination: Path (or string path) of the file to write.
            convention: Name of the convention to write the file in.

        Raises:
            Exception: If exporting to the convention is not implemented.
        """
        if isinstance(destination, str):
            # Accept plain strings, matching what load() accepts.
            destination = Path(destination)
        convention = self._format_string_to_enum(convention)
        header = "### Parameter file generated by Sparkle\n"
        # TODO: SMAC2 writer
        if convention == PCSConvention.ParamILS:
            pcs = ParamILSParser(self).compile()
            destination.write_text(f"{header}{pcs}\n")
        elif convention == PCSConvention.IRACE:
            pcs, forbidden = IRACEParser(self).compile()
            # Forbidden configurations go to a sibling "<stem>_forbidden.txt" file
            forbidden_file_name = destination.stem + "_forbidden.txt"
            (destination.parent / forbidden_file_name).write_text(forbidden)
            destination.write_text(f"{header}{pcs}\n")
        elif convention == PCSConvention.ConfigSpace:
            self.pcs.to_json(destination)
        else:
            raise Exception(f"ERROR: Exporting the pcs convention for {convention.value}"
                            " is not yet implemented.")

    @staticmethod
    def _build_hyperparameter(p: dict) -> ConfigSpace.hyperparameters.Hyperparameter:
        """Create the ConfigSpace hyperparameter for one parameter entry."""
        structure = p["structure"]
        if structure == "integer":
            # BetaIntegerHyperparameter: requires an alpha and beta
            # NormalIntegerHyperparameter: requires a mu and sigma
            return ConfigSpace.UniformIntegerHyperparameter(
                name=p["name"],
                lower=int(p["domain"][0]),
                upper=int(p["domain"][1]),
                default_value=int(p["default"]),
                log=p["scale"] == "log",
            )
        if structure == "real":
            # BetaFloatHyperparameter: requires an alpha and beta
            # NormalFloatHyperparameter: requires a mu and sigma
            return ConfigSpace.UniformFloatHyperparameter(
                name=p["name"],
                lower=float(p["domain"][0]),
                upper=float(p["domain"][1]),
                default_value=float(p["default"]),
                log=p["scale"] == "log",
            )
        if structure == "categorical":
            # Does not seem to contain any weights?
            return ConfigSpace.CategoricalHyperparameter(
                name=p["name"],
                choices=p["domain"],
                default_value=p["default"],
            )
        if structure == "ordinal":
            return ConfigSpace.OrdinalHyperparameter(
                name=p["name"],
                sequence=p["domain"],
                default_value=p["default"],
            )
        # NOTE: Missing: "constant" structure
        raise Exception(f"ERROR: Unknown parameter structure: {p['structure']}")

    @staticmethod
    def _build_condition(cs: ConfigSpace.ConfigurationSpace,
                         child: object,
                         clause: dict) -> object:
        """Create a single ConfigSpace condition from one parsed clause.

        Raises:
            TypeError: If the clause values cannot be converted to the type of
                the parent parameter's default value.
            Exception: If the clause uses a quantifier that has no ConfigSpace
                counterpart here.
        """
        parent = cs[clause["parameter"]]
        cast = type(parent.default_value)
        is_set = "items" in clause
        raw = clause["items"] if is_set else clause["value"]
        try:
            values = [cast(i) for i in raw] if is_set else cast(raw)
        except Exception as err:
            # Report whichever of items/value the clause actually holds
            raise TypeError(
                f"The clause {raw} contains values that are not of "
                f"the same type as parameter {clause['parameter']} "
                f"[{cast}].") from err
        if "quantifier" not in clause:
            return ConfigSpace.InCondition(child=child, parent=parent, values=values)
        condition_types = {
            "==": ConfigSpace.EqualsCondition,
            "!=": ConfigSpace.NotEqualsCondition,
            ">": ConfigSpace.GreaterThanCondition,
            "<": ConfigSpace.LessThanCondition,
        }
        quantifier = clause["quantifier"]
        if quantifier not in condition_types:
            # Fail loudly instead of silently reusing an earlier condition
            raise Exception(f"ERROR: Unsupported condition quantifier: {quantifier}")
        return condition_types[quantifier](child=child, parent=parent, value=values)

    def _build_conjunction(self: PCSParser,
                           cs: ConfigSpace.ConfigurationSpace,
                           child: object,
                           conditions: list[tuple]) -> object:
        """Combine parsed (operator, clause) pairs into one ConfigSpace condition.

        NOTE from SMAC2: the && connective has higher precedence than ||, so
        a||b&& c||d is the same as: a||(b&&c)||d. The clauses are therefore
        grouped into &&-runs first and the runs are joined with || afterwards.
        A clause that is itself a list (parenthesised group) is built
        recursively.
        """
        if not conditions:
            raise ValueError("Cannot build a condition from an empty clause list.")
        or_groups = [[]]
        for index, (operator, clause) in enumerate(conditions):
            if isinstance(clause, list):
                component = self._build_conjunction(cs, child, clause)
            else:
                component = self._build_condition(cs, child, clause)
            if index == 0 or operator == "&&":
                or_groups[-1].append(component)
            elif operator == "||":
                or_groups.append([component])
            else:
                raise Exception(f"ERROR: Unknown conjunction operator: {operator}")
        terms = [group[0] if len(group) == 1 else ConfigSpace.AndConjunction(*group)
                 for group in or_groups]
        return terms[0] if len(terms) == 1 else ConfigSpace.OrConjunction(*terms)

    def get_configspace(self: PCSParser) -> ConfigSpace.ConfigurationSpace:
        """Get the ConfigurationSpace representation of the PCS file.

        Parameters are added first, then constraints (called conditions in
        ConfigSpace), then the simple forbidden clauses. Advanced forbidden
        clauses are skipped with a warning.
        """
        cs = ConfigSpace.ConfigurationSpace()
        entries = self.pcs.params
        for p in entries:
            if p["type"] == "parameter":
                cs.add(self._build_hyperparameter(p))
        for constraint in entries:
            if constraint["type"] != "constraint":
                continue
            # Constraints are called conditions in ConfigSpace,
            # connected with conjunctions
            child = cs[constraint["parameter"]]
            cs.add(self._build_conjunction(cs, child, constraint["conditions"]))
        for forbid in entries:
            if forbid["type"] != "forbidden":
                continue
            # TODO: This section is ill supported by PCSParser so the values
            # we find are wrong or incomplete for advanced clause types:
            # It does not support &&/|| operators or multi variable in a single statement
            if forbid["clause_type"] == "advanced":
                print("WARNING: Advanced clauses not supported in PCSParser. "
                      f"Skipping forbidden clause: {forbid['clauses']}")
                continue
            # Therefore, we can only add forbidden with "=" operator and "&&" conjunction
            conjunction = None
            for entry in forbid["clauses"]:
                parameter = cs[entry["param"]]
                equals = ConfigSpace.ForbiddenEqualsClause(
                    hyperparameter=parameter,
                    value=type(parameter.default_value)(entry["value"]),
                )
                if conjunction is None:
                    conjunction = equals
                else:
                    conjunction = ConfigSpace.ForbiddenAndConjunction(conjunction,
                                                                      equals)
            cs.add(conjunction)
        return cs

295 

296 

class SMACParser(PCSParser):
    """The SMAC parser class."""

    # The only forbidden characters in parameter names are:
    # spaces, commas, quotes, and parentheses
    _PARAM_REGEX = re.compile(
        r"(?P<name>[^\s\"',]*)\s+(?P<structure>\w*)\s+(?P<domain>(\[|\{)"
        r".*(\]|\}))\s*\[(?P<default>.*)\]\s*(?P<scale>log)"
        r"*\s*#*(?P<comment>.*)")
    # Conditions stop at the first '#', so a trailing comment ends up in "comment"
    _CONSTRAINT_REGEX = re.compile(
        r"(?P<parameter>[^\s\"',]+)\s*\|\s"
        r"*(?P<conditions>[^#]+)#*(?P<comment>.*)")
    _FORBIDDEN_REGEX = re.compile(r"\s*\{(?P<clauses>[^\}]+)\}\s*#*(?P<comment>.*)")
    _COMMENT_REGEX = re.compile(r"\s*#(?P<comment>.*)")
    _EMPTY_REGEX = re.compile(r"^\s*$")
    # <parameter name>=<value>; operator characters are excluded so that
    # expressions such as "a==1" or "a<=1" are not mistaken for simple clauses
    _SIMPLE_CLAUSE_REGEX = re.compile(
        r"\s*(?P<param>[^\s\"',=<>!&|]+)\s*=\s*(?P<value>[^\s\"',=<>!&|]+)\s*")
    # Two-character quantifiers come first, otherwise "<"/">" would match the
    # first character of "<="/">=" and the "=" would end up in the value
    _NUMERICAL_REGEX = re.compile(
        r"\s*(?P<parameter>[^\s\"',]+)\s*(?P<quantifier>==|!=|<=|>=|<|>)"
        r"\s*(?P<value>[^\s\"',]+)\s*")
    _CATEGORICAL_REGEX = re.compile(
        r"\s*(?P<parameter>[^\s\"',]+)\s+"
        r"in\s*\{(?P<items>[^\}]+)\}\s*")

    def parse(self: SMACParser, lines: list[str]) -> None:
        """Parse the pcs file.

        The current pcs object is cleared and refilled from the given lines.
        Comment lines and empty lines are recognised first, so that a
        commented-out parameter or constraint is never parsed as a live one.

        Raises:
            Exception: If a line matches none of the known line types, or a
                forbidden clause yields no conditions.
        """
        self.pcs.clear()

        for line in lines:
            # COMMENTLINE
            m = self._COMMENT_REGEX.match(line)
            if m is not None:
                self.pcs.add_comment(**m.groupdict())
                continue

            # EMPTY LINE
            if self._EMPTY_REGEX.match(line) is not None:
                continue

            # PARAMS
            m = self._PARAM_REGEX.match(line)
            if m is not None:
                fields = m.groupdict()
                domain = re.sub(r"(?:\[|\]|\{|\})", "", fields["domain"])
                fields["domain"] = re.split(r"\s*,\s*", domain.strip())
                self.pcs.add_param(**fields)
                continue

            # CONSTRAINTS
            m = self._CONSTRAINT_REGEX.match(line)
            if m is not None:
                constraint = m.groupdict()
                constraint["conditions"] = self._parse_conditions(
                    constraint["conditions"])
                self.pcs.add_constraint(**constraint)
                continue

            # FORBIDDEN CLAUSES
            m = self._FORBIDDEN_REGEX.match(line)
            if m is not None:
                forbidden = m.groupdict()
                body = forbidden["clauses"].strip()
                parts = [part for part in re.split(r"\s*,\s*", body) if part]
                matches = [self._SIMPLE_CLAUSE_REGEX.fullmatch(part) for part in parts]
                if all(match is not None for match in matches):
                    # Simple clauses
                    # {<parameter name 1>=<value 1>, ..., <parameter name N>=<value N>}
                    forbidden["clause_type"] = "simple"
                    conditions = [match.groupdict() for match in matches]
                else:  # Advanced clauses
                    forbidden["clause_type"] = "advanced"
                    # TODO decide if we need to further parse this down
                    conditions = re.split(r"\s*(?:\|\||&&)\s*", body)

                if len(conditions) == 0:
                    raise Exception(f"ERROR: cannot parse the following line:\n'{line}'")

                forbidden["clauses"] = conditions
                self.pcs.add_forbidden(**forbidden)
                continue

            # RAISE ERROR
            raise Exception(f"ERROR: cannot parse the following line: \n'{line}'")

        return

    def _parse_conditions(self: SMACParser, conditions: str) -> list[tuple]:
        """Parse a condition expression into a list of (operator, condition).

        The operator is None for the first entry and "&&" or "||" for every
        following one. A condition is either the dict produced by
        _parse_condition, or a nested list of the same shape for a
        parenthesised group.

        Raises:
            Exception: If parentheses are unbalanced, text is glued to a
                parenthesised group without an operator, or a condition
                cannot be parsed.
        """
        conditions = conditions.strip()
        parsed = []
        operator = None
        depth = 0
        group_start = 0
        group_end = 0
        term_start = 0
        # True while the current term is a parenthesised group that has
        # already been appended; reset at every operator so that the simple
        # conditions that follow a group are not skipped.
        term_is_group = False
        for pos, char in enumerate(conditions):
            if char == "(":
                if depth == 0:
                    if term_is_group or conditions[term_start:pos].strip():
                        raise Exception(f"ERROR: Couldn't parse '{conditions}'")
                    group_start = pos
                depth += 1
            elif char == ")":
                depth -= 1
                if depth < 0:
                    raise Exception(f"ERROR: Couldn't parse '{conditions}'")
                if depth == 0:
                    nested = self._parse_conditions(conditions[group_start + 1:pos])
                    parsed.append((operator, nested))
                    group_end = pos
                    term_is_group = True
            elif depth == 0 and pos > 1 and conditions[pos - 1:pos + 1] in ("||", "&&"):
                if term_is_group:
                    if conditions[group_end + 1:pos - 1].strip():
                        raise Exception(f"ERROR: Couldn't parse '{conditions}'")
                else:
                    parsed.append(
                        (operator, self._parse_condition(conditions[term_start:pos - 1])))
                operator = conditions[pos - 1:pos + 1]
                term_start = pos + 1
                term_is_group = False

        if depth != 0:
            raise Exception(f"ERROR: Couldn't parse '{conditions}'")
        if term_is_group:
            if conditions[group_end + 1:].strip():
                raise Exception(f"ERROR: Couldn't parse '{conditions}'")
        else:
            parsed.append((operator, self._parse_condition(conditions[term_start:])))

        return parsed

    @staticmethod
    def _parse_condition(condition: str) -> dict:
        """Parse a single condition.

        Returns:
            For "<parameter> <quantifier> <value>" a dict with the keys
            "parameter", "quantifier", "value" and "type" ("numerical").
            For "<parameter> in {<items>}" a dict with the keys "parameter",
            "items" (list of stripped strings) and "type" ("categorical").

        Raises:
            Exception: If the text matches neither form.
        """
        m = SMACParser._NUMERICAL_REGEX.match(condition)
        if m is not None:
            return {**m.groupdict(), "type": "numerical"}

        m = SMACParser._CATEGORICAL_REGEX.match(condition)
        if m is not None:
            parsed = {**m.groupdict(), "type": "categorical"}
            parsed["items"] = [item.strip() for item in parsed["items"].split(",")]
            return parsed

        raise Exception(f"ERROR: Couldn't parse '{condition}'")

    def compile(self: SMACParser) -> str:
        """Compile the PCS (not implemented, returns None)."""
        # TODO implement
        pass

453 

454 

class ParamILSParser(PCSParser):
    """PCS parser for ParamILS format."""

    # Comparison functions for the quantifiers a numerical condition can carry
    _COMPARATORS = {
        "==": lambda left, right: left == right,
        "!=": lambda left, right: left != right,
        "<": lambda left, right: left < right,
        ">": lambda left, right: left > right,
        "<=": lambda left, right: left <= right,
        ">=": lambda left, right: left >= right,
    }

    def parse(self: ParamILSParser, lines: list[str]) -> None:
        """Parse the PCS (not implemented)."""
        # TODO implement
        pass

    @staticmethod
    def _discretise(item: dict, granularity: int) -> tuple[list[str], str]:
        """Expand a parameter entry into its discrete ParamILS value list.

        Numerical ranges are sampled at ``granularity`` points (linearly, or
        geometrically for the "log" scale) and the default value is always
        part of the result.

        Returns:
            The sorted, duplicate-free values as strings, and the default
            value written in the same textual form as the values.

        Raises:
            ValueError: If a numerical domain does not hold exactly two entries.
        """
        structure = item["structure"]
        if structure in ("ordinal", "categorical"):
            return [str(value) for value in item["domain"]], str(item["default"])

        if len(item["domain"]) != 2:
            raise ValueError(f"Domain {item['domain']} not supported.")
        use_log = item["scale"] == "log"

        if structure == "integer":
            minval, maxval = (int(i) for i in item["domain"])
            default = int(item["default"])
            if use_log:
                grid = np.geomspace(minval, maxval, granularity, dtype=int)
            else:
                grid = np.round(np.linspace(minval, maxval, granularity)).astype(int)
            # Always sort, not only when the default had to be added
            values = sorted({int(value) for value in grid} | {default})
            return [str(value) for value in values], str(default)

        minval, maxval = (float(i) for i in item["domain"])
        default = float(item["default"])
        if use_log:
            grid = np.geomspace(minval, maxval, granularity, dtype=float)
        else:
            grid = np.linspace(minval, maxval, granularity)
        # Filter duplicates in string format
        texts = {f"{float(value)}" for value in grid} | {f"{default}"}
        return sorted(texts, key=float), f"{default}"

    def compile(self: ParamILSParser, granularity: int = 20) -> str:
        """Compile the PCS into ParamILS text.

        Parameters are written first, then constraints, then the simple
        forbidden clauses. Advanced forbidden clauses are skipped with a
        warning.

        Args:
            granularity: Number of sample points used to expand integer and
                real ranges into discrete value lists.

        Returns:
            The compiled lines joined by newlines.
        """
        # TODO Produce warning if certain specifications cannot be kept in this format
        lines = []
        for item in self.pcs.params:
            if item["type"] != "parameter":
                continue
            values, default = self._discretise(item, granularity)
            domain = "{" + ",".join(values) + "}"
            line = f"{item['name']} {domain} [{default}]"
            # Comments may be None (add_param default) or "" (parsed file)
            if item.get("comment"):
                line += f" #{item['comment']}"
            lines.append(line)

        for item in self.pcs.params:
            if item["type"] != "constraint":
                continue
            line = f"{item['parameter']} | "
            line += self._compile_conditions(item["conditions"], granularity)
            if item.get("comment"):
                line += f" #{item['comment']}"
            lines.append(line)

        for item in self.pcs.params:
            if item["type"] != "forbidden":
                continue
            if item["clause_type"] == "simple":
                clauses = [f"{cls['param']}={cls['value']}"
                           for cls in item["clauses"]]
                line = "{" + ",".join(clauses) + "}"
                if item.get("comment"):
                    line += f"#{item['comment']}"
                lines.append(line)
            else:
                print("WARNING: Advanced forbidden clauses "
                      "are not supported by ParamILS.")

        return "\n".join(lines)

    def _matching_values(self: ParamILSParser,
                         condition: dict,
                         granularity: int) -> list[str]:
        """List the discrete parent values that satisfy a numerical condition.

        Categorical parents only support "==" and "!=". Ordinal parents are
        compared by position in their domain, numerical parents by value.

        Raises:
            ValueError: If the parent parameter is unknown, the quantifier is
                unknown or not applicable to the parent's structure.
        """
        name = condition["parameter"]
        parent = self.pcs.get(name)
        if parent is None or parent.get("type") != "parameter":
            raise ValueError(f"Condition refers to unknown parameter '{name}'.")
        quantifier = condition["quantifier"]
        if quantifier not in self._COMPARATORS:
            raise ValueError(f"Condition quantifier '{quantifier}' not supported.")
        compare = self._COMPARATORS[quantifier]
        values, _ = self._discretise(parent, granularity)
        target = condition["value"]
        structure = parent["structure"]

        if structure in ("categorical", "ordinal"):
            if quantifier in ("==", "!="):
                return [value for value in values if compare(value, target)]
            if structure == "categorical" or target not in values:
                raise ValueError(f"Cannot apply '{quantifier} {target}' to the "
                                 f"{structure} parameter '{name}'.")
            pivot = values.index(target)
            return [value for rank, value in enumerate(values)
                    if compare(rank, pivot)]

        pivot = float(target)
        return [value for value in values if compare(float(value), pivot)]

    def _compile_conditions(self: ParamILSParser,
                            conditions: list[tuple],
                            granularity: int = 20) -> str:
        """Compile a list of (operator, condition) pairs into ParamILS text.

        ParamILS only knows "<parameter> in {<values>}", so a numerical
        condition is rewritten as the set of discrete parent values that
        satisfy it. Nested condition lists are compiled recursively and
        wrapped in parentheses.
        """
        line = ""
        for operator, condition in conditions:
            if operator is not None:
                line += f" {operator} "

            if isinstance(condition, list):
                line += f"({self._compile_conditions(condition, granularity)})"
            elif condition["type"] == "numerical":
                matching = self._matching_values(condition, granularity)
                if not matching:
                    print("WARNING: No discrete value of parameter "
                          f"{condition['parameter']} satisfies "
                          f"'{condition['quantifier']} {condition['value']}'.")
                items = ", ".join(matching)
                line += f"{condition['parameter']} in {{{items}}}"
            elif condition["type"] == "categorical":
                items = ", ".join(condition["items"])
                line += f"{condition['parameter']} in {{{items}}}"
        return line

571 

572 

class IRACEParser(PCSParser):
    """Base interface object for the parser.

    It loads the IRACE pcs files into the generic pcs object.
    Once a parameter file is loaded, it can be exported to another file.
    """

    def __init__(self: IRACEParser, inherit: PCSParser = None) -> None:
        """Initialize the IRACEParser, optionally sharing another parser's pcs."""
        super().__init__(inherit)

    def parse(self: IRACEParser, lines: list[str]) -> None:
        """Parse the pcs file (not implemented)."""
        # TODO implement
        pass

    @staticmethod
    def _r_literal(value: str) -> str:
        """Write a value as an R literal: numbers as they are, the rest quoted."""
        text = str(value)
        if re.fullmatch(r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?", text):
            return text
        return '"' + text.replace("\\", "\\\\").replace('"', '\\"') + '"'

    def _compile_conditions(self: IRACEParser, conditions: list[tuple]) -> str:
        """Compile a list of (operator, condition) pairs into an R expression.

        Set conditions become "<parameter> %in% c(...)", value conditions
        become "<parameter> <quantifier> <value>", and nested condition lists
        are compiled recursively and wrapped in parentheses.
        """
        text = ""
        for operator, condition in conditions:
            if operator is not None:
                text += f" {operator} "
            if isinstance(condition, list):
                text += f"({self._compile_conditions(condition)})"
            elif "items" in condition:
                items = ",".join(self._r_literal(i) for i in condition["items"])
                text += f"{condition['parameter']} %in% c({items})"
            else:
                # Value conditions carry "quantifier"/"value", not "items"
                text += (f"{condition['parameter']} {condition['quantifier']} "
                         f"{self._r_literal(condition['value'])}")
        return text

    def compile(self: IRACEParser) -> tuple[str, str]:
        """Compile the PCS.

        Returns:
            The parameter table as plain text, and the forbidden
            configurations with one R expression per line. Advanced forbidden
            clauses are skipped with a warning.
        """
        # Create pcs table
        header = ["# name", "switch", "type", "values",
                  "[conditions (using R syntax)]"]
        rows = []
        forbidden = [f for f in self.pcs.params if f["type"] == "forbidden"]
        constraints = [c for c in self.pcs.params if c["type"] == "constraint"]
        for param in [p for p in self.pcs.params if p["type"] == "parameter"]:
            # IRACE writes conditions on the same line as param definitions
            compiled = [self._compile_conditions(c["conditions"])
                        for c in constraints if c["parameter"] == param["name"]]
            if len(compiled) > 1:
                # Several constraints on one parameter must all hold
                condition_str = "| " + " & ".join(f"({c})" for c in compiled)
            elif compiled:
                condition_str = "| " + compiled[0]
            else:
                condition_str = ""
            param_type = param["structure"][0]
            if param["scale"] == "log" and param["structure"] in ("integer", "real"):
                # presumably irace's "i,log" / "r,log" types - verify against
                # the irace version in use
                param_type += ",log"
            domain = ",".join(str(value) for value in param["domain"])
            rows.append([param["name"],  # Parameter name
                         '"--' + param["name"] + ' "',  # Parameter argument name
                         param_type,  # Parameter type
                         f"({domain})",  # Parameter range/domain
                         condition_str])  # Parameter conditions
        forbidden_rows = []
        for f in forbidden:
            if f.get("clause_type") == "advanced":
                # Advanced clauses are kept as raw strings with their
                # operators removed, so they cannot be written out faithfully
                print("WARNING: Advanced forbidden clauses "
                      "are not supported by the IRACE export.")
                continue
            # "==" is the R comparison; a single "=" would be an assignment
            forbidden_rows.append(" & ".join(
                f"({c['param']} == {self._r_literal(c['value'])})"
                for c in f["clauses"]))
        return tabulate.tabulate(rows, headers=header, tablefmt="plain",
                                 numalign="left"), "\n".join(forbidden_rows)