Coverage for python / lsst / meas / extensions / piff / piffPsf.py: 23%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:10 +0000

1# This file is part of meas_extensions_piff. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["PiffPsf"] 

23 

24import pickle 

25import piff 

26from packaging.version import Version 

27import numpy as np 

28from lsst.afw.typehandling import StorableHelperFactory 

29from lsst.meas.algorithms import ImagePsf 

30from lsst.afw.image import Image, Color 

31from lsst.geom import Box2I, Point2I, Extent2I, Point2D 

32import logging 

33 

34 

class PiffPsf(ImagePsf):
    """An `ImagePsf` whose images are drawn from a fitted Piff PSF model.

    Parameters
    ----------
    width, height : `int`
        Size of the drawn PSF images, in pixels. The two must be equal:
        only ``width`` is passed to Piff as the stamp size.
    piffResult
        The fitted Piff PSF object (presumably a ``piff.PSF``; confirm
        against the caller). This class reads its ``stars`` and
        ``interp_property_names`` attributes and calls its ``draw`` method.
    log : `logging.Logger`, optional
        Logger used for diagnostics. Defaults to this module's logger.
    """

    _factory = StorableHelperFactory(
        "lsst.meas.extensions.piff.piffPsf",
        "PiffPsf"
    )

    def __init__(self, width, height, piffResult, log=None):
        assert width == height
        ImagePsf.__init__(self)
        self.width = width
        self.height = height
        self.dimensions = Extent2I(width, height)
        self._piffResult = piffResult
        # Both averages are computed lazily and cached on first request.
        self._averagePosition = None
        self._averageColor = None
        self.log = log or logging.getLogger(__name__)
        self._color_type = self._findColorType()

    @property
    def piffResult(self):
        """The Piff PSF object this instance was constructed with."""
        return self._piffResult

    # Storable overrides

    def isPersistable(self):
        """Report that this PSF can be persisted (always `True`)."""
        return True

    def _getPersistenceName(self):
        """Return the name this class is persisted under."""
        return "PiffPsf"

    def _getPythonModule(self):
        """Return the name of the module that defines this class."""
        return "lsst.meas.extensions.piff.piffPsf"

    def _write(self):
        """Serialize the dimensions and the Piff result to pickle bytes.

        The logger and the cached averages are not part of the payload.
        """
        return pickle.dumps((self.width, self.height, self._piffResult))

    @staticmethod
    def _read(pkl):
        """Rebuild a `PiffPsf` from bytes produced by `_write`.

        Parameters
        ----------
        pkl : `bytes`
            Pickled ``(width, height, piffResult)`` tuple.

        Returns
        -------
        psf : `PiffPsf`
            The reconstructed PSF, using the default module logger.
        """
        # NOTE(security): pickle.loads can execute arbitrary code while
        # unpickling; only feed this data that comes from a trusted source.
        width, height, piffResult = pickle.loads(pkl)
        # We need to do surgery on pickles created with earlier versions of
        # piff when using piff version 1.4 or later.
        if Version(piff.version) >= Version("1.4"):
            if not hasattr(piffResult, "_num"):
                piffResult._num = None
                piffResult.model._num = None
                piffResult.model._fit_flux = None
                piffResult.interp._num = None
            if not hasattr(piffResult.model, "_maxk"):
                piffResult.model._maxk = 0
            if not hasattr(piffResult.model, "_stepk"):
                piffResult.model._stepk = 0
        return PiffPsf(width, height, piffResult)

    # ImagePsf overrides

    def __deepcopy__(self, meta=None):
        """Return a new `PiffPsf` with the same dimensions and logger.

        The Piff result object itself is shared with the copy, not
        duplicated.
        """
        return PiffPsf(self.width, self.height, self._piffResult, log=self.log)

    def resized(self, width, height):
        """Return a new `PiffPsf` that draws images of a different size.

        Parameters
        ----------
        width, height : `int`
            New image dimensions in pixels; they must be equal.

        Returns
        -------
        psf : `PiffPsf`
            A PSF sharing this instance's Piff result and logger.
        """
        assert width == height
        return PiffPsf(width, height, self._piffResult, log=self.log)

    def _doComputeImage(self, position, color):
        """Draw the PSF as if a star were located at ``position``."""
        return self._doImage(position, center=None, color=color)

    def _doComputeKernelImage(self, position, color):
        """Draw the PSF evaluated at ``position``, centered in the image."""
        return self._doImage(position, center=True, color=color)

    def _doComputeBBox(self, position, color):
        """Return the bounding box of the centered (kernel) image.

        ``position`` and ``color`` are accepted for interface compatibility
        but are not used: the box depends only on the dimensions.
        """
        return self._doBBox(Point2I(0, 0), center=True)

    def getAveragePosition(self):
        """Return the mean image position of the stars used in the fit.

        The result is computed on first call and cached.

        Returns
        -------
        position : `lsst.geom.Point2D`
            Mean of ``image_pos`` over stars that are neither flagged nor
            reserved.
        """
        if self._averagePosition is None:
            stars = self._fitStars()
            x = np.mean([s.image_pos.x for s in stars])
            y = np.mean([s.image_pos.y for s in stars])
            self._averagePosition = Point2D(x, y)
        return self._averagePosition

    def getAverageColor(self):
        """Return the mean color of the stars used in the fit.

        The result is computed on first call and cached.

        Returns
        -------
        color : `lsst.afw.image.Color`
            NaN-ignoring mean of the ``colorValue`` star property when the
            model interpolates over ``colorValue``; otherwise a
            default-constructed `Color`.
        """
        if self._averageColor is None:
            if 'colorValue' in self._piffResult.interp_property_names:
                c = np.nanmean([s.data.properties['colorValue'] for s in self._fitStars()])
                self._averageColor = Color(colorValue=c, colorType=self._color_type)
            else:
                self._averageColor = Color()  # set value to nan.
        return self._averageColor

    # Internal private methods

    def _fitStars(self):
        """Return the stars that are neither flagged nor reserved."""
        return [s for s in self._piffResult.stars
                if not s.is_flagged and not s.is_reserve]

    def _findColorType(self):
        """Determine the single color type shared by the fitted stars.

        Returns
        -------
        colorType
            The common ``colorType`` star property, or `None` when the stars
            carry no color type, carry more than one, or the property cannot
            be read.
        """
        try:
            colorTypes = list({s.data.properties['colorType'] for s in self._fitStars()})
        except Exception:
            # Deliberate best effort: a result without color information
            # must still be usable as a PSF.
            self.log.debug("Could not read a color type from the Piff result.", exc_info=True)
            return None
        if len(colorTypes) > 1:
            self.log.warning(f"More than one color type was given during training: {colorTypes}")
            return None
        if not colorTypes:
            return None
        return colorTypes[0]

    def _doImage(self, position, center, color=None):
        """Draw a normalized PSF image with Piff.

        Parameters
        ----------
        position : `lsst.geom.Point2D`
            Position at which the PSF model is evaluated.
        center : `bool` or `None`
            Follows the Piff convention: `None` draws as if a star were at
            ``position``; `True` draws in the center of the image.
        color : `lsst.afw.image.Color`, optional
            Color to evaluate the model at. Only used when the model
            interpolates over ``colorValue``.

        Returns
        -------
        image : `lsst.afw.image.Image`
            Double-precision image whose pixels sum to one.

        Raises
        ------
        ValueError
            Raised if ``color`` has a color type different from the one the
            model was trained with.
        """
        if 'colorValue' in self._piffResult.interp_property_names:
            if color is None or color.isIndeterminate():
                # The PSF model needs color information; fall back to the
                # mean color of the stars used in the fit.
                kwargs = {'colorValue': self.getAverageColor().getColorValue()}
            else:
                ctype = color.getColorType()
                if self._color_type != ctype:
                    raise ValueError(f"Invalid Color type. Need {self._color_type} but {ctype} is provided")
                kwargs = {'colorValue': color.getColorValue()}
        else:
            kwargs = {}
        gsimg = self._piffResult.draw(
            position.x, position.y, stamp_size=self.width, center=center, **kwargs,
        )
        bbox = self._doBBox(position, center)
        img = Image(bbox, dtype=np.float64)
        img.array[:] = gsimg.array
        img.array /= np.sum(img.array)
        return img

    def _doBBox(self, position, center):
        """Return the bounding box matching an image drawn by `_doImage`.

        Parameters
        ----------
        position : `lsst.geom.Point2D` or `lsst.geom.Point2I`
            Position the image is drawn at; only used when ``center`` is
            `None`.
        center : `bool` or `None`
            `None` offsets the box by ``position``; any other value leaves
            the box centered on the origin.

        Returns
        -------
        bbox : `lsst.geom.Box2I`
            Box with this PSF's dimensions.
        """
        origin = -(self.dimensions//2)
        if center is None:
            origin = Point2I(position) + origin
        return Box2I(Point2I(origin), self.dimensions)

171 origin = Point2I(position) + origin 

172 return Box2I(Point2I(origin), self.dimensions)