Coverage for python / lsst / images / _visit_image.py: 28%
232 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:04 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:04 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("VisitImage", "VisitImageSerializationModel")
16import functools
17import warnings
18from collections.abc import Callable, Mapping, MutableMapping
19from types import EllipsisType
20from typing import Any, Literal, cast, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import pydantic
26from astro_metadata_translator import ObservationInfo, VisitInfoTranslator
28from ._geom import Box
29from ._image import Image, ImageSerializationModel
30from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes
31from ._masked_image import MaskedImage, MaskedImageSerializationModel
32from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel
33from .fits import FitsOpaqueMetadata
34from .psfs import (
35 GaussianPointSpreadFunction,
36 GaussianPSFSerializationModel,
37 PiffSerializationModel,
38 PiffWrapper,
39 PointSpreadFunction,
40 PSFExSerializationModel,
41 PSFExWrapper,
42)
43from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive
46def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo:
47 # Try to get an ObservationInfo from the primary header as if
48 # it's a raw header. Else fallback.
49 try:
50 obs_info = ObservationInfo.from_header(md, quiet=True)
51 except ValueError:
52 # Not known translator. Must fall back to visit info. If we have
53 # an actual VisitInfo, serialize it since we know that it will be
54 # complete.
55 if visit_info is not None:
56 from lsst.afw.image import setVisitInfoMetadata
57 from lsst.daf.base import PropertyList
59 pl = PropertyList()
60 setVisitInfoMetadata(pl, visit_info)
61 # Merge so that we still have access to butler provenance.
62 md.update(pl)
64 # Try the given header looking for VisitInfo hints.
65 # We get lots of warnings if nothing can be found. Currently
66 # no way to disable those without capturing them.
67 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)
68 return obs_info
71class VisitImage(MaskedImage):
72 """A calibrated single-visit image.
74 Parameters
75 ----------
76 image
77 The main image plane. If this has a `Projection`, it will be used
78 for all planes unless a ``projection`` is passed separately.
79 mask
80 A bitmask image that annotates the main image plane. Must have the
81 same bounding box as ``image`` if provided. Any attached projection
82 is replaced (possibly by `None`).
83 variance
84 The per-pixel uncertainty of the main image as an image of variance
85 values. Must have the same bounding box as ``image`` if provided, and
86 its units must be the square of ``image.unit`` or `None`.
87 Values default to ``1.0``. Any attached projection is replaced
88 (possibly by `None`).
89 mask_schema
90 Schema for the mask plane. Must be provided if and only if ``mask`` is
91 not provided.
92 projection
93 Projection that maps the pixel grid to the sky. Can only be `None` if
94 a projection is already attached to ``image``.
95 psf
96 Point-spread function model for this image, or an exception explaining
97 why it could not be read (to be raised if the PSF is requested later).
98 obs_info
99 General information about this visit in standardized form.
100 metadata
101 Arbitrary flexible metadata to associate with the image.
102 """
104 def __init__(
105 self,
106 image: Image,
107 *,
108 mask: Mask | None = None,
109 variance: Image | None = None,
110 mask_schema: MaskSchema | None = None,
111 projection: Projection[DetectorFrame] | None = None,
112 obs_info: ObservationInfo | None = None,
113 psf: PointSpreadFunction | ArchiveReadError,
114 metadata: dict[str, MetadataValue] | None = None,
115 ):
116 super().__init__(
117 image,
118 mask=mask,
119 variance=variance,
120 mask_schema=mask_schema,
121 projection=projection,
122 obs_info=obs_info,
123 metadata=metadata,
124 )
125 if self.image.unit is None:
126 raise TypeError("The image component of a VisitImage must have units.")
127 if self.image.projection is None:
128 raise TypeError("The projection component of a VisitImage cannot be None.")
129 if self.image.obs_info is None:
130 raise TypeError("The observation info component of a VisitImage cannot be None.")
131 if not isinstance(self.image.projection.pixel_frame, DetectorFrame):
132 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.")
133 self._psf = psf
135 @property
136 def unit(self) -> astropy.units.UnitBase:
137 """The units of the image plane (`astropy.units.Unit`)."""
138 return cast(astropy.units.UnitBase, super().unit)
140 @property
141 def projection(self) -> Projection[DetectorFrame]:
142 """The projection that maps the pixel grid to the sky
143 (`Projection` [`DetectorFrame`]).
144 """
145 return cast(Projection[DetectorFrame], super().projection)
147 @property
148 def obs_info(self) -> ObservationInfo:
149 """General information about this observation in standard form.
150 (`~astro_metadata_translator.ObservationInfo`).
151 """
152 obs_info = self.image.obs_info
153 assert obs_info is not None
154 return obs_info
156 @property
157 def astropy_wcs(self) -> ProjectionAstropyView:
158 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`).
160 Notes
161 -----
162 As expected for Astropy WCS objects, this defines pixel coordinates
163 such that the first row and column in the arrays are ``(0, 0)``, not
164 ``bbox.start``, as is the case for `projection`.
166 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and
167 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an
168 `astropy.wcs.WCS` (use `fits_wcs` for that).
169 """
170 return cast(ProjectionAstropyView, super().astropy_wcs)
172 @property
173 def psf(self) -> PointSpreadFunction:
174 """The point-spread function model for this image
175 (`.psfs.PointSpreadFunction`).
176 """
177 if isinstance(self._psf, ArchiveReadError):
178 raise self._psf
179 return self._psf
181 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage:
182 if bbox is ...:
183 return self
184 super().__getitem__(bbox)
185 return self._transfer_metadata(
186 VisitImage(
187 self.image[bbox],
188 mask=self.mask[bbox],
189 variance=self.variance[bbox],
190 projection=self.projection,
191 psf=self.psf,
192 obs_info=self.obs_info,
193 ),
194 bbox=bbox,
195 )
197 def __str__(self) -> str:
198 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})"
200 def __repr__(self) -> str:
201 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})"
203 def copy(self) -> VisitImage:
204 """Deep-copy the visit image."""
205 return self._transfer_metadata(
206 VisitImage(
207 image=self._image.copy(),
208 mask=self._mask.copy(),
209 variance=self._variance.copy(),
210 psf=self._psf,
211 obs_info=self.obs_info,
212 ),
213 copy=True,
214 )
216 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel:
217 serialized_image = archive.serialize_direct(
218 "image", functools.partial(self.image.serialize, save_projection=False)
219 )
220 serialized_mask = archive.serialize_direct(
221 "mask", functools.partial(self.mask.serialize, save_projection=False)
222 )
223 serialized_variance = archive.serialize_direct(
224 "variance", functools.partial(self.variance.serialize, save_projection=False)
225 )
226 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
227 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel
228 match self._psf:
229 # MyPy is able to figure things out here with this match statement,
230 # but not a single isinstance check on both types.
231 case PiffWrapper():
232 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
233 case PSFExWrapper():
234 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
235 case GaussianPointSpreadFunction():
236 serialized_psf = archive.serialize_direct("psf", self._psf.serialize)
237 case _:
238 raise TypeError(
239 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}."
240 )
241 return VisitImageSerializationModel(
242 image=serialized_image,
243 mask=serialized_mask,
244 variance=serialized_variance,
245 projection=serialized_projection,
246 psf=serialized_psf,
247 obs_info=self.obs_info,
248 metadata=self.metadata,
249 )
251 # Type-checkers want the model argument to only require
252 # MaskedImageSerializationModel[Any], and they'd be absolutely right if
253 # this were a regular instance method. But whether Liskov substitution
254 # applies to classmethods and staticmethods is sort of context-dependent,
255 # and here we do not want it to.
256 @staticmethod
257 def deserialize(
258 model: VisitImageSerializationModel[Any], # type: ignore[override]
259 archive: InputArchive[Any],
260 *,
261 bbox: Box | None = None,
262 ) -> VisitImage:
263 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox)
264 psf = model.deserialize_psf(archive)
265 projection = Projection.deserialize(model.projection, archive)
266 return VisitImage(
267 masked_image.image,
268 mask=masked_image.mask,
269 variance=masked_image.variance,
270 psf=psf,
271 projection=projection,
272 obs_info=model.obs_info,
273 )._finish_deserialize(model)
275 @staticmethod
276 def _get_archive_tree_type[P: pydantic.BaseModel](
277 pointer_type: type[P],
278 ) -> type[VisitImageSerializationModel[P]]:
279 """Return the serialization model type for this object for an archive
280 type that uses the given pointer type.
281 """
282 return VisitImageSerializationModel[pointer_type] # type: ignore
284 # write_fits and read_fits inherited from MaskedImage.
286 @staticmethod
287 def from_legacy(
288 legacy: Any,
289 *,
290 unit: astropy.units.Unit | None = None,
291 plane_map: Mapping[str, MaskPlane] | None = None,
292 instrument: str | None = None,
293 visit: int | None = None,
294 ) -> VisitImage:
295 """Convert from an `lsst.afw.image.Exposure` instance.
297 Parameters
298 ----------
299 legacy
300 An `lsst.afw.image.Exposure` instance that will share image and
301 variance (but not mask) pixel data with the returned object.
302 unit
303 Units of the image. If not provided, the ``BUNIT`` metadata
304 key will be used, if available.
305 plane_map
306 A mapping from legacy mask plane name to the new plane name and
307 description. If `None` (default)
308 `get_legacy_visit_image_mask_planes` is used.
309 instrument
310 Name of the instrument. Extracted from the metadata if not
311 provided.
312 visit
313 ID of the visit. Extracted from the metadata if not provided.
314 """
315 if plane_map is None:
316 plane_map = get_legacy_visit_image_mask_planes()
317 md = legacy.getMetadata()
318 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo())
319 instrument = _extract_or_check_header(
320 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str
321 )
322 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int)
323 unit = _extract_or_check_header(
324 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits")
325 )
326 legacy_wcs = legacy.getWcs()
327 if legacy_wcs is None:
328 raise ValueError("Exposure does not have a SkyWcs.")
329 legacy_detector = legacy.getDetector()
330 if legacy_detector is None:
331 raise ValueError("Exposure does not have a Detector.")
332 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
333 opaque_fits_metadata = FitsOpaqueMetadata()
334 primary_header = astropy.io.fits.Header()
335 with warnings.catch_warnings():
336 # Silence warnings about long keys becoming HIERARCH.
337 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
338 primary_header.update(md.toOrderedDict())
339 opaque_fits_metadata.extract_legacy_primary_header(primary_header)
340 projection = Projection.from_legacy(
341 legacy_wcs,
342 DetectorFrame(
343 instrument=instrument,
344 visit=visit,
345 detector=legacy_detector.getId(),
346 bbox=detector_bbox,
347 ),
348 )
349 legacy_psf = legacy.getPsf()
350 if legacy_psf is None:
351 raise ValueError("Exposure file does not have a Psf.")
352 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
353 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map)
354 result = VisitImage(
355 image=masked_image.image.view(unit=unit),
356 mask=masked_image.mask,
357 variance=masked_image.variance,
358 projection=projection,
359 psf=psf,
360 obs_info=obs_info,
361 )
363 result._opaque_metadata = opaque_fits_metadata
364 return result
366 @overload # type: ignore[override]
367 @staticmethod
368 def read_legacy( 368 ↛ exitline 368 didn't return from function 'read_legacy' because
369 filename: str,
370 *,
371 component: Literal["bbox"],
372 ) -> Box: ...
374 @overload
375 @staticmethod
376 def read_legacy( 376 ↛ exitline 376 didn't return from function 'read_legacy' because
377 filename: str,
378 *,
379 preserve_quantization: bool = False,
380 instrument: str | None = None,
381 visit: int | None = None,
382 component: Literal["image"],
383 ) -> Image: ...
385 @overload
386 @staticmethod
387 def read_legacy( 387 ↛ exitline 387 didn't return from function 'read_legacy' because
388 filename: str,
389 *,
390 plane_map: Mapping[str, MaskPlane] | None = None,
391 instrument: str | None = None,
392 visit: int | None = None,
393 component: Literal["mask"],
394 ) -> Mask: ...
396 @overload
397 @staticmethod
398 def read_legacy( 398 ↛ exitline 398 didn't return from function 'read_legacy' because
399 filename: str,
400 *,
401 preserve_quantization: bool = False,
402 instrument: str | None = None,
403 visit: int | None = None,
404 component: Literal["variance"],
405 ) -> Image: ...
407 @overload
408 @staticmethod
409 def read_legacy( 409 ↛ exitline 409 didn't return from function 'read_legacy' because
410 filename: str,
411 *,
412 instrument: str | None = None,
413 visit: int | None = None,
414 component: Literal["projection"],
415 ) -> Projection[DetectorFrame]: ...
417 @overload
418 @staticmethod
419 def read_legacy( 419 ↛ exitline 419 didn't return from function 'read_legacy' because
420 filename: str,
421 *,
422 component: Literal["psf"],
423 ) -> PointSpreadFunction: ...
425 @overload
426 @staticmethod
427 def read_legacy( 427 ↛ exitline 427 didn't return from function 'read_legacy' because
428 filename: str,
429 *,
430 component: Literal["obs_info"],
431 ) -> ObservationInfo: ...
433 @overload
434 @staticmethod
435 def read_legacy( 435 ↛ exitline 435 didn't return from function 'read_legacy' because
436 filename: str,
437 *,
438 preserve_quantization: bool = False,
439 plane_map: Mapping[str, MaskPlane] | None = None,
440 instrument: str | None = None,
441 visit: int | None = None,
442 component: None = None,
443 ) -> VisitImage: ...
445 @staticmethod
446 def read_legacy( # type: ignore[override]
447 filename: str,
448 *,
449 preserve_quantization: bool = False,
450 plane_map: Mapping[str, MaskPlane] | None = None,
451 instrument: str | None = None,
452 visit: int | None = None,
453 component: Literal["bbox", "image", "mask", "variance", "projection", "psf", "obs_info"]
454 | None = None,
455 ) -> Any:
456 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`.
458 Parameters
459 ----------
460 filename
461 Full name of the file.
462 preserve_quantization
463 If `True`, ensure that writing the masked image back out again will
464 exactly preserve quantization-compressed pixel values. This causes
465 the image and variance plane arrays to be marked as read-only and
466 stores the original binary table data for those planes in memory.
467 If the `MaskedImage` is copied, the precompressed pixel values are
468 not transferred to the copy.
469 plane_map
470 A mapping from legacy mask plane name to the new plane name and
471 description. If `None` (default)
472 `get_legacy_visit_image_mask_planes` is used.
473 instrument
474 Name of the instrument. Read from the primary header if not
475 provided.
476 visit
477 ID of the visit. Read from the primary header if not
478 provided.
479 component
480 A component to read instead of the full image.
481 """
482 from lsst.afw.image import ExposureFitsReader
484 reader = ExposureFitsReader(filename)
485 if component == "bbox":
486 return Box.from_legacy(reader.readBBox())
487 legacy_detector = reader.readDetector()
488 if legacy_detector is None:
489 raise ValueError(f"Exposure file {filename!r} does not have a Detector.")
490 detector_bbox = Box.from_legacy(legacy_detector.getBBox())
491 legacy_wcs = None
492 if component in (None, "image", "mask", "variance", "projection"):
493 legacy_wcs = reader.readWcs()
494 if legacy_wcs is None:
495 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.")
496 if component in (None, "psf"):
497 legacy_psf = reader.readPsf()
498 if legacy_psf is None:
499 raise ValueError(f"Exposure file {filename!r} does not have a Psf.")
500 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox)
501 if component == "psf":
502 return psf
503 assert component in (None, "image", "mask", "variance", "projection", "obs_info"), (
504 component
505 ) # for MyPy
506 with astropy.io.fits.open(filename) as hdu_list:
507 primary_header = hdu_list[0].header
508 obs_info = _obs_info_from_md(primary_header)
509 if component == "obs_info":
510 return obs_info
511 instrument = _extract_or_check_header(
512 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str
513 )
514 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int)
515 projection = Projection.from_legacy(
516 legacy_wcs,
517 DetectorFrame(
518 instrument=instrument,
519 visit=visit,
520 detector=legacy_detector.getId(),
521 bbox=detector_bbox,
522 ),
523 )
524 if component == "projection":
525 return projection
526 if plane_map is None:
527 plane_map = get_legacy_visit_image_mask_planes()
528 assert component != "psf", component # for MyPy
529 from_masked_image = MaskedImage._read_legacy_hdus(
530 hdu_list,
531 filename,
532 preserve_quantization=preserve_quantization,
533 plane_map=plane_map,
534 component=component,
535 )
536 if component is not None:
537 # This is the image, mask, or variance; attach the projection
538 # and return
539 return from_masked_image.view(projection=projection)
540 result = VisitImage(
541 from_masked_image.image,
542 mask=from_masked_image.mask,
543 variance=from_masked_image.variance,
544 projection=projection,
545 psf=psf,
546 obs_info=obs_info,
547 )
548 result._opaque_metadata = from_masked_image._opaque_metadata
549 return result
552class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]):
553 """A Pydantic model used to represent a serialized `VisitImage`."""
555 # Inherited attributes are duplicated because that improves the docs
556 # (some limitation in the sphinx/pydantic integration), and these are
557 # important docs.
559 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
560 mask: MaskSerializationModel[P] = pydantic.Field(
561 description="Bitmask that annotates the main image's pixels."
562 )
563 variance: ImageSerializationModel[P] = pydantic.Field(
564 description="Per-pixel variance estimates for the main image."
565 )
566 projection: ProjectionSerializationModel[P] = pydantic.Field(
567 description="Projection that maps the pixel grid to the sky.",
568 )
569 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = (
570 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.")
571 )
572 obs_info: ObservationInfo = pydantic.Field(
573 description="Standardized description of visit metadata",
574 )
576 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError:
577 """Finish deserializing the PSF model, or *return* any exception
578 raised in the attempt.
579 """
580 try:
581 match self.psf:
582 case PiffSerializationModel():
583 return PiffWrapper.deserialize(self.psf, archive)
584 case PSFExSerializationModel():
585 return PSFExWrapper.deserialize(self.psf, archive)
586 case GaussianPSFSerializationModel():
587 return GaussianPointSpreadFunction.deserialize(self.psf, archive)
588 case _:
589 raise ArchiveReadError("PSF model type not recognized.")
590 except ArchiveReadError as err:
591 return err
594def _extract_or_check_value[T](
595 key: str,
596 given_value: T | None,
597 *sources: tuple[str, T | None],
598) -> T:
599 # Compare given value against multiple sources. If given value is not
600 # supplied return the first non-None value in the reference sources.
601 if given_value is not None:
602 for source_name, source_value in sources:
603 if source_value is not None and source_value != given_value:
604 raise ValueError(
605 f"Given value {given_value!r} does not match {source_value!r} from {source_name}."
606 )
607 if source_value is not None:
608 # Only check the first non-None source rather than checking
609 # all supplied values.
610 break
611 return given_value
613 for _, source_value in sources:
614 if source_value is not None:
615 return source_value
617 raise ValueError(f"No value found for {key}.")
620def _extract_or_check_header[T](
621 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T]
622) -> T:
623 hdr_value: T | None = None
624 if (hdr_raw_value := header.get(key)) is not None:
625 hdr_value = coerce(hdr_raw_value)
626 return _extract_or_check_value(
627 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value)
628 )