Coverage for python / lsst / daf / butler / _config.py: 45%

486 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:41 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Configuration control.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ("Config", "ConfigSubset") 

33 

34import copy 

35import io 

36import json 

37import logging 

38import os 

39import pprint 

40import sys 

41from collections import defaultdict 

42from collections.abc import Iterable, Iterator, Mapping, MutableMapping, Sequence 

43from pathlib import Path 

44from typing import IO, TYPE_CHECKING, Any, ClassVar 

45 

46import yaml 

47from yaml.representer import Representer 

48 

49from lsst.resources import ResourcePath, ResourcePathExpression 

50from lsst.utils import doImportType 

51 

# Teach YAML how to serialize defaultdict instances (as plain dicts).
yaml.add_representer(defaultdict, Representer.represent_dict)


# Config module logger
log = logging.getLogger(__name__)

# PATH-like environment variable to use for defaults.
CONFIG_PATH = "DAF_BUTLER_CONFIG_PATH"

if TYPE_CHECKING:
    yamlLoader = yaml.SafeLoader
else:
    try:
        # Prefer the C-accelerated loader when libyaml is available.
        yamlLoader = yaml.CSafeLoader
    except AttributeError:
        # Not all installations have the C library
        # (but assume for mypy's sake that they're the same)
        yamlLoader = yaml.SafeLoader

70 

71 

72def _doUpdate(d: Mapping[str, Any], u: Mapping[str, Any]) -> Mapping[str, Any]: 

73 if not isinstance(u, Mapping) or not isinstance(d, MutableMapping): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true

74 raise RuntimeError(f"Only call update with Mapping, not {type(d)}") 

75 for k, v in u.items(): 

76 if isinstance(v, Mapping): 

77 lhs = d.get(k, {}) 

78 if not isinstance(lhs, Mapping): 78 ↛ 79line 78 didn't jump to line 79 because the condition on line 78 was never true

79 lhs = {} 

80 d[k] = _doUpdate(lhs, v) 

81 else: 

82 d[k] = v 

83 return d 

84 

85 

86def _checkNextItem(k: str | int, d: Any, create: bool, must_be_dict: bool) -> tuple[Any, bool]: 

87 """See if k is in d and if it is return the new child.""" 

88 nextVal = None 

89 isThere = False 

90 if d is None: 90 ↛ 92line 90 didn't jump to line 92 because the condition on line 90 was never true

91 # We have gone past the end of the hierarchy 

92 pass 

93 elif not must_be_dict and isinstance(d, Sequence): 93 ↛ 98line 93 didn't jump to line 98 because the condition on line 93 was never true

94 # Check for Sequence first because for lists 

95 # __contains__ checks whether value is found in list 

96 # not whether the index exists in list. When we traverse 

97 # the hierarchy we are interested in the index. 

98 try: 

99 nextVal = d[int(k)] 

100 isThere = True 

101 except IndexError: 

102 pass 

103 except ValueError: 

104 isThere = k in d 

105 elif k in d: 

106 nextVal = d[k] 

107 isThere = True 

108 elif create: 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true

109 d[k] = {} 

110 nextVal = d[k] 

111 isThere = True 

112 

113 return nextVal, isThere 

114 

115 

class Loader(yamlLoader):
    """YAML Loader that supports file include directives.

    Uses ``!include`` directive in a YAML file to point to another
    YAML file to be included. The path in the include directive is relative
    to the file containing that directive.

    storageClasses: !include storageClasses.yaml

    Examples
    --------
    >>> with open("document.yaml", "r") as f:
    ...     data = yaml.load(f, Loader=Loader)

    Notes
    -----
    See https://davidchall.github.io/yaml-includes.html

    Parameters
    ----------
    stream : `str` or `io.IO`
        The stream to parse.
    """

    def __init__(self, stream: str | IO):  # types-PyYAML annotates 'stream' with a private type
        super().__init__(stream)
        # if this is a string and not a stream we may well lack a name
        if hasattr(stream, "name"):
            # Remember where the stream came from so that relative
            # !include paths can be resolved against its directory.
            self._root = ResourcePath(stream.name, forceDirectory=False)
        else:
            # No choice but to assume a local filesystem
            self._root = ResourcePath("no-file.yaml", forceDirectory=False)
        # Register the handler so the YAML parser dispatches !include
        # nodes to Loader.include.
        self.add_constructor("!include", Loader.include)

    def include(self, node: yaml.Node) -> list[Any] | dict[str, Any]:
        # Constructor callback for !include nodes.  A scalar node includes
        # one file; sequence/mapping nodes include one file per element.
        result: list[Any] | dict[str, Any]
        if isinstance(node, yaml.ScalarNode):
            return self.extractFile(self.construct_scalar(node))  # type: ignore[arg-type]

        elif isinstance(node, yaml.SequenceNode):
            result = []
            for filename in self.construct_sequence(node):
                result.append(self.extractFile(filename))
            return result

        elif isinstance(node, yaml.MappingNode):
            result = {}
            for k, v in self.construct_mapping(node).items():
                if not isinstance(k, str):
                    raise TypeError(f"Expected only strings in YAML mapping; got {k!r} of type {type(k)}.")
                result[k] = self.extractFile(v)
            return result

        else:
            print("Error:: unrecognised node type in !include statement", file=sys.stderr)
            raise yaml.constructor.ConstructorError

    def extractFile(self, filename: str) -> Any:
        # Load the referenced YAML file and return its parsed content.
        # It is possible for the !include to point to an explicit URI
        # instead of a relative URI, therefore we first see if it is
        # scheme-less or not. If it has a scheme we use it directly
        # if it is scheme-less we use it relative to the file root.
        requesteduri = ResourcePath(filename, forceAbsolute=False, forceDirectory=False)

        if requesteduri.scheme:
            fileuri = requesteduri
        else:
            fileuri = self._root.updatedFile(filename)

        log.debug("Opening YAML file via !include: %s", fileuri)

        # Read all the data from the resource
        data = fileuri.read()

        # Store the bytes into a BytesIO so we can attach a .name,
        # which lets nested !include directives resolve relative paths.
        stream = io.BytesIO(data)
        stream.name = fileuri.geturl()
        # Parse with this same Loader so includes nest recursively.
        return yaml.load(stream, Loader)

