Coverage for sparkle/tools/pcsparser.py: 50%
276 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-05 14:48 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-05 14:48 +0000
1"""The Parameter Configuration Space Parser class."""
2from __future__ import annotations
3import re
4import sys
5import numpy as np
6from enum import Enum
7from abc import ABC
8from pathlib import Path
10import tabulate
class PCSObject(ABC):
    """General data structure to keep the pcs file in.

    Every entry is a dictionary stored, in insertion order, in ``self.params``.
    Each entry carries a ``"type"`` key that is one of ``"parameter"``,
    ``"constraint"``, ``"forbidden"`` or ``"comment"``.
    Fields are added by functions, such that checks can be conducted.
    """

    def __init__(self: PCSObject) -> None:
        """Initialize the PCSObject with an empty list of entries."""
        self.params = []

    def add_param(self: PCSObject,
                  name: str,
                  structure: str = "integer",
                  domain: list = None,
                  scale: str = "linear",
                  default: str = "0",
                  comment: str = None) -> None:
        """Add a parameter to the PCSObject.

        Args:
            name: Name of the parameter.
            structure: One of "integer", "real", "categorical" or "ordinal".
            domain: Values of the parameter. Must hold exactly two entries for
                "integer" and "real" parameters. When omitted, a fresh
                ``[-sys.maxsize, sys.maxsize]`` list is created for this call.
            scale: Scale of the parameter; forced to None for categoricals.
            default: Default value of the parameter.
            comment: Optional comment belonging to the parameter.

        Raises:
            ValueError: If the structure is unknown, or if a numerical
                parameter has a domain that is not of length two.
        """
        if structure not in ["integer", "real", "categorical", "ordinal"]:
            raise ValueError(f"Parameter structure {structure} not supported.")

        # Build the default per call: a list used as default argument would be
        # shared (and stored by reference) by every parameter relying on it.
        if domain is None:
            domain = [-sys.maxsize, sys.maxsize]

        # Domain check
        if structure in ["integer", "real"]:
            if len(domain) != 2:
                raise ValueError(f"Parameter domain {domain} not supported.")
        elif structure == "categorical":
            # TODO: check categories
            scale = None

        self.params.append({
            "name": name,
            "structure": structure,
            "domain": domain,
            "scale": scale,
            "default": default,
            "comment": comment,
            "type": "parameter",
        })

    def add_constraint(self: PCSObject, **kwargs: any) -> None:
        """Add a constraint to the PCSObject; kwargs become the entry fields."""
        # TODO add checks
        self.params.append({**kwargs, "type": "constraint"})

    def add_forbidden(self: PCSObject, **kwargs: any) -> None:
        """Add a forbidden clause to the PCSObject; kwargs become the fields."""
        # TODO add checks
        self.params.append({**kwargs, "type": "forbidden"})

    def add_comment(self: PCSObject, **kwargs: any) -> None:
        """Add a comment to the PCSObject; kwargs become the entry fields."""
        # TODO add checks
        self.params.append({**kwargs, "type": "comment"})

    def clear(self: PCSObject) -> None:
        """Clear the PCSObject by dropping all stored entries."""
        self.params = []

    def get(self: PCSObject, name: str) -> dict | None:
        """Get an entry from the PCSObject based on the name.

        Args:
            name: Value of the "name" field to look for.

        Returns:
            The most recently added entry with that name, or None if no entry
            with a "name" field matches.
        """
        # Walk backwards so that the last entry added under a name wins.
        for entry in reversed(self.params):
            if "name" in entry and entry["name"] == name:
                return entry
        return None
class PCSConvention(Enum):
    """Enumeration of the pcs file conventions known to the parser.

    The value of each member is the lowercase string identifying the
    convention; the empty string is the value of the ``unknown`` member.
    """

    # Member order is kept stable since the members are iterated for lookups.
    unknown = ""
    SMAC = "smac"
    ParamILS = "paramils"
    IRACE = "irace"
