Coverage for python / lsst / cell_coadds / _coadd_ap_corr_map.py: 23%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:13 +0000

1# This file is part of cell_coadds. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ( 

25 "CoaddApCorrMapStacker", 

26 "EMPTY_AP_CORR_MAP", 

27) 

28 

29 

30from collections.abc import Iterable 

31 

32import numpy as np 

33from frozendict import frozendict 

34 

35from lsst.afw.image import ApCorrMap 

36from lsst.afw.math import BoundedField 

37from lsst.geom import Point2D 

38 

39from .typing_helpers import SingleCellCoaddApCorrMap 

40 

41EMPTY_AP_CORR_MAP: SingleCellCoaddApCorrMap = frozendict() 

42"""Default empty aperture correction map for a single cell coadd.""" 

43 

44 

45class CoaddApCorrMapStacker: 

46 """Online aperture correction map for a cell-based coadd. 

47 

48 This class is responsible for implementing the logic to coadd the 

49 aperture correction values and their uncertainties. 

50 

51 Parameters 

52 ---------- 

53 evaluation_point : `~lsst.geom.Point2D` 

54 The point at which the input aperture correction is evaluated. 

55 do_coadd_inverse_ap_corr : `bool`, optional 

56 If True, the inverse aperture correction is applied to the coadd. 

57 

58 Notes 

59 ----- 

60 At least one of class variables are set dynamically the first time the 

61 ``add`` method is called on any instance of this class. This behavior 

62 is based on the practical assumption that all ``ApCorrMap`` instances will 

63 have the same set of field names during the entire processing. A schema 

64 is therefore not expected at the time of initialization. 

65 """ 

66 

67 def __init__(self, evaluation_point: Point2D, do_coadd_inverse_ap_corr: bool = True) -> None: 

68 # Initialize frozen attributes. 

69 self._evaluation_point = evaluation_point 

70 self._do_coadd_inverse_ap_corr = do_coadd_inverse_ap_corr 

71 

72 # Initialize mutable attributes. 

73 self._total_weight = 0.0 

74 self._intermediate_ap_corr_map: dict[str, float] = {} 

75 self._ap_corr_names: Iterable[str] = () 

76 # An iterable of algorithm names that have aperture correction values. 

77 # This is set when the first time the add method is called on any 

78 # instance. 

79 

80 def _setup_ap_corr_names(self, ap_corr_map: ApCorrMap) -> None: 

81 """Set up the aperture correction name set. 

82 

83 Parameters 

84 ---------- 

85 ap_corr_map : `~lsst.meas.base.ApCorrMap` 

86 The aperture correction map to add. 

87 

88 Raises 

89 ------ 

90 RuntimeError 

91 Raised if the keys in `ap_corr_map` do not end in "_instFlux" or 

92 "_instFluxErr". 

93 """ 

94 ap_corr_name_set = set() 

95 for field_name in ap_corr_map: 

96 algorithm_name, suffix = field_name.split("_instFlux") 

97 if suffix not in ("", "Err"): 

98 raise RuntimeError(f"Invalid field name {field_name} in aperture correction map.") 

99 

100 ap_corr_name_set.add(algorithm_name) 

101 

102 self._ap_corr_names = tuple(sorted(ap_corr_name_set)) 

103 

104 @property 

105 def evaluation_point(self) -> Point2D: 

106 """The point at which the aperture correction is evaluated.""" 

107 return self._evaluation_point 

108 

109 @property 

110 def do_coadd_inverse_ap_corr(self) -> bool: 

111 """If True, the inverse aperture correction is applied to the coadd.""" 

112 return self._do_coadd_inverse_ap_corr 

113 

114 @property 

115 def ap_corr_names(self) -> Iterable[str]: 

116 """Iterable of algorithm names that have aperture correction values.""" 

117 return self._ap_corr_names 

118 

119 @property 

120 def total_weight(self) -> float: 

121 """The total weight of the aperture correction map.""" 

122 return self._total_weight 

123 

124 def add(self, ap_corr_map: ApCorrMap, weight: float) -> None: 

125 """Add an aperture correction map to the coadd. 

126 

127 Parameters 

128 ---------- 

129 ap_corr_map : `~lsst.meas.base.ApCorrMap` 

130 The aperture correction map to add. 

131 weight : `float` 

132 The weight to apply to the aperture correction map. 

133 

134 Raises 

135 ------ 

136 RuntimeError 

137 Raised if the keys in `ap_corr_map` do not end in "_instFlux" or 

138 "_instFluxErr". 

139 ValueError 

140 Raised if the aperture correction value or its error is missing. 

141 """ 

142 if not self.ap_corr_names: 

143 # Lazily initialize the aperture correction name set. 

144 self._setup_ap_corr_names(ap_corr_map) 

