Coverage for python / lsst / ctrl / bps / htcondor / provisioner.py: 17%
59 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:50 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:50 +0000
1# This file is part of ctrl_bps_htcondor.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Module enabling provisioning resources during workflow execution."""
30__all__ = ["Provisioner"]
32import logging
33from pathlib import Path
34from typing import Any
36from lsst.ctrl.bps import BpsConfig
38from .lssthtc import HTCDag, HTCJob
40_LOG = logging.getLogger(__name__)
43class Provisioner:
44 """Class responsible for enabling provisioning necessary resources.
46 Parameters
47 ----------
48 config : `lsst.ctr.bps.BpsConfig`
49 BPS configuration.
50 search_opts : `dict` [`str`, `object`], optional
51 Options to use while searching the BPS configuration for values.
52 """
54 def __init__(self, config: BpsConfig, search_opts: dict[str, Any] | None = None) -> None:
55 self.config: BpsConfig = config
56 self.search_opts: dict[str, Any] = {
57 "expandVars": True,
58 "searchobj": self.config[".provisioning"],
59 "required": True,
60 }
61 if search_opts is not None:
62 self.search_opts |= search_opts
63 self.script_name: Path | None = None
64 self.script_file: Path | None = None
65 self.is_configured: bool = False
66 self.is_prepared: bool = False
68 def configure(self) -> None:
69 """Create the configuration file for the provisioning script.
71 The content of the configuration file for the provisioning script
72 must be specified by ``provisioningScriptConfig`` setting in the BPS
73 config and its location by ``provisioningScriptConfigPath``,
74 respectively.
76 The method is effectively a no-op if *any* of the conditions below
77 is met:
79 1. the value of the ``provisioningScriptConfig`` is an empty string
80 (in such a case the class assumes that the provisioning script
81 does not require any configuration file),
82 2. the file specified by ``provisioningScriptConfigPath`` already
83 exists.
85 Raises
86 ------
87 OSError
88 Raised if the configuration file cannot be created.
89 """
90 search_opts = self.search_opts | {"expandEnvVars": True}
92 _, script_config_path = self.config.search("provisioningScriptConfigPath", opt=search_opts)
93 script_config_path = Path(script_config_path)
94 if script_config_path.is_file():
95 _LOG.info(
96 "Configuration file for the provisioning script '%s' already exists; "
97 "no further actions required",
98 script_config_path,
99 )
100 else:
101 _, script_config_content = self.config.search("provisioningScriptConfig", opt=search_opts)
102 if script_config_content:
103 _LOG.info(
104 "Configuration file for provisioning script '%s' not found; "
105 "creating a new one using the content specified by 'provisioningScriptConfig' setting",
106 script_config_path,
107 )
109 # If necessary, create directory that will hold the script's
110 # configuration file.
111 prefix = script_config_path.parent
112 try:
113 prefix.mkdir(parents=True, exist_ok=True)
114 except OSError as exc:
115 _LOG.error(
116 "Cannot create configuration file for the provisioning script '%s': %s",
117 script_config_path,
118 exc.strerror,
119 )
120 raise
122 script_config_path.write_text(script_config_content)
123 else:
124 _LOG.info(
125 "Configuration for the provisioning script not provided; "
126 "assuming the provisioning script requires no configuration file"
127 )
128 self.is_configured = True
130 def prepare(self, filename: Path | str, prefix: Path | str = None) -> None:
131 """Create the script responsible for provisioning resources.
133 The script is created based on the content defined by
134 the ``provisioningScript`` setting in the BPS configuration.
136 Parameters
137 ----------
138 filename : `pathlib.Path` | `str`
139 Name of the file to use when creating the provisioning script.
140 prefix : `pathlib.Path` | `str`, optional
141 Directory in which to output the provisioning script. If not
142 provided, the script will be written to the current directory.
144 Raises
145 ------
146 RuntimeError
147 Raised if the configuration step was omitted.
148 """
149 if not self.is_configured:
150 raise RuntimeError(
151 f"Cannot create provisioning script: run {type(self).__qualname__}.configure() first"
152 )
154 self.script_name = Path(filename)
155 self.script_file = Path(prefix) / self.script_name if prefix else self.script_name
157 search_opts = self.search_opts | {"expandEnvVars": False}
158 _, script_content = self.config.search("provisioningScript", opt=search_opts)
160 _LOG.debug("Writing provisioning script to %s", self.script_file)
161 with open(self.script_file, mode="w", encoding="utf8") as file:
162 file.write(script_content)
163 self.script_file.chmod(0o755)
165 self.is_prepared = True
167 def provision(self, dag: HTCDag) -> None:
168 """Add the provisioning job to the HTCondor workflow.
170 Parameters
171 ----------
172 dag : `lsst.ctrl.bps.htcondor.HTCDag`
173 HTCondor DAG.
175 Raises
176 ------
177 RuntimeError
178 Raised if the prepare step was omitted.
179 """
180 if not self.is_prepared:
181 raise RuntimeError(
182 f"Cannot add provisioning job to the workflow: "
183 f"run {type(self).__qualname__}.prepare() to create it"
184 )
186 name = self.script_name.stem
187 job = HTCJob(name=name, label=name)
188 job.subfile = Path("jobs") / job.label / f"{name}.sub"
189 job.add_job_attrs({"bps_job_name": job.name, "bps_job_label": job.label, "bps_job_quanta": ""})
190 cmds = {
191 "universe": "local",
192 "executable": f"{self.script_name}",
193 "should_transfer_files": "NO",
194 "getenv": "True",
195 }
196 cmds |= {
197 "output": str(job.subfile.with_suffix(".$(Cluster).out")),
198 "error": str(job.subfile.with_suffix(".$(Cluster).out")),
199 "log": str(job.subfile.with_suffix(".$(Cluster).log")),
200 }
201 job.add_job_cmds(cmds)
203 dag.add_service_job(job)
204 dag.add_attribs({"bps_provisioning_job": job.label})