Coverage for tests / test_astrometryTask.py: 18%

184 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:11 +0000

1# 

2# LSST Data Management System 

3# Copyright 2008-2017 LSST Corporation. 

4# 

5# This product includes software developed by the 

6# LSST Project (http://www.lsst.org/). 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <http://www.lsstcorp.org/LegalNotices/>. 

21# 

22 

23import os.path 

24import math 

25import unittest 

26import glob 

27 

28import astropy.units as u 

29import scipy.stats 

30import numpy as np 

31 

32import lsst.utils.tests 

33import lsst.geom 

34import lsst.afw.geom as afwGeom 

35import lsst.afw.table as afwTable 

36import lsst.afw.image as afwImage 

37import lsst.meas.base as measBase 

38from lsst.meas.algorithms.testUtils import MockReferenceObjectLoaderFromFiles 

39from lsst.meas.astrom import AstrometryTask, exceptions 

40import lsst.pipe.base as pipeBase 

41 

42 

43class TestAstrometricSolver(lsst.utils.tests.TestCase): 

44 

45 def setUp(self): 

46 refCatDir = os.path.join(os.path.dirname(__file__), "data", "sdssrefcat") 

47 

48 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(3001, 3001)) 

49 crpix = lsst.geom.Box2D(self.bbox).getCenter() 

50 self.tanWcs = afwGeom.makeSkyWcs(crpix=crpix, 

51 crval=lsst.geom.SpherePoint(215.5, 53.0, lsst.geom.degrees), 

52 cdMatrix=afwGeom.makeCdMatrix(scale=5.1e-5*lsst.geom.degrees)) 

53 self.exposure = afwImage.ExposureF(self.bbox) 

54 self.exposure.setWcs(self.tanWcs) 

55 self.exposure.info.setVisitInfo(afwImage.VisitInfo(date=lsst.daf.base.DateTime(60000))) 

56 self.exposure.setFilter(afwImage.FilterLabel(band="r", physical="rTest")) 

57 filenames = sorted(glob.glob(os.path.join(refCatDir, 'ref_cats', 'cal_ref_cat', '??????.fits'))) 

58 self.refObjLoader = MockReferenceObjectLoaderFromFiles(filenames, htmLevel=8) 

59 

60 def testTrivial(self): 

61 """Test fit with no distortion 

62 """ 

63 self.doTest(afwGeom.makeIdentityTransform()) 

64 

65 def testRadial(self): 

66 """Test fit with radial distortion 

67 

68 The offset comes from the fact that the CCD is not centered 

69 """ 

70 self.doTest(afwGeom.makeRadialTransform([0, 1.01, 1e-7])) 

71 

72 def doTest(self, pixelsToTanPixels): 

73 """Test using pixelsToTanPixels to distort the source positions. 

74 """ 

75 schema = self._makeSourceCatalogSchema() 

76 config = AstrometryTask.ConfigClass() 

77 config.wcsFitter.order = 3 

78 config.wcsFitter.numRejIter = 0 

79 config.sourceSelector["science"].doCentroidErrorLimit = False 

80 task = AstrometryTask(config=config, refObjLoader=self.refObjLoader, schema=schema) 

81 distortedWcs = afwGeom.makeModifiedWcs(pixelTransform=pixelsToTanPixels, wcs=self.tanWcs, 

82 modifyActualPixels=False) 

83 # Make the source catalog at the distorted positions, but keep the 

84 # initial TAN WCS on the exposure, to check that the fitted WCS 

85 # is close to the distorted one and different from the input. 

86 sourceCat = self.makeSourceCat(distortedWcs, schema) 

87 # This test is from before rough magnitude rejection was implemented. 

88 config.doMagnitudeOutlierRejection = False 

89 results = task.run(sourceCat=sourceCat, exposure=self.exposure) 

90 

91 self.assertWcsAlmostEqualOverBBox(distortedWcs, self.exposure.wcs, self.bbox, 

92 maxDiffSky=0.002*lsst.geom.arcseconds, maxDiffPix=0.02) 

93 # Test that the sources used in the fit are flagged in the catalog. 

94 self.assertEqual(sum(sourceCat["calib_astrometry_used"]), len(results.matches)) 

95 

96 # The following tests that the measured mean astrometry offset is not 