class PCSParser(ABC):
    """Base interface object for the parser.

    It loads the pcs files into the generic pcs object. Once a parameter file is
    loaded, it can be exported to another file.
    """

    def __init__(self: PCSParser, inherit: PCSParser = None) -> None:
        """Initialize the PCSParser.

        Args:
            inherit: Optional parser whose pcs object is shared with this one.
                When omitted, a new empty PCSObject is created.
        """
        self.pcs = PCSObject() if inherit is None else inherit.pcs

    @staticmethod
    def _format_string_to_enum(string: str) -> PCSConvention:
        """Convert string to PCSConvention (case-insensitive on the value).

        Raises:
            Exception: If no convention has the given string as value.
        """
        wanted = string.lower()
        for form in PCSConvention:
            if form.value == wanted:
                return form
        raise Exception("ERROR: parameter configuration space format is not supported.")

    def check_validity(self: PCSParser) -> bool:
        """Check the validity of the pcs. Currently always returns True."""
        # TODO implement

        # check if for all parameters in constraints and forbidden clauses exists
        # Check for conflict between default values and constraints and forbidden clauses
        return True

    def load(self: PCSParser, filepath: Path, convention: str = "smac") -> None:
        """Main import function.

        Args:
            filepath: Path (or path string) of the pcs file to read.
            convention: Name of the convention the file is written in. Only
                "smac" can be imported at the moment.

        Raises:
            Exception: If the convention is unknown or cannot be imported yet.
        """
        filepath = Path(filepath)
        convention = self._format_string_to_enum(convention)

        # TODO check if file actually exists
        # The context manager closes the file handle once the lines are read.
        with filepath.open() as pcs_file:
            lines = pcs_file.readlines()

        if convention != PCSConvention.SMAC:
            raise Exception(f"ERROR: Importing the pcs convention for {convention.value}"
                            " is not yet implemented.")
        parser = SMACParser(self)
        parser.parse(lines)
        self.pcs = parser.pcs

    def export(self: PCSParser,
               destination: Path,
               convention: str = "smac") -> None:
        """Main export function.

        For the IRACE convention the forbidden clauses are additionally written
        to ``<destination stem>_forbidden.txt`` next to the destination file.

        Args:
            destination: Path (or path string) of the file to write.
            convention: Name of the convention to export to. Only "paramils"
                and "irace" can be exported at the moment.

        Raises:
            Exception: If the convention is unknown or cannot be exported yet.
        """
        # Accept plain strings as well, like load() does.
        destination = Path(destination)
        convention = self._format_string_to_enum(convention)
        if convention == PCSConvention.ParamILS:
            pcs = ParamILSParser(self).compile()
        elif convention == PCSConvention.IRACE:
            pcs, forbidden = IRACEParser(self).compile()
            forbidden_file_name = destination.stem + "_forbidden.txt"
            # write_text opens, writes and closes the file in one go.
            (destination.parent / forbidden_file_name).write_text(forbidden)
        else:
            raise Exception(f"ERROR: Exporting the pcs convention for {convention.value}"
                            " is not yet implemented.")
        destination.write_text("### Parameter file generated by Sparkle\n"
                               f"{pcs}\n")
