Coverage for python / lsst / images / _mask.py: 24%

369 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:07 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "Mask", 

16 "MaskPlane", 

17 "MaskPlaneBit", 

18 "MaskSchema", 

19 "MaskSerializationModel", 

20 "get_legacy_visit_image_mask_planes", 

21) 

22 

23import dataclasses 

24import math 

25from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set 

26from types import EllipsisType 

27from typing import Any, ClassVar, cast 

28 

29import astropy.io.fits 

30import astropy.wcs 

31import numpy as np 

32import numpy.typing as npt 

33import pydantic 

34from astro_metadata_translator import ObservationInfo 

35 

36from lsst.resources import ResourcePath, ResourcePathExpression 

37 

38from . import fits 

39from ._generalized_image import GeneralizedImage 

40from ._geom import YX, Box, NoOverlapError 

41from ._transforms import Frame, Projection, ProjectionSerializationModel 

42from .serialization import ( 

43 ArchiveReadError, 

44 ArchiveTree, 

45 ArrayReferenceModel, 

46 InlineArrayModel, 

47 InputArchive, 

48 IntegerType, 

49 MetadataValue, 

50 NumberType, 

51 OutputArchive, 

52 is_integer, 

53 no_header_updates, 

54) 

55from .utils import is_none 

56 

57 

58@dataclasses.dataclass(frozen=True) 

59class MaskPlane: 

60 """Name and description of a single plane in a mask array.""" 

61 

62 name: str 

63 """Unique name for the mask plane (`str`).""" 

64 

65 description: str 

66 """Human-readable documentation for the mask plane (`str`).""" 

67 

68 @classmethod 

69 def read_legacy(cls, header: astropy.io.fits.Header) -> dict[str, int]: 

70 """Read mask plane descriptions written by 

71 `lsst.afw.image.Mask.writeFits`. 

72 

73 Parameters 

74 ---------- 

75 header 

76 FITS header. 

77 

78 Returns 

79 ------- 

80 `dict` [`str`, `int`] 

81 A dictionary mapping mask plane name to integer bit index. 

82 """ 

83 result: dict[str, int] = {} 

84 for card in list(header.cards): 

85 if card.keyword.startswith("MP_"): 

86 result[card.keyword.removeprefix("MP_")] = card.value 

87 del header[card.keyword] 

88 return result 

89 

90 

91@dataclasses.dataclass(frozen=True) 

92class MaskPlaneBit: 

93 """The nested array index and mask value associated with a single mask 

94 plane. 

95 """ 

96 

97 index: int 

98 """Index into the last dimension of the mask array where this plane's bit 

99 is stored. 

100 """ 

101 

102 mask: np.integer 

103 """Bitmask that selects just this plane's bit from a mask array value 

104 (`numpy.integer`). 

105 """ 

106 

107 @classmethod 

108 def compute(cls, overall_index: int, stride: int, mask_type: type[np.integer]) -> MaskPlaneBit: 

109 """Construct a `MaskPlaneBit` from the overall index of a plane in a 

110 `MaskSchema` and the stride (number of bits per mask array element). 

111 """ 

112 index, bit = divmod(overall_index, stride) 

113 return cls(index, mask_type(1 << bit)) 

114 

115 

116class MaskSchema: 

117 """A schema for a bit-packed mask array. 

118 

119 Parameters 

120 ---------- 

121 planes 

122 Iterable of `MaskPlane` instances that define the schema. `None` 

123 values may be included to reserve bits for future use. 

124 dtype 

125 The numpy data type of the mask arrays that use this schema. 

126 

127 Notes 

128 ----- 

129 A `MaskSchema` is a collection of mask planes, which each correspond to a 

130 single bit in a mask array. Mask schemas are immutable and associated with 

131 a particular array data type, allowing them to safely precompute the index 

132 and bitmask for each plane. 

133 

134 `MaskSchema` indexing is by integer (the overall index of a plane in the 

135 schema). The `descriptions` attribute may be indexed by plane name to get 

136 the description for that plane, and the `bitmask` method can be used to 

137 obtain an array that can be used to select one or more planes by name in 

138 a mask array that uses this schema. 

139 

140 If no mask planes are provided, a `None` placeholder is automatically 

141 added. 

142 """ 

143 

144 def __init__(self, planes: Iterable[MaskPlane | None], dtype: npt.DTypeLike = np.uint8): 

145 self._planes: tuple[MaskPlane | None, ...] = tuple(planes) or (None,) 

146 self._dtype = cast(np.dtype[np.integer], np.dtype(dtype)) 

147 stride = self.bits_per_element(self._dtype) 

148 self._descriptions = {plane.name: plane.description for plane in self._planes if plane is not None} 

149 self._mask_size = math.ceil(len(self._planes) / stride) 

