Coverage for tests / test_computeExPsf.py: 17%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:32 +0000

1# This file is part of meas_algorithms. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23import unittest 

24from typing import Optional 

25import numpy.typing as npt 

26 

27import numpy as np 

28 

29import lsst.utils.tests 

30from lsst.meas.algorithms import GaussianProcessTreegp 

31from lsst.meas.algorithms.computeExPsf import ComputeExPsfTask 

32import lsst.pipe.base as pipeBase 

33 

34 

35def rbf_kernel( 

36 x1: npt.NDArray, x2: npt.NDArray, sigma: float, correlation_length: float 

37) -> npt.NDArray: 

38 """ 

39 Computes the radial basis function (RBF) kernel matrix. 

40 

41 Parameters: 

42 ----------- 

43 x1 : `np.array` 

44 Location of training data point with shape (n_samples, n_features). 

45 x2 : `np.array` 

46 Location of training/test data point with shape (n_samples, n_features). 

47 sigma : `float` 

48 The scale parameter of the kernel. 

49 correlation_length : `float` 

50 The correlation length parameter of the kernel. 

51 

52 Returns: 

53 -------- 

54 kernel : `np.ndarray` 

55 RBF kernel matrix with shape (n_samples, n_samples). 

56 """ 

57 distance_squared = np.sum((x1[:, None, :] - x2[None, :, :]) ** 2, axis=-1) 

58 kernel = (sigma**2) * np.exp(-0.5 * distance_squared / (correlation_length**2)) 

59 return kernel 

60 

61 

62def generate_gaussian_random_field( 

63 xmin: int = 0, 

64 xmax: int = 2000, 

65 ymin: int = 0, 

66 ymax: int = 2000, 

67 npoints: int = 10000, 

68 nmax: int = 1000, 

69 std: float = 1.0, 

70 correlation_length: float = 10.0, 

71 white_noise: float = 1.0, 

72 seed: int = 42, 

73 input_coord: Optional[npt.NDArray] = None, 

74) -> pipeBase.Struct: 

75 """Generate a Gaussian Random Field. 

76 

77 Function to generate a Gaussian Random Field. 

78 Help for unit test and generate spatial correlated 

79 function and have an analytical 2-point correlation 

80 to compared with (Gaussian here). 

81 

82 Parameters 

83 ---------- 

84 xmin: `int` 

85 Min value in x direction. 

86 Default: ``0`` 

87 xmax: `int` 

88 Max value in x direction. 

89 Default: ``2000`` 

90 ymin: `int` 

91 Min value in y direction. 

92 Default: ``0`` 

93 ymax: `int` 

94 Max value in y direction. 

95 Default: ``2000`` 

96 npoints: `int` 

97 Number of data points generated. 

98 Default: ``10000`` 

99 nmax: `int` 

100 Max number of data points generated using 

101 `np.random.Generator.multivariate_normal`. If 

102 npoints>nmax, a GP will be used in addition. 

103 Default: ``1000`` 

104 std: `float` 

105 Amplitude of the gaussian random field. 

106 Default: ``1.0`` 

107 correlation_length: `float` 

108 Correlation length of the gaussian random field. 

109 Default: ``10.0`` 

110 white_noise: `float` 

111 Noise added to the gaussian random field. 

112 Default: ``1.0`` 

113 seed: `int` 

114 Seed of the random generator. 

115 Default: ``42`` 

116 input_coord: `np.ndarray` 

117 Take a input coord to generate the Gaussian Random field 

118 Default: ``None`` 

119 

120 Returns 

121 ------- 

122 struct : `lsst.pipe.base.Struct` 

123 The struct contains the following data: 

124 ``coord``: `np.ndarray` 

125 2D coordinate of the gaussian random field 

126 ``z``: `np.ndarray` 

127 Scalar value of the gaussian random field 

128 """ 

129 # Choose explicit RNG. 

130 rng = np.random.Generator(np.random.MT19937(seed)) 

131 

132 if input_coord is not None: 

133 npoints = len(input_coord[:, 0]) 

134 

135 if npoints > nmax: 

136 ngenerated = nmax 

137 else: 

138 ngenerated = npoints 

139 

140 if input_coord is None or npoints > nmax: 

141 x1 = rng.uniform(xmin, xmax, ngenerated) 

142 x2 = rng.uniform(ymin, ymax, ngenerated) 

143 coord1 = np.array([x1, x2]).T 

144 else: 

145 coord1 = input_coord 

146 kernel = rbf_kernel(coord1, coord1, std, correlation_length) 

147 kernel += np.eye(ngenerated) * white_noise**2 

148 

149 z1 = rng.multivariate_normal(np.zeros(ngenerated), kernel) 

150 

151 # Data augmentation. Create a gaussian random field 

152 # with npoints>nmax is to slow. So generate nmax points 

153 # and then interpolate it with a GP to do data augmentation. 

154 

155 if npoints > nmax: 

156 if input_coord is None: 

157 x1 = rng.uniform(xmin, xmax, npoints) 

