Coverage for python / lsst / images / _mask.py: 24%

369 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:16 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "Mask", 

16 "MaskPlane", 

17 "MaskPlaneBit", 

18 "MaskSchema", 

19 "MaskSerializationModel", 

20 "get_legacy_visit_image_mask_planes", 

21) 

22 

23import dataclasses 

24import math 

25from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set 

26from types import EllipsisType 

27from typing import Any, ClassVar, cast 

28 

29import astropy.io.fits 

30import astropy.wcs 

31import numpy as np 

32import numpy.typing as npt 

33import pydantic 

34from astro_metadata_translator import ObservationInfo 

35 

36from lsst.resources import ResourcePath, ResourcePathExpression 

37 

38from . import fits 

39from ._generalized_image import GeneralizedImage 

40from ._geom import YX, Box, NoOverlapError 

41from ._transforms import Frame, Projection, ProjectionSerializationModel 

42from .serialization import ( 

43 ArchiveReadError, 

44 ArchiveTree, 

45 ArrayReferenceModel, 

46 InputArchive, 

47 IntegerType, 

48 MetadataValue, 

49 NumberType, 

50 OutputArchive, 

51 is_integer, 

52 no_header_updates, 

53) 

54from .utils import is_none 

55 

56 

57@dataclasses.dataclass(frozen=True) 

58class MaskPlane: 

59 """Name and description of a single plane in a mask array.""" 

60 

61 name: str 

62 """Unique name for the mask plane (`str`).""" 

63 

64 description: str 

65 """Human-readable documentation for the mask plane (`str`).""" 

66 

67 @classmethod 

68 def read_legacy(cls, header: astropy.io.fits.Header) -> dict[str, int]: 

69 """Read mask plane descriptions written by 

70 `lsst.afw.image.Mask.writeFits`. 

71 

72 Parameters 

73 ---------- 

74 header 

75 FITS header. 

76 

77 Returns 

78 ------- 

79 `dict` [`str`, `int`] 

80 A dictionary mapping mask plane name to integer bit index. 

81 """ 

82 result: dict[str, int] = {} 

83 for card in list(header.cards): 

84 if card.keyword.startswith("MP_"): 

85 result[card.keyword.removeprefix("MP_")] = card.value 

86 del header[card.keyword] 

87 return result 

88 

89 

90@dataclasses.dataclass(frozen=True) 

91class MaskPlaneBit: 

92 """The nested array index and mask value associated with a single mask 

93 plane. 

94 """ 

95 

96 index: int 

97 """Index into the last dimension of the mask array where this plane's bit 

98 is stored. 

99 """ 

100 

101 mask: np.integer 

102 """Bitmask that selects just this plane's bit from a mask array value 

103 (`numpy.integer`). 

104 """ 

105 

106 @classmethod 

107 def compute(cls, overall_index: int, stride: int, mask_type: type[np.integer]) -> MaskPlaneBit: 

108 """Construct a `MaskPlaneBit` from the overall index of a plane in a 

109 `MaskSchema` and the stride (number of bits per mask array element). 

110 """ 

111 index, bit = divmod(overall_index, stride) 

112 return cls(index, mask_type(1 << bit)) 

113 

114 

115class MaskSchema: 

116 """A schema for a bit-packed mask array. 

117 

118 Parameters 

119 ---------- 

120 planes 

121 Iterable of `MaskPlane` instances that define the schema. `None` 

122 values may be included to reserve bits for future use. 

123 dtype 

124 The numpy data type of the mask arrays that use this schema. 

125 

126 Notes 

127 ----- 

128 A `MaskSchema` is a collection of mask planes, which each correspond to a 

129 single bit in a mask array. Mask schemas are immutable and associated with 

130 a particular array data type, allowing them to safely precompute the index 

131 and bitmask for each plane. 

132 

133 `MaskSchema` indexing is by integer (the overall index of a plane in the 

134 schema). The `descriptions` attribute may be indexed by plane name to get 

135 the description for that plane, and the `bitmask` method can be used to 

136 obtain an array that can be used to select one or more planes by name in 

137 a mask array that uses this schema. 

138 

139 If no mask planes are provided, a `None` placeholder is automatically 

140 added. 

141 """ 

142 

143 def __init__(self, planes: Iterable[MaskPlane | None], dtype: npt.DTypeLike = np.uint8): 

144 self._planes: tuple[MaskPlane | None, ...] = tuple(planes) or (None,) 

145 self._dtype = cast(np.dtype[np.integer], np.dtype(dtype)) 

146 stride = self.bits_per_element(self._dtype) 

147 self._descriptions = {plane.name: plane.description for plane in self._planes if plane is not None} 

148 self._mask_size = math.ceil(len(self._planes) / stride) 

