Coverage for tests / test_match_probabilistic_task.py: 25%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:11 +0000

1# This file is part of meas_astrom. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

import unittest

import astropy.table
import numpy as np

import lsst.afw.geom as afwGeom
import lsst.geom
import lsst.utils.tests
from lsst.meas.astrom import ConvertCatalogCoordinatesConfig, MatchProbabilisticConfig, MatchProbabilisticTask

31 

32 

class MatchProbabilisticTaskTestCase(lsst.utils.tests.TestCase):
    """MatchProbabilisticTask test case.

    Builds a small synthetic reference catalog and a perturbed target
    catalog derived from it, then checks that the task matches the target
    rows back to the expected reference rows.
    """

    def setUp(self):
        # Five well-separated objects, plus an extra target at the end that
        # is offset from the fifth by a tiny amount. The extra target gives
        # one ref object two candidates to match to.
        ra = np.array([-0.1, -0.2, 0., 0.1, 0.2, 0.2 + 1e-10])
        dec = np.array([-0.15, 0.15, 0, 0.15, -0.15, -0.15 - 1e-10])
        mag_g = np.array([23., 24., 25., 25.5, 26., 27.])
        mag_r = mag_g + [0.5, -0.2, -0.8, -0.5, -1.5, 0.1]
        coord_format = ConvertCatalogCoordinatesConfig
        zeropoint = coord_format.mag_zeropoint_ref.default
        # Standard magnitude-to-flux conversion: flux = 10**(-0.4*(mag - zp)).
        # (The previous expression, -0.4*10**(mag - zeropoint), misplaced the
        # -0.4 factor and produced large negative "fluxes".)
        fluxes = tuple(10**(-0.4*(mag - zeropoint)) for mag in (mag_g, mag_r))
        # Per-object measurement errors: 0.2 arcsec (in degrees) on each
        # coordinate and a constant flux error.
        eps_coord = np.full_like(ra, lsst.geom.Angle(0.2, lsst.geom.arcseconds).asDegrees())
        eps_flux = np.full_like(eps_coord, 10)
        flags = np.ones_like(eps_coord, dtype=bool)
        name_index = 'index'

        columns_flux = ['flux_g', 'flux_r']
        columns_ref_meas = [
            coord_format.column_ref_coord1.default,
            coord_format.column_ref_coord2.default,
        ] + columns_flux

        n_target = len(ra)
        # Exclude the extra target from the ref cat.
        # This makes it a spurious detection and gives this ref object two
        # candidates to match to.
        n_exclude = 1
        self.n_exclude = n_exclude
        # Drop the last n_exclude elements and reverse the remainder, so the
        # ref rows are stored in the opposite order from the target rows.
        slice_ref = slice(-n_exclude - 1, None, -1)
        data_ref = {
            name_index: np.arange(n_target - n_exclude),
            columns_ref_meas[0]: ra[slice_ref],
            columns_ref_meas[1]: dec[slice_ref],
            columns_flux[0]: fluxes[0][slice_ref],
            columns_flux[1]: fluxes[1][slice_ref],
        }
        self.catalog_ref = astropy.table.Table(data=data_ref)
        # Sentinel written for unmatched rows: the minimum of the index dtype.
        value_unmatched = np.iinfo(data_ref[name_index].dtype).min
        # Matched targets map back to ref rows in reverse order; the excluded
        # extra target(s) are expected to be unmatched.
        self.indices_expected = np.concatenate(
            (np.arange(n_target - n_exclude - 1, -1, -1), np.full(self.n_exclude, value_unmatched))
        )

        columns_target_meas = [
            coord_format.column_target_coord1.default,
            coord_format.column_target_coord2.default,
        ] + columns_flux
        columns_target_err = [f'{column}Err' for column in columns_target_meas]

        # Target catalog: the same objects perturbed by eps_coord/eps_flux,
        # with selection flags set so every row passes the select_true and
        # select_false column cuts configured below.
        data_target = {
            name_index: np.arange(n_target),
            columns_target_meas[0]: ra + eps_coord,
            columns_target_meas[1]: dec + eps_coord,
            f'{columns_target_meas[0]}Err': eps_coord,
            f'{columns_target_meas[1]}Err': eps_coord,
            columns_flux[0]: fluxes[0] + eps_flux,
            columns_flux[1]: fluxes[1] - eps_flux,
            f'{columns_flux[0]}Err': eps_flux,
            f'{columns_flux[1]}Err': eps_flux,
            "detect_isPrimary": flags,
            "merge_peak_sky": ~flags,
        }
        self.catalog_target = astropy.table.Table(data=data_target)

        self.task = MatchProbabilisticTask(config=MatchProbabilisticConfig(
            columns_ref_flux=columns_flux,
            columns_ref_meas=columns_ref_meas,
            columns_ref_copy=[name_index],
            columns_target_meas=columns_target_meas,
            columns_target_err=columns_target_err,
            columns_target_copy=[name_index],
            columns_target_select_true=["detect_isPrimary"],
            columns_target_select_false=["merge_peak_sky"],
        ))
        # A simple sky WCS with 0.2 arcsec pixels, centered near (180, 0).
        self.wcs = afwGeom.makeSkyWcs(
            crpix=lsst.geom.Point2D(9000, 9000),
            crval=lsst.geom.SpherePoint(180., 0., lsst.geom.degrees),
            cdMatrix=afwGeom.makeCdMatrix(scale=0.2*lsst.geom.arcseconds),
        )

    def tearDown(self):
        del self.catalog_ref
        del self.catalog_target
        del self.indices_expected
        del self.n_exclude
        del self.task
        del self.wcs

    def test_MatchProbabilisticTask(self):
        # The task's declared input columns must all be present in the
        # corresponding catalog.
        for columns, catalog in (
            (self.task.columns_in_ref, self.catalog_ref),
            (self.task.columns_in_target, self.catalog_target),
        ):
            self.assertTrue(all(column in catalog.columns for column in columns))
        result = self.task.run(
            catalog_ref=self.catalog_ref,
            catalog_target=self.catalog_target,
            wcs=self.wcs,
            logging_n_rows=2,
        )
        indices_target = result.cat_output_target["match_row"]
        np.testing.assert_array_equal(indices_target, self.indices_expected)

133 

134 

class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Standard LSST test-suite hook that checks for resource leaks."""
    pass

137 

138 

def setup_module(module):
    """Initialize the LSST test framework (pytest module-level setup hook)."""
    lsst.utils.tests.init()

141 

142 

143if __name__ == "__main__": 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true

144 lsst.utils.tests.init() 

145 unittest.main()