Coverage for python / lsst / daf / butler / _formatter.py: 23%
531 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:17 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = (
31 "FileIntegrityError",
32 "Formatter",
33 "FormatterFactory",
34 "FormatterNotImplementedError",
35 "FormatterParameter",
36 "FormatterV1inV2",
37 "FormatterV2",
38)
40import contextlib
41import copy
42import logging
43import os
44import zipfile
45from abc import ABCMeta, abstractmethod
46from collections.abc import Callable, Iterator, Mapping, Set
47from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar, TypeAlias, final
49from lsst.resources import ResourceHandleProtocol, ResourcePath
50from lsst.utils.introspection import get_full_type_name
51from lsst.utils.timer import time_this
53from ._config import Config
54from ._config_support import LookupKey, processLookupConfigs
55from ._file_descriptor import FileDescriptor
56from ._location import Location
57from ._rubin.temporary_for_ingest import TemporaryForIngest
58from .dimensions import DataCoordinate, DimensionUniverse
59from .mapping_factory import MappingFactory
61log = logging.getLogger(__name__)
63if TYPE_CHECKING:
64 from ._dataset_provenance import DatasetProvenance
65 from ._dataset_ref import DatasetRef
66 from ._dataset_type import DatasetType
67 from ._storage_class import StorageClass
68 from .datastore.cache_manager import AbstractDatastoreCacheManager
70 # Define a new special type for functions that take "entity"
71 Entity: TypeAlias = DatasetType | DatasetRef | StorageClass | str
class FileIntegrityError(RuntimeError):
    """Exception raised when a file's observed metadata disagrees with the
    metadata the datastore recorded for it (for example, a size mismatch).
    """
class FormatterNotImplementedError(NotImplementedError):
    """Exception raised when a formatter lacks an implementation for the
    particular read or write method being requested.
    """
