Coverage for tests / test_measureApCorr.py: 16%

176 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:26 +0000

1# 

2# LSST Data Management System 

3# 

4# Copyright 2008-2016 AURA/LSST. 

5# 

6# This product includes software developed by the 

7# LSST Project (http://www.lsst.org/). 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the LSST License Statement and 

20# the GNU General Public License along with this program. If not, 

21# see <https://www.lsstcorp.org/LegalNotices/>. 

22# 

23import unittest 

24import numpy as np 

25import logging 

26 

27import lsst.geom 

28import lsst.afw.image as afwImage 

29import lsst.afw.table as afwTable 

30from lsst.afw.math import ChebyshevBoundedField 

31import lsst.pex.config 

32import lsst.meas.algorithms.measureApCorr as measureApCorr 

33from lsst.meas.base.apCorrRegistry import addApCorrName 

34import lsst.meas.base.tests 

35import lsst.utils.tests 

36 

37 

38def apCorrDefaultMap(value=None, bbox=None): 

39 default_coefficients = np.ones((1, 1), dtype=float) 

40 default_coefficients /= value 

41 default_apCorrMap = ChebyshevBoundedField(bbox, default_coefficients) 

42 default_fill = afwImage.ImageF(bbox) 

43 default_apCorrMap.fillImage(default_fill) 

44 return default_fill 

45 

46 

47class MeasureApCorrTestCase(lsst.meas.base.tests.AlgorithmTestCase, lsst.utils.tests.TestCase): 

48 

49 def makeCatalog(self, apCorrScale=1.0, numSources=5): 

50 sourceCat = afwTable.SourceCatalog(self.schema) 

51 

52 centroidKey = afwTable.Point2DKey(self.schema["slot_Centroid"]) 

53 x = self.rng.random(numSources)*self.exposure.getWidth() + self.exposure.getX0() 

54 y = self.rng.random(numSources)*self.exposure.getHeight() + self.exposure.getY0() 

55 for _i in range(numSources): 

56 source_test_centroid = lsst.geom.Point2D(x[_i], y[_i]) 

57 source = sourceCat.addNew() 

58 source.set(centroidKey, source_test_centroid) 

59 # All sources are unresolved. 

60 source[self.unresolvedName] = 0.0 

61 

62 source_test_instFlux = 5.1 

63 source_test_instFluxErr = 1e-3 

64 

65 for name in self.names: 

66 sourceCat[name + "_instFlux"] = source_test_instFlux 

67 sourceCat[name + "_instFluxErr"] = source_test_instFluxErr 

68 sourceCat[name + "_flag"] = np.zeros(len(sourceCat), dtype=bool) 

69 sourceCat[name + self.apNameStr + "_instFlux"] = source_test_instFlux * apCorrScale 

70 sourceCat[name + self.apNameStr + "_instFluxErr"] = source_test_instFluxErr * apCorrScale 

71 sourceCat[name + self.apNameStr + "_flag"] = np.zeros(len(sourceCat), dtype=bool) 

72 

73 return sourceCat 

74 

75 def setUp(self): 

76 self.rng = np.random.Generator(np.random.MT19937(1)) 

77 schema = afwTable.SourceTable.makeMinimalSchema() 

78 apNameStr = "Ap" 

79 config = measureApCorr.MeasureApCorrTask.ConfigClass() 

80 unresolvedName = config.sourceSelector.active.unresolved.name 

81 # Add fields in anti-sorted order to try to impose a need for sorting 

82 # in the addition of the apCorr fields (may happen by fluke, but this 

83 # is the best we can do to test this here. 

84 names = ["test2", "test1"] 

85 for name in names: 

86 apName = name + apNameStr 

87 addApCorrName(apName) 

88 schema.addField(name + "_instFlux", type=float) 

89 schema.addField(name + "_instFluxErr", type=float) 

90 schema.addField(name + "_flag", type="Flag") 

91 schema.addField(apName + "_instFlux", type=float) 

92 schema.addField(apName + "_instFluxErr", type=float) 

93 schema.addField(apName + "_flag", type="Flag") 

94 schema.addField(names[0] + "_Centroid_x", type=float) 

95 schema.addField(names[0] + "_Centroid_y", type=float) 

96 schema.getAliasMap().set("slot_Centroid", names[0] + "_Centroid") 

97 schema.addField(unresolvedName, type=float) 

98 schema.addField("deblend_nChild", type=np.int32) 

99 for flag in [ 

100 "base_PixelFlags_flag_edge", 

101 "base_PixelFlags_flag_nodata", 

102 "base_PixelFlags_flag_interpolatedCenter", 

103 "base_PixelFlags_flag_saturatedCenter", 

104 "base_PixelFlags_flag_crCenter", 

105 "base_PixelFlags_flag_bad", 

106 "base_PixelFlags_flag_interpolated", 

107 "base_PixelFlags_flag_saturated", 

108 ]: 

