Coverage for python / lsst / images / _visit_image.py: 26%

288 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:48 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("VisitImage", "VisitImageSerializationModel") 

15 

16import warnings 

17from collections.abc import Callable, Mapping, MutableMapping 

18from types import EllipsisType 

19from typing import Any, Literal, cast, overload 

20 

21import astropy.io.fits 

22import astropy.units 

23import astropy.wcs 

24import pydantic 

25from astro_metadata_translator import ObservationInfo, VisitInfoTranslator 

26 

27from ._geom import Box 

28from ._image import Image, ImageSerializationModel 

29from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes 

30from ._masked_image import MaskedImage, MaskedImageSerializationModel 

31from ._observation_summary_stats import ObservationSummaryStats 

32from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel 

33from .aperture_corrections import ( 

34 ApertureCorrectionMap, 

35 ApertureCorrectionMapSerializationModel, 

36 aperture_corrections_from_legacy, 

37) 

38from .fits import FitsOpaqueMetadata 

39from .psfs import ( 

40 GaussianPointSpreadFunction, 

41 GaussianPSFSerializationModel, 

42 PiffSerializationModel, 

43 PiffWrapper, 

44 PointSpreadFunction, 

45 PSFExSerializationModel, 

46 PSFExWrapper, 

47) 

48from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive 

49 

50 

def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo:
    """Build an `ObservationInfo` from header-like metadata.

    The mapping is first handed to `ObservationInfo.from_header` with
    automatic translator selection.  If that raises `ValueError`, the
    `VisitInfoTranslator` is used instead; when ``visit_info`` is given, its
    metadata is merged into ``md`` (in place) before that second attempt.
    """
    try:
        # First attempt: interpret the mapping as if it were a raw header.
        return ObservationInfo.from_header(md, quiet=True)
    except ValueError:
        # No translator recognized the header, so fall back to VisitInfo
        # keys.  A real VisitInfo is serialized into the mapping because it
        # is known to be complete.
        if visit_info is not None:
            from lsst.afw.image import setVisitInfoMetadata
            from lsst.daf.base import PropertyList

            visit_info_md = PropertyList()
            setVisitInfoMetadata(visit_info_md, visit_info)
            # Merge (rather than replace) so butler provenance stays
            # available in ``md``.
            md.update(visit_info_md)

        # This emits many warnings when nothing can be found; there is
        # currently no way to disable them short of capturing them.
        return ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)

74 

75 

76def _update_obs_info_from_legacy( 

77 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None 

78) -> ObservationInfo: 

79 extra_md: dict[str, str | int] = {} 

80 

81 if filter_label is not None and filter_label.hasBandLabel(): 

82 extra_md["physical_filter"] = filter_label.physicalLabel 

83 

84 # Fill in detector metadata, check for consistency. 

85 # ObsInfo detector name and group can not be derived from 

86 # the getName() information without knowing how the components 

87 # are separated. 

88 if detector is not None: 

89 detector_md = { 

90 "detector_num": detector.getId(), 

91 "detector_serial": detector.getSerial(), 

92 "detector_unique_name": detector.getName(), 

93 } 

94 extra_md.update(detector_md) 

95 

96 obs_info_updates: dict[str, str | int] = {} 

97 for k, v in extra_md.items(): 

98 current = getattr(obs_info, k) 

99 if current is None: 

100 obs_info_updates[k] = v 

101 continue 

102 if current != v: 

103 raise RuntimeError( 

104 f"ObservationInfo contains value for '{k}' that is inconsistent " 

105 f"with given legacy object: {v} != {current}" 

106 ) 

107 

108 if obs_info_updates: 

109 obs_info = obs_info.model_copy(update=obs_info_updates) 

110 return obs_info 

111 

112 

113class VisitImage(MaskedImage): 

