Coverage for python / lsst / ctrl / bps / htcondor / dagman_configurator.py: 32%

47 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:49 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Module enabling configuring DAGMan via submit YAML.""" 

29 

30__all__ = ["DagmanConfigurator"] 

31 

32import logging 

33import os 

34from pathlib import Path 

35from typing import Any 

36 

37import htcondor 

38from pydantic import AliasGenerator, ConfigDict, create_model 

39 

40from lsst.ctrl.bps import BpsConfig 

41 

42from .lssthtc import HTCDag 

43 

44_LOG = logging.getLogger(__name__) 

45 

46# Extract DAGMan configuration options with their types and default values from 

47# the local HTCondor configuration. 

48# 

49# Notes 

50# ----- 

51# There are some DAGMan configuration options that names do not start with 

52# ``DAGMAN_`` (e.g., ``MAX_DAGMAN_LOG``). Hence, do not use 

53# ``key.startswith("DAGMAN_")``. 

54_fields = {key.lower(): (type(val), val) for key, val in htcondor.param.items() if "DAGMAN_" in key} 

55 

56# Add some valid configuration options are not set by default by HTCondor and 

57# are missing from ``htcondor.param``. 

58# 

59# Notes 

60# ----- 

61# A complete list of configuration options HTCondor supports can be found in 

62# ``src/condor_utils/param_info.in`` in 

63# `HTCondor GitHub repository <https://github.com/htcondor/htcondor>`_. 

64_fields.update( 

65 { 

66 "dagman_debug": (str, ""), 

67 "dagman_node_record_info": (str, ""), 

68 "dagman_record_machine_attrs": (str, ""), 

69 } 

70) 

71 

72# Dynamically create a Pydantic model encapsulating the DAGMan configuration 

73# options gathered above. 

74_DagmanOptions = create_model( 

75 "DagmanOptions", 

76 __config__=ConfigDict( 

77 alias_generator=AliasGenerator( 

78 serialization_alias=lambda name: name.upper(), 

79 ), 

80 extra="allow", 

81 serialize_by_alias=True, 

82 ), 

83 **_fields, 

84) 

85 

86 

87class DagmanConfigurator: 

88 """Class responsible for setting WMS-specific configuration options. 

89 

90 Parameters 

91 ---------- 

92 config : `lsst.ctrl.bps.BpsConfig` 

93 BPS configuration. 

94 search_opts : `dict` [`str`, `~typing.Any`], optional 

95 Options to use while searching the BPS configuration for values. 

96 

97 Raises 

98 ------ 

99 KeyError 

100 Raised if DAGMan configuration is missing from the BPS configuration. 

101 """ 

102 

103 def __init__(self, config: BpsConfig, search_opts: dict[str, Any] | None = None) -> None: 

104 if search_opts is None: 

105 search_opts = {} 

106 _, site = config.search("computeSite", search_opts) 

107 if site: 

108 search_opts["curvals"] = {"curr_site": site} 

109 _, wms_config = config.search("wmsConfig", search_opts) 

110 if not wms_config: 

111 raise KeyError("WMS-specific configuration not found") 

112 self._options = _DagmanOptions.model_validate({key.lower(): val for key, val in wms_config.items()}) 

113 if self._options.model_extra: 

114 unknown_opts = [key.upper() for key in self._options.model_extra] 

115 _LOG.warning( 

116 "The following WMS-specific config options were not recognized and will be ignored: %s.", 

117 ", ".join(unknown_opts), 

118 ) 

119 self.config_path: Path | None = None 

120 self.prefix: Path | None = None 

121 

122 @property 

123 def options(self) -> dict[str, Any]: 

124 """DAGMan configuration options set via BPS (`dict` [`str`, `Any`]).""" 

125 return { 

126 key: val 

127 for key, val in self._options.model_dump(exclude_unset=True).items() 

128 if key not in self._options.model_extra 

129 } 

130 

131 def prepare(self, filename: os.PathLike | str, prefix: os.PathLike | str | None) -> None: 

132 """Write WMS-specific configuration to a file. 

133 

134 Parameters 

135 ---------- 

136 filename : `str`, optional 

137 Name of the file to use when creating the DAG configuration. 

138 prefix : `pathlib.Path` | `str`, optional 

139 Directory in which to output the DAG configuration file. If not 

140 provided, the script will be written to the current directory. 

141 

142 Raises 

143 ------ 

144 OSError 

145 Raised if the configuration file cannot be created. 

146 """ 

147 if prefix: 

148 self.prefix = Path(prefix) 

149 self.config_path = self.prefix / filename if self.prefix else Path(filename) 

150 try: 

151 self.config_path.parent.mkdir(parents=True, exist_ok=True) 

152 except OSError as exc: 

153 _LOG.error( 

154 "Could not write WMS-specific configuration file '%s': %s", 

155 self.config_path, 

156 exc.strerror, 

157 ) 

158 raise 

159 

160 # Populate the DAG configuration file only with options that were 

161 # explicitly set in the BPS configuration. 

162 # 

163 # Notes 

164 # ----- 

165 # The Pydantic model we are using to represent the DAGMan configuration 

166 # options allows for extra fields. However, it seems that 

167 # BaseModel.model_dump() does not support excluding these fields during 

168 # serialization at the moment (Pydantic ver. 2.12), so we have to do it 

169 # manually. 

170 self.config_path.write_text("\n".join(f"{key} = {val}" for key, val in self.options.items())) 

171 

172 def configure(self, dag: HTCDag) -> None: 

173 """Add DAG configuration file to the workflow. 

174 

175 Parameters 

176 ---------- 

177 dag : `lsst.ctrl.bps.htcondor.HTCDag` 

178 HTCondor DAG. 

179 

180 Raises 

181 ------ 

182 RuntimeError 

183 Raised if the prepare step was omitted. 

184 

185 Notes 

186 ----- 

187 The path to the DAG configuration is added as a DAG attribute named 

188 ``bps_wms_config_path``. The stored path is relative to the prefix. 

189 """ 

190 if self.config_path is None: 

191 raise RuntimeError( 

192 f"cannot add WMS-specific configuration to the workflow: file does not exist. " 

193 f"(hint: run {type(self).__qualname__}.prepare() to create it)" 

194 ) 

195 config_path = self.config_path.relative_to(self.prefix) if self.prefix else self.config_path 

196 dag.add_attribs({"bps_wms_config_path": str(config_path)})