Coverage for tests / test_models.py: 12%

194 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:40 +0000

1# This file is part of lsst.scarlet.lite. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import os 

23from functools import partial 

24from typing import cast 

25 

26import lsst.scarlet.lite.models as models 

27import numpy as np 

28from lsst.scarlet.lite import Blend, Box, FistaParameter, Image, Observation, Source 

29from lsst.scarlet.lite.component import Component, FactorizedComponent, default_adaprox_parameterization 

30from lsst.scarlet.lite.initialization import FactorizedInitialization 

31from lsst.scarlet.lite.models import ( 

32 CartesianFrame, 

33 EllipseFrame, 

34 EllipticalParametricComponent, 

35 FactorizedFreeFormComponent, 

36 FittedPsfBlend, 

37 FittedPsfObservation, 

38 ParametricComponent, 

39) 

40from lsst.scarlet.lite.operators import Monotonicity 

41from lsst.scarlet.lite.parameters import AdaproxParameter, parameter, relative_step 

42from lsst.scarlet.lite.utils import integrated_circular_gaussian 

43from numpy.testing import assert_array_equal 

44from utils import ScarletTestCase 

45 

46 

47def parameterize(component: Component): 

48 assert isinstance(component, ParametricComponent) 

49 component._spectrum = AdaproxParameter( 

50 component.spectrum, 

51 step=partial(relative_step, factor=1e-2, minimum=1e-16), 

52 ) 

53 component._params = AdaproxParameter( 

54 component._params.x, 

55 step=1e-2, 

56 ) 

57 

58 

59class TestFreeForm(ScarletTestCase): 

60 def setUp(self) -> None: 

61 yx0 = (1000, 2000) 

62 filename = os.path.join(__file__, "..", "..", "data", "hsc_cosmos_35.npz") 

63 filename = os.path.abspath(filename) 

64 data = np.load(filename) 

65 self.data = data 

66 model_psf = integrated_circular_gaussian(sigma=0.8) 

67 self.detect = np.sum(data["images"], axis=0) 

68 self.centers = np.array([data["catalog"]["y"], data["catalog"]["x"]]).T + np.array(yx0) 

69 bands = data["filters"] 

70 self.observation = Observation( 

71 Image(data["images"], bands=bands, yx0=yx0), 

72 Image(data["variance"], bands=bands, yx0=yx0), 

73 Image(1 / data["variance"], bands=bands, yx0=yx0), 

74 data["psfs"], 

75 model_psf[None], 

76 bands=bands, 

77 ) 

78 

79 def tearDown(self): 

80 del self.data 

81 

82 def test_free_form_component(self): 

83 images = self.data["images"] 

84 

85 # Test with no thresholding (sparsity) 

86 sources = [] 

87 for i in range(5): 

88 component = FactorizedFreeFormComponent( 

89 self.observation.bands, 

90 np.ones(5), 

91 images[i].copy(), 

92 self.observation.bbox, 

93 ) 

94 sources.append(Source([component])) 

95 

96 blend = Blend(sources, self.observation).fit_spectra() 

97 blend.parameterize(default_adaprox_parameterization) 

98 blend.fit(12, e_rel=1e-6) 

99 

100 # Test with thresholding (sparsity) 

101 sources = [] 

102 for i in range(5): 

103 component = FactorizedFreeFormComponent( 

104 self.observation.bands, 

105 np.ones(5), 

106 images[i].copy(), 

107 self.observation.bbox, 

108 bg_rms=self.observation.noise_rms, 

109 bg_thresh=0.25, 

110 min_area=4, 

111 ) 

112 sources.append(Source([component])) 

113 

114 blend = Blend(sources, self.observation).fit_spectra() 

115 blend.parameterize(default_adaprox_parameterization) 

116 blend.fit(12, e_rel=1e-6) 

117 

118 # Test with peak centers specified 

119 sources = [] 

120 peaks = list(np.array([self.data["catalog"]["y"], self.data["catalog"]["x"]]).T.astype(int)) 

121 for i in range(5): 

122 component = FactorizedFreeFormComponent( 

123 self.observation.bands, 

124 np.ones(5), 

125 images[i].copy(), 

126 self.observation.bbox, 

127 peaks=peaks, 

128 ) 

129 sources.append(Source([component])) 

130 

131 blend = Blend(sources, self.observation).fit_spectra() 

