Coverage for python / lsst / daf / butler / datastores / file_datastore / get.py: 20%
117 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28__all__ = (
29 "DatasetLocationInformation",
30 "DatastoreFileGetInformation",
31 "generate_datastore_get_information",
32 "get_dataset_as_python_object_from_get_info",
33)
35from collections.abc import Mapping
36from dataclasses import dataclass
37from typing import Any, TypeAlias
39from lsst.daf.butler import (
40 DatasetRef,
41 FileDescriptor,
42 FileIntegrityError,
43 Formatter,
44 FormatterV1inV2,
45 FormatterV2,
46 Location,
47 StorageClass,
48)
49from lsst.daf.butler.datastore.cache_manager import AbstractDatastoreCacheManager
50from lsst.daf.butler.datastore.generic_base import post_process_get
51from lsst.daf.butler.datastore.stored_file_info import StoredFileInfo
52from lsst.utils.introspection import get_instance_of
53from lsst.utils.logging import getLogger
55log = getLogger(__name__)
57DatasetLocationInformation: TypeAlias = tuple[Location, StoredFileInfo]
@dataclass(frozen=True)
class DatastoreFileGetInformation:
    """Collection of useful parameters needed to retrieve a file from
    a Datastore.

    Instances are immutable and are produced by
    `generate_datastore_get_information` for each file artifact that
    has to be read back.
    """

    location: Location
    """The location from which to read the dataset."""

    formatter: Formatter | FormatterV2
    """The `Formatter` to use to deserialize the dataset."""

    info: StoredFileInfo
    """Stored information about this file and its formatter."""

    assemblerParams: Mapping[str, Any]
    """Parameters to use for post-processing the retrieved dataset."""

    formatterParams: Mapping[str, Any]
    """Parameters that were understood by the associated formatter."""

    component: str | None
    """The component to be retrieved (can be `None`)."""

    readStorageClass: StorageClass
    """The `StorageClass` of the dataset being read."""
def generate_datastore_get_information(
    fileLocations: list[DatasetLocationInformation],
    *,
    ref: DatasetRef,
    parameters: Mapping[str, Any] | None,
    readStorageClass: StorageClass | None = None,
) -> list[DatastoreFileGetInformation]:
    """Process parameters and instantiate formatters in preparation for
    retrieving an artifact and converting it to a Python object.

    Parameters
    ----------
    fileLocations : `list` [`DatasetLocationInformation`]
        List of file locations for this artifact and their associated datastore
        records.
    ref : `DatasetRef`
        The registry information associated with this artifact.
    parameters : `~collections.abc.Mapping` [`str`, `typing.Any`]
        `StorageClass` and `Formatter` parameters.
    readStorageClass : `StorageClass` | `None`, optional
        The StorageClass to use when ultimately returning the resulting object
        from the get. Defaults to the `StorageClass` specified by ``ref``.

    Returns
    -------
    getInfo : `list` [`DatastoreFileGetInformation`]
        The parameters needed to retrieve each file.
    """
    if readStorageClass is None:
        readStorageClass = ref.datasetType.storageClass

    # Component of the requesting ref, if any.
    ref_component = ref.datasetType.component()

    # More than one record means the composite was disassembled on write.
    is_disassembled = len(fileLocations) > 1

    get_info: list[DatastoreFileGetInformation] = []
    for location, stored_info in fileLocations:
        # Storage class with which this particular file was written.
        write_storage_class = stored_info.storageClass

        # For a disassembled dataset the read storage class must match the
        # write storage class, unless this record corresponds to the
        # component that was explicitly requested (which may override it).
        if is_disassembled and stored_info.component != ref_component:
            this_read_storage_class = write_storage_class
        else:
            this_read_storage_class = readStorageClass

        formatter = get_instance_of(
            stored_info.formatter,
            FileDescriptor(
                location,
                readStorageClass=this_read_storage_class,
                storageClass=write_storage_class,
                parameters=parameters,
                component=stored_info.component,
            ),
            dataId=ref.dataId,
            ref=ref,
        )

        # Split the parameters into those the formatter consumes and the
        # remainder.
        formatter_params, other_params = formatter.segregate_parameters()

        # Of the remaining parameters, keep only those this read storage
        # class supports (components may not handle all of them).
        assembler_params = this_read_storage_class.filterParameters(other_params)

        # The component may come from the datastore record (datastore-side
        # disassembly) or from the ref itself (butler-side disassembly).
        component = stored_info.component or ref_component

        get_info.append(
            DatastoreFileGetInformation(
                location,
                formatter,
                stored_info,
                assembler_params,
                formatter_params,
                component,
                this_read_storage_class,
            )
        )

    return get_info
