Coverage for python / lsst / images / _visit_image.py: 25%
252 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:16 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:16 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("VisitImage", "VisitImageSerializationModel")
16import functools
17import warnings
18from collections.abc import Callable, Mapping, MutableMapping
19from types import EllipsisType
20from typing import Any, Literal, cast, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import pydantic
26from astro_metadata_translator import ObservationInfo, VisitInfoTranslator
28from ._geom import Box
29from ._image import Image, ImageSerializationModel
30from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes
31from ._masked_image import MaskedImage, MaskedImageSerializationModel
32from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel
33from .fits import FitsOpaqueMetadata
34from .psfs import (
35 GaussianPointSpreadFunction,
36 GaussianPSFSerializationModel,
37 PiffSerializationModel,
38 PiffWrapper,
39 PointSpreadFunction,
40 PSFExSerializationModel,
41 PSFExWrapper,
42)
43from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive
46def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo:
47 # Try to get an ObservationInfo from the primary header as if
48 # it's a raw header. Else fallback.
49 try:
50 obs_info = ObservationInfo.from_header(md, quiet=True)
51 except ValueError:
52 # Not known translator. Must fall back to visit info. If we have
53 # an actual VisitInfo, serialize it since we know that it will be
54 # complete.
55 if visit_info is not None:
56 from lsst.afw.image import setVisitInfoMetadata
57 from lsst.daf.base import PropertyList
59 pl = PropertyList()
60 setVisitInfoMetadata(pl, visit_info)
61 # Merge so that we still have access to butler provenance.
62 md.update(pl)
64 # Try the given header looking for VisitInfo hints.
65 # We get lots of warnings if nothing can be found. Currently
66 # no way to disable those without capturing them.
67 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)
68 return obs_info
71def _update_obs_info_from_legacy(
72 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None
73) -> ObservationInfo:
74 extra_md: dict[str, str | int] = {}
76 if filter_label is not None and filter_label.hasBandLabel():
77 extra_md["physical_filter"] = filter_label.physicalLabel
79 # Fill in detector metadata, check for consistency.
80 # ObsInfo detector name and group can not be derived from
81 # the getName() information without knowing how the components
82 # are separated.
83 if detector is not None:
84 detector_md = {
85 "detector_num": detector.getId(),
86 "detector_serial": detector.getSerial(),
87 "detector_unique_name": detector.getName(),
88 }
89 extra_md.update(detector_md)
91 obs_info_updates: dict[str, str | int] = {}
92 for k, v in extra_md.items():
93 current = getattr(obs_info, k)
94 if current is None:
95 obs_info_updates[k] = v
96 continue
97 if current != v:
98 raise RuntimeError(
99 f"ObservationInfo contains value for '{k}' that is inconsistent "
100 f"with given legacy object: {v} != {current}"
101 )
103 if obs_info_updates:
104 obs_info = obs_info.model_copy(update=obs_info_updates)
105 return obs_info
108class VisitImage(MaskedImage):
109 """A calibrated single-visit image.
111 Parameters
112 ----------
113 image
114 The main image plane. If this has a `Projection`, it will be used
115 for all planes unless a ``projection`` is passed separately.
116 mask
117 A bitmask image that annotates the main image plane. Must have the
118 same bounding box as ``image`` if provided. Any attached projection
119 is replaced (possibly by `None`).
120 variance
121 The per-pixel uncertainty of the main image as an image of variance
122 values. Must have the same bounding box as ``image`` if provided, and
123 its units must be the square of ``image.unit`` or `None`.
124 Values default to ``1.0``. Any attached projection is replaced
125 (possibly by `None`).
126 mask_schema
127 Schema for the mask plane. Must be provided if and only if ``mask`` is
128 not provided.
129 projection
130 Projection that maps the pixel grid to the sky. Can only be `None` if
131 a projection is already attached to ``image``.
132 psf
133 Point-spread function model for this image, or an exception explaining
134 why it could not be read (to be raised if the PSF is requested later).
135 obs_info
136 General information about this visit in standardized form.
137 metadata
138 Arbitrary flexible metadata to associate with the image.
139 """
141 def __init__(
142 self,
143 image: Image,
144 *,
145 mask: Mask | None = None,
146 variance: Image | None = None,
147 mask_schema: MaskSchema | None = None,
148 projection: Projection[DetectorFrame] | None = None,
149 obs_info: ObservationInfo | None = None,
150 psf: PointSpreadFunction | ArchiveReadError,
151 metadata: dict[str, MetadataValue] | None = None,
152 ):
153 super().__init__(
154 image,
155 mask=mask,
156 variance=variance,
157 mask_schema=mask_schema,
158 projection=projection,
159 obs_info=obs_info,
160 metadata=metadata,
161 )
162 if self.image.unit is None:
163 raise TypeError("The image component of a VisitImage must have units.")
164 if self.image.projection is None:
165 raise TypeError("The projection component of a VisitImage cannot be None.")
166 if self.image.obs_info is None:
167 raise TypeError("The observation info component of a VisitImage cannot be None.")
168 if not isinstance(self.image.projection.pixel_frame, DetectorFrame):
169 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.")
170 self._psf = psf
172 @property
173 def unit(self) -> astropy.units.UnitBase:
174 """The units of the image plane (`astropy.units.Unit`)."""
175 return cast(astropy.units.UnitBase, super().unit)
177 @property
178 def projection(self) -> Projection[DetectorFrame]:
179 """The projection that maps the pixel grid to the sky
180 (`Projection` [`DetectorFrame`]).
181 """
182 return cast(Projection[DetectorFrame], super().projection)
184 @property
185 def obs_info(self) -> ObservationInfo:
186 """General information about this observation in standard form.
187 (`~astro_metadata_translator.ObservationInfo`).
188 """
189 obs_info = self.image.obs_info
190 assert obs_info is not None
191 return obs_info
193 @property
194 def astropy_wcs(self) -> ProjectionAstropyView:
195 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`).
197 Notes
198 -----
199 As expected for Astropy WCS objects, this defines pixel coordinates
200 such that the first row and column in the arrays are ``(0, 0)``, not
201 ``bbox.start``, as is the case for `projection`.
203 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
204 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an
205 `astropy.wcs.WCS` (use `fits_wcs` for that).
206 """
207 return cast(ProjectionAstropyView, super().astropy_wcs)
209 @property
210 def psf(self) -> PointSpreadFunction:
211 """The point-spread function model for this image
212 (`.psfs.PointSpreadFunction`).
213 """
214 if isinstance(self._psf, ArchiveReadError):
215 raise self._psf
216 return self._psf
218 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage:
219 if bbox is ...:
220 return self
221 super().__getitem__(bbox)
222 return self._transfer_metadata(
223 VisitImage(
224 self.image[bbox],
225 mask=self.mask[bbox],
226 variance=self.variance[bbox],
227 projection=self.projection,
228 psf=self.psf,
229 obs_info=self.obs_info,
230 ),
231 bbox=bbox,
232 )
234 def __str__(self) -> str:
235 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})"
237 def __repr__(self) -> str:
238 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})"
240 def copy(self) -> VisitImage:
241 """Deep-copy the visit image."""
242 return self._transfer_metadata(
243 VisitImage(
244 image=self._image.copy(),
245 mask=self._mask.copy(),
246 variance=self._variance.copy(),
247 psf=self._psf,
248 obs_info=self.obs_info,
249 ),
250 copy=True,
251 )
253 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel:
254 serialized_image = archive.serialize_direct(
255 "image", functools.partial(self.image.serialize, save_projection=False)
256 )
257 serialized_mask = archive.serialize_direct(
258 "mask", functools.partial(self.mask.serialize, save_projection=False)
259 )
260 serialized_variance = archive.serialize_direct(
261 "variance", functools.partial(self.variance.serialize, save_projection=False)
262 )
263 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
264 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel
265 match self._psf:
266 # MyPy is able to figure things out here with this match statement,
267 # but not a single isinstance check on both types.
268 case PiffWrapper():
269 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
270 case PSFExWrapper():
271 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
272 case GaussianPointSpreadFunction():
273 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
274 case _:
275 raise TypeError(
276 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}."
277 )
278 return VisitImageSerializationModel(
279 image=serialized_image,
280 mask=serialized_mask,
281 variance=serialized_variance,
282 projection=serialized_projection,
283 psf=serialized_psf,
284 obs_info=self.obs_info,
285 metadata=self.metadata,
286 )
288 # Type-checkers want the model argument to only require
289 # MaskedImageSerializationModel[Any], and they'd be absolutely right if
290 # this were a regular instance method. But whether Liskov substitution
291 # applies to classmethods and staticmethods is sort of context-dependent,
292 # and here we do not want it to.
293 @staticmethod
294 def deserialize(
295 model: VisitImageSerializationModel[Any], # type: ignore[override]
296 archive: InputArchive[Any],
297 *,
298 bbox: Box | None = None,
299 ) -> VisitImage:
300 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox)
301 psf = model.deserialize_psf(archive)
302 projection = Projection.deserialize(model.projection, archive)
303 return VisitImage(
304 masked_image.image,
305 mask=masked_image.mask,
306 variance=masked_image.variance,
307 psf=psf,
308 projection=projection,
309 obs_info=model.obs_info,
310 )._finish_deserialize(model)
312 @staticmethod
313 def _get_archive_tree_type[P: pydantic.BaseModel](
314 pointer_type: type[P],
315 ) -> type[VisitImageSerializationModel[P]]:
316 """Return the serialization model type for this object for an archive
317 type that uses the given pointer type.
318 """
319 return VisitImageSerializationModel[pointer_type] # type: ignore
321 # write_fits and read_fits inherited from MaskedImage.
323 @staticmethod
324 def from_legacy(
325 legacy: Any,
326 *,
327 unit: astropy.units.Unit | None = None,
328 plane_map: Mapping[str, MaskPlane] | None = None,
329 instrument: str | None = None,
330 visit: int | None = None,
331 ) -> VisitImage:
332 """Convert from an `lsst.afw.image.Exposure` instance.
334 Parameters
335 ----------
336 legacy
337 An `lsst.afw.image.Exposure` instance that will share image and
338 variance (but not mask) pixel data with the returned object.
339 unit
340 Units of the image. If not provided, the ``BUNIT`` metadata
341 key will be used, if available.
342 plane_map
343 A mapping from legacy mask plane name to the new plane name and
344 description. If `None` (default)
345 `get_legacy_visit_image_mask_planes` is used.
346 instrument
347 Name of the instrument. Extracted from the metadata if not
348 provided.
349 visit
350 ID of the visit. Extracted from the metadata if not provided.
351 """
352 if plane_map is None:
353 plane_map = get_legacy_visit_image_mask_planes()
354 md = legacy.getMetadata()
355 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo())
356 instrument = _extract_or_check_header(
357 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str
358 )
359 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int)
360 unit = _extract_or_check_header(
361 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits")
362 )
363 legacy_wcs = legacy.getWcs()
364 if legacy_wcs is None:
365 raise ValueError("Exposure does not have a SkyWcs.")
366 legacy_detector = legacy.getDetector()
367 if legacy_detector is None:
368 raise ValueError("Exposure does not have a Detector.")
369 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
371 # Update the ObservationInfo from other components.
372 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter())
374 opaque_fits_metadata = FitsOpaqueMetadata()
375 primary_header = astropy.io.fits.Header()
376 with warnings.catch_warnings():
377 # Silence warnings about long keys becoming HIERARCH.
378 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
379 primary_header.update(md.toOrderedDict())
380 opaque_fits_metadata.extract_legacy_primary_header(primary_header)
381 projection = Projection.from_legacy(
382 legacy_wcs,
383 DetectorFrame(
384 instrument=instrument,
385 visit=visit,
386 detector=legacy_detector.getId(),
387 bbox=detector_bbox,
388 ),
389 )
390 legacy_psf = legacy.getPsf()
391 if legacy_psf is None:
392 raise ValueError("Exposure file does not have a Psf.")
393 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
394 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map)
395 result = VisitImage(
396 image=masked_image.image.view(unit=unit),
397 mask=masked_image.mask,
398 variance=masked_image.variance,
399 projection=projection,
400 psf=psf,
401 obs_info=obs_info,
402 )
404 result._opaque_metadata = opaque_fits_metadata
405 return result
407 @overload # type: ignore[override]
408 @staticmethod
409 def read_legacy( 409 ↛ exitline 409 didn't return from function 'read_legacy' because
410 filename: str,
411 *,
412 component: Literal["bbox"],
413 ) -> Box: ...
415 @overload
416 @staticmethod
417 def read_legacy( 417 ↛ exitline 417 didn't return from function 'read_legacy' because
418 filename: str,
419 *,
420 preserve_quantization: bool = False,
421 instrument: str | None = None,
422 visit: int | None = None,
423 component: Literal["image"],
424 ) -> Image: ...
426 @overload
427 @staticmethod
428 def read_legacy( 428 ↛ exitline 428 didn't return from function 'read_legacy' because
429 filename: str,
430 *,
431 plane_map: Mapping[str, MaskPlane] | None = None,
432 instrument: str | None = None,
433 visit: int | None = None,
434 component: Literal["mask"],
435 ) -> Mask: ...
437 @overload
438 @staticmethod
439 def read_legacy( 439 ↛ exitline 439 didn't return from function 'read_legacy' because
440 filename: str,
441 *,
442 preserve_quantization: bool = False,
443 instrument: str | None = None,
444 visit: int | None = None,
445 component: Literal["variance"],
446 ) -> Image: ...
448 @overload
449 @staticmethod
450 def read_legacy( 450 ↛ exitline 450 didn't return from function 'read_legacy' because
451 filename: str,
452 *,
453 instrument: str | None = None,
454 visit: int | None = None,
455 component: Literal["projection"],
456 ) -> Projection[DetectorFrame]: ...
458 @overload
459 @staticmethod
460 def read_legacy( 460 ↛ exitline 460 didn't return from function 'read_legacy' because
461 filename: str,
462 *,
463 component: Literal["psf"],
464 ) -> PointSpreadFunction: ...
466 @overload
467 @staticmethod
468 def read_legacy( 468 ↛ exitline 468 didn't return from function 'read_legacy' because
469 filename: str,
470 *,
471 component: Literal["obs_info"],
472 ) -> ObservationInfo: ...
474 @overload
475 @staticmethod
476 def read_legacy( 476 ↛ exitline 476 didn't return from function 'read_legacy' because
477 filename: str,
478 *,
479 preserve_quantization: bool = False,
480 plane_map: Mapping[str, MaskPlane] | None = None,
481 instrument: str | None = None,
482 visit: int | None = None,
483 component: None = None,
484 ) -> VisitImage: ...
486 @staticmethod
487 def read_legacy( # type: ignore[override]
488 filename: str,
489 *,
490 preserve_quantization: bool = False,
491 plane_map: Mapping[str, MaskPlane] | None = None,
492 instrument: str | None = None,
493 visit: int | None = None,
494 component: Literal["bbox", "image", "mask", "variance", "projection", "psf", "obs_info"]
495 | None = None,
496 ) -> Any:
497 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`.
499 Parameters
500 ----------
501 filename
502 Full name of the file.
503 preserve_quantization
504 If `True`, ensure that writing the masked image back out again will
505 exactly preserve quantization-compressed pixel values. This causes
506 the image and variance plane arrays to be marked as read-only and
507 stores the original binary table data for those planes in memory.
508 If the `MaskedImage` is copied, the precompressed pixel values are
509 not transferred to the copy.
510 plane_map
511 A mapping from legacy mask plane name to the new plane name and
512 description. If `None` (default)
513 `get_legacy_visit_image_mask_planes` is used.
514 instrument
515 Name of the instrument. Read from the primary header if not
516 provided.
517 visit
518 ID of the visit. Read from the primary header if not
519 provided.
520 component
521 A component to read instead of the full image.
522 """
523 from lsst.afw.image import ExposureFitsReader
525 reader = ExposureFitsReader(filename)
526 if component == "bbox":
527 return Box.from_legacy(reader.readBBox())
528 legacy_detector = reader.readDetector()
529 if legacy_detector is None:
530 raise ValueError(f"Exposure file {filename!r} does not have a Detector.")
531 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
532 legacy_wcs = None
533 if component in (None, "image", "mask", "variance", "projection"):
534 legacy_wcs = reader.readWcs()
535 if legacy_wcs is None:
536 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.")
537 if component in (None, "psf"):
538 legacy_psf = reader.readPsf()
539 if legacy_psf is None:
540 raise ValueError(f"Exposure file {filename!r} does not have a Psf.")
541 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
542 if component == "psf":
543 return psf
544 assert component in (None, "image", "mask", "variance", "projection", "obs_info"), (
545 component
546 ) # for MyPy
547 with astropy.io.fits.open(filename) as hdu_list:
548 primary_header = hdu_list[0].header
549 obs_info = _obs_info_from_md(primary_header)
550 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter())
551 if component == "obs_info":
552 return obs_info
553 instrument = _extract_or_check_header(
554 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str
555 )
556 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int)
557 projection = Projection.from_legacy(
558 legacy_wcs,
559 DetectorFrame(
560 instrument=instrument,
561 visit=visit,
562 detector=legacy_detector.getId(),
563 bbox=detector_bbox,
564 ),
565 )
566 if component == "projection":
567 return projection
568 if plane_map is None:
569 plane_map = get_legacy_visit_image_mask_planes()
570 assert component != "psf", component # for MyPy
571 from_masked_image = MaskedImage._read_legacy_hdus(
572 hdu_list,
573 filename,
574 preserve_quantization=preserve_quantization,
575 plane_map=plane_map,
576 component=component,
577 )
578 if component is not None:
579 # This is the image, mask, or variance; attach the projection
580 # and return
581 return from_masked_image.view(projection=projection)
582 result = VisitImage(
583 from_masked_image.image,
584 mask=from_masked_image.mask,
585 variance=from_masked_image.variance,
586 projection=projection,
587 psf=psf,
588 obs_info=obs_info,
589 )
590 result._opaque_metadata = from_masked_image._opaque_metadata
591 return result
594class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]):
595 """A Pydantic model used to represent a serialized `VisitImage`."""
597 # Inherited attributes are duplicated because that improves the docs
598 # (some limitation in the sphinx/pydantic integration), and these are
599 # important docs.
601 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
602 mask: MaskSerializationModel[P] = pydantic.Field(
603 description="Bitmask that annotates the main image's pixels."
604 )
605 variance: ImageSerializationModel[P] = pydantic.Field(
606 description="Per-pixel variance estimates for the main image."
607 )
608 projection: ProjectionSerializationModel[P] = pydantic.Field(
609 description="Projection that maps the pixel grid to the sky.",
610 )
611 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = (
612 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.")
613 )
614 obs_info: ObservationInfo = pydantic.Field(
615 description="Standardized description of visit metadata",
616 )
618 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError:
619 """Finish deserializing the PSF model, or *return* any exception
620 raised in the attempt.
621 """
622 try:
623 match self.psf:
624 case PiffSerializationModel():
625 return PiffWrapper.deserialize(self.psf, archive)
626 case PSFExSerializationModel():
627 return PSFExWrapper.deserialize(self.psf, archive)
628 case GaussianPSFSerializationModel():
629 return GaussianPointSpreadFunction.deserialize(self.psf, archive)
630 case _:
631 raise ArchiveReadError("PSF model type not recognized.")
632 except ArchiveReadError as err:
633 return err
636def _extract_or_check_value[T](
637 key: str,
638 given_value: T | None,
639 *sources: tuple[str, T | None],
640) -> T:
641 # Compare given value against multiple sources. If given value is not
642 # supplied return the first non-None value in the reference sources.
643 if given_value is not None:
644 for source_name, source_value in sources:
645 if source_value is not None and source_value != given_value:
646 raise ValueError(
647 f"Given value {given_value!r} does not match {source_value!r} from {source_name}."
648 )
649 if source_value is not None:
650 # Only check the first non-None source rather than checking
651 # all supplied values.
652 break
653 return given_value
655 for _, source_value in sources:
656 if source_value is not None:
657 return source_value
659 raise ValueError(f"No value found for {key}.")
662def _extract_or_check_header[T](
663 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T]
664) -> T:
665 hdr_value: T | None = None
666 if (hdr_raw_value := header.get(key)) is not None:
667 hdr_value = coerce(hdr_raw_value)
668 return _extract_or_check_value(
669 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value)
670 )