114 """A calibrated single-visit image. 

115 

116 Parameters 

117 ---------- 

118 image 

119 The main image plane. If this has a `Projection`, it will be used 

120 for all planes unless a ``projection`` is passed separately. 

121 mask 

122 A bitmask image that annotates the main image plane. Must have the 

123 same bounding box as ``image`` if provided. Any attached projection 

124 is replaced (possibly by `None`). 

125 variance 

126 The per-pixel uncertainty of the main image as an image of variance 

127 values. Must have the same bounding box as ``image`` if provided, and 

128 its units must be the square of ``image.unit`` or `None`. 

129 Values default to ``1.0``. Any attached projection is replaced 

130 (possibly by `None`). 

131 mask_schema 

132 Schema for the mask plane. Must be provided if and only if ``mask`` is 

133 not provided. 

134 projection 

135 Projection that maps the pixel grid to the sky. Can only be `None` if 

136 a projection is already attached to ``image``. 

137 obs_info 

138 General information about this visit in standardized form. 

139 summary_stats 

140 Summary statistics associated with this visit. Initialized to default 

141 values if not provided. 

142 psf 

143 Point-spread function model for this image, or an exception explaining 

144 why it could not be read (to be raised if the PSF is requested later). 

145 aperture_corrections : `dict` [`str`, `~fields.BaseField`] 

146 Mapping from photometry algorithm name to the aperture correction for 

147 that algorithm. 

148 metadata 

149 Arbitrary flexible metadata to associate with the image. 

150 """ 

151 

152 def __init__( 

153 self, 

154 image: Image, 

155 *, 

156 mask: Mask | None = None, 

157 variance: Image | None = None, 

158 mask_schema: MaskSchema | None = None, 

159 projection: Projection[DetectorFrame] | None = None, 

160 obs_info: ObservationInfo | None = None, 

161 summary_stats: ObservationSummaryStats | None = None, 

162 psf: PointSpreadFunction | ArchiveReadError, 

163 aperture_corrections: ApertureCorrectionMap | None = None, 

164 metadata: dict[str, MetadataValue] | None = None, 

165 ): 

166 super().__init__( 

167 image, 

168 mask=mask, 

169 variance=variance, 

170 mask_schema=mask_schema, 

171 projection=projection, 

172 obs_info=obs_info, 

173 metadata=metadata, 

174 ) 

175 if self.image.unit is None: 

176 raise TypeError("The image component of a VisitImage must have units.") 

177 if self.image.projection is None: 

178 raise TypeError("The projection component of a VisitImage cannot be None.") 

179 if self.image.obs_info is None: 

180 raise TypeError("The observation info component of a VisitImage cannot be None.") 

181 if not isinstance(self.image.projection.pixel_frame, DetectorFrame): 

182 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.") 

183 if summary_stats is None: 

184 summary_stats = ObservationSummaryStats() 

185 self._summary_stats = summary_stats 

186 self._psf = psf 

187 self._aperture_corrections = aperture_corrections if aperture_corrections is not None else {} 

188 

189 @property 

190 def unit(self) -> astropy.units.UnitBase: 

191 """The units of the image plane (`astropy.units.Unit`).""" 

192 return cast(astropy.units.UnitBase, super().unit) 

193 

194 @property 

195 def projection(self) -> Projection[DetectorFrame]: 

196 """The projection that maps the pixel grid to the sky 

197 (`Projection` [`DetectorFrame`]). 

198 """ 

199 return cast(Projection[DetectorFrame], super().projection) 

200 

201 @property 

202 def obs_info(self) -> ObservationInfo: 

203 """General information about this observation in standard form. 

204 (`~astro_metadata_translator.ObservationInfo`). 

205 """ 

206 obs_info = self.image.obs_info 

207 assert obs_info is not None 

208 return obs_info 

209 

210 @property 

211 def astropy_wcs(self) -> ProjectionAstropyView: 