149 self._bits: dict[str, MaskPlaneBit] = { 

150 plane.name: MaskPlaneBit.compute(n, stride, self._dtype.type) 

151 for n, plane in enumerate(self._planes) 

152 if plane is not None 

153 } 

154 

155 @staticmethod 

156 def bits_per_element(dtype: npt.DTypeLike) -> int: 

157 """Return the number of mask bits per array element for the given 

158 data type. 

159 """ 

160 dtype = np.dtype(dtype) 

161 match dtype.kind: 

162 case "u": 

163 return dtype.itemsize * 8 

164 case "i": 

165 return dtype.itemsize * 8 - 1 

166 case _: 

167 raise TypeError(f"dtype for masks must be an integer; got {dtype} with kind={dtype.kind}.") 

168 

169 def __iter__(self) -> Iterator[MaskPlane | None]: 

170 return iter(self._planes) 

171 

172 def __len__(self) -> int: 

173 return len(self._planes) 

174 

175 def __getitem__(self, i: int) -> MaskPlane | None: 

176 return self._planes[i] 

177 

178 def __repr__(self) -> str: 

179 return f"MaskSchema({list(self._planes)}, dtype={self._dtype!r})" 

180 

181 def __str__(self) -> str: 

182 return "\n".join( 

183 [ 

184 f"{name} [{bit.index}@{hex(bit.mask)}]: {self._descriptions[name]}" 

185 for name, bit in self._bits.items() 

186 ] 

187 ) 

188 

189 def __eq__(self, other: object) -> bool: 

190 if isinstance(other, MaskSchema): 

191 return self._planes == other._planes and self._dtype == other._dtype 

192 return False 

193 

194 @property 

195 def dtype(self) -> np.dtype: 

196 """The numpy data type of the mask arrays that use this schema.""" 

197 return self._dtype 

198 

199 @property 

200 def mask_size(self) -> int: 

201 """The number of elements in the last dimension of any mask array that 

202 uses this schema. 

203 """ 

204 return self._mask_size 

205 

206 @property 

207 def names(self) -> Set[str]: 

208 """The names of the mask planes, in bit order.""" 

209 return self._bits.keys() 

210 

211 @property 

212 def descriptions(self) -> Mapping[str, str]: 

213 """A mapping from plane name to description.""" 

214 return self._descriptions 

215 

216 def bit(self, plane: str) -> MaskPlaneBit: 

217 """Return the last array index and mask for the given mask plane.""" 

218 return self._bits[plane] 

219 

220 def bitmask(self, *planes: str) -> np.ndarray: 

221 """Return a 1-d mask array that represents the union (i.e. bitwise OR) 

222 of the planes with the given names. 

223 

224 Parameters 

225 ---------- 

226 *planes 

227 Mask plane names. 

228 

229 Returns 

230 ------- 

231 numpy.ndarray 

232 A 1-d array with shape ``(mask_size,)``. 

233 """ 

234 result = np.zeros(self.mask_size, dtype=self._dtype) 

235 for plane in planes: 

236 bit = self._bits[plane] 

237 result[bit.index] |= bit.mask 

238 return result 

239 

240 def split(self, dtype: npt.DTypeLike) -> list[MaskSchema]: 

241 """Split the schema into an equivalent series of schemas that each 

242 have a `mask_size` of ``1``, dropping all `None` placeholders. 

243 

244 Parameters 

245 ---------- 

246 dtype 

247 Data type of the new mask pixels. 

248 

249 Returns 

250 ------- 

251 `list` [`MaskSchema`] 

252 A list of mask schemas that together include all planes in 

253 ``self`` and have `mask_size` equal to ``1``. If there are no 

254 mask planes (only `None` placeholders) in ``self``, a single mask 

255 schema with a `None` placeholder is returned; otherwise `None` 

256 placeholders are returned. 

257 """ 

258 dtype = np.dtype(dtype) 

259 planes: list[MaskPlane] = [] 

260 schemas: list[MaskSchema] = [] 

261 n_planes_per_schema = self.bits_per_element(dtype) 

262 for plane in self._planes: 

263 if plane is not None: 

264 planes.append(plane) 

265 if len(planes) == n_planes_per_schema: 

266 schemas.append(MaskSchema(planes, dtype=dtype)) 

267 planes.clear() 

268 if planes: 

269 schemas.append(MaskSchema(planes, dtype=dtype)) 

270 if not schemas: 

271 schemas.append(MaskSchema([None], dtype=dtype)) 

272 return schemas 

273 

274 def update_header(self, header: astropy.io.fits.Header) -> None: 

275 """Add a description of this mask schema to a FITS header.""" 

276 for n, plane in enumerate(self): 

277 if plane is not None: 

278 bit = self.bit(plane.name) 

279 if bit.index != 0: 

