Coverage for python/lsst/ctrl/bps/parsl/configuration.py: 19%
36 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:24 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:24 +0000
1# This file is part of ctrl_bps_parsl.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28import logging
29import os
30from typing import Any, Literal, TypeVar, overload
32from lsst.ctrl.bps import BpsConfig
# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = (
    "get_bps_config_value",
    "get_workflow_filename",
    "get_workflow_name",
    "set_parsl_logging",
)


# Type variable linking the ``dataType`` argument of ``get_bps_config_value``
# to the type of its ``default`` argument and of its return value.
T = TypeVar("T")
# Overload: a concrete default is supplied, so the result is never `None`.
@overload
def get_bps_config_value(
    config: BpsConfig,
    key: str,
    dataType: type[T],
    default: T,
) -> T: ...


# Overload: the default may be absent, but the caller insists on a value.
@overload
def get_bps_config_value(
    config: BpsConfig,
    key: str,
    dataType: type[T],
    default: T | None = None,
    *,
    required: Literal[True],
) -> T: ...


# Overload: no default and the value is optional, so `None` may come back.
@overload
def get_bps_config_value(
    config: BpsConfig,
    key: str,
    dataType: type[T],
    default: T | None = None,
) -> T | None: ...


def get_bps_config_value(
    config: BpsConfig,
    key: str,
    dataType: type[T],
    default: T | None = None,
    *,
    required: bool = False,
) -> T | None:
    """Look up a value in the BPS configuration and check its type.

    The lookup is delegated to ``config.search``, which is called with the
    ``expandEnvVars`` and ``replaceVars`` options enabled and with
    ``required`` forwarded. The ``default`` is forwarded as well, but only
    when it is not `None`.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Configuration to search.
    key : `str`
        Name of the entry to look up.
    dataType : `type`
        Type the resulting value must be an instance of.
    default : `typing.Any`, optional
        Fallback used when ``key`` is missing from ``config``. `None` means
        that no fallback is available.
    required : `bool`, optional
        If `True`, a missing ``key`` without a ``default`` is an error
        instead of producing `None`.

    Returns
    -------
    value : ``dataType`` or `None`
        The value obtained from ``config.search``. `None` is returned only
        when ``key`` was not found, no ``default`` was given and the value
        is not ``required``.

    Raises
    ------
    KeyError
        Raised if ``key`` is not found, no ``default`` is provided and a
        value is ``required``.
    RuntimeError
        Raised if the value obtained is not an instance of ``dataType``.
    """
    has_default = default is not None

    search_opts: dict[str, Any] = {"expandEnvVars": True, "replaceVars": True, "required": required}
    if has_default:
        search_opts["default"] = default

    found, value = config.search(key, search_opts)

    # A value is available either from the configuration or from the default;
    # in both cases it must pass the type check before being handed back.
    if found or has_default:
        if isinstance(value, dataType):
            return value
        raise RuntimeError(f"Configuration value {key}={value} is not of type {dataType}")

    # Nothing in the configuration and nothing to fall back on.
    if required:
        raise KeyError(f"No value found for {key} and no default provided")
    return None
def get_workflow_name(config: BpsConfig) -> str:
    """Get name of this workflow.

    The workflow name is constructed by joining the ``project`` and
    ``campaign`` (if set; otherwise ``operator``) entries in the BPS
    configuration with a ``.``.

    If ``project`` is not set, then ``uniqProcName`` is used as the workflow
    name.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.

    Returns
    -------
    name : `str`
        Workflow name.

    Raises
    ------
    KeyError
        Raised if ``project`` is not set and ``uniqProcName`` is missing, or
        if ``project`` is set but neither ``campaign`` nor ``operator`` is.
    RuntimeError
        Raised if any of the entries used is not a `str`.
    """
    project = get_bps_config_value(config, "project", str, None)
    if project is None:
        return get_bps_config_value(config, "uniqProcName", str, required=True)

    # Consult ``operator`` only when ``campaign`` is absent, so that a missing
    # ``operator`` cannot fail a lookup whose result would be discarded.
    campaign = get_bps_config_value(config, "campaign", str)
    if campaign is None:
        campaign = get_bps_config_value(config, "operator", str, required=True)
    return f"{project}.{campaign}"
def get_workflow_filename(out_prefix: str) -> str:
    """Build the path of the file used for persisting the workflow.

    Parameters
    ----------
    out_prefix : `str`
        Directory which should contain the workflow file.

    Returns
    -------
    filename : `str`
        ``out_prefix`` joined with the fixed file name
        ``parsl_workflow.pickle``.
    """
    basename = "parsl_workflow.pickle"
    filename = os.path.join(out_prefix, basename)
    return filename
def set_parsl_logging(config: BpsConfig) -> int:
    """Set parsl logging levels.

    The logging level is read from the ``.parsl.log_level`` entry in the BPS
    configuration, defaulting to ``INFO``. It is applied to every logger
    already registered with the `logging` module whose name starts with
    ``parsl``. The ``database_manager`` logger is always set to
    `logging.INFO`, independent of the configured level.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.

    Returns
    -------
    level : `int`
        Logging level applied to ``parsl`` loggers.

    Raises
    ------
    RuntimeError
        Raised if the configured level name is not one of ``CRITICAL``,
        ``DEBUG``, ``ERROR``, ``FATAL``, ``INFO``, ``WARN`` or ``WARNING``.
    """
    level_name = get_bps_config_value(config, ".parsl.log_level", str, "INFO")
    # "WARNING" is the canonical name in the logging module; "WARN" is its
    # alias and stays accepted so existing configurations keep working.
    if level_name not in ("CRITICAL", "DEBUG", "ERROR", "FATAL", "INFO", "WARN", "WARNING"):
        raise RuntimeError(f"Unrecognised parsl.log_level: {level_name}")
    # Each accepted name is a module-level integer constant of ``logging``.
    level: int = getattr(logging, level_name)
    # Only loggers that already exist are adjusted; loggers created later are
    # not touched by this call.
    for name in logging.root.manager.loggerDict:
        if name.startswith("parsl"):
            logging.getLogger(name).setLevel(level)
    # NOTE(review): presumably a parsl-related logger that lives outside the
    # "parsl" namespace; it is pinned to INFO regardless of the level above.
    logging.getLogger("database_manager").setLevel(logging.INFO)
    return level