Coverage for python / lsst / images / _masked_image.py: 34%

179 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 09:00 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("MaskedImage", "MaskedImageSerializationModel") 

15 

16import functools 

17from collections.abc import Mapping 

18from contextlib import ExitStack 

19from types import EllipsisType 

20from typing import Any, Literal, overload 

21 

22import astropy.io.fits 

23import astropy.units 

24import astropy.wcs 

25import numpy as np 

26import pydantic 

27from astro_metadata_translator import ObservationInfo 

28 

29from lsst.resources import ResourcePath, ResourcePathExpression 

30 

31from . import fits 

32from ._generalized_image import GeneralizedImage 

33from ._geom import Box 

34from ._image import Image, ImageSerializationModel 

35from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel 

36from ._transforms import Frame, Projection, ProjectionSerializationModel 

37from .serialization import ArchiveTree, InputArchive, MetadataValue, OutputArchive 

38from .utils import is_none 

39 

40 

41class MaskedImage(GeneralizedImage): 

42 """A multi-plane image with data (image), mask, and variance planes. 

43 

44 Parameters 

45 ---------- 

46 image 

47 The main image plane. If this has a `Projection`, it will be used 

48 for all planes unless a ``projection`` is passed separately. 

49 mask 

50 A bitmask image that annotates the main image plane. Must have the 

51 same bounding box as ``image`` if provided. Any attached projection 

52 is replaced (possibly by `None`). 

53 variance 

54 The per-pixel uncertainty of the main image as an image of variance 

55 values. Must have the same bounding box as ``image`` if provided, and 

56 its units must be the square of ``image.unit`` or `None`. 

57 Values default to ``1.0``. Any attached projection is replaced 

58 (possibly by `None`). 

59 mask_schema 

60 Schema for the mask plane. Must be provided if and only if ``mask`` is 

61 not provided. 

62 projection 

63 Projection that maps the pixel grid to the sky. 

64 obs_info 

65 General information about the associated observation in standardized 

66 form. 

67 metadata 

68 Arbitrary flexible metadata to associate with the image. 

69 """ 

70 

71 def __init__( 

72 self, 

73 image: Image, 

74 *, 

75 mask: Mask | None = None, 

76 variance: Image | None = None, 

77 mask_schema: MaskSchema | None = None, 

78 projection: Projection | None = None, 

79 obs_info: ObservationInfo | None = None, 

80 metadata: dict[str, MetadataValue] | None = None, 

81 ): 

82 super().__init__(metadata) 

83 if projection is None: 

84 projection = image.projection 

85 else: 

86 image = image.view(projection=projection) 

87 if obs_info is None: 

88 obs_info = image.obs_info 

89 else: 

90 image = image.view(obs_info=obs_info) 

91 if mask is None: 

92 if mask_schema is None: 

93 raise TypeError("'mask_schema' must be provided if 'mask' is not.") 

94 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection, obs_info=obs_info) 

95 elif mask_schema is not None: 

96 raise TypeError("'mask_schema' may not be provided if 'mask' is.") 

97 else: 

98 if image.bbox != mask.bbox: 

99 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.") 

100 mask = mask.view(projection=projection) 

101 if variance is None: 

102 variance = Image( 

103 1.0, 

104 dtype=np.float32, 

105 bbox=image.bbox, 

106 unit=None if image.unit is None else image.unit**2, 

107 projection=projection, 

108 obs_info=obs_info, 

109 ) 

110 else: 

111 if image.bbox != variance.bbox: 

112 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.") 

113 variance = variance.view(projection=projection, obs_info=obs_info) 

114 if image.unit is None: 

115 if variance.unit is not None: 

116 raise ValueError(f"Image has no units but variance does ({variance.unit}).") 

117 elif variance.unit is None: 

118 variance = variance.view(unit=image.unit**2) 

119 elif variance.unit != image.unit**2: 

120 raise ValueError( 

121 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})." 

122 ) 

123 self._image = image 

124 self._mask = mask 

125 self._variance = variance 

126 

127 @property 

128 def image(self) -> Image: 

129 """The main image plane (`Image`).""" 

130 return self._image 

131 

132 @property 

133 def mask(self) -> Mask: 

134 """The mask plane (`Mask`).""" 

135 return self._mask 

136 

137 @property 

138 def variance(self) -> Image: 

139 """The variance plane (`Image`).""" 

140 return self._variance 

141 

142 @property 

143 def bbox(self) -> Box: 

