Coverage for python / lsst / images / _visit_image.py: 26%
288 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:34 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("VisitImage", "VisitImageSerializationModel")
16import warnings
17from collections.abc import Callable, Mapping, MutableMapping
18from types import EllipsisType
19from typing import Any, Literal, cast, overload
21import astropy.io.fits
22import astropy.units
23import astropy.wcs
24import pydantic
25from astro_metadata_translator import ObservationInfo, VisitInfoTranslator
27from ._geom import Box
28from ._image import Image, ImageSerializationModel
29from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes
30from ._masked_image import MaskedImage, MaskedImageSerializationModel
31from ._observation_summary_stats import ObservationSummaryStats
32from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel
33from .aperture_corrections import (
34 ApertureCorrectionMap,
35 ApertureCorrectionMapSerializationModel,
36 aperture_corrections_from_legacy,
37)
38from .fits import FitsOpaqueMetadata
39from .psfs import (
40 GaussianPointSpreadFunction,
41 GaussianPSFSerializationModel,
42 PiffSerializationModel,
43 PiffWrapper,
44 PointSpreadFunction,
45 PSFExSerializationModel,
46 PSFExWrapper,
47)
48from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive
def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo:
    """Build an `ObservationInfo` from a header-like mapping.

    Parameters
    ----------
    md
        Header-like mapping to translate. It is updated in place with the
        serialized ``visit_info`` keys when the fallback path is taken and
        ``visit_info`` is provided.
    visit_info
        Optional legacy visit-info object, used only when no translator
        recognizes ``md`` as-is.

    Returns
    -------
    obs_info
        The translated observation information.
    """
    try:
        # First attempt: treat the mapping as if it were a raw header.
        return ObservationInfo.from_header(md, quiet=True)
    except ValueError:
        # No known translator. Fall back to visit-info hints; when an actual
        # VisitInfo is available, serialize it since it is known to be
        # complete.
        if visit_info is not None:
            from lsst.afw.image import setVisitInfoMetadata
            from lsst.daf.base import PropertyList

            visit_info_header = PropertyList()
            setVisitInfoMetadata(visit_info_header, visit_info)
            # Merge (rather than replace) so that butler provenance stays
            # accessible.
            md.update(visit_info_header)

        # Translate using VisitInfo hints in the header. This emits lots of
        # warnings if nothing can be found, and there is currently no way to
        # disable them short of capturing them.
        return ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)
def _update_obs_info_from_legacy(
    obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None
) -> ObservationInfo:
    """Fill gaps in an `ObservationInfo` from legacy detector/filter objects.

    Parameters
    ----------
    obs_info
        Observation information to complete.
    detector
        Optional legacy detector; supplies ``detector_num`` (``getId()``),
        ``detector_serial`` (``getSerial()``) and ``detector_unique_name``
        (``getName()``).
    filter_label
        Optional legacy filter label; supplies ``physical_filter`` when it
        carries a physical label.

    Returns
    -------
    obs_info
        The input object when nothing needed filling in, otherwise a copy
        with the missing fields set.

    Raises
    ------
    RuntimeError
        Raised if ``obs_info`` already holds a value that differs from the
        one derived from the legacy objects.
    """
    legacy_values: dict[str, str | int] = {}

    # Guard on the physical label itself (not the band label), since that is
    # the attribute being read: a band-only label has no physical label to
    # copy, and a physical-only label must not be skipped.
    if filter_label is not None and filter_label.hasPhysicalLabel():
        legacy_values["physical_filter"] = filter_label.physicalLabel

    # ObsInfo detector name and group can not be derived from the getName()
    # information without knowing how the components are separated, so only
    # the unambiguous detector fields are filled in.
    if detector is not None:
        legacy_values.update(
            detector_num=detector.getId(),
            detector_serial=detector.getSerial(),
            detector_unique_name=detector.getName(),
        )

    # Fill in fields that are unset; anything already set must agree.
    obs_info_updates: dict[str, str | int] = {}
    for k, v in legacy_values.items():
        current = getattr(obs_info, k)
        if current is None:
            obs_info_updates[k] = v
        elif current != v:
            raise RuntimeError(
                f"ObservationInfo contains value for '{k}' that is inconsistent "
                f"with given legacy object: {v} != {current}"
            )

    if not obs_info_updates:
        return obs_info
    return obs_info.model_copy(update=obs_info_updates)
