Coverage for python / lsst / images / cells / _psf.py: 30%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:34 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("CellPointSpreadFunction", "CellPointSpreadFunctionSerializationModel") 

15 

16from functools import cached_property 

17from typing import TYPE_CHECKING, Any, Literal, overload 

18 

19import numpy as np 

20import pydantic 

21 

22from .._cell_grid import CellGrid, CellGridBounds, CellIJ 

23from .._geom import YX, Bounds, BoundsError, Box 

24from .._image import Image 

25from ..psfs import PointSpreadFunction 

26from ..serialization import ArchiveTree, ArrayReferenceModel, InputArchive, OutputArchive 

27from ..utils import round_half_up 

28 

29if TYPE_CHECKING: 

30 try: 

31 from lsst.cell_coadds import StitchedPsf 

32 except ImportError: 

33 type StitchedPsf = Any # type: ignore[no-redef] 

34 

35 

36class CellPointSpreadFunction(PointSpreadFunction): 

37 """A PSF model that is at least approximately constant over cells. 

38 

39 Parameters 

40 ---------- 

41 array 

42 A 4-d array of PSF kernel images with with shape 

43 ``(n_cells_y, n_cells_x, psf_shape_y, psf_shape_x)``. 

44 bounds 

45 Description of the cell grid and any missing cells. Array entries for 

46 missing cells should be NaN. 

47 resampling_kernel 

48 Name of the resampling kernel to use when shifting the kernel image 

49 into the stellar image. 

50 

51 Notes 

52 ----- 

53 Unlike most PSF model types, `CellPointSpreadFunction` can be subset via 

54 slicing: 

55 

56 - a bounding `.Box` for a subimage, which returns a new PSF with only the 

57 cells that cover that subimage; 

58 - a `CellIJ` index, which returns the kernel image for that cell. 

59 - 

60 """ 

61 

62 def __init__( 

63 self, 

64 array: np.ndarray, 

65 bounds: CellGridBounds, 

66 resampling_kernel: Literal["lanczos3", "lanczos5"] = "lanczos5", 

67 ): 

68 self._array = array 

69 self._bounds: CellGridBounds = bounds 

70 self._resampling_kernel = resampling_kernel 

71 

72 @property 

73 def grid(self) -> CellGrid: 

74 """The grid that defines the PSF's cells (`CellGrid`). 

75 

76 Notes 

77 ----- 

78 This is usually (but is not guaranteed to be) the grid for a full 

79 patch, even when the PSF only covers a subimage. 

80 """ 

81 return self._bounds.grid 

82 

83 @property 

84 def bounds(self) -> CellGridBounds: 

85 """The bounds where the PSF can be evaluated (`CellGridBounds`).""" 

86 return self._bounds 

87 

88 @cached_property 

89 def kernel_bbox(self) -> Box: 

90 sy, sx = self._array.shape[2:] 

91 ry = sy // 2 

92 rx = sx // 2 

93 return Box.factory[-ry : ry + 1, -rx : rx + 1] 

94 

95 @overload 

96 def __getitem__(self, bbox: Box) -> CellPointSpreadFunction: ... 96 ↛ exitline 96 didn't return from function '__getitem__' because

97 @overload 

98 def __getitem__(self, index: CellIJ) -> Image: ... 98 ↛ exitline 98 didn't return from function '__getitem__' because

99 

100 def __getitem__(self, key: Box | CellIJ) -> CellPointSpreadFunction | Image: 

101 match key: 

102 case CellIJ(): 

103 if key in self._bounds.missing: 

104 raise BoundsError(f"Cell {key} is missing for this PSF.") 

105 index = key - self._bounds.grid_start 

106 try: 

107 return Image(self._array[index.i, index.j], bbox=self.kernel_bbox) 

108 except IndexError: 

109 raise BoundsError(f"Cell {key} is out of bounds for this PSF.") 

110 case Box(): 

111 bounds, slices = self._subset_impl(self._bounds, key) 

112 return CellPointSpreadFunction(self._array[slices.y, slices.x, ...].copy(), bounds=bounds) 

113 case _: 

114 raise TypeError("Invalid argument for CellPointSpreadFunction.__getitem__.") 

115 

116 def compute_kernel_image(self, *, x: float, y: float) -> Image: 

117 index = self.grid.index_of(x=round(x), y=round(y)) 

118 try: 

119 return self[index] 

120 except Exception as err: 

121 err.add_note(f"Evaluating cell PSF at x={x}, y={y}.") 

122 raise 

123 

124 def compute_stellar_image(self, *, x: float, y: float) -> Image: 

125 try: 

126 from lsst.afw.math import offsetImage 

127 from lsst.geom import Point2I 

128 except ImportError as err: 

129 err.add_note("CellPointSpreadFunction.compute_stellar_image cannot be used without lsst.afw.") 

130 raise 

131 ix = round_half_up(x) 

132 dx = x - ix 

133 iy = round_half_up(y) 

