Coverage for python/lsst/daf/butler/registry/sql_registry.py: 22% (408 statements)

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

from .. import ddl

__all__ = ("SqlRegistry",)

import contextlib
import logging
import warnings
from collections.abc import Iterable, Iterator, Mapping, Sequence
from typing import TYPE_CHECKING, Any

import sqlalchemy

from lsst.resources import ResourcePathExpression
from lsst.utils.iteration import ensure_iterable

from .._collection_type import CollectionType
from .._config import Config
from .._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef
from .._dataset_type import DatasetType
from .._exceptions import DataIdValueError, DimensionNameError, InconsistentDataIdError
from .._storage_class import StorageClassFactory
from .._timespan import Timespan
from ..dimensions import (
    DataCoordinate,
    DataId,
    DimensionConfig,
    DimensionElement,
    DimensionGroup,
    DimensionRecord,
    DimensionUniverse,
)
from ..dimensions.record_cache import DimensionRecordCache
from ..direct_query_driver import DirectQueryDriver
from ..progress import Progress
from ..queries import Query
from ..registry import (
    CollectionExpressionError,
    CollectionSummary,
    CollectionTypeError,
    ConflictingDefinitionError,
    NoDefaultCollectionError,
    OrphanedRecordError,
    RegistryConfig,
    RegistryDefaults,
)
from ..registry.interfaces import ChainedCollectionRecord, ReadOnlyDatabaseError, RunRecord
from ..registry.managers import RegistryManagerInstances, RegistryManagerTypes
from ..registry.wildcards import CollectionWildcard, DatasetTypeWildcard
from ..utils import transactional
from .expand_data_ids import expand_data_ids

if TYPE_CHECKING:
    from .._butler_config import ButlerConfig
    from ..datastore._datastore import DatastoreOpaqueTable
    from ..datastore.stored_file_info import StoredDatastoreItemInfo
    from ..registry.interfaces import (
        CollectionRecord,
        Database,
        DatastoreRegistryBridgeManager,
        ObsCoreTableManager,
    )

_LOG = logging.getLogger(__name__)

class SqlRegistry:
    """Butler Registry implementation that uses a SQL database as its
    backend.

    Parameters
    ----------
    database : `Database`
        Database instance to store the Registry.
    defaults : `RegistryDefaults`
        Default collection search path and/or output `~CollectionType.RUN`
        collection.
    managers : `RegistryManagerInstances`
        All the managers required for this registry.
    """

    defaultConfigFile: str | None = None
    """Path to configuration defaults. Accessed within the ``configs``
    resource or relative to a search path. Can be `None` if no defaults are
    specified.
    """

    @classmethod
    def forceRegistryConfig(
        cls, config: ButlerConfig | RegistryConfig | Config | str | None
    ) -> RegistryConfig:
        """Force the supplied config to a `RegistryConfig`.

        Parameters
        ----------
        config : `RegistryConfig`, `Config`, `str`, or `None`
            Registry configuration; if missing, the default configuration
            will be loaded from registry.yaml.

        Returns
        -------
        registry_config : `RegistryConfig`
            A registry config.
        """
        if not isinstance(config, RegistryConfig):
            if isinstance(config, str | Config) or config is None:
                config = RegistryConfig(config)
            else:
                raise ValueError(f"Incompatible Registry configuration: {config}")
        return config

    @classmethod
    def createFromConfig(
        cls,
        config: RegistryConfig | str | None = None,
        dimensionConfig: DimensionConfig | str | None = None,
        butlerRoot: ResourcePathExpression | None = None,
    ) -> SqlRegistry:
        """Create a registry database and return a `SqlRegistry` instance.

        This method initializes the database contents; the database must be
        empty prior to calling this method.

        Parameters
        ----------
        config : `RegistryConfig` or `str`, optional
            Registry configuration; if missing, the default configuration
            will be loaded from registry.yaml.
        dimensionConfig : `DimensionConfig` or `str`, optional
            Dimensions configuration; if missing, the default configuration
            will be loaded from dimensions.yaml.
        butlerRoot : convertible to `lsst.resources.ResourcePath`, optional
            Path to the repository root this `SqlRegistry` will manage.

        Returns
        -------
        registry : `SqlRegistry`
            A new `SqlRegistry` instance.
        """
        config = cls.forceRegistryConfig(config)
        config.replaceRoot(butlerRoot)

        if isinstance(dimensionConfig, str):
            dimensionConfig = DimensionConfig(dimensionConfig)
        elif dimensionConfig is None:
            dimensionConfig = DimensionConfig()
        elif not isinstance(dimensionConfig, DimensionConfig):
            raise TypeError(f"Incompatible Dimension configuration type: {type(dimensionConfig)}")

        managerTypes = RegistryManagerTypes.fromConfig(config)
        DatabaseClass = config.getDatabaseClass()
        database = DatabaseClass.fromUri(
            config.connectionString,
            origin=config.get("origin", 0),
            namespace=config.get("namespace"),
            allow_temporary_tables=config.areTemporaryTablesAllowed,
        )

        try:
            managers = managerTypes.makeRepo(database, dimensionConfig)
            return cls(database, RegistryDefaults(), managers)
        except Exception:
            database.dispose()
            raise

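    # Illustrative sketch of creating a brand-new registry; the path is
    # hypothetical and the target database must be empty:
    #
    #     registry = SqlRegistry.createFromConfig(
    #         "/path/to/repo/registry.yaml", butlerRoot="/path/to/repo"
    #     )
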
    @classmethod
    def fromConfig(
        cls,
        config: ButlerConfig | RegistryConfig | Config | str,
        butlerRoot: ResourcePathExpression | None = None,
        writeable: bool = True,
        defaults: RegistryDefaults | None = None,
    ) -> SqlRegistry:
        """Create a `SqlRegistry` instance from ``config``.

        The registry database must be initialized prior to calling this
        method.

        Parameters
        ----------
        config : `ButlerConfig`, `RegistryConfig`, `Config` or `str`
            Registry configuration.
        butlerRoot : `lsst.resources.ResourcePathExpression`, optional
            Path to the repository root this `SqlRegistry` will manage.
        writeable : `bool`, optional
            If `True` (default) create a read-write connection to the
            database.
        defaults : `RegistryDefaults`, optional
            Default collection search path and/or output `~CollectionType.RUN`
            collection.

        Returns
        -------
        registry : `SqlRegistry`
            A new `SqlRegistry` instance.
        """
        config = cls.forceRegistryConfig(config)
        config.replaceRoot(butlerRoot)
        if defaults is None:
            defaults = RegistryDefaults()
        DatabaseClass = config.getDatabaseClass()
        database = DatabaseClass.fromUri(
            config.connectionString,
            origin=config.get("origin", 0),
            namespace=config.get("namespace"),
            writeable=writeable,
            allow_temporary_tables=config.areTemporaryTablesAllowed,
        )
        try:
            managerTypes = RegistryManagerTypes.fromConfig(config)
            with database.session():
                managers = managerTypes.loadRepo(database)

            return cls(database, defaults, managers)
        except Exception:
            database.dispose()
            raise

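    # Illustrative sketch of opening an existing repository read-only; the
    # path and default collection name are hypothetical:
    #
    #     registry = SqlRegistry.fromConfig(
    #         "/path/to/repo/butler.yaml",
    #         writeable=False,
    #         defaults=RegistryDefaults(collections=["HypoCam/defaults"]),
    #     )
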
    def __init__(
        self,
        database: Database,
        defaults: RegistryDefaults,
        managers: RegistryManagerInstances,
    ):
        self._db = database
        self._managers = managers
        if managers.obscore is not None:
            managers.obscore.set_query_function(self._query)
        self.storageClasses = StorageClassFactory()
        # This is public to SqlRegistry's internal-to-daf_butler callers, but
        # it is intentionally not part of RegistryShim.
        self.dimension_record_cache = DimensionRecordCache(
            self._managers.dimensions.universe,
            fetch=self._managers.dimensions.fetch_cache_dict,
        )
        # Intentionally invoke the property setter to initialize defaults.
        # This can only be done after most of the rest of Registry has already
        # been initialized, and must be done before the property getter is
        # used.
        self.defaults = defaults
        # TODO: This is currently initialized by `make_datastore_tables`;
        # eventually we'll need to do it during construction.
        # The mapping is indexed by the opaque table name.
        self._datastore_record_classes: Mapping[str, type[StoredDatastoreItemInfo]] = {}
        self._is_clone = False

    def close(self) -> None:
        # The connection pool is shared between cloned instances, so only the
        # root instance should close it.
        # Note: the underlying SQLAlchemy call will create a fresh connection
        # pool, so nothing breaks if the root instance is accidentally closed
        # before the clones are finished -- we just have a small performance
        # hit from re-creating the connections.
        if not self._is_clone:
            self._db.dispose()

    def __str__(self) -> str:
        return str(self._db)

    def __repr__(self) -> str:
        return f"SqlRegistry({self._db!r}, {self.dimensions!r})"

    def isWriteable(self) -> bool:
        """Return `True` if this registry allows write operations, and `False`
        otherwise.
        """
        return self._db.isWriteable()

    def copy(self, defaults: RegistryDefaults | None = None) -> SqlRegistry:
        """Create a new `SqlRegistry` backed by the same data repository
        as this one and sharing a database connection pool with it, but with
        independent defaults and database sessions.

        Parameters
        ----------
        defaults : `~lsst.daf.butler.registry.RegistryDefaults`, optional
            Default collections and data ID values for the new registry. If
            not provided, ``self.defaults`` will be used (but future changes
            to either registry's defaults will not affect the other).

        Returns
        -------
        copy : `SqlRegistry`
            A new `SqlRegistry` instance with its own defaults.
        """
        if defaults is None:
            # No need to copy, because `RegistryDefaults` is immutable; we
            # effectively copy on write.
            defaults = self.defaults
        db = self._db.clone()
        result = SqlRegistry(db, defaults, self._managers.clone(db))
        result._datastore_record_classes = dict(self._datastore_record_classes)
        result.dimension_record_cache.load_from(self.dimension_record_cache)
        result._is_clone = True
        return result

    @property
    def dimensions(self) -> DimensionUniverse:
        """Definitions of all dimensions recognized by this `Registry`
        (`DimensionUniverse`).
        """
        return self._managers.dimensions.universe

    @property
    def defaults(self) -> RegistryDefaults:
        """Default collection search path and/or output `~CollectionType.RUN`
        collection (`~lsst.daf.butler.registry.RegistryDefaults`).

        This is an immutable struct whose components may not be set
        individually, but the entire struct can be set by assigning to this
        property.
        """
        return self._defaults

    @defaults.setter
    def defaults(self, value: RegistryDefaults) -> None:
        if value.run is not None:
            self.registerRun(value.run)
        value.finish(self)
        self._defaults = value

    def refresh(self) -> None:
        """Refresh all in-memory state by querying the database.

        This may be necessary to enable querying for entities added by other
        registry instances after this one was constructed.
        """
        self.dimension_record_cache.reset()
        with self._db.transaction():
            self._managers.refresh()

    def refresh_collection_summaries(self) -> None:
        """Refresh the content of the collection summary tables in the
        database.

        This only cleans up dataset type summaries; we may want to add cleanup
        of governor summaries later.
        """
        for dataset_type in self.queryDatasetTypes():
            self._managers.datasets.refresh_collection_summaries(dataset_type)

    def caching_context(self) -> contextlib.AbstractContextManager[None]:
        """Return a context manager that enables caching.

        Returns
        -------
        manager
            A context manager that enables client-side caching. Entering
            the context returns `None`.
        """
        return self._managers.caching_context_manager()

    @contextlib.contextmanager
    def transaction(self, *, savepoint: bool = False) -> Iterator[None]:
        """Return a context manager that represents a transaction.

        Parameters
        ----------
        savepoint : `bool`
            Whether to issue a SAVEPOINT in the database.

        Yields
        ------
        `None`
        """
        with self._db.transaction(savepoint=savepoint):
            yield

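    # Illustrative sketch: group writes so they commit or roll back together.
    # The dimension record values are hypothetical:
    #
    #     with registry.transaction(savepoint=True):
    #         registry.insertDimensionData(
    #             "instrument", {"name": "HypoCam", "class_name": "lsst.obs.hypo.HypoCam"}
    #         )
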
    def resetConnectionPool(self) -> None:
        """Reset the SQLAlchemy connection pool for the `SqlRegistry`
        database.

        This operation is useful when using the registry with fork-based
        multiprocessing. To use the registry across a fork boundary, one has
        to make sure that there are no currently active connections (no
        session or transaction is in progress) and that the connection pool is
        reset using this method. This method should be called by the child
        process immediately after the fork.
        """
        self._db._engine.dispose()

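    # Illustrative sketch for fork-based multiprocessing; assumes no session
    # or transaction is active when the fork happens:
    #
    #     import os
    #
    #     if os.fork() == 0:  # child process
    #         registry.resetConnectionPool()
    #         ...  # the child may now use ``registry`` safely
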
    def registerOpaqueTable(self, tableName: str, spec: ddl.TableSpec) -> None:
        """Add an opaque (to the `Registry`) table for use by a `Datastore` or
        other data repository client.

        Opaque table records can be added via `insertOpaqueData`, retrieved
        via `fetchOpaqueData`, and removed via `deleteOpaqueData`.

        Parameters
        ----------
        tableName : `str`
            Logical name of the opaque table. This may differ from the
            actual name used in the database by a prefix and/or suffix.
        spec : `ddl.TableSpec`
            Specification for the table to be added.
        """
        self._managers.opaque.register(tableName, spec)

    @transactional
    def insertOpaqueData(self, tableName: str, *data: dict) -> None:
        """Insert records into an opaque table.

        Parameters
        ----------
        tableName : `str`
            Logical name of the opaque table. Must match the name used in a
            previous call to `registerOpaqueTable`.
        *data
            Each additional positional argument is a dictionary that
            represents a single row to be added.
        """
        self._managers.opaque[tableName].insert(*data)

    def fetchOpaqueData(self, tableName: str, **where: Any) -> Iterator[Mapping[str, Any]]:
        """Retrieve records from an opaque table.

        Parameters
        ----------
        tableName : `str`
            Logical name of the opaque table. Must match the name used in a
            previous call to `registerOpaqueTable`.
        **where
            Additional keyword arguments are interpreted as equality
            constraints that restrict the returned rows (combined with AND);
            keyword arguments are column names and values are the values they
            must have.

        Yields
        ------
        row : `dict`
            A dictionary representing a single result row.
        """
        yield from self._managers.opaque[tableName].fetch(**where)

    @transactional
    def deleteOpaqueData(self, tableName: str, **where: Any) -> None:
        """Remove records from an opaque table.

        Parameters
        ----------
        tableName : `str`
            Logical name of the opaque table. Must match the name used in a
            previous call to `registerOpaqueTable`.
        **where
            Additional keyword arguments are interpreted as equality
            constraints that restrict the deleted rows (combined with AND);
            keyword arguments are column names and values are the values they
            must have.
        """
        self._managers.opaque[tableName].delete(where.keys(), where)

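    # Illustrative round trip through the opaque-table API; the table name,
    # columns, and ``some_id`` are hypothetical:
    #
    #     spec = ddl.TableSpec(
    #         fields=[
    #             ddl.FieldSpec(name="dataset_id", dtype=ddl.GUID, primaryKey=True),
    #             ddl.FieldSpec(name="path", dtype=sqlalchemy.String, length=256),
    #         ]
    #     )
    #     registry.registerOpaqueTable("demo_records", spec)
    #     registry.insertOpaqueData("demo_records", {"dataset_id": some_id, "path": "a/b.fits"})
    #     rows = list(registry.fetchOpaqueData("demo_records", dataset_id=some_id))
    #     registry.deleteOpaqueData("demo_records", dataset_id=some_id)
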
    def registerCollection(
        self, name: str, type: CollectionType = CollectionType.TAGGED, doc: str | None = None
    ) -> bool:
        """Add a new collection if one with the given name does not exist.

        Parameters
        ----------
        name : `str`
            The name of the collection to create.
        type : `CollectionType`
            Enum value indicating the type of collection to create.
        doc : `str`, optional
            Documentation string for the collection.

        Returns
        -------
        registered : `bool`
            `True` if the collection was created by this call, `False` if it
            was already registered.

        Notes
        -----
        This method cannot be called within transactions, as it needs to be
        able to perform its own transaction to be concurrent.
        """
        _, registered = self._managers.collections.register(name, type, doc=doc)
        return registered

    def getCollectionType(self, name: str) -> CollectionType:
        """Return an enumeration value indicating the type of the given
        collection.

        Parameters
        ----------
        name : `str`
            The name of the collection.

        Returns
        -------
        type : `CollectionType`
            Enum value indicating the type of this collection.

        Raises
        ------
        lsst.daf.butler.registry.MissingCollectionError
            Raised if no collection with the given name exists.
        """
        return self._managers.collections.find(name).type

    def get_collection_record(self, name: str) -> CollectionRecord:
        """Return the record for this collection.

        Parameters
        ----------
        name : `str`
            Name of the collection for which the record is to be retrieved.

        Returns
        -------
        record : `CollectionRecord`
            The record for this collection.
        """
        return self._managers.collections.find(name)

    def registerRun(self, name: str, doc: str | None = None) -> bool:
        """Add a new run if one with the given name does not exist.

        Parameters
        ----------
        name : `str`
            The name of the run to create.
        doc : `str`, optional
            Documentation string for the collection.

        Returns
        -------
        registered : `bool`
            Boolean indicating whether a new run was registered. `False`
            if it already existed.

        Notes
        -----
        This method cannot be called within transactions, as it needs to be
        able to perform its own transaction to be concurrent.
        """
        _, registered = self._managers.collections.register(name, CollectionType.RUN, doc=doc)
        return registered

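    # Illustrative sketch; collection names are hypothetical. Note that these
    # registration methods may not be called inside an open transaction:
    #
    #     registry.registerRun("u/someone/run")
    #     registry.registerCollection("u/someone/tagged", CollectionType.TAGGED)
    #     assert registry.getCollectionType("u/someone/run") is CollectionType.RUN
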
    @transactional
    def removeCollection(self, name: str) -> None:
        """Remove the given collection from the registry.

        Parameters
        ----------
        name : `str`
            The name of the collection to remove.

        Raises
        ------
        lsst.daf.butler.registry.MissingCollectionError
            Raised if no collection with the given name exists.
        sqlalchemy.exc.IntegrityError
            Raised if the database rows associated with the collection are
            still referenced by some other table, such as a dataset in a
            datastore (for `~CollectionType.RUN` collections only) or a
            `~CollectionType.CHAINED` collection of which this collection is
            a child.

        Notes
        -----
        If this is a `~CollectionType.RUN` collection, all datasets and quanta
        in it will be removed from the `Registry` database. This requires that
        those datasets be removed (or at least trashed) from any datastores
        that hold them first.

        A collection may not be deleted as long as it is referenced by a
        `~CollectionType.CHAINED` collection; the ``CHAINED`` collection must
        be deleted or redefined first.
        """
        self._managers.collections.remove(name)

    def getCollectionChain(self, parent: str) -> tuple[str, ...]:
        """Return the child collections in a `~CollectionType.CHAINED`
        collection.

        Parameters
        ----------
        parent : `str`
            Name of the chained collection. Must have already been added via
            a call to `Registry.registerCollection`.

        Returns
        -------
        children : `~collections.abc.Sequence` [ `str` ]
            An ordered sequence of collection names that are searched when the
            given chained collection is searched.

        Raises
        ------
        lsst.daf.butler.registry.MissingCollectionError
            Raised if ``parent`` does not exist in the `Registry`.
        lsst.daf.butler.registry.CollectionTypeError
            Raised if ``parent`` does not correspond to a
            `~CollectionType.CHAINED` collection.
        """
        record = self._managers.collections.find(parent)
        if record.type is not CollectionType.CHAINED:
            raise CollectionTypeError(f"Collection '{parent}' has type {record.type.name}, not CHAINED.")
        assert isinstance(record, ChainedCollectionRecord)
        return record.children

    @transactional
    def setCollectionChain(self, parent: str, children: Any, *, flatten: bool = False) -> None:
        """Define or redefine a `~CollectionType.CHAINED` collection.

        Parameters
        ----------
        parent : `str`
            Name of the chained collection. Must have already been added via
            a call to `Registry.registerCollection`.
        children : collection expression
            An expression defining an ordered search of child collections,
            generally an iterable of `str`; see
            :ref:`daf_butler_collection_expressions` for more information.
        flatten : `bool`, optional
            If `True` (`False` is default), recursively flatten out any nested
            `~CollectionType.CHAINED` collections in ``children`` first.

        Raises
        ------
        lsst.daf.butler.registry.MissingCollectionError
            Raised when any of the given collections do not exist in the
            `Registry`.
        lsst.daf.butler.registry.CollectionTypeError
            Raised if ``parent`` does not correspond to a
            `~CollectionType.CHAINED` collection.
        CollectionCycleError
            Raised if the given collections contain a cycle.

        Notes
        -----
        If this function is called within a call to ``Butler.transaction``, it
        will hold a lock that prevents other processes from modifying the
        parent collection until the end of the transaction. Keep these
        transactions short.
        """
        children = CollectionWildcard.from_expression(children).require_ordered()
        if flatten:
            children = self.queryCollections(children, flattenChains=True)

        self._managers.collections.update_chain(parent, list(children), allow_use_in_caching_context=True)

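    # Illustrative sketch: define a chain that searches two runs in order;
    # all collection names are hypothetical and must already be registered:
    #
    #     registry.registerCollection("u/someone/chain", CollectionType.CHAINED)
    #     registry.setCollectionChain("u/someone/chain", ["u/someone/run2", "u/someone/run1"])
    #     assert registry.getCollectionChain("u/someone/chain") == ("u/someone/run2", "u/someone/run1")
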
    def getCollectionParentChains(self, collection: str) -> set[str]:
        """Return the CHAINED collections that directly contain the given one.

        Parameters
        ----------
        collection : `str`
            Name of the collection.

        Returns
        -------
        chains : `set` of `str`
            Set of `~CollectionType.CHAINED` collection names.
        """
        return self._managers.collections.getParentChains(self._managers.collections.find(collection).key)

    def getCollectionDocumentation(self, collection: str) -> str | None:
        """Retrieve the documentation string for a collection.

        Parameters
        ----------
        collection : `str`
            Name of the collection.

        Returns
        -------
        docs : `str` or `None`
            Docstring for the collection with the given name.
        """
        return self._managers.collections.getDocumentation(self._managers.collections.find(collection).key)

    def setCollectionDocumentation(self, collection: str, doc: str | None) -> None:
        """Set the documentation string for a collection.

        Parameters
        ----------
        collection : `str`
            Name of the collection.
        doc : `str` or `None`
            Docstring for the collection with the given name; will replace any
            existing docstring. Passing `None` will remove any existing
            docstring.
        """
        self._managers.collections.setDocumentation(self._managers.collections.find(collection).key, doc)

    def getCollectionSummary(self, collection: str) -> CollectionSummary:
        """Return a summary for the given collection.

        Parameters
        ----------
        collection : `str`
            Name of the collection for which a summary is to be retrieved.

        Returns
        -------
        summary : `~lsst.daf.butler.registry.CollectionSummary`
            Summary of the dataset types and governor dimension values in
            this collection.
        """
        record = self._managers.collections.find(collection)
        return self._managers.datasets.getCollectionSummary(record)

    def registerDatasetType(self, datasetType: DatasetType) -> bool:
        """Add a new `DatasetType` to the Registry.

        It is not an error to register the same `DatasetType` twice.

        Parameters
        ----------
        datasetType : `DatasetType`
            The `DatasetType` to be added.

        Returns
        -------
        inserted : `bool`
            `True` if ``datasetType`` was inserted, `False` if an identical
            existing `DatasetType` was found. Note that in either case the
            DatasetType is guaranteed to be defined in the Registry
            consistently with the given definition.

        Raises
        ------
        ValueError
            Raised if the dimensions or storage class are invalid.
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if this `DatasetType` is already registered with a
            different definition.

        Notes
        -----
        This method cannot be called within transactions, as it needs to be
        able to perform its own transaction to be concurrent.
        """
        return self._managers.datasets.register_dataset_type(datasetType)

    def removeDatasetType(self, name: str | tuple[str, ...]) -> None:
        """Remove the named `DatasetType` from the registry.

        .. warning::

            Registry implementations can cache the dataset type definitions.
            This means that deleting the dataset type definition may result in
            unexpected behavior from other butler processes that are active
            and have not seen the deletion.

        Parameters
        ----------
        name : `str` or `tuple` [`str`]
            Name of the type to be removed or tuple containing a list of type
            names to be removed. Wildcards are allowed.

        Raises
        ------
        lsst.daf.butler.registry.OrphanedRecordError
            Raised if an attempt is made to remove the dataset type definition
            when there are already datasets associated with it.

        Notes
        -----
        If the dataset type is not registered the method will return without
        action.
        """
        for datasetTypeExpression in ensure_iterable(name):
            # Catch any warnings from the caller specifying a component
            # dataset type. This will result in an error later but the
            # warning could be confusing when the caller is not querying
            # anything.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=FutureWarning)
                datasetTypes = list(self.queryDatasetTypes(datasetTypeExpression))
            if not datasetTypes:
                _LOG.info("Dataset type %r not defined", datasetTypeExpression)
            else:
                for datasetType in datasetTypes:
                    self._managers.datasets.remove_dataset_type(datasetType.name)
                    _LOG.info("Removed dataset type %r", datasetType.name)

    def getDatasetType(self, name: str) -> DatasetType:
        """Get the `DatasetType`.

        Parameters
        ----------
        name : `str`
            Name of the type.

        Returns
        -------
        type : `DatasetType`
            The `DatasetType` associated with the given name.

        Raises
        ------
        lsst.daf.butler.registry.MissingDatasetTypeError
            Raised if the requested dataset type has not been registered.

        Notes
        -----
        This method handles component dataset types automatically, though most
        other registry operations do not.
        """
        parent_name, component = DatasetType.splitDatasetTypeName(name)
        parent_dataset_type = self._managers.datasets.get_dataset_type(parent_name)
        if component is None:
            return parent_dataset_type
        else:
            return parent_dataset_type.makeComponentDatasetType(component)

    def supportsIdGenerationMode(self, mode: DatasetIdGenEnum) -> bool:
        """Test whether the given dataset ID generation mode is supported by
        `insertDatasets`.

        Parameters
        ----------
        mode : `DatasetIdGenEnum`
            Enum value for the mode to test.

        Returns
        -------
        supported : `bool`
            Whether the given mode is supported.
        """
        return True

    @transactional
    def insertDatasets(
        self,
        datasetType: DatasetType | str,
        dataIds: Iterable[DataId],
        run: str | None = None,
        expand: bool = True,
        idGenerationMode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
    ) -> list[DatasetRef]:
        """Insert one or more datasets into the `Registry`.

        This always adds new datasets; to associate existing datasets with
        a new collection, use ``associate``.

        Parameters
        ----------
        datasetType : `DatasetType` or `str`
            A `DatasetType` or the name of one.
        dataIds : `~collections.abc.Iterable` of `dict` or `DataCoordinate`
            Dimension-based identifiers for the new datasets.
        run : `str`, optional
            The name of the run that produced the datasets. Defaults to
            ``self.defaults.run``.
        expand : `bool`, optional
            If `True` (default), expand data IDs as they are inserted. This is
            necessary in general to allow the datastore to generate file
            templates, but it may be disabled if the caller can guarantee this
            is unnecessary.
        idGenerationMode : `DatasetIdGenEnum`, optional
            Specifies the option for generating dataset IDs. By default,
            unique IDs are generated for each inserted dataset.

        Returns
        -------
        refs : `list` of `DatasetRef`
            Resolved `DatasetRef` instances for all given data IDs (in the
            same order).

        Raises
        ------
        lsst.daf.butler.registry.DatasetTypeError
            Raised if ``datasetType`` is not known to the registry.
        lsst.daf.butler.registry.CollectionTypeError
            Raised if the ``run`` collection type is not
            `~CollectionType.RUN`.
        lsst.daf.butler.registry.NoDefaultCollectionError
            Raised if ``run`` is `None` and ``self.defaults.run`` is `None`.
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if a dataset with the same dataset type and data ID as one
            of those given already exists in ``run``.
        lsst.daf.butler.registry.MissingCollectionError
            Raised if ``run`` does not exist in the registry.
        """
        datasetType = self._managers.datasets.conform_exact_dataset_type(datasetType)
        if run is None:
            if self.defaults.run is None:
                raise NoDefaultCollectionError(
                    "No run provided to insertDatasets, and no default from registry construction."
                )
            run = self.defaults.run
        runRecord = self._managers.collections.find(run)
        if runRecord.type is not CollectionType.RUN:
            raise CollectionTypeError(
                f"Given collection is of type {runRecord.type.name}; RUN collection required."
            )
        assert isinstance(runRecord, RunRecord)

        expandedDataIds = [
            DataCoordinate.standardize(dataId, dimensions=datasetType.dimensions) for dataId in dataIds
        ]
        if expand:
            _LOG.debug("Expanding %d data IDs", len(expandedDataIds))
            expandedDataIds = self.expand_data_ids(expandedDataIds)
            _LOG.debug("Finished expanding data IDs")

        try:
            refs = list(
                self._managers.datasets.insert(datasetType.name, runRecord, expandedDataIds, idGenerationMode)
            )
            if self._managers.obscore:
                self._managers.obscore.add_datasets(refs)
        except sqlalchemy.exc.IntegrityError as err:
            raise ConflictingDefinitionError(
                "A database constraint failure was triggered by inserting "
                f"one or more datasets of type {datasetType} into "
                f"collection '{run}'. "
                "This probably means a dataset with the same data ID "
                "and dataset type already exists, but it may also mean a "
                "dimension row is missing."
            ) from err
        return refs

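    # Illustrative sketch; the dataset type, data ID values, and run are
    # hypothetical and must already exist in the registry:
    #
    #     (ref,) = registry.insertDatasets(
    #         "raw",
    #         dataIds=[{"instrument": "HypoCam", "exposure": 101, "detector": 5}],
    #         run="u/someone/run",
    #     )
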
    @transactional
    def _importDatasets(
        self,
        datasets: Iterable[DatasetRef],
        expand: bool = True,
        assume_new: bool = False,
    ) -> list[DatasetRef]:
        """Import one or more datasets into the `Registry`.

        This differs from the `insertDatasets` method in that it accepts
        `DatasetRef` instances, which already have a dataset ID.

        Parameters
        ----------
        datasets : `~collections.abc.Iterable` of `DatasetRef`
            Datasets to be inserted. All `DatasetRef` instances must have an
            identical ``run`` attribute; ``run`` may be `None`, in which case
            it defaults to ``self.defaults.run``. Datasets can specify an
            ``id`` attribute, which will be used for the inserted datasets.
            Datasets can be of multiple dataset types, but all the dataset
            types must have the same set of dimensions.
        expand : `bool`, optional
            If `True` (default), expand data IDs as they are inserted. This is
            necessary in general, but it may be disabled if the caller can
            guarantee it is unnecessary.
        assume_new : `bool`, optional
            If `True`, assume datasets are new. If `False`, datasets that are
            identical to an existing one are ignored.

        Returns
        -------
        refs : `list` of `DatasetRef`
            `DatasetRef` instances for all given data IDs (in the same order).
            If any of ``datasets`` has an ID which already exists in the
            database then it will not be inserted or updated, but a
            `DatasetRef` will be returned for it in any case.

        Raises
        ------
        lsst.daf.butler.registry.NoDefaultCollectionError
            Raised if ``run`` is `None` and ``self.defaults.run`` is `None`.
        lsst.daf.butler.registry.DatasetTypeError
            Raised if a dataset type is not known to the registry.
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if a dataset with the same dataset type and data ID as one
            of those given already exists in ``run``, or if
            ``assume_new=True`` and at least one dataset is not new.
        lsst.daf.butler.registry.MissingCollectionError
            Raised if ``run`` does not exist in the registry.

        Notes
        -----
        This method is considered middleware-internal.
        """
        datasets = list(datasets)
        if not datasets:
            # Nothing to do.
            return []

        # Find the run name.
        runs = {dataset.run for dataset in datasets}
        if len(runs) != 1:
            raise ValueError(f"Multiple run names in input datasets: {runs}")
        run = runs.pop()

        runRecord = self._managers.collections.find(run)
        if runRecord.type is not CollectionType.RUN:
            raise CollectionTypeError(
                f"Given collection '{runRecord.name}' is of type {runRecord.type.name};"
                " RUN collection required."
            )
        assert isinstance(runRecord, RunRecord)

        if expand:
            _LOG.debug("Expanding %d data IDs", len(datasets))
            datasets = self.expand_refs(datasets)
            _LOG.debug("Finished expanding data IDs")

        try:
            self._managers.datasets.import_(runRecord, datasets, assume_new=assume_new)
            if self._managers.obscore:
                self._managers.obscore.add_datasets(datasets)
        except sqlalchemy.exc.IntegrityError as err:
            raise ConflictingDefinitionError(
                "A database constraint failure was triggered by inserting "
                f"one or more datasets into collection '{run}'. "
                "This probably means a dataset with the same data ID "
                "and dataset type already exists, but it may also mean a "
                "dimension row is missing, or the dataset was assumed to be "
                "new when it was not."
            ) from err
        return datasets

    def getDataset(self, id: DatasetId) -> DatasetRef | None:
        """Retrieve a Dataset entry.

        Parameters
        ----------
        id : `DatasetId`
            The unique identifier for the dataset.

        Returns
        -------
        ref : `DatasetRef` or `None`
            A ref to the Dataset, or `None` if no matching Dataset
            was found.
        """
        refs = self._managers.datasets.get_dataset_refs([id])
        if len(refs) == 0:
            return None
        else:
            return refs[0]

    def _fetch_run_dataset_ids(self, run: str) -> list[DatasetId]:
        """Return the IDs of all datasets in the given ``RUN`` collection.

        Parameters
        ----------
        run : `str`
            Name of the collection.

        Returns
        -------
        dataset_ids : `list` [`uuid.UUID`]
            List of dataset IDs.

        Notes
        -----
        This is a middleware-internal interface.
        """
        run_record = self._managers.collections.find(run)
        if not isinstance(run_record, RunRecord):
            raise CollectionTypeError(f"{run!r} is not a RUN collection.")
        return self._managers.datasets.fetch_run_dataset_ids(run_record)

    @transactional
    def removeDatasets(self, refs: Iterable[DatasetRef]) -> None:
        """Remove datasets from the Registry.

        The datasets will be removed unconditionally from all collections.
        `Datastore` records will *not* be deleted; the caller is responsible
        for ensuring that the dataset has already been removed from all
        Datastores.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [`DatasetRef`]
            References to the datasets to be removed. Should be considered
            invalidated upon return.

        Raises
        ------
        lsst.daf.butler.registry.OrphanedRecordError
            Raised if any dataset is still present in any `Datastore`.
        """
        try:
            self._managers.datasets.delete(refs)
        except sqlalchemy.exc.IntegrityError as err:
            raise OrphanedRecordError(
                "One or more datasets is still present in one or more Datastores."
            ) from err

    @transactional
    def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        """Add existing datasets to a `~CollectionType.TAGGED` collection.

        If a `DatasetRef` with the same exact ID is already in a collection,
        nothing is changed. If a `DatasetRef` with the same `DatasetType` and
        data ID but a different ID exists in the collection,
        `~lsst.daf.butler.registry.ConflictingDefinitionError` is raised.

        Parameters
        ----------
        collection : `str`
            Indicates the collection the datasets should be associated with.
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            An iterable of resolved `DatasetRef` instances that already exist
            in this `Registry`.

        Raises
        ------
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if a Dataset with the given `DatasetRef` already exists in
            the given collection.
        lsst.daf.butler.registry.MissingCollectionError
            Raised if ``collection`` does not exist in the registry.
        lsst.daf.butler.registry.CollectionTypeError
            Raised if adding new datasets to the given ``collection`` is not
            allowed.
        """
        progress = Progress("lsst.daf.butler.Registry.associate", level=logging.DEBUG)
        collectionRecord = self._managers.collections.find(collection)
        for datasetType, refsForType in progress.iter_item_chunks(
            DatasetRef.iter_by_type(refs), desc="Associating datasets by type"
        ):
            try:
                self._managers.datasets.associate(datasetType, collectionRecord, refsForType)
                if self._managers.obscore:
                    # If a TAGGED collection is being monitored by the ObsCore
                    # manager then we may need to save the dataset.
                    self._managers.obscore.associate(refsForType, collectionRecord)
            except sqlalchemy.exc.IntegrityError as err:
                raise ConflictingDefinitionError(
                    f"Constraint violation while associating dataset of type {datasetType.name} with "
                    f"collection {collection}. This probably means that one or more datasets with the same "
                    "dataset type and data ID already exist in the collection, but it may also indicate "
                    "that the datasets do not exist."
                ) from err

    @transactional
    def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        """Remove existing datasets from a `~CollectionType.TAGGED`
        collection.

        ``collection`` and ``ref`` combinations that are not currently
        associated are silently ignored.

        Parameters
        ----------
        collection : `str`
            The collection the datasets should no longer be associated with.
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            An iterable of resolved `DatasetRef` instances that already exist
            in this `Registry`.

        Raises
        ------
        lsst.daf.butler.AmbiguousDatasetError
            Raised if any of the given dataset references is unresolved.
        lsst.daf.butler.registry.MissingCollectionError
            Raised if ``collection`` does not exist in the registry.
        lsst.daf.butler.registry.CollectionTypeError
            Raised if removing datasets from the given ``collection`` is not
            allowed.
        """
        progress = Progress("lsst.daf.butler.Registry.disassociate", level=logging.DEBUG)
        collectionRecord = self._managers.collections.find(collection)
        for datasetType, refsForType in progress.iter_item_chunks(
            DatasetRef.iter_by_type(refs), desc="Disassociating datasets by type"
        ):
            self._managers.datasets.disassociate(datasetType, collectionRecord, refsForType)
            if self._managers.obscore:
                self._managers.obscore.disassociate(refsForType, collectionRecord)

    @transactional
    def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespan) -> None:
        """Associate one or more datasets with a calibration collection and a
        validity range within it.

        Parameters
        ----------
        collection : `str`
            The name of an already-registered `~CollectionType.CALIBRATION`
            collection.
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets to be associated.
        timespan : `Timespan`
            The validity range for these datasets within the collection.

        Raises
        ------
        lsst.daf.butler.AmbiguousDatasetError
            Raised if any of the given `DatasetRef` instances is unresolved.
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if the collection already contains a different dataset with
            the same `DatasetType` and data ID and an overlapping validity
            range.
        DatasetTypeError
            Raised if ``ref.datasetType.isCalibration() is False`` for any
            ref.
        CollectionTypeError
            Raised if
            ``collection.type is not CollectionType.CALIBRATION``.
        """
        progress = Progress("lsst.daf.butler.Registry.certify", level=logging.DEBUG)
        with self._managers.caching_context.enable_collection_record_cache():
            collectionRecord = self._managers.collections.find(collection)
            for datasetType, refsForType in progress.iter_item_chunks(
                DatasetRef.iter_by_type(refs), desc="Certifying datasets by type"
            ):
                self._managers.datasets.certify(
                    datasetType, collectionRecord, refsForType, timespan, self._query
                )

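    # Illustrative sketch; the collection, refs, and timespan bounds are
    # hypothetical, and the refs must have a calibration dataset type:
    #
    #     registry.registerCollection("HypoCam/calib", CollectionType.CALIBRATION)
    #     registry.certify("HypoCam/calib", bias_refs, Timespan(begin=t_start, end=t_end))
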
    @transactional
    def decertify(
        self,
        collection: str,
        datasetType: str | DatasetType,
        timespan: Timespan,
        *,
        dataIds: Iterable[DataId] | None = None,
    ) -> None:
        """Remove or adjust datasets to clear a validity range within a
        calibration collection.

        Parameters
        ----------
        collection : `str`
            The name of an already-registered `~CollectionType.CALIBRATION`
            collection.
        datasetType : `str` or `DatasetType`
            Name or `DatasetType` instance for the datasets to be decertified.
        timespan : `Timespan`
            The validity range to remove datasets from within the collection.
            Datasets that overlap this range but are not contained by it will
            have their validity ranges adjusted to not overlap it, which may
            split a single dataset validity range into two.
        dataIds : `~collections.abc.Iterable` [`dict` or `DataCoordinate`], \
                optional
            Data IDs that should be decertified within the given validity
            range. If `None`, all data IDs for ``datasetType`` will be
            decertified.

        Raises
        ------
        DatasetTypeError
            Raised if ``datasetType.isCalibration() is False``.
        CollectionTypeError
            Raised if
            ``collection.type is not CollectionType.CALIBRATION``.
        """
        collectionRecord = self._managers.collections.find(collection)
        if isinstance(datasetType, str):
            datasetType = self.getDatasetType(datasetType)
        standardizedDataIds = None
        if dataIds is not None:
            standardizedDataIds = [
                DataCoordinate.standardize(d, dimensions=datasetType.dimensions) for d in dataIds
            ]
        self._managers.datasets.decertify(
            datasetType, collectionRecord, timespan, data_ids=standardizedDataIds, query_func=self._query
        )

    def getDatastoreBridgeManager(self) -> DatastoreRegistryBridgeManager:
        """Return an object that allows a new `Datastore` instance to
        communicate with this `Registry`.

        Returns
        -------
        manager : `~.interfaces.DatastoreRegistryBridgeManager`
            Object that mediates communication between this `Registry` and its
            associated datastores.
        """
        return self._managers.datastores

    def getDatasetLocations(self, ref: DatasetRef) -> Iterable[str]:
        """Retrieve datastore locations for a given dataset.

        Parameters
        ----------
        ref : `DatasetRef`
            A reference to the dataset for which to retrieve storage
            information.

        Returns
        -------
        datastores : `~collections.abc.Iterable` [ `str` ]
            All the matching datastores holding this dataset.

        Raises
        ------
        lsst.daf.butler.AmbiguousDatasetError
            Raised if ``ref.id`` is `None`.
        """
        return self._managers.datastores.findDatastores(ref)

    def expandDataId(
        self,
        dataId: DataId | None = None,
        *,
        dimensions: Iterable[str] | DimensionGroup | None = None,
        records: Mapping[str, DimensionRecord | None] | None = None,
        withDefaults: bool = True,
        **kwargs: Any,
    ) -> DataCoordinate:
        """Expand a dimension-based data ID to include additional information.

        Parameters
        ----------
        dataId : `DataCoordinate` or `dict`, optional
            Data ID to be expanded; augmented and overridden by ``kwargs``.
        dimensions : `~collections.abc.Iterable` [ `str` ], \
                `DimensionGroup`, optional
            The dimensions to be identified by the new `DataCoordinate`.
            If not provided, they will be inferred from the keys of ``dataId``
            and ``**kwargs``.
        records : `~collections.abc.Mapping` [`str`, `DimensionRecord`], \
                optional
            Dimension record data to use before querying the database for that
            data, keyed by element name.
        withDefaults : `bool`, optional
            Utilize ``self.defaults.dataId`` to fill in missing governor
            dimension key-value pairs. Defaults to `True` (i.e. defaults are
            used).
        **kwargs
            Additional keywords are treated like additional key-value pairs
            for ``dataId``, extending and overriding.

        Returns
        -------
        expanded : `DataCoordinate`
            A data ID that includes full metadata for all of the dimensions it
            identifies, i.e. guarantees that ``expanded.hasRecords()`` and
            ``expanded.hasFull()`` both return `True`.

        Raises
        ------
        lsst.daf.butler.registry.DataIdError
            Raised when ``dataId`` or keyword arguments specify unknown
            dimensions or values, or when a resulting data ID contains
            contradictory key-value pairs, according to dimension
            relationships.

        Notes
        -----
        This method cannot be relied upon to reject invalid data ID values
        for dimensions that do not actually have any record columns. For
        efficiency reasons the records for these dimensions (which have only
        dimension key values that are given by the caller) may be constructed
        directly rather than obtained from the registry database.
        """
        if not withDefaults:
            defaults = None
        else:
            defaults = self.defaults.dataId
        standardized = DataCoordinate.standardize(
            dataId,
            dimensions=dimensions,
            universe=self.dimensions,
            defaults=defaults,
            **kwargs,
        )
        if standardized.hasRecords():
            return standardized
        if records is None:
            records = {}
        else:
            records = dict(records)
        if isinstance(dataId, DataCoordinate) and dataId.hasRecords() and not kwargs:
            for element_name in dataId.dimensions.elements:
                records[element_name] = dataId.records[element_name]
        keys: dict[str, str | int] = dict(standardized.mapping)
        for element_name in standardized.dimensions.lookup_order:
            element = self.dimensions[element_name]
            record = records.get(element_name, ...)  # Use ... to mean not found; None might mean NULL.
            if record is ...:
                if element_name in self.dimensions.dimensions.names and keys.get(element_name) is None:
                    raise DimensionNameError(f"No value or null value for dimension {element_name}.")
                else:
                    record = self._managers.dimensions.fetch_one(
                        element_name,
                        DataCoordinate.standardize(keys, dimensions=element.minimal_group),
                        self.dimension_record_cache,
                    )
                records[element_name] = record
            if record is not None:
                for d in element.implied:
                    value = getattr(record, d.name)
                    if keys.setdefault(d.name, value) != value:
                        raise InconsistentDataIdError(
                            f"Data ID {standardized} has {d.name}={keys[d.name]!r}, "
                            f"but {element_name} implies {d.name}={value!r}."
                        )
            else:
                if element_name in standardized.dimensions.names:
                    raise DataIdValueError(
                        f"Could not fetch record for dimension {element.name} via keys {keys}."
                    )
                if element.defines_relationships:
                    raise InconsistentDataIdError(
                        f"Could not fetch record for element {element_name} via keys {keys}, "
                        "but it is marked as defining relationships; this means one or more dimensions "
                        "have inconsistent values.",
                    )
        return DataCoordinate.standardize(keys, dimensions=standardized.dimensions).expanded(records=records)

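    # Illustrative sketch; the instrument and exposure values are hypothetical
    # and must correspond to existing dimension records:
    #
    #     data_id = registry.expandDataId(instrument="HypoCam", exposure=101)
    #     assert data_id.hasRecords() and data_id.hasFull()
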
    def expand_data_ids(self, data_ids: Iterable[DataCoordinate]) -> list[DataCoordinate]:
        # Expand a batch of data IDs to include dimension records, using the
        # shared dimension record cache.
        return expand_data_ids(data_ids, self.dimensions, self._query, self.dimension_record_cache)

    def expand_refs(self, dataset_refs: list[DatasetRef]) -> list[DatasetRef]:
        # Expand the data IDs of a batch of dataset refs, preserving order.
        expanded_ids = self.expand_data_ids([ref.dataId for ref in dataset_refs])
        return [ref.expanded(data_id) for ref, data_id in zip(dataset_refs, expanded_ids)]

    def insertDimensionData(
        self,
        element: DimensionElement | str,
        *data: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        replace: bool = False,
        skip_existing: bool = False,
    ) -> None:
        """Insert one or more dimension records into the database.

        Parameters
        ----------
        element : `DimensionElement` or `str`
            The `DimensionElement` or name thereof that identifies the table
            records will be inserted into.
        *data : `dict` or `DimensionRecord`
            One or more records to insert.
        conform : `bool`, optional
            If `False` (`True` is default) perform no checking or conversions,
            and assume that ``element`` is a `DimensionElement` instance and
            ``data`` is one or more `DimensionRecord` instances of the
            appropriate subclass.
        replace : `bool`, optional
            If `True` (`False` is default), replace existing records in the
            database if there is a conflict.
        skip_existing : `bool`, optional
            If `True` (`False` is default), skip insertion if a record with
            the same primary key values already exists. Unlike
            `syncDimensionData`, this will not detect when the given record
            differs from what is in the database, and should not be used when
            this is a concern.
        """
        if isinstance(element, str):
            element = self.dimensions[element]
        if conform:
            records = [
                row if isinstance(row, DimensionRecord) else element.RecordClass(**row) for row in data
            ]
        else:
            # Ignore typing since caller said to trust them with conform=False.
            records = data  # type: ignore
        if element.name in self.dimension_record_cache:
            self.dimension_record_cache.reset()
        self._managers.dimensions.insert(
            element,
            *records,
            replace=replace,
            skip_existing=skip_existing,
        )

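    # Illustrative sketch; the record values are hypothetical and the exact
    # field names depend on the configured dimension universe:
    #
    #     registry.insertDimensionData(
    #         "detector",
    #         {"instrument": "HypoCam", "id": 5, "full_name": "S05"},
    #         skip_existing=True,
    #     )
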
    def syncDimensionData(
        self,
        element: DimensionElement | str,
        row: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        update: bool = False,
    ) -> bool | dict[str, Any]:
        """Synchronize the given dimension record with the database, inserting
        it if it does not already exist and comparing values if it does.

        Parameters
        ----------
        element : `DimensionElement` or `str`
            The `DimensionElement` or name thereof that identifies the table
            records will be inserted into.
        row : `dict` or `DimensionRecord`
            The record to insert.
        conform : `bool`, optional
            If `False` (`True` is default) perform no checking or conversions,
            and assume that ``element`` is a `DimensionElement` instance and
            ``row`` is a `DimensionRecord` instance of the appropriate
            subclass.
        update : `bool`, optional
            If `True` (`False` is default), update the existing record in the
            database if there is a conflict.

        Returns
        -------
        inserted_or_updated : `bool` or `dict`
            `True` if a new row was inserted, `False` if no changes were
            needed, or a `dict` mapping updated column names to their old
            values if an update was performed (only possible if
            ``update=True``).

        Raises
        ------
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if the record exists in the database (according to primary
            key lookup) but is inconsistent with the given one.
        """
        if conform:
            if isinstance(element, str):
                element = self.dimensions[element]
            record = row if isinstance(row, DimensionRecord) else element.RecordClass(**row)
        else:
            # Ignore typing since caller said to trust them with conform=False.
            record = row  # type: ignore
        if record.definition.name in self.dimension_record_cache:
            self.dimension_record_cache.reset()
        return self._managers.dimensions.sync(record, update=update)

    def queryDatasetTypes(
        self,
        expression: Any = ...,
        *,
        missing: list[str] | None = None,
    ) -> Iterable[DatasetType]:
        """Iterate over the dataset types whose names match an expression.

        Parameters
        ----------
        expression : dataset type expression, optional
            An expression that fully or partially identifies the dataset types
            to return, such as a `str`, `re.Pattern`, or iterable thereof.
            ``...`` can be used to return all dataset types, and is the
            default. See :ref:`daf_butler_dataset_type_expressions` for more
            information.
        missing : `list` of `str`, optional
            String dataset type names that were explicitly given (i.e. not
            regular expression patterns) but not found will be appended to
            this list, if it is provided.

        Returns
        -------
        dataset_types : `~collections.abc.Iterable` [ `DatasetType` ]
            An `~collections.abc.Iterable` of `DatasetType` instances whose
            names match ``expression``.

        Raises
        ------
        lsst.daf.butler.registry.DatasetTypeExpressionError
            Raised when ``expression`` is invalid.
        """
        wildcard = DatasetTypeWildcard.from_expression(expression)
        return self._managers.datasets.resolve_wildcard(wildcard, missing=missing)

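    # Illustrative sketch: resolve an expression mixing an exact name and a
    # regular expression, collecting explicit names that were not found:
    #
    #     import re
    #
    #     missing: list[str] = []
    #     for dataset_type in registry.queryDatasetTypes(
    #         ["raw", re.compile("calexp.*")], missing=missing
    #     ):
    #         print(dataset_type.name)
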
    def queryCollections(
        self,
        expression: Any = ...,
        datasetType: DatasetType | None = None,
        collectionTypes: Iterable[CollectionType] | CollectionType = CollectionType.all(),
        flattenChains: bool = False,
        includeChains: bool | None = None,
    ) -> Sequence[str]:
        """Iterate over the collections whose names match an expression.

        Parameters
        ----------
        expression : collection expression, optional
            An expression that identifies the collections to return, such as
            a `str` (for full matches or partial matches via globs),
            `re.Pattern` (for partial matches), or iterable thereof. ``...``
            can be used to return all collections, and is the default.
            See :ref:`daf_butler_collection_expressions` for more information.
        datasetType : `DatasetType`, optional
            If provided, only yield collections that may contain datasets of
            this type. This is a conservative approximation in general; it may
            yield collections that do not have any such datasets.
        collectionTypes : `~collections.abc.Set` [`CollectionType`] or \
                `CollectionType`, optional
            If provided, only yield collections of these types.
        flattenChains : `bool`, optional
            If `True` (`False` is default), recursively yield the child
            collections of matching `~CollectionType.CHAINED` collections.
        includeChains : `bool`, optional
            If `True`, yield records for matching `~CollectionType.CHAINED`
            collections. Default is the opposite of ``flattenChains``: include
            either CHAINED collections or their children, but not both.

        Returns
        -------
        collections : `~collections.abc.Sequence` [ `str` ]
            The names of collections that match ``expression``.

        Raises
        ------
        lsst.daf.butler.registry.CollectionExpressionError
            Raised when ``expression`` is invalid.

        Notes
        -----
        The order in which collections are returned is unspecified, except
        that the children of a `~CollectionType.CHAINED` collection are
        guaranteed to be in the order in which they are searched. When
        multiple parent `~CollectionType.CHAINED` collections match the same
        criteria, the order in which the two lists appear is unspecified, and
        the lists of children may be incomplete if a child has multiple
        parents.
        """
        # Right now the datasetType argument is completely ignored, but that
        # is consistent with its [lack of] guarantees. DM-24939 or a follow-up
        # ticket will take care of that.
        if datasetType is not None:
            warnings.warn(
                "The datasetType parameter should no longer be used. It has"
                " never had any effect. Will be removed after v28.",
                FutureWarning,
            )
        try:
            wildcard = CollectionWildcard.from_expression(expression)
        except TypeError as exc:
            raise CollectionExpressionError(f"Invalid collection expression '{expression}'") from exc
        collectionTypes = ensure_iterable(collectionTypes)
        return [
            record.name
            for record in self._managers.collections.resolve_wildcard(
                wildcard,
                collection_types=frozenset(collectionTypes),
                flatten_chains=flattenChains,
                include_chains=includeChains,
            )
        ]

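    # Illustrative sketch: find RUN collections by glob; the pattern is
    # hypothetical:
    #
    #     run_names = registry.queryCollections("u/someone/*", collectionTypes=CollectionType.RUN)
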
    @contextlib.contextmanager
    def _query(self) -> Iterator[Query]:
        """Context manager returning a `Query` object used for construction
        and execution of complex queries.
        """
        with self._query_driver(self.defaults.collections, self.defaults.dataId) as driver:
            yield Query(driver)

    @contextlib.contextmanager
    def _query_driver(
        self,
        default_collections: Iterable[str],
        default_data_id: DataCoordinate,
    ) -> Iterator[DirectQueryDriver]:
        """Set up a `QueryDriver` instance for query execution."""
        # Query internals do repeated lookups of the same collections, so it
        # benefits from the collection record cache.
        with self._managers.caching_context.enable_collection_record_cache():
            driver = DirectQueryDriver(
                self._db,
                self.dimensions,
                self._managers,
                self.dimension_record_cache,
                default_collections=default_collections,
                default_data_id=default_data_id,
            )
            with driver:
                yield driver

    def get_datastore_records(self, ref: DatasetRef) -> DatasetRef:
        """Retrieve datastore records for the given ref.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset reference for which to retrieve its corresponding
            datastore records.

        Returns
        -------
        updated_ref : `DatasetRef`
            Dataset reference with filled datastore records.

        Notes
        -----
        If this method is called with a dataset ref that is not known to the
        registry, then a reference with an empty set of records is returned.
        """
        datastore_records: dict[str, list[StoredDatastoreItemInfo]] = {}
        for opaque, record_class in self._datastore_record_classes.items():
            records = self.fetchOpaqueData(opaque, dataset_id=ref.id)
            datastore_records[opaque] = [record_class.from_record(record) for record in records]
        return ref.replace(datastore_records=datastore_records)

    def store_datastore_records(self, refs: Mapping[str, DatasetRef]) -> None:
        """Store datastore records for the given refs.

        Parameters
        ----------
        refs : `~collections.abc.Mapping` [`str`, `DatasetRef`]
            Mapping of a datastore name to a dataset reference stored in that
            datastore; the reference must include datastore records.
        """
        for datastore_name, ref in refs.items():
            # Store ref IDs in the bridge table.
            bridge = self._managers.datastores.register(datastore_name)
            bridge.insert([ref])

            # Store records in the opaque tables.
            assert ref._datastore_records is not None, "Dataset ref must have datastore records"
            for table_name, records in ref._datastore_records.items():
                opaque_table = self._managers.opaque.get(table_name)
                assert opaque_table is not None, f"Unexpected opaque table name {table_name}"
                opaque_table.insert(*(record.to_record(dataset_id=ref.id) for record in records))

    def make_datastore_tables(self, tables: Mapping[str, DatastoreOpaqueTable]) -> None:
        """Create the opaque tables used by datastores.

        Parameters
        ----------
        tables : `~collections.abc.Mapping`
            Maps opaque table name to its definition.

        Notes
        -----
        This method should disappear in the future, when opaque table
        definitions are provided during `Registry` construction.
        """
        datastore_record_classes = {}
        for table_name, table_def in tables.items():
            datastore_record_classes[table_name] = table_def.record_class
            try:
                self._managers.opaque.register(table_name, table_def.table_spec)
            except ReadOnlyDatabaseError:
                # If the database is read only and we just tried and failed to
                # create a table, it means someone is trying to create a
                # read-only butler client for an empty repo. That should be
                # okay, as long as they then try to get any datasets before
                # some other client creates the table. Chances are they're
                # just validating configuration.
                pass
        self._datastore_record_classes = datastore_record_classes

    def preload_cache(self, *, load_dimension_record_cache: bool) -> None:
        """Immediately load caches that are used for common operations.

        Parameters
        ----------
        load_dimension_record_cache : `bool`
            If `True`, preload the dimension record cache. When this cache is
            preloaded, subsequent external changes to governor dimension
            records will not be visible to this Butler.
        """
        self._managers.datasets.preload_cache()

        if load_dimension_record_cache:
            self.dimension_record_cache.preload_cache()

    @property
    def obsCoreTableManager(self) -> ObsCoreTableManager | None:
        """The ObsCore manager instance for this registry
        (`~.interfaces.ObsCoreTableManager` or `None`).

        The ObsCore manager may not be implemented for all registry backends,
        and may not be enabled for many repositories.
        """
        return self._managers.obscore

    storageClasses: StorageClassFactory
    """All storage classes known to the registry (`StorageClassFactory`)."""

    _defaults: RegistryDefaults
    """Default collections used for registry queries (`RegistryDefaults`)."""