Coverage for python / lsst / daf / butler / registry / interfaces / _collections.py: 80%
115 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29from ... import ddl
# Public API of this module; re-exported by the ``interfaces`` package.
__all__ = [
    "ChainedCollectionRecord",
    "CollectionManager",
    "CollectionRecord",
    "Joinable",
    "JoinedCollectionsTable",
    "RunRecord",
]
40from abc import abstractmethod
41from collections.abc import Iterable, Mapping, Set
42from typing import TYPE_CHECKING, Any, Generic, NamedTuple, Self, TypeVar
44import sqlalchemy
46from ..._collection_type import CollectionType
47from ..._timespan import Timespan
48from ..wildcards import CollectionWildcard
49from ._versioning import VersionedExtension, VersionTuple
51if TYPE_CHECKING:
52 from .._caching_context import CachingContext
53 from ._database import Database, StaticTablesContext
# Type of the primary/foreign key used to identify collection records; this
# is manager-specific (usually an integer or string).
_Key = TypeVar("_Key")

# Constrained type variable for SQLAlchemy objects that the collection table
# can be joined onto: either a ``Select`` or a ``FromClause``.  Methods typed
# with it return the same kind of object that they were given.
Joinable = TypeVar("Joinable", sqlalchemy.Select, sqlalchemy.FromClause)
class CollectionRecord(Generic[_Key]):
    """Internal `Registry` representation of a single collection.

    Code outside the registry should identify collections by their `str`
    name rather than through these records.

    Parameters
    ----------
    key : _Key
        Unique identifier of the collection.  It may simply be ``name`` when
        names double as identifiers; otherwise it is usually an integer or a
        string, but any database-specific type is permitted.
    name : `str`
        Name of the collection.
    type : `CollectionType`
        Enumeration value describing the type of the collection.

    Notes
    -----
    Equality and hashing are defined in terms of the `name`, `key`, and
    `type` attributes assigned here, so all users and derived classes must
    treat those three attributes as immutable.  Extra attributes introduced
    by subclasses may be mutable, provided they take no part in a subclass's
    own equality definition.
    """

    name: str
    """Name of the collection (`str`).
    """

    key: _Key
    """The primary/foreign key value for this collection.
    """

    type: CollectionType
    """Enumeration value describing the type of the collection
    (`CollectionType`).
    """

    def __init__(self, key: _Key, name: str, type: CollectionType):
        self.key = key
        self.name = name
        self.type = type
        # Developer sanity check; note that it is skipped under ``python -O``.
        assert isinstance(self.type, CollectionType)

    def __eq__(self, other: Any) -> bool:
        # Comparison short-circuits on the name first.  Objects lacking one of
        # the compared attributes defer to the other operand's __eq__.
        try:
            same = self.name == other.name and self.type == other.type and self.key == other.key
        except AttributeError:
            return NotImplemented
        return same

    def __hash__(self) -> int:
        # Hashing by name alone is consistent with __eq__, because equal
        # records always share a name.
        return hash(self.name)

    def __repr__(self) -> str:
        fields = f"key={self.key!r}, name={self.name!r}, type={self.type!r}"
        return f"CollectionRecord({fields})"

    def __str__(self) -> str:
        return self.name
class RunRecord(CollectionRecord[_Key]):
    """A `CollectionRecord` for ``RUN`` collections that adds information
    about where and when the run was produced.

    The `host` and `timespan` attributes are plain (mutable) attributes; they
    take no part in equality or hashing.

    Parameters
    ----------
    key : _Key
        Unique collection key.
    name : `str`
        Name of the collection.
    host : `str`, optional
        Name of the host or system on which this run was produced.
    timespan : `Timespan`, optional
        Begin and end timestamps for the period over which the run was
        produced.  If `None` (default), ``Timespan(begin=None, end=None)`` is
        used.
    """

    host: str | None
    """Name of the host or system on which this run was produced (`str` or
    `None`).
    """

    timespan: Timespan
    """Begin and end timestamps for the period over which the run was produced.
    ``None``/``NULL`` values are interpreted as infinite bounds.
    """

    def __init__(
        self,
        key: _Key,
        name: str,
        *,
        host: str | None = None,
        timespan: Timespan | None = None,
    ):
        super().__init__(key=key, name=name, type=CollectionType.RUN)
        self.host = host
        if timespan is None:
            # Unset bounds stand for an unbounded (infinite) timespan.
            timespan = Timespan(begin=None, end=None)
        self.timespan = timespan

    def __repr__(self) -> str:
        return f"RunRecord(key={self.key!r}, name={self.name!r})"
