Coverage for tests / test_createApFakes.py: 21%
94 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 09:10 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 09:10 +0000
1#
2# This file is part of ap_pipe.
3#
4# Developed for the LSST Data Management System.
5# This product includes software developed by the LSST Project
6# (http://www.lsst.org).
7# See the COPYRIGHT file at the top-level directory of this distribution
8# for details of code ownership.
9#
10# This program is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with this program. If not, see <http://www.gnu.org/licenses/>.
22#
24import numpy as np
25import shutil
26import tempfile
27import unittest
29import lsst.daf.butler.tests as butlerTests
30import lsst.geom as geom
31from lsst.pipe.base import testUtils
32import lsst.skymap as skyMap
33import lsst.utils.tests
35from lsst.ap.pipe.createApFakes import CreateRandomApFakesTask, CreateRandomApFakesConfig
class TestCreateApFakes(lsst.utils.tests.TestCase):
    """Unit tests for ``CreateRandomApFakesTask`` and its config class."""

    def setUp(self):
        """Build a single-tract discrete sky map and the expected counts.

        The attributes set here are shared by all tests: a seeded random
        generator, the sky map and its tract, the number of fake sources, the
        source density passed to the task config, and the number of sources
        the tests expect to be flagged for the visit and for the template.
        """
        self.tractId = 0
        # Seeding with the tract id keeps the random magnitudes reproducible.
        self.rng = np.random.default_rng(self.tractId)
        simpleMapConfig = skyMap.discreteSkyMap.DiscreteSkyMapConfig()
        simpleMapConfig.raList = [10]
        simpleMapConfig.decList = [-1]
        simpleMapConfig.radiusList = [0.1]
        self.simpleMap = skyMap.DiscreteSkyMap(simpleMapConfig)
        self.tract = self.simpleMap.generateTract(self.tractId)

        bBox = self.tract.getOuterSkyPolygon().getBoundingBox()
        self.nSources = 50
        # Density chosen so that the task is expected to produce exactly
        # ``nSources`` fakes over the tract bounding box (see ``testRun``).
        # NOTE(review): the (180 / pi)**2 factor presumably converts the box
        # area from steradians to square degrees -- confirm.
        self.sourceDensity = (self.nSources
                              / (bBox.getArea() * (180 / np.pi) ** 2))
        self.fraction = 0.5
        # Expected flag counts for the visit/template subdivision at the
        # configured fraction; integer truncation is intentional.
        self.nInVisit = (int(self.nSources * self.fraction)
                         + int((1 - self.fraction) / 2 * self.nSources))
        self.nInTemplate = (self.nSources - self.nInVisit
                            + int(self.nSources * self.fraction))

    def testRunQuantum(self):
        """Test the run quantum method with a gen3 butler.
        """
        root = tempfile.mkdtemp()
        # Register removal immediately so the temporary repository is deleted
        # even if repo creation or an assertion below raises.
        self.addCleanup(shutil.rmtree, root, ignore_errors=True)
        dimensions = {"instrument": ["notACam"],
                      "skymap": ["skyMap"],
                      "tract": [0, 42],
                      }
        testRepo = butlerTests.makeTestRepo(root, dimensions)
        # Constructing the task is expected to emit a FutureWarning.
        with self.assertWarns(FutureWarning):
            fakesTask = CreateRandomApFakesTask()
        connections = fakesTask.config.ConnectionsClass(
            config=fakesTask.config)
        butlerTests.addDatasetType(
            testRepo,
            connections.skyMap.name,
            connections.skyMap.dimensions,
            connections.skyMap.storageClass)
        butlerTests.addDatasetType(
            testRepo,
            connections.fakeCat.name,
            connections.fakeCat.dimensions,
            connections.fakeCat.storageClass)

        dataId = {"skymap": "skyMap", "tract": 0}
        butler = butlerTests.makeTestCollection(testRepo)
        butler.put(self.simpleMap, "skyMap", {"skymap": "skyMap"})

        quantum = testUtils.makeQuantum(
            fakesTask, butler, dataId,
            {"skyMap": {"skymap": dataId["skymap"]}, "fakeCat": dataId})
        # The returned object is used as a mock of ``run`` below.
        run = testUtils.runTestQuantum(fakesTask, butler, quantum, True)
        # Actual input dataset omitted for simplicity
        run.assert_called_once_with(tractId=dataId["tract"], skyMap=self.simpleMap)

    def testRun(self):
        """Test the run method.

        Checks the catalog length, that every source lies inside the tract
        bounding box, the visit/template flag counts, and that each filter's
        magnitudes fall in ``[magMin, magMax)``.
        """
        with self.assertWarns(FutureWarning):
            fakesConfig = CreateRandomApFakesConfig()
        fakesConfig.fraction = 0.5
        fakesConfig.fakeDensity = self.sourceDensity

        with self.assertWarns(FutureWarning):
            fakesTask = CreateRandomApFakesTask(config=fakesConfig)
        bBox = self.tract.getOuterSkyPolygon().getBoundingBox()
        result = fakesTask.run(self.tractId, self.simpleMap)
        fakeCat = result.fakeCat
        self.assertEqual(len(fakeCat), self.nSources)

        # The row index is not needed, only the coordinate columns.
        for _, row in fakeCat.iterrows():
            self.assertTrue(
                bBox.contains(
                    geom.SpherePoint(row[fakesTask.config.ra_col],
                                     row[fakesTask.config.dec_col],
                                     geom.degrees).getVector()))
        self.assertEqual(fakeCat[fakesConfig.visitSourceFlagCol].sum(),
                         self.nInVisit)
        self.assertEqual(fakeCat[fakesConfig.templateSourceFlagCol].sum(),
                         self.nInTemplate)
        for f in fakesConfig.filterSet:
            filterMags = fakeCat[fakesConfig.mag_col % f]
            self.assertEqual(self.nSources, len(filterMags))
            self.assertTrue(
                np.all(fakesConfig.magMin <= filterMags))
            self.assertTrue(
                np.all(fakesConfig.magMax > filterMags))

    def testVisitCoaddSubdivision(self):
        """Test that the number of assigned visit to template objects is
        correct.
        """
        with self.assertWarns(FutureWarning):
            fakesConfig = CreateRandomApFakesConfig()
        fakesConfig.fraction = 0.5
        with self.assertWarns(FutureWarning):
            fakesTask = CreateRandomApFakesTask(config=fakesConfig)
        subdivision = fakesTask.createVisitCoaddSubdivision(self.nSources)
        self.assertEqual(
            subdivision[fakesConfig.visitSourceFlagCol].sum(),
            self.nInVisit)
        self.assertEqual(
            subdivision[fakesConfig.templateSourceFlagCol].sum(),
            self.nInTemplate)

    def testRandomMagnitudes(self):
        """Test that the correct number of filters and magnitudes have been
        produced.

        This currently uses the per-filter magnitude columns plus one
        additional magnitude column that is not tied to a filter. Each
        per-filter column is checked for length and for values in
        ``[magMin, magMax)``.
        """
        with self.assertWarns(FutureWarning):
            fakesConfig = CreateRandomApFakesConfig()
        fakesConfig.filterSet = ["u", "g"]
        fakesConfig.mag_col = "%s_mag"
        fakesConfig.magMin = 20
        fakesConfig.magMax = 21
        with self.assertWarns(FutureWarning):
            fakesTask = CreateRandomApFakesTask(config=fakesConfig)
        mags = fakesTask.createRandomMagnitudes(self.nSources, self.rng)
        # this is because we have a column for mag without filter
        self.assertEqual(len(fakesConfig.filterSet) + 1, len(mags))

        for f in fakesConfig.filterSet:
            filterMags = mags[fakesConfig.mag_col % f]
            self.assertEqual(self.nSources, len(filterMags))
            self.assertTrue(
                np.all(fakesConfig.magMin <= filterMags))
            self.assertTrue(
                np.all(fakesConfig.magMax > filterMags))
class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Run the checks inherited from ``lsst.utils.tests.MemoryTestCase``.

    No tests are added here; the subclass exists only so the inherited
    test case is collected from this module.
    """
def setup_module(module):
    """Initialise the LSST test utilities for this module.

    Parameters
    ----------
    module : module
        The module being set up; not used by this function.
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Direct execution: initialise the LSST test utilities first, then hand
    # control to the unittest runner.
    lsst.utils.tests.init()
    unittest.main()