Coverage for tests / test_jointcal.py: 21%
278 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:58 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:58 +0000
1# This file is part of jointcal.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import itertools
23import os.path
24import unittest
25from unittest import mock
27import numpy as np
28import pyarrow.parquet
29import astropy.time
31import lsst.log
32import lsst.utils
34import lsst.afw.table
35import lsst.daf.butler
36from lsst.daf.base import DateTime
37import lsst.geom
38from lsst.meas.algorithms import getRefFluxField, ReferenceObjectLoader
39from lsst.meas.algorithms import convertReferenceCatalog
40import lsst.obs.base
41import lsst.pipe.base
42import lsst.jointcal
43from lsst.jointcal.jointcal import make_schema_table, extract_detector_catalog_from_visit_catalog
44from lsst.jointcal import MinimizeResult
45import lsst.jointcal.testUtils
48# for MemoryTestCase
49def setup_module(module):
50 lsst.utils.tests.init()
53def make_fake_refcat(center, flux, filterName):
54 """Make a fake reference catalog."""
55 schema = convertReferenceCatalog._makeSchema([filterName], fullPositionInformation=True)
56 catalog = lsst.afw.table.SimpleCatalog(schema)
57 record = catalog.addNew()
58 record.setCoord(center)
59 record[filterName + '_flux'] = flux
60 record[filterName + '_fluxErr'] = flux*0.1
61 record['pm_ra'] = lsst.geom.Angle(1)
62 record['pm_dec'] = lsst.geom.Angle(2)
63 record['epoch'] = 65432.1
64 return catalog
67def make_fake_wcs():
68 """Return two simple SkyWcs objects, with slightly different sky positions.
70 Use the same pixel origins as the cfht_minimal data, but put the sky origin
71 at RA=0
72 """
73 crpix = lsst.geom.Point2D(931.517869, 2438.572109)
74 cd = np.array([[5.19513851e-05, -2.81124812e-07],
75 [-3.25186974e-07, -5.19112119e-05]])
76 crval1 = lsst.geom.SpherePoint(0.01, -0.01, lsst.geom.degrees)
77 crval2 = lsst.geom.SpherePoint(-0.01, 0.01, lsst.geom.degrees)
78 wcs1 = lsst.afw.geom.makeSkyWcs(crpix, crval1, cd)
79 wcs2 = lsst.afw.geom.makeSkyWcs(crpix, crval2, cd)
80 return wcs1, wcs2
83class TestJointcalVisitCatalog(lsst.utils.tests.TestCase):
84 """Tests of jointcal's sourceTable_visit parquet ->single detector afw
85 table catalog unrolling.
86 """
87 def setUp(self):
88 filename = os.path.join(os.path.dirname(__file__),
89 "data/subselected-sourceTable-0034690.parq")
90 file = pyarrow.parquet.ParquetFile(filename)
91 self.data = file.read(use_pandas_metadata=True).to_pandas()
92 self.config = lsst.jointcal.jointcal.JointcalConfig()
93 # NOTE: This parquet file is older and uses the earlier
94 # "capitalize first letter" naming convention for these fields.
95 self.config.sourceFluxType = "ApFlux_12_0"
96 # we don't actually need either fitter to run for these tests
97 self.config.doAstrometry = False
98 self.config.doPhotometry = False
99 self.jointcal = lsst.jointcal.JointcalTask(config=self.config)
101 def test_make_catalog_schema(self):
102 """Check that the slot fields required by CcdImage::loadCatalog are in
103 the schema returned by _make_catalog_schema().
104 """
105 table = make_schema_table()
106 self.assertTrue(table.getCentroidSlot().getMeasKey().isValid())
107 self.assertTrue(table.getCentroidSlot().getErrKey().isValid())
108 self.assertTrue(table.getShapeSlot().getMeasKey().isValid())
110 def test_extract_detector_catalog_from_visit_catalog(self):
111 """Spot check a value output by the script that generated the test
112 parquet catalog and check that the size of the returned catalog
113 is correct for each detectior.
114 """
115 detectorId = 56
116 table = make_schema_table()
117 catalog = extract_detector_catalog_from_visit_catalog(table,
118 self.data,
119 detectorId,
120 ['Ixx', 'Iyy', 'Ixy'],
121 self.config.sourceFluxType,
122 self.jointcal.log)
124 # The test catalog has a number of elements for each detector equal to the detector id.
125 self.assertEqual(len(catalog), detectorId)
126 self.assertIn(29798723617816629, catalog['id'])
127 matched = catalog[29798723617816629 == catalog['id']]
128 self.assertEqual(1715.734359473175, matched['slot_Centroid_x'])
129 self.assertEqual(89.06076509964362, matched['slot_Centroid_y'])
132class JointcalTestBase:
133 def setUp(self):
134 struct = lsst.jointcal.testUtils.createTwoFakeCcdImages(100, 100)
135 self.ccdImageList = struct.ccdImageList
136 # so that countStars() returns nonzero results
137 for ccdImage in self.ccdImageList:
138 ccdImage.resetCatalogForFit()
140 self.goodChi2 = lsst.jointcal.Chi2Statistic()
141 # chi2/ndof == 2.0 should be non-bad
142 self.goodChi2.chi2 = 200.0
143 self.goodChi2.ndof = 100
145 self.badChi2 = lsst.jointcal.Chi2Statistic()
146 self.badChi2.chi2 = 600.0
147 self.badChi2.ndof = 100
149 self.nanChi2 = lsst.jointcal.Chi2Statistic()
150 self.nanChi2.chi2 = np.nan
151 self.nanChi2.ndof = 100
153 self.maxSteps = 20
154 self.name = "testing"
155 self.dataName = "fake"
156 self.whatToFit = "" # unneeded, since we're mocking the fitter
158 # Mock a Butler so the refObjLoaders have something to call `get()` on.
159 self.butler = unittest.mock.Mock(spec=lsst.daf.butler.Butler)
161 # Mock the association manager and give it access to the ccd list above.
162 self.associations = mock.Mock(spec=lsst.jointcal.Associations)
163 self.associations.getCcdImageList.return_value = self.ccdImageList
165 # a default config to be modified by individual tests
166 self.config = lsst.jointcal.jointcal.JointcalConfig()
169class TestJointcalIterateFit(JointcalTestBase, lsst.utils.tests.TestCase):
170 def setUp(self):
171 super().setUp()
172 # Mock the fitter and model, so we can force particular
173 # return values/exceptions. Default to "good" return values.
174 self.fitter = mock.Mock(spec=lsst.jointcal.PhotometryFit)
175 self.fitter.computeChi2.return_value = self.goodChi2
176 self.fitter.minimize.return_value = MinimizeResult.Converged
177 self.model = mock.Mock(spec=lsst.jointcal.SimpleFluxModel)
179 self.jointcal = lsst.jointcal.JointcalTask(config=self.config)
181 def test_iterateFit_success(self):
182 chi2 = self.jointcal._iterate_fit(self.associations, self.fitter,
183 self.maxSteps, self.name, self.whatToFit)
184 self.assertEqual(chi2, self.goodChi2)
185 # Once for the for loop, the second time for the rank update.
186 self.assertEqual(self.fitter.minimize.call_count, 2)
188 def test_iterateFit_writeChi2Outer(self):
189 chi2 = self.jointcal._iterate_fit(self.associations, self.fitter,
190 self.maxSteps, self.name, self.whatToFit,
191 dataName=self.dataName)
192 self.assertEqual(chi2, self.goodChi2)
193 # Once for the for loop, the second time for the rank update.
194 self.assertEqual(self.fitter.minimize.call_count, 2)
195 # Default config should not call saveChi2Contributions
196 self.fitter.saveChi2Contributions.assert_not_called()
198 def test_iterateFit_failed(self):
199 self.fitter.minimize.return_value = MinimizeResult.Failed
201 with self.assertRaises(RuntimeError):
202 self.jointcal._iterate_fit(self.associations, self.fitter,
203 self.maxSteps, self.name, self.whatToFit)
204 self.assertEqual(self.fitter.minimize.call_count, 1)
206 def test_iterateFit_badFinalChi2(self):
207 self.fitter.computeChi2.return_value = self.badChi2
209 with self.assertLogs("lsst.jointcal", level="INFO") as cm:
210 chi2 = self.jointcal._iterate_fit(self.associations, self.fitter,
211 self.maxSteps, self.name, self.whatToFit)
212 self.assertEqual(chi2, self.badChi2)
213 self.assertIn(f"INFO:lsst.jointcal:Fit completed {self.badChi2}", cm.output)
214 self.assertIn("ERROR:lsst.jointcal:Potentially bad fit: High chi-squared/ndof.", cm.output)
216 def test_iterateFit_exceedMaxSteps(self):
217 self.fitter.minimize.return_value = MinimizeResult.Chi2Increased
218 maxSteps = 3
220 with self.assertLogs("lsst.jointcal", level="ERROR") as cm:
221 chi2 = self.jointcal._iterate_fit(self.associations, self.fitter,
222 maxSteps, self.name, self.whatToFit)
223 self.assertEqual(chi2, self.goodChi2)
224 self.assertEqual(self.fitter.minimize.call_count, maxSteps)
225 self.assertIn(f"ERROR:lsst.jointcal:testing failed to converge after {maxSteps} steps", cm.output)
227 def test_moderate_chi2_increase(self):
228 """DM-25159: warn, but don't fail, on moderate chi2 increases between
229 steps.
230 """
231 chi2_1 = lsst.jointcal.Chi2Statistic()
232 chi2_1.chi2 = 100.0
233 chi2_1.ndof = 100
234 chi2_2 = lsst.jointcal.Chi2Statistic()
235 chi2_2.chi2 = 300.0
236 chi2_2.ndof = 100
238 chi2s = [self.goodChi2, chi2_1, chi2_2, self.goodChi2, self.goodChi2]
239 self.fitter.computeChi2.side_effect = chi2s
240 self.fitter.minimize.side_effect = [MinimizeResult.Chi2Increased,
241 MinimizeResult.Chi2Increased,
242 MinimizeResult.Chi2Increased,
243 MinimizeResult.Converged,
244 MinimizeResult.Converged]
245 with self.assertLogs("lsst.jointcal", level="WARNING") as cm:
246 self.jointcal._iterate_fit(self.associations, self.fitter,
247 self.maxSteps, self.name, self.whatToFit)
248 msg = "Significant chi2 increase by a factor of 300 / 100 = 3"
249 self.assertIn(f"WARNING:lsst.jointcal:{msg}", cm.output)
251 def test_large_chi2_increase_fails(self):
252 """DM-25159: fail on large chi2 increases between steps."""
253 chi2_1 = lsst.jointcal.Chi2Statistic()
254 chi2_1.chi2 = 1e11
255 chi2_1.ndof = 100
256 chi2_2 = lsst.jointcal.Chi2Statistic()
257 chi2_2.chi2 = 1.123456e13 # to check floating point formatting
258 chi2_2.ndof = 100
260 chi2s = [chi2_1, chi2_1, chi2_2]
261 self.fitter.computeChi2.side_effect = chi2s
262 self.fitter.minimize.return_value = MinimizeResult.Chi2Increased
263 with self.assertLogs("lsst.jointcal", level="WARNING") as cm:
264 with self.assertRaisesRegex(RuntimeError, "Large chi2 increase"):
265 self.jointcal._iterate_fit(self.associations, self.fitter,
266 self.maxSteps, self.name, self.whatToFit)
267 msg = "Significant chi2 increase by a factor of 1.123e+13 / 1e+11 = 112.3"
268 self.assertIn(f"WARNING:lsst.jointcal:{msg}", cm.output)
270 def test_invalid_model(self):
271 self.model.validate.return_value = False
272 with self.assertRaises(ValueError):
273 self.jointcal._logChi2AndValidate(self.associations, self.fitter, self.model, "invalid")
275 def test_nonfinite_chi2(self):
276 self.fitter.computeChi2.return_value = self.nanChi2
277 with self.assertRaises(FloatingPointError):
278 self.jointcal._logChi2AndValidate(self.associations, self.fitter, self.model, "nonfinite")
280 def test_writeChi2(self):
281 filename = "somefile"
282 self.jointcal._logChi2AndValidate(self.associations, self.fitter, self.model, "writeCh2",
283 writeChi2Name=filename)
284 # logChi2AndValidate prepends `config.debugOutputPath` to the filename
285 self.fitter.saveChi2Contributions.assert_called_with("./"+filename+"{type}")
288class TestJointcalLoadRefCat(JointcalTestBase, lsst.utils.tests.TestCase):
290 def _make_fake_refcat(self):
291 """Mock a fake reference catalog and the bits necessary to use it."""
292 center = lsst.geom.SpherePoint(30, -30, lsst.geom.degrees)
293 flux = 10
294 radius = 1 * lsst.geom.degrees
295 filter = lsst.afw.image.FilterLabel(band='fake', physical="fake-filter")
297 fakeRefCat = make_fake_refcat(center, flux, filter.bandLabel)
298 fluxField = getRefFluxField(fakeRefCat.schema, filter.bandLabel)
299 returnStruct = lsst.pipe.base.Struct(refCat=fakeRefCat, fluxField=fluxField)
300 refObjLoader = mock.Mock(spec=ReferenceObjectLoader)
301 refObjLoader.loadSkyCircle.return_value = returnStruct
303 return refObjLoader, center, radius, filter, fakeRefCat
305 def test_load_reference_catalog(self):
306 refObjLoader, center, radius, filterLabel, fakeRefCat = self._make_fake_refcat()
308 config = lsst.jointcal.jointcal.JointcalConfig()
309 config.astrometryReferenceErr = 0.1 # our test refcats don't have coord errors
310 jointcal = lsst.jointcal.JointcalTask(config=config)
312 # NOTE: we cannot test application of proper motion here, because we
313 # mock the refObjLoader, so the real loader is never called.
314 refCat, fluxField = jointcal._load_reference_catalog(refObjLoader,
315 jointcal.astrometryReferenceSelector,
316 center,
317 radius,
318 filterLabel)
319 # operator== isn't implemented for Catalogs, so we have to check like
320 # this, in case the records are copied during load.
321 self.assertEqual(len(refCat), len(fakeRefCat))
322 for r1, r2 in zip(refCat, fakeRefCat):
323 self.assertEqual(r1, r2)
325 def test_load_reference_catalog_subselect(self):
326 """Test that we can select out the one source in the fake refcat
327 with a ridiculous S/N cut.
328 """
329 refObjLoader, center, radius, filterLabel, fakeRefCat = self._make_fake_refcat()
331 config = lsst.jointcal.jointcal.JointcalConfig()
332 config.astrometryReferenceErr = 0.1 # our test refcats don't have coord errors
333 config.astrometryReferenceSelector.doSignalToNoise = True
334 config.astrometryReferenceSelector.signalToNoise.minimum = 1e10
335 config.astrometryReferenceSelector.signalToNoise.fluxField = "fake_flux"
336 config.astrometryReferenceSelector.signalToNoise.errField = "fake_fluxErr"
337 jointcal = lsst.jointcal.JointcalTask(config=config)
339 refCat, fluxField = jointcal._load_reference_catalog(refObjLoader,
340 jointcal.astrometryReferenceSelector,
341 center,
342 radius,
343 filterLabel)
344 self.assertEqual(len(refCat), 0)
347class TestJointcalFitModel(JointcalTestBase, lsst.utils.tests.TestCase):
348 def test_fit_photometry_writeChi2(self):
349 """Test that we are calling saveChi2 with appropriate file prefixes."""
350 self.config.photometryModel = "constrainedFlux"
351 self.config.writeChi2FilesOuterLoop = True
352 jointcal = lsst.jointcal.JointcalTask(config=self.config)
353 jointcal.focalPlaneBBox = lsst.geom.Box2D()
355 # Mock the fitter, so we can pretend it found a good fit
356 with mock.patch("lsst.jointcal.PhotometryFit", autospec=True) as fitPatch:
357 fitPatch.return_value.computeChi2.return_value = self.goodChi2
358 fitPatch.return_value.minimize.return_value = MinimizeResult.Converged
360 # config.debugOutputPath is prepended to the filenames that go into saveChi2Contributions
361 expected = ["./photometry_init-ModelVisit_chi2", "./photometry_init-Model_chi2",
362 "./photometry_init-Fluxes_chi2", "./photometry_init-ModelFluxes_chi2"]
363 expected = [mock.call(x+"-fake{type}") for x in expected]
364 jointcal._fit_photometry(self.associations, dataName=self.dataName)
365 fitPatch.return_value.saveChi2Contributions.assert_has_calls(expected)
367 def test_fit_astrometry_writeChi2(self):
368 """Test that we are calling saveChi2 with appropriate file prefixes."""
369 self.config.astrometryModel = "constrained"
370 self.config.writeChi2FilesOuterLoop = True
371 jointcal = lsst.jointcal.JointcalTask(config=self.config)
372 jointcal.focalPlaneBBox = lsst.geom.Box2D()
374 # Mock the fitter, so we can pretend it found a good fit
375 fitPatch = mock.patch("lsst.jointcal.AstrometryFit")
376 # Mock the projection handler so we don't segfault due to not-fully initialized ccdImages
377 projectorPatch = mock.patch("lsst.jointcal.OneTPPerVisitHandler")
378 with fitPatch as fit, projectorPatch as projector:
379 fit.return_value.computeChi2.return_value = self.goodChi2
380 fit.return_value.minimize.return_value = MinimizeResult.Converged
381 # return a real ProjectionHandler to keep ConstrainedAstrometryModel() happy
382 projector.return_value = lsst.jointcal.IdentityProjectionHandler()
384 # config.debugOutputPath is prepended to the filenames that go into saveChi2Contributions
385 expected = ["./astrometry_init-DistortionsVisit_chi2", "./astrometry_init-Distortions_chi2",
386 "./astrometry_init-Positions_chi2", "./astrometry_init-DistortionsPositions_chi2"]
387 expected = [mock.call(x+"-fake{type}") for x in expected]
388 jointcal._fit_astrometry(self.associations, dataName=self.dataName)
389 fit.return_value.saveChi2Contributions.assert_has_calls(expected)
392class TestComputeBoundingCircle(lsst.utils.tests.TestCase):
393 """Tests of Associations.computeBoundingCircle()"""
394 def _checkPointsInCircle(self, points, center, radius):
395 """Check that all points are within the (center, radius) circle.
397 The test is whether the max(points - center) separation is equal to
398 (or slightly less than) radius.
399 """
400 maxSeparation = 0*lsst.geom.degrees
401 for point in points:
402 maxSeparation = max(maxSeparation, center.separation(point))
403 self.assertAnglesAlmostEqual(maxSeparation, radius, maxDiff=3*lsst.geom.arcseconds)
404 self.assertLess(maxSeparation, radius)
406 def _testPoints(self, ccdImage1, ccdImage2, skyWcs1, skyWcs2, bbox):
407 """Fill an Associations object and test that it computes the correct
408 bounding circle for the input data.
410 Parameters
411 ----------
412 ccdImage1, ccdImage2 : `lsst.jointcal.CcdImage`
413 The CcdImages to add to the Associations object.
414 skyWcs1, skyWcs2 : `lsst.afw.geom.SkyWcs`
415 The WCS of each of the above images.
416 bbox : `lsst.geom.Box2D`
417 The ccd bounding box of both images.
418 """
419 lsst.log.setLevel('lsst.jointcal', lsst.log.DEBUG)
420 associations = lsst.jointcal.Associations()
421 associations.addCcdImage(ccdImage1)
422 associations.addCcdImage(ccdImage2)
423 associations.computeCommonTangentPoint()
425 circle = associations.computeBoundingCircle()
426 center = lsst.geom.SpherePoint(circle.getCenter())
427 radius = lsst.geom.Angle(circle.getOpeningAngle().asRadians(), lsst.geom.radians)
428 points = [lsst.geom.SpherePoint(skyWcs1.pixelToSky(lsst.geom.Point2D(x)))
429 for x in bbox.getCorners()]
430 points.extend([lsst.geom.SpherePoint(skyWcs2.pixelToSky(lsst.geom.Point2D(x)))
431 for x in bbox.getCorners()])
432 self._checkPointsInCircle(points, center, radius)
434 def testPoints(self):
435 """Test for points in an "easy" area, far from RA=0 or the poles."""
436 struct = lsst.jointcal.testUtils.createTwoFakeCcdImages()
437 self._testPoints(struct.ccdImageList[0], struct.ccdImageList[1],
438 struct.skyWcs[0], struct.skyWcs[1], struct.bbox)
440 def testPointsRA0(self):
441 """Test for CcdImages crossing RA=0; this demonstrates a fix for
442 the bug described in DM-19802.
443 """
444 wcs1, wcs2 = make_fake_wcs()
446 # Put the visit boresights at the WCS origin, for consistency
447 visitInfo1 = lsst.afw.image.VisitInfo(date=DateTime(date=65321.1),
448 boresightRaDec=wcs1.getSkyOrigin())
449 visitInfo2 = lsst.afw.image.VisitInfo(date=DateTime(date=65322.1),
450 boresightRaDec=wcs1.getSkyOrigin())
452 struct = lsst.jointcal.testUtils.createTwoFakeCcdImages(fakeWcses=[wcs1, wcs2],
453 fakeVisitInfos=[visitInfo1, visitInfo2])
454 self._testPoints(struct.ccdImageList[0], struct.ccdImageList[1],
455 struct.skyWcs[0], struct.skyWcs[1], struct.bbox)
458class TestJointcalComputePMDate(JointcalTestBase, lsst.utils.tests.TestCase):
459 """Tests of jointcal._compute_proper_motion_epoch(), using fake dates."""
460 def test_compute_proper_motion_epoch(self):
461 mjds = np.array((65432.1, 66666.0, 55555.0, 44322.2))
463 wcs1, wcs2 = make_fake_wcs()
464 visitInfo1 = lsst.afw.image.VisitInfo(date=DateTime(date=mjds[0]),
465 boresightRaDec=wcs1.getSkyOrigin())
466 visitInfo2 = lsst.afw.image.VisitInfo(date=DateTime(date=mjds[1]),
467 boresightRaDec=wcs2.getSkyOrigin())
468 visitInfo3 = lsst.afw.image.VisitInfo(date=DateTime(date=mjds[2]),
469 boresightRaDec=wcs1.getSkyOrigin())
470 visitInfo4 = lsst.afw.image.VisitInfo(date=DateTime(date=mjds[3]),
471 boresightRaDec=wcs2.getSkyOrigin())
473 struct1 = lsst.jointcal.testUtils.createTwoFakeCcdImages(fakeWcses=[wcs1, wcs2],
474 fakeVisitInfos=[visitInfo1, visitInfo2])
475 struct2 = lsst.jointcal.testUtils.createTwoFakeCcdImages(fakeWcses=[wcs1, wcs2],
476 fakeVisitInfos=[visitInfo3, visitInfo4])
477 ccdImageList = list(itertools.chain(struct1.ccdImageList, struct2.ccdImageList))
478 associations = lsst.jointcal.Associations()
479 for ccdImage in ccdImageList:
480 associations.addCcdImage(ccdImage)
481 associations.computeCommonTangentPoint()
483 jointcal = lsst.jointcal.JointcalTask(config=self.config)
484 result = jointcal._compute_proper_motion_epoch(ccdImageList)
485 self.assertEqual(result.jyear, (astropy.time.Time(mjds, format="mjd", scale="tai").jyear).mean())
488class MemoryTester(lsst.utils.tests.MemoryTestCase):
489 pass
492if __name__ == "__main__": 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true
493 lsst.utils.tests.init()
494 unittest.main()