280 raise TypeError("Only mask schemas with mask_size==1 can be described in FITS.") 

281 header.set(f"MSKN{n + 1:04d}", plane.name, f"Name for mask plane {n + 1}.") 

282 header.set(f"MSKM{n + 1:04d}", bit.mask, f"Bitmask for plane n={n + 1}; always 1<<(n-1).") 

283 # We don't add a comment to the description card, because it's 

284 # likely to overrun a single card and get the CONTINUE 

285 # treatment . That will cause Astropy to warn about the comment 

286 # being truncated and that's worse than just leaving it 

287 # unexplained; it's pretty obvious from context what it is. 

288 header.set(f"MSKD{n + 1:04d}", plane.description) 

289 

290 def strip_header(self, header: astropy.io.fits.Header) -> None: 

291 """Remove all header cards added by `update_header`.""" 

292 for n, plane in enumerate(self): 

293 if plane is not None: 

294 header.remove(f"MSKN{n + 1:04d}", ignore_missing=True) 

295 header.remove(f"MSKM{n + 1:04d}", ignore_missing=True) 

296 header.remove(f"MSKD{n + 1:04d}", ignore_missing=True) 

297 

298 

299class Mask(GeneralizedImage): 

300 """A 2-d bitmask image backed by a 3-d byte array. 

301 

302 Parameters 

303 ---------- 

304 array_or_fill 

305 Array or fill value for the mask. If a fill value, ``bbox`` or 

306 ``shape`` must be provided. 

307 schema 

308 Schema that defines the planes and their bit assignments. 

309 bbox 

310 Bounding box for the mask. This sets the shape of the first two 

311 dimensions of the array. 

312 start 

313 Logical coordinates of the first pixel in the array, ordered ``y``, 

314 ``x`` (unless an `XY` instance is passed). Ignored if 

315 ``bbox`` is provided. Defaults to zeros. 

316 shape 

317 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

318 instance is passed). Only needed if ``array_or_fill`` is not an 

319 array and ``bbox`` is not provided. Like the bbox, this does not 

320 include the last dimension of the array. 

321 projection 

322 Projection that maps the pixel grid to the sky. 

323 obs_info 

324 General information about the associated observation in standardized 

325 form. 

326 metadata 

327 Arbitrary flexible metadata to associate with the mask. 

328 

329 Notes 

330 ----- 

331 Indexing the `array` attribute of a `Mask` does not take into account its 

332 ``start`` offset, but accessing a subimage mask by indexing a `Mask` with 

333 a `Box` does, and the `bbox` of the subimage is set to match its location 

334 within the original mask. 

335 

336 A mask's ``bbox`` corresponds to the leading dimensions of its backing 

337 `numpy.ndarray`, while the last dimension's size is always equal to the 

338 `~MaskSchema.mask_size` of its schema, since a schema can in general 

339 require multiple array elements to represent all of its planes. 

340 """ 

341 

342 def __init__( 

343 self, 

344 array_or_fill: np.ndarray | int = 0, 

345 /, 

346 *, 

347 schema: MaskSchema, 

348 bbox: Box | None = None, 

349 start: Sequence[int] | None = None, 

350 shape: Sequence[int] | None = None, 

351 projection: Projection | None = None, 

352 obs_info: ObservationInfo | None = None, 

353 metadata: dict[str, MetadataValue] | None = None, 

354 ): 

355 super().__init__(metadata) 

356 if shape is not None: 

357 shape = tuple(shape) 

358 if start is not None: 

359 start = tuple(start) 

360 if isinstance(array_or_fill, np.ndarray): 

361 array = np.array(array_or_fill, dtype=schema.dtype) 

362 if array.ndim != 3: 

363 raise ValueError("Mask array must be 3-d.") 

364 if bbox is None: 

365 bbox = Box.from_shape(array.shape[:-1], start=start) 

366 elif bbox.shape + (schema.mask_size,) != array.shape: 

367 raise ValueError( 

368 f"Explicit bbox shape {bbox.shape} and schema of size {schema.mask_size} do not " 

369 f"match array with shape {array.shape}." 

370 ) 

371 if shape is not None and shape + (schema.mask_size,) != array.shape: 

372 raise ValueError( 

373 f"Explicit shape {shape} and schema of size {schema.mask_size} do " 

374 f"not match array with shape {array.shape}." 

375 ) 

376 

377 else: 

378 if bbox is None: 

379 if shape is None: 

380 raise TypeError("No bbox, size, or array provided.") 

381 bbox = Box.from_shape(shape, start=start) 

382 array = np.full(bbox.shape + (schema.mask_size,), array_or_fill, dtype=schema.dtype) 

383 self._array = array 

384 self._bbox: Box = bbox 

385 self._schema: MaskSchema = schema 

386 self._projection = projection 

