Coverage for sparkle/CLI/run_parallel_portfolio.py: 13%

210 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-05 14:48 +0000

1#!/usr/bin/env python3 

2# -*- coding: UTF-8 -*- 

3"""Sparkle command to execute a parallel algorithm portfolio.""" 

4 

5import sys 

6import argparse 

7import random 

8import time 

9import shutil 

10import csv 

11import itertools 

12from pathlib import Path, PurePath 

13 

14from tqdm import tqdm 

15 

16import runrunner as rrr 

17from runrunner.base import Runner 

18from runrunner.slurm import Status 

19 

20from sparkle.CLI.help.reporting_scenario import Scenario 

21from sparkle.CLI.help import logging as sl 

22from sparkle.CLI.help import global_variables as gv 

23from sparkle.platform import CommandName, COMMAND_DEPENDENCIES 

24from sparkle.CLI.initialise import check_for_initialise 

25from sparkle.CLI.help import argparse_custom as ac 

26from sparkle.CLI.help.nicknames import resolve_object_name 

27from sparkle.platform.settings_objects import Settings, SettingState 

28from sparkle.solver import Solver 

29from sparkle.instance import Instance_Set, InstanceSet 

30from sparkle.types import SolverStatus, resolve_objective, UseTime 

31 

32 

def run_parallel_portfolio(instances_set: InstanceSet,
                           portfolio_path: Path,
                           solvers: list[Solver],
                           run_on: Runner = Runner.SLURM) -> None:
    """Run the parallel algorithm portfolio.

    Submits one job per (instance, solver, seed) combination, polls until at
    least one job per instance has completed (killing the remaining jobs for
    that instance), records the best result per solver-instance pair and
    writes all results to ``portfolio_path / "results.csv"``.

    Args:
        instances_set: Set of instances to run on.
        portfolio_path: Path to the parallel portfolio. Also used as log dir.
        solvers: List of solvers to run on the instances.
        run_on: Currently only supports Slurm.
    """
    num_solvers, num_instances = len(solvers), len(instances_set._instance_paths)
    seeds_per_solver = gv.settings().get_parallel_portfolio_number_of_seeds_per_solver()
    num_jobs = num_solvers * num_instances * seeds_per_solver
    parallel_jobs = min(gv.settings().get_number_of_jobs_in_parallel(), num_jobs)
    # BUGFIX: condition was `parallel_jobs > num_jobs`, which can never hold
    # after the min() above. The warning is meant for when the Slurm parallel
    # job limit caps the number of simultaneously running jobs.
    if parallel_jobs < num_jobs:
        print("WARNING: Not all jobs will be started at the same time due to the "
              "limitation of number of Slurm jobs that can be run in parallel. Check"
              " your Sparkle Slurm Settings.")
    print(f"Sparkle parallel portfolio is running {seeds_per_solver} seed(s) per solver "
          f"on {num_solvers} solvers for {num_instances} instances ...")
    cmd_list = []
    cutoff = gv.settings().get_general_target_cutoff_time()
    objectives = gv.settings().get_general_sparkle_objectives()
    # Create a command for each instance-solver-seed combination
    for instance, solver in itertools.product(instances_set._instance_paths, solvers):
        for _ in range(seeds_per_solver):
            seed = int(random.getrandbits(32))
            solver_call_list = solver.build_cmd(
                instance.absolute(),
                objectives=objectives,
                seed=seed,
                cutoff_time=cutoff,
                log_dir=portfolio_path)
            cmd_list.append((" ".join(solver_call_list)).replace("'", '"'))
    # Jobs are added in to the runrunner object in the same order they are provided
    sbatch_options = gv.settings().get_slurm_extra_options(as_args=True)
    run = rrr.add_to_queue(
        runner=run_on,
        cmd=cmd_list,
        name=CommandName.RUN_PARALLEL_PORTFOLIO,
        parallel_jobs=parallel_jobs,
        base_dir=sl.caller_log_dir,
        srun_options=["-N1", "-n1"] + sbatch_options,
        sbatch_options=sbatch_options
    )
    check_interval = gv.settings().get_parallel_portfolio_check_interval()
    instances_done = [False] * num_instances
    # We record the 'best' of all seed results per solver-instance,
    # setting start values for objectives that are always present
    default_objective_values = {}
    for o in objectives:
        default_value = float(sys.maxsize) if o.minimise else 0
        # Default values for time objectives can be linked to cutoff time
        if o.time:
            default_value = cutoff + 1.0
        if o.post_process is not None:
            default_value = o.post_process(default_value, cutoff)
        default_objective_values[o.name] = default_value
    job_output_dict = {instance_name: {solver.name: default_objective_values.copy()
                                       for solver in solvers}
                       for instance_name in instances_set._instance_names}
    n_instance_jobs = num_solvers * seeds_per_solver

    with tqdm(total=len(instances_done)) as pbar:
        pbar.set_description("Instances done")
        while not all(instances_done):
            prev_done = sum(instances_done)
            time.sleep(check_interval)
            job_status_list = [r.status for r in run.jobs]
            job_status_completed = [status == Status.COMPLETED
                                    for status in job_status_list]
            # The jobs are sorted by instance
            for i, instance in enumerate(instances_set._instance_paths):
                if instances_done[i]:
                    continue
                instance_job_slice = slice(i * n_instance_jobs,
                                           (i + 1) * n_instance_jobs)
                if any(job_status_completed[instance_job_slice]):
                    instances_done[i] = True
                    # Kill all running jobs for this instance
                    solver_kills = [0] * num_solvers
                    for job_index in range(i * n_instance_jobs,
                                           (i + 1) * n_instance_jobs):
                        if not job_status_completed[job_index]:
                            run.jobs[job_index].kill()
                            solver_index = int(
                                (job_index % n_instance_jobs) / seeds_per_solver)
                            solver_kills[solver_index] += 1
                    for solver_index in range(num_solvers):
                        # All seeds of a solver were killed on instance, set status kill
                        if solver_kills[solver_index] == seeds_per_solver:
                            solver_name = solvers[solver_index].name
                            job_output_dict[instance.name][solver_name]["status"] =\
                                SolverStatus.KILLED
            pbar.update(sum(instances_done) - prev_done)

    # Attempt to verify that all logs have been written (Slurm I/O latency)
    for index, cmd in enumerate(cmd_list):
        runsolver_configuration = cmd.split(" ")[:11]
        logs = [Path(p) for p in runsolver_configuration
                if Path(p).suffix in [".log", ".val", ".rawres"]]
        if not all([p.exists() for p in logs]):
            time.sleep(check_interval)

    # Now iterate over runsolver logs to get runtime, get the lowest value per seed
    for index, cmd in enumerate(cmd_list):
        runsolver_configuration = cmd.split(" ")[:11]
        # BUGFIX: was `run.jobs[i].stdout` — `i` is a stale loop variable left
        # over from the waiting loop above, so every command was parsed against
        # the same (last) job's output. Each command matches job `index`.
        solver_output = Solver.parse_solver_output(run.jobs[index].stdout,
                                                   runsolver_configuration)
        solver_index = int((index % n_instance_jobs) / seeds_per_solver)
        solver_name = solvers[solver_index].name
        instance_name = instances_set._instance_names[int(index / n_instance_jobs)]
        cpu_time = solver_output["cpu_time"]
        cmd_output = job_output_dict[instance_name][solver_name]
        # Keep the best (lowest positive) CPU time over all seeds of a solver
        if cpu_time > 0.0 and cpu_time < cmd_output["cpu_time"]:
            for key, value in solver_output.items():
                if key in [o.name for o in objectives]:
                    job_output_dict[instance_name][solver_name][key] = value
            if "status" not in cmd_output or cmd_output["status"] != SolverStatus.KILLED:
                cmd_output["status"] = solver_output["status"]

    # Fix the CPU/WC time for non existent logs to instance min time + check_interval
    for instance in job_output_dict.keys():
        no_log_solvers = []
        min_time = cutoff
        for solver in job_output_dict[instance].keys():
            cpu_time = job_output_dict[instance][solver]["cpu_time"]
            if cpu_time == -1.0 or cpu_time == float(sys.maxsize):
                no_log_solvers.append(solver)
            elif cpu_time < min_time:
                min_time = cpu_time
        for solver in no_log_solvers:
            job_output_dict[instance][solver]["cpu_time"] = min_time + check_interval
            job_output_dict[instance][solver]["wall_time"] = min_time + check_interval
            # Fix runtime objectives with resolved CPU/Wall times
            for key, value in job_output_dict[instance][solver].items():
                objective = resolve_objective(key)
                if objective is not None and objective.time:
                    if objective.use_time == UseTime.CPU_TIME:
                        value = job_output_dict[instance][solver]["cpu_time"]
                    else:
                        value = job_output_dict[instance][solver]["wall_time"]
                    if objective.post_process is not None:
                        value = objective.post_process(value, cutoff)
                    job_output_dict[instance][solver][key] = value

    # Print a per-instance summary of the solver results
    for index, instance_name in enumerate(instances_set._instance_names):
        index_str = f"[{index + 1}/{num_instances}] "
        instance_output = job_output_dict[instance_name]
        if all([instance_output[k]["status"] == SolverStatus.TIMEOUT
                for k in instance_output.keys()]):
            print(f"\n{index_str}{instance_name} was not solved within the cutoff-time.")
            continue
        print(f"\n{index_str}{instance_name} yielded the following Solver results:")
        for sindex in range(index * num_solvers, (index + 1) * num_solvers):
            solver_name = solvers[sindex % num_solvers].name
            job_info = job_output_dict[instance_name][solver_name]
            print(f"\t- {solver_name} ended with status {job_info['status']} in "
                  f"{job_info['cpu_time']}s CPU-Time ({job_info['wall_time']}s WC-Time)")

    # Write the results to a CSV
    csv_path = portfolio_path / "results.csv"
    values_header = ["status"] + [o.name for o in objectives]
    header = ["Instance", "Solver"] + values_header
    result_rows = [header]
    for instance_name in job_output_dict.keys():
        for solver_name in job_output_dict[instance_name].keys():
            job_o = job_output_dict[instance_name][solver_name]
            values = [instance_name, solver_name] + [
                job_o[key] if key in job_o else "None"
                for key in values_header]
            result_rows.append(values)
    with csv_path.open("w") as out:
        writer = csv.writer(out)
        writer.writerows(result_rows)

