Coverage for sparkle/tools/pcsparser.py: 15%
244 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-27 09:10 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-27 09:10 +0000
1"""The Parameter Configuration Space Parser class."""
2from __future__ import annotations
3import re
4import sys
5import numpy as np
6from enum import Enum
7from abc import ABC
8from pathlib import Path
class PCSObject(ABC):
    """General data structure to keep the pcs file in.

    Every entry (parameter, constraint, forbidden clause or comment) is stored as a
    dict in ``self.params``, in insertion order, and carries a ``"type"`` key naming
    its kind. Fields are added by functions, such that checks can be conducted.
    """

    def __init__(self: PCSObject) -> None:
        """Initialize the PCSObject with an empty list of entries."""
        self.params = []

    def add_param(self: PCSObject,
                  name: str,
                  structure: str = "integer",
                  domain: list = None,
                  scale: str = "linear",
                  default: str = "0",
                  comment: str = None) -> None:
        """Add a parameter to the PCSObject.

        Args:
            name: Name of the parameter.
            structure: One of "integer", "real", "categorical" or "ordinal".
            domain: Values of the parameter; exactly two entries (the bounds) for
                "integer" and "real". Defaults to [-sys.maxsize, sys.maxsize].
            scale: Scale of the parameter; forced to None for categoricals.
            default: Default value of the parameter.
            comment: Optional comment that belongs to the parameter.

        Raises:
            ValueError: If the structure is not supported, or if an integer/real
                domain does not consist of exactly two entries.
        """
        if structure not in ["integer", "real", "categorical", "ordinal"]:
            raise ValueError(f"Parameter structure {structure} not supported.")

        # Build the default domain per call: a list literal in the signature would
        # be shared (and mutated) by every parameter relying on the default.
        if domain is None:
            domain = [-sys.maxsize, sys.maxsize]

        # Domain check
        if structure in ["integer", "real"]:
            if len(domain) != 2:
                raise ValueError(f"Parameter domain {domain} not supported.")
        elif structure == "categorical":
            # TODO: check categories
            scale = None

        self.params.append({
            "name": name,
            "structure": structure,
            "domain": domain,
            "scale": scale,
            "default": default,
            "comment": comment,
            "type": "parameter",
        })

    def add_constraint(self: PCSObject, **kwargs: object) -> None:
        """Add a constraint to the PCSObject; kwargs are stored as given."""
        # TODO add checks
        self.params.append({**kwargs, "type": "constraint"})

    def add_forbidden(self: PCSObject, **kwargs: object) -> None:
        """Add a forbidden clause to the PCSObject; kwargs are stored as given."""
        # TODO add checks
        self.params.append({**kwargs, "type": "forbidden"})

    def add_comment(self: PCSObject, **kwargs: object) -> None:
        """Add a comment to the PCSObject; kwargs are stored as given."""
        # TODO add checks
        self.params.append({**kwargs, "type": "comment"})

    def clear(self: PCSObject) -> None:
        """Clear the PCSObject by replacing the entry list with a new empty one."""
        self.params = []

    def get(self: PCSObject, name: str) -> dict | None:
        """Get an entry from the PCSObject based on its "name" field.

        Args:
            name: Value of the "name" key to look for.

        Returns:
            The matching entry, or None when no entry carries that name. When
            several entries share the name, the one added last is returned.
        """
        # Walk backwards so the most recently added duplicate wins.
        for entry in reversed(self.params):
            if "name" in entry and entry["name"] == name:
                return entry
        return None
class PCSConvention(Enum):
    """Internal pcs convention enum.

    The member values are the lower-case strings by which a convention is
    selected when loading or exporting a parameter configuration space.
    """

    # Placeholder for "no convention given"; its value is the empty string.
    unknown = ""
    SMAC = "smac"
    ParamILS = "paramils"
