Coverage for python / lsst / images / _masked_image.py: 34%
179 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:16 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:16 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("MaskedImage", "MaskedImageSerializationModel")
16import functools
17from collections.abc import Mapping
18from contextlib import ExitStack
19from types import EllipsisType
20from typing import Any, Literal, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import numpy as np
26import pydantic
27from astro_metadata_translator import ObservationInfo
29from lsst.resources import ResourcePath, ResourcePathExpression
31from . import fits
32from ._generalized_image import GeneralizedImage
33from ._geom import Box
34from ._image import Image, ImageSerializationModel
35from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel
36from ._transforms import Frame, Projection, ProjectionSerializationModel
37from .serialization import ArchiveTree, InputArchive, MetadataValue, OutputArchive
38from .utils import is_none
41class MaskedImage(GeneralizedImage):
42 """A multi-plane image with data (image), mask, and variance planes.
44 Parameters
45 ----------
46 image
47 The main image plane. If this has a `Projection`, it will be used
48 for all planes unless a ``projection`` is passed separately.
49 mask
50 A bitmask image that annotates the main image plane. Must have the
51 same bounding box as ``image`` if provided. Any attached projection
52 is replaced (possibly by `None`).
53 variance
54 The per-pixel uncertainty of the main image as an image of variance
55 values. Must have the same bounding box as ``image`` if provided, and
56 its units must be the square of ``image.unit`` or `None`.
57 Values default to ``1.0``. Any attached projection is replaced
58 (possibly by `None`).
59 mask_schema
60 Schema for the mask plane. Must be provided if and only if ``mask`` is
61 not provided.
62 projection
63 Projection that maps the pixel grid to the sky.
64 obs_info
65 General information about the associated observation in standardized
66 form.
67 metadata
68 Arbitrary flexible metadata to associate with the image.
69 """
71 def __init__(
72 self,
73 image: Image,
74 *,
75 mask: Mask | None = None,
76 variance: Image | None = None,
77 mask_schema: MaskSchema | None = None,
78 projection: Projection | None = None,
79 obs_info: ObservationInfo | None = None,
80 metadata: dict[str, MetadataValue] | None = None,
81 ):
82 super().__init__(metadata)
83 if projection is None:
84 projection = image.projection
85 else:
86 image = image.view(projection=projection)
87 if obs_info is None:
88 obs_info = image.obs_info
89 else:
90 image = image.view(obs_info=obs_info)
91 if mask is None:
92 if mask_schema is None:
93 raise TypeError("'mask_schema' must be provided if 'mask' is not.")
94 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection, obs_info=obs_info)
95 elif mask_schema is not None:
96 raise TypeError("'mask_schema' may not be provided if 'mask' is.")
97 else:
98 if image.bbox != mask.bbox:
99 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.")
100 mask = mask.view(projection=projection)
101 if variance is None:
102 variance = Image(
103 1.0,
104 dtype=np.float32,
105 bbox=image.bbox,
106 unit=None if image.unit is None else image.unit**2,
107 projection=projection,
108 obs_info=obs_info,
109 )
110 else:
111 if image.bbox != variance.bbox:
112 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.")
113 variance = variance.view(projection=projection, obs_info=obs_info)
114 if image.unit is None:
115 if variance.unit is not None:
116 raise ValueError(f"Image has no units but variance does ({variance.unit}).")
117 elif variance.unit is None:
118 variance = variance.view(unit=image.unit**2)
119 elif variance.unit != image.unit**2:
120 raise ValueError(
121 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})."
122 )
123 self._image = image
124 self._mask = mask
125 self._variance = variance
127 @property
128 def image(self) -> Image:
129 """The main image plane (`Image`)."""
130 return self._image
132 @property
133 def mask(self) -> Mask:
134 """The mask plane (`Mask`)."""
135 return self._mask
137 @property
138 def variance(self) -> Image:
139 """The variance plane (`Image`)."""
140 return self._variance
142 @property
143 def bbox(self) -> Box:
144 """The bounding box shared by all three image planes (`Box`)."""
145 return self._image.bbox
147 @property
148 def unit(self) -> astropy.units.UnitBase | None:
149 """The units of the image plane (`astropy.units.Unit` | `None`)."""
150 return self._image.unit
152 @property
153 def projection(self) -> Projection[Any] | None:
154 """The projection that maps the pixel grid to the sky
155 (`Projection` | `None`).
156 """
157 return self._image.projection
159 @property
160 def obs_info(self) -> ObservationInfo | None:
161 """General information about the associated observation in standard
162 form. (`~astro_metadata_translator.ObservationInfo` | `None`).
163 """
164 return self._image.obs_info
166 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage:
167 if bbox is ...:
168 return self
169 super().__getitem__(bbox)
170 return self._transfer_metadata(
171 MaskedImage(
172 # Projection and obs_info propagate from the image.
173 self.image[bbox],
174 mask=self.mask[bbox],
175 variance=self.variance[bbox],
176 ),
177 bbox=bbox,
178 )
180 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None:
181 self._image[bbox] = value.image
182 self._mask[bbox] = value.mask
183 self._variance[bbox] = value.variance
185 def __str__(self) -> str:
186 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})"
188 def __repr__(self) -> str:
189 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})"
191 def copy(self) -> MaskedImage:
192 """Deep-copy the masked image and metadata."""
193 return self._transfer_metadata(
194 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()),
195 copy=True,
196 )
198 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel:
199 """Serialize the masked image to an output archive.
201 Parameters
202 ----------
203 archive
204 Archive to write to.
205 """
206 serialized_image = archive.serialize_direct(
207 "image", functools.partial(self.image.serialize, save_projection=False)
208 )
209 serialized_mask = archive.serialize_direct(
210 "mask", functools.partial(self.mask.serialize, save_projection=False)
211 )
212 serialized_variance = archive.serialize_direct(
213 "variance", functools.partial(self.variance.serialize, save_projection=False)
214 )
215 serialized_projection = (
216 archive.serialize_direct("projection", self.projection.serialize)
217 if self.projection is not None
218 else None
219 )
220 return MaskedImageSerializationModel(
221 image=serialized_image,
222 mask=serialized_mask,
223 variance=serialized_variance,
224 projection=serialized_projection,
225 metadata=self.metadata,
226 )
228 @staticmethod
229 def deserialize(
230 model: MaskedImageSerializationModel[Any], archive: InputArchive[Any], *, bbox: Box | None = None
231 ) -> MaskedImage:
232 """Deserialize an image from an input archive.
234 Parameters
235 ----------
236 model
237 A Pydantic model representation of the image, holding references
238 to data stored in the archive.
239 archive
240 Archive to read from.
241 bbox
242 Bounding box of a subimage to read instead.
243 """
244 image = Image.deserialize(model.image, archive, bbox=bbox)
245 mask = Mask.deserialize(model.mask, archive, bbox=bbox)
246 variance = Image.deserialize(model.variance, archive, bbox=bbox)
247 projection = (
248 Projection.deserialize(model.projection, archive) if model.projection is not None else None
249 )
250 return MaskedImage(image, mask=mask, variance=variance, projection=projection)._finish_deserialize(
251 model
252 )
254 @staticmethod
255 def _get_archive_tree_type[P: pydantic.BaseModel](
256 pointer_type: type[P],
257 ) -> type[MaskedImageSerializationModel[P]]:
258 """Return the serialization model type for this object for an archive
259 type that uses the given pointer type.
260 """
261 return MaskedImageSerializationModel[pointer_type] # type: ignore
263 def write_fits(
264 self,
265 filename: str,
266 *,
267 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
268 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
269 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
270 compression_seed: int | None = None,
271 ) -> None:
272 """Write the image to a FITS file.
274 Parameters
275 ----------
276 filename
277 Name of the file to write to. Must be a local file.
278 image_compression
279 Compression options for the `image` plane.
280 mask_compression
281 Compression options for the `mask` plane.
282 variance_compression
283 Compression options for the `variance` plane.
284 compression_seed
285 A FITS tile compression seed to use whenever the configured
286 compression seed is `None` or (for backwards compatibility) ``0``.
287 This value is then incremented every time it is used.
288 """
289 compression_options = {}
290 if image_compression is not fits.FitsCompressionOptions.DEFAULT:
291 compression_options["image"] = image_compression
292 if mask_compression is not fits.FitsCompressionOptions.DEFAULT:
293 compression_options["mask"] = mask_compression
294 if variance_compression is not fits.FitsCompressionOptions.DEFAULT:
295 compression_options["variance"] = variance_compression
296 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed)
298 @classmethod
299 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage:
300 """Read an image from a FITS file.
302 Parameters
303 ----------
304 url
305 URL of the file to read; may be any type supported by
306 `lsst.resources.ResourcePath`.
307 bbox
308 Bounding box of a subimage to read instead.
309 """
310 return fits.read(cls, url, bbox=bbox).deserialized
312 @staticmethod
313 def from_legacy(
314 legacy: Any,
315 *,
316 unit: astropy.units.Unit | None = None,
317 plane_map: Mapping[str, MaskPlane] | None = None,
318 ) -> MaskedImage:
319 """Convert from an `lsst.afw.image.MaskedImage` instance.
321 Parameters
322 ----------
323 legacy
324 An `lsst.afw.image.MaskedImage` instance that will share image and
325 variance (but not mask) pixel data with the returned object.
326 unit
327 Units of the image.
328 plane_map
329 A mapping from legacy mask plane name to the new plane name and
330 description.
331 """
332 return MaskedImage(
333 image=Image.from_legacy(legacy.getImage(), unit),
334 mask=Mask.from_legacy(legacy.getMask(), plane_map),
335 variance=Image.from_legacy(legacy.getVariance()),
336 )
338 def to_legacy(self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None) -> Any:
339 """Convert to an `lsst.afw.image.MaskedImage` instance.
341 Parameters
342 ----------
343 copy
344 If `True`, always copy the image and variance pixel data.
345 If `False`, return a view, and raise `TypeError` if the pixel data
346 is read-only (this is not supported by afw). If `None`, onyl if
347 the pixel data is read-only. Mask pixel data is always copied.
348 plane_map
349 A mapping from legacy mask plane name to the new plane name and
350 description.
351 """
352 import lsst.afw.image
354 return lsst.afw.image.MaskedImage(
355 self.image.to_legacy(copy=copy),
356 mask=self.mask.to_legacy(plane_map),
357 variance=self.variance.to_legacy(copy=copy),
358 dtype=self.image.array.dtype,
359 )
361 @overload
362 @staticmethod
363 def read_legacy( 363 ↛ exitline 363 didn't return from function 'read_legacy' because
364 uri: ResourcePathExpression,
365 *,
366 preserve_quantization: bool = False,
367 component: Literal["image"],
368 fits_wcs_frame: Frame | None = None,
369 ) -> Image: ...
371 @overload
372 @staticmethod
373 def read_legacy( 373 ↛ exitline 373 didn't return from function 'read_legacy' because
374 uri: ResourcePathExpression,
375 *,
376 plane_map: Mapping[str, MaskPlane] | None = None,
377 component: Literal["mask"],
378 fits_wcs_frame: Frame | None = None,
379 ) -> Mask: ...
381 @overload
382 @staticmethod
383 def read_legacy( 383 ↛ exitline 383 didn't return from function 'read_legacy' because
384 uri: ResourcePathExpression,
385 *,
386 preserve_quantization: bool = False,
387 component: Literal["variance"],
388 fits_wcs_frame: Frame | None = None,
389 ) -> Image: ...
391 @overload
392 @staticmethod
393 def read_legacy( 393 ↛ exitline 393 didn't return from function 'read_legacy' because
394 uri: ResourcePathExpression,
395 *,
396 preserve_quantization: bool = False,
397 plane_map: Mapping[str, MaskPlane] | None = None,
398 component: None = None,
399 fits_wcs_frame: Frame | None = None,
400 ) -> MaskedImage: ...
402 @staticmethod
403 def read_legacy(
404 uri: ResourcePathExpression,
405 *,
406 preserve_quantization: bool = False,
407 plane_map: Mapping[str, MaskPlane] | None = None,
408 component: Literal["image", "mask", "variance"] | None = None,
409 fits_wcs_frame: Frame | None = None,
410 ) -> Any:
411 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`.
413 Parameters
414 ----------
415 uri
416 URI or file name.
417 preserve_quantization
418 If `True`, ensure that writing the masked image back out again will
419 exactly preserve quantization-compressed pixel values. This causes
420 the image and variance plane arrays to be marked as read-only and
421 stores the original binary table data for those planes in memory.
422 If the `MaskedImage` is copied, the precompressed pixel values are
423 not transferred to the copy.
424 plane_map
425 A mapping from legacy mask plane name to the new plane name and
426 description.
427 component
428 A component to read instead of the full image.
429 fits_wcs_frame
430 If not `None` and the HDU containing the image plane has a FITS
431 WCS, attach a `Projection` to the returned masked image by
432 converting that WCS. When ``component`` is one of ``"image"``,
433 ``"mask"``, or ``"variance"``, a FITS WCS from the component HDU
434 is used instead (all three should have the same WCS).
435 """
436 fs, fspath = ResourcePath(uri).to_fsspec()
437 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
438 return MaskedImage._read_legacy_hdus(
439 hdu_list,
440 uri,
441 preserve_quantization=preserve_quantization,
442 plane_map=plane_map,
443 component=component,
444 fits_wcs_frame=fits_wcs_frame,
445 )
447 @staticmethod
448 def _read_legacy_hdus(
449 hdu_list: astropy.io.fits.HDUList,
450 uri: ResourcePathExpression,
451 *,
452 preserve_quantization: bool = False,
453 plane_map: Mapping[str, MaskPlane] | None = None,
454 component: Literal["image", "mask", "variance"] | None,
455 fits_wcs_frame: Frame | None = None,
456 ) -> Any:
457 opaque_metadata = fits.FitsOpaqueMetadata()
458 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
459 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
460 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
461 result: Any
462 with ExitStack() as exit_stack:
463 if preserve_quantization:
464 fs, fspath = ResourcePath(uri).to_fsspec()
465 bintable_stream = exit_stack.enter_context(fs.open(fspath))
466 bintable_hdu_list = exit_stack.enter_context(
467 astropy.io.fits.open(bintable_stream, disable_image_compression=True)
468 )
469 image_bintable_hdu = bintable_hdu_list[1]
470 variance_bintable_hdu = bintable_hdu_list[3]
471 if component is None or component == "image":
472 image = Image._read_legacy_hdu(
473 hdu_list[1],
474 opaque_metadata,
475 preserve_bintable=image_bintable_hdu,
476 fits_wcs_frame=fits_wcs_frame,
477 )
478 if component == "image":
479 result = image
480 if component is None or component == "mask":
481 mask = Mask._read_legacy_hdu(
482 hdu_list[2],
483 opaque_metadata,
484 plane_map=plane_map,
485 fits_wcs_frame=fits_wcs_frame if component is not None else None,
486 )
487 if component == "mask":
488 result = mask
489 if component is None or component == "variance":
490 variance = Image._read_legacy_hdu(
491 hdu_list[3],
492 opaque_metadata,
493 preserve_bintable=variance_bintable_hdu,
494 fits_wcs_frame=fits_wcs_frame if component is not None else None,
495 )
496 if component == "variance":
497 result = variance
498 if component is None:
499 result = MaskedImage(image, mask=mask, variance=variance)
500 result._opaque_metadata = opaque_metadata
501 return result
504class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree):
505 """A Pydantic model used to represent a serialized `MaskedImage`."""
507 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
508 mask: MaskSerializationModel[P] = pydantic.Field(
509 description="Bitmask that annotates the main image's pixels."
510 )
511 variance: ImageSerializationModel[P] = pydantic.Field(
512 description="Per-pixel variance estimates for the main image."
513 )
514 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
515 default=None,
516 exclude_if=is_none,
517 description="Projection that maps the pixel grid to the sky.",
518 )
520 @property
521 def bbox(self) -> Box:
522 """The bounding box of the image."""
523 return self.image.bbox