Coverage for python / lsst / ctrl / bps / bps_config.py: 11%
181 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:00 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:00 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Configuration class that adds order to searching sections for value,
29expands environment variables and other config variables.
30"""
32__all__ = ["BPS_DEFAULTS", "BPS_SEARCH_ORDER", "BpsConfig", "BpsFormatter"]
35import copy
36import logging
37import os
38import re
39import string
40from os.path import expandvars, normpath
41from typing import Any
43from lsst.daf.butler import Config
44from lsst.resources import ResourcePath
45from lsst.utils import doImport
47from .bps_utils import bps_eval
49_LOG = logging.getLogger(__name__)
51# Using lsst.daf.butler.Config to resolve possible includes.
52BPS_DEFAULTS = Config(ResourcePath("resource://lsst.ctrl.bps/etc/bps_defaults.yaml")).toDict()
54BPS_SEARCH_ORDER = ["bps_cmdline", "payload", "cluster", "pipetask", "site", "cloud", "bps_defined"]
56# Need a string that won't be a valid default value
57# to indicate whether default was defined for search.
58# And None is a valid default value.
59_NO_SEARCH_DEFAULT_VALUE = "__NO_SEARCH_DEFAULT_VALUE__"
62class BpsFormatter(string.Formatter):
63 """String formatter class that allows BPS config search options."""
65 def get_field(self, field_name, args, kwargs):
66 _, val = args[0].search(field_name, opt=args[1])
67 return val, field_name
69 def get_value(self, key, args, kwargs):
70 _, val = args[0].search(key, opt=args[1])
71 return val
74class BpsConfig(Config):
75 """Contains the configuration for a BPS submission.
77 Parameters
78 ----------
79 other : `str`, `dict`, `~lsst.daf.butler.Config`, `BpsConfig`
80 Path to a YAML file or a dict/Config/BpsConfig containing configuration
81 to copy.
82 search_order : `list` [`str`], optional
83 Root section names in the order in which they should be searched.
84 defaults : `str`, `dict`, `~lsst.daf.butler.Config`, optional
85 Default settings that will be used to prepopulate the config.
86 If the WMS service default settings are available, they will be added
87 afterwards. WMS settings takes precedence over provided defaults.
88 wms_service_class_fqn : `str`, optional
89 Fully qualified name of the WMS service class to use to get plugin's
90 specific default settings. If `None` (default), the WMS service
91 class provided by
93 1. ``other`` config,
94 2. environmental variable ``BPS_WMS_SERVICE_CLASS``,
95 3. default settings
97 will be used instead. The list above also reflects the priorities
98 if the WMS service class is defined in multiple places. For example,
99 the name of service class found in ``other`` takes precedence over
100 the name of the service class provided by the BPS_SERVICE_CLASS and/or
101 the default settings.
103 Raises
104 ------
105 ValueError
106 Raised if the class cannot be instantiated from the provided object.
107 """
109 def __init__(self, other, search_order=None, defaults=None, wms_service_class_fqn=None):
110 # In BPS config, the same setting can be defined multiple times in
111 # different sections. The sections are search in a pre-defined
112 # order. Hence, a value which is found first effectively overrides
113 # values in later sections, if any. To achieve this goal,
114 # the special methods __getitem__ and __contains__ were redefined to
115 # use a custom search function internally. For this reason we can't
116 # use super().__init__(other) as the super class defines its own
117 # __getitem__ which is utilized during the initialization process (
118 # e.g. in expressions like self[<key>]). However, this function will
119 # be overridden by the one defined here, in the subclass. Instead
120 # we just initialize internal data structures and populate them
121 # using the inherited update() method which does not rely on super
122 # class __getitem__ method.
123 super().__init__()
125 try:
126 other_config = Config(other)
127 except Exception as exc:
128 raise ValueError(f"A BpsConfig could not be loaded from other: {other}") from exc
130 config = Config()
132 # Pre-populate the config with default settings if any were provided
133 # by the caller. Include WMS plugin specific defaults and/or
134 # overrides as well if available.
135 if defaults:
136 config.update(defaults)
138 # If the WMS service class was not specified explicitly by the
139 # caller, try to use the value provided by either:
140 #
141 # 1. 'other' config,
142 # 2. environmental variable BPS_WMS_SERVICE_CLASS,
143 # 3. default settings
144 #
145 # (in decreasing priority).
146 if wms_service_class_fqn is None:
147 wms_service_class_fqn = other_config.get(
148 "wmsServiceClass",
149 os.environ.get("BPS_WMS_SERVICE_CLASS", config.get("wmsServiceClass")),
150 )
151 try:
152 wms_service_class = doImport(wms_service_class_fqn)
153 except TypeError:
154 # Do not die if the WMS service class is still not set.
155 pass
156 else:
157 wms_service = wms_service_class({})
158 wms_defaults = wms_service.defaults
159 if wms_defaults:
160 config.update(wms_defaults)
162 # Set the service class to the one which was defaults was used.
163 config["wmsServiceClass"] = wms_service_class_fqn
165 # Include values and/or apply overrides from 'other' config.
166 config.update(other_config)
167 self.update(config)
169 if isinstance(other, BpsConfig):
170 self.formatter = copy.deepcopy(other.formatter)
171 self.search_order = copy.deepcopy(other.search_order) if search_order is None else search_order
172 else:
173 self.formatter = BpsFormatter()
174 self.search_order = BPS_SEARCH_ORDER if search_order is None else search_order
176 # Make sure search sections exist.
177 for key in self.search_order:
178 if not Config.__contains__(self, key):
179 self[key] = {}
181 def copy(self):
182 """Make a copy of config.
184 Returns
185 -------
186 copy : `lsst.ctrl.bps.BpsConfig`
187 A duplicate of itself.
188 """
189 return BpsConfig(self)
191 def get(self, key, default=""):
192 """Return the value for key if key is in the config, else default.
194 If default is not given, it defaults to an empty string.
196 Parameters
197 ----------
198 key : `str`
199 Key to look for in config.
200 default : `~typing.Any`, optional
201 Default value to return if the key is not in the config.
203 Returns
204 -------
205 val : Any
206 Value from config if found, default otherwise.
208 Notes
209 -----
210 The provided default value (an empty string) was chosen to maintain
211 the internal consistency with other methods of the class.
212 """
213 _, val = self.search(key, opt={"default": default})
214 return val
216 def __getitem__(self, name):
217 """Return the value from the config for the given name.
219 Parameters
220 ----------
221 name : `str`
222 Key to look for in config
224 Returns
225 -------
226 val : `str`, `int`, `lsst.ctrl.bps.BpsConfig`, ...
227 Value from config if found.
228 """
229 _, val = self.search(name, {})
231 return val
233 def __contains__(self, name):
234 """Check whether name is in config.
236 Parameters
237 ----------
238 name : `str`
239 Key to look for in config.
241 Returns
242 -------
243 found : `bool`
244 Whether name was in config or not.
245 """
246 found, _ = self.search(name, {})
247 return found
249 def search(self, key, opt=None):
250 """Search for key using given opt following hierarchy rules.
252 Search hierarchy rules: current values, a given search object, and
253 search order of config sections.
255 Parameters
256 ----------
257 key : `str`
258 Key to look for in config.
259 opt : `dict` [`str`, `~typing.Any`], optional
260 Options dictionary to use while searching. All are optional.
262 ``"curvals"``
263 Means to pass in values for search order key
264 (curr_<sectname>) or variable replacements.
265 (`dict`, optional)
266 ``"default"``
267 Value to return if not found. (`~typing.Any`, optional)
268 ``"replaceEnvVars"``
269 If search result is string, whether to replace environment
270 variables inside it with special placeholder (<ENV:name>).
271 By default set to False. (`bool`)
272 ``"expandEnvVars"``
273 If search result is string, whether to replace environment
274 variables inside it with current environment value.
275 By default set to False. (`bool`)
276 ``"replaceVars"``
277 If search result is string, whether to replace variables
278 inside it. By default set to True. (`bool`)
279 ``"required"``
280 If replacing variables, whether to raise exception if
281 variable is undefined. By default set to False. (`bool`)
283 Returns
284 -------
285 found : `bool`
286 Whether name was in config or not.
287 value : `str`, `int`, `lsst.ctrl.bps.BpsConfig`, ...
288 Value from config if found.
289 """
290 _LOG.debug("search: initial key = '%s', opt = '%s'", key, opt)
292 if opt is None:
293 opt = {}
295 found = False
296 value = ""
298 # start with stored current values
299 curvals = None
300 if Config.__contains__(self, "current"):
301 curvals = copy.deepcopy(Config.__getitem__(self, "current"))
302 else:
303 curvals = {}
305 # override with current values passed into function if given
306 if "curvals" in opt:
307 for ckey, cval in list(opt["curvals"].items()):
308 _LOG.debug("using specified curval %s = %s", ckey, cval)
309 curvals[ckey] = cval
311 _LOG.debug("curvals = %s", curvals)
313 # There's a problem with the searchobj being a BpsConfig
314 # and its handling of __getitem__. Until that part of
315 # BpsConfig is rewritten, force the searchobj to a Config.
316 if "searchobj" in opt:
317 opt["searchobj"] = Config(opt["searchobj"])
319 if key in curvals:
320 _LOG.debug("found %s in curvals", key)
321 found = True
322 value = curvals[key]
323 elif "searchobj" in opt and key in opt["searchobj"]:
324 found = True
325 value = opt["searchobj"][key]
326 else:
327 for sect in self.search_order:
328 if Config.__contains__(self, sect):
329 _LOG.debug("Searching '%s' section for key '%s'", sect, key)
330 search_sect = Config.__getitem__(self, sect)
331 if "curr_" + sect in curvals:
332 currkey = curvals["curr_" + sect]
333 _LOG.debug("currkey for section %s = %s", sect, currkey)
334 if Config.__contains__(search_sect, currkey):
335 search_sect = Config.__getitem__(search_sect, currkey)
337 _LOG.debug("%s %s", key, search_sect)
338 if Config.__contains__(search_sect, key):
339 found = True
340 value = Config.__getitem__(search_sect, key)
341 break
342 else:
343 _LOG.debug("Missing search section '%s' while searching for '%s'", sect, key)
345 # lastly check root values
346 if not found:
347 _LOG.debug("Searching root section for key '%s'", key)
348 if Config.__contains__(self, key):
349 found = True
350 value = Config.__getitem__(self, key)
351 _LOG.debug("root value='%s'", value)
353 if not found and "default" in opt:
354 value = opt["default"]
355 found = True # ????
357 if not found and opt.get("required", False):
358 print(f"\n\nError: search for {key} failed")
359 print("\tcurrent = ", self.get("current"))
360 print("\topt = ", opt)
361 print("\tcurvals = ", curvals)
362 print("\n\n")
363 raise KeyError(f"Error: Search failed {key}")
365 _LOG.debug("found=%s, value=%s", found, value)
367 _LOG.debug("opt=%s %s", opt, type(opt))
368 if found and isinstance(value, str):
369 if key == "subDirTemplate":
370 # Save if template ends with slash
371 template_endswith_slash = value.endswith("/")
373 if opt.get("expandEnvVars", True):
374 _LOG.debug("before format=%s", value)
375 value = re.sub(r"<ENV:([^>]+)>", r"$\1", value)
376 value = expandvars(value)
377 elif opt.get("replaceEnvVars", False):
378 # Don't replace double dollar signs or $( to allow
379 # pass-through to WMS
380 value = re.sub(r"\${([^$(}]+)}", r"<ENV:\1>", value)
381 value = re.sub(r"\$([^$(}]+)", r"<ENV:\1>", value)
383 if opt.get("replaceVars", True):
384 value = self.replace_vars(value, opt)
385 if key == "subDirTemplate":
386 # Make yaml-specified subdirs easier to read
387 # by removing empty subdirs (//). normpath
388 # removes any trailing slash.
389 value = normpath(value)
390 # Check if subDirTemplate pattern actually ends in slash
391 # If so, the value returned should.
392 if template_endswith_slash:
393 value += "/"
395 _LOG.debug("after format=%s", value)
397 if found and isinstance(value, Config):
398 value = BpsConfig(value, search_order=[])
400 return found, value
402 def replace_vars(self, value: str, opt: dict[str, Any]) -> str:
403 """Replace variables in string with values except those
404 in opt['skipNames'].
406 Parameters
407 ----------
408 value : `str`
409 Value in which to replace variables.
410 opt : `dict` [`str`, Any]
411 Options to be used when searching and replacing values.
412 In particular "skipNames" lists variable names to
413 not replace.
414 """
415 # default only applies to original search key
416 # Instead of doing deep copies of opt (especially with
417 # the recursive calls), temporarily remove default value
418 # and put it back.
419 default = opt.pop("default", _NO_SEARCH_DEFAULT_VALUE)
421 # Temporarily replace any env vars so formatter doesn't try to
422 # replace them.
423 value = re.sub(r"\${([^}]+)}", r"<BPSTMP:\1>", value)
424 for name in opt.get("skipNames", {}):
425 value = value.replace(f"{{{name}}}", f"<BPSTMP2:{name}>")
427 # Replace special keys for WMS to fill in.
428 value = re.sub(r"{wms([^}]+)}", lambda x: f"<WMS:{x[1][0].lower() + x[1][1:]}>", value)
430 value = self.formatter.format(value, self, opt)
432 # Replace any temporary place holders.
433 value = re.sub(r"<BPSTMP:([^>]+)>", r"${\1}", value)
434 value = re.sub(r"<BPSTMP2:([^>]+)>", r"{\1}", value)
436 # if default was originally in opt
437 if default != _NO_SEARCH_DEFAULT_VALUE:
438 opt["default"] = default
440 # check for bpsEval
441 value = re.sub(
442 r"bpsEval\(([^,)]+), ([^)]+)\)", lambda m: str(bps_eval(m.group(1), m.group(2))), value
443 )
444 if "bpsEval" in value:
445 raise ValueError(f"Unparsable bpsEval in '{value}'")
447 return value
449 def generate_config(self) -> None:
450 """Update config with values generated by bpsGenerateConfig
451 entries.
452 """
453 _LOG.debug("generate_config before: %s", self)
454 self._recursive_generate_config("", self)
455 _LOG.debug("generate_config after: %s", self)
457 def _recursive_generate_config(self, recursive_key: str, sub_config: Config) -> None:
458 """Update config with values generated by bpsGenerateConfig
459 entries.
461 Parameters
462 ----------
463 recursive_key : `str`
464 Corresponds to a new subconfig in which to search
465 and replace bpsGenerateConfig.
467 sub_config : `lsst.daf.butler.Config`
468 The nested config corresponding to the recursive_key.
470 Raises
471 ------
472 ValueError
473 If bpsGenerateConfig value isn't parseable.
474 ImportError
475 If problems importing bpsGenerateConfig's method.
476 """
477 _LOG.debug("recursive_key = '%s'", recursive_key)
478 genkey = "bpsGenerateConfig" # to make it easier to change
480 # Save to avoid dictionary changed size during iteration error.
481 orig_keys = list(sub_config)
482 for key in orig_keys:
483 value = Config.__getitem__(sub_config, key)
484 _LOG.debug("key = %s, type(value) = %s", key, type(value))
485 if isinstance(value, Config):
486 self._recursive_generate_config(f"{recursive_key}.{key}", value)
487 elif key == genkey:
488 value = self.replace_vars(value, {"searchobj": sub_config})
490 m = re.match(r"(\S+)\((.+)\)", value)
491 if m:
492 results = bps_eval(m.group(1), m.group(2))
493 del sub_config[genkey]
494 sub_config.update(results)
495 if recursive_key:
496 self[recursive_key] = sub_config
497 _LOG.debug("After config = %s", self)
498 else:
499 raise ValueError(f"Unparsable {genkey} value='{value}'")