Coverage for python / lsst / images / _visit_image.py: 26%
267 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:35 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("VisitImage", "VisitImageSerializationModel")
16import warnings
17from collections.abc import Callable, Mapping, MutableMapping
18from types import EllipsisType
19from typing import Any, Literal, cast, overload
21import astropy.io.fits
22import astropy.units
23import astropy.wcs
24import pydantic
25from astro_metadata_translator import ObservationInfo, VisitInfoTranslator
27from ._geom import Box
28from ._image import Image, ImageSerializationModel
29from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes
30from ._masked_image import MaskedImage, MaskedImageSerializationModel
31from ._observation_summary_stats import ObservationSummaryStats
32from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel
33from .fits import FitsOpaqueMetadata
34from .psfs import (
35 GaussianPointSpreadFunction,
36 GaussianPSFSerializationModel,
37 PiffSerializationModel,
38 PiffWrapper,
39 PointSpreadFunction,
40 PSFExSerializationModel,
41 PSFExWrapper,
42)
43from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive
46def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo:
47 # Try to get an ObservationInfo from the primary header as if
48 # it's a raw header. Else fallback.
49 try:
50 obs_info = ObservationInfo.from_header(md, quiet=True)
51 except ValueError:
52 # Not known translator. Must fall back to visit info. If we have
53 # an actual VisitInfo, serialize it since we know that it will be
54 # complete.
55 if visit_info is not None:
56 from lsst.afw.image import setVisitInfoMetadata
57 from lsst.daf.base import PropertyList
59 pl = PropertyList()
60 setVisitInfoMetadata(pl, visit_info)
61 # Merge so that we still have access to butler provenance.
62 md.update(pl)
64 # Try the given header looking for VisitInfo hints.
65 # We get lots of warnings if nothing can be found. Currently
66 # no way to disable those without capturing them.
67 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)
68 return obs_info
71def _update_obs_info_from_legacy(
72 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None
73) -> ObservationInfo:
74 extra_md: dict[str, str | int] = {}
76 if filter_label is not None and filter_label.hasBandLabel():
77 extra_md["physical_filter"] = filter_label.physicalLabel
79 # Fill in detector metadata, check for consistency.
80 # ObsInfo detector name and group can not be derived from
81 # the getName() information without knowing how the components
82 # are separated.
83 if detector is not None:
84 detector_md = {
85 "detector_num": detector.getId(),
86 "detector_serial": detector.getSerial(),
87 "detector_unique_name": detector.getName(),
88 }
89 extra_md.update(detector_md)
91 obs_info_updates: dict[str, str | int] = {}
92 for k, v in extra_md.items():
93 current = getattr(obs_info, k)
94 if current is None:
95 obs_info_updates[k] = v
96 continue
97 if current != v:
98 raise RuntimeError(
99 f"ObservationInfo contains value for '{k}' that is inconsistent "
100 f"with given legacy object: {v} != {current}"
101 )
103 if obs_info_updates:
104 obs_info = obs_info.model_copy(update=obs_info_updates)
105 return obs_info
108class VisitImage(MaskedImage):
109 """A calibrated single-visit image.
111 Parameters
112 ----------
113 image
114 The main image plane. If this has a `Projection`, it will be used
115 for all planes unless a ``projection`` is passed separately.
116 mask
117 A bitmask image that annotates the main image plane. Must have the
118 same bounding box as ``image`` if provided. Any attached projection
119 is replaced (possibly by `None`).
120 variance
121 The per-pixel uncertainty of the main image as an image of variance
122 values. Must have the same bounding box as ``image`` if provided, and
123 its units must be the square of ``image.unit`` or `None`.
124 Values default to ``1.0``. Any attached projection is replaced
125 (possibly by `None`).
126 mask_schema
127 Schema for the mask plane. Must be provided if and only if ``mask`` is
128 not provided.
129 projection
130 Projection that maps the pixel grid to the sky. Can only be `None` if
131 a projection is already attached to ``image``.
132 obs_info
133 General information about this visit in standardized form.
134 summary_stats
135 Optional summary statistics associated with this visit.
136 psf
137 Point-spread function model for this image, or an exception explaining
138 why it could not be read (to be raised if the PSF is requested later).
139 metadata
140 Arbitrary flexible metadata to associate with the image.
141 """
143 def __init__(
144 self,
145 image: Image,
146 *,
147 mask: Mask | None = None,
148 variance: Image | None = None,
149 mask_schema: MaskSchema | None = None,
150 projection: Projection[DetectorFrame] | None = None,
151 obs_info: ObservationInfo | None = None,
152 summary_stats: ObservationSummaryStats | None = None,
153 psf: PointSpreadFunction | ArchiveReadError,
154 metadata: dict[str, MetadataValue] | None = None,
155 ):
156 super().__init__(
157 image,
158 mask=mask,
159 variance=variance,
160 mask_schema=mask_schema,
161 projection=projection,
162 obs_info=obs_info,
163 metadata=metadata,
164 )
165 if self.image.unit is None:
166 raise TypeError("The image component of a VisitImage must have units.")
167 if self.image.projection is None:
168 raise TypeError("The projection component of a VisitImage cannot be None.")
169 if self.image.obs_info is None:
170 raise TypeError("The observation info component of a VisitImage cannot be None.")
171 if not isinstance(self.image.projection.pixel_frame, DetectorFrame):
172 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.")
173 self._summary_stats = summary_stats
174 self._psf = psf
176 @property
177 def unit(self) -> astropy.units.UnitBase:
178 """The units of the image plane (`astropy.units.Unit`)."""
179 return cast(astropy.units.UnitBase, super().unit)
181 @property
182 def projection(self) -> Projection[DetectorFrame]:
183 """The projection that maps the pixel grid to the sky
184 (`Projection` [`DetectorFrame`]).
185 """
186 return cast(Projection[DetectorFrame], super().projection)
188 @property
189 def obs_info(self) -> ObservationInfo:
190 """General information about this observation in standard form.
191 (`~astro_metadata_translator.ObservationInfo`).
192 """
193 obs_info = self.image.obs_info
194 assert obs_info is not None
195 return obs_info
197 @property
198 def astropy_wcs(self) -> ProjectionAstropyView:
199 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`).
201 Notes
202 -----
203 As expected for Astropy WCS objects, this defines pixel coordinates
204 such that the first row and column in the arrays are ``(0, 0)``, not
205 ``bbox.start``, as is the case for `projection`.
207 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
208 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an
209 `astropy.wcs.WCS` (use `fits_wcs` for that).
210 """
211 return cast(ProjectionAstropyView, super().astropy_wcs)
213 @property
214 def summary_stats(self) -> ObservationSummaryStats | None:
215 """Optional summary statistics for this observation
216 (`ObservationSummaryStats`).
217 """
218 return self._summary_stats
220 @property
221 def psf(self) -> PointSpreadFunction:
222 """The point-spread function model for this image
223 (`.psfs.PointSpreadFunction`).
224 """
225 if isinstance(self._psf, ArchiveReadError):
226 raise self._psf
227 return self._psf
229 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage:
230 if bbox is ...:
231 return self
232 super().__getitem__(bbox)
233 return self._transfer_metadata(
234 VisitImage(
235 self.image[bbox],
236 mask=self.mask[bbox],
237 variance=self.variance[bbox],
238 projection=self.projection,
239 psf=self.psf,
240 obs_info=self.obs_info,
241 summary_stats=self.summary_stats,
242 ),
243 bbox=bbox,
244 )
246 def __str__(self) -> str:
247 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})"
249 def __repr__(self) -> str:
250 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})"
252 def copy(self) -> VisitImage:
253 """Deep-copy the visit image."""
254 return self._transfer_metadata(
255 VisitImage(
256 image=self._image.copy(),
257 mask=self._mask.copy(),
258 variance=self._variance.copy(),
259 psf=self._psf,
260 obs_info=self.obs_info,
261 summary_stats=self.summary_stats,
262 ),
263 copy=True,
264 )
266 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel:
267 masked_image_model = super().serialize(archive)
268 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel
269 match self._psf:
270 # MyPy is able to figure things out here with this match statement,
271 # but not a single isinstance check on both types.
272 case PiffWrapper():
273 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
274 case PSFExWrapper():
275 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
276 case GaussianPointSpreadFunction():
277 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
278 case _:
279 raise TypeError(
280 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}."
281 )
282 assert masked_image_model.projection is not None, "VisitImage always has a projection."
283 assert masked_image_model.obs_info is not None, "VisitImage always has observation info."
284 return VisitImageSerializationModel(
285 image=masked_image_model.image,
286 mask=masked_image_model.mask,
287 variance=masked_image_model.variance,
288 projection=masked_image_model.projection,
289 obs_info=masked_image_model.obs_info,
290 psf=serialized_psf,
291 summary_stats=self.summary_stats,
292 metadata=self.metadata,
293 )
295 # Type-checkers want the model argument to only require
296 # MaskedImageSerializationModel[Any], and they'd be absolutely right if
297 # this were a regular instance method. But whether Liskov substitution
298 # applies to classmethods and staticmethods is sort of context-dependent,
299 # and here we do not want it to.
300 @staticmethod
301 def deserialize(
302 model: VisitImageSerializationModel[Any], # type: ignore[override]
303 archive: InputArchive[Any],
304 *,
305 bbox: Box | None = None,
306 ) -> VisitImage:
307 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox)
308 psf = model.deserialize_psf(archive)
309 return VisitImage(
310 masked_image.image,
311 mask=masked_image.mask,
312 variance=masked_image.variance,
313 psf=psf,
314 projection=masked_image.projection,
315 obs_info=masked_image.obs_info,
316 summary_stats=model.summary_stats,
317 )._finish_deserialize(model)
319 @staticmethod
320 def _get_archive_tree_type[P: pydantic.BaseModel](
321 pointer_type: type[P],
322 ) -> type[VisitImageSerializationModel[P]]:
323 """Return the serialization model type for this object for an archive
324 type that uses the given pointer type.
325 """
326 return VisitImageSerializationModel[pointer_type] # type: ignore
328 # write_fits and read_fits inherited from MaskedImage.
330 @staticmethod
331 def from_legacy(
332 legacy: Any,
333 *,
334 unit: astropy.units.Unit | None = None,
335 plane_map: Mapping[str, MaskPlane] | None = None,
336 instrument: str | None = None,
337 visit: int | None = None,
338 ) -> VisitImage:
339 """Convert from an `lsst.afw.image.Exposure` instance.
341 Parameters
342 ----------
343 legacy
344 An `lsst.afw.image.Exposure` instance that will share image and
345 variance (but not mask) pixel data with the returned object.
346 unit
347 Units of the image. If not provided, the ``BUNIT`` metadata
348 key will be used, if available.
349 plane_map
350 A mapping from legacy mask plane name to the new plane name and
351 description. If `None` (default)
352 `get_legacy_visit_image_mask_planes` is used.
353 instrument
354 Name of the instrument. Extracted from the metadata if not
355 provided.
356 visit
357 ID of the visit. Extracted from the metadata if not provided.
358 """
359 if plane_map is None:
360 plane_map = get_legacy_visit_image_mask_planes()
361 md = legacy.getMetadata()
362 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo())
363 instrument = _extract_or_check_header(
364 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str
365 )
366 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int)
367 unit = _extract_or_check_header(
368 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits")
369 )
370 legacy_wcs = legacy.getWcs()
371 if legacy_wcs is None:
372 raise ValueError("Exposure does not have a SkyWcs.")
373 legacy_detector = legacy.getDetector()
374 if legacy_detector is None:
375 raise ValueError("Exposure does not have a Detector.")
376 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
378 # Update the ObservationInfo from other components.
379 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter())
381 opaque_fits_metadata = FitsOpaqueMetadata()
382 primary_header = astropy.io.fits.Header()
383 with warnings.catch_warnings():
384 # Silence warnings about long keys becoming HIERARCH.
385 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
386 primary_header.update(md.toOrderedDict())
387 opaque_fits_metadata.extract_legacy_primary_header(primary_header)
388 projection = Projection.from_legacy(
389 legacy_wcs,
390 DetectorFrame(
391 instrument=instrument,
392 visit=visit,
393 detector=legacy_detector.getId(),
394 bbox=detector_bbox,
395 ),
396 )
397 legacy_psf = legacy.getPsf()
398 if legacy_psf is None:
399 raise ValueError("Exposure file does not have a Psf.")
400 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
401 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map)
402 legacy_summary_stats = legacy.info.getSummaryStats()
403 result = VisitImage(
404 image=masked_image.image.view(unit=unit),
405 mask=masked_image.mask,
406 variance=masked_image.variance,
407 projection=projection,
408 psf=psf,
409 obs_info=obs_info,
410 summary_stats=(
411 ObservationSummaryStats.from_legacy(legacy_summary_stats)
412 if legacy_summary_stats is not None
413 else None
414 ),
415 )
417 result._opaque_metadata = opaque_fits_metadata
418 return result
420 @overload # type: ignore[override]
421 @staticmethod
422 def read_legacy( 422 ↛ exitline 422 didn't return from function 'read_legacy' because
423 filename: str,
424 *,
425 component: Literal["bbox"],
426 ) -> Box: ...
428 @overload
429 @staticmethod
430 def read_legacy( 430 ↛ exitline 430 didn't return from function 'read_legacy' because
431 filename: str,
432 *,
433 preserve_quantization: bool = False,
434 instrument: str | None = None,
435 visit: int | None = None,
436 component: Literal["image"],
437 ) -> Image: ...
439 @overload
440 @staticmethod
441 def read_legacy( 441 ↛ exitline 441 didn't return from function 'read_legacy' because
442 filename: str,
443 *,
444 plane_map: Mapping[str, MaskPlane] | None = None,
445 instrument: str | None = None,
446 visit: int | None = None,
447 component: Literal["mask"],
448 ) -> Mask: ...
450 @overload
451 @staticmethod
452 def read_legacy( 452 ↛ exitline 452 didn't return from function 'read_legacy' because
453 filename: str,
454 *,
455 preserve_quantization: bool = False,
456 instrument: str | None = None,
457 visit: int | None = None,
458 component: Literal["variance"],
459 ) -> Image: ...
461 @overload
462 @staticmethod
463 def read_legacy( 463 ↛ exitline 463 didn't return from function 'read_legacy' because
464 filename: str,
465 *,
466 instrument: str | None = None,
467 visit: int | None = None,
468 component: Literal["projection"],
469 ) -> Projection[DetectorFrame]: ...
471 @overload
472 @staticmethod
473 def read_legacy( 473 ↛ exitline 473 didn't return from function 'read_legacy' because
474 filename: str,
475 *,
476 component: Literal["psf"],
477 ) -> PointSpreadFunction: ...
479 @overload
480 @staticmethod
481 def read_legacy( 481 ↛ exitline 481 didn't return from function 'read_legacy' because
482 filename: str,
483 *,
484 component: Literal["obs_info"],
485 ) -> ObservationInfo: ...
487 @overload
488 @staticmethod
489 def read_legacy( 489 ↛ exitline 489 didn't return from function 'read_legacy' because
490 filename: str,
491 *,
492 component: Literal["summary_stats"],
493 ) -> ObservationSummaryStats: ...
495 @overload
496 @staticmethod
497 def read_legacy( 497 ↛ exitline 497 didn't return from function 'read_legacy' because
498 filename: str,
499 *,
500 preserve_quantization: bool = False,
501 plane_map: Mapping[str, MaskPlane] | None = None,
502 instrument: str | None = None,
503 visit: int | None = None,
504 component: None = None,
505 ) -> VisitImage: ...
507 @staticmethod
508 def read_legacy( # type: ignore[override]
509 filename: str,
510 *,
511 preserve_quantization: bool = False,
512 plane_map: Mapping[str, MaskPlane] | None = None,
513 instrument: str | None = None,
514 visit: int | None = None,
515 component: Literal[
516 "bbox", "image", "mask", "variance", "projection", "psf", "obs_info", "summary_stats"
517 ]
518 | None = None,
519 ) -> Any:
520 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`.
522 Parameters
523 ----------
524 filename
525 Full name of the file.
526 preserve_quantization
527 If `True`, ensure that writing the masked image back out again will
528 exactly preserve quantization-compressed pixel values. This causes
529 the image and variance plane arrays to be marked as read-only and
530 stores the original binary table data for those planes in memory.
531 If the `MaskedImage` is copied, the precompressed pixel values are
532 not transferred to the copy.
533 plane_map
534 A mapping from legacy mask plane name to the new plane name and
535 description. If `None` (default)
536 `get_legacy_visit_image_mask_planes` is used.
537 instrument
538 Name of the instrument. Read from the primary header if not
539 provided.
540 visit
541 ID of the visit. Read from the primary header if not
542 provided.
543 component
544 A component to read instead of the full image.
545 """
546 from lsst.afw.image import ExposureFitsReader
548 reader = ExposureFitsReader(filename)
549 if component == "bbox":
550 return Box.from_legacy(reader.readBBox())
551 legacy_detector = reader.readDetector()
552 if legacy_detector is None:
553 raise ValueError(f"Exposure file {filename!r} does not have a Detector.")
554 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
555 legacy_wcs = None
556 if component in (None, "image", "mask", "variance", "projection"):
557 legacy_wcs = reader.readWcs()
558 if legacy_wcs is None:
559 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.")
560 legacy_exposure_info = reader.readExposureInfo()
561 summary_stats = None
562 if component in (None, "summary_stats"):
563 legacy_stats = legacy_exposure_info.getSummaryStats()
564 if legacy_stats is not None:
565 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats)
566 if component == "summary_stats":
567 return summary_stats
568 if component in (None, "psf"):
569 legacy_psf = reader.readPsf()
570 if legacy_psf is None:
571 raise ValueError(f"Exposure file {filename!r} does not have a Psf.")
572 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
573 if component == "psf":
574 return psf
575 assert component in (None, "image", "mask", "variance", "projection", "obs_info"), (
576 component
577 ) # for MyPy
578 with astropy.io.fits.open(filename) as hdu_list:
579 primary_header = hdu_list[0].header
580 obs_info = _obs_info_from_md(primary_header)
581 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter())
582 if component == "obs_info":
583 return obs_info
584 instrument = _extract_or_check_header(
585 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str
586 )
587 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int)
588 projection = Projection.from_legacy(
589 legacy_wcs,
590 DetectorFrame(
591 instrument=instrument,
592 visit=visit,
593 detector=legacy_detector.getId(),
594 bbox=detector_bbox,
595 ),
596 )
597 if component == "projection":
598 return projection
599 if plane_map is None:
600 plane_map = get_legacy_visit_image_mask_planes()
601 assert component != "psf", component # for MyPy
602 from_masked_image = MaskedImage._read_legacy_hdus(
603 hdu_list,
604 filename,
605 preserve_quantization=preserve_quantization,
606 plane_map=plane_map,
607 component=component,
608 )
609 if component is not None:
610 # This is the image, mask, or variance; attach the projection
611 # and return
612 return from_masked_image.view(projection=projection)
613 result = VisitImage(
614 from_masked_image.image,
615 mask=from_masked_image.mask,
616 variance=from_masked_image.variance,
617 projection=projection,
618 psf=psf,
619 obs_info=obs_info,
620 summary_stats=summary_stats,
621 )
622 result._opaque_metadata = from_masked_image._opaque_metadata
623 return result
626class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]):
627 """A Pydantic model used to represent a serialized `VisitImage`."""
629 # Inherited attributes are duplicated because that improves the docs
630 # (some limitation in the sphinx/pydantic integration), and these are
631 # important docs.
633 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
634 mask: MaskSerializationModel[P] = pydantic.Field(
635 description="Bitmask that annotates the main image's pixels."
636 )
637 variance: ImageSerializationModel[P] = pydantic.Field(
638 description="Per-pixel variance estimates for the main image."
639 )
640 projection: ProjectionSerializationModel[P] = pydantic.Field(
641 description="Projection that maps the pixel grid to the sky.",
642 )
643 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = (
644 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.")
645 )
646 obs_info: ObservationInfo = pydantic.Field(
647 description="Standardized description of visit metadata",
648 )
649 summary_stats: ObservationSummaryStats | None = pydantic.Field(
650 default=None, description="Optional summary statistics for the observation."
651 )
653 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError:
654 """Finish deserializing the PSF model, or *return* any exception
655 raised in the attempt.
656 """
657 try:
658 match self.psf:
659 case PiffSerializationModel():
660 return PiffWrapper.deserialize(self.psf, archive)
661 case PSFExSerializationModel():
662 return PSFExWrapper.deserialize(self.psf, archive)
663 case GaussianPSFSerializationModel():
664 return GaussianPointSpreadFunction.deserialize(self.psf, archive)
665 case _:
666 raise ArchiveReadError("PSF model type not recognized.")
667 except ArchiveReadError as err:
668 return err
671def _extract_or_check_value[T](
672 key: str,
673 given_value: T | None,
674 *sources: tuple[str, T | None],
675) -> T:
676 # Compare given value against multiple sources. If given value is not
677 # supplied return the first non-None value in the reference sources.
678 if given_value is not None:
679 for source_name, source_value in sources:
680 if source_value is not None and source_value != given_value:
681 raise ValueError(
682 f"Given value {given_value!r} does not match {source_value!r} from {source_name}."
683 )
684 if source_value is not None:
685 # Only check the first non-None source rather than checking
686 # all supplied values.
687 break
688 return given_value
690 for _, source_value in sources:
691 if source_value is not None:
692 return source_value
694 raise ValueError(f"No value found for {key}.")
697def _extract_or_check_header[T](
698 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T]
699) -> T:
700 hdr_value: T | None = None
701 if (hdr_raw_value := header.get(key)) is not None:
702 hdr_value = coerce(hdr_raw_value)
703 return _extract_or_check_value(
704 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value)
705 )