97 # equal to its standard deviation (within the default tolerances of 

98 # double-precision epsilon). While this condition is not strictly 

99 # required to be true, in reality it would be highly unlikely and is 

100 # certainly not the case for this test as configured. The motivation 

101 # for adding this check is that it would have caught a refactor that 

102 # mistakenly set them both as the mean. 

103 self.assertFloatsNotEqual( 

104 self.exposure.metadata["SFM_ASTROM_OFFSET_MEAN"], self.exposure.metadata["SFM_ASTROM_OFFSET_STD"] 

105 ) 

106 

107 srcCoordKey = afwTable.CoordKey(sourceCat.schema["coord"]) 

108 refCoordKey = afwTable.CoordKey(results.refCat.schema["coord"]) 

109 refCentroidKey = afwTable.Point2DKey(results.refCat.schema["centroid"]) 

110 maxAngSep = 0*lsst.geom.radians 

111 maxPixSep = 0 

112 for refObj, src, d in results.matches: 

113 refCoord = refObj.get(refCoordKey) 

114 refPixPos = refObj.get(refCentroidKey) 

115 srcCoord = src.get(srcCoordKey) 

116 srcPixPos = src.getCentroid() 

117 

118 angSep = refCoord.separation(srcCoord) 

119 maxAngSep = max(maxAngSep, angSep) 

120 

121 pixSep = math.hypot(*(srcPixPos-refPixPos)) 

122 maxPixSep = max(maxPixSep, pixSep) 

123 print("max angular separation = %0.4f arcsec" % (maxAngSep.asArcseconds(),)) 

124 print("max pixel separation = %0.3f" % (maxPixSep,)) 

125 self.assertLess(maxAngSep.asArcseconds(), 0.0038) 

126 self.assertLess(maxPixSep, 0.021) 

127 

128 # try again, invoking the reference selector 

129 config.referenceSelector.doUnresolved = True 

130 config.referenceSelector.unresolved.name = 'resolved' 

131 solverRefSelect = AstrometryTask(config=config, refObjLoader=self.refObjLoader) 

132 self.exposure.setWcs(distortedWcs) 

133 resultsRefSelect = solverRefSelect.run( 

134 sourceCat=sourceCat, 

135 exposure=self.exposure, 

136 ) 

137 self.assertLess(len(resultsRefSelect.matches), len(results.matches)) 

138 

139 # try again, allowing magnitude outlier rejection. 

140 config.doMagnitudeOutlierRejection = True 

141 solverMagOutlierRejection = AstrometryTask(config=config, refObjLoader=self.refObjLoader) 

142 self.exposure.setWcs(distortedWcs) 

143 resultsMagOutlierRejection = solverMagOutlierRejection.run( 

144 sourceCat=sourceCat, 

145 exposure=self.exposure, 

146 ) 

147 self.assertLess(len(resultsMagOutlierRejection.matches), len(resultsRefSelect.matches)) 

148 config.doMagnitudeOutlierRejection = False 

149 

150 # try again, but without fitting the WCS, no reference selector 

151 config.referenceSelector.doUnresolved = False 

152 config.forceKnownWcs = True 

153 solverNoFit = AstrometryTask(config=config, refObjLoader=self.refObjLoader) 

154 self.exposure.setWcs(distortedWcs) 

155 resultsNoFit = solverNoFit.run( 

156 sourceCat=sourceCat, 

157 exposure=self.exposure, 

158 ) 

159 self.assertIsNone(resultsNoFit.scatterOnSky) 

160 

161 # fitting should result in matches that are at least as good 

162 # (strictly speaking fitting might result in a larger match list with 

163 # some outliers, but in practice this test passes) 

164 meanFitDist = np.mean([match.distance for match in results.matches]) 

165 meanNoFitDist = np.mean([match.distance for match in resultsNoFit.matches]) 

166 self.assertLessEqual(meanFitDist, meanNoFitDist) 

167 

168 # try once again, without fitting the WCS, with the reference selector 

169 # (this goes through a different code path) 

170 config.referenceSelector.doUnresolved = True 

171 solverNoFitRefSelect = AstrometryTask(config=config, refObjLoader=self.refObjLoader) 

172 resultsNoFitRefSelect = solverNoFitRefSelect.run( 

173 sourceCat=sourceCat, 

174 exposure=self.exposure, 

175 ) 

