Coverage for python / lsst / images / _visit_image.py: 25%

252 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:16 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("VisitImage", "VisitImageSerializationModel") 

15 

16import functools 

17import warnings 

18from collections.abc import Callable, Mapping, MutableMapping 

19from types import EllipsisType 

20from typing import Any, Literal, cast, overload 

21 

22import astropy.io.fits 

23import astropy.units 

24import astropy.wcs 

25import pydantic 

26from astro_metadata_translator import ObservationInfo, VisitInfoTranslator 

27 

28from ._geom import Box 

29from ._image import Image, ImageSerializationModel 

30from ._mask import Mask, MaskPlane, MaskSchema, MaskSerializationModel, get_legacy_visit_image_mask_planes 

31from ._masked_image import MaskedImage, MaskedImageSerializationModel 

32from ._transforms import DetectorFrame, Projection, ProjectionAstropyView, ProjectionSerializationModel 

33from .fits import FitsOpaqueMetadata 

34from .psfs import ( 

35 GaussianPointSpreadFunction, 

36 GaussianPSFSerializationModel, 

37 PiffSerializationModel, 

38 PiffWrapper, 

39 PointSpreadFunction, 

40 PSFExSerializationModel, 

41 PSFExWrapper, 

42) 

43from .serialization import ArchiveReadError, InputArchive, MetadataValue, OutputArchive 

44 

45 

46def _obs_info_from_md(md: MutableMapping[str, Any], visit_info: Any = None) -> ObservationInfo: 

47 # Try to get an ObservationInfo from the primary header as if 

48 # it's a raw header. Else fallback. 

49 try: 

50 obs_info = ObservationInfo.from_header(md, quiet=True) 

51 except ValueError: 

52 # Not known translator. Must fall back to visit info. If we have 

53 # an actual VisitInfo, serialize it since we know that it will be 

54 # complete. 

55 if visit_info is not None: 

56 from lsst.afw.image import setVisitInfoMetadata 

57 from lsst.daf.base import PropertyList 

58 

59 pl = PropertyList() 

60 setVisitInfoMetadata(pl, visit_info) 

61 # Merge so that we still have access to butler provenance. 

62 md.update(pl) 

63 

64 # Try the given header looking for VisitInfo hints. 

65 # We get lots of warnings if nothing can be found. Currently 

66 # no way to disable those without capturing them. 

67 obs_info = ObservationInfo.from_header(md, translator_class=VisitInfoTranslator, quiet=True) 

68 return obs_info 

69 

70 

71def _update_obs_info_from_legacy( 

72 obs_info: ObservationInfo, detector: Any = None, filter_label: Any = None 

73) -> ObservationInfo: 

74 extra_md: dict[str, str | int] = {} 

75 

76 if filter_label is not None and filter_label.hasBandLabel(): 

77 extra_md["physical_filter"] = filter_label.physicalLabel 

78 

79 # Fill in detector metadata, check for consistency. 

80 # ObsInfo detector name and group can not be derived from 

81 # the getName() information without knowing how the components 

82 # are separated. 

83 if detector is not None: 

84 detector_md = { 

85 "detector_num": detector.getId(), 

86 "detector_serial": detector.getSerial(), 

87 "detector_unique_name": detector.getName(), 

88 } 

89 extra_md.update(detector_md) 

90 

91 obs_info_updates: dict[str, str | int] = {} 

92 for k, v in extra_md.items(): 

93 current = getattr(obs_info, k) 

94 if current is None: 

95 obs_info_updates[k] = v 

96 continue 

97 if current != v: 

98 raise RuntimeError( 

99 f"ObservationInfo contains value for '{k}' that is inconsistent " 

100 f"with given legacy object: {v} != {current}" 

101 ) 

102 

103 if obs_info_updates: 

104 obs_info = obs_info.model_copy(update=obs_info_updates) 

105 return obs_info 

106 

107 

108class VisitImage(MaskedImage): 

