Coverage for python / lsst / images / psfs / _piff.py: 37%

177 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:16 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14from lsst.images.serialization import ArchiveReadError 

15 

16__all__ = ("PiffSerializationModel", "PiffWrapper") 

17 

18import operator 

19from collections.abc import Iterator 

20from contextlib import contextmanager 

21from functools import cached_property 

22from logging import getLogger 

23from typing import TYPE_CHECKING, Annotated, Any, Literal 

24 

25import astropy.io.fits 

26import numpy as np 

27import pydantic 

28 

29from .. import serialization 

30from .._concrete_bounds import SerializableBounds 

31from .._geom import Bounds, Box 

32from .._image import Image 

33from ..utils import round_half_up 

34from ._base import PointSpreadFunction 

35 

36if TYPE_CHECKING: 

37 import galsim.wcs 

38 import piff.config 

39 

40_LOG = getLogger(__name__) 

41 

42 

43class PiffWrapper(PointSpreadFunction): 

44 """A PSF model backed by the Piff library. 

45 

46 Parameters 

47 ---------- 

48 impl 

49 The Piff PSF object to wrap. 

50 bounds 

51 The pixel-coordinate region where the model can safely be evaluated. 

52 """ 

53 

54 def __init__(self, impl: piff.PSF, bounds: Bounds, stamp_size: int): 

55 self._impl = impl 

56 self._bounds = bounds 

57 self._stamp_size = stamp_size 

58 

59 @property 

60 def bounds(self) -> Bounds: 

61 return self._bounds 

62 

63 @cached_property 

64 def kernel_bbox(self) -> Box: 

65 r = self._stamp_size // 2 

66 return Box.factory[-r : r + 1, -r : r + 1] 

67 

68 def compute_kernel_image(self, *, x: float, y: float) -> Image: 

69 if "colorValue" in self._impl.interp_property_names: 

70 raise NotImplementedError("Chromatic PSFs are not yet supported.") 

71 gs_image = self._impl.draw(x, y, stamp_size=self._stamp_size, center=True) 

72 r = self._stamp_size // 2 

73 result = Image(gs_image.array.copy(), start=(-r, -r)) 

74 result.array /= np.sum(result.array) 

75 return result 

76 

77 def compute_stellar_image(self, *, x: float, y: float) -> Image: 

78 if "colorValue" in self._impl.interp_property_names: 

79 raise NotImplementedError("Chromatic PSFs are not yet supported.") 

80 gs_image = self._impl.draw(x, y, stamp_size=self._stamp_size, center=None) 

81 r = self._stamp_size // 2 

82 result = Image(gs_image.array.copy(), start=(round_half_up(y) - r, round_half_up(x) - r)) 

83 result.array /= np.sum(result.array) 

84 return result 

85 

86 def compute_stellar_bbox(self, *, x: float, y: float) -> Box: 

87 r = self._stamp_size // 2 

88 xi = round_half_up(x) 

89 yi = round_half_up(y) 

90 return Box.factory[yi - r : yi + r + 1, xi - r : xi + r + 1] 

91 

92 @property 

93 def piff_psf(self) -> Any: 

94 """The backing `piff.PSF` object. 

95 

96 This is an internal object that must not be modified in place. 

97 """ 

98 return self._impl 

99 

100 @classmethod 

101 def from_legacy(cls, legacy_psf: Any, bounds: Bounds) -> PiffWrapper: 

102 return cls(impl=legacy_psf._piffResult, bounds=bounds, stamp_size=int(legacy_psf.width)) 

103 

104 def serialize(self, archive: serialization.OutputArchive[Any]) -> PiffSerializationModel: 

105 """Serialize the PSF to an archive. 

106 

107 This method is intended to be usable as the callback function passed to 

108 `.serialization.OutputArchive.serialize_direct` or 

109 `.serialization.OutputArchive.serialize_pointer`. 

110 """ 

111 from piff.config import PiffLogger 

112 

113 writer = _ArchivePiffWriter() 

114 with self._without_stars(): 

115 self._impl._write(writer, "piff", PiffLogger(_LOG)) 

116 piff_model = writer.serialize(archive) 

117 return PiffSerializationModel( 

118 piff=piff_model, 

119 stamp_size=self._stamp_size, 

120 bounds=self._bounds.serialize(), 

121 ) 

122 

123 @classmethod 

124 def deserialize( 

125 cls, model: PiffSerializationModel, archive: serialization.InputArchive[Any] 

126 ) -> PiffWrapper: 

127 """Deserialize the PSF from an archive. 

128 

129 This method is intended to be usable as the callback function passed to 

130 `.serialization.InputArchive.deserialize_pointer`. 

131 """ 

132 try: 

133 from piff import PSF 

134 from piff.config import PiffLogger 

135 except ImportError: 

136 raise ArchiveReadError("Failed to import piff.") from None 

137 

138 reader = _ArchivePiffReader(model.piff, archive) 

139 impl = PSF._read(reader, "piff", PiffLogger(_LOG)) 

140 return cls(impl, bounds=Bounds.deserialize(model.bounds), stamp_size=model.stamp_size) 

141 