212 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`). 

213 

214 Notes 

215 ----- 

216 As expected for Astropy WCS objects, this defines pixel coordinates 

217 such that the first row and column in the arrays are ``(0, 0)``, not 

218 ``bbox.start``, as is the case for `projection`. 

219 

220 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and 

221 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an 

222 `astropy.wcs.WCS` (use `fits_wcs` for that). 

223 """ 

224 return cast(ProjectionAstropyView, super().astropy_wcs) 

225 

226 @property 

227 def summary_stats(self) -> ObservationSummaryStats: 

228 """Optional summary statistics for this observation 

229 (`ObservationSummaryStats`). 

230 """ 

231 return self._summary_stats 

232 

233 @property 

234 def psf(self) -> PointSpreadFunction: 

235 """The point-spread function model for this image 

236 (`.psfs.PointSpreadFunction`). 

237 """ 

238 if isinstance(self._psf, ArchiveReadError): 

239 raise self._psf 

240 return self._psf 

241 

242 @property 

243 def aperture_corrections(self) -> ApertureCorrectionMap: 

244 """A mapping from photometry algorithm name to the aperture correction 

245 field for that algorithm (`dict` [`str`, `~.fields.BaseField`]). 

246 """ 

247 return self._aperture_corrections 

248 

249 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage: 

250 if bbox is ...: 

251 return self 

252 super().__getitem__(bbox) 

253 return self._transfer_metadata( 

254 VisitImage( 

255 self.image[bbox], 

256 mask=self.mask[bbox], 

257 variance=self.variance[bbox], 

258 projection=self.projection, 

259 psf=self.psf, 

260 obs_info=self.obs_info, 

261 summary_stats=self.summary_stats, 

262 aperture_corrections=self.aperture_corrections, 

263 ), 

264 bbox=bbox, 

265 ) 

266 

267 def __str__(self) -> str: 

268 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})" 

269 

270 def __repr__(self) -> str: 

271 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})" 

272 

273 def copy(self) -> VisitImage: 

274 """Deep-copy the visit image.""" 

275 return self._transfer_metadata( 

276 VisitImage( 

277 image=self._image.copy(), 

278 mask=self._mask.copy(), 

279 variance=self._variance.copy(), 

280 psf=self._psf, 

281 obs_info=self.obs_info, 

282 summary_stats=self.summary_stats.model_copy(), 

283 aperture_corrections=self.aperture_corrections.copy(), 

284 ), 

285 copy=True, 

286 ) 

287 

288 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel: 

289 masked_image_model = super().serialize(archive) 

290 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel 

291 match self._psf: 

292 # MyPy is able to figure things out here with this match statement, 

293 # but not a single isinstance check on both types. 

294 case PiffWrapper(): 

295 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

296 case PSFExWrapper(): 

297 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

298 case GaussianPointSpreadFunction(): 

299 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

300 case _: 

301 raise TypeError( 

302 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}." 

303 ) 

304 assert masked_image_model.projection is not None, "VisitImage always has a projection." 

305 assert masked_image_model.obs_info is not None, "VisitImage always has observation info." 

306 serialized_aperture_corrections = ApertureCorrectionMapSerializationModel.serialize( 

307 self.aperture_corrections, archive 

308 ) 

309 return VisitImageSerializationModel( 

310 image=masked_image_model.image, 

311 mask=masked_image_model.mask, 

312 variance=masked_image_model.variance, 

313 projection=masked_image_model.projection, 

314 obs_info=masked_image_model.obs_info, 

315 psf=serialized_psf, 

316 summary_stats=self.summary_stats, 

317 aperture_corrections=serialized_aperture_corrections, 

318 metadata=self.metadata, 

319 ) 

320 

321 # Type-checkers want the model argument to only require 

322 # MaskedImageSerializationModel[Any], and they'd be absolutely right if 

323 # this were a regular instance method. But whether Liskov substitution 

324 # applies to classmethods and staticmethods is sort of context-dependent, 

325 # and here we do not want it to. 

326 @staticmethod 

327 def deserialize( 

328 model: VisitImageSerializationModel[Any], # type: ignore[override] 

329 archive: InputArchive[Any], 

330 *, 

331 bbox: Box | None = None, 

332 ) -> VisitImage: 

333 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox) 

334 psf = model.deserialize_psf(archive) 

335 aperture_corrections = model.aperture_corrections.deserialize(archive) 

336 return VisitImage( 

337 masked_image.image, 

338 mask=masked_image.mask, 

339 variance=masked_image.variance, 

340 psf=psf, 

341 projection=masked_image.projection, 

342 obs_info=masked_image.obs_info, 

343 summary_stats=model.summary_stats, 

344 aperture_corrections=aperture_corrections, 

345 )._finish_deserialize(model) 

346 

347 @staticmethod 

348 def _get_archive_tree_type[P: pydantic.BaseModel]( 

349 pointer_type: type[P], 

350 ) -> type[VisitImageSerializationModel[P]]: 

351 """Return the serialization model type for this object for an archive 

352 type that uses the given pointer type. 

353 """ 

354 return VisitImageSerializationModel[pointer_type] # type: ignore 

355 

356 # write_fits and read_fits inherited from MaskedImage. 

357 

358 @staticmethod 

359 def from_legacy( 

360 legacy: Any, 

361 *, 

362 unit: astropy.units.Unit | None = None, 

363 plane_map: Mapping[str, MaskPlane] | None = None, 

364 instrument: str | None = None, 

365 visit: int | None = None, 

366 ) -> VisitImage: 

367 """Convert from an `lsst.afw.image.Exposure` instance. 

368 

369 Parameters 

370 ---------- 

371 legacy 

372 An `lsst.afw.image.Exposure` instance that will share image and 

373 variance (but not mask) pixel data with the returned object. 

374 unit 

375 Units of the image. If not provided, the ``BUNIT`` metadata 

376 key will be used, if available. 

377 plane_map 

378 A mapping from legacy mask plane name to the new plane name and 

379 description. If `None` (default) 

380 `get_legacy_visit_image_mask_planes` is used. 

381 instrument 

382 Name of the instrument. Extracted from the metadata if not 

383 provided. 

384 visit 

385 ID of the visit. Extracted from the metadata if not provided. 

386 """ 

387 if plane_map is None: 

388 plane_map = get_legacy_visit_image_mask_planes() 

389 md = legacy.getMetadata() 

390 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo()) 

391 instrument = _extract_or_check_header( 

392 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str 

393 ) 

394 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int) 

395 unit = _extract_or_check_header( 

396 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits") 

397 ) 

398 legacy_wcs = legacy.getWcs() 

399 if legacy_wcs is None: 

400 raise ValueError("Exposure does not have a SkyWcs.") 

401 legacy_detector = legacy.getDetector() 

402 if legacy_detector is None: 

403 raise ValueError("Exposure does not have a Detector.") 

404 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

405 

406 # Update the ObservationInfo from other components. 

407 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter()) 

408 

409 opaque_fits_metadata = FitsOpaqueMetadata() 

410 primary_header = astropy.io.fits.Header() 

411 with warnings.catch_warnings(): 

412 # Silence warnings about long keys becoming HIERARCH. 

413 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

414 primary_header.update(md.toOrderedDict()) 

415 opaque_fits_metadata.extract_legacy_primary_header(primary_header) 

416 projection = Projection.from_legacy( 

417 legacy_wcs, 

418 DetectorFrame( 

419 instrument=instrument, 

420 visit=visit, 

421 detector=legacy_detector.getId(), 

422 bbox=detector_bbox, 

423 ), 

424 ) 

425 legacy_psf = legacy.getPsf() 

426 if legacy_psf is None: 

427 raise ValueError("Exposure file does not have a Psf.") 

428 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

429 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map) 

430 legacy_summary_stats = legacy.info.getSummaryStats() 

431 legacy_ap_corr_map = legacy.info.getApCorrMap() 

432 result = VisitImage( 

433 image=masked_image.image.view(unit=unit), 

434 mask=masked_image.mask, 

435 variance=masked_image.variance, 

436 projection=projection, 

437 psf=psf, 

438 obs_info=obs_info, 

439 summary_stats=( 

440 ObservationSummaryStats.from_legacy(legacy_summary_stats) 

441 if legacy_summary_stats is not None 

442 else None 

443 ), 

444 aperture_corrections=( 

445 aperture_corrections_from_legacy(legacy_ap_corr_map) 

446 if legacy_ap_corr_map is not None 

447 else None 

448 ), 

449 ) 

450 

451 result._opaque_metadata = opaque_fits_metadata 

452 return result 

453 

454 @overload # type: ignore[override] 

455 @staticmethod 

456 def read_legacy( 456 ↛ exitline 456 didn't return from function 'read_legacy' because

457 filename: str, 

458 *, 

459 component: Literal["bbox"], 

460 ) -> Box: ... 

461 

462 @overload 

463 @staticmethod 

464 def read_legacy( 464 ↛ exitline 464 didn't return from function 'read_legacy' because

465 filename: str, 

466 *, 

467 preserve_quantization: bool = False, 

468 instrument: str | None = None, 

469 visit: int | None = None, 

470 component: Literal["image"], 

471 ) -> Image: ... 

472 

473 @overload 

474 @staticmethod 

475 def read_legacy( 475 ↛ exitline 475 didn't return from function 'read_legacy' because

476 filename: str, 

477 *, 

478 plane_map: Mapping[str, MaskPlane] | None = None, 

479 instrument: str | None = None, 

480 visit: int | None = None, 

481 component: Literal["mask"], 

482 ) -> Mask: ... 

483 

484 @overload 

485 @staticmethod 

486 def read_legacy( 486 ↛ exitline 486 didn't return from function 'read_legacy' because

487 filename: str, 

488 *, 

489 preserve_quantization: bool = False, 

490 instrument: str | None = None, 

491 visit: int | None = None, 

492 component: Literal["variance"], 

493 ) -> Image: ... 

494 

495 @overload 

496 @staticmethod 

497 def read_legacy( 497 ↛ exitline 497 didn't return from function 'read_legacy' because

498 filename: str, 

499 *, 

500 instrument: str | None = None, 

501 visit: int | None = None, 

502 component: Literal["projection"], 

503 ) -> Projection[DetectorFrame]: ... 

504 

505 @overload 

506 @staticmethod 

507 def read_legacy( 507 ↛ exitline 507 didn't return from function 'read_legacy' because

508 filename: str, 

509 *, 

510 component: Literal["psf"], 

511 ) -> PointSpreadFunction: ... 

512 

513 @overload 

514 @staticmethod 

515 def read_legacy( 515 ↛ exitline 515 didn't return from function 'read_legacy' because

516 filename: str, 

517 *, 

518 component: Literal["obs_info"], 

519 ) -> ObservationInfo: ... 

520 

521 @overload 

522 @staticmethod 

523 def read_legacy( 523 ↛ exitline 523 didn't return from function 'read_legacy' because

524 filename: str, 

525 *, 

526 component: Literal["summary_stats"], 

527 ) -> ObservationSummaryStats: ... 

528 

529 @overload 

530 @staticmethod 

531 def read_legacy( 531 ↛ exitline 531 didn't return from function 'read_legacy' because

532 filename: str, 

533 *, 

534 component: Literal["aperture_corrections"], 

535 ) -> ApertureCorrectionMap: ... 

536 

537 @overload 

538 @staticmethod 

539 def read_legacy( 539 ↛ exitline 539 didn't return from function 'read_legacy' because

540 filename: str, 

541 *, 

542 preserve_quantization: bool = False, 

543 plane_map: Mapping[str, MaskPlane] | None = None, 

544 instrument: str | None = None, 

545 visit: int | None = None, 

546 component: None = None, 

547 ) -> VisitImage: ... 

548 

549 @staticmethod 

550 def read_legacy( # type: ignore[override] 

551 filename: str, 

552 *, 

553 preserve_quantization: bool = False, 

554 plane_map: Mapping[str, MaskPlane] | None = None, 

555 instrument: str | None = None, 

556 visit: int | None = None, 

557 component: Literal[ 

558 "bbox", 

559 "image", 

560 "mask", 

561 "variance", 

562 "projection", 

563 "psf", 

564 "obs_info", 

565 "summary_stats", 

566 "aperture_corrections", 

567 ] 

568 | None = None, 

569 ) -> Any: 

570 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`. 