109 """A calibrated single-visit image. 

110 

111 Parameters 

112 ---------- 

113 image 

114 The main image plane. If this has a `Projection`, it will be used 

115 for all planes unless a ``projection`` is passed separately. 

116 mask 

117 A bitmask image that annotates the main image plane. Must have the 

118 same bounding box as ``image`` if provided. Any attached projection 

119 is replaced (possibly by `None`). 

120 variance 

121 The per-pixel uncertainty of the main image as an image of variance 

122 values. Must have the same bounding box as ``image`` if provided, and 

123 its units must be the square of ``image.unit`` or `None`. 

124 Values default to ``1.0``. Any attached projection is replaced 

125 (possibly by `None`). 

126 mask_schema 

127 Schema for the mask plane. Must be provided if and only if ``mask`` is 

128 not provided. 

129 projection 

130 Projection that maps the pixel grid to the sky. Can only be `None` if 

131 a projection is already attached to ``image``. 

132 psf 

133 Point-spread function model for this image, or an exception explaining 

134 why it could not be read (to be raised if the PSF is requested later). 

135 obs_info 

136 General information about this visit in standardized form. 

137 metadata 

138 Arbitrary flexible metadata to associate with the image. 

139 """ 

140 

141 def __init__( 

142 self, 

143 image: Image, 

144 *, 

145 mask: Mask | None = None, 

146 variance: Image | None = None, 

147 mask_schema: MaskSchema | None = None, 

148 projection: Projection[DetectorFrame] | None = None, 

149 obs_info: ObservationInfo | None = None, 

150 psf: PointSpreadFunction | ArchiveReadError, 

151 metadata: dict[str, MetadataValue] | None = None, 

152 ): 

153 super().__init__( 

154 image, 

155 mask=mask, 

156 variance=variance, 

157 mask_schema=mask_schema, 

158 projection=projection, 

159 obs_info=obs_info, 

160 metadata=metadata, 

161 ) 

162 if self.image.unit is None: 

163 raise TypeError("The image component of a VisitImage must have units.") 

164 if self.image.projection is None: 

165 raise TypeError("The projection component of a VisitImage cannot be None.") 

166 if self.image.obs_info is None: 

167 raise TypeError("The observation info component of a VisitImage cannot be None.") 

168 if not isinstance(self.image.projection.pixel_frame, DetectorFrame): 

169 raise TypeError("The projection's pixel frame must be a DetectorFrame for VisitImage.") 

170 self._psf = psf 

171 

172 @property 

173 def unit(self) -> astropy.units.UnitBase: 

174 """The units of the image plane (`astropy.units.Unit`).""" 

175 return cast(astropy.units.UnitBase, super().unit) 

176 

177 @property 

178 def projection(self) -> Projection[DetectorFrame]: 

179 """The projection that maps the pixel grid to the sky 

180 (`Projection` [`DetectorFrame`]). 

181 """ 

182 return cast(Projection[DetectorFrame], super().projection) 

183 

184 @property 

185 def obs_info(self) -> ObservationInfo: 

186 """General information about this observation in standard form. 

187 (`~astro_metadata_translator.ObservationInfo`). 

188 """ 

189 obs_info = self.image.obs_info 

190 assert obs_info is not None 

191 return obs_info 

192 

193 @property 

194 def astropy_wcs(self) -> ProjectionAstropyView: 

195 """An Astropy WCS for the pixel arrays (`ProjectionAstropyView`). 

196 

197 Notes 

198 ----- 

199 As expected for Astropy WCS objects, this defines pixel coordinates 

200 such that the first row and column in the arrays are ``(0, 0)``, not 

201 ``bbox.start``, as is the case for `projection`. 

202 

203 This object satisfies the `astropy.wcs.wcsapi.BaseHighLevelWCS` and 

204 `astropy.wcs.wcsapi.BaseLowLevelWCS` interfaces, but it is not an 

205 `astropy.wcs.WCS` (use `fits_wcs` for that). 

206 """ 

207 return cast(ProjectionAstropyView, super().astropy_wcs) 

208 

209 @property 

210 def psf(self) -> PointSpreadFunction: 

211 """The point-spread function model for this image 

212 (`.psfs.PointSpreadFunction`). 

213 """ 

214 if isinstance(self._psf, ArchiveReadError): 

215 raise self._psf 

216 return self._psf 

217 

218 def __getitem__(self, bbox: Box | EllipsisType) -> VisitImage: 

219 if bbox is ...: 

220 return self 

221 super().__getitem__(bbox) 

222 return self._transfer_metadata( 

223 VisitImage( 

224 self.image[bbox], 

225 mask=self.mask[bbox], 

226 variance=self.variance[bbox], 

227 projection=self.projection, 

228 psf=self.psf, 

229 obs_info=self.obs_info, 

230 ), 

231 bbox=bbox, 

232 ) 