150 self._bits: dict[str, MaskPlaneBit] = { 

151 plane.name: MaskPlaneBit.compute(n, stride, self._dtype.type) 

152 for n, plane in enumerate(self._planes) 

153 if plane is not None 

154 } 

155 

156 @staticmethod 

157 def bits_per_element(dtype: npt.DTypeLike) -> int: 

158 """Return the number of mask bits per array element for the given 

159 data type. 

160 """ 

161 dtype = np.dtype(dtype) 

162 match dtype.kind: 

163 case "u": 

164 return dtype.itemsize * 8 

165 case "i": 

166 return dtype.itemsize * 8 - 1 

167 case _: 

168 raise TypeError(f"dtype for masks must be an integer; got {dtype} with kind={dtype.kind}.") 

169 

170 def __iter__(self) -> Iterator[MaskPlane | None]: 

171 return iter(self._planes) 

172 

173 def __len__(self) -> int: 

174 return len(self._planes) 

175 

176 def __getitem__(self, i: int) -> MaskPlane | None: 

177 return self._planes[i] 

178 

179 def __repr__(self) -> str: 

180 return f"MaskSchema({list(self._planes)}, dtype={self._dtype!r})" 

181 

182 def __str__(self) -> str: 

183 return "\n".join( 

184 [ 

185 f"{name} [{bit.index}@{hex(bit.mask)}]: {self._descriptions[name]}" 

186 for name, bit in self._bits.items() 

187 ] 

188 ) 

189 

190 def __eq__(self, other: object) -> bool: 

191 if isinstance(other, MaskSchema): 

192 return self._planes == other._planes and self._dtype == other._dtype 

193 return False 

194 

195 @property 

196 def dtype(self) -> np.dtype: 

197 """The numpy data type of the mask arrays that use this schema.""" 

198 return self._dtype 

199 

200 @property 

201 def mask_size(self) -> int: 

202 """The number of elements in the last dimension of any mask array that 

203 uses this schema. 

204 """ 

205 return self._mask_size 

206 

207 @property 

208 def names(self) -> Set[str]: 

209 """The names of the mask planes, in bit order.""" 

210 return self._bits.keys() 

211 

212 @property 

213 def descriptions(self) -> Mapping[str, str]: 

214 """A mapping from plane name to description.""" 

215 return self._descriptions 

216 

217 def bit(self, plane: str) -> MaskPlaneBit: 

218 """Return the last array index and mask for the given mask plane.""" 

219 return self._bits[plane] 

220 

221 def bitmask(self, *planes: str) -> np.ndarray: 

222 """Return a 1-d mask array that represents the union (i.e. bitwise OR) 

223 of the planes with the given names. 

224 

225 Parameters 

226 ---------- 

227 *planes 

228 Mask plane names. 

229 

230 Returns 

231 ------- 

232 numpy.ndarray 

233 A 1-d array with shape ``(mask_size,)``. 

234 """ 

235 result = np.zeros(self.mask_size, dtype=self._dtype) 

236 for plane in planes: 

237 bit = self._bits[plane] 

238 result[bit.index] |= bit.mask 

239 return result 

240 

241 def split(self, dtype: npt.DTypeLike) -> list[MaskSchema]: 

242 """Split the schema into an equivalent series of schemas that each 

243 have a `mask_size` of ``1``, dropping all `None` placeholders. 

244 

245 Parameters 

246 ---------- 

247 dtype 

248 Data type of the new mask pixels. 

249 

250 Returns 

251 ------- 

252 `list` [`MaskSchema`] 

253 A list of mask schemas that together include all planes in 

254 ``self`` and have `mask_size` equal to ``1``. If there are no 

255 mask planes (only `None` placeholders) in ``self``, a single mask 

256 schema with a `None` placeholder is returned; otherwise `None` 

257 placeholders are returned. 

258 """ 

259 dtype = np.dtype(dtype) 

260 planes: list[MaskPlane] = [] 

261 schemas: list[MaskSchema] = [] 

262 n_planes_per_schema = self.bits_per_element(dtype) 

263 for plane in self._planes: 

264 if plane is not None: 

265 planes.append(plane) 

266 if len(planes) == n_planes_per_schema: 

267 schemas.append(MaskSchema(planes, dtype=dtype)) 

268 planes.clear() 

269 if planes: 

270 schemas.append(MaskSchema(planes, dtype=dtype)) 

271 if not schemas: 

272 schemas.append(MaskSchema([None], dtype=dtype)) 

273 return schemas 

274 

275 def update_header(self, header: astropy.io.fits.Header) -> None: 

276 """Add a description of this mask schema to a FITS header.""" 

277 for n, plane in enumerate(self): 

278 if plane is not None: 

279 bit = self.bit(plane.name) 

280 if bit.index != 0: 

281 raise TypeError("Only mask schemas with mask_size==1 can be described in FITS.") 

