Coverage for python / lsst / ctrl / bps / htcondor / provisioner.py: 17%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:36 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Module enabling provisioning resources during workflow execution.""" 

29 

30__all__ = ["Provisioner"] 

31 

32import logging 

33from pathlib import Path 

34from typing import Any 

35 

36from lsst.ctrl.bps import BpsConfig 

37 

38from .lssthtc import HTCDag, HTCJob 

39 

40_LOG = logging.getLogger(__name__) 

41 

42 

class Provisioner:
    """Class responsible for enabling provisioning necessary resources.

    The intended call order is `configure`, then `prepare`, then
    `provision`; `prepare` and `provision` raise `RuntimeError` if the
    preceding step was skipped.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    search_opts : `dict` [`str`, `object`], optional
        Options to use while searching the BPS configuration for values.
        They are merged on top of the defaults, so they can override them.
    """

    def __init__(self, config: BpsConfig, search_opts: dict[str, Any] | None = None) -> None:
        self.config: BpsConfig = config

        # Default search options: every lookup is scoped to the
        # ``provisioning`` section of the config and the value must exist.
        self.search_opts: dict[str, Any] = {
            "expandVars": True,
            "searchobj": self.config[".provisioning"],
            "required": True,
        }
        if search_opts is not None:
            self.search_opts |= search_opts

        # Name and location of the provisioning script; set by prepare().
        self.script_name: Path | None = None
        self.script_file: Path | None = None

        # Flags tracking which steps have been completed so far.
        self.is_configured: bool = False
        self.is_prepared: bool = False

    def configure(self) -> None:
        """Create the configuration file for the provisioning script.

        The content of the configuration file for the provisioning script
        must be specified by ``provisioningScriptConfig`` setting in the BPS
        config and its location by ``provisioningScriptConfigPath``,
        respectively.

        The method is effectively a no-op if *any* of the conditions below
        is met:

        1. the value of the ``provisioningScriptConfig`` is an empty string
           (in such a case the class assumes that the provisioning script
           does not require any configuration file),
        2. the file specified by ``provisioningScriptConfigPath`` already
           exists.

        Raises
        ------
        OSError
            Raised if the configuration file cannot be created, i.e. if
            either its parent directory cannot be created or the file
            itself cannot be written.
        """
        search_opts = self.search_opts | {"expandEnvVars": True}

        _, script_config_path = self.config.search("provisioningScriptConfigPath", opt=search_opts)
        script_config_path = Path(script_config_path)
        if script_config_path.is_file():
            _LOG.info(
                "Configuration file for the provisioning script '%s' already exists; "
                "no further actions required",
                script_config_path,
            )
        else:
            _, script_config_content = self.config.search("provisioningScriptConfig", opt=search_opts)
            if script_config_content:
                _LOG.info(
                    "Configuration file for provisioning script '%s' not found; "
                    "creating a new one using the content specified by 'provisioningScriptConfig' setting",
                    script_config_path,
                )

                # If necessary, create directory that will hold the script's
                # configuration file, then write the file itself. Both steps
                # are guarded so that any failure to create the file gets
                # logged before the exception is propagated to the caller.
                prefix = script_config_path.parent
                try:
                    prefix.mkdir(parents=True, exist_ok=True)
                    # Explicit encoding, consistent with how prepare()
                    # writes the provisioning script.
                    script_config_path.write_text(script_config_content, encoding="utf8")
                except OSError as exc:
                    _LOG.error(
                        "Cannot create configuration file for the provisioning script '%s': %s",
                        script_config_path,
                        exc.strerror,
                    )
                    raise
            else:
                _LOG.info(
                    "Configuration for the provisioning script not provided; "
                    "assuming the provisioning script requires no configuration file"
                )

        # Reached on every non-raising path, including the no-op ones.
        self.is_configured = True

    def prepare(self, filename: Path | str, prefix: Path | str | None = None) -> None:
        """Create the script responsible for provisioning resources.

        The script is created based on the content defined by
        the ``provisioningScript`` setting in the BPS configuration.

        Parameters
        ----------
        filename : `pathlib.Path` | `str`
            Name of the file to use when creating the provisioning script.
        prefix : `pathlib.Path` | `str`, optional
            Directory in which to output the provisioning script. If not
            provided, the script will be written to the current directory.
            The directory is not created by this method.

        Raises
        ------
        RuntimeError
            Raised if the configuration step was omitted.
        OSError
            Raised if the script cannot be written or made executable.
        """
        if not self.is_configured:
            raise RuntimeError(
                f"Cannot create provisioning script: run {type(self).__qualname__}.configure() first"
            )

        self.script_name = Path(filename)
        self.script_file = Path(prefix) / self.script_name if prefix else self.script_name

        # Unlike in configure(), environment variables are left unexpanded
        # in the script content.
        search_opts = self.search_opts | {"expandEnvVars": False}
        _, script_content = self.config.search("provisioningScript", opt=search_opts)

        _LOG.debug("Writing provisioning script to %s", self.script_file)
        with open(self.script_file, mode="w", encoding="utf8") as file:
            file.write(script_content)
        # Make the script executable (rwxr-xr-x).
        self.script_file.chmod(0o755)

        self.is_prepared = True

    def provision(self, dag: HTCDag) -> None:
        """Add the provisioning job to the HTCondor workflow.

        Parameters
        ----------
        dag : `lsst.ctrl.bps.htcondor.HTCDag`
            HTCondor DAG.

        Raises
        ------
        RuntimeError
            Raised if the prepare step was omitted.
        """
        if not self.is_prepared:
            raise RuntimeError(
                f"Cannot add provisioning job to the workflow: "
                f"run {type(self).__qualname__}.prepare() to create it"
            )

        # The job's name and label are both derived from the script's
        # file name (without its extension).
        name = self.script_name.stem
        job = HTCJob(name=name, label=name)
        job.subfile = Path("jobs") / job.label / f"{name}.sub"
        job.add_job_attrs({"bps_job_name": job.name, "bps_job_label": job.label, "bps_job_quanta": ""})
        cmds = {
            "universe": "local",
            "executable": f"{self.script_name}",
            "should_transfer_files": "NO",
            "getenv": "True",
        }
        # NOTE: "output" and "error" point to the same file, so the job's
        # stdout and stderr end up together in a single ".out" file.
        cmds |= {
            "output": str(job.subfile.with_suffix(".$(Cluster).out")),
            "error": str(job.subfile.with_suffix(".$(Cluster).out")),
            "log": str(job.subfile.with_suffix(".$(Cluster).log")),
        }
        job.add_job_cmds(cmds)

        # Register the job with the DAG as a service job and record its
        # label in the DAG's attributes.
        dag.add_service_job(job)
        dag.add_attribs({"bps_provisioning_job": job.label})

204 dag.add_attribs({"bps_provisioning_job": job.label})