194 

195 

# Type of the key used for accessing items in configuration object. It can be
# a single string as described below or a sequence of strings and integer
# indices. Indices are used to access items in sequences stored in config.
_ConfigKey = str | Sequence[str | int]

200 

201 

202class Config(MutableMapping): 

203 r"""Implements a datatype that is used by `Butler` for configuration. 

204 

205 It is essentially a `dict` with key/value pairs, including nested dicts 

206 (as values). In fact, it can be initialized with a `dict`. 

207 This is explained next: 

208 

209 Config extends the `dict` api so that hierarchical values may be accessed 

210 with delimited notation or as a tuple. If a string is given the delimiter 

211 is picked up from the first character in that string. For example, 

212 ``foo.getValue(".a.b.c")``, ``foo["a"]["b"]["c"]``, ``foo["a", "b", "c"]``, 

213 ``foo[".a.b.c"]``, and ``foo["/a/b/c"]`` all achieve the same outcome. 

214 If the first character is alphanumeric, no delimiter will be used. 

215 ``foo["a.b.c"]`` will be a single key ``a.b.c`` as will ``foo[":a.b.c"]``. 

216 Unicode characters can be used as the delimiter for distinctiveness if 

217 required. 

218 

219 If a key in the hierarchy starts with a non-alphanumeric character care 

220 should be used to ensure that either the tuple interface is used or 

221 a distinct delimiter is always given in string form. 

222 

223 Finally, the delimiter can be escaped if it is part of a key and also 

224 has to be used as a delimiter. For example, ``foo[r".a.b\.c"]`` results in 

225 a two element hierarchy of ``a`` and ``b.c``. For hard-coded strings it is 

226 always better to use a different delimiter in these cases. 

227 

228 Note that adding a multi-level key implicitly creates any nesting levels 

229 that do not exist, but removing multi-level keys does not automatically 

230 remove empty nesting levels. As a result: 

231 

232 >>> c = Config() 

233 >>> c[".a.b"] = 1 

234 >>> del c[".a.b"] 

235 >>> c["a"] 

236 Config({'a': {}}) 

237 

238 Storage formats supported: 

239 

240 - yaml: read and write is supported. 

241 - json: read and write is supported but no ``!include`` directive. 

242 

243 Parameters 

244 ---------- 

245 other : `lsst.resources.ResourcePath` or `Config` or `dict` 

246 Other source of configuration, can be: 

247 

248 - (`lsst.resources.ResourcePathExpression`) 

249 Treated as a URI to a config file. Must end with ".yaml". 

250 - (`Config`) Copies the other Config's values into this one. 

251 - (`dict`) Copies the values from the dict into this Config. 

252 

253 If `None` is provided an empty `Config` will be created. 

254 """ 

255 

256 _D: str = "→" 

257 """Default internal delimiter to use for components in the hierarchy when 

258 constructing keys for external use (see `Config.names()`).""" 

259 

260 includeKey: ClassVar[str] = "includeConfigs" 

261 """Key used to indicate that another config should be included at this 

262 part of the hierarchy.""" 

263 

264 resourcesPackage: str = "lsst.daf.butler" 

265 """Package to search for default configuration data. The resources 

266 themselves will be within a ``configs`` resource hierarchy.""" 

267 

    def __init__(self, other: ResourcePathExpression | Config | Mapping[str, Any] | None = None):
        # Underlying storage: a plain nested dict.
        self._data: dict[str, Any] = {}
        # URI the config was loaded from, if any (used for resolving
        # relative include paths later).
        self.configFile: ResourcePath | None = None

        if other is None:
            # Empty config requested.
            return

        # NOTE: the isinstance dispatch order matters -- a Config is also
        # a Mapping, so it must be checked first.
        if isinstance(other, Config):
            # Deep copy might be more efficient but if someone has overridden
            # a config entry to store a complex object then deep copy may
            # fail. Safer to use update().
            self.update(other._data)
            self.configFile = other.configFile
        elif isinstance(other, dict | Mapping):
            # In most cases we have a dict, and it's more efficient
            # to check for a dict instance before checking the generic mapping.
            self.update(other)
        elif isinstance(other, str | ResourcePath | Path):
            # if other is a string, assume it is a file path/URI
            self.__initFromUri(other)
            # Includes are only processed for file-based construction.
            self._processExplicitIncludes()
        else:
            # if the config specified by other could not be recognized raise
            # a runtime error.
            raise RuntimeError(f"A Config could not be loaded from other: {other}")

293 

294 def ppprint(self) -> str: 

295 """Return config as formatted readable string. 

296 

297 Examples 

298 -------- 

299 use: ``pdb> print(myConfigObject.ppprint())`` 

300 

301 Returns 

302 ------- 

303 s : `str` 

304 A prettyprint formatted string representing the config. 

305 """ 

306 return pprint.pformat(self._data, indent=2, width=1) 

307 

308 def __repr__(self) -> str: 

309 return f"{type(self).__name__}({self._data!r})" 

310 

311 def __str__(self) -> str: 

312 return self.ppprint() 

313 

314 def __len__(self) -> int: 

315 return len(self._data) 

316 

317 def __iter__(self) -> Iterator[str]: 

318 return iter(self._data) 

319 

320 def copy(self) -> Config: 

321 return type(self)(self) 

322 

323 @classmethod 

324 def fromString(cls, string: str, format: str = "yaml") -> Config: 

325 """Create a new Config instance from a serialized string. 

326 

327 Parameters 

328 ---------- 

329 string : `str` 

330 String containing content in specified format. 

331 format : `str`, optional 

332 Format of the supplied string. Can be ``json`` or ``yaml``. 

333 

334 Returns 

335 ------- 

336 c : `Config` 

337 Newly-constructed Config. 

338 """ 

339 if format == "yaml": 

340 new_config = cls().__initFromYaml(string) 

341 elif format == "json": 

342 new_config = cls().__initFromJson(string) 

343 else: 

344 raise ValueError(f"Unexpected format of string: {format}") 

345 new_config._processExplicitIncludes() 

346 return new_config 

347 

348 @classmethod 

349 def fromYaml(cls, string: str) -> Config: 

