Coverage for python / lsst / images / cells / _psf.py: 30%
115 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:35 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("CellPointSpreadFunction", "CellPointSpreadFunctionSerializationModel")
16from functools import cached_property
17from typing import TYPE_CHECKING, Any, Literal, overload
19import numpy as np
20import pydantic
22from .._cell_grid import CellGrid, CellGridBounds, CellIJ
23from .._geom import YX, Bounds, BoundsError, Box
24from .._image import Image
25from ..psfs import PointSpreadFunction
26from ..serialization import ArchiveTree, ArrayReferenceModel, InputArchive, OutputArchive
27from ..utils import round_half_up
29if TYPE_CHECKING:
30 try:
31 from lsst.cell_coadds import StitchedPsf
32 except ImportError:
33 type StitchedPsf = Any # type: ignore[no-redef]
36class CellPointSpreadFunction(PointSpreadFunction):
37 """A PSF model that is at least approximately constant over cells.
39 Parameters
40 ----------
41 array
42 A 4-d array of PSF kernel images with with shape
43 ``(n_cells_y, n_cells_x, psf_shape_y, psf_shape_x)``.
44 bounds
45 Description of the cell grid and any missing cells. Array entries for
46 missing cells should be NaN.
47 resampling_kernel
48 Name of the resampling kernel to use when shifting the kernel image
49 into the stellar image.
51 Notes
52 -----
53 Unlike most PSF model types, `CellPointSpreadFunction` can be subset via
54 slicing:
56 - a bounding `.Box` for a subimage, which returns a new PSF with only the
57 cells that cover that subimage;
58 - a `CellIJ` index, which returns the kernel image for that cell.
59 -
60 """
62 def __init__(
63 self,
64 array: np.ndarray,
65 bounds: CellGridBounds,
66 resampling_kernel: Literal["lanczos3", "lanczos5"] = "lanczos5",
67 ):
68 self._array = array
69 self._bounds: CellGridBounds = bounds
70 self._resampling_kernel = resampling_kernel
72 @property
73 def grid(self) -> CellGrid:
74 """The grid that defines the PSF's cells (`CellGrid`).
76 Notes
77 -----
78 This is usually (but is not guaranteed to be) the grid for a full
79 patch, even when the PSF only covers a subimage.
80 """
81 return self._bounds.grid
83 @property
84 def bounds(self) -> CellGridBounds:
85 """The bounds where the PSF can be evaluated (`CellGridBounds`)."""
86 return self._bounds
88 @cached_property
89 def kernel_bbox(self) -> Box:
90 sy, sx = self._array.shape[2:]
91 ry = sy // 2
92 rx = sx // 2
93 return Box.factory[-ry : ry + 1, -rx : rx + 1]
95 @overload
96 def __getitem__(self, bbox: Box) -> CellPointSpreadFunction: ... 96 ↛ exitline 96 didn't return from function '__getitem__' because
97 @overload
98 def __getitem__(self, index: CellIJ) -> Image: ... 98 ↛ exitline 98 didn't return from function '__getitem__' because
100 def __getitem__(self, key: Box | CellIJ) -> CellPointSpreadFunction | Image:
101 match key:
102 case CellIJ():
103 if key in self._bounds.missing:
104 raise BoundsError(f"Cell {key} is missing for this PSF.")
105 index = key - self._bounds.grid_start
106 try:
107 return Image(self._array[index.i, index.j], bbox=self.kernel_bbox)
108 except IndexError:
109 raise BoundsError(f"Cell {key} is out of bounds for this PSF.")
110 case Box():
111 bounds, slices = self._subset_impl(self._bounds, key)
112 return CellPointSpreadFunction(self._array[slices.y, slices.x, ...].copy(), bounds=bounds)
113 case _:
114 raise TypeError("Invalid argument for CellPointSpreadFunction.__getitem__.")
116 def compute_kernel_image(self, *, x: float, y: float) -> Image:
117 index = self.grid.index_of(x=round(x), y=round(y))
118 try:
119 return self[index]
120 except Exception as err:
121 err.add_note(f"Evaluating cell PSF at x={x}, y={y}.")
122 raise
124 def compute_stellar_image(self, *, x: float, y: float) -> Image:
125 try:
126 from lsst.afw.math import offsetImage
127 from lsst.geom import Point2I
128 except ImportError as err:
129 err.add_note("CellPointSpreadFunction.compute_stellar_image cannot be used without lsst.afw.")
130 raise
131 ix = round_half_up(x)
132 dx = x - ix
133 iy = round_half_up(y)
134 dy = y - iy
135 kernel_image = self.compute_kernel_image(x=x, y=y)
136 if dx != 0 or dy != 0:
137 legacy_result = offsetImage(kernel_image.to_legacy(), dx, dy, self._resampling_kernel, 5)
138 else:
139 # This branch is equal to the other up to round-off error, but it's
140 # convenient nonetheless because it maintains exact compatibility
141 # with the legacy implementation, where the caching mechanism
142 # causes the offsetImage call to be skipped.
143 legacy_result = kernel_image.to_legacy()
144 legacy_result.setXY0(Point2I(legacy_result.getX0() + ix, legacy_result.getY0() + iy))
145 return Image.from_legacy(legacy_result)
147 def compute_stellar_bbox(self, *, x: float, y: float) -> Box:
148 # This is obviously inefficient, but it's what afw does, and hence the
149 # only easy way we've got to replicate what afw does.
150 return self.compute_stellar_image(x=x, y=y).bbox
152 def serialize(self, archive: OutputArchive[Any]) -> CellPointSpreadFunctionSerializationModel:
153 array_model = archive.add_array(self._array)
154 return CellPointSpreadFunctionSerializationModel(array=array_model, bounds=self.bounds)
156 @staticmethod
157 def deserialize(
158 model: CellPointSpreadFunctionSerializationModel,
159 archive: InputArchive[Any],
160 *,
161 bbox: Box | None = None,
162 ) -> CellPointSpreadFunction:
163 bounds = model.bounds
164 if bbox is not None:
165 bounds, slices = CellPointSpreadFunction._subset_impl(bounds, bbox)
166 array = archive.get_array(model.array, slices=slices)
167 else:
168 array = archive.get_array(model.array)
169 return CellPointSpreadFunction(array, bounds)
171 @classmethod
172 def from_legacy(cls, legacy_psf: Any, bounds: Bounds | None = None) -> CellPointSpreadFunction:
173 # 'bounds' is accepted as an argument only for base-class
174 # compatibility; we always generate our own bounds.
175 from lsst.geom import Box2I
177 grid = CellGrid.from_legacy(legacy_psf.grid)
178 # Start with bounds that cover the entire grid.
179 bounds = CellGridBounds(grid=grid, bbox=grid.bbox)
180 # Shrink bounds to just the bbox where we have data.
181 legacy_bbox = Box2I()
182 for legacy_index in legacy_psf.images.keys():
183 legacy_bbox.include(legacy_psf.grid.bbox_of(legacy_index))
184 bounds = bounds[Box.from_legacy(legacy_bbox)]
185 # Allocate and populate the array.
186 psf_image_size_y, psf_image_size_x = legacy_psf.images.arbitrary.array.shape
187 array = np.zeros(
188 (
189 bounds.bbox.y.size // grid.cell_shape.y,
190 bounds.bbox.x.size // grid.cell_shape.x,
191 psf_image_size_y,
192 psf_image_size_x,
193 ),
194 dtype=np.float64,
195 )
196 missing: set[CellIJ] = set()
197 for cell_index in bounds.cell_indices():
198 legacy_index = cell_index.to_legacy()
199 array_index = cell_index - bounds.grid_start
200 if legacy_index in legacy_psf.images:
201 array[array_index.i, array_index.j] = legacy_psf.images[legacy_index].array
202 else:
203 array[array_index.i, array_index.j] = np.nan
204 missing.add(cell_index)
205 # Modify the bounds one last time to account for missing cells.
206 bounds = CellGridBounds(grid=grid, bbox=bounds.bbox, missing=frozenset(missing))
207 return cls(array, bounds=bounds)
209 @staticmethod
210 def _subset_impl(bounds: CellGridBounds, bbox: Box) -> tuple[CellGridBounds, YX[slice]]:
211 subset_bounds = bounds[bbox]
212 start = subset_bounds.grid_start - bounds.grid_start
213 stop = subset_bounds.grid_stop - bounds.grid_start
214 return subset_bounds, YX(y=slice(start.i, stop.i), x=slice(start.j, stop.j))
217class CellPointSpreadFunctionSerializationModel(ArchiveTree):
218 """Model used to serialize CellPointSpreadFunction objects."""
220 array: ArrayReferenceModel = pydantic.Field(
221 description=(
222 "A 4-d array of PSF kernel images with with shape "
223 "(n_cells_y, n_cells_x, psf_shape_y, psf_shape_x)."
224 )
225 )
226 bounds: CellGridBounds = pydantic.Field(
227 description=(
228 "Description of the cell grid and any missing cells. Array entries for "
229 "missing cells should be NaN."
230 )
231 )