Coverage for python / lsst / ctrl / bps / initialize.py: 19%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:22 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

"""Driver for initializing a BPS submission.

`init_submission` builds a `BpsConfig` from a configuration file plus
command-line modifiers, runs optional validators on it, and creates the
submit directory holding copies of the configs and runtime information.
The ``*_validator`` functions are checks suitable for passing to it.
"""

# Public API of this module.
__all__ = [
    "custom_job_validator",
    "init_submission",
    "out_collection_validator",
    "output_run_validator",
    "submit_path_validator",
]

37 

38import getpass 

39import logging 

40import re 

41import shutil 

42from collections.abc import Callable, Iterable 

43 

44from lsst.ctrl.bps import BPS_DEFAULTS, BPS_SEARCH_ORDER, BpsConfig 

45from lsst.ctrl.bps.bps_utils import _dump_env_info, _dump_pkg_info, mkdir 

46from lsst.pipe.base import Instrument 

47from lsst.utils import doImport 

48 

# Module-level logger named after this module; init_submission uses it for
# debug messages about the (optional) WMS submission checks.
_LOG = logging.getLogger(__name__)

50 

51 

# Command-line (pipetask-style) option names whose BPS YAML key is not simply
# the camelCase form of the option name.
_CMDLINE_TRANSLATION = {
    "input": "inCollection",
    "output_run": "outputRun",
    "qgraph": "qgraphFile",
    "pipeline": "pipelineYaml",
    "wms_service": "wmsServiceClass",
    "compute_site": "computeSite",
}

# An underscore followed by one non-whitespace character; used to convert
# snake_case option names into camelCase config keys. Compiled once here
# rather than on every loop iteration.
_SNAKE_CASE_RE = re.compile(r"_(\S)")


def _cmdline_key(name: str) -> str:
    """Return the BPS config key corresponding to a command-line option name.

    Parameters
    ----------
    name : `str`
        Command-line option name (e.g. ``output_run``).

    Returns
    -------
    key : `str`
        Explicit translation if one exists, otherwise the camelCase form
        of ``name`` (e.g. ``some_option`` -> ``someOption``).
    """
    if name in _CMDLINE_TRANSLATION:
        return _CMDLINE_TRANSLATION[name]
    return _SNAKE_CASE_RE.sub(lambda match: match.group(1).upper(), name)


def _apply_cmdline_overrides(config: BpsConfig, overrides: dict) -> None:
    """Store command-line values under the ``.bps_cmdline`` section.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Configuration to modify in place.
    overrides : `dict`
        Command-line option names mapped to their values.
    """
    for key, value in overrides.items():
        # Don't want to override config with None or empty string values
        # (note: any other falsy value, e.g. False or 0, is skipped too).
        if not value:
            continue
        # pipetask argument parser converts some values to list,
        # but bps will want string.
        if not isinstance(value, str) and isinstance(value, Iterable):
            value = ",".join(value)
        config[f".bps_cmdline.{_cmdline_key(key)}"] = value


def _run_wms_submission_checks(config: BpsConfig) -> None:
    """Instantiate the configured WMS service and run its submission checks.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Batch Processing Service configuration.

    Raises
    ------
    KeyError
        Raised if ``wmsServiceClass`` is not defined in the configuration.
    """
    found, wms_class = config.search("wmsServiceClass")
    if not found:
        raise KeyError("Missing wmsServiceClass in bps config. Aborting.")

    # Check that can import wms service class.
    wms_service_class = doImport(wms_class)
    wms_service = wms_service_class(config)

    try:
        wms_service.run_submission_checks()
    except NotImplementedError:
        # Allow various plugins to implement only when needed to do extra
        # checks.
        _LOG.debug("run_submission_checks is not implemented in %s.", wms_class)


def _save_submission_files(config: BpsConfig, config_file: str, submit_path) -> None:
    """Save configs and runtime/software information into the submit directory.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Expanded configuration; its ``uniqProcName`` prefixes the file names.
    config_file : `str`
        Original configuration file, copied verbatim.
    submit_path
        Submit directory, as returned by ``mkdir``.
    """
    uniq_proc_name = config["uniqProcName"]

    # Save copy of configs (orig and expanded config).
    shutil.copy2(config_file, submit_path)
    with open(f"{submit_path}/{uniq_proc_name}_config.yaml", "w") as fh:
        config.dump(fh)

    # Dump information about runtime environment and software versions in use.
    _dump_env_info(f"{submit_path}/{uniq_proc_name}.env.info.yaml")
    _dump_pkg_info(f"{submit_path}/{uniq_proc_name}.pkg.info.yaml")