387 self._obs_info = obs_info 

388 

389 @property 

390 def array(self) -> np.ndarray: 

391 """The low-level array (`numpy.ndarray`). 

392 

393 Assigning to this attribute modifies the existing array in place; the 

394 bounding box and underlying data pointer are never changed. 

395 """ 

396 return self._array 

397 

398 @array.setter 

399 def array(self, value: np.ndarray | int) -> None: 

400 self._array[:, :] = value 

401 

402 @property 

403 def schema(self) -> MaskSchema: 

404 """Schema that defines the planes and their bit assignments 

405 (`MaskSchema`). 

406 """ 

407 return self._schema 

408 

409 @property 

410 def bbox(self) -> Box: 

411 """2-d bounding box of the mask (`Box`). 

412 

413 This sets the shape of the first two dimensions of the array. 

414 """ 

415 return self._bbox 

416 

417 @property 

418 def projection(self) -> Projection[Any] | None: 

419 """The projection that maps this mask's pixel grid to the sky 

420 (`Projection` | `None`). 

421 

422 Notes 

423 ----- 

424 The pixel coordinates used by this projection account for the bounding 

425 box ``start``; they are not just array indices. 

426 """ 

427 return self._projection 

428 

429 @property 

430 def obs_info(self) -> ObservationInfo | None: 

431 """General information about the associated observation in standard 

432 form. (`~astro_metadata_translator.ObservationInfo` | `None`). 

433 """ 

434 return self._obs_info 

435 

436 def __getitem__(self, bbox: Box | EllipsisType) -> Mask: 

437 if bbox is ...: 

438 return self 

439 super().__getitem__(bbox) 

440 return self._transfer_metadata( 

441 Mask( 

442 self.array[bbox.y.slice_within(self._bbox.y), bbox.x.slice_within(self._bbox.x), :], 

443 bbox=bbox, 

444 schema=self.schema, 

445 ), 

446 bbox=bbox, 

447 ) 

448 

449 def __setitem__(self, bbox: Box | EllipsisType, value: Mask) -> None: 

450 subview = self[bbox] 

451 subview.clear() 

452 subview.update(value) 

453 

454 def __str__(self) -> str: 

455 return f"Mask({self.bbox!s}, {list(self.schema.names)})" 

456 

457 def __repr__(self) -> str: 

458 return f"Mask(..., bbox={self.bbox!r}, schema={self.schema!r})" 

459 

460 def __eq__(self, other: object) -> bool: 

461 if not isinstance(other, Mask): 

462 return NotImplemented 

463 return ( 

464 self._bbox == other._bbox 

465 and self._schema == other._schema 

466 and np.array_equal(self._array, other._array, equal_nan=True) 

467 ) 

468 

469 def copy(self) -> Mask: 

470 """Deep-copy the mask and metadata.""" 

471 return self._transfer_metadata( 

472 Mask( 

473 self._array.copy(), 

474 bbox=self._bbox, 

475 schema=self._schema, 

476 projection=self._projection, 

477 obs_info=self._obs_info, 

478 ), 

479 copy=True, 

480 ) 

481 

482 def view( 

483 self, 

484 *, 

485 schema: MaskSchema | EllipsisType = ..., 

486 projection: Projection | None | EllipsisType = ..., 

487 start: Sequence[int] | EllipsisType = ..., 

488 obs_info: ObservationInfo | None | EllipsisType = ..., 

489 ) -> Mask: 

490 """Make a view of the mask, with optional updates. 

491 

492 Notes 

493 ----- 

494 This can only be used to make changes to schema descriptions; plane 

495 names must remain the same (in the same order). 

496 """ 

497 if schema is ...: 

498 schema = self._schema 

499 else: 

500 if list(schema.names) != list(self.schema.names): 

501 raise ValueError("Cannot create a mask view with a schema with different names.") 

502 if projection is ...: 

503 projection = self._projection 

504 if start is ...: 

505 start = self._bbox.start 

506 if obs_info is ...: 

507 obs_info = self._obs_info 

508 return self._transfer_metadata( 

509 Mask(self._array, start=start, schema=schema, projection=projection, obs_info=obs_info) 

510 ) 

511 

512 def update(self, other: Mask) -> None: 

513 """Update ``self`` to include all common mask values set in ``other``. 

514 

515 Notes 

516 ----- 

517 This only operates on the intersection of the two mask bounding boxes 

518 and the mask planes that are present in both. Mask bits are only set, 

519 not cleared (i.e. this uses ``|=`` updates, not ``=`` assignments). 

520 """ 

521 lhs = self 

522 rhs = other 

523 if other.bbox != self.bbox: 

524 try: 

525 bbox = self.bbox.intersection(other.bbox) 

526 except NoOverlapError: 

527 return 

