Coverage for tests/test_normalizedCalibrationFlux.py: 15%
146 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:38 +0000
1# This file is part of meas_algorithms.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import unittest
24import numpy as np
25import logging
27import lsst.afw.image
28import lsst.afw.table
29import lsst.utils.tests
30from lsst.meas.algorithms import NormalizedCalibrationFluxTask, NormalizedCalibrationFluxError
class NormalizedCalibrationFluxTestCase(lsst.utils.tests.TestCase):
    """Exercise `NormalizedCalibrationFluxTask` on synthetic source catalogs.

    Each test builds a catalog with a circular-aperture ("reference") flux
    and a compensated-tophat ("raw calibration") flux that is a scaled copy
    of the same underlying flux, runs the task, and inspects the normalized
    ``slot_CalibFlux`` outputs and the returned aperture correction map.
    """

    def setUp(self):
        self.ap_name = "base_CircularApertureFlux_12_0"
        self.cg_name = "base_CompensatedTophatFlux_12"
        self.exposure = lsst.afw.image.ExposureF(1000, 1000)
        # Seed consumed by _make_catalog. Defined here so that every test
        # gets the same deterministic catalog without having to remember to
        # set it first.
        self.seed = 12345

    def tearDown(self):
        del self.exposure

    def _make_task(self, apply_only=False):
        """Make a normalization task for testing.

        The schema is the minimal source schema plus flux, flux error and
        flag fields for both the aperture and compensated-tophat
        measurements, centroid fields (aliased to ``slot_Centroid``) and a
        ``calib_psf_used`` flag.

        Parameters
        ----------
        apply_only : `bool`, optional
            Configure task in apply_only mode (``do_measure_ap_corr`` is
            set to `False`)?

        Returns
        -------
        norm_task : `lsst.meas.algorithms.NormalizedCalibrationFluxTask`
            Task constructed with the test schema and configuration.
        """
        schema = lsst.afw.table.SourceTable.makeMinimalSchema()

        config = NormalizedCalibrationFluxTask.ConfigClass()
        if apply_only:
            config.do_measure_ap_corr = False

        for name in [self.ap_name, self.cg_name]:
            schema.addField(name + "_instFlux", type=float)
            schema.addField(name + "_instFluxErr", type=float)
            schema.addField(name + "_flag", type="Flag")
        schema.addField("base_Centroid_x", type=float)
        schema.addField("base_Centroid_y", type=float)
        schema.getAliasMap().set("slot_Centroid", "base_Centroid")
        schema.addField("calib_psf_used", type="Flag")

        # Use the aperture flux as the reference, and select sources on the
        # signal-to-noise of the compensated-tophat flux.
        config.measure_ap_corr.refFluxName = self.ap_name
        flux_field = self.cg_name + "_instFlux"
        err_field = flux_field + "Err"
        config.measure_ap_corr.sourceSelector["science"].signalToNoise.fluxField = flux_field
        config.measure_ap_corr.sourceSelector["science"].signalToNoise.errField = err_field

        norm_task = NormalizedCalibrationFluxTask(schema=schema, config=config)

        return norm_task

    def _make_catalog(self, schema, num_sources=100, ap_flux_offset=0.0, cg_scale=0.5):
        """Make a source catalog with optional flux offset and scaling.

        A new random generator is built from ``self.seed`` on every call, so
        repeated calls with the same arguments yield identical catalogs.

        Parameters
        ----------
        schema : `lsst.afw.table.Schema`
            The table schema.
        num_sources : `int`
            Number of sources to put into the catalog.
        ap_flux_offset : `float`
            Constant offset in aperture fluxes due to "background" issues.
        cg_scale : `float`
            Flux ratio between unnormalized flux and aperture (reference) flux.

        Returns
        -------
        source_cat : `lsst.afw.table.SourceCatalog`
            Catalog with positions, fluxes, flux errors and flags filled in.
        """
        source_cat = lsst.afw.table.SourceCatalog(schema)

        rng = np.random.Generator(np.random.MT19937(self.seed))
        x = rng.random(num_sources)*self.exposure.getWidth() + self.exposure.getX0()
        y = rng.random(num_sources)*self.exposure.getHeight() + self.exposure.getY0()
        flux = rng.uniform(low=10000.0, high=100000.0, size=num_sources)

        source_cat.resize(num_sources)
        source_cat["slot_Centroid_x"] = x
        source_cat["slot_Centroid_y"] = y
        source_cat["calib_psf_used"] = True

        # Make a very simple error model.
        noise_per_pix = 2.0
        ap_flux_err = np.sqrt((np.pi*12**2.*noise_per_pix)**2. + (flux + ap_flux_offset))
        cg_flux_err = np.sqrt((np.pi*4**2.*noise_per_pix)**2. + flux)

        source_cat[self.cg_name + "_instFlux"] = cg_scale*flux
        source_cat[self.cg_name + "_instFluxErr"] = cg_scale*cg_flux_err
        source_cat[self.cg_name + "_flag"] = np.zeros(num_sources, dtype=bool)
        source_cat[self.ap_name + "_instFlux"] = flux + ap_flux_offset
        source_cat[self.ap_name + "_instFluxErr"] = ap_flux_err
        source_cat[self.ap_name + "_flag"] = np.zeros(num_sources, dtype=bool)

        return source_cat

    def testNormalizedCalibrationFlux(self):
        """Run the task with no aperture flux offset and check the outputs."""
        norm_task = self._make_task()
        catalog = self._make_catalog(norm_task.schema)

        ap_corr_map = norm_task.run(catalog=catalog, exposure=self.exposure).ap_corr_map

        self.assertEqual(
            catalog.schema.getAliasMap().get("slot_CalibFlux"),
            norm_task.config.normalized_calibflux_name,
        )

        self.assertIn("base_CompensatedTophatFlux_12_instFlux", ap_corr_map)
        self.assertIn("base_CompensatedTophatFlux_12_instFluxErr", ap_corr_map)

        # The full set should have a 1.0 ratio when the
        # aperture flux offset is 0.0
        ratio = np.mean(catalog["slot_CalibFlux_instFlux"]/catalog[self.ap_name + "_instFlux"])
        self.assertFloatsAlmostEqual(ratio, 1.0, rtol=1e-10)

        # The subset that was used should always have a 1.0 ratio.
        used = catalog["apcorr_base_CompensatedTophatFlux_12_used"]
        ratio_used = np.mean(
            catalog["slot_CalibFlux_instFlux"][used]/catalog[self.ap_name + "_instFlux"][used]
        )
        self.assertFloatsAlmostEqual(ratio_used, 1.0, rtol=1e-10)

        # The error ratios should match the input and output.
        self.assertFloatsAlmostEqual(
            catalog["slot_CalibFlux_instFluxErr"]/catalog["slot_CalibFlux_instFlux"],
            catalog[self.cg_name + "_instFluxErr"]/catalog[self.cg_name + "_instFlux"],
        )

    def testNormalizedCalibrationFluxOffset(self):
        """Run the task with negative and positive aperture flux offsets."""
        norm_task = self._make_task()

        for offset in [-10.0, 10.0]:
            catalog = self._make_catalog(norm_task.schema, ap_flux_offset=offset)

            norm_task.run(catalog=catalog, exposure=self.exposure)

            # The full set should not have a 1.0 ratio when the
            # aperture flux offset is not 0.0
            ratio = np.mean(catalog["slot_CalibFlux_instFlux"]/catalog[self.ap_name + "_instFlux"])
            self.assertFloatsNotEqual(ratio, 1.0)
            # Whether the full set is less than or greater than 1.0 depends on
            # the sign of the background offset.
            if offset < 0.0:
                self.assertGreater(ratio, 1.0)
            else:
                self.assertLess(ratio, 1.0)

            # The median ratio of the subset that was used should be 1.0;
            # individual ratios may differ from 1.0 because of the trend in
            # the ratio vs flux even at the bright end.
            used = catalog["apcorr_base_CompensatedTophatFlux_12_used"]
            ratio_used = np.median(
                catalog["slot_CalibFlux_instFlux"][used]/catalog[self.ap_name + "_instFlux"][used]
            )
            self.assertFloatsAlmostEqual(ratio_used, 1.0, rtol=1e-10)

    def testNormalizedCalibrationFluxTooFew(self):
        """Run the task when all but one raw calibration flux is flagged."""
        norm_task = self._make_task()
        catalog = self._make_catalog(norm_task.schema)

        # Flag every compensated-tophat flux except that of the first source.
        flags = np.ones(len(catalog), dtype=bool)
        flags[0] = False
        catalog[self.cg_name + "_flag"] = flags

        ap_corr_map = norm_task.run(catalog=catalog, exposure=self.exposure).ap_corr_map

        self.assertIn("base_CompensatedTophatFlux_12_instFlux", ap_corr_map)
        self.assertIn("base_CompensatedTophatFlux_12_instFluxErr", ap_corr_map)

        # The full set should have a 1.0 ratio when the
        # aperture flux offset is 0.0
        ratio = np.mean(catalog["slot_CalibFlux_instFlux"]/catalog[self.ap_name + "_instFlux"])
        self.assertFloatsAlmostEqual(ratio, 1.0, rtol=1e-10)

        # Only the unflagged source may be marked as used.
        self.assertTrue(np.all(~flags == catalog["apcorr_base_CompensatedTophatFlux_12_used"]))

    def testNormalizedCalibrationFluxApplyOnly(self):
        """Check that apply-only mode reproduces the default-mode results."""
        # Run the regular task in default mode first.
        norm_task = self._make_task()
        catalog_run1 = self._make_catalog(norm_task.schema)
        exposure_run1 = self.exposure.clone()

        ap_corr_map = norm_task.run(catalog=catalog_run1, exposure=exposure_run1).ap_corr_map

        exposure_run1.info.setApCorrMap(ap_corr_map)

        # Rerun the task in apply-only mode. _make_catalog builds a fresh
        # generator from self.seed on every call, so this catalog has the
        # same inputs as the one used for the first run.
        norm_task2 = self._make_task(apply_only=True)
        catalog_run2 = self._make_catalog(norm_task.schema)

        ap_corr_map2 = norm_task2.run(catalog=catalog_run2, exposure=exposure_run1).ap_corr_map

        # Check that the ap_corr_map and ap_corr_map2 are the same.
        self.assertEqual(set(ap_corr_map2.keys()), set(ap_corr_map.keys()))
        for key in ap_corr_map.keys():
            self.assertEqual(ap_corr_map2[key], ap_corr_map[key])

        # Check that the slot is set correctly
        self.assertEqual(
            catalog_run2.schema.getAliasMap().get("slot_CalibFlux"),
            norm_task2.config.normalized_calibflux_name,
        )

        # Check that the final normalized catalog values are the same.
        self.assertFloatsAlmostEqual(
            catalog_run2["slot_CalibFlux_instFlux"],
            catalog_run1["slot_CalibFlux_instFlux"],
        )

    def testNormalizedCalibrationFluxApplyOnlyFail(self):
        """Check the warnings logged when apply-only mode lacks corrections."""
        norm_task = self._make_task()
        catalog_run1 = self._make_catalog(norm_task.schema)
        exposure_run1 = self.exposure.clone()

        # The returned map is deliberately not attached to the exposure.
        norm_task.run(catalog=catalog_run1, exposure=exposure_run1)

        norm_task2 = self._make_task(apply_only=True)
        catalog_run2 = self._make_catalog(norm_task.schema)

        # Try without setting an aperture correction map at all.
        with self.assertLogs(level=logging.WARNING) as cm:
            _ = norm_task2.run(catalog=catalog_run2, exposure=exposure_run1)
        messages = '\n'.join(cm.output)
        self.assertIn("does not have a valid normalization", messages)

        # Try again after setting an incomplete aperture correction map.
        ap_corr_map_blank = lsst.afw.image.ApCorrMap()
        exposure_run1.info.setApCorrMap(ap_corr_map_blank)

        with self.assertLogs(level=logging.WARNING) as cm:
            _ = norm_task2.run(catalog=catalog_run2, exposure=exposure_run1)
        messages = '\n'.join(cm.output)
        self.assertIn("aperture correction map is missing base_CompensatedTophatFlux_12_instFlux", messages)

    def testNormalizedCalibrationFluxError(self):
        """Check that the task raises when every raw calibration flux is flagged."""
        norm_task = self._make_task()
        catalog = self._make_catalog(norm_task.schema)
        catalog[norm_task.config.raw_calibflux_name + "_flag"] = True
        n_stars = len(catalog)

        # Both interpolated fragments need the f prefix; without it the
        # second "{n_stars}" would be emitted as literal text.
        error_string = (
            f"There are no valid stars to compute normalized calibration fluxes. Of {n_stars} "
            f"initially selected sources, 0 have good raw calibration fluxes and {n_stars} have "
            "good reference fluxes."
        )
        # NOTE(review): ``msg`` is only shown if no exception is raised; it
        # does not compare against the exception text. If checking the text
        # is intended, consider assertRaisesRegex once the exact message
        # produced by the task has been confirmed.
        with self.assertRaises(NormalizedCalibrationFluxError, msg=error_string):
            norm_task.run(catalog=catalog, exposure=self.exposure)
class TestMemory(lsst.utils.tests.MemoryTestCase):
    """Test case that adds nothing to `lsst.utils.tests.MemoryTestCase`.

    All behavior is inherited from the base class.
    """
def setup_module(module):
    """Initialize the LSST test utilities before this module's tests run.

    Parameters
    ----------
    module : module
        The module being set up; not used here.
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Direct execution: initialize the LSST test utilities, then hand over
    # to the unittest runner for the tests defined in this module.
    lsst.utils.tests.init()
    unittest.main()