282 header.set(f"MSKN{n:04d}", plane.name, f"Name for mask plane {n}.") 

283 header.set(f"MSKM{n:04d}", bit.mask, f"Bitmask for plane n={n}; always 1<<n.") 

284 # We don't add a comment to the description card, because it's 

285 # likely to overrun a single card and get the CONTINUE 

286 # treatment. That will cause Astropy to warn about the comment 

287 # being truncated and that's worse than just leaving it 

288 # unexplained; it's pretty obvious from context what it is. 

289 header.set(f"MSKD{n:04d}", plane.description) 

290 

291 def strip_header(self, header: astropy.io.fits.Header) -> None: 

292 """Remove all header cards added by `update_header`.""" 

293 for n, plane in enumerate(self): 

294 if plane is not None: 

295 header.remove(f"MSKN{n:04d}", ignore_missing=True) 

296 header.remove(f"MSKM{n:04d}", ignore_missing=True) 

297 header.remove(f"MSKD{n:04d}", ignore_missing=True) 

298 

299 

300class Mask(GeneralizedImage): 

301 """A 2-d bitmask image backed by a 3-d byte array. 

302 

303 Parameters 

304 ---------- 

305 array_or_fill 

306 Array or fill value for the mask. If a fill value, ``bbox`` or 

307 ``shape`` must be provided. 

308 schema 

309 Schema that defines the planes and their bit assignments. 

310 bbox 

311 Bounding box for the mask. This sets the shape of the first two 

312 dimensions of the array. 

313 start 

314 Logical coordinates of the first pixel in the array, ordered ``y``, 

315 ``x`` (unless an `XY` instance is passed). Ignored if 

316 ``bbox`` is provided. Defaults to zeros. 

317 shape 

318 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

319 instance is passed). Only needed if ``array_or_fill`` is not an 

320 array and ``bbox`` is not provided. Like the bbox, this does not 

321 include the last dimension of the array. 

322 projection 

323 Projection that maps the pixel grid to the sky. 

324 obs_info 

325 General information about the associated observation in standardized 

326 form. 

327 metadata 

328 Arbitrary flexible metadata to associate with the mask. 

329 

330 Notes 

331 ----- 

332 Indexing the `array` attribute of a `Mask` does not take into account its 

333 ``start`` offset, but accessing a subimage mask by indexing a `Mask` with 

334 a `Box` does, and the `bbox` of the subimage is set to match its location 

335 within the original mask. 

336 

337 A mask's ``bbox`` corresponds to the leading dimensions of its backing 

338 `numpy.ndarray`, while the last dimension's size is always equal to the 

339 `~MaskSchema.mask_size` of its schema, since a schema can in general 

340 require multiple array elements to represent all of its planes. 

341 """ 

342 

343 def __init__( 

344 self, 

345 array_or_fill: np.ndarray | int = 0, 

346 /, 

347 *, 

348 schema: MaskSchema, 

349 bbox: Box | None = None, 

350 start: Sequence[int] | None = None, 

351 shape: Sequence[int] | None = None, 

352 projection: Projection | None = None, 

353 obs_info: ObservationInfo | None = None, 

354 metadata: dict[str, MetadataValue] | None = None, 

355 ): 

356 super().__init__(metadata) 

357 if shape is not None: 

358 shape = tuple(shape) 

359 if start is not None: 

360 start = tuple(start) 

361 if isinstance(array_or_fill, np.ndarray): 

362 array = np.array(array_or_fill, dtype=schema.dtype) 

363 if array.ndim != 3: 

364 raise ValueError("Mask array must be 3-d.") 

365 if bbox is None: 

366 bbox = Box.from_shape(array.shape[:-1], start=start) 

367 elif bbox.shape + (schema.mask_size,) != array.shape: 

368 raise ValueError( 

369 f"Explicit bbox shape {bbox.shape} and schema of size {schema.mask_size} do not " 

370 f"match array with shape {array.shape}." 

371 ) 

372 if shape is not None and shape + (schema.mask_size,) != array.shape: 

373 raise ValueError( 

374 f"Explicit shape {shape} and schema of size {schema.mask_size} do " 

375 f"not match array with shape {array.shape}." 

376 ) 

377 

378 else: 

379 if bbox is None: 

380 if shape is None: 

381 raise TypeError("No bbox, size, or array provided.") 

382 bbox = Box.from_shape(shape, start=start) 

383 array = np.full(bbox.shape + (schema.mask_size,), array_or_fill, dtype=schema.dtype) 

384 self._array = array 

385 self._bbox: Box = bbox 

386 self._schema: MaskSchema = schema 

387 self._projection = projection 

388 self._obs_info = obs_info 

389 

390 @property 

391 def array(self) -> np.ndarray: 