class PCSParser(ABC):
    """Base interface object for the parser.

    It loads the pcs files into the generic pcs object. Once a parameter file is
    loaded, it can be exported to another file.
    """

    def __init__(self: PCSParser, inherit: PCSParser = None) -> None:
        """Initialize the PCSParser.

        Args:
            inherit: Optional parser whose pcs object is shared (not copied) with
                this parser. When omitted, a new empty PCSObject is created.
        """
        self.pcs = PCSObject() if inherit is None else inherit.pcs

    @staticmethod
    def _format_string_to_enum(string: str) -> PCSConvention:
        """Convert string to PCSConvention.

        Raises:
            ValueError: If no convention has the given string as its value.
        """
        for form in PCSConvention:
            if form.value == string:
                return form
        raise ValueError(
            "ERROR: parameter configuration space format is not supported.")

    def check_validity(self: PCSParser) -> bool:
        """Check the validity of the pcs (not implemented; always returns True)."""
        # TODO implement

        # check if for all parameters in constraints and forbidden clauses exists
        # Check for conflict between default values and constraints and forbidden clauses
        return True

    def load(self: PCSParser, filepath: Path, convention: str = "smac") -> None:
        """Main import function.

        Args:
            filepath: Path (or path string) of the pcs file to read.
            convention: Name of the convention the file is written in. Only "smac"
                can currently be imported.

        Raises:
            ValueError: If the convention name is unknown.
            NotImplementedError: If importing the convention is not implemented.
            OSError: If the file cannot be opened (e.g. it does not exist).
        """
        if isinstance(filepath, str):
            filepath = Path(filepath)
        convention = self._format_string_to_enum(convention)
        if convention != PCSConvention.SMAC:
            raise NotImplementedError(
                f"ERROR: Importing the pcs convention for {convention.value}"
                " is not yet implemented.")

        # The context manager closes the handle even when reading fails.
        with filepath.open() as pcs_file:
            lines = pcs_file.readlines()

        parser = SMACParser(self)
        parser.parse(lines)
        self.pcs = parser.pcs

    def export(self: PCSParser,
               convention: str = "smac",
               destination: Path = None) -> None:
        """Main export function.

        Args:
            convention: Name of the convention to write. Only "paramils" can
                currently be exported.
            destination: Path (or path string) of the file to write.

        Raises:
            ValueError: If no destination is given or the convention is unknown.
            NotImplementedError: If exporting the convention is not implemented.
        """
        # Fail early with a clear message instead of an AttributeError on None
        # after all the compilation work has been done.
        if destination is None:
            raise ValueError("ERROR: no destination given to export the pcs to.")
        convention = self._format_string_to_enum(convention)
        if convention != PCSConvention.ParamILS:
            raise NotImplementedError(
                f"ERROR: Exporting the pcs convention for {convention.value}"
                " is not yet implemented.")

        pcs = ParamILSParser(self).compile()
        # write_text opens, writes and closes the file; Path() also accepts str.
        Path(destination).write_text(pcs)