def _read_artifact_into_memory(
    getInfo: DatastoreFileGetInformation,
    ref: DatasetRef,
    cache_manager: AbstractDatastoreCacheManager,
    isComponent: bool = False,
) -> Any:
    """Read the artifact from datastore into in memory object.

    Parameters
    ----------
    getInfo : `DatastoreFileGetInformation`
        Information about the artifact within the datastore.
    ref : `DatasetRef`
        The registry information associated with this artifact.
    cache_manager : `AbstractDatastoreCacheManager`
        The cache manager to use for caching retrieved files.
    isComponent : `bool`
        Flag to indicate if a component is being read from this artifact.

    Returns
    -------
    inMemoryDataset : `object`
        The artifact as a python object.

    Raises
    ------
    ValueError
        Raised if the formatter fails for any reason other than a missing
        file or a size mismatch.
    """
    uri = getInfo.location.uri
    log.debug("Accessing data from %s", uri)

    # The checksum cannot be recalculated here, but the recorded size gives
    # the formatter a cheap integrity check. A negative value means the size
    # is unknown.
    expected_size = getInfo.info.file_size

    reader = getInfo.formatter
    if isinstance(reader, Formatter):
        # Adapt a V1 formatter so it can be driven through the V2 interface.
        reader = FormatterV1inV2(
            reader.file_descriptor,
            ref=ref,
            formatter=reader,
            write_parameters=reader.write_parameters,
            write_recipes=reader.write_recipes,
        )
    assert isinstance(reader, FormatterV2)

    try:
        result = reader.read(
            component=getInfo.component if isComponent else None,
            expected_size=expected_size,
            cache_manager=cache_manager,
        )
    except (FileNotFoundError, FileIntegrityError):
        # A missing resource or an incorrect recorded file size is expected
        # and meaningful to callers, so propagate these unchanged.
        raise
    except Exception as e:
        # Fold any notes the formatter attached to the exception into the
        # replacement message for clarity.
        notes = "\n".join(getattr(e, "__notes__", []))
        if notes:
            notes = "\n" + notes
        raise ValueError(
            f"Failure from formatter '{reader.name()}' for dataset {ref.id}"
            f" ({ref.datasetType.name} from {uri}): {e}{notes}"
        ) from e

    return post_process_get(
        result, ref.datasetType.storageClass, getInfo.assemblerParams, isComponent=isComponent
    )