class SMACParser(PCSParser):
    """The SMAC parser class.

    Every input line is tried, in this order, as a parameter definition, a
    constraint, a forbidden clause, a comment line and an empty line. A line
    that matches none of these raises an Exception.
    """

    # Patterns are compiled once instead of being rebuilt for every line.
    # Parameter: <name> <structure> <[..] or {..} domain> [<default>] [log] [#..]
    _PARAM_RE = re.compile(
        r"(?P<name>[^\s\"',]*)\s+(?P<structure>\w*)\s+(?P<domain>(\[|\{)"
        r".*(\]|\}))\s*\[(?P<default>.*)\]\s*(?P<scale>log)"
        r"*\s*#*(?P<comment>.*)")
    # Constraint: <parameter> | <conditions>
    _CONSTRAINT_RE = re.compile(
        r"(?P<parameter>[^\s\"',]+)\s*\|\s"
        r"*(?P<conditions>.+)\s*#*(?P<comment>.*)")
    # Forbidden clause: {<clauses>}
    _FORBIDDEN_RE = re.compile(r"\s*\{(?P<clauses>[^\}]+)\}\s*#*(?P<comment>.*)")
    # One "<parameter>=<value>" item of a simple forbidden clause
    _SIMPLE_CLAUSE_RE = re.compile(
        r"(?P<param>[^\s\"',=]+)\s*=\s*"
        r"(?P<value>[^\s\"',]+)")
    _COMMENT_RE = re.compile(r"\s*#(?P<comment>.*)")
    _EMPTY_RE = re.compile(r"^\s*$")
    # Two-character quantifiers are listed first: regex alternation takes the
    # first alternative that matches, so "<" before "<=" would split "<=" into
    # the quantifier "<" and a value starting with "=".
    _NUMERICAL_RE = re.compile(
        r"\s*(?P<parameter>[^\s\"',]+)\s*(?P<quantifier><=|>=|==|!=|<|>)"
        r"\s*(?P<value>[^\s\"',]+)\s*")
    _CATEGORICAL_RE = re.compile(
        r"\s*(?P<parameter>[^\s\"',]+)\s+"
        r"in\s*\{(?P<items>[^\}]+)\}\s*")

    def parse(self: SMACParser, lines: list[str]) -> None:
        """Parse the pcs file.

        The pcs object of this parser is cleared first and then filled with one
        entry per parameter, constraint, forbidden clause and comment line.

        Args:
            lines: The lines of the pcs file.

        Raises:
            Exception: If a line cannot be parsed.
        """
        self.pcs.clear()

        for line in lines:
            # PARAMS
            match = self._PARAM_RE.match(line)
            if match is not None:
                fields = match.groupdict()
                # Strip the enclosing brackets/braces, then split on commas
                bare_domain = re.sub(r"(?:\[|\]|\{|\})", "", fields["domain"])
                fields["domain"] = re.split(r"\s*,\s*", bare_domain)
                self.pcs.add_param(**fields)
                continue

            # CONSTRAINTS
            match = self._CONSTRAINT_RE.match(line)
            if match is not None:
                constraint = match.groupdict()
                constraint["conditions"] = self._parse_conditions(
                    constraint["conditions"])
                self.pcs.add_constraint(**constraint)
                continue

            # FORBIDDEN CLAUSES
            match = self._FORBIDDEN_RE.match(line)
            if match is not None:
                forbidden = self._parse_forbidden(match.groupdict(), line)
                self.pcs.add_forbidden(**forbidden)
                continue

            # COMMENTLINE
            match = self._COMMENT_RE.match(line)
            if match is not None:
                self.pcs.add_comment(**match.groupdict())
                continue

            # EMPTY LINE
            if self._EMPTY_RE.match(line) is not None:
                continue

            # RAISE ERROR
            raise Exception(f"ERROR: cannot parse the following line: \n'{line}'")

    def _parse_forbidden(self: SMACParser, forbidden: dict, line: str) -> dict:
        """Turn the matched fields of a forbidden clause line into an entry.

        A clause text containing a comma is treated as a "simple" clause
        ({<parameter name 1>=<value 1>, ..., <parameter name N>=<value N>}) and
        parsed into a list of param/value dictionaries. Any other clause text is
        treated as "advanced" and split on "||" and "&&" into expression strings.

        Args:
            forbidden: Named groups ("clauses", "comment") of the matched line.
            line: The original line, used for the error message only.

        Returns:
            The given dictionary, extended with "clause_type" and with "clauses"
            replaced by the parsed list.

        Raises:
            Exception: If no clause could be parsed from the line.
        """
        conditions = []
        if "," in forbidden["clauses"]:
            forbidden["clause_type"] = "simple"
            for clause in re.split(r"\s*,\s*", forbidden["clauses"]):
                match = self._SIMPLE_CLAUSE_RE.match(clause)
                if match is not None:
                    conditions.append(match.groupdict())
                else:
                    # Best effort: report the unparsable item and carry on
                    print(clause, "ERROR")
        else:  # Advanced clauses
            forbidden["clause_type"] = "advanced"
            # TODO decide if we need to further parse this down
            conditions = re.split(r"\s*(?:\|\||&&)\s*", forbidden["clauses"])

        if not conditions:
            raise Exception(f"ERROR: cannot parse the following line:\n'{line}'")

        forbidden["clauses"] = conditions
        return forbidden

    def _parse_conditions(self: SMACParser, conditions: str) -> list[tuple]:
        """Parse the conditions.

        Args:
            conditions: Condition expression; terms are joined by "||" or "&&"
                and may be grouped with parentheses.

        Returns:
            A list of (operator, condition) tuples. The operator is the "||" or
            "&&" preceding the term (None for the first term); the condition is
            a dictionary as returned by _parse_condition, or a nested list of
            such tuples for a parenthesised group.

        Raises:
            Exception: If a term cannot be parsed.
        """
        conditionlist = []
        condition = None
        operator = None
        nested = 0
        nested_start = 0
        condition_start = 0
        for pos, char in enumerate(conditions):
            # Nested clauses
            if char == "(":
                if nested == 0:
                    nested_start = pos
                nested += 1
            elif char == ")":
                nested -= 1
                if nested == 0:
                    condition = self._parse_conditions(conditions[nested_start + 1:pos])
                    conditionlist.append((operator, condition))
                    if (pos + 1) == len(conditions):
                        return conditionlist

            if pos > 1 and nested == 0:
                for op in ["||", "&&"]:
                    if conditions[pos - 1: pos + 1] == op:
                        # A parenthesised group directly before the operator has
                        # already been appended; only plain terms are parsed here
                        if not isinstance(condition, list):
                            condition = self._parse_condition(
                                conditions[condition_start:pos - 1])
                            conditionlist.append((operator, condition))

                        operator = op
                        condition_start = pos + 1
                        # Forget the previous term, otherwise every plain term
                        # following a group would be taken for that group and
                        # silently skipped
                        condition = None

        condition = self._parse_condition(conditions[condition_start:len(conditions)])
        conditionlist.append((operator, condition))

        return conditionlist

    @staticmethod
    def _parse_condition(condition: str) -> dict:
        """Parse the condition.

        Args:
            condition: A single term, either "<parameter> <quantifier> <value>"
                with quantifier one of ==, !=, <, >, <=, >=, or
                "<parameter> in {<item>, ...}".

        Returns:
            For the first form a dictionary with the keys "parameter",
            "quantifier", "value" and "type" ("numerical"); for the second form
            a dictionary with the keys "parameter", "items" (list of strings)
            and "type" ("categorical").

        Raises:
            Exception: If the term matches neither form.
        """
        match = SMACParser._NUMERICAL_RE.match(condition)
        if match is not None:
            return {**match.groupdict(), "type": "numerical"}

        match = SMACParser._CATEGORICAL_RE.match(condition)
        if match is not None:
            parsed = {**match.groupdict(), "type": "categorical"}
            parsed["items"] = re.split(r",\s*", parsed["items"])
            return parsed

        raise Exception(f"ERROR: Couldn't parse '{condition}'")

    def compile(self: SMACParser) -> str:
        """Compile the PCS. Not implemented yet; currently returns None."""
        # TODO implement
        pass