86class FormatterV2:
87 """Interface for reading and writing datasets using URIs.
89 The formatters are associated with a particular `StorageClass`.
91 Parameters
92 ----------
93 file_descriptor : `FileDescriptor`, optional
94 Identifies the file to read or write, and the associated storage
95 classes and parameter information.
96 ref : `DatasetRef`
97 The dataset associated with this formatter. Should not be a component
98 dataset ref.
99 write_parameters : `dict`, optional
100 Parameters to control how the dataset is serialized.
101 write_recipes : `dict`, optional
102 Detailed write recipes indexed by recipe name.
104 **kwargs
105 Additional arguments that will be ignored but allow for
106 `Formatter` V1 parameters to be given.
108 Notes
109 -----
110 A `FormatterV2` author should not override the default `read` or `write`
111 method. Instead for read the formatter author should implement one or all
112 of `read_from_stream`, `read_from_uri`, or `read_from_local_file`. The
113 method `read_from_uri` will always be attempted first and could be more
114 efficient (since it allows the possibility for a subset of the data file to
115 be accessed remotely when parameters or components are specified) but it
116 will not update the local cache. If the entire contents of the remote file
117 are being accessed (no component or parameters defined) and the dataset
118 would be cached, `read_from_uri` will be called with a local file. If the
119 file is remote and the parameters that have been included are known to be
120 more efficiently handled with a local file, the `read_from_uri` method can
121 return `NotImplemented` to indicate that a local file should be given
122 instead.
124 Similarly for writes, the `write` method can not be subclassed. Instead
125 the formatter author should implement `to_bytes` or `write_local_file`.
126 For local URIs the system will always call `write_local_file` first (which
127 by default will call `to_bytes`) to ensure atomic writes are implemented.
128 For remote URIs with local caching disabled, `to_bytes` will be called
129 first and the remote updated directly. If the dataset should be cached
130 it will always be written locally first.
131 """
133 unsupported_parameters: ClassVar[Set[str] | None] = frozenset()
134 """Set of read parameters not understood by this `Formatter`. An empty set
135 means all parameters are supported. `None` indicates that no parameters
136 are supported. These parameters should match those defined in the storage
137 class definition. (`frozenset`).
138 """
140 supported_write_parameters: ClassVar[Set[str] | None] = None
141 """Parameters understood by this formatter that can be used to control
142 how a dataset is serialized. `None` indicates that no parameters are
143 supported."""
145 default_extension: ClassVar[str | None] = None
146 """Default extension to use when writing a file.
148 Can be `None` if the extension is determined dynamically. Use the
149 `get_write_extension` method to get the actual extension to use.
150 """
152 supported_extensions: ClassVar[Set[str]] = frozenset()
153 """Set of all extensions supported by this formatter.
155 Any extension assigned to the ``default_extension`` property will be
156 automatically included in the list of supported extensions.
157 """
159 can_read_from_uri: ClassVar[bool] = False
160 """Declare whether `read_from_uri` is available to this formatter."""
162 can_read_from_stream: ClassVar[bool] = False
163 """Declare whether `read_from_stream` is available to this formatter."""
165 can_read_from_local_file: ClassVar[bool] = False
166 """Declare whether `read_from_local_file` is available to this
167 formatter."""
169 def __init__(
170 self,
171 file_descriptor: FileDescriptor,
172 *,
173 ref: DatasetRef,
174 write_parameters: Mapping[str, Any] | None = None,
175 write_recipes: Mapping[str, Any] | None = None,
176 # Compatibility parameters. Unused in v2.
177 **kwargs: Any,
178 ):
179 if not isinstance(file_descriptor, FileDescriptor):
180 raise TypeError("File descriptor must be a FileDescriptor")
182 self._file_descriptor = file_descriptor
184 if ref.isComponent():
185 # It is a component ref for disassembled composites.
186 ref = ref.makeCompositeRef()
187 self._dataset_ref = ref
189 # Check that the write parameters are allowed
190 if write_parameters:
191 if self.supported_write_parameters is None:
192 raise ValueError(
193 f"This formatter does not accept any write parameters. Got: {', '.join(write_parameters)}"
194 )
195 else:
196 given = set(write_parameters)
197 unknown = given - self.supported_write_parameters
198 if unknown:
199 s = "s" if len(unknown) != 1 else ""
200 unknownStr = ", ".join(f"'{u}'" for u in unknown)
201 raise ValueError(f"This formatter does not accept parameter{s} {unknownStr}")
203 self._write_parameters = write_parameters
204 self._write_recipes = self.validate_write_recipes(write_recipes)
206 def __str__(self) -> str:
207 return f"{self.name()}@{self.file_descriptor.location.uri}"
209 def __repr__(self) -> str:
210 return f"{self.name()}({self.file_descriptor!r})"
212 @property
213 def file_descriptor(self) -> FileDescriptor:
214 """File descriptor associated with this formatter
215 (`FileDescriptor`).
216 """
217 return self._file_descriptor
219 @property
220 def data_id(self) -> DataCoordinate:
221 """Return Data ID associated with this formatter (`DataCoordinate`)."""
222 return self._dataset_ref.dataId
224 @property
225 def dataset_ref(self) -> DatasetRef:
226 """Return Dataset Ref associated with this formatter (`DatasetRef`)."""
227 return self._dataset_ref
229 @property
230 def write_parameters(self) -> Mapping[str, Any]:
231 """Parameters to use when writing out datasets."""
232 if self._write_parameters is not None:
233 return self._write_parameters
234 return {}
236 @property
237 def write_recipes(self) -> Mapping[str, Any]:
238 """Detailed write Recipes indexed by recipe name."""
239 if self._write_recipes is not None:
240 return self._write_recipes
241 return {}
243 def get_write_extension(self) -> str:
244 """Extension to use when writing a file."""
245 default_extension = self.default_extension
246 extension = default_extension if default_extension is not None else ""
247 return extension
249 def can_accept(self, in_memory_dataset: Any) -> bool:
250 """Indicate whether this formatter can accept the specified
251 storage class directly.
253 Parameters
254 ----------
255 in_memory_dataset : `object`
256 The dataset that is to be written.
258 Returns
259 -------
260 accepts : `bool`
261 If `True` the formatter can write data of this type without
262 requiring datastore to convert it. If `False` the datastore
263 will attempt to convert before writing.
265 Notes
266 -----
267 The base class always returns `False` even if the given type is an
268 instance of the storage class type. This will result in a storage
269 class conversion no-op but also allows mocks with mocked storage
270 classes to work properly.
271 """
272 return False
274 @classmethod
275 def validate_write_recipes(cls, recipes: Mapping[str, Any] | None) -> Mapping[str, Any] | None:
276 """Validate supplied recipes for this formatter.
278 The recipes are supplemented with default values where appropriate.
280 Parameters
281 ----------
282 recipes : `dict`
283 Recipes to validate.
285 Returns
286 -------
287 validated : `dict`
288 Validated recipes.
290 Raises
291 ------
292 RuntimeError
293 Raised if validation fails. The default implementation raises
294 if any recipes are given.
295 """
296 if recipes:
297 raise RuntimeError(f"This formatter does not understand these write recipes: {recipes}")
298 return recipes
300 @classmethod
301 def name(cls) -> str:
302 """Return the fully qualified name of the formatter.
304 Returns
305 -------
306 name : `str`
307 Fully-qualified name of formatter class.
308 """
309 return get_full_type_name(cls)
311 def _is_disassembled(self) -> bool:
312 """Return `True` if this formatter is looking at a disassembled
313 component.
314 """
315 return self.file_descriptor.component is not None
317 def _check_resource_size(self, uri: ResourcePath, recorded_size: int, resource_size: int) -> None:
318 """Compare the recorded size with the resource size.
320 The given URI will not be accessed.
321 """
322 if recorded_size >= 0 and resource_size != recorded_size:
323 raise FileIntegrityError(
324 "Integrity failure in Datastore. "
325 f"Size of file {uri} ({resource_size}) "
326 f"does not match size recorded in registry of {recorded_size}"
327 )
329 def _get_cache_ref(self) -> DatasetRef:
330 """Get the `DatasetRef` to use for cache look ups.
332 Returns
333 -------
334 ref : `lsst.daf.butler.DatasetRef`
335 The dataset ref to use when looking in the cache.
336 For single-file dataset this will be the dataset ref directly.
337 If this is disassembled we need the component and the component
338 will be in the `FileDescriptor`.
339 """
340 if self.file_descriptor.component is None:
341 cache_ref = self.dataset_ref
342 else:
343 cache_ref = self.dataset_ref.makeComponentRef(self.file_descriptor.component)
344 return cache_ref
346 def _ensure_cache(
347 self, cache_manager: AbstractDatastoreCacheManager | None = None
348 ) -> AbstractDatastoreCacheManager:
349 """Return the cache if given else return a null cache."""
350 if cache_manager is None:
351 # Circular import avoidance.
352 from .datastore.cache_manager import DatastoreDisabledCacheManager
354 cache_manager = DatastoreDisabledCacheManager(None, None)
355 return cache_manager
    def read(
        self,
        component: str | None = None,
        expected_size: int = -1,
        cache_manager: AbstractDatastoreCacheManager | None = None,
    ) -> Any:
        """Read a Dataset.

        Parameters
        ----------
        component : `str`, optional
            Component to read from the file. Only used if the `StorageClass`
            for reading differed from the `StorageClass` used to write the
            file.
        expected_size : `int`, optional
            If known, the expected size of the resource to read. This can be
            used for verification or to decide whether to do a direct read or
            a file download. ``-1`` indicates the file size is not known.
        cache_manager : `AbstractDatastoreCacheManager`
            A cache manager to use to allow a formatter to cache a remote
            file locally or read a cached file that is already local.

        Returns
        -------
        in_memory_dataset : `object`
            The requested Dataset.

        Notes
        -----
        This method should not be subclassed. Instead formatter subclasses
        should re-implement the specific ``read_from_*`` methods as
        appropriate. Each of these methods has a corresponding class property
        that must be `True` for the method to be called.

        The priority for reading is:

        * `read_from_uri`
        * `read_from_stream`
        * `read_from_local_file`
        * `read_from_uri` (but with a local file)

        Any of these methods can return `NotImplemented` if there is a desire
        to skip to the next one in the list. If a dataset is being requested
        with no component, no parameters, and it should also be added to the
        local cache, the first two calls will be skipped (unless
        `read_from_stream` is the only implemented read method) such that a
        local file will be used.

        A Formatter can also read a file from within a Zip file if the
        URI associated with the `FileDescriptor` corresponds to a file with
        a ``.zip`` extension and a URI fragment of the form
        ``zip-path={path_in_zip}``. When reading a file from within a Zip
        file the priority for reading is:

        * `read_from_stream`
        * `read_from_local_file`
        * `read_from_uri`

        There are multiple cases that must be handled for reading:

        For a single file:

        * No component requested, read the whole file.
        * Component requested, optionally read the component efficiently,
          else read the whole file and extract the component.
        * Derived component requested, read whole file or read relevant
          component and derive.

        Disassembled Composite:

        * The file to read here is the component itself. Formatter only knows
          about this one component file. Should be no component specified
          in the ``read`` call but the `FileDescriptor` will know which
          component this is.
        * A derived component. The file to read is a component but not the
          specified component. The caching needs the component from which
          it's derived.

        Raises
        ------
        lsst.daf.butler.FormatterNotImplementedError
            Raised if no implementations were found that could read this
            resource.
        """
        # If the file to read is a ZIP file with a fragment requesting
        # a file within the ZIP file, it is no longer possible to use the
        # direct read from URI option and the contents of the Zip file must
        # be extracted.
        uri = self.file_descriptor.location.uri
        if uri.fragment and uri.unquoted_fragment.startswith("zip-path="):
            # Fragment is "zip-path=<path>"; keep only the path after "=".
            _, _, path_in_zip = uri.unquoted_fragment.partition("=")

            # Open the Zip file using ResourcePath.
            with uri.open("rb") as fd:
                with zipfile.ZipFile(fd) as zf:  # type: ignore
                    if self.can_read_from_stream:
                        # Stream reads can operate directly on the zip member
                        # without extracting it first.
                        with contextlib.closing(zf.open(path_in_zip)) as zip_fd:
                            result = self.read_from_stream(zip_fd, component, expected_size=expected_size)

                        if result is not NotImplemented:
                            return result

                    # For now for both URI and local file options we retrieve
                    # the bytes to a temporary local and use that.
                    # Keep the member's extension so format sniffing works.
                    _, suffix = os.path.splitext(path_in_zip)
                    with ResourcePath.temporary_uri(suffix=suffix) as tmp_uri:
                        tmp_uri.write(zf.read(path_in_zip))

                        if self.can_read_from_local_file:
                            result = self.read_from_local_file(
                                tmp_uri.ospath, component, expected_size=expected_size
                            )
                            if result is not NotImplemented:
                                return result
                        if self.can_read_from_uri:
                            # The temporary URI is guaranteed local here.
                            result = self.read_from_uri(tmp_uri, component, expected_size=expected_size)
                            if result is not NotImplemented:
                                return result

            raise FormatterNotImplementedError(
                f"Formatter {self.name()} could not read the file at {uri} using any method."
            )

        # If the there are no parameters, no component request, and
        # the file should be cached, it is better to defer the read_from_uri
        # to use the local file and populate the cache.
        prefer_local = False
        if (
            component is None
            and not self.file_descriptor.parameters
            and self._ensure_cache(cache_manager).should_be_cached(self._get_cache_ref())
        ):
            prefer_local = True

        # First see if the formatter can support direct remote read from
        # a URI. This can be called later from the local path. If it returns
        # NotImplemented the formatter decided on its own that local
        # reads are preferred.
        if not prefer_local and self.can_read_from_uri:
            result = self.read_directly_from_possibly_cached_uri(
                component, expected_size, cache_manager=cache_manager
            )
            if result is not NotImplemented:
                return result

        # Some formatters might want to be able to read directly from
        # an open file stream. This is preferred over forcing a download
        # to local file system unless a file read option is available and we
        # want to store it in the cache because the whole file is being read.
        if self.can_read_from_stream and not (
            prefer_local and (self.can_read_from_uri or self.can_read_from_local_file)
        ):
            result = self.read_from_possibly_cached_stream(
                component, expected_size, cache_manager=cache_manager
            )
            if result is not NotImplemented:
                return result

        # Finally, try to read the local file. read_from_uri is accepted
        # here as well because it can be handed the guaranteed-local file.
        if self.can_read_from_local_file or self.can_read_from_uri:
            result = self.read_from_possibly_cached_local_file(
                component, expected_size, cache_manager=cache_manager
            )
            if result is not NotImplemented:
                return result

        raise FormatterNotImplementedError(
            f"Formatter {self.name()} could not read the file at {uri} using any method."
        )
    def _read_from_possibly_cached_location_no_cache_write(
        self,
        callback: Callable[[ResourcePath, str | None, int], Any],
        component: str | None = None,
        expected_size: int = -1,
        *,
        cache_manager: AbstractDatastoreCacheManager | None = None,
    ) -> Any:
        """Read via ``callback``, preferring a cached local copy if one
        exists, without writing anything to the cache.

        Parameters
        ----------
        callback : `~collections.abc.Callable`
            Called as ``callback(uri, component, expected_size)`` to perform
            the actual read on either the cached file or the original URI.
        component : `str`, optional
            Component to read, forwarded to ``callback``.
        expected_size : `int`, optional
            Expected resource size in bytes; ``-1`` if unknown.
        cache_manager : `AbstractDatastoreCacheManager`, optional
            Cache to consult; a disabled cache is used if `None`.

        Returns
        -------
        in_memory_dataset : `object`
            Whatever ``callback`` returns (may be `NotImplemented`).
        """
        cache_manager = self._ensure_cache(cache_manager)

        uri = self.file_descriptor.location.uri
        cache_ref = self._get_cache_ref()

        # The component for log messages is either the component requested
        # explicitly or the component from the file descriptor.
        log_component = component if component is not None else self.file_descriptor.component

        # Hold the cache slot open for the duration of the read so the
        # cached file cannot be expired while in use.
        with cache_manager.find_in_cache(cache_ref, uri.getExtension()) as cached_file:
            if cached_file is not None:
                desired_uri = cached_file
                msg = f" (cached version of {uri})"
            else:
                desired_uri = uri
                msg = ""

            if desired_uri.isLocal:
                # Do not spend the time doing a slow size() call to a remote
                # resource.
                self._check_resource_size(desired_uri, expected_size, desired_uri.size())

            with time_this(
                log,
                msg="Reading%s from file handle %s%s with formatter %s",
                args=(
                    f" component {log_component}" if log_component else "",
                    desired_uri,
                    msg,
                    self.name(),
                ),
            ):
                return callback(desired_uri, component, expected_size)
