Coverage for tests / test_fit_turbulence.py: 21%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:28 +0000
1# This file is part of drp_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21import unittest
23import astropy.units as u
24import numpy as np
25import treegp
26from astropy.table import Table, vstack
28import lsst.afw.table as afwTable
29import lsst.utils.tests
30from lsst.drp.tasks.fit_turbulence import (
31 GaussianProcessesTurbulenceFitConfig,
32 GaussianProcessesTurbulenceFitTask,
33)
34from lsst.obs.base import createInitialSkyWcsFromBoresight
35from lsst.obs.base.instrument_tests import DummyCam
36from lsst.pipe.base import AnnotatedPartialOutputsError
39class FitTurbulenceTestCase(lsst.utils.tests.TestCase):
41 def setUp(self):
43 visit = 1234
44 self.noise = 0.01
45 self.npoints = 250
46 self.rng = np.random.RandomState(12345)
48 config = GaussianProcessesTurbulenceFitConfig()
49 # Set the max separation lower, since we only have two detectors.
50 config.correlationSeparationMax = 0.1
51 config.initKernel = "15**2 * AnisotropicVonKarman(invLam=array([[1.0/0.1**2,0],[0,1.0/0.1**2]]))"
52 config.splineNNodes = 60
53 self.task = GaussianProcessesTurbulenceFitTask(config=config)
55 kernel_skl = treegp.eval_kernel(self.task.config.initKernel)
57 # Make a simulated WCS catalog and position catalog.
58 self.wcsCatalog = self._makeWcsCatalog(visit)
59 self.positions = self._makeResidualCatalog(self.wcsCatalog, visit, kernel_skl)
61 # Run the Gaussian Processes and predict at the input positions.
62 self.gpx, self.gpy, *_ = self.task.runGP(self.wcsCatalog, self.positions)
64 sourceCatalog = Table(
65 {
66 "x": self.positions["xpix"],
67 "y": self.positions["ypix"],
68 "detector": self.positions["deviceName"],
69 }
70 )
71 self.predictCat = self.task.predict(self.gpx, self.gpy, self.wcsCatalog, sourceCatalog)
73 def _makeWcsCatalog(self, visit):
74 """Make a simulated WCS catalog."""
75 camera = DummyCam().getCamera()
76 schema = afwTable.ExposureTable.makeMinimalSchema()
77 schema.addField("visit", type="L", doc="Visit number")
78 wcsCatalog = afwTable.ExposureCatalog(schema)
80 boresight = lsst.geom.SpherePoint(150 * lsst.geom.degrees, 2.5 * lsst.geom.degrees)
82 for detectorId in [0, 1]:
83 detector = camera[detectorId]
84 wcs = createInitialSkyWcsFromBoresight(boresight, 0 * lsst.geom.degrees, detector)
85 record = wcsCatalog.addNew()
86 record.setId(detectorId)
87 record.setWcs(wcs)
88 record.setBBox(detector.getBBox())
89 wcsCatalog["visit"] = visit
90 return wcsCatalog
92 def _makeResidualCatalog(self, wcsCatalog, visit, kernel):
93 """Make a simulated catalog of source positions and residuals."""
94 visitPositions = []
95 for detector in [0, 1]:
96 bbox = wcsCatalog[detector].getBBox()
97 pixToTP = wcsCatalog[detector].wcs.getFrameDict().getMapping("PIXELS", "IWC")
99 x = self.rng.uniform(bbox.getBeginX(), bbox.getEndX(), self.npoints)
100 y = self.rng.uniform(bbox.getBeginY(), bbox.getEndY(), self.npoints)
101 xTP, yTP = pixToTP.applyForward(np.array([x, y]))
102 xSky, ySky = wcsCatalog[detector].wcs.pixelToSkyArray(x, y, degrees=True)
103 detectorPositions = Table(
104 {
105 "xpix": x,
106 "ypix": y,
107 "xworld": xTP,
108 "yworld": yTP,
109 "xsky": xSky,
110 "ysky": ySky,
111 }
112 )
113 detectorPositions["deviceName"] = detector
114 visitPositions.append(detectorPositions)
115 visitPositions = vstack(visitPositions)
116 totPoints = len(visitPositions)
118 K = kernel.__call__(np.array([visitPositions["xworld"], visitPositions["yworld"]]).T)
119 dx = self.rng.multivariate_normal(np.zeros(totPoints), K)
120 dy = self.rng.multivariate_normal(np.zeros(totPoints), K)
122 dx += self.rng.normal(scale=self.noise, size=totPoints)
123 dy += self.rng.normal(scale=self.noise, size=totPoints)
125 residError = np.ones(totPoints) * self.noise
126 visitPositions["xresw"] = dx
127 visitPositions["yresw"] = dy
128 visitPositions["covTotalW_00"] = residError**2
129 visitPositions["covTotalW_11"] = residError**2
130 visitPositions["exposureName"] = str(visit)
132 return visitPositions
134 def test_runGP(self):
135 """Check that the predicted position is about the same as the simulated
136 dx/dy values, modulo the scatter in the data."""
137 dRA = self.positions["xsky"] - self.predictCat["coord_ra"]
138 dDec = self.positions["ysky"] - self.predictCat["coord_dec"]
140 np.testing.assert_allclose(dRA, (self.positions["xresw"] * u.mas).to(u.degree).value, rtol=1e-1)
141 np.testing.assert_allclose(dDec, (self.positions["yresw"] * u.mas).to(u.degree).value, rtol=1e-1)
143 def test_makeWcs(self):
144 """Check that the output WCS maps pixel positions to the same RA and
145 Dec as the Gaussian Processes prediction.
146 """
148 wcsWithGP = self.task.addGPToWcs(self.gpx, self.gpy, self.wcsCatalog)
149 for detectorRow in self.wcsCatalog:
150 detector = detectorRow.getId()
151 newWcs = wcsWithGP.find(detector).wcs
153 sources = self.predictCat[self.predictCat["detector"] == detector]
154 ra, dec = newWcs.pixelToSkyArray(sources["x"], sources["y"], degrees=True)
156 np.testing.assert_allclose(ra, sources["coord_ra"])
157 np.testing.assert_allclose(dec, sources["coord_dec"], rtol=1e-6)
159 def test_raiseError(self):
160 """Check that fitting insufficient data raises an
161 AnnotatedPartialOutputsError."""
162 # Try fitting GP with only a small amount of data, which should fail.
163 with self.assertRaises(AnnotatedPartialOutputsError):
164 self.task.runGP(self.wcsCatalog, self.positions[:5])
167if __name__ == "__main__": 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true
168 lsst.utils.tests.init()
169 unittest.main()