Coverage for python / lsst / images / _visit_image.py: 26%
270 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:41 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("VisitImage", "VisitImageSerializationModel")
16import functools
17import warnings
18from collections.abc import Callable, Mapping, MutableMapping
19from types import EllipsisType
20from typing import Any, Literal, cast, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import pydantic
26from astro_metadata_translator import ObservationInfo, VisitInfoTranslator
28from ._geom import Box
29from ._image import Image, ImageSerializationModel
30from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes
31from ._masked_image import MaskedImage, MaskedImageSerializationModel
32from ._observation_summary_stats import ObservationSummaryStats
33from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel
34from .fits import FitsOpaqueMetadata
35from .psfs import (
36 GaussianPointSpreadFunction,
37 GaussianPSFSerializationModel,
38 PiffSerializationModel,
39 PiffWrapper,
40 PointSpreadFunction,
41 PSFExSerializationModel,
42 PSFExWrapper,
43)
44from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive
def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo:
    """Build an `ObservationInfo` from a header-like mapping.

    Parameters
    ----------
    md
        Header-like mapping to translate. When the fallback path is taken
        and ``visit_info`` is given, this mapping is updated in place with
        the serialized visit info.
    visit_info
        Optional legacy ``VisitInfo`` object, used only when no translator
        recognizes ``md`` as-is.

    Returns
    -------
    `~astro_metadata_translator.ObservationInfo`
        The translated observation information.
    """
    try:
        # First attempt: treat the mapping as if it were a raw header.
        return ObservationInfo.from_header(md, quiet=True)
    except ValueError:
        # No translator claimed the header, so rely on visit-info keys.
        if visit_info is not None:
            # A real VisitInfo is known to be complete, so serialize it and
            # merge it into the header; merging (rather than replacing)
            # keeps the butler provenance keys available.
            from lsst.afw.image import setVisitInfoMetadata
            from lsst.daf.base import PropertyList

            visit_info_md = PropertyList()
            setVisitInfoMetadata(visit_info_md, visit_info)
            md.update(visit_info_md)

        # Look for VisitInfo hints in the (possibly augmented) header.
        # This emits many warnings when nothing can be found; currently the
        # only way to silence them would be to capture them.
        return ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)
