Coverage for python / lsst / images / _masked_image.py: 35%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:36 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("MaskedImage", "MaskedImageSerializationModel") 

15 

16import functools 

17from collections.abc import Mapping 

18from contextlib import ExitStack 

19from types import EllipsisType 

20from typing import Any, Literal, overload 

21 

22import astropy.io.fits 

23import astropy.units 

24import astropy.wcs 

25import numpy as np 

26import pydantic 

27from astro_metadata_translator import ObservationInfo 

28 

29from lsst.resources import ResourcePath, ResourcePathExpression 

30 

31from . import fits 

32from ._generalized_image import GeneralizedImage 

33from ._geom import Box 

34from ._image import Image, ImageSerializationModel 

35from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel 

36from ._transforms import Frame, Projection, ProjectionSerializationModel 

37from .serialization import ArchiveTree, InputArchive, MetadataValue, OutputArchive 

38from .utils import is_none 

39 

40 

41class MaskedImage(GeneralizedImage): 

42 """A multi-plane image with data (image), mask, and variance planes. 

43 

44 Parameters 

45 ---------- 

46 image 

47 The main image plane. If this has a `Projection`, it will be used 

48 for all planes unless a ``projection`` is passed separately. 

49 mask 

50 A bitmask image that annotates the main image plane. Must have the 

51 same bounding box as ``image`` if provided. Any attached projection 

52 is replaced (possibly by `None`). 

53 variance 

54 The per-pixel uncertainty of the main image as an image of variance 

55 values. Must have the same bounding box as ``image`` if provided, and 

56 its units must be the square of ``image.unit`` or `None`. 

57 Values default to ``1.0``. Any attached projection is replaced 

58 (possibly by `None`). 

59 mask_schema 

60 Schema for the mask plane. Must be provided if and only if ``mask`` is 

61 not provided. 

62 projection 

63 Projection that maps the pixel grid to the sky. 

64 obs_info 

65 General information about the associated observation in standardized 

66 form. 

67 metadata 

68 Arbitrary flexible metadata to associate with the image. 

69 """ 

70 

71 def __init__( 

72 self, 

73 image: Image, 

74 *, 

75 mask: Mask | None = None, 

76 variance: Image | None = None, 

77 mask_schema: MaskSchema | None = None, 

78 projection: Projection | None = None, 

79 obs_info: ObservationInfo | None = None, 

80 metadata: dict[str, MetadataValue] | None = None, 

81 ): 

82 super().__init__(metadata) 

83 if projection is None: 

84 projection = image.projection 

85 else: 

86 image = image.view(projection=projection) 

87 if obs_info is None: 

88 obs_info = image.obs_info 

89 else: 

90 image = image.view(obs_info=obs_info) 

91 if mask is None: 

92 if mask_schema is None: 

93 raise TypeError("'mask_schema' must be provided if 'mask' is not.") 

94 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection, obs_info=obs_info) 

95 elif mask_schema is not None: 

96 raise TypeError("'mask_schema' may not be provided if 'mask' is.") 

97 else: 

98 if image.bbox != mask.bbox: 

99 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.") 

100 mask = mask.view(projection=projection, obs_info=obs_info) 

101 if variance is None: 

102 variance = Image( 

103 1.0, 

104 dtype=np.float32, 

105 bbox=image.bbox, 

106 unit=None if image.unit is None else image.unit**2, 

107 projection=projection, 

108 obs_info=obs_info, 

109 ) 

110 else: 

111 if image.bbox != variance.bbox: 

112 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.") 

113 variance = variance.view(projection=projection, obs_info=obs_info) 

114 if image.unit is None: 

115 if variance.unit is not None: 

116 raise ValueError(f"Image has no units but variance does ({variance.unit}).") 

117 elif variance.unit is None: 

118 variance = variance.view(unit=image.unit**2) 

119 elif variance.unit != image.unit**2: 

120 raise ValueError( 

121 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})." 

122 ) 

123 self._image = image 

124 self._mask = mask 

125 self._variance = variance 

126 

127 @property 

128 def image(self) -> Image: 

129 """The main image plane (`~lsst.images.Image`).""" 

130 return self._image 

131 

132 @property 

133 def mask(self) -> Mask: 

134 """The mask plane (`~lsst.images.Mask`).""" 

135 return self._mask 

136 

137 @property 

138 def variance(self) -> Image: 

139 """The variance plane (`~lsst.images.Image`).""" 

140 return self._variance 

141 

142 @property 

143 def bbox(self) -> Box: 

144 """The bounding box shared by all three image planes 

145 (`~lsst.images.Box`). 

146 """ 

