Coverage for python/lsst/daf/butler/registry/sql_registry.py: 22% (408 statements)

coverage.py v7.13.5, created at 2026-04-28 08:36 +0000

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

from .. import ddl

__all__ = ("SqlRegistry",)

import contextlib
import logging
import warnings
from collections.abc import Iterable, Iterator, Mapping, Sequence
from typing import TYPE_CHECKING, Any

import sqlalchemy

from lsst.resources import ResourcePathExpression
from lsst.utils.iteration import ensure_iterable

from .._collection_type import CollectionType
from .._config import Config
from .._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef
from .._dataset_type import DatasetType
from .._exceptions import DataIdValueError, DimensionNameError, InconsistentDataIdError
from .._storage_class import StorageClassFactory
from .._timespan import Timespan
from ..dimensions import (
    DataCoordinate,
    DataId,
    DimensionConfig,
    DimensionElement,
    DimensionGroup,
    DimensionRecord,
    DimensionUniverse,
)
from ..dimensions.record_cache import DimensionRecordCache
from ..direct_query_driver import DirectQueryDriver
from ..progress import Progress
from ..queries import Query
from ..registry import (
    CollectionExpressionError,
    CollectionSummary,
    CollectionTypeError,
    ConflictingDefinitionError,
    NoDefaultCollectionError,
    OrphanedRecordError,
    RegistryConfig,
    RegistryDefaults,
)
from ..registry.interfaces import ChainedCollectionRecord, ReadOnlyDatabaseError, RunRecord
from ..registry.managers import RegistryManagerInstances, RegistryManagerTypes
from ..registry.wildcards import CollectionWildcard, DatasetTypeWildcard
from ..utils import transactional
from .expand_data_ids import expand_data_ids

if TYPE_CHECKING:
    from .._butler_config import ButlerConfig
    from ..datastore._datastore import DatastoreOpaqueTable
    from ..datastore.stored_file_info import StoredDatastoreItemInfo
    from ..registry.interfaces import (
        CollectionRecord,
        Database,
        DatastoreRegistryBridgeManager,
        ObsCoreTableManager,
    )


_LOG = logging.getLogger(__name__)


class SqlRegistry:
    """Butler Registry implementation that uses a SQL database as its
    backend.

    Parameters
    ----------
    database : `Database`
        Database instance to store Registry.
    defaults : `RegistryDefaults`
        Default collection search path and/or output `~CollectionType.RUN`
        collection.
    managers : `RegistryManagerInstances`
        All the managers required for this registry.
    """

    defaultConfigFile: str | None = None
    """Path to configuration defaults. Accessed within the ``configs``
    resource or relative to a search path. Can be `None` if no defaults are
    specified.
    """
    @classmethod
    def forceRegistryConfig(
        cls, config: ButlerConfig | RegistryConfig | Config | str | None
    ) -> RegistryConfig:
        """Force the supplied config to a `RegistryConfig`.

        Parameters
        ----------
        config : `RegistryConfig`, `Config` or `str` or `None`
            Registry configuration; if missing, the default configuration
            will be loaded from ``registry.yaml``.

        Returns
        -------
        registry_config : `RegistryConfig`
            A registry config.
        """
        if not isinstance(config, RegistryConfig):
            if isinstance(config, str | Config) or config is None:
                config = RegistryConfig(config)
            else:
                raise ValueError(f"Incompatible Registry configuration: {config}")
        return config
    @classmethod
    def createFromConfig(
        cls,
        config: RegistryConfig | str | None = None,
        dimensionConfig: DimensionConfig | str | None = None,
        butlerRoot: ResourcePathExpression | None = None,
    ) -> SqlRegistry:
        """Create registry database and return `SqlRegistry` instance.

        This method initializes database contents; the database must be empty
        prior to calling this method.

        Parameters
        ----------
        config : `RegistryConfig` or `str`, optional
            Registry configuration; if missing, the default configuration
            will be loaded from ``registry.yaml``.
        dimensionConfig : `DimensionConfig` or `str`, optional
            Dimensions configuration; if missing, the default configuration
            will be loaded from ``dimensions.yaml``.
        butlerRoot : convertible to `lsst.resources.ResourcePath`, optional
            Path to the repository root this `SqlRegistry` will manage.

        Returns
        -------
        registry : `SqlRegistry`
            A new `SqlRegistry` instance.
        """
        config = cls.forceRegistryConfig(config)
        config.replaceRoot(butlerRoot)

        if isinstance(dimensionConfig, str):
            dimensionConfig = DimensionConfig(dimensionConfig)
        elif dimensionConfig is None:
            dimensionConfig = DimensionConfig()
        elif not isinstance(dimensionConfig, DimensionConfig):
            raise TypeError(f"Incompatible Dimension configuration type: {type(dimensionConfig)}")

        managerTypes = RegistryManagerTypes.fromConfig(config)
        DatabaseClass = config.getDatabaseClass()
        database = DatabaseClass.fromUri(
            config.connectionString,
            origin=config.get("origin", 0),
            namespace=config.get("namespace"),
            allow_temporary_tables=config.areTemporaryTablesAllowed,
        )

        try:
            managers = managerTypes.makeRepo(database, dimensionConfig)
            return cls(database, RegistryDefaults(), managers)
        except Exception:
            database.dispose()
            raise
    @classmethod
    def fromConfig(
        cls,
        config: ButlerConfig | RegistryConfig | Config | str,
        butlerRoot: ResourcePathExpression | None = None,
        writeable: bool = True,
        defaults: RegistryDefaults | None = None,
    ) -> SqlRegistry:
        """Create `Registry` subclass instance from `config`.

        Registry database must be initialized prior to calling this method.

        Parameters
        ----------
        config : `ButlerConfig`, `RegistryConfig`, `Config` or `str`
            Registry configuration.
        butlerRoot : `lsst.resources.ResourcePathExpression`, optional
            Path to the repository root this `Registry` will manage.
        writeable : `bool`, optional
            If `True` (default) create a read-write connection to the database.
        defaults : `RegistryDefaults`, optional
            Default collection search path and/or output `~CollectionType.RUN`
            collection.

        Returns
        -------
        registry : `SqlRegistry`
            A new `SqlRegistry` subclass instance.
        """
        config = cls.forceRegistryConfig(config)
        config.replaceRoot(butlerRoot)
        if defaults is None:
            defaults = RegistryDefaults()
        DatabaseClass = config.getDatabaseClass()
        database = DatabaseClass.fromUri(
            config.connectionString,
            origin=config.get("origin", 0),
            namespace=config.get("namespace"),
            writeable=writeable,
            allow_temporary_tables=config.areTemporaryTablesAllowed,
        )
        try:
            managerTypes = RegistryManagerTypes.fromConfig(config)
            with database.session():
                managers = managerTypes.loadRepo(database)

            return cls(database, defaults, managers)
        except Exception:
            database.dispose()
            raise
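    # A minimal construction sketch (hypothetical paths; not part of the
    # original source): `createFromConfig` initializes a new, empty
    # repository, while `fromConfig` connects to an existing one.
    #
    #     >>> registry = SqlRegistry.createFromConfig(butlerRoot="/tmp/repo")
    #     >>> registry = SqlRegistry.fromConfig("registry.yaml", writeable=False)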

    def __init__(
        self,
        database: Database,
        defaults: RegistryDefaults,
        managers: RegistryManagerInstances,
    ):
        self._db = database
        self._managers = managers
        if managers.obscore is not None:
            managers.obscore.set_query_function(self._query)
        self.storageClasses = StorageClassFactory()
        # This is public to SqlRegistry's internal-to-daf_butler callers, but
        # it is intentionally not part of RegistryShim.
        self.dimension_record_cache = DimensionRecordCache(
            self._managers.dimensions.universe,
            fetch=self._managers.dimensions.fetch_cache_dict,
        )
        # Intentionally invoke property setter to initialize defaults. This
        # can only be done after most of the rest of Registry has already been
        # initialized, and must be done before the property getter is used.
        self.defaults = defaults
        # TODO: This is currently initialized by `make_datastore_tables`,
        # eventually we'll need to do it during construction.
        # The mapping is indexed by the opaque table name.
        self._datastore_record_classes: Mapping[str, type[StoredDatastoreItemInfo]] = {}
        self._is_clone = False
    def close(self) -> None:
        # Connection pool is shared between cloned instances, so only the root
        # instance should close it.
        # Note: The underlying SQLAlchemy call will create a fresh connection
        # pool, so nothing breaks if the root instance is accidentally closed
        # before the clones are finished -- we just have a small performance
        # hit from re-creating the connections.
        if not self._is_clone:
            self._db.dispose()
    def __str__(self) -> str:
        return str(self._db)

    def __repr__(self) -> str:
        return f"SqlRegistry({self._db!r}, {self.dimensions!r})"

    def isWriteable(self) -> bool:
        """Return `True` if this registry allows write operations, and `False`
        otherwise.
        """
        return self._db.isWriteable()
    def copy(self, defaults: RegistryDefaults | None = None) -> SqlRegistry:
        """Create a new `SqlRegistry` backed by the same data repository
        as this one and sharing a database connection pool with it, but with
        independent defaults and database sessions.

        Parameters
        ----------
        defaults : `~lsst.daf.butler.registry.RegistryDefaults`, optional
            Default collections and data ID values for the new registry. If
            not provided, ``self.defaults`` will be used (but future changes
            to either registry's defaults will not affect the other).

        Returns
        -------
        copy : `SqlRegistry`
            A new `SqlRegistry` instance with its own defaults.
        """
        if defaults is None:
            # No need to copy, because `RegistryDefaults` is immutable; we
            # effectively copy on write.
            defaults = self.defaults
        db = self._db.clone()
        result = SqlRegistry(db, defaults, self._managers.clone(db))
        result._datastore_record_classes = dict(self._datastore_record_classes)
        result.dimension_record_cache.load_from(self.dimension_record_cache)
        result._is_clone = True
        return result
    @property
    def dimensions(self) -> DimensionUniverse:
        """Definitions of all dimensions recognized by this `Registry`
        (`DimensionUniverse`).
        """
        return self._managers.dimensions.universe

    @property
    def defaults(self) -> RegistryDefaults:
        """Default collection search path and/or output `~CollectionType.RUN`
        collection (`~lsst.daf.butler.registry.RegistryDefaults`).

        This is an immutable struct whose components may not be set
        individually, but the entire struct can be set by assigning to this
        property.
        """
        return self._defaults

    @defaults.setter
    def defaults(self, value: RegistryDefaults) -> None:
        if value.run is not None:
            self.registerRun(value.run)
        value.finish(self)
        self._defaults = value
    def refresh(self) -> None:
        """Refresh all in-memory state by querying the database.

        This may be necessary to enable querying for entities added by other
        registry instances after this one was constructed.
        """
        self.dimension_record_cache.reset()
        with self._db.transaction():
            self._managers.refresh()

    def refresh_collection_summaries(self) -> None:
        """Refresh content of the collection summary tables in the database.

        This only cleans dataset type summaries; we may want to add cleanup
        of governor summaries later.
        """
        for dataset_type in self.queryDatasetTypes():
            self._managers.datasets.refresh_collection_summaries(dataset_type)
    def caching_context(self) -> contextlib.AbstractContextManager[None]:
        """Return context manager that enables caching.

        Returns
        -------
        manager
            A context manager that enables client-side caching. Entering
            the context returns `None`.
        """
        return self._managers.caching_context_manager()
    @contextlib.contextmanager
    def transaction(self, *, savepoint: bool = False) -> Iterator[None]:
        """Return a context manager that represents a transaction.

        Parameters
        ----------
        savepoint : `bool`
            Whether to issue a SAVEPOINT in the database.

        Yields
        ------
        `None`
        """
        with self._db.transaction(savepoint=savepoint):
            yield
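    # A minimal usage sketch (hypothetical collection names; ``refs`` is
    # assumed to be an iterable of resolved `DatasetRef` objects): group
    # several writes so they commit or roll back together.
    #
    #     >>> with registry.transaction(savepoint=True):
    #     ...     registry.associate("u/alice/tagged", refs)
    #     ...     registry.disassociate("u/bob/tagged", refs)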

    def resetConnectionPool(self) -> None:
        """Reset SQLAlchemy connection pool for `SqlRegistry` database.

        This operation is useful when using registry with fork-based
        multiprocessing. To use the registry across a fork boundary, one has
        to make sure that there are no currently active connections (no
        session or transaction is in progress) and that the connection pool
        is reset using this method. This method should be called by the child
        process immediately after the fork.
        """
        self._db._engine.dispose()
    def registerOpaqueTable(self, tableName: str, spec: ddl.TableSpec) -> None:
        """Add an opaque (to the `Registry`) table for use by a `Datastore` or
        other data repository client.

        Opaque table records can be added via `insertOpaqueData`, retrieved
        via `fetchOpaqueData`, and removed via `deleteOpaqueData`.

        Parameters
        ----------
        tableName : `str`
            Logical name of the opaque table. This may differ from the
            actual name used in the database by a prefix and/or suffix.
        spec : `ddl.TableSpec`
            Specification for the table to be added.
        """
        self._managers.opaque.register(tableName, spec)
    @transactional
    def insertOpaqueData(self, tableName: str, *data: dict) -> None:
        """Insert records into an opaque table.

        Parameters
        ----------
        tableName : `str`
            Logical name of the opaque table. Must match the name used in a
            previous call to `registerOpaqueTable`.
        *data
            Each additional positional argument is a dictionary that represents
            a single row to be added.
        """
        self._managers.opaque[tableName].insert(*data)
    def fetchOpaqueData(self, tableName: str, **where: Any) -> Iterator[Mapping[str, Any]]:
        """Retrieve records from an opaque table.

        Parameters
        ----------
        tableName : `str`
            Logical name of the opaque table. Must match the name used in a
            previous call to `registerOpaqueTable`.
        **where
            Additional keyword arguments are interpreted as equality
            constraints that restrict the returned rows (combined with AND);
            keyword arguments are column names and values are the values they
            must have.

        Yields
        ------
        row : `dict`
            A dictionary representing a single result row.
        """
        yield from self._managers.opaque[tableName].fetch(**where)
    @transactional
    def deleteOpaqueData(self, tableName: str, **where: Any) -> None:
        """Remove records from an opaque table.

        Parameters
        ----------
        tableName : `str`
            Logical name of the opaque table. Must match the name used in a
            previous call to `registerOpaqueTable`.
        **where
            Additional keyword arguments are interpreted as equality
            constraints that restrict the deleted rows (combined with AND);
            keyword arguments are column names and values are the values they
            must have.
        """
        self._managers.opaque[tableName].delete(where.keys(), where)
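    # A minimal opaque-table round trip (hypothetical table and column
    # names; the exact `ddl.TableSpec` field syntax is an assumption):
    #
    #     >>> spec = ddl.TableSpec(
    #     ...     fields=[
    #     ...         ddl.FieldSpec("dataset_id", dtype=sqlalchemy.String, length=64, primaryKey=True),
    #     ...         ddl.FieldSpec("path", dtype=sqlalchemy.String, length=256),
    #     ...     ]
    #     ... )
    #     >>> registry.registerOpaqueTable("my_datastore_records", spec)
    #     >>> registry.insertOpaqueData("my_datastore_records", {"dataset_id": "abc", "path": "a/b.fits"})
    #     >>> rows = list(registry.fetchOpaqueData("my_datastore_records", dataset_id="abc"))
    #     >>> registry.deleteOpaqueData("my_datastore_records", dataset_id="abc")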

    def registerCollection(
        self, name: str, type: CollectionType = CollectionType.TAGGED, doc: str | None = None
    ) -> bool:
        """Add a new collection if one with the given name does not exist.

        Parameters
        ----------
        name : `str`
            The name of the collection to create.
        type : `CollectionType`
            Enum value indicating the type of collection to create.
        doc : `str`, optional
            Documentation string for the collection.

        Returns
        -------
        registered : `bool`
            `True` if the collection was created by this call, `False` if it
            was already registered.

        Notes
        -----
        This method cannot be called within transactions, as it needs to be
        able to perform its own transaction to be concurrent.
        """
        _, registered = self._managers.collections.register(name, type, doc=doc)
        return registered
    def getCollectionType(self, name: str) -> CollectionType:
        """Return an enumeration value indicating the type of the given
        collection.

        Parameters
        ----------
        name : `str`
            The name of the collection.

        Returns
        -------
        type : `CollectionType`
            Enum value indicating the type of this collection.

        Raises
        ------
        lsst.daf.butler.registry.MissingCollectionError
            Raised if no collection with the given name exists.
        """
        return self._managers.collections.find(name).type
    def get_collection_record(self, name: str) -> CollectionRecord:
        """Return the record for this collection.

        Parameters
        ----------
        name : `str`
            Name of the collection for which the record is to be retrieved.

        Returns
        -------
        record : `CollectionRecord`
            The record for this collection.
        """
        return self._managers.collections.find(name)
    def registerRun(self, name: str, doc: str | None = None) -> bool:
        """Add a new run if one with the given name does not exist.

        Parameters
        ----------
        name : `str`
            The name of the run to create.
        doc : `str`, optional
            Documentation string for the collection.

        Returns
        -------
        registered : `bool`
            Boolean indicating whether a new run was registered. `False`
            if it already existed.

        Notes
        -----
        This method cannot be called within transactions, as it needs to be
        able to perform its own transaction to be concurrent.
        """
        _, registered = self._managers.collections.register(name, CollectionType.RUN, doc=doc)
        return registered
    @transactional
    def removeCollection(self, name: str) -> None:
        """Remove the given collection from the registry.

        Parameters
        ----------
        name : `str`
            The name of the collection to remove.

        Raises
        ------
        lsst.daf.butler.registry.MissingCollectionError
            Raised if no collection with the given name exists.
        sqlalchemy.exc.IntegrityError
            Raised if the database rows associated with the collection are
            still referenced by some other table, such as a dataset in a
            datastore (for `~CollectionType.RUN` collections only) or a
            `~CollectionType.CHAINED` collection of which this collection is
            a child.

        Notes
        -----
        If this is a `~CollectionType.RUN` collection, all datasets and quanta
        in it will be removed from the `Registry` database. This requires that
        those datasets be removed (or at least trashed) from any datastores
        that hold them first.

        A collection may not be deleted as long as it is referenced by a
        `~CollectionType.CHAINED` collection; the ``CHAINED`` collection must
        be deleted or redefined first.
        """
        self._managers.collections.remove(name)
    def getCollectionChain(self, parent: str) -> tuple[str, ...]:
        """Return the child collections in a `~CollectionType.CHAINED`
        collection.

        Parameters
        ----------
        parent : `str`
            Name of the chained collection. Must have already been added via
            a call to `Registry.registerCollection`.

        Returns
        -------
        children : `~collections.abc.Sequence` [ `str` ]
            An ordered sequence of collection names that are searched when the
            given chained collection is searched.

        Raises
        ------
        lsst.daf.butler.registry.MissingCollectionError
            Raised if ``parent`` does not exist in the `Registry`.
        lsst.daf.butler.registry.CollectionTypeError
            Raised if ``parent`` does not correspond to a
            `~CollectionType.CHAINED` collection.
        """
        record = self._managers.collections.find(parent)
        if record.type is not CollectionType.CHAINED:
            raise CollectionTypeError(f"Collection '{parent}' has type {record.type.name}, not CHAINED.")
        assert isinstance(record, ChainedCollectionRecord)
        return record.children
    @transactional
    def setCollectionChain(self, parent: str, children: Any, *, flatten: bool = False) -> None:
        """Define or redefine a `~CollectionType.CHAINED` collection.

        Parameters
        ----------
        parent : `str`
            Name of the chained collection. Must have already been added via
            a call to `Registry.registerCollection`.
        children : collection expression
            An expression defining an ordered search of child collections,
            generally an iterable of `str`; see
            :ref:`daf_butler_collection_expressions` for more information.
        flatten : `bool`, optional
            If `True` (`False` is default), recursively flatten out any nested
            `~CollectionType.CHAINED` collections in ``children`` first.

        Raises
        ------
        lsst.daf.butler.registry.MissingCollectionError
            Raised when any of the given collections do not exist in the
            `Registry`.
        lsst.daf.butler.registry.CollectionTypeError
            Raised if ``parent`` does not correspond to a
            `~CollectionType.CHAINED` collection.
        CollectionCycleError
            Raised if the given collections contain a cycle.

        Notes
        -----
        If this function is called within a call to ``Butler.transaction``, it
        will hold a lock that prevents other processes from modifying the
        parent collection until the end of the transaction. Keep these
        transactions short.
        """
        children = CollectionWildcard.from_expression(children).require_ordered()
        if flatten:
            children = self.queryCollections(children, flattenChains=True)

        self._managers.collections.update_chain(parent, list(children), allow_use_in_caching_context=True)
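    # A minimal chained-collection sketch (hypothetical collection names):
    # the search order is the order of the children.
    #
    #     >>> registry.registerRun("u/alice/run1")
    #     >>> registry.registerRun("u/alice/run2")
    #     >>> registry.registerCollection("u/alice/chain", CollectionType.CHAINED)
    #     >>> registry.setCollectionChain("u/alice/chain", ["u/alice/run2", "u/alice/run1"])
    #     >>> registry.getCollectionChain("u/alice/chain")
    #     ('u/alice/run2', 'u/alice/run1')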

    def getCollectionParentChains(self, collection: str) -> set[str]:
        """Return the CHAINED collections that directly contain the given one.

        Parameters
        ----------
        collection : `str`
            Name of the collection.

        Returns
        -------
        chains : `set` of `str`
            Set of `~CollectionType.CHAINED` collection names.
        """
        return self._managers.collections.getParentChains(self._managers.collections.find(collection).key)
    def getCollectionDocumentation(self, collection: str) -> str | None:
        """Retrieve the documentation string for a collection.

        Parameters
        ----------
        collection : `str`
            Name of the collection.

        Returns
        -------
        docs : `str` or `None`
            Docstring for the collection with the given name.
        """
        return self._managers.collections.getDocumentation(self._managers.collections.find(collection).key)
    def setCollectionDocumentation(self, collection: str, doc: str | None) -> None:
        """Set the documentation string for a collection.

        Parameters
        ----------
        collection : `str`
            Name of the collection.
        doc : `str` or `None`
            Docstring for the collection with the given name; will replace any
            existing docstring. Passing `None` will remove any existing
            docstring.
        """
        self._managers.collections.setDocumentation(self._managers.collections.find(collection).key, doc)
    def getCollectionSummary(self, collection: str) -> CollectionSummary:
        """Return a summary for the given collection.

        Parameters
        ----------
        collection : `str`
            Name of the collection for which a summary is to be retrieved.

        Returns
        -------
        summary : `~lsst.daf.butler.registry.CollectionSummary`
            Summary of the dataset types and governor dimension values in
            this collection.
        """
        record = self._managers.collections.find(collection)
        return self._managers.datasets.getCollectionSummary(record)
    def registerDatasetType(self, datasetType: DatasetType) -> bool:
        """Add a new `DatasetType` to the Registry.

        It is not an error to register the same `DatasetType` twice.

        Parameters
        ----------
        datasetType : `DatasetType`
            The `DatasetType` to be added.

        Returns
        -------
        inserted : `bool`
            `True` if ``datasetType`` was inserted, `False` if an identical
            existing `DatasetType` was found. Note that in either case the
            DatasetType is guaranteed to be defined in the Registry
            consistently with the given definition.

        Raises
        ------
        ValueError
            Raised if the dimensions or storage class are invalid.
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if this `DatasetType` is already registered with a different
            definition.

        Notes
        -----
        This method cannot be called within transactions, as it needs to be
        able to perform its own transaction to be concurrent.
        """
        return self._managers.datasets.register_dataset_type(datasetType)
    def removeDatasetType(self, name: str | tuple[str, ...]) -> None:
        """Remove the named `DatasetType` from the registry.

        .. warning::

            Registry implementations can cache the dataset type definitions.
            This means that deleting the dataset type definition may result in
            unexpected behavior from other butler processes that are active
            and have not seen the deletion.

        Parameters
        ----------
        name : `str` or `tuple` [`str`]
            Name of the type to be removed or tuple containing a list of type
            names to be removed. Wildcards are allowed.

        Raises
        ------
        lsst.daf.butler.registry.OrphanedRecordError
            Raised if an attempt is made to remove the dataset type definition
            when there are already datasets associated with it.

        Notes
        -----
        If the dataset type is not registered the method will return without
        action.
        """
        for datasetTypeExpression in ensure_iterable(name):
            # Catch any warnings from the caller specifying a component
            # dataset type. This will result in an error later but the
            # warning could be confusing when the caller is not querying
            # anything.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=FutureWarning)
                datasetTypes = list(self.queryDatasetTypes(datasetTypeExpression))
            if not datasetTypes:
                _LOG.info("Dataset type %r not defined", datasetTypeExpression)
            else:
                for datasetType in datasetTypes:
                    self._managers.datasets.remove_dataset_type(datasetType.name)
                    _LOG.info("Removed dataset type %r", datasetType.name)
    def getDatasetType(self, name: str) -> DatasetType:
        """Get the `DatasetType`.

        Parameters
        ----------
        name : `str`
            Name of the type.

        Returns
        -------
        type : `DatasetType`
            The `DatasetType` associated with the given name.

        Raises
        ------
        lsst.daf.butler.registry.MissingDatasetTypeError
            Raised if the requested dataset type has not been registered.

        Notes
        -----
        This method handles component dataset types automatically, though most
        other registry operations do not.
        """
        parent_name, component = DatasetType.splitDatasetTypeName(name)
        parent_dataset_type = self._managers.datasets.get_dataset_type(parent_name)
        if component is None:
            return parent_dataset_type
        else:
            return parent_dataset_type.makeComponentDatasetType(component)
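    # A minimal component sketch (hypothetical dataset type names): a name
    # like "raw.wcs" is split into the parent "raw" and the component "wcs".
    #
    #     >>> parent = registry.getDatasetType("raw")
    #     >>> component = registry.getDatasetType("raw.wcs")
    #     >>> component.isComponent()
    #     True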

    def supportsIdGenerationMode(self, mode: DatasetIdGenEnum) -> bool:
        """Test whether the given dataset ID generation mode is supported by
        `insertDatasets`.

        Parameters
        ----------
        mode : `DatasetIdGenEnum`
            Enum value for the mode to test.

        Returns
        -------
        supported : `bool`
            Whether the given mode is supported.
        """
        return True
    @transactional
    def insertDatasets(
        self,
        datasetType: DatasetType | str,
        dataIds: Iterable[DataId],
        run: str | None = None,
        expand: bool = True,
        idGenerationMode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
    ) -> list[DatasetRef]:
        """Insert one or more datasets into the `Registry`.

        This always adds new datasets; to associate existing datasets with
        a new collection, use ``associate``.

        Parameters
        ----------
        datasetType : `DatasetType` or `str`
            A `DatasetType` or the name of one.
        dataIds : `~collections.abc.Iterable` of `dict` or `DataCoordinate`
            Dimension-based identifiers for the new datasets.
        run : `str`, optional
            The name of the run that produced the datasets. Defaults to
            ``self.defaults.run``.
        expand : `bool`, optional
            If `True` (default), expand data IDs as they are inserted. This is
            necessary in general to allow datastore to generate file templates,
            but it may be disabled if the caller can guarantee this is
            unnecessary.
        idGenerationMode : `DatasetIdGenEnum`, optional
            Specifies option for generating dataset IDs. By default unique IDs
            are generated for each inserted dataset.

        Returns
        -------
        refs : `list` of `DatasetRef`
            Resolved `DatasetRef` instances for all given data IDs (in the same
            order).

        Raises
        ------
        lsst.daf.butler.registry.DatasetTypeError
            Raised if ``datasetType`` is not known to registry.
        lsst.daf.butler.registry.CollectionTypeError
            Raised if ``run`` collection type is not `~CollectionType.RUN`.
        lsst.daf.butler.registry.NoDefaultCollectionError
            Raised if ``run`` is `None` and ``self.defaults.run`` is `None`.
        lsst.daf.butler.registry.ConflictingDefinitionError
            If a dataset with the same dataset type and data ID as one of those
            given already exists in ``run``.
        lsst.daf.butler.registry.MissingCollectionError
            Raised if ``run`` does not exist in the registry.
        """
        datasetType = self._managers.datasets.conform_exact_dataset_type(datasetType)
        if run is None:
            if self.defaults.run is None:
                raise NoDefaultCollectionError(
                    "No run provided to insertDatasets, and no default from registry construction."
                )
            run = self.defaults.run
        runRecord = self._managers.collections.find(run)
        if runRecord.type is not CollectionType.RUN:
            raise CollectionTypeError(
                f"Given collection is of type {runRecord.type.name}; RUN collection required."
            )
        assert isinstance(runRecord, RunRecord)

        expandedDataIds = [
            DataCoordinate.standardize(dataId, dimensions=datasetType.dimensions) for dataId in dataIds
        ]
        if expand:
            _LOG.debug("Expanding %d data IDs", len(expandedDataIds))
            expandedDataIds = self.expand_data_ids(expandedDataIds)
            _LOG.debug("Finished expanding data IDs")

        try:
            refs = list(
                self._managers.datasets.insert(datasetType.name, runRecord, expandedDataIds, idGenerationMode)
            )
            if self._managers.obscore:
                self._managers.obscore.add_datasets(refs)
        except sqlalchemy.exc.IntegrityError as err:
            raise ConflictingDefinitionError(
                "A database constraint failure was triggered by inserting "
                f"one or more datasets of type {datasetType} into "
                f"collection '{run}'. "
                "This probably means a dataset with the same data ID "
                "and dataset type already exists, but it may also mean a "
                "dimension row is missing."
            ) from err
        return refs
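    # A minimal insertion sketch (hypothetical dataset type, data ID, and
    # run names):
    #
    #     >>> registry.registerRun("u/alice/run1")
    #     >>> (ref,) = registry.insertDatasets(
    #     ...     "raw",
    #     ...     dataIds=[{"instrument": "HSC", "detector": 0, "exposure": 903334}],
    #     ...     run="u/alice/run1",
    #     ... )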

    @transactional
    def _importDatasets(
        self,
        datasets: Iterable[DatasetRef],
        expand: bool = True,
        assume_new: bool = False,
    ) -> list[DatasetRef]:
        """Import one or more datasets into the `Registry`.

        This differs from the `insertDatasets` method in that this method
        accepts `DatasetRef` instances, which already have a dataset ID.

        Parameters
        ----------
        datasets : `~collections.abc.Iterable` of `DatasetRef`
            Datasets to be inserted. All `DatasetRef` instances must have
            identical ``run`` attributes. The ``run`` attribute can be `None`
            and defaults to ``self.defaults.run``. Datasets can specify an
            ``id`` attribute which will be used for inserted datasets.
            Datasets can be of multiple dataset types, but all the dataset
            types must have the same set of dimensions.
        expand : `bool`, optional
            If `True` (default), expand data IDs as they are inserted. This is
            necessary in general, but it may be disabled if the caller can
            guarantee this is unnecessary.
        assume_new : `bool`, optional
            If `True`, assume datasets are new. If `False`, datasets that are
            identical to an existing one are ignored.

        Returns
        -------
        refs : `list` of `DatasetRef`
            `DatasetRef` instances for all given data IDs (in the same order).
            If any of ``datasets`` has an ID which already exists in the
            database then it will not be inserted or updated, but a
            `DatasetRef` will be returned for it in any case.

        Raises
        ------
        lsst.daf.butler.registry.NoDefaultCollectionError
            Raised if ``run`` is `None` and ``self.defaults.run`` is `None`.
        lsst.daf.butler.registry.DatasetTypeError
            Raised if a dataset type is not known to registry.
        lsst.daf.butler.registry.ConflictingDefinitionError
            If a dataset with the same dataset type and data ID as one of those
            given already exists in ``run``, or if ``assume_new=True`` and at
            least one dataset is not new.
        lsst.daf.butler.registry.MissingCollectionError
            Raised if ``run`` does not exist in the registry.

        Notes
        -----
        This method is considered middleware-internal.
        """
        datasets = list(datasets)
        if not datasets:
            # nothing to do
            return []

        # find run name
        runs = {dataset.run for dataset in datasets}
        if len(runs) != 1:
            raise ValueError(f"Multiple run names in input datasets: {runs}")
        run = runs.pop()

        runRecord = self._managers.collections.find(run)
        if runRecord.type is not CollectionType.RUN:
            raise CollectionTypeError(
                f"Given collection '{runRecord.name}' is of type {runRecord.type.name};"
                " RUN collection required."
            )
        assert isinstance(runRecord, RunRecord)

        if expand:
            _LOG.debug("Expanding %d data IDs", len(datasets))
            datasets = self.expand_refs(datasets)
            _LOG.debug("Finished expanding data IDs")

        try:
            self._managers.datasets.import_(runRecord, datasets, assume_new=assume_new)
            if self._managers.obscore:
                self._managers.obscore.add_datasets(datasets)
        except sqlalchemy.exc.IntegrityError as err:
            raise ConflictingDefinitionError(
                "A database constraint failure was triggered by inserting "
                f"one or more datasets into collection '{run}'. "
                "This probably means a dataset with the same data ID "
                "and dataset type already exists, but it may also mean a "
                "dimension row is missing, or the dataset was assumed to be "
                "new when it was not."
            ) from err
        return datasets
    def getDataset(self, id: DatasetId) -> DatasetRef | None:
        """Retrieve a Dataset entry.

        Parameters
        ----------
        id : `DatasetId`
            The unique identifier for the dataset.

        Returns
        -------
        ref : `DatasetRef` or `None`
            A ref to the Dataset, or `None` if no matching Dataset
            was found.
        """
        refs = self._managers.datasets.get_dataset_refs([id])
        if len(refs) == 0:
            return None
        else:
            return refs[0]
    def _fetch_run_dataset_ids(self, run: str) -> list[DatasetId]:
        """Return the IDs of all datasets in the given ``RUN``
        collection.

        Parameters
        ----------
        run : `str`
            Name of the collection.

        Returns
        -------
        dataset_ids : `list` [`uuid.UUID`]
            List of dataset IDs.

        Notes
        -----
        This is a middleware-internal interface.
        """
        run_record = self._managers.collections.find(run)
        if not isinstance(run_record, RunRecord):
            raise CollectionTypeError(f"{run!r} is not a RUN collection.")
        return self._managers.datasets.fetch_run_dataset_ids(run_record)
    @transactional
    def removeDatasets(self, refs: Iterable[DatasetRef]) -> None:
        """Remove datasets from the Registry.

        The datasets will be removed unconditionally from all collections.
        `Datastore` records will *not* be deleted; the caller is responsible
        for ensuring that the dataset has already been removed from all
        Datastores.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [`DatasetRef`]
            References to the datasets to be removed. Should be considered
            invalidated upon return.

        Raises
        ------
        lsst.daf.butler.registry.OrphanedRecordError
            Raised if any dataset is still present in any `Datastore`.
        """
        try:
            self._managers.datasets.delete(refs)
        except sqlalchemy.exc.IntegrityError as err:
            raise OrphanedRecordError(
                "One or more datasets is still present in one or more Datastores."
            ) from err
    @transactional
    def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        """Add existing datasets to a `~CollectionType.TAGGED` collection.

        If a `DatasetRef` with the same exact ID is already in the collection,
        nothing is changed. If a `DatasetRef` with the same `DatasetType` and
        data ID but with different ID exists in the collection,
        `~lsst.daf.butler.registry.ConflictingDefinitionError` is raised.

        Parameters
        ----------
        collection : `str`
            Indicates the collection the datasets should be associated with.
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            An iterable of resolved `DatasetRef` instances that already exist
            in this `Registry`.

        Raises
        ------
        lsst.daf.butler.registry.ConflictingDefinitionError
            If a Dataset with the given `DatasetRef` already exists in the
            given collection.
        lsst.daf.butler.registry.MissingCollectionError
            Raised if ``collection`` does not exist in the registry.
        lsst.daf.butler.registry.CollectionTypeError
            Raised if adding new datasets to the given ``collection`` is not
            allowed.
        """
        progress = Progress("lsst.daf.butler.Registry.associate", level=logging.DEBUG)
        collectionRecord = self._managers.collections.find(collection)
        for datasetType, refsForType in progress.iter_item_chunks(
            DatasetRef.iter_by_type(refs), desc="Associating datasets by type"
        ):
            try:
                self._managers.datasets.associate(datasetType, collectionRecord, refsForType)
                if self._managers.obscore:
                    # If a TAGGED collection is being monitored by ObsCore
                    # manager then we may need to save the dataset.
                    self._managers.obscore.associate(refsForType, collectionRecord)
            except sqlalchemy.exc.IntegrityError as err:
                raise ConflictingDefinitionError(
                    f"Constraint violation while associating dataset of type {datasetType.name} with "
                    f"collection {collection}. This probably means that one or more datasets with the same "
                    "dataset type and data ID already exist in the collection, but it may also indicate "
                    "that the datasets do not exist."
                ) from err
    @transactional
    def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        """Remove existing datasets from a `~CollectionType.TAGGED` collection.

        ``collection`` and ``ref`` combinations that are not currently
        associated are silently ignored.

        Parameters
        ----------
        collection : `str`
            The collection the datasets should no longer be associated with.
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            An iterable of resolved `DatasetRef` instances that already exist
            in this `Registry`.

        Raises
        ------
        lsst.daf.butler.AmbiguousDatasetError
            Raised if any of the given dataset references is unresolved.
        lsst.daf.butler.registry.MissingCollectionError
            Raised if ``collection`` does not exist in the registry.
        lsst.daf.butler.registry.CollectionTypeError
            Raised if removing datasets from the given ``collection`` is not
            allowed.
        """
        progress = Progress("lsst.daf.butler.Registry.disassociate", level=logging.DEBUG)
        collectionRecord = self._managers.collections.find(collection)
        for datasetType, refsForType in progress.iter_item_chunks(
            DatasetRef.iter_by_type(refs), desc="Disassociating datasets by type"
        ):
            self._managers.datasets.disassociate(datasetType, collectionRecord, refsForType)
            if self._managers.obscore:
                self._managers.obscore.disassociate(refsForType, collectionRecord)
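    # A minimal tagging sketch (hypothetical collection name; ``refs`` is
    # assumed to hold resolved `DatasetRef` objects, e.g. from a query):
    #
    #     >>> registry.registerCollection("u/alice/best", CollectionType.TAGGED)
    #     >>> registry.associate("u/alice/best", refs)
    #     >>> registry.disassociate("u/alice/best", refs)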

    @transactional
    def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespan) -> None:
        """Associate one or more datasets with a calibration collection and a
        validity range within it.

        Parameters
        ----------
        collection : `str`
            The name of an already-registered `~CollectionType.CALIBRATION`
            collection.
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets to be associated.
        timespan : `Timespan`
            The validity range for these datasets within the collection.

        Raises
        ------
        lsst.daf.butler.AmbiguousDatasetError
            Raised if any of the given `DatasetRef` instances is unresolved.
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if the collection already contains a different dataset with
            the same `DatasetType` and data ID and an overlapping validity
            range.
        DatasetTypeError
            Raised if ``ref.datasetType.isCalibration() is False`` for any ref.
        CollectionTypeError
            Raised if
            ``collection.type is not CollectionType.CALIBRATION``.
        """
        progress = Progress("lsst.daf.butler.Registry.certify", level=logging.DEBUG)
        with self._managers.caching_context.enable_collection_record_cache():
            collectionRecord = self._managers.collections.find(collection)
            for datasetType, refsForType in progress.iter_item_chunks(
                DatasetRef.iter_by_type(refs), desc="Certifying datasets by type"
            ):
                self._managers.datasets.certify(
                    datasetType, collectionRecord, refsForType, timespan, self._query
                )
    @transactional
    def decertify(
        self,
        collection: str,
        datasetType: str | DatasetType,
        timespan: Timespan,
        *,
        dataIds: Iterable[DataId] | None = None,
    ) -> None:
        """Remove or adjust datasets to clear a validity range within a
        calibration collection.

        Parameters
        ----------
        collection : `str`
            The name of an already-registered `~CollectionType.CALIBRATION`
            collection.
        datasetType : `str` or `DatasetType`
            Name or `DatasetType` instance for the datasets to be decertified.
        timespan : `Timespan`
            The validity range to remove datasets from within the collection.
            Datasets that overlap this range but are not contained by it will
            have their validity ranges adjusted to not overlap it, which may
            split a single dataset validity range into two.
        dataIds : `~collections.abc.Iterable` [`dict` or `DataCoordinate`], \
                optional
            Data IDs that should be decertified within the given validity
            range. If `None`, all data IDs for ``datasetType`` will be
            decertified.

        Raises
        ------
        DatasetTypeError
            Raised if ``datasetType.isCalibration() is False``.
        CollectionTypeError
            Raised if
            ``collection.type is not CollectionType.CALIBRATION``.
        """
        collectionRecord = self._managers.collections.find(collection)
        if isinstance(datasetType, str):
            datasetType = self.getDatasetType(datasetType)
        standardizedDataIds = None
        if dataIds is not None:
            standardizedDataIds = [
                DataCoordinate.standardize(d, dimensions=datasetType.dimensions) for d in dataIds
            ]
        self._managers.datasets.decertify(
            datasetType, collectionRecord, timespan, data_ids=standardizedDataIds, query_func=self._query
        )
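    # A minimal calibration sketch (hypothetical names; assumes ``bias_refs``
    # holds resolved refs of a calibration dataset type):
    #
    #     >>> registry.registerCollection("HSC/calib", CollectionType.CALIBRATION)
    #     >>> span = Timespan(begin=None, end=None)  # unbounded validity range
    #     >>> registry.certify("HSC/calib", bias_refs, span)
    #     >>> registry.decertify("HSC/calib", "bias", span)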

    def getDatastoreBridgeManager(self) -> DatastoreRegistryBridgeManager:
        """Return an object that allows a new `Datastore` instance to
        communicate with this `Registry`.

        Returns
        -------
        manager : `~.interfaces.DatastoreRegistryBridgeManager`
            Object that mediates communication between this `Registry` and its
            associated datastores.
        """
        return self._managers.datastores
    def getDatasetLocations(self, ref: DatasetRef) -> Iterable[str]:
        """Retrieve datastore locations for a given dataset.

        Parameters
        ----------
        ref : `DatasetRef`
            A reference to the dataset for which to retrieve storage
            information.

        Returns
        -------
        datastores : `~collections.abc.Iterable` [ `str` ]
            All the matching datastores holding this dataset.

        Raises
        ------
        lsst.daf.butler.AmbiguousDatasetError
            Raised if ``ref.id`` is `None`.
        """
        return self._managers.datastores.findDatastores(ref)
    def expandDataId(
        self,
        dataId: DataId | None = None,
        *,
        dimensions: Iterable[str] | DimensionGroup | None = None,
        records: Mapping[str, DimensionRecord | None] | None = None,
        withDefaults: bool = True,
        **kwargs: Any,
    ) -> DataCoordinate:
        """Expand a dimension-based data ID to include additional information.

        Parameters
        ----------
        dataId : `DataCoordinate` or `dict`, optional
            Data ID to be expanded; augmented and overridden by ``kwargs``.
        dimensions : `~collections.abc.Iterable` [ `str` ], \
                `DimensionGroup`, optional
            The dimensions to be identified by the new `DataCoordinate`.
            If not provided, will be inferred from the keys of ``dataId`` and
            ``**kwargs``, and ``universe`` must be provided unless ``dataId``
            is already a `DataCoordinate`.
        records : `~collections.abc.Mapping` [`str`, `DimensionRecord`], \
                optional
            Dimension record data to use before querying the database for that
            data, keyed by element name.
        withDefaults : `bool`, optional
            Utilize ``self.defaults.dataId`` to fill in missing governor
            dimension key-value pairs. Defaults to `True` (i.e. defaults are
            used).
        **kwargs
            Additional keywords are treated like additional key-value pairs for
            ``dataId``, extending and overriding.

        Returns
        -------
        expanded : `DataCoordinate`
            A data ID that includes full metadata for all of the dimensions it
            identifies, i.e. guarantees that ``expanded.hasRecords()`` and
            ``expanded.hasFull()`` both return `True`.

        Raises
        ------
        lsst.daf.butler.registry.DataIdError
            Raised when ``dataId`` or keyword arguments specify unknown
            dimensions or values, or when a resulting data ID contains
            contradictory key-value pairs, according to dimension
            relationships.

        Notes
        -----
        This method cannot be relied upon to reject invalid data ID values
        for dimensions that do not actually have any record columns. For
        efficiency reasons the records for these dimensions (which have only
        dimension key values that are given by the caller) may be constructed
        directly rather than obtained from the registry database.
        """
        if not withDefaults:
            defaults = None
        else:
            defaults = self.defaults.dataId
        standardized = DataCoordinate.standardize(
            dataId,
            dimensions=dimensions,
            universe=self.dimensions,
            defaults=defaults,
            **kwargs,
        )
        if standardized.hasRecords():
            return standardized
        if records is None:
            records = {}
        else:
            records = dict(records)
        if isinstance(dataId, DataCoordinate) and dataId.hasRecords() and not kwargs:
            for element_name in dataId.dimensions.elements:
                records[element_name] = dataId.records[element_name]
        keys: dict[str, str | int] = dict(standardized.mapping)
        for element_name in standardized.dimensions.lookup_order:
            element = self.dimensions[element_name]
            record = records.get(element_name, ...)  # Use ... to mean not found; None might mean NULL
            if record is ...:
                if element_name in self.dimensions.dimensions.names and keys.get(element_name) is None:
                    raise DimensionNameError(f"No value or null value for dimension {element_name}.")
                else:
                    record = self._managers.dimensions.fetch_one(
                        element_name,
                        DataCoordinate.standardize(keys, dimensions=element.minimal_group),
                        self.dimension_record_cache,
                    )
                records[element_name] = record
            if record is not None:
                for d in element.implied:
                    value = getattr(record, d.name)
                    if keys.setdefault(d.name, value) != value:
                        raise InconsistentDataIdError(
                            f"Data ID {standardized} has {d.name}={keys[d.name]!r}, "
                            f"but {element_name} implies {d.name}={value!r}."
                        )
            else:
                if element_name in standardized.dimensions.names:
                    raise DataIdValueError(
                        f"Could not fetch record for dimension {element.name} via keys {keys}."
                    )
                if element.defines_relationships:
                    raise InconsistentDataIdError(
                        f"Could not fetch record for element {element_name} via keys {keys}, "
                        "but it is marked as defining relationships; this means one or more "
                        "dimensions have inconsistent values.",
                    )
        return DataCoordinate.standardize(keys, dimensions=standardized.dimensions).expanded(records=records)
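    # A minimal expansion sketch (hypothetical instrument and detector
    # values):
    #
    #     >>> data_id = registry.expandDataId({"instrument": "HSC", "detector": 0})
    #     >>> data_id.hasRecords() and data_id.hasFull()
    #     True
    #     >>> record = data_id.records["detector"]  # full detector metadata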

    def expand_data_ids(self, data_ids: Iterable[DataCoordinate]) -> list[DataCoordinate]:
        # Vectorized counterpart to `expandDataId`: expand many data IDs in
        # one pass, sharing the dimension record cache.
        return expand_data_ids(data_ids, self.dimensions, self._query, self.dimension_record_cache)

    def expand_refs(self, dataset_refs: list[DatasetRef]) -> list[DatasetRef]:
        # Expand the data ID attached to each ref, preserving order.
        expanded_ids = self.expand_data_ids([ref.dataId for ref in dataset_refs])
        return [ref.expanded(data_id) for ref, data_id in zip(dataset_refs, expanded_ids)]
    def insertDimensionData(
        self,
        element: DimensionElement | str,
        *data: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        replace: bool = False,
        skip_existing: bool = False,
    ) -> None:
        """Insert one or more dimension records into the database.

        Parameters
        ----------
        element : `DimensionElement` or `str`
            The `DimensionElement` or name thereof that identifies the table
            records will be inserted into.
        *data : `dict` or `DimensionRecord`
            One or more records to insert.
        conform : `bool`, optional
            If `False` (`True` is default) perform no checking or conversions,
            and assume that ``element`` is a `DimensionElement` instance and
            ``data`` is one or more `DimensionRecord` instances of the
            appropriate subclass.
        replace : `bool`, optional
            If `True` (`False` is default), replace existing records in the
            database if there is a conflict.
        skip_existing : `bool`, optional
            If `True` (`False` is default), skip insertion if a record with
            the same primary key values already exists. Unlike
            `syncDimensionData`, this will not detect when the given record
            differs from what is in the database, and should not be used when
            this is a concern.
        """
        if isinstance(element, str):
            element = self.dimensions[element]
        if conform:
            records = [
                row if isinstance(row, DimensionRecord) else element.RecordClass(**row) for row in data
            ]
        else:
            # Ignore typing since caller said to trust them with conform=False.
            records = data  # type: ignore
        if element.name in self.dimension_record_cache:
            self.dimension_record_cache.reset()
        self._managers.dimensions.insert(
            element,
            *records,
            replace=replace,
            skip_existing=skip_existing,
        )
1474 def syncDimensionData( 

1475 self, 

1476 element: DimensionElement | str, 

1477 row: Mapping[str, Any] | DimensionRecord, 

1478 conform: bool = True, 

1479 update: bool = False, 

1480 ) -> bool | dict[str, Any]: 

1481 """Synchronize the given dimension record with the database, inserting 

1482 if it does not already exist and comparing values if it does. 

1483 

1484 Parameters 

1485 ---------- 

1486 element : `DimensionElement` or `str` 

1487 The `DimensionElement` or name thereof that identifies the table 

1488 records will be inserted into. 

1489 row : `dict` or `DimensionRecord` 

1490 The record to insert. 

1491 conform : `bool`, optional 

1492 If `False` (`True` is default) perform no checking or conversions, 

1493 and assume that ``element`` is a `DimensionElement` instance and 

1494 ``data`` is a `DimensionRecord` instances of the appropriate 

1495 subclass. 

1496 update : `bool`, optional 

1497 If `True` (`False` is default), update the existing record in the 

1498 database if there is a conflict. 

1499 

1500 Returns 

1501 ------- 

1502 inserted_or_updated : `bool` or `dict` 

1503 `True` if a new row was inserted, `False` if no changes were 

1504 needed, or a `dict` mapping updated column names to their old 

1505 values if an update was performed (only possible if 

1506 ``update=True``). 

1507 

1508 Raises 

1509 ------ 

1510 lsst.daf.butler.registry.ConflictingDefinitionError 

1511 Raised if the record exists in the database (according to primary 

1512 key lookup) but is inconsistent with the given one. 

1513 """ 

1514 if conform: 

1515 if isinstance(element, str): 

1516 element = self.dimensions[element] 

1517 record = row if isinstance(row, DimensionRecord) else element.RecordClass(**row) 

1518 else: 

1519 # Ignore typing since caller said to trust them with conform=False. 

1520 record = row # type: ignore 

1521 if record.definition.name in self.dimension_record_cache: 

1522 self.dimension_record_cache.reset() 

1523 return self._managers.dimensions.sync(record, update=update) 

1524 

1525 def queryDatasetTypes( 

1526 self, 

1527 expression: Any = ..., 

1528 *, 

1529 missing: list[str] | None = None, 

1530 ) -> Iterable[DatasetType]: 

1531 """Iterate over the dataset types whose names match an expression. 

1532 

1533 Parameters 

1534 ---------- 

1535 expression : dataset type expression, optional 

1536 An expression that fully or partially identifies the dataset types 

1537 to return, such as a `str`, `re.Pattern`, or iterable thereof. 

1538 ``...`` can be used to return all dataset types, and is the 

1539 default. See :ref:`daf_butler_dataset_type_expressions` for more 

1540 information. 

1541 missing : `list` of `str`, optional 

1542 String dataset type names that were explicitly given (i.e. not 

1543 regular expression patterns) but not found will be appended to this 

1544 list, if it is provided. 

1545 

1546 Returns 

1547 ------- 

1548 dataset_types : `~collections.abc.Iterable` [ `DatasetType` ] 

1549 An `~collections.abc.Iterable` of `DatasetType` instances whose 

1550 names match ``expression``. 

1551 

1552 Raises 

1553 ------ 

1554 lsst.daf.butler.registry.DatasetTypeExpressionError 

1555 Raised when ``expression`` is invalid. 

1556 """ 

1557 wildcard = DatasetTypeWildcard.from_expression(expression) 

1558 return self._managers.datasets.resolve_wildcard(wildcard, missing=missing) 

1559 
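Some example expressions, assuming the placeholder dataset type names "raw" and "calexp" are registered in the repository:

    import re

    from lsst.daf.butler import Butler

    butler = Butler("my_repo")  # placeholder path
    # The default expression ... matches every dataset type.
    all_types = list(butler.registry.queryDatasetTypes())
    # Mix explicit names with a pattern; unmatched explicit names are
    # appended to the `missing` list instead of raising.
    missing: list[str] = []
    matched = list(
        butler.registry.queryDatasetTypes(["raw", re.compile(r"^calexp")], missing=missing)
    )
    print("not found:", missing)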

1560 def queryCollections( 

1561 self, 

1562 expression: Any = ..., 

1563 datasetType: DatasetType | None = None, 

1564 collectionTypes: Iterable[CollectionType] | CollectionType = CollectionType.all(), 

1565 flattenChains: bool = False, 

1566 includeChains: bool | None = None, 

1567 ) -> Sequence[str]: 

1568 """Iterate over the collections whose names match an expression. 

1569 

1570 Parameters 

1571 ---------- 

1572 expression : collection expression, optional 

1573 An expression that identifies the collections to return, such as 

1574 a `str` (for full matches or partial matches via globs), 

1575 `re.Pattern` (for partial matches), or iterable thereof. ``...`` 

1576 can be used to return all collections, and is the default. 

1577 See :ref:`daf_butler_collection_expressions` for more information. 

1578 datasetType : `DatasetType`, optional 

1579 If provided, only yield collections that may contain datasets of 

1580 this type. This is a conservative approximation in general; it may 

1581 yield collections that do not have any such datasets. 

1582 collectionTypes : `~collections.abc.Set` [`CollectionType`] or \ 

1583 `CollectionType`, optional 

1584 If provided, only yield collections of these types. 

1585 flattenChains : `bool`, optional 

1586 If `True` (`False` is default), recursively yield the child 

1587 collections of matching `~CollectionType.CHAINED` collections. 

1588 includeChains : `bool`, optional 

1589 If `True`, yield records for matching `~CollectionType.CHAINED` 

1590 collections. Default is the opposite of ``flattenChains``: include 

1591 either CHAINED collections or their children, but not both. 

1592 

1593 Returns 

1594 ------- 

1595 collections : `~collections.abc.Sequence` [ `str` ] 

1596 The names of collections that match ``expression``. 

1597 

1598 Raises 

1599 ------ 

1600 lsst.daf.butler.registry.CollectionExpressionError 

1601 Raised when ``expression`` is invalid. 

1602 

1603 Notes 

1604 ----- 

1605 The order in which collections are returned is unspecified, except that 

1606 the children of a `~CollectionType.CHAINED` collection are guaranteed 

1607 to be in the order in which they are searched. When multiple parent 

1608 `~CollectionType.CHAINED` collections match the same criteria, the 

1609 order in which their child lists appear is unspecified, and the lists of 

1610 children may be incomplete if a child has multiple parents. 

1611 """ 

1612 # Right now the datasetType argument is completely ignored, but that 

1613 # is consistent with its [lack of] guarantees. DM-24939 or a follow-up 

1614 # ticket will take care of that. 

1615 if datasetType is not None: 

1616 warnings.warn( 

1617 "The datasetType parameter should no longer be used. It has" 

1618 " never had any effect. Will be removed after v28", 

1619 FutureWarning, 

1620 ) 

1621 try: 

1622 wildcard = CollectionWildcard.from_expression(expression) 

1623 except TypeError as exc: 

1624 raise CollectionExpressionError(f"Invalid collection expression '{expression}'") from exc 

1625 collectionTypes = ensure_iterable(collectionTypes) 

1626 return [ 

1627 record.name 

1628 for record in self._managers.collections.resolve_wildcard( 

1629 wildcard, 

1630 collection_types=frozenset(collectionTypes), 

1631 flatten_chains=flattenChains, 

1632 include_chains=includeChains, 

1633 ) 

1634 ] 

1635 
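Typical expressions, with placeholder collection names:

    from lsst.daf.butler import Butler, CollectionType

    butler = Butler("my_repo")  # placeholder path
    # All RUN collections under a user namespace, matched via a glob.
    runs = butler.registry.queryCollections(
        "u/someone/*",  # placeholder glob
        collectionTypes=CollectionType.RUN,
    )
    # Expand a CHAINED collection into its children, in search order.
    children = butler.registry.queryCollections(
        "my/chain",  # placeholder chain name
        flattenChains=True,
    )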

1636 @contextlib.contextmanager 

1637 def _query(self) -> Iterator[Query]: 

1638 """Context manager returning a `Query` object used for construction 

1639 and execution of complex queries. 

1640 """ 

1641 with self._query_driver(self.defaults.collections, self.defaults.dataId) as driver: 

1642 yield Query(driver) 

1643 

1644 @contextlib.contextmanager 

1645 def _query_driver( 

1646 self, 

1647 default_collections: Iterable[str], 

1648 default_data_id: DataCoordinate, 

1649 ) -> Iterator[DirectQueryDriver]: 

1650 """Set up a `QueryDriver` instance for query execution.""" 

1651 # Query internals do repeated lookups of the same collections, so execution 

1652 # benefits from the collection record cache. 

1653 with self._managers.caching_context.enable_collection_record_cache(): 

1654 driver = DirectQueryDriver( 

1655 self._db, 

1656 self.dimensions, 

1657 self._managers, 

1658 self.dimension_record_cache, 

1659 default_collections=default_collections, 

1660 default_data_id=default_data_id, 

1661 ) 

1662 with driver: 

1663 yield driver 

1664 

1665 def get_datastore_records(self, ref: DatasetRef) -> DatasetRef: 

1666 """Retrieve datastore records for given ref. 

1667 

1668 Parameters 

1669 ---------- 

1670 ref : `DatasetRef` 

1671 Dataset reference for which to retrieve its corresponding datastore 

1672 records. 

1673 

1674 Returns 

1675 ------- 

1676 updated_ref : `DatasetRef` 

1677 Dataset reference with filled datastore records. 

1678 

1679 Notes 

1680 ----- 

1681 If this method is called with a dataset ref that is not known to the 

1682 registry, a reference with an empty set of records is returned. 

1683 """ 

1684 datastore_records: dict[str, list[StoredDatastoreItemInfo]] = {} 

1685 for opaque, record_class in self._datastore_record_classes.items(): 

1686 records = self.fetchOpaqueData(opaque, dataset_id=ref.id) 

1687 datastore_records[opaque] = [record_class.from_record(record) for record in records] 

1688 return ref.replace(datastore_records=datastore_records) 

1689 
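A sketch of the lookup, assuming `registry` and an existing `ref` (both hypothetical); an unknown ref comes back with empty record lists rather than raising:

    # Attach the per-datastore record metadata to the ref, e.g. before
    # handing it to code that needs file details without extra queries.
    ref_with_records = registry.get_datastore_records(ref)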

1690 def store_datastore_records(self, refs: Mapping[str, DatasetRef]) -> None: 

1691 """Store datastore records for given refs. 

1692 

1693 Parameters 

1694 ---------- 

1695 refs : `~collections.abc.Mapping` [`str`, `DatasetRef`] 

1696 Mapping from datastore name to a dataset reference stored in that 

1697 datastore; the reference must include datastore records. 

1698 """ 

1699 for datastore_name, ref in refs.items(): 

1700 # Store ref IDs in the bridge table. 

1701 bridge = self._managers.datastores.register(datastore_name) 

1702 bridge.insert([ref]) 

1703 

1704 # Store records in opaque tables. 

1705 assert ref._datastore_records is not None, "Dataset ref must have datastore records" 

1706 for table_name, records in ref._datastore_records.items(): 

1707 opaque_table = self._managers.opaque.get(table_name) 

1708 assert opaque_table is not None, f"Unexpected opaque table name {table_name}" 

1709 opaque_table.insert(*(record.to_record(dataset_id=ref.id) for record in records)) 

1710 
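The write-side counterpart of `get_datastore_records`. A sketch in which the datastore name key is illustrative and `ref` must already carry its datastore records (e.g. one returned by `get_datastore_records`):

    # One bridge-table insert, plus one opaque-table insert per record type.
    registry.store_datastore_records({"my_datastore": ref})  # placeholder name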

1711 def make_datastore_tables(self, tables: Mapping[str, DatastoreOpaqueTable]) -> None: 

1712 """Create opaque tables used by datastores. 

1713 

1714 Parameters 

1715 ---------- 

1716 tables : `~collections.abc.Mapping` 

1717 Maps opaque table name to its definition. 

1718 

1719 Notes 

1720 ----- 

1721 This method should disappear in the future when opaque table 

1722 definitions are provided during `Registry` construction. 

1723 """ 

1724 datastore_record_classes = {} 

1725 for table_name, table_def in tables.items(): 

1726 datastore_record_classes[table_name] = table_def.record_class 

1727 try: 

1728 self._managers.opaque.register(table_name, table_def.table_spec) 

1729 except ReadOnlyDatabaseError: 

1730 # If the database is read only and we just tried and failed to 

1731 # create a table, it means someone is trying to create a 

1732 # read-only butler client for an empty repo. That should be 

1733 # okay, as long as they then try to get any datasets before 

1734 # some other client creates the table. Chances are they're 

1735 # just validating configuration. 

1736 pass 

1737 self._datastore_record_classes = datastore_record_classes 

1738 
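A sketch of the construction-time handshake this method supports. It assumes a `datastore` object exposing `get_opaque_table_definitions()` that returns the name-to-definition mapping expected here; treat both names as assumptions:

    # Register every datastore-defined opaque table; on a read-only database
    # the table-creation step is skipped without error.
    registry.make_datastore_tables(datastore.get_opaque_table_definitions())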

1739 def preload_cache(self, *, load_dimension_record_cache: bool) -> None: 

1740 """Immediately load caches that are used for common operations. 

1741 

1742 Parameters 

1743 ---------- 

1744 load_dimension_record_cache : `bool` 

1745 If `True`, preload the dimension record cache. When this cache is 

1746 preloaded, subsequent external changes to governor dimension 

1747 records will not be visible to this Butler. 

1748 """ 

1749 self._managers.datasets.preload_cache() 

1750 

1751 if load_dimension_record_cache: 

1752 self.dimension_record_cache.preload_cache() 

1753 
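The flag trades speed for freshness; a sketch of both choices, with `registry` again hypothetical:

    # Long-lived batch client: load caches once, accept that governor
    # dimension records added later by other clients stay invisible.
    registry.preload_cache(load_dimension_record_cache=True)

    # Client that must see newly registered instruments/skymaps: preload
    # only the dataset-type cache.
    registry.preload_cache(load_dimension_record_cache=False)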

1754 @property 

1755 def obsCoreTableManager(self) -> ObsCoreTableManager | None: 

1756 """The ObsCore manager instance for this registry 

1757 (`~.interfaces.ObsCoreTableManager` 

1758 or `None`). 

1759 

1760 The ObsCore manager may not be implemented for all registry backends, or 

1761 may not be enabled for many repositories. 

1762 """ 

1763 return self._managers.obscore 

1764 
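Because the manager can be `None`, callers guard before using it; a minimal sketch:

    if (obscore := registry.obsCoreTableManager) is not None:
        # ObsCore export is configured for this repository.
        ...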

1765 storageClasses: StorageClassFactory 

1766 """All storage classes known to the registry (`StorageClassFactory`). 

1767 """ 

1768 

1769 _defaults: RegistryDefaults 

1770 """Default collections used for registry queries (`RegistryDefaults`)."""