Coverage for python / lsst / images / _visit_image.py: 26%

267 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-25 08:35 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("VisitImage", "VisitImageSerializationModel") 

15 

16import warnings 

17from collections.abc import Callable, Mapping, MutableMapping 

18from types import EllipsisType 

19from typing import Any, Literal, cast, overload 

20 

21import astropy.io.fits 

22import astropy.units 

23import astropy.wcs 

24import pydantic 

25from astro_metadata_translator import ObservationInfo, VisitInfoTranslator 

26 

27from ._geom import Box 

28from ._image import Image, ImageSerializationModel 

29from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes 

30from ._masked_image import MaskedImage, MaskedImageSerializationModel 

31from ._observation_summary_stats import ObservationSummaryStats 

32from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel 

33from .fits import FitsOpaqueMetadata 

34from .psfs import ( 

35 GaussianPointSpreadFunction, 

36 GaussianPSFSerializationModel, 

37 PiffSerializationModel, 

38 PiffWrapper, 

39 PointSpreadFunction, 

40 PSFExSerializationModel, 

41 PSFExWrapper, 

42) 

43from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive 

44 

45 

def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo:
    """Build an `ObservationInfo` from a header-like mapping.

    Parameters
    ----------
    md
        Header metadata. It is first translated as if it were a raw header.
        When the fallback path runs with a ``visit_info``, this mapping is
        updated in place with the serialized visit info.
    visit_info
        Optional legacy visit info object. If given, it is serialized into
        ``md`` before the fallback translation is attempted.

    Returns
    -------
    ObservationInfo
        The translated observation information.
    """
    try:
        # First choice: let the translator machinery recognize the header.
        return ObservationInfo.from_header(md, quiet=True)
    except ValueError:
        # No translator recognized the header, so rely on visit-info keys.
        # A real VisitInfo is serialized first because it is known to be
        # complete.
        if visit_info is not None:
            from lsst.afw.image import setVisitInfoMetadata
            from lsst.daf.base import PropertyList

            visit_info_md = PropertyList()
            setVisitInfoMetadata(visit_info_md, visit_info)
            # Merge rather than replace, so butler provenance stays
            # available in the header.
            md.update(visit_info_md)

        # Translate using VisitInfo hints in the header. This emits many
        # warnings when nothing can be found; there is currently no way to
        # disable them short of capturing them.
        return ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True)

69 

70 

71def _update_obs_info_from_legacy( 

72 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None 

73) -> ObservationInfo: 

74 extra_md: dict[str, str | int] = {} 

75 

76 if filter_label is not None and filter_label.hasBandLabel(): 

77 extra_md["physical_filter"] = filter_label.physicalLabel 

78 

79 # Fill in detector metadata, check for consistency. 

80 # ObsInfo detector name and group can not be derived from 

81 # the getName() information without knowing how the components 

82 # are separated. 

83 if detector is not None: 

84 detector_md = { 

85 "detector_num": detector.getId(), 

86 "detector_serial": detector.getSerial(), 

87 "detector_unique_name": detector.getName(), 

88 } 

89 extra_md.update(detector_md) 

90 

91 obs_info_updates: dict[str, str | int] = {} 

92 for k, v in extra_md.items(): 

93 current = getattr(obs_info, k) 

94 if current is None: 

95 obs_info_updates[k] = v 

96 continue 

97 if current != v: 

98 raise RuntimeError( 

99 f"ObservationInfo contains value for '{k}' that is inconsistent " 

100 f"with given legacy object: {v} != {current}" 

101 ) 

102 

103 if obs_info_updates: 

104 obs_info = obs_info.model_copy(update=obs_info_updates) 

105 return obs_info 

106 

107 

108class VisitImage(MaskedImage): 