142 @staticmethod 

143 def _get_archive_tree_type( 

144 pointer_type: type[pydantic.BaseModel], 

145 ) -> type[PiffSerializationModel]: 

146 """Return the serialization model type for this object for an archive 

147 type that uses the given pointer type. 

148 """ 

149 return PiffSerializationModel 

150 

151 @contextmanager 

152 def _without_stars(self) -> Iterator[None]: 

153 """Temporarily drop the embedded list of stars used to fit the PSF. 

154 

155 Notes 

156 ----- 

157 By default Piff saves the list of stars (including postage stamps) used 

158 to fit the PSF, which makes the serialized form much larger. But the 

159 upstream Piff serialization code recognizes the case where that 

160 ``stars`` attribute has been deleted and serializes everything else. 

161 

162 Unfortunately, to date, Rubin's pickle-based Piff serialization instead 

163 just deletes the postage stamp image attributes from inside the Piff 

164 ``stars`` list, which is not a state the Piff serialization code 

165 handles gracefully. So for now we have to drop the full stars list 

166 during serialization if it is present. 

167 """ 

168 if hasattr(self._impl, "stars"): 

169 stars = self._impl.stars 

170 try: 

171 del self._impl.stars 

172 yield 

173 finally: 

174 self._impl.stars = stars 

175 else: 

176 yield 

177 

178 

179# Conventions on public visibility of the serialization types: 

180# 

181# - We lift and document the outermost Pydantic model type, since that needs to 

182# be included directly in the Pydantic models of types that hold a PSF. This 

183# type needs to be very clearly documented and named as a *serialization* 

184# model, since there are many other kinds of models in play in this package. 

185# 

186# - We do not lift or document types used in that outermost model, but we do 

187# not give them leading underscores, since they aren't really private. 

188# 

189# - Other utility types do get leading underscores. 

190 

191 

192# Piff serialization uses a lot of dictionaries and lists restricted to these 

193# basic types. 

194type PiffScalar = int | float | str | bool | None 

195type PiffDict = dict[str, PiffScalar | list[PiffScalar]] 

196 

197 

198class GalSimPixelScaleModel(pydantic.BaseModel, ser_json_inf_nan="constants"): 

199 """Model used to serialize `galsim.wcs.PixelScale` instances.""" 

200 

201 scale: float 

202 wcs_type: Literal["pixel_scale"] = "pixel_scale" 

203 

204 

205# We expect this discriminated union to grow to include other trivial 

206# pixel-to-pixel transforms that get embedded in PSFs. If we someday have to 

207# store Piff objects that embed more sophisticated PSFs, we'll hook them into 

208# the AST-based coordinate transform system instead, but as long as we're just 

209# talking about simple offsets and scalings, that's a lot of extra complexity 

210# for very little gain. 

211type GalSimLocalWcsModel = Annotated[GalSimPixelScaleModel, pydantic.Field(discriminator="wcs_type")] 

212 

213 

214class PiffTableModel(pydantic.BaseModel, ser_json_inf_nan="constants"): 

215 """Serialization model used to embed a reference to a binary-data table in 

216 a Piff serialization's JSON-like data. 

217 """ 

218 

219 metadata: PiffDict 

220 table: serialization.TableReferenceModel 

221 

222 

223class PiffObjectModel(pydantic.BaseModel, ser_json_inf_nan="constants"): 

224 """General-purpose serialization model used for various Piff objects.""" 

225 

226 structs: dict[str, PiffDict] = pydantic.Field(default_factory=dict, exclude_if=operator.not_) 

227 tables: dict[str, PiffTableModel] = pydantic.Field(default_factory=dict, exclude_if=operator.not_) 

228 wcs: dict[str, GalSimLocalWcsModel] = pydantic.Field(default_factory=dict, exclude_if=operator.not_) 

229 objects: dict[str, PiffObjectModel] = pydantic.Field(default_factory=dict, exclude_if=operator.not_) 

230 

231 

232class PiffSerializationModel(serialization.ArchiveTree): 

233 """Serialization model for a Piff PSF.""" 

234 

235 piff: PiffObjectModel = pydantic.Field(description="The Piff PSF object itself.") 

236 

237 stamp_size: int = pydantic.Field( 

238 description="Width of the (square) images returned by this PSF's methods." 

239 ) 

240 

241 bounds: SerializableBounds = pydantic.Field( 

242 description="The bounds object that represents the PSF's validity region." 

243 ) 

244 

245 

246class _ArchivePiffWriter: 

247 """An adapter from the Piff serialization interface to the 

248 `.serialization.OutputArchive` class. 

249 

250 Notes 

251 ----- 

252 Piff has its own simple serialization framework (contributed upstream by 

253 Rubin DM) that maps everything to dictionaries, structured numpy arrays, 

254 and a library of GalSim WCS objects, with the native implementation writing 

255 standalone FITS files. That mostly maps nicely to the `lsst.images` 

256 archive system, but we don't get to leverage any Pydantic validation or 

257 JSON schema functionality since we only get opaque dictionaries from Piff. 

258 

259 See `piff.FitsWriter` for most method documentation; this class is designed 

260 to mimic it exactly (the Piff authors prefer to just use duck-typing rather 

261 than ABCs or protocols for interface definition). 

262 """ 

