Coverage for python / lsst / images / _visit_image.py: 28%

232 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:04 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("VisitImage", "VisitImageSerializationModel") 

15 

16import functools 

17import warnings 

18from collections.abc import Callable, Mapping, MutableMapping 

19from types import EllipsisType 

20from typing import Any, Literal, cast, overload 

21 

22import astropy.io.fits 

23import astropy.units 

24import astropy.wcs 

25import pydantic 

26from astro_metadata_translator import ObservationInfo, VisitInfoTranslator 

27 

28from ._geom import Box 

29from ._image import Image, ImageSerializationModel 

30from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes 

31from ._masked_image import MaskedImage, MaskedImageSerializationModel 

32from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel 

33from .fits import FitsOpaqueMetadata 

34from .psfs import ( 

35 GaussianPointSpreadFunction, 

36 GaussianPSFSerializationModel, 

37 PiffSerializationModel, 

38 PiffWrapper, 

39 PointSpreadFunction, 

40 PSFExSerializationModel, 

41 PSFExWrapper, 

42) 

43from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive 

44 

45 

46def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo: 

47 # Try to get an ObservationInfo from the primary header as if 

48 # it's a raw header. Else fallback. 

49 try: 

50 obs_info = ObservationInfo.from_header(md, quiet=True) 

51 except ValueError: 

52 # Not known translator. Must fall back to visit info. If we have 

53 # an actual VisitInfo, serialize it since we know that it will be 

54 # complete. 

55 if visit_info is not None: 

56 from lsst.afw.image import setVisitInfoMetadata 

57 from lsst.daf.base import PropertyList 

58 

59 pl = PropertyList() 

60 setVisitInfoMetadata(pl, visit_info) 

61 # Merge so that we still have access to butler provenance. 

62 md.update(pl) 

63 

64 # Try the given header looking for VisitInfo hints. 

65 # We get lots of warnings if nothing can be found. Currently 

66 # no way to disable those without capturing them. 

67 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True) 

68 return obs_info 

69 

70 

71class VisitImage(MaskedImage): 

72 """A calibrated single-visit image. 

73 

74 Parameters 

75 ---------- 

76 image 

77 The main image plane. If this has a `Projection`, it will be used 

78 for all planes unless a ``projection`` is passed separately. 

79 mask 

80 A bitmask image that annotates the main image plane. Must have the 

81 same bounding box as ``image`` if provided. Any attached projection 

82 is replaced (possibly by `None`). 

83 variance 

84 The per-pixel uncertainty of the main image as an image of variance 

85 values. Must have the same bounding box as ``image`` if provided, and 

86 its units must be the square of ``image.unit`` or `None`. 

87 Values default to ``1.0``. Any attached projection is replaced 

88 (possibly by `None`). 

89 mask_schema 

90 Schema for the mask plane. Must be provided if and only if ``mask`` is 

91 not provided. 

92 projection 

93 Projection that maps the pixel grid to the sky. Can only be `None` if 

94 a projection is already attached to ``image``. 

95 psf 

96 Point-spread function model for this image, or an exception explaining 

97 why it could not be read (to be raised if the PSF is requested later). 

98 obs_info 

99 General information about this visit in standardized form. 

100 metadata 

101 Arbitrary flexible metadata to associate with the image. 

102 """ 

103 

104 def __init__( 

105 self, 

106 image: Image, 

107 *, 

108 mask: Mask | None = None, 

109 variance: Image | None = None, 

110 mask_schema: MaskSchema | None = None, 

111 projection: Projection[DetectorFrame] | None = None, 

112 obs_info: ObservationInfo | None = None, 

113 psf: PointSpreadFunction | ArchiveReadError, 

114 metadata: dict[str, MetadataValue] | None = None, 

115 ): 

116 super().__init__( 

117 image, 

118 mask=mask, 

119 variance=variance, 

120 mask_schema=mask_schema, 

121 projection=projection, 

122 obs_info=obs_info, 

123 metadata=metadata, 

124 ) 

125 if self.image.unit is None: 

126 raise TypeError("The image component of a VisitImage must have units.") 

127 if self.image.projection is None: 

128 raise TypeError("The projection component of a VisitImage cannot be None.") 

129 if self.image.obs_info is None: 

130 raise TypeError("The observation info component of a VisitImage cannot be None.") 