109 """A calibrated single-visit image. 

110 

111 Parameters 

112 ---------- 

113 image 

114 The main image plane. If this has a `Projection`, it will be used 

115 for all planes unless a ``projection`` is passed separately. 

116 mask 

117 A bitmask image that annotates the main image plane. Must have the 

118 same bounding box as ``image`` if provided. Any attached projection 

119 is replaced (possibly by `None`). 

120 variance 

121 The per-pixel uncertainty of the main image as an image of variance 

122 values. Must have the same bounding box as ``image`` if provided, and 

123 its units must be the square of ``image.unit`` or `None`. 

124 Values default to ``1.0``. Any attached projection is replaced 

125 (possibly by `None`). 

126 mask_schema 

127 Schema for the mask plane. Must be provided if and only if ``mask`` is 

128 not provided. 

129 projection 

130 Projection that maps the pixel grid to the sky. Can only be `None` if 

131 a projection is already attached to ``image``. 

132 obs_info 

133 General information about this visit in standardized form. 

134 summary_stats 

135 Optional summary statistics associated with this visit. 

136 psf 

137 Point-spread function model for this image, or an exception explaining 

138 why it could not be read (to be raised if the PSF is requested later). 

139 metadata 

140 Arbitrary flexible metadata to associate with the image. 

141 """ 

142 

143 def __init__( 

144 self, 

145 image: Image, 

146 *, 

147 mask: Mask | None = None, 

148 variance: Image | None = None, 

149 mask_schema: MaskSchema | None = None, 

150 projection: Projection[DetectorFrame] | None = None, 

151 obs_info: ObservationInfo | None = None, 

152 summary_stats: ObservationSummaryStats | None = None, 

153 psf: PointSpreadFunction | ArchiveReadError, 

154 metadata: dict[str, MetadataValue] | None = None, 

155 ): 

156 super().__init__( 

157 image, 

158 mask=mask, 

159 variance=variance, 

160 mask_schema=mask_schema, 

161 projection=projection, 

162 obs_info=obs_info, 

163 metadata=metadata, 

164 ) 

165 if self.image.unit is None: 

166 raise TypeError("The image component of a VisitImage must have units.") 

167 if self.image.projection is None: 

168 raise TypeError("The projection component of a VisitImage cannot be None.") 

169 if self.image.obs_info is None: 

170 raise TypeError("The observation info component of a VisitImage cannot be None.") 

171 if not isinstance(self.image.projection.pixel_frame, DetectorFrame): 

172 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.") 

173 self._summary_stats = summary_stats 

174 self._psf = psf 

175 

176 @property 

177 def unit(self) -> astropy.units.UnitBase: 

178 """The units of the image plane (`astropy.units.Unit`).""" 

179 return cast(astropy.units.UnitBase, super().unit) 

180 

181 @property 

182 def projection(self) -> Projection[DetectorFrame]: 

183 """The projection that maps the pixel grid to the sky 

184 (`Projection` [`DetectorFrame`]). 

185 """ 

186 return cast(Projection[DetectorFrame], super().projection) 

187 

188 @property 

189 def obs_info(self) -> ObservationInfo: 

190 """General information about this observation in standard form. 

191 (`~astro_metadata_translator.ObservationInfo`). 

192 """ 

193 obs_info = self.image.obs_info 

194 assert obs_info is not None 

195 return obs_info 

196 

197 @property 

198 def astropy_wcs(self) -> ProjectionAstropyView: 