176 self.assertLess(len(resultsNoFitRefSelect.matches), len(resultsNoFit.matches)) 

177 

178 @staticmethod 

179 def _makeSourceCatalogSchema(): 

180 """Return a catalog schema with all necessary fields added. 

181 """ 

182 schema = afwTable.SourceTable.makeMinimalSchema() 

183 measBase.SingleFrameMeasurementTask(schema=schema) # expand the schema 

184 schema.addField("deblend_nChild", type=np.int32, 

185 doc="Number of children this object has (defaults to 0)") 

186 schema.addField("detect_isPrimary", type=np.int32, 

187 doc="true if source has no children and is not a sky source") 

188 return schema 

189 

190 def makeSourceCat(self, wcs, schema, doScatterCentroids=False): 

191 """Make a source catalog by reading the position reference stars using 

192 the proviced WCS. 

193 

194 Optionally, via doScatterCentroids, add some scatter to the centroids 

195 assiged to the source catalog (otherwise they will be identical to 

196 those of the reference catalog). 

197 """ 

198 loadRes = self.refObjLoader.loadPixelBox(bbox=self.bbox, wcs=wcs, filterName="r") 

199 refCat = loadRes.refCat 

200 

201 sourceCat = afwTable.SourceCatalog(schema) 

202 

203 sourceCat.resize(len(refCat)) 

204 scatterFactor = 1.0 

205 if doScatterCentroids: 

206 np.random.seed(12345) 

207 scatterFactor = np.random.uniform(0.999, 1.001, len(sourceCat)) 

208 sourceCat["slot_Centroid_x"] = scatterFactor*refCat["centroid_x"] 

209 sourceCat["slot_Centroid_y"] = scatterFactor*refCat["centroid_y"] 

210 sourceCat["slot_PsfFlux_instFlux"] = refCat["r_flux"] 

211 sourceCat["slot_PsfFlux_instFluxErr"] = refCat["r_flux"]/100 

212 # All of these sources are primary. 

213 sourceCat['detect_isPrimary'] = 1 

214 

215 # Deliberately add some outliers to check that the magnitude 

216 # outlier rejection code is being run. 

217 sourceCat["slot_PsfFlux_instFlux"][0: 4] *= 1000.0 

218 

219 return sourceCat 

220 

221 def testBadAstrometry(self): 

222 """Test that an appropriately informative exception is raised for a 

223 bad quality fit. 

224 """ 

225 catalog = self.makeSourceCat(self.tanWcs, self._makeSourceCatalogSchema()) 

226 # Fake match list with 10" match distance for every source to force 

227 # a bad quality fit result. 

228 matches = [afwTable.ReferenceMatch(r, r, (10*u.arcsecond).to_value(u.radian)) for r in catalog] 

229 result = pipeBase.Struct( 

230 matches=matches, 

231 wcs=self.exposure.wcs, 

232 scatterOnSky=20.0*lsst.geom.arcseconds, 

233 matchTolerance=None 

234 ) 

235 with unittest.mock.patch("lsst.meas.astrom.AstrometryTask._matchAndFitWcs", 

236 return_value=result, autospec=True): 

237 with self.assertRaises(exceptions.BadAstrometryFit): 

238 config = AstrometryTask.ConfigClass() 

239 config.sourceSelector["science"].doCentroidErrorLimit = False 

240 task = AstrometryTask(config=config, refObjLoader=self.refObjLoader) 

241 task.run(catalog, self.exposure) 

242 task.check(self.exposure, catalog, len(matches)) 

243 self.assertIsNone(self.exposure.wcs) 

244 self.assertTrue(np.all(np.isnan(catalog["coord_ra"]))) 

245 self.assertTrue(np.all(np.isnan(catalog["coord_dec"]))) 

246 

247 def testMatcherFails(self): 

248 """Test that a matcher exception has additional metadata attached. 

249 """ 

250 catalog = self.makeSourceCat(self.tanWcs, self._makeSourceCatalogSchema()) 

251 with unittest.mock.patch("lsst.meas.astrom.AstrometryTask._matchAndFitWcs", autospec=True, 

252 side_effect=exceptions.MatcherFailure("some matcher problem")): 