132 blend.parameterize(default_adaprox_parameterization) 

133 blend.fit(12, e_rel=1e-6) 

134 

135 # Tests for code blocks that are difficult to reach, 

136 # to complete test coverage 

137 component = blend.sources[-1].components[0] 

138 self.assertFalse(component.resize(self.observation.bbox)) 

139 component.morph[:] = 0 

140 component.prox_morph(component.morph) 

141 

142 

143class TestParametric(ScarletTestCase): 

144 def setUp(self) -> None: 

145 filename = os.path.join(__file__, "..", "..", "data", "hsc_cosmos_35.npz") 

146 filename = os.path.abspath(filename) 

147 data = np.load(filename) 

148 self.data = data 

149 self.model_psf = integrated_circular_gaussian(sigma=0.8) 

150 self.detect = np.sum(data["images"], axis=0) 

151 self.centers = np.array([data["catalog"]["y"], data["catalog"]["x"]]).T 

152 bands = data["filters"] 

153 self.observation = Observation( 

154 Image(data["images"], bands=bands), 

155 Image(data["variance"], bands=bands), 

156 Image(1 / data["variance"], bands=bands), 

157 data["psfs"], 

158 self.model_psf[None], 

159 bands=bands, 

160 ) 

161 

162 def tearDown(self): 

163 del self.data 

164 

165 def test_cartesian_frame(self): 

166 bbox = Box((31, 60), (1, 2)) 

167 frame = CartesianFrame(bbox) 

168 y = np.linspace(1, 31, 31) 

169 x = np.linspace(2, 61, 60) 

170 x, y = np.meshgrid(x, y) 

171 

172 self.assertBoxEqual(frame.bbox, bbox) 

173 self.assertTupleEqual(frame.shape, bbox.shape) 

174 assert_array_equal(frame.x_grid, x) 

175 assert_array_equal(frame.y_grid, y) 

176 self.assertIsNone(frame._r2) 

177 self.assertIsNone(frame._r) 

178 

179 def test_ellipse_frame(self): 

180 y0 = 23 

181 x0 = 36 

182 major = 3 

183 minor = 2 

184 theta = 2 * np.pi / 3 

185 bbox = Box((50, 39), (2, 7)) 

186 r_min = 1e-20 

187 frame = EllipseFrame( 

188 y0, 

189 x0, 

190 major, 

191 minor, 

192 theta, 

193 bbox, 

194 ) 

195 

196 self.assertEqual(frame.x0, x0) 

197 self.assertEqual(frame.y0, y0) 

198 self.assertEqual(frame.major, major) 

199 self.assertEqual(frame.minor, minor) 

200 self.assertEqual(frame.theta, theta) 

201 self.assertEqual(frame.bbox, bbox) 

202 

203 y = np.linspace(2, 51, 50) 

204 x = np.linspace(7, 45, 39) 

205 x, y = np.meshgrid(x, y) 

206 r2 = frame._xa**2 + frame._yb**2 + r_min**2 

207 r = np.sqrt(r2) 

208 

209 self.assertEqual(frame._sin, np.sin(theta)) 

210 self.assertEqual(frame._cos, np.cos(theta)) 

211 self.assertBoxEqual(frame.bbox, bbox) 

212 self.assertTupleEqual(frame.shape, bbox.shape) 

213 assert_array_equal(frame.x_grid, x) 

214 assert_array_equal(frame.y_grid, y) 

215 assert_array_equal(frame.r2_grid, r2) 

216 assert_array_equal(frame.r_grid, r) 

217 

218 # There doesn't seem to be much utility to testing the 

219 # gradients exactly, however we test that they all run 

220 # properly. 

221 gradients = ( 

222 frame.grad_x0, 

223 frame.grad_y0, 

224 frame.grad_major, 

225 frame.grad_minor, 

226 frame.grad_theta, 

227 ) 

228 

229 input_grad = np.ones(bbox.shape, dtype=float) 

230 original_grad = input_grad.copy() 

231 for grad in gradients: 

232 grad(input_grad, False) 

233 grad(input_grad, True) 

234 

235 # Make sure that none of the methods changed the input gradient 

236 assert_array_equal(input_grad, original_grad) 

237 

238 def test_parametric_component(self): 

239 observation = self.observation 

240 bands = observation.bands 

241 spectrum = np.ones((observation.n_bands,), dtype=float) 