199 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`). 

200 

201 Notes 

202 ----- 

203 As expected for Astropy WCS objects, this defines pixel coordinates 

204 such that the first row and column in the arrays are ``(0, 0)``, not 

205 ``bbox.start``, as is the case for `projection`. 

206 

207 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and 

208 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an 

209 `astropy.wcs.WCS` (use `fits_wcs` for that). 

210 """ 

211 return cast(ProjectionAstropyView, super().astropy_wcs) 

212 

213 @property 

214 def summary_stats(self) -> ObservationSummaryStats | None: 

215 """Optional summary statistics for this observation 

216 (`ObservationSummaryStats`). 

217 """ 

218 return self._summary_stats 

219 

220 @property 

221 def psf(self) -> PointSpreadFunction: 

222 """The point-spread function model for this image 

223 (`.psfs.PointSpreadFunction`). 

224 """ 

225 if isinstance(self._psf, ArchiveReadError): 

226 raise self._psf 

227 return self._psf 

228 

229 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage: 

230 if bbox is ...: 

231 return self 

232 super().__getitem__(bbox) 

233 return self._transfer_metadata( 

234 VisitImage( 

235 self.image[bbox], 

236 mask=self.mask[bbox], 

237 variance=self.variance[bbox], 

238 projection=self.projection, 

239 psf=self.psf, 

240 obs_info=self.obs_info, 

241 summary_stats=self.summary_stats, 

242 ), 

243 bbox=bbox, 

244 ) 

245 

246 def __str__(self) -> str: 

247 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})" 

248 

249 def __repr__(self) -> str: 

250 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})" 

251 

252 def copy(self) -> VisitImage: 

253 """Deep-copy the visit image.""" 

254 return self._transfer_metadata( 

255 VisitImage( 

256 image=self._image.copy(), 

257 mask=self._mask.copy(), 

258 variance=self._variance.copy(), 

259 psf=self._psf, 

260 obs_info=self.obs_info, 

261 summary_stats=self.summary_stats, 

262 ), 

263 copy=True, 

264 ) 

265 

266 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel: 

267 masked_image_model = super().serialize(archive) 

268 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel 

269 match self._psf: 

270 # MyPy is able to figure things out here with this match statement, 

271 # but not a single isinstance check on both types. 

272 case PiffWrapper(): 

273 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

274 case PSFExWrapper(): 

275 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

276 case GaussianPointSpreadFunction(): 

277 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

278 case _: 

279 raise TypeError( 

280 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}." 

281 ) 

282 assert masked_image_model.projection is not None, "VisitImage always has a projection." 

283 assert masked_image_model.obs_info is not None, "VisitImage always has observation info." 

284 return VisitImageSerializationModel( 

285 image=masked_image_model.image, 

286 mask=masked_image_model.mask, 

287 variance=masked_image_model.variance, 

288 projection=masked_image_model.projection, 

289 obs_info=masked_image_model.obs_info, 

290 psf=serialized_psf, 

291 summary_stats=self.summary_stats, 

292 metadata=self.metadata, 

293 ) 

294 

295 # Type-checkers want the model argument to only require 

296 # MaskedImageSerializationModel[Any], and they'd be absolutely right if 

297 # this were a regular instance method. But whether Liskov substitution 

298 # applies to classmethods and staticmethods is sort of context-dependent, 

299 # and here we do not want it to. 

300 @staticmethod 

301 def deserialize( 

302 model: VisitImageSerializationModel[Any], # type: ignore[override] 

303 archive: InputArchive[Any], 

304 *, 

305 bbox: Box | None = None, 

306 ) -> VisitImage: 

307 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox) 

308 psf = model.deserialize_psf(archive) 

309 return VisitImage( 

310 masked_image.image, 

311 mask=masked_image.mask, 

312 variance=masked_image.variance, 

313 psf=psf, 

314 projection=masked_image.projection, 

315 obs_info=masked_image.obs_info, 

316 summary_stats=model.summary_stats, 

317 )._finish_deserialize(model) 

318 

319 @staticmethod 

320 def _get_archive_tree_type[P: pydantic.BaseModel]( 

321 pointer_type: type[P], 

322 ) -> type[VisitImageSerializationModel[P]]: 

323 """Return the serialization model type for this object for an archive 

324 type that uses the given pointer type. 

325 """ 

326 return VisitImageSerializationModel[pointer_type] # type: ignore 

327 

328 # write_fits and read_fits inherited from MaskedImage. 

329 

330 @staticmethod 

331 def from_legacy( 

332 legacy: Any, 

333 *, 

334 unit: astropy.units.Unit | None = None, 

335 plane_map: Mapping[str, MaskPlane] | None = None, 

336 instrument: str | None = None, 

337 visit: int | None = None, 

338 ) -> VisitImage: 

339 """Convert from an `lsst.afw.image.Exposure` instance. 

340 

341 Parameters 

342 ---------- 

343 legacy 

344 An `lsst.afw.image.Exposure` instance that will share image and 

345 variance (but not mask) pixel data with the returned object. 

346 unit 

347 Units of the image. If not provided, the ``BUNIT`` metadata 

348 key will be used, if available. 

349 plane_map 

350 A mapping from legacy mask plane name to the new plane name and 

351 description. If `None` (default) 

352 `get_legacy_visit_image_mask_planes` is used. 

353 instrument 

354 Name of the instrument. Extracted from the metadata if not 

355 provided. 

356 visit 

357 ID of the visit. Extracted from the metadata if not provided. 

358 """ 

359 if plane_map is None: 

360 plane_map = get_legacy_visit_image_mask_planes() 

361 md = legacy.getMetadata() 

362 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo()) 

363 instrument = _extract_or_check_header( 

364 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str 

365 ) 

366 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int) 

367 unit = _extract_or_check_header( 

368 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits") 

369 ) 

370 legacy_wcs = legacy.getWcs() 

371 if legacy_wcs is None: 

372 raise ValueError("Exposure does not have a SkyWcs.") 

373 legacy_detector = legacy.getDetector() 

374 if legacy_detector is None: 

375 raise ValueError("Exposure does not have a Detector.") 

376 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

377 

378 # Update the ObservationInfo from other components. 

379 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter()) 

380 

381 opaque_fits_metadata = FitsOpaqueMetadata() 

382 primary_header = astropy.io.fits.Header() 

383 with warnings.catch_warnings(): 

384 # Silence warnings about long keys becoming HIERARCH. 

385 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

386 primary_header.update(md.toOrderedDict()) 

387 opaque_fits_metadata.extract_legacy_primary_header(primary_header) 

388 projection = Projection.from_legacy( 

389 legacy_wcs, 

390 DetectorFrame( 

391 instrument=instrument, 

392 visit=visit, 

393 detector=legacy_detector.getId(), 

394 bbox=detector_bbox, 

395 ), 

396 ) 

397 legacy_psf = legacy.getPsf() 

398 if legacy_psf is None: 

399 raise ValueError("Exposure file does not have a Psf.") 

400 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

401 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map) 

402 legacy_summary_stats = legacy.info.getSummaryStats() 

403 result = VisitImage( 

404 image=masked_image.image.view(unit=unit), 

405 mask=masked_image.mask, 

406 variance=masked_image.variance, 

407 projection=projection, 

408 psf=psf, 

409 obs_info=obs_info, 

410 summary_stats=( 

411 ObservationSummaryStats.from_legacy(legacy_summary_stats) 

412 if legacy_summary_stats is not None 

413 else None 

414 ), 

415 ) 

416 

417 result._opaque_metadata = opaque_fits_metadata 

418 return result 

419 

420 @overload # type: ignore[override] 

421 @staticmethod 

422 def read_legacy( 422 ↛ exitline 422 didn't return from function 'read_legacy' because

423 filename: str, 

424 *, 

425 component: Literal["bbox"], 

426 ) -> Box: ... 

427 

428 @overload 

429 @staticmethod 

430 def read_legacy( 430 ↛ exitline 430 didn't return from function 'read_legacy' because

431 filename: str, 

432 *, 

433 preserve_quantization: bool = False, 

434 instrument: str | None = None, 

435 visit: int | None = None, 

436 component: Literal["image"], 

437 ) -> Image: ... 

438 

439 @overload 

440 @staticmethod 

441 def read_legacy( 441 ↛ exitline 441 didn't return from function 'read_legacy' because

442 filename: str, 

443 *, 

444 plane_map: Mapping[str, MaskPlane] | None = None, 

445 instrument: str | None = None, 

446 visit: int | None = None, 

447 component: Literal["mask"], 

448 ) -> Mask: ... 

449 

450 @overload 

451 @staticmethod 

452 def read_legacy( 452 ↛ exitline 452 didn't return from function 'read_legacy' because

453 filename: str, 

454 *, 

455 preserve_quantization: bool = False, 

456 instrument: str | None = None, 

457 visit: int | None = None, 

458 component: Literal["variance"], 

459 ) -> Image: ... 

460 

461 @overload 

462 @staticmethod 

463 def read_legacy( 463 ↛ exitline 463 didn't return from function 'read_legacy' because

464 filename: str, 

465 *, 

466 instrument: str | None = None, 

467 visit: int | None = None, 

468 component: Literal["projection"], 

469 ) -> Projection[DetectorFrame]: ... 

470 

471 @overload 

472 @staticmethod 

473 def read_legacy( 473 ↛ exitline 473 didn't return from function 'read_legacy' because

474 filename: str, 

475 *, 

476 component: Literal["psf"], 

477 ) -> PointSpreadFunction: ... 

478 

479 @overload 

480 @staticmethod 

481 def read_legacy( 481 ↛ exitline 481 didn't return from function 'read_legacy' because

482 filename: str, 

483 *, 

484 component: Literal["obs_info"], 

485 ) -> ObservationInfo: ... 

486 

487 @overload 

488 @staticmethod 

489 def read_legacy( 489 ↛ exitline 489 didn't return from function 'read_legacy' because

490 filename: str, 

491 *, 

492 component: Literal["summary_stats"], 

493 ) -> ObservationSummaryStats: ... 

494 

495 @overload 

496 @staticmethod 

497 def read_legacy( 497 ↛ exitline 497 didn't return from function 'read_legacy' because

498 filename: str, 

499 *, 

500 preserve_quantization: bool = False, 

501 plane_map: Mapping[str, MaskPlane] | None = None, 

502 instrument: str | None = None, 

503 visit: int | None = None, 

504 component: None = None, 

505 ) -> VisitImage: ... 

506 

507 @staticmethod 

508 def read_legacy( # type: ignore[override] 

509 filename: str, 

510 *, 

511 preserve_quantization: bool = False, 

512 plane_map: Mapping[str, MaskPlane] | None = None, 

513 instrument: str | None = None, 

514 visit: int | None = None, 

515 component: Literal[ 

516 "bbox", "image", "mask", "variance", "projection", "psf", "obs_info", "summary_stats" 

517 ] 

518 | None = None, 

519 ) -> Any: 

520 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`. 