147 return self._image.bbox 

148 

149 @property 

150 def unit(self) -> astropy.units.UnitBase | None: 

151 """The units of the image plane (`astropy.units.Unit` | `None`).""" 

152 return self._image.unit 

153 

154 @property 

155 def projection(self) -> Projection[Any] | None: 

156 """The projection that maps the pixel grid to the sky 

157 (`~lsst.images.Projection` | `None`). 

158 """ 

159 return self._image.projection 

160 

161 @property 

162 def obs_info(self) -> ObservationInfo | None: 

163 """General information about the associated observation in standard 

164 form. (`~astro_metadata_translator.ObservationInfo` | `None`). 

165 """ 

166 return self._image.obs_info 

167 

168 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage: 

169 if bbox is ...: 

170 return self 

171 super().__getitem__(bbox) 

172 return self._transfer_metadata( 

173 MaskedImage( 

174 # Projection and obs_info propagate from the image. 

175 self.image[bbox], 

176 mask=self.mask[bbox], 

177 variance=self.variance[bbox], 

178 ), 

179 bbox=bbox, 

180 ) 

181 

182 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None: 

183 self._image[bbox] = value.image 

184 self._mask[bbox] = value.mask 

185 self._variance[bbox] = value.variance 

186 

187 def __str__(self) -> str: 

188 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})" 

189 

190 def __repr__(self) -> str: 

191 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})" 

192 

193 def copy(self) -> MaskedImage: 

194 """Deep-copy the masked image and metadata.""" 

195 return self._transfer_metadata( 

196 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()), 

197 copy=True, 

198 ) 

199 

200 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel: 

201 """Serialize the masked image to an output archive. 

202 

203 Parameters 

204 ---------- 

205 archive 

206 Archive to write to. 

207 """ 

208 serialized_image = archive.serialize_direct( 

209 "image", functools.partial(self.image.serialize, save_projection=False, save_obs_info=False) 

210 ) 

211 serialized_mask = archive.serialize_direct( 

212 "mask", functools.partial(self.mask.serialize, save_projection=False, save_obs_info=False) 

213 ) 

214 serialized_variance = archive.serialize_direct( 

215 "variance", functools.partial(self.variance.serialize, save_projection=False, save_obs_info=False) 

216 ) 

217 serialized_projection = ( 

218 archive.serialize_direct("projection", self.projection.serialize) 

219 if self.projection is not None 

220 else None 

221 ) 

222 return MaskedImageSerializationModel( 

223 image=serialized_image, 

224 mask=serialized_mask, 

225 variance=serialized_variance, 

226 projection=serialized_projection, 

227 obs_info=self.obs_info, 

228 metadata=self.metadata, 

229 ) 

230 

231 @staticmethod 

232 def deserialize( 

233 model: MaskedImageSerializationModel[Any], archive: InputArchive[Any], *, bbox: Box | None = None 

234 ) -> MaskedImage: 

235 """Deserialize an image from an input archive. 

236 

237 Parameters 

238 ---------- 

239 model 

240 A Pydantic model representation of the image, holding references 

241 to data stored in the archive. 

242 archive 

243 Archive to read from. 

244 bbox 

245 Bounding box of a subimage to read instead. 

246 """ 

247 image = Image.deserialize(model.image, archive, bbox=bbox) 

248 mask = Mask.deserialize(model.mask, archive, bbox=bbox) 

249 variance = Image.deserialize(model.variance, archive, bbox=bbox) 

250 projection = ( 

251 Projection.deserialize(model.projection, archive) if model.projection is not None else None 

252 ) 

253 return MaskedImage( 

254 image, mask=mask, variance=variance, projection=projection, obs_info=model.obs_info 

255 )._finish_deserialize(model) 

256 

257 @staticmethod 

258 def _get_archive_tree_type[P: pydantic.BaseModel]( 

259 pointer_type: type[P], 

260 ) -> type[MaskedImageSerializationModel[P]]: 

261 """Return the serialization model type for this object for an archive 

262 type that uses the given pointer type. 

263 """ 

264 return MaskedImageSerializationModel[pointer_type] # type: ignore 

265 

266 def write_fits( 

267 self, 

268 filename: str, 

269 *, 

270 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

271 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

272 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

273 compression_seed: int | None = None, 

274 ) -> None: 