571 

572 Parameters 

573 ---------- 

574 filename 

575 Full name of the file. 

576 preserve_quantization 

577 If `True`, ensure that writing the masked image back out again will 

578 exactly preserve quantization-compressed pixel values. This causes 

579 the image and variance plane arrays to be marked as read-only and 

580 stores the original binary table data for those planes in memory. 

581 If the `MaskedImage` is copied, the precompressed pixel values are 

582 not transferred to the copy. 

583 plane_map 

584 A mapping from legacy mask plane name to the new plane name and 

585 description. If `None` (default) 

586 `get_legacy_visit_image_mask_planes` is used. 

587 instrument 

588 Name of the instrument. Read from the primary header if not 

589 provided. 

590 visit 

591 ID of the visit. Read from the primary header if not 

592 provided. 

593 component 

594 A component to read instead of the full image. 

595 """ 

596 from lsst.afw.image import ExposureFitsReader 

597 

598 reader = ExposureFitsReader(filename) 

599 if component == "bbox": 

600 return Box.from_legacy(reader.readBBox()) 

601 legacy_detector = reader.readDetector() 

602 if legacy_detector is None: 

603 raise ValueError(f"Exposure file {filename!r} does not have a Detector.") 

604 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

605 legacy_wcs = None 

606 if component in (None, "image", "mask", "variance", "projection"): 

607 legacy_wcs = reader.readWcs() 

608 if legacy_wcs is None: 

609 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.") 

610 legacy_exposure_info = reader.readExposureInfo() 

611 summary_stats = None 

612 if component in (None, "summary_stats"): 

613 legacy_stats = legacy_exposure_info.getSummaryStats() 

614 if legacy_stats is not None: 

615 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats) 

616 if component == "summary_stats": 

617 return summary_stats 

618 if component in (None, "psf"): 

619 legacy_psf = reader.readPsf() 

620 if legacy_psf is None: 

621 raise ValueError(f"Exposure file {filename!r} does not have a Psf.") 

622 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

623 if component == "psf": 

624 return psf 

625 aperture_corrections: ApertureCorrectionMap = {} 

626 if component in (None, "aperture_corrections"): 

627 legacy_ap_corr_map = reader.readApCorrMap() 

628 if legacy_ap_corr_map is not None: 

629 aperture_corrections = aperture_corrections_from_legacy(legacy_ap_corr_map) 

630 if component == "aperture_corrections": 

631 return aperture_corrections 

632 assert component in (None, "image", "mask", "variance", "projection", "obs_info"), ( 

633 component 

634 ) # for MyPy 

635 with astropy.io.fits.open(filename) as hdu_list: 

636 primary_header = hdu_list[0].header 

637 obs_info = _obs_info_from_md(primary_header) 

638 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter()) 

639 if component == "obs_info": 

640 return obs_info 

641 instrument = _extract_or_check_header( 

642 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str 

643 ) 

644 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int) 

645 projection = Projection.from_legacy( 

646 legacy_wcs, 

647 DetectorFrame( 

648 instrument=instrument, 

649 visit=visit, 

650 detector=legacy_detector.getId(), 

651 bbox=detector_bbox, 

652 ), 

653 ) 

654 if component == "projection": 

655 return projection 

656 if plane_map is None: 

657 plane_map = get_legacy_visit_image_mask_planes() 

658 assert component != "psf", component # for MyPy 

659 from_masked_image = MaskedImage._read_legacy_hdus( 

660 hdu_list, 

661 filename, 

662 preserve_quantization=preserve_quantization, 

663 plane_map=plane_map, 

664 component=component, 

665 ) 

666 if component is not None: 

667 # This is the image, mask, or variance; attach the projection and 

668 # obs_info and return 

669 return from_masked_image.view(projection=projection, obs_info=obs_info) 

670 result = VisitImage( 

671 from_masked_image.image, 

672 mask=from_masked_image.mask, 

673 variance=from_masked_image.variance, 

674 projection=projection, 

675 psf=psf, 

676 obs_info=obs_info, 

677 summary_stats=summary_stats, 

678 aperture_corrections=aperture_corrections, 

679 ) 

680 result._opaque_metadata = from_masked_image._opaque_metadata 

681 return result 

682 

683 

684class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]): 

685 """A Pydantic model used to represent a serialized `VisitImage`.""" 

686 

687 # Inherited attributes are duplicated because that improves the docs 

688 # (some limitation in the sphinx/pydantic integration), and these are 

689 # important docs. 

690 

691 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

692 mask: MaskSerializationModel[P] = pydantic.Field( 

693 description="Bitmask that annotates the main image's pixels." 

694 ) 

695 variance: ImageSerializationModel[P] = pydantic.Field( 

696 description="Per-pixel variance estimates for the main image." 

697 ) 

698 projection: ProjectionSerializationModel[P] = pydantic.Field( 

699 description="Projection that maps the pixel grid to the sky.", 

700 ) 

701 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = ( 

702 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.") 

703 ) 

704 obs_info: ObservationInfo = pydantic.Field( 

705 description="Standardized description of visit metadata", 

706 ) 

707 summary_stats: ObservationSummaryStats = pydantic.Field( 

708 description="Summary statistics for the observation." 

709 ) 

710 aperture_corrections: ApertureCorrectionMapSerializationModel = pydantic.Field( 

711 default_factory=ApertureCorrectionMapSerializationModel, 

712 description="Aperture corrections, keyed by flux algorithm.", 

713 ) 

714 

715 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError: 

716 """Finish deserializing the PSF model, or *return* any exception 

