Coverage for python / lsst / images / _masked_image.py: 35%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 09:01 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("MaskedImage", "MaskedImageSerializationModel") 

15 

16import functools 

17from collections.abc import Mapping 

18from contextlib import ExitStack 

19from types import EllipsisType 

20from typing import Any, Literal, overload 

21 

22import astropy.io.fits 

23import astropy.units 

24import astropy.wcs 

25import numpy as np 

26import pydantic 

27from astro_metadata_translator import ObservationInfo 

28 

29from lsst.resources import ResourcePath, ResourcePathExpression 

30 

31from . import fits 

32from ._generalized_image import GeneralizedImage 

33from ._geom import Box 

34from ._image import Image, ImageSerializationModel 

35from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel 

36from ._transforms import Frame, Projection, ProjectionSerializationModel 

37from .serialization import ArchiveTree, InputArchive, MetadataValue, OutputArchive 

38from .utils import is_none 

39 

40 

41class MaskedImage(GeneralizedImage): 

42 """A multi-plane image with data (image), mask, and variance planes. 

43 

44 Parameters 

45 ---------- 

46 image 

47 The main image plane. If this has a `Projection`, it will be used 

48 for all planes unless a ``projection`` is passed separately. 

49 mask 

50 A bitmask image that annotates the main image plane. Must have the 

51 same bounding box as ``image`` if provided. Any attached projection 

52 is replaced (possibly by `None`). 

53 variance 

54 The per-pixel uncertainty of the main image as an image of variance 

55 values. Must have the same bounding box as ``image`` if provided, and 

56 its units must be the square of ``image.unit`` or `None`. 

57 Values default to ``1.0``. Any attached projection is replaced 

58 (possibly by `None`). 

59 mask_schema 

60 Schema for the mask plane. Must be provided if and only if ``mask`` is 

61 not provided. 

62 projection 

63 Projection that maps the pixel grid to the sky. 

64 obs_info 

65 General information about the associated observation in standardized 

66 form. 

67 metadata 

68 Arbitrary flexible metadata to associate with the image. 

69 """ 

70 

71 def __init__( 

72 self, 

73 image: Image, 

74 *, 

75 mask: Mask | None = None, 

76 variance: Image | None = None, 

77 mask_schema: MaskSchema | None = None, 

78 projection: Projection | None = None, 

79 obs_info: ObservationInfo | None = None, 

80 metadata: dict[str, MetadataValue] | None = None, 

81 ): 

82 super().__init__(metadata) 

83 if projection is None: 

84 projection = image.projection 

85 else: 

86 image = image.view(projection=projection) 

87 if obs_info is None: 

88 obs_info = image.obs_info 

89 else: 

90 image = image.view(obs_info=obs_info) 

91 if mask is None: 

92 if mask_schema is None: 

93 raise TypeError("'mask_schema' must be provided if 'mask' is not.") 

94 mask = Mask(schema=mask_schema, bbox=image.bbox, projection=projection, obs_info=obs_info) 

95 elif mask_schema is not None: 

96 raise TypeError("'mask_schema' may not be provided if 'mask' is.") 

97 else: 

98 if image.bbox != mask.bbox: 

99 raise ValueError(f"Image ({image.bbox}) and mask ({mask.bbox}) bboxes do not agree.") 

100 mask = mask.view(projection=projection, obs_info=obs_info) 

101 if variance is None: 

102 variance = Image( 

103 1.0, 

104 dtype=np.float32, 

105 bbox=image.bbox, 

106 unit=None if image.unit is None else image.unit**2, 

107 projection=projection, 

108 obs_info=obs_info, 

109 ) 

110 else: 

111 if image.bbox != variance.bbox: 

112 raise ValueError(f"Image ({image.bbox}) and variance ({variance.bbox}) bboxes do not agree.") 

113 variance = variance.view(projection=projection, obs_info=obs_info) 

114 if image.unit is None: 

