Coverage for tests / test_transformDiaSourceCatalog.py: 22%
110 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:53 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:53 +0000
1# This file is part of ap_association
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import os
23import unittest
25import numpy as np
27from lsst.ap.association.transformDiaSourceCatalog import (TransformDiaSourceCatalogConfig,
28 TransformDiaSourceCatalogTask)
29from lsst.afw.cameraGeom.testUtils import DetectorWrapper
30import lsst.daf.base as dafBase
31import lsst.afw.image as afwImage
32import lsst.geom as geom
33import lsst.meas.base.tests as measTests
34from lsst.pipe.base import Struct
35import lsst.utils.tests
37from lsst.ap.association.transformDiaSourceCatalog import UnpackApdbFlags
# Absolute path of the directory that holds this test module; used to locate
# the YAML fixtures under ``data/``.
TESTDIR = os.path.dirname(os.path.abspath(__file__))
class TestTransformDiaSourceCatalogTask(unittest.TestCase):
    """Tests for ``TransformDiaSourceCatalogTask`` and ``UnpackApdbFlags``.
    """

    def setUp(self):
        """Create the synthetic exposure, source catalog, init-inputs, and
        task configuration shared by the tests in this class.
        """
        # Create an instance of random generator with fixed seed.
        rng = np.random.default_rng(1234)

        self.nSources = 10
        # Default PSF size (psfDim in makeEmptyExposure) in TestDataset results
        # in an 18 pixel wide source box.
        self.bboxSize = 18
        self.yLoc = 100
        self.bbox = geom.Box2I(geom.Point2I(0, 0),
                               geom.Extent2I(1024, 1153))
        dataset = measTests.TestDataset(self.bbox)
        # Index of the final source, computed explicitly rather than relying
        # on a loop variable surviving past the end of the loop.
        lastIdx = self.nSources - 1
        for srcIdx in range(lastIdx):
            # Place sources at (index, yLoc), so we can distinguish them later.
            dataset.addSource(100000.0, geom.Point2D(srcIdx, self.yLoc))
        # Ensure the last source has no peak `significance` field.
        dataset.addSource(100000.0, geom.Point2D(lastIdx, self.yLoc), setPeakSignificance=False)

        schema = dataset.makeMinimalSchema()
        schema.addField("base_PixelFlags_flag", type="Flag")
        schema.addField("base_PixelFlags_flag_offimage", type="Flag")
        schema.addField("sky_source", type="Flag", doc="Sky objects.")
        schema.addField("reliability", doc="real/bogus score of this source", type=float)
        self.exposure, self.inputCatalog = dataset.realize(10.0, schema, randomSeed=1234)
        # Create schemas for use in initializing the TransformDiaSourceCatalog task.
        self.initInputs = {"diaSourceSchema": Struct(schema=schema)}
        # A minimal schema without the extra flag fields added above; used to
        # exercise the error path in test_run_dia_source_wrong_flags.
        self.initInputsBadFlags = {"diaSourceSchema": Struct(schema=dataset.makeMinimalSchema())}

        self.inputCatalog["reliability"] = rng.random(self.nSources, dtype=np.float32)

        self.expId = 4321
        self.date = dafBase.DateTime(nsecs=1400000000 * 10**9)
        detector = DetectorWrapper(id=23, bbox=self.exposure.getBBox()).detector
        visit = afwImage.VisitInfo(
            id=self.expId,
            exposureTime=200.,
            date=self.date)
        self.exposure.info.id = self.expId + (10000*detector.getId())
        self.exposure.setDetector(detector)
        self.exposure.info.setVisitInfo(visit)
        self.band = 'g'
        self.exposure.setFilter(afwImage.FilterLabel(band=self.band, physical='g.MP9401'))
        scale = 2
        scaleErr = 1
        self.photoCalib = afwImage.PhotoCalib(scale, scaleErr)
        self.exposure.setPhotoCalib(self.photoCalib)

        self.config = TransformDiaSourceCatalogConfig()
        self.config.flagMap = os.path.join(TESTDIR, "data", "test-flag-map.yaml")
        self.config.functorFile = os.path.join(TESTDIR,
                                               "data",
                                               "testDiaSource.yaml")

    def _runTransform(self):
        """Construct the task from ``self.initInputs`` and ``self.config``
        and run it on the catalog and exposure built in ``setUp``.

        Returns
        -------
        result
            Whatever ``TransformDiaSourceCatalogTask.run`` returns; the tests
            read its ``diaSourceTable`` attribute.
        """
        transformTask = TransformDiaSourceCatalogTask(initInputs=self.initInputs,
                                                      config=self.config)
        return transformTask.run(self.inputCatalog,
                                 self.exposure,
                                 self.band)

    def _expectedSnr(self):
        """Return the expected ``snr`` column as a list of ``nSources`` values.

        The final snr value should be NaN because the last source doesn't
        have a peak significance field.
        """
        expect_snr = [397.887353515625]*(self.nSources - 1)
        expect_snr.append(np.nan)
        return expect_snr

    def _checkExposureColumns(self, result):
        """Assert the output columns that are checked identically by
        ``test_run`` and ``test_run_with_apdb_schema``.

        Parameters
        ----------
        result
            Return value of ``TransformDiaSourceCatalogTask.run``.
        """
        table = result.diaSourceTable
        self.assertEqual(len(table), len(self.inputCatalog))
        np.testing.assert_array_equal(table["bboxSize"], [self.bboxSize]*self.nSources)
        np.testing.assert_array_equal(table["visit"],
                                      [self.exposure.visitInfo.id]*self.nSources)
        np.testing.assert_array_equal(table["detector"],
                                      [self.exposure.detector.getId()]*self.nSources)
        np.testing.assert_array_equal(table["band"], [self.band]*self.nSources)
        np.testing.assert_array_equal(table["midpointMjdTai"],
                                      [self.date.get(system=dafBase.DateTime.MJD)]*self.nSources)
        np.testing.assert_array_equal(table["diaObjectId"], [0]*self.nSources)
        # Sources were placed at x = 0 .. nSources-1 in setUp.
        np.testing.assert_array_equal(table["x"], np.arange(self.nSources))

    def test_run(self):
        """Test output dataFrame is created and values are correctly inserted
        from the exposure.
        """
        result = self._runTransform()

        self._checkExposureColumns(result)
        # Have to use allclose because assert_array_equal doesn't support equal_nan.
        np.testing.assert_allclose(result.diaSourceTable["snr"], self._expectedSnr(),
                                   equal_nan=True, rtol=0)

    def test_run_with_apdb_schema(self):
        """Test output dataFrame is created and values are correctly inserted
        from the exposure when ``doUseSchema`` is enabled.
        """
        self.config.doUseSchema = True
        result = self._runTransform()

        self._checkExposureColumns(result)
        # Downcast to single precision to match `sdm_schemas`
        expect_snr = np.array(self._expectedSnr(), dtype=np.float32)
        # Have to use allclose because assert_array_equal doesn't support equal_nan.
        np.testing.assert_allclose(result.diaSourceTable["snr"], expect_snr, equal_nan=True, rtol=0)

    def test_run_dia_source_wrong_flags(self):
        """Test that the proper errors are thrown when requesting flag columns
        that are not in the input schema.
        """
        with self.assertRaises(KeyError):
            TransformDiaSourceCatalogTask(initInputs=self.initInputsBadFlags)

    def test_computeBBoxSize(self):
        """Test that ``computeBBoxSizes`` returns one value per input source,
        each equal to the expected box size.
        """
        transform = TransformDiaSourceCatalogTask(initInputs=self.initInputs,
                                                  config=self.config)
        boxSizes = transform.computeBBoxSizes(self.inputCatalog)

        for size in boxSizes:
            self.assertEqual(size, self.bboxSize)
        self.assertEqual(len(boxSizes), self.nSources)

    def test_flag_existence_check(self):
        """Test ``UnpackApdbFlags.flagExists`` for a flag expected to be
        found, an empty flag name, and a column name that is not in the flag
        map.
        """
        unpacker = UnpackApdbFlags(self.config.flagMap, "DiaSource")

        self.assertTrue(unpacker.flagExists('base_PixelFlags_flag'))
        self.assertFalse(unpacker.flagExists(''))
        with self.assertRaisesRegex(ValueError, 'column doesNotExist not in flag map'):
            unpacker.flagExists('base_PixelFlags_flag', columnName='doesNotExist')

    def test_flag_bitmask(self):
        """Test that we get the expected bitmask back from supplied flag names.
        """
        unpacker = UnpackApdbFlags(self.config.flagMap, "DiaSource")

        with self.assertRaisesRegex(ValueError, "flag '' not included"):
            unpacker.makeFlagBitMask([''])
        with self.assertRaisesRegex(ValueError, 'column doesNotExist not in flag map'):
            unpacker.makeFlagBitMask(['base_PixelFlags_flag'], columnName='doesNotExist')
        self.assertEqual(unpacker.makeFlagBitMask(['base_PixelFlags_flag']), np.uint64(1))
        self.assertEqual(unpacker.makeFlagBitMask(['base_PixelFlags_flag_offimage']), np.uint64(4))
        self.assertEqual(unpacker.makeFlagBitMask(['base_PixelFlags_flag',
                                                   'base_PixelFlags_flag_offimage']),
                         np.uint64(5))
class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Expose ``lsst.utils.tests.MemoryTestCase`` in this module, unchanged.

    No methods are added or overridden here.
    """
def setup_module(module):
    """Call ``lsst.utils.tests.init()`` for this test module.

    Parameters
    ----------
    module
        Accepted but not used by this function.
        NOTE(review): presumably the module object passed in by pytest's
        module-level setup hook -- confirm against the test runner in use.
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Direct execution as a script: call lsst.utils.tests.init() first, then
    # hand control to the unittest command-line runner.
    lsst.utils.tests.init()
    unittest.main()