Coverage for python / lsst / images / _mask.py: 24%
371 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:36 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "Mask",
16 "MaskPlane",
17 "MaskPlaneBit",
18 "MaskSchema",
19 "MaskSerializationModel",
20 "get_legacy_deep_coadd_mask_planes",
21 "get_legacy_visit_image_mask_planes",
22)
24import dataclasses
25import math
26from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set
27from types import EllipsisType
28from typing import Any, ClassVar, cast
30import astropy.io.fits
31import astropy.wcs
32import numpy as np
33import numpy.typing as npt
34import pydantic
35from astro_metadata_translator import ObservationInfo
37from lsst.resources import ResourcePath, ResourcePathExpression
39from . import fits
40from ._generalized_image import GeneralizedImage
41from ._geom import YX, Box, NoOverlapError
42from ._transforms import Frame, Projection, ProjectionSerializationModel
43from .serialization import (
44 ArchiveReadError,
45 ArchiveTree,
46 ArrayReferenceModel,
47 InlineArrayModel,
48 InputArchive,
49 IntegerType,
50 MetadataValue,
51 NumberType,
52 OutputArchive,
53 is_integer,
54 no_header_updates,
55)
56from .utils import is_none
59@dataclasses.dataclass(frozen=True)
60class MaskPlane:
61 """Name and description of a single plane in a mask array."""
63 name: str
64 """Unique name for the mask plane (`str`)."""
66 description: str
67 """Human-readable documentation for the mask plane (`str`)."""
69 @classmethod
70 def read_legacy(cls, header: astropy.io.fits.Header) -> dict[str, int]:
71 """Read mask plane descriptions written by
72 `lsst.afw.image.Mask.writeFits`.
74 Parameters
75 ----------
76 header
77 FITS header.
79 Returns
80 -------
81 `dict` [`str`, `int`]
82 A dictionary mapping mask plane name to integer bit index.
83 """
84 result: dict[str, int] = {}
85 for card in list(header.cards):
86 if card.keyword.startswith("MP_"):
87 result[card.keyword.removeprefix("MP_")] = card.value
88 del header[card.keyword]
89 return result
92@dataclasses.dataclass(frozen=True)
93class MaskPlaneBit:
94 """The nested array index and mask value associated with a single mask
95 plane.
96 """
98 index: int
99 """Index into the last dimension of the mask array where this plane's bit
100 is stored.
101 """
103 mask: np.integer
104 """Bitmask that selects just this plane's bit from a mask array value
105 (`numpy.integer`).
106 """
108 @classmethod
109 def compute(cls, overall_index: int, stride: int, mask_type: type[np.integer]) -> MaskPlaneBit:
110 """Construct a `MaskPlaneBit` from the overall index of a plane in a
111 `MaskSchema` and the stride (number of bits per mask array element).
112 """
113 index, bit = divmod(overall_index, stride)
114 return cls(index, mask_type(1 << bit))
117class MaskSchema:
118 """A schema for a bit-packed mask array.
120 Parameters
121 ----------
122 planes
123 Iterable of `MaskPlane` instances that define the schema. `None`
124 values may be included to reserve bits for future use.
125 dtype
126 The numpy data type of the mask arrays that use this schema.
128 Notes
129 -----
130 A `MaskSchema` is a collection of mask planes, which each correspond to a
131 single bit in a mask array. Mask schemas are immutable and associated with
132 a particular array data type, allowing them to safely precompute the index
133 and bitmask for each plane.
135 `MaskSchema` indexing is by integer (the overall index of a plane in the
136 schema). The `descriptions` attribute may be indexed by plane name to get
137 the description for that plane, and the `bitmask` method can be used to
138 obtain an array that can be used to select one or more planes by name in
139 a mask array that uses this schema.
141 If no mask planes are provided, a `None` placeholder is automatically
142 added.
143 """
145 def __init__(self, planes: Iterable[MaskPlane | None], dtype: npt.DTypeLike = np.uint8):
146 self._planes: tuple[MaskPlane | None, ...] = tuple(planes) or (None,)
147 self._dtype = cast(np.dtype[np.integer], np.dtype(dtype))
148 stride = self.bits_per_element(self._dtype)
149 self._descriptions = {plane.name: plane.description for plane in self._planes if plane is not None}
150 self._mask_size = math.ceil(len(self._planes) / stride)
151 self._bits: dict[str, MaskPlaneBit] = {
152 plane.name: MaskPlaneBit.compute(n, stride, self._dtype.type)
153 for n, plane in enumerate(self._planes)
154 if plane is not None
155 }
157 @staticmethod
158 def bits_per_element(dtype: npt.DTypeLike) -> int:
159 """Return the number of mask bits per array element for the given
160 data type.
161 """
162 dtype = np.dtype(dtype)
163 match dtype.kind:
164 case "u":
165 return dtype.itemsize * 8
166 case "i":
167 return dtype.itemsize * 8 - 1
168 case _:
169 raise TypeError(f"dtype for masks must be an integer; got {dtype} with kind={dtype.kind}.")
171 def __iter__(self) -> Iterator[MaskPlane | None]:
172 return iter(self._planes)
174 def __len__(self) -> int:
175 return len(self._planes)
177 def __getitem__(self, i: int) -> MaskPlane | None:
178 return self._planes[i]
180 def __repr__(self) -> str:
181 return f"MaskSchema({list(self._planes)}, dtype={self._dtype!r})"
183 def __str__(self) -> str:
184 return "\n".join(
185 [
186 f"{name} [{bit.index}@{hex(bit.mask)}]: {self._descriptions[name]}"
187 for name, bit in self._bits.items()
188 ]
189 )
191 def __eq__(self, other: object) -> bool:
192 if isinstance(other, MaskSchema):
193 return self._planes == other._planes and self._dtype == other._dtype
194 return False
196 @property
197 def dtype(self) -> np.dtype:
198 """The numpy data type of the mask arrays that use this schema."""
199 return self._dtype
201 @property
202 def mask_size(self) -> int:
203 """The number of elements in the last dimension of any mask array that
204 uses this schema.
205 """
206 return self._mask_size
208 @property
209 def names(self) -> Set[str]:
210 """The names of the mask planes, in bit order."""
211 return self._bits.keys()
213 @property
214 def descriptions(self) -> Mapping[str, str]:
215 """A mapping from plane name to description."""
216 return self._descriptions
218 def bit(self, plane: str) -> MaskPlaneBit:
219 """Return the last array index and mask for the given mask plane."""
220 return self._bits[plane]
222 def bitmask(self, *planes: str) -> np.ndarray:
223 """Return a 1-d mask array that represents the union (i.e. bitwise OR)
224 of the planes with the given names.
226 Parameters
227 ----------
228 *planes
229 Mask plane names.
231 Returns
232 -------
233 numpy.ndarray
234 A 1-d array with shape ``(mask_size,)``.
235 """
236 result = np.zeros(self.mask_size, dtype=self._dtype)
237 for plane in planes:
238 bit = self._bits[plane]
239 result[bit.index] |= bit.mask
240 return result
242 def split(self, dtype: npt.DTypeLike) -> list[MaskSchema]:
243 """Split the schema into an equivalent series of schemas that each
244 have a `mask_size` of ``1``, dropping all `None` placeholders.
246 Parameters
247 ----------
248 dtype
249 Data type of the new mask pixels.
251 Returns
252 -------
253 `list` [`MaskSchema`]
254 A list of mask schemas that together include all planes in
255 ``self`` and have `mask_size` equal to ``1``. If there are no
256 mask planes (only `None` placeholders) in ``self``, a single mask
257 schema with a `None` placeholder is returned; otherwise `None`
258 placeholders are returned.
259 """
260 dtype = np.dtype(dtype)
261 planes: list[MaskPlane] = []
262 schemas: list[MaskSchema] = []
263 n_planes_per_schema = self.bits_per_element(dtype)
264 for plane in self._planes:
265 if plane is not None:
266 planes.append(plane)
267 if len(planes) == n_planes_per_schema:
268 schemas.append(MaskSchema(planes, dtype=dtype))
269 planes.clear()
270 if planes:
271 schemas.append(MaskSchema(planes, dtype=dtype))
272 if not schemas:
273 schemas.append(MaskSchema([None], dtype=dtype))
274 return schemas
276 def update_header(self, header: astropy.io.fits.Header) -> None:
277 """Add a description of this mask schema to a FITS header."""
278 for n, plane in enumerate(self):
279 if plane is not None:
280 bit = self.bit(plane.name)
281 if bit.index != 0:
282 raise TypeError("Only mask schemas with mask_size==1 can be described in FITS.")
283 header.set(f"MSKN{n:04d}", plane.name, f"Name for mask plane {n}.")
284 header.set(f"MSKM{n:04d}", bit.mask, f"Bitmask for plane n={n}; always 1<<n.")
285 # We don't add a comment to the description card, because it's
286 # likely to overrun a single card and get the CONTINUE
287 # treatment. That will cause Astropy to warn about the comment
288 # being truncated and that's worse than just leaving it
289 # unexplained; it's pretty obvious from context what it is.
290 header.set(f"MSKD{n:04d}", plane.description)
292 def strip_header(self, header: astropy.io.fits.Header) -> None:
293 """Remove all header cards added by `update_header`."""
294 for n, plane in enumerate(self):
295 if plane is not None:
296 header.remove(f"MSKN{n:04d}", ignore_missing=True)
297 header.remove(f"MSKM{n:04d}", ignore_missing=True)
298 header.remove(f"MSKD{n:04d}", ignore_missing=True)
301class Mask(GeneralizedImage):
302 """A 2-d bitmask image backed by a 3-d byte array.
304 Parameters
305 ----------
306 array_or_fill
307 Array or fill value for the mask. If a fill value, ``bbox`` or
308 ``shape`` must be provided.
309 schema
310 Schema that defines the planes and their bit assignments.
311 bbox
312 Bounding box for the mask. This sets the shape of the first two
313 dimensions of the array.
314 start
315 Logical coordinates of the first pixel in the array, ordered ``y``,
316 ``x`` (unless an `XY` instance is passed). Ignored if
317 ``bbox`` is provided. Defaults to zeros.
318 shape
319 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY`
320 instance is passed). Only needed if ``array_or_fill`` is not an
321 array and ``bbox`` is not provided. Like the bbox, this does not
322 include the last dimension of the array.
323 projection
324 Projection that maps the pixel grid to the sky.
325 obs_info
326 General information about the associated observation in standardized
327 form.
328 metadata
329 Arbitrary flexible metadata to associate with the mask.
331 Notes
332 -----
333 Indexing the `array` attribute of a `Mask` does not take into account its
334 ``start`` offset, but accessing a subimage mask by indexing a `Mask` with
335 a `Box` does, and the `bbox` of the subimage is set to match its location
336 within the original mask.
338 A mask's ``bbox`` corresponds to the leading dimensions of its backing
339 `numpy.ndarray`, while the last dimension's size is always equal to the
340 `~MaskSchema.mask_size` of its schema, since a schema can in general
341 require multiple array elements to represent all of its planes.
342 """
344 def __init__(
345 self,
346 array_or_fill: np.ndarray | int = 0,
347 /,
348 *,
349 schema: MaskSchema,
350 bbox: Box | None = None,
351 start: Sequence[int] | None = None,
352 shape: Sequence[int] | None = None,
353 projection: Projection | None = None,
354 obs_info: ObservationInfo | None = None,
355 metadata: dict[str, MetadataValue] | None = None,
356 ):
357 super().__init__(metadata)
358 if shape is not None:
359 shape = tuple(shape)
360 if start is not None:
361 start = tuple(start)
362 if isinstance(array_or_fill, np.ndarray):
363 array = np.array(array_or_fill, dtype=schema.dtype)
364 if array.ndim != 3:
365 raise ValueError("Mask array must be 3-d.")
366 if bbox is None:
367 bbox = Box.from_shape(array.shape[:-1], start=start)
368 elif bbox.shape + (schema.mask_size,) != array.shape:
369 raise ValueError(
370 f"Explicit bbox shape {bbox.shape} and schema of size {schema.mask_size} do not "
371 f"match array with shape {array.shape}."
372 )
373 if shape is not None and shape + (schema.mask_size,) != array.shape:
374 raise ValueError(
375 f"Explicit shape {shape} and schema of size {schema.mask_size} do "
376 f"not match array with shape {array.shape}."
377 )
379 else:
380 if bbox is None:
381 if shape is None:
382 raise TypeError("No bbox, size, or array provided.")
383 bbox = Box.from_shape(shape, start=start)
384 array = np.full(bbox.shape + (schema.mask_size,), array_or_fill, dtype=schema.dtype)
385 self._array = array
386 self._bbox: Box = bbox
387 self._schema: MaskSchema = schema
388 self._projection = projection
389 self._obs_info = obs_info
391 @property
392 def array(self) -> np.ndarray:
393 """The low-level array (`numpy.ndarray`).
395 Assigning to this attribute modifies the existing array in place; the
396 bounding box and underlying data pointer are never changed.
397 """
398 return self._array
400 @array.setter
401 def array(self, value: np.ndarray | int) -> None:
402 self._array[:, :] = value
404 @property
405 def schema(self) -> MaskSchema:
406 """Schema that defines the planes and their bit assignments
407 (`MaskSchema`).
408 """
409 return self._schema
411 @property
412 def bbox(self) -> Box:
413 """2-d bounding box of the mask (`Box`).
415 This sets the shape of the first two dimensions of the array.
416 """
417 return self._bbox
419 @property
420 def projection(self) -> Projection[Any] | None:
421 """The projection that maps this mask's pixel grid to the sky
422 (`Projection` | `None`).
424 Notes
425 -----
426 The pixel coordinates used by this projection account for the bounding
427 box ``start``; they are not just array indices.
428 """
429 return self._projection
431 @property
432 def obs_info(self) -> ObservationInfo | None:
433 """General information about the associated observation in standard
434 form. (`~astro_metadata_translator.ObservationInfo` | `None`).
435 """
436 return self._obs_info
438 def __getitem__(self, bbox: Box | EllipsisType) -> Mask:
439 if bbox is ...:
440 return self
441 super().__getitem__(bbox)
442 return self._transfer_metadata(
443 Mask(
444 self.array[bbox.y.slice_within(self._bbox.y), bbox.x.slice_within(self._bbox.x), :],
445 bbox=bbox,
446 schema=self.schema,
447 ),
448 bbox=bbox,
449 )
451 def __setitem__(self, bbox: Box | EllipsisType, value: Mask) -> None:
452 subview = self[bbox]
453 subview.clear()
454 subview.update(value)
456 def __str__(self) -> str:
457 return f"Mask({self.bbox!s}, {list(self.schema.names)})"
459 def __repr__(self) -> str:
460 return f"Mask(..., bbox={self.bbox!r}, schema={self.schema!r})"
462 def __eq__(self, other: object) -> bool:
463 if not isinstance(other, Mask):
464 return NotImplemented
465 return (
466 self._bbox == other._bbox
467 and self._schema == other._schema
468 and np.array_equal(self._array, other._array, equal_nan=True)
469 )
471 def copy(self) -> Mask:
472 """Deep-copy the mask and metadata."""
473 return self._transfer_metadata(
474 Mask(
475 self._array.copy(),
476 bbox=self._bbox,
477 schema=self._schema,
478 projection=self._projection,
479 obs_info=self._obs_info,
480 ),
481 copy=True,
482 )
484 def view(
485 self,
486 *,
487 schema: MaskSchema | EllipsisType = ...,
488 projection: Projection | None | EllipsisType = ...,
489 start: Sequence[int] | EllipsisType = ...,
490 obs_info: ObservationInfo | None | EllipsisType = ...,
491 ) -> Mask:
492 """Make a view of the mask, with optional updates.
494 Notes
495 -----
496 This can only be used to make changes to schema descriptions; plane
497 names must remain the same (in the same order).
498 """
499 if schema is ...:
500 schema = self._schema
501 else:
502 if list(schema.names) != list(self.schema.names):
503 raise ValueError("Cannot create a mask view with a schema with different names.")
504 if projection is ...:
505 projection = self._projection
506 if start is ...:
507 start = self._bbox.start
508 if obs_info is ...:
509 obs_info = self._obs_info
510 return self._transfer_metadata(
511 Mask(self._array, start=start, schema=schema, projection=projection, obs_info=obs_info)
512 )
514 def update(self, other: Mask) -> None:
515 """Update ``self`` to include all common mask values set in ``other``.
517 Notes
518 -----
519 This only operates on the intersection of the two mask bounding boxes
520 and the mask planes that are present in both. Mask bits are only set,
521 not cleared (i.e. this uses ``|=`` updates, not ``=`` assignments).
522 """
523 lhs = self
524 rhs = other
525 if other.bbox != self.bbox:
526 try:
527 bbox = self.bbox.intersection(other.bbox)
528 except NoOverlapError:
529 return
530 lhs = self[bbox]
531 rhs = other[bbox]
532 for name in self.schema.names & other.schema.names:
533 lhs.set(name, rhs.get(name))
535 def get(self, plane: str) -> np.ndarray:
536 """Return a 2-d boolean array for the given mask plane.
538 Parameters
539 ----------
540 plane
541 Name of the mask plane.
543 Returns
544 -------
545 numpy.ndarray
546 A 2-d boolean array with the same shape as `bbox` that is `True`
547 where the bit for ``plane`` is set and `False` elsewhere.
548 """
549 bit = self.schema.bit(plane)
550 return (self._array[..., bit.index] & bit.mask).astype(bool)
552 def set(self, plane: str, boolean_mask: np.ndarray | EllipsisType = ...) -> None:
553 """Set a mask plane.
555 Parameters
556 ----------
557 plane
558 Name of the mask plane to set
559 boolean_mask
560 A 2-d boolean array with the same shape as `bbox` that is `True`
561 where the bit for ``plane`` should be set and `False` where it
562 should be left unchanged (*not* set to zero). May be ``...`` to
563 set the bit everywhere.
564 """
565 bit = self.schema.bit(plane)
566 if boolean_mask is not ...:
567 boolean_mask = boolean_mask.astype(bool)
568 self._array[boolean_mask, bit.index] |= bit.mask
570 def clear(self, plane: str | None = None, boolean_mask: np.ndarray | EllipsisType = ...) -> None:
571 """Clear one or more mask planes.
573 Parameters
574 ----------
575 plane
576 Name of the mask plane to set. If `None` all mask planes are
577 cleared.
578 boolean_mask
579 A 2-d boolean array with the same shape as `bbox` that is `True`
580 where the bit for ``plane`` should be cleared and `False` where it
581 should be left unchanged. May be ``...`` to clear the bit
582 everywhere.
583 """
584 if boolean_mask is not ...:
585 boolean_mask = boolean_mask.astype(bool)
586 if plane is None:
587 self._array[boolean_mask, :] = 0
588 else:
589 bit = self.schema.bit(plane)
590 self._array[boolean_mask, bit.index] &= ~bit.mask
592 def serialize[P: pydantic.BaseModel](
593 self,
594 archive: OutputArchive[P],
595 *,
596 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
597 save_projection: bool = True,
598 save_obs_info: bool = True,
599 add_offset_wcs: str | None = "A",
600 ) -> MaskSerializationModel[P]:
601 """Serialize the mask to an output archive.
603 Parameters
604 ----------
605 archive
606 Archive to write to.
607 update_header
608 A callback that will be given the FITS header for the HDU
609 containing this mask in order to add keys to it. This callback
610 may be provided but will not be called if the output format is not
611 FITS. As multiple HDUs may be added, this function may be called
612 multiple times.
613 save_projection
614 If `True`, save the `Projection` attached to the image, if there
615 is one. This does not affect whether a FITS WCS corresponding to
616 the projection is written (it always is, if available, and if
617 ``add_offset_wcs`` is not ``" "``).
618 save_obs_info
619 If `True`, save the
620 `~astro_metadata_translator.ObservationInfo` attached to the
621 image, if there is one.
622 add_offset_wcs
623 A FITS WCS single-character suffix to use when adding a linear
624 WCS that maps the FITS array to the logical pixel coordinates
625 defined by ``bbox.start``. Set to `None` to not write this WCS.
626 If this is set to ``" "``, it will prevent the `Projection` from
627 being saved as a FITS WCS.
628 """
629 data: list[ArrayReferenceModel | InlineArrayModel] = []
630 for schema_2d in self.schema.split(np.int32):
631 mask_2d = Mask(
632 0, bbox=self.bbox, schema=schema_2d, projection=self._projection, obs_info=self._obs_info
633 )
634 mask_2d.update(self)
635 data.append(
636 mask_2d._serialize_2d(archive, update_header=update_header, add_offset_wcs=add_offset_wcs)
637 )
638 serialized_projection: ProjectionSerializationModel[P] | None = None
639 if save_projection and self.projection is not None:
640 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
641 serialized_dtype = NumberType.from_numpy(self.schema.dtype)
642 assert is_integer(serialized_dtype), "Mask dtypes should always be integers."
643 return MaskSerializationModel.model_construct(
644 data=data,
645 start=list(self.bbox.start),
646 planes=list(self.schema),
647 dtype=serialized_dtype,
648 projection=serialized_projection,
649 obs_info=self._obs_info if save_obs_info else None,
650 metadata=self.metadata,
651 )
653 def _serialize_2d[P: pydantic.BaseModel](
654 self,
655 archive: OutputArchive[P],
656 *,
657 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
658 add_offset_wcs: str | None = "A",
659 ) -> ArrayReferenceModel | InlineArrayModel:
660 def _update_header(header: astropy.io.fits.Header) -> None:
661 update_header(header)
662 self.schema.update_header(header)
663 if self.projection is not None and add_offset_wcs != " ":
664 if self.fits_wcs:
665 header.update(self.fits_wcs.to_header(relax=True))
666 if add_offset_wcs is not None:
667 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs)
669 assert self.array.shape[2] == 1, "Mask should be split before calling this method."
670 return archive.add_array(self._array[:, :, 0], update_header=_update_header)
672 @classmethod
673 def deserialize(
674 cls,
675 model: MaskSerializationModel[Any],
676 archive: InputArchive[Any],
677 *,
678 bbox: Box | None = None,
679 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
680 ) -> Mask:
681 """Deserialize a mask from an input archive.
683 Parameters
684 ----------
685 model
686 A Pydantic model representation of the mask, holding references
687 to data stored in the archive.
688 archive
689 Archive to read from.
690 bbox
691 Bounding box of a subimage to read instead.
692 strip_header
693 A callable that strips out any FITS header cards added by the
694 ``update_header`` argument in the corresponding call to
695 `serialize`.
696 """
697 slices: tuple[slice, ...] | EllipsisType = ...
698 if bbox is not None:
699 slices = bbox.slice_within(model.bbox)
700 else:
701 bbox = model.bbox
702 if not is_integer(model.dtype):
703 raise ArchiveReadError(f"Mask array has a non-integer dtype: {model.dtype}.")
704 schema = MaskSchema(model.planes, dtype=model.dtype.to_numpy())
705 projection = (
706 Projection.deserialize(model.projection, archive) if model.projection is not None else None
707 )
708 result = Mask(
709 0,
710 schema=schema,
711 bbox=bbox,
712 projection=projection,
713 obs_info=model.obs_info,
714 )
715 schemas_2d = schema.split(np.int32)
716 if len(schemas_2d) != len(model.data):
717 raise ArchiveReadError(
718 f"Number of mask arrays ({len(model.data)}) does not match expectation ({len(schemas_2d)})."
719 )
720 for array_model, schema_2d in zip(model.data, schemas_2d):
721 mask_2d = cls._deserialize_2d(
722 array_model, schema_2d, bbox.start, archive, strip_header=strip_header, slices=slices
723 )
724 result.update(mask_2d)
725 return result._finish_deserialize(model)
727 @classmethod
728 def _deserialize_2d(
729 cls,
730 ref: ArrayReferenceModel | InlineArrayModel,
731 schema_2d: MaskSchema,
732 start: Sequence[int],
733 archive: InputArchive[Any],
734 *,
735 slices: tuple[slice, ...] | EllipsisType = ...,
736 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
737 ) -> Mask:
738 def _strip_header(header: astropy.io.fits.Header) -> None:
739 strip_header(header)
740 schema_2d.strip_header(header)
741 fits.strip_wcs_cards(header)
743 array_2d = archive.get_array(ref, strip_header=_strip_header, slices=slices)
744 return Mask(array_2d[:, :, np.newaxis], schema=schema_2d, start=start)
746 @staticmethod
747 def _get_archive_tree_type[P: pydantic.BaseModel](
748 pointer_type: type[P],
749 ) -> type[MaskSerializationModel[P]]:
750 """Return the serialization model type for this object for an archive
751 type that uses the given pointer type.
752 """
753 return MaskSerializationModel[pointer_type] # type: ignore
755 _archive_default_name: ClassVar[str] = "mask"
756 """The name this object should be serialized with when written as the
757 top-level object.
758 """
760 def write_fits(
761 self,
762 filename: str,
763 *,
764 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
765 ) -> None:
766 """Write the mask to a FITS file.
768 Parameters
769 ----------
770 filename
771 Name of the file to write to. Must be a local file.
772 compression
773 Compression options.
774 """
775 compression_options = {}
776 if compression is not fits.FitsCompressionOptions.DEFAULT:
777 compression_options[self._archive_default_name] = compression
778 fits.write(self, filename, compression_options)
780 @staticmethod
781 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Mask:
782 """Read an image from a FITS file.
784 Parameters
785 ----------
786 url
787 URL of the file to read; may be any type supported by
788 `lsst.resources.ResourcePath`.
789 bbox
790 Bounding box of a subimage to read instead.
791 """
792 return fits.read(Mask, url, bbox=bbox).deserialized
794 @staticmethod
795 def from_legacy(
796 legacy: Any,
797 plane_map: Mapping[str, MaskPlane] | None = None,
798 ) -> Mask:
799 """Convert from an `lsst.afw.image.Mask` instance.
801 Parameters
802 ----------
803 legacy
804 An `lsst.afw.image.Mask` instance. This will not share pixel
805 data with the new object.
806 plane_map
807 A mapping from legacy mask plane name to the new plane name and
808 description.
809 """
810 return Mask._from_legacy_array(
811 legacy.array,
812 legacy.getMaskPlaneDict(),
813 start=YX(y=legacy.getY0(), x=legacy.getX0()),
814 plane_map=plane_map,
815 )
817 def to_legacy(self, plane_map: Mapping[str, MaskPlane] | None = None) -> Any:
818 """Convert to an `lsst.afw.image.Mask` instance.
820 The pixel data will not be shared between the two objects.
822 Parameters
823 ----------
824 plane_map
825 A mapping from legacy mask plane name to the new plane name and
826 description.
827 """
828 import lsst.afw.image
829 import lsst.geom
831 result = lsst.afw.image.Mask(self.bbox.to_legacy())
832 if plane_map is None:
833 plane_map = {plane.name: plane for plane in self.schema if plane is not None}
834 for old_name, new_plane in plane_map.items():
835 old_bit = result.addMaskPlane(old_name)
836 old_bitmask = 1 << old_bit
837 result.array[self.get(new_plane.name)] |= old_bitmask
838 return result
840 @staticmethod
841 def _from_legacy_array(
842 array2d: np.ndarray,
843 old_planes: Mapping[str, int],
844 *,
845 start: YX[int],
846 plane_map: Mapping[str, MaskPlane] | None = None,
847 projection: Projection | None = None,
848 ) -> Mask:
849 planes: list[MaskPlane] = []
850 new_name_to_old_bitmask: dict[str, int] = {}
851 for old_name, old_bit in old_planes.items():
852 old_bitmask = 1 << old_bit
853 if plane_map is not None:
854 if new_plane := plane_map.get(old_name):
855 planes.append(new_plane)
856 new_name_to_old_bitmask[new_plane.name] = old_bitmask
857 else:
858 if n_orphaned := np.count_nonzero(array2d & old_bitmask):
859 raise RuntimeError(
860 f"Legacy mask plane {old_name!r} is not remapped, "
861 f"but {n_orphaned} pixels have this bit set."
862 )
863 else:
864 planes.append(MaskPlane(old_name, ""))
865 new_name_to_old_bitmask[old_name] = old_bitmask
866 schema = MaskSchema(planes)
867 mask = Mask(0, schema=schema, start=start, shape=array2d.shape, projection=projection)
868 for new_name, old_bitmask in new_name_to_old_bitmask.items():
869 mask.set(new_name, array2d & old_bitmask)
870 return mask
872 @staticmethod
873 def read_legacy(
874 uri: ResourcePathExpression,
875 *,
876 plane_map: Mapping[str, MaskPlane] | None = None,
877 ext: str | int = 1,
878 fits_wcs_frame: Frame | None = None,
879 ) -> Mask:
880 """Read a FITS file written by `lsst.afw.image.Mask.writeFits`.
882 Parameters
883 ----------
884 uri
885 URI or file name.
886 plane_map
887 A mapping from legacy mask plane name to the new plane name and
888 description.
889 ext
890 Name or index of the FITS HDU to read.
891 fits_wcs_frame
892 If not `None` and the HDU containing the mask has a FITS WCS,
893 attach a `Projection` to the returned mask by converting that WCS.
894 """
895 opaque_metadata = fits.FitsOpaqueMetadata()
896 fs, fspath = ResourcePath(uri).to_fsspec()
897 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
898 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
899 result = Mask._read_legacy_hdu(
900 hdu_list[ext], opaque_metadata, plane_map=plane_map, fits_wcs_frame=fits_wcs_frame
901 )
902 result._opaque_metadata = opaque_metadata
903 return result
905 @staticmethod
906 def _read_legacy_hdu(
907 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
908 opaque_metadata: fits.FitsOpaqueMetadata,
909 plane_map: Mapping[str, MaskPlane] | None = None,
910 fits_wcs_frame: Frame | None = None,
911 ) -> Mask:
912 if isinstance(hdu, astropy.io.fits.BinTableHDU):
913 hdu = astropy.io.fits.CompImageHDU(bintable=hdu)
914 dx: int = hdu.header.pop("LTV1")
915 dy: int = hdu.header.pop("LTV2")
916 start = YX(y=-dy, x=-dx)
917 old_planes = MaskPlane.read_legacy(hdu.header)
918 projection: Projection | None = None
919 if fits_wcs_frame is not None:
920 try:
921 fits_wcs = astropy.wcs.WCS(hdu.header)
922 except KeyError:
923 pass
924 else:
925 projection = Projection.from_fits_wcs(
926 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y
927 )
928 mask = Mask._from_legacy_array(
929 hdu.data, old_planes, start=start, plane_map=plane_map, projection=projection
930 )
931 fits.strip_wcs_cards(hdu.header)
932 hdu.header.strip()
933 hdu.header.remove("EXTTYPE", ignore_missing=True)
934 hdu.header.remove("INHERIT", ignore_missing=True)
935 # afw set BUNIT on masks because of limitations in how FITS
936 # metadata is handled there.
937 hdu.header.remove("BUNIT", ignore_missing=True)
938 opaque_metadata.add_header(hdu.header)
939 return mask
942class MaskSerializationModel[P: pydantic.BaseModel](ArchiveTree):
943 """Pydantic model used to represent the serialized form of a `.Mask`."""
945 data: list[ArrayReferenceModel | InlineArrayModel] = pydantic.Field(
946 description="References to pixel data."
947 )
948 start: list[int] = pydantic.Field(
949 description="Coordinate of the first pixels in the array, ordered (y, x)."
950 )
951 planes: list[MaskPlane | None] = pydantic.Field(description="Definitions of the bitplanes in the mask.")
952 dtype: IntegerType = pydantic.Field(description="Data type of the in-memory mask.")
953 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
954 default=None,
955 exclude_if=is_none,
956 description="Projection that maps the logical pixel grid onto the sky.",
957 )
958 obs_info: ObservationInfo | None = pydantic.Field(
959 default=None,
960 exclude_if=is_none,
961 description="Standardized description of image metadata",
962 )
964 @property
965 def bbox(self) -> Box:
966 """The 2-d bounding box of the mask."""
967 return Box.from_shape(self.data[0].shape, start=self.start)
970def get_legacy_visit_image_mask_planes() -> dict[str, MaskPlane]:
971 """Return a mapping from legacy mask plane name to `MaskPlane` instance
972 for LSST visit images, c. DP2.
973 """
974 return {
975 "BAD": MaskPlane("BAD", "Bad pixel in the instrument, including bad amplifiers."),
976 "SAT": MaskPlane(
977 "SATURATED", "Pixel was saturated or affected by saturation in a neighboring pixel."
978 ),
979 "INTRP": MaskPlane("INTERPOLATED", "Original pixel value was interpolated."),
980 "CR": MaskPlane("COSMIC_RAY", "A cosmic ray affected this pixel."),
981 "EDGE": MaskPlane(
982 "DETECTION_EDGE",
983 "Pixel was too close to the edge to be considered for detection, "
984 "due to the finite size of the detection kernel.",
985 ),
986 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."),
987 "SUSPECT": MaskPlane("SUSPECT", "Pixel was close to the saturation level. "),
988 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."),
989 "VIGNETTED": MaskPlane("VIGNETTED", "Pixel was vignetted by the optics."),
990 "PARTLY_VIGNETTED": MaskPlane("PARTLY_VIGNETTED", "Pixel was partly vignetted by the optics."),
991 "CROSSTALK": MaskPlane("CROSSTALK", "Pixel was affected by crosstalk and corrected accordingly."),
992 "ITL_DIP": MaskPlane(
993 "ITL_DIP", "Pixel was affected by a dark vertical trail from a bright source, on an ITL CCD."
994 ),
995 "NOT_DEBLENDED": MaskPlane(
996 "NOT_DEBLENDED",
997 "Pixel belonged to a detection that was not deblended, usually due to size limits.",
998 ),
999 "SPIKE": MaskPlane(
1000 "SPIKE", "Pixel is in the neighborhood of a diffraction spike from a bright star."
1001 ),
1002 }
1005def get_legacy_deep_coadd_mask_planes() -> dict[str, MaskPlane]:
1006 """Return a mapping from legacy mask plane name to `MaskPlane` instance
1007 for LSST deep coadds, c. DP2.
1008 """
1009 return {
1010 # TODO: reconcile this with counts from the DP2 coadds.
1011 # BAD, CLIPPED, SUSPECT, PARTLY_VIGNETTED, SPIKE: should be fully
1012 # rejected from (cell) coadds with no propagation.
1013 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."),
1014 "INTRP": MaskPlane("INTERPOLATED", "Pixel value is the result of interpolating nearby good pixels."),
1015 "CR": MaskPlane(
1016 "COSMIC_RAY",
1017 "A cosmic ray affected this pixel on at least one input image (and was interpolated).",
1018 ),
1019 "SAT": MaskPlane("SATURATED", "More than 10% of the potential input visits."),
1020 "EDGE": MaskPlane(
1021 "DETECTION_EDGE",
1022 "Pixel was too close to the edge to be considered for detection, "
1023 "due to the finite size of the detection kernel.",
1024 ),
1025 "REJECTED": MaskPlane(
1026 "REJECTED", "At least one input visit was left out of the coadd for this pixel due to masking."
1027 ),
1028 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."),
1029 "INEXACT_PSF": MaskPlane(
1030 "INEXACT_PSF",
1031 "Pixel is on or near a cell boundary and hence its PSF may be (usually slightly) discontinuous.",
1032 ),
1033 }