Coverage for tests / test_createApFakes.py: 21%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:56 +0000

1# 

2# This file is part of ap_pipe. 

3# 

4# Developed for the LSST Data Management System. 

5# This product includes software developed by the LSST Project 

6# (http://www.lsst.org). 

7# See the COPYRIGHT file at the top-level directory of this distribution 

8# for details of code ownership. 

9# 

10# This program is free software: you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation, either version 3 of the License, or 

13# (at your option) any later version. 

14# 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19# 

20# You should have received a copy of the GNU General Public License 

21# along with this program. If not, see <http://www.gnu.org/licenses/>. 

22# 

23 

24import numpy as np 

25import shutil 

26import tempfile 

27import unittest 

28 

29import lsst.daf.butler.tests as butlerTests 

30import lsst.geom as geom 

31from lsst.pipe.base import testUtils 

32import lsst.skymap as skyMap 

33import lsst.utils.tests 

34 

35from lsst.ap.pipe.createApFakes import CreateRandomApFakesTask, CreateRandomApFakesConfig 

36 

37 

38class TestCreateApFakes(lsst.utils.tests.TestCase): 

39 

40 def setUp(self): 

41 """ 

42 """ 

43 self.tractId = 0 

44 self.rng = np.random.default_rng(self.tractId) 

45 simpleMapConfig = skyMap.discreteSkyMap.DiscreteSkyMapConfig() 

46 simpleMapConfig.raList = [10] 

47 simpleMapConfig.decList = [-1] 

48 simpleMapConfig.radiusList = [0.1] 

49 self.simpleMap = skyMap.DiscreteSkyMap(simpleMapConfig) 

50 self.tract = self.simpleMap.generateTract(self.tractId) 

51 

52 bBox = self.tract.getOuterSkyPolygon().getBoundingBox() 

53 self.nSources = 50 

54 self.sourceDensity = (self.nSources 

55 / (bBox.getArea() * (180 / np.pi) ** 2)) 

56 self.fraction = 0.5 

57 self.nInVisit = (int(self.nSources * self.fraction) 

58 + int((1 - self.fraction) / 2 * self.nSources)) 

59 self.nInTemplate = (self.nSources - self.nInVisit 

60 + int(self.nSources * self.fraction)) 

61 

62 def testRunQuantum(self): 

63 """Test the run quantum method with a gen3 butler. 

64 """ 

65 root = tempfile.mkdtemp() 

66 dimensions = {"instrument": ["notACam"], 

67 "skymap": ["skyMap"], 

68 "tract": [0, 42], 

69 } 

70 testRepo = butlerTests.makeTestRepo(root, dimensions) 

71 with self.assertWarns(FutureWarning): 

72 fakesTask = CreateRandomApFakesTask() 

73 connections = fakesTask.config.ConnectionsClass( 

74 config=fakesTask.config) 

75 butlerTests.addDatasetType( 

76 testRepo, 

77 connections.skyMap.name, 

78 connections.skyMap.dimensions, 

79 connections.skyMap.storageClass) 

80 butlerTests.addDatasetType( 

81 testRepo, 

82 connections.fakeCat.name, 

83 connections.fakeCat.dimensions, 

84 connections.fakeCat.storageClass) 

85 

86 dataId = {"skymap": "skyMap", "tract": 0} 

87 butler = butlerTests.makeTestCollection(testRepo) 

88 butler.put(self.simpleMap, "skyMap", {"skymap": "skyMap"}) 

89 

90 quantum = testUtils.makeQuantum( 

91 fakesTask, butler, dataId, 

92 {"skyMap": {"skymap": dataId["skymap"]}, "fakeCat": dataId}) 

93 run = testUtils.runTestQuantum(fakesTask, butler, quantum, True) 

94 # Actual input dataset omitted for simplicity 

95 run.assert_called_once_with(tractId=dataId["tract"], skyMap=self.simpleMap) 

96 shutil.rmtree(root, ignore_errors=True) 

97 

98 def testRun(self): 

