Coverage for tests/test_ApplyApCorr.py: 16%
169 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:48 +0000
1# This file is part of meas_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import unittest
24import astropy.table
25import numpy as np
27import lsst.utils.tests
28import lsst.geom
29import lsst.meas.base.tests
30import lsst.afw.image as afwImage
31import lsst.afw.table as afwTable
32import lsst.meas.base.applyApCorr as applyApCorr
33from lsst.afw.math import ChebyshevBoundedField
34from lsst.meas.base.apCorrRegistry import addApCorrName
def initializeSourceCatalog(schema=None, name=None, instFlux=None, sigma=None, centroid=None):
    """Build a source catalog holding exactly one populated record.

    Parameters
    ----------
    schema
        Schema used to construct the catalog. It must already contain the
        ``<name>_instFlux`` and ``<name>_instFluxErr`` fields and resolve
        ``slot_Centroid``.
    name : `str`
        Field-name prefix of the measurement being populated.
    instFlux
        Value stored in the ``<name>_instFlux`` field.
    sigma
        Value stored in the ``<name>_instFluxErr`` field.
    centroid
        Value stored through the ``slot_Centroid`` point key.

    Returns
    -------
    catalog : `lsst.afw.table.SourceCatalog`
        Catalog containing the single new record.
    """
    fluxField = name + "_instFlux"
    fluxErrField = name + "_instFluxErr"
    fluxKey = schema.find(fluxField).key
    slotCentroidKey = afwTable.Point2DKey(schema["slot_Centroid"])

    catalog = afwTable.SourceCatalog(schema)
    record = catalog.addNew()
    # The flux error is addressed by field name; the other two by key.
    assignments = (
        (fluxKey, instFlux),
        (fluxErrField, sigma),
        (slotCentroidKey, centroid),
    )
    for target, value in assignments:
        record.set(target, value)
    return catalog
