Coverage for python / lsst / images / _masked_image.py: 35%
180 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:34 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("MaskedImage", "MaskedImageSerializationModel")
16import functools
17from collections.abc import Mapping
18from contextlib import ExitStack
19from types import EllipsisType
20from typing import Any, Literal, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import numpy as np
26import pydantic
27from astro_metadata_translator import ObservationInfo
29from lsst.resources import ResourcePath, ResourcePathExpression
31from . import fits
32from ._generalized_image import GeneralizedImage
33from ._geom import Box
34from ._image import Image, ImageSerializationModel
35from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel
36from ._transforms import Frame, Projection, ProjectionSerializationModel
37from .serialization import ArchiveTree, InputArchive, MetadataValue, OutputArchive
38from .utils import is_none
41class MaskedImage(GeneralizedImage):
42 """A multi-plane image with data (image), mask, and variance planes.
44 Parameters
45 ----------
46 image
47 The main image plane. If this has a `Projection`, it will be used
48 for all planes unless a ``projection`` is passed separately.
49 mask
50 A bitmask image that annotates the main image plane. Must have the
51 same bounding box as ``image`` if provided. Any attached projection
52 is replaced (possibly by `None`).
53 variance
54 The per-pixel uncertainty of the main image as an image of variance
55 values. Must have the same bounding box as ``image`` if provided, and
56 its units must be the square of ``image.unit`` or `None`.
57 Values default to ``1.0``. Any attached projection is replaced
58 (possibly by `None`).
59 mask_schema
60 Schema for the mask plane. Must be provided if and only if ``mask`` is
61 not provided.
62 projection
63 Projection that maps the pixel grid to the sky.
64 obs_info
65 General information about the associated observation in standardized
66 form.
67 metadata
68 Arbitrary flexible metadata to associate with the image.
69 """
71 def __init__(
72 self,
73 image: Image,
74 *,
75 mask: Mask | None = None,
76 variance: Image | None = None,
77 mask_schema: MaskSchema | None = None,
78 projection: Projection | None = None,
79 obs_info: ObservationInfo | None = None,
80 metadata: dict[str, MetadataValue] | None = None,
81 ):
82 super().__init__(metadata)
83 if projection is None:
84 projection = image.projection
85 else:
86 image = image.view(projection=projection)
87 if obs_info is None:
88 obs_info = image.obs_info
89 else:
90 image = image.view(obs_info=obs_info)
91 if mask is None:
92 if mask_schema is None:
93 raise TypeError("'mask_schema' must be provided if 'mask' is not.")
94 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection, obs_info=obs_info)
95 elif mask_schema is not None:
96 raise TypeError("'mask_schema' may not be provided if 'mask' is.")
97 else:
98 if image.bbox != mask.bbox:
99 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.")
100 mask = mask.view(projection=projection, obs_info=obs_info)
101 if variance is None:
102 variance = Image(
103 1.0,
104 dtype=np.float32,
105 bbox=image.bbox,
106 unit=None if image.unit is None else image.unit**2,
107 projection=projection,
108 obs_info=obs_info,
109 )
110 else:
111 if image.bbox != variance.bbox:
112 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.")
113 variance = variance.view(projection=projection, obs_info=obs_info)
114 if image.unit is None:
115 if variance.unit is not None:
116 raise ValueError(f"Image has no units but variance does ({variance.unit}).")
117 elif variance.unit is None:
118 variance = variance.view(unit=image.unit**2)
119 elif variance.unit != image.unit**2:
120 raise ValueError(
121 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})."
122 )
123 self._image = image
124 self._mask = mask
125 self._variance = variance
127 @property
128 def image(self) -> Image:
129 """The main image plane (`~lsst.images.Image`)."""
130 return self._image
132 @property
133 def mask(self) -> Mask:
134 """The mask plane (`~lsst.images.Mask`)."""
135 return self._mask
137 @property
138 def variance(self) -> Image:
139 """The variance plane (`~lsst.images.Image`)."""
140 return self._variance
142 @property
143 def bbox(self) -> Box:
144 """The bounding box shared by all three image planes
145 (`~lsst.images.Box`).
146 """
147 return self._image.bbox
149 @property
150 def unit(self) -> astropy.units.UnitBase | None:
151 """The units of the image plane (`astropy.units.Unit` | `None`)."""
152 return self._image.unit
154 @property
155 def projection(self) -> Projection[Any] | None:
156 """The projection that maps the pixel grid to the sky
157 (`~lsst.images.Projection` | `None`).
158 """
159 return self._image.projection
161 @property
162 def obs_info(self) -> ObservationInfo | None:
163 """General information about the associated observation in standard
164 form. (`~astro_metadata_translator.ObservationInfo` | `None`).
165 """
166 return self._image.obs_info
168 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage:
169 if bbox is ...:
170 return self
171 super().__getitem__(bbox)
172 return self._transfer_metadata(
173 MaskedImage(
174 # Projection and obs_info propagate from the image.
175 self.image[bbox],
176 mask=self.mask[bbox],
177 variance=self.variance[bbox],
178 ),
179 bbox=bbox,
180 )
182 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None:
183 self._image[bbox] = value.image
184 self._mask[bbox] = value.mask
185 self._variance[bbox] = value.variance
187 def __str__(self) -> str:
188 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})"
190 def __repr__(self) -> str:
191 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})"
193 def copy(self) -> MaskedImage:
194 """Deep-copy the masked image and metadata."""
195 return self._transfer_metadata(
196 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()),
197 copy=True,
198 )
200 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel:
201 """Serialize the masked image to an output archive.
203 Parameters
204 ----------
205 archive
206 Archive to write to.
207 """
208 serialized_image = archive.serialize_direct(
209 "image", functools.partial(self.image.serialize, save_projection=False, save_obs_info=False)
210 )
211 serialized_mask = archive.serialize_direct(
212 "mask", functools.partial(self.mask.serialize, save_projection=False, save_obs_info=False)
213 )
214 serialized_variance = archive.serialize_direct(
215 "variance", functools.partial(self.variance.serialize, save_projection=False, save_obs_info=False)
216 )
217 serialized_projection = (
218 archive.serialize_direct("projection", self.projection.serialize)
219 if self.projection is not None
220 else None
221 )
222 return MaskedImageSerializationModel(
223 image=serialized_image,
224 mask=serialized_mask,
225 variance=serialized_variance,
226 projection=serialized_projection,
227 obs_info=self.obs_info,
228 metadata=self.metadata,
229 )
231 @staticmethod
232 def deserialize(
233 model: MaskedImageSerializationModel[Any], archive: InputArchive[Any], *, bbox: Box | None = None
234 ) -> MaskedImage:
235 """Deserialize an image from an input archive.
237 Parameters
238 ----------
239 model
240 A Pydantic model representation of the image, holding references
241 to data stored in the archive.
242 archive
243 Archive to read from.
244 bbox
245 Bounding box of a subimage to read instead.
246 """
247 image = Image.deserialize(model.image, archive, bbox=bbox)
248 mask = Mask.deserialize(model.mask, archive, bbox=bbox)
249 variance = Image.deserialize(model.variance, archive, bbox=bbox)
250 projection = (
251 Projection.deserialize(model.projection, archive) if model.projection is not None else None
252 )
253 return MaskedImage(
254 image, mask=mask, variance=variance, projection=projection, obs_info=model.obs_info
255 )._finish_deserialize(model)
257 @staticmethod
258 def _get_archive_tree_type[P: pydantic.BaseModel](
259 pointer_type: type[P],
260 ) -> type[MaskedImageSerializationModel[P]]:
261 """Return the serialization model type for this object for an archive
262 type that uses the given pointer type.
263 """
264 return MaskedImageSerializationModel[pointer_type] # type: ignore
266 def write_fits(
267 self,
268 filename: str,
269 *,
270 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
271 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
272 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
273 compression_seed: int | None = None,
274 ) -> None:
275 """Write the image to a FITS file.
277 Parameters
278 ----------
279 filename
280 Name of the file to write to. Must be a local file.
281 image_compression
282 Compression options for the `image` plane.
283 mask_compression
284 Compression options for the `mask` plane.
285 variance_compression
286 Compression options for the `variance` plane.
287 compression_seed
288 A FITS tile compression seed to use whenever the configured
289 compression seed is `None` or (for backwards compatibility) ``0``.
290 This value is then incremented every time it is used.
291 """
292 compression_options = {}
293 if image_compression is not fits.FitsCompressionOptions.DEFAULT:
294 compression_options["image"] = image_compression
295 if mask_compression is not fits.FitsCompressionOptions.DEFAULT:
296 compression_options["mask"] = mask_compression
297 if variance_compression is not fits.FitsCompressionOptions.DEFAULT:
298 compression_options["variance"] = variance_compression
299 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed)
301 @classmethod
302 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage:
303 """Read an image from a FITS file.
305 Parameters
306 ----------
307 url
308 URL of the file to read; may be any type supported by
309 `lsst.resources.ResourcePath`.
310 bbox
311 Bounding box of a subimage to read instead.
312 """
313 return fits.read(cls, url, bbox=bbox).deserialized
315 @staticmethod
316 def from_legacy(
317 legacy: Any,
318 *,
319 unit: astropy.units.Unit | None = None,
320 plane_map: Mapping[str, MaskPlane] | None = None,
321 ) -> MaskedImage:
322 """Convert from an `lsst.afw.image.MaskedImage` instance.
324 Parameters
325 ----------
326 legacy
327 An `lsst.afw.image.MaskedImage` instance that will share image and
328 variance (but not mask) pixel data with the returned object.
329 unit
330 Units of the image.
331 plane_map
332 A mapping from legacy mask plane name to the new plane name and
333 description.
334 """
335 return MaskedImage(
336 image=Image.from_legacy(legacy.getImage(), unit),
337 mask=Mask.from_legacy(legacy.getMask(), plane_map),
338 variance=Image.from_legacy(legacy.getVariance()),
339 )
341 def to_legacy(self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None) -> Any:
342 """Convert to an `lsst.afw.image.MaskedImage` instance.
344 Parameters
345 ----------
346 copy
347 If `True`, always copy the image and variance pixel data.
348 If `False`, return a view, and raise `TypeError` if the pixel data
349 is read-only (this is not supported by afw). If `None`, onyl if
350 the pixel data is read-only. Mask pixel data is always copied.
351 plane_map
352 A mapping from legacy mask plane name to the new plane name and
353 description.
354 """
355 import lsst.afw.image
357 return lsst.afw.image.MaskedImage(
358 self.image.to_legacy(copy=copy),
359 mask=self.mask.to_legacy(plane_map),
360 variance=self.variance.to_legacy(copy=copy),
361 dtype=self.image.array.dtype,
362 )
364 @overload
365 @staticmethod
366 def read_legacy( 366 ↛ exitline 366 didn't return from function 'read_legacy' because
367 uri: ResourcePathExpression,
368 *,
369 preserve_quantization: bool = False,
370 component: Literal["image"],
371 fits_wcs_frame: Frame | None = None,
372 ) -> Image: ...
374 @overload
375 @staticmethod
376 def read_legacy( 376 ↛ exitline 376 didn't return from function 'read_legacy' because
377 uri: ResourcePathExpression,
378 *,
379 plane_map: Mapping[str, MaskPlane] | None = None,
380 component: Literal["mask"],
381 fits_wcs_frame: Frame | None = None,
382 ) -> Mask: ...
384 @overload
385 @staticmethod
386 def read_legacy( 386 ↛ exitline 386 didn't return from function 'read_legacy' because
387 uri: ResourcePathExpression,
388 *,
389 preserve_quantization: bool = False,
390 component: Literal["variance"],
391 fits_wcs_frame: Frame | None = None,
392 ) -> Image: ...
394 @overload
395 @staticmethod
396 def read_legacy( 396 ↛ exitline 396 didn't return from function 'read_legacy' because
397 uri: ResourcePathExpression,
398 *,
399 preserve_quantization: bool = False,
400 plane_map: Mapping[str, MaskPlane] | None = None,
401 component: None = None,
402 fits_wcs_frame: Frame | None = None,
403 ) -> MaskedImage: ...
405 @staticmethod
406 def read_legacy(
407 uri: ResourcePathExpression,
408 *,
409 preserve_quantization: bool = False,
410 plane_map: Mapping[str, MaskPlane] | None = None,
411 component: Literal["image", "mask", "variance"] | None = None,
412 fits_wcs_frame: Frame | None = None,
413 ) -> Any:
414 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`.
416 Parameters
417 ----------
418 uri
419 URI or file name.
420 preserve_quantization
421 If `True`, ensure that writing the masked image back out again will
422 exactly preserve quantization-compressed pixel values. This causes
423 the image and variance plane arrays to be marked as read-only and
424 stores the original binary table data for those planes in memory.
425 If the `~lsst.images.MaskedImage` is copied, the precompressed
426 pixel values are not transferred to the copy.
427 plane_map
428 A mapping from legacy mask plane name to the new plane name and
429 description.
430 component
431 A component to read instead of the full image.
432 fits_wcs_frame
433 If not `None` and the HDU containing the image plane has a FITS
434 WCS, attach a `~lsst.images.Projection` to the returned masked
435 image by converting that WCS. When ``component`` is one of
436 ``"image"``, ``"mask"``, or ``"variance"``, a FITS WCS from the
437 component HDU is used instead (all three should have the same WCS).
438 """
439 fs, fspath = ResourcePath(uri).to_fsspec()
440 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
441 return MaskedImage._read_legacy_hdus(
442 hdu_list,
443 uri,
444 preserve_quantization=preserve_quantization,
445 plane_map=plane_map,
446 component=component,
447 fits_wcs_frame=fits_wcs_frame,
448 )
450 @staticmethod
451 def _read_legacy_hdus(
452 hdu_list: astropy.io.fits.HDUList,
453 uri: ResourcePathExpression,
454 *,
455 preserve_quantization: bool = False,
456 plane_map: Mapping[str, MaskPlane] | None = None,
457 component: Literal["image", "mask", "variance"] | None,
458 fits_wcs_frame: Frame | None = None,
459 ) -> Any:
460 opaque_metadata = fits.FitsOpaqueMetadata()
461 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
462 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
463 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
464 result: Any
465 with ExitStack() as exit_stack:
466 if preserve_quantization:
467 fs, fspath = ResourcePath(uri).to_fsspec()
468 bintable_stream = exit_stack.enter_context(fs.open(fspath))
469 bintable_hdu_list = exit_stack.enter_context(
470 astropy.io.fits.open(bintable_stream, disable_image_compression=True)
471 )
472 image_bintable_hdu = bintable_hdu_list[1]
473 variance_bintable_hdu = bintable_hdu_list[3]
474 if component is None or component == "image":
475 image = Image._read_legacy_hdu(
476 hdu_list[1],
477 opaque_metadata,
478 preserve_bintable=image_bintable_hdu,
479 fits_wcs_frame=fits_wcs_frame,
480 )
481 if component == "image":
482 result = image
483 if component is None or component == "mask":
484 mask = Mask._read_legacy_hdu(
485 hdu_list[2],
486 opaque_metadata,
487 plane_map=plane_map,
488 fits_wcs_frame=fits_wcs_frame if component is not None else None,
489 )
490 if component == "mask":
491 result = mask
492 if component is None or component == "variance":
493 variance = Image._read_legacy_hdu(
494 hdu_list[3],
495 opaque_metadata,
496 preserve_bintable=variance_bintable_hdu,
497 fits_wcs_frame=fits_wcs_frame if component is not None else None,
498 )
499 if component == "variance":
500 result = variance
501 if component is None:
502 result = MaskedImage(image, mask=mask, variance=variance)
503 result._opaque_metadata = opaque_metadata
504 return result
507class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree):
508 """A Pydantic model used to represent a serialized `MaskedImage`."""
510 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
511 mask: MaskSerializationModel[P] = pydantic.Field(
512 description="Bitmask that annotates the main image's pixels."
513 )
514 variance: ImageSerializationModel[P] = pydantic.Field(
515 description="Per-pixel variance estimates for the main image."
516 )
517 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
518 default=None,
519 exclude_if=is_none,
520 description="Projection that maps the pixel grid to the sky.",
521 )
522 obs_info: ObservationInfo | None = pydantic.Field(
523 default=None,
524 exclude_if=is_none,
525 description="Standardized description of image metadata",
526 )
528 @property
529 def bbox(self) -> Box:
530 """The bounding box of the image."""
531 return self.image.bbox