Coverage for python / lsst / ctrl / bps / drivers.py: 13%
213 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:47 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Driver functions for each subcommand.
Driver functions ensure all setup work is done before running
the subcommand method.
32"""
34__all__ = [
35 "acquire_qgraph_driver",
36 "cancel_driver",
37 "cluster_qgraph_driver",
38 "ping_driver",
39 "prepare_driver",
40 "report_driver",
41 "restart_driver",
42 "status_driver",
43 "submit_driver",
44 "submitcmd_driver",
45 "transform_driver",
46]
49import logging
50import os
51from pathlib import Path
53from lsst.pipe.base.quantum_graph import PredictedQuantumGraph
54from lsst.utils.timer import time_this
55from lsst.utils.usage import get_peak_mem_usage
57from . import BPS_DEFAULTS, BPS_SEARCH_ORDER, DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT, BpsConfig
58from .bps_reports import compile_code_summary, compile_job_summary
59from .bps_utils import _dump_env_info, _dump_pkg_info, _make_id_link
60from .cancel import cancel
61from .construct import construct
62from .initialize import (
63 custom_job_validator,
64 init_submission,
65 out_collection_validator,
66 output_run_validator,
67 submit_path_validator,
68)
69from .ping import ping
70from .pre_transform import acquire_quantum_graph, cluster_quanta
71from .prepare import prepare
72from .report import display_report, retrieve_report
73from .restart import restart
74from .status import status
75from .submit import submit
76from .transform import transform
78_LOG = logging.getLogger(__name__)
def _init_submission_driver(config_file: str, **kwargs) -> BpsConfig:
    """Initialize runtime environment.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    config : `lsst.ctrl.bps.BpsConfig`
        Batch Processing Service configuration.
    """
    _LOG.info("Initializing BPS configuration and creating submit directory")
    # Collect the timer options once so the time_this call stays compact.
    timer_opts = {
        "log": _LOG,
        "level": logging.INFO,
        "prefix": None,
        "msg": "BPS configuration initialized and submit directory created",
        "mem_usage": True,
        "mem_unit": DEFAULT_MEM_UNIT,
        "mem_fmt": DEFAULT_MEM_FMT,
    }
    with time_this(**timer_opts):
        config = init_submission(
            config_file,
            validators=[submit_path_validator, output_run_validator, out_collection_validator],
            **kwargs,
        )
    _log_mem_usage()

    print(f"Submit dir: {config['.bps_defined.submitPath']}")
    return config
def acquire_qgraph_driver(config_file: str, **kwargs) -> tuple[BpsConfig, PredictedQuantumGraph]:
    """Read a quantum graph from a file or create one from pipeline definition.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    config : `lsst.ctrl.bps.BpsConfig`
        Updated configuration.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        A graph representing quanta.
    """
    config = _init_submission_driver(config_file, **kwargs)
    out_prefix = config[".bps_defined.submitPath"]

    _LOG.info("Starting acquire stage (generating and/or reading quantum graph)")
    with time_this(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="Acquire stage completed",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    ):
        qgraph_path, qgraph = acquire_quantum_graph(config, out_prefix=out_prefix)
    _log_mem_usage()

    # Record where the run's quantum graph lives for the later stages.
    config[".bps_defined.runQgraphFile"] = qgraph_path
    return config, qgraph
def cluster_qgraph_driver(config_file, **kwargs):
    """Group quanta into clusters.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    config : `lsst.ctrl.bps.BpsConfig`
        Updated configuration.
    clustered_qgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        A graph representing clustered quanta.
    """
    config, qgraph = acquire_qgraph_driver(config_file, **kwargs)

    _LOG.info("Starting cluster stage (grouping quanta into jobs)")
    with time_this(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="Cluster stage completed",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    ):
        clustered_qgraph = cluster_quanta(config, qgraph, config["uniqProcName"])
    _log_mem_usage()

    _LOG.info("ClusteredQuantumGraph contains %d cluster(s)", len(clustered_qgraph))

    # Optionally persist debugging artifacts of the clustered graph.
    submit_path = config[".bps_defined.submitPath"]
    if config.search("saveClusteredQgraph", opt={"default": False})[1]:
        clustered_qgraph.save(os.path.join(submit_path, "bps_clustered_qgraph.pickle"))
    if config.search("saveDot", opt={"default": False})[1]:
        clustered_qgraph.draw(os.path.join(submit_path, "bps_clustered_qgraph.dot"))
    return config, clustered_qgraph
def transform_driver(config_file, **kwargs):
    """Create a workflow for a specific workflow management system.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
        Configuration to use when creating the workflow.
    generic_workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
        Representation of the abstract/scientific workflow specific to a given
        workflow management system.
    """
    config, clustered_qgraph = cluster_qgraph_driver(config_file, **kwargs)
    prefix = config[".bps_defined.submitPath"]

    _LOG.info("Starting transform stage (creating generic workflow)")
    with time_this(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="Transform stage completed",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    ):
        generic_workflow, generic_workflow_config = transform(config, clustered_qgraph, prefix)
        _LOG.info("Generic workflow name '%s'", generic_workflow.name)
    _log_mem_usage()

    _LOG.info(
        "GenericWorkflow contains %d job(s) (including final)",
        sum(generic_workflow.job_counts.values()),
    )

    # Optionally persist debugging artifacts of the generic workflow.
    if config.search("saveGenericWorkflow", opt={"default": False})[1]:
        with open(os.path.join(prefix, "bps_generic_workflow.pickle"), "wb") as outfh:
            generic_workflow.save(outfh, "pickle")
    if config.search("saveDot", opt={"default": False})[1]:
        with open(os.path.join(prefix, "bps_generic_workflow.dot"), "w") as outfh:
            generic_workflow.draw(outfh, "dot")
    return generic_workflow_config, generic_workflow
def prepare_driver(config_file, **kwargs):
    """Create a representation of the generic workflow.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    wms_config : `lsst.ctrl.bps.BpsConfig`
        Configuration to use when creating the workflow.
    workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
        Representation of the abstract/scientific workflow specific to a given
        workflow management system.
    """
    # Submission checks are on by default for this entry point.
    kwargs.setdefault("runWmsSubmissionChecks", True)
    gwf_config, gwf = transform_driver(config_file, **kwargs)

    _LOG.info("Starting prepare stage (creating specific implementation of workflow)")
    with time_this(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="Prepare stage completed",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    ):
        wms_workflow = prepare(gwf_config, gwf, gwf_config[".bps_defined.submitPath"])
    _log_mem_usage()

    return gwf_config, wms_workflow
def submit_driver(config_file, **kwargs):
    """Submit workflow for execution.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.
    """
    kwargs.setdefault("runWmsSubmissionChecks", True)

    _LOG.info(
        "DISCLAIMER: All values regarding memory consumption reported below are approximate and may "
        "not accurately reflect actual memory usage by the bps process."
    )

    config = BpsConfig(
        config_file,
        search_order=BPS_SEARCH_ORDER,
        defaults=BPS_DEFAULTS,
        wms_service_class_fqn=kwargs.get("wms_service"),
    )

    # Remote builds are only supported by the PanDA plugin.  For any other
    # WMS, or when the remoteBuild section is disabled, force the local
    # submission path; otherwise ``wms_workflow`` would never be assigned
    # below and submit() would raise a NameError.
    _, remote_build = config.search("remoteBuild", opt={"default": {}})
    if remote_build and config["wmsServiceClass"] == "lsst.ctrl.bps.panda.PanDAService":
        if not remote_build.search("enabled", opt={"default": False})[1]:
            remote_build = {}
            _LOG.info("The workflow is submitted to the local Data Facility.")
        else:
            _LOG.info(
                "Remote submission is enabled. The workflow is submitted to a remote Data Facility."
            )
            _LOG.info("Initializing execution environment")
            with time_this(
                log=_LOG,
                level=logging.INFO,
                prefix=None,
                msg="Initializing execution environment completed",
                mem_usage=True,
                mem_unit=DEFAULT_MEM_UNIT,
                mem_fmt=DEFAULT_MEM_FMT,
            ):
                config = _init_submission_driver(config_file, **kwargs)
            # The remote service needs the raw config file and build options
            # to run the build stages at the remote Data Facility.
            kwargs["remote_build"] = remote_build
            kwargs["config_file"] = config_file
            wms_workflow = None
    else:
        remote_build = {}
        _LOG.info("The workflow is submitted to the local Data Facility.")

    _LOG.info("Starting submission process")
    with time_this(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="Submission process completed",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    ):
        if not remote_build:
            # Local path: run the acquire/cluster/transform/prepare stages
            # here before handing the workflow to the WMS.
            wms_workflow_config, wms_workflow = prepare_driver(config_file, **kwargs)
        else:
            wms_workflow_config = config

        _LOG.info("Starting submit stage")
        with time_this(
            log=_LOG,
            level=logging.INFO,
            prefix=None,
            msg="Submit stage completed",
            mem_usage=True,
            mem_unit=DEFAULT_MEM_UNIT,
            mem_fmt=DEFAULT_MEM_FMT,
        ):
            workflow = submit(wms_workflow_config, wms_workflow, **kwargs)
            if not wms_workflow:
                # Remote path: submit() created the workflow object itself.
                wms_workflow = workflow
            _LOG.info(
                "Run '%s' submitted for execution with id '%s'", wms_workflow.name, wms_workflow.run_id
            )
    _log_mem_usage()

    _make_id_link(wms_workflow_config, wms_workflow.run_id)

    print(f"Run Id: {wms_workflow.run_id}")
    print(f"Run Name: {wms_workflow.name}")
def restart_driver(wms_service, run_id):
    """Restart a failed workflow.

    Parameters
    ----------
    wms_service : `str`
        Name of the class.
    run_id : `str`
        Id or path of workflow that need to be restarted.
    """
    if wms_service is None:
        # Fall back to the WMS service class from the BPS defaults.
        wms_service = BpsConfig({}, defaults=BPS_DEFAULTS)["wmsServiceClass"]

    new_run_id, run_name, message = restart(wms_service, run_id)
    if new_run_id is None:
        # Restart did not produce a new run; report why if we know.
        if message:
            print(f"Restart failed: {message}")
        else:
            print("Restart failed: Unknown error")
        return

    # When run_id is a submit directory, refresh the environment/package
    # info dumps and the id link for the restarted run.
    if Path(run_id).exists():
        _dump_env_info(f"{run_id}/{run_name}.env.info.yaml")
        _dump_pkg_info(f"{run_id}/{run_name}.pkg.info.yaml")
        config = BpsConfig(f"{run_id}/{run_name}_config.yaml")
        _make_id_link(config, new_run_id)

    print(f"Run Id: {new_run_id}")
    print(f"Run Name: {run_name}")
def report_driver(
    wms_service: str | None = None,
    run_id: str | None = None,
    user: str | None = None,
    hist_days: float = 0.0,
    pass_thru: str | None = None,
    is_global: bool = False,
    return_exit_codes: bool = False,
):
    """Print out the summary of jobs submitted for execution.

    Parameters
    ----------
    wms_service : `str`, optional
        Name of the class.
    run_id : `str`, optional
        A run id the report will be restricted to.
    user : `str`, optional
        A user the report will be restricted to.
    hist_days : `float`, optional
        Number of past days to consider while preparing the report. By default,
        only the currently running workflows are included in the report.
        If the report is restricted to a single run (i.e., ``run_id`` is set),
        the history search will be limited by default to two past days.
    pass_thru : `str`, optional
        A string to pass directly to the WMS service class.
    is_global : `bool`, optional
        If set, all available job queues will be queried for job information.
        Defaults to False which means that only a local job queue will be
        queried for information.

        Only applicable in the context of a WMS using distributed job queues
        (e.g., HTCondor).
    return_exit_codes : `bool`, optional
        If set, return exit codes related to jobs with a
        non-success status. Defaults to False, which means that only
        the summary state is returned.

        Only applicable in the context of a WMS with associated
        handlers to return exit codes from jobs.
    """
    # Fall back to the WMS service class from the environment, then from the
    # BPS defaults, when the caller did not provide one.
    if not wms_service:
        # NOTE(review): other drivers in this file build the fallback config
        # as BpsConfig({}, defaults=BPS_DEFAULTS); confirm that passing
        # BPS_DEFAULTS positionally is intentional/equivalent here.
        default_config = BpsConfig(BPS_DEFAULTS)
        wms_service = os.environ.get("BPS_WMS_SERVICE_CLASS", default_config["wmsServiceClass"])

    # When reporting on a single run:
    # * increase history until a better mechanism for handling completed jobs
    #   is available.
    # * massage the retrieved reports using BPS report postprocessors.
    if run_id:
        hist_days = max(hist_days, 2)
        postprocessors = [compile_job_summary]
        if return_exit_codes:
            postprocessors.append(compile_code_summary)
    else:
        postprocessors = None

    runs, messages = retrieve_report(
        wms_service,
        run_id=run_id,
        user=user,
        hist=hist_days,
        pass_thru=pass_thru,
        is_global=is_global,
        return_exit_codes=return_exit_codes,
        postprocessors=postprocessors,
    )

    if runs or messages:
        # A single-run report gets the detailed display format.
        display_report(
            runs,
            messages,
            is_detailed=bool(run_id),
            is_global=is_global,
            return_exit_codes=return_exit_codes,
        )
    else:
        # Nothing found; only hint at remedies when a specific run was asked
        # for (a blank general report is not an error).
        if run_id:
            print(
                f"No records found for job id '{run_id}'. "
                f"Hints: Double check id, retry with a larger --hist value (currently: {hist_days}), "
                "and/or use --global to search all job queues."
            )
def status_driver(wms_service: str, run_id: str, hist_days: float, is_global: bool = False) -> int:
    """Print out status of workflow submitted for execution.

    Parameters
    ----------
    wms_service : `str`
        Name of the class.
    run_id : `str`
        A run id the report will be restricted to.
    hist_days : `float`
        Number of days.
    is_global : `bool`, optional
        If set, all available job queues will be queried for job information.
        Defaults to False which means that only a local job queue will be
        queried for information.

        Only applicable in the context of a WMS using distributed job queues
        (e.g., HTCondor).

    Returns
    -------
    state : `int`
        Status of submitted workflow.
    """
    # Fall back to the WMS service class from the environment, then from the
    # BPS defaults, when the caller did not provide one.
    if wms_service is None:
        # NOTE(review): other drivers in this file build the fallback config
        # as BpsConfig({}, defaults=BPS_DEFAULTS); confirm that passing
        # BPS_DEFAULTS positionally is intentional/equivalent here.
        default_config = BpsConfig(BPS_DEFAULTS)
        wms_service = os.environ.get("BPS_WMS_SERVICE_CLASS", default_config["wmsServiceClass"])

    state, message = status(
        wms_service,
        run_id=run_id,
        hist=hist_days,
        is_global=is_global,
    )

    # ``state`` exposes ``.name``/``.value`` (enum-style): log the symbolic
    # name, return the numeric value to the caller.
    _LOG.info("status: %s", state.name)
    if message:
        _LOG.warning(message)

    return state.value
def cancel_driver(wms_service, run_id, user, require_bps, pass_thru, is_global=False):
    """Cancel submitted workflows.

    Parameters
    ----------
    wms_service : `str`
        Name of the Workload Management System service class.
    run_id : `str`
        ID or path of job that should be canceled.
    user : `str`
        User whose submitted jobs should be canceled.
    require_bps : `bool`
        Whether to require given run_id/user to be a bps submitted job.
    pass_thru : `str`
        Information to pass through to WMS.
    is_global : `bool`, optional
        If set, all available job queues will be checked for jobs to cancel.
        Defaults to False which means that only a local job queue will be
        checked.

        Only applicable in the context of a WMS using distributed job queues
        (e.g., HTCondor).
    """
    if wms_service is None:
        # Fall back to the WMS service class from the BPS defaults.
        wms_service = BpsConfig({}, defaults=BPS_DEFAULTS)["wmsServiceClass"]
    cancel(wms_service, run_id, user, require_bps, pass_thru, is_global=is_global)
def ping_driver(wms_service=None, pass_thru=None):
    """Check whether WMS services are up, reachable, and any authentication,
    if needed, succeeds.

    The services to be checked are those needed for submit, report, cancel,
    restart, but ping cannot guarantee whether jobs would actually run
    successfully.

    Parameters
    ----------
    wms_service : `str`, optional
        Name of the Workload Management System service class.
    pass_thru : `str`, optional
        Information to pass through to WMS.

    Returns
    -------
    success : `int`
        Whether services are up and usable (0) or not (non-zero).
    """
    if wms_service is None:
        default_config = BpsConfig({}, defaults=BPS_DEFAULTS)
        wms_service = default_config["wmsServiceClass"]

    # Local renamed from ``status`` to avoid shadowing the module-level
    # ``status`` function imported from .status.
    success, message = ping(wms_service, pass_thru)
    if message:
        # Zero means the ping succeeded, so any message is informational;
        # otherwise treat it as an error.
        if not success:
            _LOG.info(message)
        else:
            _LOG.error(message)

    # Log overall status message
    if not success:
        _LOG.info("Ping successful.")
    else:
        _LOG.error("Ping failed (%d).", success)

    return success
def submitcmd_driver(config_file: str, **kwargs) -> None:
    """Submit a command for execution.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.
    """
    _LOG.info("Initializing BPS configuration and creating submit directory")
    with time_this(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="BPS configuration initialized and submit directory created",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    ):
        config = init_submission(
            config_file,
            validators=[submit_path_validator, custom_job_validator],
            **kwargs,
        )
    _log_mem_usage()

    submit_path = config[".bps_defined.submitPath"]

    _LOG.info("Starting construction stage (creating generic workflow)")
    with time_this(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="Construction stage completed",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    ):
        generic_workflow, generic_workflow_config = construct(config)
        _LOG.info("Generic workflow name '%s'", generic_workflow.name)
    _log_mem_usage()

    # Optionally persist debugging artifacts of the generic workflow.
    if config.search("saveGenericWorkflow", opt={"default": False})[1]:
        with open(os.path.join(submit_path, "bps_generic_workflow.pickle"), "wb") as outfh:
            generic_workflow.save(outfh, "pickle")
    if config.search("saveDot", opt={"default": False})[1]:
        with open(os.path.join(submit_path, "bps_generic_workflow.dot"), "w") as outfh:
            generic_workflow.draw(outfh, "dot")

    _LOG.info("Starting prepare stage (creating specific implementation of workflow)")
    with time_this(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="Prepare stage completed",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    ):
        wms_workflow = prepare(generic_workflow_config, generic_workflow, submit_path)
    _log_mem_usage()

    # A dry run stops before anything is handed to the WMS.
    if kwargs.get("dry_run", False):
        return

    _LOG.info("Starting submit stage")
    with time_this(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="Submit stage completed",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    ):
        submit(generic_workflow_config, wms_workflow, **kwargs)
    _log_mem_usage()
    print(f"Run Id: {wms_workflow.run_id}")
    print(f"Run Name: {wms_workflow.name}")
def _log_mem_usage() -> None:
    """Log memory usage."""
    # Skip the (relatively costly) formatting entirely when INFO is disabled.
    if _LOG.isEnabledFor(logging.INFO):
        peaks = [f"{peak.to(DEFAULT_MEM_UNIT):{DEFAULT_MEM_FMT}}" for peak in get_peak_mem_usage()]
        _LOG.info(
            "Peak memory usage for bps process %s (main), %s (largest child process)",
            *peaks,
        )