717 raised in the attempt. 

718 """ 

719 try: 

720 match self.psf: 

721 case PiffSerializationModel(): 

722 return PiffWrapper.deserialize(self.psf, archive) 

723 case PSFExSerializationModel(): 

724 return PSFExWrapper.deserialize(self.psf, archive) 

725 case GaussianPSFSerializationModel(): 

726 return GaussianPointSpreadFunction.deserialize(self.psf, archive) 

727 case _: 

728 raise ArchiveReadError("PSF model type not recognized.") 

729 except ArchiveReadError as err: 

730 return err 

731 

732 

733def _extract_or_check_value[T]( 

734 key: str, 

735 given_value: T | None, 

736 *sources: tuple[str, T | None], 

737) -> T: 

738 # Compare given value against multiple sources. If given value is not 

739 # supplied return the first non-None value in the reference sources. 

740 if given_value is not None: 

741 for source_name, source_value in sources: 

742 if source_value is not None and source_value != given_value: 

743 raise ValueError( 

744 f"Given value {given_value!r} does not match {source_value!r} from {source_name}." 

745 ) 

746 if source_value is not None: 

747 # Only check the first non-None source rather than checking 

748 # all supplied values. 

749 break 

750 return given_value 

751 

752 for _, source_value in sources: 

753 if source_value is not None: 

754 return source_value 

755 

756 raise ValueError(f"No value found for {key}.") 

757 

758 

759def _extract_or_check_header[T]( 

760 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T] 

761) -> T: 

762 hdr_value: T | None = None 

763 if (hdr_raw_value := header.get(key)) is not None: 

764 hdr_value = coerce(hdr_raw_value) 

765 return _extract_or_check_value( 

766 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value) 

767 )