Coverage for tests / test_ApplyApCorr.py: 16%

169 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:22 +0000

1# This file is part of meas_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import unittest 

23 

24import astropy.table 

25import numpy as np 

26 

27import lsst.utils.tests 

28import lsst.geom 

29import lsst.meas.base.tests 

30import lsst.afw.image as afwImage 

31import lsst.afw.table as afwTable 

32import lsst.meas.base.applyApCorr as applyApCorr 

33from lsst.afw.math import ChebyshevBoundedField 

34from lsst.meas.base.apCorrRegistry import addApCorrName 

35 

36 

def initializeSourceCatalog(schema=None, name=None, instFlux=None, sigma=None, centroid=None):
    """Create a single-record source catalog holding one flux measurement.

    Parameters
    ----------
    schema : `lsst.afw.table.Schema`
        Schema that defines ``<name>_instFlux``, ``<name>_instFluxErr`` and
        something addressable as ``slot_Centroid``.
    name : `str`
        Prefix of the flux fields to populate.
    instFlux
        Value stored in ``<name>_instFlux``.
    sigma
        Value stored in ``<name>_instFluxErr``.
    centroid
        Value stored through the ``slot_Centroid`` point key.

    Returns
    -------
    catalog : `lsst.afw.table.SourceCatalog`
        Catalog containing exactly one record with the values above.

    Notes
    -----
    Every argument defaults to `None`, but ``schema`` and ``name`` are
    effectively required: both are dereferenced unconditionally.
    """
    fluxField = name + "_instFlux"
    errField = name + "_instFluxErr"
    fluxKey = schema.find(fluxField).key
    centroidKey = afwTable.Point2DKey(schema["slot_Centroid"])

    catalog = afwTable.SourceCatalog(schema)
    record = catalog.addNew()
    # The flux goes in through its key; the error is addressed by field name.
    record.set(fluxKey, instFlux)
    record.set(errField, sigma)
    record.set(centroidKey, centroid)
    return catalog

48 

49 