72def _update_obs_info_from_legacy(
73 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None
74) -> ObservationInfo:
75 extra_md: dict[str, str | int] = {}
77 if filter_label is not None and filter_label.hasBandLabel():
78 extra_md["physical_filter"] = filter_label.physicalLabel
80 # Fill in detector metadata, check for consistency.
81 # ObsInfo detector name and group can not be derived from
82 # the getName() information without knowing how the components
83 # are separated.
84 if detector is not None:
85 detector_md = {
86 "detector_num": detector.getId(),
87 "detector_serial": detector.getSerial(),
88 "detector_unique_name": detector.getName(),
89 }
90 extra_md.update(detector_md)
92 obs_info_updates: dict[str, str | int] = {}
93 for k, v in extra_md.items():
94 current = getattr(obs_info, k)
95 if current is None:
96 obs_info_updates[k] = v
97 continue
98 if current != v:
99 raise RuntimeError(
100 f"ObservationInfo contains value for '{k}' that is inconsistent "
101 f"with given legacy object: {v} != {current}"
102 )
104 if obs_info_updates:
105 obs_info = obs_info.model_copy(update=obs_info_updates)
106 return obs_info
109class VisitImage(MaskedImage):
110 """A calibrated single-visit image.
112 Parameters
113 ----------
114 image
115 The main image plane. If this has a `Projection`, it will be used
116 for all planes unless a ``projection`` is passed separately.
117 mask
118 A bitmask image that annotates the main image plane. Must have the
119 same bounding box as ``image`` if provided. Any attached projection
120 is replaced (possibly by `None`).
121 variance
122 The per-pixel uncertainty of the main image as an image of variance
123 values. Must have the same bounding box as ``image`` if provided, and
124 its units must be the square of ``image.unit`` or `None`.
125 Values default to ``1.0``. Any attached projection is replaced
126 (possibly by `None`).
127 mask_schema
128 Schema for the mask plane. Must be provided if and only if ``mask`` is
129 not provided.
130 projection
131 Projection that maps the pixel grid to the sky. Can only be `None` if
132 a projection is already attached to ``image``.
133 obs_info
134 General information about this visit in standardized form.
135 summary_stats
136 Optional summary statistics associated with this visit.
137 psf
138 Point-spread function model for this image, or an exception explaining
139 why it could not be read (to be raised if the PSF is requested later).
140 metadata
141 Arbitrary flexible metadata to associate with the image.
142 """
144 def __init__(
145 self,
146 image: Image,
147 *,
148 mask: Mask | None = None,
149 variance: Image | None = None,
150 mask_schema: MaskSchema | None = None,
151 projection: Projection[DetectorFrame] | None = None,
152 obs_info: ObservationInfo | None = None,
153 summary_stats: ObservationSummaryStats | None = None,
154 psf: PointSpreadFunction | ArchiveReadError,
155 metadata: dict[str, MetadataValue] | None = None,
156 ):
157 super().__init__(
158 image,
159 mask=mask,
160 variance=variance,
161 mask_schema=mask_schema,
162 projection=projection,
163 obs_info=obs_info,
164 metadata=metadata,
165 )
166 if self.image.unit is None:
167 raise TypeError("The image component of a VisitImage must have units.")
168 if self.image.projection is None:
169 raise TypeError("The projection component of a VisitImage cannot be None.")
170 if self.image.obs_info is None:
171 raise TypeError("The observation info component of a VisitImage cannot be None.")
172 if not isinstance(self.image.projection.pixel_frame, DetectorFrame):
173 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.")
174 self._summary_stats = summary_stats
175 self._psf = psf
177 @property
178 def unit(self) -> astropy.units.UnitBase:
179 """The units of the image plane (`astropy.units.Unit`)."""
180 return cast(astropy.units.UnitBase, super().unit)
182 @property
183 def projection(self) -> Projection[DetectorFrame]:
184 """The projection that maps the pixel grid to the sky
185 (`Projection` [`DetectorFrame`]).
186 """
187 return cast(Projection[DetectorFrame], super().projection)
189 @property
190 def obs_info(self) -> ObservationInfo:
191 """General information about this observation in standard form.
192 (`~astro_metadata_translator.ObservationInfo`).
193 """
194 obs_info = self.image.obs_info
195 assert obs_info is not None
196 return obs_info
198 @property
199 def astropy_wcs(self) -> ProjectionAstropyView:
200 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`).
202 Notes
203 -----
204 As expected for Astropy WCS objects, this defines pixel coordinates
205 such that the first row and column in the arrays are ``(0, 0)``, not
206 ``bbox.start``, as is the case for `projection`.
208 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
209 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an
210 `astropy.wcs.WCS` (use `fits_wcs` for that).
211 """
212 return cast(ProjectionAstropyView, super().astropy_wcs)
214 @property
215 def summary_stats(self) -> ObservationSummaryStats | None:
216 """Optional summary statistics for this observation
217 (`ObservationSummaryStats`).
218 """
219 return self._summary_stats
221 @property
222 def psf(self) -> PointSpreadFunction:
223 """The point-spread function model for this image
224 (`.psfs.PointSpreadFunction`).
225 """
226 if isinstance(self._psf, ArchiveReadError):
227 raise self._psf
228 return self._psf
230 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage:
231 if bbox is ...:
232 return self
233 super().__getitem__(bbox)
234 return self._transfer_metadata(
235 VisitImage(
236 self.image[bbox],
237 mask=self.mask[bbox],
238 variance=self.variance[bbox],
239 projection=self.projection,
240 psf=self.psf,
241 obs_info=self.obs_info,
242 summary_stats=self.summary_stats,
243 ),
244 bbox=bbox,
245 )
247 def __str__(self) -> str:
248 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})"
250 def __repr__(self) -> str:
251 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})"
253 def copy(self) -> VisitImage:
254 """Deep-copy the visit image."""
255 return self._transfer_metadata(
256 VisitImage(
257 image=self._image.copy(),
258 mask=self._mask.copy(),
259 variance=self._variance.copy(),
260 psf=self._psf,
261 obs_info=self.obs_info,
262 summary_stats=self.summary_stats,
263 ),
264 copy=True,
265 )
267 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel:
268 serialized_image = archive.serialize_direct(
269 "image", functools.partial(self.image.serialize, save_projection=False)
270 )
271 serialized_mask = archive.serialize_direct(
272 "mask", functools.partial(self.mask.serialize, save_projection=False)
273 )
274 serialized_variance = archive.serialize_direct(
275 "variance", functools.partial(self.variance.serialize, save_projection=False)
276 )
277 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
278 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel
279 match self._psf:
280 # MyPy is able to figure things out here with this match statement,
281 # but not a single isinstance check on both types.
282 case PiffWrapper():
283 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
284 case PSFExWrapper():
285 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
286 case GaussianPointSpreadFunction():
287 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
288 case _:
289 raise TypeError(
290 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}."
291 )
292 return VisitImageSerializationModel(
293 image=serialized_image,
294 mask=serialized_mask,
295 variance=serialized_variance,
296 projection=serialized_projection,
297 psf=serialized_psf,
298 obs_info=self.obs_info,
299 summary_stats=self.summary_stats,
300 metadata=self.metadata,
301 )
303 # Type-checkers want the model argument to only require
304 # MaskedImageSerializationModel[Any], and they'd be absolutely right if
305 # this were a regular instance method. But whether Liskov substitution
306 # applies to classmethods and staticmethods is sort of context-dependent,
307 # and here we do not want it to.
308 @staticmethod
309 def deserialize(
310 model: VisitImageSerializationModel[Any], # type: ignore[override]
311 archive: InputArchive[Any],
312 *,
313 bbox: Box | None = None,
314 ) -> VisitImage:
315 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox)
316 psf = model.deserialize_psf(archive)
317 projection = Projection.deserialize(model.projection, archive)
318 return VisitImage(
319 masked_image.image,
320 mask=masked_image.mask,
321 variance=masked_image.variance,
322 psf=psf,
323 projection=projection,
324 obs_info=model.obs_info,
325 summary_stats=model.summary_stats,
326 )._finish_deserialize(model)
328 @staticmethod
329 def _get_archive_tree_type[P: pydantic.BaseModel](
330 pointer_type: type[P],
331 ) -> type[VisitImageSerializationModel[P]]:
332 """Return the serialization model type for this object for an archive
333 type that uses the given pointer type.
334 """
335 return VisitImageSerializationModel[pointer_type] # type: ignore
337 # write_fits and read_fits inherited from MaskedImage.
339 @staticmethod
340 def from_legacy(
341 legacy: Any,
342 *,
343 unit: astropy.units.Unit | None = None,
344 plane_map: Mapping[str, MaskPlane] | None = None,
345 instrument: str | None = None,
346 visit: int | None = None,
347 ) -> VisitImage:
348 """Convert from an `lsst.afw.image.Exposure` instance.
350 Parameters
351 ----------
352 legacy
353 An `lsst.afw.image.Exposure` instance that will share image and
354 variance (but not mask) pixel data with the returned object.
355 unit
356 Units of the image. If not provided, the ``BUNIT`` metadata
357 key will be used, if available.
358 plane_map
359 A mapping from legacy mask plane name to the new plane name and
360 description. If `None` (default)
361 `get_legacy_visit_image_mask_planes` is used.
362 instrument
363 Name of the instrument. Extracted from the metadata if not
364 provided.
365 visit
366 ID of the visit. Extracted from the metadata if not provided.
367 """
368 if plane_map is None:
369 plane_map = get_legacy_visit_image_mask_planes()
370 md = legacy.getMetadata()
371 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo())
372 instrument = _extract_or_check_header(
373 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str
374 )
375 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int)
376 unit = _extract_or_check_header(
377 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits")
378 )
379 legacy_wcs = legacy.getWcs()
380 if legacy_wcs is None:
381 raise ValueError("Exposure does not have a SkyWcs.")
382 legacy_detector = legacy.getDetector()
383 if legacy_detector is None:
384 raise ValueError("Exposure does not have a Detector.")
385 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
387 # Update the ObservationInfo from other components.
388 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter())
390 opaque_fits_metadata = FitsOpaqueMetadata()
391 primary_header = astropy.io.fits.Header()
392 with warnings.catch_warnings():
393 # Silence warnings about long keys becoming HIERARCH.
394 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
395 primary_header.update(md.toOrderedDict())
396 opaque_fits_metadata.extract_legacy_primary_header(primary_header)
397 projection = Projection.from_legacy(
398 legacy_wcs,
399 DetectorFrame(
400 instrument=instrument,
401 visit=visit,
402 detector=legacy_detector.getId(),
403 bbox=detector_bbox,
404 ),
405 )
406 legacy_psf = legacy.getPsf()
407 if legacy_psf is None:
408 raise ValueError("Exposure file does not have a Psf.")
409 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
410 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map)
411 legacy_summary_stats = legacy.info.getSummaryStats()
412 result = VisitImage(
413 image=masked_image.image.view(unit=unit),
414 mask=masked_image.mask,
415 variance=masked_image.variance,
416 projection=projection,
417 psf=psf,
418 obs_info=obs_info,
419 summary_stats=(
420 ObservationSummaryStats.from_legacy(legacy_summary_stats)
421 if legacy_summary_stats is not None
422 else None
423 ),
424 )
426 result._opaque_metadata = opaque_fits_metadata
427 return result
429 @overload # type: ignore[override]
430 @staticmethod
431 def read_legacy( 431 ↛ exitline 431 didn't return from function 'read_legacy' because
432 filename: str,
433 *,
434 component: Literal["bbox"],
435 ) -> Box: ...
437 @overload
438 @staticmethod
439 def read_legacy( 439 ↛ exitline 439 didn't return from function 'read_legacy' because
440 filename: str,
441 *,
442 preserve_quantization: bool = False,
443 instrument: str | None = None,
444 visit: int | None = None,
445 component: Literal["image"],
446 ) -> Image: ...
448 @overload
449 @staticmethod
450 def read_legacy( 450 ↛ exitline 450 didn't return from function 'read_legacy' because
451 filename: str,
452 *,
453 plane_map: Mapping[str, MaskPlane] | None = None,
454 instrument: str | None = None,
455 visit: int | None = None,
456 component: Literal["mask"],
457 ) -> Mask: ...
459 @overload
460 @staticmethod
461 def read_legacy( 461 ↛ exitline 461 didn't return from function 'read_legacy' because
462 filename: str,
463 *,
464 preserve_quantization: bool = False,
465 instrument: str | None = None,
466 visit: int | None = None,
467 component: Literal["variance"],
468 ) -> Image: ...
470 @overload
471 @staticmethod
472 def read_legacy( 472 ↛ exitline 472 didn't return from function 'read_legacy' because
473 filename: str,
474 *,
475 instrument: str | None = None,
476 visit: int | None = None,
477 component: Literal["projection"],
478 ) -> Projection[DetectorFrame]: ...
480 @overload
481 @staticmethod
482 def read_legacy( 482 ↛ exitline 482 didn't return from function 'read_legacy' because
483 filename: str,
484 *,
485 component: Literal["psf"],
486 ) -> PointSpreadFunction: ...
488 @overload
489 @staticmethod
490 def read_legacy( 490 ↛ exitline 490 didn't return from function 'read_legacy' because
491 filename: str,
492 *,
493 component: Literal["obs_info"],
494 ) -> ObservationInfo: ...
496 @overload
497 @staticmethod
498 def read_legacy( 498 ↛ exitline 498 didn't return from function 'read_legacy' because
499 filename: str,
500 *,
501 component: Literal["summary_stats"],
502 ) -> ObservationSummaryStats: ...
504 @overload
505 @staticmethod
506 def read_legacy( 506 ↛ exitline 506 didn't return from function 'read_legacy' because
507 filename: str,
508 *,
509 preserve_quantization: bool = False,
510 plane_map: Mapping[str, MaskPlane] | None = None,
511 instrument: str | None = None,
512 visit: int | None = None,
513 component: None = None,
514 ) -> VisitImage: ...
516 @staticmethod
517 def read_legacy( # type: ignore[override]
518 filename: str,
519 *,
520 preserve_quantization: bool = False,
521 plane_map: Mapping[str, MaskPlane] | None = None,
522 instrument: str | None = None,
523 visit: int | None = None,
524 component: Literal[
525 "bbox", "image", "mask", "variance", "projection", "psf", "obs_info", "summary_stats"
526 ]
527 | None = None,
528 ) -> Any:
529 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`.
531 Parameters
532 ----------
533 filename
534 Full name of the file.
535 preserve_quantization
536 If `True`, ensure that writing the masked image back out again will
537 exactly preserve quantization-compressed pixel values. This causes
538 the image and variance plane arrays to be marked as read-only and
539 stores the original binary table data for those planes in memory.
540 If the `MaskedImage` is copied, the precompressed pixel values are
541 not transferred to the copy.
542 plane_map
543 A mapping from legacy mask plane name to the new plane name and
544 description. If `None` (default)
545 `get_legacy_visit_image_mask_planes` is used.
546 instrument
547 Name of the instrument. Read from the primary header if not
548 provided.
549 visit
550 ID of the visit. Read from the primary header if not
551 provided.
552 component
553 A component to read instead of the full image.
554 """
555 from lsst.afw.image import ExposureFitsReader
557 reader = ExposureFitsReader(filename)
558 if component == "bbox":
559 return Box.from_legacy(reader.readBBox())
560 legacy_detector = reader.readDetector()
561 if legacy_detector is None:
562 raise ValueError(f"Exposure file {filename!r} does not have a Detector.")
563 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
564 legacy_wcs = None
565 if component in (None, "image", "mask", "variance", "projection"):
566 legacy_wcs = reader.readWcs()
567 if legacy_wcs is None:
568 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.")
569 legacy_exposure_info = reader.readExposureInfo()
570 summary_stats = None
571 if component in (None, "summary_stats"):
572 legacy_stats = legacy_exposure_info.getSummaryStats()
573 if legacy_stats is not None:
574 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats)
575 if component == "summary_stats":
576 return summary_stats
577 if component in (None, "psf"):
578 legacy_psf = reader.readPsf()
579 if legacy_psf is None:
580 raise ValueError(f"Exposure file {filename!r} does not have a Psf.")
581 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
582 if component == "psf":
583 return psf
584 assert component in (None, "image", "mask", "variance", "projection", "obs_info"), (
585 component
586 ) # for MyPy
587 with astropy.io.fits.open(filename) as hdu_list:
588 primary_header = hdu_list[0].header
589 obs_info = _obs_info_from_md(primary_header)
590 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter())
591 if component == "obs_info":
592 return obs_info
593 instrument = _extract_or_check_header(
594 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str
595 )
596 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int)
597 projection = Projection.from_legacy(
598 legacy_wcs,
599 DetectorFrame(
600 instrument=instrument,
601 visit=visit,
602 detector=legacy_detector.getId(),
603 bbox=detector_bbox,
604 ),
605 )
606 if component == "projection":
607 return projection
608 if plane_map is None:
609 plane_map = get_legacy_visit_image_mask_planes()
610 assert component != "psf", component # for MyPy
611 from_masked_image = MaskedImage._read_legacy_hdus(
612 hdu_list,
613 filename,
614 preserve_quantization=preserve_quantization,
615 plane_map=plane_map,
616 component=component,
617 )
618 if component is not None:
619 # This is the image, mask, or variance; attach the projection
620 # and return
621 return from_masked_image.view(projection=projection)
622 result = VisitImage(
623 from_masked_image.image,
624 mask=from_masked_image.mask,
625 variance=from_masked_image.variance,
626 projection=projection,
627 psf=psf,
628 obs_info=obs_info,
629 summary_stats=summary_stats,
630 )
631 result._opaque_metadata = from_masked_image._opaque_metadata
632 return result
635class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]):
636 """A Pydantic model used to represent a serialized `VisitImage`."""
638 # Inherited attributes are duplicated because that improves the docs
639 # (some limitation in the sphinx/pydantic integration), and these are
640 # important docs.
642 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
643 mask: MaskSerializationModel[P] = pydantic.Field(
644 description="Bitmask that annotates the main image's pixels."
645 )
646 variance: ImageSerializationModel[P] = pydantic.Field(
647 description="Per-pixel variance estimates for the main image."
648 )
649 projection: ProjectionSerializationModel[P] = pydantic.Field(
650 description="Projection that maps the pixel grid to the sky.",
651 )
652 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = (
653 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.")
654 )
655 obs_info: ObservationInfo = pydantic.Field(
656 description="Standardized description of visit metadata",
657 )
658 summary_stats: ObservationSummaryStats | None = pydantic.Field(
659 default=None, description="Optional summary statistics for the observation."
660 )
662 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError:
663 """Finish deserializing the PSF model, or *return* any exception
664 raised in the attempt.
665 """
666 try:
667 match self.psf:
668 case PiffSerializationModel():
669 return PiffWrapper.deserialize(self.psf, archive)
670 case PSFExSerializationModel():
671 return PSFExWrapper.deserialize(self.psf, archive)
672 case GaussianPSFSerializationModel():
673 return GaussianPointSpreadFunction.deserialize(self.psf, archive)
674 case _:
675 raise ArchiveReadError("PSF model type not recognized.")
676 except ArchiveReadError as err:
677 return err
680def _extract_or_check_value[T](
681 key: str,
682 given_value: T | None,
683 *sources: tuple[str, T | None],
684) -> T:
685 # Compare given value against multiple sources. If given value is not
686 # supplied return the first non-None value in the reference sources.
687 if given_value is not None:
688 for source_name, source_value in sources:
689 if source_value is not None and source_value != given_value:
690 raise ValueError(
691 f"Given value {given_value!r} does not match {source_value!r} from {source_name}."
692 )
693 if source_value is not None:
694 # Only check the first non-None source rather than checking
695 # all supplied values.
696 break
697 return given_value
699 for _, source_value in sources:
700 if source_value is not None:
701 return source_value
703 raise ValueError(f"No value found for {key}.")
706def _extract_or_check_header[T](
707 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T]
708) -> T:
709 hdr_value: T | None = None
710 if (hdr_raw_value := header.get(key)) is not None:
711 hdr_value = coerce(hdr_raw_value)
712 return _extract_or_check_value(
713 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value)
714 )