Coverage for python / lsst / images / _mask.py: 24%
369 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 09:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 09:07 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "Mask",
16 "MaskPlane",
17 "MaskPlaneBit",
18 "MaskSchema",
19 "MaskSerializationModel",
20 "get_legacy_visit_image_mask_planes",
21)
23import dataclasses
24import math
25from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set
26from types import EllipsisType
27from typing import Any, ClassVar, cast
29import astropy.io.fits
30import astropy.wcs
31import numpy as np
32import numpy.typing as npt
33import pydantic
34from astro_metadata_translator import ObservationInfo
36from lsst.resources import ResourcePath, ResourcePathExpression
38from . import fits
39from ._generalized_image import GeneralizedImage
40from ._geom import YX, Box, NoOverlapError
41from ._transforms import Frame, Projection, ProjectionSerializationModel
42from .serialization import (
43 ArchiveReadError,
44 ArchiveTree,
45 ArrayReferenceModel,
46 InlineArrayModel,
47 InputArchive,
48 IntegerType,
49 MetadataValue,
50 NumberType,
51 OutputArchive,
52 is_integer,
53 no_header_updates,
54)
55from .utils import is_none
58@dataclasses.dataclass(frozen=True)
59class MaskPlane:
60 """Name and description of a single plane in a mask array."""
62 name: str
63 """Unique name for the mask plane (`str`)."""
65 description: str
66 """Human-readable documentation for the mask plane (`str`)."""
68 @classmethod
69 def read_legacy(cls, header: astropy.io.fits.Header) -> dict[str, int]:
70 """Read mask plane descriptions written by
71 `lsst.afw.image.Mask.writeFits`.
73 Parameters
74 ----------
75 header
76 FITS header.
78 Returns
79 -------
80 `dict` [`str`, `int`]
81 A dictionary mapping mask plane name to integer bit index.
82 """
83 result: dict[str, int] = {}
84 for card in list(header.cards):
85 if card.keyword.startswith("MP_"):
86 result[card.keyword.removeprefix("MP_")] = card.value
87 del header[card.keyword]
88 return result
91@dataclasses.dataclass(frozen=True)
92class MaskPlaneBit:
93 """The nested array index and mask value associated with a single mask
94 plane.
95 """
97 index: int
98 """Index into the last dimension of the mask array where this plane's bit
99 is stored.
100 """
102 mask: np.integer
103 """Bitmask that selects just this plane's bit from a mask array value
104 (`numpy.integer`).
105 """
107 @classmethod
108 def compute(cls, overall_index: int, stride: int, mask_type: type[np.integer]) -> MaskPlaneBit:
109 """Construct a `MaskPlaneBit` from the overall index of a plane in a
110 `MaskSchema` and the stride (number of bits per mask array element).
111 """
112 index, bit = divmod(overall_index, stride)
113 return cls(index, mask_type(1 << bit))
116class MaskSchema:
117 """A schema for a bit-packed mask array.
119 Parameters
120 ----------
121 planes
122 Iterable of `MaskPlane` instances that define the schema. `None`
123 values may be included to reserve bits for future use.
124 dtype
125 The numpy data type of the mask arrays that use this schema.
127 Notes
128 -----
129 A `MaskSchema` is a collection of mask planes, which each correspond to a
130 single bit in a mask array. Mask schemas are immutable and associated with
131 a particular array data type, allowing them to safely precompute the index
132 and bitmask for each plane.
134 `MaskSchema` indexing is by integer (the overall index of a plane in the
135 schema). The `descriptions` attribute may be indexed by plane name to get
136 the description for that plane, and the `bitmask` method can be used to
137 obtain an array that can be used to select one or more planes by name in
138 a mask array that uses this schema.
140 If no mask planes are provided, a `None` placeholder is automatically
141 added.
142 """
144 def __init__(self, planes: Iterable[MaskPlane | None], dtype: npt.DTypeLike = np.uint8):
145 self._planes: tuple[MaskPlane | None, ...] = tuple(planes) or (None,)
146 self._dtype = cast(np.dtype[np.integer], np.dtype(dtype))
147 stride = self.bits_per_element(self._dtype)
148 self._descriptions = {plane.name: plane.description for plane in self._planes if plane is not None}
149 self._mask_size = math.ceil(len(self._planes) / stride)
150 self._bits: dict[str, MaskPlaneBit] = {
151 plane.name: MaskPlaneBit.compute(n, stride, self._dtype.type)
152 for n, plane in enumerate(self._planes)
153 if plane is not None
154 }
156 @staticmethod
157 def bits_per_element(dtype: npt.DTypeLike) -> int:
158 """Return the number of mask bits per array element for the given
159 data type.
160 """
161 dtype = np.dtype(dtype)
162 match dtype.kind:
163 case "u":
164 return dtype.itemsize * 8
165 case "i":
166 return dtype.itemsize * 8 - 1
167 case _:
168 raise TypeError(f"dtype for masks must be an integer; got {dtype} with kind={dtype.kind}.")
170 def __iter__(self) -> Iterator[MaskPlane | None]:
171 return iter(self._planes)
173 def __len__(self) -> int:
174 return len(self._planes)
176 def __getitem__(self, i: int) -> MaskPlane | None:
177 return self._planes[i]
179 def __repr__(self) -> str:
180 return f"MaskSchema({list(self._planes)}, dtype={self._dtype!r})"
182 def __str__(self) -> str:
183 return "\n".join(
184 [
185 f"{name} [{bit.index}@{hex(bit.mask)}]: {self._descriptions[name]}"
186 for name, bit in self._bits.items()
187 ]
188 )
190 def __eq__(self, other: object) -> bool:
191 if isinstance(other, MaskSchema):
192 return self._planes == other._planes and self._dtype == other._dtype
193 return False
195 @property
196 def dtype(self) -> np.dtype:
197 """The numpy data type of the mask arrays that use this schema."""
198 return self._dtype
200 @property
201 def mask_size(self) -> int:
202 """The number of elements in the last dimension of any mask array that
203 uses this schema.
204 """
205 return self._mask_size
207 @property
208 def names(self) -> Set[str]:
209 """The names of the mask planes, in bit order."""
210 return self._bits.keys()
212 @property
213 def descriptions(self) -> Mapping[str, str]:
214 """A mapping from plane name to description."""
215 return self._descriptions
217 def bit(self, plane: str) -> MaskPlaneBit:
218 """Return the last array index and mask for the given mask plane."""
219 return self._bits[plane]
221 def bitmask(self, *planes: str) -> np.ndarray:
222 """Return a 1-d mask array that represents the union (i.e. bitwise OR)
223 of the planes with the given names.
225 Parameters
226 ----------
227 *planes
228 Mask plane names.
230 Returns
231 -------
232 numpy.ndarray
233 A 1-d array with shape ``(mask_size,)``.
234 """
235 result = np.zeros(self.mask_size, dtype=self._dtype)
236 for plane in planes:
237 bit = self._bits[plane]
238 result[bit.index] |= bit.mask
239 return result
241 def split(self, dtype: npt.DTypeLike) -> list[MaskSchema]:
242 """Split the schema into an equivalent series of schemas that each
243 have a `mask_size` of ``1``, dropping all `None` placeholders.
245 Parameters
246 ----------
247 dtype
248 Data type of the new mask pixels.
250 Returns
251 -------
252 `list` [`MaskSchema`]
253 A list of mask schemas that together include all planes in
254 ``self`` and have `mask_size` equal to ``1``. If there are no
255 mask planes (only `None` placeholders) in ``self``, a single mask
256 schema with a `None` placeholder is returned; otherwise `None`
257 placeholders are returned.
258 """
259 dtype = np.dtype(dtype)
260 planes: list[MaskPlane] = []
261 schemas: list[MaskSchema] = []
262 n_planes_per_schema = self.bits_per_element(dtype)
263 for plane in self._planes:
264 if plane is not None:
265 planes.append(plane)
266 if len(planes) == n_planes_per_schema:
267 schemas.append(MaskSchema(planes, dtype=dtype))
268 planes.clear()
269 if planes:
270 schemas.append(MaskSchema(planes, dtype=dtype))
271 if not schemas:
272 schemas.append(MaskSchema([None], dtype=dtype))
273 return schemas
275 def update_header(self, header: astropy.io.fits.Header) -> None:
276 """Add a description of this mask schema to a FITS header."""
277 for n, plane in enumerate(self):
278 if plane is not None:
279 bit = self.bit(plane.name)
280 if bit.index != 0:
281 raise TypeError("Only mask schemas with mask_size==1 can be described in FITS.")
282 header.set(f"MSKN{n:04d}", plane.name, f"Name for mask plane {n}.")
283 header.set(f"MSKM{n:04d}", bit.mask, f"Bitmask for plane n={n}; always 1<<n.")
284 # We don't add a comment to the description card, because it's
285 # likely to overrun a single card and get the CONTINUE
286 # treatment. That will cause Astropy to warn about the comment
287 # being truncated and that's worse than just leaving it
288 # unexplained; it's pretty obvious from context what it is.
289 header.set(f"MSKD{n:04d}", plane.description)
291 def strip_header(self, header: astropy.io.fits.Header) -> None:
292 """Remove all header cards added by `update_header`."""
293 for n, plane in enumerate(self):
294 if plane is not None:
295 header.remove(f"MSKN{n:04d}", ignore_missing=True)
296 header.remove(f"MSKM{n:04d}", ignore_missing=True)
297 header.remove(f"MSKD{n:04d}", ignore_missing=True)
300class Mask(GeneralizedImage):
301 """A 2-d bitmask image backed by a 3-d byte array.
303 Parameters
304 ----------
305 array_or_fill
306 Array or fill value for the mask. If a fill value, ``bbox`` or
307 ``shape`` must be provided.
308 schema
309 Schema that defines the planes and their bit assignments.
310 bbox
311 Bounding box for the mask. This sets the shape of the first two
312 dimensions of the array.
313 start
314 Logical coordinates of the first pixel in the array, ordered ``y``,
315 ``x`` (unless an `XY` instance is passed). Ignored if
316 ``bbox`` is provided. Defaults to zeros.
317 shape
318 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY`
319 instance is passed). Only needed if ``array_or_fill`` is not an
320 array and ``bbox`` is not provided. Like the bbox, this does not
321 include the last dimension of the array.
322 projection
323 Projection that maps the pixel grid to the sky.
324 obs_info
325 General information about the associated observation in standardized
326 form.
327 metadata
328 Arbitrary flexible metadata to associate with the mask.
330 Notes
331 -----
332 Indexing the `array` attribute of a `Mask` does not take into account its
333 ``start`` offset, but accessing a subimage mask by indexing a `Mask` with
334 a `Box` does, and the `bbox` of the subimage is set to match its location
335 within the original mask.
337 A mask's ``bbox`` corresponds to the leading dimensions of its backing
338 `numpy.ndarray`, while the last dimension's size is always equal to the
339 `~MaskSchema.mask_size` of its schema, since a schema can in general
340 require multiple array elements to represent all of its planes.
341 """
343 def __init__(
344 self,
345 array_or_fill: np.ndarray | int = 0,
346 /,
347 *,
348 schema: MaskSchema,
349 bbox: Box | None = None,
350 start: Sequence[int] | None = None,
351 shape: Sequence[int] | None = None,
352 projection: Projection | None = None,
353 obs_info: ObservationInfo | None = None,
354 metadata: dict[str, MetadataValue] | None = None,
355 ):
356 super().__init__(metadata)
357 if shape is not None:
358 shape = tuple(shape)
359 if start is not None:
360 start = tuple(start)
361 if isinstance(array_or_fill, np.ndarray):
362 array = np.array(array_or_fill, dtype=schema.dtype)
363 if array.ndim != 3:
364 raise ValueError("Mask array must be 3-d.")
365 if bbox is None:
366 bbox = Box.from_shape(array.shape[:-1], start=start)
367 elif bbox.shape + (schema.mask_size,) != array.shape:
368 raise ValueError(
369 f"Explicit bbox shape {bbox.shape} and schema of size {schema.mask_size} do not "
370 f"match array with shape {array.shape}."
371 )
372 if shape is not None and shape + (schema.mask_size,) != array.shape:
373 raise ValueError(
374 f"Explicit shape {shape} and schema of size {schema.mask_size} do "
375 f"not match array with shape {array.shape}."
376 )
378 else:
379 if bbox is None:
380 if shape is None:
381 raise TypeError("No bbox, size, or array provided.")
382 bbox = Box.from_shape(shape, start=start)
383 array = np.full(bbox.shape + (schema.mask_size,), array_or_fill, dtype=schema.dtype)
384 self._array = array
385 self._bbox: Box = bbox
386 self._schema: MaskSchema = schema
387 self._projection = projection
388 self._obs_info = obs_info
390 @property
391 def array(self) -> np.ndarray:
392 """The low-level array (`numpy.ndarray`).
394 Assigning to this attribute modifies the existing array in place; the
395 bounding box and underlying data pointer are never changed.
396 """
397 return self._array
399 @array.setter
400 def array(self, value: np.ndarray | int) -> None:
401 self._array[:, :] = value
403 @property
404 def schema(self) -> MaskSchema:
405 """Schema that defines the planes and their bit assignments
406 (`MaskSchema`).
407 """
408 return self._schema
410 @property
411 def bbox(self) -> Box:
412 """2-d bounding box of the mask (`Box`).
414 This sets the shape of the first two dimensions of the array.
415 """
416 return self._bbox
418 @property
419 def projection(self) -> Projection[Any] | None:
420 """The projection that maps this mask's pixel grid to the sky
421 (`Projection` | `None`).
423 Notes
424 -----
425 The pixel coordinates used by this projection account for the bounding
426 box ``start``; they are not just array indices.
427 """
428 return self._projection
430 @property
431 def obs_info(self) -> ObservationInfo | None:
432 """General information about the associated observation in standard
433 form. (`~astro_metadata_translator.ObservationInfo` | `None`).
434 """
435 return self._obs_info
437 def __getitem__(self, bbox: Box | EllipsisType) -> Mask:
438 if bbox is ...:
439 return self
440 super().__getitem__(bbox)
441 return self._transfer_metadata(
442 Mask(
443 self.array[bbox.y.slice_within(self._bbox.y), bbox.x.slice_within(self._bbox.x), :],
444 bbox=bbox,
445 schema=self.schema,
446 ),
447 bbox=bbox,
448 )
450 def __setitem__(self, bbox: Box | EllipsisType, value: Mask) -> None:
451 subview = self[bbox]
452 subview.clear()
453 subview.update(value)
455 def __str__(self) -> str:
456 return f"Mask({self.bbox!s}, {list(self.schema.names)})"
458 def __repr__(self) -> str:
459 return f"Mask(..., bbox={self.bbox!r}, schema={self.schema!r})"
461 def __eq__(self, other: object) -> bool:
462 if not isinstance(other, Mask):
463 return NotImplemented
464 return (
465 self._bbox == other._bbox
466 and self._schema == other._schema
467 and np.array_equal(self._array, other._array, equal_nan=True)
468 )
470 def copy(self) -> Mask:
471 """Deep-copy the mask and metadata."""
472 return self._transfer_metadata(
473 Mask(
474 self._array.copy(),
475 bbox=self._bbox,
476 schema=self._schema,
477 projection=self._projection,
478 obs_info=self._obs_info,
479 ),
480 copy=True,
481 )
483 def view(
484 self,
485 *,
486 schema: MaskSchema | EllipsisType = ...,
487 projection: Projection | None | EllipsisType = ...,
488 start: Sequence[int] | EllipsisType = ...,
489 obs_info: ObservationInfo | None | EllipsisType = ...,
490 ) -> Mask:
491 """Make a view of the mask, with optional updates.
493 Notes
494 -----
495 This can only be used to make changes to schema descriptions; plane
496 names must remain the same (in the same order).
497 """
498 if schema is ...:
499 schema = self._schema
500 else:
501 if list(schema.names) != list(self.schema.names):
502 raise ValueError("Cannot create a mask view with a schema with different names.")
503 if projection is ...:
504 projection = self._projection
505 if start is ...:
506 start = self._bbox.start
507 if obs_info is ...:
508 obs_info = self._obs_info
509 return self._transfer_metadata(
510 Mask(self._array, start=start, schema=schema, projection=projection, obs_info=obs_info)
511 )
513 def update(self, other: Mask) -> None:
514 """Update ``self`` to include all common mask values set in ``other``.
516 Notes
517 -----
518 This only operates on the intersection of the two mask bounding boxes
519 and the mask planes that are present in both. Mask bits are only set,
520 not cleared (i.e. this uses ``|=`` updates, not ``=`` assignments).
521 """
522 lhs = self
523 rhs = other
524 if other.bbox != self.bbox:
525 try:
526 bbox = self.bbox.intersection(other.bbox)
527 except NoOverlapError:
528 return
529 lhs = self[bbox]
530 rhs = other[bbox]
531 for name in self.schema.names & other.schema.names:
532 lhs.set(name, rhs.get(name))
534 def get(self, plane: str) -> np.ndarray:
535 """Return a 2-d boolean array for the given mask plane.
537 Parameters
538 ----------
539 plane
540 Name of the mask plane.
542 Returns
543 -------
544 numpy.ndarray
545 A 2-d boolean array with the same shape as `bbox` that is `True`
546 where the bit for ``plane`` is set and `False` elsewhere.
547 """
548 bit = self.schema.bit(plane)
549 return (self._array[..., bit.index] & bit.mask).astype(bool)
551 def set(self, plane: str, boolean_mask: np.ndarray | EllipsisType = ...) -> None:
552 """Set a mask plane.
554 Parameters
555 ----------
556 plane
557 Name of the mask plane to set
558 boolean_mask
559 A 2-d boolean array with the same shape as `bbox` that is `True`
560 where the bit for ``plane`` should be set and `False` where it
561 should be left unchanged (*not* set to zero). May be ``...`` to
562 set the bit everywhere.
563 """
564 bit = self.schema.bit(plane)
565 if boolean_mask is not ...:
566 boolean_mask = boolean_mask.astype(bool)
567 self._array[boolean_mask, bit.index] |= bit.mask
569 def clear(self, plane: str | None = None, boolean_mask: np.ndarray | EllipsisType = ...) -> None:
570 """Clear one or more mask planes.
572 Parameters
573 ----------
574 plane
575 Name of the mask plane to set. If `None` all mask planes are
576 cleared.
577 boolean_mask
578 A 2-d boolean array with the same shape as `bbox` that is `True`
579 where the bit for ``plane`` should be cleared and `False` where it
580 should be left unchanged. May be ``...`` to clear the bit
581 everywhere.
582 """
583 if boolean_mask is not ...:
584 boolean_mask = boolean_mask.astype(bool)
585 if plane is None:
586 self._array[boolean_mask, :] = 0
587 else:
588 bit = self.schema.bit(plane)
589 self._array[boolean_mask, bit.index] &= ~bit.mask
591 def serialize[P: pydantic.BaseModel](
592 self,
593 archive: OutputArchive[P],
594 *,
595 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
596 save_projection: bool = True,
597 save_obs_info: bool = True,
598 add_offset_wcs: str | None = "A",
599 ) -> MaskSerializationModel[P]:
600 """Serialize the mask to an output archive.
602 Parameters
603 ----------
604 archive
605 Archive to write to.
606 update_header
607 A callback that will be given the FITS header for the HDU
608 containing this mask in order to add keys to it. This callback
609 may be provided but will not be called if the output format is not
610 FITS. As multiple HDUs may be added, this function may be called
611 multiple times.
612 save_projection
613 If `True`, save the `Projection` attached to the image, if there
614 is one. This does not affect whether a FITS WCS corresponding to
615 the projection is written (it always is, if available, and if
616 ``add_offset_wcs`` is not ``" "``).
617 save_obs_info
618 If `True`, save the
619 `~astro_metadata_translator.ObservationInfo` attached to the
620 image, if there is one.
621 add_offset_wcs
622 A FITS WCS single-character suffix to use when adding a linear
623 WCS that maps the FITS array to the logical pixel coordinates
624 defined by ``bbox.start``. Set to `None` to not write this WCS.
625 If this is set to ``" "``, it will prevent the `Projection` from
626 being saved as a FITS WCS.
627 """
628 data: list[ArrayReferenceModel | InlineArrayModel] = []
629 for schema_2d in self.schema.split(np.int32):
630 mask_2d = Mask(
631 0, bbox=self.bbox, schema=schema_2d, projection=self._projection, obs_info=self._obs_info
632 )
633 mask_2d.update(self)
634 data.append(
635 mask_2d._serialize_2d(archive, update_header=update_header, add_offset_wcs=add_offset_wcs)
636 )
637 serialized_projection: ProjectionSerializationModel[P] | None = None
638 if save_projection and self.projection is not None:
639 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
640 serialized_dtype = NumberType.from_numpy(self.schema.dtype)
641 assert is_integer(serialized_dtype), "Mask dtypes should always be integers."
642 return MaskSerializationModel.model_construct(
643 data=data,
644 start=list(self.bbox.start),
645 planes=list(self.schema),
646 dtype=serialized_dtype,
647 projection=serialized_projection,
648 obs_info=self._obs_info if save_obs_info else None,
649 metadata=self.metadata,
650 )
652 def _serialize_2d[P: pydantic.BaseModel](
653 self,
654 archive: OutputArchive[P],
655 *,
656 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
657 add_offset_wcs: str | None = "A",
658 ) -> ArrayReferenceModel | InlineArrayModel:
659 def _update_header(header: astropy.io.fits.Header) -> None:
660 update_header(header)
661 self.schema.update_header(header)
662 if self.projection is not None and add_offset_wcs != " ":
663 if self.fits_wcs:
664 header.update(self.fits_wcs.to_header(relax=True))
665 if add_offset_wcs is not None:
666 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs)
668 assert self.array.shape[2] == 1, "Mask should be split before calling this method."
669 return archive.add_array(self._array[:, :, 0], update_header=_update_header)
671 @classmethod
672 def deserialize(
673 cls,
674 model: MaskSerializationModel[Any],
675 archive: InputArchive[Any],
676 *,
677 bbox: Box | None = None,
678 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
679 ) -> Mask:
680 """Deserialize a mask from an input archive.
682 Parameters
683 ----------
684 model
685 A Pydantic model representation of the mask, holding references
686 to data stored in the archive.
687 archive
688 Archive to read from.
689 bbox
690 Bounding box of a subimage to read instead.
691 strip_header
692 A callable that strips out any FITS header cards added by the
693 ``update_header`` argument in the corresponding call to
694 `serialize`.
695 """
696 slices: tuple[slice, ...] | EllipsisType = ...
697 if bbox is not None:
698 slices = bbox.slice_within(model.bbox)
699 else:
700 bbox = model.bbox
701 if not is_integer(model.dtype):
702 raise ArchiveReadError(f"Mask array has a non-integer dtype: {model.dtype}.")
703 schema = MaskSchema(model.planes, dtype=model.dtype.to_numpy())
704 projection = (
705 Projection.deserialize(model.projection, archive) if model.projection is not None else None
706 )
707 result = Mask(
708 0,
709 schema=schema,
710 bbox=bbox,
711 projection=projection,
712 obs_info=model.obs_info,
713 )
714 schemas_2d = schema.split(np.int32)
715 if len(schemas_2d) != len(model.data):
716 raise ArchiveReadError(
717 f"Number of mask arrays ({len(model.data)}) does not match expectation ({len(schemas_2d)})."
718 )
719 for array_model, schema_2d in zip(model.data, schemas_2d):
720 mask_2d = cls._deserialize_2d(
721 array_model, schema_2d, bbox.start, archive, strip_header=strip_header, slices=slices
722 )
723 result.update(mask_2d)
724 return result._finish_deserialize(model)
726 @classmethod
727 def _deserialize_2d(
728 cls,
729 ref: ArrayReferenceModel | InlineArrayModel,
730 schema_2d: MaskSchema,
731 start: Sequence[int],
732 archive: InputArchive[Any],
733 *,
734 slices: tuple[slice, ...] | EllipsisType = ...,
735 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
736 ) -> Mask:
737 def _strip_header(header: astropy.io.fits.Header) -> None:
738 strip_header(header)
739 schema_2d.strip_header(header)
740 fits.strip_wcs_cards(header)
742 array_2d = archive.get_array(ref, strip_header=_strip_header, slices=slices)
743 return Mask(array_2d[:, :, np.newaxis], schema=schema_2d, start=start)
745 @staticmethod
746 def _get_archive_tree_type[P: pydantic.BaseModel](
747 pointer_type: type[P],
748 ) -> type[MaskSerializationModel[P]]:
749 """Return the serialization model type for this object for an archive
750 type that uses the given pointer type.
751 """
752 return MaskSerializationModel[pointer_type] # type: ignore
754 _archive_default_name: ClassVar[str] = "mask"
755 """The name this object should be serialized with when written as the
756 top-level object.
757 """
759 def write_fits(
760 self,
761 filename: str,
762 *,
763 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
764 ) -> None:
765 """Write the mask to a FITS file.
767 Parameters
768 ----------
769 filename
770 Name of the file to write to. Must be a local file.
771 compression
772 Compression options.
773 """
774 compression_options = {}
775 if compression is not fits.FitsCompressionOptions.DEFAULT:
776 compression_options[self._archive_default_name] = compression
777 fits.write(self, filename, compression_options)
779 @staticmethod
780 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Mask:
781 """Read an image from a FITS file.
783 Parameters
784 ----------
785 url
786 URL of the file to read; may be any type supported by
787 `lsst.resources.ResourcePath`.
788 bbox
789 Bounding box of a subimage to read instead.
790 """
791 return fits.read(Mask, url, bbox=bbox).deserialized
793 @staticmethod
794 def from_legacy(
795 legacy: Any,
796 plane_map: Mapping[str, MaskPlane] | None = None,
797 ) -> Mask:
798 """Convert from an `lsst.afw.image.Mask` instance.
800 Parameters
801 ----------
802 legacy
803 An `lsst.afw.image.Mask` instance. This will not share pixel
804 data with the new object.
805 plane_map
806 A mapping from legacy mask plane name to the new plane name and
807 description.
808 """
809 return Mask._from_legacy_array(
810 legacy.array,
811 legacy.getMaskPlaneDict(),
812 start=YX(y=legacy.getY0(), x=legacy.getX0()),
813 plane_map=plane_map,
814 )
816 def to_legacy(self, plane_map: Mapping[str, MaskPlane] | None = None) -> Any:
817 """Convert to an `lsst.afw.image.Mask` instance.
819 The pixel data will not be shared between the two objects.
821 Parameters
822 ----------
823 plane_map
824 A mapping from legacy mask plane name to the new plane name and
825 description.
826 """
827 import lsst.afw.image
828 import lsst.geom
830 result = lsst.afw.image.Mask(self.bbox.to_legacy())
831 if plane_map is None:
832 plane_map = {plane.name: plane for plane in self.schema if plane is not None}
833 for old_name, new_plane in plane_map.items():
834 old_bit = result.addMaskPlane(old_name)
835 old_bitmask = 1 << old_bit
836 result.array[self.get(new_plane.name)] |= old_bitmask
837 return result
839 @staticmethod
840 def _from_legacy_array(
841 array2d: np.ndarray,
842 old_planes: Mapping[str, int],
843 *,
844 start: YX[int],
845 plane_map: Mapping[str, MaskPlane] | None = None,
846 projection: Projection | None = None,
847 ) -> Mask:
848 planes: list[MaskPlane] = []
849 new_name_to_old_bitmask: dict[str, int] = {}
850 for old_name, old_bit in old_planes.items():
851 old_bitmask = 1 << old_bit
852 if plane_map is not None:
853 if new_plane := plane_map.get(old_name):
854 planes.append(new_plane)
855 new_name_to_old_bitmask[new_plane.name] = old_bitmask
856 else:
857 if n_orphaned := np.count_nonzero(array2d & old_bitmask):
858 raise RuntimeError(
859 f"Legacy mask plane {old_name!r} is not remapped, "
860 f"but {n_orphaned} pixels have this bit set."
861 )
862 else:
863 planes.append(MaskPlane(old_name, ""))
864 new_name_to_old_bitmask[old_name] = old_bitmask
865 schema = MaskSchema(planes)
866 mask = Mask(0, schema=schema, start=start, shape=array2d.shape, projection=projection)
867 for new_name, old_bitmask in new_name_to_old_bitmask.items():
868 mask.set(new_name, array2d & old_bitmask)
869 return mask
871 @staticmethod
872 def read_legacy(
873 uri: ResourcePathExpression,
874 *,
875 plane_map: Mapping[str, MaskPlane] | None = None,
876 ext: str | int = 1,
877 fits_wcs_frame: Frame | None = None,
878 ) -> Mask:
879 """Read a FITS file written by `lsst.afw.image.Mask.writeFits`.
881 Parameters
882 ----------
883 uri
884 URI or file name.
885 plane_map
886 A mapping from legacy mask plane name to the new plane name and
887 description.
888 ext
889 Name or index of the FITS HDU to read.
890 fits_wcs_frame
891 If not `None` and the HDU containing the mask has a FITS WCS,
892 attach a `Projection` to the returned mask by converting that WCS.
893 """
894 opaque_metadata = fits.FitsOpaqueMetadata()
895 fs, fspath = ResourcePath(uri).to_fsspec()
896 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
897 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
898 result = Mask._read_legacy_hdu(
899 hdu_list[ext], opaque_metadata, plane_map=plane_map, fits_wcs_frame=fits_wcs_frame
900 )
901 result._opaque_metadata = opaque_metadata
902 return result
904 @staticmethod
905 def _read_legacy_hdu(
906 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
907 opaque_metadata: fits.FitsOpaqueMetadata,
908 plane_map: Mapping[str, MaskPlane] | None = None,
909 fits_wcs_frame: Frame | None = None,
910 ) -> Mask:
911 if isinstance(hdu, astropy.io.fits.BinTableHDU):
912 hdu = astropy.io.fits.CompImageHDU(bintable=hdu)
913 dx: int = hdu.header.pop("LTV1")
914 dy: int = hdu.header.pop("LTV2")
915 start = YX(y=-dy, x=-dx)
916 old_planes = MaskPlane.read_legacy(hdu.header)
917 projection: Projection | None = None
918 if fits_wcs_frame is not None:
919 try:
920 fits_wcs = astropy.wcs.WCS(hdu.header)
921 except KeyError:
922 pass
923 else:
924 projection = Projection.from_fits_wcs(
925 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y
926 )
927 mask = Mask._from_legacy_array(
928 hdu.data, old_planes, start=start, plane_map=plane_map, projection=projection
929 )
930 fits.strip_wcs_cards(hdu.header)
931 hdu.header.strip()
932 hdu.header.remove("EXTTYPE", ignore_missing=True)
933 hdu.header.remove("INHERIT", ignore_missing=True)
934 # afw set BUNIT on masks because of limitations in how FITS
935 # metadata is handled there.
936 hdu.header.remove("BUNIT", ignore_missing=True)
937 opaque_metadata.add_header(hdu.header)
938 return mask
941class MaskSerializationModel[P: pydantic.BaseModel](ArchiveTree):
942 """Pydantic model used to represent the serialized form of a `.Mask`."""
944 data: list[ArrayReferenceModel | InlineArrayModel] = pydantic.Field(
945 description="References to pixel data."
946 )
947 start: list[int] = pydantic.Field(
948 description="Coordinate of the first pixels in the array, ordered (y, x)."
949 )
950 planes: list[MaskPlane | None] = pydantic.Field(description="Definitions of the bitplanes in the mask.")
951 dtype: IntegerType = pydantic.Field(description="Data type of the in-memory mask.")
952 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
953 default=None,
954 exclude_if=is_none,
955 description="Projection that maps the logical pixel grid onto the sky.",
956 )
957 obs_info: ObservationInfo | None = pydantic.Field(
958 default=None,
959 exclude_if=is_none,
960 description="Standardized description of image metadata",
961 )
963 @property
964 def bbox(self) -> Box:
965 """The 2-d bounding box of the mask."""
966 return Box.from_shape(self.data[0].shape, start=self.start)
969def get_legacy_visit_image_mask_planes() -> dict[str, MaskPlane]:
970 """Return a mapping from legacy mask plane name to `MaskPlane` instance
971 for LSST visit images, c. DP2.
972 """
973 return {
974 "BAD": MaskPlane("BAD", "Bad pixel in the instrument, including bad amplifiers."),
975 "SAT": MaskPlane(
976 "SATURATED", "Pixel was saturated or affected by saturation in a neighboring pixel."
977 ),
978 "INTRP": MaskPlane("INTERPOLATED", "Original pixel value was interpolated."),
979 "CR": MaskPlane("COSMIC_RAY", "A cosmic ray affected this pixel."),
980 "EDGE": MaskPlane(
981 "DETECTION_EDGE",
982 "Pixel was too close to the edge to be considered for detection, "
983 "due to the finite size of the detection kernel.",
984 ),
985 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."),
986 "SUSPECT": MaskPlane("SUSPECT", "Pixel was close to the saturation level. "),
987 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."),
988 "VIGNETTED": MaskPlane("VIGNETTED", "Pixel was vignetted by the optics."),
989 "PARTLY_VIGNETTED": MaskPlane("PARTLY_VIGNETTED", "Pixel was partly vignetted by the optics."),
990 "CROSSTALK": MaskPlane("CROSSTALK", "Pixel was affected by crosstalk and corrected accordingly."),
991 "ITL_DIP": MaskPlane(
992 "ITL_DIP", "Pixel was affected by a dark vertical trail from a bright source, on an ITL CCD."
993 ),
994 "NOT_DEBLENDED": MaskPlane(
995 "NOT_DEBLENDED",
996 "Pixel belonged to a detection that was not deblended, usually due to size limits.",
997 ),
998 "SPIKE": MaskPlane(
999 "SPIKE", "Pixel is in the neighborhood of a diffraction spike from a bright star."
1000 ),
1001 }