Coverage for python / lsst / meas / extensions / piff / piffPsf.py: 23%
104 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:45 +0000
1# This file is part of meas_extensions_piff.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22__all__ = ["PiffPsf"]
24import pickle
25import piff
26from packaging.version import Version
27import numpy as np
28from lsst.afw.typehandling import StorableHelperFactory
29from lsst.meas.algorithms import ImagePsf
30from lsst.afw.image import Image, Color
31from lsst.geom import Box2I, Point2I, Extent2I, Point2D
32import logging
35class PiffPsf(ImagePsf):
36 _factory = StorableHelperFactory(
37 "lsst.meas.extensions.piff.piffPsf",
38 "PiffPsf"
39 )
41 def __init__(self, width, height, piffResult, log=None):
42 assert width == height
43 ImagePsf.__init__(self)
44 self.width = width
45 self.height = height
46 self.dimensions = Extent2I(width, height)
47 self._piffResult = piffResult
48 self._averagePosition = None
49 self._averageColor = None
50 self.log = log or logging.getLogger(__name__)
52 try:
53 color_type = [s.data.properties['colorType'] for s in self._piffResult.stars
54 if not s.is_flagged and not s.is_reserve]
55 color_type = list(set(color_type))
56 if len(color_type) > 1:
57 self.log.warning(f"More than one color type was given during training: {color_type}")
58 self._color_type = None
59 else:
60 self._color_type = color_type[0]
61 except Exception:
62 self._color_type = None
64 @property
65 def piffResult(self):
66 return self._piffResult
68 # Storable overrides
70 def isPersistable(self):
71 return True
73 def _getPersistenceName(self):
74 return "PiffPsf"
76 def _getPythonModule(self):
77 return "lsst.meas.extensions.piff.piffPsf"
79 def _write(self):
80 return pickle.dumps((self.width, self.height, self._piffResult))
82 @staticmethod
83 def _read(pkl):
84 width, height, piffResult = pickle.loads(pkl)
85 # We need to do surgery on pickles created with earlier versions of
86 # piff when using piff version 1.4 or later.
87 if Version(piff.version) >= Version("1.4"):
88 if not hasattr(piffResult, "_num"):
89 piffResult._num = None
90 piffResult.model._num = None
91 piffResult.model._fit_flux = None
92 piffResult.interp._num = None
93 if not hasattr(piffResult.model, "_maxk"):
94 piffResult.model._maxk = 0
95 if not hasattr(piffResult.model, "_stepk"):
96 piffResult.model._stepk = 0
97 return PiffPsf(width, height, piffResult)
99 # ImagePsf overrides
101 def __deepcopy__(self, meta=None):
102 return PiffPsf(self.width, self.height, self._piffResult)
104 def resized(self, width, height):
105 assert width == height
106 return PiffPsf(width, height, self._piffResult)
108 def _doComputeImage(self, position, color):
109 return self._doImage(position, center=None, color=color)
111 def _doComputeKernelImage(self, position, color):
112 return self._doImage(position, center=True, color=color)
114 def _doComputeBBox(self, position, color):
115 return self._doBBox(Point2I(0, 0), center=True)
117 def getAveragePosition(self):
118 if self._averagePosition is None:
119 x = np.mean([s.image_pos.x for s in self._piffResult.stars
120 if not s.is_flagged and not s.is_reserve])
121 y = np.mean([s.image_pos.y for s in self._piffResult.stars
122 if not s.is_flagged and not s.is_reserve])
123 self._averagePosition = Point2D(x, y)
124 return self._averagePosition
126 def getAverageColor(self):
127 if self._averageColor is None:
128 if 'colorValue' in self._piffResult.interp_property_names:
129 c = np.nanmean([s.data.properties['colorValue'] for s in self._piffResult.stars
130 if not s.is_flagged and not s.is_reserve])
131 self._averageColor = Color(colorValue=c, colorType=self._color_type)
132 else:
133 self._averageColor = Color() # set value to nan.
134 return self._averageColor
136 # Internal private methods
138 def _doImage(self, position, center, color=None):
139 # Follow Piff conventions for center.
140 # None => draw as if star at position
141 # True => draw in center of image
142 if 'colorValue' in self._piffResult.interp_property_names:
143 if color is None or color.isIndeterminate():
144 meanColor = np.nan
145 if self._averageColor is None:
146 meanColor = self.getAverageColor().getColorValue()
147 else:
148 meanColor = self._averageColor.getColorValue()
149 kwargs = {'colorValue': meanColor}
150 # PSF model need a color information.
151 # Set to mean color from PSF fit right now.
152 else:
153 ctype = color.getColorType()
154 if self._color_type != color.getColorType():
155 raise ValueError(f"Invalid Color type. Need {self._color_type} but {ctype} is provided")
156 kwargs = {'colorValue': color.getColorValue()}
157 else:
158 kwargs = {}
159 gsimg = self._piffResult.draw(
160 position.x, position.y, stamp_size=self.width, center=center, **kwargs,
161 )
162 bbox = self._doBBox(position, center)
163 img = Image(bbox, dtype=np.float64)
164 img.array[:] = gsimg.array
165 img.array /= np.sum(img.array)
166 return img
168 def _doBBox(self, position, center):
169 origin = -(self.dimensions//2)
170 if center is None:
171 origin = Point2I(position) + origin
172 return Box2I(Point2I(origin), self.dimensions)