350 """Create a new Config instance from a YAML string. 

351 

352 Parameters 

353 ---------- 

354 string : `str` 

355 String containing content in YAML format. 

356 

357 Returns 

358 ------- 

359 c : `Config` 

360 Newly-constructed Config. 

361 """ 

362 return cls.fromString(string, format="yaml") 

363 

    def __initFromUri(self, path: ResourcePathExpression) -> None:
        """Load a file from a path or an URI.

        Parameters
        ----------
        path : `lsst.resources.ResourcePathExpression`
            Path or a URI to a persisted config file.

        Raises
        ------
        FileNotFoundError
            Raised if the URI has an unsupported extension and does not
            exist.
        RuntimeError
            Raised if the URI exists but does not end in ``.yaml`` or
            ``.json``.
        """
        uri = ResourcePath(path, forceDirectory=False)
        # Dispatch on the file extension to pick the parser.
        ext = uri.getExtension()
        if ext == ".yaml":
            log.debug("Opening YAML config file: %s", uri.geturl())
            content = uri.read()
            # Use a stream so we can name it; the name is how the YAML
            # Loader resolves relative !include paths.
            stream = io.BytesIO(content)
            stream.name = uri.geturl()
            self.__initFromYaml(stream)
        elif ext == ".json":
            log.debug("Opening JSON config file: %s", uri.geturl())
            content = uri.read()
            self.__initFromJson(content)
        else:
            # This URI does not have a valid extension. It might be because
            # we ended up with a directory and not a file. Before we complain
            # about an extension, do an existence check. No need to do
            # the (possibly expensive) existence check in the default code
            # path above because we will find out soon enough that the file
            # is not there.
            if not uri.exists():
                raise FileNotFoundError(f"Config location {uri} does not exist.")
            raise RuntimeError(f"The Config URI does not have a supported extension: {uri}")
        # Remember the origin for later relative-include resolution.
        self.configFile = uri

396 

397 def __initFromYaml(self, stream: IO | str | bytes) -> Config: 

398 """Load a YAML config from any readable stream that contains one. 

399 

400 Parameters 

401 ---------- 

402 stream : `IO` or `str` 

403 Stream to pass to the YAML loader. Accepts anything that 

404 `yaml.load` accepts. This can include a string as well as an 

405 IO stream. 

406 

407 Raises 

408 ------ 

409 yaml.YAMLError 

410 If there is an error loading the file. 

411 """ 

412 content = yaml.load(stream, Loader=Loader) 

413 if content is None: 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true

414 content = {} 

415 self._data = content 

416 return self 

417 

418 def __initFromJson(self, stream: IO | str | bytes) -> Config: 

419 """Load a JSON config from any readable stream that contains one. 

420 

421 Parameters 

422 ---------- 

423 stream : `IO` or `str` 

424 Stream to pass to the JSON loader. This can include a string as 

425 well as an IO stream. 

426 

427 Raises 

428 ------ 

429 TypeError: 

430 Raised if there is an error loading the content. 

431 """ 

432 if isinstance(stream, bytes | str): 

433 content = json.loads(stream) 

434 else: 

435 content = json.load(stream) 

436 if content is None: 

437 content = {} 

438 self._data = content 

439 return self 

440 

    def _processExplicitIncludes(self) -> None:
        """Scan through the configuration searching for the special includes.

        Looks for ``includeConfigs`` directive and processes the includes.
        The referenced files are merged into the node containing the
        directive, with values explicitly present in this config taking
        precedence over included ones.
        """
        # Search paths for config files: current directory plus (if known)
        # the directory this config was loaded from.
        searchPaths = [ResourcePath(os.path.curdir, forceDirectory=True)]
        if self.configFile is not None:
            if isinstance(self.configFile, ResourcePath):
                configDir = self.configFile.dirname()
            else:
                raise RuntimeError(f"Unexpected type for config file: {self.configFile}")
            searchPaths.append(configDir)

        # Ensure we know what delimiter to use
        names = self.nameTuples()
        for path in names:
            if path[-1] == self.includeKey:
                log.debug("Processing file include directive at %s", self._D + self._D.join(path))
                # Node that contains the directive (empty tuple == root).
                basePath = path[:-1]

                # Extract the includes and then delete them from the config
                includes = self[path]
                del self[path]

                # Be consistent and convert to a list
                if not isinstance(includes, list):
                    includes = [includes]

                # Read each file assuming it is a reference to a file
                # The file can be relative to config file or cwd
                # ConfigSubset search paths are not used
                subConfigs = []
                for fileName in includes:
                    # Expand any shell variables -- this could be URI
                    fileName = ResourcePath(
                        os.path.expandvars(fileName), forceAbsolute=False, forceDirectory=False
                    )
                    found = None
                    if fileName.isabs():
                        # Absolute reference is used as-is, no searching.
                        found = fileName
                    else:
                        # First search path that contains the file wins.
                        for dir in searchPaths:
                            specific = dir.join(fileName.path)
                            # Remote resource check might be expensive
                            if specific.exists():
                                found = specific
                                break
                    if not found:
                        raise RuntimeError(f"Unable to find referenced include file: {fileName}")

                    # Read the referenced Config as a Config
                    # (type(self) so subclasses include into their own type).
                    subConfigs.append(type(self)(found))

                # Now we need to merge these sub configs with the current
                # information that was present in this node in the config
                # tree with precedence given to the explicit values
                newConfig = subConfigs.pop(0)
                for sc in subConfigs:
                    newConfig.update(sc)

                # Explicit values take precedence
                if not basePath:
                    # This is an include at the root config
                    newConfig.update(self)
                    # Replace the current config
                    self._data = newConfig._data
                else:
                    newConfig.update(self[basePath])
                    # And reattach to the base config
                    self[basePath] = newConfig