109 schema.addField(flag, type="Flag") 

110 config.refFluxName = names[0] 

111 config.sourceSelector["science"].signalToNoise.fluxField = names[0] + "_instFlux" 

112 config.sourceSelector["science"].signalToNoise.errField = names[0] + "_instFluxErr" 

113 self.meas_apCorr_task = measureApCorr.MeasureApCorrTask(schema=schema, config=config) 

114 self.names = names 

115 self.apNameStr = apNameStr 

116 self.schema = schema 

117 self.exposure = lsst.afw.image.ExposureF(10, 10) 

118 self.unresolvedName = unresolvedName 

119 

120 def tearDown(self): 

121 del self.schema 

122 del self.meas_apCorr_task 

123 del self.exposure 

124 

125 def testAddFields(self): 

126 """Instantiating the task should add one field to the schema.""" 

127 for name in self.names: 

128 self.assertIn("apcorr_" + name + self.apNameStr + "_used", self.schema.getNames()) 

129 sortedNames = sorted(self.names) 

130 key0 = self.schema.find("apcorr_" + sortedNames[0] + self.apNameStr + "_used").key 

131 key1 = self.schema.find("apcorr_" + sortedNames[1] + self.apNameStr + "_used").key 

132 # Check that the apCorr fields were added in a sorted order (not 

133 # foolproof as this could have happened by fluke, but it's the best 

134 # we can do to test this here (having added the two fields in an anti- 

135 # sorted order). 

136 self.assertLess(key0.getOffset() + key0.getBit(), key1.getOffset() + key1.getBit()) 

137 

138 def testReturnApCorrMap(self): 

139 """The measureApCorr task should return a structure with a single key 'apCorrMap'.""" 

140 struct = self.meas_apCorr_task.run(catalog=self.makeCatalog(), exposure=self.exposure) 

141 self.assertEqual(list(struct.getDict().keys()), ['apCorrMap']) 

142 

143 def testApCorrMapKeys(self): 

144 """An apCorrMap structure should have two keys per name supplied to addApCorrName().""" 

145 key_names = [] 

146 for name in self.names: 

147 apFluxName = name + self.apNameStr + "_instFlux" 

148 apFluxErrName = name + self.apNameStr + "_instFluxErr" 

149 struct = self.meas_apCorr_task.run(catalog=self.makeCatalog(), exposure=self.exposure) 

150 key_names.append(apFluxName) 

151 key_names.append(apFluxErrName) 

152 self.assertEqual(set(struct.apCorrMap.keys()), set(key_names)) 

153 

154 def testTooFewSources(self): 

155 """ If there are too few sources, check that an exception is raised.""" 

156 # Create an empty catalog with no sources to process. 

157 catalog = afwTable.SourceCatalog(self.schema) 

158 with self.assertRaisesRegex(measureApCorr.MeasureApCorrError, 

159 "Unable to measure aperture correction for 'test1Ap'"): 

160 self.meas_apCorr_task.run(catalog=catalog, exposure=self.exposure) 

161 

162 # We now try again after declaring that the aperture correction is 

163 # allowed to fail. This should run without raising an exception, but 

164 # will log a warning. 

165 for name in self.names: 

166 self.meas_apCorr_task.config.allowFailure.append(name + self.apNameStr) 

167 with self.assertLogs(level=logging.WARNING) as cm: 

168 self.meas_apCorr_task.run(catalog=catalog, exposure=self.exposure) 

169 self.assertIn("Unable to measure aperture correction for 'test1Ap'", cm.output[0]) 

170 

171 def testSourceNotUsed(self): 

172 """ Check that a source outside the bounding box is flagged as not used (False).""" 

173 sourceCat = self.makeCatalog() 

174 source = sourceCat.addNew() 

175 nameAp = self.names[0] + "Ap" 

176 source[nameAp + "_instFlux"] = 5.1 

177 source[nameAp + "_instFluxErr"] = 1e-3 

178 source[self.meas_apCorr_task.config.refFluxName + "_instFlux"] = 5.1 

179 source[self.meas_apCorr_task.config.refFluxName + "_instFluxErr"] = 1e-3 

180 centroidKey = afwTable.Point2DKey(self.schema["slot_Centroid"]) 

181 source.set(centroidKey, lsst.geom.Point2D(15, 7.1)) 

182 apCorrFlagName = "apcorr_" + nameAp + "_used" 

183 

184 self.meas_apCorr_task.run(catalog=sourceCat, exposure=self.exposure) 

185 # Check that all but the final source are used. 

186 self.assertTrue(sourceCat[apCorrFlagName][0: -1].all()) 

187 # Check that the final source is not used. 

188 self.assertFalse(sourceCat[apCorrFlagName][-1]) 

