Coverage for sparkle/tools/pcsparser.py: 56%
344 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-07 15:22 +0000
1"""The Parameter Configuration Space Parser class."""
2from __future__ import annotations
3import re
4import sys
5import numpy as np
6from enum import Enum
7from abc import ABC
8from pathlib import Path
10import tabulate
11import ConfigSpace
class PCSObject(ABC):
    """General data structure to keep the pcs file in.

    Fields are added by functions, such that checks can be conducted.
    Every entry of ``params`` is a dict with a ``"type"`` key whose value is
    one of ``"parameter"``, ``"constraint"``, ``"forbidden"`` or ``"comment"``.
    """

    def __init__(self: PCSObject) -> None:
        """Initialize the PCSObject with an empty list of entries."""
        self.params = []

    def add_param(self: PCSObject,
                  name: str,
                  structure: str = "integer",
                  domain: list = None,
                  scale: str = "linear",
                  default: str = "0",
                  comment: str = None) -> None:
        """Add a parameter to the PCSObject.

        Args:
            name: Name of the parameter.
            structure: One of "integer", "real", "categorical" or "ordinal".
            domain: Value domain. Must hold exactly two entries for integer and
                real parameters. Defaults to [-sys.maxsize, sys.maxsize].
            scale: Scale of the parameter. Reset to None for categoricals.
            default: Default value of the parameter.
            comment: Optional comment belonging to the parameter.

        Raises:
            ValueError: If the structure is unknown, or if an integer/real
                parameter has a domain that is not of length two.
        """
        if structure not in ["integer", "real", "categorical", "ordinal"]:
            raise ValueError(f"Parameter structure {structure} not supported.")

        # A fresh list per call: the domain is stored by reference, so a shared
        # default list would leak mutations from one parameter into all others.
        if domain is None:
            domain = [-sys.maxsize, sys.maxsize]

        # Domain check
        if structure in ["integer", "real"]:
            if len(domain) != 2:
                raise ValueError(f"Parameter domain {domain} not supported.")
        elif structure == "categorical":
            # TODO: check categories
            scale = None

        self.params.append({
            "name": name,
            "structure": structure,
            "domain": domain,
            "scale": scale,
            "default": default,
            "comment": comment,
            "type": "parameter",
        })

    def add_constraint(self: PCSObject, **kwargs: any) -> None:
        """Add a constraint to the PCSObject; kwargs are stored as given."""
        # TODO add checks
        self.params.append({**kwargs, "type": "constraint"})

    def add_forbidden(self: PCSObject, **kwargs: any) -> None:
        """Add a forbidden clause to the PCSObject; kwargs are stored as given."""
        # TODO add checks
        self.params.append({**kwargs, "type": "forbidden"})

    def add_comment(self: PCSObject, **kwargs: any) -> None:
        """Add a comment to the PCSObject; kwargs are stored as given."""
        # TODO add checks
        self.params.append({**kwargs, "type": "comment"})

    def clear(self: PCSObject) -> None:
        """Clear the PCSObject by replacing the entry list with an empty one."""
        self.params = []

    def get(self: PCSObject, name: str) -> dict:
        """Get an entry from the PCSObject based on the name.

        Only entries that carry a "name" key are considered. When several
        entries share the name, the one added last is returned.

        Returns:
            The matching entry, or None if there is no entry with that name.
        """
        for entry in reversed(self.params):
            if "name" in entry and entry["name"] == name:
                return entry
        return None
class PCSConvention(Enum):
    """Internal pcs convention enum.

    Each member's value is the lower-case string naming that convention.
    """
    unknown = ""  # The empty string: no convention name is attached
    SMAC = "smac"
    ParamILS = "paramils"
    IRACE = "irace"
    ConfigSpace = "configspace"