512 

    @staticmethod
    def _splitIntoKeys(key: _ConfigKey) -> list[str | int]:
        r"""Split the argument for get/set/in into a hierarchical list.

        Parameters
        ----------
        key : `str` or iterable
            Argument given to get/set/in. If an iterable is provided it will
            be converted to a list. If the first character of the string
            is not an alphanumeric character then it will be used as the
            delimiter for the purposes of splitting the remainder of the
            string. If the delimiter is also in one of the keys then it
            can be escaped using ``\``. There is no default delimiter.

        Returns
        -------
        keys : `list`
            Hierarchical keys as a `list`.

        Raises
        ------
        ValueError
            Raised when an escaped delimiter is itself escaped, or when
            the reserved ``\r`` character appears in the key.
        TypeError
            Raised when ``key`` is neither a string nor an iterable.
        """
        if isinstance(key, str):
            if not key[0].isalnum():
                # First character is the delimiter; strip it off.
                d = key[0]
                key = key[1:]
            else:
                # No delimiter: the whole string is a single key.
                return [
                    key,
                ]
            escaped = f"\\{d}"
            temp = None
            if escaped in key:
                # Complain at the attempt to escape the escape
                doubled = rf"\{escaped}"
                if doubled in key:
                    raise ValueError(
                        f"Escaping an escaped delimiter ({doubled} in {key}) is not yet supported."
                    )
                # Replace with a character that won't be in the string
                # so split() does not break on the escaped delimiter.
                temp = "\r"
                if temp in key or d == temp:
                    raise ValueError(
                        f"Can not use character {temp!r} in hierarchical key or as"
                        " delimiter if escaping the delimiter"
                    )
                key = key.replace(escaped, temp)
            hierarchy = key.split(d)
            if temp:
                # Restore the escaped delimiter inside each component.
                hierarchy = [h.replace(temp, d) for h in hierarchy]
            # Copy the list to keep mypy quiet.
            return list(hierarchy)
        elif isinstance(key, Iterable):
            return list(key)
        else:
            # Do not try to guess.
            raise TypeError(f"Provided key [{key}] neither str nor iterable.")

567 

568 def _getKeyHierarchy(self, name: _ConfigKey) -> list[str | int]: 

569 """Retrieve the key hierarchy for accessing the Config. 

570 

571 Parameters 

572 ---------- 

573 name : `str` or `tuple` 

574 Delimited string or `tuple` of hierarchical keys. 

575 

576 Returns 

577 ------- 

578 hierarchy : `list` of `str` 

579 Hierarchy to use as a `list`. If the name is available directly 

580 as a key in the Config it will be used regardless of the presence 

581 of any nominal delimiter. 

582 """ 

583 keys: list[str | int] 

584 if name in self._data: 

585 keys = [name] 

586 else: 

587 keys = self._splitIntoKeys(name) 

588 return keys 

589 

590 def _findInHierarchy(self, keys: Sequence[str | int], create: bool = False) -> tuple[list[Any], bool]: 

591 """Look for hierarchy of keys in Config. 

592 

593 Parameters 

594 ---------- 

595 keys : `list` or `tuple` 

596 Keys to search in hierarchy. 

597 create : `bool`, optional 

598 If `True`, if a part of the hierarchy does not exist, insert an 

599 empty `dict` into the hierarchy. 

600 

601 Returns 

602 ------- 

603 hierarchy : `list` 

604 List of the value corresponding to each key in the supplied 

605 hierarchy. Only keys that exist in the hierarchy will have 

606 a value. 

607 complete : `bool` 

608 `True` if the full hierarchy exists and the final element 

609 in ``hierarchy`` is the value of relevant value. 

610 """ 

611 d: Any = self._data 

612 

613 # For the first key, d must be a dict so it is a waste 

614 # of time to check for a sequence. 

615 must_be_dict = True 

616 

617 hierarchy = [] 

618 complete = True 

619 for k in keys: 

620 d, isThere = _checkNextItem(k, d, create, must_be_dict) 

621 if isThere: 

622 hierarchy.append(d) 

623 else: 

624 complete = False 

625 break 

626 # Second time round it might be a sequence. 

627 must_be_dict = False 

628 

629 return hierarchy, complete 

630 

631 def __getitem__(self, name: _ConfigKey) -> Any: 

632 # Override the split for the simple case where there is an exact 

633 # match. This allows `Config.items()` to work via a simple 

634 # __iter__ implementation that returns top level keys of 

635 # self._data. 

636 

637 # If the name matches a key in the top-level hierarchy, bypass 

638 # all further cleverness. 

639 found_directly = False 

640 try: 

641 if isinstance(name, str): 641 ↛ 647line 641 didn't jump to line 647 because the condition on line 641 was always true

642 data = self._data[name] 

643 found_directly = True 

644 except KeyError: 

645 pass 

646 

647 if not found_directly: 

648 keys = self._getKeyHierarchy(name) 

649 

650 hierarchy, complete = self._findInHierarchy(keys) 

651 if not complete: 651 ↛ 653line 651 didn't jump to line 653 because the condition on line 651 was always true

652 raise KeyError(f"{name} not found") 

653 data = hierarchy[-1] 

654 

655 # In most cases we have a dict, and it's more efficient 

656 # to check for a dict instance before checking the generic mapping. 

657 if isinstance(data, dict | Mapping): 

658 data = Config(data) 

659 # Ensure that child configs inherit the parent internal delimiter 

660 if self._D != Config._D: 660 ↛ 661line 660 didn't jump to line 661 because the condition on line 660 was never true

661 data._D = self._D 

662 return data 

663 

664 def __setitem__(self, name: _ConfigKey, value: Any) -> None: 

665 keys = self._getKeyHierarchy(name) 

666 last = keys.pop() 

667 if isinstance(value, Config): 

668 value = copy.deepcopy(value._data) 

669 

670 hierarchy, complete = self._findInHierarchy(keys, create=True) 

671 if hierarchy: 

672 data = hierarchy[-1] 

673 else: 

674 data = self._data 

675 

676 try: 

677 data[last] = value 

678 except TypeError: 

679 data[int(last)] = value 

680 

681 def __contains__(self, key: Any) -> bool: 

682 if not isinstance(key, str | Sequence): 682 ↛ 683line 682 didn't jump to line 683 because the condition on line 682 was never true

683 return False 

684 keys = self._getKeyHierarchy(key) 

685 hierarchy, complete = self._findInHierarchy(keys) 

686 return complete 

687 

688 def __delitem__(self, key: str | Sequence[str]) -> None: 

689 keys = self._getKeyHierarchy(key) 

690 last = keys.pop() 

691 hierarchy, complete = self._findInHierarchy(keys) 

692 if complete: 692 ↛ 699line 692 didn't jump to line 699 because the condition on line 692 was always true