521 

522 Parameters 

523 ---------- 

524 filename 

525 Full name of the file. 

526 preserve_quantization 

527 If `True`, ensure that writing the masked image back out again will 

528 exactly preserve quantization-compressed pixel values. This causes 

529 the image and variance plane arrays to be marked as read-only and 

530 stores the original binary table data for those planes in memory. 

531 If the `MaskedImage` is copied, the precompressed pixel values are 

532 not transferred to the copy. 

533 plane_map 

534 A mapping from legacy mask plane name to the new plane name and 

535 description. If `None` (default) 

536 `get_legacy_visit_image_mask_planes` is used. 

537 instrument 

538 Name of the instrument. Read from the primary header if not 

539 provided. 

540 visit 

541 ID of the visit. Read from the primary header if not 

542 provided. 

543 component 

544 A component to read instead of the full image. 

545 """ 

546 from lsst.afw.image import ExposureFitsReader 

547 

548 reader = ExposureFitsReader(filename) 

549 if component == "bbox": 

550 return Box.from_legacy(reader.readBBox()) 

551 legacy_detector = reader.readDetector() 

552 if legacy_detector is None: 

553 raise ValueError(f"Exposure file {filename!r} does not have a Detector.") 

554 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

555 legacy_wcs = None 

556 if component in (None, "image", "mask", "variance", "projection"): 

557 legacy_wcs = reader.readWcs() 

558 if legacy_wcs is None: 

559 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.") 

560 legacy_exposure_info = reader.readExposureInfo() 

561 summary_stats = None 

562 if component in (None, "summary_stats"): 

563 legacy_stats = legacy_exposure_info.getSummaryStats() 

564 if legacy_stats is not None: 

565 summary_stats = ObservationSummaryStats.from_legacy(legacy_stats) 

566 if component == "summary_stats": 

567 return summary_stats 

568 if component in (None, "psf"): 

569 legacy_psf = reader.readPsf() 

570 if legacy_psf is None: 

571 raise ValueError(f"Exposure file {filename!r} does not have a Psf.") 

572 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

573 if component == "psf": 

574 return psf 

575 assert component in (None, "image", "mask", "variance", "projection", "obs_info"), ( 

576 component 

577 ) # for MyPy 

578 with astropy.io.fits.open(filename) as hdu_list: 

579 primary_header = hdu_list[0].header 

580 obs_info = _obs_info_from_md(primary_header) 

581 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter()) 

582 if component == "obs_info": 

583 return obs_info 

584 instrument = _extract_or_check_header( 

585 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str 

586 ) 

587 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int) 

588 projection = Projection.from_legacy( 

589 legacy_wcs, 

590 DetectorFrame( 

591 instrument=instrument, 

592 visit=visit, 

593 detector=legacy_detector.getId(), 

594 bbox=detector_bbox, 

595 ), 

596 ) 

597 if component == "projection": 

598 return projection 

599 if plane_map is None: 

600 plane_map = get_legacy_visit_image_mask_planes() 

601 assert component != "psf", component # for MyPy 

602 from_masked_image = MaskedImage._read_legacy_hdus( 

603 hdu_list, 

604 filename, 

605 preserve_quantization=preserve_quantization, 

606 plane_map=plane_map, 

607 component=component, 

608 ) 

609 if component is not None: 

610 # This is the image, mask, or variance; attach the projection 

611 # and return 

612 return from_masked_image.view(projection=projection) 

613 result = VisitImage( 

614 from_masked_image.image, 

615 mask=from_masked_image.mask, 

616 variance=from_masked_image.variance, 

617 projection=projection, 

618 psf=psf, 

619 obs_info=obs_info, 

620 summary_stats=summary_stats, 

621 ) 

622 result._opaque_metadata = from_masked_image._opaque_metadata 

623 return result 

624 

625 

626class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]): 

627 """A Pydantic model used to represent a serialized `VisitImage`.""" 

628 

629 # Inherited attributes are duplicated because that improves the docs 

630 # (some limitation in the sphinx/pydantic integration), and these are 

631 # important docs. 

632 

633 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

634 mask: MaskSerializationModel[P] = pydantic.Field( 

635 description="Bitmask that annotates the main image's pixels." 

636 ) 

637 variance: ImageSerializationModel[P] = pydantic.Field( 

638 description="Per-pixel variance estimates for the main image." 

639 ) 

640 projection: ProjectionSerializationModel[P] = pydantic.Field( 

641 description="Projection that maps the pixel grid to the sky.", 

642 ) 

643 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = ( 

644 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.") 

645 ) 

646 obs_info: ObservationInfo = pydantic.Field( 

647 description="Standardized description of visit metadata", 

648 ) 

649 summary_stats: ObservationSummaryStats | None = pydantic.Field( 

650 default=None, description="Optional summary statistics for the observation." 

651 ) 

652 

653 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError: 

654 """Finish deserializing the PSF model, or *return* any exception 