275 """Write the image to a FITS file. 

276 

277 Parameters 

278 ---------- 

279 filename 

280 Name of the file to write to. Must be a local file. 

281 image_compression 

282 Compression options for the `image` plane. 

283 mask_compression 

284 Compression options for the `mask` plane. 

285 variance_compression 

286 Compression options for the `variance` plane. 

287 compression_seed 

288 A FITS tile compression seed to use whenever the configured 

289 compression seed is `None` or (for backwards compatibility) ``0``. 

290 This value is then incremented every time it is used. 

291 """ 

292 compression_options = {} 

293 if image_compression is not fits.FitsCompressionOptions.DEFAULT: 

294 compression_options["image"] = image_compression 

295 if mask_compression is not fits.FitsCompressionOptions.DEFAULT: 

296 compression_options["mask"] = mask_compression 

297 if variance_compression is not fits.FitsCompressionOptions.DEFAULT: 

298 compression_options["variance"] = variance_compression 

299 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed) 

300 

301 @classmethod 

302 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage: 

303 """Read an image from a FITS file. 

304 

305 Parameters 

306 ---------- 

307 url 

308 URL of the file to read; may be any type supported by 

309 `lsst.resources.ResourcePath`. 

310 bbox 

311 Bounding box of a subimage to read instead. 

312 """ 

313 return fits.read(cls, url, bbox=bbox).deserialized 

314 

315 @staticmethod 

316 def from_legacy( 

317 legacy: Any, 

318 *, 

319 unit: astropy.units.Unit | None = None, 

320 plane_map: Mapping[str, MaskPlane] | None = None, 

321 ) -> MaskedImage: 

322 """Convert from an `lsst.afw.image.MaskedImage` instance. 

323 

324 Parameters 

325 ---------- 

326 legacy 

327 An `lsst.afw.image.MaskedImage` instance that will share image and 

328 variance (but not mask) pixel data with the returned object. 

329 unit 

330 Units of the image. 

331 plane_map 

332 A mapping from legacy mask plane name to the new plane name and 

333 description. 

334 """ 

335 return MaskedImage( 

336 image=Image.from_legacy(legacy.getImage(), unit), 

337 mask=Mask.from_legacy(legacy.getMask(), plane_map), 

338 variance=Image.from_legacy(legacy.getVariance()), 

339 ) 

340 

341 def to_legacy(self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None) -> Any: 

342 """Convert to an `lsst.afw.image.MaskedImage` instance. 

343 

344 Parameters 

345 ---------- 

346 copy 

347 If `True`, always copy the image and variance pixel data. 

348 If `False`, return a view, and raise `TypeError` if the pixel data 

349 is read-only (this is not supported by afw). If `None`, onyl if 

350 the pixel data is read-only. Mask pixel data is always copied. 

351 plane_map 

352 A mapping from legacy mask plane name to the new plane name and 

353 description. 

354 """ 

355 import lsst.afw.image 

356 

357 return lsst.afw.image.MaskedImage( 

358 self.image.to_legacy(copy=copy), 

359 mask=self.mask.to_legacy(plane_map), 

360 variance=self.variance.to_legacy(copy=copy), 

361 dtype=self.image.array.dtype, 

362 ) 

363 

364 @overload 

365 @staticmethod 

366 def read_legacy( 366 ↛ exitline 366 didn't return from function 'read_legacy' because

367 uri: ResourcePathExpression, 

368 *, 

369 preserve_quantization: bool = False, 

370 component: Literal["image"], 

371 fits_wcs_frame: Frame | None = None, 

372 ) -> Image: ... 

373 

374 @overload 

375 @staticmethod 

376 def read_legacy( 376 ↛ exitline 376 didn't return from function 'read_legacy' because

377 uri: ResourcePathExpression, 

378 *, 

379 plane_map: Mapping[str, MaskPlane] | None = None, 

380 component: Literal["mask"], 

381 fits_wcs_frame: Frame | None = None, 

382 ) -> Mask: ... 

383 

384 @overload 

385 @staticmethod 

386 def read_legacy( 386 ↛ exitline 386 didn't return from function 'read_legacy' because

387 uri: ResourcePathExpression, 

388 *, 

389 preserve_quantization: bool = False, 

390 component: Literal["variance"], 

391 fits_wcs_frame: Frame | None = None, 

392 ) -> Image: ... 

393 

394 @overload 

395 @staticmethod 

396 def read_legacy( 396 ↛ exitline 396 didn't return from function 'read_legacy' because

397 uri: ResourcePathExpression, 

398 *, 

399 preserve_quantization: bool = False, 

400 plane_map: Mapping[str, MaskPlane] | None = None, 

401 component: None = None, 

402 fits_wcs_frame: Frame | None = None, 

403 ) -> MaskedImage: ... 

404 