145 

146 if not self._intermediate_ap_corr_map: 

147 self._intermediate_ap_corr_map = dict.fromkeys( 

148 [f"{algorithm_name}_instFlux" for algorithm_name in self.ap_corr_names] 

149 + [f"{algorithm_name}_instFluxErr" for algorithm_name in self.ap_corr_names], 

150 0.0, 

151 ) 

152 

153 # Accumulate the aperture correction values in a temporary dict. 

154 # This is so that if we error out in the middle, we don't leave the 

155 # aperture correction map in an inconsistent state. 

156 temp_ap_corr_map = dict.fromkeys(self._intermediate_ap_corr_map, 0.0) 

157 

158 for algorithm_name in self.ap_corr_names: 

159 # Accumulate the aperture correction values. 

160 ap_corr_field: BoundedField | None 

161 if (ap_corr_field := ap_corr_map.get(f"{algorithm_name}_instFlux")) is None: 

162 ap_corr_value = np.nan 

163 else: 

164 ap_corr_value = ap_corr_field.evaluate(self.evaluation_point) 

165 

166 # Calculate the term to accumulate depending on the boolean. 

167 if self.do_coadd_inverse_ap_corr: 

168 if ap_corr_value == 0: 

169 raise ValueError("This should not have happened. ap_corr_value is zero.") 

170 else: 

171 term = weight / ap_corr_value 

172 else: 

173 term = weight * ap_corr_value 

174 

175 temp_ap_corr_map[f"{algorithm_name}_instFlux"] = term 

176 

177 # Accumulate the aperture correction error values. 

178 ap_corr_err_field: BoundedField | None 

179 if (ap_corr_err_field := ap_corr_map.get(f"{algorithm_name}_instFluxErr")) is None: 

180 ap_corr_err_value = np.nan 

181 else: 

182 ap_corr_err_value = ap_corr_err_field.evaluate(self.evaluation_point) 

183 

184 # Calculate the term to accumulate depending on the boolean. 

185 if self.do_coadd_inverse_ap_corr: 

186 term = (weight * ap_corr_err_value) ** 2 / ap_corr_value**4 

187 else: 

188 term = (weight * ap_corr_err_value) ** 2 

189 

190 temp_ap_corr_map[f"{algorithm_name}_instFluxErr"] = term 

191 

192 # Update the intermediate aperture correction map. 

193 for key in self._intermediate_ap_corr_map: 

194 self._intermediate_ap_corr_map[key] += temp_ap_corr_map[key] 

195 

196 # Add the weight to the total weight. 

197 self._total_weight += weight 

198 

199 @property 

200 def final_ap_corr_map(self) -> SingleCellCoaddApCorrMap: 

201 """Final coadded aperture correction map. 

202 

203 This should be called after all aperture correction maps have been 

204 added. 

205 

206 Raises 

207 ------ 

208 RuntimeError 

209 Raised if the total weight is zero. 

210 """ 

211 if self.total_weight == 0 or not self.ap_corr_names: 

212 raise RuntimeError("Cannot get an empty aperture correction map.") 

213 

214 final_ap_corr_map = dict.fromkeys(self._intermediate_ap_corr_map, 0.0) 

215 

216 # The transformation equation is different for the aperture correction 

217 # values and their uncertainties and it also depends on whether we 

218 # accumulate the aperture corrections or their inverse. 

219 

220 if self.do_coadd_inverse_ap_corr: 

221 for algorithm_name in self.ap_corr_names: 

222 if ( 

223 inverse_ap_corr_value := self._intermediate_ap_corr_map[f"{algorithm_name}_instFlux"] 

224 ) > 0: 

225 final_ap_corr_map[f"{algorithm_name}_instFlux"] = ( 

226 self.total_weight / inverse_ap_corr_value 

227 ) 

228 final_ap_corr_map[f"{algorithm_name}_instFluxErr"] = ( 

229 final_ap_corr_map[f"{algorithm_name}_instFlux"] ** 2 

230 * np.sqrt(self._intermediate_ap_corr_map[f"{algorithm_name}_instFluxErr"]) 

231 / self.total_weight 

232 ) 

233 else: 

234 for algorithm_name in self.ap_corr_names: 

235 final_ap_corr_map[f"{algorithm_name}_instFlux"] = ( 

236 self._intermediate_ap_corr_map[f"{algorithm_name}_instFlux"] / self.total_weight 

237 ) 

238 final_ap_corr_map[f"{algorithm_name}_instFluxErr"] = ( 

239 np.sqrt(self._intermediate_ap_corr_map[f"{algorithm_name}_instFluxErr"]) 

240 / self.total_weight 

241 ) 

242 

243 # Return the finalized (immutable) aperture correction map. 

244 return frozendict(final_ap_corr_map)