class PCSParser(ABC):
    """Base interface object for the parser.

    It loads the pcs files into the generic pcs object. Once a parameter file is
    loaded, it can be exported to another file.
    """

    def __init__(self: PCSParser, inherit: PCSParser = None) -> None:
        """Initialize the PCSParser.

        Args:
            inherit: Optional parser whose ``pcs`` object is reused (shared, not
                copied). When omitted a new, empty PCSObject is created.
        """
        if inherit is None:
            self.pcs = PCSObject()
        else:
            self.pcs = inherit.pcs

    @staticmethod
    def _format_string_to_enum(string: str) -> PCSConvention:
        """Convert string to PCSConvention, ignoring the case of the string.

        Raises:
            Exception: If no convention has the given (lower-cased) value.
        """
        lowered = string.lower()
        for form in PCSConvention:
            if form.value == lowered:
                return form
        raise Exception("ERROR: parameter configuration space format is not supported.")

    def check_validity(self: PCSParser) -> bool:
        """Check the validity of the pcs. Not implemented: always returns True."""
        # TODO implement

        # check if for all parameters in constraints and forbidden clauses exists
        # Check for conflict between default values and constraints and forbidden clauses
        return True

    def load(self: PCSParser, filepath: Path, convention: str = "smac") -> None:
        """Main import function.

        Args:
            filepath: File to read, as Path or str.
            convention: Name of the convention the file is written in.

        Raises:
            Exception: If the convention is unknown, cannot be imported, or the
                file suffix is not accepted for the ConfigSpace convention.
        """
        if isinstance(filepath, str):
            filepath = Path(filepath)
        convention = self._format_string_to_enum(convention)

        if convention == PCSConvention.SMAC:
            # Context manager so the file handle is closed after reading
            with filepath.open() as pcs_file:
                lines = pcs_file.readlines()
            parser = SMACParser(self)
            parser.parse(lines)
            self.pcs = parser.pcs
        elif convention == PCSConvention.ConfigSpace:
            # NOTE: here self.pcs becomes a ConfigurationSpace, not a PCSObject
            if filepath.suffix == ".yaml":
                self.pcs = ConfigSpace.ConfigurationSpace.from_yaml(filepath)
            elif filepath.suffix == ".json":
                self.pcs = ConfigSpace.ConfigurationSpace.from_json(filepath)
            else:
                raise Exception(f"File type for {convention.value}: {filepath.suffix} "
                                "not in accepted types: {'.yaml', '.json'}")
        else:
            raise Exception(f"ERROR: Importing the pcs convention for {convention.value}"
                            " is not yet implemented.")

    def export(self: PCSParser,
               destination: Path,
               convention: str = "smac") -> None:
        """Main export function.

        Args:
            destination: File to write to, as Path or str. For the IRACE
                convention a second file "<stem>_forbidden.txt" is written next
                to it.
            convention: Name of the convention to write.

        Raises:
            Exception: If the convention is unknown or cannot be exported.
        """
        if isinstance(destination, str):
            destination = Path(destination)
        convention = self._format_string_to_enum(convention)
        # TODO: SMAC2 writer
        if convention == PCSConvention.ParamILS:
            pcs = ParamILSParser(self).compile()
            destination.write_text("### Parameter file generated by Sparkle\n"
                                   f"{pcs}\n")
        elif convention == PCSConvention.IRACE:
            pcs, forbidden = IRACEParser(self).compile()
            forbidden_file_name = destination.stem + "_forbidden.txt"
            (destination.parent / forbidden_file_name).write_text(forbidden)
            destination.write_text("### Parameter file generated by Sparkle\n"
                                   f"{pcs}\n")
        elif convention == PCSConvention.ConfigSpace:
            self.pcs.to_json(destination)
        else:
            raise Exception(f"ERROR: Exporting the pcs convention for {convention.value}"
                            " is not yet implemented.")

    def get_configspace(self: PCSParser) -> ConfigSpace.ConfigurationSpace:
        """Get the ConfigurationSpace representation of the PCS file.

        Parameters are added first, then constraints (as conditions), then
        forbidden clauses. Advanced forbidden clauses are skipped with a warning.

        Raises:
            TypeError: If a condition value cannot be converted to the type of
                the default value of the parameter it refers to.
            Exception: On an unknown parameter structure, condition quantifier
                or conjunction operator.
        """
        cs = ConfigSpace.ConfigurationSpace()
        entries = self.pcs.params
        parameters = [p for p in entries if p["type"] == "parameter"]
        constraints = [c for c in entries if c["type"] == "constraint"]
        forbidden = [f for f in entries if f["type"] == "forbidden"]

        for param in parameters:
            cs.add(self._build_hyperparameter(param))

        for constraint in constraints:
            # Constraints are called conditions in ConfigSpace, connected w conjunctions
            cs.add(self._build_conjunction(cs, constraint["parameter"],
                                           constraint["conditions"]))

        for forbid in forbidden:
            # TODO: This section is ill supported by PCSParser so the values
            # we find are wrong or incomplete for advanced clause types:
            # It does not support &&/|| operators or multi variable in a single statement
            if forbid["clause_type"] == "advanced":
                print("WARNING: Advanced clauses not supported in PCSParser. "
                      f"Skipping forbidden clause: {forbid['clauses']}")
                continue
            # Therefore, we can only add forbidden with "=" operator and "&&" conjunction
            conjunction = None
            for clause in forbid["clauses"]:
                parameter = cs[clause["param"]]
                equals = ConfigSpace.ForbiddenEqualsClause(
                    hyperparameter=parameter,
                    value=type(parameter.default_value)(clause["value"]),
                )
                if conjunction is None:
                    conjunction = equals
                else:
                    conjunction = ConfigSpace.ForbiddenAndConjunction(conjunction,
                                                                      equals)
            cs.add(conjunction)
        return cs

    @staticmethod
    def _build_hyperparameter(param: dict) -> object:
        """Create the ConfigSpace hyperparameter for one parameter entry."""
        structure = param["structure"]
        if structure == "integer":
            # BetaIntegerHyperparameter would require an alpha and beta,
            # NormalIntegerHyperparameter a mu and sigma (mean and std deviation)
            return ConfigSpace.UniformIntegerHyperparameter(
                name=param["name"],
                lower=int(param["domain"][0]),
                upper=int(param["domain"][1]),
                default_value=int(param["default"]),
                log=param["scale"] == "log",
            )
        if structure == "real":
            # BetaFloatHyperparameter would require an alpha and beta,
            # NormalFloatHyperparameter a mu and sigma (mean and std deviation)
            return ConfigSpace.UniformFloatHyperparameter(
                name=param["name"],
                lower=float(param["domain"][0]),
                upper=float(param["domain"][1]),
                default_value=float(param["default"]),
                log=param["scale"] == "log",
            )
        if structure == "categorical":
            return ConfigSpace.CategoricalHyperparameter(
                name=param["name"],
                choices=param["domain"],
                default_value=param["default"],
                # Does not seem to contain any weights?
            )
        if structure == "ordinal":
            return ConfigSpace.OrdinalHyperparameter(
                name=param["name"],
                sequence=param["domain"],
                default_value=param["default"],
            )
        # NOTE: Missing: "constant" structure
        raise Exception(f"ERROR: Unknown parameter structure: {structure}")

    def _build_conjunction(self: PCSParser,
                           cs: ConfigSpace.ConfigurationSpace,
                           child_name: str,
                           conditions: list) -> object:
        """Chain (operator, clause) pairs into one ConfigSpace condition.

        Clauses are combined left to right; the operator of the first pair is
        not used. A clause that is itself a list is a parenthesised group and is
        built recursively.
        """
        # NOTE from SMAC2:
        # There is no support for parenthesis with conditionals.
        # The && connective has higher precedence than ||, so
        # a||b&& c||d is the same as: a||(b&&c)||d
        conjunction = None
        for operator, clause in conditions:
            if isinstance(clause, list):
                condition = self._build_conjunction(cs, child_name, clause)
            else:
                condition = self._build_condition(cs, child_name, clause)
            if conjunction is None:
                conjunction = condition
            elif operator == "&&":
                conjunction = ConfigSpace.AndConjunction(conjunction, condition)
            elif operator == "||":
                conjunction = ConfigSpace.OrConjunction(conjunction, condition)
            else:
                raise Exception(f"ERROR: Unknown conjunction operator: {operator}")
        return conjunction

    @staticmethod
    def _build_condition(cs: ConfigSpace.ConfigurationSpace,
                         child_name: str,
                         clause: dict) -> object:
        """Create a single ConfigSpace condition on child_name from a clause."""
        parent = cs[clause["parameter"]]
        # Values are parsed as strings; convert to the type of the parent default
        cast = type(parent.default_value)
        raw = clause["items"] if "items" in clause else clause["value"]
        try:
            if "items" in clause:
                values = [cast(i) for i in raw]
            else:
                values = cast(raw)
        except Exception as err:
            raise TypeError(
                f"The clause {raw} contains values that are not of "
                f"the same type as parameter {clause['parameter']} "
                f"[{cast}].") from err

        child = cs[child_name]
        if "quantifier" not in clause:
            return ConfigSpace.InCondition(child=child, parent=parent, values=values)
        condition_types = {
            "==": ConfigSpace.EqualsCondition,
            "!=": ConfigSpace.NotEqualsCondition,
            ">": ConfigSpace.GreaterThanCondition,
            "<": ConfigSpace.LessThanCondition,
        }
        quantifier = clause["quantifier"]
        if quantifier not in condition_types:
            # Fail loudly instead of silently reusing a previous condition
            raise Exception(f"ERROR: Unknown condition quantifier: {quantifier}")
        return condition_types[quantifier](child=child, parent=parent, value=values)