class ParamILSParser(PCSParser):
    """PCS parser for ParamILS format.

    ParamILS only knows discrete parameters, so integer and real parameters
    are exported as a finite set of values.
    """

    def parse(self: ParamILSParser, lines: list[str]) -> None:
        """Parse the PCS. Not implemented yet."""
        # TODO implement
        pass

    def compile(self: ParamILSParser, granularity: int = 20) -> str:
        """Compile the PCS.

        The output lists all parameters first, then all constraints and finally
        all simple forbidden clauses. Advanced forbidden clauses are skipped
        with a warning.

        Args:
            granularity: Number of sample points used to expand integer and
                real ranges into discrete values.

        Returns:
            The lines of the ParamILS file, joined by newlines.

        Raises:
            ValueError: If an integer or real parameter has a domain that does
                not consist of exactly two values.
        """
        # TODO Produce warning if certain specifications cannot be kept in this format
        entries = self.pcs.params
        lines = [self._compile_parameter(item, granularity)
                 for item in entries if item["type"] == "parameter"]

        for item in entries:
            if item["type"] == "constraint":
                line = f"{item['parameter']} | "
                line += self._compile_conditions(item["conditions"])
                # Truthiness also covers a missing or None comment
                if item.get("comment"):
                    line += f" #{item['comment']}"
                lines.append(line)

        for item in entries:
            if item["type"] == "forbidden":
                if item["clause_type"] == "simple":
                    clauses = [f"{cls['param']}={cls['value']}"
                               for cls in item["clauses"]]
                    line = "{" + ",".join(clauses) + "}"
                    if item.get("comment"):
                        line += f"#{item['comment']}"
                    lines.append(line)
                else:
                    print("WARNING: Advanced forbidden clauses "
                          "are not supported by ParamILS.")

        return "\n".join(lines)

    @staticmethod
    def _compile_parameter(item: dict, granularity: int) -> str:
        """Compile a single parameter entry into a ParamILS line.

        Args:
            item: Parameter entry of the pcs object.
            granularity: Number of sample points for integer and real ranges.

        Returns:
            The line "<name> {<values>} [<default>]", followed by " #<comment>"
            if the entry has a non-empty comment.

        Raises:
            ValueError: If a numerical domain does not have exactly two values.
        """
        structure = item["structure"]
        default = item["default"]
        log_scale = item["scale"] == "log"

        if structure in ["ordinal", "categorical"]:
            values = [str(value) for value in item["domain"]]
        elif structure == "integer":
            if len(item["domain"]) != 2:
                raise ValueError(f"Domain {item['domain']} not supported.")
            minval, maxval = (int(bound) for bound in item["domain"])
            if log_scale:
                grid = np.geomspace(minval, maxval, granularity, dtype=int)
            else:
                grid = np.round(np.linspace(minval, maxval, granularity)).astype(int)
            # The set removes duplicates caused by the conversion to integers
            points = {int(point) for point in grid}
            points.add(int(default))  # The default value must be selectable
            values = [str(point) for point in sorted(points)]
        elif structure == "real":
            if len(item["domain"]) != 2:
                raise ValueError(f"Domain {item['domain']} not supported.")
            minval, maxval = (float(bound) for bound in item["domain"])
            if log_scale:
                grid = np.geomspace(minval, maxval, granularity, dtype=float)
            else:
                grid = np.linspace(minval, maxval, granularity)
            # Filter duplicates in string format
            points = {str(float(point)) for point in grid}
            points.add(str(float(default)))  # The default value must be selectable
            values = sorted(points, key=float)

        line = f"{item['name']} {{{','.join(values)}}} [{default}]"
        # Truthiness also covers a None comment, which would print as "#None"
        if item.get("comment"):
            line += f" #{item['comment']}"
        return line

    def _compile_conditions(self: ParamILSParser, conditions: list[tuple]) -> str:
        """Compile a list of conditions.

        Args:
            conditions: List of (operator, condition) tuples, where a condition
                is either a condition dictionary or a nested list of tuples.

        Returns:
            The conditions as a string of "<parameter> in {<values>}" terms,
            joined by their operators; nested lists are put in parentheses.
        """
        parts = []
        for operator, condition in conditions:
            if operator is not None:
                parts.append(f" {operator} ")

            if isinstance(condition, list):
                parts.append(f"({self._compile_conditions(condition)})")
            elif condition["type"] == "numerical":
                items = ", ".join(self._numerical_items(condition))
                parts.append(f"{condition['parameter']} in {{{items}}}")
            elif condition["type"] == "categorical":
                items = ", ".join(condition["items"])
                parts.append(f"{condition['parameter']} in {{{items}}}")
        return "".join(parts)

    def _numerical_items(self: ParamILSParser, condition: dict) -> list[str]:
        """Give the value set expressing a quantifier condition as "in {...}".

        An "==" condition becomes the single value. A "!=" condition on a
        categorical or ordinal parameter becomes the rest of its domain. Every
        other case cannot be expressed; a warning is printed and the single
        value is used so that the produced line stays well-formed.

        Args:
            condition: Condition dictionary of type "numerical".

        Returns:
            The values to put between the braces.
        """
        name = condition["parameter"]
        value = condition["value"]
        quantifier = condition.get("quantifier", "==")
        if quantifier == "==":
            return [value]

        param = self.pcs.get(name)  # None if the parameter is not defined
        if (quantifier == "!=" and param is not None
                and param.get("structure") in ["categorical", "ordinal"]):
            others = [str(v) for v in param["domain"] if str(v) != value]
            if others:
                return others

        print(f"WARNING: Condition '{name} {quantifier} {value}' cannot be "
              f"expressed in ParamILS and is exported as '{name} in {{{value}}}'.")
        return [value]
