Coverage for sparkle/tools/pcsparser.py: 15%

244 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-27 09:10 +0000

1"""The Parameter Configuration Space Parser class.""" 

2from __future__ import annotations 

3import re 

4import sys 

5import numpy as np 

6from enum import Enum 

7from abc import ABC 

8from pathlib import Path 

9 

10 

class PCSObject(ABC):
    """General data structure to keep the pcs file in.

    Fields are added by functions, such that checks can be conducted.
    Every entry (parameter, constraint, forbidden clause or comment) is stored as
    a dict in ``self.params``, in insertion order, tagged with a ``"type"`` key.
    """

    def __init__(self: PCSObject) -> None:
        """Initialize the PCSObject with an empty list of entries."""
        self.params = []

    def add_param(self: PCSObject,
                  name: str,
                  structure: str = "integer",
                  domain: list = None,
                  scale: str = "linear",
                  default: str = "0",
                  comment: str = None) -> None:
        """Add a parameter to the PCSObject.

        Args:
            name: Name of the parameter.
            structure: One of "integer", "real", "categorical" or "ordinal".
            domain: Values of the parameter. Must hold exactly two entries for
                "integer" and "real" structures. When omitted, a fresh
                ``[-sys.maxsize, sys.maxsize]`` list is used.
            scale: Scale of the parameter; reset to None for categorical ones.
            default: Default value of the parameter.
            comment: Optional comment belonging to the parameter.

        Raises:
            ValueError: If the structure is unknown, or if an integer/real domain
                does not contain exactly two entries.
        """
        if structure not in ("integer", "real", "categorical", "ordinal"):
            raise ValueError(f"Parameter structure {structure} not supported.")

        # Build the default domain per call: the list is stored in the entry, so a
        # shared default object would leak modifications between parameters.
        if domain is None:
            domain = [-sys.maxsize, sys.maxsize]

        # Domain check
        if structure in ("integer", "real"):
            if len(domain) != 2:
                raise ValueError(f"Parameter domain {domain} not supported.")
        elif structure == "categorical":
            # TODO: check categories
            scale = None

        self.params.append({
            "name": name,
            "structure": structure,
            "domain": domain,
            "scale": scale,
            "default": default,
            "comment": comment,
            "type": "parameter",
        })

    def add_constraint(self: PCSObject, **kwargs: any) -> None:
        """Add a constraint to the PCSObject.

        The keyword arguments are stored as-is, with "type" set to "constraint".
        """
        # TODO add checks
        self.params.append({**kwargs, "type": "constraint"})

    def add_forbidden(self: PCSObject, **kwargs: any) -> None:
        """Add a forbidden clause to the PCSObject.

        The keyword arguments are stored as-is, with "type" set to "forbidden".
        """
        # TODO add checks
        self.params.append({**kwargs, "type": "forbidden"})

    def add_comment(self: PCSObject, **kwargs: any) -> None:
        """Add a comment to the PCSObject.

        The keyword arguments are stored as-is, with "type" set to "comment".
        """
        # TODO add checks
        self.params.append({**kwargs, "type": "comment"})

    def clear(self: PCSObject) -> None:
        """Clear the PCSObject by replacing the entry list with an empty one."""
        self.params = []

    def get(self: PCSObject, name: str) -> dict:
        """Get a parameter from the PCSObject based on the name.

        Returns:
            The most recently added entry that has a "name" key equal to ``name``,
            or None when no such entry exists.
        """
        # Walk backwards so that, for duplicate names, the latest entry wins.
        for entry in reversed(self.params):
            if "name" in entry and entry["name"] == name:
                return entry
        return None

75 

76 

class PCSConvention(Enum):
    """Internal enum of the pcs conventions this module distinguishes.

    The value of each member is the string used to select that convention.
    """

    # Member with the empty string as value.
    unknown = ""
    # Selected with the string "smac".
    SMAC = "smac"
    # Selected with the string "paramils".
    ParamILS = "paramils"

82 

83 