693 if hierarchy: 693 ↛ 694line 693 didn't jump to line 694 because the condition on line 693 was never true

694 data = hierarchy[-1] 

695 else: 

696 data = self._data 

697 del data[last] 

698 else: 

699 raise KeyError(f"{key} not found in Config") 

700 

701 def update(self, other: Mapping[str, Any]) -> None: # type: ignore[override] 

702 """Update config from other `Config` or `dict`. 

703 

704 Like `dict.update`, but will add or modify keys in nested dicts, 

705 instead of overwriting the nested dict entirely. 

706 

707 Parameters 

708 ---------- 

709 other : `dict` or `Config` 

710 Source of configuration. 

711 

712 Examples 

713 -------- 

714 >>> c = Config({"a": {"b": 1}}) 

715 >>> c.update({"a": {"c": 2}}) 

716 >>> print(c) 

717 {'a': {'b': 1, 'c': 2}} 

718 

719 >>> foo = {"a": {"b": 1}} 

720 >>> foo.update({"a": {"c": 2}}) 

721 >>> print(foo) 

722 {'a': {'c': 2}} 

723 """ 

724 _doUpdate(self._data, other) 

725 

726 def merge(self, other: Mapping) -> None: 

727 """Merge another Config into this one. 

728 

729 Like `Config.update()`, but will add keys & values from other that 

730 DO NOT EXIST in self. 

731 

732 Keys and values that already exist in self will NOT be overwritten. 

733 

734 Parameters 

735 ---------- 

736 other : `dict` or `Config` 

737 Source of configuration. 

738 """ 

739 if not isinstance(other, Mapping): 

740 raise TypeError(f"Can only merge a Mapping into a Config, not {type(other)}") 

741 

742 # Convert the supplied mapping to a Config for consistency 

743 # This will do a deepcopy if it is already a Config 

744 otherCopy = Config(other) 

745 otherCopy.update(self) 

746 self._data = otherCopy._data 

747 

748 def nameTuples(self, topLevelOnly: bool = False) -> list[tuple[str, ...]]: 

749 """Get tuples representing the name hierarchies of all keys. 

750 

751 The tuples returned from this method are guaranteed to be usable 

752 to access items in the configuration object. 

753 

754 Parameters 

755 ---------- 

756 topLevelOnly : `bool`, optional 

757 If False, the default, a full hierarchy of names is returned. 

758 If True, only the top level are returned. 

759 

760 Returns 

761 ------- 

762 names : `list` of `tuple` of `str` 

763 List of all names present in the `Config` where each element 

764 in the list is a `tuple` of strings representing the hierarchy. 

765 """ 

766 if topLevelOnly: 766 ↛ 767line 766 didn't jump to line 767 because the condition on line 766 was never true

767 return [(k,) for k in self] 

768 

769 def getKeysAsTuples( 

770 d: Mapping[str, Any] | Sequence[str], keys: list[tuple[str, ...]], base: tuple[str, ...] | None 

771 ) -> None: 

772 if isinstance(d, Sequence): 

773 theseKeys: Iterable[Any] = range(len(d)) 

774 else: 

775 theseKeys = d.keys() 

776 for key in theseKeys: 

777 val = d[key] 

778 levelKey = base + (key,) if base is not None else (key,) 

779 keys.append(levelKey) 

780 if isinstance(val, Mapping | Sequence) and not isinstance(val, str): 

781 getKeysAsTuples(val, keys, levelKey) 

782 

783 keys: list[tuple[str, ...]] = [] 

784 getKeysAsTuples(self._data, keys, None) 

785 return keys 

786 

    def names(self, topLevelOnly: bool = False, delimiter: str | None = None) -> list[str]:
        """Get a delimited name of all the keys in the hierarchy.

        The values returned from this method are guaranteed to be usable
        to access items in the configuration object.

        Parameters
        ----------
        topLevelOnly : `bool`, optional
            If False, the default, a full hierarchy of names is returned.
            If True, only the top level are returned.
        delimiter : `str`, optional
            Delimiter to use when forming the keys. If the delimiter is
            present in any of the keys, it will be escaped in the returned
            names. If `None` given a delimiter will be automatically provided.
            The delimiter can not be alphanumeric.

        Returns
        -------
        names : `list` of `str`
            List of all names present in the `Config`.

        Notes
        -----
        This is different than the built-in method `dict.keys`, which will
        return only the first level keys.

        Raises
        ------
        ValueError
            The supplied delimiter is alphanumeric.
        """
        if topLevelOnly:
            return list(self.keys())

        # Get all the tuples of hierarchical keys
        nameTuples = self.nameTuples()

        if delimiter is not None and delimiter.isalnum():
            raise ValueError(f"Supplied delimiter ({delimiter!r}) must not be alphanumeric.")

        if delimiter is None:
            # Start with something, and ensure it does not need to be
            # escaped (it is much easier to understand if not escaped)
            delimiter = self._D

        # Form big string for easy check of delimiter clash
        combined = "".join("".join(str(s) for s in k) for k in nameTuples)

        # Try a delimiter and keep trying until we get something that
        # works.
        ntries = 0
        while delimiter in combined:
            log.debug("Delimiter '%s' could not be used. Trying another.", delimiter)
            ntries += 1

            # Safety valve: give up rather than loop forever.
            if ntries > 100:
                raise ValueError(f"Unable to determine a delimiter for Config {self}")

            # try another one: step through code points until the next
            # non-alphanumeric character is found.
            while True:
                delimiter = chr(ord(delimiter) + 1)
                if not delimiter.isalnum():
                    break

        log.debug("Using delimiter %r", delimiter)

        # Form the keys, escaping the delimiter if necessary.
        # Each name is prefixed with the delimiter so that __getitem__
        # can recover it from the first character.
        strings = [
            delimiter + delimiter.join(str(s).replace(delimiter, f"\\{delimiter}") for s in k)
            for k in nameTuples
        ]
        return strings

860 

861 def asArray(self, name: str | Sequence[str]) -> Sequence[Any]: 

