Coverage for python / lsst / ctrl / bps / initialize.py: 19%
59 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:52 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:52 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Driver for initializing a BPS submission."""
30__all__ = [
31 "custom_job_validator",
32 "init_submission",
33 "out_collection_validator",
34 "output_run_validator",
35 "submit_path_validator",
36]
38import getpass
39import logging
40import re
41import shutil
42from collections.abc import Callable, Iterable
44from lsst.ctrl.bps import BPS_DEFAULTS, BPS_SEARCH_ORDER, BpsConfig
45from lsst.ctrl.bps.bps_utils import _dump_env_info, _dump_pkg_info, mkdir
46from lsst.pipe.base import Instrument
47from lsst.utils import doImport
49_LOG = logging.getLogger(__name__)
def init_submission(
    config_file: str, validators: Iterable[Callable[[BpsConfig], None]] = (), **kwargs
) -> BpsConfig:
    """Build the BPS configuration and set up the submit directory.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    validators : `~collections.abc.Iterable` \
            [`~collections.abc.Callable` [[`BpsConfig`], `None`]], optional
        Functions that check the configuration. Each one is called with
        the BpsConfig object as its only argument and is expected to raise
        when its check fails. No checks are run by default.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration.

    Returns
    -------
    config : `lsst.ctrl.bps.BpsConfig`
        Batch Processing Service configuration.
    """
    # pipetask argument names whose bps yaml counterparts are not a plain
    # snake_case -> camelCase conversion.
    cmdline_to_yaml = {
        "input": "inCollection",
        "output_run": "outputRun",
        "qgraph": "qgraphFile",
        "pipeline": "pipelineYaml",
        "wms_service": "wmsServiceClass",
        "compute_site": "computeSite",
    }

    config = BpsConfig(
        config_file,
        search_order=BPS_SEARCH_ORDER,
        defaults=BPS_DEFAULTS,
        wms_service_class_fqn=kwargs.get("wms_service"),
    )

    # Command-line values take precedence over the config file.
    for option, setting in kwargs.items():
        # None and empty values must never override what the config has.
        if not setting:
            continue

        # The pipetask argument parser turns some values into lists,
        # while bps expects a string.
        if isinstance(setting, Iterable) and not isinstance(setting, str):
            setting = ",".join(setting)

        if option in cmdline_to_yaml:
            yaml_key = cmdline_to_yaml[option]
        else:
            # Fall back to converting snake_case into camelCase.
            yaml_key = re.sub(r"_(\S)", lambda found: found.group(1).upper(), option)
        config[f".bps_cmdline.{yaml_key}"] = setting

    # Apply the caller-supplied checks (if any); each raises on failure.
    for check in validators:
        check(config)

    # Fill in values bps defines itself unless the config already has them.
    config[".bps_defined.timestamp"] = Instrument.makeCollectionTimestamp()

    if "operator" not in config:
        config[".bps_defined.operator"] = getpass.getuser()

    if "uniqProcName" not in config:
        output_run = config["outputRun"]
        config[".bps_defined.uniqProcName"] = output_run.replace("/", "_")

    # When requested, let the WMS plugin run its checks this early so that
    # problems surface before prepare() or submit().
    if not kwargs.get("runWmsSubmissionChecks", False):
        _LOG.debug("Skipping submission checks.")
    else:
        found, wms_class = config.search("wmsServiceClass")
        if not found:
            raise KeyError("Missing wmsServiceClass in bps config. Aborting.")

        # Importing here also verifies that the WMS service class exists.
        wms_service = doImport(wms_class)(config)

        try:
            wms_service.run_submission_checks()
        except NotImplementedError:
            # Plugins implement the checks only when they need extra ones.
            _LOG.debug("run_submission_checks is not implemented in %s.", wms_class)

    # Replace all bpsGenerateConfig
    config.generate_config()

    # The submit directory holds every output of the submission.
    submit_path = mkdir(config["submitPath"])
    config[".bps_defined.submitPath"] = str(submit_path)

    # Keep both the original config file and the expanded configuration.
    shutil.copy2(config_file, submit_path)
    file_prefix = f"{submit_path}/{config['uniqProcName']}"
    with open(f"{file_prefix}_config.yaml", "w") as fh:
        config.dump(fh)

    # Record the runtime environment and the software versions in use.
    _dump_env_info(f"{file_prefix}.env.info.yaml")
    _dump_pkg_info(f"{file_prefix}.pkg.info.yaml")

    return config
def output_run_validator(config: BpsConfig) -> None:
    """Ensure the BPS config names an output run collection.

    Parameters
    ----------
    config : `BpsConfig`
        BPS configuration to validate.

    Raises
    ------
    KeyError
        Raised if the BPS configuration has no 'outputRun' entry.
    """
    if "outputRun" in config:
        return
    raise KeyError("Must specify the output run collection using 'outputRun'")
def submit_path_validator(config: BpsConfig) -> None:
    """Ensure the BPS config names the submit-side run directory.

    Parameters
    ----------
    config : `BpsConfig`
        BPS configuration to validate.

    Raises
    ------
    KeyError
        Raised if the BPS configuration has no 'submitPath' entry.
    """
    if "submitPath" in config:
        return
    raise KeyError("Must specify the submit-side run directory using 'submitPath'")
def out_collection_validator(config: BpsConfig) -> None:
    """Reject BPS configs that still use the 'outCollection' setting.

    Parameters
    ----------
    config : `BpsConfig`
        BPS configuration to validate.

    Raises
    ------
    KeyError
        Raised if the BPS configuration *does* contain 'outCollection'.
    """
    if "outCollection" not in config:
        return
    raise KeyError("'outCollection' is deprecated. Replace all references to it with 'outputRun'.")
def custom_job_validator(config: BpsConfig) -> None:
    """Check if 'customJob' with an 'executable' is specified in BPS config.

    Parameters
    ----------
    config : `BpsConfig`
        BPS configuration that needs to be validated.

    Raises
    ------
    KeyError
        Raised if 'customJob' is not specified in the BPS configuration or
        if it is specified without an 'executable' entry.
    """
    # The two conditions must be joined with ``or``: with ``and`` a missing
    # 'customJob' section is still indexed (so the failure comes from that
    # lookup instead of the message below) and a section lacking
    # 'executable' is accepted without complaint.
    if "customJob" not in config or "executable" not in config["customJob"]:
        raise KeyError("Must specify the details of the script to execute using 'customJob'.")