115 if variance.unit is not None: 

116 raise ValueError(f"Image has no units but variance does ({variance.unit}).") 

117 elif variance.unit is None: 

118 variance = variance.view(unit=image.unit**2) 

119 elif variance.unit != image.unit**2: 

120 raise ValueError( 

121 f"Variance unit ({variance.unit}) should be the square of the image unit ({image.unit})." 

122 ) 

123 self._image = image 

124 self._mask = mask 

125 self._variance = variance 

126 

127 @property 

128 def image(self) -> Image: 

129 """The main image plane (`Image`).""" 

130 return self._image 

131 

132 @property 

133 def mask(self) -> Mask: 

134 """The mask plane (`Mask`).""" 

135 return self._mask 

136 

137 @property 

138 def variance(self) -> Image: 

139 """The variance plane (`Image`).""" 

140 return self._variance 

141 

142 @property 

143 def bbox(self) -> Box: 

144 """The bounding box shared by all three image planes (`Box`).""" 

145 return self._image.bbox 

146 

147 @property 

148 def unit(self) -> astropy.units.UnitBase | None: 

149 """The units of the image plane (`astropy.units.Unit` | `None`).""" 

150 return self._image.unit 

151 

152 @property 

153 def projection(self) -> Projection[Any] | None: 

154 """The projection that maps the pixel grid to the sky 

155 (`Projection` | `None`). 

156 """ 

157 return self._image.projection 

158 

159 @property 

160 def obs_info(self) -> ObservationInfo | None: 

161 """General information about the associated observation in standard 

162 form. (`~astro_metadata_translator.ObservationInfo` | `None`). 

163 """ 

164 return self._image.obs_info 

165 

166 def __getitem__(self, bbox: Box | EllipsisType) -> MaskedImage: 

167 if bbox is ...: 

168 return self 

169 super().__getitem__(bbox) 

170 return self._transfer_metadata( 

171 MaskedImage( 

172 # Projection and obs_info propagate from the image. 

173 self.image[bbox], 

174 mask=self.mask[bbox], 

175 variance=self.variance[bbox], 

176 ), 

177 bbox=bbox, 

178 ) 

179 

180 def __setitem__(self, bbox: Box | EllipsisType, value: MaskedImage) -> None: 

181 self._image[bbox] = value.image 

182 self._mask[bbox] = value.mask 

183 self._variance[bbox] = value.variance 

184 

185 def __str__(self) -> str: 

186 return f"MaskedImage({self.image!s}, {list(self.mask.schema.names)})" 

187 

188 def __repr__(self) -> str: 

189 return f"MaskedImage({self.image!r}, mask_schema={self.mask.schema!r})" 

190 

191 def copy(self) -> MaskedImage: 

192 """Deep-copy the masked image and metadata.""" 

193 return self._transfer_metadata( 

194 MaskedImage(image=self._image.copy(), mask=self._mask.copy(), variance=self._variance.copy()), 

195 copy=True, 

196 ) 

197 

198 def serialize(self, archive: OutputArchive[Any]) -> MaskedImageSerializationModel: 

199 """Serialize the masked image to an output archive. 

200 

201 Parameters 

202 ---------- 

203 archive 

204 Archive to write to. 

205 """ 

206 serialized_image = archive.serialize_direct( 

207 "image", functools.partial(self.image.serialize, save_projection=False, save_obs_info=False) 

208 ) 

209 serialized_mask = archive.serialize_direct( 

210 "mask", functools.partial(self.mask.serialize, save_projection=False, save_obs_info=False) 

211 ) 

212 serialized_variance = archive.serialize_direct( 

213 "variance", functools.partial(self.variance.serialize, save_projection=False, save_obs_info=False) 

214 ) 

215 serialized_projection = ( 

216 archive.serialize_direct("projection", self.projection.serialize) 

217 if self.projection is not None 

218 else None 

219 ) 