405 @staticmethod 

406 def read_legacy( 

407 uri: ResourcePathExpression, 

408 *, 

409 preserve_quantization: bool = False, 

410 plane_map: Mapping[str, MaskPlane] | None = None, 

411 component: Literal["image", "mask", "variance"] | None = None, 

412 fits_wcs_frame: Frame | None = None, 

413 ) -> Any: 

414 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`. 

415 

416 Parameters 

417 ---------- 

418 uri 

419 URI or file name. 

420 preserve_quantization 

421 If `True`, ensure that writing the masked image back out again will 

422 exactly preserve quantization-compressed pixel values. This causes 

423 the image and variance plane arrays to be marked as read-only and 

424 stores the original binary table data for those planes in memory. 

425 If the `~lsst.images.MaskedImage` is copied, the precompressed 

426 pixel values are not transferred to the copy. 

427 plane_map 

428 A mapping from legacy mask plane name to the new plane name and 

429 description. 

430 component 

431 A component to read instead of the full image. 

432 fits_wcs_frame 

433 If not `None` and the HDU containing the image plane has a FITS 

434 WCS, attach a `~lsst.images.Projection` to the returned masked 

435 image by converting that WCS. When ``component`` is one of 

436 ``"image"``, ``"mask"``, or ``"variance"``, a FITS WCS from the 

437 component HDU is used instead (all three should have the same WCS). 

438 """ 

439 fs, fspath = ResourcePath(uri).to_fsspec() 

440 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

441 return MaskedImage._read_legacy_hdus( 

442 hdu_list, 

443 uri, 

444 preserve_quantization=preserve_quantization, 

445 plane_map=plane_map, 

446 component=component, 

447 fits_wcs_frame=fits_wcs_frame, 

448 ) 

449 

450 @staticmethod 

451 def _read_legacy_hdus( 

452 hdu_list: astropy.io.fits.HDUList, 

453 uri: ResourcePathExpression, 

454 *, 

455 preserve_quantization: bool = False, 

456 plane_map: Mapping[str, MaskPlane] | None = None, 

457 component: Literal["image", "mask", "variance"] | None, 

458 fits_wcs_frame: Frame | None = None, 

459 ) -> Any: 

460 opaque_metadata = fits.FitsOpaqueMetadata() 

461 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

462 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

463 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

464 result: Any 

465 with ExitStack() as exit_stack: 

466 if preserve_quantization: 

467 fs, fspath = ResourcePath(uri).to_fsspec() 

468 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

469 bintable_hdu_list = exit_stack.enter_context( 

470 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

471 ) 

472 image_bintable_hdu = bintable_hdu_list[1] 

473 variance_bintable_hdu = bintable_hdu_list[3] 

474 if component is None or component == "image": 

475 image = Image._read_legacy_hdu( 

476 hdu_list[1], 

477 opaque_metadata, 

478 preserve_bintable=image_bintable_hdu, 

479 fits_wcs_frame=fits_wcs_frame, 

480 ) 

481 if component == "image": 

482 result = image 

483 if component is None or component == "mask": 

484 mask = Mask._read_legacy_hdu( 

485 hdu_list[2], 

486 opaque_metadata, 

487 plane_map=plane_map, 

488 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

489 ) 

490 if component == "mask": 

491 result = mask 

492 if component is None or component == "variance": 

493 variance = Image._read_legacy_hdu( 

494 hdu_list[3], 

495 opaque_metadata, 

496 preserve_bintable=variance_bintable_hdu, 

497 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

498 ) 

499 if component == "variance": 

500 result = variance 

501 if component is None: 

502 result = MaskedImage(image, mask=mask, variance=variance) 

503 result._opaque_metadata = opaque_metadata 

504 return result 

505 

506 

507class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

508 """A Pydantic model used to represent a serialized `MaskedImage`.""" 

509 

510 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

511 mask: MaskSerializationModel[P] = pydantic.Field( 

512 description="Bitmask that annotates the main image's pixels." 

513 ) 

514 variance: ImageSerializationModel[P] = pydantic.Field( 

515 description="Per-pixel variance estimates for the main image." 

516 ) 

517 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

518 default=None, 

519 exclude_if=is_none, 

520 description="Projection that maps the pixel grid to the sky.", 

521 ) 

522 obs_info: ObservationInfo | None = pydantic.Field( 

523 default=None, 

524 exclude_if=is_none, 

525 description="Standardized description of image metadata", 

526 ) 

527 

528 @property 

529 def bbox(self) -> Box: 

530 """The bounding box of the image.""" 

531 return self.image.bbox