class ApplyApCorrTestCase(lsst.meas.base.tests.AlgorithmTestCase, lsst.utils.tests.TestCase):
    """Exercise ``ApplyApCorrTask`` on ``SourceCatalog`` and Astropy ``Table``
    inputs.
    """

    def setUp(self):
        schema = afwTable.SourceTable.makeMinimalSchema()
        # Registered out of alphabetical order on purpose: testAddFields
        # asserts that "test_apCorr" still precedes "test2_apCorr".
        names = ["test2", "test"]
        for name in names:
            addApCorrName(name)
            schema.addField(name + "_instFlux", type=np.float64)
            schema.addField(name + "_instFluxErr", type=np.float64)
            # NOTE(review): the flag column is declared as float64, not as a
            # Flag field; kept as-is since nothing in this file reads it.
            schema.addField(name + "_flag", type=np.float64)
            schema.addField(name + "_Centroid_x", type=np.float64)
            schema.addField(name + "_Centroid_y", type=np.float64)
        # Just use the 'test' prefix (the last one registered) for most of
        # the tests, and point the centroid slot at its centroid fields.
        self.name = names[-1]
        schema.getAliasMap().set('slot_Centroid', self.name + '_Centroid')
        self.ap_corr_task = applyApCorr.ApplyApCorrTask(schema=schema)
        self.schema = schema

    def tearDown(self):
        del self.schema
        del self.ap_corr_task

    def _makeApCorrMap(self, fluxCoeff, fluxErrCoeff):
        """Build an ``ApCorrMap`` for ``self.name`` from two scalar
        coefficients.

        Parameters
        ----------
        fluxCoeff : `float`
            Sole coefficient of the field stored under
            ``<name>_instFlux``.
        fluxErrCoeff : `float`
            Sole coefficient of the field stored under
            ``<name>_instFluxErr``.

        Returns
        -------
        apCorrMap : `lsst.afw.image.ApCorrMap`
            Map with two ``ChebyshevBoundedField`` entries, each built from
            a ``(1, 1)`` coefficient array over a 10x10 box at the origin.
        """
        bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.ExtentI(10, 10))
        apCorrMap = afwImage.ApCorrMap()
        apCorrMap[self.name + "_instFlux"] = ChebyshevBoundedField(
            bbox, np.full((1, 1), fluxCoeff, dtype=np.float64))
        apCorrMap[self.name + "_instFluxErr"] = ChebyshevBoundedField(
            bbox, np.full((1, 1), fluxErrCoeff, dtype=np.float64))
        return apCorrMap

    def testAddFields(self):
        # Check that the required fields have been added to the schema
        fieldNames = self.schema.getNames()
        self.assertIn(self.name + "_apCorr", fieldNames)
        self.assertIn(self.name + "_apCorrErr", fieldNames)
        self.assertIn(self.name + "_flag_apCorr", fieldNames)
        # "test" was registered after "test2" in setUp, yet its field must
        # come first in the schema layout.
        self.assertLess(self.schema.find("test_apCorr").key.getOffset(),
                        self.schema.find("test2_apCorr").key.getOffset())

    def _catalogToAstropy(self, catalog):
        """Convert a ``SourceCatalog`` to an Astropy table that has
        ``slot_Centroid_x``/``slot_Centroid_y`` columns.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog to convert; it is copied, not viewed.

        Returns
        -------
        sourceTable : `astropy.table.Table`
            Copy of ``catalog`` with the slot centroid columns added.
        """
        sourceTable = catalog.asAstropy(copy=True)
        # Add the slot centroid columns by copying the concrete
        # "<name>_Centroid_*" columns that the slot_Centroid alias points at
        # (the original assigned each slot column to itself, adding nothing).
        for axis in ("x", "y"):
            sourceTable["slot_Centroid_" + axis] = sourceTable[self.name + "_Centroid_" + axis]
        return sourceTable

    def testSuccessUnflagged(self):
        # Check that the aperture correction flag is set to False if aperture
        # correction was successfully run
        flagName = self.name + "_flag_apCorr"
        flagKey = self.schema.find(flagName).key
        source_test_instFlux = 5.1
        source_test_centroid = lsst.geom.Point2D(5, 7.1)
        sourceCat = initializeSourceCatalog(schema=self.schema, name=self.name, instFlux=source_test_instFlux,
                                            sigma=0, centroid=source_test_centroid)
        apCorrMap = self._makeApCorrMap(fluxCoeff=1.0, fluxErrCoeff=0.0)

        # Copy to a table *before* the catalog is corrected in place, so both
        # runs start from the same uncorrected values.
        sourceTable = self._catalogToAstropy(sourceCat)
        self.ap_corr_task.run(sourceCat, apCorrMap)
        self.assertFalse(sourceCat[flagKey])
        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertFalse(sourceTable[flagName])

    def testFailureFlagged(self):
        # Check that aperture correction flag is set to True if aperture
        # correction is invalid (negative)
        flagName = self.name + "_flag_apCorr"
        flagKey = self.schema.find(flagName).key
        source_test_instFlux = 5.2
        source_test_centroid = lsst.geom.Point2D(5, 7.1)
        sourceCat = initializeSourceCatalog(schema=self.schema, name=self.name, instFlux=source_test_instFlux,
                                            sigma=0, centroid=source_test_centroid)
        apCorrMap = self._makeApCorrMap(fluxCoeff=-1.0, fluxErrCoeff=0.0)

        sourceTable = self._catalogToAstropy(sourceCat)
        self.ap_corr_task.run(sourceCat, apCorrMap)
        self.assertTrue(sourceCat[flagKey])
        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertTrue(sourceTable[flagName])

    def testCatFluxUnchanged(self):
        # Pick arbitrary but unique values for the test case
        source_test_instFlux = 5.3
        source_test_centroid = lsst.geom.Point2D(5, 7.1)
        sourceCat = initializeSourceCatalog(schema=self.schema, name=self.name, instFlux=source_test_instFlux,
                                            sigma=0, centroid=source_test_centroid)
        instFluxName = self.name + "_instFlux"
        instFluxKey = self.schema.find(instFluxName).key
        # A unit coefficient must leave the flux untouched.
        apCorrMap = self._makeApCorrMap(fluxCoeff=1.0, fluxErrCoeff=0.0)

        sourceTable = self._catalogToAstropy(sourceCat)
        self.ap_corr_task.run(sourceCat, apCorrMap)
        self.assertEqual(sourceCat[instFluxKey], source_test_instFlux)
        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertEqual(sourceTable[instFluxName], source_test_instFlux)

    def testCatFluxHalf(self):
        # Pick arbitrary but unique values for the test case
        source_test_instFlux = 5.4
        source_test_centroid = lsst.geom.Point2D(5, 7.1)
        sourceCat = initializeSourceCatalog(schema=self.schema, name=self.name, instFlux=source_test_instFlux,
                                            sigma=0, centroid=source_test_centroid)
        instFluxName = self.name + "_instFlux"
        instFluxKey = self.schema.find(instFluxName).key
        # A coefficient of one half must halve the flux.
        apCorrMap = self._makeApCorrMap(fluxCoeff=0.5, fluxErrCoeff=0.0)

        sourceTable = self._catalogToAstropy(sourceCat)
        self.ap_corr_task.run(sourceCat, apCorrMap)
        self.assertFloatsAlmostEqual(sourceCat[instFluxKey], source_test_instFlux / 2)
        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertFloatsAlmostEqual(sourceTable[instFluxName], source_test_instFlux / 2)
        self.assertFloatsEqual(sourceTable[self.name+"_apCorr"], 0.5)

    def testCatFluxErr(self):
        """Test catalog flux errors.

        Notes
        -----
        This test will break if ``UseNaiveFluxErr = False``.

        The alternate method significantly overestimates noise, causing this
        test to fail. It is likely that this test will need to be modified if
        the noise calculation is updated.
        """
        # Pick arbitrary but unique values for the test case
        source_test_instFlux = 5.5
        source_test_sigma = 0.23
        source_test_centroid = lsst.geom.Point2D(5, 7.3)
        sourceCat = initializeSourceCatalog(schema=self.schema, name=self.name, instFlux=source_test_instFlux,
                                            sigma=source_test_sigma, centroid=source_test_centroid)

        instFluxErrName = self.name + "_instFluxErr"
        instFluxErrKey = self.schema.find(instFluxErrName).key
        apCorrMap = self._makeApCorrMap(fluxCoeff=1.0, fluxErrCoeff=1.0)

        sourceTable = self._catalogToAstropy(sourceCat)
        self.ap_corr_task.run(sourceCat, apCorrMap)
        self.assertFloatsAlmostEqual(sourceCat[instFluxErrKey], source_test_sigma)
        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertFloatsAlmostEqual(sourceTable[instFluxErrName], source_test_sigma)

    def testSourceTable(self):
        """Test that the task can handle an Astropy table that was not built
        from a schema and so has no aperture-correction columns beforehand.
        """
        # For a source table there is no schema
        # so we initialize the task without one.
        self.ap_corr_task = applyApCorr.ApplyApCorrTask()
        # Create a single-row table holding only the input columns
        source_test_instFlux = 5.3
        source_test_centroid = lsst.geom.Point2D(5, 7.1)
        instFluxName = self.name + "_instFlux"
        instFluxErrName = self.name + "_instFluxErr"

        sourceTable = astropy.table.Table(
            {
                "id": [1],
                "slot_Centroid_x": [source_test_centroid.x],
                "slot_Centroid_y": [source_test_centroid.y],
                instFluxName: [source_test_instFlux],
                instFluxErrName: [0.0],
            }
        )
        apCorrMap = self._makeApCorrMap(fluxCoeff=0.5, fluxErrCoeff=0.0)

        self.ap_corr_task.run(sourceTable, apCorrMap)
        self.assertFloatsAlmostEqual(sourceTable[instFluxName], source_test_instFlux / 2)
        # The output column did not exist before the run.
        self.assertFloatsEqual(sourceTable[self.name+"_apCorr"], 0.5)
class TestMemory(lsst.utils.tests.MemoryTestCase):
    """Run the checks inherited from `lsst.utils.tests.MemoryTestCase`.

    Nothing is added or overridden here; the subclass exists only so the
    inherited test case is collected as part of this module.
    """
def setup_module(module):
    """Initialise the LSST test utilities once for this module.

    Parameters
    ----------
    module
        The module object handed in by the test runner; not used.
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Run directly as a script: perform the same initialisation that
    # setup_module() does, then hand over to the unittest runner.
    lsst.utils.tests.init()
    unittest.main()