862 """Get a value as an array. 

863 

864 May contain one or more elements. 

865 

866 Parameters 

867 ---------- 

868 name : `str` 

869 Key to use to retrieve value. 

870 

871 Returns 

872 ------- 

873 array : `collections.abc.Sequence` 

874 The value corresponding to name, but guaranteed to be returned 

875 as a list with at least one element. If the value is a 

876 `~collections.abc.Sequence` (and not a `str`) the value itself 

877 will be returned, else the value will be the first element. 

878 """ 

879 val = self.get(name) 

880 if isinstance(val, str) or not isinstance(val, Sequence): 

881 val = [val] 

882 return val 

883 

884 def __eq__(self, other: Any) -> bool: 

885 if isinstance(other, Config): 

886 other = other._data 

887 return self._data == other 

888 

889 def __ne__(self, other: Any) -> bool: 

890 if isinstance(other, Config): 

891 other = other._data 

892 return self._data != other 

893 

894 ####### 

895 # i/o # 

896 

897 def dump(self, output: IO | None = None, format: str = "yaml") -> str | None: 

898 """Write the config to an output stream. 

899 

900 Parameters 

901 ---------- 

902 output : `IO`, optional 

903 The stream to use for output. If `None` the serialized content 

904 will be returned. 

905 format : `str`, optional 

906 The format to use for the output. Can be "yaml" or "json". 

907 

908 Returns 

909 ------- 

910 serialized : `str` or `None` 

911 If a stream was given the stream will be used and the return 

912 value will be `None`. If the stream was `None` the 

913 serialization will be returned as a string. 

914 """ 

915 if format == "yaml": 

916 return yaml.safe_dump(self._data, output, default_flow_style=False) 

917 elif format == "json": 

918 if output is not None: 

919 json.dump(self._data, output, ensure_ascii=False) 

920 return None 

921 else: 

922 return json.dumps(self._data, ensure_ascii=False) 

923 raise ValueError(f"Unsupported format for Config serialization: {format}") 

924 

925 def dumpToUri( 

926 self, 

927 uri: ResourcePathExpression, 

928 updateFile: bool = True, 

929 defaultFileName: str = "butler.yaml", 

930 overwrite: bool = True, 

931 ) -> None: 

932 """Write the config to location pointed to by given URI. 

933 

934 Currently supports 's3' and 'file' URI schemes. 

935 

936 Parameters 

937 ---------- 

938 uri : `lsst.resources.ResourcePathExpression` 

939 URI of location where the Config will be written. 

940 updateFile : bool, optional 

941 If True and uri does not end on a filename with extension, will 

942 append ``defaultFileName`` to the target uri. True by default. 

943 defaultFileName : bool, optional 

944 The file name that will be appended to target uri if updateFile is 

945 True and uri does not end on a file with an extension. 

946 overwrite : bool, optional 

947 If True the configuration will be written even if it already 

948 exists at that location. 

949 """ 

950 # Make local copy of URI or create new one 

951 uri = ResourcePath(uri) 

952 

953 if updateFile and not uri.getExtension(): 

954 if uri.isdir(): 

955 uri = uri.join(defaultFileName, forceDirectory=False) 

956 else: 

957 uri = uri.updatedFile(defaultFileName) 

958 

959 # Try to work out the format from the extension 

960 ext = uri.getExtension() 

961 format = ext[1:].lower() 

962 

963 output = self.dump(format=format) 

964 assert output is not None, "Config.dump guarantees not-None return when output arg is None" 

965 uri.write(output.encode(), overwrite=overwrite) 

966 self.configFile = uri 

967 

    @staticmethod
    def updateParameters(
        configType: type[ConfigSubset],
        config: Config,
        full: Config,
        toUpdate: dict[str, Any] | None = None,
        toCopy: Sequence[str | Sequence[str]] | None = None,
        overwrite: bool = True,
        toMerge: Sequence[str | Sequence[str]] | None = None,
    ) -> None:
        """Update specific config parameters.

        Allows for named parameters to be set to new values in bulk, and
        for other values to be set by copying from a reference config.

        Assumes that the supplied config is compatible with ``configType``
        and will attach the updated values to the supplied config by
        looking for the related component key. It is assumed that
        ``config`` and ``full`` are from the same part of the
        configuration hierarchy.

        Parameters
        ----------
        configType : `ConfigSubset`
            Config type to use to extract relevant items from ``config``.
        config : `Config`
            A `Config` to update. Only the subset understood by
            the supplied `ConfigSubset` will be modified. Default values
            will not be inserted and the content will not be validated
            since mandatory keys are allowed to be missing until
            populated later by merging.
        full : `Config`
            A complete config with all defaults expanded that can be
            converted to a ``configType``. Read-only and will not be
            modified by this method. Values are read from here if
            ``toCopy`` is defined.

            Repository-specific options that should not be obtained
            from defaults when Butler instances are constructed
            should be copied from ``full`` to ``config``.
        toUpdate : `dict`, optional
            A `dict` defining the keys to update and the new value to use.
            The keys and values can be any supported by `Config`
            assignment.
        toCopy : `tuple`, optional
            `tuple` of keys whose values should be copied from ``full``
            into ``config``.
        overwrite : `bool`, optional
            If `False`, do not modify a value in ``config`` if the key
            already exists. Default is always to overwrite.
        toMerge : `tuple`, optional
            Keys to merge content from full to config without overwriting
            pre-existing values. Only works if the key refers to a hierarchy.
            The ``overwrite`` flag is ignored.

        Raises
        ------
        ValueError
            Neither ``toUpdate``, ``toCopy`` nor ``toMerge`` were defined.
        """
        if toUpdate is None and toCopy is None and toMerge is None:
            raise ValueError("At least one of toUpdate, toCopy, or toMerge parameters must be set.")

        # If this is a parent configuration then we need to ensure that
        # the supplied config has the relevant component key in it.
        # If this is a parent configuration we add in the stub entry
        # so that the ConfigSubset constructor will do the right thing.
        # We check full for this since that is guaranteed to be complete.
        if (
            configType.component is not None
            and configType.component in full
            and configType.component not in config
        ):
            config[configType.component] = {}

        # Extract the part of the config we wish to update.  Defaults are
        # deliberately not merged and validation is skipped: this is a
        # working copy, not a finished configuration.
        localConfig = configType(config, mergeDefaults=False, validate=False)

        key: str | Sequence[str]
        if toUpdate:
            for key, value in toUpdate.items():
                # Respect overwrite=False for keys that already exist.
                if key in localConfig and not overwrite:
                    log.debug(
                        "Not overriding key '%s' with value '%s' in config %s",
                        key,
                        value,
                        localConfig.__class__.__name__,
                    )
                else:
                    localConfig[key] = value

        if toCopy or toMerge:
            # Extract the same subset from the fully-defaulted reference
            # config so keys can be copied/merged from it.
            localFullConfig = configType(full, mergeDefaults=False)

            if toCopy:
                for key in toCopy:
                    if key in localConfig and not overwrite:
                        log.debug(
                            "Not overriding key '%s' from defaults in config %s",
                            key,
                            localConfig.__class__.__name__,
                        )
                    else:
                        localConfig[key] = localFullConfig[key]
            if toMerge:
                # Per the docstring, merging never overwrites pre-existing
                # values and ignores the overwrite flag.
                for key in toMerge:
                    if key in localConfig:
                        # Get the node from the config to do the merge
                        # but then have to reattach to the config.
                        subset = localConfig[key]
                        subset.merge(localFullConfig[key])
                        localConfig[key] = subset
                    else:
                        localConfig[key] = localFullConfig[key]

        # Reattach to parent if this is a child config; otherwise fold the
        # updated subset straight back into the supplied config.
        if configType.component is not None and configType.component in config:
            config[configType.component] = localConfig
        else:
            config.update(localConfig)