233 

234 def __str__(self) -> str: 

235 return f"VisitImage({self.image!s}, {list(self.mask.schema.names)})" 

236 

237 def __repr__(self) -> str: 

238 return f"VisitImage({self.image!r}, mask_schema={self.mask.schema!r})" 

239 

240 def copy(self) -> VisitImage: 

241 """Deep-copy the visit image.""" 

242 return self._transfer_metadata( 

243 VisitImage( 

244 image=self._image.copy(), 

245 mask=self._mask.copy(), 

246 variance=self._variance.copy(), 

247 psf=self._psf, 

248 obs_info=self.obs_info, 

249 ), 

250 copy=True, 

251 ) 

252 

253 def serialize(self, archive: OutputArchive[Any]) -> VisitImageSerializationModel: 

254 serialized_image = archive.serialize_direct( 

255 "image", functools.partial(self.image.serialize, save_projection=False) 

256 ) 

257 serialized_mask = archive.serialize_direct( 

258 "mask", functools.partial(self.mask.serialize, save_projection=False) 

259 ) 

260 serialized_variance = archive.serialize_direct( 

261 "variance", functools.partial(self.variance.serialize, save_projection=False) 

262 ) 

263 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

264 serialized_psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel 

265 match self._psf: 

266 # MyPy is able to figure things out here with this match statement, 

267 # but not a single isinstance check on both types. 

268 case PiffWrapper(): 

269 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

270 case PSFExWrapper(): 

271 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

272 case GaussianPointSpreadFunction(): 

273 serialized_psf = archive.serialize_direct("psf", self._psf.serialize) 

274 case _: 

275 raise TypeError( 

276 f"Cannot serialize VisitImage with unrecognized PSF type {type(self._psf).__name__}." 

277 ) 

278 return VisitImageSerializationModel( 

279 image=serialized_image, 

280 mask=serialized_mask, 

281 variance=serialized_variance, 

282 projection=serialized_projection, 

283 psf=serialized_psf, 

284 obs_info=self.obs_info, 

285 metadata=self.metadata, 

286 ) 

287 

288 # Type-checkers want the model argument to only require 

289 # MaskedImageSerializationModel[Any], and they'd be absolutely right if 

290 # this were a regular instance method. But whether Liskov substitution 

291 # applies to classmethods and staticmethods is sort of context-dependent, 

292 # and here we do not want it to. 

293 @staticmethod 

294 def deserialize( 

295 model: VisitImageSerializationModel[Any], # type: ignore[override] 

296 archive: InputArchive[Any], 

297 *, 

298 bbox: Box | None = None, 

299 ) -> VisitImage: 

300 masked_image = MaskedImage.deserialize(model, archive, bbox=bbox) 

301 psf = model.deserialize_psf(archive) 

302 projection = Projection.deserialize(model.projection, archive) 

303 return VisitImage( 

304 masked_image.image, 

305 mask=masked_image.mask, 

306 variance=masked_image.variance, 

307 psf=psf, 

308 projection=projection, 

309 obs_info=model.obs_info, 

310 )._finish_deserialize(model) 

311 

312 @staticmethod 

313 def _get_archive_tree_type[P: pydantic.BaseModel]( 

314 pointer_type: type[P], 

315 ) -> type[VisitImageSerializationModel[P]]: 

316 """Return the serialization model type for this object for an archive 

317 type that uses the given pointer type. 

318 """ 

319 return VisitImageSerializationModel[pointer_type] # type: ignore 

320 

321 # write_fits and read_fits inherited from MaskedImage. 

322 

323 @staticmethod 

324 def from_legacy( 

325 legacy: Any, 

326 *, 

327 unit: astropy.units.Unit | None = None, 

328 plane_map: Mapping[str, MaskPlane] | None = None, 

329 instrument: str | None = None, 

330 visit: int | None = None, 

331 ) -> VisitImage: 

332 """Convert from an `lsst.afw.image.Exposure` instance. 

333 

334 Parameters 

335 ---------- 

336 legacy 

337 An `lsst.afw.image.Exposure` instance that will share image and 

338 variance (but not mask) pixel data with the returned object. 

339 unit 

340 Units of the image. If not provided, the ``BUNIT`` metadata 

341 key will be used, if available. 

342 plane_map 

343 A mapping from legacy mask plane name to the new plane name and 

344 description. If `None` (default) 

345 `get_legacy_visit_image_mask_planes` is used. 

346 instrument 

347 Name of the instrument. Extracted from the metadata if not 

348 provided. 

349 visit 

350 ID of the visit. Extracted from the metadata if not provided. 

351 """ 