392 """The low-level array (`numpy.ndarray`). 

393 

394 Assigning to this attribute modifies the existing array in place; the 

395 bounding box and underlying data pointer are never changed. 

396 """ 

397 return self._array 

398 

399 @array.setter 

400 def array(self, value: np.ndarray | int) -> None: 

401 self._array[:, :] = value 

402 

403 @property 

404 def schema(self) -> MaskSchema: 

405 """Schema that defines the planes and their bit assignments 

406 (`MaskSchema`). 

407 """ 

408 return self._schema 

409 

410 @property 

411 def bbox(self) -> Box: 

412 """2-d bounding box of the mask (`Box`). 

413 

414 This sets the shape of the first two dimensions of the array. 

415 """ 

416 return self._bbox 

417 

418 @property 

419 def projection(self) -> Projection[Any] | None: 

420 """The projection that maps this mask's pixel grid to the sky 

421 (`Projection` | `None`). 

422 

423 Notes 

424 ----- 

425 The pixel coordinates used by this projection account for the bounding 

426 box ``start``; they are not just array indices. 

427 """ 

428 return self._projection 

429 

430 @property 

431 def obs_info(self) -> ObservationInfo | None: 

432 """General information about the associated observation in standard 

433 form. (`~astro_metadata_translator.ObservationInfo` | `None`). 

434 """ 

435 return self._obs_info 

436 

437 def __getitem__(self, bbox: Box | EllipsisType) -> Mask: 

438 if bbox is ...: 

439 return self 

440 super().__getitem__(bbox) 

441 return self._transfer_metadata( 

442 Mask( 

443 self.array[bbox.y.slice_within(self._bbox.y), bbox.x.slice_within(self._bbox.x), :], 

444 bbox=bbox, 

445 schema=self.schema, 

446 ), 

447 bbox=bbox, 

448 ) 

449 

450 def __setitem__(self, bbox: Box | EllipsisType, value: Mask) -> None: 

451 subview = self[bbox] 

452 subview.clear() 

453 subview.update(value) 

454 

455 def __str__(self) -> str: 

456 return f"Mask({self.bbox!s}, {list(self.schema.names)})" 

457 

458 def __repr__(self) -> str: 

459 return f"Mask(..., bbox={self.bbox!r}, schema={self.schema!r})" 

460 

461 def __eq__(self, other: object) -> bool: 

462 if not isinstance(other, Mask): 

463 return NotImplemented 

464 return ( 

465 self._bbox == other._bbox 

466 and self._schema == other._schema 

467 and np.array_equal(self._array, other._array, equal_nan=True) 

468 ) 

469 

470 def copy(self) -> Mask: 

471 """Deep-copy the mask and metadata.""" 

472 return self._transfer_metadata( 

473 Mask( 

474 self._array.copy(), 

475 bbox=self._bbox, 

476 schema=self._schema, 

477 projection=self._projection, 

478 obs_info=self._obs_info, 

479 ), 

480 copy=True, 

481 ) 

482 

483 def view( 

484 self, 

485 *, 

486 schema: MaskSchema | EllipsisType = ..., 

487 projection: Projection | None | EllipsisType = ..., 

488 start: Sequence[int] | EllipsisType = ..., 

489 obs_info: ObservationInfo | None | EllipsisType = ..., 

490 ) -> Mask: 

491 """Make a view of the mask, with optional updates. 

492 

493 Notes 

494 ----- 

495 This can only be used to make changes to schema descriptions; plane 

496 names must remain the same (in the same order). 

497 """ 

498 if schema is ...: 

499 schema = self._schema 

500 else: 

501 if list(schema.names) != list(self.schema.names): 

502 raise ValueError("Cannot create a mask view with a schema with different names.") 

503 if projection is ...: 

504 projection = self._projection 

505 if start is ...: 

506 start = self._bbox.start 

507 if obs_info is ...: 

508 obs_info = self._obs_info 

509 return self._transfer_metadata( 

510 Mask(self._array, start=start, schema=schema, projection=projection, obs_info=obs_info) 

511 ) 

512 

513 def update(self, other: Mask) -> None: 

514 """Update ``self`` to include all common mask values set in ``other``. 

515 

516 Notes 

517 ----- 

518 This only operates on the intersection of the two mask bounding boxes 

519 and the mask planes that are present in both. Mask bits are only set, 

520 not cleared (i.e. this uses ``|=`` updates, not ``=`` assignments). 

521 """ 

522 lhs = self 

523 rhs = other 

524 if other.bbox != self.bbox: 

525 try: 

526 bbox = self.bbox.intersection(other.bbox) 

527 except NoOverlapError: 

528 return 

529 lhs = self[bbox] 

530 rhs = other[bbox] 

531 for name in self.schema.names & other.schema.names: 

532 lhs.set(name, rhs.get(name)) 