class SMACParser(PCSParser):
    """The SMAC parser class.

    Reads the lines of a SMAC pcs file into the shared pcs object: parameters,
    constraints (conditionals), forbidden clauses and comments.
    """

    # All patterns are compiled once instead of once per parsed line.

    # The only forbidden characters in parameter names are:
    # spaces, commas, quotes, and parentheses
    # The domain is matched lazily and the default excludes "]" so that brackets
    # inside a trailing comment cannot be absorbed into either field.
    _PARAM_RE = re.compile(
        r"(?P<name>[^\s\"',]*)\s+(?P<structure>\w*)\s+"
        r"(?P<domain>(?:\[|\{).*?(?:\]|\}))\s*\[(?P<default>[^\]]*)\]"
        r"\s*(?P<scale>log)*\s*#*(?P<comment>.*)")
    # The conditions stop at the first "#", leaving the rest for the comment.
    _CONSTRAINT_RE = re.compile(
        r"(?P<parameter>[^\s\"',]+)\s*\|\s*(?P<conditions>[^#]+)#*(?P<comment>.*)")
    _FORBIDDEN_RE = re.compile(r"\s*\{(?P<clauses>[^\}]+)\}\s*#*(?P<comment>.*)")
    _SIMPLE_CLAUSE_RE = re.compile(
        r"(?P<param>[^\s\"',=]+)\s*=\s*(?P<value>[^\s\"',]+)")
    _COMMENT_RE = re.compile(r"\s*#(?P<comment>.*)")
    # Two-character quantifiers come first: otherwise "<" matches the start of
    # "<=" and the "=" ends up in the value.
    _NUMERICAL_RE = re.compile(
        r"\s*(?P<parameter>[^\s\"',]+)\s*(?P<quantifier>==|!=|<=|>=|<|>)"
        r"\s*(?P<value>[^\s\"',]+)\s*")
    _CATEGORICAL_RE = re.compile(
        r"\s*(?P<parameter>[^\s\"',]+)\s+in\s*\{(?P<items>[^\}]+)\}\s*")

    def parse(self: SMACParser, lines: list[str]) -> None:
        """Parse the pcs file.

        The pcs object is cleared first; every line is then stored as a parameter,
        forbidden clause, constraint or comment. Empty lines are skipped.

        Args:
            lines: The lines of the pcs file.

        Raises:
            Exception: If a line (or a part of it) cannot be parsed.
        """
        self.pcs.clear()

        for line in lines:
            # EMPTY LINE
            if not line.strip():
                continue

            # COMMENTLINE
            # Checked before the other patterns so that a commented-out line (e.g.
            # "#x integer [1,10] [5]") is not mistaken for a parameter named "#x".
            m = self._COMMENT_RE.match(line)
            if m is not None:
                self.pcs.add_comment(**m.groupdict())
                continue

            # PARAMS
            m = self._PARAM_RE.match(line)
            if m is not None:
                fields = m.groupdict()
                fields["domain"] = re.sub(r"(?:\[|\]|\{|\})", "", fields["domain"])
                fields["domain"] = re.split(r"\s*,\s*", fields["domain"])
                self.pcs.add_param(**fields)
                continue

            # FORBIDDEN CLAUSES
            # Checked before constraints: a compact advanced clause such as
            # "{a==1||b==2}" would otherwise match the constraint pattern.
            m = self._FORBIDDEN_RE.match(line)
            if m is not None:
                self.pcs.add_forbidden(**self._parse_forbidden(m.groupdict(), line))
                continue

            # CONSTRAINTS
            m = self._CONSTRAINT_RE.match(line)
            if m is not None:
                constraint = m.groupdict()
                constraint["conditions"] = self._parse_conditions(
                    constraint["conditions"])
                self.pcs.add_constraint(**constraint)
                continue

            # RAISE ERROR
            raise Exception(f"ERROR: cannot parse the following line: \n'{line}'")

    def _parse_forbidden(self: SMACParser, forbidden: dict, line: str) -> dict:
        """Turn the matched fields of a forbidden clause line into a pcs entry.

        Args:
            forbidden: Dict with the raw "clauses" text and the "comment".
            line: The complete line, used in the error message.

        Returns:
            The dict with "clause_type" added and "clauses" replaced by a list.

        Raises:
            Exception: If a simple clause is not of the form <name>=<value>.
        """
        # Simple clauses
        # {<parameter name 1>=<value 1>, ..., <parameter name N>=<value N>}
        if "," in forbidden["clauses"]:
            forbidden["clause_type"] = "simple"
            conditions = []
            for clause in re.split(r"\s*,\s*", forbidden["clauses"]):
                if not clause.strip():
                    continue  # Tolerate a stray leading/trailing comma
                m = self._SIMPLE_CLAUSE_RE.match(clause)
                if m is None:
                    # Dropping the clause would silently weaken the restriction
                    raise Exception(
                        f"ERROR: cannot parse the following line:\n'{line}'")
                conditions.append(m.groupdict())
        else:  # Advanced clauses
            forbidden["clause_type"] = "advanced"
            # TODO decide if we need to further parse this down
            conditions = re.split(r"\s*(?:\|\||&&)\s*", forbidden["clauses"])

        if len(conditions) == 0:
            raise Exception(f"ERROR: cannot parse the following line:\n'{line}'")

        forbidden["clauses"] = conditions
        return forbidden

    def _parse_conditions(self: SMACParser, conditions: str) -> list[tuple]:
        """Parse the conditions.

        Args:
            conditions: Conditions joined by "||" / "&&"; parentheses group
                conditions into a nested list.

        Returns:
            A list of (operator, condition) tuples. The operator is the "||" or
            "&&" preceding the condition (None for the first one); the condition is
            a dict, or a list of such tuples for a parenthesised group.

        Raises:
            Exception: If the parentheses are unbalanced or a condition cannot be
                parsed.
        """
        # Surrounding whitespace would hide that a group closes the expression.
        conditions = conditions.strip()
        conditionlist = []
        operator = None
        nested = 0
        nested_start = 0
        condition_start = 0
        # True while the operand before the current position was a parenthesised
        # group, which has then already been appended to the list.
        group_closed = False
        for pos, char in enumerate(conditions):
            # Nested clauses
            if char == "(":
                if nested == 0:
                    nested_start = pos
                nested += 1
            elif char == ")":
                nested -= 1
                if nested < 0:
                    raise Exception(f"ERROR: Couldn't parse '{conditions}'")
                if nested == 0:
                    group = self._parse_conditions(conditions[nested_start + 1:pos])
                    conditionlist.append((operator, group))
                    group_closed = True
            elif nested == 0 and pos > 0 and conditions[pos - 1:pos + 1] in ("||", "&&"):
                if not group_closed:
                    condition = self._parse_condition(
                        conditions[condition_start:pos - 1])
                    conditionlist.append((operator, condition))

                operator = conditions[pos - 1:pos + 1]
                condition_start = pos + 1
                # Reset, or the operand after this operator would be skipped too.
                group_closed = False

        if nested != 0:
            raise Exception(f"ERROR: Couldn't parse '{conditions}'")

        if not group_closed:
            condition = self._parse_condition(conditions[condition_start:])
            conditionlist.append((operator, condition))

        return conditionlist

    @staticmethod
    def _parse_condition(condition: str) -> dict:
        """Parse the condition.

        Args:
            condition: A single condition, either a comparison
                (<parameter> <quantifier> <value>) or a set membership
                (<parameter> in {<item>, ...}).

        Returns:
            The named fields of the condition plus a "type" key, which is
            "numerical" for comparisons and "categorical" for set memberships.

        Raises:
            Exception: If the condition matches neither form.
        """
        m = SMACParser._NUMERICAL_RE.match(condition)
        if m is not None:
            return {
                **m.groupdict(),
                "type": "numerical",
            }

        m = SMACParser._CATEGORICAL_RE.match(condition)
        if m is not None:
            parsed = {
                **m.groupdict(),
                "type": "categorical",
            }
            # Strip first, so "{ a , b }" yields "a" and "b" without padding.
            parsed["items"] = re.split(r"\s*,\s*", parsed["items"].strip())
            return parsed

        raise Exception(f"ERROR: Couldn't parse '{condition}'")

    def compile(self: SMACParser) -> str:
        """Compile the PCS (not implemented yet; currently returns None)."""
        # TODO implement
        pass