242 frame = CartesianFrame(observation.bbox) 

243 

244 # Test integrated Gaussian PSF and sersic 

245 sources = [] 

246 for idx, center in enumerate(self.centers): 

247 # Get the integer center of the source 

248 cy, cx = int(np.round(center[0])), int(np.round(center[1])) 

249 # For now we use a fixed bounding box that is the size of 

250 # the observed PSF image 

251 bbox = Box((41, 41), origin=(cy - 20, cx - 20)) & observation.bbox 

252 # Keep track of the initial positions 

253 yi, xi = cy, cx 

254 # Restrict the values of the parameters 

255 _proxmin = np.array([yi - 2, xi - 2, 1e-1, 1e-1, -np.pi / 2, 1.1]) 

256 _proxmax = np.array([yi + 2, xi + 2, frame.shape[-2] / 2, frame.shape[-1] / 2, np.pi / 2, 3]) 

257 

258 __proxmin = np.array([yi - 2, xi - 2, 0.8]) 

259 __proxmax = np.array([yi + 2, xi + 2, 1.2]) 

260 

261 # Initialize a PSF-like component of the source using a 

262 # non-pixel integrated gaussian 

263 component = ParametricComponent( 

264 bands, 

265 bbox, 

266 spectrum=parameter(spectrum.copy()), 

267 morph_params=parameter(np.array([center[0], center[1], 0.8])), 

268 morph_func=models.integrated_gaussian, 

269 morph_grad=models.grad_integrated_gaussian, 

270 morph_prox=partial(models.bounded_prox, proxmin=__proxmin, proxmax=__proxmax), 

271 morph_step=np.array([1e-2, 1e-2, 1e-2]), 

272 prox_spectrum=lambda x: x, 

273 ) 

274 # Define the component to use ADAPROX as the optimizer 

275 components = [component] 

276 

277 # Initialize an n=1 sersic component 

278 component = EllipticalParametricComponent( 

279 bands, 

280 bbox, 

281 spectrum=parameter(spectrum.copy()), 

282 morph_params=parameter(np.array([center[0], center[1], 2 * 1.2**2, 2 * 1.2**2, 0.0, 1])), 

283 morph_func=models.sersic, 

284 morph_grad=models.grad_sersic, 

285 morph_prox=partial(models.bounded_prox, proxmin=_proxmin, proxmax=_proxmax), 

286 morph_step=np.array([1e-2, 1e-2, 1e-3, 1e-3, 1e-2, 1e-2]), 

287 ) 

288 # Define the component to use ADAPROX as the optimizer 

289 components.append(component) 

290 

291 component = EllipticalParametricComponent( 

292 bands, 

293 bbox, 

294 spectrum=parameter(spectrum.copy()), 

295 morph_params=parameter(np.array([center[0], center[1], 2 * 1.2**2, 2 * 1.2**2, 0.0, 1])), 

296 morph_func=models.sersic, 

297 morph_grad=models.grad_sersic, 

298 morph_prox=partial(models.bounded_prox, proxmin=_proxmin, proxmax=_proxmax), 

299 morph_step=np.array([1e-2, 1e-2, 1e-3, 1e-3, 1e-2, 1e-2]), 

300 ) 

301 # Define the component to use ADAPROX as the optimizer 

302 components.append(component) 

303 

304 # Create a new source using the two components 

305 sources.append(Source(components)) 

306 

307 # Fit the models 

308 blend = Blend(sources, observation) 

309 blend.parameterize(parameterize) 

310 blend.fit_spectra() 

311 

312 # Check properties of a component 

313 component = cast(ParametricComponent, blend.components[0]) 

314 self.assertTupleEqual(component.peak, tuple(self.centers[0])) 

315 self.assertEqual(component.y0, component._params.x[0]) 

316 self.assertEqual(component.x0, component._params.x[1]) 

317 assert_array_equal(component.morph_step, np.array([1e-2, 1e-2, 1e-2])) 

318 

319 component = cast(EllipticalParametricComponent, blend.components[1]) 

320 self.assertTupleEqual(component.peak, tuple(self.centers[0])) 

321 self.assertEqual(component.y0, component._params.x[0]) 

322 self.assertEqual(component.x0, component._params.x[1]) 

323 assert_array_equal(component.semi_major, component._params.x[2]) 

324 assert_array_equal(component.semi_minor, component._params.x[3]) 