220 return MaskedImageSerializationModel( 

221 image=serialized_image, 

222 mask=serialized_mask, 

223 variance=serialized_variance, 

224 projection=serialized_projection, 

225 obs_info=self.obs_info, 

226 metadata=self.metadata, 

227 ) 

228 

229 @staticmethod 

230 def deserialize( 

231 model: MaskedImageSerializationModel[Any], archive: InputArchive[Any], *, bbox: Box | None = None 

232 ) -> MaskedImage: 

233 """Deserialize an image from an input archive. 

234 

235 Parameters 

236 ---------- 

237 model 

238 A Pydantic model representation of the image, holding references 

239 to data stored in the archive. 

240 archive 

241 Archive to read from. 

242 bbox 

243 Bounding box of a subimage to read instead. 

244 """ 

245 image = Image.deserialize(model.image, archive, bbox=bbox) 

246 mask = Mask.deserialize(model.mask, archive, bbox=bbox) 

247 variance = Image.deserialize(model.variance, archive, bbox=bbox) 

248 projection = ( 

249 Projection.deserialize(model.projection, archive) if model.projection is not None else None 

250 ) 

251 return MaskedImage( 

252 image, mask=mask, variance=variance, projection=projection, obs_info=model.obs_info 

253 )._finish_deserialize(model) 

254 

255 @staticmethod 

256 def _get_archive_tree_type[P: pydantic.BaseModel]( 

257 pointer_type: type[P], 

258 ) -> type[MaskedImageSerializationModel[P]]: 

259 """Return the serialization model type for this object for an archive 

260 type that uses the given pointer type. 

261 """ 

262 return MaskedImageSerializationModel[pointer_type] # type: ignore 

263 

264 def write_fits( 

265 self, 

266 filename: str, 

267 *, 

268 image_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

269 mask_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

270 variance_compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

271 compression_seed: int | None = None, 

272 ) -> None: 

273 """Write the image to a FITS file. 

274 

275 Parameters 

276 ---------- 

277 filename 

278 Name of the file to write to. Must be a local file. 

279 image_compression 

280 Compression options for the `image` plane. 

281 mask_compression 

282 Compression options for the `mask` plane. 

283 variance_compression 

284 Compression options for the `variance` plane. 

285 compression_seed 

286 A FITS tile compression seed to use whenever the configured 

287 compression seed is `None` or (for backwards compatibility) ``0``. 

288 This value is then incremented every time it is used. 

289 """ 

290 compression_options = {} 

291 if image_compression is not fits.FitsCompressionOptions.DEFAULT: 

292 compression_options["image"] = image_compression 

293 if mask_compression is not fits.FitsCompressionOptions.DEFAULT: 

294 compression_options["mask"] = mask_compression 

295 if variance_compression is not fits.FitsCompressionOptions.DEFAULT: 

296 compression_options["variance"] = variance_compression 

297 fits.write(self, filename, compression_options=compression_options, compression_seed=compression_seed) 

298 

299 @classmethod 

300 def read_fits(cls, url: ResourcePathExpression, *, bbox: Box | None = None) -> MaskedImage: 

301 """Read an image from a FITS file. 

302 

303 Parameters 

304 ---------- 

305 url 

306 URL of the file to read; may be any type supported by 

307 `lsst.resources.ResourcePath`. 

308 bbox 

309 Bounding box of a subimage to read instead. 

310 """ 

311 return fits.read(cls, url, bbox=bbox).deserialized 

312 

313 @staticmethod 

314 def from_legacy( 

315 legacy: Any, 

316 *, 

317 unit: astropy.units.Unit | None = None, 

318 plane_map: Mapping[str, MaskPlane] | None = None, 

319 ) -> MaskedImage: 

320 """Convert from an `lsst.afw.image.MaskedImage` instance. 

321 

322 Parameters 

323 ---------- 

324 legacy 

325 An `lsst.afw.image.MaskedImage` instance that will share image and 

326 variance (but not mask) pixel data with the returned object. 

327 unit 

328 Units of the image. 

329 plane_map 

330 A mapping from legacy mask plane name to the new plane name and 

331 description. 

332 """ 