def init_submission(
    config_file: str, validators: Iterable[Callable[[BpsConfig], None]] = (), **kwargs
) -> BpsConfig:
    """Initialize BPS configuration and create submit directory.

    Parameters
    ----------
    config_file : `str`
        Name of the configuration file.
    validators : `~collections.abc.Iterable` \
            [`~collections.abc.Callable` [[`BpsConfig`], `None`]], optional
        A list of functions performing checks on the given configuration.
        Each function should take a single argument, a BpsConfig object, and
        raise if the check fails. By default, no checks are performed.
    **kwargs : `~typing.Any`
        Additional modifiers to the configuration. Non-empty values are
        stored under ``.bps_cmdline``. ``wms_service`` is also used to
        construct the configuration, and a truthy
        ``runWmsSubmissionChecks`` enables the WMS plugin checks.

    Returns
    -------
    config : `lsst.ctrl.bps.BpsConfig`
        Batch Processing Service configuration.

    Raises
    ------
    KeyError
        Raised if WMS submission checks are requested but
        ``wmsServiceClass`` is not defined. Exceptions raised by the
        validators propagate unchanged.
    """
    config = BpsConfig(
        config_file,
        search_order=BPS_SEARCH_ORDER,
        defaults=BPS_DEFAULTS,
        wms_service_class_fqn=kwargs.get("wms_service"),
    )

    # Override config with command-line values, handling diffs between
    # pipetask argument names and bps yaml keys.
    _apply_cmdline_overrides(config, kwargs)

    # Run validation tests on the given config if any.
    for validator in validators:
        validator(config)

    # Set some initial values.
    config[".bps_defined.timestamp"] = Instrument.makeCollectionTimestamp()
    if "operator" not in config:
        config[".bps_defined.operator"] = getpass.getuser()
    if "uniqProcName" not in config:
        config[".bps_defined.uniqProcName"] = config["outputRun"].replace("/", "_")

    # If requested, run WMS plugin checks early in submission process to
    # ensure WMS has what it will need for prepare() or submit().
    if kwargs.get("runWmsSubmissionChecks", False):
        _run_wms_submission_checks(config)
    else:
        _LOG.debug("Skipping submission checks.")

    # Replace all bpsGenerateConfig
    config.generate_config()

    # Make submit directory to contain all outputs.
    submit_path = mkdir(config["submitPath"])
    config[".bps_defined.submitPath"] = str(submit_path)

    _save_submission_files(config, config_file, submit_path)

    return config

151 

152 

def output_run_validator(config: BpsConfig) -> None:
    """Ensure the BPS config names an output run collection.

    Parameters
    ----------
    config : `BpsConfig`
        BPS configuration to validate.

    Raises
    ------
    KeyError
        Raised when 'outputRun' is absent from the BPS configuration.
    """
    if "outputRun" in config:
        return
    raise KeyError("Must specify the output run collection using 'outputRun'")

168 

169 

def submit_path_validator(config: BpsConfig) -> None:
    """Ensure the BPS config names a submit-side run directory.

    Parameters
    ----------
    config : `BpsConfig`
        BPS configuration to validate.

    Raises
    ------
    KeyError
        Raised when 'submitPath' is absent from the BPS configuration.
    """
    if "submitPath" in config:
        return
    raise KeyError("Must specify the submit-side run directory using 'submitPath'")

185 

186 

def out_collection_validator(config: BpsConfig) -> None:
    """Reject BPS configs that still use the deprecated 'outCollection'.

    Parameters
    ----------
    config : `BpsConfig`
        BPS configuration to validate.

    Raises
    ------
    KeyError
        Raised when 'outCollection' is present in the BPS configuration.
    """
    if "outCollection" not in config:
        return
    raise KeyError("'outCollection' is deprecated. Replace all references to it with 'outputRun'.")

202 

203 

def custom_job_validator(config: BpsConfig) -> None:
    """Check if 'customJob' and its 'executable' are specified in BPS config.

    Parameters
    ----------
    config : `BpsConfig`
        BPS configuration that needs to be validated.

    Raises
    ------
    KeyError
        Raised if 'customJob' is not specified in the BPS configuration, or
        if it does not define 'executable'.
    """
    # ``or`` (not ``and``): a missing 'customJob' short-circuits before it is
    # indexed, and a present 'customJob' is still checked for 'executable'.
    if "customJob" not in config or "executable" not in config["customJob"]:
        raise KeyError("Must specify the details of the script to execute using 'customJob'.")