class ApplyApCorrTestCase(lsst.meas.base.tests.AlgorithmTestCase, lsst.utils.tests.TestCase):
    """Tests of ``ApplyApCorrTask`` on afw source catalogs and Astropy tables.

    Most tests run the task twice: once on a ``SourceCatalog`` and once on an
    Astropy copy of that catalog taken *before* the first run, so both inputs
    start from the same uncorrected values.
    """

    def setUp(self):
        schema = afwTable.SourceTable.makeMinimalSchema()
        # "test2" is registered before "test"; testAddFields asserts that
        # test_apCorr nevertheless ends up at a smaller key offset than
        # test2_apCorr.
        names = ["test2", "test"]
        for name in names:
            addApCorrName(name)
            schema.addField(name + "_instFlux", type=np.float64)
            schema.addField(name + "_instFluxErr", type=np.float64)
            schema.addField(name + "_flag", type=np.float64)
            schema.addField(name + "_Centroid_x", type=np.float64)
            schema.addField(name + "_Centroid_y", type=np.float64)
        # Just use the 'test' prefix (the last registered name) for most of
        # the tests; picked explicitly rather than via the leaked loop variable.
        self.name = names[-1]
        # The centroid slot points at the fields of the prefix under test.
        schema.getAliasMap().set('slot_Centroid', self.name + '_Centroid')
        self.ap_corr_task = applyApCorr.ApplyApCorrTask(schema=schema)
        self.schema = schema

    def tearDown(self):
        del self.schema
        del self.ap_corr_task

    def _makeApCorrMap(self, fluxCoeff=1.0, errCoeff=0.0):
        """Build an ``ApCorrMap`` for the ``self.name`` flux fields.

        Parameters
        ----------
        fluxCoeff : `float`, optional
            The single coefficient of the field stored under
            ``<name>_instFlux``.
        errCoeff : `float`, optional
            The single coefficient of the field stored under
            ``<name>_instFluxErr``.

        Returns
        -------
        apCorrMap : `lsst.afw.image.ApCorrMap`
            Map with one ``ChebyshevBoundedField`` per entry, each defined on
            a 10x10 box anchored at the origin with a (1, 1) coefficient array.
        """
        bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.ExtentI(10, 10))
        apCorrMap = afwImage.ApCorrMap()
        for suffix, coeff in (("_instFlux", fluxCoeff), ("_instFluxErr", errCoeff)):
            coefficients = np.full((1, 1), coeff, dtype=np.float64)
            apCorrMap[self.name + suffix] = ChebyshevBoundedField(bbox, coefficients)
        return apCorrMap

    def testAddFields(self):
        # Check that the required fields have been added to the schema
        self.assertIn(self.name + "_apCorr", self.schema.getNames())
        self.assertIn(self.name + "_apCorrErr", self.schema.getNames())
        self.assertIn(self.name + "_flag_apCorr", self.schema.getNames())
        self.assertLess(self.schema.find("test_apCorr").key.getOffset(),
                        self.schema.find("test2_apCorr").key.getOffset())

    def _catalogToAstropy(self, catalog):
        # Convert a SourceCatalog to an Astropy Table (a copy, so later
        # changes to the catalog do not show up in the table).
        sourceTable = catalog.asAstropy(copy=True)
        # NOTE(review): both assignments below copy a column onto itself.
        # Presumably the right-hand side was meant to be a different centroid
        # column; left unchanged until that intent is confirmed.
        sourceTable["slot_Centroid_x"] = sourceTable["slot_Centroid_x"]
        sourceTable["slot_Centroid_y"] = sourceTable["slot_Centroid_y"]
        return sourceTable

    def testSuccessUnflagged(self):
        # Check that the aperture correction flag is set to False if aperture
        # correction was successfully run
        flagName = self.name + "_flag_apCorr"
        flagKey = self.schema.find(flagName).key
        source_test_instFlux = 5.1
        source_test_centroid = lsst.geom.Point2D(5, 7.1)
        sourceCat = initializeSourceCatalog(schema=self.schema, name=self.name, instFlux=source_test_instFlux,
                                            sigma=0, centroid=source_test_centroid)
        apCorrMap = self._makeApCorrMap(fluxCoeff=1.0, errCoeff=0.0)

        sourceTable = self._catalogToAstropy(sourceCat)
        self.ap_corr_task.run(sourceCat, apCorrMap)
        self.assertFalse(sourceCat[flagKey])
        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertFalse(sourceTable[flagName])

    def testFailureFlagged(self):
        # Check that aperture correction flag is set to True if aperture
        # correction is invalid (negative)
        flagName = self.name + "_flag_apCorr"
        flagKey = self.schema.find(flagName).key
        source_test_instFlux = 5.2
        source_test_centroid = lsst.geom.Point2D(5, 7.1)
        sourceCat = initializeSourceCatalog(schema=self.schema, name=self.name, instFlux=source_test_instFlux,
                                            sigma=0, centroid=source_test_centroid)
        apCorrMap = self._makeApCorrMap(fluxCoeff=-1.0, errCoeff=0.0)

        sourceTable = self._catalogToAstropy(sourceCat)
        self.ap_corr_task.run(sourceCat, apCorrMap)
        self.assertTrue(sourceCat[flagKey])
        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertTrue(sourceTable[flagName])

    def testCatFluxUnchanged(self):
        # Pick arbitrary but unique values for the test case
        source_test_instFlux = 5.3
        source_test_centroid = lsst.geom.Point2D(5, 7.1)
        sourceCat = initializeSourceCatalog(schema=self.schema, name=self.name, instFlux=source_test_instFlux,
                                            sigma=0, centroid=source_test_centroid)
        instFluxName = self.name + "_instFlux"
        instFluxKey = self.schema.find(instFluxName).key
        # A flux coefficient of exactly one must leave the flux untouched.
        apCorrMap = self._makeApCorrMap(fluxCoeff=1.0, errCoeff=0.0)

        sourceTable = self._catalogToAstropy(sourceCat)
        self.ap_corr_task.run(sourceCat, apCorrMap)
        self.assertEqual(sourceCat[instFluxKey], source_test_instFlux)
        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertEqual(sourceTable[instFluxName], source_test_instFlux)

    def testCatFluxHalf(self):
        # Pick arbitrary but unique values for the test case
        source_test_instFlux = 5.4
        source_test_centroid = lsst.geom.Point2D(5, 7.1)
        sourceCat = initializeSourceCatalog(schema=self.schema, name=self.name, instFlux=source_test_instFlux,
                                            sigma=0, centroid=source_test_centroid)
        instFluxName = self.name + "_instFlux"
        instFluxKey = self.schema.find(instFluxName).key
        apCorrMap = self._makeApCorrMap(fluxCoeff=0.5, errCoeff=0.0)

        sourceTable = self._catalogToAstropy(sourceCat)
        self.ap_corr_task.run(sourceCat, apCorrMap)
        self.assertFloatsAlmostEqual(sourceCat[instFluxKey], source_test_instFlux / 2)
        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertFloatsAlmostEqual(sourceTable[instFluxName], source_test_instFlux / 2)
        self.assertFloatsEqual(sourceTable[self.name+"_apCorr"], 0.5)

    def testCatFluxErr(self):
        """Test catalog flux errors.

        Notes
        -----
        This test will break if ``UseNaiveFluxErr = False``.

        The alternate method significantly overestimates noise, causing this
        test to fail. It is likely that this test will need to be modified if
        the noise calculation is updated.
        """
        # Pick arbitrary but unique values for the test case
        source_test_instFlux = 5.5
        source_test_sigma = 0.23
        source_test_centroid = lsst.geom.Point2D(5, 7.3)
        sourceCat = initializeSourceCatalog(schema=self.schema, name=self.name, instFlux=source_test_instFlux,
                                            sigma=source_test_sigma, centroid=source_test_centroid)

        instFluxErrName = self.name + "_instFluxErr"
        instFluxErrKey = self.schema.find(instFluxErrName).key
        apCorrMap = self._makeApCorrMap(fluxCoeff=1.0, errCoeff=1.0)

        sourceTable = self._catalogToAstropy(sourceCat)
        self.ap_corr_task.run(sourceCat, apCorrMap)
        self.assertFloatsAlmostEqual(sourceCat[instFluxErrKey], source_test_sigma)
        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertFloatsAlmostEqual(sourceTable[instFluxErrName], source_test_sigma)

    def testSourceTable(self):
        """Test that the task can run on a hand-built Astropy table that
        lacks the apCorr output columns.
        """
        # For a source table there is no schema
        # so we initialize the task without one.
        self.ap_corr_task = applyApCorr.ApplyApCorrTask()
        source_test_instFlux = 5.3
        source_test_centroid = lsst.geom.Point2D(5, 7.1)
        instFluxName = self.name + "_instFlux"
        instFluxErrName = self.name + "_instFluxErr"

        # One-row table carrying only the id, centroid and flux columns.
        sourceTable = astropy.table.Table(
            {
                "id": [1],
                "slot_Centroid_x": [source_test_centroid.x],
                "slot_Centroid_y": [source_test_centroid.y],
                instFluxName: [source_test_instFlux],
                instFluxErrName: [0.0],
            }
        )
        apCorrMap = self._makeApCorrMap(fluxCoeff=0.5, errCoeff=0.0)

        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertFloatsAlmostEqual(sourceTable[instFluxName], source_test_instFlux / 2)
        self.assertFloatsEqual(sourceTable[self.name+"_apCorr"], 0.5)

251 

252 

class TestMemory(lsst.utils.tests.MemoryTestCase):
    """Pull in the checks inherited from ``lsst.utils.tests.MemoryTestCase``.

    No additional tests are defined here.
    """

255 

256 

def setup_module(module):
    """Module-level setup hook: initialise the ``lsst.utils.tests`` machinery.

    Parameters
    ----------
    module
        The module being set up; not used here.
    """
    lsst.utils.tests.init()

259 

260 

# Script entry point: perform the same initialisation as setup_module, then
# hand control to the unittest runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()