570 def read_from_possibly_cached_stream(
571 self,
572 component: str | None = None,
573 expected_size: int = -1,
574 *,
575 cache_manager: AbstractDatastoreCacheManager | None = None,
576 ) -> Any:
577 """Read from a stream, checking for possible presence in local cache.
579 Parameters
580 ----------
581 component : `str`, optional
582 Component to read from the file. Only used if the `StorageClass`
583 for reading differed from the `StorageClass` used to write the
584 file.
585 expected_size : `int`, optional
586 If known, the expected size of the resource to read. This can be
587 used for verification or to decide whether to do a direct read or a
588 file download. ``-1`` indicates the file size is not known.
589 cache_manager : `AbstractDatastoreCacheManager`
590 A cache manager to use to allow a formatter to check if there is
591 a copy of the file in the local cache.
593 Returns
594 -------
595 in_memory_dataset : `object` or `NotImplemented`
596 The requested Dataset or an indication that the read mode was
597 not implemented.
599 Notes
600 -----
601 Calls `read_from_stream` but will first check the datastore cache
602 in case the file is present locally. This method will not download
603 a file to the local cache.
604 """
606 def _open_stream(uri: ResourcePath, comp: str | None, size: int = -1) -> Any:
607 with uri.open("rb") as fd:
608 return self.read_from_stream(fd, comp, expected_size=size)
610 return self._read_from_possibly_cached_location_no_cache_write(
611 _open_stream, component, expected_size=expected_size, cache_manager=cache_manager
612 )
614 def read_directly_from_possibly_cached_uri(
615 self,
616 component: str | None = None,
617 expected_size: int = -1,
618 *,
619 cache_manager: AbstractDatastoreCacheManager | None = None,
620 ) -> Any:
621 """Read from arbitrary URI, checking for possible presence in local
622 cache.
624 Parameters
625 ----------
626 component : `str`, optional
627 Component to read from the file. Only used if the `StorageClass`
628 for reading differed from the `StorageClass` used to write the
629 file.
630 expected_size : `int`, optional
631 If known, the expected size of the resource to read. This can be
632 ``-1`` indicates the file size is not known.
633 cache_manager : `AbstractDatastoreCacheManager`
634 A cache manager to use to allow a formatter to check if there is
635 a copy of the file in the local cache.
637 Returns
638 -------
639 in_memory_dataset : `object` or `NotImplemented`
640 The requested Dataset or an indication that the read mode was
641 not implemented.
643 Notes
644 -----
645 This method will first check the datastore cache
646 in case the file is present locally. This method will not cache a
647 remote dataset and will only do a size check for local files to avoid
648 unnecessary round trips to a remote server.
650 The URI will be read by calling `read_from_uri`.
651 """
653 def _open_uri(uri: ResourcePath, comp: str | None, size: int = -1) -> Any:
654 return self.read_from_uri(uri, comp, expected_size=size)
656 return self._read_from_possibly_cached_location_no_cache_write(
657 _open_uri, component, expected_size=expected_size, cache_manager=cache_manager
658 )
    def read_from_possibly_cached_local_file(
        self,
        component: str | None = None,
        expected_size: int = -1,
        *,
        cache_manager: AbstractDatastoreCacheManager | None = None,
    ) -> Any:
        """Read a dataset ensuring that a local file is used, checking the
        cache for it.

        Parameters
        ----------
        component : `str`, optional
            Component to read from the file. Only used if the `StorageClass`
            for reading differed from the `StorageClass` used to write the
            file.
        expected_size : `int`, optional
            If known, the expected size of the resource to read. This can be
            used for verification or to decide whether to do a direct read or
            a file download. ``-1`` indicates the file size is not known.
        cache_manager : `AbstractDatastoreCacheManager`
            A cache manager to use to allow a formatter to cache a remote
            file locally or read a cached file that is already local.

        Returns
        -------
        in_memory_dataset : `object` or `NotImplemented`
            The requested Dataset or an indication that the read mode was
            not implemented.

        Notes
        -----
        The file will be downloaded and cached if it is a remote resource.
        The file contents will be read using `read_from_local_file` or
        `read_from_uri`, with preference given to the former.
        """
        cache_manager = self._ensure_cache(cache_manager)
        uri = self.file_descriptor.location.uri

        # Need to have something we can look up in the cache.
        cache_ref = self._get_cache_ref()

        # The component for log messages is either the component requested
        # explicitly or the component from the file descriptor.
        log_component = component if component is not None else self.file_descriptor.component

        result = NotImplemented

        # Ensure we have a local file. Hold the cache slot open so the
        # cached file cannot be expired while we are reading it.
        with cache_manager.find_in_cache(cache_ref, uri.getExtension()) as cached_file:
            if cached_file is not None:
                msg = f"(via cache read of remote file {uri})"
                uri = cached_file
            else:
                msg = ""

            # as_local() downloads remote resources to a temporary file and
            # is a no-op for files that are already local.
            with uri.as_local() as local_uri:
                self._check_resource_size(self.file_descriptor.location.uri, expected_size, local_uri.size())
                can_be_cached = False
                if uri != local_uri:
                    # URI was remote and file was downloaded
                    cache_msg = ""

                    if cache_manager.should_be_cached(cache_ref):
                        # In this scenario we want to ask if the downloaded
                        # file should be cached but we should not cache
                        # it until after we've used it (to ensure it can't
                        # be expired whilst we are using it).
                        can_be_cached = True

                        # Say that it is "likely" to be cached because
                        # if the formatter read fails we will not be
                        # caching this file.
                        cache_msg = " and likely cached"

                    msg = f"(via download to local file{cache_msg})"

                with time_this(
                    log,
                    msg="Reading%s from location %s %s with formatter %s",
                    args=(
                        f" component {log_component}" if log_component else "",
                        uri,
                        msg,
                        self.name(),
                    ),
                ):
                    if self.can_read_from_local_file:
                        result = self.read_from_local_file(
                            local_uri.ospath, component=component, expected_size=expected_size
                        )
                    if result is NotImplemented and self.can_read_from_uri:
                        # If the direct URI reader was skipped earlier and
                        # there is no explicit local file implementation, pass
                        # in the guaranteed local URI to the generic reader.
                        result = self.read_from_uri(
                            local_uri, component=component, expected_size=expected_size
                        )

                # File was read successfully so can move to cache.
                # Also move to cache even if NotImplemented was returned.
                if can_be_cached:
                    cache_manager.move_to_cache(local_uri, cache_ref)

        return result