533 

534 def get(self, plane: str) -> np.ndarray: 

535 """Return a 2-d boolean array for the given mask plane. 

536 

537 Parameters 

538 ---------- 

539 plane 

540 Name of the mask plane. 

541 

542 Returns 

543 ------- 

544 numpy.ndarray 

545 A 2-d boolean array with the same shape as `bbox` that is `True` 

546 where the bit for ``plane`` is set and `False` elsewhere. 

547 """ 

548 bit = self.schema.bit(plane) 

549 return (self._array[..., bit.index] & bit.mask).astype(bool) 

550 

551 def set(self, plane: str, boolean_mask: np.ndarray | EllipsisType = ...) -> None: 

552 """Set a mask plane. 

553 

554 Parameters 

555 ---------- 

556 plane 

557 Name of the mask plane to set 

558 boolean_mask 

559 A 2-d boolean array with the same shape as `bbox` that is `True` 

560 where the bit for ``plane`` should be set and `False` where it 

561 should be left unchanged (*not* set to zero). May be ``...`` to 

562 set the bit everywhere. 

563 """ 

564 bit = self.schema.bit(plane) 

565 if boolean_mask is not ...: 

566 boolean_mask = boolean_mask.astype(bool) 

567 self._array[boolean_mask, bit.index] |= bit.mask 

568 

569 def clear(self, plane: str | None = None, boolean_mask: np.ndarray | EllipsisType = ...) -> None: 

570 """Clear one or more mask planes. 

571 

572 Parameters 

573 ---------- 

574 plane 

575 Name of the mask plane to set. If `None` all mask planes are 

576 cleared. 

577 boolean_mask 

578 A 2-d boolean array with the same shape as `bbox` that is `True` 

579 where the bit for ``plane`` should be cleared and `False` where it 

580 should be left unchanged. May be ``...`` to clear the bit 

581 everywhere. 

582 """ 

583 if boolean_mask is not ...: 

584 boolean_mask = boolean_mask.astype(bool) 

585 if plane is None: 

586 self._array[boolean_mask, :] = 0 

587 else: 

588 bit = self.schema.bit(plane) 

589 self._array[boolean_mask, bit.index] &= ~bit.mask 

590 

591 def serialize[P: pydantic.BaseModel]( 

592 self, 

593 archive: OutputArchive[P], 

594 *, 

595 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

596 save_projection: bool = True, 

597 save_obs_info: bool = True, 

598 add_offset_wcs: str | None = "A", 

599 ) -> MaskSerializationModel[P]: 

600 """Serialize the mask to an output archive. 

601 

602 Parameters 

603 ---------- 

604 archive 

605 Archive to write to. 

606 update_header 

607 A callback that will be given the FITS header for the HDU 

608 containing this mask in order to add keys to it. This callback 

609 may be provided but will not be called if the output format is not 

610 FITS. As multiple HDUs may be added, this function may be called 

611 multiple times. 

612 save_projection 

613 If `True`, save the `Projection` attached to the image, if there 

614 is one. This does not affect whether a FITS WCS corresponding to 

615 the projection is written (it always is, if available, and if 

616 ``add_offset_wcs`` is not ``" "``). 

617 save_obs_info 

618 If `True`, save the 

619 `~astro_metadata_translator.ObservationInfo` attached to the 

620 image, if there is one. 

621 add_offset_wcs 

622 A FITS WCS single-character suffix to use when adding a linear 

623 WCS that maps the FITS array to the logical pixel coordinates 

624 defined by ``bbox.start``. Set to `None` to not write this WCS. 

625 If this is set to ``" "``, it will prevent the `Projection` from 

626 being saved as a FITS WCS. 

627 """ 

628 data: list[ArrayReferenceModel | InlineArrayModel] = [] 

629 for schema_2d in self.schema.split(np.int32): 

630 mask_2d = Mask( 

631 0, bbox=self.bbox, schema=schema_2d, projection=self._projection, obs_info=self._obs_info 

632 ) 

633 mask_2d.update(self) 

634 data.append( 

635 mask_2d._serialize_2d(archive, update_header=update_header, add_offset_wcs=add_offset_wcs) 

636 ) 

637 serialized_projection: ProjectionSerializationModel[P] | None = None 

638 if save_projection and self.projection is not None: 

639 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

640 serialized_dtype = NumberType.from_numpy(self.schema.dtype) 

641 assert is_integer(serialized_dtype), "Mask dtypes should always be integers." 

642 return MaskSerializationModel.model_construct( 

643 data=data, 

644 start=list(self.bbox.start), 

645 planes=list(self.schema), 

646 dtype=serialized_dtype, 

647 projection=serialized_projection, 

648 obs_info=self._obs_info if save_obs_info else None, 

649 metadata=self.metadata, 

650 ) 

651 

