Coverage for python / lsst / pipe / base / tests / in_memory_limited_butler.py: 36%
64 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["InMemoryLimitedButler"]
32import logging
33import uuid
34from collections.abc import Iterable
35from typing import Any
37from lsst.daf.butler import (
38 ButlerMetrics,
39 DatasetProvenance,
40 DatasetRef,
41 DatasetType,
42 DimensionUniverse,
43 LimitedButler,
44 MissingDatasetTypeError,
45 Quantum,
46 StorageClass,
47 StorageClassFactory,
48)
49from lsst.daf.butler.registry import ConflictingDefinitionError
51from .._dataset_handle import InMemoryDatasetHandle
53_LOG = logging.getLogger(__name__)
class InMemoryLimitedButler(LimitedButler):
    """A `LimitedButler` that just stores datasets in an in-memory mapping.

    Parameters
    ----------
    universe : `lsst.daf.butler.DimensionUniverse`
        Definitions for all dimensions.
    dataset_types : `~collections.abc.Iterable` [ \
            `lsst.daf.butler.DatasetType` ]
        Definitions of all dataset types.

    Notes
    -----
    This is an incomplete implementation of the `LimitedButler` interface
    intended only for tests.  It supports all methods required by
    `SingleQuantumExecutor`, but not transfers or URI retrieval.

    While this class supports storage class conversions in `get` and `put`, it
    uses different code paths from real butlers, and should not be used in
    tests in which storage class correctness is part of what is being tested.

    Objects are always copied (via storage class machinery) by `get`.

    Pickling this class will pickle all datasets already `put` (which must be
    pickleable).  This generally allows a central butler to be initialized with
    input datasets in one process and distributed to worker processes that run
    quanta *once*, but it does not allow outputs from a worker process to be
    distributed to others or the originating process.  This can be hard to
    notice because quanta will usually be skipped with
    `lsst.pipe.base.NoWorkFound` (a success!) when all of their inputs are
    missing.
    """

    def __init__(self, universe: DimensionUniverse, dataset_types: Iterable[DatasetType] = ()):
        self.storageClasses = StorageClassFactory()
        self._universe = universe
        # Mapping from dataset UUID to the stored ref (with its storage class
        # overridden to the repo definition's) and an in-memory handle holding
        # the stored object.
        self._datasets: dict[uuid.UUID, tuple[DatasetRef, InMemoryDatasetHandle]] = {}
        self._metrics = ButlerMetrics()
        self._dataset_types = {dt.name: dt for dt in dataset_types}
        assert not any(dt.component() for dt in self._dataset_types.values()), (
            "Dataset type definitions must not be components."
        )

    def __getstate__(self) -> dict[str, Any]:
        # Pickle customization is needed because StorageClassFactory is not
        # pickleable.
        return {
            "universe": self._universe,
            "datasets": self._datasets,
            "dataset_types": self._dataset_types,
        }

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Recreate the unpickleable StorageClassFactory and fresh metrics;
        # restore everything else from the pickled state.
        self.storageClasses = StorageClassFactory()
        self._universe = state["universe"]
        self._datasets = state["datasets"]
        self._metrics = ButlerMetrics()
        self._dataset_types = state["dataset_types"]

    def get_datasets(self, dataset_type: str | None = None) -> dict[DatasetRef, object]:
        """Return datasets that have been `put` to this butler.

        Storage classes and corresponding Python types will match the dataset
        type definitions provided at butler construction, which may not be the
        same as what was `put`.

        Parameters
        ----------
        dataset_type : `str`, optional
            Dataset type name used to filter results.

        Returns
        -------
        refs : `dict` [ `lsst.daf.butler.DatasetRef`, `object` ]
            Datasets held by this butler.
        """
        return {
            ref: handle.get()
            for ref, handle in self._datasets.values()
            if dataset_type is None or dataset_type == ref.datasetType.name
        }

    def isWriteable(self) -> bool:
        # Docstring inherited.
        return True

    def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
        # Docstring inherited.
        with self._metrics.instrument_put():
            assert not ref.isComponent(), "Component dataset types cannot be put."
            if ref.id in self._datasets:
                # Some butlers may not raise reliably when a dataset already
                # exists (it's hard to be rigorous in parallel given different
                # guarantees provided by storage), but we don't want code to
                # rely on it not being an error, so we want a test butler to
                # always complain.
                raise ConflictingDefinitionError(f"Dataset {ref} already exists.")
            if (repo_dataset_type := self._dataset_types.get(ref.datasetType.name)) is None:
                raise MissingDatasetTypeError(f"Dataset type {ref.datasetType.name!r} not recognized.")
            # Coerce to the repo definition's storage class and keep the
            # coerced object: discarding the return value would leave the
            # handle holding an object inconsistent with the storage class it
            # declares below.
            obj = repo_dataset_type.storageClass.coerce_type(obj)
            self._datasets[ref.id] = (
                ref.overrideStorageClass(repo_dataset_type.storageClass),
                InMemoryDatasetHandle(
                    obj,
                    storageClass=repo_dataset_type.storageClass,
                    dataId=ref.dataId,
                    copy=True,
                ),
            )
            return ref

    def get(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: StorageClass | str | None = None,
    ) -> Any:
        # Docstring inherited.
        with self._metrics.instrument_get():
            if storageClass is None:
                # Default to the storage class the caller's ref declares
                # (for a component ref, the component's storage class).
                storageClass = ref.datasetType.storageClass
            elif isinstance(storageClass, str):
                storageClass = self.storageClasses.getStorageClass(storageClass)
            if entry := self._datasets.get(ref.id):
                # Do not rebind ``ref`` to the stored ref here: stored refs
                # are never components (see `put`), so the component name must
                # come from the ref the caller passed in.
                _, handle = entry
                return handle.get(
                    component=ref.datasetType.component(), parameters=parameters, storageClass=storageClass
                )
            raise FileNotFoundError(f"Dataset {ref} does not exist.")

    def stored_many(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
        # Docstring inherited.
        return {ref: ref.id in self._datasets for ref in refs}

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        # Docstring inherited.  All removal flavors degenerate to dropping the
        # in-memory entry; refs that are not stored are silently ignored.
        for ref in refs:
            self._datasets.pop(ref.id, None)

    @property
    def _datastore(self) -> Any:
        raise NotImplementedError("This test butler does not have a datastore.")

    @_datastore.setter  # demanded by MyPy since we declare it to be an instance attribute in LimitedButler.
    def _datastore(self, value: Any) -> None:
        raise NotImplementedError("This test butler does not have a datastore.")

    @property
    def dimensions(self) -> DimensionUniverse:
        # Docstring inherited.
        return self._universe

    def factory(self, quantum: Quantum) -> InMemoryLimitedButler:
        """Return ``self``.

        This method can be used as the ``limited_butler_factory`` argument to
        `.single_quantum_executor.SingleQuantumExecutor`.

        Parameters
        ----------
        quantum : `lsst.daf.butler.Quantum`
            Ignored.
        """
        return self