class PCSParser(ABC):
    """Base interface object for the parser.

    It loads the pcs files into the generic pcs object. Once a parameter file is
    loaded, it can be exported to another file.
    """

    def __init__(self: PCSParser, inherit: PCSParser = None) -> None:
        """Initialize the PCSParser.

        Args:
            inherit: Optional parser whose pcs object is reused (shared, not
                copied). Without it a new, empty PCSObject is created.
        """
        if inherit is None:
            self.pcs = PCSObject()
        else:
            self.pcs = inherit.pcs

    @staticmethod
    def _format_string_to_enum(string: str) -> PCSConvention:
        """Convert string to PCSConvention.

        Raises:
            Exception: If no PCSConvention member has ``string`` as its value.
        """
        for form in PCSConvention:
            if form.value == string:
                return form
        raise Exception("ERROR: parameter configuration space format is not supported.")

    def check_validity(self: PCSParser) -> bool:
        """Check the validity of the pcs.

        Not implemented yet: currently always returns True.
        """
        # TODO implement

        # check if for all parameters in constraints and forbidden clauses exists
        # Check for conflict between default values and constraints and forbidden clauses
        return True

    def load(self: PCSParser, filepath: Path, convention: str = "smac") -> None:
        """Main import function.

        Args:
            filepath: Path (or path string) of the pcs file to read.
            convention: Value of the PCSConvention the file is written in.
                Only "smac" can be imported.

        Raises:
            Exception: If the convention is unknown or cannot be imported.
        """
        if isinstance(filepath, str):
            filepath = Path(filepath)
        convention = self._format_string_to_enum(convention)

        # TODO check if file actually exists
        # The context manager closes the file handle once the lines are read.
        with filepath.open() as pcs_file:
            lines = pcs_file.readlines()

        if convention == PCSConvention.SMAC:
            parser = SMACParser(self)
            parser.parse(lines)
            self.pcs = parser.pcs
        else:
            raise Exception(f"ERROR: Importing the pcs convention for {convention.value}"
                            " is not yet implemented.")

    def export(self: PCSParser,
               convention: str = "smac",
               destination: Path = None) -> None:
        """Main export function.

        Args:
            convention: Value of the PCSConvention to write. Only "paramils" can
                be exported.
            destination: Path (or path string) of the file to write.

        Raises:
            Exception: If the convention is unknown or cannot be exported.
            ValueError: If no destination is given.
        """
        convention = self._format_string_to_enum(convention)
        if convention != PCSConvention.ParamILS:
            raise Exception(f"ERROR: Exporting the pcs convention for {convention.value}"
                            " is not yet implemented.")

        # Fail with a clear message before compiling, rather than with an
        # AttributeError on None afterwards.
        if destination is None:
            raise ValueError("A destination path is required to export the pcs.")
        if isinstance(destination, str):  # Accept plain strings, like load() does
            destination = Path(destination)

        pcs = ParamILSParser(self).compile()
        # The context manager flushes and closes the file handle.
        with destination.open("w") as pcs_file:
            pcs_file.write(pcs)

141 

142 