528 lhs = self[bbox] 

529 rhs = other[bbox] 

530 for name in self.schema.names & other.schema.names: 

531 lhs.set(name, rhs.get(name)) 

532 

533 def get(self, plane: str) -> np.ndarray: 

534 """Return a 2-d boolean array for the given mask plane. 

535 

536 Parameters 

537 ---------- 

538 plane 

539 Name of the mask plane. 

540 

541 Returns 

542 ------- 

543 numpy.ndarray 

544 A 2-d boolean array with the same shape as `bbox` that is `True` 

545 where the bit for ``plane`` is set and `False` elsewhere. 

546 """ 

547 bit = self.schema.bit(plane) 

548 return (self._array[..., bit.index] & bit.mask).astype(bool) 

549 

550 def set(self, plane: str, boolean_mask: np.ndarray | EllipsisType = ...) -> None: 

551 """Set a mask plane. 

552 

553 Parameters 

554 ---------- 

555 plane 

556 Name of the mask plane to set 

557 boolean_mask 

558 A 2-d boolean array with the same shape as `bbox` that is `True` 

559 where the bit for ``plane`` should be set and `False` where it 

560 should be left unchanged (*not* set to zero). May be ``...`` to 

561 set the bit everywhere. 

562 """ 

563 bit = self.schema.bit(plane) 

564 if boolean_mask is not ...: 

565 boolean_mask = boolean_mask.astype(bool) 

566 self._array[boolean_mask, bit.index] |= bit.mask 

567 

568 def clear(self, plane: str | None = None, boolean_mask: np.ndarray | EllipsisType = ...) -> None: 

569 """Clear one or more mask planes. 

570 

571 Parameters 

572 ---------- 

573 plane 

574 Name of the mask plane to set. If `None` all mask planes are 

575 cleared. 

576 boolean_mask 

577 A 2-d boolean array with the same shape as `bbox` that is `True` 

578 where the bit for ``plane`` should be cleared and `False` where it 

579 should be left unchanged. May be ``...`` to clear the bit 

580 everywhere. 

581 """ 

582 if boolean_mask is not ...: 

583 boolean_mask = boolean_mask.astype(bool) 

584 if plane is None: 

585 self._array[boolean_mask, :] = 0 

586 else: 

587 bit = self.schema.bit(plane) 

588 self._array[boolean_mask, bit.index] &= ~bit.mask 

589 

590 def serialize[P: pydantic.BaseModel]( 

591 self, 

592 archive: OutputArchive[P], 

593 *, 

594 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

595 save_projection: bool = True, 

596 save_obs_info: bool = True, 

597 add_offset_wcs: str | None = "A", 

598 ) -> MaskSerializationModel[P]: 

599 """Serialize the mask to an output archive. 

600 

601 Parameters 

602 ---------- 

603 archive 

604 Archive to write to. 

605 update_header 

606 A callback that will be given the FITS header for the HDU 

607 containing this mask in order to add keys to it. This callback 

608 may be provided but will not be called if the output format is not 

609 FITS. As multiple HDUs may be added, this function may be called 

610 multiple times. 

611 save_projection 

612 If `True`, save the `Projection` attached to the image, if there 

613 is one. This does not affect whether a FITS WCS corresponding to 

614 the projection is written (it always is, if available, and if 

615 ``add_offset_wcs`` is not ``" "``). 

616 save_obs_info 

617 If `True`, save the 

618 `~astro_metadata_translator.ObservationInfo` attached to the 

619 image, if there is one. 

620 add_offset_wcs 

621 A FITS WCS single-character suffix to use when adding a linear 

622 WCS that maps the FITS array to the logical pixel coordinates 

623 defined by ``bbox.start``. Set to `None` to not write this WCS. 

624 If this is set to ``" "``, it will prevent the `Projection` from 

625 being saved as a FITS WCS. 

626 """ 

627 data: list[ArrayReferenceModel] = [] 

628 for schema_2d in self.schema.split(np.int32): 

629 mask_2d = Mask( 

630 0, bbox=self.bbox, schema=schema_2d, projection=self._projection, obs_info=self._obs_info 

631 ) 

632 mask_2d.update(self) 

633 data.append( 

634 mask_2d._serialize_2d(archive, update_header=update_header, add_offset_wcs=add_offset_wcs) 

635 ) 

636 serialized_projection: ProjectionSerializationModel[P] | None = None 

637 if save_projection and self.projection is not None: 

638 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

639 serialized_dtype = NumberType.from_numpy(self.schema.dtype) 

640 assert is_integer(serialized_dtype), "Mask dtypes should always be integers." 

641 return MaskSerializationModel.model_construct( 

642 data=data, 

643 start=list(self.bbox.start), 

644 planes=list(self.schema), 

645 dtype=serialized_dtype, 

646 projection=serialized_projection, 

647 obs_info=self._obs_info if save_obs_info else None, 

648 metadata=self.metadata, 

649 ) 