253 with self.assertRaises(exceptions.MatcherFailure) as cm: 

254 config = AstrometryTask.ConfigClass() 

255 config.sourceSelector["science"].doCentroidErrorLimit = False 

256 task = AstrometryTask(config=config, refObjLoader=self.refObjLoader) 

257 task.run(catalog, self.exposure) 

258 self.assertEqual(cm.exception.metadata["iterations"], 1) 

259 self.assertIsNone(self.exposure.wcs) 

260 self.assertTrue(np.all(np.isnan(catalog["coord_ra"]))) 

261 self.assertTrue(np.all(np.isnan(catalog["coord_dec"]))) 

262 

263 def testExceptions(self): 

264 """Test that the custom astrometry exceptions are well behaved. 

265 """ 

266 error = exceptions.AstrometryError("something", blah=10) 

267 self.assertEqual(error.metadata["blah"], 10) 

268 self.assertIn("something", str(error)) 

269 self.assertIn("'blah': 10", str(error)) 

270 

271 # Metadata cannot contain an astropy unit. 

272 error = exceptions.AstrometryError("something", blah=10*u.arcsecond) 

273 with self.assertRaisesRegex(TypeError, "blah is of type <class 'astropy.units.quantity.Quantity'>"): 

274 error.metadata 

275 

276 

277class TestMagnitudeOutliers(lsst.utils.tests.TestCase): 

278 def testMagnitudeOutlierRejection(self): 

279 """Test rejection of magnitude outliers. 

280 

281 This test only tests the outlier rejection, and not any other 

282 part of the matching or astrometry fitter. 

283 """ 

284 config = AstrometryTask.ConfigClass() 

285 config.doMagnitudeOutlierRejection = True 

286 config.magnitudeOutlierRejectionNSigma = 4.0 

287 task = AstrometryTask(config=config, refObjLoader=None) 

288 

289 nTest = 100 

290 

291 refSchema = lsst.afw.table.SimpleTable.makeMinimalSchema() 

292 refSchema.addField('refFlux', 'F') 

293 refCat = lsst.afw.table.SimpleCatalog(refSchema) 

294 refCat.resize(nTest) 

295 

296 srcSchema = lsst.afw.table.SourceTable.makeMinimalSchema() 

297 srcSchema.addField('srcFlux', 'F') 

298 srcCat = lsst.afw.table.SourceCatalog(srcSchema) 

299 srcCat.resize(nTest) 

300 

301 np.random.seed(12345) 

302 refMag = np.full(nTest, 20.0) 

303 srcMag = np.random.normal(size=nTest, loc=0.0, scale=1.0) 

304 

305 # Determine the sigma of the random sample 

306 zp = np.median(refMag[: -4] - srcMag[: -4]) 

307 sigma = scipy.stats.median_abs_deviation(srcMag[: -4], scale='normal') 

308 

309 # Deliberately alter some magnitudes to be outliers. 

310 srcMag[-3] = (config.magnitudeOutlierRejectionNSigma + 0.1)*sigma + (20.0 - zp) 

311 srcMag[-4] = -(config.magnitudeOutlierRejectionNSigma + 0.1)*sigma + (20.0 - zp) 

312 

313 refCat['refFlux'] = (refMag*u.ABmag).to_value(u.nJy) 

314 srcCat['srcFlux'] = 10.0**(srcMag/(-2.5)) 

315 

316 # Deliberately poison some reference fluxes. 

317 refCat['refFlux'][-1] = np.inf 

318 refCat['refFlux'][-2] = np.nan 

319 

320 matchesIn = [] 

321 for ref, src in zip(refCat, srcCat): 

322 matchesIn.append(lsst.afw.table.ReferenceMatch(first=ref, second=src, distance=0.0)) 

323 

324 matchesOut = task._removeMagnitudeOutliers('srcFlux', 'refFlux', matchesIn) 

325 

326 # We should lose the 4 outliers we created. 

327 self.assertEqual(len(matchesOut), len(matchesIn) - 4) 

328 

329 

330class MemoryTester(lsst.utils.tests.MemoryTestCase): 

331 pass 

332 

333 

334def setup_module(module): 

335 lsst.utils.tests.init() 

336 

337 

338if __name__ == "__main__": 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true

339 lsst.utils.tests.init() 

340 unittest.main()