352 if plane_map is None: 

353 plane_map = get_legacy_visit_image_mask_planes() 

354 md = legacy.getMetadata() 

355 obs_info = _obs_info_from_md(md, visit_info=legacy.info.getVisitInfo()) 

356 instrument = _extract_or_check_header( 

357 "LSST BUTLER DATAID INSTRUMENT", instrument, md, obs_info.instrument, str 

358 ) 

359 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, md, None, int) 

360 unit = _extract_or_check_header( 

361 "BUNIT", unit, md, None, lambda x: astropy.units.Unit(x, format="fits") 

362 ) 

363 legacy_wcs = legacy.getWcs() 

364 if legacy_wcs is None: 

365 raise ValueError("Exposure does not have a SkyWcs.") 

366 legacy_detector = legacy.getDetector() 

367 if legacy_detector is None: 

368 raise ValueError("Exposure does not have a Detector.") 

369 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

370 

371 # Update the ObservationInfo from other components. 

372 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, legacy.info.getFilter()) 

373 

374 opaque_fits_metadata = FitsOpaqueMetadata() 

375 primary_header = astropy.io.fits.Header() 

376 with warnings.catch_warnings(): 

377 # Silence warnings about long keys becoming HIERARCH. 

378 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

379 primary_header.update(md.toOrderedDict()) 

380 opaque_fits_metadata.extract_legacy_primary_header(primary_header) 

381 projection = Projection.from_legacy( 

382 legacy_wcs, 

383 DetectorFrame( 

384 instrument=instrument, 

385 visit=visit, 

386 detector=legacy_detector.getId(), 

387 bbox=detector_bbox, 

388 ), 

389 ) 

390 legacy_psf = legacy.getPsf() 

391 if legacy_psf is None: 

392 raise ValueError("Exposure file does not have a Psf.") 

393 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

394 masked_image = MaskedImage.from_legacy(legacy.getMaskedImage(), unit=unit, plane_map=plane_map) 

395 result = VisitImage( 

396 image=masked_image.image.view(unit=unit), 

397 mask=masked_image.mask, 

398 variance=masked_image.variance, 

399 projection=projection, 

400 psf=psf, 

401 obs_info=obs_info, 

402 ) 

403 

404 result._opaque_metadata = opaque_fits_metadata 

405 return result 

406 

407 @overload # type: ignore[override] 

408 @staticmethod 

409 def read_legacy( 409 ↛ exitline 409 didn't return from function 'read_legacy' because

410 filename: str, 

411 *, 

412 component: Literal["bbox"], 

413 ) -> Box: ... 

414 

415 @overload 

416 @staticmethod 

417 def read_legacy( 417 ↛ exitline 417 didn't return from function 'read_legacy' because

418 filename: str, 

419 *, 

420 preserve_quantization: bool = False, 

421 instrument: str | None = None, 

422 visit: int | None = None, 

423 component: Literal["image"], 

424 ) -> Image: ... 

425 

426 @overload 

427 @staticmethod 

428 def read_legacy( 428 ↛ exitline 428 didn't return from function 'read_legacy' because

429 filename: str, 

430 *, 

431 plane_map: Mapping[str, MaskPlane] | None = None, 

432 instrument: str | None = None, 

433 visit: int | None = None, 

434 component: Literal["mask"], 

435 ) -> Mask: ... 

436 

437 @overload 

438 @staticmethod 

439 def read_legacy( 439 ↛ exitline 439 didn't return from function 'read_legacy' because

440 filename: str, 

441 *, 

442 preserve_quantization: bool = False, 

443 instrument: str | None = None, 

444 visit: int | None = None, 

445 component: Literal["variance"], 

446 ) -> Image: ... 

447 

448 @overload 

449 @staticmethod 

450 def read_legacy( 450 ↛ exitline 450 didn't return from function 'read_legacy' because

451 filename: str, 

452 *, 

453 instrument: str | None = None, 

454 visit: int | None = None, 

455 component: Literal["projection"], 

456 ) -> Projection[DetectorFrame]: ... 

457 