class SMACParser(PCSParser):
    """The SMAC parser class."""

    # Line patterns, compiled once instead of on every line of the file.
    # The only forbidden characters in parameter names are:
    # spaces, commas, quotes, and parentheses
    _PARAM_RE = re.compile(
        r"(?P<name>[^\s\"',]*)\s+(?P<structure>\w*)\s+(?P<domain>(\[|\{)"
        r".*(\]|\}))\s*\[(?P<default>.*)\]\s*(?P<scale>log)"
        r"*\s*#*(?P<comment>.*)")
    # NOTE(review): the greedy ".+" of the conditions group also swallows a
    # trailing "# ..." part, so the comment group ends up empty for such lines.
    _CONSTRAINT_RE = re.compile(
        r"(?P<parameter>[^\s\"',]+)\s*\|\s"
        r"*(?P<conditions>.+)\s*#*(?P<comment>.*)")
    _FORBIDDEN_RE = re.compile(r"\s*\{(?P<clauses>[^\}]+)\}\s*#*(?P<comment>.*)")
    _SIMPLE_CLAUSE_RE = re.compile(
        r"(?P<param>[^\s\"',=]+)\s*=\s*"
        r"(?P<value>[^\s\"',]+)")
    _COMMENT_RE = re.compile(r"\s*#(?P<comment>.*)")
    _EMPTY_RE = re.compile(r"^\s*$")

    def parse(self: SMACParser, lines: list[str]) -> None:
        """Parse the pcs file.

        The pcs object is cleared first. Each line is then tried, in this order,
        as parameter, constraint, forbidden clause, comment and empty line; the
        first pattern that matches decides how the line is stored.

        Args:
            lines: The lines of the pcs file.

        Raises:
            Exception: If a line matches none of the patterns, or if a forbidden
                clause yields no conditions.
        """
        self.pcs.clear()

        for line in lines:
            # PARAMS
            match = self._PARAM_RE.match(line)
            if match is not None:
                fields = match.groupdict()
                # Drop the surrounding brackets/braces, then split the values
                fields["domain"] = re.sub(r"(?:\[|\]|\{|\})", "", fields["domain"])
                fields["domain"] = re.split(r"\s*,\s*", fields["domain"])
                self.pcs.add_param(**fields)
                continue

            # CONSTRAINTS
            match = self._CONSTRAINT_RE.match(line)
            if match is not None:
                constraint = match.groupdict()
                constraint["conditions"] = self._parse_conditions(
                    constraint["conditions"])
                self.pcs.add_constraint(**constraint)
                continue

            # FORBIDDEN CLAUSES
            match = self._FORBIDDEN_RE.match(line)
            if match is not None:
                self.pcs.add_forbidden(**self._parse_forbidden(match.groupdict(), line))
                continue

            # COMMENTLINE
            match = self._COMMENT_RE.match(line)
            if match is not None:
                self.pcs.add_comment(**match.groupdict())
                continue

            # EMPTY LINE
            if self._EMPTY_RE.match(line) is not None:
                continue

            # RAISE ERROR
            raise Exception(f"ERROR: cannot parse the following line: \n'{line}'")

        return

    def _parse_forbidden(self: SMACParser, forbidden: dict, line: str) -> dict:
        """Turn the matched groups of a forbidden clause line into its entry."""
        conditions = []
        if "," in forbidden["clauses"]:
            # Simple clauses
            # {<parameter name 1>=<value 1>, ..., <parameter name N>=<value N>}
            forbidden["clause_type"] = "simple"
            for clause in re.split(r"\s*,\s*", forbidden["clauses"]):
                clause_match = self._SIMPLE_CLAUSE_RE.match(clause)
                if clause_match is not None:
                    conditions.append(clause_match.groupdict())
                else:
                    # Unparseable clauses are reported and skipped
                    print(clause, "ERROR")
        else:  # Advanced clauses
            forbidden["clause_type"] = "advanced"
            # TODO decide if we need to further parse this down
            conditions = re.split(r"\s*(?:\|\||&&)\s*", forbidden["clauses"])

        if not conditions:
            raise Exception(f"ERROR: cannot parse the following line:\n'{line}'")

        forbidden["clauses"] = conditions
        return forbidden

    def _parse_conditions(self: SMACParser, conditions: str) -> list[tuple]:
        """Parse the conditions.

        Args:
            conditions: Conditions joined by "||" or "&&"; parenthesised groups
                are parsed recursively.

        Returns:
            A list of (operator, condition) tuples. The operator is None for the
            first entry and "||" or "&&" otherwise. The condition is a dict for a
            single condition, or a nested list for a parenthesised group.
        """
        # Surrounding whitespace would otherwise hide that a group closes the
        # string, causing the complete string to be parsed as one more condition.
        conditions = conditions.strip()
        conditionlist = []
        condition = None
        operator = None
        nested = 0
        nested_start = 0
        condition_start = 0
        for pos, char in enumerate(conditions):
            # Nested clauses
            if char == "(":
                if nested == 0:
                    nested_start = pos
                nested += 1
            elif char == ")":
                nested -= 1
                if nested == 0:
                    condition = self._parse_conditions(conditions[nested_start + 1:pos])
                    conditionlist.append((operator, condition))
                    if (pos + 1) == len(conditions):
                        return conditionlist

            # pos points at the second character of a possible operator
            if pos > 1 and nested == 0:
                for op in ["||", "&&"]:
                    if conditions[pos - 1: pos + 1] == op:
                        # A list means the preceding term was a group that has
                        # already been appended when its ")" was seen.
                        if not isinstance(condition, list):
                            condition = self._parse_condition(
                                conditions[condition_start:pos - 1])
                            conditionlist.append((operator, condition))

                        operator = op
                        condition_start = pos + 1
                        # Forget the handled term, so a plain condition that
                        # follows a group is not skipped at the next operator.
                        condition = None

        condition = self._parse_condition(conditions[condition_start:len(conditions)])
        conditionlist.append((operator, condition))

        return conditionlist

    @staticmethod
    def _parse_condition(condition: str) -> dict:
        """Parse the condition.

        Args:
            condition: A single condition, either "<parameter> <quantifier> <value>"
                or "<parameter> in {<item>, ...}".

        Returns:
            For the first form a dict with the keys "parameter", "quantifier" and
            "value" and "type" set to "numerical". For the second form a dict with
            the keys "parameter" and "items" (a list) and "type" set to
            "categorical".

        Raises:
            Exception: If the condition matches neither form.
        """
        # "<=" and ">=" are listed before "<" and ">": alternatives are tried left
        # to right, so the short forms would otherwise match first and leave the
        # "=" as the start of the value.
        numerical = re.match(
            r"\s*(?P<parameter>[^\s\"',]+)\s*(?P<quantifier>==|!=|<=|>=|<|>)"
            r"\s*(?P<value>[^\s\"',]+)\s*", condition)
        if numerical is not None:
            return {
                **numerical.groupdict(),
                "type": "numerical",
            }

        categorical = re.match(r"\s*(?P<parameter>[^\s\"',]+)\s+"
                               r"in\s*\{(?P<items>[^\}]+)\}\s*", condition)
        if categorical is not None:
            parsed = {
                **categorical.groupdict(),
                "type": "categorical",
            }
            parsed["items"] = re.split(r",\s*", parsed["items"])
            return parsed

        raise Exception(f"ERROR: Couldn't parse '{condition}'")

    def compile(self: SMACParser) -> str:
        """Compile the PCS.

        Not implemented yet: currently returns None.
        """
        # TODO implement
        pass