189 

190 def testSourceUsed(self): 

191 """Check that valid sources inside the bounding box that are used have their flags set to True.""" 

192 sourceCat = self.makeCatalog() 

193 self.meas_apCorr_task.run(catalog=sourceCat, exposure=self.exposure) 

194 for name in self.names: 

195 self.assertTrue(sourceCat["apcorr_" + name + self.apNameStr + "_used"].all()) 

196 

197 def testApertureMeasOnes(self): 

198 """ Check that sources with aperture fluxes exactly the same as their catalog fluxes 

199 returns an aperture correction map of 1s""" 

200 apFluxName = self.names[0] + self.apNameStr + "_instFlux" 

201 sourceCat = self.makeCatalog() 

202 struct = self.meas_apCorr_task.run(catalog=sourceCat, exposure=self.exposure) 

203 default_fill = apCorrDefaultMap(value=1.0, bbox=self.exposure.getBBox()) 

204 test_fill = afwImage.ImageF(self.exposure.getBBox()) 

205 struct.apCorrMap[apFluxName].fillImage(test_fill) 

206 np.testing.assert_allclose(test_fill.getArray(), default_fill.getArray()) 

207 

208 def testApertureMeasTens(self): 

209 """Check that aperture correction scales source fluxes in the correct direction.""" 

210 apCorr_factor = 10. 

211 sourceCat = self.makeCatalog(apCorrScale=apCorr_factor) 

212 apFluxName = self.names[0] + self.apNameStr + "_instFlux" 

213 struct = self.meas_apCorr_task.run(catalog=sourceCat, exposure=self.exposure) 

214 default_fill = apCorrDefaultMap(value=apCorr_factor, bbox=self.exposure.getBBox()) 

215 test_fill = afwImage.ImageF(self.exposure.getBBox()) 

216 struct.apCorrMap[apFluxName].fillImage(test_fill) 

217 np.testing.assert_allclose(test_fill.getArray(), default_fill.getArray()) 

218 

219 def testFilterBadValue(self): 

220 """Check that the aperture correction filters a bad value.""" 

221 sourceCat = self.makeCatalog() 

222 source = sourceCat.addNew() 

223 nameAp = self.names[0] + self.apNameStr 

224 source[nameAp + "_instFlux"] = 100.0 

225 source[nameAp + "_instFluxErr"] = 1e-3 

226 source[self.meas_apCorr_task.config.refFluxName + "_instFlux"] = 5.1 

227 source[self.meas_apCorr_task.config.refFluxName + "_instFluxErr"] = 1e-3 

228 source[self.unresolvedName] = 0.0 

229 centroidKey = afwTable.Point2DKey(self.schema["slot_Centroid"]) 

230 x = self.exposure.getX0() + 1 

231 y = self.exposure.getY0() + 1 

232 source.set(centroidKey, lsst.geom.Point2D(x, y)) 

233 

234 self.meas_apCorr_task.run(catalog=sourceCat, exposure=self.exposure) 

235 

236 # Check that both Ap fluxes are removed as outliers; one is due 

237 # to being unfilled (nan), the other is a large outlier. 

238 for name in self.names: 

239 apCorrFlagName = "apcorr_" + name + self.apNameStr + "_used" 

240 # Check that all but the final source are used. 

241 self.assertTrue(sourceCat[apCorrFlagName][0: -1].all()) 

242 # Check that the final source is not used. 

243 self.assertFalse(sourceCat[apCorrFlagName][-1]) 

244 

245 def testTooFewSourcesAfterFiltering(self): 

246 """Check that the aperture correction fails when too many are filtered.""" 

247 sourceCat = self.makeCatalog() 

248 self.meas_apCorr_task.config.minDegreesOfFreedom = 4 

249 

250 for name in self.names: 

251 nameAp = name + self.apNameStr 

252 sourceCat[nameAp + "_instFlux"][0] = 100.0 

253 

254 with self.assertRaisesRegex(measureApCorr.MeasureApCorrError, 

255 f"Unable to measure aperture correction for '{nameAp}'"): 

256 self.meas_apCorr_task.run(catalog=sourceCat, exposure=self.exposure) 

257 

258 # We now try again after declaring that the aperture correction is 

259 # allowed to fail. This should run cleanly without raising an exception. 

260 for name in self.names: 

261 self.meas_apCorr_task.config.allowFailure.append(name + self.apNameStr) 

262 self.meas_apCorr_task.run(catalog=sourceCat, exposure=self.exposure) 

263 

264 

265class TestMemory(lsst.utils.tests.MemoryTestCase): 

266 pass 

267 

268 

269def setup_module(module): 

270 lsst.utils.tests.init() 

271 

272 

273if __name__ == "__main__": 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true

274 lsst.utils.tests.init() 

275 unittest.main()