333 return MaskedImage( 

334 image=Image.from_legacy(legacy.getImage(), unit), 

335 mask=Mask.from_legacy(legacy.getMask(), plane_map), 

336 variance=Image.from_legacy(legacy.getVariance()), 

337 ) 

338 

339 def to_legacy(self, *, copy: bool | None = None, plane_map: Mapping[str, MaskPlane] | None = None) -> Any: 

340 """Convert to an `lsst.afw.image.MaskedImage` instance. 

341 

342 Parameters 

343 ---------- 

344 copy 

345 If `True`, always copy the image and variance pixel data. 

346 If `False`, return a view, and raise `TypeError` if the pixel data 

347 is read-only (this is not supported by afw). If `None`, onyl if 

348 the pixel data is read-only. Mask pixel data is always copied. 

349 plane_map 

350 A mapping from legacy mask plane name to the new plane name and 

351 description. 

352 """ 

353 import lsst.afw.image 

354 

355 return lsst.afw.image.MaskedImage( 

356 self.image.to_legacy(copy=copy), 

357 mask=self.mask.to_legacy(plane_map), 

358 variance=self.variance.to_legacy(copy=copy), 

359 dtype=self.image.array.dtype, 

360 ) 

361 

362 @overload 

363 @staticmethod 

364 def read_legacy( 364 ↛ exitline 364 didn't return from function 'read_legacy' because

365 uri: ResourcePathExpression, 

366 *, 

367 preserve_quantization: bool = False, 

368 component: Literal["image"], 

369 fits_wcs_frame: Frame | None = None, 

370 ) -> Image: ... 

371 

372 @overload 

373 @staticmethod 

374 def read_legacy( 374 ↛ exitline 374 didn't return from function 'read_legacy' because

375 uri: ResourcePathExpression, 

376 *, 

377 plane_map: Mapping[str, MaskPlane] | None = None, 

378 component: Literal["mask"], 

379 fits_wcs_frame: Frame | None = None, 

380 ) -> Mask: ... 

381 

382 @overload 

383 @staticmethod 

384 def read_legacy( 384 ↛ exitline 384 didn't return from function 'read_legacy' because

385 uri: ResourcePathExpression, 

386 *, 

387 preserve_quantization: bool = False, 

388 component: Literal["variance"], 

389 fits_wcs_frame: Frame | None = None, 

390 ) -> Image: ... 

391 

392 @overload 

393 @staticmethod 

394 def read_legacy( 394 ↛ exitline 394 didn't return from function 'read_legacy' because

395 uri: ResourcePathExpression, 

396 *, 

397 preserve_quantization: bool = False, 

398 plane_map: Mapping[str, MaskPlane] | None = None, 

399 component: None = None, 

400 fits_wcs_frame: Frame | None = None, 

401 ) -> MaskedImage: ... 

402 

403 @staticmethod 

404 def read_legacy( 

405 uri: ResourcePathExpression, 

406 *, 

407 preserve_quantization: bool = False, 

408 plane_map: Mapping[str, MaskPlane] | None = None, 

409 component: Literal["image", "mask", "variance"] | None = None, 

410 fits_wcs_frame: Frame | None = None, 

411 ) -> Any: 