144 """The bounding box shared by all three image planes (`Box`).""" 

145 return self._image.bbox 

146 

147 @property 

148 def unit(self) -> astropy.units.UnitBase | None: 

149 """The units of the image plane (`astropy.units.Unit` | `None`).""" 

150 return self._image.unit 

151 

152 @property 

153 def projection(self) -> Projection[Any] | None: 

154 """The projection that maps the pixel grid to the sky 

155 (`Projection` | `None`). 

156 """ 

157 return self._image.projection 

158 

159 @property 

160 def obs_info(self) -> ObservationInfo | None: 

161 """General information about the associated observation in standard 

162 form. (`~astro_metadata_translator.ObservationInfo` | `None`). 

163 """ 

164 return self._image.obs_info 

165 

166 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage: 

167 if bbox is ...: 

168 return self 

169 super().__getitem__(bbox) 

170 return self._transfer_metadata( 

171 MaskedImage( 

172 # Projection and obs_info propagate from the image. 

173 self.image[bbox], 

174 mask=self.mask[bbox], 

175 variance=self.variance[bbox], 

176 ), 

177 bbox=bbox, 

178 ) 

179 

180 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None: 

181 self._image[bbox] = value.image 

182 self._mask[bbox] = value.mask 

183 self._variance[bbox] = value.variance 

184 

185 def __str__(self) -> str: 

186 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})" 

187 

188 def __repr__(self) -> str: 

189 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})" 

190 

191 def copy(self) -> MaskedImage: 

192 """Deep-copy the masked image and metadata.""" 

193 return self._transfer_metadata( 

194 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()), 

195 copy=True, 

196 ) 

197 

198 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel: 

199 """Serialize the masked image to an output archive. 

200 

201 Parameters 

202 ---------- 

203 archive 

204 Archive to write to. 

205 """ 

206 serialized_image = archive.serialize_direct( 

207 "image", functools.partial(self.image.serialize, save_projection=False) 

208 ) 

209 serialized_mask = archive.serialize_direct( 

210 "mask", functools.partial(self.mask.serialize, save_projection=False) 

211 ) 

212 serialized_variance = archive.serialize_direct( 

213 "variance", functools.partial(self.variance.serialize, save_projection=False) 

214 ) 

215 serialized_projection = ( 

216 archive.serialize_direct("projection", self.projection.serialize) 

217 if self.projection is not None 

218 else None 

219 ) 

220 return MaskedImageSerializationModel( 

221 image=serialized_image, 

222 mask=serialized_mask, 

223 variance=serialized_variance, 

224 projection=serialized_projection, 

225 metadata=self.metadata, 

226 ) 

227 

228 @staticmethod 

229 def deserialize( 

230 model: MaskedImageSerializationModel[Any], archive: InputArchive[Any], *, bbox: Box | None = None 

231 ) -> MaskedImage: 

232 """Deserialize an image from an input archive. 

233 

234 Parameters 

235 ---------- 

236 model 

237 A Pydantic model representation of the image, holding references 

238 to data stored in the archive. 

239 archive 

240 Archive to read from. 

241 bbox 

242 Bounding box of a subimage to read instead. 

243 """ 

244 image = Image.deserialize(model.image, archive, bbox=bbox) 

245 mask = Mask.deserialize(model.mask, archive, bbox=bbox) 

246 variance = Image.deserialize(model.variance, archive, bbox=bbox) 

247 projection = ( 

248 Projection.deserialize(model.projection, archive) if model.projection is not None else None 

249 ) 

250 return MaskedImage(image, mask=mask, variance=variance, projection=projection)._finish_deserialize( 

251 model 

252 ) 

253 

254 @staticmethod 

255 def _get_archive_tree_type[P: pydantic.BaseModel]( 

256 pointer_type: type[P], 

257 ) -> type[MaskedImageSerializationModel[P]]: 

258 """Return the serialization model type for this object for an archive 

259 type that uses the given pointer type. 

260 """ 

261 return MaskedImageSerializationModel[pointer_type] # type: ignore 

262 

263 def write_fits( 

264 self, 

265 filename: str, 

266 *, 

267 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

268 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

269 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

270 compression_seed: int | None = None, 

271 ) -> None: 