210 

211 

def parser_function() -> argparse.ArgumentParser:
    """Define the command line arguments.

    Returns:
        parser: The parser with the parsed command line arguments
    """
    parser = argparse.ArgumentParser(description="Run a portfolio of solvers on an "
                                                 "instance set in parallel.")
    # Register all portfolio-related arguments from their shared descriptors
    portfolio_arguments = (ac.InstancePath,
                           ac.NicknamePortfolioArgument,
                           ac.SolversArgument,
                           ac.SparkleObjectiveArgument,
                           ac.CutOffTimeArgument,
                           ac.SolverSeedsArgument,
                           ac.RunOnArgument,
                           ac.SettingsFileArgument)
    for argument in portfolio_arguments:
        parser.add_argument(*argument.names, **argument.kwargs)
    return parser

237 

238 

def main(argv: list[str]) -> None:
    """Main method of run parallel portfolio command.

    Args:
        argv: Command line arguments (excluding the program name).
    """
    # Log command call
    sl.log_command(sys.argv)

    # Define command line arguments
    parser = parser_function()

    # Process command line arguments
    args = parser.parse_args(argv)
    if args.solvers is not None:
        solver_names = ["".join(s) for s in args.solvers]
        solver_paths = [resolve_object_name(
            name, target_dir=gv.settings().DEFAULT_solver_dir)
            for name in solver_names]
        if None in solver_paths:
            print("Some solvers not recognised! Check solver names:")
            # BUGFIX: report the unresolved input name. The original printed
            # `solver_paths[i]`, which is always None in this branch, so the
            # user never saw which solver name actually failed to resolve.
            for name, path in zip(solver_names, solver_paths):
                if path is None:
                    print(f'\t- "{name}" ')
            sys.exit(-1)
        solvers = [Solver(p) for p in solver_paths]
    else:
        # No solvers specified: use every solver directory in the platform
        solvers = [Solver(p) for p in
                   gv.settings().DEFAULT_solver_dir.iterdir() if p.is_dir()]

    check_for_initialise(COMMAND_DEPENDENCIES[CommandName.RUN_PARALLEL_PORTFOLIO])

    # Compare current settings to latest.ini
    prev_settings = Settings(PurePath("Settings/latest.ini"))
    Settings.check_settings_changes(gv.settings(), prev_settings)

    # Do first, so other command line options can override settings from the file
    if args.settings_file is not None:
        gv.settings().read_settings_ini(args.settings_file, SettingState.CMD_LINE)

    portfolio_path = args.portfolio_name

    if args.run_on is not None:
        gv.settings().set_run_on(
            args.run_on.value, SettingState.CMD_LINE)
    run_on = gv.settings().get_run_on()

    if args.solver_seeds is not None:
        gv.settings().set_parallel_portfolio_number_of_seeds_per_solver(
            args.solver_seeds, SettingState.CMD_LINE)

    if run_on == Runner.LOCAL:
        print("Parallel Portfolio is not fully supported yet for Local runs. Exiting.")
        sys.exit(-1)

    # Retrieve instance set
    data_set = resolve_object_name(
        args.instance_path,
        gv.file_storage_data_mapping[gv.instances_nickname_path],
        gv.settings().DEFAULT_instance_dir,
        Instance_Set)

    print(f"Running on {data_set.size} instance(s)...")

    if args.cutoff_time is not None:
        gv.settings().set_general_target_cutoff_time(args.cutoff_time,
                                                     SettingState.CMD_LINE)

    if args.objectives is not None:
        gv.settings().set_general_sparkle_objectives(
            args.objectives, SettingState.CMD_LINE)
    # The portfolio only makes sense when optimising a runtime objective
    if not gv.settings().get_general_sparkle_objectives()[0].time:
        print("ERROR: Parallel Portfolio is currently only relevant for "
              "RunTime objectives. In all other cases, use validation")
        sys.exit(-1)

    if args.portfolio_name is not None:  # Use a nickname
        portfolio_path = gv.settings().DEFAULT_parallel_portfolio_output_raw /\
            args.portfolio_name
    else:  # Generate a timestamped nickname
        timestamp = time.strftime("%Y-%m-%d-%H:%M:%S", time.gmtime(time.time()))
        randintstamp = int(random.getrandbits(32))
        portfolio_path = gv.settings().DEFAULT_parallel_portfolio_output_raw /\
            f"{timestamp}_{randintstamp}"
    if portfolio_path.exists():
        print(f"[WARNING] Portfolio path {portfolio_path} already exists! "
              "Overwrite? [y/n] ", end="")
        user_input = input()
        if user_input != "y":
            sys.exit()
        shutil.rmtree(portfolio_path)
    portfolio_path.mkdir(parents=True)
    run_parallel_portfolio(data_set, portfolio_path, solvers, run_on=run_on)

    # Update latest scenario
    gv.latest_scenario().set_parallel_portfolio_path(portfolio_path)
    gv.latest_scenario().set_latest_scenario(Scenario.PARALLEL_PORTFOLIO)
    gv.latest_scenario().set_parallel_portfolio_instance_path(args.instance_path)
    # Write used scenario to file
    gv.latest_scenario().write_scenario_ini()
    # Write used settings to file
    gv.settings().write_used_settings()
    print("Running Sparkle parallel portfolio is done!")
    sys.exit(0)

338 

339 

if __name__ == "__main__":
    # Script entry point: forward CLI arguments (without the program name).
    main(sys.argv[1:])