412 """Read a FITS file written by `lsst.afw.image.MaskedImage.writeFits`. 

413 

414 Parameters 

415 ---------- 

416 uri 

417 URI or file name. 

418 preserve_quantization 

419 If `True`, ensure that writing the masked image back out again will 

420 exactly preserve quantization-compressed pixel values. This causes 

421 the image and variance plane arrays to be marked as read-only and 

422 stores the original binary table data for those planes in memory. 

423 If the `MaskedImage` is copied, the precompressed pixel values are 

424 not transferred to the copy. 

425 plane_map 

426 A mapping from legacy mask plane name to the new plane name and 

427 description. 

428 component 

429 A component to read instead of the full image. 

430 fits_wcs_frame 

431 If not `None` and the HDU containing the image plane has a FITS 

432 WCS, attach a `Projection` to the returned masked image by 

433 converting that WCS. When ``component`` is one of ``"image"``, 

434 ``"mask"``, or ``"variance"``, a FITS WCS from the component HDU 

435 is used instead (all three should have the same WCS). 

436 """ 

437 fs, fspath = ResourcePath(uri).to_fsspec() 

438 with fs.open(fspath) as stream, astropy.io.fits.open(stream) as hdu_list: 

439 return MaskedImage._read_legacy_hdus( 

440 hdu_list, 

441 uri, 

442 preserve_quantization=preserve_quantization, 

443 plane_map=plane_map, 

444 component=component, 

445 fits_wcs_frame=fits_wcs_frame, 

446 ) 

447 

448 @staticmethod 

449 def _read_legacy_hdus( 

450 hdu_list: astropy.io.fits.HDUList, 

451 uri: ResourcePathExpression, 

452 *, 

453 preserve_quantization: bool = False, 

454 plane_map: Mapping[str, MaskPlane] | None = None, 

455 component: Literal["image", "mask", "variance"] | None, 

456 fits_wcs_frame: Frame | None = None, 

457 ) -> Any: 

458 opaque_metadata = fits.FitsOpaqueMetadata() 

459 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

460 image_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

461 variance_bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

462 result: Any 

463 with ExitStack() as exit_stack: 

464 if preserve_quantization: 

465 fs, fspath = ResourcePath(uri).to_fsspec() 

466 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

467 bintable_hdu_list = exit_stack.enter_context( 

468 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

469 ) 

470 image_bintable_hdu = bintable_hdu_list[1] 

471 variance_bintable_hdu = bintable_hdu_list[3] 

472 if component is None or component == "image": 

473 image = Image._read_legacy_hdu( 

474 hdu_list[1], 

475 opaque_metadata, 

476 preserve_bintable=image_bintable_hdu, 

477 fits_wcs_frame=fits_wcs_frame, 

478 ) 

479 if component == "image": 

480 result = image 

481 if component is None or component == "mask": 

482 mask = Mask._read_legacy_hdu( 

483 hdu_list[2], 

484 opaque_metadata, 

485 plane_map=plane_map, 

486 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

487 ) 

488 if component == "mask": 

489 result = mask 

490 if component is None or component == "variance": 

491 variance = Image._read_legacy_hdu( 

492 hdu_list[3], 

493 opaque_metadata, 

494 preserve_bintable=variance_bintable_hdu, 

495 fits_wcs_frame=fits_wcs_frame if component is not None else None, 

496 ) 

497 if component == "variance": 

498 result = variance 

499 if component is None: 

500 result = MaskedImage(image, mask=mask, variance=variance) 

501 result._opaque_metadata = opaque_metadata 

502 return result 

503 

504 

505class MaskedImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

506 """A Pydantic model used to represent a serialized `MaskedImage`.""" 

507 

508 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

509 mask: MaskSerializationModel[P] = pydantic.Field( 

510 description="Bitmask that annotates the main image's pixels." 

511 ) 

512 variance: ImageSerializationModel[P] = pydantic.Field( 

513 description="Per-pixel variance estimates for the main image." 

514 ) 

515 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

516 default=None, 

517 exclude_if=is_none, 

518 description="Projection that maps the pixel grid to the sky.", 

519 ) 

520 obs_info: ObservationInfo | None = pydantic.Field( 

521 default=None, 

522 exclude_if=is_none, 

523 description="Standardized description of image metadata", 

524 ) 

525 

526 @property 

527 def bbox(self) -> Box: 

528 """The bounding box of the image.""" 

529 return self.image.bbox