Coverage for tests / test_transformDiaSourceCatalog.py: 22%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:39 +0000

1# This file is part of ap_association 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import os 

23import unittest 

24 

25import numpy as np 

26 

27from lsst.ap.association.transformDiaSourceCatalog import (TransformDiaSourceCatalogConfig, 

28 TransformDiaSourceCatalogTask) 

29from lsst.afw.cameraGeom.testUtils import DetectorWrapper 

30import lsst.daf.base as dafBase 

31import lsst.afw.image as afwImage 

32import lsst.geom as geom 

33import lsst.meas.base.tests as measTests 

34from lsst.pipe.base import Struct 

35import lsst.utils.tests 

36 

37from lsst.ap.association.transformDiaSourceCatalog import UnpackApdbFlags 

38 

39TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

40 

41 

42class TestTransformDiaSourceCatalogTask(unittest.TestCase): 

43 def setUp(self): 

44 # Create an instance of random generator with fixed seed. 

45 rng = np.random.default_rng(1234) 

46 

47 self.nSources = 10 

48 # Default PSF size (psfDim in makeEmptyExposure) in TestDataset results 

49 # in an 18 pixel wide source box. 

50 self.bboxSize = 18 

51 self.yLoc = 100 

52 self.bbox = geom.Box2I(geom.Point2I(0, 0), 

53 geom.Extent2I(1024, 1153)) 

54 dataset = measTests.TestDataset(self.bbox) 

55 for srcIdx in range(self.nSources-1): 

56 # Place sources at (index, yLoc), so we can distinguish them later. 

57 dataset.addSource(100000.0, geom.Point2D(srcIdx, self.yLoc)) 

58 # Ensure the last source has no peak `significance` field. 

59 dataset.addSource(100000.0, geom.Point2D(srcIdx+1, self.yLoc), setPeakSignificance=False) 

60 schema = dataset.makeMinimalSchema() 

61 schema.addField("base_PixelFlags_flag", type="Flag") 

62 schema.addField("base_PixelFlags_flag_offimage", type="Flag") 

63 schema.addField("sky_source", type="Flag", doc="Sky objects.") 

64 schema.addField("reliability", doc="real/bogus score of this source", type=float) 

65 self.exposure, self.inputCatalog = dataset.realize(10.0, schema, randomSeed=1234) 

66 # Create schemas for use in initializing the TransformDiaSourceCatalog task. 

67 self.initInputs = {"diaSourceSchema": Struct(schema=schema)} 

68 self.initInputsBadFlags = {"diaSourceSchema": Struct(schema=dataset.makeMinimalSchema())} 

69 

70 self.inputCatalog["reliability"] = rng.random(self.nSources, dtype=np.float32) 

71 

72 self.expId = 4321 

73 self.date = dafBase.DateTime(nsecs=1400000000 * 10**9) 

74 detector = DetectorWrapper(id=23, bbox=self.exposure.getBBox()).detector 

75 visit = afwImage.VisitInfo( 

76 id=self.expId, 

77 exposureTime=200., 

78 date=self.date) 

79 self.exposure.info.id = self.expId + (10000*detector.getId()) 

80 self.exposure.setDetector(detector) 

81 self.exposure.info.setVisitInfo(visit) 

82 self.band = 'g' 

83 self.exposure.setFilter(afwImage.FilterLabel(band=self.band, physical='g.MP9401')) 

84 scale = 2 

85 scaleErr = 1 

86 self.photoCalib = afwImage.PhotoCalib(scale, scaleErr) 

87 self.exposure.setPhotoCalib(self.photoCalib) 

88 

89 self.config = TransformDiaSourceCatalogConfig() 

90 self.config.flagMap = os.path.join(TESTDIR, "data", "test-flag-map.yaml") 

91 self.config.functorFile = os.path.join(TESTDIR, 

92 "data", 

93 "testDiaSource.yaml") 

94 

95 def test_run(self): 

96 """Test output dataFrame is created and values are correctly inserted 

97 from the exposure. 

98 """ 

99 transformTask = TransformDiaSourceCatalogTask(initInputs=self.initInputs, 

100 config=self.config) 

101 result = transformTask.run(self.inputCatalog, 

102 self.exposure, 

103 self.band) 

104 

105 self.assertEqual(len(result.diaSourceTable), len(self.inputCatalog)) 

106 np.testing.assert_array_equal(result.diaSourceTable["bboxSize"], [self.bboxSize]*self.nSources) 

107 np.testing.assert_array_equal(result.diaSourceTable["visit"], 

108 [self.exposure.visitInfo.id]*self.nSources) 

109 np.testing.assert_array_equal(result.diaSourceTable["detector"], 

110 [self.exposure.detector.getId()]*self.nSources) 

111 np.testing.assert_array_equal(result.diaSourceTable["band"], [self.band]*self.nSources) 

112 np.testing.assert_array_equal(result.diaSourceTable["midpointMjdTai"], 

113 [self.date.get(system=dafBase.DateTime.MJD)]*self.nSources) 