134 dy = y - iy 

135 kernel_image = self.compute_kernel_image(x=x, y=y) 

136 if dx != 0 or dy != 0: 

137 legacy_result = offsetImage(kernel_image.to_legacy(), dx, dy, self._resampling_kernel, 5) 

138 else: 

139 # This branch is equal to the other up to round-off error, but it's 

140 # convenient nonetheless because it maintains exact compatibility 

141 # with the legacy implementation, where the caching mechanism 

142 # causes the offsetImage call to be skipped. 

143 legacy_result = kernel_image.to_legacy() 

144 legacy_result.setXY0(Point2I(legacy_result.getX0() + ix, legacy_result.getY0() + iy)) 

145 return Image.from_legacy(legacy_result) 

146 

147 def compute_stellar_bbox(self, *, x: float, y: float) -> Box: 

148 # This is obviously inefficient, but it's what afw does, and hence the 

149 # only easy way we've got to replicate what afw does. 

150 return self.compute_stellar_image(x=x, y=y).bbox 

151 

152 def serialize(self, archive: OutputArchive[Any]) -> CellPointSpreadFunctionSerializationModel: 

153 array_model = archive.add_array(self._array) 

154 return CellPointSpreadFunctionSerializationModel(array=array_model, bounds=self.bounds) 

155 

156 @staticmethod 

157 def deserialize( 

158 model: CellPointSpreadFunctionSerializationModel, 

159 archive: InputArchive[Any], 

160 *, 

161 bbox: Box | None = None, 

162 ) -> CellPointSpreadFunction: 

163 bounds = model.bounds 

164 if bbox is not None: 

165 bounds, slices = CellPointSpreadFunction._subset_impl(bounds, bbox) 

166 array = archive.get_array(model.array, slices=slices) 

167 else: 

168 array = archive.get_array(model.array) 

169 return CellPointSpreadFunction(array, bounds) 

170 

171 @classmethod 

172 def from_legacy(cls, legacy_psf: Any, bounds: Bounds | None = None) -> CellPointSpreadFunction: 

173 # 'bounds' is accepted as an argument only for base-class 

174 # compatibility; we always generate our own bounds. 

175 from lsst.geom import Box2I 

176 

177 grid = CellGrid.from_legacy(legacy_psf.grid) 

178 # Start with bounds that cover the entire grid. 

179 bounds = CellGridBounds(grid=grid, bbox=grid.bbox) 

180 # Shrink bounds to just the bbox where we have data. 

181 legacy_bbox = Box2I() 

182 for legacy_index in legacy_psf.images.keys(): 

183 legacy_bbox.include(legacy_psf.grid.bbox_of(legacy_index)) 

184 bounds = bounds[Box.from_legacy(legacy_bbox)] 

185 # Allocate and populate the array. 

186 psf_image_size_y, psf_image_size_x = legacy_psf.images.arbitrary.array.shape 

187 array = np.zeros( 

188 ( 

189 bounds.bbox.y.size // grid.cell_shape.y, 

190 bounds.bbox.x.size // grid.cell_shape.x, 

191 psf_image_size_y, 

192 psf_image_size_x, 

193 ), 

194 dtype=np.float64, 

195 ) 

196 missing: set[CellIJ] = set() 

197 for cell_index in bounds.cell_indices(): 

198 legacy_index = cell_index.to_legacy() 

199 array_index = cell_index - bounds.grid_start 

200 if legacy_index in legacy_psf.images: 

201 array[array_index.i, array_index.j] = legacy_psf.images[legacy_index].array 

202 else: 

203 array[array_index.i, array_index.j] = np.nan 

204 missing.add(cell_index) 

205 # Modify the bounds one last time to account for missing cells. 

206 bounds = CellGridBounds(grid=grid, bbox=bounds.bbox, missing=frozenset(missing)) 

207 return cls(array, bounds=bounds) 

208 

209 @staticmethod 

210 def _subset_impl(bounds: CellGridBounds, bbox: Box) -> tuple[CellGridBounds, YX[slice]]: 

211 subset_bounds = bounds[bbox] 

212 start = subset_bounds.grid_start - bounds.grid_start 

213 stop = subset_bounds.grid_stop - bounds.grid_start 

214 return subset_bounds, YX(y=slice(start.i, stop.i), x=slice(start.j, stop.j)) 

215 

216 

217class CellPointSpreadFunctionSerializationModel(ArchiveTree): 

218 """Model used to serialize CellPointSpreadFunction objects.""" 

219 

220 array: ArrayReferenceModel = pydantic.Field( 

221 description=( 

222 "A 4-d array of PSF kernel images with with shape " 

223 "(n_cells_y, n_cells_x, psf_shape_y, psf_shape_x)." 

224 ) 

225 ) 

226 bounds: CellGridBounds = pydantic.Field( 

227 description=( 

228 "Description of the cell grid and any missing cells. Array entries for " 

229 "missing cells should be NaN." 

230 ) 

231 )