Coverage for tests / test_models.py: 12%
194 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:40 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:40 +0000
1# This file is part of lsst.scarlet.lite.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import os
23from functools import partial
24from typing import cast
26import lsst.scarlet.lite.models as models
27import numpy as np
28from lsst.scarlet.lite import Blend, Box, FistaParameter, Image, Observation, Source
29from lsst.scarlet.lite.component import Component, FactorizedComponent, default_adaprox_parameterization
30from lsst.scarlet.lite.initialization import FactorizedInitialization
31from lsst.scarlet.lite.models import (
32 CartesianFrame,
33 EllipseFrame,
34 EllipticalParametricComponent,
35 FactorizedFreeFormComponent,
36 FittedPsfBlend,
37 FittedPsfObservation,
38 ParametricComponent,
39)
40from lsst.scarlet.lite.operators import Monotonicity
41from lsst.scarlet.lite.parameters import AdaproxParameter, parameter, relative_step
42from lsst.scarlet.lite.utils import integrated_circular_gaussian
43from numpy.testing import assert_array_equal
44from utils import ScarletTestCase
47def parameterize(component: Component):
48 assert isinstance(component, ParametricComponent)
49 component._spectrum = AdaproxParameter(
50 component.spectrum,
51 step=partial(relative_step, factor=1e-2, minimum=1e-16),
52 )
53 component._params = AdaproxParameter(
54 component._params.x,
55 step=1e-2,
56 )
59class TestFreeForm(ScarletTestCase):
60 def setUp(self) -> None:
61 yx0 = (1000, 2000)
62 filename = os.path.join(__file__, "..", "..", "data", "hsc_cosmos_35.npz")
63 filename = os.path.abspath(filename)
64 data = np.load(filename)
65 self.data = data
66 model_psf = integrated_circular_gaussian(sigma=0.8)
67 self.detect = np.sum(data["images"], axis=0)
68 self.centers = np.array([data["catalog"]["y"], data["catalog"]["x"]]).T + np.array(yx0)
69 bands = data["filters"]
70 self.observation = Observation(
71 Image(data["images"], bands=bands, yx0=yx0),
72 Image(data["variance"], bands=bands, yx0=yx0),
73 Image(1 / data["variance"], bands=bands, yx0=yx0),
74 data["psfs"],
75 model_psf[None],
76 bands=bands,
77 )
79 def tearDown(self):
80 del self.data
82 def test_free_form_component(self):
83 images = self.data["images"]
85 # Test with no thresholding (sparsity)
86 sources = []
87 for i in range(5):
88 component = FactorizedFreeFormComponent(
89 self.observation.bands,
90 np.ones(5),
91 images[i].copy(),
92 self.observation.bbox,
93 )
94 sources.append(Source([component]))
96 blend = Blend(sources, self.observation).fit_spectra()
97 blend.parameterize(default_adaprox_parameterization)
98 blend.fit(12, e_rel=1e-6)
100 # Test with thresholding (sparsity)
101 sources = []
102 for i in range(5):
103 component = FactorizedFreeFormComponent(
104 self.observation.bands,
105 np.ones(5),
106 images[i].copy(),
107 self.observation.bbox,
108 bg_rms=self.observation.noise_rms,
109 bg_thresh=0.25,
110 min_area=4,
111 )
112 sources.append(Source([component]))
114 blend = Blend(sources, self.observation).fit_spectra()
115 blend.parameterize(default_adaprox_parameterization)
116 blend.fit(12, e_rel=1e-6)
118 # Test with peak centers specified
119 sources = []
120 peaks = list(np.array([self.data["catalog"]["y"], self.data["catalog"]["x"]]).T.astype(int))
121 for i in range(5):
122 component = FactorizedFreeFormComponent(
123 self.observation.bands,
124 np.ones(5),
125 images[i].copy(),
126 self.observation.bbox,
127 peaks=peaks,
128 )
129 sources.append(Source([component]))
131 blend = Blend(sources, self.observation).fit_spectra()
132 blend.parameterize(default_adaprox_parameterization)
133 blend.fit(12, e_rel=1e-6)
135 # Tests for code blocks that are difficult to reach,
136 # to complete test coverage
137 component = blend.sources[-1].components[0]
138 self.assertFalse(component.resize(self.observation.bbox))
139 component.morph[:] = 0
140 component.prox_morph(component.morph)
143class TestParametric(ScarletTestCase):
144 def setUp(self) -> None:
145 filename = os.path.join(__file__, "..", "..", "data", "hsc_cosmos_35.npz")
146 filename = os.path.abspath(filename)
147 data = np.load(filename)
148 self.data = data
149 self.model_psf = integrated_circular_gaussian(sigma=0.8)
150 self.detect = np.sum(data["images"], axis=0)
151 self.centers = np.array([data["catalog"]["y"], data["catalog"]["x"]]).T
152 bands = data["filters"]
153 self.observation = Observation(
154 Image(data["images"], bands=bands),
155 Image(data["variance"], bands=bands),
156 Image(1 / data["variance"], bands=bands),
157 data["psfs"],
158 self.model_psf[None],
159 bands=bands,
160 )
162 def tearDown(self):
163 del self.data
165 def test_cartesian_frame(self):
166 bbox = Box((31, 60), (1, 2))
167 frame = CartesianFrame(bbox)
168 y = np.linspace(1, 31, 31)
169 x = np.linspace(2, 61, 60)
170 x, y = np.meshgrid(x, y)
172 self.assertBoxEqual(frame.bbox, bbox)
173 self.assertTupleEqual(frame.shape, bbox.shape)
174 assert_array_equal(frame.x_grid, x)
175 assert_array_equal(frame.y_grid, y)
176 self.assertIsNone(frame._r2)
177 self.assertIsNone(frame._r)
179 def test_ellipse_frame(self):
180 y0 = 23
181 x0 = 36
182 major = 3
183 minor = 2
184 theta = 2 * np.pi / 3
185 bbox = Box((50, 39), (2, 7))
186 r_min = 1e-20
187 frame = EllipseFrame(
188 y0,
189 x0,
190 major,
191 minor,
192 theta,
193 bbox,
194 )
196 self.assertEqual(frame.x0, x0)
197 self.assertEqual(frame.y0, y0)
198 self.assertEqual(frame.major, major)
199 self.assertEqual(frame.minor, minor)
200 self.assertEqual(frame.theta, theta)
201 self.assertEqual(frame.bbox, bbox)
203 y = np.linspace(2, 51, 50)
204 x = np.linspace(7, 45, 39)
205 x, y = np.meshgrid(x, y)
206 r2 = frame._xa**2 + frame._yb**2 + r_min**2
207 r = np.sqrt(r2)
209 self.assertEqual(frame._sin, np.sin(theta))
210 self.assertEqual(frame._cos, np.cos(theta))
211 self.assertBoxEqual(frame.bbox, bbox)
212 self.assertTupleEqual(frame.shape, bbox.shape)
213 assert_array_equal(frame.x_grid, x)
214 assert_array_equal(frame.y_grid, y)
215 assert_array_equal(frame.r2_grid, r2)
216 assert_array_equal(frame.r_grid, r)
218 # There doesn't seem to be much utility to testing the
219 # gradients exactly, however we test that they all run
220 # properly.
221 gradients = (
222 frame.grad_x0,
223 frame.grad_y0,
224 frame.grad_major,
225 frame.grad_minor,
226 frame.grad_theta,
227 )
229 input_grad = np.ones(bbox.shape, dtype=float)
230 original_grad = input_grad.copy()
231 for grad in gradients:
232 grad(input_grad, False)
233 grad(input_grad, True)
235 # Make sure that none of the methods changed the input gradient
236 assert_array_equal(input_grad, original_grad)
238 def test_parametric_component(self):
239 observation = self.observation
240 bands = observation.bands
241 spectrum = np.ones((observation.n_bands,), dtype=float)
242 frame = CartesianFrame(observation.bbox)
244 # Test integrated Gaussian PSF and sersic
245 sources = []
246 for idx, center in enumerate(self.centers):
247 # Get the integer center of the source
248 cy, cx = int(np.round(center[0])), int(np.round(center[1]))
249 # For now we use a fixed bounding box that is the size of
250 # the observed PSF image
251 bbox = Box((41, 41), origin=(cy - 20, cx - 20)) & observation.bbox
252 # Keep track of the initial positions
253 yi, xi = cy, cx
254 # Restrict the values of the parameters
255 _proxmin = np.array([yi - 2, xi - 2, 1e-1, 1e-1, -np.pi / 2, 1.1])
256 _proxmax = np.array([yi + 2, xi + 2, frame.shape[-2] / 2, frame.shape[-1] / 2, np.pi / 2, 3])
258 __proxmin = np.array([yi - 2, xi - 2, 0.8])
259 __proxmax = np.array([yi + 2, xi + 2, 1.2])
261 # Initialize a PSF-like component of the source using a
262 # non-pixel integrated gaussian
263 component = ParametricComponent(
264 bands,
265 bbox,
266 spectrum=parameter(spectrum.copy()),
267 morph_params=parameter(np.array([center[0], center[1], 0.8])),
268 morph_func=models.integrated_gaussian,
269 morph_grad=models.grad_integrated_gaussian,
270 morph_prox=partial(models.bounded_prox, proxmin=__proxmin, proxmax=__proxmax),
271 morph_step=np.array([1e-2, 1e-2, 1e-2]),
272 prox_spectrum=lambda x: x,
273 )
274 # Define the component to use ADAPROX as the optimizer
275 components = [component]
277 # Initialize an n=1 sersic component
278 component = EllipticalParametricComponent(
279 bands,
280 bbox,
281 spectrum=parameter(spectrum.copy()),
282 morph_params=parameter(np.array([center[0], center[1], 2 * 1.2**2, 2 * 1.2**2, 0.0, 1])),
283 morph_func=models.sersic,
284 morph_grad=models.grad_sersic,
285 morph_prox=partial(models.bounded_prox, proxmin=_proxmin, proxmax=_proxmax),
286 morph_step=np.array([1e-2, 1e-2, 1e-3, 1e-3, 1e-2, 1e-2]),
287 )
288 # Define the component to use ADAPROX as the optimizer
289 components.append(component)
291 component = EllipticalParametricComponent(
292 bands,
293 bbox,
294 spectrum=parameter(spectrum.copy()),
295 morph_params=parameter(np.array([center[0], center[1], 2 * 1.2**2, 2 * 1.2**2, 0.0, 1])),
296 morph_func=models.sersic,
297 morph_grad=models.grad_sersic,
298 morph_prox=partial(models.bounded_prox, proxmin=_proxmin, proxmax=_proxmax),
299 morph_step=np.array([1e-2, 1e-2, 1e-3, 1e-3, 1e-2, 1e-2]),
300 )
301 # Define the component to use ADAPROX as the optimizer
302 components.append(component)
304 # Create a new source using the two components
305 sources.append(Source(components))
307 # Fit the models
308 blend = Blend(sources, observation)
309 blend.parameterize(parameterize)
310 blend.fit_spectra()
312 # Check properties of a component
313 component = cast(ParametricComponent, blend.components[0])
314 self.assertTupleEqual(component.peak, tuple(self.centers[0]))
315 self.assertEqual(component.y0, component._params.x[0])
316 self.assertEqual(component.x0, component._params.x[1])
317 assert_array_equal(component.morph_step, np.array([1e-2, 1e-2, 1e-2]))
319 component = cast(EllipticalParametricComponent, blend.components[1])
320 self.assertTupleEqual(component.peak, tuple(self.centers[0]))
321 self.assertEqual(component.y0, component._params.x[0])
322 self.assertEqual(component.x0, component._params.x[1])
323 assert_array_equal(component.semi_major, component._params.x[2])
324 assert_array_equal(component.semi_minor, component._params.x[3])
325 assert_array_equal(component.theta, component._params.x[4])
326 assert_array_equal(component.ellipse_params, component._params.x[:5])
327 assert_array_equal(component.radial_params, component._params.x[5:])
328 self.assertIsNotNone(component.morph_prox)
329 self.assertIsNotNone(component.morph_grad)
331 blend.fit(12)
333 # Test elliptical and circular Gaussian models
334 sources = []
335 for idx, center in enumerate(self.centers):
336 # Get the integer center of the source
337 cy, cx = int(np.round(center[0])), int(np.round(center[1]))
338 # For now we use a fixed bounding box that is the size of
339 # the observed PSF image
340 bbox = Box((41, 41), origin=(cy - 20, cx - 20)) & observation.bbox
341 # Keep track of the initial positions
342 yi, xi = cy, cx
343 # Restrict the values of the parameters
344 _proxmin = np.array([yi - 2, xi - 2, 1e-1, 1e-1, -np.pi / 2])
345 _proxmax = np.array([yi + 2, xi + 2, frame.shape[-2] / 2, frame.shape[-1] / 2, np.pi / 2])
347 __proxmin = np.array([yi - 2, xi - 2])
348 __proxmax = np.array([yi + 2, xi + 2])
350 # Initialize a PSF-like component of the source using a
351 # non-pixel integrated gaussian
352 component = ParametricComponent(
353 bands,
354 bbox,
355 spectrum=parameter(spectrum.copy()),
356 morph_params=parameter(np.array([center[0], center[1]])),
357 morph_func=partial(models.circular_gaussian, sigma=0.8),
358 morph_grad=partial(models.grad_circular_gaussian, sigma=0.8),
359 morph_prox=partial(models.bounded_prox, proxmin=__proxmin, proxmax=__proxmax),
360 morph_step=np.array([1e-2, 1e-2, 1e-2]),
361 )
362 # Define the component to use ADAPROX as the optimizer
363 components = [component]
365 # Initialize an n=1 sersic component
366 component = EllipticalParametricComponent(
367 bands,
368 bbox,
369 spectrum=parameter(spectrum.copy()),
370 morph_params=parameter(np.array([center[0], center[1], 2 * 1.2**2, 2 * 1.2**2, 0.0])),
371 morph_func=models.gaussian2d,
372 morph_grad=models.grad_gaussian2,
373 morph_prox=partial(models.bounded_prox, proxmin=_proxmin, proxmax=_proxmax),
374 morph_step=np.array([1e-2, 1e-2, 1e-3, 1e-3, 1e-2, 1e-2]),
375 )
376 # Define the component to use ADAPROX as the optimizer
377 components.append(component)
378 # Create a new source using the two components
379 sources.append(Source(components))
381 # Fit the models
382 blend = Blend(sources, observation)
383 blend.parameterize(parameterize)
384 blend.fit_spectra()
385 blend.fit(12)
387 def test_psf_fitting(self):
388 # Use flat weights for FISTA optimization
389 weights = np.ones(self.data["images"].shape)
391 monotonicity = Monotonicity((101, 101))
393 observation = FittedPsfObservation(
394 self.data["images"],
395 self.data["variance"],
396 weights,
397 self.data["psfs"],
398 self.model_psf[None],
399 bands=self.data["filters"],
400 )
402 def fista_parameterization(component: Component):
403 if isinstance(component, FactorizedComponent):
404 component._spectrum = FistaParameter(component.spectrum, step=0.5)
405 component._morph = FistaParameter(component.morph, step=0.5)
406 else:
407 if isinstance(component, FittedPsfObservation):
408 component._fitted_kernel = FistaParameter(component._fitted_kernel.x, step=1e-2)
410 init = FactorizedInitialization(observation, self.centers, monotonicity=monotonicity)
411 blend = FittedPsfBlend(init.sources, observation).fit_spectra()
412 blend.parameterize(fista_parameterization)
413 assert isinstance(cast(FittedPsfObservation, blend.observation)._fitted_kernel, FistaParameter)
414 blend.fit(12, e_rel=1e-4)