114 np.testing.assert_array_equal(result.diaSourceTable["diaObjectId"], [0]*self.nSources) 

115 np.testing.assert_array_equal(result.diaSourceTable["x"], np.arange(self.nSources)) 

116 # The final snr value should be NaN because it doesn't have a peak significance field. 

117 expect_snr = [397.887353515625]*9 

118 expect_snr.append(np.nan) 

119 # Have to use allclose because assert_array_equal doesn't support equal_nan. 

120 np.testing.assert_allclose(result.diaSourceTable["snr"], expect_snr, equal_nan=True, rtol=0) 

121 

122 def test_run_with_apdb_schema(self): 

123 """Test output dataFrame is created and values are correctly inserted 

124 from the exposure. 

125 """ 

126 self.config.doUseSchema = True 

127 transformTask = TransformDiaSourceCatalogTask(initInputs=self.initInputs, 

128 config=self.config) 

129 result = transformTask.run(self.inputCatalog, 

130 self.exposure, 

131 self.band) 

132 

133 self.assertEqual(len(result.diaSourceTable), len(self.inputCatalog)) 

134 np.testing.assert_array_equal(result.diaSourceTable["bboxSize"], [self.bboxSize]*self.nSources) 

135 np.testing.assert_array_equal(result.diaSourceTable["visit"], 

136 [self.exposure.visitInfo.id]*self.nSources) 

137 np.testing.assert_array_equal(result.diaSourceTable["detector"], 

138 [self.exposure.detector.getId()]*self.nSources) 

139 np.testing.assert_array_equal(result.diaSourceTable["band"], [self.band]*self.nSources) 

140 np.testing.assert_array_equal(result.diaSourceTable["midpointMjdTai"], 

141 [self.date.get(system=dafBase.DateTime.MJD)]*self.nSources) 

142 np.testing.assert_array_equal(result.diaSourceTable["diaObjectId"], [0]*self.nSources) 

143 np.testing.assert_array_equal(result.diaSourceTable["x"], np.arange(self.nSources)) 

144 # The final snr value should be NaN because it doesn't have a peak significance field. 

145 expect_snr = [397.887353515625]*9 

146 expect_snr.append(np.nan) 

147 # Downcast to single precision to match `sdm_schemas` 

148 expect_snr = np.array(expect_snr, dtype=np.float32) 

149 # Have to use allclose because assert_array_equal doesn't support equal_nan. 

150 np.testing.assert_allclose(result.diaSourceTable["snr"], expect_snr, equal_nan=True, rtol=0) 

151 

152 def test_run_dia_source_wrong_flags(self): 

153 """Test that the proper errors are thrown when requesting flag columns 

154 that are not in the input schema. 

155 """ 

156 with self.assertRaises(KeyError): 

157 TransformDiaSourceCatalogTask(initInputs=self.initInputsBadFlags) 

158 

159 def test_computeBBoxSize(self): 

160 transform = TransformDiaSourceCatalogTask(initInputs=self.initInputs, 

161 config=self.config) 

162 boxSizes = transform.computeBBoxSizes(self.inputCatalog) 

163 

164 for size in boxSizes: 

165 self.assertEqual(size, self.bboxSize) 

166 self.assertEqual(len(boxSizes), self.nSources) 

167 

168 def test_flag_existence_check(self): 

169 unpacker = UnpackApdbFlags(self.config.flagMap, "DiaSource") 

170 

171 self.assertTrue(unpacker.flagExists('base_PixelFlags_flag')) 

172 self.assertFalse(unpacker.flagExists('')) 

173 with self.assertRaisesRegex(ValueError, 'column doesNotExist not in flag map'): 

174 unpacker.flagExists('base_PixelFlags_flag', columnName='doesNotExist') 

175 

176 def test_flag_bitmask(self): 

177 """Test that we get the expected bitmask back from supplied flag names. 

178 """ 

179 unpacker = UnpackApdbFlags(self.config.flagMap, "DiaSource") 

180 

181 with self.assertRaisesRegex(ValueError, "flag '' not included"): 

182 unpacker.makeFlagBitMask(['']) 

183 with self.assertRaisesRegex(ValueError, 'column doesNotExist not in flag map'): 

184 unpacker.makeFlagBitMask(['base_PixelFlags_flag'], columnName='doesNotExist') 

185 self.assertEqual(unpacker.makeFlagBitMask(['base_PixelFlags_flag']), np.uint64(1)) 

186 self.assertEqual(unpacker.makeFlagBitMask(['base_PixelFlags_flag_offimage']), np.uint64(4)) 

187 self.assertEqual(unpacker.makeFlagBitMask(['base_PixelFlags_flag', 

188 'base_PixelFlags_flag_offimage']), 

189 np.uint64(5)) 

190 

191 

192class MemoryTester(lsst.utils.tests.MemoryTestCase): 

193 pass 

194 

195 

196def setup_module(module): 

197 lsst.utils.tests.init() 

198 

199 

200if __name__ == "__main__": 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true

201 lsst.utils.tests.init() 

202 unittest.main()