Coverage for tests/test_match_probabilistic_task.py: 25%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:55 +0000
1# This file is part of meas_astrom.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
23import unittest
24import lsst.utils.tests
26import lsst.afw.geom as afwGeom
27from lsst.meas.astrom import ConvertCatalogCoordinatesConfig, MatchProbabilisticConfig, MatchProbabilisticTask
29import astropy.table
30import numpy as np
class MatchProbabilisticTaskTestCase(lsst.utils.tests.TestCase):
    """MatchProbabilisticTask test case.

    A small reference catalog and a target catalog are built from the same
    handful of synthetic sources. The target catalog carries one extra
    source, nearly coincident with the last real one, that has no
    counterpart in the reference catalog.
    """

    def setUp(self):
        """Build the synthetic catalogs, the task under test and a WCS."""
        # NOTE(review): lsst.geom is used below but is not imported explicitly
        # in the visible import block; presumably it is loaded as a side
        # effect of importing lsst.afw.geom -- confirm.

        # Add an extra target with a small spatial offset at the end
        ra = np.array([-0.1, -0.2, 0., 0.1, 0.2, 0.2+1e-10])
        dec = np.array([-0.15, 0.15, 0, 0.15, -0.15, -0.15-1e-10])
        mag_g = np.array([23., 24., 25., 25.5, 26., 27.])
        mag_r = mag_g + [0.5, -0.2, -0.8, -0.5, -1.5, 0.1]

        coord_format = ConvertCatalogCoordinatesConfig
        zeropoint = coord_format.mag_zeropoint_ref.default
        # Magnitude -> flux: the -0.4 factor belongs in the exponent.
        # Multiplying the power by -0.4 instead gives tiny negative fluxes
        # that are swamped by the +/-eps_flux offsets applied below.
        fluxes = tuple(10**(-0.4*(mag - zeropoint)) for mag in (mag_g, mag_r))
        eps_coord = np.full_like(ra, lsst.geom.Angle(0.2, lsst.geom.arcseconds).asDegrees())
        eps_flux = np.full_like(eps_coord, 10)
        flags = np.ones_like(eps_coord, dtype=bool)
        name_index = 'index'

        columns_flux = ['flux_g', 'flux_r']
        columns_ref_meas = [
            coord_format.column_ref_coord1.default,
            coord_format.column_ref_coord2.default,
        ] + columns_flux

        n_target = len(ra)
        # Exclude the extra target from the ref cat
        # This makes it a spurious detection and gives this ref object two
        # candidates to match to
        n_exclude = 1
        self.n_exclude = n_exclude
        # This drops the last n_exclude elements and then reverses the rest
        slice_ref = slice(-n_exclude - 1, None, -1)
        data_ref = {
            name_index: np.arange(n_target - n_exclude),
            columns_ref_meas[0]: ra[slice_ref],
            columns_ref_meas[1]: dec[slice_ref],
            columns_flux[0]: fluxes[0][slice_ref],
            columns_flux[1]: fluxes[1][slice_ref],
        }
        self.catalog_ref = astropy.table.Table(data=data_ref)
        # The spurious targets are expected to carry the minimum value of the
        # index dtype in the "match_row" output column.
        value_unmatched = np.iinfo(data_ref[name_index].dtype).min
        # The reference rows are reversed relative to the targets, so target i
        # is expected to match reference row (n_target - n_exclude - 1 - i).
        self.indices_expected = np.concatenate(
            (np.arange(n_target - n_exclude - 1, -1, -1), np.full(n_exclude, value_unmatched))
        )

        columns_target_meas = [
            coord_format.column_target_coord1.default,
            coord_format.column_target_coord2.default,
        ] + columns_flux
        columns_target_err = [f'{column}Err' for column in columns_target_meas]

        # Every target measurement is offset from the true value by exactly
        # the error quoted for it.
        data_target = {
            name_index: np.arange(n_target),
            columns_target_meas[0]: ra + eps_coord,
            columns_target_meas[1]: dec + eps_coord,
            f'{columns_target_meas[0]}Err': eps_coord,
            f'{columns_target_meas[1]}Err': eps_coord,
            columns_flux[0]: fluxes[0] + eps_flux,
            columns_flux[1]: fluxes[1] - eps_flux,
            f'{columns_flux[0]}Err': eps_flux,
            f'{columns_flux[1]}Err': eps_flux,
            "detect_isPrimary": flags,
            "merge_peak_sky": ~flags,
        }
        self.catalog_target = astropy.table.Table(data=data_target)

        self.task = MatchProbabilisticTask(config=MatchProbabilisticConfig(
            columns_ref_flux=columns_flux,
            columns_ref_meas=columns_ref_meas,
            columns_ref_copy=[name_index],
            columns_target_meas=columns_target_meas,
            columns_target_err=columns_target_err,
            columns_target_copy=[name_index],
            columns_target_select_true=["detect_isPrimary"],
            columns_target_select_false=["merge_peak_sky"],
        ))
        self.wcs = afwGeom.makeSkyWcs(crpix=lsst.geom.Point2D(9000, 9000),
                                      crval=lsst.geom.SpherePoint(180., 0., lsst.geom.degrees),
                                      cdMatrix=afwGeom.makeCdMatrix(scale=0.2*lsst.geom.arcseconds))

    def tearDown(self):
        """Drop every attribute created by ``setUp``."""
        del self.catalog_ref
        del self.catalog_target
        del self.indices_expected
        del self.n_exclude
        del self.task
        del self.wcs

    def test_MatchProbabilisticTask(self):
        """Run the task and compare the per-target match rows to the expected ones."""
        # Check the fixtures first: each catalog must provide every column the
        # task reports as an input, with the missing column named on failure.
        for (columns, catalog) in (
            (self.task.columns_in_ref, self.catalog_ref),
            (self.task.columns_in_target, self.catalog_target),
        ):
            for column in columns:
                with self.subTest(column=column):
                    self.assertIn(column, catalog.columns)
        result = self.task.run(
            catalog_ref=self.catalog_ref,
            catalog_target=self.catalog_target,
            wcs=self.wcs,
            logging_n_rows=2,
        )
        indices_target = result.cat_output_target["match_row"]
        np.testing.assert_array_equal(indices_target, self.indices_expected)
class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Inherit every test from ``lsst.utils.tests.MemoryTestCase`` unchanged.

    Nothing is added or overridden here.
    """
def setup_module(module):
    """Call ``lsst.utils.tests.init()``.

    Parameters
    ----------
    module
        Accepted but not used by this function.
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Executed directly as a script: perform the same initialisation that
    # setup_module() does, then let unittest discover and run the tests.
    lsst.utils.tests.init()
    unittest.main()