650 

651 def _serialize_2d[P: pydantic.BaseModel]( 

652 self, 

653 archive: OutputArchive[P], 

654 *, 

655 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

656 add_offset_wcs: str | None = "A", 

657 ) -> ArrayReferenceModel: 

658 def _update_header(header: astropy.io.fits.Header) -> None: 

659 update_header(header) 

660 self.schema.update_header(header) 

661 if self.projection is not None and add_offset_wcs != " ": 

662 if self.fits_wcs: 

663 header.update(self.fits_wcs.to_header(relax=True)) 

664 if add_offset_wcs is not None: 

665 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

666 

667 assert self.array.shape[2] == 1, "Mask should be split before calling this method." 

668 return archive.add_array(self._array[:, :, 0], update_header=_update_header) 

669 

670 @classmethod 

671 def deserialize( 

672 cls, 

673 model: MaskSerializationModel[Any], 

674 archive: InputArchive[Any], 

675 *, 

676 bbox: Box | None = None, 

677 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

678 ) -> Mask: 

679 """Deserialize a mask from an input archive. 

680 

681 Parameters 

682 ---------- 

683 model 

684 A Pydantic model representation of the mask, holding references 

685 to data stored in the archive. 

686 archive 

687 Archive to read from. 

688 bbox 

689 Bounding box of a subimage to read instead. 

690 strip_header 

691 A callable that strips out any FITS header cards added by the 

692 ``update_header`` argument in the corresponding call to 

693 `serialize`. 

694 """ 

695 slices: tuple[slice, ...] | EllipsisType = ... 

696 if bbox is not None: 

697 slices = bbox.slice_within(model.bbox) 

698 else: 

699 bbox = model.bbox 

700 if not is_integer(model.dtype): 

701 raise ArchiveReadError(f"Mask array has a non-integer dtype: {model.dtype}.") 

702 schema = MaskSchema(model.planes, dtype=model.dtype.to_numpy()) 

703 projection = ( 

704 Projection.deserialize(model.projection, archive) if model.projection is not None else None 

705 ) 

706 result = Mask( 

707 0, 

708 schema=schema, 

709 bbox=bbox, 

710 projection=projection, 

711 obs_info=model.obs_info, 

712 ) 

713 schemas_2d = schema.split(np.int32) 

714 if len(schemas_2d) != len(model.data): 

715 raise ArchiveReadError( 

716 f"Number of mask arrays ({len(model.data)}) does not match expectation ({len(schemas_2d)})." 

717 ) 

718 for ref, schema_2d in zip(model.data, schemas_2d): 

719 mask_2d = cls._deserialize_2d( 

720 ref, schema_2d, bbox.start, archive, strip_header=strip_header, slices=slices 

721 ) 

722 result.update(mask_2d) 

723 return result._finish_deserialize(model) 

724 

725 @classmethod 

726 def _deserialize_2d( 

727 cls, 

728 ref: ArrayReferenceModel, 

729 schema_2d: MaskSchema, 

730 start: Sequence[int], 

731 archive: InputArchive[Any], 

732 *, 

733 slices: tuple[slice, ...] | EllipsisType = ..., 

734 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

735 ) -> Mask: 

736 def _strip_header(header: astropy.io.fits.Header) -> None: 

737 strip_header(header) 

738 schema_2d.strip_header(header) 

739 fits.strip_wcs_cards(header) 

740 

741 array_2d = archive.get_array(ref, strip_header=_strip_header, slices=slices) 

742 return Mask(array_2d[:, :, np.newaxis], schema=schema_2d, start=start) 

743 

744 @staticmethod 

745 def _get_archive_tree_type[P: pydantic.BaseModel]( 

746 pointer_type: type[P], 

747 ) -> type[MaskSerializationModel[P]]: 

748 """Return the serialization model type for this object for an archive 

749 type that uses the given pointer type. 

750 """ 

751 return MaskSerializationModel[pointer_type] # type: ignore 

752 

753 _archive_default_name: ClassVar[str] = "mask" 

754 """The name this object should be serialized with when written as the 

755 top-level object. 

756 """ 

757 

758 def write_fits( 

759 self, 

760 filename: str, 

761 *, 

762 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

763 ) -> None: 

764 """Write the mask to a FITS file. 

765 

766 Parameters 

767 ---------- 

768 filename 

769 Name of the file to write to. Must be a local file. 

770 compression 

771 Compression options. 

772 """ 

773 compression_options = {} 

774 if compression is not fits.FitsCompressionOptions.DEFAULT: 

775 compression_options[self._archive_default_name] = compression 

776 fits.write(self, filename, compression_options) 