299 

300 

class ParamILSParser(PCSParser):
    """PCS parser for ParamILS format."""

    def parse(self: ParamILSParser, lines: list[str]) -> None:
        """Parse the PCS.

        Not implemented yet: currently does nothing.
        """
        # TODO implement
        pass

    def compile(self: ParamILSParser, granularity: int = 20) -> str:
        """Compile the PCS.

        Parameters are written first, followed by the constraints and then the
        simple forbidden clauses. Advanced forbidden clauses are skipped with a
        printed warning.

        Args:
            granularity: Number of points that integer and real ranges are
                expanded to before duplicates are removed.

        Returns:
            The compiled lines, joined by newlines.

        Raises:
            ValueError: If a domain cannot be expanded, or a condition cannot be
                expressed as a set of values.
        """
        # TODO Produce warning if certain specifications cannot be kept in this format
        lines = []
        for item in self.pcs.params:
            if item["type"] == "parameter":
                domain = "{" + self._compile_domain(item, granularity) + "}"
                line = f"{item['name']} {domain} [{item['default']}]"
                # A comment may be absent, None or empty; only write real text
                if item.get("comment"):
                    line += f" #{item['comment']}"

                lines.append(line)

        for item in self.pcs.params:
            if item["type"] == "constraint":
                line = f"{item['parameter']} | "
                line += self._compile_conditions(item["conditions"])
                if item.get("comment"):
                    line += f" #{item['comment']}"
                lines.append(line)

        for item in self.pcs.params:
            if item["type"] == "forbidden":
                if item["clause_type"] == "simple":
                    clauses = [f"{cls['param']}={cls['value']}"
                               for cls in item["clauses"]]
                    line = "{" + ",".join(clauses) + "}"
                    if item.get("comment"):
                        line += f"#{item['comment']}"
                    lines.append(line)
                else:
                    print("WARNING: Advanced forbidden clauses "
                          "are not supported by ParamILS.")

        return "\n".join(lines)

    @staticmethod
    def _compile_domain(item: dict, granularity: int) -> str:
        """Compile the domain of a parameter into a comma separated value list.

        Ordinal and categorical domains are joined as they are. Integer and real
        domains are expanded into ``granularity`` points between both bounds,
        linearly or (for scale "log") geometrically spaced. The default value is
        added when missing and the values are sorted in ascending order.

        Raises:
            ValueError: If an integer/real domain does not hold two entries, or
                if the structure is none of the four handled ones.
        """
        structure = item["structure"]
        if structure in ("ordinal", "categorical"):
            return ",".join(item["domain"])

        if structure not in ("integer", "real"):
            raise ValueError(f"Parameter structure {structure} not supported.")
        if len(item["domain"]) != 2:
            raise ValueError(f"Domain {item['domain']} not supported.")

        is_log = item["scale"] == "log"
        if structure == "integer":
            minval, maxval = (int(i) for i in item["domain"])
            if is_log:
                points = np.geomspace(minval, maxval, granularity, dtype=int)
            else:
                points = np.round(np.linspace(minval, maxval, granularity)).astype(int)
            # The set removes duplicates and makes sure the default is included;
            # sorting afterwards is needed because sets are unordered.
            values = sorted({int(i) for i in points} | {int(item["default"])})
            return ",".join(str(i) for i in values)

        minval, maxval = (float(i) for i in item["domain"])
        if is_log:
            points = np.geomspace(minval, maxval, granularity, dtype=float)
        else:
            points = np.linspace(minval, maxval, granularity)
        values = [float(i) for i in points]
        # add default value
        if float(item["default"]) not in values:
            values.append(float(item["default"]))

        # Filter duplicated in string format
        return ",".join(sorted({f"{i}" for i in values}, key=float))

    def _compile_conditions(self: ParamILSParser, conditions: list[tuple]) -> str:
        """Compile a list of conditions.

        Args:
            conditions: List of (operator, condition) tuples, in which a condition
                is either a dict or a nested list of such tuples.

        Returns:
            The conditions written as "<parameter> in {<values>}" terms, joined by
            their operators. Nested lists are wrapped in parentheses.

        Raises:
            ValueError: If a numerical condition cannot be written as a value set.
        """
        line = ""
        for operator, condition in conditions:
            if operator is not None:
                line += f" {operator} "

            if isinstance(condition, list):
                line += f"({self._compile_conditions(condition)})"
            elif condition["type"] == "numerical":
                values = ", ".join(self._condition_values(condition))
                line += f"{condition['parameter']} in {{{values}}}"
            elif condition["type"] == "categorical":
                items = ", ".join(condition["items"])
                line += f"{condition['parameter']} in {{{items}}}"
        return line

    def _condition_values(self: ParamILSParser, condition: dict) -> list[str]:
        """List the values for which a quantifier based condition holds.

        "==" yields the compared value itself. "!=" yields all other values of the
        domain, which requires a known categorical or ordinal parameter.

        Raises:
            ValueError: For every other quantifier/parameter combination, as the
                condition is only written here as an "in {...}" value set.
        """
        quantifier = condition["quantifier"]
        if quantifier == "==":
            return [condition["value"]]

        param = self.pcs.get(condition["parameter"])
        if (quantifier == "!=" and param is not None
                and param["structure"] in ("categorical", "ordinal")):
            return [value for value in param["domain"] if value != condition["value"]]

        raise ValueError(
            f"Condition '{condition['parameter']} {quantifier} {condition['value']}'"
            " cannot be expressed as a set of values.")
414 items = ", ".join(condition["items"]) 

415 line += f"{condition['parameter']} in {{{items}}}" 

416 return line