Coverage for python / lsst / ctrl / bps / drivers.py: 13%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:35 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Driver functions for each subcommand. 

29 

30Driver functions ensure that all setup work is done before running 

31the subcommand method. 

32""" 

33 

# Public interface of this module: one driver function per bps subcommand
# (the shared initialization helper stays private).
__all__ = [
    "acquire_qgraph_driver",
    "cancel_driver",
    "cluster_qgraph_driver",
    "ping_driver",
    "prepare_driver",
    "report_driver",
    "restart_driver",
    "status_driver",
    "submit_driver",
    "submitcmd_driver",
    "transform_driver",
]

47 

48 

49import logging 

50import os 

51from pathlib import Path 

52 

53from lsst.pipe.base.quantum_graph import PredictedQuantumGraph 

54from lsst.utils.timer import time_this 

55from lsst.utils.usage import get_peak_mem_usage 

56 

57from . import BPS_DEFAULTS, BPS_SEARCH_ORDER, DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT, BpsConfig 

58from .bps_reports import compile_code_summary, compile_job_summary 

59from .bps_utils import _dump_env_info, _dump_pkg_info, _make_id_link 

60from .cancel import cancel 

61from .construct import construct 

62from .initialize import ( 

63 custom_job_validator, 

64 init_submission, 

65 out_collection_validator, 

66 output_run_validator, 

67 submit_path_validator, 

68) 

69from .ping import ping 

70from .pre_transform import acquire_quantum_graph, cluster_quanta 

71from .prepare import prepare 

72from .report import display_report, retrieve_report 

73from .restart import restart 

74from .status import status 

75from .submit import submit 

76from .transform import transform 

77 

# Module-level logger shared by all driver functions in this file.
_LOG = logging.getLogger(__name__)

79 

80 

def _init_submission_driver(config_file: str, **kwargs) -> BpsConfig:
    """Initialize runtime environment.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    config : `lsst.ctrl.bps.BpsConfig`
        Batch Processing Service configuration.
    """
    # Validators run during init_submission to sanity-check the submission.
    submission_checks = [submit_path_validator, output_run_validator, out_collection_validator]
    _LOG.info("Initializing BPS configuration and creating submit directory")
    timer_kwargs = {
        "log": _LOG,
        "level": logging.INFO,
        "prefix": None,
        "msg": "BPS configuration initialized and submit directory created",
        "mem_usage": True,
        "mem_unit": DEFAULT_MEM_UNIT,
        "mem_fmt": DEFAULT_MEM_FMT,
    }
    with time_this(**timer_kwargs):
        config = init_submission(config_file, validators=submission_checks, **kwargs)
        _log_mem_usage()

    submit_path = config[".bps_defined.submitPath"]
    print(f"Submit dir: {submit_path}")
    return config

113 

114 

def acquire_qgraph_driver(config_file: str, **kwargs) -> tuple[BpsConfig, PredictedQuantumGraph]:
    """Read a quantum graph from a file or create one from pipeline definition.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    config : `lsst.ctrl.bps.BpsConfig`
        Updated configuration.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        A graph representing quanta.
    """
    config = _init_submission_driver(config_file, **kwargs)
    submit_path = config[".bps_defined.submitPath"]

    _LOG.info("Starting acquire stage (generating and/or reading quantum graph)")
    timer_kwargs = {
        "log": _LOG,
        "level": logging.INFO,
        "prefix": None,
        "msg": "Acquire stage completed",
        "mem_usage": True,
        "mem_unit": DEFAULT_MEM_UNIT,
        "mem_fmt": DEFAULT_MEM_FMT,
    }
    with time_this(**timer_kwargs):
        graph_file, qgraph = acquire_quantum_graph(config, out_prefix=submit_path)
        _log_mem_usage()

    # Remember where the run's quantum graph lives for later stages.
    config[".bps_defined.runQgraphFile"] = graph_file
    return config, qgraph

150 

151 

def cluster_qgraph_driver(config_file, **kwargs):
    """Group quanta into clusters.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    config : `lsst.ctrl.bps.BpsConfig`
        Updated configuration.
    clustered_qgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        A graph representing clustered quanta.
    """
    config, qgraph = acquire_qgraph_driver(config_file, **kwargs)

    _LOG.info("Starting cluster stage (grouping quanta into jobs)")
    timer_kwargs = {
        "log": _LOG,
        "level": logging.INFO,
        "prefix": None,
        "msg": "Cluster stage completed",
        "mem_usage": True,
        "mem_unit": DEFAULT_MEM_UNIT,
        "mem_fmt": DEFAULT_MEM_FMT,
    }
    with time_this(**timer_kwargs):
        clustered_qgraph = cluster_quanta(config, qgraph, config["uniqProcName"])
        _log_mem_usage()

    _LOG.info("ClusteredQuantumGraph contains %d cluster(s)", len(clustered_qgraph))

    # Optionally persist diagnostic artifacts of the clustered graph.
    submit_path = config[".bps_defined.submitPath"]
    if config.search("saveClusteredQgraph", opt={"default": False})[1]:
        clustered_qgraph.save(os.path.join(submit_path, "bps_clustered_qgraph.pickle"))
    if config.search("saveDot", opt={"default": False})[1]:
        clustered_qgraph.draw(os.path.join(submit_path, "bps_clustered_qgraph.dot"))
    return config, clustered_qgraph

194 

195 

def transform_driver(config_file, **kwargs):
    """Create a workflow for a specific workflow management system.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
        Configuration to use when creating the workflow.
    generic_workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
        Representation of the abstract/scientific workflow specific to a given
        workflow management system.
    """
    config, clustered_qgraph = cluster_qgraph_driver(config_file, **kwargs)
    submit_path = config[".bps_defined.submitPath"]

    _LOG.info("Starting transform stage (creating generic workflow)")
    timer_kwargs = {
        "log": _LOG,
        "level": logging.INFO,
        "prefix": None,
        "msg": "Transform stage completed",
        "mem_usage": True,
        "mem_unit": DEFAULT_MEM_UNIT,
        "mem_fmt": DEFAULT_MEM_FMT,
    }
    with time_this(**timer_kwargs):
        generic_workflow, generic_workflow_config = transform(config, clustered_qgraph, submit_path)
        _LOG.info("Generic workflow name '%s'", generic_workflow.name)
        _log_mem_usage()

    total_jobs = sum(generic_workflow.job_counts.values())
    _LOG.info("GenericWorkflow contains %d job(s) (including final)", total_jobs)

    # Optionally persist diagnostic artifacts of the generic workflow.
    if config.search("saveGenericWorkflow", opt={"default": False})[1]:
        with open(os.path.join(submit_path, "bps_generic_workflow.pickle"), "wb") as outfh:
            generic_workflow.save(outfh, "pickle")
    if config.search("saveDot", opt={"default": False})[1]:
        with open(os.path.join(submit_path, "bps_generic_workflow.dot"), "w") as outfh:
            generic_workflow.draw(outfh, "dot")
    return generic_workflow_config, generic_workflow

243 

244 

def prepare_driver(config_file, **kwargs):
    """Create a representation of the generic workflow.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    wms_config : `lsst.ctrl.bps.BpsConfig`
        Configuration to use when creating the workflow.
    workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
        Representation of the abstract/scientific workflow specific to a given
        workflow management system.
    """
    # WMS submission checks default to on unless the caller disabled them.
    kwargs.setdefault("runWmsSubmissionChecks", True)
    generic_workflow_config, generic_workflow = transform_driver(config_file, **kwargs)
    submit_path = generic_workflow_config[".bps_defined.submitPath"]

    _LOG.info("Starting prepare stage (creating specific implementation of workflow)")
    timer_kwargs = {
        "log": _LOG,
        "level": logging.INFO,
        "prefix": None,
        "msg": "Prepare stage completed",
        "mem_usage": True,
        "mem_unit": DEFAULT_MEM_UNIT,
        "mem_fmt": DEFAULT_MEM_FMT,
    }
    with time_this(**timer_kwargs):
        wms_workflow = prepare(generic_workflow_config, generic_workflow, submit_path)
        _log_mem_usage()

    # The generic-workflow config doubles as the WMS workflow config.
    return generic_workflow_config, wms_workflow

282 

283 

def submit_driver(config_file, **kwargs):
    """Submit workflow for execution.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.
    """
    # Submission checks are on by default; callers may disable them via kwargs.
    kwargs.setdefault("runWmsSubmissionChecks", True)

    _LOG.info(
        "DISCLAIMER: All values regarding memory consumption reported below are approximate and may "
        "not accurately reflect actual memory usage by the bps process."
    )

    # Peek at the configuration (without creating the submit directory yet)
    # to decide whether this is a remote-build submission.
    remote_build = {}
    config = BpsConfig(
        config_file,
        search_order=BPS_SEARCH_ORDER,
        defaults=BPS_DEFAULTS,
        wms_service_class_fqn=kwargs.get("wms_service"),
    )
    _, remote_build = config.search("remoteBuild", opt={"default": {}})
    if remote_build:
        # Remote build is only honored for the PanDA WMS service.
        # NOTE(review): if ``remoteBuild`` is set but the WMS service is not
        # PanDA, ``remote_build`` stays truthy while ``wms_workflow`` is never
        # assigned, so the submit stage below would raise NameError — confirm
        # whether that combination is reachable.
        if config["wmsServiceClass"] == "lsst.ctrl.bps.panda.PanDAService":
            if not remote_build.search("enabled", opt={"default": False})[1]:
                # Remote build present but disabled: fall back to local submission.
                remote_build = {}
                _LOG.info("The workflow is submitted to the local Data Facility.")
            else:
                _LOG.info(
                    "Remote submission is enabled. The workflow is submitted to a remote Data Facility."
                )
                _LOG.info("Initializing execution environment")
                with time_this(
                    log=_LOG,
                    level=logging.INFO,
                    prefix=None,
                    msg="Initializing execution environment completed",
                    mem_usage=True,
                    mem_unit=DEFAULT_MEM_UNIT,
                    mem_fmt=DEFAULT_MEM_FMT,
                ):
                    # For a remote build only initialize the submission
                    # locally; the acquire/cluster/transform/prepare stages
                    # are delegated to the remote service via kwargs.
                    config = _init_submission_driver(config_file, **kwargs)
                    kwargs["remote_build"] = remote_build
                    kwargs["config_file"] = config_file
                    wms_workflow = None
    else:
        _LOG.info("The workflow is submitted to the local Data Facility.")

    _LOG.info("Starting submission process")
    with time_this(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="Submission process completed",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    ):
        if not remote_build:
            # Local path: run the full pipeline (acquire, cluster, transform,
            # prepare) to build the WMS-specific workflow before submitting.
            wms_workflow_config, wms_workflow = prepare_driver(config_file, **kwargs)
        else:
            wms_workflow_config = config

        _LOG.info("Starting submit stage")
        with time_this(
            log=_LOG,
            level=logging.INFO,
            prefix=None,
            msg="Submit stage completed",
            mem_usage=True,
            mem_unit=DEFAULT_MEM_UNIT,
            mem_fmt=DEFAULT_MEM_FMT,
        ):
            workflow = submit(wms_workflow_config, wms_workflow, **kwargs)
            # In the remote-build path ``wms_workflow`` is None and submit()
            # returns the workflow object to report on.
            if not wms_workflow:
                wms_workflow = workflow
            _LOG.info("Run '%s' submitted for execution with id '%s'", wms_workflow.name, wms_workflow.run_id)
            _log_mem_usage()

    _make_id_link(wms_workflow_config, wms_workflow.run_id)

    print(f"Run Id: {wms_workflow.run_id}")
    print(f"Run Name: {wms_workflow.name}")

370 

371 

def restart_driver(wms_service, run_id):
    """Restart a failed workflow.

    Parameters
    ----------
    wms_service : `str`
        Name of the class.
    run_id : `str`
        Id or path of workflow that need to be restarted.
    """
    if wms_service is None:
        default_config = BpsConfig({}, defaults=BPS_DEFAULTS)
        wms_service = default_config["wmsServiceClass"]

    new_run_id, run_name, message = restart(wms_service, run_id)

    # Guard clause: report the failure and bail out.
    if new_run_id is None:
        print(f"Restart failed: {message}" if message else "Restart failed: Unknown error")
        return

    # If run_id is a submit directory, refresh the bookkeeping files there.
    if Path(run_id).exists():
        _dump_env_info(f"{run_id}/{run_name}.env.info.yaml")
        _dump_pkg_info(f"{run_id}/{run_name}.pkg.info.yaml")
        config = BpsConfig(f"{run_id}/{run_name}_config.yaml")
        _make_id_link(config, new_run_id)

    print(f"Run Id: {new_run_id}")
    print(f"Run Name: {run_name}")

402 

403 

def report_driver(
    wms_service: str | None = None,
    run_id: str | None = None,
    user: str | None = None,
    hist_days: float = 0.0,
    pass_thru: str | None = None,
    is_global: bool = False,
    return_exit_codes: bool = False,
):
    """Print out the summary of jobs submitted for execution.

    Parameters
    ----------
    wms_service : `str`, optional
        Name of the class.
    run_id : `str`, optional
        A run id the report will be restricted to.
    user : `str`, optional
        A user the report will be restricted to.
    hist_days : `float`, optional
        Number of past days to consider while preparing the report. By default,
        only the currently running workflows are included in the report.
        If the report is restricted to a single run (i.e., ``run_id`` is set),
        the history search will be limited by default to two past days.
    pass_thru : `str`, optional
        A string to pass directly to the WMS service class.
    is_global : `bool`, optional
        If set, all available job queues will be queried for job information.
        Defaults to False which means that only a local job queue will be
        queried for information.

        Only applicable in the context of a WMS using distributed job queues
        (e.g., HTCondor).
    return_exit_codes : `bool`, optional
        If set, return exit codes related to jobs with a
        non-success status. Defaults to False, which means that only
        the summary state is returned.

        Only applicable in the context of a WMS with associated
        handlers to return exit codes from jobs.
    """
    if not wms_service:
        default_config = BpsConfig(BPS_DEFAULTS)
        wms_service = os.environ.get("BPS_WMS_SERVICE_CLASS", default_config["wmsServiceClass"])

    # Single-run reports get extra history (until a better mechanism for
    # completed jobs exists) and are massaged by report postprocessors.
    postprocessors = None
    if run_id:
        hist_days = max(hist_days, 2)
        postprocessors = [compile_job_summary]
        if return_exit_codes:
            postprocessors.append(compile_code_summary)

    runs, messages = retrieve_report(
        wms_service,
        run_id=run_id,
        user=user,
        hist=hist_days,
        pass_thru=pass_thru,
        is_global=is_global,
        return_exit_codes=return_exit_codes,
        postprocessors=postprocessors,
    )

    if not (runs or messages):
        # Nothing to show; for a single-run query give the user some hints.
        if run_id:
            print(
                f"No records found for job id '{run_id}'. "
                f"Hints: Double check id, retry with a larger --hist value (currently: {hist_days}), "
                "and/or use --global to search all job queues."
            )
        return

    display_report(
        runs,
        messages,
        is_detailed=bool(run_id),
        is_global=is_global,
        return_exit_codes=return_exit_codes,
    )

487 

488 

def status_driver(wms_service: str, run_id: str, hist_days: float, is_global: bool = False) -> int:
    """Print out status of workflow submitted for execution.

    Parameters
    ----------
    wms_service : `str`
        Name of the class.
    run_id : `str`
        A run id the report will be restricted to.
    hist_days : `float`
        Number of days.
    is_global : `bool`, optional
        If set, all available job queues will be queried for job information.
        Defaults to False which means that only a local job queue will be
        queried for information.

        Only applicable in the context of a WMS using distributed job queues
        (e.g., HTCondor).

    Returns
    -------
    state : `int`
        Status of submitted workflow.
    """
    if wms_service is None:
        # Fall back to the environment, then the packaged defaults.
        default_config = BpsConfig(BPS_DEFAULTS)
        wms_service = os.environ.get("BPS_WMS_SERVICE_CLASS", default_config["wmsServiceClass"])

    state, message = status(wms_service, run_id=run_id, hist=hist_days, is_global=is_global)

    _LOG.info("status: %s", state.name)
    if message:
        _LOG.warning(message)
    return state.value

529 

530 

def cancel_driver(wms_service, run_id, user, require_bps, pass_thru, is_global=False):
    """Cancel submitted workflows.

    Parameters
    ----------
    wms_service : `str`
        Name of the Workload Management System service class.
    run_id : `str`
        ID or path of job that should be canceled.
    user : `str`
        User whose submitted jobs should be canceled.
    require_bps : `bool`
        Whether to require given run_id/user to be a bps submitted job.
    pass_thru : `str`
        Information to pass through to WMS.
    is_global : `bool`, optional
        If set, all available job queues will be checked for jobs to cancel.
        Defaults to False which means that only a local job queue will be
        checked.

        Only applicable in the context of a WMS using distributed job queues
        (e.g., HTCondor).
    """
    # When no service class is given, use the packaged default.
    if wms_service is None:
        wms_service = BpsConfig({}, defaults=BPS_DEFAULTS)["wmsServiceClass"]
    cancel(wms_service, run_id, user, require_bps, pass_thru, is_global=is_global)

558 

559 

def ping_driver(wms_service=None, pass_thru=None):
    """Check whether WMS services are up, reachable, and any authentication,
    if needed, succeeds.

    The services to be checked are those needed for submit, report, cancel,
    restart, but ping cannot guarantee whether jobs would actually run
    successfully.

    Parameters
    ----------
    wms_service : `str`, optional
        Name of the Workload Management System service class.
    pass_thru : `str`, optional
        Information to pass through to WMS.

    Returns
    -------
    success : `int`
        Whether services are up and usable (0) or not (non-zero).
    """
    if wms_service is None:
        default_config = BpsConfig({}, defaults=BPS_DEFAULTS)
        wms_service = default_config["wmsServiceClass"]
    # Named ``success`` (not ``status``) so the local does not shadow the
    # module-level ``status`` function imported from .status.
    success, message = ping(wms_service, pass_thru)

    if message:
        # A zero return code means the ping worked; any accompanying
        # message is then merely informational.
        if not success:
            _LOG.info(message)
        else:
            _LOG.error(message)

    # Log overall status message
    if not success:
        _LOG.info("Ping successful.")
    else:
        _LOG.error("Ping failed (%d).", success)

    return success

598 

599 

def submitcmd_driver(config_file: str, **kwargs) -> None:
    """Submit a command for execution.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.
    """

    def _timed(msg):
        # All stages share the same timing/memory-reporting settings.
        return time_this(
            log=_LOG,
            level=logging.INFO,
            prefix=None,
            msg=msg,
            mem_usage=True,
            mem_unit=DEFAULT_MEM_UNIT,
            mem_fmt=DEFAULT_MEM_FMT,
        )

    _LOG.info("Initializing BPS configuration and creating submit directory")
    with _timed("BPS configuration initialized and submit directory created"):
        config = init_submission(
            config_file, validators=[submit_path_validator, custom_job_validator], **kwargs
        )
        _log_mem_usage()

    submit_path = config[".bps_defined.submitPath"]

    _LOG.info("Starting construction stage (creating generic workflow)")
    with _timed("Construction stage completed"):
        generic_workflow, generic_workflow_config = construct(config)
        _LOG.info("Generic workflow name '%s'", generic_workflow.name)
        _log_mem_usage()

    # Optionally persist diagnostic artifacts of the generic workflow.
    if config.search("saveGenericWorkflow", opt={"default": False})[1]:
        with open(os.path.join(submit_path, "bps_generic_workflow.pickle"), "wb") as outfh:
            generic_workflow.save(outfh, "pickle")
    if config.search("saveDot", opt={"default": False})[1]:
        with open(os.path.join(submit_path, "bps_generic_workflow.dot"), "w") as outfh:
            generic_workflow.draw(outfh, "dot")

    _LOG.info("Starting prepare stage (creating specific implementation of workflow)")
    with _timed("Prepare stage completed"):
        wms_workflow = prepare(generic_workflow_config, generic_workflow, submit_path)
        _log_mem_usage()

    wms_workflow_config = generic_workflow_config

    # A dry run stops after prepare, before anything is handed to the WMS.
    if kwargs.get("dry_run", False):
        return

    _LOG.info("Starting submit stage")
    with _timed("Submit stage completed"):
        submit(wms_workflow_config, wms_workflow, **kwargs)
        _log_mem_usage()
        print(f"Run Id: {wms_workflow.run_id}")
        print(f"Run Name: {wms_workflow.name}")

681 

682 

def _log_mem_usage() -> None:
    """Log memory usage."""
    # Skip the (relatively costly) peak-memory query when INFO is disabled.
    if not _LOG.isEnabledFor(logging.INFO):
        return
    peaks = [f"{val.to(DEFAULT_MEM_UNIT):{DEFAULT_MEM_FMT}}" for val in get_peak_mem_usage()]
    _LOG.info(
        "Peak memory usage for bps process %s (main), %s (largest child process)",
        *peaks,
    )