263 

264 def __init__(self, base_name: str = ""): 

265 self._base_name = base_name 

266 self.structs: dict[str, PiffDict] = {} 

267 self.tables: dict[str, tuple[np.ndarray, PiffDict]] = {} 

268 self.wcs_models: dict[str, GalSimLocalWcsModel] = {} 

269 self.writers: dict[str, _ArchivePiffWriter] = {} 

270 

271 def write_struct(self, name: str, struct: PiffDict) -> None: 

272 self.structs[name] = {k: self._to_builtin(v) for k, v in struct.items()} 

273 

274 def write_table(self, name: str, array: np.ndarray, metadata: PiffDict | None = None) -> None: 

275 self.tables[name] = (array, metadata or {}) 

276 

277 def write_wcs_map( 

278 self, name: str, wcs_map: dict[int, galsim.wcs.BaseWCS], pointing: galsim.CelestialCoord | None 

279 ) -> None: 

280 import galsim.wcs 

281 

282 match wcs_map: 

283 case {0: galsim.wcs.PixelScale() as wcs} if pointing is None: 

284 self.wcs_models[name] = GalSimPixelScaleModel(scale=wcs.scale) 

285 case _: 

286 raise NotImplementedError("PSFs with complex embedded WCSs are not supported.") 

287 

288 @contextmanager 

289 def nested(self, name: str) -> Iterator[_ArchivePiffWriter]: 

290 nested = _ArchivePiffWriter(self.get_full_name(name)) 

291 yield nested 

292 self.writers[name] = nested 

293 

294 def get_full_name(self, name: str) -> str: 

295 return f"{self._base_name}/{name}" 

296 

297 def serialize(self, archive: serialization.OutputArchive[Any]) -> PiffObjectModel: 

298 """Serialize to an archive. 

299 

300 This method is intended to be used as the callable passed to 

301 `.serialization.OutputArchive.serialize_direct` and 

302 `.serialization.OutputArchive.serialize_pointer`, after first passing 

303 this writer to a Piff object's ``write`` or ``_write`` method. 

304 """ 

305 model = PiffObjectModel() 

306 for name, struct in self.structs.items(): 

307 model.structs[name] = struct 

308 for name, (array, metadata) in self.tables.items(): 

309 model.tables[name] = PiffTableModel( 

310 metadata=metadata, 

311 table=archive.add_structured_array( 

312 array, name=name, update_header=lambda header: header.update(metadata) 

313 ), 

314 ) 

315 for name, wcs_model in self.wcs_models.items(): 

316 model.wcs[name] = wcs_model 

317 for name, writer in self.writers.items(): 

318 model.objects[name] = archive.serialize_direct(name, writer.serialize) 

319 return model 

320 

321 @staticmethod 

322 def _to_builtin(val: Any) -> PiffScalar: 

323 match val: 

324 case np.integer(): 

325 return int(val) 

326 case np.floating(): 

327 return float(val) 

328 case np.str_(): 

329 return str(val) 

330 return val 

331 

332 

333class _ArchivePiffReader: 

334 """An adapter from the Piff serialization interface to the 

335 `.serialization.InputArchive` class. 

336 

337 See `ArchivePiffWriter` for additional notes. 

338 """ 

339 

340 def __init__( 

341 self, object_model: PiffObjectModel, archive: serialization.InputArchive[Any], base_name: str = "" 

342 ): 

343 self._model = object_model 

344 self._archive = archive 

345 self._base_name = base_name 

346 

347 def read_struct(self, name: str) -> PiffDict | None: 

348 return self._model.structs.get(name) 

349 

350 def read_table(self, name: str, metadata: PiffDict | None = None) -> np.ndarray | None: 

351 table_model = self._model.tables.get(name) 

352 if table_model is None: 

353 return None 

354 if metadata is not None: 

355 metadata.update(table_model.metadata) 

356 return self._archive.get_structured_array( 

357 table_model.table, strip_header=astropy.io.fits.Header.clear 

358 ) 

359 

360 def read_wcs_map( 

361 self, name: str, logger: piff.config.LoggerWrapper 

362 ) -> tuple[dict[int, galsim.wcs.BaseWCS] | None, galsim.CelestialCoord | None]: 

363 import galsim.wcs 

364 

365 match self._model.wcs.get(name): 

366 case GalSimPixelScaleModel(scale=scale): 

367 return {0: galsim.wcs.PixelScale(scale)}, None 

368 case None: 

369 return None, None 

370 case unexpected: 

371 raise serialization.ArchiveReadError( 

372 f"{self.get_full_name(name)} should be a WCS or WCS map, not {unexpected!r}." 

373 ) 

374 

375 @contextmanager 

376 def nested(self, name: str) -> Iterator[_ArchivePiffReader]: 

377 nested_model = self._model.objects[name] 

378 yield _ArchivePiffReader(nested_model, self._archive, self.get_full_name(name)) 

379 

380 def get_full_name(self, name: str) -> str: 

381 return f"{self._base_name}/{name}"