# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

import contextlib
from collections.abc import Iterable, Iterator, Mapping, Sequence
from typing import Any

from lsst.daf.butler import Butler
from lsst.utils.iteration import ensure_iterable

from .._collection_type import CollectionType
from .._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef
from .._dataset_type import DatasetType
from .._storage_class import StorageClassFactory
from .._timespan import Timespan
from ..dimensions import (
    DataCoordinate,
    DataId,
    DimensionElement,
    DimensionGroup,
    DimensionRecord,
    DimensionUniverse,
)
from ..registry import (
    CollectionArgType,
    CollectionSummary,
    CollectionTypeError,
    DatasetTypeError,
    RegistryDefaults,
)
from ..registry._registry_base import RegistryBase
from ._collection_args import (
    convert_collection_arg_to_glob_string_list,
    convert_dataset_type_arg_to_glob_string_list,
)
from ._defaults import DefaultsHolder
from ._http_connection import RemoteButlerHttpConnection, parse_model
from .server_models import (
    ExpandDataIdRequestModel,
    ExpandDataIdResponseModel,
    GetCollectionSummaryResponseModel,
    QueryDatasetTypesRequestModel,
    QueryDatasetTypesResponseModel,
)


class RemoteButlerRegistry(RegistryBase):
    """Implementation of the `Registry` interface for `RemoteButler`.

    Parameters
    ----------
    butler : `Butler`
        Butler instance to which this registry delegates operations.
    defaults : `DefaultsHolder`
        Reference to the object containing default collections and data ID.
    connection : `RemoteButlerHttpConnection`
        HTTP connection to the Butler server for looking up data.
    """

    def __init__(self, butler: Butler, defaults: DefaultsHolder, connection: RemoteButlerHttpConnection):
        super().__init__(butler)
        self._connection = connection
        self._defaults = defaults

    def isWriteable(self) -> bool:
        return self._butler.isWriteable()

    @property
    def dimensions(self) -> DimensionUniverse:
        return self._butler.dimensions

    @property
    def defaults(self) -> RegistryDefaults:
        return self._defaults.get()

    @defaults.setter
    def defaults(self, value: RegistryDefaults) -> None:
        value.finish(self)
        self._defaults.set(value)

    def refresh(self) -> None:
        # In theory the server should manage all necessary invalidation of
        # state.
        pass

    def refresh_collection_summaries(self) -> None:
        # Docstring inherited from a base class.
        raise NotImplementedError()

    @contextlib.contextmanager
    def caching_context(self) -> Iterator[None]:
        yield

    @contextlib.contextmanager
    def transaction(self, *, savepoint: bool = False) -> Iterator[None]:
        # RemoteButler will never support transactions.
        raise NotImplementedError()

    def registerCollection(
        self, name: str, type: CollectionType = CollectionType.TAGGED, doc: str | None = None
    ) -> bool:
        raise NotImplementedError()

    def getCollectionType(self, name: str) -> CollectionType:
        return self._butler.collections.get_info(name).type

    def registerRun(self, name: str, doc: str | None = None) -> bool:
        raise NotImplementedError()

    def removeCollection(self, name: str) -> None:
        raise NotImplementedError()

    def getCollectionChain(self, parent: str) -> Sequence[str]:
        info = self._butler.collections.get_info(parent)
        if info.type is not CollectionType.CHAINED:
            raise CollectionTypeError(f"Collection '{parent}' has type {info.type.name}, not CHAINED.")
        return info.children
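
    # Hedged sketch (collection names are hypothetical): only CHAINED
    # collections have children.
    #
    #     registry.getCollectionChain("HSC/defaults")   # sequence of children
    #     registry.getCollectionChain("HSC/runs/run1")  # CollectionTypeError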

    def setCollectionChain(self, parent: str, children: Any, *, flatten: bool = False) -> None:
        raise NotImplementedError()

    def getCollectionParentChains(self, collection: str) -> set[str]:
        info = self._butler.collections.get_info(collection, include_parents=True)
        assert info.parents is not None, "Requested list of parents from server, but it did not send them."
        return set(info.parents)

    def getCollectionDocumentation(self, collection: str) -> str | None:
        doc = self._butler.collections.get_info(collection).doc
        if not doc:
            return None
        return doc

    def setCollectionDocumentation(self, collection: str, doc: str | None) -> None:
        raise NotImplementedError()

    def getCollectionSummary(self, collection: str) -> CollectionSummary:
        response = self._connection.get("collection_summary", {"name": collection})
        parsed = parse_model(response, GetCollectionSummaryResponseModel)
        return CollectionSummary.from_simple(parsed.summary, self.dimensions)
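
    # Hedged sketch (collection name is hypothetical): the returned summary
    # describes what a collection can contain, e.g. its dataset types and
    # governor dimension values.
    #
    #     summary = registry.getCollectionSummary("HSC/defaults")
    #     summary.dataset_types  # dataset types present in the collection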

    def registerDatasetType(self, datasetType: DatasetType) -> bool:
        raise NotImplementedError()

    def removeDatasetType(self, name: str | tuple[str, ...]) -> None:
        raise NotImplementedError()

    def getDatasetType(self, name: str) -> DatasetType:
        return self._butler.get_dataset_type(name)

    def supportsIdGenerationMode(self, mode: DatasetIdGenEnum) -> bool:
        raise NotImplementedError()

    def findDataset(
        self,
        datasetType: DatasetType | str,
        dataId: DataId | None = None,
        *,
        collections: CollectionArgType | None = None,
        timespan: Timespan | None = None,
        datastore_records: bool = False,
        **kwargs: Any,
    ) -> DatasetRef | None:
        # Components are supported by Butler.find_dataset, but must raise an
        # exception in Registry.findDataset (see
        # RegistryTests.testComponentLookups).  The registry interface
        # originally supported components, but a hard architectural boundary
        # was later drawn between registry and datastore, so registry is no
        # longer allowed to know about components.
        if _is_component_dataset_type(datasetType):
            raise DatasetTypeError(
                "Component dataset types are not supported;"
                " use DatasetRef or DatasetType methods to obtain components from parents instead."
            )

        if collections is not None:
            collections = convert_collection_arg_to_glob_string_list(collections)

        return self._butler.find_dataset(
            datasetType,
            dataId,
            collections=collections,
            timespan=timespan,
            datastore_records=datastore_records,
            **kwargs,
        )
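
    # Hedged sketch ("raw" and its "wcs" component are hypothetical names):
    #
    #     registry.findDataset("raw.wcs", ...)  # raises DatasetTypeError
    #     ref = registry.findDataset("raw", data_id, collections="HSC/raw")
    #     ref.makeComponentRef("wcs")           # component via DatasetRef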

    def insertDatasets(
        self,
        datasetType: DatasetType | str,
        dataIds: Iterable[DataId],
        run: str | None = None,
        expand: bool = True,
        idGenerationMode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
    ) -> list[DatasetRef]:
        raise NotImplementedError()

    def _importDatasets(
        self,
        datasets: Iterable[DatasetRef],
        expand: bool = True,
        assume_new: bool = False,
    ) -> list[DatasetRef]:
        raise NotImplementedError()

    def getDataset(self, id: DatasetId) -> DatasetRef | None:
        return self._butler.get_dataset(id)

    def _fetch_run_dataset_ids(self, run: str) -> list[DatasetId]:
        raise NotImplementedError()

    def removeDatasets(self, refs: Iterable[DatasetRef]) -> None:
        raise NotImplementedError()

    def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        raise NotImplementedError()

    def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        raise NotImplementedError()

    def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespan) -> None:
        raise NotImplementedError()

    def decertify(
        self,
        collection: str,
        datasetType: str | DatasetType,
        timespan: Timespan,
        *,
        dataIds: Iterable[DataId] | None = None,
    ) -> None:
        raise NotImplementedError()

    def getDatasetLocations(self, ref: DatasetRef) -> Iterable[str]:
        raise NotImplementedError()

    def expandDataId(
        self,
        dataId: DataId | None = None,
        *,
        dimensions: Iterable[str] | DimensionGroup | None = None,
        records: Mapping[str, DimensionRecord | None] | None = None,
        withDefaults: bool = True,
        **kwargs: Any,
    ) -> DataCoordinate:
        standardized = DataCoordinate.standardize(
            dataId,
            dimensions=dimensions,
            universe=self.dimensions,
            defaults=self.defaults.dataId if withDefaults else None,
            **kwargs,
        )
        if standardized.hasRecords():
            return standardized

        request = ExpandDataIdRequestModel(data_id=standardized.to_simple().dataId)
        response = self._connection.post("expand_data_id", request)
        model = parse_model(response, ExpandDataIdResponseModel)
        return DataCoordinate.from_simple(model.data_coordinate, self.dimensions)
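
    # Hedged sketch (dimension values are hypothetical): expansion is a
    # no-op when the data ID already carries records; otherwise a single
    # server round trip attaches them.
    #
    #     data_id = registry.expandDataId(instrument="HSC", exposure=903342)
    #     data_id.hasRecords()  # True after expansion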

    def insertDimensionData(
        self,
        element: DimensionElement | str,
        *data: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        replace: bool = False,
        skip_existing: bool = False,
    ) -> None:
        raise NotImplementedError()

    def syncDimensionData(
        self,
        element: DimensionElement | str,
        row: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        update: bool = False,
    ) -> bool | dict[str, Any]:
        raise NotImplementedError()

    def queryDatasetTypes(
        self,
        expression: Any = ...,
        *,
        components: bool = False,
        missing: list[str] | None = None,
    ) -> Iterable[DatasetType]:
        query = convert_dataset_type_arg_to_glob_string_list(expression)
        request = QueryDatasetTypesRequestModel(search=query.search)
        response = self._connection.post("query_dataset_types", request)
        model = parse_model(response, QueryDatasetTypesResponseModel)
        if missing is not None:
            missing.extend(model.missing)

        result = []
        for dt in model.dataset_types:
            if dt.name in query.explicit_dataset_types:
                # Users may pass in already-existing DatasetType instances,
                # whose overridden storage classes etc. must be preserved.
                result.append(query.explicit_dataset_types[dt.name])
            else:
                result.append(DatasetType.from_simple(dt, self.dimensions))

        return result
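
    # Hedged sketch (names are hypothetical): glob patterns and existing
    # DatasetType instances may both appear in the expression, and the
    # instances come back unmodified.
    #
    #     registry.queryDatasetTypes("calexp*")        # glob search
    #     registry.queryDatasetTypes(my_dataset_type)  # instance preserved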

    def queryCollections(
        self,
        expression: Any = ...,
        datasetType: DatasetType | None = None,
        collectionTypes: Iterable[CollectionType] | CollectionType = CollectionType.all(),
        flattenChains: bool = False,
        includeChains: bool | None = None,
    ) -> Sequence[str]:
        return self._butler.collections.query(
            expression,
            collection_types=set(ensure_iterable(collectionTypes)),
            flatten_chains=flattenChains,
            include_chains=includeChains,
        )
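
    # Hedged sketch (pattern is hypothetical): the default expression ``...``
    # matches all collections.
    #
    #     registry.queryCollections("HSC/*", collectionTypes=CollectionType.RUN)
    #     registry.queryCollections(flattenChains=True)  # expand CHAINED members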

    @property
    def storageClasses(self) -> StorageClassFactory:
        return self._butler.storageClasses


def _is_component_dataset_type(dataset_type: DatasetType | str) -> bool:
    """Return `True` if the given dataset type refers to a component."""
    if isinstance(dataset_type, DatasetType):
        return dataset_type.component() is not None
    elif isinstance(dataset_type, str):
        parent, component = DatasetType.splitDatasetTypeName(dataset_type)
        return component is not None
    else:
        raise TypeError(f"Expected DatasetType or str, got {dataset_type!r}")
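

# Hedged sketch (names are hypothetical): a component dataset type name has
# the form "parent.component".
#
#     _is_component_dataset_type("raw.wcs")  # True
#     _is_component_dataset_type("raw")      # False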