class SMACParser(PCSParser):
    """The SMAC parser class."""

    # Patterns are compiled once instead of once per parsed line.
    # The only forbidden characters in parameter names are:
    # spaces, commas, quotes, and parentheses
    _PARAM_RE = re.compile(
        r"(?P<name>[^\s\"',]*)\s+(?P<structure>\w*)\s+(?P<domain>(\[|\{)"
        r".*(\]|\}))\s*\[(?P<default>.*)\]\s*(?P<scale>log)"
        r"*\s*#*(?P<comment>.*)")
    # The conditions stop at the first '#', so a trailing comment ends up in
    # the comment group instead of inside the conditions.
    _CONSTRAINT_RE = re.compile(
        r"(?P<parameter>[^\s\"',]+)\s*\|\s"
        r"*(?P<conditions>[^#]+)#*(?P<comment>.*)")
    _FORBIDDEN_RE = re.compile(r"\s*\{(?P<clauses>[^\}]+)\}\s*#*(?P<comment>.*)")
    _SIMPLE_CLAUSE_RE = re.compile(r"(?P<param>[^\s\"',=]+)\s*=\s*"
                                   r"(?P<value>[^\s\"',]+)")
    _COMMENT_RE = re.compile(r"\s*#(?P<comment>.*)")
    _EMPTY_RE = re.compile(r"^\s*$")
    # Two-character quantifiers come first: the alternation is tried in order,
    # so "<" ahead of "<=" would always win and "<=" could never be matched.
    _NUMERICAL_RE = re.compile(
        r"\s*(?P<parameter>[^\s\"',]+)\s*(?P<quantifier>==|!=|<=|>=|<|>)"
        r"\s*(?P<value>[^\s\"',]+)\s*")
    _CATEGORICAL_RE = re.compile(r"\s*(?P<parameter>[^\s\"',]+)\s+"
                                 r"in\s*\{(?P<items>[^\}]+)\}\s*")

    def parse(self: SMACParser, lines: list[str]) -> None:
        """Parse the pcs file.

        The current content of the pcs object is cleared first. Each line is
        tried as parameter, constraint, forbidden clause, comment and empty
        line, in that order.

        Raises:
            Exception: If a line matches none of the known line types, or a
                forbidden clause yields no conditions.
        """
        self.pcs.clear()

        for line in lines:
            # PARAMS
            match = self._PARAM_RE.match(line)
            if match is not None:
                fields = match.groupdict()
                fields["domain"] = re.sub(r"(?:\[|\]|\{|\})", "", fields["domain"])
                fields["domain"] = re.split(r"\s*,\s*", fields["domain"])
                self.pcs.add_param(**fields)
                continue

            # CONSTRAINTS
            match = self._CONSTRAINT_RE.match(line)
            if match is not None:
                constraint = match.groupdict()
                # Strip the whitespace/newline left in front of the comment
                constraint["conditions"] = self._parse_conditions(
                    constraint["conditions"].strip())
                self.pcs.add_constraint(**constraint)
                continue

            # FORBIDDEN CLAUSES
            match = self._FORBIDDEN_RE.match(line)
            if match is not None:
                forbidden = match.groupdict()
                conditions = []
                # Simple clauses
                # {<parameter name 1>=<value 1>, ..., <parameter name N>=<value N>}
                if "," in forbidden["clauses"]:
                    forbidden["clause_type"] = "simple"
                    for clause in re.split(r"\s*,\s*", forbidden["clauses"]):
                        clause_match = self._SIMPLE_CLAUSE_RE.match(clause)
                        if clause_match is not None:
                            conditions.append(clause_match.groupdict())
                        else:
                            print(clause, "ERROR")
                else:  # Advanced clauses
                    forbidden["clause_type"] = "advanced"
                    # TODO decide if we need to further parse this down
                    conditions = re.split(r"\s*(?:\|\||&&)\s*", forbidden["clauses"])

                if len(conditions) == 0:
                    raise Exception(f"ERROR: cannot parse the following line:\n'{line}'")

                forbidden["clauses"] = conditions
                self.pcs.add_forbidden(**forbidden)
                continue

            # COMMENTLINE
            match = self._COMMENT_RE.match(line)
            if match is not None:
                self.pcs.add_comment(**match.groupdict())
                continue

            # EMPTY LINE
            if self._EMPTY_RE.match(line) is not None:
                continue

            # RAISE ERROR
            raise Exception(f"ERROR: cannot parse the following line: \n'{line}'")

    def _parse_conditions(self: SMACParser, conditions: str) -> list[tuple]:
        """Parse the conditions.

        Returns:
            A list of (operator, condition) tuples. The operator is None for
            the first entry, otherwise "||" or "&&". A condition is a dict, or a
            list of such tuples for a parenthesised group.
        """
        conditionlist = []
        condition = None
        operator = None
        nested = 0
        nested_start = 0
        condition_start = 0
        for pos, char in enumerate(conditions):
            # Nested clauses
            if char == "(":
                if nested == 0:
                    nested_start = pos
                nested += 1
            elif char == ")":
                nested -= 1
                if nested == 0:
                    condition = self._parse_conditions(conditions[nested_start + 1:pos])
                    conditionlist.append((operator, condition))
                    if (pos + 1) == len(conditions):
                        return conditionlist

            if pos > 1 and nested == 0:
                for op in ["||", "&&"]:
                    if conditions[pos - 1: pos + 1] == op:
                        # A nested group on the left was already appended above
                        if not isinstance(condition, list):
                            condition = self._parse_condition(
                                conditions[condition_start:pos - 1])
                            conditionlist.append((operator, condition))
                        # Forget the left-hand side, otherwise every condition
                        # that follows a nested group would be skipped as well
                        condition = None
                        operator = op
                        condition_start = pos + 1

        if isinstance(condition, list):
            # The last element was a nested group that is already appended;
            # the remaining text holds that group and is not a plain condition.
            return conditionlist

        condition = self._parse_condition(conditions[condition_start:len(conditions)])
        conditionlist.append((operator, condition))

        return conditionlist

    @staticmethod
    def _parse_condition(condition: str) -> dict:
        """Parse the condition.

        Returns:
            For "<parameter> <quantifier> <value>" a dict with the keys
            parameter, quantifier, value and type "numerical". For
            "<parameter> in {...}" a dict with the keys parameter, items (list
            of str) and type "categorical".

        Raises:
            Exception: If the text matches neither form.
        """
        match = SMACParser._NUMERICAL_RE.match(condition)
        if match is not None:
            return {**match.groupdict(), "type": "numerical"}

        match = SMACParser._CATEGORICAL_RE.match(condition)
        if match is not None:
            parsed = {**match.groupdict(), "type": "categorical"}
            parsed["items"] = re.split(r",\s*", parsed["items"])
            return parsed

        raise Exception(f"ERROR: Couldn't parse '{condition}'")

    def compile(self: SMACParser) -> str:
        """Compile the PCS. Not implemented: currently returns None."""
        # TODO implement
        pass