777 

778 @staticmethod 

779 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Mask: 

780 """Read an image from a FITS file. 

781 

782 Parameters 

783 ---------- 

784 url 

785 URL of the file to read; may be any type supported by 

786 `lsst.resources.ResourcePath`. 

787 bbox 

788 Bounding box of a subimage to read instead. 

789 """ 

790 return fits.read(Mask, url, bbox=bbox).deserialized 

791 

792 @staticmethod 

793 def from_legacy( 

794 legacy: Any, 

795 plane_map: Mapping[str, MaskPlane] | None = None, 

796 ) -> Mask: 

797 """Convert from an `lsst.afw.image.Mask` instance. 

798 

799 Parameters 

800 ---------- 

801 legacy 

802 An `lsst.afw.image.Mask` instance. This will not share pixel 

803 data with the new object. 

804 plane_map 

805 A mapping from legacy mask plane name to the new plane name and 

806 description. 

807 """ 

808 return Mask._from_legacy_array( 

809 legacy.array, 

810 legacy.getMaskPlaneDict(), 

811 start=YX(y=legacy.getY0(), x=legacy.getX0()), 

812 plane_map=plane_map, 

813 ) 

814 

815 def to_legacy(self, plane_map: Mapping[str, MaskPlane] | None = None) -> Any: 

816 """Convert to an `lsst.afw.image.Mask` instance. 

817 

818 The pixel data will not be shared between the two objects. 

819 

820 Parameters 

821 ---------- 

822 plane_map 

823 A mapping from legacy mask plane name to the new plane name and 

824 description. 

825 """ 

826 import lsst.afw.image 

827 import lsst.geom 

828 

829 result = lsst.afw.image.Mask(self.bbox.to_legacy()) 

830 if plane_map is None: 

831 plane_map = {plane.name: plane for plane in self.schema if plane is not None} 

832 for old_name, new_plane in plane_map.items(): 

833 old_bit = result.addMaskPlane(old_name) 

834 old_bitmask = 1 << old_bit 

835 result.array[self.get(new_plane.name)] |= old_bitmask 

836 return result 

837 

838 @staticmethod 

839 def _from_legacy_array( 

840 array2d: np.ndarray, 

841 old_planes: Mapping[str, int], 

842 *, 

843 start: YX[int], 

844 plane_map: Mapping[str, MaskPlane] | None = None, 

845 projection: Projection | None = None, 

846 ) -> Mask: 

847 planes: list[MaskPlane] = [] 

848 new_name_to_old_bitmask: dict[str, int] = {} 

849 for old_name, old_bit in old_planes.items(): 

850 old_bitmask = 1 << old_bit 

851 if plane_map is not None: 

852 if new_plane := plane_map.get(old_name): 

853 planes.append(new_plane) 

854 new_name_to_old_bitmask[new_plane.name] = old_bitmask 

855 else: 

856 if n_orphaned := np.count_nonzero(array2d & old_bitmask): 

857 raise RuntimeError( 

858 f"Legacy mask plane {old_name!r} is not remapped, " 

859 f"but {n_orphaned} pixels have this bit set." 

860 ) 

861 else: 

862 planes.append(MaskPlane(old_name, "")) 

863 new_name_to_old_bitmask[old_name] = old_bitmask 

864 schema = MaskSchema(planes) 

865 mask = Mask(0, schema=schema, start=start, shape=array2d.shape, projection=projection) 

866 for new_name, old_bitmask in new_name_to_old_bitmask.items(): 

867 mask.set(new_name, array2d & old_bitmask) 

868 return mask 

869 

870 @staticmethod 

871 def read_legacy( 

872 uri: ResourcePathExpression, 

873 *, 

874 plane_map: Mapping[str, MaskPlane] | None = None, 

875 ext: str | int = 1, 

876 fits_wcs_frame: Frame | None = None, 

877 ) -> Mask: 

878 """Read a FITS file written by `lsst.afw.image.Mask.writeFits`. 

879 

880 Parameters 

881 ---------- 

882 uri 

883 URI or file name. 

884 plane_map 

885 A mapping from legacy mask plane name to the new plane name and 

886 description. 

887 ext 

888 Name or index of the FITS HDU to read. 

889 fits_wcs_frame 

890 If not `None` and the HDU containing the mask has a FITS WCS, 

891 attach a `Projection` to the returned mask by converting that WCS. 

892 """ 

893 opaque_metadata = fits.FitsOpaqueMetadata() 

894 fs, fspath = ResourcePath(uri).to_fsspec() 

895 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

896 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

897 result = Mask._read_legacy_hdu( 

898 hdu_list[ext], opaque_metadata, plane_map=plane_map, fits_wcs_frame=fits_wcs_frame 

899 ) 

