Coverage for python / lsst / images / _mask.py: 24%
369 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:41 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "Mask",
16 "MaskPlane",
17 "MaskPlaneBit",
18 "MaskSchema",
19 "MaskSerializationModel",
20 "get_legacy_visit_image_mask_planes",
21)
23import dataclasses
24import math
25from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set
26from types import EllipsisType
27from typing import Any, ClassVar, cast
29import astropy.io.fits
30import astropy.wcs
31import numpy as np
32import numpy.typing as npt
33import pydantic
34from astro_metadata_translator import ObservationInfo
36from lsst.resources import ResourcePath, ResourcePathExpression
38from . import fits
39from ._generalized_image import GeneralizedImage
40from ._geom import YX, Box, NoOverlapError
41from ._transforms import Frame, Projection, ProjectionSerializationModel
42from .serialization import (
43 ArchiveReadError,
44 ArchiveTree,
45 ArrayReferenceModel,
46 InputArchive,
47 IntegerType,
48 MetadataValue,
49 NumberType,
50 OutputArchive,
51 is_integer,
52 no_header_updates,
53)
54from .utils import is_none
57@dataclasses.dataclass(frozen=True)
58class MaskPlane:
59 """Name and description of a single plane in a mask array."""
61 name: str
62 """Unique name for the mask plane (`str`)."""
64 description: str
65 """Human-readable documentation for the mask plane (`str`)."""
67 @classmethod
68 def read_legacy(cls, header: astropy.io.fits.Header) -> dict[str, int]:
69 """Read mask plane descriptions written by
70 `lsst.afw.image.Mask.writeFits`.
72 Parameters
73 ----------
74 header
75 FITS header.
77 Returns
78 -------
79 `dict` [`str`, `int`]
80 A dictionary mapping mask plane name to integer bit index.
81 """
82 result: dict[str, int] = {}
83 for card in list(header.cards):
84 if card.keyword.startswith("MP_"):
85 result[card.keyword.removeprefix("MP_")] = card.value
86 del header[card.keyword]
87 return result
90@dataclasses.dataclass(frozen=True)
91class MaskPlaneBit:
92 """The nested array index and mask value associated with a single mask
93 plane.
94 """
96 index: int
97 """Index into the last dimension of the mask array where this plane's bit
98 is stored.
99 """
101 mask: np.integer
102 """Bitmask that selects just this plane's bit from a mask array value
103 (`numpy.integer`).
104 """
106 @classmethod
107 def compute(cls, overall_index: int, stride: int, mask_type: type[np.integer]) -> MaskPlaneBit:
108 """Construct a `MaskPlaneBit` from the overall index of a plane in a
109 `MaskSchema` and the stride (number of bits per mask array element).
110 """
111 index, bit = divmod(overall_index, stride)
112 return cls(index, mask_type(1 << bit))
115class MaskSchema:
116 """A schema for a bit-packed mask array.
118 Parameters
119 ----------
120 planes
121 Iterable of `MaskPlane` instances that define the schema. `None`
122 values may be included to reserve bits for future use.
123 dtype
124 The numpy data type of the mask arrays that use this schema.
126 Notes
127 -----
128 A `MaskSchema` is a collection of mask planes, which each correspond to a
129 single bit in a mask array. Mask schemas are immutable and associated with
130 a particular array data type, allowing them to safely precompute the index
131 and bitmask for each plane.
133 `MaskSchema` indexing is by integer (the overall index of a plane in the
134 schema). The `descriptions` attribute may be indexed by plane name to get
135 the description for that plane, and the `bitmask` method can be used to
136 obtain an array that can be used to select one or more planes by name in
137 a mask array that uses this schema.
139 If no mask planes are provided, a `None` placeholder is automatically
140 added.
141 """
143 def __init__(self, planes: Iterable[MaskPlane | None], dtype: npt.DTypeLike = np.uint8):
144 self._planes: tuple[MaskPlane | None, ...] = tuple(planes) or (None,)
145 self._dtype = cast(np.dtype[np.integer], np.dtype(dtype))
146 stride = self.bits_per_element(self._dtype)
147 self._descriptions = {plane.name: plane.description for plane in self._planes if plane is not None}
148 self._mask_size = math.ceil(len(self._planes) / stride)
149 self._bits: dict[str, MaskPlaneBit] = {
150 plane.name: MaskPlaneBit.compute(n, stride, self._dtype.type)
151 for n, plane in enumerate(self._planes)
152 if plane is not None
153 }
155 @staticmethod
156 def bits_per_element(dtype: npt.DTypeLike) -> int:
157 """Return the number of mask bits per array element for the given
158 data type.
159 """
160 dtype = np.dtype(dtype)
161 match dtype.kind:
162 case "u":
163 return dtype.itemsize * 8
164 case "i":
165 return dtype.itemsize * 8 - 1
166 case _:
167 raise TypeError(f"dtype for masks must be an integer; got {dtype} with kind={dtype.kind}.")
169 def __iter__(self) -> Iterator[MaskPlane | None]:
170 return iter(self._planes)
172 def __len__(self) -> int:
173 return len(self._planes)
175 def __getitem__(self, i: int) -> MaskPlane | None:
176 return self._planes[i]
178 def __repr__(self) -> str:
179 return f"MaskSchema({list(self._planes)}, dtype={self._dtype!r})"
181 def __str__(self) -> str:
182 return "\n".join(
183 [
184 f"{name} [{bit.index}@{hex(bit.mask)}]: {self._descriptions[name]}"
185 for name, bit in self._bits.items()
186 ]
187 )
189 def __eq__(self, other: object) -> bool:
190 if isinstance(other, MaskSchema):
191 return self._planes == other._planes and self._dtype == other._dtype
192 return False
194 @property
195 def dtype(self) -> np.dtype:
196 """The numpy data type of the mask arrays that use this schema."""
197 return self._dtype
199 @property
200 def mask_size(self) -> int:
201 """The number of elements in the last dimension of any mask array that
202 uses this schema.
203 """
204 return self._mask_size
206 @property
207 def names(self) -> Set[str]:
208 """The names of the mask planes, in bit order."""
209 return self._bits.keys()
211 @property
212 def descriptions(self) -> Mapping[str, str]:
213 """A mapping from plane name to description."""
214 return self._descriptions
216 def bit(self, plane: str) -> MaskPlaneBit:
217 """Return the last array index and mask for the given mask plane."""
218 return self._bits[plane]
220 def bitmask(self, *planes: str) -> np.ndarray:
221 """Return a 1-d mask array that represents the union (i.e. bitwise OR)
222 of the planes with the given names.
224 Parameters
225 ----------
226 *planes
227 Mask plane names.
229 Returns
230 -------
231 numpy.ndarray
232 A 1-d array with shape ``(mask_size,)``.
233 """
234 result = np.zeros(self.mask_size, dtype=self._dtype)
235 for plane in planes:
236 bit = self._bits[plane]
237 result[bit.index] |= bit.mask
238 return result
240 def split(self, dtype: npt.DTypeLike) -> list[MaskSchema]:
241 """Split the schema into an equivalent series of schemas that each
242 have a `mask_size` of ``1``, dropping all `None` placeholders.
244 Parameters
245 ----------
246 dtype
247 Data type of the new mask pixels.
249 Returns
250 -------
251 `list` [`MaskSchema`]
252 A list of mask schemas that together include all planes in
253 ``self`` and have `mask_size` equal to ``1``. If there are no
254 mask planes (only `None` placeholders) in ``self``, a single mask
255 schema with a `None` placeholder is returned; otherwise `None`
256 placeholders are returned.
257 """
258 dtype = np.dtype(dtype)
259 planes: list[MaskPlane] = []
260 schemas: list[MaskSchema] = []
261 n_planes_per_schema = self.bits_per_element(dtype)
262 for plane in self._planes:
263 if plane is not None:
264 planes.append(plane)
265 if len(planes) == n_planes_per_schema:
266 schemas.append(MaskSchema(planes, dtype=dtype))
267 planes.clear()
268 if planes:
269 schemas.append(MaskSchema(planes, dtype=dtype))
270 if not schemas:
271 schemas.append(MaskSchema([None], dtype=dtype))
272 return schemas
274 def update_header(self, header: astropy.io.fits.Header) -> None:
275 """Add a description of this mask schema to a FITS header."""
276 for n, plane in enumerate(self):
277 if plane is not None:
278 bit = self.bit(plane.name)
279 if bit.index != 0:
280 raise TypeError("Only mask schemas with mask_size==1 can be described in FITS.")
281 header.set(f"MSKN{n + 1:04d}", plane.name, f"Name for mask plane {n + 1}.")
282 header.set(f"MSKM{n + 1:04d}", bit.mask, f"Bitmask for plane n={n + 1}; always 1<<(n-1).")
283 # We don't add a comment to the description card, because it's
284 # likely to overrun a single card and get the CONTINUE
285 # treatment . That will cause Astropy to warn about the comment
286 # being truncated and that's worse than just leaving it
287 # unexplained; it's pretty obvious from context what it is.
288 header.set(f"MSKD{n + 1:04d}", plane.description)
290 def strip_header(self, header: astropy.io.fits.Header) -> None:
291 """Remove all header cards added by `update_header`."""
292 for n, plane in enumerate(self):
293 if plane is not None:
294 header.remove(f"MSKN{n + 1:04d}", ignore_missing=True)
295 header.remove(f"MSKM{n + 1:04d}", ignore_missing=True)
296 header.remove(f"MSKD{n + 1:04d}", ignore_missing=True)
299class Mask(GeneralizedImage):
300 """A 2-d bitmask image backed by a 3-d byte array.
302 Parameters
303 ----------
304 array_or_fill
305 Array or fill value for the mask. If a fill value, ``bbox`` or
306 ``shape`` must be provided.
307 schema
308 Schema that defines the planes and their bit assignments.
309 bbox
310 Bounding box for the mask. This sets the shape of the first two
311 dimensions of the array.
312 start
313 Logical coordinates of the first pixel in the array, ordered ``y``,
314 ``x`` (unless an `XY` instance is passed). Ignored if
315 ``bbox`` is provided. Defaults to zeros.
316 shape
317 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY`
318 instance is passed). Only needed if ``array_or_fill`` is not an
319 array and ``bbox`` is not provided. Like the bbox, this does not
320 include the last dimension of the array.
321 projection
322 Projection that maps the pixel grid to the sky.
323 obs_info
324 General information about the associated observation in standardized
325 form.
326 metadata
327 Arbitrary flexible metadata to associate with the mask.
329 Notes
330 -----
331 Indexing the `array` attribute of a `Mask` does not take into account its
332 ``start`` offset, but accessing a subimage mask by indexing a `Mask` with
333 a `Box` does, and the `bbox` of the subimage is set to match its location
334 within the original mask.
336 A mask's ``bbox`` corresponds to the leading dimensions of its backing
337 `numpy.ndarray`, while the last dimension's size is always equal to the
338 `~MaskSchema.mask_size` of its schema, since a schema can in general
339 require multiple array elements to represent all of its planes.
340 """
342 def __init__(
343 self,
344 array_or_fill: np.ndarray | int = 0,
345 /,
346 *,
347 schema: MaskSchema,
348 bbox: Box | None = None,
349 start: Sequence[int] | None = None,
350 shape: Sequence[int] | None = None,
351 projection: Projection | None = None,
352 obs_info: ObservationInfo | None = None,
353 metadata: dict[str, MetadataValue] | None = None,
354 ):
355 super().__init__(metadata)
356 if shape is not None:
357 shape = tuple(shape)
358 if start is not None:
359 start = tuple(start)
360 if isinstance(array_or_fill, np.ndarray):
361 array = np.array(array_or_fill, dtype=schema.dtype)
362 if array.ndim != 3:
363 raise ValueError("Mask array must be 3-d.")
364 if bbox is None:
365 bbox = Box.from_shape(array.shape[:-1], start=start)
366 elif bbox.shape + (schema.mask_size,) != array.shape:
367 raise ValueError(
368 f"Explicit bbox shape {bbox.shape} and schema of size {schema.mask_size} do not "
369 f"match array with shape {array.shape}."
370 )
371 if shape is not None and shape + (schema.mask_size,) != array.shape:
372 raise ValueError(
373 f"Explicit shape {shape} and schema of size {schema.mask_size} do "
374 f"not match array with shape {array.shape}."
375 )
377 else:
378 if bbox is None:
379 if shape is None:
380 raise TypeError("No bbox, size, or array provided.")
381 bbox = Box.from_shape(shape, start=start)
382 array = np.full(bbox.shape + (schema.mask_size,), array_or_fill, dtype=schema.dtype)
383 self._array = array
384 self._bbox: Box = bbox
385 self._schema: MaskSchema = schema
386 self._projection = projection
387 self._obs_info = obs_info
389 @property
390 def array(self) -> np.ndarray:
391 """The low-level array (`numpy.ndarray`).
393 Assigning to this attribute modifies the existing array in place; the
394 bounding box and underlying data pointer are never changed.
395 """
396 return self._array
398 @array.setter
399 def array(self, value: np.ndarray | int) -> None:
400 self._array[:, :] = value
402 @property
403 def schema(self) -> MaskSchema:
404 """Schema that defines the planes and their bit assignments
405 (`MaskSchema`).
406 """
407 return self._schema
409 @property
410 def bbox(self) -> Box:
411 """2-d bounding box of the mask (`Box`).
413 This sets the shape of the first two dimensions of the array.
414 """
415 return self._bbox
417 @property
418 def projection(self) -> Projection[Any] | None:
419 """The projection that maps this mask's pixel grid to the sky
420 (`Projection` | `None`).
422 Notes
423 -----
424 The pixel coordinates used by this projection account for the bounding
425 box ``start``; they are not just array indices.
426 """
427 return self._projection
429 @property
430 def obs_info(self) -> ObservationInfo | None:
431 """General information about the associated observation in standard
432 form. (`~astro_metadata_translator.ObservationInfo` | `None`).
433 """
434 return self._obs_info
436 def __getitem__(self, bbox: Box | EllipsisType) -> Mask:
437 if bbox is ...:
438 return self
439 super().__getitem__(bbox)
440 return self._transfer_metadata(
441 Mask(
442 self.array[bbox.y.slice_within(self._bbox.y), bbox.x.slice_within(self._bbox.x), :],
443 bbox=bbox,
444 schema=self.schema,
445 ),
446 bbox=bbox,
447 )
449 def __setitem__(self, bbox: Box | EllipsisType, value: Mask) -> None:
450 subview = self[bbox]
451 subview.clear()
452 subview.update(value)
454 def __str__(self) -> str:
455 return f"Mask({self.bbox!s}, {list(self.schema.names)})"
457 def __repr__(self) -> str:
458 return f"Mask(..., bbox={self.bbox!r}, schema={self.schema!r})"
460 def __eq__(self, other: object) -> bool:
461 if not isinstance(other, Mask):
462 return NotImplemented
463 return (
464 self._bbox == other._bbox
465 and self._schema == other._schema
466 and np.array_equal(self._array, other._array, equal_nan=True)
467 )
469 def copy(self) -> Mask:
470 """Deep-copy the mask and metadata."""
471 return self._transfer_metadata(
472 Mask(
473 self._array.copy(),
474 bbox=self._bbox,
475 schema=self._schema,
476 projection=self._projection,
477 obs_info=self._obs_info,
478 ),
479 copy=True,
480 )
482 def view(
483 self,
484 *,
485 schema: MaskSchema | EllipsisType = ...,
486 projection: Projection | None | EllipsisType = ...,
487 start: Sequence[int] | EllipsisType = ...,
488 obs_info: ObservationInfo | None | EllipsisType = ...,
489 ) -> Mask:
490 """Make a view of the mask, with optional updates.
492 Notes
493 -----
494 This can only be used to make changes to schema descriptions; plane
495 names must remain the same (in the same order).
496 """
497 if schema is ...:
498 schema = self._schema
499 else:
500 if list(schema.names) != list(self.schema.names):
501 raise ValueError("Cannot create a mask view with a schema with different names.")
502 if projection is ...:
503 projection = self._projection
504 if start is ...:
505 start = self._bbox.start
506 if obs_info is ...:
507 obs_info = self._obs_info
508 return self._transfer_metadata(
509 Mask(self._array, start=start, schema=schema, projection=projection, obs_info=obs_info)
510 )
512 def update(self, other: Mask) -> None:
513 """Update ``self`` to include all common mask values set in ``other``.
515 Notes
516 -----
517 This only operates on the intersection of the two mask bounding boxes
518 and the mask planes that are present in both. Mask bits are only set,
519 not cleared (i.e. this uses ``|=`` updates, not ``=`` assignments).
520 """
521 lhs = self
522 rhs = other
523 if other.bbox != self.bbox:
524 try:
525 bbox = self.bbox.intersection(other.bbox)
526 except NoOverlapError:
527 return
528 lhs = self[bbox]
529 rhs = other[bbox]
530 for name in self.schema.names & other.schema.names:
531 lhs.set(name, rhs.get(name))
533 def get(self, plane: str) -> np.ndarray:
534 """Return a 2-d boolean array for the given mask plane.
536 Parameters
537 ----------
538 plane
539 Name of the mask plane.
541 Returns
542 -------
543 numpy.ndarray
544 A 2-d boolean array with the same shape as `bbox` that is `True`
545 where the bit for ``plane`` is set and `False` elsewhere.
546 """
547 bit = self.schema.bit(plane)
548 return (self._array[..., bit.index] & bit.mask).astype(bool)
550 def set(self, plane: str, boolean_mask: np.ndarray | EllipsisType = ...) -> None:
551 """Set a mask plane.
553 Parameters
554 ----------
555 plane
556 Name of the mask plane to set
557 boolean_mask
558 A 2-d boolean array with the same shape as `bbox` that is `True`
559 where the bit for ``plane`` should be set and `False` where it
560 should be left unchanged (*not* set to zero). May be ``...`` to
561 set the bit everywhere.
562 """
563 bit = self.schema.bit(plane)
564 if boolean_mask is not ...:
565 boolean_mask = boolean_mask.astype(bool)
566 self._array[boolean_mask, bit.index] |= bit.mask
568 def clear(self, plane: str | None = None, boolean_mask: np.ndarray | EllipsisType = ...) -> None:
569 """Clear one or more mask planes.
571 Parameters
572 ----------
573 plane
574 Name of the mask plane to set. If `None` all mask planes are
575 cleared.
576 boolean_mask
577 A 2-d boolean array with the same shape as `bbox` that is `True`
578 where the bit for ``plane`` should be cleared and `False` where it
579 should be left unchanged. May be ``...`` to clear the bit
580 everywhere.
581 """
582 if boolean_mask is not ...:
583 boolean_mask = boolean_mask.astype(bool)
584 if plane is None:
585 self._array[boolean_mask, :] = 0
586 else:
587 bit = self.schema.bit(plane)
588 self._array[boolean_mask, bit.index] &= ~bit.mask
590 def serialize[P: pydantic.BaseModel](
591 self,
592 archive: OutputArchive[P],
593 *,
594 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
595 save_projection: bool = True,
596 add_offset_wcs: str | None = "A",
597 ) -> MaskSerializationModel[P]:
598 """Serialize the mask to an output archive.
600 Parameters
601 ----------
602 archive
603 Archive to write to.
604 update_header
605 A callback that will be given the FITS header for the HDU
606 containing this mask in order to add keys to it. This callback
607 may be provided but will not be called if the output format is not
608 FITS. As multiple HDUs may be added, this function may be called
609 multiple times.
610 save_projection
611 If `True`, save the `Projection` attached to the mask, if there
612 is one.
613 add_offset_wcs
614 A FITS WCS single-character suffix to use when adding a linear
615 WCS that maps the FITS array to the logical pixel coordinates
616 defined by ``bbox.start``. Set to `None` to not write this WCS.
617 """
618 data: list[ArrayReferenceModel] = []
619 for schema_2d in self.schema.split(np.int32):
620 mask_2d = Mask(
621 0, bbox=self.bbox, schema=schema_2d, projection=self._projection, obs_info=self._obs_info
622 )
623 mask_2d.update(self)
624 data.append(mask_2d._serialize_2d(archive, update_header=update_header))
625 serialized_projection: ProjectionSerializationModel[P] | None = None
626 if save_projection and self.projection is not None:
627 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
628 serialized_dtype = NumberType.from_numpy(self.schema.dtype)
629 assert is_integer(serialized_dtype), "Mask dtypes should always be integers."
630 return MaskSerializationModel.model_construct(
631 data=data,
632 start=list(self.bbox.start),
633 planes=list(self.schema),
634 dtype=serialized_dtype,
635 projection=serialized_projection,
636 obs_info=self._obs_info,
637 metadata=self.metadata,
638 )
640 def _serialize_2d[P: pydantic.BaseModel](
641 self,
642 archive: OutputArchive[P],
643 *,
644 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
645 save_projection: bool = True,
646 add_offset_wcs: str | None = "A",
647 ) -> ArrayReferenceModel:
648 def _update_header(header: astropy.io.fits.Header) -> None:
649 update_header(header)
650 self.schema.update_header(header)
651 if self.projection is not None:
652 if self.fits_wcs:
653 header.update(self.fits_wcs.to_header(relax=True))
654 if add_offset_wcs is not None:
655 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs)
657 assert self.array.shape[2] == 1, "Mask should be split before calling this method."
658 return archive.add_array(self._array[:, :, 0], update_header=_update_header)
660 @classmethod
661 def deserialize(
662 cls,
663 model: MaskSerializationModel[Any],
664 archive: InputArchive[Any],
665 *,
666 bbox: Box | None = None,
667 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
668 ) -> Mask:
669 """Deserialize a mask from an input archive.
671 Parameters
672 ----------
673 model
674 A Pydantic model representation of the mask, holding references
675 to data stored in the archive.
676 archive
677 Archive to read from.
678 bbox
679 Bounding box of a subimage to read instead.
680 strip_header
681 A callable that strips out any FITS header cards added by the
682 ``update_header`` argument in the corresponding call to
683 `serialize`.
684 """
685 slices: tuple[slice, ...] | EllipsisType = ...
686 if bbox is not None:
687 slices = bbox.slice_within(model.bbox)
688 else:
689 bbox = model.bbox
690 if not is_integer(model.dtype):
691 raise ArchiveReadError(f"Mask array has a non-integer dtype: {model.dtype}.")
692 schema = MaskSchema(model.planes, dtype=model.dtype.to_numpy())
693 projection = (
694 Projection.deserialize(model.projection, archive) if model.projection is not None else None
695 )
696 result = Mask(
697 0,
698 schema=schema,
699 bbox=bbox,
700 projection=projection,
701 obs_info=model.obs_info,
702 )
703 schemas_2d = schema.split(np.int32)
704 if len(schemas_2d) != len(model.data):
705 raise ArchiveReadError(
706 f"Number of mask arrays ({len(model.data)}) does not match expectation ({len(schemas_2d)})."
707 )
708 for ref, schema_2d in zip(model.data, schemas_2d):
709 mask_2d = cls._deserialize_2d(
710 ref, schema_2d, bbox.start, archive, strip_header=strip_header, slices=slices
711 )
712 result.update(mask_2d)
713 return result._finish_deserialize(model)
715 @classmethod
716 def _deserialize_2d(
717 cls,
718 ref: ArrayReferenceModel,
719 schema_2d: MaskSchema,
720 start: Sequence[int],
721 archive: InputArchive[Any],
722 *,
723 slices: tuple[slice, ...] | EllipsisType = ...,
724 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
725 ) -> Mask:
726 def _strip_header(header: astropy.io.fits.Header) -> None:
727 strip_header(header)
728 schema_2d.strip_header(header)
729 fits.strip_wcs_cards(header)
731 array_2d = archive.get_array(ref, strip_header=_strip_header, slices=slices)
732 return Mask(array_2d[:, :, np.newaxis], schema=schema_2d, start=start)
734 @staticmethod
735 def _get_archive_tree_type[P: pydantic.BaseModel](
736 pointer_type: type[P],
737 ) -> type[MaskSerializationModel[P]]:
738 """Return the serialization model type for this object for an archive
739 type that uses the given pointer type.
740 """
741 return MaskSerializationModel[pointer_type] # type: ignore
743 _archive_default_name: ClassVar[str] = "mask"
744 """The name this object should be serialized with when written as the
745 top-level object.
746 """
748 def write_fits(
749 self,
750 filename: str,
751 *,
752 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
753 ) -> None:
754 """Write the mask to a FITS file.
756 Parameters
757 ----------
758 filename
759 Name of the file to write to. Must be a local file.
760 compression
761 Compression options.
762 """
763 compression_options = {}
764 if compression is not fits.FitsCompressionOptions.DEFAULT:
765 compression_options[self._archive_default_name] = compression
766 fits.write(self, filename, compression_options)
768 @staticmethod
769 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Mask:
770 """Read an image from a FITS file.
772 Parameters
773 ----------
774 url
775 URL of the file to read; may be any type supported by
776 `lsst.resources.ResourcePath`.
777 bbox
778 Bounding box of a subimage to read instead.
779 """
780 return fits.read(Mask, url, bbox=bbox).deserialized
782 @staticmethod
783 def from_legacy(
784 legacy: Any,
785 plane_map: Mapping[str, MaskPlane] | None = None,
786 ) -> Mask:
787 """Convert from an `lsst.afw.image.Mask` instance.
789 Parameters
790 ----------
791 legacy
792 An `lsst.afw.image.Mask` instance. This will not share pixel
793 data with the new object.
794 plane_map
795 A mapping from legacy mask plane name to the new plane name and
796 description.
797 """
798 return Mask._from_legacy_array(
799 legacy.array,
800 legacy.getMaskPlaneDict(),
801 start=YX(y=legacy.getY0(), x=legacy.getX0()),
802 plane_map=plane_map,
803 )
805 def to_legacy(self, plane_map: Mapping[str, MaskPlane] | None = None) -> Any:
806 """Convert to an `lsst.afw.image.Mask` instance.
808 The pixel data will not be shared between the two objects.
810 Parameters
811 ----------
812 plane_map
813 A mapping from legacy mask plane name to the new plane name and
814 description.
815 """
816 import lsst.afw.image
817 import lsst.geom
819 result = lsst.afw.image.Mask(self.bbox.to_legacy())
820 if plane_map is None:
821 plane_map = {plane.name: plane for plane in self.schema if plane is not None}
822 for old_name, new_plane in plane_map.items():
823 old_bit = result.addMaskPlane(old_name)
824 old_bitmask = 1 << old_bit
825 result.array[self.get(new_plane.name)] |= old_bitmask
826 return result
828 @staticmethod
829 def _from_legacy_array(
830 array2d: np.ndarray,
831 old_planes: Mapping[str, int],
832 *,
833 start: YX[int],
834 plane_map: Mapping[str, MaskPlane] | None = None,
835 projection: Projection | None = None,
836 ) -> Mask:
837 planes: list[MaskPlane] = []
838 new_name_to_old_bitmask: dict[str, int] = {}
839 for old_name, old_bit in old_planes.items():
840 old_bitmask = 1 << old_bit
841 if plane_map is not None:
842 if new_plane := plane_map.get(old_name):
843 planes.append(new_plane)
844 new_name_to_old_bitmask[new_plane.name] = old_bitmask
845 else:
846 if n_orphaned := np.count_nonzero(array2d & old_bitmask):
847 raise RuntimeError(
848 f"Legacy mask plane {old_name!r} is not remapped, "
849 f"but {n_orphaned} pixels have this bit set."
850 )
851 else:
852 planes.append(MaskPlane(old_name, ""))
853 new_name_to_old_bitmask[old_name] = old_bitmask
854 schema = MaskSchema(planes)
855 mask = Mask(0, schema=schema, start=start, shape=array2d.shape, projection=projection)
856 for new_name, old_bitmask in new_name_to_old_bitmask.items():
857 mask.set(new_name, array2d & old_bitmask)
858 return mask
860 @staticmethod
861 def read_legacy(
862 uri: ResourcePathExpression,
863 *,
864 plane_map: Mapping[str, MaskPlane] | None = None,
865 ext: str | int = 1,
866 fits_wcs_frame: Frame | None = None,
867 ) -> Mask:
868 """Read a FITS file written by `lsst.afw.image.Mask.writeFits`.
870 Parameters
871 ----------
872 uri
873 URI or file name.
874 plane_map
875 A mapping from legacy mask plane name to the new plane name and
876 description.
877 ext
878 Name or index of the FITS HDU to read.
879 fits_wcs_frame
880 If not `None` and the HDU containing the mask has a FITS WCS,
881 attach a `Projection` to the returned mask by converting that WCS.
882 """
883 opaque_metadata = fits.FitsOpaqueMetadata()
884 fs, fspath = ResourcePath(uri).to_fsspec()
885 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list:
886 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
887 result = Mask._read_legacy_hdu(
888 hdu_list[ext], opaque_metadata, plane_map=plane_map, fits_wcs_frame=fits_wcs_frame
889 )
890 result._opaque_metadata = opaque_metadata
891 return result
893 @staticmethod
894 def _read_legacy_hdu(
895 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
896 opaque_metadata: fits.FitsOpaqueMetadata,
897 plane_map: Mapping[str, MaskPlane] | None = None,
898 fits_wcs_frame: Frame | None = None,
899 ) -> Mask:
900 if isinstance(hdu, astropy.io.fits.BinTableHDU):
901 hdu = astropy.io.fits.CompImageHDU(bintable=hdu)
902 dx: int = hdu.header.pop("LTV1")
903 dy: int = hdu.header.pop("LTV2")
904 start = YX(y=-dy, x=-dx)
905 old_planes = MaskPlane.read_legacy(hdu.header)
906 projection: Projection | None = None
907 if fits_wcs_frame is not None:
908 try:
909 fits_wcs = astropy.wcs.WCS(hdu.header)
910 except KeyError:
911 pass
912 else:
913 projection = Projection.from_fits_wcs(
914 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y
915 )
916 mask = Mask._from_legacy_array(
917 hdu.data, old_planes, start=start, plane_map=plane_map, projection=projection
918 )
919 fits.strip_wcs_cards(hdu.header)
920 hdu.header.strip()
921 hdu.header.remove("EXTTYPE", ignore_missing=True)
922 hdu.header.remove("INHERIT", ignore_missing=True)
923 # afw set BUNIT on masks because of limitations in how FITS
924 # metadata is handled there.
925 hdu.header.remove("BUNIT", ignore_missing=True)
926 opaque_metadata.add_header(hdu.header)
927 return mask
930class MaskSerializationModel[P: pydantic.BaseModel](ArchiveTree):
931 """Pydantic model used to represent the serialized form of a `.Mask`."""
933 data: list[ArrayReferenceModel] = pydantic.Field(description="References to pixel data.")
934 start: list[int] = pydantic.Field(
935 description="Coordinate of the first pixels in the array, ordered (y, x)."
936 )
937 planes: list[MaskPlane | None] = pydantic.Field(description="Definitions of the bitplanes in the mask.")
938 dtype: IntegerType = pydantic.Field(description="Data type of the in-memory mask.")
939 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
940 default=None,
941 exclude_if=is_none,
942 description="Projection that maps the logical pixel grid onto the sky.",
943 )
944 obs_info: ObservationInfo | None = pydantic.Field(
945 default=None,
946 exclude_if=is_none,
947 description="Standardized description of image metadata",
948 )
950 @property
951 def bbox(self) -> Box:
952 """The 2-d bounding box of the mask."""
953 return Box.from_shape(self.data[0].shape, start=self.start)
956def get_legacy_visit_image_mask_planes() -> dict[str, MaskPlane]:
957 """Return a mapping from legacy mask plane name to `MaskPlane` instance
958 for LSST visit images, c. DP2.
959 """
960 return {
961 "BAD": MaskPlane("BAD", "Bad pixel in the instrument, including bad amplifiers."),
962 "SAT": MaskPlane(
963 "SATURATED", "Pixel was saturated or affected by saturation in a neighboring pixel."
964 ),
965 "INTRP": MaskPlane("INTERPOLATED", "Original pixel value was interpolated."),
966 "CR": MaskPlane("COSMIC_RAY", "A cosmic ray affected this pixel."),
967 "EDGE": MaskPlane(
968 "DETECTION_EDGE",
969 "Pixel was too close to the edge to be considered for detection, "
970 "due to the finite size of the detection kernel.",
971 ),
972 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."),
973 "SUSPECT": MaskPlane("SUSPECT", "Pixel was close to the saturation level. "),
974 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."),
975 "VIGNETTED": MaskPlane("VIGNETTED", "Pixel was vignetted by the optics."),
976 "PARTLY_VIGNETTED": MaskPlane("PARTLY_VIGNETTED", "Pixel was partly vignetted by the optics."),
977 "CROSSTALK": MaskPlane("CROSSTALK", "Pixel was affected by crosstalk and corrected accordingly."),
978 "ITL_DIP": MaskPlane(
979 "ITL_DIP", "Pixel was affected by a dark vertical trail from a bright source, on an ITL CCD."
980 ),
981 "NOT_DEBLENDED": MaskPlane(
982 "NOT_DEBLENDED",
983 "Pixel belonged to a detection that was not deblended, usually due to size limits.",
984 ),
985 "SPIKE": MaskPlane(
986 "SPIKE", "Pixel is in the neighborhood of a diffraction spike from a bright star."
987 ),
988 }