class ParamILSParser(PCSParser):
    """PCS parser for ParamILS format."""

    def parse(self: ParamILSParser, lines: list[str]) -> None:
        """Parse the PCS. Not implemented."""
        # TODO implement
        pass

    def compile(self: ParamILSParser, granularity: int = 20) -> str:
        """Compile the PCS.

        Parameters are written first, then constraints, then simple forbidden
        clauses. Advanced forbidden clauses are skipped with a warning.

        Args:
            granularity: Number of points integer and real ranges are expanded
                to, before duplicates are removed and the default is added.

        Returns:
            The lines of the ParamILS file, joined by newlines.

        Raises:
            ValueError: If an integer/real domain does not hold two values or a
                parameter structure is unknown.
        """
        # TODO Produce warning if certain specifications cannot be kept in this format
        entries = self.pcs.params
        lines = []

        for item in entries:
            if item["type"] != "parameter":
                continue
            domain = self._discretize_domain(item, granularity)
            line = f"{item['name']} {{{domain}}} [{item['default']}]"
            # The comment may be None as well as empty
            if item.get("comment"):
                line += f" #{item['comment']}"
            lines.append(line)

        for item in entries:
            if item["type"] != "constraint":
                continue
            line = f"{item['parameter']} | "
            line += self._compile_conditions(item["conditions"])
            if item.get("comment"):
                line += f" #{item['comment']}"
            lines.append(line)

        for item in entries:
            if item["type"] != "forbidden":
                continue
            if item["clause_type"] == "simple":
                clauses = [f"{clause['param']}={clause['value']}"
                           for clause in item["clauses"]]
                line = "{" + ",".join(clauses) + "}"
                if item.get("comment"):
                    line += f"#{item['comment']}"
                lines.append(line)
            else:
                print("WARNING: Advanced forbidden clauses "
                      "are not supported by ParamILS.")

        return "\n".join(lines)

    @staticmethod
    def _discretize_domain(item: dict, granularity: int) -> str:
        """Return the comma separated, discretized domain of a parameter."""
        structure = item["structure"]
        if structure in ["ordinal", "categorical"]:
            return ",".join(item["domain"])

        if structure not in ["integer", "real"]:
            # Without this the domain of the previous parameter would be reused
            raise ValueError(f"Parameter structure {structure} not supported.")
        if len(item["domain"]) != 2:
            raise ValueError(f"Domain {item['domain']} not supported.")
        log_scale = item["scale"] == "log"

        if structure == "integer":
            (minval, maxval) = [int(i) for i in item["domain"]]
            if log_scale:
                points = np.geomspace(minval, maxval, granularity, dtype=int)
            else:
                points = np.round(np.linspace(minval, maxval, granularity)).astype(int)
            values = {int(point) for point in points}
            values.add(int(item["default"]))
            # Always sorted, also when the default was already in the domain
            return ",".join(str(value) for value in sorted(values))

        (minval, maxval) = [float(i) for i in item["domain"]]
        if log_scale:
            values = list(np.unique(np.geomspace(minval, maxval, granularity,
                                                 dtype=float)))
        else:
            values = list(np.linspace(minval, maxval, granularity))
        # add default value
        default = float(item["default"])
        if default not in values:
            values.append(default)
        # Filter duplicates in string format
        return ",".join(sorted({f"{value}" for value in values}, key=float))

    def _compile_conditions(self: ParamILSParser, conditions: list[tuple]) -> str:
        """Compile a list of conditions.

        Args:
            conditions: List of (operator, condition) tuples, where a condition
                that is itself a list is written as a parenthesised group.
        """
        line = ""
        for operator, condition in conditions:
            if operator is not None:
                line += f" {operator} "

            if isinstance(condition, list):
                line += f"({self._compile_conditions(condition)})"
            elif condition["type"] == "numerical":
                line += self._compile_numerical_condition(condition)
            elif condition["type"] == "categorical":
                items = ", ".join(condition["items"])
                line += f"{condition['parameter']} in {{{items}}}"
        return line

    def _compile_numerical_condition(self: ParamILSParser, condition: dict) -> str:
        """Write a "<parameter> <quantifier> <value>" condition as "in {...}".

        "==" becomes a single item set. "!=" is expanded to the remaining
        domain values of a categorical or ordinal parameter. Anything else is
        written unchanged after a warning, so that the line stays well formed.
        """
        name = condition["parameter"]
        value = condition["value"]
        quantifier = condition.get("quantifier", "==")
        allowed = []
        if quantifier == "==":
            allowed = [value]
        elif quantifier == "!=":
            param = self.pcs.get(name)  # None when the parameter is unknown
            if param is not None and param["structure"] in ["categorical", "ordinal"]:
                allowed = [item for item in param["domain"] if item != value]
        if not allowed:
            print(f"WARNING: Condition '{name} {quantifier} {value}' "
                  "is not supported by ParamILS.")
            return f"{name} {quantifier} {value}"
        return f"{name} in {{{', '.join(allowed)}}}"
