Coverage for tests / test_ssoAssociationTask.py: 23%
82 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:40 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:40 +0000
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import numpy as np
23from astropy.table import Table
24import astropy.table as tb
25import astropy.units as u
26import unittest
28from lsst.pipe.tasks.ssoAssociation import SolarSystemAssociationTask
29import astshim as ast
30import lsst.geom as geom
31import lsst.afw.geom as afwGeom
32import lsst.meas.base.tests as measTests
33import lsst.utils.tests
36class TestSolarSystemAssociation(unittest.TestCase):
38 def setUp(self):
39 # Make fake sources
40 self.nSources = 10
41 self.bbox = geom.Box2I(geom.Point2I(0, 0),
42 geom.Extent2I(1024, 1153))
43 self.xyLoc = 100
44 dataset = measTests.TestDataset(self.bbox)
45 for srcIdx in range(self.nSources):
46 dataset.addSource(100000.0,
47 geom.Point2D(srcIdx*self.xyLoc,
48 srcIdx*self.xyLoc))
49 schema = dataset.makeMinimalSchema()
50 schema.addField("base_PixelFlags_flag", type="Flag")
51 schema.addField("base_PixelFlags_flag_offimage", type="Flag")
52 self.exposure, catalog = dataset.realize(
53 10.0, schema, randomSeed=1234)
54 for src in catalog:
55 src.setCoord(self.exposure.getWcs().pixelToSky(src.getCentroid()))
56 # Non-invertible WCS to test robustness to distortions.
57 # Coefficients transform (x - 1e-7*x^3 -> x, y -> y); see docs for PolyMap.
58 pixelCoeffs = np.array([[-1.0e-7, 1, 3, 0],
59 [1.0, 1, 1, 0],
60 [1.0, 2, 0, 1],
61 ])
62 self.exposure.setWcs(afwGeom.makeModifiedWcs(
63 afwGeom.TransformPoint2ToPoint2(ast.PolyMap(pixelCoeffs, 2, options="IterInverse=1")),
64 self.exposure.wcs,
65 modifyActualPixels=False
66 ))
68 # Convert to task required format
69 self.testDiaSources = catalog.asAstropy()
70 self.testDiaSources.rename_columns(["coord_ra", "coord_dec"], ["ra", "dec"])
71 self.testDiaSources["ra"] = np.rad2deg(self.testDiaSources["ra"]) * u.deg
72 self.testDiaSources["dec"] = np.rad2deg(self.testDiaSources["dec"]) * u.deg
73 self.testDiaSources["midpointMjdTai"] = (self.testDiaSources["ra"]*0 + 65000.0) * u.d
74 self.testDiaSources["extendedness"] = np.zeros(len(self.testDiaSources))
75 self.testDiaSources["band"] = np.full(len(self.testDiaSources), 'r', dtype='U1')
76 self.testDiaSources["psfFlux"] = np.ones(len(self.testDiaSources)) * 3. * u.nJy
77 self.testDiaSources["psfFluxErr"] = np.ones(len(self.testDiaSources)) * 0.1 * u.nJy
78 self.testDiaSources = tb.vstack([self.testDiaSources, Table([[45], [45]], names=["ra", "dec"])])
79 self.testDiaSources["ssObjectId"] = 0
80 self.testDiaSources["diaSourceId"] = [i for i in range(len(self.testDiaSources))]
82 # Grab a subset to treat as solar system objects
83 self.testSsObjects = Table()
84 self.testSsObjects["ObjID"] = ["K15MK4Q"]
85 self.testSsObjects["MPCORB_unpacked_primary_provisional_designation"] = self.testSsObjects["ObjID"]
87 self.testSsObjects["ssObjectId"] = [21164728253101137]
88 self.testSsObjects["ephRa"] = [45 * u.deg]
89 self.testSsObjects["ephDec"] = [45 * u.deg]
90 self.testSsObjects["tmin"] = [-1 * u.d]
91 self.testSsObjects["tmax"] = [1 * u.d]
92 self.testSsObjects["obs_x_poly"] = [np.array([0])]
93 self.testSsObjects["obs_y_poly"] = [np.array([0])]
94 self.testSsObjects["obs_z_poly"] = [np.array([0])]
95 self.testSsObjects["obj_x_poly"] = [np.array([1])]
96 self.testSsObjects["obj_y_poly"] = [np.array([1])]
97 self.testSsObjects["obj_z_poly"] = [np.array([2 ** 0.5])]
98 self.testSsObjects["Err(arcsec)"] = np.ones(len(self.testSsObjects)) * u.arcsec
99 self.testSsObjects["trailedSourceMagTrue"] = 22
101 def test_run(self):
102 """Test that association and id assignment work as expected.
103 """
104 ssAssocTask = SolarSystemAssociationTask()
105 results = ssAssocTask.run(self.testDiaSources,
106 self.testSsObjects,
107 self.exposure.visitInfo,
108 self.exposure.getBBox(),
109 self.exposure.wcs)
110 self.assertEqual(len(results.ssoAssocDiaSources), 1)
111 self.assertEqual(results.ssoAssocDiaSources['ra'][0], 45.0)
112 self.assertEqual(results.ssoAssocDiaSources['dec'][0], 45.0)
113 self.assertEqual(results.ssoAssocDiaSources['ssObjectId'][0], 21164728253101137)
115 def test_mask(self):
116 """Test that masking against the CCD bounding box works as expected.
117 """
118 ssAssocTask = SolarSystemAssociationTask()
119 # Test will all inside ccd
120 maskedObjects = ssAssocTask._maskToCcdRegion(self.testSsObjects,
121 self.exposure.getBBox(),
122 self.exposure.wcs,
123 1.0)
124 self.assertEqual(len(maskedObjects), len(self.testSsObjects))
126 # Add a new SolarSystemObjects outside of the bbox and test that it
127 # is excluded.
128 testObjects = tb.vstack([self.testSsObjects[:1] for i in range(3)])
129 testObjects[0]["ephRa"] = 150
130 testObjects[0]["ephDec"] = 80
131 testObjects[1]["ephRa"] = 150
132 testObjects[1]["ephDec"] = -80
133 # Coordinates are chosen so that the inverse WCS erroneously maps them
134 # to inside the box (to (74.5, 600.6) instead of around (1745, 600.6)).
135 testObjects[2]["ephRa"] = 44.91215199831453
136 testObjects[2]["ephDec"] = 45.001331943391406
137 maskedObjects = ssAssocTask._maskToCcdRegion(
138 tb.vstack([self.testSsObjects, testObjects]),
139 self.exposure.getBBox(),
140 self.exposure.wcs,
141 1.0)
142 self.assertEqual(len(maskedObjects), len(self.testSsObjects))
145class MemoryTester(lsst.utils.tests.MemoryTestCase):
146 pass
149def setup_module(module):
150 lsst.utils.tests.init()
153if __name__ == "__main__": 153 ↛ 154line 153 didn't jump to line 154 because the condition on line 153 was never true
154 lsst.utils.tests.init()
155 unittest.main()