766 def read_from_uri(self, uri: ResourcePath, component: str | None = None, expected_size: int = -1) -> Any:
767 """Read a dataset from a URI that can be local or remote.
769 Parameters
770 ----------
771 uri : `lsst.resources.ResourcePath`
772 URI to use to read the dataset. This URI can be local or remote
773 and can refer to the actual resource or to a locally cached file.
774 component : `str` or `None`, optional
775 The component to be read from the dataset.
776 expected_size : `int`, optional
777 If known, the expected size of the resource to read. This can be
778 ``-1`` indicates the file size is not known.
780 Returns
781 -------
782 in_memory_dataset : `object` or `NotImplemented`
783 The Python object read from the resource or `NotImplemented`.
785 Raises
786 ------
787 lsst.daf.butler.FormatterNotImplementedError
788 Raised if there is no support for direct reads from a, possibly,
789 remote URI.
791 Notes
792 -----
793 This method is only called if the class property
794 ``can_read_from_uri`` is set to `True`.
796 It is possible that a cached local file will be given to this method
797 even if it was originally a remote URI. This can happen if the original
798 write resulted in the file being added to the local cache.
800 If the full file is being read this file will not be added to the
801 local cache. Consider returning `NotImplemented` in
802 this situation, for example if there are no parameters or component
803 specified, and allowing the system to fall back to calling
804 `read_from_local_file` (which will populate the cache if configured
805 to do so).
806 """
807 return NotImplemented
809 def read_from_stream(
810 self, stream: BinaryIO | ResourceHandleProtocol, component: str | None = None, expected_size: int = -1
811 ) -> Any:
812 """Read from an open file descriptor.
814 Parameters
815 ----------
816 stream : `lsst.resources.ResourceHandleProtocol` or \
817 `typing.BinaryIO`
818 File stream to use to read the dataset.
819 component : `str` or `None`, optional
820 The component to be read from the dataset.
821 expected_size : `int`, optional
822 If known, the expected size of the resource to read. This can be
823 ``-1`` indicates the file size is not known.
825 Returns
826 -------
827 in_memory_dataset : `object` or `NotImplemented`
828 The Python object read from the stream or `NotImplemented`.
830 Notes
831 -----
832 Only called if the class property ``can_read_from_stream`` is `True`.
833 """
834 return NotImplemented
836 def read_from_local_file(self, path: str, component: str | None = None, expected_size: int = -1) -> Any:
837 """Read a dataset from a URI guaranteed to refer to the local file
838 system.
840 Parameters
841 ----------
842 path : `str`
843 Path to a local file that should be read.
844 component : `str` or `None`, optional
845 The component to be read from the dataset.
846 expected_size : `int`, optional
847 If known, the expected size of the resource to read. This can be
848 ``-1`` indicates the file size is not known.
850 Returns
851 -------
852 in_memory_dataset : `object` or `NotImplemented`
853 The Python object read from the resource or `NotImplemented`.
855 Raises
856 ------
857 lsst.daf.butler.FormatterNotImplementedError
858 Raised if there is no implementation written to read data
859 from a local file.
861 Notes
862 -----
863 This method will only be called if the class property
864 ``can_read_from_local_file`` is `True` and other options were not
865 used.
866 """
867 return NotImplemented
869 def add_provenance(
870 self, in_memory_dataset: Any, /, *, provenance: DatasetProvenance | None = None
871 ) -> Any:
872 """Add provenance to the dataset.
874 Parameters
875 ----------
876 in_memory_dataset : `object`
877 The dataset to serialize.
878 provenance : `~lsst.daf.butler.DatasetProvenance` or `None`, optional
879 Provenance to attach to dataset.
881 Returns
882 -------
883 dataset_to_write : `object`
884 The dataset to use for serialization. Can be the same object as
885 given.
887 Notes
888 -----
889 The base class implementation returns the given object unchanged.
890 """
891 return in_memory_dataset
893 @final
894 def write(
895 self,
896 in_memory_dataset: Any,
897 /,
898 *,
899 cache_manager: AbstractDatastoreCacheManager | None = None,
900 provenance: DatasetProvenance | None = None,
901 ) -> None:
902 """Write a Dataset.
904 Parameters
905 ----------
906 in_memory_dataset : `object`
907 The Dataset to serialize.
908 cache_manager : `AbstractDatastoreCacheManager`
909 A cache manager to use to allow a formatter to cache the written
910 file.
911 provenance : `DatasetProvenance` | `None`, optional
912 Provenance to attach to the file being written.
914 Returns
915 -------
916 None
918 Raises
919 ------
920 lsst.daf.butler.FormatterNotImplementedError
921 Raised if the formatter subclass has not implemented
922 `write_local_file` and `to_bytes` was not called.
923 Exception
924 Raised if there is an error serializing the dataset to disk.
926 Notes
927 -----
928 The intent is for subclasses to implement either `to_bytes` or
929 `write_local_file` or both and not to subclass this method.
930 """
931 # Ensure we are using the correct file extension.
932 uri = self.file_descriptor.location.uri.updatedExtension(self.get_write_extension())
934 # Attach any provenance to the dataset. This could involve returning
935 # a different object.
936 in_memory_dataset = self.add_provenance(in_memory_dataset, provenance=provenance)
938 written = self.write_direct(in_memory_dataset, uri, cache_manager)
939 if not written:
940 self.write_locally_then_move(in_memory_dataset, uri, cache_manager)
942 def write_direct(
943 self,
944 in_memory_dataset: Any,
945 uri: ResourcePath,
946 cache_manager: AbstractDatastoreCacheManager | None = None,
947 ) -> bool:
948 """Serialize and write directly to final location.
950 Parameters
951 ----------
952 in_memory_dataset : `object`
953 The Dataset to serialize.
954 uri : `lsst.resources.ResourcePath`
955 URI to use when writing the serialized dataset.
956 cache_manager : `AbstractDatastoreCacheManager`
957 A cache manager to use to allow a formatter to cache the written
958 file.
960 Returns
961 -------
962 written : `bool`
963 Flag to indicate whether the direct write did happen.
965 Raises
966 ------
967 Exception
968 Raised if there was a failure from serializing to bytes that
969 was not `~lsst.daf.butler.FormatterNotImplementedError`.
971 Notes
972 -----
973 This method will call `to_bytes` to serialize the in-memory dataset
974 and then will call the `~lsst.resources.ResourcePath.write` method
975 directly.
977 If the dataset should be cached or is local the file will not be
978 written and the method will return `False`. This is because local URIs
979 should be written to a temporary file name and then renamed to allow
980 atomic writes. That path is handled by `write_locally_then_move`
981 through `write_local_file`) and is preferred over this method being
982 subclassed and the atomic write re-implemented.
983 """
984 cache_manager = self._ensure_cache(cache_manager)
986 # For remote URIs some datasets can be serialized directly
987 # to bytes and sent to the remote datastore without writing a
988 # file. If the dataset is intended to be saved to the cache
989 # a file is always written and direct write to the remote
990 # datastore is bypassed.
991 data_written = False
992 if not uri.isLocal and not cache_manager.should_be_cached(self._get_cache_ref()):
993 # Remote URI that is not cached so can write directly.
994 try:
995 serialized_dataset = self.to_bytes(in_memory_dataset)
996 except FormatterNotImplementedError:
997 # Fallback to the file writing option.
998 pass
999 except Exception as e:
1000 e.add_note(
1001 f"Failed to serialize dataset {self.dataset_ref} of "
1002 f"type {get_full_type_name(in_memory_dataset)} to bytes."
1003 )
1004 raise
1005 else:
1006 log.debug("Writing bytes directly to %s", uri)
1007 uri.write(serialized_dataset, overwrite=True)
1008 log.debug("Successfully wrote bytes directly to %s", uri)
1009 data_written = True
1010 return data_written
1012 def write_locally_then_move(
1013 self,
1014 in_memory_dataset: Any,
1015 uri: ResourcePath,
1016 cache_manager: AbstractDatastoreCacheManager | None = None,
1017 ) -> None:
1018 """Write file to file system and then move to final location.
1020 Parameters
1021 ----------
1022 in_memory_dataset : `object`
1023 The Dataset to serialize.
1024 uri : `lsst.resources.ResourcePath`
1025 URI to use when writing the serialized dataset.
1026 cache_manager : `AbstractDatastoreCacheManager`
1027 A cache manager to use to allow a formatter to cache the written
1028 file.
1030 Raises
1031 ------
1032 lsst.daf.butler.FormatterNotImplementedError
1033 Raised if the formatter subclass has not implemented
1034 `write_local_file`.
1035 Exception
1036 Raised if there is an error serializing the dataset to disk.
1037 """
1038 cache_manager = self._ensure_cache(cache_manager)
1040 with TemporaryForIngest.make_path(uri) as temporary_uri:
1041 # Need to configure the formatter to write to a different
1042 # location and that needs us to overwrite internals
1043 log.debug("Writing dataset to temporary location at %s", temporary_uri)
1045 # Assumes that if write_local_file is not subclassed that
1046 # to_bytes will be called using the base class definition.
1047 try:
1048 self.write_local_file(in_memory_dataset, temporary_uri)
1049 except Exception as e:
1050 e.add_note(
1051 f"Failed to serialize dataset {self.dataset_ref} of type"
1052 f" {get_full_type_name(in_memory_dataset)} to "
1053 f"temporary location {temporary_uri}."
1054 )
1055 raise
1057 # Use move for a local file since that becomes an efficient
1058 # os.rename. For remote resources we use copy to allow the
1059 # file to be cached afterwards.
1060 transfer = "move" if uri.isLocal else "copy"
1062 uri.transfer_from(temporary_uri, transfer=transfer, overwrite=True)
1064 if transfer == "copy":
1065 # Cache if required
1066 cache_manager.move_to_cache(temporary_uri, self._get_cache_ref())
1068 log.debug("Successfully wrote dataset to %s via a temporary file.", uri)
1070 def write_local_file(self, in_memory_dataset: Any, uri: ResourcePath) -> None:
1071 """Serialize the in-memory dataset to a local file.
1073 Parameters
1074 ----------
1075 in_memory_dataset : `object`
1076 The Python object to serialize.
1077 uri : `~lsst.resources.ResourcePath`
1078 The URI to use when writing the file.
1080 Notes
1081 -----
1082 By default this method will attempt to call `to_bytes` and then
1083 write these bytes to the file.
1085 Raises
1086 ------
1087 lsst.daf.butler.FormatterNotImplementedError
1088 Raised if the formatter subclass has not implemented this method
1089 or has failed to implement the `to_bytes` method.
1090 """
1091 log.debug("Writing bytes directly to %s", uri)
1092 uri.write(self.to_bytes(in_memory_dataset))
1093 log.debug("Successfully wrote bytes directly to %s", uri)
1095 def to_bytes(self, in_memory_dataset: Any) -> bytes:
1096 """Serialize the in-memory dataset to bytes.
1098 Parameters
1099 ----------
1100 in_memory_dataset : `object`
1101 The Python object to serialize.
1103 Returns
1104 -------
1105 serialized_dataset : `bytes`
1106 Bytes representing the serialized dataset.
1108 Raises
1109 ------
1110 lsst.daf.butler.FormatterNotImplementedError
1111 Raised if the formatter has not implemented the method. This will
1112 not cause a problem if `write_local_file` has been implemented.
1113 """
1114 raise FormatterNotImplementedError(
1115 f"This formatter can not convert {get_full_type_name(in_memory_dataset)} directly to bytes."
1116 )
1118 def make_updated_location(self, location: Location) -> Location:
1119 """Return a new `Location` updated with this formatter's extension.
1121 Parameters
1122 ----------
1123 location : `Location`
1124 The location to update.
1126 Returns
1127 -------
1128 updated : `Location`
1129 A new `Location` with a new file extension applied.
1130 """
1131 location = location.clone()
1132 # If the extension is "" the extension will be removed.
1133 location.updateExtension(self.get_write_extension())
1134 return location
1136 @classmethod
1137 def validate_extension(cls, location: Location) -> None:
1138 """Check the extension of the provided location for compatibility.
1140 Parameters
1141 ----------
1142 location : `Location`
1143 Location from which to extract a file extension.
1145 Returns
1146 -------
1147 None
1149 Raises
1150 ------
1151 ValueError
1152 Raised if the formatter does not understand this extension.
1153 """
1154 supported = set(cls.supported_extensions)
1155 default = cls.default_extension # type: ignore
1157 # If extension is implemented as an instance property it won't return
1158 # a string when called as a class property. Assume that
1159 # the supported extensions class property is complete.
1160 if default is not None and isinstance(default, str):
1161 supported.add(default)
1163 # Get the file name from the uri
1164 file = location.uri.basename()
1166 # Check that this file name ends with one of the supported extensions.
1167 # This is less prone to confusion than asking the location for
1168 # its extension and then doing a set comparison
1169 for ext in supported:
1170 if file.endswith(ext):
1171 return
1173 raise ValueError(
1174 f"Extension '{location.getExtension()}' on '{location}' "
1175 f"is not supported by Formatter '{cls.__name__}' (supports: {supported})"
1176 )
1178 def predict_path(self) -> str:
1179 """Return the path that would be returned by write.
1181 Does not write any data file.
1183 Uses the `FileDescriptor` associated with the instance.
1185 Returns
1186 -------
1187 path : `str`
1188 Path within datastore that would be associated with the location
1189 stored in this `Formatter`.
1190 """
1191 updated = self.make_updated_location(self.file_descriptor.location)
1192 return updated.pathInStore.path
1194 def segregate_parameters(self, parameters: dict[str, Any] | None = None) -> tuple[dict, dict]:
1195 """Segregate the supplied parameters.
1197 This splits the parameters into those understood by the
1198 formatter and those not understood by the formatter.
1200 Any unsupported parameters are assumed to be usable by associated
1201 assemblers.
1203 Parameters
1204 ----------
1205 parameters : `dict`, optional
1206 Parameters with values that have been supplied by the caller
1207 and which might be relevant for the formatter. If `None`
1208 parameters will be read from the registered `FileDescriptor`.
1210 Returns
1211 -------
1212 supported : `dict`
1213 Those parameters supported by this formatter.
1214 unsupported : `dict`
1215 Those parameters not supported by this formatter.
1216 """
1217 if parameters is None:
1218 parameters = self.file_descriptor.parameters
1220 if parameters is None:
1221 return {}, {}
1223 if self.unsupported_parameters is None:
1224 # Support none of the parameters
1225 return {}, parameters.copy()
1227 # Start by assuming all are supported
1228 supported = parameters.copy()
1229 unsupported = {}
1231 # And remove any we know are not supported
1232 for p in set(supported):
1233 if p in self.unsupported_parameters:
1234 unsupported[p] = supported.pop(p)
1236 return supported, unsupported
class Formatter(metaclass=ABCMeta):
    """Interface for reading and writing Datasets.

    The formatters are associated with a particular `StorageClass`.

    Parameters
    ----------
    fileDescriptor : `FileDescriptor`
        Identifies the file to read or write, and the associated storage
        classes and parameter information. Required; a `TypeError` is raised
        for any other type.
    dataId : `DataCoordinate`
        Data ID associated with this formatter. Although the keyword defaults
        to `None`, a real `DataCoordinate` must be given or construction
        raises.
    writeParameters : `dict`, optional
        Parameters to control how the dataset is serialized.
    writeRecipes : `dict`, optional
        Detailed write recipes indexed by recipe name.
    **kwargs
        Additional parameters that can allow parameters
        from `FormatterV2` to be provided.

    Notes
    -----
    All Formatter subclasses should share the base class's constructor
    signature.
    """

    # Now assuming that Formatter v1 can only refer to files so can add
    # this property for compatibility with v2.
    extension: str | None = None
    """Default file extension to use for writing files. None means that no
    modifications will be made to the supplied file extension. (`str`)"""

    unsupportedParameters: ClassVar[Set[str] | None] = frozenset()
    """Set of read parameters not understood by this `Formatter`. An empty set
    means all parameters are supported. `None` indicates that no parameters
    are supported. These parameters should match those defined in the storage
    class definition. (`frozenset`).
    """

    supportedWriteParameters: ClassVar[Set[str] | None] = None
    """Parameters understood by this formatter that can be used to control
    how a dataset is serialized. `None` indicates that no parameters are
    supported."""

    supportedExtensions: ClassVar[Set[str]] = frozenset()
    """Set of all extensions supported by this formatter.

    Only expected to be populated by Formatters that write files. Any extension
    assigned to the ``extension`` property will be automatically included in
    the list of supported extensions."""

    def __init__(
        self,
        fileDescriptor: FileDescriptor,
        *,
        dataId: DataCoordinate | None = None,
        writeParameters: Mapping[str, Any] | None = None,
        writeRecipes: Mapping[str, Any] | None = None,
        # Allow FormatterV2 parameters to be dropped.
        **kwargs: Any,
    ):
        if not isinstance(fileDescriptor, FileDescriptor):
            raise TypeError("File descriptor must be a FileDescriptor")
        self._fileDescriptor = fileDescriptor

        # dataId was once optional; it is now mandatory and validated here.
        if dataId is None:
            raise RuntimeError("dataId is now required for formatter initialization")
        if not isinstance(dataId, DataCoordinate):
            raise TypeError(f"DataId is required to be a DataCoordinate but got {type(dataId)}.")
        self._dataId = dataId

        # V2 compatibility: accept the snake_case keyword spellings used by
        # FormatterV2 when the camelCase keywords were not given.
        if writeParameters is None:
            writeParameters = kwargs.get("write_parameters")
        if writeRecipes is None:
            writeRecipes = kwargs.get("write_recipes")

        # Check that the write parameters are allowed
        if writeParameters:
            if self.supportedWriteParameters is None:
                raise ValueError(
                    f"This formatter does not accept any write parameters. Got: {', '.join(writeParameters)}"
                )
            else:
                given = set(writeParameters)
                unknown = given - self.supportedWriteParameters
                if unknown:
                    s = "s" if len(unknown) != 1 else ""
                    unknownStr = ", ".join(f"'{u}'" for u in unknown)
                    raise ValueError(f"This formatter does not accept parameter{s} {unknownStr}")

        self._writeParameters = writeParameters
        self._writeRecipes = self.validate_write_recipes(writeRecipes)

    def __str__(self) -> str:
        return f"{self.name()}@{self.fileDescriptor.location.path}"

    def __repr__(self) -> str:
        return f"{self.name()}({self.fileDescriptor!r})"

    @property
    def fileDescriptor(self) -> FileDescriptor:
        """File descriptor associated with this formatter
        (`FileDescriptor`).
        """
        return self._fileDescriptor

    @property
    def dataId(self) -> DataCoordinate:
        """Return Data ID associated with this formatter (`DataCoordinate`)."""
        return self._dataId

    @property
    def writeParameters(self) -> Mapping[str, Any]:
        """Parameters to use when writing out datasets."""
        if self._writeParameters is not None:
            return self._writeParameters
        return {}

    @property
    def writeRecipes(self) -> Mapping[str, Any]:
        """Detailed write Recipes indexed by recipe name."""
        if self._writeRecipes is not None:
            return self._writeRecipes
        return {}

    def can_accept(self, in_memory_dataset: Any) -> bool:
        """Indicate whether this formatter can accept the specified
        storage class directly.

        Parameters
        ----------
        in_memory_dataset : `object`
            The dataset that is to be written.

        Returns
        -------
        accepts : `bool`
            If `True` the formatter can write data of this type without
            requiring datastore to convert it. If `False` the datastore
            will attempt to convert before writing.

        Notes
        -----
        The base class checks that the given python type matches
        the python type specified for this formatter when
        constructed.
        """
        return isinstance(in_memory_dataset, self.file_descriptor.storageClass.pytype)

    @classmethod
    def validateWriteRecipes(cls, recipes: Mapping[str, Any] | None) -> Mapping[str, Any] | None:
        """Validate supplied recipes for this formatter.

        The recipes are supplemented with default values where appropriate.

        Parameters
        ----------
        recipes : `dict`
            Recipes to validate.

        Returns
        -------
        validated : `dict`
            Validated recipes.

        Raises
        ------
        RuntimeError
            Raised if validation fails. The default implementation raises
            if any recipes are given.
        """
        if recipes:
            raise RuntimeError(f"This formatter does not understand these writeRecipes: {recipes}")
        return recipes

    @classmethod
    def validate_write_recipes(cls, recipes: Mapping[str, Any] | None) -> Mapping[str, Any] | None:
        """Validate supplied recipes for this formatter.

        Snake-case (V2-style) entry point; forwards to
        `validateWriteRecipes`.
        """
        return cls.validateWriteRecipes(recipes)

    @classmethod
    def name(cls) -> str:
        """Return the fully qualified name of the formatter.

        Returns
        -------
        name : `str`
            Fully-qualified name of formatter class.
        """
        return get_full_type_name(cls)

    @abstractmethod
    def read(self, component: str | None = None) -> Any:
        """Read a Dataset.

        Parameters
        ----------
        component : `str`, optional
            Component to read from the file. Only used if the `StorageClass`
            for reading differed from the `StorageClass` used to write the
            file.

        Returns
        -------
        inMemoryDataset : `object`
            The requested Dataset.
        """
        raise FormatterNotImplementedError("Type does not support reading")

    @abstractmethod
    def write(self, inMemoryDataset: Any) -> None:
        """Write a Dataset.

        Parameters
        ----------
        inMemoryDataset : `object`
            The Dataset to store.
        """
        raise FormatterNotImplementedError("Type does not support writing")

    @classmethod
    def can_read_bytes(cls) -> bool:
        """Indicate if this formatter can format from bytes.

        Returns
        -------
        can : `bool`
            `True` if the `fromBytes` method is implemented.
        """
        # We have no property to read so instead try to format from a byte
        # and see what happens
        try:
            # We know the arguments are incompatible
            cls.fromBytes(cls, b"")  # type: ignore
        except FormatterNotImplementedError:
            # Only this specific exception means "not implemented".
            return False
        except Exception:
            # There will be problems with the bytes we are supplying so ignore
            pass
        return True

    def fromBytes(self, serializedDataset: bytes, component: str | None = None) -> object:
        """Read serialized data into a Dataset or its component.

        Parameters
        ----------
        serializedDataset : `bytes`
            Bytes object to unserialize.
        component : `str`, optional
            Component to read from the Dataset. Only used if the `StorageClass`
            for reading differed from the `StorageClass` used to write the
            file.

        Returns
        -------
        inMemoryDataset : `object`
            The requested data as a Python object. The type of object
            is controlled by the specific formatter.
        """
        raise FormatterNotImplementedError("Type does not support reading from bytes.")

    def toBytes(self, inMemoryDataset: Any) -> bytes:
        """Serialize the Dataset to bytes based on formatter.

        Parameters
        ----------
        inMemoryDataset : `object`
            The Python object to serialize.

        Returns
        -------
        serializedDataset : `bytes`
            Bytes representing the serialized dataset.
        """
        raise FormatterNotImplementedError("Type does not support writing to bytes.")

    @contextlib.contextmanager
    def _updateLocation(self, location: Location | None) -> Iterator[Location]:
        """Temporarily replace the location associated with this formatter.

        Parameters
        ----------
        location : `Location`
            New location to use for this formatter. If `None` the
            formatter will not change but it will still return
            the old location. This allows it to be used in a code
            path where the location may not need to be updated
            but the with block is still convenient.

        Yields
        ------
        old : `Location`
            The old location that will be restored.

        Notes
        -----
        This is an internal method that should be used with care.
        It may change in the future. Should be used as a context
        manager to restore the location when the temporary is no
        longer required.
        """
        old = self._fileDescriptor.location
        try:
            if location is not None:
                self._fileDescriptor.location = location
            yield old
        finally:
            # Restore the original location even if the body raised.
            if location is not None:
                self._fileDescriptor.location = old

    def make_updated_location(self, location: Location) -> Location:
        """Return a new `Location` updated with this formatter's extension.

        Parameters
        ----------
        location : `Location`
            The location to update.

        Returns
        -------
        updated : `Location`
            A new `Location` with a new file extension applied.

        Raises
        ------
        NotImplementedError
            Raised if there is no ``extension`` attribute associated with
            this formatter.

        Notes
        -----
        This method is available to all Formatters but might not be
        implemented by all formatters. It requires that a formatter set
        an ``extension`` attribute containing the file extension used when
        writing files. If ``extension`` is `None` the supplied file will
        not be updated. Not all formatters write files so this is not
        defined in the base class.
        """
        location = location.clone()
        try:
            # We are deliberately allowing extension to be undefined by
            # default in the base class and mypy complains.
            location.updateExtension(self.extension)  # type:ignore
        except AttributeError:
            raise NotImplementedError("No file extension registered with this formatter") from None
        return location

    @classmethod
    def validate_extension(cls, location: Location) -> None:
        """Check the extension of the provided location for compatibility.

        Parameters
        ----------
        location : `Location`
            Location from which to extract a file extension.

        Returns
        -------
        None

        Raises
        ------
        NotImplementedError
            Raised if file extensions are a concept not understood by this
            formatter.
        ValueError
            Raised if the formatter does not understand this extension.

        Notes
        -----
        This method is available to all Formatters but might not be
        implemented by all formatters. It requires that a formatter set
        an ``extension`` attribute containing the file extension used when
        writing files. If ``extension`` is `None` only the set of supported
        extensions will be examined.
        """
        supported = set(cls.supportedExtensions)

        try:
            # We are deliberately allowing extension to be undefined by
            # default in the base class and mypy complains.
            default = cls.extension  # type: ignore
        except AttributeError:
            raise NotImplementedError("No file extension registered with this formatter") from None

        # If extension is implemented as an instance property it won't return
        # a string when called as a class property. Assume that
        # the supported extensions class property is complete.
        if default is not None and isinstance(default, str):
            supported.add(default)

        # Get the file name from the uri
        file = location.uri.basename()

        # Check that this file name ends with one of the supported extensions.
        # This is less prone to confusion than asking the location for
        # its extension and then doing a set comparison
        for ext in supported:
            if file.endswith(ext):
                return

        raise ValueError(
            f"Extension '{location.getExtension()}' on '{location}' "
            f"is not supported by Formatter '{cls.__name__}' (supports: {supported})"
        )

    def predict_path(self) -> str:
        """Return the path that would be returned by write.

        Does not write any data file.

        Uses the `FileDescriptor` associated with the instance.

        Returns
        -------
        path : `str`
            Path within datastore that would be associated with the location
            stored in this `Formatter`.
        """
        updated = self.make_updated_location(self.fileDescriptor.location)
        return updated.pathInStore.path

    def segregate_parameters(self, parameters: dict[str, Any] | None = None) -> tuple[dict, dict]:
        """Segregate the supplied parameters.

        This splits the parameters into those understood by the
        formatter and those not understood by the formatter.

        Any unsupported parameters are assumed to be usable by associated
        assemblers.

        Parameters
        ----------
        parameters : `dict`, optional
            Parameters with values that have been supplied by the caller
            and which might be relevant for the formatter. If `None`
            parameters will be read from the registered `FileDescriptor`.

        Returns
        -------
        supported : `dict`
            Those parameters supported by this formatter.
        unsupported : `dict`
            Those parameters not supported by this formatter.
        """
        if parameters is None:
            parameters = self.fileDescriptor.parameters

        if parameters is None:
            return {}, {}

        if self.unsupportedParameters is None:
            # Support none of the parameters
            return {}, parameters.copy()

        # Start by assuming all are supported
        supported = parameters.copy()
        unsupported = {}

        # And remove any we know are not supported
        for p in set(supported):
            if p in self.unsupportedParameters:
                unsupported[p] = supported.pop(p)

        return supported, unsupported

    # Support classic V1 interface.
    makeUpdatedLocation = make_updated_location
    validateExtension = validate_extension
    segregateParameters = segregate_parameters
    predictPath = predict_path

    # Compatibility with V2 properties.
    @property
    def write_parameters(self) -> Mapping[str, Any]:
        """Parameters to use when writing out datasets (snake-case alias
        for `writeParameters`)."""
        return self.writeParameters

    @property
    def write_recipes(self) -> Mapping[str, Any]:
        """Detailed write recipes indexed by recipe name (snake-case alias
        for `writeRecipes`)."""
        return self.writeRecipes

    @property
    def file_descriptor(self) -> FileDescriptor:
        """File descriptor associated with this formatter (snake-case alias
        for `fileDescriptor`)."""
        return self.fileDescriptor

    @property
    def data_id(self) -> DataCoordinate:
        """Data ID associated with this formatter (snake-case alias for
        `dataId`)."""
        return self.dataId