class ParamILSParser(PCSParser):
    """PCS parser for ParamILS format.

    ParamILS only knows discrete domains and "in {...}" conditions, so numerical
    domains are discretised and comparisons are rewritten as value sets.
    """

    # Comparison functions for the quantifiers of a parsed numerical condition.
    _COMPARATORS = {
        "==": lambda left, right: left == right,
        "!=": lambda left, right: left != right,
        "<": lambda left, right: left < right,
        ">": lambda left, right: left > right,
        "<=": lambda left, right: left <= right,
        ">=": lambda left, right: left >= right,
    }

    def parse(self: ParamILSParser, lines: list[str]) -> None:
        """Parse the PCS (not implemented yet; currently does nothing)."""
        # TODO implement
        pass

    def compile(self: ParamILSParser, granularity: int = 20) -> str:
        """Compile the PCS.

        Parameters are written first, then constraints, then simple forbidden
        clauses. Advanced forbidden clauses are skipped with a printed warning and
        comment entries are not written.

        Args:
            granularity: Number of samples used to discretise integer and real
                domains.

        Returns:
            The pcs in ParamILS format, one entry per line.

        Raises:
            ValueError: If granularity is smaller than 1, a numerical domain does
                not consist of two bounds, or a condition cannot be expressed.
        """
        # TODO Produce warning if certain specifications cannot be kept in this format
        if granularity < 1:
            raise ValueError(f"Granularity {granularity} not supported.")

        entries = self.pcs.params
        lines = []
        for item in entries:
            if item["type"] != "parameter":
                continue
            domain = ",".join(self._expand_domain(item, granularity))
            line = f"{item['name']} {{{domain}}} [{item['default']}]"
            # Truthiness also covers None, the default comment of add_param,
            # which would otherwise be written out as " #None".
            if item.get("comment"):
                line += f" #{item['comment']}"
            lines.append(line)

        for item in entries:
            if item["type"] != "constraint":
                continue
            line = f"{item['parameter']} | "
            line += self._compile_conditions(item["conditions"], granularity)
            if item.get("comment"):
                line += f" #{item['comment']}"
            lines.append(line)

        for item in entries:
            if item["type"] != "forbidden":
                continue
            if item["clause_type"] != "simple":
                print("WARNING: Advanced forbidden clauses "
                      "are not supported by ParamILS.")
                continue
            clauses = [f"{clause['param']}={clause['value']}"
                       for clause in item["clauses"]]
            line = "{" + ",".join(clauses) + "}"
            if item.get("comment"):
                line += f"#{item['comment']}"
            lines.append(line)

        return "\n".join(lines)

    @staticmethod
    def _expand_domain(item: dict, granularity: int) -> list[str]:
        """Return the discrete values of a parameter as strings, in order.

        Ordinal and categorical domains are returned as they are. Integer and real
        domains are sampled with granularity points between their bounds (evenly
        spaced, or geometrically spaced for the "log" scale); the default value is
        added when the sampling missed it.

        Raises:
            ValueError: If an integer or real domain does not have two entries.
        """
        structure = item["structure"]
        if structure in ("ordinal", "categorical"):
            return [str(value) for value in item["domain"]]

        if len(item["domain"]) != 2:
            raise ValueError(f"Domain {item['domain']} not supported.")

        cast = int if structure == "integer" else float
        minval, maxval = (cast(bound) for bound in item["domain"])
        if item["scale"] == "log":
            values = list(np.unique(np.geomspace(minval, maxval, granularity,
                                                 dtype=cast)))
        else:
            values = np.linspace(minval, maxval, granularity)
            if structure == "integer":
                values = np.round(values).astype(int)  # Cast to int
            values = list(values)

        # add default value
        default = cast(item["default"])
        if default not in values:
            values.append(default)

        # Filter duplicates in string format, then order numerically
        return sorted({f"{value}" for value in values}, key=cast)

    def _condition_values(self: ParamILSParser,
                          condition: dict,
                          granularity: int) -> list[str]:
        """Return the values of a parameter that satisfy a numerical condition.

        Args:
            condition: Dict with the keys "parameter", "quantifier" and "value".
            granularity: Number of samples used to discretise numerical domains.

        Returns:
            The given value itself for "==", otherwise the values of the
            discretised domain for which the comparison holds. Ordinal values are
            compared by their position in the domain.

        Raises:
            ValueError: If the parameter is unknown, the quantifier cannot be
                applied to it, or no value satisfies the condition.
        """
        name = condition["parameter"]
        quantifier = condition["quantifier"]
        value = str(condition["value"])

        param = self.pcs.get(name)
        if param is None or param.get("type") != "parameter":
            raise ValueError(f"Condition on unknown parameter '{name}'.")
        if quantifier not in self._COMPARATORS:
            raise ValueError(f"Quantifier '{quantifier}' not supported.")
        if quantifier == "==":
            return [value]

        compare = self._COMPARATORS[quantifier]
        candidates = self._expand_domain(param, granularity)
        structure = param["structure"]
        if structure in ("integer", "real"):
            target = float(value)
            allowed = [cand for cand in candidates if compare(float(cand), target)]
        elif structure == "ordinal":
            if value not in candidates:
                raise ValueError(
                    f"Value '{value}' is not in the domain of parameter '{name}'.")
            rank = candidates.index(value)
            allowed = [cand for pos, cand in enumerate(candidates)
                       if compare(pos, rank)]
        else:
            # Categorical values have no order, only (in)equality makes sense
            if quantifier != "!=":
                raise ValueError(f"Quantifier '{quantifier}' not supported for "
                                 f"categorical parameter '{name}'.")
            allowed = [cand for cand in candidates if cand != value]

        if not allowed:
            raise ValueError(f"No value of parameter '{name}' satisfies "
                             f"'{name} {quantifier} {value}'.")
        return allowed

    def _compile_conditions(self: ParamILSParser,
                            conditions: list[tuple],
                            granularity: int = 20) -> str:
        """Compile a list of conditions.

        Args:
            conditions: List of (operator, condition) tuples, where a condition is
                a dict or, for a parenthesised group, a nested list of tuples.
            granularity: Number of samples used to discretise numerical domains.

        Returns:
            The conditions as one string in which every condition has the form
            "<parameter> in {<values>}".
        """
        parts = []
        for op, condition in conditions:
            if op is not None:
                parts.append(f" {op} ")

            if isinstance(condition, list):
                parts.append(f"({self._compile_conditions(condition, granularity)})")
            elif condition["type"] == "numerical":
                allowed = ", ".join(self._condition_values(condition, granularity))
                parts.append(f"{condition['parameter']} in {{{allowed}}}")
            elif condition["type"] == "categorical":
                items = ", ".join(condition["items"])
                parts.append(f"{condition['parameter']} in {{{items}}}")
        return "".join(parts)