131 if not isinstance(self.image.projection.pixel_frame, DetectorFrame): 

132 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.") 

133 self._psf = psf 

134 

135 @property 

136 def unit(self) -> astropy.units.UnitBase: 

137 """The units of the image plane (`astropy.units.Unit`).""" 

138 return cast(astropy.units.UnitBase, super().unit) 

139 

140 @property 

141 def projection(self) -> Projection[DetectorFrame]: 

142 """The projection that maps the pixel grid to the sky 

143 (`Projection` [`DetectorFrame`]). 

144 """ 

145 return cast(Projection[DetectorFrame], super().projection) 

146 

147 @property 

148 def obs_info(self) -> ObservationInfo: 

149 """General information about this observation in standard form. 

150 (`~astro_metadata_translator.ObservationInfo`). 

151 """ 

152 obs_info = self.image.obs_info 

153 assert obs_info is not None 

154 return obs_info 

155 

156 @property 

157 def astropy_wcs(self) -> ProjectionAstropyView: 

158 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`). 

159 

160 Notes 

161 ----- 

162 As expected for Astropy WCS objects, this defines pixel coordinates 

163 such that the first row and column in the arrays are ``(0, 0)``, not 

164 ``bbox.start``, as is the case for `projection`. 

165 

166 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and 

167 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an 

168 `astropy.wcs.WCS` (use `fits_wcs` for that). 

169 """ 

170 return cast(ProjectionAstropyView, super().astropy_wcs) 

171 

172 @property 

173 def psf(self) -> PointSpreadFunction: 

174 """The point-spread function model for this image 

175 (`.psfs.PointSpreadFunction`). 

176 """ 

177 if isinstance(self._psf, ArchiveReadError): 

178 raise self._psf 

179 return self._psf 

180 

181 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage: 

182 if bbox is ...: 

183 return self 

184 super().__getitem__(bbox) 

185 return self._transfer_metadata( 

186 VisitImage( 

187 self.image[bbox], 

188 mask=self.mask[bbox], 

189 variance=self.variance[bbox], 

190 projection=self.projection, 

191 psf=self.psf, 

192 obs_info=self.obs_info, 

193 ), 

194 bbox=bbox, 

195 ) 

196 

197 def __str__(self) -> str: 

198 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})" 

199 

200 def __repr__(self) -> str: 

201 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})" 

202 

203 def copy(self) -> VisitImage: 

204 """Deep-copy the visit image.""" 

205 return self._transfer_metadata( 

206 VisitImage( 

207 image=self._image.copy(), 

208 mask=self._mask.copy(), 

209 variance=self._variance.copy(), 

210 psf=self._psf, 

211 obs_info=self.obs_info, 

212 ), 

213 copy=True, 

214 ) 

215 

216 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel: 

217 serialized_image = archive.serialize_direct( 

218 "image", functools.partial(self.image.serialize, save_projection=False) 

219 ) 

220 serialized_mask = archive.serialize_direct( 

221 "mask", functools.partial(self.mask.serialize, save_projection=False) 

222 ) 

223 serialized_variance = archive.serialize_direct( 

224 "variance", functools.partial(self.variance.serialize, save_projection=False) 

225 ) 

226 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

227 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel 

228 match self._psf: 

229 # MyPy is able to figure things out here with this match statement, 

230 # but not a single isinstance check on both types. 

231 case PiffWrapper(): 

232 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

233 case PSFExWrapper(): 

234 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

235 case GaussianPointSpreadFunction(): 

236 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

237 case _: 

238 raise TypeError( 

239 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}." 

240 ) 

241 return VisitImageSerializationModel( 

242 image=serialized_image, 

243 mask=serialized_mask, 

244 variance=serialized_variance, 

245 projection=serialized_projection, 

246 psf=serialized_psf, 

247 obs_info=self.obs_info, 

248 metadata=self.metadata, 

249 ) 

250 

251 # Type-checkers want the model argument to only require 

252 # MaskedImageSerializationModel[Any], and they'd be absolutely right if 

253 # this were a regular instance method. But whether Liskov substitution 

254 # applies to classmethods and staticmethods is sort of context-dependent, 

255 # and here we do not want it to. 

256 @staticmethod 

257 def deserialize( 

258 model: VisitImageSerializationModel[Any], # type: ignore[override] 

259 archive: InputArchive[Any], 

260 *, 

261 bbox: Box | None = None, 

262 ) -> VisitImage: 

263 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox) 

264 psf = model.deserialize_psf(archive) 

265 projection = Projection.deserialize(model.projection, archive) 

266 return VisitImage( 

267 masked_image.image, 

268 mask=masked_image.mask, 

269 variance=masked_image.variance, 

270 psf=psf, 

271 projection=projection, 

272 obs_info=model.obs_info, 

273 )._finish_deserialize(model) 

274 

275 @staticmethod 

276 def _get_archive_tree_type[P: pydantic.BaseModel]( 

277 pointer_type: type[P], 

278 ) -> type[VisitImageSerializationModel[P]]: 

279 """Return the serialization model type for this object for an archive 

280 type that uses the given pointer type. 

281 """ 

282 return VisitImageSerializationModel[pointer_type] # type: ignore 

283 

284 # write_fits and read_fits inherited from MaskedImage. 

285 

286 @staticmethod 

287 def from_legacy( 

288 legacy: Any, 

289 *, 

290 unit: astropy.units.Unit | None = None, 

291 plane_map: Mapping[str, MaskPlane] | None = None, 

292 instrument: str | None = None, 

293 visit: int | None = None, 

294 ) -> VisitImage: 

295 """Convert from an `lsst.afw.image.Exposure` instance. 

296 

297 Parameters 

298 ---------- 

299 legacy 

300 An `lsst.afw.image.Exposure` instance that will share image and 

301 variance (but not mask) pixel data with the returned object. 

302 unit 

303 Units of the image. If not provided, the ``BUNIT`` metadata 

304 key will be used, if available. 

305 plane_map 

306 A mapping from legacy mask plane name to the new plane name and 

307 description. If `None` (default) 

308 `get_legacy_visit_image_mask_planes` is used. 

309 instrument 

310 Name of the instrument. Extracted from the metadata if not 

311 provided. 

312 visit 

313 ID of the visit. Extracted from the metadata if not provided. 

314 """ 

315 if plane_map is None: 

316 plane_map = get_legacy_visit_image_mask_planes() 

317 md = legacy.getMetadata() 

318 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo()) 

319 instrument = _extract_or_check_header( 

320 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str 

321 ) 

322 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int) 

323 unit = _extract_or_check_header( 

324 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits") 

325 ) 

326 legacy_wcs = legacy.getWcs() 

327 if legacy_wcs is None: 

328 raise ValueError("Exposure does not have a SkyWcs.") 

329 legacy_detector = legacy.getDetector() 

330 if legacy_detector is None: 

331 raise ValueError("Exposure does not have a Detector.") 

332 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

333 opaque_fits_metadata = FitsOpaqueMetadata() 

334 primary_header = astropy.io.fits.Header() 

335 with warnings.catch_warnings(): 

336 # Silence warnings about long keys becoming HIERARCH. 

337 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

338 primary_header.update(md.toOrderedDict()) 

339 opaque_fits_metadata.extract_legacy_primary_header(primary_header) 

340 projection = Projection.from_legacy( 

341 legacy_wcs, 

342 DetectorFrame( 

343 instrument=instrument, 

344 visit=visit, 

345 detector=legacy_detector.getId(), 

346 bbox=detector_bbox, 

347 ), 

348 ) 

349 legacy_psf = legacy.getPsf() 

350 if legacy_psf is None: 

351 raise ValueError("Exposure file does not have a Psf.") 

352 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

353 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map) 

354 result = VisitImage( 

355 image=masked_image.image.view(unit=unit), 

356 mask=masked_image.mask, 

357 variance=masked_image.variance, 

358 projection=projection, 

359 psf=psf, 

360 obs_info=obs_info, 

361 ) 

362 

363 result._opaque_metadata = opaque_fits_metadata 

364 return result 

365 

366 @overload # type: ignore[override] 

367 @staticmethod 

368 def read_legacy( 368 ↛ exitline 368 didn't return from function 'read_legacy' because

369 filename: str, 

370 *, 

371 component: Literal["bbox"], 

372 ) -> Box: ... 

373 

374 @overload 

375 @staticmethod 

376 def read_legacy( 376 ↛ exitline 376 didn't return from function 'read_legacy' because

377 filename: str, 

378 *, 

379 preserve_quantization: bool = False, 

380 instrument: str | None = None, 

381 visit: int | None = None, 

382 component: Literal["image"], 

383 ) -> Image: ... 

384 

385 @overload 

386 @staticmethod 

387 def read_legacy( 387 ↛ exitline 387 didn't return from function 'read_legacy' because

388 filename: str, 

389 *, 

390 plane_map: Mapping[str, MaskPlane] | None = None, 

391 instrument: str | None = None, 

392 visit: int | None = None, 

393 component: Literal["mask"], 

394 ) -> Mask: ... 

395 

396 @overload 

397 @staticmethod 

398 def read_legacy( 398 ↛ exitline 398 didn't return from function 'read_legacy' because

399 filename: str, 

400 *, 

401 preserve_quantization: bool = False, 

402 instrument: str | None = None, 

403 visit: int | None = None, 

404 component: Literal["variance"], 

405 ) -> Image: ... 

406 

407 @overload 

408 @staticmethod 

409 def read_legacy( 409 ↛ exitline 409 didn't return from function 'read_legacy' because

410 filename: str, 

411 *, 

412 instrument: str | None = None, 

413 visit: int | None = None, 

414 component: Literal["projection"], 

415 ) -> Projection[DetectorFrame]: ... 

416 

417 @overload 

418 @staticmethod 

419 def read_legacy( 419 ↛ exitline 419 didn't return from function 'read_legacy' because

420 filename: str, 

421 *, 

422 component: Literal["psf"], 

423 ) -> PointSpreadFunction: ... 

424 

425 @overload 

426 @staticmethod 

427 def read_legacy( 427 ↛ exitline 427 didn't return from function 'read_legacy' because

428 filename: str, 

429 *, 

430 component: Literal["obs_info"], 

431 ) -> ObservationInfo: ... 

432 

433 @overload 

434 @staticmethod 

435 def read_legacy( 435 ↛ exitline 435 didn't return from function 'read_legacy' because

436 filename: str, 

437 *, 

438 preserve_quantization: bool = False, 

439 plane_map: Mapping[str, MaskPlane] | None = None, 

440 instrument: str | None = None, 

441 visit: int | None = None, 

442 component: None = None, 

443 ) -> VisitImage: ... 

444 

445 @staticmethod 

446 def read_legacy( # type: ignore[override] 

447 filename: str, 

448 *, 

449 preserve_quantization: bool = False, 

450 plane_map: Mapping[str, MaskPlane] | None = None, 

451 instrument: str | None = None, 

452 visit: int | None = None, 

453 component: Literal["bbox", "image", "mask", "variance", "projection", "psf", "obs_info"] 

454 | None = None, 

455 ) -> Any: 

456 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`. 