652 def _serialize_2d[P: pydantic.BaseModel]( 

653 self, 

654 archive: OutputArchive[P], 

655 *, 

656 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

657 add_offset_wcs: str | None = "A", 

658 ) -> ArrayReferenceModel | InlineArrayModel: 

659 def _update_header(header: astropy.io.fits.Header) -> None: 

660 update_header(header) 

661 self.schema.update_header(header) 

662 if self.projection is not None and add_offset_wcs != " ": 

663 if self.fits_wcs: 

664 header.update(self.fits_wcs.to_header(relax=True)) 

665 if add_offset_wcs is not None: 

666 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

667 

668 assert self.array.shape[2] == 1, "Mask should be split before calling this method." 

669 return archive.add_array(self._array[:, :, 0], update_header=_update_header) 

670 

671 @classmethod 

672 def deserialize( 

673 cls, 

674 model: MaskSerializationModel[Any], 

675 archive: InputArchive[Any], 

676 *, 

677 bbox: Box | None = None, 

678 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

679 ) -> Mask: 

680 """Deserialize a mask from an input archive. 

681 

682 Parameters 

683 ---------- 

684 model 

685 A Pydantic model representation of the mask, holding references 

686 to data stored in the archive. 

687 archive 

688 Archive to read from. 

689 bbox 

690 Bounding box of a subimage to read instead. 

691 strip_header 

692 A callable that strips out any FITS header cards added by the 

693 ``update_header`` argument in the corresponding call to 

694 `serialize`. 

695 """ 

696 slices: tuple[slice, ...] | EllipsisType = ... 

697 if bbox is not None: 

698 slices = bbox.slice_within(model.bbox) 

699 else: 

700 bbox = model.bbox 

701 if not is_integer(model.dtype): 

702 raise ArchiveReadError(f"Mask array has a non-integer dtype: {model.dtype}.") 

703 schema = MaskSchema(model.planes, dtype=model.dtype.to_numpy()) 

704 projection = ( 

705 Projection.deserialize(model.projection, archive) if model.projection is not None else None 

706 ) 

707 result = Mask( 

708 0, 

709 schema=schema, 

710 bbox=bbox, 

711 projection=projection, 

712 obs_info=model.obs_info, 

713 ) 

714 schemas_2d = schema.split(np.int32) 

715 if len(schemas_2d) != len(model.data): 

716 raise ArchiveReadError( 

717 f"Number of mask arrays ({len(model.data)}) does not match expectation ({len(schemas_2d)})." 

718 ) 

719 for array_model, schema_2d in zip(model.data, schemas_2d): 

720 mask_2d = cls._deserialize_2d( 

721 array_model, schema_2d, bbox.start, archive, strip_header=strip_header, slices=slices 

722 ) 

723 result.update(mask_2d) 

724 return result._finish_deserialize(model) 

725 

726 @classmethod 

727 def _deserialize_2d( 

728 cls, 

729 ref: ArrayReferenceModel | InlineArrayModel, 

730 schema_2d: MaskSchema, 

731 start: Sequence[int], 

732 archive: InputArchive[Any], 

733 *, 

734 slices: tuple[slice, ...] | EllipsisType = ..., 

735 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

736 ) -> Mask: 

737 def _strip_header(header: astropy.io.fits.Header) -> None: 

738 strip_header(header) 

739 schema_2d.strip_header(header) 

740 fits.strip_wcs_cards(header) 

741 

742 array_2d = archive.get_array(ref, strip_header=_strip_header, slices=slices) 

743 return Mask(array_2d[:, :, np.newaxis], schema=schema_2d, start=start) 

744 

745 @staticmethod 

746 def _get_archive_tree_type[P: pydantic.BaseModel]( 

747 pointer_type: type[P], 

748 ) -> type[MaskSerializationModel[P]]: 

749 """Return the serialization model type for this object for an archive 

750 type that uses the given pointer type. 

751 """ 

752 return MaskSerializationModel[pointer_type] # type: ignore 

753 

754 _archive_default_name: ClassVar[str] = "mask" 

755 """The name this object should be serialized with when written as the 

756 top-level object. 

757 """ 

758 

759 def write_fits( 

760 self, 

761 filename: str, 

762 *, 

763 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

764 ) -> None: 

765 """Write the mask to a FITS file. 

766 

767 Parameters 

768 ---------- 

769 filename 

770 Name of the file to write to. Must be a local file. 

771 compression 

772 Compression options. 

773 """ 

774 compression_options = {} 

775 if compression is not fits.FitsCompressionOptions.DEFAULT: 

776 compression_options[self._archive_default_name] = compression 

777 fits.write(self, filename, compression_options) 

778 

779 @staticmethod 

780 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Mask: 