272 """Write the image to a FITS file. 

273 

274 Parameters 

275 ---------- 

276 filename 

277 Name of the file to write to. Must be a local file. 

278 image_compression 

279 Compression options for the `image` plane. 

280 mask_compression 

281 Compression options for the `mask` plane. 

282 variance_compression 

283 Compression options for the `variance` plane. 

284 compression_seed 

285 A FITS tile compression seed to use whenever the configured 

286 compression seed is `None` or (for backwards compatibility) ``0``. 

287 This value is then incremented every time it is used. 

288 """ 

289 compression_options = {} 

290 if image_compression is not fits.FitsCompressionOptions.DEFAULT: 

291 compression_options["image"] = image_compression 

292 if mask_compression is not fits.FitsCompressionOptions.DEFAULT: 

293 compression_options["mask"] = mask_compression 

294 if variance_compression is not fits.FitsCompressionOptions.DEFAULT: 

295 compression_options["variance"] = variance_compression 

296 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed) 

297 

298 @classmethod 

299 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage: 

300 """Read an image from a FITS file. 

301 

302 Parameters 

303 ---------- 

304 url 

305 URL of the file to read; may be any type supported by 

306 `lsst.resources.ResourcePath`. 

307 bbox 

308 Bounding box of a subimage to read instead. 

309 """ 

310 return fits.read(cls, url, bbox=bbox).deserialized 

311 

312 @staticmethod 

313 def from_legacy( 

314 legacy: Any, 

315 *, 

316 unit: astropy.units.Unit | None = None, 

317 plane_map: Mapping[str, MaskPlane] | None = None, 

318 ) -> MaskedImage: 

319 """Convert from an `lsst.afw.image.MaskedImage` instance. 

320 

321 Parameters 

322 ---------- 

323 legacy 

324 An `lsst.afw.image.MaskedImage` instance that will share image and 

325 variance (but not mask) pixel data with the returned object. 

326 unit 

327 Units of the image. 

328 plane_map 

329 A mapping from legacy mask plane name to the new plane name and 

330 description. 

331 """ 

332 return MaskedImage( 

333 image=Image.from_legacy(legacy.getImage(), unit), 

334 mask=Mask.from_legacy(legacy.getMask(), plane_map), 

335 variance=Image.from_legacy(legacy.getVariance()), 

336 ) 

337 

338 def to_legacy(self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None) -> Any: 

339 """Convert to an `lsst.afw.image.MaskedImage` instance. 

340 

341 Parameters 

342 ---------- 

343 copy 

344 If `True`, always copy the image and variance pixel data. 

345 If `False`, return a view, and raise `TypeError` if the pixel data 

346 is read-only (this is not supported by afw). If `None`, onyl if 

347 the pixel data is read-only. Mask pixel data is always copied. 

348 plane_map 

349 A mapping from legacy mask plane name to the new plane name and 

350 description. 

351 """ 

352 import lsst.afw.image 

353 

354 return lsst.afw.image.MaskedImage( 

355 self.image.to_legacy(copy=copy), 

356 mask=self.mask.to_legacy(plane_map), 

357 variance=self.variance.to_legacy(copy=copy), 

358 dtype=self.image.array.dtype, 

359 ) 

360 

361 @overload 

362 @staticmethod 

363 def read_legacy( 363 ↛ exitline 363 didn't return from function 'read_legacy' because

364 uri: ResourcePathExpression, 

365 *, 

366 preserve_quantization: bool = False, 

367 component: Literal["image"], 

368 fits_wcs_frame: Frame | None = None, 

369 ) -> Image: ... 

370 

371 @overload 

372 @staticmethod 

373 def read_legacy( 373 ↛ exitline 373 didn't return from function 'read_legacy' because

374 uri: ResourcePathExpression, 

375 *, 

376 plane_map: Mapping[str, MaskPlane] | None = None, 

377 component: Literal["mask"], 

378 fits_wcs_frame: Frame | None = None, 

379 ) -> Mask: ... 

380 

381 @overload 

382 @staticmethod 

383 def read_legacy( 383 ↛ exitline 383 didn't return from function 'read_legacy' because

384 uri: ResourcePathExpression, 

385 *, 

386 preserve_quantization: bool = False, 

387 component: Literal["variance"], 

388 fits_wcs_frame: Frame | None = None, 

389 ) -> Image: ... 

390 

391 @overload 

392 @staticmethod 

393 def read_legacy( 393 ↛ exitline 393 didn't return from function 'read_legacy' because

394 uri: ResourcePathExpression, 

395 *, 

396 preserve_quantization: bool = False, 

397 plane_map: Mapping[str, MaskPlane] | None = None, 

398 component: None = None, 

399 fits_wcs_frame: Frame | None = None, 

400 ) -> MaskedImage: ... 

