Coverage for python / lsst / images / _masked_image.py: 35%
180 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:01 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:01 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("MaskedImage", "MaskedImageSerializationModel")
16import functools
17from collections.abc import Mapping
18from contextlib import ExitStack
19from types import EllipsisType
20from typing import Any, Literal, overload
22import astropy.io.fits
23import astropy.units
24import astropy.wcs
25import numpy as np
26import pydantic
27from astro_metadata_translator import ObservationInfo
29from lsst.resources import ResourcePath, ResourcePathExpression
31from . import fits
32from ._generalized_image import GeneralizedImage
33from ._geom import Box
34from ._image import Image, ImageSerializationModel
35from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel
36from ._transforms import Frame, Projection, ProjectionSerializationModel
37from .serialization import ArchiveTree, InputArchive, MetadataValue, OutputArchive
38from .utils import is_none
41class MaskedImage(GeneralizedImage):
42 """A multi-plane image with data (image), mask, and variance planes.
44 Parameters
45 ----------
46 image
47 The main image plane. If this has a `Projection`, it will be used
48 for all planes unless a ``projection`` is passed separately.
49 mask
50 A bitmask image that annotates the main image plane. Must have the
51 same bounding box as ``image`` if provided. Any attached projection
52 is replaced (possibly by `None`).
53 variance
54 The per-pixel uncertainty of the main image as an image of variance
55 values. Must have the same bounding box as ``image`` if provided, and
56 its units must be the square of ``image.unit`` or `None`.
57 Values default to ``1.0``. Any attached projection is replaced
58 (possibly by `None`).
59 mask_schema
60 Schema for the mask plane. Must be provided if and only if ``mask`` is
61 not provided.
62 projection
63 Projection that maps the pixel grid to the sky.
64 obs_info
65 General information about the associated observation in standardized
66 form.
67 metadata
68 Arbitrary flexible metadata to associate with the image.
69 """
71 def __init__(
72 self,
73 image: Image,
74 *,
75 mask: Mask | None = None,
76 variance: Image | None = None,
77 mask_schema: MaskSchema | None = None,
78 projection: Projection | None = None,
79 obs_info: ObservationInfo | None = None,
80 metadata: dict[str, MetadataValue] | None = None,
81 ):
82 super().__init__(metadata)
83 if projection is None:
84 projection = image.projection
85 else:
86 image = image.view(projection=projection)
87 if obs_info is None:
88 obs_info = image.obs_info
89 else:
90 image = image.view(obs_info=obs_info)
91 if mask is None:
92 if mask_schema is None:
93 raise TypeError("'mask_schema' must be provided if 'mask' is not.")
94 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection, obs_info=obs_info)
95 elif mask_schema is not None:
96 raise TypeError("'mask_schema' may not be provided if 'mask' is.")
97 else:
98 if image.bbox != mask.bbox:
99 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.")
100 mask = mask.view(projection=projection, obs_info=obs_info)
101 if variance is None:
102 variance = Image(
103 1.0,
104 dtype=np.float32,
105 bbox=image.bbox,
106 unit=None if image.unit is None else image.unit**2,
107 projection=projection,
108 obs_info=obs_info,
109 )
110 else:
111 if image.bbox != variance.bbox:
112 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.")
113 variance = variance.view(projection=projection, obs_info=obs_info)
114 if image.unit is None:
115 if variance.unit is not None:
116 raise ValueError(f"Image has no units but variance does ({variance.unit}).")
117 elif variance.unit is None:
118 variance = variance.view(unit=image.unit**2)
119 elif variance.unit != image.unit**2:
120 raise ValueError(
121 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})."
122 )
123 self._image = image
124 self._mask = mask
125 self._variance = variance
127 @property
128 def image(self) -> Image:
129 """The main image plane (`Image`)."""
130 return self._image
132 @property
133 def mask(self) -> Mask:
134 """The mask plane (`Mask`)."""
135 return self._mask
137 @property
138 def variance(self) -> Image:
139 """The variance plane (`Image`)."""
140 return self._variance
142 @property
143 def bbox(self) -> Box:
144 """The bounding box shared by all three image planes (`Box`)."""
145 return self._image.bbox
147 @property
148 def unit(self) -> astropy.units.UnitBase | None:
149 """The units of the image plane (`astropy.units.Unit` | `None`)."""
150 return self._image.unit
152 @property
153 def projection(self) -> Projection[Any] | None:
154 """The projection that maps the pixel grid to the sky
155 (`Projection` | `None`).
156 """
157 return self._image.projection
159 @property
160 def obs_info(self) -> ObservationInfo | None:
161 """General information about the associated observation in standard
162 form. (`~astro_metadata_translator.ObservationInfo` | `None`).
163 """
164 return self._image.obs_info
166 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage:
167 if bbox is ...:
168 return self
169 super().__getitem__(bbox)
170 return self._transfer_metadata(
171 MaskedImage(
172 # Projection and obs_info propagate from the image.
173 self.image[bbox],
174 mask=self.mask[bbox],
175 variance=self.variance[bbox],
176 ),
177 bbox=bbox,
178 )
180 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None:
181 self._image[bbox] = value.image
182 self._mask[bbox] = value.mask
183 self._variance[bbox] = value.variance
185 def __str__(self) -> str:
186 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})"
188 def __repr__(self) -> str:
189 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})"
191 def copy(self) -> MaskedImage:
192 """Deep-copy the masked image and metadata."""
193 return self._transfer_metadata(
194 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()),
195 copy=True,
196 )
198 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel:
199 """Serialize the masked image to an output archive.
201 Parameters
202 ----------
203 archive
204 Archive to write to.
205 """
206 serialized_image = archive.serialize_direct(
207 "image", functools.partial(self.image.serialize, save_projection=False, save_obs_info=False)
208 )
209 serialized_mask = archive.serialize_direct(
210 "mask", functools.partial(self.mask.serialize, save_projection=False, save_obs_info=False)
211 )
212 serialized_variance = archive.serialize_direct(
213 "variance", functools.partial(self.variance.serialize, save_projection=False, save_obs_info=False)
214 )
215 serialized_projection = (
216 archive.serialize_direct("projection", self.projection.serialize)
217 if self.projection is not None
218 else None
219 )
220 return MaskedImageSerializationModel(
221 image=serialized_image,
222 mask=serialized_mask,
223 variance=serialized_variance,
224 projection=serialized_projection,
225 obs_info=self.obs_info,
226 metadata=self.metadata,
227 )
229 @staticmethod
230 def deserialize(
231 model: MaskedImageSerializationModel[Any], archive: InputArchive[Any], *, bbox: Box | None = None
232 ) -> MaskedImage:
233 """Deserialize an image from an input archive.
235 Parameters
236 ----------
237 model
238 A Pydantic model representation of the image, holding references
239 to data stored in the archive.
240 archive
241 Archive to read from.
242 bbox
243 Bounding box of a subimage to read instead.
244 """
245 image = Image.deserialize(model.image, archive, bbox=bbox)
246 mask = Mask.deserialize(model.mask, archive, bbox=bbox)
247 variance = Image.deserialize(model.variance, archive, bbox=bbox)
248 projection = (
249 Projection.deserialize(model.projection, archive) if model.projection is not None else None
250 )
251 return MaskedImage(
252 image, mask=mask, variance=variance, projection=projection, obs_info=model.obs_info
253 )._finish_deserialize(model)
255 @staticmethod
256 def _get_archive_tree_type[P: pydantic.BaseModel](
257 pointer_type: type[P],
258 ) -> type[MaskedImageSerializationModel[P]]:
259 """Return the serialization model type for this object for an archive
260 type that uses the given pointer type.
261 """
262 return MaskedImageSerializationModel[pointer_type] # type: ignore
264 def write_fits(
265 self,
266 filename: str,
267 *,
268 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
269 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
270 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
271 compression_seed: int | None = None,
272 ) -> None:
273 """Write the image to a FITS file.
275 Parameters
276 ----------
277 filename
278 Name of the file to write to. Must be a local file.
279 image_compression
280 Compression options for the `image` plane.
281 mask_compression
282 Compression options for the `mask` plane.
283 variance_compression
284 Compression options for the `variance` plane.
285 compression_seed
286 A FITS tile compression seed to use whenever the configured
287 compression seed is `None` or (for backwards compatibility) ``0``.
288 This value is then incremented every time it is used.
289 """
290 compression_options = {}
291 if image_compression is not fits.FitsCompressionOptions.DEFAULT:
292 compression_options["image"] = image_compression
293 if mask_compression is not fits.FitsCompressionOptions.DEFAULT:
294 compression_options["mask"] = mask_compression
295 if variance_compression is not fits.FitsCompressionOptions.DEFAULT:
296 compression_options["variance"] = variance_compression
297 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed)
299 @classmethod
300 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage:
301 """Read an image from a FITS file.
303 Parameters
304 ----------
305 url
306 URL of the file to read; may be any type supported by
307 `lsst.resources.ResourcePath`.
308 bbox
309 Bounding box of a subimage to read instead.
310 """
311 return fits.read(cls, url, bbox=bbox).deserialized
313 @staticmethod
314 def from_legacy(
315 legacy: Any,
316 *,
317 unit: astropy.units.Unit | None = None,
318 plane_map: Mapping[str, MaskPlane] | None = None,
319 ) -> MaskedImage:
320 """Convert from an `lsst.afw.image.MaskedImage` instance.
322 Parameters
323 ----------
324 legacy
325 An `lsst.afw.image.MaskedImage` instance that will share image and
326 variance (but not mask) pixel data with the returned object.
327 unit
328 Units of the image.
329 plane_map
330 A mapping from legacy mask plane name to the new plane name and
331 description.
332 """
333 return MaskedImage(
334 image=Image.from_legacy(legacy.getImage(), unit),
335 mask=Mask.from_legacy(legacy.getMask(), plane_map),
336 variance=Image.from_legacy(legacy.getVariance()),
337 )
339 def to_legacy(self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None) -> Any:
340 """Convert to an `lsst.afw.image.MaskedImage` instance.
342 Parameters
343 ----------
344 copy
345 If `True`, always copy the image and variance pixel data.
346 If `False`, return a view, and raise `TypeError` if the pixel data
347 is read-only (this is not supported by afw). If `None`, onyl if
348 the pixel data is read-only. Mask pixel data is always copied.
349 plane_map
350 A mapping from legacy mask plane name to the new plane name and
351 description.
352 """
353 import lsst.afw.image
355 return lsst.afw.image.MaskedImage(
356 self.image.to_legacy(copy=copy),
357 mask=self.mask.to_legacy(plane_map),
358 variance=self.variance.to_legacy(copy=copy),
359 dtype=self.image.array.dtype,
360 )
362 @overload
363 @staticmethod
364 def read_legacy( 364 ↛ exitline 364 didn't return from function 'read_legacy' because
365 uri: ResourcePathExpression,
366 *,
367 preserve_quantization: bool = False,
368 component: Literal["image"],
369 fits_wcs_frame: Frame | None = None,
370 ) -> Image: ...
372 @overload
373 @staticmethod
374 def read_legacy( 374 ↛ exitline 374 didn't return from function 'read_legacy' because
375 uri: ResourcePathExpression,
376 *,
377 plane_map: Mapping[str, MaskPlane] | None = None,
378 component: Literal["mask"],
379 fits_wcs_frame: Frame | None = None,
380 ) -> Mask: ...
382 @overload
383 @staticmethod
384 def read_legacy( 384 ↛ exitline 384 didn't return from function 'read_legacy' because
385 uri: ResourcePathExpression,
386 *,
387 preserve_quantization: bool = False,
388 component: Literal["variance"],
389 fits_wcs_frame: Frame | None = None,
390 ) -> Image: ...
392 @overload
393 @staticmethod
394 def read_legacy( 394 ↛ exitline 394 didn't return from function 'read_legacy' because
395 uri: ResourcePathExpression,
396 *,
397 preserve_quantization: bool = False,
398 plane_map: Mapping[str, MaskPlane] | None = None,
399 component: None = None,
400 fits_wcs_frame: Frame | None = None,
401 ) -> MaskedImage: ...
403 @staticmethod
404 def read_legacy(
405 uri: ResourcePathExpression,
406 *,
407 preserve_quantization: bool = False,
408 plane_map: Mapping[str, MaskPlane] | None = None,
409 component: Literal["image", "mask", "variance"] | None = None,
410 fits_wcs_frame: Frame | None = None,
411 ) -> Any:
412 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`.
414 Parameters
415 ----------
416 uri
417 URI or file name.
418 preserve_quantization
419 If `True`, ensure that writing the masked image back out again will
420 exactly preserve quantization-compressed pixel values. This causes
421 the image and variance plane arrays to be marked as read-only and
422 stores the original binary table data for those planes in memory.
423 If the `MaskedImage` is copied, the precompressed pixel values are
424 not transferred to the copy.
425 plane_map
426 A mapping from legacy mask plane name to the new plane name and
427 description.
428 component
429 A component to read instead of the full image.
430 fits_wcs_frame
431 If not `None` and the HDU containing the image plane has a FITS
432 WCS, attach a `Projection` to the returned masked image by
433 converting that WCS. When ``component`` is one of ``"image"``,
434 ``"mask"``, or ``"variance"``, a FITS WCS from the component HDU
435 is used instead (all three should have the same WCS).
436 """
437 fs, fspath = ResourcePath(uri).to_fsspec()
438 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
439 return MaskedImage._read_legacy_hdus(
440 hdu_list,
441 uri,
442 preserve_quantization=preserve_quantization,
443 plane_map=plane_map,
444 component=component,
445 fits_wcs_frame=fits_wcs_frame,
446 )
448 @staticmethod
449 def _read_legacy_hdus(
450 hdu_list: astropy.io.fits.HDUList,
451 uri: ResourcePathExpression,
452 *,
453 preserve_quantization: bool = False,
454 plane_map: Mapping[str, MaskPlane] | None = None,
455 component: Literal["image", "mask", "variance"] | None,
456 fits_wcs_frame: Frame | None = None,
457 ) -> Any:
458 opaque_metadata = fits.FitsOpaqueMetadata()
459 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
460 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
461 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None
462 result: Any
463 with ExitStack() as exit_stack:
464 if preserve_quantization:
465 fs, fspath = ResourcePath(uri).to_fsspec()
466 bintable_stream = exit_stack.enter_context(fs.open(fspath))
467 bintable_hdu_list = exit_stack.enter_context(
468 astropy.io.fits.open(bintable_stream, disable_image_compression=True)
469 )
470 image_bintable_hdu = bintable_hdu_list[1]
471 variance_bintable_hdu = bintable_hdu_list[3]
472 if component is None or component == "image":
473 image = Image._read_legacy_hdu(
474 hdu_list[1],
475 opaque_metadata,
476 preserve_bintable=image_bintable_hdu,
477 fits_wcs_frame=fits_wcs_frame,
478 )
479 if component == "image":
480 result = image
481 if component is None or component == "mask":
482 mask = Mask._read_legacy_hdu(
483 hdu_list[2],
484 opaque_metadata,
485 plane_map=plane_map,
486 fits_wcs_frame=fits_wcs_frame if component is not None else None,
487 )
488 if component == "mask":
489 result = mask
490 if component is None or component == "variance":
491 variance = Image._read_legacy_hdu(
492 hdu_list[3],
493 opaque_metadata,
494 preserve_bintable=variance_bintable_hdu,
495 fits_wcs_frame=fits_wcs_frame if component is not None else None,
496 )
497 if component == "variance":
498 result = variance
499 if component is None:
500 result = MaskedImage(image, mask=mask, variance=variance)
501 result._opaque_metadata = opaque_metadata
502 return result
505class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree):
506 """A Pydantic model used to represent a serialized `MaskedImage`."""
508 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.")
509 mask: MaskSerializationModel[P] = pydantic.Field(
510 description="Bitmask that annotates the main image's pixels."
511 )
512 variance: ImageSerializationModel[P] = pydantic.Field(
513 description="Per-pixel variance estimates for the main image."
514 )
515 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
516 default=None,
517 exclude_if=is_none,
518 description="Projection that maps the pixel grid to the sky.",
519 )
520 obs_info: ObservationInfo | None = pydantic.Field(
521 default=None,
522 exclude_if=is_none,
523 description="Standardized description of image metadata",
524 )
526 @property
527 def bbox(self) -> Box:
528 """The bounding box of the image."""
529 return self.image.bbox