Coverage for tests / test_fit_turbulence.py: 21%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:43 +0000

1# This file is part of drp_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21import unittest 

22 

23import astropy.units as u 

24import numpy as np 

25import treegp 

26from astropy.table import Table, vstack 

27 

28import lsst.afw.table as afwTable 

29import lsst.utils.tests 

30from lsst.drp.tasks.fit_turbulence import ( 

31 GaussianProcessesTurbulenceFitConfig, 

32 GaussianProcessesTurbulenceFitTask, 

33) 

34from lsst.obs.base import createInitialSkyWcsFromBoresight 

35from lsst.obs.base.instrument_tests import DummyCam 

36from lsst.pipe.base import AnnotatedPartialOutputsError 

37 

38 

39class FitTurbulenceTestCase(lsst.utils.tests.TestCase): 

40 

41 def setUp(self): 

42 

43 visit = 1234 

44 self.noise = 0.01 

45 self.npoints = 250 

46 self.rng = np.random.RandomState(12345) 

47 

48 config = GaussianProcessesTurbulenceFitConfig() 

49 # Set the max separation lower, since we only have two detectors. 

50 config.correlationSeparationMax = 0.1 

51 config.initKernel = "15**2 * AnisotropicVonKarman(invLam=array([[1.0/0.1**2,0],[0,1.0/0.1**2]]))" 

52 config.splineNNodes = 60 

53 self.task = GaussianProcessesTurbulenceFitTask(config=config) 

54 

55 kernel_skl = treegp.eval_kernel(self.task.config.initKernel) 

56 

57 # Make a simulated WCS catalog and position catalog. 

58 self.wcsCatalog = self._makeWcsCatalog(visit) 

59 self.positions = self._makeResidualCatalog(self.wcsCatalog, visit, kernel_skl) 

60 

61 # Run the Gaussian Processes and predict at the input positions. 

62 self.gpx, self.gpy, *_ = self.task.runGP(self.wcsCatalog, self.positions) 

63 

64 sourceCatalog = Table( 

65 { 

66 "x": self.positions["xpix"], 

67 "y": self.positions["ypix"], 

68 "detector": self.positions["deviceName"], 

69 } 

70 ) 

71 self.predictCat = self.task.predict(self.gpx, self.gpy, self.wcsCatalog, sourceCatalog) 

72 

73 def _makeWcsCatalog(self, visit): 

74 """Make a simulated WCS catalog.""" 

75 camera = DummyCam().getCamera() 

76 schema = afwTable.ExposureTable.makeMinimalSchema() 

77 schema.addField("visit", type="L", doc="Visit number") 

78 wcsCatalog = afwTable.ExposureCatalog(schema) 

79 

80 boresight = lsst.geom.SpherePoint(150 * lsst.geom.degrees, 2.5 * lsst.geom.degrees) 

81 

82 for detectorId in [0, 1]: 

83 detector = camera[detectorId] 

84 wcs = createInitialSkyWcsFromBoresight(boresight, 0 * lsst.geom.degrees, detector) 

85 record = wcsCatalog.addNew() 

86 record.setId(detectorId) 

87 record.setWcs(wcs) 

88 record.setBBox(detector.getBBox()) 

89 wcsCatalog["visit"] = visit 

90 return wcsCatalog 

91 

92 def _makeResidualCatalog(self, wcsCatalog, visit, kernel): 

93 """Make a simulated catalog of source positions and residuals.""" 

94 visitPositions = [] 

95 for detector in [0, 1]: 

96 bbox = wcsCatalog[detector].getBBox() 

97 pixToTP = wcsCatalog[detector].wcs.getFrameDict().getMapping("PIXELS", "IWC") 

98 

99 x = self.rng.uniform(bbox.getBeginX(), bbox.getEndX(), self.npoints) 

100 y = self.rng.uniform(bbox.getBeginY(), bbox.getEndY(), self.npoints) 

101 xTP, yTP = pixToTP.applyForward(np.array([x, y])) 

102 xSky, ySky = wcsCatalog[detector].wcs.pixelToSkyArray(x, y, degrees=True) 

103 detectorPositions = Table( 

104 { 

105 "xpix": x, 

106 "ypix": y, 

107 "xworld": xTP, 

108 "yworld": yTP, 

109 "xsky": xSky, 

110 "ysky": ySky, 

111 } 

112 ) 

113 detectorPositions["deviceName"] = detector 

114 visitPositions.append(detectorPositions) 

115 visitPositions = vstack(visitPositions) 

116 totPoints = len(visitPositions) 

117 

118 K = kernel.__call__(np.array([visitPositions["xworld"], visitPositions["yworld"]]).T) 

119 dx = self.rng.multivariate_normal(np.zeros(totPoints), K) 

120 dy = self.rng.multivariate_normal(np.zeros(totPoints), K) 

121 

122 dx += self.rng.normal(scale=self.noise, size=totPoints) 

123 dy += self.rng.normal(scale=self.noise, size=totPoints) 

124 

125 residError = np.ones(totPoints) * self.noise 

126 visitPositions["xresw"] = dx 

127 visitPositions["yresw"] = dy 

128 visitPositions["covTotalW_00"] = residError**2 

129 visitPositions["covTotalW_11"] = residError**2 

130 visitPositions["exposureName"] = str(visit) 

131 

132 return visitPositions 

133 

134 def test_runGP(self): 

135 """Check that the predicted position is about the same as the simulated 

136 dx/dy values, modulo the scatter in the data.""" 

137 dRA = self.positions["xsky"] - self.predictCat["coord_ra"] 

138 dDec = self.positions["ysky"] - self.predictCat["coord_dec"] 

139 

140 np.testing.assert_allclose(dRA, (self.positions["xresw"] * u.mas).to(u.degree).value, rtol=1e-1) 

141 np.testing.assert_allclose(dDec, (self.positions["yresw"] * u.mas).to(u.degree).value, rtol=1e-1) 

142 

143 def test_makeWcs(self): 

144 """Check that the output WCS maps pixel positions to the same RA and 

145 Dec as the Gaussian Processes prediction. 

146 """ 

147 

148 wcsWithGP = self.task.addGPToWcs(self.gpx, self.gpy, self.wcsCatalog) 

149 for detectorRow in self.wcsCatalog: 

150 detector = detectorRow.getId() 

151 newWcs = wcsWithGP.find(detector).wcs 

152 

153 sources = self.predictCat[self.predictCat["detector"] == detector] 

154 ra, dec = newWcs.pixelToSkyArray(sources["x"], sources["y"], degrees=True) 

155 

156 np.testing.assert_allclose(ra, sources["coord_ra"]) 

157 np.testing.assert_allclose(dec, sources["coord_dec"], rtol=1e-6) 

158 

159 def test_raiseError(self): 

160 """Check that fitting insufficient data raises an 

161 AnnotatedPartialOutputsError.""" 

162 # Try fitting GP with only a small amount of data, which should fail. 

163 with self.assertRaises(AnnotatedPartialOutputsError): 

164 self.task.runGP(self.wcsCatalog, self.positions[:5]) 

165 

166 

167if __name__ == "__main__": 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true

168 lsst.utils.tests.init() 

169 unittest.main()