Coverage for python / lsst / images / psfs / _piff.py: 37%
177 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:35 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14from lsst.images.serialization import ArchiveReadError
16__all__ = ("PiffSerializationModel", "PiffWrapper")
18import operator
19from collections.abc import Iterator
20from contextlib import contextmanager
21from functools import cached_property
22from logging import getLogger
23from typing import TYPE_CHECKING, Annotated, Any, Literal
25import astropy.io.fits
26import numpy as np
27import pydantic
29from .. import serialization
30from .._concrete_bounds import SerializableBounds
31from .._geom import Bounds, Box
32from .._image import Image
33from ..utils import round_half_up
34from ._base import PointSpreadFunction
36if TYPE_CHECKING:
37 import galsim.wcs
38 import piff.config
40_LOG = getLogger(__name__)
43class PiffWrapper(PointSpreadFunction):
44 """A PSF model backed by the Piff library.
46 Parameters
47 ----------
48 impl
49 The Piff PSF object to wrap.
50 bounds
51 The pixel-coordinate region where the model can safely be evaluated.
52 """
54 def __init__(self, impl: piff.PSF, bounds: Bounds, stamp_size: int):
55 self._impl = impl
56 self._bounds = bounds
57 self._stamp_size = stamp_size
59 @property
60 def bounds(self) -> Bounds:
61 return self._bounds
63 @cached_property
64 def kernel_bbox(self) -> Box:
65 r = self._stamp_size // 2
66 return Box.factory[-r : r + 1, -r : r + 1]
68 def compute_kernel_image(self, *, x: float, y: float) -> Image:
69 if "colorValue" in self._impl.interp_property_names:
70 raise NotImplementedError("Chromatic PSFs are not yet supported.")
71 gs_image = self._impl.draw(x, y, stamp_size=self._stamp_size, center=True)
72 r = self._stamp_size // 2
73 result = Image(gs_image.array.copy(), start=(-r, -r))
74 result.array /= np.sum(result.array)
75 return result
77 def compute_stellar_image(self, *, x: float, y: float) -> Image:
78 if "colorValue" in self._impl.interp_property_names:
79 raise NotImplementedError("Chromatic PSFs are not yet supported.")
80 gs_image = self._impl.draw(x, y, stamp_size=self._stamp_size, center=None)
81 r = self._stamp_size // 2
82 result = Image(gs_image.array.copy(), start=(round_half_up(y) - r, round_half_up(x) - r))
83 result.array /= np.sum(result.array)
84 return result
86 def compute_stellar_bbox(self, *, x: float, y: float) -> Box:
87 r = self._stamp_size // 2
88 xi = round_half_up(x)
89 yi = round_half_up(y)
90 return Box.factory[yi - r : yi + r + 1, xi - r : xi + r + 1]
92 @property
93 def piff_psf(self) -> Any:
94 """The backing `piff.PSF` object.
96 This is an internal object that must not be modified in place.
97 """
98 return self._impl
100 @classmethod
101 def from_legacy(cls, legacy_psf: Any, bounds: Bounds) -> PiffWrapper:
102 return cls(impl=legacy_psf._piffResult, bounds=bounds, stamp_size=int(legacy_psf.width))
104 def serialize(self, archive: serialization.OutputArchive[Any]) -> PiffSerializationModel:
105 """Serialize the PSF to an archive.
107 This method is intended to be usable as the callback function passed to
108 `.serialization.OutputArchive.serialize_direct` or
109 `.serialization.OutputArchive.serialize_pointer`.
110 """
111 from piff.config import PiffLogger
113 writer = _ArchivePiffWriter()
114 with self._without_stars():
115 self._impl._write(writer, "piff", PiffLogger(_LOG))
116 piff_model = writer.serialize(archive)
117 return PiffSerializationModel(
118 piff=piff_model,
119 stamp_size=self._stamp_size,
120 bounds=self._bounds.serialize(),
121 )
123 @classmethod
124 def deserialize(
125 cls, model: PiffSerializationModel, archive: serialization.InputArchive[Any]
126 ) -> PiffWrapper:
127 """Deserialize the PSF from an archive.
129 This method is intended to be usable as the callback function passed to
130 `.serialization.InputArchive.deserialize_pointer`.
131 """
132 try:
133 from piff import PSF
134 from piff.config import PiffLogger
135 except ImportError:
136 raise ArchiveReadError("Failed to import piff.") from None
138 reader = _ArchivePiffReader(model.piff, archive)
139 impl = PSF._read(reader, "piff", PiffLogger(_LOG))
140 return cls(impl, bounds=Bounds.deserialize(model.bounds), stamp_size=model.stamp_size)
142 @staticmethod
143 def _get_archive_tree_type(
144 pointer_type: type[pydantic.BaseModel],
145 ) -> type[PiffSerializationModel]:
146 """Return the serialization model type for this object for an archive
147 type that uses the given pointer type.
148 """
149 return PiffSerializationModel
151 @contextmanager
152 def _without_stars(self) -> Iterator[None]:
153 """Temporarily drop the embedded list of stars used to fit the PSF.
155 Notes
156 -----
157 By default Piff saves the list of stars (including postage stamps) used
158 to fit the PSF, which makes the serialized form much larger. But the
159 upstream Piff serialization code recognizes the case where that
160 ``stars`` attribute has been deleted and serializes everything else.
162 Unfortunately, to date, Rubin's pickle-based Piff serialization instead
163 just deletes the postage stamp image attributes from inside the Piff
164 ``stars`` list, which is not a state the Piff serialization code
165 handles gracefully. So for now we have to drop the full stars list
166 during serialization if it is present.
167 """
168 if hasattr(self._impl, "stars"):
169 stars = self._impl.stars
170 try:
171 del self._impl.stars
172 yield
173 finally:
174 self._impl.stars = stars
175 else:
176 yield
179# Conventions on public visibility of the serialization types:
180#
181# - We lift and document the outermost Pydantic model type, since that needs to
182# be included directly in the Pydantic models of types that hold a PSF. This
183# type needs to be very clearly documented and named as a *serialization*
184# model, since there are many other kinds of models in play in this package.
185#
186# - We do not lift or document types used in that outermost model, but we do
187# not give them leading underscores, since they aren't really private.
188#
189# - Other utility types do get leading underscores.
192# Piff serialization uses a lot of dictionaries and lists restricted to these
193# basic types.
194type PiffScalar = int | float | str | bool | None
195type PiffDict = dict[str, PiffScalar | list[PiffScalar]]
198class GalSimPixelScaleModel(pydantic.BaseModel, ser_json_inf_nan="constants"):
199 """Model used to serialize `galsim.wcs.PixelScale` instances."""
201 scale: float
202 wcs_type: Literal["pixel_scale"] = "pixel_scale"
205# We expect this discriminated union to grow to include other trivial
206# pixel-to-pixel transforms that get embedded in PSFs. If we someday have to
207# store Piff objects that embed more sophisticated PSFs, we'll hook them into
208# the AST-based coordinate transform system instead, but as long as we're just
209# talking about simple offsets and scalings, that's a lot of extra complexity
210# for very little gain.
211type GalSimLocalWcsModel = Annotated[GalSimPixelScaleModel, pydantic.Field(discriminator="wcs_type")]
214class PiffTableModel(pydantic.BaseModel, ser_json_inf_nan="constants"):
215 """Serialization model used to embed a reference to a binary-data table in
216 a Piff serialization's JSON-like data.
217 """
219 metadata: PiffDict
220 table: serialization.TableModel
223class PiffObjectModel(pydantic.BaseModel, ser_json_inf_nan="constants"):
224 """General-purpose serialization model used for various Piff objects."""
226 structs: dict[str, PiffDict] = pydantic.Field(default_factory=dict, exclude_if=operator.not_)
227 tables: dict[str, PiffTableModel] = pydantic.Field(default_factory=dict, exclude_if=operator.not_)
228 wcs: dict[str, GalSimLocalWcsModel] = pydantic.Field(default_factory=dict, exclude_if=operator.not_)
229 objects: dict[str, PiffObjectModel] = pydantic.Field(default_factory=dict, exclude_if=operator.not_)
232class PiffSerializationModel(serialization.ArchiveTree):
233 """Serialization model for a Piff PSF."""
235 piff: PiffObjectModel = pydantic.Field(description="The Piff PSF object itself.")
237 stamp_size: int = pydantic.Field(
238 description="Width of the (square) images returned by this PSF's methods."
239 )
241 bounds: SerializableBounds = pydantic.Field(
242 description="The bounds object that represents the PSF's validity region."
243 )
246class _ArchivePiffWriter:
247 """An adapter from the Piff serialization interface to the
248 `.serialization.OutputArchive` class.
250 Notes
251 -----
252 Piff has its own simple serialization framework (contributed upstream by
253 Rubin DM) that maps everything to dictionaries, structured numpy arrays,
254 and a library of GalSim WCS objects, with the native implementation writing
255 standalone FITS files. That mostly maps nicely to the `lsst.images`
256 archive system, but we don't get to leverage any Pydantic validation or
257 JSON schema functionality since we only get opaque dictionaries from Piff.
259 See `piff.FitsWriter` for most method documentation; this class is designed
260 to mimic it exactly (the Piff authors prefer to just use duck-typing rather
261 than ABCs or protocols for interface definition).
262 """
264 def __init__(self, base_name: str = ""):
265 self._base_name = base_name
266 self.structs: dict[str, PiffDict] = {}
267 self.tables: dict[str, tuple[np.ndarray, PiffDict]] = {}
268 self.wcs_models: dict[str, GalSimLocalWcsModel] = {}
269 self.writers: dict[str, _ArchivePiffWriter] = {}
271 def write_struct(self, name: str, struct: PiffDict) -> None:
272 self.structs[name] = {k: self._to_builtin(v) for k, v in struct.items()}
274 def write_table(self, name: str, array: np.ndarray, metadata: PiffDict | None = None) -> None:
275 self.tables[name] = (array, metadata or {})
277 def write_wcs_map(
278 self, name: str, wcs_map: dict[int, galsim.wcs.BaseWCS], pointing: galsim.CelestialCoord | None
279 ) -> None:
280 import galsim.wcs
282 match wcs_map:
283 case {0: galsim.wcs.PixelScale() as wcs} if pointing is None:
284 self.wcs_models[name] = GalSimPixelScaleModel(scale=wcs.scale)
285 case _:
286 raise NotImplementedError("PSFs with complex embedded WCSs are not supported.")
288 @contextmanager
289 def nested(self, name: str) -> Iterator[_ArchivePiffWriter]:
290 nested = _ArchivePiffWriter(self.get_full_name(name))
291 yield nested
292 self.writers[name] = nested
294 def get_full_name(self, name: str) -> str:
295 return f"{self._base_name}/{name}"
297 def serialize(self, archive: serialization.OutputArchive[Any]) -> PiffObjectModel:
298 """Serialize to an archive.
300 This method is intended to be used as the callable passed to
301 `.serialization.OutputArchive.serialize_direct` and
302 `.serialization.OutputArchive.serialize_pointer`, after first passing
303 this writer to a Piff object's ``write`` or ``_write`` method.
304 """
305 model = PiffObjectModel()
306 for name, struct in self.structs.items():
307 model.structs[name] = struct
308 for name, (array, metadata) in self.tables.items():
309 model.tables[name] = PiffTableModel(
310 metadata=metadata,
311 table=archive.add_structured_array(
312 array, name=name, update_header=lambda header: header.update(metadata)
313 ),
314 )
315 for name, wcs_model in self.wcs_models.items():
316 model.wcs[name] = wcs_model
317 for name, writer in self.writers.items():
318 model.objects[name] = archive.serialize_direct(name, writer.serialize)
319 return model
321 @staticmethod
322 def _to_builtin(val: Any) -> PiffScalar:
323 match val:
324 case np.integer():
325 return int(val)
326 case np.floating():
327 return float(val)
328 case np.str_():
329 return str(val)
330 return val
333class _ArchivePiffReader:
334 """An adapter from the Piff serialization interface to the
335 `.serialization.InputArchive` class.
337 See `ArchivePiffWriter` for additional notes.
338 """
340 def __init__(
341 self, object_model: PiffObjectModel, archive: serialization.InputArchive[Any], base_name: str = ""
342 ):
343 self._model = object_model
344 self._archive = archive
345 self._base_name = base_name
347 def read_struct(self, name: str) -> PiffDict | None:
348 return self._model.structs.get(name)
350 def read_table(self, name: str, metadata: PiffDict | None = None) -> np.ndarray | None:
351 table_model = self._model.tables.get(name)
352 if table_model is None:
353 return None
354 if metadata is not None:
355 metadata.update(table_model.metadata)
356 return self._archive.get_structured_array(
357 table_model.table, strip_header=astropy.io.fits.Header.clear
358 )
360 def read_wcs_map(
361 self, name: str, logger: piff.config.LoggerWrapper
362 ) -> tuple[dict[int, galsim.wcs.BaseWCS] | None, galsim.CelestialCoord | None]:
363 import galsim.wcs
365 match self._model.wcs.get(name):
366 case GalSimPixelScaleModel(scale=scale):
367 return {0: galsim.wcs.PixelScale(scale)}, None
368 case None:
369 return None, None
370 case unexpected:
371 raise serialization.ArchiveReadError(
372 f"{self.get_full_name(name)} should be a WCS or WCS map, not {unexpected!r}."
373 )
375 @contextmanager
376 def nested(self, name: str) -> Iterator[_ArchivePiffReader]:
377 nested_model = self._model.objects[name]
378 yield _ArchivePiffReader(nested_model, self._archive, self.get_full_name(name))
380 def get_full_name(self, name: str) -> str:
381 return f"{self._base_name}/{name}"