Coverage for python / lsst / daf / butler / _config.py: 45%
486 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Configuration control."""
30from __future__ import annotations
32__all__ = ("Config", "ConfigSubset")
34import copy
35import io
36import json
37import logging
38import os
39import pprint
40import sys
41from collections import defaultdict
42from collections.abc import Iterable, Iterator, Mapping, MutableMapping, Sequence
43from pathlib import Path
44from typing import IO, TYPE_CHECKING, Any, ClassVar
46import yaml
47from yaml.representer import Representer
49from lsst.resources import ResourcePath, ResourcePathExpression
50from lsst.utils import doImportType
# Register the plain-dict representer for ``defaultdict`` with PyYAML.
yaml.add_representer(defaultdict, Representer.represent_dict)

# Config module logger
log = logging.getLogger(__name__)

# PATH-like environment variable to use for defaults.
CONFIG_PATH = "DAF_BUTLER_CONFIG_PATH"

# Choose the base class for the ``Loader`` class defined in this module.
# Type checkers always see the pure-Python SafeLoader; at runtime the
# C-accelerated loader is used when PyYAML exposes it.
if TYPE_CHECKING:
    yamlLoader = yaml.SafeLoader
else:
    try:
        yamlLoader = yaml.CSafeLoader
    except AttributeError:
        # Not all installations have the C library
        # (but assume for mypy's sake that they're the same)
        yamlLoader = yaml.SafeLoader
72def _doUpdate(d: Mapping[str, Any], u: Mapping[str, Any]) -> Mapping[str, Any]:
73 if not isinstance(u, Mapping) or not isinstance(d, MutableMapping): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true
74 raise RuntimeError(f"Only call update with Mapping, not {type(d)}")
75 for k, v in u.items():
76 if isinstance(v, Mapping):
77 lhs = d.get(k, {})
78 if not isinstance(lhs, Mapping): 78 ↛ 79line 78 didn't jump to line 79 because the condition on line 78 was never true
79 lhs = {}
80 d[k] = _doUpdate(lhs, v)
81 else:
82 d[k] = v
83 return d
86def _checkNextItem(k: str | int, d: Any, create: bool, must_be_dict: bool) -> tuple[Any, bool]:
87 """See if k is in d and if it is return the new child."""
88 nextVal = None
89 isThere = False
90 if d is None: 90 ↛ 92line 90 didn't jump to line 92 because the condition on line 90 was never true
91 # We have gone past the end of the hierarchy
92 pass
93 elif not must_be_dict and isinstance(d, Sequence): 93 ↛ 98line 93 didn't jump to line 98 because the condition on line 93 was never true
94 # Check for Sequence first because for lists
95 # __contains__ checks whether value is found in list
96 # not whether the index exists in list. When we traverse
97 # the hierarchy we are interested in the index.
98 try:
99 nextVal = d[int(k)]
100 isThere = True
101 except IndexError:
102 pass
103 except ValueError:
104 isThere = k in d
105 elif k in d:
106 nextVal = d[k]
107 isThere = True
108 elif create: 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true
109 d[k] = {}
110 nextVal = d[k]
111 isThere = True
113 return nextVal, isThere
class Loader(yamlLoader):
    """YAML Loader that supports file include directives.

    Uses ``!include`` directive in a YAML file to point to another
    YAML file to be included. The path in the include directive is relative
    to the file containing that directive.

        storageClasses: !include storageClasses.yaml

    Examples
    --------
    >>> with open("document.yaml", "r") as f:
    ...     data = yaml.load(f, Loader=Loader)

    Notes
    -----
    See https://davidchall.github.io/yaml-includes.html

    Parameters
    ----------
    stream : `str` or `io.IO`
        The stream to parse.
    """

    def __init__(self, stream: str | IO):  # types-PyYAML annotates 'stream' with a private type
        super().__init__(stream)
        # if this is a string and not a stream we may well lack a name
        if hasattr(stream, "name"):
            self._root = ResourcePath(stream.name, forceDirectory=False)
        else:
            # No choice but to assume a local filesystem
            self._root = ResourcePath("no-file.yaml", forceDirectory=False)
        self.add_constructor("!include", Loader.include)

    def include(self, node: yaml.Node) -> list[Any] | dict[str, Any]:
        """Construct the value of an ``!include`` node.

        Parameters
        ----------
        node : `yaml.Node`
            Node tagged with ``!include``. A scalar is treated as one file
            name, a sequence as a list of file names, and a mapping as
            keys whose values are file names.

        Returns
        -------
        result : `list` or `dict` or `typing.Any`
            For a scalar node, whatever `extractFile` returns for that
            file. For a sequence, a `list` of the loaded files. For a
            mapping, a `dict` from each key to its loaded file.

        Raises
        ------
        TypeError
            Raised if a mapping node has a key that is not a string.
        yaml.constructor.ConstructorError
            Raised if the node is not a scalar, sequence or mapping.
        """
        result: list[Any] | dict[str, Any]
        if isinstance(node, yaml.ScalarNode):
            return self.extractFile(self.construct_scalar(node))  # type: ignore[arg-type]

        elif isinstance(node, yaml.SequenceNode):
            result = []
            for filename in self.construct_sequence(node):
                result.append(self.extractFile(filename))
            return result

        elif isinstance(node, yaml.MappingNode):
            result = {}
            for k, v in self.construct_mapping(node).items():
                if not isinstance(k, str):
                    raise TypeError(f"Expected only strings in YAML mapping; got {k!r} of type {type(k)}.")
                result[k] = self.extractFile(v)
            return result

        else:
            # Carry the diagnostic and source location in the exception
            # itself rather than printing to stderr, so callers that catch
            # or log the error see why and where it happened.
            raise yaml.constructor.ConstructorError(
                None,
                None,
                f"unrecognised node type in !include statement: {type(node).__name__}",
                node.start_mark,
            )

    def extractFile(self, filename: str) -> Any:
        """Read the named resource and parse it as YAML with this loader.

        Parameters
        ----------
        filename : `str`
            URI or path of the file to include.

        Returns
        -------
        data : `typing.Any`
            Content returned by `yaml.load` for the included file.
        """
        # It is possible for the !include to point to an explicit URI
        # instead of a relative URI, therefore we first see if it is
        # scheme-less or not. If it has a scheme we use it directly
        # if it is scheme-less we use it relative to the file root.
        requesteduri = ResourcePath(filename, forceAbsolute=False, forceDirectory=False)

        if requesteduri.scheme:
            fileuri = requesteduri
        else:
            fileuri = self._root.updatedFile(filename)

        log.debug("Opening YAML file via !include: %s", fileuri)

        # Read all the data from the resource
        data = fileuri.read()

        # Store the bytes into a BytesIO so we can attach a .name
        stream = io.BytesIO(data)
        stream.name = fileuri.geturl()
        return yaml.load(stream, Loader)
# Type of the key used for accessing items in configuration object. It can be
# a single string as described below or a sequence of strings and integer
# indices. Indices are used to access items in sequences stored in config.
_ConfigKey = str | Sequence[str | int]
202class Config(MutableMapping):
203 r"""Implements a datatype that is used by `Butler` for configuration.
205 It is essentially a `dict` with key/value pairs, including nested dicts
206 (as values). In fact, it can be initialized with a `dict`.
207 This is explained next:
209 Config extends the `dict` api so that hierarchical values may be accessed
210 with delimited notation or as a tuple. If a string is given the delimiter
211 is picked up from the first character in that string. For example,
212 ``foo.getValue(".a.b.c")``, ``foo["a"]["b"]["c"]``, ``foo["a", "b", "c"]``,
213 ``foo[".a.b.c"]``, and ``foo["/a/b/c"]`` all achieve the same outcome.
214 If the first character is alphanumeric, no delimiter will be used.
215 ``foo["a.b.c"]`` will be a single key ``a.b.c`` as will ``foo[":a.b.c"]``.
216 Unicode characters can be used as the delimiter for distinctiveness if
217 required.
219 If a key in the hierarchy starts with a non-alphanumeric character care
220 should be used to ensure that either the tuple interface is used or
221 a distinct delimiter is always given in string form.
223 Finally, the delimiter can be escaped if it is part of a key and also
224 has to be used as a delimiter. For example, ``foo[r".a.b\.c"]`` results in
225 a two element hierarchy of ``a`` and ``b.c``. For hard-coded strings it is
226 always better to use a different delimiter in these cases.
228 Note that adding a multi-level key implicitly creates any nesting levels
229 that do not exist, but removing multi-level keys does not automatically
230 remove empty nesting levels. As a result:
232 >>> c = Config()
233 >>> c[".a.b"] = 1
234 >>> del c[".a.b"]
235 >>> c
236 Config({'a': {}})
238 Storage formats supported:
240 - yaml: read and write is supported.
241 - json: read and write is supported but no ``!include`` directive.
243 Parameters
244 ----------
245 other : `lsst.resources.ResourcePath` or `Config` or `dict`
246 Other source of configuration, can be:
248 - (`lsst.resources.ResourcePathExpression`)
249 Treated as a URI to a config file. Must end with ".yaml" or ".json".
250 - (`Config`) Copies the other Config's values into this one.
251 - (`dict`) Copies the values from the dict into this Config.
253 If `None` is provided an empty `Config` will be created.
254 """
256 _D: str = "→"
257 """Default internal delimiter to use for components in the hierarchy when
258 constructing keys for external use (see `Config.names()`)."""
260 includeKey: ClassVar[str] = "includeConfigs"
261 """Key used to indicate that another config should be included at this
262 part of the hierarchy."""
264 resourcesPackage: str = "lsst.daf.butler"
265 """Package to search for default configuration data. The resources
266 themselves will be within a ``configs`` resource hierarchy."""
268 def __init__(self, other: ResourcePathExpression | Config | Mapping[str, Any] | None = None):
269 self._data: dict[str, Any] = {}
270 self.configFile: ResourcePath | None = None
272 if other is None:
273 return
275 if isinstance(other, Config):
276 # Deep copy might be more efficient but if someone has overridden
277 # a config entry to store a complex object then deep copy may
278 # fail. Safer to use update().
279 self.update(other._data)
280 self.configFile = other.configFile
281 elif isinstance(other, dict | Mapping):
282 # In most cases we have a dict, and it's more efficient
283 # to check for a dict instance before checking the generic mapping.
284 self.update(other)
285 elif isinstance(other, str | ResourcePath | Path): 285 ↛ 292line 285 didn't jump to line 292 because the condition on line 285 was always true
286 # if other is a string, assume it is a file path/URI
287 self.__initFromUri(other)
288 self._processExplicitIncludes()
289 else:
290 # if the config specified by other could not be recognized raise
291 # a runtime error.
292 raise RuntimeError(f"A Config could not be loaded from other: {other}")
294 def ppprint(self) -> str:
295 """Return config as formatted readable string.
297 Examples
298 --------
299 use: ``pdb> print(myConfigObject.ppprint())``
301 Returns
302 -------
303 s : `str`
304 A prettyprint formatted string representing the config.
305 """
306 return pprint.pformat(self._data, indent=2, width=1)
308 def __repr__(self) -> str:
309 return f"{type(self).__name__}({self._data!r})"
311 def __str__(self) -> str:
312 return self.ppprint()
314 def __len__(self) -> int:
315 return len(self._data)
317 def __iter__(self) -> Iterator[str]:
318 return iter(self._data)
320 def copy(self) -> Config:
321 return type(self)(self)
323 @classmethod
324 def fromString(cls, string: str, format: str = "yaml") -> Config:
325 """Create a new Config instance from a serialized string.
327 Parameters
328 ----------
329 string : `str`
330 String containing content in specified format.
331 format : `str`, optional
332 Format of the supplied string. Can be ``json`` or ``yaml``.
334 Returns
335 -------
336 c : `Config`
337 Newly-constructed Config.
338 """
339 if format == "yaml":
340 new_config = cls().__initFromYaml(string)
341 elif format == "json":
342 new_config = cls().__initFromJson(string)
343 else:
344 raise ValueError(f"Unexpected format of string: {format}")
345 new_config._processExplicitIncludes()
346 return new_config
348 @classmethod
349 def fromYaml(cls, string: str) -> Config:
350 """Create a new Config instance from a YAML string.
352 Parameters
353 ----------
354 string : `str`
355 String containing content in YAML format.
357 Returns
358 -------
359 c : `Config`
360 Newly-constructed Config.
361 """
362 return cls.fromString(string, format="yaml")
364 def __initFromUri(self, path: ResourcePathExpression) -> None:
365 """Load a file from a path or an URI.
367 Parameters
368 ----------
369 path : `lsst.resources.ResourcePathExpression`
370 Path or a URI to a persisted config file.
371 """
372 uri = ResourcePath(path, forceDirectory=False)
373 ext = uri.getExtension()
374 if ext == ".yaml": 374 ↛ 381line 374 didn't jump to line 381 because the condition on line 374 was always true
375 log.debug("Opening YAML config file: %s", uri.geturl())
376 content = uri.read()
377 # Use a stream so we can name it
378 stream = io.BytesIO(content)
379 stream.name = uri.geturl()
380 self.__initFromYaml(stream)
381 elif ext == ".json":
382 log.debug("Opening JSON config file: %s", uri.geturl())
383 content = uri.read()
384 self.__initFromJson(content)
385 else:
386 # This URI does not have a valid extension. It might be because
387 # we ended up with a directory and not a file. Before we complain
388 # about an extension, do an existence check. No need to do
389 # the (possibly expensive) existence check in the default code
390 # path above because we will find out soon enough that the file
391 # is not there.
392 if not uri.exists():
393 raise FileNotFoundError(f"Config location {uri} does not exist.")
394 raise RuntimeError(f"The Config URI does not have a supported extension: {uri}")
395 self.configFile = uri
397 def __initFromYaml(self, stream: IO | str | bytes) -> Config:
398 """Load a YAML config from any readable stream that contains one.
400 Parameters
401 ----------
402 stream : `IO` or `str`
403 Stream to pass to the YAML loader. Accepts anything that
404 `yaml.load` accepts. This can include a string as well as an
405 IO stream.
407 Raises
408 ------
409 yaml.YAMLError
410 If there is an error loading the file.
411 """
412 content = yaml.load(stream, Loader=Loader)
413 if content is None: 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true
414 content = {}
415 self._data = content
416 return self
418 def __initFromJson(self, stream: IO | str | bytes) -> Config:
419 """Load a JSON config from any readable stream that contains one.
421 Parameters
422 ----------
423 stream : `IO` or `str`
424 Stream to pass to the JSON loader. This can include a string as
425 well as an IO stream.
427 Raises
428 ------
429 TypeError:
430 Raised if there is an error loading the content.
431 """
432 if isinstance(stream, bytes | str):
433 content = json.loads(stream)
434 else:
435 content = json.load(stream)
436 if content is None:
437 content = {}
438 self._data = content
439 return self
    def _processExplicitIncludes(self) -> None:
        """Scan through the configuration searching for the special includes.

        Looks for ``includeConfigs`` directive and processes the includes.

        Each directive found is removed from the config and replaced by the
        merged content of the files it names. Values already present at the
        same level of the hierarchy override values from the included files.

        Raises
        ------
        RuntimeError
            Raised if ``configFile`` is set to something other than a
            `lsst.resources.ResourcePath`, or if a referenced include file
            cannot be found.
        """
        # Search paths for config files
        searchPaths = [ResourcePath(os.path.curdir, forceDirectory=True)]
        if self.configFile is not None:
            if isinstance(self.configFile, ResourcePath):
                configDir = self.configFile.dirname()
            else:
                raise RuntimeError(f"Unexpected type for config file: {self.configFile}")
            searchPaths.append(configDir)

        # The list of key paths is computed once, before the loop below
        # starts modifying the config.
        names = self.nameTuples()
        for path in names:
            if path[-1] == self.includeKey:
                log.debug("Processing file include directive at %s", self._D + self._D.join(path))
                # Location in the hierarchy that holds the directive.
                basePath = path[:-1]

                # Extract the includes and then delete them from the config
                includes = self[path]
                del self[path]

                # Be consistent and convert to a list
                if not isinstance(includes, list):
                    includes = [includes]

                # Read each file assuming it is a reference to a file
                # The file can be relative to config file or cwd
                # ConfigSubset search paths are not used
                subConfigs = []
                for fileName in includes:
                    # Expand any shell variables -- this could be URI
                    fileName = ResourcePath(
                        os.path.expandvars(fileName), forceAbsolute=False, forceDirectory=False
                    )
                    found = None
                    if fileName.isabs():
                        found = fileName
                    else:
                        # First search path that contains the file wins.
                        for dir in searchPaths:
                            specific = dir.join(fileName.path)
                            # Remote resource check might be expensive
                            if specific.exists():
                                found = specific
                                break
                    if not found:
                        raise RuntimeError(f"Unable to find referenced include file: {fileName}")

                    # Read the referenced Config as a Config
                    subConfigs.append(type(self)(found))

                # Now we need to merge these sub configs with the current
                # information that was present in this node in the config
                # tree with precedence given to the explicit values
                # NOTE(review): an empty ``includes`` list would make this
                # pop() raise IndexError -- confirm that cannot occur.
                newConfig = subConfigs.pop(0)
                # Later includes override earlier ones.
                for sc in subConfigs:
                    newConfig.update(sc)

                # Explicit values take precedence
                if not basePath:
                    # This is an include at the root config
                    newConfig.update(self)
                    # Replace the current config
                    self._data = newConfig._data
                else:
                    newConfig.update(self[basePath])
                    # And reattach to the base config
                    self[basePath] = newConfig
513 @staticmethod
514 def _splitIntoKeys(key: _ConfigKey) -> list[str | int]:
515 r"""Split the argument for get/set/in into a hierarchical list.
517 Parameters
518 ----------
519 key : `str` or iterable
520 Argument given to get/set/in. If an iterable is provided it will
521 be converted to a list. If the first character of the string
522 is not an alphanumeric character then it will be used as the
523 delimiter for the purposes of splitting the remainder of the
524 string. If the delimiter is also in one of the keys then it
525 can be escaped using ``\``. There is no default delimiter.
527 Returns
528 -------
529 keys : `list`
530 Hierarchical keys as a `list`.
531 """
532 if isinstance(key, str):
533 if not key[0].isalnum(): 533 ↛ 534line 533 didn't jump to line 534 because the condition on line 533 was never true
534 d = key[0]
535 key = key[1:]
536 else:
537 return [
538 key,
539 ]
540 escaped = f"\\{d}"
541 temp = None
542 if escaped in key:
543 # Complain at the attempt to escape the escape
544 doubled = rf"\{escaped}"
545 if doubled in key:
546 raise ValueError(
547 f"Escaping an escaped delimiter ({doubled} in {key}) is not yet supported."
548 )
549 # Replace with a character that won't be in the string
550 temp = "\r"
551 if temp in key or d == temp:
552 raise ValueError(
553 f"Can not use character {temp!r} in hierarchical key or as"
554 " delimiter if escaping the delimiter"
555 )
556 key = key.replace(escaped, temp)
557 hierarchy = key.split(d)
558 if temp:
559 hierarchy = [h.replace(temp, d) for h in hierarchy]
560 # Copy the list to keep mypy quiet.
561 return list(hierarchy)
562 elif isinstance(key, Iterable): 562 ↛ 566line 562 didn't jump to line 566 because the condition on line 562 was always true
563 return list(key)
564 else:
565 # Do not try to guess.
566 raise TypeError(f"Provided key [{key}] neither str nor iterable.")
568 def _getKeyHierarchy(self, name: _ConfigKey) -> list[str | int]:
569 """Retrieve the key hierarchy for accessing the Config.
571 Parameters
572 ----------
573 name : `str` or `tuple`
574 Delimited string or `tuple` of hierarchical keys.
576 Returns
577 -------
578 hierarchy : `list` of `str`
579 Hierarchy to use as a `list`. If the name is available directly
580 as a key in the Config it will be used regardless of the presence
581 of any nominal delimiter.
582 """
583 keys: list[str | int]
584 if name in self._data:
585 keys = [name]
586 else:
587 keys = self._splitIntoKeys(name)
588 return keys
590 def _findInHierarchy(self, keys: Sequence[str | int], create: bool = False) -> tuple[list[Any], bool]:
591 """Look for hierarchy of keys in Config.
593 Parameters
594 ----------
595 keys : `list` or `tuple`
596 Keys to search in hierarchy.
597 create : `bool`, optional
598 If `True`, if a part of the hierarchy does not exist, insert an
599 empty `dict` into the hierarchy.
601 Returns
602 -------
603 hierarchy : `list`
604 List of the value corresponding to each key in the supplied
605 hierarchy. Only keys that exist in the hierarchy will have
606 a value.
607 complete : `bool`
608 `True` if the full hierarchy exists and the final element
609 in ``hierarchy`` is the value of relevant value.
610 """
611 d: Any = self._data
613 # For the first key, d must be a dict so it is a waste
614 # of time to check for a sequence.
615 must_be_dict = True
617 hierarchy = []
618 complete = True
619 for k in keys:
620 d, isThere = _checkNextItem(k, d, create, must_be_dict)
621 if isThere:
622 hierarchy.append(d)
623 else:
624 complete = False
625 break
626 # Second time round it might be a sequence.
627 must_be_dict = False
629 return hierarchy, complete
631 def __getitem__(self, name: _ConfigKey) -> Any:
632 # Override the split for the simple case where there is an exact
633 # match. This allows `Config.items()` to work via a simple
634 # __iter__ implementation that returns top level keys of
635 # self._data.
637 # If the name matches a key in the top-level hierarchy, bypass
638 # all further cleverness.
639 found_directly = False
640 try:
641 if isinstance(name, str): 641 ↛ 647line 641 didn't jump to line 647 because the condition on line 641 was always true
642 data = self._data[name]
643 found_directly = True
644 except KeyError:
645 pass
647 if not found_directly:
648 keys = self._getKeyHierarchy(name)
650 hierarchy, complete = self._findInHierarchy(keys)
651 if not complete: 651 ↛ 653line 651 didn't jump to line 653 because the condition on line 651 was always true
652 raise KeyError(f"{name} not found")
653 data = hierarchy[-1]
655 # In most cases we have a dict, and it's more efficient
656 # to check for a dict instance before checking the generic mapping.
657 if isinstance(data, dict | Mapping):
658 data = Config(data)
659 # Ensure that child configs inherit the parent internal delimiter
660 if self._D != Config._D: 660 ↛ 661line 660 didn't jump to line 661 because the condition on line 660 was never true
661 data._D = self._D
662 return data
664 def __setitem__(self, name: _ConfigKey, value: Any) -> None:
665 keys = self._getKeyHierarchy(name)
666 last = keys.pop()
667 if isinstance(value, Config):
668 value = copy.deepcopy(value._data)
670 hierarchy, complete = self._findInHierarchy(keys, create=True)
671 if hierarchy:
672 data = hierarchy[-1]
673 else:
674 data = self._data
676 try:
677 data[last] = value
678 except TypeError:
679 data[int(last)] = value
681 def __contains__(self, key: Any) -> bool:
682 if not isinstance(key, str | Sequence): 682 ↛ 683line 682 didn't jump to line 683 because the condition on line 682 was never true
683 return False
684 keys = self._getKeyHierarchy(key)
685 hierarchy, complete = self._findInHierarchy(keys)
686 return complete
688 def __delitem__(self, key: str | Sequence[str]) -> None:
689 keys = self._getKeyHierarchy(key)
690 last = keys.pop()
691 hierarchy, complete = self._findInHierarchy(keys)
692 if complete: 692 ↛ 699line 692 didn't jump to line 699 because the condition on line 692 was always true
693 if hierarchy: 693 ↛ 694line 693 didn't jump to line 694 because the condition on line 693 was never true
694 data = hierarchy[-1]
695 else:
696 data = self._data
697 del data[last]
698 else:
699 raise KeyError(f"{key} not found in Config")
701 def update(self, other: Mapping[str, Any]) -> None: # type: ignore[override]
702 """Update config from other `Config` or `dict`.
704 Like `dict.update`, but will add or modify keys in nested dicts,
705 instead of overwriting the nested dict entirely.
707 Parameters
708 ----------
709 other : `dict` or `Config`
710 Source of configuration.
712 Examples
713 --------
714 >>> c = Config({"a": {"b": 1}})
715 >>> c.update({"a": {"c": 2}})
716 >>> print(c)
717 {'a': {'b': 1, 'c': 2}}
719 >>> foo = {"a": {"b": 1}}
720 >>> foo.update({"a": {"c": 2}})
721 >>> print(foo)
722 {'a': {'c': 2}}
723 """
724 _doUpdate(self._data, other)
726 def merge(self, other: Mapping) -> None:
727 """Merge another Config into this one.
729 Like `Config.update()`, but will add keys & values from other that
730 DO NOT EXIST in self.
732 Keys and values that already exist in self will NOT be overwritten.
734 Parameters
735 ----------
736 other : `dict` or `Config`
737 Source of configuration.
738 """
739 if not isinstance(other, Mapping):
740 raise TypeError(f"Can only merge a Mapping into a Config, not {type(other)}")
742 # Convert the supplied mapping to a Config for consistency
743 # This will do a deepcopy if it is already a Config
744 otherCopy = Config(other)
745 otherCopy.update(self)
746 self._data = otherCopy._data
748 def nameTuples(self, topLevelOnly: bool = False) -> list[tuple[str, ...]]:
749 """Get tuples representing the name hierarchies of all keys.
751 The tuples returned from this method are guaranteed to be usable
752 to access items in the configuration object.
754 Parameters
755 ----------
756 topLevelOnly : `bool`, optional
757 If False, the default, a full hierarchy of names is returned.
758 If True, only the top level are returned.
760 Returns
761 -------
762 names : `list` of `tuple` of `str`
763 List of all names present in the `Config` where each element
764 in the list is a `tuple` of strings representing the hierarchy.
765 """
766 if topLevelOnly: 766 ↛ 767line 766 didn't jump to line 767 because the condition on line 766 was never true
767 return [(k,) for k in self]
769 def getKeysAsTuples(
770 d: Mapping[str, Any] | Sequence[str], keys: list[tuple[str, ...]], base: tuple[str, ...] | None
771 ) -> None:
772 if isinstance(d, Sequence):
773 theseKeys: Iterable[Any] = range(len(d))
774 else:
775 theseKeys = d.keys()
776 for key in theseKeys:
777 val = d[key]
778 levelKey = base + (key,) if base is not None else (key,)
779 keys.append(levelKey)
780 if isinstance(val, Mapping | Sequence) and not isinstance(val, str):
781 getKeysAsTuples(val, keys, levelKey)
783 keys: list[tuple[str, ...]] = []
784 getKeysAsTuples(self._data, keys, None)
785 return keys
787 def names(self, topLevelOnly: bool = False, delimiter: str | None = None) -> list[str]:
788 """Get a delimited name of all the keys in the hierarchy.
790 The values returned from this method are guaranteed to be usable
791 to access items in the configuration object.
793 Parameters
794 ----------
795 topLevelOnly : `bool`, optional
796 If False, the default, a full hierarchy of names is returned.
797 If True, only the top level are returned.
798 delimiter : `str`, optional
799 Delimiter to use when forming the keys. If the delimiter is
800 present in any of the keys, it will be escaped in the returned
801 names. If `None` given a delimiter will be automatically provided.
802 The delimiter can not be alphanumeric.
804 Returns
805 -------
806 names : `list` of `str`
807 List of all names present in the `Config`.
809 Notes
810 -----
811 This is different than the built-in method `dict.keys`, which will
812 return only the first level keys.
814 Raises
815 ------
816 ValueError
817 The supplied delimiter is alphanumeric.
818 """
819 if topLevelOnly:
820 return list(self.keys())
822 # Get all the tuples of hierarchical keys
823 nameTuples = self.nameTuples()
825 if delimiter is not None and delimiter.isalnum():
826 raise ValueError(f"Supplied delimiter ({delimiter!r}) must not be alphanumeric.")
828 if delimiter is None:
829 # Start with something, and ensure it does not need to be
830 # escaped (it is much easier to understand if not escaped)
831 delimiter = self._D
833 # Form big string for easy check of delimiter clash
834 combined = "".join("".join(str(s) for s in k) for k in nameTuples)
836 # Try a delimiter and keep trying until we get something that
837 # works.
838 ntries = 0
839 while delimiter in combined:
840 log.debug("Delimiter '%s' could not be used. Trying another.", delimiter)
841 ntries += 1
843 if ntries > 100:
844 raise ValueError(f"Unable to determine a delimiter for Config {self}")
846 # try another one
847 while True:
848 delimiter = chr(ord(delimiter) + 1)
849 if not delimiter.isalnum():
850 break
852 log.debug("Using delimiter %r", delimiter)
854 # Form the keys, escaping the delimiter if necessary
855 strings = [
856 delimiter + delimiter.join(str(s).replace(delimiter, f"\\{delimiter}") for s in k)
857 for k in nameTuples
858 ]
859 return strings
861 def asArray(self, name: str | Sequence[str]) -> Sequence[Any]:
862 """Get a value as an array.
864 May contain one or more elements.
866 Parameters
867 ----------
868 name : `str`
869 Key to use to retrieve value.
871 Returns
872 -------
873 array : `collections.abc.Sequence`
874 The value corresponding to name, but guaranteed to be returned
875 as a list with at least one element. If the value is a
876 `~collections.abc.Sequence` (and not a `str`) the value itself
877 will be returned, else the value will be the first element.
878 """
879 val = self.get(name)
880 if isinstance(val, str) or not isinstance(val, Sequence):
881 val = [val]
882 return val
884 def __eq__(self, other: Any) -> bool:
885 if isinstance(other, Config):
886 other = other._data
887 return self._data == other
889 def __ne__(self, other: Any) -> bool:
890 if isinstance(other, Config):
891 other = other._data
892 return self._data != other
894 #######
895 # i/o #
897 def dump(self, output: IO | None = None, format: str = "yaml") -> str | None:
898 """Write the config to an output stream.
900 Parameters
901 ----------
902 output : `IO`, optional
903 The stream to use for output. If `None` the serialized content
904 will be returned.
905 format : `str`, optional
906 The format to use for the output. Can be "yaml" or "json".
908 Returns
909 -------
910 serialized : `str` or `None`
911 If a stream was given the stream will be used and the return
912 value will be `None`. If the stream was `None` the
913 serialization will be returned as a string.
914 """
915 if format == "yaml":
916 return yaml.safe_dump(self._data, output, default_flow_style=False)
917 elif format == "json":
918 if output is not None:
919 json.dump(self._data, output, ensure_ascii=False)
920 return None
921 else:
922 return json.dumps(self._data, ensure_ascii=False)
923 raise ValueError(f"Unsupported format for Config serialization: {format}")
def dumpToUri(
    self,
    uri: ResourcePathExpression,
    updateFile: bool = True,
    defaultFileName: str = "butler.yaml",
    overwrite: bool = True,
) -> None:
    """Serialize the config and write it to the location given by a URI.

    Currently supports 's3' and 'file' URI schemes.

    The serialization format is derived from the file extension of the
    final target (leading dot removed, lower-cased) and handed to `dump`.

    Parameters
    ----------
    uri : `lsst.resources.ResourcePathExpression`
        URI of the location where the config will be written.
    updateFile : `bool`, optional
        If `True` and ``uri`` has no file extension, ``defaultFileName``
        is used to form the target. `True` by default.
    defaultFileName : `str`, optional
        File name applied to the target when ``updateFile`` is `True` and
        ``uri`` does not end in a file with an extension.
    overwrite : `bool`, optional
        If `True` the configuration will be written even if it already
        exists at that location.

    Notes
    -----
    After a successful write ``configFile`` is set to the target that was
    written.
    """
    # Always work on our own ResourcePath rather than the caller's object.
    target = ResourcePath(uri)

    if updateFile and not target.getExtension():
        target = (
            target.join(defaultFileName, forceDirectory=False)
            if target.isdir()
            else target.updatedFile(defaultFileName)
        )

    # The extension (minus the dot) names the serialization format.
    fmt = target.getExtension()[1:].lower()

    serialized = self.dump(format=fmt)
    assert serialized is not None, "Config.dump guarantees not-None return when output arg is None"
    target.write(serialized.encode(), overwrite=overwrite)
    self.configFile = target
968 @staticmethod
969 def updateParameters(
970 configType: type[ConfigSubset],
971 config: Config,
972 full: Config,
973 toUpdate: dict[str, Any] | None = None,
974 toCopy: Sequence[str | Sequence[str]] | None = None,
975 overwrite: bool = True,
976 toMerge: Sequence[str | Sequence[str]] | None = None,
977 ) -> None:
978 """Update specific config parameters.
980 Allows for named parameters to be set to new values in bulk, and
981 for other values to be set by copying from a reference config.
983 Assumes that the supplied config is compatible with ``configType``
984 and will attach the updated values to the supplied config by
985 looking for the related component key. It is assumed that
986 ``config`` and ``full`` are from the same part of the
987 configuration hierarchy.
989 Parameters
990 ----------
991 configType : `ConfigSubset`
992 Config type to use to extract relevant items from ``config``.
993 config : `Config`
994 A `Config` to update. Only the subset understood by
995 the supplied `ConfigSubset` will be modified. Default values
996 will not be inserted and the content will not be validated
997 since mandatory keys are allowed to be missing until
998 populated later by merging.
999 full : `Config`
1000 A complete config with all defaults expanded that can be
1001 converted to a ``configType``. Read-only and will not be
1002 modified by this method. Values are read from here if
1003 ``toCopy`` is defined.
1005 Repository-specific options that should not be obtained
1006 from defaults when Butler instances are constructed
1007 should be copied from ``full`` to ``config``.
1008 toUpdate : `dict`, optional
1009 A `dict` defining the keys to update and the new value to use.
1010 The keys and values can be any supported by `Config`
1011 assignment.
1012 toCopy : `tuple`, optional
1013 `tuple` of keys whose values should be copied from ``full``
1014 into ``config``.
1015 overwrite : `bool`, optional
1016 If `False`, do not modify a value in ``config`` if the key
1017 already exists. Default is always to overwrite.
1018 toMerge : `tuple`, optional
1019 Keys to merge content from full to config without overwriting
1020 pre-existing values. Only works if the key refers to a hierarchy.
1021 The ``overwrite`` flag is ignored.
1023 Raises
1024 ------
1025 ValueError
1026 Neither ``toUpdate``, ``toCopy`` nor ``toMerge`` were defined.
1027 """
1028 if toUpdate is None and toCopy is None and toMerge is None:
1029 raise ValueError("At least one of toUpdate, toCopy, or toMerge parameters must be set.")
1031 # If this is a parent configuration then we need to ensure that
1032 # the supplied config has the relevant component key in it.
1033 # If this is a parent configuration we add in the stub entry
1034 # so that the ConfigSubset constructor will do the right thing.
1035 # We check full for this since that is guaranteed to be complete.
1036 if (
1037 configType.component is not None
1038 and configType.component in full
1039 and configType.component not in config
1040 ):
1041 config[configType.component] = {}
1043 # Extract the part of the config we wish to update
1044 localConfig = configType(config, mergeDefaults=False, validate=False)
1046 key: str | Sequence[str]
1047 if toUpdate:
1048 for key, value in toUpdate.items():
1049 if key in localConfig and not overwrite:
1050 log.debug(
1051 "Not overriding key '%s' with value '%s' in config %s",
1052 key,
1053 value,
1054 localConfig.__class__.__name__,
1055 )
1056 else:
1057 localConfig[key] = value
1059 if toCopy or toMerge:
1060 localFullConfig = configType(full, mergeDefaults=False)
1062 if toCopy:
1063 for key in toCopy:
1064 if key in localConfig and not overwrite:
1065 log.debug(
1066 "Not overriding key '%s' from defaults in config %s",
1067 key,
1068 localConfig.__class__.__name__,
1069 )
1070 else:
1071 localConfig[key] = localFullConfig[key]
1072 if toMerge:
1073 for key in toMerge:
1074 if key in localConfig:
1075 # Get the node from the config to do the merge
1076 # but then have to reattach to the config.
1077 subset = localConfig[key]
1078 subset.merge(localFullConfig[key])
1079 localConfig[key] = subset
1080 else:
1081 localConfig[key] = localFullConfig[key]
1083 # Reattach to parent if this is a child config
1084 if configType.component is not None and configType.component in config:
1085 config[configType.component] = localConfig
1086 else:
1087 config.update(localConfig)
def toDict(self) -> dict[str, Any]:
    """Convert a `Config` to a standalone hierarchical `dict`.

    Returns
    -------
    d : `dict`
        A deep copy of the internal data in which every `Config`
        instance found in the hierarchy, including those nested inside
        mappings or lists, has been replaced by the `dict` returned from
        its own ``toDict``.

    Notes
    -----
    This can be useful when passing a Config to some code that
    expects native Python types.
    """

    def _unwrap(node: Any) -> Any:
        # Replace Config instances wherever they occur in the copied
        # structure, not only in the top-level values.
        if isinstance(node, Config):
            return node.toDict()
        if isinstance(node, MutableMapping):
            # Only values are reassigned so the mapping does not change
            # size during iteration.
            for key, value in node.items():
                node[key] = _unwrap(value)
        elif isinstance(node, list):
            for idx, value in enumerate(node):
                node[idx] = _unwrap(value)
        return node

    # Work on a copy so the returned structure is independent of this
    # config.
    return _unwrap(copy.deepcopy(self._data))
class ConfigSubset(Config):
    """Config representing a subset of a more general configuration.

    Subclasses define their own component and when given a configuration
    that includes that component, the resulting configuration only includes
    the subset. For example, your config might contain ``dimensions`` if it's
    part of a global config and that subset will be stored. If ``dimensions``
    cannot be found it is assumed that the entire contents of the
    configuration should be used.

    Default values are read from the environment or supplied search paths
    using the default configuration file name specified in the subclass.
    This allows a configuration class to be instantiated without any
    additional arguments.

    Additional validation can be specified to check for keys that are mandatory
    in the configuration.

    Parameters
    ----------
    other : `Config` or `~lsst.resources.ResourcePathExpression` or `dict`
        Argument specifying the configuration information as understood
        by `Config`.
    validate : `bool`, optional
        If `True` required keys will be checked to ensure configuration
        consistency.
    mergeDefaults : `bool`, optional
        If `True` defaults will be read and the supplied config will
        be combined with the defaults, with the supplied values taking
        precedence.
    searchPaths : `list` or `tuple`, optional
        Explicit additional paths to search for defaults. They should
        be supplied in priority order. These paths have higher priority
        than those read from the environment in
        `ConfigSubset.defaultSearchPaths()`. Paths can be `str` referring to
        the local file system or URIs, `lsst.resources.ResourcePath`.
    """

    component: ClassVar[str | None] = None
    """Component to use from supplied config. Can be None. If specified the
    key is not required. Can be a full dot-separated path to a component.
    """

    requiredKeys: ClassVar[Sequence[str]] = ()
    """Keys that are required to be specified in the configuration.
    """

    defaultConfigFile: ClassVar[str | None] = None
    """Name of the file containing defaults for this config class.
    """
def __init__(
    self,
    other: Config | ResourcePathExpression | Mapping[str, Any] | None = None,
    validate: bool = True,
    mergeDefaults: bool = True,
    searchPaths: Sequence[ResourcePathExpression] | None = None,
):
    # Overall order: start empty, optionally load defaults (class default
    # file first, then the default file of any class named by "cls"),
    # then overlay the externally supplied values, expand any child
    # configurations, and finally validate.

    # Create a blank object to receive the defaults
    # Once we have the defaults we then update with the external values
    super().__init__()

    # Create a standard Config rather than subset
    externalConfig = Config(other)

    # Select the part we need from it
    # To simplify the use of !include we also check for the existence of
    # component.component (since the included files can themselves
    # include the component name)
    if self.component is not None:
        doubled = (self.component, self.component)
        # Must check for double depth first
        if doubled in externalConfig:
            externalConfig = externalConfig[doubled]
        elif self.component in externalConfig:
            # Narrow the external config in place to the component's
            # content by replacing its internal data.
            externalConfig._data = externalConfig._data[self.component]

    # Default files read to create this configuration
    self.filesRead: list[ResourcePath | str] = []

    # Assume we are not looking up child configurations
    containerKey = None

    # Sometimes we do not want to merge with defaults.
    if mergeDefaults:
        # Supplied search paths have highest priority
        fullSearchPath: list[ResourcePath | str] = []
        if searchPaths:
            fullSearchPath = [ResourcePath(path, forceDirectory=True) for path in searchPaths]

        # Read default paths from environment
        fullSearchPath.extend(self.defaultSearchPaths())

        # There are two places to find defaults for this particular config
        # - The "defaultConfigFile" defined in the subclass
        # - The class specified in the "cls" element in the config.
        # Read cls after merging in case it changes.
        if self.defaultConfigFile is not None:
            self._updateWithConfigsFromPath(fullSearchPath, self.defaultConfigFile)

        # Can have a class specification in the external config (priority)
        # or from the defaults.
        pytype = None
        if "cls" in externalConfig:
            pytype = externalConfig["cls"]
        elif "cls" in self:
            pytype = self["cls"]

        if pytype is not None:
            try:
                cls = doImportType(pytype)
            except ImportError as e:
                raise RuntimeError(f"Failed to import cls '{pytype}' for config {type(self)}") from e
            # The class referenced from the config file is not required
            # to specify a default config file.
            defaultsFile = getattr(cls, "defaultConfigFile", None)
            if defaultsFile is not None:
                self._updateWithConfigsFromPath(fullSearchPath, defaultsFile)

            # Get the container key in case we need it and it is specified.
            containerKey = getattr(cls, "containerKey", None)

    # Now update this object with the external values so that the external
    # values always override the defaults
    self.update(externalConfig)
    # Only adopt the external config's file if none has been recorded yet.
    if not self.configFile:
        self.configFile = externalConfig.configFile

    # If this configuration has child configurations of the same
    # config class, we need to expand those defaults as well.

    if mergeDefaults and containerKey is not None and containerKey in self:
        # Each child is rebuilt as an instance of this same class with the
        # same options and stored back at its index under the container key.
        for idx, subConfig in enumerate(self[containerKey]):
            self[containerKey, idx] = type(self)(
                other=subConfig, validate=validate, mergeDefaults=mergeDefaults, searchPaths=searchPaths
            )

    if validate:
        self.validate()
@classmethod
def defaultSearchPaths(cls) -> list[ResourcePath | str]:
    """Determine the default search paths from the environment.

    Additional defaults can be defined with the PATH-like environment
    variable named by ``CONFIG_PATH`` (documented as
    ``$DAF_BUTLER_CONFIG_PATH``), where paths at the front of the list
    have priority over those later. The package defaults, found in the
    ``configs`` resource of ``resourcesPackage``, have the lowest
    priority.

    Returns
    -------
    paths : `list`
        Paths to search, highest priority first. Entries from the
        environment variable come first, as `str`, followed by a single
        `lsst.resources.ResourcePath` pointing at the package ``configs``
        resource, which is therefore always the final entry.

    Notes
    -----
    The environment variable is split on `os.pathsep`, the platform path
    separator. This currently makes it incompatible with usage of URIs.
    """
    envValue = os.environ.get(CONFIG_PATH)
    fromEnvironment: list[str | ResourcePath] = [] if envValue is None else list(envValue.split(os.pathsep))

    # The package defaults are located as a resource and always go last.
    packageDefaults = ResourcePath(f"resource://{cls.resourcesPackage}/configs", forceDirectory=True)
    return [*fromEnvironment, packageDefaults]
def _updateWithConfigsFromPath(
    self, searchPaths: Sequence[str | ResourcePath], configFile: ResourcePath | str
) -> None:
    """Merge configuration values from files located via a search path.

    Values that are read override values currently stored in the object.
    Every matching file found on the search path is read, ordered so
    that earlier path entries have higher priority. Each file that is
    read is recorded in ``filesRead``.

    Parameters
    ----------
    searchPaths : `list` of `lsst.resources.ResourcePath`, `str`
        Paths to search for ``configFile``, in priority order: values
        from a file found via an earlier entry win over those found via
        a later entry. Can contain `str` referring to the local file
        system or a URI string.
    configFile : `lsst.resources.ResourcePath`
        File to locate. If it is absolute and exists it is read directly
        and the search path is not used.

    Raises
    ------
    ValueError
        Raised if a search path entry is neither a `str` nor a
        `lsst.resources.ResourcePath`.
    """
    direct = ResourcePath(configFile, forceDirectory=False)
    if direct.isabs() and direct.exists():
        # An explicit, existing resource bypasses the search path.
        self._updateWithOtherConfigFile(configFile)
        self.filesRead.append(configFile)
        return

    # Walk from lowest to highest priority so that the high priority
    # entries are applied last and therefore win.
    for entry in reversed(searchPaths):
        if not isinstance(entry, str | ResourcePath):
            raise ValueError(f"Unexpected search path type encountered: {entry!r}")
        candidate = ResourcePath(entry, forceDirectory=True).join(configFile)
        if candidate.exists():
            self.filesRead.append(candidate)
            self._updateWithOtherConfigFile(candidate)
def _updateWithOtherConfigFile(self, file: Config | str | ResourcePath | Mapping[str, Any]) -> None:
    """Read another configuration and merge it into this one.

    The supplied entity is loaded as an instance of this same class,
    without validation and without merging defaults, and this object is
    then updated from it so that the newly read values override the
    current ones.

    Parameters
    ----------
    file : `Config`, `str`, `lsst.resources.ResourcePath`, or `dict`
        Entity that can be converted to a `ConfigSubset`.
    """
    # Constructing via our own class ensures the component subsetting is
    # applied to what was read before it is merged.
    self.update(type(self)(file, validate=False, mergeDefaults=False))
def validate(self) -> None:
    """Verify that every mandatory key is present in this configuration.

    Does nothing if ``requiredKeys`` is empty.

    Raises
    ------
    KeyError
        Raised if one or more of the ``requiredKeys`` is not a key of
        the internal data.
    """
    absent = [name for name in self.requiredKeys if name not in self._data]
    if not absent:
        return
    raise KeyError(f"Mandatory keys ({absent}) missing from supplied configuration for {type(self)}")