Coverage for python / lsst / ctrl / bps / htcondor / dagman_configurator.py: 32%
47 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:05 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:05 +0000
1# This file is part of ctrl_bps_htcondor.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Module enabling configuring DAGMan via submit YAML."""
30__all__ = ["DagmanConfigurator"]
32import logging
33import os
34from pathlib import Path
35from typing import Any
37import htcondor
38from pydantic import AliasGenerator, ConfigDict, create_model
40from lsst.ctrl.bps import BpsConfig
42from .lssthtc import HTCDag
44_LOG = logging.getLogger(__name__)
46# Extract DAGMan configuration options with their types and default values from
47# the local HTCondor configuration.
48#
49# Notes
50# -----
51# There are some DAGMan configuration options that names do not start with
52# ``DAGMAN_`` (e.g., ``MAX_DAGMAN_LOG``). Hence, do not use
53# ``key.startswith("DAGMAN_")``.
54_fields = {key.lower(): (type(val), val) for key, val in htcondor.param.items() if "DAGMAN_" in key}
56# Add some valid configuration options are not set by default by HTCondor and
57# are missing from ``htcondor.param``.
58#
59# Notes
60# -----
61# A complete list of configuration options HTCondor supports can be found in
62# ``src/condor_utils/param_info.in`` in
63# `HTCondor GitHub repository <https://github.com/htcondor/htcondor>`_.
64_fields.update(
65 {
66 "dagman_debug": (str, ""),
67 "dagman_node_record_info": (str, ""),
68 "dagman_record_machine_attrs": (str, ""),
69 }
70)
72# Dynamically create a Pydantic model encapsulating the DAGMan configuration
73# options gathered above.
74_DagmanOptions = create_model(
75 "DagmanOptions",
76 __config__=ConfigDict(
77 alias_generator=AliasGenerator(
78 serialization_alias=lambda name: name.upper(),
79 ),
80 extra="allow",
81 serialize_by_alias=True,
82 ),
83 **_fields,
84)
87class DagmanConfigurator:
88 """Class responsible for setting WMS-specific configuration options.
90 Parameters
91 ----------
92 config : `lsst.ctrl.bps.BpsConfig`
93 BPS configuration.
94 search_opts : `dict` [`str`, `~typing.Any`], optional
95 Options to use while searching the BPS configuration for values.
97 Raises
98 ------
99 KeyError
100 Raised if DAGMan configuration is missing from the BPS configuration.
101 """
103 def __init__(self, config: BpsConfig, search_opts: dict[str, Any] | None = None) -> None:
104 if search_opts is None:
105 search_opts = {}
106 _, site = config.search("computeSite", search_opts)
107 if site:
108 search_opts["curvals"] = {"curr_site": site}
109 _, wms_config = config.search("wmsConfig", search_opts)
110 if not wms_config:
111 raise KeyError("WMS-specific configuration not found")
112 self._options = _DagmanOptions.model_validate({key.lower(): val for key, val in wms_config.items()})
113 if self._options.model_extra:
114 unknown_opts = [key.upper() for key in self._options.model_extra]
115 _LOG.warning(
116 "The following WMS-specific config options were not recognized and will be ignored: %s.",
117 ", ".join(unknown_opts),
118 )
119 self.config_path: Path | None = None
120 self.prefix: Path | None = None
122 @property
123 def options(self) -> dict[str, Any]:
124 """DAGMan configuration options set via BPS (`dict` [`str`, `Any`])."""
125 return {
126 key: val
127 for key, val in self._options.model_dump(exclude_unset=True).items()
128 if key not in self._options.model_extra
129 }
131 def prepare(self, filename: os.PathLike | str, prefix: os.PathLike | str | None) -> None:
132 """Write WMS-specific configuration to a file.
134 Parameters
135 ----------
136 filename : `str`, optional
137 Name of the file to use when creating the DAG configuration.
138 prefix : `pathlib.Path` | `str`, optional
139 Directory in which to output the DAG configuration file. If not
140 provided, the script will be written to the current directory.
142 Raises
143 ------
144 OSError
145 Raised if the configuration file cannot be created.
146 """
147 if prefix:
148 self.prefix = Path(prefix)
149 self.config_path = self.prefix / filename if self.prefix else Path(filename)
150 try:
151 self.config_path.parent.mkdir(parents=True, exist_ok=True)
152 except OSError as exc:
153 _LOG.error(
154 "Could not write WMS-specific configuration file '%s': %s",
155 self.config_path,
156 exc.strerror,
157 )
158 raise
160 # Populate the DAG configuration file only with options that were
161 # explicitly set in the BPS configuration.
162 #
163 # Notes
164 # -----
165 # The Pydantic model we are using to represent the DAGMan configuration
166 # options allows for extra fields. However, it seems that
167 # BaseModel.model_dump() does not support excluding these fields during
168 # serialization at the moment (Pydantic ver. 2.12), so we have to do it
169 # manually.
170 self.config_path.write_text("\n".join(f"{key} = {val}" for key, val in self.options.items()))
172 def configure(self, dag: HTCDag) -> None:
173 """Add DAG configuration file to the workflow.
175 Parameters
176 ----------
177 dag : `lsst.ctrl.bps.htcondor.HTCDag`
178 HTCondor DAG.
180 Raises
181 ------
182 RuntimeError
183 Raised if the prepare step was omitted.
185 Notes
186 -----
187 The path to the DAG configuration is added as a DAG attribute named
188 ``bps_wms_config_path``. The stored path is relative to the prefix.
189 """
190 if self.config_path is None:
191 raise RuntimeError(
192 f"cannot add WMS-specific configuration to the workflow: file does not exist. "
193 f"(hint: run {type(self).__qualname__}.prepare() to create it)"
194 )
195 config_path = self.config_path.relative_to(self.prefix) if self.prefix else self.config_path
196 dag.add_attribs({"bps_wms_config_path": str(config_path)})