class IRACEParser(PCSParser):
    """Base interface object for the parser.

    It loads the IRACE pcs files into the generic pcs object.
    Once a parameter file is loaded, it can be exported to another file.
    """

    def __init__(self: IRACEParser, inherit: PCSParser = None) -> None:
        """Initialize the IRACEParser.

        Args:
            inherit: Optional parser whose pcs object is shared with this one.
                When omitted, a new empty PCSObject is created.
        """
        self.pcs = PCSObject() if inherit is None else inherit.pcs

    def parse(self: IRACEParser, lines: list[str]) -> None:
        """Parse the pcs file. Not implemented yet."""
        # TODO implement
        pass

    def compile(self: IRACEParser) -> tuple[str, str]:
        """Compile the PCS.

        Returns:
            A tuple of two strings: the parameter table (one row per parameter,
            with its conditions in the last column) and the forbidden
            configurations (one expression per line).
        """
        header = ["# name", "switch", "type", "values",
                  "[conditions (using R syntax)]"]
        entries = self.pcs.params
        constraints = [c for c in entries if c["type"] == "constraint"]

        rows = []
        for param in entries:
            if param["type"] != "parameter":
                continue
            name = param["name"]
            # IRACE writes conditions on the same line as param definitions
            expressions = [self._compile_conditions(c["conditions"])
                           for c in constraints if c["parameter"] == name]
            if len(expressions) > 1:
                # Several constraints on one parameter must all hold
                expressions = [" && ".join(f"({e})" for e in expressions)]
            condition_str = f"| {expressions[0]}" if expressions else ""
            domain = ",".join(str(value) for value in param["domain"])
            rows.append([name,                    # Parameter name
                         f'"--{name} "',          # Parameter argument name
                         param["structure"][0],   # Parameter type
                         f"({domain})",           # Parameter range/domain
                         condition_str])          # Parameter conditions

        forbidden_rows = []
        for entry in entries:
            if entry["type"] != "forbidden":
                continue
            clauses = entry["clauses"]
            # Advanced clauses are stored as plain expression strings whose
            # joining operators are not kept, so they cannot be rebuilt here
            if not all(isinstance(clause, dict) for clause in clauses):
                print("WARNING: Advanced forbidden clauses "
                      "cannot be exported to IRACE yet.")
                continue
            # R tests equality with "==", a single "=" would be an assignment
            forbidden_rows.append(" & ".join(
                f"({c['param']} == {self._r_literal(c['value'])})"
                for c in clauses))

        table = tabulate.tabulate(rows, headers=header, tablefmt="plain",
                                  numalign="left")
        return table, "\n".join(forbidden_rows)

    def _compile_conditions(self: IRACEParser, conditions: list[tuple]) -> str:
        """Compile a list of conditions into an R expression.

        Args:
            conditions: List of (operator, condition) tuples, where a condition
                is either a condition dictionary or a nested list of tuples.

        Returns:
            The terms joined by their operators. Conditions with an "items"
            list become "<parameter> %in% c(...)", conditions with a quantifier
            become "<parameter> <quantifier> <value>" and nested lists are put
            in parentheses.
        """
        parts = []
        for operator, condition in conditions:
            if operator is not None:
                parts.append(operator)

            if isinstance(condition, list):
                parts.append(f"({self._compile_conditions(condition)})")
            elif condition["type"] == "categorical":
                items = ",".join(self._r_literal(i) for i in condition["items"])
                parts.append(f"{condition['parameter']} %in% c({items})")
            else:
                parts.append(f"{condition['parameter']} {condition['quantifier']} "
                             f"{self._r_literal(condition['value'])}")
        return " ".join(parts)

    @staticmethod
    def _r_literal(value: object) -> str:
        """Format a value for use inside an R expression.

        Values that Python can read as a number are written as they are, all
        other values are wrapped in double quotes to make them R strings.
        """
        text = str(value)
        try:
            float(text)
        except ValueError:
            return f'"{text}"'
        return text