99 """Test the run method. 

100 """ 

101 with self.assertWarns(FutureWarning): 

102 fakesConfig = CreateRandomApFakesConfig() 

103 fakesConfig.fraction = 0.5 

104 fakesConfig.fakeDensity = self.sourceDensity 

105 

106 with self.assertWarns(FutureWarning): 

107 fakesTask = CreateRandomApFakesTask(config=fakesConfig) 

108 bBox = self.tract.getOuterSkyPolygon().getBoundingBox() 

109 result = fakesTask.run(self.tractId, self.simpleMap) 

110 fakeCat = result.fakeCat 

111 self.assertEqual(len(fakeCat), self.nSources) 

112 

113 for idx, row in fakeCat.iterrows(): 

114 self.assertTrue( 

115 bBox.contains( 

116 geom.SpherePoint(row[fakesTask.config.ra_col], 

117 row[fakesTask.config.dec_col], 

118 geom.degrees).getVector())) 

119 self.assertEqual(fakeCat[fakesConfig.visitSourceFlagCol].sum(), 

120 self.nInVisit) 

121 self.assertEqual(fakeCat[fakesConfig.templateSourceFlagCol].sum(), 

122 self.nInTemplate) 

123 for f in fakesConfig.filterSet: 

124 filterMags = fakeCat[fakesConfig.mag_col % f] 

125 self.assertEqual(self.nSources, len(filterMags)) 

126 self.assertTrue( 

127 np.all(fakesConfig.magMin <= filterMags)) 

128 self.assertTrue( 

129 np.all(fakesConfig.magMax > filterMags)) 

130 

131 def testVisitCoaddSubdivision(self): 

132 """Test that the number of assigned visit to template objects is 

133 correct. 

134 """ 

135 with self.assertWarns(FutureWarning): 

136 fakesConfig = CreateRandomApFakesConfig() 

137 fakesConfig.fraction = 0.5 

138 with self.assertWarns(FutureWarning): 

139 fakesTask = CreateRandomApFakesTask(config=fakesConfig) 

140 subdivision = fakesTask.createVisitCoaddSubdivision(self.nSources) 

141 self.assertEqual( 

142 subdivision[fakesConfig.visitSourceFlagCol].sum(), 

143 self.nInVisit) 

144 self.assertEqual( 

145 subdivision[fakesConfig.templateSourceFlagCol].sum(), 

146 self.nInTemplate) 

147 

148 def testRandomMagnitudes(self): 

149 """Test that the correct number of filters and magnitudes have been 

150 produced. 

151 This is using currently the filter mags and an additional non-filter mag 

152 column. In any case the magnitudes are all random and equal. 

153 """ 

154 with self.assertWarns(FutureWarning): 

155 fakesConfig = CreateRandomApFakesConfig() 

156 fakesConfig.filterSet = ["u", "g"] 

157 fakesConfig.mag_col = "%s_mag" 

158 fakesConfig.magMin = 20 

159 fakesConfig.magMax = 21 

160 with self.assertWarns(FutureWarning): 

161 fakesTask = CreateRandomApFakesTask(config=fakesConfig) 

162 mags = fakesTask.createRandomMagnitudes(self.nSources, self.rng) 

163 # this is because we have a column for mag without filter 

164 self.assertEqual(len(fakesConfig.filterSet) + 1, len(mags)) 

165 

166 for f in fakesConfig.filterSet: 

167 filterMags = mags[fakesConfig.mag_col % f] 

168 self.assertEqual(self.nSources, len(filterMags)) 

169 self.assertTrue( 

170 np.all(fakesConfig.magMin <= filterMags)) 

171 self.assertTrue( 

172 np.all(fakesConfig.magMax > filterMags)) 

173 

174 

175class MemoryTester(lsst.utils.tests.MemoryTestCase): 

176 pass 

177 

178 

179def setup_module(module): 

180 lsst.utils.tests.init() 

181 

182 

183if __name__ == "__main__": 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true

184 lsst.utils.tests.init() 

185 unittest.main()