def get_dataset_as_python_object_from_get_info(
    allGetInfo: list[DatastoreFileGetInformation],
    *,
    ref: DatasetRef,
    parameters: Mapping[str, Any] | None,
    cache_manager: AbstractDatastoreCacheManager,
) -> Any:
    """Retrieve an artifact from storage and return it as a Python object.

    Three retrieval scenarios are handled, in order:

    1. A disassembled composite requested as a whole: every component file
       is read and the storage class delegate reassembles them.
    2. A derived ("read-only") component of a disassembled composite: the
       request is forwarded to the responsible read/write component.
    3. A single file (optionally a component of an assembled composite).

    Parameters
    ----------
    allGetInfo : `list` [`DatastoreFileGetInformation`]
        Pre-processed information about each file associated with this
        artifact.
    ref : `DatasetRef`
        The registry information associated with this artifact.
    parameters : `~collections.abc.Mapping` [`str`, `typing.Any`]
        `StorageClass` and `Formatter` parameters.
    cache_manager : `AbstractDatastoreCacheManager`
        The cache manager to use for caching retrieved files.

    Returns
    -------
    python_object : `typing.Any`
        The retrieved artifact, converted to a Python object according to the
        `StorageClass` specified in ``ref``.

    Raises
    ------
    FileNotFoundError
        Raised if the requested component has no associated file.
    RuntimeError
        Raised on internal inconsistencies (e.g. a disassembled record with
        no component, or a derived component without a known composite
        storage class).
    """
    refStorageClass = ref.datasetType.storageClass
    refComponent = ref.datasetType.component()
    # Create mapping from component name to related info
    allComponents = {i.component: i for i in allGetInfo}

    # By definition the dataset is disassembled if we have more
    # than one record for it.
    isDisassembled = len(allGetInfo) > 1

    # Look for the special case where we are disassembled but the
    # component is a derived component that was not written during
    # disassembly. For this scenario we need to check that the
    # component requested is listed as a derived component for the
    # composite storage class
    isDisassembledReadOnlyComponent = False
    if isDisassembled and refComponent:
        # The composite storage class should be accessible through
        # the component dataset type
        compositeStorageClass = ref.datasetType.parentStorageClass

        # In the unlikely scenario where the composite storage
        # class is not known, we can only assume that this is a
        # normal component. If that assumption is wrong then the
        # branch below that reads a persisted component will fail
        # so there is no need to complain here.
        if compositeStorageClass is not None:
            isDisassembledReadOnlyComponent = refComponent in compositeStorageClass.derivedComponents

    if isDisassembled and not refComponent:
        # This was a disassembled dataset spread over multiple files
        # and we need to put them all back together again.
        # Read into memory and then assemble

        # Check that the supplied parameters are suitable for the type read
        refStorageClass.validateParameters(parameters)

        # We want to keep track of all the parameters that were not used
        # by formatters. We assume that if any of the component formatters
        # use a parameter that we do not need to apply it again in the
        # assembler.
        usedParams = set()

        components: dict[str, Any] = {}
        for getInfo in allGetInfo:
            # assemblerParams are parameters not understood by the
            # associated formatter.
            usedParams.update(set(getInfo.formatterParams))

            component = getInfo.component

            if component is None:
                raise RuntimeError(f"Internal error in datastore assembly of {ref}")

            # We do not want the formatter to think it's reading
            # a component though because it is really reading a
            # standalone dataset -- always tell reader it is not a
            # component.
            components[component] = _read_artifact_into_memory(
                getInfo, ref.makeComponentRef(component), cache_manager, isComponent=False
            )

        inMemoryDataset = ref.datasetType.storageClass.delegate().assemble(components)

        # Any unused parameters will have to be passed to the assembler
        if parameters:
            unusedParams = {k: v for k, v in parameters.items() if k not in usedParams}
        else:
            unusedParams = {}

        # Process parameters
        return ref.datasetType.storageClass.delegate().handleParameters(
            inMemoryDataset, parameters=unusedParams
        )

    elif isDisassembledReadOnlyComponent:
        compositeStorageClass = ref.datasetType.parentStorageClass
        if compositeStorageClass is None:
            raise RuntimeError(
                f"Unable to retrieve derived component '{refComponent}' since"
                "no composite storage class is available."
            )

        if refComponent is None:
            # Mainly for mypy
            raise RuntimeError("Internal error in datastore: component can not be None here")

        # Assume that every derived component can be calculated by
        # forwarding the request to a single read/write component.
        # Rather than guessing which rw component is the right one by
        # scanning each for a derived component of the same name,
        # we ask the storage class delegate directly which one is best to
        # use.
        compositeDelegate = compositeStorageClass.delegate()
        forwardedComponent = compositeDelegate.selectResponsibleComponent(refComponent, set(allComponents))

        # Select the relevant component
        rwInfo = allComponents[forwardedComponent]

        # For now assume that read parameters are validated against
        # the real component and not the requested component
        forwardedStorageClass = rwInfo.formatter.file_descriptor.readStorageClass
        forwardedStorageClass.validateParameters(parameters)

        # Unfortunately the FileDescriptor inside the formatter will have
        # the wrong write storage class so we need to create a new one
        # given the immutability constraint.
        writeStorageClass = rwInfo.info.storageClass

        # We may need to put some thought into parameters for read
        # components but for now forward them on as is
        readFormatter = type(rwInfo.formatter)(
            FileDescriptor(
                rwInfo.location,
                readStorageClass=refStorageClass,
                storageClass=writeStorageClass,
                parameters=parameters,
                component=forwardedComponent,
            ),
            dataId=ref.dataId,
            ref=ref,
        )

        # The assembler can not receive any parameter requests for a
        # derived component at this time since the assembler will
        # see the storage class of the derived component and those
        # parameters will have to be handled by the formatter on the
        # forwarded storage class.
        assemblerParams: dict[str, Any] = {}

        # Need to created a new info that specifies the derived
        # component and associated storage class
        readInfo = DatastoreFileGetInformation(
            rwInfo.location,
            readFormatter,
            rwInfo.info,
            assemblerParams,
            {},
            refComponent,
            refStorageClass,
        )

        return _read_artifact_into_memory(readInfo, ref, cache_manager, isComponent=True)

    else:
        # Single file request or component from that composite file
        # Prefer an exact component match; fall back to the composite
        # record (keyed by None).
        for lookup in (refComponent, None):
            if lookup in allComponents:
                getInfo = allComponents[lookup]
                break
        else:
            raise FileNotFoundError(f"Component {refComponent} not found for ref {ref} in datastore")

        # Do not need the component itself if already disassembled
        if isDisassembled:
            isComponent = False
        else:
            isComponent = getInfo.component is not None

        # For a disassembled component we can validate parameters against
        # the component storage class directly
        if isDisassembled:
            refStorageClass.validateParameters(parameters)
        else:
            # For an assembled composite this could be a derived
            # component derived from a real component. The validity
            # of the parameters is not clear. For now validate against
            # the composite storage class
            getInfo.formatter.file_descriptor.storageClass.validateParameters(parameters)

        return _read_artifact_into_memory(getInfo, ref, cache_manager, isComponent=isComponent)