781 """Read an image from a FITS file. 

782 

783 Parameters 

784 ---------- 

785 url 

786 URL of the file to read; may be any type supported by 

787 `lsst.resources.ResourcePath`. 

788 bbox 

789 Bounding box of a subimage to read instead. 

790 """ 

791 return fits.read(Mask, url, bbox=bbox).deserialized 

792 

793 @staticmethod 

794 def from_legacy( 

795 legacy: Any, 

796 plane_map: Mapping[str, MaskPlane] | None = None, 

797 ) -> Mask: 

798 """Convert from an `lsst.afw.image.Mask` instance. 

799 

800 Parameters 

801 ---------- 

802 legacy 

803 An `lsst.afw.image.Mask` instance. This will not share pixel 

804 data with the new object. 

805 plane_map 

806 A mapping from legacy mask plane name to the new plane name and 

807 description. 

808 """ 

809 return Mask._from_legacy_array( 

810 legacy.array, 

811 legacy.getMaskPlaneDict(), 

812 start=YX(y=legacy.getY0(), x=legacy.getX0()), 

813 plane_map=plane_map, 

814 ) 

815 

816 def to_legacy(self, plane_map: Mapping[str, MaskPlane] | None = None) -> Any: 

817 """Convert to an `lsst.afw.image.Mask` instance. 

818 

819 The pixel data will not be shared between the two objects. 

820 

821 Parameters 

822 ---------- 

823 plane_map 

824 A mapping from legacy mask plane name to the new plane name and 

825 description. 

826 """ 

827 import lsst.afw.image 

828 import lsst.geom 

829 

830 result = lsst.afw.image.Mask(self.bbox.to_legacy()) 

831 if plane_map is None: 

832 plane_map = {plane.name: plane for plane in self.schema if plane is not None} 

833 for old_name, new_plane in plane_map.items(): 

834 old_bit = result.addMaskPlane(old_name) 

835 old_bitmask = 1 << old_bit 

836 result.array[self.get(new_plane.name)] |= old_bitmask 

837 return result 

838 

839 @staticmethod 

840 def _from_legacy_array( 

841 array2d: np.ndarray, 

842 old_planes: Mapping[str, int], 

843 *, 

844 start: YX[int], 

845 plane_map: Mapping[str, MaskPlane] | None = None, 

846 projection: Projection | None = None, 

847 ) -> Mask: 

848 planes: list[MaskPlane] = [] 

849 new_name_to_old_bitmask: dict[str, int] = {} 

850 for old_name, old_bit in old_planes.items(): 

851 old_bitmask = 1 << old_bit 

852 if plane_map is not None: 

853 if new_plane := plane_map.get(old_name): 

854 planes.append(new_plane) 

855 new_name_to_old_bitmask[new_plane.name] = old_bitmask 

856 else: 

857 if n_orphaned := np.count_nonzero(array2d & old_bitmask): 

858 raise RuntimeError( 

859 f"Legacy mask plane {old_name!r} is not remapped, " 

860 f"but {n_orphaned} pixels have this bit set." 

861 ) 

862 else: 

863 planes.append(MaskPlane(old_name, "")) 

864 new_name_to_old_bitmask[old_name] = old_bitmask 

865 schema = MaskSchema(planes) 

866 mask = Mask(0, schema=schema, start=start, shape=array2d.shape, projection=projection) 

867 for new_name, old_bitmask in new_name_to_old_bitmask.items(): 

868 mask.set(new_name, array2d & old_bitmask) 

869 return mask 

870 

871 @staticmethod 

872 def read_legacy( 

873 uri: ResourcePathExpression, 

874 *, 

875 plane_map: Mapping[str, MaskPlane] | None = None, 

876 ext: str | int = 1, 

877 fits_wcs_frame: Frame | None = None, 

878 ) -> Mask: 

879 """Read a FITS file written by `lsst.afw.image.Mask.writeFits`. 

880 

881 Parameters 

882 ---------- 

883 uri 

884 URI or file name. 

885 plane_map 

886 A mapping from legacy mask plane name to the new plane name and 

887 description. 

888 ext 

889 Name or index of the FITS HDU to read. 

890 fits_wcs_frame 

891 If not `None` and the HDU containing the mask has a FITS WCS, 

892 attach a `Projection` to the returned mask by converting that WCS. 

893 """ 

894 opaque_metadata = fits.FitsOpaqueMetadata() 

895 fs, fspath = ResourcePath(uri).to_fsspec() 

896 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

897 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

898 result = Mask._read_legacy_hdu( 

899 hdu_list[ext], opaque_metadata, plane_map=plane_map, fits_wcs_frame=fits_wcs_frame 

900 ) 

901 result._opaque_metadata = opaque_metadata 

902 return result 

903 

904 @staticmethod 