458 @overload 

459 @staticmethod 

460 def read_legacy( 460 ↛ exitline 460 didn't return from function 'read_legacy' because

461 filename: str, 

462 *, 

463 component: Literal["psf"], 

464 ) -> PointSpreadFunction: ... 

465 

466 @overload 

467 @staticmethod 

468 def read_legacy( 468 ↛ exitline 468 didn't return from function 'read_legacy' because

469 filename: str, 

470 *, 

471 component: Literal["obs_info"], 

472 ) -> ObservationInfo: ... 

473 

474 @overload 

475 @staticmethod 

476 def read_legacy( 476 ↛ exitline 476 didn't return from function 'read_legacy' because

477 filename: str, 

478 *, 

479 preserve_quantization: bool = False, 

480 plane_map: Mapping[str, MaskPlane] | None = None, 

481 instrument: str | None = None, 

482 visit: int | None = None, 

483 component: None = None, 

484 ) -> VisitImage: ... 

485 

486 @staticmethod 

487 def read_legacy( # type: ignore[override] 

488 filename: str, 

489 *, 

490 preserve_quantization: bool = False, 

491 plane_map: Mapping[str, MaskPlane] | None = None, 

492 instrument: str | None = None, 

493 visit: int | None = None, 

494 component: Literal["bbox", "image", "mask", "variance", "projection", "psf", "obs_info"] 

495 | None = None, 

496 ) -> Any: 

497 """Read a FITS file written by `lsst.afw.image.Exposure.writeFits`. 

498 

499 Parameters 

500 ---------- 

501 filename 

502 Full name of the file. 

503 preserve_quantization 

504 If `True`, ensure that writing the masked image back out again will 

505 exactly preserve quantization-compressed pixel values. This causes 

506 the image and variance plane arrays to be marked as read-only and 

507 stores the original binary table data for those planes in memory. 

508 If the `MaskedImage` is copied, the precompressed pixel values are 

509 not transferred to the copy. 

510 plane_map 

511 A mapping from legacy mask plane name to the new plane name and 

512 description. If `None` (default) 

513 `get_legacy_visit_image_mask_planes` is used. 

514 instrument 

515 Name of the instrument. Read from the primary header if not 

516 provided. 

517 visit 

518 ID of the visit. Read from the primary header if not 

519 provided. 

520 component 

521 A component to read instead of the full image. 

522 """ 

523 from lsst.afw.image import ExposureFitsReader 

524 

525 reader = ExposureFitsReader(filename) 

526 if component == "bbox": 

527 return Box.from_legacy(reader.readBBox()) 

528 legacy_detector = reader.readDetector() 

529 if legacy_detector is None: 

530 raise ValueError(f"Exposure file {filename!r} does not have a Detector.") 

531 detector_bbox = Box.from_legacy(legacy_detector.getBBox()) 

532 legacy_wcs = None 

533 if component in (None, "image", "mask", "variance", "projection"): 

534 legacy_wcs = reader.readWcs() 

535 if legacy_wcs is None: 

536 raise ValueError(f"Exposure file {filename!r} does not have a SkyWcs.") 

537 if component in (None, "psf"): 

538 legacy_psf = reader.readPsf() 

539 if legacy_psf is None: 

540 raise ValueError(f"Exposure file {filename!r} does not have a Psf.") 

541 psf = PointSpreadFunction.from_legacy(legacy_psf, bounds=detector_bbox) 

542 if component == "psf": 

543 return psf 

544 assert component in (None, "image", "mask", "variance", "projection", "obs_info"), ( 

545 component 

546 ) # for MyPy 

547 with astropy.io.fits.open(filename) as hdu_list: 

548 primary_header = hdu_list[0].header 

549 obs_info = _obs_info_from_md(primary_header) 

550 obs_info = _update_obs_info_from_legacy(obs_info, legacy_detector, reader.readFilter()) 

551 if component == "obs_info": 

552 return obs_info 

553 instrument = _extract_or_check_header( 

554 "LSST BUTLER DATAID INSTRUMENT", instrument, primary_header, obs_info.instrument, str 

555 ) 

556 visit = _extract_or_check_header("LSST BUTLER DATAID VISIT", visit, primary_header, None, int) 

557 projection = Projection.from_legacy( 

558 legacy_wcs, 

559 DetectorFrame( 

560 instrument=instrument, 

561 visit=visit, 

562 detector=legacy_detector.getId(), 

563 bbox=detector_bbox, 

564 ), 

565 ) 