158 x2 = rng.uniform(ymin, ymax, npoints) 

159 coord = np.array([x1, x2]).T 

160 else: 

161 coord = input_coord 

162 

163 tgp = GaussianProcessTreegp( 

164 std=std, 

165 correlation_length=correlation_length, 

166 white_noise=white_noise, 

167 mean=0.0, 

168 ) 

169 tgp.fit(coord1, z1) 

170 z = tgp.predict(coord) 

171 else: 

172 coord = coord1 

173 z = z1 

174 

175 return pipeBase.Struct(coord=coord, z=z) 

176 

177 

178class ComputeExPsfTestCase(lsst.utils.tests.TestCase): 

179 """Test ComputeExPsfTask.""" 

180 

181 def setUp(self) -> None: 

182 super().setUp() 

183 

184 output1 = generate_gaussian_random_field( 

185 xmin=0, 

186 xmax=2000, 

187 ymin=0, 

188 ymax=2000, 

189 npoints=10000, 

190 nmax=2000, 

191 std=1.0, 

192 correlation_length=200.0, 

193 white_noise=0.01, 

194 seed=1, 

195 input_coord=None, 

196 ) 

197 

198 output2 = generate_gaussian_random_field( 

199 xmin=0, 

200 xmax=2000, 

201 ymin=0, 

202 ymax=2000, 

203 npoints=10000, 

204 nmax=2000, 

205 std=1.0, 

206 correlation_length=100.0, 

207 white_noise=0.01, 

208 seed=44, 

209 input_coord=output1.coord, 

210 ) 

211 

212 self.coord1 = output1.coord 

213 self.coord2 = output2.coord 

214 self.de1 = output1.z 

215 self.de2 = output2.z 

216 

217 def test_comp_ex_psf(self) -> None: 

218 """Test that ex metric are compute and make sense.""" 

219 

220 np.testing.assert_equal(self.coord1, self.coord2) 

221 

222 ra = self.coord1[:, 0] 

223 dec = self.coord1[:, 1] 

224 

225 config = ComputeExPsfTask.ConfigClass() 

226 

227 config.treecorr.min_sep = 0.01 

228 config.treecorr.max_sep = 5.0 

229 config.treecorr.nbins = 1 

230 config.treecorr.bin_type = "Linear" 

231 config.treecorr.sep_units = "arcmin" 

232 

233 task = ComputeExPsfTask(config) 

234 output1 = task.run(self.de1, self.de2, ra, dec, units="arcmin") 

235 

236 # At small scale, expect the scalar two-point correlation function 

237 # to be close to the input variance for de1 and de2. Cross correlation 

238 # between de1 and de2 should be zeros are they are 2 indendant field. 

239 

240 np.testing.assert_allclose(output1.metric_E1, 1.0, atol=2e-1) 

241 np.testing.assert_allclose(output1.metric_E2, 1.0, atol=2e-1) 

242 np.testing.assert_allclose(output1.metric_Ex, 0.0, atol=2e-1) 

243 

244 config = ComputeExPsfTask.ConfigClass() 

245 

246 config.treecorr.min_sep = 5.0 

247 config.treecorr.max_sep = 600.0 

248 config.treecorr.nbins = 1 

249 config.treecorr.bin_type = "Linear" 

250 config.treecorr.sep_units = "arcmin" 

251 

252 # At intermediar scale, expect E1>E2>Ex. 

253 

254 task = ComputeExPsfTask(config) 

255 output2 = task.run(self.de1, self.de2, ra, dec, units="arcmin") 

256 

257 np.testing.assert_allclose(output2.metric_E1, 0.20, atol=2e-1) 

258 np.testing.assert_allclose(output2.metric_E2, 0.05, atol=2e-1) 

259 np.testing.assert_allclose(output2.metric_Ex, 0.0, atol=2e-1) 

260 

261 config = ComputeExPsfTask.ConfigClass() 

262 

263 config.treecorr.min_sep = 600.0 

264 config.treecorr.max_sep = 1000.0 

265 config.treecorr.nbins = 1 

266 config.treecorr.bin_type = "Linear" 

267 config.treecorr.sep_units = "arcmin" 

268 

269 # At large scale, expect the scalar two-point correlation function to 

270 # be all close to 0. 

271 

272 task = ComputeExPsfTask(config) 

273 output2 = task.run(self.de1, self.de2, ra, dec, units="arcmin") 

274 

275 np.testing.assert_allclose(output2.metric_E1, 0.0, atol=2e-1) 

276 np.testing.assert_allclose(output2.metric_E2, 0.0, atol=2e-1) 

277 np.testing.assert_allclose(output2.metric_Ex, 0.0, atol=2e-1) 

278 

279 

280def setup_module(module): 

281 lsst.utils.tests.init() 

282 

283 

284class MemoryTestCase(lsst.utils.tests.MemoryTestCase): 

285 pass 

286 

287 

288if __name__ == "__main__": 288 ↛ 289line 288 didn't jump to line 289 because the condition on line 288 was never true

289 lsst.utils.tests.init() 

290 unittest.main()