905 def _read_legacy_hdu( 

906 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU, 

907 opaque_metadata: fits.FitsOpaqueMetadata, 

908 plane_map: Mapping[str, MaskPlane] | None = None, 

909 fits_wcs_frame: Frame | None = None, 

910 ) -> Mask: 

911 if isinstance(hdu, astropy.io.fits.BinTableHDU): 

912 hdu = astropy.io.fits.CompImageHDU(bintable=hdu) 

913 dx: int = hdu.header.pop("LTV1") 

914 dy: int = hdu.header.pop("LTV2") 

915 start = YX(y=-dy, x=-dx) 

916 old_planes = MaskPlane.read_legacy(hdu.header) 

917 projection: Projection | None = None 

918 if fits_wcs_frame is not None: 

919 try: 

920 fits_wcs = astropy.wcs.WCS(hdu.header) 

921 except KeyError: 

922 pass 

923 else: 

924 projection = Projection.from_fits_wcs( 

925 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

926 ) 

927 mask = Mask._from_legacy_array( 

928 hdu.data, old_planes, start=start, plane_map=plane_map, projection=projection 

929 ) 

930 fits.strip_wcs_cards(hdu.header) 

931 hdu.header.strip() 

932 hdu.header.remove("EXTTYPE", ignore_missing=True) 

933 hdu.header.remove("INHERIT", ignore_missing=True) 

934 # afw set BUNIT on masks because of limitations in how FITS 

935 # metadata is handled there. 

936 hdu.header.remove("BUNIT", ignore_missing=True) 

937 opaque_metadata.add_header(hdu.header) 

938 return mask 

939 

940 

941class MaskSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

942 """Pydantic model used to represent the serialized form of a `.Mask`.""" 

943 

944 data: list[ArrayReferenceModel | InlineArrayModel] = pydantic.Field( 

945 description="References to pixel data." 

946 ) 

947 start: list[int] = pydantic.Field( 

948 description="Coordinate of the first pixels in the array, ordered (y, x)." 

949 ) 

950 planes: list[MaskPlane | None] = pydantic.Field(description="Definitions of the bitplanes in the mask.") 

951 dtype: IntegerType = pydantic.Field(description="Data type of the in-memory mask.") 

952 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

953 default=None, 

954 exclude_if=is_none, 

955 description="Projection that maps the logical pixel grid onto the sky.", 

956 ) 

957 obs_info: ObservationInfo | None = pydantic.Field( 

958 default=None, 

959 exclude_if=is_none, 

960 description="Standardized description of image metadata", 

961 ) 

962 

963 @property 

964 def bbox(self) -> Box: 

965 """The 2-d bounding box of the mask.""" 

966 return Box.from_shape(self.data[0].shape, start=self.start) 

967 

968 

969def get_legacy_visit_image_mask_planes() -> dict[str, MaskPlane]: 

970 """Return a mapping from legacy mask plane name to `MaskPlane` instance 

971 for LSST visit images, c. DP2. 

972 """ 

973 return { 

974 "BAD": MaskPlane("BAD", "Bad pixel in the instrument, including bad amplifiers."), 

975 "SAT": MaskPlane( 

976 "SATURATED", "Pixel was saturated or affected by saturation in a neighboring pixel." 

977 ), 

978 "INTRP": MaskPlane("INTERPOLATED", "Original pixel value was interpolated."), 

979 "CR": MaskPlane("COSMIC_RAY", "A cosmic ray affected this pixel."), 

980 "EDGE": MaskPlane( 

981 "DETECTION_EDGE", 

982 "Pixel was too close to the edge to be considered for detection, " 

983 "due to the finite size of the detection kernel.", 

984 ), 

985 "DETECTED": MaskPlane("DETECTED", "Pixel was part of a detected source."), 

986 "SUSPECT": MaskPlane("SUSPECT", "Pixel was close to the saturation level. "), 

987 "NO_DATA": MaskPlane("NO_DATA", "No data was available for this pixel."), 

988 "VIGNETTED": MaskPlane("VIGNETTED", "Pixel was vignetted by the optics."), 

989 "PARTLY_VIGNETTED": MaskPlane("PARTLY_VIGNETTED", "Pixel was partly vignetted by the optics."), 

990 "CROSSTALK": MaskPlane("CROSSTALK", "Pixel was affected by crosstalk and corrected accordingly."), 

991 "ITL_DIP": MaskPlane( 

992 "ITL_DIP", "Pixel was affected by a dark vertical trail from a bright source, on an ITL CCD." 

993 ), 

994 "NOT_DEBLENDED": MaskPlane( 

995 "NOT_DEBLENDED", 

996 "Pixel belonged to a detection that was not deblended, usually due to size limits.", 

997 ), 

998 "SPIKE": MaskPlane( 

999 "SPIKE", "Pixel is in the neighborhood of a diffraction spike from a bright star." 

1000 ), 

1001 }