457 

458 Parameters 

459 ---------- 

460 filename 

461 Full name of the file. 

462 preserve_quantization 

463 If `True`, ensure that writing the masked image back out again will 

464 exactly preserve quantization-compressed pixel values. This causes 

465 the image and variance plane arrays to be marked as read-only and 

466 stores the original binary table data for those planes in memory. 

467 If the `MaskedImage` is copied, the precompressed pixel values are 

468 not transferred to the copy. 

469 plane_map 

470 A mapping from legacy mask plane name to the new plane name and 

471 description. If `None` (default) 

472 `get_legacy_visit_image_mask_planes` is used. 

473 instrument 

474 Name of the instrument. Read from the primary header if not 

475 provided. 

476 visit 

477 ID of the visit. Read from the primary header if not 

478 provided. 

479 component 

480 A component to read instead of the full image. 

481 """ 

482 from lsst.afw.image import ExposureFitsReader 

483 

484 reader = ExposureFitsReader(filename) 

485 if component == "bbox": 

486 return Box.from_legacy(reader.readBBox()) 

487 legacy_detector = reader.readDetector() 

488 if legacy_detector is None: 

489 raise ValueError(f"Exposure file {filename!r} does not have a Detector.") 

490 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

491 legacy_wcs = None 

492 if component in (None, "image", "mask", "variance", "projection"): 

493 legacy_wcs = reader.readWcs() 

494 if legacy_wcs is None: 

495 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.") 

496 if component in (None, "psf"): 

497 legacy_psf = reader.readPsf() 

498 if legacy_psf is None: 

499 raise ValueError(f"Exposure file {filename!r} does not have a Psf.") 

500 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

501 if component == "psf": 

502 return psf 

503 assert component in (None, "image", "mask", "variance", "projection", "obs_info"), ( 

504 component 

505 ) # for MyPy 

506 with astropy.io.fits.open(filename) as hdu_list: 

507 primary_header = hdu_list[0].header 

508 obs_info = _obs_info_from_md(primary_header) 

509 if component == "obs_info": 

510 return obs_info 

511 instrument = _extract_or_check_header( 

512 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str 

513 ) 

514 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int) 

515 projection = Projection.from_legacy( 

516 legacy_wcs, 

517 DetectorFrame( 

518 instrument=instrument, 

519 visit=visit, 

520 detector=legacy_detector.getId(), 

521 bbox=detector_bbox, 

522 ), 

523 ) 

524 if component == "projection": 

525 return projection 

526 if plane_map is None: 

527 plane_map = get_legacy_visit_image_mask_planes() 

528 assert component != "psf", component # for MyPy 

529 from_masked_image = MaskedImage._read_legacy_hdus( 

530 hdu_list, 

531 filename, 

532 preserve_quantization=preserve_quantization, 

533 plane_map=plane_map, 

534 component=component, 

535 ) 

536 if component is not None: 

537 # This is the image, mask, or variance; attach the projection 

538 # and return 

539 return from_masked_image.view(projection=projection) 

540 result = VisitImage( 

541 from_masked_image.image, 

542 mask=from_masked_image.mask, 

543 variance=from_masked_image.variance, 

544 projection=projection, 

545 psf=psf, 

546 obs_info=obs_info, 

547 ) 

548 result._opaque_metadata = from_masked_image._opaque_metadata 

549 return result 

550 

551 

552class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]): 

553 """A Pydantic model used to represent a serialized `VisitImage`.""" 

554 

555 # Inherited attributes are duplicated because that improves the docs 

556 # (some limitation in the sphinx/pydantic integration), and these are 

557 # important docs. 

558 

559 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

560 mask: MaskSerializationModel[P] = pydantic.Field( 

561 description="Bitmask that annotates the main image's pixels." 

562 ) 

563 variance: ImageSerializationModel[P] = pydantic.Field( 

564 description="Per-pixel variance estimates for the main image." 

565 ) 

566 projection: ProjectionSerializationModel[P] = pydantic.Field( 

567 description="Projection that maps the pixel grid to the sky.", 

568 ) 

569 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = ( 

570 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.") 

571 ) 

572 obs_info: ObservationInfo = pydantic.Field( 

573 description="Standardized description of visit metadata", 

574 ) 

575 

576 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError: 

577 """Finish deserializing the PSF model, or *return* any exception 

