Coverage for tests / test_normalizedCalibrationFlux.py: 15%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-25 08:26 +0000

1# This file is part of meas_algorithms. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import unittest 

23 

24import numpy as np 

25import logging 

26 

27import lsst.afw.image 

28import lsst.afw.table 

29import lsst.utils.tests 

30from lsst.meas.algorithms import NormalizedCalibrationFluxTask, NormalizedCalibrationFluxError 

31 

32 

class NormalizedCalibrationFluxTestCase(lsst.utils.tests.TestCase):
    """Exercise `NormalizedCalibrationFluxTask` on synthetic source catalogs.

    Each test builds a catalog containing a reference aperture flux
    (``self.ap_name``) and a raw calibration flux (``self.cg_name``), runs the
    task, and inspects the ``slot_CalibFlux`` columns and the returned
    aperture correction map.
    """

    def setUp(self):
        self.ap_name = "base_CircularApertureFlux_12_0"
        self.cg_name = "base_CompensatedTophatFlux_12"
        self.exposure = lsst.afw.image.ExposureF(1000, 1000)
        # Seed read by _make_catalog. A fresh generator is built from it on
        # every call, so repeated calls within a test yield identical inputs.
        self.seed = 12345

    def _make_task(self, apply_only=False):
        """Make a normalization task for testing.

        Parameters
        ----------
        apply_only : `bool`, optional
            Configure task in apply_only mode? If `True`,
            ``do_measure_ap_corr`` is set to `False` in the task config.

        Returns
        -------
        norm_task : `lsst.meas.algorithms.NormalizedCalibrationFluxTask`
            Task built from a minimal source schema extended with the flux,
            centroid and ``calib_psf_used`` fields used by these tests.
        """
        schema = lsst.afw.table.SourceTable.makeMinimalSchema()

        config = NormalizedCalibrationFluxTask.ConfigClass()
        if apply_only:
            config.do_measure_ap_corr = False

        for name in [self.ap_name, self.cg_name]:
            schema.addField(name + "_instFlux", type=float)
            schema.addField(name + "_instFluxErr", type=float)
            schema.addField(name + "_flag", type="Flag")
        schema.addField("base_Centroid_x", type=float)
        schema.addField("base_Centroid_y", type=float)
        schema.getAliasMap().set("slot_Centroid", "base_Centroid")
        schema.addField("calib_psf_used", type="Flag")
        config.measure_ap_corr.refFluxName = self.ap_name
        flux_field = self.cg_name + "_instFlux"
        err_field = flux_field + "Err"
        config.measure_ap_corr.sourceSelector["science"].signalToNoise.fluxField = flux_field
        config.measure_ap_corr.sourceSelector["science"].signalToNoise.errField = err_field

        norm_task = NormalizedCalibrationFluxTask(schema=schema, config=config)

        return norm_task

    def _make_catalog(self, schema, num_sources=100, ap_flux_offset=0.0, cg_scale=0.5):
        """Make a source catalog with optional flux offset and scaling.

        Parameters
        ----------
        schema : `lsst.afw.table.Schema`
            The table schema.
        num_sources : `int`
            Number of sources to put into the catalog.
        ap_flux_offset : `float`
            Constant offset in aperture fluxes due to "background" issues.
        cg_scale : `float`
            Flux ratio between unnormalized flux and aperture (reference) flux.

        Returns
        -------
        source_cat : `lsst.afw.table.SourceCatalog`
            Catalog of ``num_sources`` sources with random positions inside
            ``self.exposure``, all marked ``calib_psf_used`` and with both
            flux flags cleared.
        """
        source_cat = lsst.afw.table.SourceCatalog(schema)

        rng = np.random.Generator(np.random.MT19937(self.seed))
        x = rng.random(num_sources)*self.exposure.getWidth() + self.exposure.getX0()
        y = rng.random(num_sources)*self.exposure.getHeight() + self.exposure.getY0()
        flux = rng.uniform(low=10000.0, high=100000.0, size=num_sources)

        source_cat.resize(num_sources)
        source_cat["slot_Centroid_x"] = x
        source_cat["slot_Centroid_y"] = y
        source_cat["calib_psf_used"] = True

        # Make a very simple error model.
        noise_per_pix = 2.0
        ap_flux_err = np.sqrt((np.pi*12**2.*noise_per_pix)**2. + (flux + ap_flux_offset))
        cg_flux_err = np.sqrt((np.pi*4**2.*noise_per_pix)**2. + flux)

        source_cat[self.cg_name + "_instFlux"] = cg_scale*flux
        source_cat[self.cg_name + "_instFluxErr"] = cg_scale*cg_flux_err
        source_cat[self.cg_name + "_flag"] = np.zeros(num_sources, dtype=bool)
        source_cat[self.ap_name + "_instFlux"] = flux + ap_flux_offset
        source_cat[self.ap_name + "_instFluxErr"] = ap_flux_err
        source_cat[self.ap_name + "_flag"] = np.zeros(num_sources, dtype=bool)

        return source_cat

    def tearDown(self):
        del self.exposure

    def testNormalizedCalibrationFlux(self):
        """Run with no aperture flux offset and check the normalized fluxes."""
        norm_task = self._make_task()
        catalog = self._make_catalog(norm_task.schema)

        ap_corr_map = norm_task.run(catalog=catalog, exposure=self.exposure).ap_corr_map

        self.assertEqual(
            catalog.schema.getAliasMap().get("slot_CalibFlux"),
            norm_task.config.normalized_calibflux_name,
        )

        self.assertIn("base_CompensatedTophatFlux_12_instFlux", ap_corr_map)
        self.assertIn("base_CompensatedTophatFlux_12_instFluxErr", ap_corr_map)

        # The full set should have a 1.0 ratio when the
        # aperture flux offset is 0.0
        ratio = np.mean(catalog["slot_CalibFlux_instFlux"]/catalog[self.ap_name + "_instFlux"])
        self.assertFloatsAlmostEqual(ratio, 1.0, rtol=1e-10)

        # The subset that was used should always have a 1.0 ratio.
        used = catalog["apcorr_base_CompensatedTophatFlux_12_used"]
        ratio_used = np.mean(
            catalog["slot_CalibFlux_instFlux"][used]/catalog[self.ap_name + "_instFlux"][used]
        )
        self.assertFloatsAlmostEqual(ratio_used, 1.0, rtol=1e-10)

        # The error ratios should match the input and output.
        self.assertFloatsAlmostEqual(
            catalog["slot_CalibFlux_instFluxErr"]/catalog["slot_CalibFlux_instFlux"],
            catalog[self.cg_name + "_instFluxErr"]/catalog[self.cg_name + "_instFlux"],
        )

    def testNormalizedCalibrationFluxOffset(self):
        """Run with negative and positive aperture flux offsets."""
        norm_task = self._make_task()

        for offset in [-10.0, 10.0]:
            catalog = self._make_catalog(norm_task.schema, ap_flux_offset=offset)

            norm_task.run(catalog=catalog, exposure=self.exposure)

            # The full set should not have a 1.0 ratio when the
            # aperture flux offset is not 0.0
            ratio = np.mean(catalog["slot_CalibFlux_instFlux"]/catalog[self.ap_name + "_instFlux"])
            self.assertFloatsNotEqual(ratio, 1.0)
            # Whether the full set is less than or greater than 1.0 depends on
            # the sign of the background offset.
            if offset < 0.0:
                self.assertGreater(ratio, 1.0)
            else:
                self.assertLess(ratio, 1.0)

            # The subset that was used should have a 1.0 ratio. The median is
            # checked here because individual ratios may not be exactly 1.0,
            # given the trend in the ratio vs flux even at the bright end.
            used = catalog["apcorr_base_CompensatedTophatFlux_12_used"]
            ratio_used = np.median(
                catalog["slot_CalibFlux_instFlux"][used]/catalog[self.ap_name + "_instFlux"][used]
            )
            self.assertFloatsAlmostEqual(ratio_used, 1.0, rtol=1e-10)

    def testNormalizedCalibrationFluxTooFew(self):
        """Run with the raw calibration flux flagged on all but one source."""
        norm_task = self._make_task()
        catalog = self._make_catalog(norm_task.schema)

        # Flag every source except the first.
        flags = np.ones(len(catalog), dtype=bool)
        flags[0] = False
        catalog[self.cg_name + "_flag"] = flags

        ap_corr_map = norm_task.run(catalog=catalog, exposure=self.exposure).ap_corr_map

        self.assertIn("base_CompensatedTophatFlux_12_instFlux", ap_corr_map)
        self.assertIn("base_CompensatedTophatFlux_12_instFluxErr", ap_corr_map)

        # The full set should have a 1.0 ratio when the
        # aperture flux offset is 0.0
        ratio = np.mean(catalog["slot_CalibFlux_instFlux"]/catalog[self.ap_name + "_instFlux"])
        self.assertFloatsAlmostEqual(ratio, 1.0, rtol=1e-10)

        # Only the unflagged source should be marked as used.
        self.assertTrue(np.all(~flags == catalog["apcorr_base_CompensatedTophatFlux_12_used"]))

    def testNormalizedCalibrationFluxApplyOnly(self):
        """Check that apply-only mode reproduces the default-mode results."""
        # Run the regular task in default mode first.
        norm_task = self._make_task()
        catalog_run1 = self._make_catalog(norm_task.schema)
        exposure_run1 = self.exposure.clone()

        ap_corr_map = norm_task.run(catalog=catalog_run1, exposure=exposure_run1).ap_corr_map

        exposure_run1.info.setApCorrMap(ap_corr_map)

        # Rerun the task in apply-only mode. The catalog is rebuilt from the
        # same seed, so the input is identical to the first run.
        norm_task2 = self._make_task(apply_only=True)
        catalog_run2 = self._make_catalog(norm_task.schema)

        ap_corr_map2 = norm_task2.run(catalog=catalog_run2, exposure=exposure_run1).ap_corr_map

        # Check that the ap_corr_map and ap_corr_map2 are the same.
        self.assertEqual(set(ap_corr_map2.keys()), set(ap_corr_map.keys()))
        for key in ap_corr_map.keys():
            self.assertEqual(ap_corr_map2[key], ap_corr_map[key])

        # Check that the slot is set correctly
        self.assertEqual(
            catalog_run2.schema.getAliasMap().get("slot_CalibFlux"),
            norm_task2.config.normalized_calibflux_name,
        )

        # Check that the final normalized catalog values are the same.
        self.assertFloatsAlmostEqual(
            catalog_run2["slot_CalibFlux_instFlux"],
            catalog_run1["slot_CalibFlux_instFlux"],
        )

    def testNormalizedCalibrationFluxApplyOnlyFail(self):
        """Check the warnings logged when apply-only mode lacks a usable map."""
        norm_task = self._make_task()
        catalog_run1 = self._make_catalog(norm_task.schema)
        exposure_run1 = self.exposure.clone()

        # The resulting map is deliberately not attached to the exposure.
        norm_task.run(catalog=catalog_run1, exposure=exposure_run1)

        norm_task2 = self._make_task(apply_only=True)
        catalog_run2 = self._make_catalog(norm_task.schema)

        # Try without setting an aperture correction map at all.
        with self.assertLogs(level=logging.WARNING) as cm:
            norm_task2.run(catalog=catalog_run2, exposure=exposure_run1)
        warnings = '\n'.join(cm.output)
        self.assertIn("does not have a valid normalization", warnings)

        # Try again after setting an incomplete aperture correction map.
        ap_corr_map_blank = lsst.afw.image.ApCorrMap()
        exposure_run1.info.setApCorrMap(ap_corr_map_blank)

        with self.assertLogs(level=logging.WARNING) as cm:
            norm_task2.run(catalog=catalog_run2, exposure=exposure_run1)
        warnings = '\n'.join(cm.output)
        self.assertIn("aperture correction map is missing base_CompensatedTophatFlux_12_instFlux", warnings)

    def testNormalizedCalibrationFluxError(self):
        """Check that the task raises when every raw calibration flux is flagged."""
        norm_task = self._make_task()
        catalog = self._make_catalog(norm_task.schema)
        catalog[norm_task.config.raw_calibflux_name + "_flag"] = True
        nStars = len(catalog)

        # Every fragment containing a placeholder needs its own ``f`` prefix;
        # adjacent literals are concatenated but the prefix is not shared.
        error_string = (
            "There are no valid stars to compute normalized calibration fluxes. "
            f"Of {nStars} initially selected sources, 0 have good raw calibration fluxes "
            f"and {nStars} have good reference fluxes."
        )
        # NOTE: ``msg`` is only the text reported if no exception is raised;
        # it is not compared against the raised exception's message.
        with self.assertRaises(NormalizedCalibrationFluxError, msg=error_string):
            norm_task.run(catalog=catalog, exposure=self.exposure)

277 

278 

class TestMemory(lsst.utils.tests.MemoryTestCase):
    """Use `lsst.utils.tests.MemoryTestCase` as-is; nothing is added here."""

281 

282 

def setup_module(module):
    """Call `lsst.utils.tests.init`; the ``module`` argument is not used."""
    lsst.utils.tests.init()

285 

286 

# Script entry point: initialise the LSST test utilities, then hand over to
# the unittest runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()