113class VisitImage(MaskedImage):
114 """A calibrated single-visit image.
116 Parameters
117 ----------
118 image
119 The main image plane. If this has a `Projection`, it will be used
120 for all planes unless a ``projection`` is passed separately.
121 mask
122 A bitmask image that annotates the main image plane. Must have the
123 same bounding box as ``image`` if provided. Any attached projection
124 is replaced (possibly by `None`).
125 variance
126 The per-pixel uncertainty of the main image as an image of variance
127 values. Must have the same bounding box as ``image`` if provided, and
128 its units must be the square of ``image.unit`` or `None`.
129 Values default to ``1.0``. Any attached projection is replaced
130 (possibly by `None`).
131 mask_schema
132 Schema for the mask plane. Must be provided if and only if ``mask`` is
133 not provided.
134 projection
135 Projection that maps the pixel grid to the sky. Can only be `None` if
136 a projection is already attached to ``image``.
137 obs_info
138 General information about this visit in standardized form.
139 summary_stats
140 Summary statistics associated with this visit. Initialized to default
141 values if not provided.
142 psf
143 Point-spread function model for this image, or an exception explaining
144 why it could not be read (to be raised if the PSF is requested later).
145 aperture_corrections : `dict` [`str`, `~fields.BaseField`]
146 Mapping from photometry algorithm name to the aperture correction for
147 that algorithm.
148 metadata
149 Arbitrary flexible metadata to associate with the image.
150 """
152 def __init__(
153 self,
154 image: Image,
155 *,
156 mask: Mask | None = None,
157 variance: Image | None = None,
158 mask_schema: MaskSchema | None = None,
159 projection: Projection[DetectorFrame] | None = None,
160 obs_info: ObservationInfo | None = None,
161 summary_stats: ObservationSummaryStats | None = None,
162 psf: PointSpreadFunction | ArchiveReadError,
163 aperture_corrections: ApertureCorrectionMap | None = None,
164 metadata: dict[str, MetadataValue] | None = None,
165 ):
166 super().__init__(
167 image,
168 mask=mask,
169 variance=variance,
170 mask_schema=mask_schema,
171 projection=projection,
172 obs_info=obs_info,
173 metadata=metadata,
174 )
175 if self.image.unit is None:
176 raise TypeError("The image component of a VisitImage must have units.")
177 if self.image.projection is None:
178 raise TypeError("The projection component of a VisitImage cannot be None.")
179 if self.image.obs_info is None:
180 raise TypeError("The observation info component of a VisitImage cannot be None.")
181 if not isinstance(self.image.projection.pixel_frame, DetectorFrame):
182 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.")
183 if summary_stats is None:
184 summary_stats = ObservationSummaryStats()
185 self._summary_stats = summary_stats
186 self._psf = psf
187 self._aperture_corrections = aperture_corrections if aperture_corrections is not None else {}
189 @property
190 def unit(self) -> astropy.units.UnitBase:
191 """The units of the image plane (`astropy.units.Unit`)."""
192 return cast(astropy.units.UnitBase, super().unit)
194 @property
195 def projection(self) -> Projection[DetectorFrame]:
196 """The projection that maps the pixel grid to the sky
197 (`Projection` [`DetectorFrame`]).
198 """
199 return cast(Projection[DetectorFrame], super().projection)
201 @property
202 def obs_info(self) -> ObservationInfo:
203 """General information about this observation in standard form.
204 (`~astro_metadata_translator.ObservationInfo`).
205 """
206 obs_info = self.image.obs_info
207 assert obs_info is not None
208 return obs_info
210 @property
211 def astropy_wcs(self) -> ProjectionAstropyView:
212 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`).
214 Notes
215 -----
216 As expected for Astropy WCS objects, this defines pixel coordinates
217 such that the first row and column in the arrays are ``(0, 0)``, not
218 ``bbox.start``, as is the case for `projection`.
220 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
221 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an
222 `astropy.wcs.WCS` (use `fits_wcs` for that).
223 """
224 return cast(ProjectionAstropyView, super().astropy_wcs)
226 @property
227 def summary_stats(self) -> ObservationSummaryStats:
228 """Optional summary statistics for this observation
229 (`ObservationSummaryStats`).
230 """
231 return self._summary_stats
233 @property
234 def psf(self) -> PointSpreadFunction:
235 """The point-spread function model for this image
236 (`.psfs.PointSpreadFunction`).
237 """
238 if isinstance(self._psf, ArchiveReadError):
239 raise self._psf
240 return self._psf
242 @property
243 def aperture_corrections(self) -> ApertureCorrectionMap:
244 """A mapping from photometry algorithm name to the aperture correction
245 field for that algorithm (`dict` [`str`, `~.fields.BaseField`]).
246 """
247 return self._aperture_corrections
249 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage:
250 if bbox is ...:
251 return self
252 super().__getitem__(bbox)
253 return self._transfer_metadata(
254 VisitImage(
255 self.image[bbox],
256 mask=self.mask[bbox],
257 variance=self.variance[bbox],
258 projection=self.projection,
259 psf=self.psf,
260 obs_info=self.obs_info,
261 summary_stats=self.summary_stats,
262 aperture_corrections=self.aperture_corrections,
263 ),
264 bbox=bbox,
265 )
267 def __str__(self) -> str:
268 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})"
270 def __repr__(self) -> str:
271 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})"
273 def copy(self) -> VisitImage:
274 """Deep-copy the visit image."""
275 return self._transfer_metadata(
276 VisitImage(
277 image=self._image.copy(),
278 mask=self._mask.copy(),
279 variance=self._variance.copy(),
280 psf=self._psf,
281 obs_info=self.obs_info,
282 summary_stats=self.summary_stats.model_copy(),
283 aperture_corrections=self.aperture_corrections.copy(),
284 ),
285 copy=True,
286 )
288 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel:
289 masked_image_model = super().serialize(archive)
290 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel
291 match self._psf:
292 # MyPy is able to figure things out here with this match statement,
293 # but not a single isinstance check on both types.
294 case PiffWrapper():
295 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
296 case PSFExWrapper():
297 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
298 case GaussianPointSpreadFunction():
299 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
300 case _:
301 raise TypeError(
302 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}."
303 )
304 assert masked_image_model.projection is not None, "VisitImage always has a projection."
305 assert masked_image_model.obs_info is not None, "VisitImage always has observation info."
306 serialized_aperture_corrections = ApertureCorrectionMapSerializationModel.serialize(
307 self.aperture_corrections, archive
308 )
309 return VisitImageSerializationModel(
310 image=masked_image_model.image,
311 mask=masked_image_model.mask,
312 variance=masked_image_model.variance,
313 projection=masked_image_model.projection,
314 obs_info=masked_image_model.obs_info,
315 psf=serialized_psf,
316 summary_stats=self.summary_stats,
317 aperture_corrections=serialized_aperture_corrections,
318 metadata=self.metadata,
319 )
321 # Type-checkers want the model argument to only require
322 # MaskedImageSerializationModel[Any], and they'd be absolutely right if
323 # this were a regular instance method. But whether Liskov substitution
324 # applies to classmethods and staticmethods is sort of context-dependent,
325 # and here we do not want it to.
326 @staticmethod
327 def deserialize(
328 model: VisitImageSerializationModel[Any], # type: ignore[override]
329 archive: InputArchive[Any],
330 *,
331 bbox: Box | None = None,
332 ) -> VisitImage:
333 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox)
334 psf = model.deserialize_psf(archive)
335 aperture_corrections = model.aperture_corrections.deserialize(archive)
336 return VisitImage(
337 masked_image.image,
338 mask=masked_image.mask,
339 variance=masked_image.variance,
340 psf=psf,
341 projection=masked_image.projection,
342 obs_info=masked_image.obs_info,
343 summary_stats=model.summary_stats,
344 aperture_corrections=aperture_corrections,
345 )._finish_deserialize(model)
347 @staticmethod
348 def _get_archive_tree_type[P: pydantic.BaseModel](
349 pointer_type: type[P],
350 ) -> type[VisitImageSerializationModel[P]]:
351 """Return the serialization model type for this object for an archive
352 type that uses the given pointer type.
353 """
354 return VisitImageSerializationModel[pointer_type] # type: ignore
356 # write_fits and read_fits inherited from MaskedImage.
358 @staticmethod
359 def from_legacy(
360 legacy: Any,
361 *,
362 unit: astropy.units.Unit | None = None,
363 plane_map: Mapping[str, MaskPlane] | None = None,
364 instrument: str | None = None,
365 visit: int | None = None,
366 ) -> VisitImage:
367 """Convert from an `lsst.afw.image.Exposure` instance.
369 Parameters
370 ----------
371 legacy
372 An `lsst.afw.image.Exposure` instance that will share image and
373 variance (but not mask) pixel data with the returned object.
374 unit
375 Units of the image. If not provided, the ``BUNIT`` metadata
376 key will be used, if available.
377 plane_map
378 A mapping from legacy mask plane name to the new plane name and
379 description. If `None` (default)
380 `get_legacy_visit_image_mask_planes` is used.
381 instrument
382 Name of the instrument. Extracted from the metadata if not
383 provided.
384 visit
385 ID of the visit. Extracted from the metadata if not provided.
386 """
387 if plane_map is None:
388 plane_map = get_legacy_visit_image_mask_planes()
389 md = legacy.getMetadata()
390 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo())
391 instrument = _extract_or_check_header(
392 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str
393 )
394 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int)
395 unit = _extract_or_check_header(
396 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits")
397 )
398 legacy_wcs = legacy.getWcs()
399 if legacy_wcs is None:
400 raise ValueError("Exposure does not have a SkyWcs.")
401 legacy_detector = legacy.getDetector()
402 if legacy_detector is None:
403 raise ValueError("Exposure does not have a Detector.")
404 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
406 # Update the ObservationInfo from other components.
407 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter())
409 opaque_fits_metadata = FitsOpaqueMetadata()
410 primary_header = astropy.io.fits.Header()
411 with warnings.catch_warnings():
412 # Silence warnings about long keys becoming HIERARCH.
413 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
414 primary_header.update(md.toOrderedDict())
415 opaque_fits_metadata.extract_legacy_primary_header(primary_header)
416 projection = Projection.from_legacy(
417 legacy_wcs,
418 DetectorFrame(
419 instrument=instrument,
420 visit=visit,
421 detector=legacy_detector.getId(),
422 bbox=detector_bbox,
423 ),
424 )
425 legacy_psf = legacy.getPsf()
426 if legacy_psf is None:
427 raise ValueError("Exposure file does not have a Psf.")
428 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
429 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map)
430 legacy_summary_stats = legacy.info.getSummaryStats()
431 legacy_ap_corr_map = legacy.info.getApCorrMap()
432 result = VisitImage(
433 image=masked_image.image.view(unit=unit),
434 mask=masked_image.mask,
435 variance=masked_image.variance,
436 projection=projection,
437 psf=psf,
438 obs_info=obs_info,
439 summary_stats=(
440 ObservationSummaryStats.from_legacy(legacy_summary_stats)
441 if legacy_summary_stats is not None
442 else None
443 ),
444 aperture_corrections=(
445 aperture_corrections_from_legacy(legacy_ap_corr_map)
446 if legacy_ap_corr_map is not None
447 else None
448 ),
449 )
451 result._opaque_metadata = opaque_fits_metadata
452 return result
454 @overload # type: ignore[override]
455 @staticmethod
456 def read_legacy( 456 ↛ exitline 456 didn't return from function 'read_legacy' because
457 filename: str,
458 *,
459 component: Literal["bbox"],
460 ) -> Box: ...
462 @overload
463 @staticmethod
464 def read_legacy( 464 ↛ exitline 464 didn't return from function 'read_legacy' because
465 filename: str,
466 *,
467 preserve_quantization: bool = False,
468 instrument: str | None = None,
469 visit: int | None = None,
470 component: Literal["image"],
471 ) -> Image: ...
473 @overload
474 @staticmethod
475 def read_legacy( 475 ↛ exitline 475 didn't return from function 'read_legacy' because
476 filename: str,
477 *,
478 plane_map: Mapping[str, MaskPlane] | None = None,
479 instrument: str | None = None,
480 visit: int | None = None,
481 component: Literal["mask"],
482 ) -> Mask: ...
484 @overload
485 @staticmethod
486 def read_legacy( 486 ↛ exitline 486 didn't return from function 'read_legacy' because
487 filename: str,
488 *,
489 preserve_quantization: bool = False,
490 instrument: str | None = None,
491 visit: int | None = None,
492 component: Literal["variance"],
493 ) -> Image: ...
495 @overload
496 @staticmethod
497 def read_legacy( 497 ↛ exitline 497 didn't return from function 'read_legacy' because
498 filename: str,
499 *,
500 instrument: str | None = None,
501 visit: int | None = None,
502 component: Literal["projection"],
503 ) -> Projection[DetectorFrame]: ...
505 @overload
506 @staticmethod
507 def read_legacy( 507 ↛ exitline 507 didn't return from function 'read_legacy' because
508 filename: str,
509 *,
510 component: Literal["psf"],
511 ) -> PointSpreadFunction: ...
513 @overload
514 @staticmethod
515 def read_legacy( 515 ↛ exitline 515 didn't return from function 'read_legacy' because
516 filename: str,
517 *,
518 component: Literal["obs_info"],
519 ) -> ObservationInfo: ...
521 @overload
522 @staticmethod
523 def read_legacy( 523 ↛ exitline 523 didn't return from function 'read_legacy' because
524 filename: str,
525 *,
526 component: Literal["summary_stats"],
527 ) -> ObservationSummaryStats: ...
529 @overload
530 @staticmethod
531 def read_legacy( 531 ↛ exitline 531 didn't return from function 'read_legacy' because
532 filename: str,
533 *,
534 component: Literal["aperture_corrections"],
535 ) -> ApertureCorrectionMap: ...
537 @overload
538 @staticmethod
539 def read_legacy( 539 ↛ exitline 539 didn't return from function 'read_legacy' because
540 filename: str,
541 *,
542 preserve_quantization: bool = False,
543 plane_map: Mapping[str, MaskPlane] | None = None,
544 instrument: str | None = None,
545 visit: int | None = None,
546 component: None = None,
547 ) -> VisitImage: ...
549 @staticmethod
550 def read_legacy( # type: ignore[override]
551 filename: str,
552 *,
553 preserve_quantization: bool = False,
554 plane_map: Mapping[str, MaskPlane] | None = None,
555 instrument: str | None = None,
556 visit: int | None = None,
557 component: Literal[
558 "bbox",
559 "image",
560 "mask",
561 "variance",
562 "projection",
563 "psf",
564 "obs_info",
565 "summary_stats",
566 "aperture_corrections",
567 ]
568 | None = None,
569 ) -> Any:
570 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`.
572 Parameters
573 ----------
574 filename
575 Full name of the file.
576 preserve_quantization
577 If `True`, ensure that writing the masked image back out again will
578 exactly preserve quantization-compressed pixel values. This causes
579 the image and variance plane arrays to be marked as read-only and
580 stores the original binary table data for those planes in memory.
581 If the `MaskedImage` is copied, the precompressed pixel values are
582 not transferred to the copy.
583 plane_map
584 A mapping from legacy mask plane name to the new plane name and
585 description. If `None` (default)
586 `get_legacy_visit_image_mask_planes` is used.
587 instrument
588 Name of the instrument. Read from the primary header if not
589 provided.
590 visit
591 ID of the visit. Read from the primary header if not
592 provided.
593 component
594 A component to read instead of the full image.
595 """
596 from lsst.afw.image import ExposureFitsReader
598 reader = ExposureFitsReader(filename)
599 if component == "bbox":
600 return Box.from_legacy(reader.readBBox())
601 legacy_detector = reader.readDetector()
602 if legacy_detector is None:
603 raise ValueError(f"Exposure file {filename!r} does not have a Detector.")
604 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
605 legacy_wcs = None
606 if component in (None, "image", "mask", "variance", "projection"):
607 legacy_wcs = reader.readWcs()
608 if legacy_wcs is None:
609 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.")
610 legacy_exposure_info = reader.readExposureInfo()
611 summary_stats = None
612 if component in (None, "summary_stats"):
613 legacy_stats = legacy_exposure_info.getSummaryStats()
614 if legacy_stats is not None:
615 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats)
616 if component == "summary_stats":
617 return summary_stats
618 if component in (None, "psf"):
619 legacy_psf = reader.readPsf()
620 if legacy_psf is None:
621 raise ValueError(f"Exposure file {filename!r} does not have a Psf.")
622 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
623 if component == "psf":
624 return psf
625 aperture_corrections: ApertureCorrectionMap = {}
626 if component in (None, "aperture_corrections"):
627 legacy_ap_corr_map = reader.readApCorrMap()
628 if legacy_ap_corr_map is not None:
629 aperture_corrections = aperture_corrections_from_legacy(legacy_ap_corr_map)
630 if component == "aperture_corrections":
631 return aperture_corrections
632 assert component in (None, "image", "mask", "variance", "projection", "obs_info"), (
633 component
634 ) # for MyPy
635 with astropy.io.fits.open(filename) as hdu_list:
636 primary_header = hdu_list[0].header
637 obs_info = _obs_info_from_md(primary_header)
638 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter())
639 if component == "obs_info":
640 return obs_info
641 instrument = _extract_or_check_header(
642 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str
643 )
644 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int)
645 projection = Projection.from_legacy(
646 legacy_wcs,
647 DetectorFrame(
648 instrument=instrument,
649 visit=visit,
650 detector=legacy_detector.getId(),
651 bbox=detector_bbox,
652 ),
653 )
654 if component == "projection":
655 return projection
656 if plane_map is None:
657 plane_map = get_legacy_visit_image_mask_planes()
658 assert component != "psf", component # for MyPy
659 from_masked_image = MaskedImage._read_legacy_hdus(
660 hdu_list,
661 filename,
662 preserve_quantization=preserve_quantization,
663 plane_map=plane_map,
664 component=component,
665 )
666 if component is not None:
667 # This is the image, mask, or variance; attach the projection and
668 # obs_info and return
669 return from_masked_image.view(projection=projection, obs_info=obs_info)
670 result = VisitImage(
671 from_masked_image.image,
672 mask=from_masked_image.mask,
673 variance=from_masked_image.variance,
674 projection=projection,
675 psf=psf,
676 obs_info=obs_info,
677 summary_stats=summary_stats,
678 aperture_corrections=aperture_corrections,
679 )
680 result._opaque_metadata = from_masked_image._opaque_metadata
681 return result
684class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]):
685 """A Pydantic model used to represent a serialized `VisitImage`."""
687 # Inherited attributes are duplicated because that improves the docs
688 # (some limitation in the sphinx/pydantic integration), and these are
689 # important docs.
691 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
692 mask: MaskSerializationModel[P] = pydantic.Field(
693 description="Bitmask that annotates the main image's pixels."
694 )
695 variance: ImageSerializationModel[P] = pydantic.Field(
696 description="Per-pixel variance estimates for the main image."
697 )
698 projection: ProjectionSerializationModel[P] = pydantic.Field(
699 description="Projection that maps the pixel grid to the sky.",
700 )
701 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = (
702 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.")
703 )
704 obs_info: ObservationInfo = pydantic.Field(
705 description="Standardized description of visit metadata",
706 )
707 summary_stats: ObservationSummaryStats = pydantic.Field(
708 description="Summary statistics for the observation."
709 )
710 aperture_corrections: ApertureCorrectionMapSerializationModel = pydantic.Field(
711 default_factory=ApertureCorrectionMapSerializationModel,
712 description="Aperture corrections, keyed by flux algorithm.",
713 )
715 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError:
716 """Finish deserializing the PSF model, or *return* any exception
717 raised in the attempt.
718 """
719 try:
720 match self.psf:
721 case PiffSerializationModel():
722 return PiffWrapper.deserialize(self.psf, archive)
723 case PSFExSerializationModel():
724 return PSFExWrapper.deserialize(self.psf, archive)
725 case GaussianPSFSerializationModel():
726 return GaussianPointSpreadFunction.deserialize(self.psf, archive)
727 case _:
728 raise ArchiveReadError("PSF model type not recognized.")
729 except ArchiveReadError as err:
730 return err
733def _extract_or_check_value[T](
734 key: str,
735 given_value: T | None,
736 *sources: tuple[str, T | None],
737) -> T:
738 # Compare given value against multiple sources. If given value is not
739 # supplied return the first non-None value in the reference sources.
740 if given_value is not None:
741 for source_name, source_value in sources:
742 if source_value is not None and source_value != given_value:
743 raise ValueError(
744 f"Given value {given_value!r} does not match {source_value!r} from {source_name}."
745 )
746 if source_value is not None:
747 # Only check the first non-None source rather than checking
748 # all supplied values.
749 break
750 return given_value
752 for _, source_value in sources:
753 if source_value is not None:
754 return source_value
756 raise ValueError(f"No value found for {key}.")
759def _extract_or_check_header[T](
760 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T]
761) -> T:
762 hdr_value: T | None = None
763 if (hdr_raw_value := header.get(key)) is not None:
764 hdr_value = coerce(hdr_raw_value)
765 return _extract_or_check_value(
766 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value)
767 )