1729class FormatterFactory:
1730 """Factory for `Formatter` instances."""
1732 defaultKey = LookupKey("default")
1733 """Configuration key associated with default write parameter settings."""
1735 writeRecipesKey = LookupKey("write_recipes")
1736 """Configuration key associated with write recipes."""
    def __init__(self) -> None:
        # Registry of lookup keys to formatter classes; Formatter is the
        # reference base type for entries stored in the mapping.
        self._mappingFactory = MappingFactory(Formatter)
    def __contains__(self, key: LookupKey | str) -> bool:
        """Indicate whether the supplied key is present in the factory.

        Parameters
        ----------
        key : `LookupKey`, `str` or objects with ``name`` attribute
            Key to use to lookup in the factory whether a corresponding
            formatter is present.

        Returns
        -------
        in : `bool`
            `True` if the supplied key is present in the factory.
        """
        # Membership is delegated entirely to the underlying MappingFactory.
        return key in self._mappingFactory
    def registerFormatters(self, config: Config, *, universe: DimensionUniverse) -> None:
        """Bulk register formatters from a config.

        Parameters
        ----------
        config : `Config`
            ``formatters`` section of a configuration.
        universe : `DimensionUniverse`, optional
            Set of all known dimensions, used to expand and validate any used
            in lookup keys.

        Notes
        -----
        The configuration can include one level of hierarchy where an
        instrument-specific section can be defined to override more general
        template specifications. This is represented in YAML using a
        key of form ``instrument<name>`` which can then define templates
        that will be returned if a `DatasetRef` contains a matching instrument
        name in the data ID.

        The config is parsed using the function
        `~lsst.daf.butler.configSubset.processLookupConfigs`.

        The values for formatter entries can be either a simple string
        referring to a python type or a dict representing the formatter and
        parameters to be hard-coded into the formatter constructor. For
        the dict case the following keys are supported:

        - formatter: The python type to be used as the formatter class.
        - parameters: A further dict to be passed directly to the
          ``write_parameters`` Formatter constructor to seed it.
          These parameters are validated at instance creation and not at
          configuration.

        Additionally, a special ``default`` section can be defined that
        uses the formatter type (class) name as the keys and specifies
        default write parameters that should be used whenever an instance
        of that class is constructed.

        .. code-block:: yaml

           formatters:
             default:
               lsst.daf.butler.formatters.example.ExampleFormatter:
                 max: 10
                 min: 2
                 comment: Default comment
             calexp: lsst.daf.butler.formatters.example.ExampleFormatter
             coadd:
               formatter: lsst.daf.butler.formatters.example.ExampleFormatter
               parameters:
                 max: 5

        Any time an ``ExampleFormatter`` is constructed it will use those
        parameters. If an explicit entry later in the configuration specifies
        a different set of parameters, the two will be merged with the later
        entry taking priority. In the example above ``calexp`` will use
        the default parameters but ``coadd`` will override the value for
        ``max``.

        Formatter configuration can also include a special section describing
        collections of write parameters that can be accessed through a
        simple label. This allows common collections of options to be
        specified in one place in the configuration and reused later.
        The ``write_recipes`` section is indexed by Formatter class name
        and each key is the label to associate with the parameters.

        .. code-block:: yaml

           formatters:
             write_recipes:
               lsst.obs.base.formatters.fitsExposure.FixExposureFormatter:
                 lossless:
                   ...
                 noCompression:
                   ...

        By convention a formatter that uses write recipes will support a
        ``recipe`` write parameter that will refer to a recipe name in
        the ``write_recipes`` component. The `Formatter` will be constructed
        in the `FormatterFactory` with all the relevant recipes and
        will not attempt to filter by looking at ``write_parameters`` in
        advance. See the specific formatter documentation for details on
        acceptable recipe options.
        """
        # Only these keys are allowed in the dict form of an entry.
        allowed_keys = {"formatter", "parameters"}

        contents = processLookupConfigs(config, allow_hierarchy=True, universe=universe)

        # Extract any default parameter settings
        defaultParameters = contents.get(self.defaultKey, {})
        if not isinstance(defaultParameters, Mapping):
            raise RuntimeError(
                "Default formatter parameters in config can not be a single string"
                f" (got: {type(defaultParameters)})"
            )

        # Extract any global write recipes -- these are indexed by
        # Formatter class name.
        writeRecipes = contents.get(self.writeRecipesKey, {})
        if isinstance(writeRecipes, str):
            raise RuntimeError(
                f"The formatters.{self.writeRecipesKey} section must refer to a dict not '{writeRecipes}'"
            )

        for key, f in contents.items():
            # default is handled in a special way
            if key == self.defaultKey:
                continue
            if key == self.writeRecipesKey:
                continue

            # Can be a str or a dict.
            specificWriteParameters = {}
            if isinstance(f, str):
                formatter = f
            elif isinstance(f, Mapping):
                all_keys = set(f)
                unexpected_keys = all_keys - allowed_keys
                if unexpected_keys:
                    raise ValueError(f"Formatter {key} uses unexpected keys {unexpected_keys} in config")
                if "formatter" not in f:
                    raise ValueError(f"Mandatory 'formatter' key missing for formatter key {key}")
                formatter = f["formatter"]
                if "parameters" in f:
                    specificWriteParameters = f["parameters"]
            else:
                raise ValueError(f"Formatter for key {key} has unexpected value: '{f}'")

            # Apply any default parameters for this formatter.
            # Deep copy so per-entry updates can never mutate the shared
            # defaults mapping.
            writeParameters = copy.deepcopy(defaultParameters.get(formatter, {}))
            writeParameters.update(specificWriteParameters)

            kwargs: dict[str, Any] = {}
            if writeParameters:
                kwargs["write_parameters"] = writeParameters

            if formatter in writeRecipes:
                kwargs["write_recipes"] = writeRecipes[formatter]

            self.registerFormatter(key, formatter, **kwargs)