578 raised in the attempt. 

579 """ 

580 try: 

581 match self.psf: 

582 case PiffSerializationModel(): 

583 return PiffWrapper.deserialize(self.psf, archive) 

584 case PSFExSerializationModel(): 

585 return PSFExWrapper.deserialize(self.psf, archive) 

586 case GaussianPSFSerializationModel(): 

587 return GaussianPointSpreadFunction.deserialize(self.psf, archive) 

588 case _: 

589 raise ArchiveReadError("PSF model type not recognized.") 

590 except ArchiveReadError as err: 

591 return err 

592 

593 

594def _extract_or_check_value[T]( 

595 key: str, 

596 given_value: T | None, 

597 *sources: tuple[str, T | None], 

598) -> T: 

599 # Compare given value against multiple sources. If given value is not 

600 # supplied return the first non-None value in the reference sources. 

601 if given_value is not None: 

602 for source_name, source_value in sources: 

603 if source_value is not None and source_value != given_value: 

604 raise ValueError( 

605 f"Given value {given_value!r} does not match {source_value!r} from {source_name}." 

606 ) 

607 if source_value is not None: 

608 # Only check the first non-None source rather than checking 

609 # all supplied values. 

610 break 

611 return given_value 

612 

613 for _, source_value in sources: 

614 if source_value is not None: 

615 return source_value 

616 

617 raise ValueError(f"No value found for {key}.") 

618 

619 

620def _extract_or_check_header[T]( 

621 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T] 

622) -> T: 

623 hdr_value: T | None = None 

624 if (hdr_raw_value := header.get(key)) is not None: 

625 hdr_value = coerce(hdr_raw_value) 

626 return _extract_or_check_value( 

627 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value) 

628 )