401 

402 @staticmethod 

403 def read_legacy( 

404 uri: ResourcePathExpression, 

405 *, 

406 preserve_quantization: bool = False, 

407 plane_map: Mapping[str, MaskPlane] | None = None, 

408 component: Literal["image", "mask", "variance"] | None = None, 

409 fits_wcs_frame: Frame | None = None, 

410 ) -> Any: 

411 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`. 

412 

413 Parameters 

414 ---------- 

415 uri 

416 URI or file name. 

417 preserve_quantization 

418 If `True`, ensure that writing the masked image back out again will 

419 exactly preserve quantization-compressed pixel values. This causes 

420 the image and variance plane arrays to be marked as read-only and 

421 stores the original binary table data for those planes in memory. 

422 If the `MaskedImage` is copied, the precompressed pixel values are 

423 not transferred to the copy. 

424 plane_map 

425 A mapping from legacy mask plane name to the new plane name and 

426 description. 

427 component 

428 A component to read instead of the full image. 

429 fits_wcs_frame 

430 If not `None` and the HDU containing the image plane has a FITS 

431 WCS, attach a `Projection` to the returned masked image by 

432 converting that WCS. When ``component`` is one of ``"image"``, 

433 ``"mask"``, or ``"variance"``, a FITS WCS from the component HDU 

434 is used instead (all three should have the same WCS). 

435 """ 

436 fs, fspath = ResourcePath(uri).to_fsspec() 

437 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

438 return MaskedImage._read_legacy_hdus( 

439 hdu_list, 

440 uri, 

441 preserve_quantization=preserve_quantization, 

442 plane_map=plane_map, 

443 component=component, 

444 fits_wcs_frame=fits_wcs_frame, 

445 ) 

446 

447 @staticmethod 

448 def _read_legacy_hdus( 

449 hdu_list: astropy.io.fits.HDUList, 

450 uri: ResourcePathExpression, 

451 *, 

452 preserve_quantization: bool = False, 

453 plane_map: Mapping[str, MaskPlane] | None = None, 

454 component: Literal["image", "mask", "variance"] | None, 

455 fits_wcs_frame: Frame | None = None, 

456 ) -> Any: 

457 opaque_metadata = fits.FitsOpaqueMetadata() 

458 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

459 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

460 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

461 result: Any 

462 with ExitStack() as exit_stack: 

463 if preserve_quantization: 

464 fs, fspath = ResourcePath(uri).to_fsspec() 

465 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

466 bintable_hdu_list = exit_stack.enter_context( 

467 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

468 ) 

469 image_bintable_hdu = bintable_hdu_list[1] 

470 variance_bintable_hdu = bintable_hdu_list[3] 

471 if component is None or component == "image": 

472 image = Image._read_legacy_hdu( 

473 hdu_list[1], 

474 opaque_metadata, 

475 preserve_bintable=image_bintable_hdu, 

476 fits_wcs_frame=fits_wcs_frame, 

477 ) 

478 if component == "image": 

479 result = image 

480 if component is None or component == "mask": 

481 mask = Mask._read_legacy_hdu( 

482 hdu_list[2], 

483 opaque_metadata, 

484 plane_map=plane_map, 

485 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

486 ) 

487 if component == "mask": 

488 result = mask 

489 if component is None or component == "variance": 

490 variance = Image._read_legacy_hdu( 

491 hdu_list[3], 

492 opaque_metadata, 

493 preserve_bintable=variance_bintable_hdu, 

494 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

495 ) 

496 if component == "variance": 

497 result = variance 

498 if component is None: 

499 result = MaskedImage(image, mask=mask, variance=variance) 

500 result._opaque_metadata = opaque_metadata 

501 return result 

502 

503 

504class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

505 """A Pydantic model used to represent a serialized `MaskedImage`.""" 

506 

507 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

508 mask: MaskSerializationModel[P] = pydantic.Field( 

509 description="Bitmask that annotates the main image's pixels." 

510 ) 

511 variance: ImageSerializationModel[P] = pydantic.Field( 

512 description="Per-pixel variance estimates for the main image." 

513 ) 

514 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

515 default=None, 

516 exclude_if=is_none, 

517 description="Projection that maps the pixel grid to the sky.", 

518 ) 

519 

520 @property 

521 def bbox(self) -> Box: 

522 """The bounding box of the image.""" 

523 return self.image.bbox