1899 def getLookupKeys(self) -> set[LookupKey]:
1900 """Retrieve the look up keys for all the registry entries.
1902 Returns
1903 -------
1904 keys : `set` of `LookupKey`
1905 The keys available for matching in the registry.
1906 """
1907 return self._mappingFactory.getLookupKeys()
1909 def getFormatterClassWithMatch(
1910 self, entity: Entity
1911 ) -> tuple[LookupKey, type[Formatter | FormatterV2], dict[str, Any]]:
1912 """Get the matching formatter class along with the registry key.
1914 Parameters
1915 ----------
1916 entity : `DatasetRef`, `DatasetType`, `StorageClass`, or `str`
1917 Entity to use to determine the formatter to return.
1918 `StorageClass` will be used as a last resort if `DatasetRef`
1919 or `DatasetType` instance is provided. Supports instrument
1920 override if a `DatasetRef` is provided configured with an
1921 ``instrument`` value for the data ID.
1923 Returns
1924 -------
1925 matchKey : `LookupKey`
1926 The key that resulted in the successful match.
1927 formatter : `type`
1928 The class of the registered formatter.
1929 formatter_kwargs : `dict`
1930 Keyword arguments that are associated with this formatter entry.
1931 """
1932 names = (LookupKey(name=entity),) if isinstance(entity, str) else entity._lookupNames()
1933 matchKey, formatter, formatter_kwargs = self._mappingFactory.getClassFromRegistryWithMatch(names)
1934 log.debug(
1935 "Retrieved formatter %s from key '%s' for entity '%s'",
1936 get_full_type_name(formatter),
1937 matchKey,
1938 entity,
1939 )
1941 return matchKey, formatter, formatter_kwargs
1943 def getFormatterClass(self, entity: Entity) -> type:
1944 """Get the matching formatter class.
1946 Parameters
1947 ----------
1948 entity : `DatasetRef`, `DatasetType`, `StorageClass`, or `str`
1949 Entity to use to determine the formatter to return.
1950 `StorageClass` will be used as a last resort if `DatasetRef`
1951 or `DatasetType` instance is provided. Supports instrument
1952 override if a `DatasetRef` is provided configured with an
1953 ``instrument`` value for the data ID.
1955 Returns
1956 -------
1957 formatter : `type`
1958 The class of the registered formatter.
1959 """
1960 _, formatter, _ = self.getFormatterClassWithMatch(entity)
1961 return formatter
1963 def getFormatterWithMatch(
1964 self, entity: Entity, *args: Any, **kwargs: Any
1965 ) -> tuple[LookupKey, Formatter | FormatterV2]:
1966 """Get a new formatter instance along with the matching registry key.
1968 Parameters
1969 ----------
1970 entity : `DatasetRef`, `DatasetType`, `StorageClass`, or `str`
1971 Entity to use to determine the formatter to return.
1972 `StorageClass` will be used as a last resort if `DatasetRef`
1973 or `DatasetType` instance is provided. Supports instrument
1974 override if a `DatasetRef` is provided configured with an
1975 ``instrument`` value for the data ID.
1976 *args : `tuple`
1977 Positional arguments to use pass to the object constructor.
1978 **kwargs
1979 Keyword arguments to pass to object constructor.
1981 Returns
1982 -------
1983 matchKey : `LookupKey`
1984 The key that resulted in the successful match.
1985 formatter : `Formatter`
1986 An instance of the registered formatter.
1987 """
1988 names = (LookupKey(name=entity),) if isinstance(entity, str) else entity._lookupNames()
1989 matchKey, formatter = self._mappingFactory.getFromRegistryWithMatch(names, *args, **kwargs)
1990 log.debug(
1991 "Retrieved formatter %s from key '%s' for entity '%s'",
1992 get_full_type_name(formatter),
1993 matchKey,
1994 entity,
1995 )
1997 return matchKey, formatter
1999 def getFormatter(self, entity: Entity, *args: Any, **kwargs: Any) -> Formatter | FormatterV2:
2000 """Get a new formatter instance.
2002 Parameters
2003 ----------
2004 entity : `DatasetRef`, `DatasetType`, `StorageClass`, or `str`
2005 Entity to use to determine the formatter to return.
2006 `StorageClass` will be used as a last resort if `DatasetRef`
2007 or `DatasetType` instance is provided. Supports instrument
2008 override if a `DatasetRef` is provided configured with an
2009 ``instrument`` value for the data ID.
2010 *args : `tuple`
2011 Positional arguments to use pass to the object constructor.
2012 **kwargs
2013 Keyword arguments to pass to object constructor.
2015 Returns
2016 -------
2017 formatter : `Formatter`
2018 An instance of the registered formatter.
2019 """
2020 _, formatter = self.getFormatterWithMatch(entity, *args, **kwargs)
2021 return formatter
2023 def registerFormatter(
2024 self,
2025 type_: LookupKey | str | StorageClass | DatasetType,
2026 formatter: str,
2027 *,
2028 overwrite: bool = False,
2029 **kwargs: Any,
2030 ) -> None:
2031 """Register a `Formatter`.
2033 Parameters
2034 ----------
2035 type_ : `LookupKey`, `str`, `StorageClass` or `DatasetType`
2036 Type for which this formatter is to be used. If a `LookupKey`
2037 is not provided, one will be constructed from the supplied string
2038 or by using the ``name`` property of the supplied entity.
2039 formatter : `str` or class of type `Formatter`
2040 Identifies a `Formatter` subclass to use for reading and writing
2041 Datasets of this type. Can be a `Formatter` class.
2042 overwrite : `bool`, optional
2043 If `True` an existing entry will be replaced by the new value.
2044 Default is `False`.
2045 **kwargs
2046 Keyword arguments to always pass to object constructor when
2047 retrieved.
2049 Raises
2050 ------
2051 ValueError
2052 Raised if the formatter does not name a valid formatter type and
2053 ``overwrite`` is `False`.
2054 """
2055 self._mappingFactory.placeInRegistry(type_, formatter, overwrite=overwrite, **kwargs)
class FormatterV1inV2(FormatterV2):
    """Adapter exposing a V1 `Formatter` through the `FormatterV2` interface.

    Parameters
    ----------
    file_descriptor : `FileDescriptor`, optional
        Identifies the file to read or write, along with the associated
        storage classes and parameter information. May be `None` when the
        caller never invokes `Formatter.read` or `Formatter.write`.
    ref : `DatasetRef`
        The dataset associated with this formatter. Should not be a
        component dataset ref.
    formatter : `Formatter`
        The wrapped version 1 `Formatter` instance; calls made through the
        V2 interface are forwarded to it.
    write_parameters : `dict`, optional
        Parameters hard-coded into this instance that control how the
        dataset is serialized.
    write_recipes : `dict`, optional
        Detailed write recipes indexed by recipe name.
    **kwargs
        Extra arguments accepted (and ignored) so that `Formatter` V1
        constructor parameters can still be supplied.
    """

    can_read_from_local_file = True
    """This formatter can read from a local file."""

    def __init__(
        self,
        file_descriptor: FileDescriptor,
        *,
        ref: DatasetRef,
        formatter: Formatter,
        write_parameters: Mapping[str, Any] | None = None,
        write_recipes: Mapping[str, Any] | None = None,
        # Compatibility parameters. Unused in v2.
        **kwargs: Any,
    ):
        if not isinstance(formatter, Formatter):
            raise TypeError(f"Formatter parameter was not a V1 formatter (was {type(formatter)})")

        # Copy the wrapped formatter's supported write parameters onto this
        # instance before the V2 constructor runs, so its validation sees
        # the V1 values instead of the class-level defaults.
        self.supported_write_parameters = formatter.supportedWriteParameters  # type: ignore
        self._formatter = formatter

        super().__init__(
            file_descriptor, ref=ref, write_parameters=write_parameters, write_recipes=write_recipes
        )

    def get_write_extension(self) -> str:
        # A V1 formatter may report no extension at all; map that to "".
        extension = self._formatter.extension
        if extension is None:
            return ""
        return extension

    def segregate_parameters(self, parameters: dict[str, Any] | None = None) -> tuple[dict, dict]:
        # Forward directly to the wrapped V1 formatter.
        return self._formatter.segregate_parameters(parameters)

    def validate_write_recipes(  # type: ignore
        self,
        recipes: Mapping[str, Any] | None,
    ) -> Mapping[str, Any] | None:
        # Would normally be a class method, but a class method can not work
        # for a dynamic shim; luckily the shim is only used as an instance.
        return self._formatter.validate_write_recipes(recipes)

    def read_from_local_file(self, path: str, component: str | None = None, expected_size: int = -1) -> Any:
        # V2 has no fromBytes equivalent of its own, so prefer the V1
        # byte-level API when the wrapped formatter supports it.
        if self._formatter.can_read_bytes():
            with open(path, "rb") as stream:
                return self._formatter.fromBytes(stream.read(), component=component)

        # The V1 formatter knows nothing about this local file, so point
        # its location at it for the duration of the read.
        with self._formatter._updateLocation(Location(None, path)):
            try:
                return self._formatter.read(component=component)
            except NotImplementedError:
                # V1 raises NotImplementedError, but V2 callers expect the
                # NotImplemented sentinel instead.
                return NotImplemented

    def to_bytes(self, in_memory_dataset: Any) -> bytes:
        try:
            return self._formatter.toBytes(in_memory_dataset)
        except NotImplementedError as exc:
            # Translate the V1 failure signal into the V2 exception type.
            raise FormatterNotImplementedError(str(exc)) from exc

    def write_local_file(self, in_memory_dataset: Any, uri: ResourcePath) -> None:
        # Temporarily retarget the V1 formatter at the requested URI.
        with self._formatter._updateLocation(Location(None, uri)):
            try:
                self._formatter.write(in_memory_dataset)
            except NotImplementedError as exc:
                # Translate the V1 failure signal into the V2 exception type.
                raise FormatterNotImplementedError(str(exc)) from exc
# Type alias for the ways a formatter may be specified in an API call:
# a fully qualified class name string, a V1 or V2 formatter class, or an
# instance of either.
FormatterParameter: TypeAlias = str | type[Formatter] | Formatter | FormatterV2 | type[FormatterV2]