class IRACEParser(PCSParser):
    """Base interface object for the parser.

    It loads the IRACE pcs files into the generic pcs object.
    Once a parameter file is loaded, it can be exported to another file.
    """

    def __init__(self: IRACEParser, inherit: PCSParser = None) -> None:
        """Initialize the IRACEParser.

        Args:
            inherit: Optional parser whose ``pcs`` object is reused. When
                omitted a new, empty pcs object is created.
        """
        super().__init__(inherit)

    def parse(self: IRACEParser, lines: list[str]) -> None:
        """Parse the pcs file. Not implemented."""
        # TODO implement
        pass

    def compile(self: IRACEParser) -> tuple[str, str]:
        """Compile the PCS.

        Returns:
            A tuple of the parameter table (plain text, one row per parameter)
            and the forbidden clauses (one line per simple clause).
        """
        # Create pcs table
        header = ["# name", "switch", "type", "values",
                  "[conditions (using R syntax)]"]
        entries = self.pcs.params
        forbidden = [f for f in entries if f["type"] == "forbidden"]
        constraints = [c for c in entries if c["type"] == "constraint"]

        rows = []
        for param in entries:
            if param["type"] != "parameter":
                continue
            # IRACE writes conditions on the same line as param definitions
            condition_str = "|"
            for constraint in constraints:
                if constraint["parameter"] == param["name"]:
                    condition_str += self._compile_conditions(constraint["conditions"])
            if condition_str == "|":
                condition_str = ""
            rows.append([param["name"],  # Parameter name
                         f'"--{param["name"]} "',  # Parameter argument name
                         param["structure"][0],  # Parameter type
                         f"({','.join(param['domain'])})",  # Parameter range/domain
                         condition_str])  # Parameter conditions

        forbidden_rows = []
        for clause in forbidden:
            # Advanced clauses are stored as plain strings, not as param/value dicts
            if clause.get("clause_type") == "advanced":
                print("WARNING: Advanced forbidden clauses "
                      "are not supported by IRACEParser.")
                continue
            # NOTE(review): irace reads forbidden clauses as R expressions, where
            # equality is presumably "==" rather than "=" - confirm before changing
            forbidden_rows.append(" & ".join([f"({c['param']} = {c['value']})"
                                              for c in clause["clauses"]]))
        return tabulate.tabulate(rows, headers=header, tablefmt="plain",
                                 numalign="left"), "\n".join(forbidden_rows)

    def _compile_conditions(self: IRACEParser, conditions: list[tuple]) -> str:
        """Compile a list of (operator, condition) tuples into one string.

        Conditions with "items" are written with %in%, conditions with a
        quantifier and value are written as a comparison, and a condition that
        is itself a list is written as a parenthesised group.
        """
        compiled = ""
        for operator, condition in conditions:
            operator = operator if operator is not None else ""
            if isinstance(condition, list):
                inner = self._compile_conditions(condition).strip()
                compiled += f" {operator} ({inner})"
            elif "items" in condition:
                compiled += (f" {operator} {condition['parameter']} %in% "
                             f"{condition['type'][0]}({','.join(condition['items'])})")
            else:
                # Numerical conditions carry a quantifier and a single value
                compiled += (f" {operator} {condition['parameter']} "
                             f"{condition['quantifier']} {condition['value']}")
        return compiled