Coverage for python/lsst/daf/butler/direct_butler/_direct_butler_collections.py: 24%
79 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:43 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:43 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("DirectButlerCollections",)
32from collections.abc import Iterable, Mapping, Sequence, Set
33from typing import TYPE_CHECKING, Any
35import sqlalchemy
37from lsst.utils.iteration import ensure_iterable
39from .._butler_collections import ButlerCollections, CollectionInfo
40from .._collection_type import CollectionType
41from ..registry._exceptions import OrphanedRecordError
42from ..registry.interfaces import ChainedCollectionRecord
43from ..registry.sql_registry import SqlRegistry
44from ..registry.wildcards import CollectionWildcard
46if TYPE_CHECKING:
47 from .._dataset_type import DatasetType
48 from ..registry._collection_summary import CollectionSummary
class DirectButlerCollections(ButlerCollections):
    """Implementation of ButlerCollections for DirectButler.

    Parameters
    ----------
    registry : `~lsst.daf.butler.registry.sql_registry.SqlRegistry`
        Registry object used to work with the collections database.
    """

    def __init__(self, registry: SqlRegistry):
        self._registry = registry

    @property
    def defaults(self) -> Sequence[str]:
        """Collections taken from the registry defaults
        (`~collections.abc.Sequence` [ `str` ]).
        """
        return self._registry.defaults.collections

    def extend_chain(self, parent_collection_name: str, child_collection_names: str | Iterable[str]) -> None:
        """Forward to the collection manager's ``extend_collection_chain``.

        Parameters
        ----------
        parent_collection_name : `str`
            Name of the chained collection to modify.
        child_collection_names : `str` or `~collections.abc.Iterable` [ `str` ]
            One child collection name or several; a single string is
            treated as one name and the result is passed on as a `list`.
        """
        return self._registry._managers.collections.extend_collection_chain(
            parent_collection_name, list(ensure_iterable(child_collection_names))
        )

    def prepend_chain(self, parent_collection_name: str, child_collection_names: str | Iterable[str]) -> None:
        """Forward to the collection manager's ``prepend_collection_chain``.

        Parameters
        ----------
        parent_collection_name : `str`
            Name of the chained collection to modify.
        child_collection_names : `str` or `~collections.abc.Iterable` [ `str` ]
            One child collection name or several; a single string is
            treated as one name and the result is passed on as a `list`.
        """
        return self._registry._managers.collections.prepend_collection_chain(
            parent_collection_name, list(ensure_iterable(child_collection_names))
        )

    def redefine_chain(
        self, parent_collection_name: str, child_collection_names: str | Iterable[str]
    ) -> None:
        """Forward to the collection manager's ``update_chain``.

        Parameters
        ----------
        parent_collection_name : `str`
            Name of the chained collection to modify.
        child_collection_names : `str` or `~collections.abc.Iterable` [ `str` ]
            One child collection name or several; a single string is
            treated as one name and the result is passed on as a `list`.
        """
        self._registry._managers.collections.update_chain(
            parent_collection_name, list(ensure_iterable(child_collection_names))
        )

    def remove_from_chain(
        self, parent_collection_name: str, child_collection_names: str | Iterable[str]
    ) -> None:
        """Forward to the collection manager's
        ``remove_from_collection_chain``.

        Parameters
        ----------
        parent_collection_name : `str`
            Name of the chained collection to modify.
        child_collection_names : `str` or `~collections.abc.Iterable` [ `str` ]
            One child collection name or several; a single string is
            treated as one name and the result is passed on as a `list`.
        """
        return self._registry._managers.collections.remove_from_collection_chain(
            parent_collection_name, list(ensure_iterable(child_collection_names))
        )

    def query(
        self,
        expression: str | Iterable[str],
        collection_types: Set[CollectionType] | CollectionType | None = None,
        flatten_chains: bool = False,
        include_chains: bool | None = None,
    ) -> Sequence[str]:
        """Return the names of collections matching an expression.

        Parameters
        ----------
        expression : `str` or `~collections.abc.Iterable` [ `str` ]
            Collection expression, forwarded unchanged to
            ``SqlRegistry.queryCollections``.
        collection_types : `~collections.abc.Set` [ `CollectionType` ], \
                `CollectionType` or `None`, optional
            Collection types to match. `None` is replaced by
            ``CollectionType.all()``.
        flatten_chains : `bool`, optional
            Forwarded as ``flattenChains``.
        include_chains : `bool` or `None`, optional
            Forwarded as ``includeChains``.

        Returns
        -------
        names : `~collections.abc.Sequence` [ `str` ]
            Whatever ``SqlRegistry.queryCollections`` returns for these
            arguments.
        """
        if collection_types is None:
            collection_types = CollectionType.all()
        # Do not use base implementation for now to avoid the additional
        # unused queries.
        return self._registry.queryCollections(
            expression,
            collectionTypes=collection_types,
            flattenChains=flatten_chains,
            includeChains=include_chains,
        )

    def query_info(
        self,
        expression: str | Iterable[str],
        collection_types: Set[CollectionType] | CollectionType | None = None,
        flatten_chains: bool = False,
        include_chains: bool | None = None,
        include_parents: bool = False,
        include_summary: bool = False,
        include_doc: bool = False,
        summary_datasets: Iterable[DatasetType] | Iterable[str] | None = None,
    ) -> Sequence[CollectionInfo]:
        """Return a `CollectionInfo` for every collection matching an
        expression.

        Parameters
        ----------
        expression : `str` or `~collections.abc.Iterable` [ `str` ]
            Collection expression, converted with
            ``CollectionWildcard.from_expression``.
        collection_types : `~collections.abc.Set` [ `CollectionType` ], \
                `CollectionType` or `None`, optional
            Collection types to match. `None` is replaced by
            ``CollectionType.all()``; a single `CollectionType` is wrapped
            in a set.
        flatten_chains : `bool`, optional
            Forwarded to the collection manager's ``resolve_wildcard``.
        include_chains : `bool` or `None`, optional
            Forwarded to the collection manager's ``resolve_wildcard``.
        include_parents : `bool`, optional
            If `True`, look up the parent chains of each matched
            collection; otherwise ``parents`` is `None`.
        include_summary : `bool`, optional
            If `True`, fetch collection summaries and report the dataset
            type names; otherwise ``dataset_types`` is `None`.
        include_doc : `bool`, optional
            If `True`, fetch documentation strings; otherwise ``doc`` is
            an empty string.
        summary_datasets : `~collections.abc.Iterable` [ `DatasetType` ] or \
                `~collections.abc.Iterable` [ `str` ] or `None`, optional
            Forwarded to ``fetch_summaries``; only used when
            ``include_summary`` is `True`.

        Returns
        -------
        info : `~collections.abc.Sequence` [ `CollectionInfo` ]
            One entry per resolved collection record, in the order the
            records were returned by ``resolve_wildcard``.
        """
        info: list[CollectionInfo] = []
        if collection_types is None:
            collection_types = CollectionType.all()
        elif isinstance(collection_types, CollectionType):
            collection_types = {collection_types}

        records = self._registry._managers.collections.resolve_wildcard(
            CollectionWildcard.from_expression(expression),
            collection_types=collection_types,
            flatten_chains=flatten_chains,
            include_chains=include_chains,
        )

        # Summaries and docs are fetched once for all records and then
        # looked up per record by key inside the loop.
        summaries: Mapping[Any, CollectionSummary] = {}
        if include_summary:
            summaries = self._registry._managers.datasets.fetch_summaries(records, summary_datasets)

        docs: Mapping[Any, str] = {}
        if include_doc:
            docs = self._registry._managers.collections.get_docs(record.key for record in records)

        for record in records:
            doc = docs.get(record.key, "")
            children: tuple[str, ...] = tuple()
            if record.type == CollectionType.CHAINED:
                # Narrow the record type for static type checkers.
                assert isinstance(record, ChainedCollectionRecord)
                children = tuple(record.children)
            parents: frozenset[str] | None = None
            if include_parents:
                # TODO: This is non-vectorized, so expensive to do in a
                # loop.
                parents = frozenset(self._registry.getCollectionParentChains(record.name))
            dataset_types: Set[str] | None = None
            if summary := summaries.get(record.key):
                dataset_types = frozenset(dt.name for dt in summary.dataset_types)

            info.append(
                CollectionInfo(
                    name=record.name,
                    type=record.type,
                    doc=doc,
                    parents=parents,
                    children=children,
                    dataset_types=dataset_types,
                )
            )

        return info

    def get_info(
        self, name: str, include_parents: bool = False, include_summary: bool = False
    ) -> CollectionInfo:
        """Return a `CollectionInfo` describing a single collection.

        Parameters
        ----------
        name : `str`
            Name of the collection to describe.
        include_parents : `bool`, optional
            If `True`, look up the parent chains of the collection;
            otherwise ``parents`` is `None`.
        include_summary : `bool`, optional
            If `True`, fetch the collection summary and report the dataset
            type names; otherwise ``dataset_types`` is `None`.

        Returns
        -------
        info : `CollectionInfo`
            Description of the collection. ``doc`` is an empty string when
            the registry returns no documentation, and ``children`` is
            empty unless the collection is of type ``CHAINED``.
        """
        record = self._registry.get_collection_record(name)
        doc = self._registry.getCollectionDocumentation(name) or ""
        children: tuple[str, ...] = tuple()
        if record.type == CollectionType.CHAINED:
            # Narrow the record type for static type checkers.
            assert isinstance(record, ChainedCollectionRecord)
            children = tuple(record.children)
        # Build an immutable set, matching what query_info passes to
        # CollectionInfo for the same field.
        parents: frozenset[str] | None = None
        if include_parents:
            parents = frozenset(self._registry.getCollectionParentChains(name))
        dataset_types: Set[str] | None = None
        if include_summary:
            summary = self._registry.getCollectionSummary(name)
            dataset_types = frozenset(dt.name for dt in summary.dataset_types)

        return CollectionInfo(
            name=name,
            type=record.type,
            doc=doc,
            parents=parents,
            children=children,
            dataset_types=dataset_types,
        )

    def register(self, name: str, type: CollectionType = CollectionType.RUN, doc: str | None = None) -> bool:
        """Register a collection via ``SqlRegistry.registerCollection``.

        Parameters
        ----------
        name : `str`
            Name of the collection.
        type : `CollectionType`, optional
            Type of the collection; defaults to ``CollectionType.RUN``.
        doc : `str` or `None`, optional
            Documentation string forwarded to the registry.

        Returns
        -------
        registered : `bool`
            The value returned by ``SqlRegistry.registerCollection``
            (presumably `True` when a new collection was created; verify
            against the registry implementation).
        """
        return self._registry.registerCollection(name, type, doc)

    def x_remove(self, name: str) -> None:
        """Remove a collection via ``SqlRegistry.removeCollection``.

        Parameters
        ----------
        name : `str`
            Name of the collection to remove.

        Raises
        ------
        OrphanedRecordError
            Raised, chained from the original error, when the registry
            raises `sqlalchemy.exc.IntegrityError` during removal.
        """
        try:
            self._registry.removeCollection(name)
        except sqlalchemy.exc.IntegrityError as e:
            raise OrphanedRecordError(f"Datasets in run {name} are still referenced elsewhere.") from e