325 assert_array_equal(component.theta, component._params.x[4]) 

326 assert_array_equal(component.ellipse_params, component._params.x[:5]) 

327 assert_array_equal(component.radial_params, component._params.x[5:]) 

328 self.assertIsNotNone(component.morph_prox) 

329 self.assertIsNotNone(component.morph_grad) 

330 

331 blend.fit(12) 

332 

333 # Test elliptical and circular Gaussian models 

334 sources = [] 

335 for idx, center in enumerate(self.centers): 

336 # Get the integer center of the source 

337 cy, cx = int(np.round(center[0])), int(np.round(center[1])) 

338 # For now we use a fixed bounding box that is the size of 

339 # the observed PSF image 

340 bbox = Box((41, 41), origin=(cy - 20, cx - 20)) & observation.bbox 

341 # Keep track of the initial positions 

342 yi, xi = cy, cx 

343 # Restrict the values of the parameters 

344 _proxmin = np.array([yi - 2, xi - 2, 1e-1, 1e-1, -np.pi / 2]) 

345 _proxmax = np.array([yi + 2, xi + 2, frame.shape[-2] / 2, frame.shape[-1] / 2, np.pi / 2]) 

346 

347 __proxmin = np.array([yi - 2, xi - 2]) 

348 __proxmax = np.array([yi + 2, xi + 2]) 

349 

350 # Initialize a PSF-like component of the source using a 

351 # non-pixel integrated gaussian 

352 component = ParametricComponent( 

353 bands, 

354 bbox, 

355 spectrum=parameter(spectrum.copy()), 

356 morph_params=parameter(np.array([center[0], center[1]])), 

357 morph_func=partial(models.circular_gaussian, sigma=0.8), 

358 morph_grad=partial(models.grad_circular_gaussian, sigma=0.8), 

359 morph_prox=partial(models.bounded_prox, proxmin=__proxmin, proxmax=__proxmax), 

360 morph_step=np.array([1e-2, 1e-2, 1e-2]), 

361 ) 

362 # Define the component to use ADAPROX as the optimizer 

363 components = [component] 

364 

365 # Initialize an n=1 sersic component 

366 component = EllipticalParametricComponent( 

367 bands, 

368 bbox, 

369 spectrum=parameter(spectrum.copy()), 

370 morph_params=parameter(np.array([center[0], center[1], 2 * 1.2**2, 2 * 1.2**2, 0.0])), 

371 morph_func=models.gaussian2d, 

372 morph_grad=models.grad_gaussian2, 

373 morph_prox=partial(models.bounded_prox, proxmin=_proxmin, proxmax=_proxmax), 

374 morph_step=np.array([1e-2, 1e-2, 1e-3, 1e-3, 1e-2, 1e-2]), 

375 ) 

376 # Define the component to use ADAPROX as the optimizer 

377 components.append(component) 

378 # Create a new source using the two components 

379 sources.append(Source(components)) 

380 

381 # Fit the models 

382 blend = Blend(sources, observation) 

383 blend.parameterize(parameterize) 

384 blend.fit_spectra() 

385 blend.fit(12) 

386 

387 def test_psf_fitting(self): 

388 # Use flat weights for FISTA optimization 

389 weights = np.ones(self.data["images"].shape) 

390 

391 monotonicity = Monotonicity((101, 101)) 

392 

393 observation = FittedPsfObservation( 

394 self.data["images"], 

395 self.data["variance"], 

396 weights, 

397 self.data["psfs"], 

398 self.model_psf[None], 

399 bands=self.data["filters"], 

400 ) 

401 

402 def fista_parameterization(component: Component): 

403 if isinstance(component, FactorizedComponent): 

404 component._spectrum = FistaParameter(component.spectrum, step=0.5) 

405 component._morph = FistaParameter(component.morph, step=0.5) 

406 else: 

407 if isinstance(component, FittedPsfObservation): 

408 component._fitted_kernel = FistaParameter(component._fitted_kernel.x, step=1e-2) 

409 

410 init = FactorizedInitialization(observation, self.centers, monotonicity=monotonicity) 

411 blend = FittedPsfBlend(init.sources, observation).fit_spectra() 

412 blend.parameterize(fista_parameterization) 

413 assert isinstance(cast(FittedPsfObservation, blend.observation)._fitted_kernel, FistaParameter) 

414 blend.fit(12, e_rel=1e-4)