1088 

1089 def toDict(self) -> dict[str, Any]: 

1090 """Convert a `Config` to a standalone hierarchical `dict`. 

1091 

1092 Returns 

1093 ------- 

1094 d : `dict` 

1095 The standalone hierarchical `dict` with any `Config` classes 

1096 in the hierarchy converted to `dict`. 

1097 

1098 Notes 

1099 ----- 

1100 This can be useful when passing a Config to some code that 

1101 expects native Python types. 

1102 """ 

1103 output = copy.deepcopy(self._data) 

1104 for k, v in output.items(): 

1105 if isinstance(v, Config): 

1106 v = v.toDict() 

1107 output[k] = v 

1108 return output 

1109 

1110 

class ConfigSubset(Config):
    """Config representing a subset of a more general configuration.

    Subclasses define their own component and when given a configuration
    that includes that component, the resulting configuration only includes
    the subset. For example, your config might contain ``dimensions`` if it's
    part of a global config and that subset will be stored. If ``dimensions``
    can not be found it is assumed that the entire contents of the
    configuration should be used.

    Default values are read from the environment or supplied search paths
    using the default configuration file name specified in the subclass.
    This allows a configuration class to be instantiated without any
    additional arguments.

    Additional validation can be specified to check for keys that are mandatory
    in the configuration.

    Parameters
    ----------
    other : `Config` or `~lsst.resources.ResourcePathExpression` or `dict`
        Argument specifying the configuration information as understood
        by `Config`.
    validate : `bool`, optional
        If `True` required keys will be checked to ensure configuration
        consistency.
    mergeDefaults : `bool`, optional
        If `True` defaults will be read and the supplied config will
        be combined with the defaults, with the supplied values taking
        precedence.
    searchPaths : `list` or `tuple`, optional
        Explicit additional paths to search for defaults. They should
        be supplied in priority order. These paths have higher priority
        than those read from the environment in
        `ConfigSubset.defaultSearchPaths()`. Paths can be `str` referring to
        the local file system or URIs, `lsst.resources.ResourcePath`.

    Raises
    ------
    KeyError
        Raised during construction if ``validate`` is `True` and any of the
        ``requiredKeys`` are missing from the merged configuration.
    RuntimeError
        Raised if a ``cls`` entry in the configuration refers to a type
        that can not be imported.
    """

    component: ClassVar[str | None] = None
    """Component to use from supplied config. Can be None. If specified the
    key is not required. Can be a full dot-separated path to a component.
    """

    requiredKeys: ClassVar[Sequence[str]] = ()
    """Keys that are required to be specified in the configuration.
    """

    defaultConfigFile: ClassVar[str | None] = None
    """Name of the file containing defaults for this config class.
    """

1161 

    def __init__(
        self,
        other: Config | ResourcePathExpression | Mapping[str, Any] | None = None,
        validate: bool = True,
        mergeDefaults: bool = True,
        searchPaths: Sequence[ResourcePathExpression] | None = None,
    ):
        # Create a blank object to receive the defaults
        # Once we have the defaults we then update with the external values
        super().__init__()

        # Create a standard Config rather than subset so the full content
        # of ``other`` is available for component selection below.
        externalConfig = Config(other)

        # Select the part we need from it
        # To simplify the use of !include we also check for the existence of
        # component.component (since the included files can themselves
        # include the component name)
        if self.component is not None:
            doubled = (self.component, self.component)
            # Must check for double depth first
            if doubled in externalConfig:
                externalConfig = externalConfig[doubled]
            elif self.component in externalConfig:
                # Rebind the internal data directly rather than constructing
                # a new Config from the sub-item.
                externalConfig._data = externalConfig._data[self.component]

        # Default files read to create this configuration
        self.filesRead: list[ResourcePath | str] = []

        # Assume we are not looking up child configurations
        containerKey = None

        # Sometimes we do not want to merge with defaults.
        if mergeDefaults:
            # Supplied search paths have highest priority
            fullSearchPath: list[ResourcePath | str] = []
            if searchPaths:
                fullSearchPath = [ResourcePath(path, forceDirectory=True) for path in searchPaths]

            # Read default paths from environment (lower priority than the
            # explicitly supplied paths above).
            fullSearchPath.extend(self.defaultSearchPaths())

            # There are two places to find defaults for this particular config
            # - The "defaultConfigFile" defined in the subclass
            # - The class specified in the "cls" element in the config.
            # Read cls after merging in case it changes.
            if self.defaultConfigFile is not None:
                self._updateWithConfigsFromPath(fullSearchPath, self.defaultConfigFile)

            # Can have a class specification in the external config (priority)
            # or from the defaults.
            pytype = None
            if "cls" in externalConfig:
                pytype = externalConfig["cls"]
            elif "cls" in self:
                pytype = self["cls"]

            if pytype is not None:
                try:
                    cls = doImportType(pytype)
                except ImportError as e:
                    raise RuntimeError(f"Failed to import cls '{pytype}' for config {type(self)}") from e
                # The class referenced from the config file is not required
                # to specify a default config file.
                defaultsFile = getattr(cls, "defaultConfigFile", None)
                if defaultsFile is not None:
                    self._updateWithConfigsFromPath(fullSearchPath, defaultsFile)

                # Get the container key in case we need it and it is specified.
                containerKey = getattr(cls, "containerKey", None)

        # Now update this object with the external values so that the external
        # values always override the defaults
        self.update(externalConfig)
        if not self.configFile:
            self.configFile = externalConfig.configFile

        # If this configuration has child configurations of the same
        # config class, we need to expand those defaults as well.

        if mergeDefaults and containerKey is not None and containerKey in self:
            for idx, subConfig in enumerate(self[containerKey]):
                # Each child is re-parsed through this same class so it also
                # picks up defaults and (optionally) validation.
                self[containerKey, idx] = type(self)(
                    other=subConfig, validate=validate, mergeDefaults=mergeDefaults, searchPaths=searchPaths
                )

        if validate:
            self.validate()