900 result._opaque_metadata = opaque_metadata 

901 return result 

902 

903 @staticmethod 

904 def _read_legacy_hdu( 

905 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU, 

906 opaque_metadata: fits.FitsOpaqueMetadata, 

907 plane_map: Mapping[str, MaskPlane] | None = None, 

908 fits_wcs_frame: Frame | None = None, 

909 ) -> Mask: 

910 if isinstance(hdu, astropy.io.fits.BinTableHDU): 

911 hdu = astropy.io.fits.CompImageHDU(bintable=hdu) 

912 dx: int = hdu.header.pop("LTV1") 

913 dy: int = hdu.header.pop("LTV2") 

914 start = YX(y=-dy, x=-dx) 

915 old_planes = MaskPlane.read_legacy(hdu.header) 

916 projection: Projection | None = None 

917 if fits_wcs_frame is not None: 

918 try: 

919 fits_wcs = astropy.wcs.WCS(hdu.header) 

920 except KeyError: 

921 pass 

922 else: 

923 projection = Projection.from_fits_wcs( 

924 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

925 ) 

926 mask = Mask._from_legacy_array( 

927 hdu.data, old_planes, start=start, plane_map=plane_map, projection=projection 

928 ) 

929 fits.strip_wcs_cards(hdu.header) 

930 hdu.header.strip() 

931 hdu.header.remove("EXTTYPE", ignore_missing=True) 

932 hdu.header.remove("INHERIT", ignore_missing=True) 

933 # afw set BUNIT on masks because of limitations in how FITS 

934 # metadata is handled there. 

935 hdu.header.remove("BUNIT", ignore_missing=True) 

936 opaque_metadata.add_header(hdu.header) 

937 return mask 

938 

939 

940class MaskSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

941 """Pydantic model used to represent the serialized form of a `.Mask`.""" 

942 

943 data: list[ArrayReferenceModel] = pydantic.Field(description="References to pixel data.") 

944 start: list[int] = pydantic.Field( 

945 description="Coordinate of the first pixels in the array, ordered (y, x)." 

946 ) 

947 planes: list[MaskPlane | None] = pydantic.Field(description="Definitions of the bitplanes in the mask.") 

948 dtype: IntegerType = pydantic.Field(description="Data type of the in-memory mask.") 

949 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

950 default=None, 

951 exclude_if=is_none, 

952 description="Projection that maps the logical pixel grid onto the sky.", 

953 ) 

954 obs_info: ObservationInfo | None = pydantic.Field( 

955 default=None, 

956 exclude_if=is_none, 

957 description="Standardized description of image metadata", 

958 ) 

959 

960 @property 

961 def bbox(self) -> Box: 

962 """The 2-d bounding box of the mask.""" 

963 return Box.from_shape(self.data[0].shape, start=self.start) 

964 

965 

966def get_legacy_visit_image_mask_planes() -> dict[str, MaskPlane]: 

967 """Return a mapping from legacy mask plane name to `MaskPlane` instance 

968 for LSST visit images, c. DP2. 

969 """ 

970 return { 

971 "BAD": MaskPlane("BAD", "Bad pixel in the instrument, including bad amplifiers."), 

972 "SAT": MaskPlane( 

973 "SATURATED", "Pixel was saturated or affected by saturation in a neighboring pixel." 

974 ), 

975 "INTRP": MaskPlane("INTERPOLATED", "Original pixel value was interpolated."), 

976 "CR": MaskPlane("COSMIC_RAY", "A cosmic ray affected this pixel."), 

977 "EDGE": MaskPlane( 

978 "DETECTION_EDGE", 

979 "Pixel was too close to the edge to be considered for detection, " 

980 "due to the finite size of the detection kernel.", 

981 ), 

982 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."), 

983 "SUSPECT": MaskPlane("SUSPECT", "Pixel was close to the saturation level. "), 

984 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."), 

985 "VIGNETTED": MaskPlane("VIGNETTED", "Pixel was vignetted by the optics."), 

986 "PARTLY_VIGNETTED": MaskPlane("PARTLY_VIGNETTED", "Pixel was partly vignetted by the optics."), 

987 "CROSSTALK": MaskPlane("CROSSTALK", "Pixel was affected by crosstalk and corrected accordingly."), 

988 "ITL_DIP": MaskPlane( 

989 "ITL_DIP", "Pixel was affected by a dark vertical trail from a bright source, on an ITL CCD." 

990 ), 

991 "NOT_DEBLENDED": MaskPlane( 

992 "NOT_DEBLENDED", 

993 "Pixel belonged to a detection that was not deblended, usually due to size limits.", 

994 ), 

995 "SPIKE": MaskPlane( 

996 "SPIKE", "Pixel is in the neighborhood of a diffraction spike from a bright star." 

997 ), 

998 }