566 if component == "projection": 

567 return projection 

568 if plane_map is None: 

569 plane_map = get_legacy_visit_image_mask_planes() 

570 assert component != "psf", component # for MyPy 

571 from_masked_image = MaskedImage._read_legacy_hdus( 

572 hdu_list, 

573 filename, 

574 preserve_quantization=preserve_quantization, 

575 plane_map=plane_map, 

576 component=component, 

577 ) 

578 if component is not None: 

579 # This is the image, mask, or variance; attach the projection 

580 # and return 

581 return from_masked_image.view(projection=projection) 

582 result = VisitImage( 

583 from_masked_image.image, 

584 mask=from_masked_image.mask, 

585 variance=from_masked_image.variance, 

586 projection=projection, 

587 psf=psf, 

588 obs_info=obs_info, 

589 ) 

590 result._opaque_metadata = from_masked_image._opaque_metadata 

591 return result 

592 

593 

594class VisitImageSerializationModel[P: pydantic.BaseModel](MaskedImageSerializationModel[P]): 

595 """A Pydantic model used to represent a serialized `VisitImage`.""" 

596 

597 # Inherited attributes are duplicated because that improves the docs 

598 # (some limitation in the sphinx/pydantic integration), and these are 

599 # important docs. 

600 

601 image: ImageSerializationModel[P] = pydantic.Field(description="The main data image.") 

602 mask: MaskSerializationModel[P] = pydantic.Field( 

603 description="Bitmask that annotates the main image's pixels." 

604 ) 

605 variance: ImageSerializationModel[P] = pydantic.Field( 

606 description="Per-pixel variance estimates for the main image." 

607 ) 

608 projection: ProjectionSerializationModel[P] = pydantic.Field( 

609 description="Projection that maps the pixel grid to the sky.", 

610 ) 

611 psf: PiffSerializationModel | PSFExSerializationModel | GaussianPSFSerializationModel | Any = ( 

612 pydantic.Field(union_mode="left_to_right", description="PSF model for the image.") 

613 ) 

614 obs_info: ObservationInfo = pydantic.Field( 

615 description="Standardized description of visit metadata", 

616 ) 

617 

618 def deserialize_psf(self, archive: InputArchive[Any]) -> PointSpreadFunction | ArchiveReadError: 

619 """Finish deserializing the PSF model, or *return* any exception 

620 raised in the attempt. 

621 """ 

622 try: 

623 match self.psf: 

624 case PiffSerializationModel(): 

625 return PiffWrapper.deserialize(self.psf, archive) 

626 case PSFExSerializationModel(): 

627 return PSFExWrapper.deserialize(self.psf, archive) 

628 case GaussianPSFSerializationModel(): 

629 return GaussianPointSpreadFunction.deserialize(self.psf, archive) 

630 case _: 

631 raise ArchiveReadError("PSF model type not recognized.") 

632 except ArchiveReadError as err: 

633 return err 

634 

635 

636def _extract_or_check_value[T]( 

637 key: str, 

638 given_value: T | None, 

639 *sources: tuple[str, T | None], 

640) -> T: 

641 # Compare given value against multiple sources. If given value is not 

642 # supplied return the first non-None value in the reference sources. 

643 if given_value is not None: 

644 for source_name, source_value in sources: 

645 if source_value is not None and source_value != given_value: 

646 raise ValueError( 

647 f"Given value {given_value!r} does not match {source_value!r} from {source_name}." 

648 ) 

649 if source_value is not None: 

650 # Only check the first non-None source rather than checking 

651 # all supplied values. 

652 break 

653 return given_value 

654 

655 for _, source_value in sources: 

656 if source_value is not None: 

657 return source_value 

658 

659 raise ValueError(f"No value found for {key}.") 

660 

661 

662def _extract_or_check_header[T]( 

663 key: str, given_value: T | None, header: Any, obs_info_value: T | None, coerce: Callable[[Any], T] 

664) -> T: 

665 hdr_value: T | None = None 

666 if (hdr_raw_value := header.get(key)) is not None: 

667 hdr_value = coerce(hdr_raw_value) 

668 return _extract_or_check_value( 

669 key, given_value, ("ObservationInfo", obs_info_value), (f"header key {key}", hdr_value) 

670 )