1250 

1251 @classmethod 

1252 def defaultSearchPaths(cls) -> list[ResourcePath | str]: 

1253 """Read environment to determine search paths to use. 

1254 

1255 Global defaults, at lowest priority, are found in the ``config`` 

1256 directory of the butler source tree. Additional defaults can be 

1257 defined using the environment variable ``$DAF_BUTLER_CONFIG_PATH`` 

1258 which is a PATH-like variable where paths at the front of the list 

1259 have priority over those later. 

1260 

1261 Returns 

1262 ------- 

1263 paths : `list` 

1264 Returns a list of paths to search. The returned order is in 

1265 priority with the highest priority paths first. The butler config 

1266 configuration resources will not be included here but will 

1267 always be searched last. 

1268 

1269 Notes 

1270 ----- 

1271 The environment variable is split on the standard ``:`` path separator. 

1272 This currently makes it incompatible with usage of URIs. 

1273 """ 

1274 # We can pick up defaults from multiple search paths 

1275 # We fill defaults by using the butler config path and then 

1276 # the config path environment variable in reverse order. 

1277 defaultsPaths: list[str | ResourcePath] = [] 

1278 

1279 if CONFIG_PATH in os.environ: 1279 ↛ 1280line 1279 didn't jump to line 1280 because the condition on line 1279 was never true

1280 externalPaths = os.environ[CONFIG_PATH].split(os.pathsep) 

1281 defaultsPaths.extend(externalPaths) 

1282 

1283 # Add the package defaults as a resource 

1284 defaultsPaths.append(ResourcePath(f"resource://{cls.resourcesPackage}/configs", forceDirectory=True)) 

1285 return defaultsPaths 

1286 

1287 def _updateWithConfigsFromPath( 

1288 self, searchPaths: Sequence[str | ResourcePath], configFile: ResourcePath | str 

1289 ) -> None: 

1290 """Search the supplied paths, merging the configuration values. 

1291 

1292 The values read will override values currently stored in the object. 

1293 Every file found in the path will be read, such that the earlier 

1294 path entries have higher priority. 

1295 

1296 Parameters 

1297 ---------- 

1298 searchPaths : `list` of `lsst.resources.ResourcePath`, `str` 

1299 Paths to search for the supplied configFile. This path 

1300 is the priority order, such that files read from the 

1301 first path entry will be selected over those read from 

1302 a later path. Can contain `str` referring to the local file 

1303 system or a URI string. 

1304 configFile : `lsst.resources.ResourcePath` 

1305 File to locate in path. If absolute path it will be read 

1306 directly and the search path will not be used. Can be a URI 

1307 to an explicit resource (which will ignore the search path) 

1308 which is assumed to exist. 

1309 """ 

1310 uri = ResourcePath(configFile, forceDirectory=False) 

1311 if uri.isabs() and uri.exists(): 1311 ↛ 1313line 1311 didn't jump to line 1313 because the condition on line 1311 was never true

1312 # Assume this resource exists 

1313 self._updateWithOtherConfigFile(configFile) 

1314 self.filesRead.append(configFile) 

1315 else: 

1316 # Reverse order so that high priority entries 

1317 # update the object last. 

1318 for pathDir in reversed(searchPaths): 

1319 if isinstance(pathDir, str | ResourcePath): 1319 ↛ 1326line 1319 didn't jump to line 1326 because the condition on line 1319 was always true

1320 pathDir = ResourcePath(pathDir, forceDirectory=True) 

1321 file = pathDir.join(configFile) 

1322 if file.exists(): 1322 ↛ 1318line 1322 didn't jump to line 1318 because the condition on line 1322 was always true

1323 self.filesRead.append(file) 

1324 self._updateWithOtherConfigFile(file) 

1325 else: 

1326 raise ValueError(f"Unexpected search path type encountered: {pathDir!r}") 

1327 

1328 def _updateWithOtherConfigFile(self, file: Config | str | ResourcePath | Mapping[str, Any]) -> None: 

1329 """Read in some defaults and update. 

1330 

1331 Update the configuration by reading the supplied file as a config 

1332 of this class, and merging such that these values override the 

1333 current values. Contents of the external config are not validated. 

1334 

1335 Parameters 

1336 ---------- 

1337 file : `Config`, `str`, `lsst.resources.ResourcePath`, or `dict` 

1338 Entity that can be converted to a `ConfigSubset`. 

1339 """ 

1340 # Use this class to read the defaults so that subsetting can happen 

1341 # correctly. 

1342 externalConfig = type(self)(file, validate=False, mergeDefaults=False) 

1343 self.update(externalConfig) 

1344 

1345 def validate(self) -> None: 

1346 """Check that mandatory keys are present in this configuration. 

1347 

1348 Ignored if ``requiredKeys`` is empty. 

1349 """ 

1350 # Validation 

1351 missing = [k for k in self.requiredKeys if k not in self._data] 

1352 if missing: 1352 ↛ 1353line 1352 didn't jump to line 1353 because the condition on line 1352 was never true

1353 raise KeyError(f"Mandatory keys ({missing}) missing from supplied configuration for {type(self)}")