655 raised in the attempt. 

656 """ 

657 try: 

658 match self.psf: 

659 case PiffSerializationModel(): 

660 return PiffWrapper.deserialize(self.psf, archive) 

661 case PSFExSerializationModel(): 

662 return PSFExWrapper.deserialize(self.psf, archive) 

663 case GaussianPSFSerializationModel(): 

664 return GaussianPointSpreadFunction.deserialize(self.psf, archive) 

665 case _: 

666 raise ArchiveReadError("PSF model type not recognized.") 

667 except ArchiveReadError as err: 

668 return err 

669 

670 

671def _extract_or_check_value[T]( 

672 key: str, 

673 given_value: T | None, 

674 *sources: tuple[str, T | None], 

675) -> T: 

676 # Compare given value against multiple sources. If given value is not 

677 # supplied return the first non-None value in the reference sources. 

678 if given_value is not None: 

679 for source_name, source_value in sources: 

680 if source_value is not None and source_value != given_value: 

681 raise ValueError( 

682 f"Given value {given_value!r} does not match {source_value!r} from {source_name}." 

683 ) 

684 if source_value is not None: 

685 # Only check the first non-None source rather than checking 

686 # all supplied values. 

687 break 

688 return given_value 

689 

690 for _, source_value in sources: 

691 if source_value is not None: 

692 return source_value 

693 

694 raise ValueError(f"No value found for {key}.") 

695 

696 

697def _extract_or_check_header[T]( 

698 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T] 

699) -> T: 

700 hdr_value: T | None = None 

701 if (hdr_raw_value := header.get(key)) is not None: 

702 hdr_value = coerce(hdr_raw_value) 

703 return _extract_or_check_value( 

704 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value) 

705 )