class ChainedCollectionRecord(CollectionRecord[_Key]):
    """A `CollectionRecord` for ``CHAINED`` collections that adds the ordered
    list of child collections.

    Parameters
    ----------
    key : _Key
        Unique collection key.
    name : `str`
        Name of the collection.
    children : `~collections.abc.Iterable` [ `str` ]
        Ordered sequence of names of child collections.  It is consumed
        once and stored as a `tuple`.
    """

    children: tuple[str, ...]
    """The ordered search path of child collections that define this chain
    (`tuple` [ `str` ]).
    """

    def __init__(
        self,
        key: _Key,
        name: str,
        *,
        children: Iterable[str],
    ):
        super().__init__(key=key, name=name, type=CollectionType.CHAINED)
        # Materialize the iterable so that the search order is fixed and the
        # record does not hold on to a one-shot iterator.
        self.children = tuple(children)

    def __repr__(self) -> str:
        return f"ChainedCollectionRecord(key={self.key!r}, name={self.name!r}, children={self.children!r})"
class CollectionManager(Generic[_Key], VersionedExtension):
    """An interface for managing the collections (including runs) in a
    `Registry`.

    Parameters
    ----------
    registry_schema_version : `VersionTuple` or `None`, optional
        Version of registry schema.

    Notes
    -----
    Each layer in a multi-layer `Registry` has its own record for any
    collection for which it has datasets (or quanta). Different layers may
    use different IDs for the same collection, so any IDs obtained through
    the `CollectionManager` APIs are strictly for internal (to `Registry`)
    use.
    """

    def __init__(self, *, registry_schema_version: VersionTuple | None = None) -> None:
        super().__init__(registry_schema_version=registry_schema_version)

    @abstractmethod
    def clone(self, db: Database, caching_context: CachingContext) -> Self:
        """Make an independent copy of this manager instance bound to a new
        `Database` instance.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating the manager.
        caching_context : `CachingContext`
            New `CachingContext` object to use when instantiating the manager.

        Returns
        -------
        instance : `CollectionManager`
            New manager instance with the same configuration as this instance,
            but bound to a new Database object.
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        caching_context: CachingContext,
        registry_schema_version: VersionTuple | None = None,
    ) -> CollectionManager:
        """Construct an instance of the manager.

        Parameters
        ----------
        db : `Database`
            Interface to the underlying database engine and namespace.
        context : `StaticTablesContext`
            Context object obtained from `Database.declareStaticTables`; used
            to declare any tables that should always be present in a layer
            implemented with this manager.
        caching_context : `CachingContext`
            Object controlling caching of information returned by managers.
        registry_schema_version : `VersionTuple` or `None`, optional
            Schema version of this extension as defined in registry.

        Returns
        -------
        manager : `CollectionManager`
            An instance of a concrete `CollectionManager` subclass.
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def addCollectionForeignKey(
        cls,
        tableSpec: ddl.TableSpec,
        *,
        prefix: str = "collection",
        onDelete: str | None = None,
        constraint: bool = True,
        **kwargs: Any,
    ) -> ddl.FieldSpec:
        """Add a foreign key (field and constraint) referencing the collection
        table.

        Parameters
        ----------
        tableSpec : `ddl.TableSpec`
            Specification for the table that should reference the collection
            table. Will be modified in place.
        prefix : `str`, optional
            A name to use for the prefix of the new field; the full name may
            have a suffix (and is given in the returned `ddl.FieldSpec`).
        onDelete : `str`, optional
            One of "CASCADE" or "SET NULL", indicating what should happen to
            the referencing row if the collection row is deleted. `None`
            indicates that this should be an integrity error.
        constraint : `bool`, optional
            If `False` (`True` is default), add a field that can be joined to
            the collection primary key, but do not add a foreign key
            constraint.
        **kwargs
            Additional keyword arguments are forwarded to the `ddl.FieldSpec`
            constructor (only the ``name`` and ``dtype`` arguments are
            otherwise provided).

        Returns
        -------
        fieldSpec : `ddl.FieldSpec`
            Specification for the field being added.
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def addRunForeignKey(
        cls,
        tableSpec: ddl.TableSpec,
        *,
        prefix: str = "run",
        onDelete: str | None = None,
        constraint: bool = True,
        **kwargs: Any,
    ) -> ddl.FieldSpec:
        """Add a foreign key (field and constraint) referencing the run
        table.

        Parameters
        ----------
        tableSpec : `ddl.TableSpec`
            Specification for the table that should reference the run table.
            Will be modified in place.
        prefix : `str`, optional
            A name to use for the prefix of the new field; the full name may
            have a suffix (and is given in the returned `ddl.FieldSpec`).
        onDelete : `str`, optional
            One of "CASCADE" or "SET NULL", indicating what should happen to
            the referencing row if the run row is deleted. `None`
            indicates that this should be an integrity error.
        constraint : `bool`, optional
            If `False` (`True` is default), add a field that can be joined to
            the run primary key, but do not add a foreign key constraint.
        **kwargs
            Additional keyword arguments are forwarded to the `ddl.FieldSpec`
            constructor (only the ``name`` and ``dtype`` arguments are
            otherwise provided).

        Returns
        -------
        fieldSpec : `ddl.FieldSpec`
            Specification for the field being added.
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def getCollectionForeignKeyName(cls, prefix: str = "collection") -> str:
        """Return the name of the field added by `addCollectionForeignKey`
        if called with the same prefix.

        Parameters
        ----------
        prefix : `str`, optional
            A name to use for the prefix of the new field; the full name may
            have a suffix.

        Returns
        -------
        name : `str`
            The field name.
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def getRunForeignKeyName(cls, prefix: str = "run") -> str:
        """Return the name of the field added by `addRunForeignKey`
        if called with the same prefix.

        Parameters
        ----------
        prefix : `str`, optional
            A name to use for the prefix of the new field; the full name may
            have a suffix.

        Returns
        -------
        name : `str`
            The field name.
        """
        raise NotImplementedError()

    @abstractmethod
    def refresh(self) -> None:
        """Ensure all other operations on this manager are aware of any
        collections that may have been registered by other clients since it
        was initialized or last refreshed.
        """
        raise NotImplementedError()

    @abstractmethod
    def register(
        self, name: str, type: CollectionType, doc: str | None = None
    ) -> tuple[CollectionRecord[_Key], bool]:
        """Ensure that a collection of the given name and type is present
        in the layer this manager is associated with.

        Parameters
        ----------
        name : `str`
            Name of the collection.
        type : `CollectionType`
            Enumeration value indicating the type of collection.
        doc : `str`, optional
            Documentation string for the collection. Ignored if the collection
            already exists.

        Returns
        -------
        record : `CollectionRecord`
            Object representing the collection, including its type and ID.
            If ``type is CollectionType.RUN``, this will be a `RunRecord`
            instance. If ``type is CollectionType.CHAINED``, this will be a
            `ChainedCollectionRecord` instance.
        registered : `bool`
            `True` if the collection was registered, `False` if it already
            existed.

        Raises
        ------
        TransactionInterruption
            Raised if this operation is invoked within a `Database.transaction`
            context.
        DatabaseConflictError
            Raised if a collection with this name but a different type already
            exists.

        Notes
        -----
        Concurrent registrations of the same collection should be safe; nothing
        should happen if the types are consistent, and integrity errors due to
        inconsistent types should happen before any database changes are made.
        """
        raise NotImplementedError()

    @abstractmethod
    def remove(self, name: str) -> None:
        """Completely remove a collection.

        Any existing `CollectionRecord` objects that correspond to the removed
        collection are considered invalidated.

        Parameters
        ----------
        name : `str`
            Name of the collection to remove.

        Notes
        -----
        If this collection is referenced by foreign keys in tables managed by
        other objects, the ON DELETE clauses of those tables will be invoked.
        That will frequently delete many dependent rows automatically (via
        "CASCADE"), but it may also cause this operation to fail (with
        rollback) unless dependent rows that do not have an ON DELETE clause
        are removed first.
        """
        raise NotImplementedError()

    @abstractmethod
    def find(self, name: str) -> CollectionRecord[_Key]:
        """Return the collection record associated with the given name.

        Parameters
        ----------
        name : `str`
            Name of the collection.

        Returns
        -------
        record : `CollectionRecord`
            Object representing the collection, including its type and ID.
            If ``record.type is CollectionType.RUN``, this will be a
            `RunRecord` instance. If ``record.type is CollectionType.CHAINED``,
            this will be a `ChainedCollectionRecord` instance.

        Raises
        ------
        MissingCollectionError
            Raised if the given collection does not exist.

        Notes
        -----
        Collections registered by another client of the same layer since the
        last call to `initialize` or `refresh` may not be found.
        """
        raise NotImplementedError()

    @abstractmethod
    def __getitem__(self, key: Any) -> CollectionRecord[_Key]:
        """Return the collection record associated with the given
        primary/foreign key value.

        Parameters
        ----------
        key : `typing.Any`
            Internal primary key value for the collection.

        Returns
        -------
        record : `CollectionRecord`
            Object representing the collection, including its type and name.
            If ``record.type is CollectionType.RUN``, this will be a
            `RunRecord` instance. If ``record.type is CollectionType.CHAINED``,
            this will be a `ChainedCollectionRecord` instance.

        Raises
        ------
        MissingCollectionError
            Raised if no collection with this key exists.

        Notes
        -----
        Collections registered by another client of the same layer since the
        last call to `initialize` or `refresh` may not be found.
        """
        raise NotImplementedError()

    @abstractmethod
    def resolve_wildcard(
        self,
        wildcard: CollectionWildcard,
        *,
        collection_types: Set[CollectionType] = CollectionType.all(),
        flatten_chains: bool = True,
        include_chains: bool | None = None,
    ) -> list[CollectionRecord[_Key]]:
        """Return the collection records that match a wildcard.

        Parameters
        ----------
        wildcard : `CollectionWildcard`
            Names and/or patterns for collections.
        collection_types : `collections.abc.Set` [ `CollectionType` ], optional
            If provided, only return collections of these types.
        flatten_chains : `bool`, optional
            If `True` (default), recursively include the child collections of
            `~CollectionType.CHAINED` collections.
        include_chains : `bool`, optional
            If `True`, return records for `~CollectionType.CHAINED`
            collections themselves. The default is the opposite of
            ``flatten_chains``: either return records for CHAINED collections
            or their children, but not both.

        Returns
        -------
        records : `list` [ `CollectionRecord` ]
            Matching collection records.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDocumentation(self, key: _Key) -> str | None:
        """Retrieve the documentation string for a collection.

        Parameters
        ----------
        key : _Key
            Internal primary key value for the collection.

        Returns
        -------
        docs : `str` or `None`
            Docstring for the collection with the given key.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_docs(self, key: Iterable[_Key]) -> Mapping[_Key, str]:
        """Retrieve the documentation strings for multiple collections.

        Parameters
        ----------
        key : `~collections.abc.Iterable` [ _Key ]
            Internal primary key values for the collections.

        Returns
        -------
        docs : `~collections.abc.Mapping` [ _Key, `str` ]
            Documentation strings indexed by collection key. Only collections
            with non-empty documentation strings are returned.
        """
        raise NotImplementedError()

    @abstractmethod
    def setDocumentation(self, key: _Key, doc: str | None) -> None:
        """Set the documentation string for a collection.

        Parameters
        ----------
        key : _Key
            Internal primary key value for the collection.
        doc : `str` or `None`
            Docstring for the collection with the given key.
        """
        raise NotImplementedError()

    @abstractmethod
    def getParentChains(self, key: _Key) -> set[str]:
        """Find all CHAINED collection names that directly contain the given
        collection.

        Parameters
        ----------
        key : _Key
            Internal primary key value for the collection.

        Returns
        -------
        names : `set` [`str`]
            Parent collection names.
        """
        raise NotImplementedError()

    @abstractmethod
    def update_chain(
        self,
        parent_collection_name: str,
        child_collection_names: list[str],
        allow_use_in_caching_context: bool = False,
    ) -> None:
        """Replace all of the children in a chained collection with a new list.

        Parameters
        ----------
        parent_collection_name : `str`
            The name of a CHAINED collection to be modified.
        child_collection_names : `list` [ `str` ]
            A child collection name or list of child collection names to be
            assigned to the parent.
        allow_use_in_caching_context : `bool`, optional
            If `True`, skip a check that would otherwise disallow this function
            from being called inside an active caching context.
            (Only exists for legacy use, will eventually be removed).

        Raises
        ------
        MissingCollectionError
            If any of the specified collections do not exist.
        CollectionTypeError
            If the parent collection is not a CHAINED collection.
        CollectionCycleError
            If this operation would create a collection cycle.

        Notes
        -----
        If this function is called within a call to ``Butler.transaction``, it
        will hold a lock that prevents other processes from modifying the
        parent collection until the end of the transaction. Keep these
        transactions short.
        """
        raise NotImplementedError()

    @abstractmethod
    def prepend_collection_chain(
        self, parent_collection_name: str, child_collection_names: list[str]
    ) -> None:
        """Add children to the beginning of a CHAINED collection.

        If any of the children already existed in the chain, they will be moved
        to the new position at the beginning of the chain.

        Parameters
        ----------
        parent_collection_name : `str`
            The name of a CHAINED collection to which we will add new children.
        child_collection_names : `list` [ `str` ]
            A child collection name or list of child collection names to be
            added to the parent.

        Raises
        ------
        MissingCollectionError
            If any of the specified collections do not exist.
        CollectionTypeError
            If the parent collection is not a CHAINED collection.
        CollectionCycleError
            If this operation would create a collection cycle.

        Notes
        -----
        If this function is called within a call to ``Butler.transaction``, it
        will hold a lock that prevents other processes from modifying the
        parent collection until the end of the transaction. Keep these
        transactions short.
        """
        raise NotImplementedError()

    @abstractmethod
    def extend_collection_chain(self, parent_collection_name: str, child_collection_names: list[str]) -> None:
        """Add children to the end of a CHAINED collection.

        If any of the children already existed in the chain, they will be moved
        to the new position at the end of the chain.

        Parameters
        ----------
        parent_collection_name : `str`
            The name of a CHAINED collection to which we will add new children.
        child_collection_names : `list` [ `str` ]
            A child collection name or list of child collection names to be
            added to the parent.

        Raises
        ------
        MissingCollectionError
            If any of the specified collections do not exist.
        CollectionTypeError
            If the parent collection is not a CHAINED collection.
        CollectionCycleError
            If this operation would create a collection cycle.

        Notes
        -----
        If this function is called within a call to ``Butler.transaction``, it
        will hold a lock that prevents other processes from modifying the
        parent collection until the end of the transaction. Keep these
        transactions short.
        """
        raise NotImplementedError()

    @abstractmethod
    def remove_from_collection_chain(
        self, parent_collection_name: str, child_collection_names: list[str]
    ) -> None:
        """Remove children from a CHAINED collection.

        Parameters
        ----------
        parent_collection_name : `str`
            The name of a CHAINED collection from which we will remove
            children.
        child_collection_names : `list` [ `str` ]
            A child collection name or list of child collection names to be
            removed from the parent.

        Raises
        ------
        MissingCollectionError
            If any of the specified collections do not exist.
        CollectionTypeError
            If the parent collection is not a CHAINED collection.

        Notes
        -----
        If this function is called within a call to ``Butler.transaction``, it
        will hold a lock that prevents other processes from modifying the
        parent collection until the end of the transaction. Keep these
        transactions short.
        """
        raise NotImplementedError()

    # Not marked abstract: subclasses that do not support SQL-level lookups
    # simply inherit this NotImplementedError-raising default.
    def lookup_name_sql(
        self, sql_key: sqlalchemy.ColumnElement[_Key], sql_from_clause: Joinable
    ) -> tuple[sqlalchemy.ColumnElement[str], Joinable]:
        """Return a SQLAlchemy column and FROM clause that enable a query
        to look up a collection name from the key.

        Parameters
        ----------
        sql_key : `sqlalchemy.ColumnElement`
            SQL column expression that evaluates to the collection key.
        sql_from_clause : `sqlalchemy.FromClause` or `sqlalchemy.Select`
            SQL FROM clause or select statement from which ``sql_key`` was
            obtained.

        Returns
        -------
        sql_name : `sqlalchemy.ColumnElement` [ `str` ]
            SQL column expression that evaluates to the collection name.
        joined_sql : depends on input type
            The result of calling join() on the given ``sql_from_clause``,
            to join in the table needed to provide ``sql_name``.
        """
        raise NotImplementedError()

    # Not marked abstract; see the note on `lookup_name_sql`.
    def join_collections_sql(
        self, sql_key: sqlalchemy.ColumnElement[_Key], sql_from_clause: Joinable
    ) -> JoinedCollectionsTable[Joinable]:
        """Join the collection table into a query so that the collection name
        and collection type can be looked up from the collection key.

        Parameters
        ----------
        sql_key : `sqlalchemy.ColumnElement`
            SQL column expression that evaluates to the collection key.
        sql_from_clause : `sqlalchemy.FromClause` or `sqlalchemy.Select`
            SQL FROM clause or select statement from which ``sql_key`` was
            obtained.

        Returns
        -------
        sql : `JoinedCollectionsTable`
            Object holding the joined SQL (of the same kind as
            ``sql_from_clause``) and the collection table's name and type
            columns.
        """
        raise NotImplementedError()

    def _block_for_concurrency_test(self) -> None:
        """No-op normally. Provide a place for unit tests to hook in and
        verify locking behavior.
        """
class JoinedCollectionsTable(NamedTuple, Generic[Joinable]):
    """Container for information needed to access collection table columns.

    This is the return type of `CollectionManager.join_collections_sql`.  The
    ``Joinable`` type parameter matches the kind of SQL object that was joined
    (a `sqlalchemy.Select` or a `sqlalchemy.FromClause`).
    """

    # Field order matters: as a NamedTuple, instances unpack positionally as
    # (joined_sql, name_column, type_column).
    joined_sql: Joinable
    """Input SQL statement modified by joining the collections table."""
    name_column: sqlalchemy.ColumnElement[str]
    """Column from the joined collection table holding the collection name."""
    # NOTE(review): typed as int; presumably the stored integer value of
    # `CollectionType` -- confirm against the concrete managers.
    type_column: sqlalchemy.ColumnElement[int]
    """Column from the joined collection table holding the collection type."""