Coverage for tests / test_modeller.py: 10%
288 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:58 +0000
1# This file is part of multiprofit.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import math
23import time
25import lsst.gauss2d as g2
26import lsst.gauss2d.fit as g2f
27from lsst.multiprofit.componentconfig import (
28 CentroidConfig,
29 FluxFractionParameterConfig,
30 FluxParameterConfig,
31 GaussianComponentConfig,
32 ParameterConfig,
33 SersicComponentConfig,
34 SersicIndexParameterConfig,
35)
36from lsst.multiprofit.model_utils import make_image_gaussians, make_psf_model_null
37from lsst.multiprofit.modelconfig import ModelConfig
38from lsst.multiprofit.modeller import FitInputs, LinearGaussians, Modeller, fit_methods_linear
39from lsst.multiprofit.observationconfig import CoordinateSystemConfig, ObservationConfig
40from lsst.multiprofit.sourceconfig import ComponentGroupConfig, SourceConfig
41from lsst.multiprofit.utils import get_params_uniq
42import numpy as np
43import pytest
45sigma_inv = 1e4
@pytest.fixture(scope="module")
def channels() -> dict[str, g2f.Channel]:
    """Return the fit channels for each of the R, G and B bands."""
    bands = ("R", "G", "B")
    return dict(zip(bands, (g2f.Channel.get(band) for band in bands)))
@pytest.fixture(scope="module")
def data(channels) -> g2f.DataD:
    """Make a multi-band dataset of blank observations.

    Each channel's observation gets a slightly different size and origin
    (offsets scaled by the channel index), a zeroed image, a constant
    inverse sigma, and an all-valid inverse mask.
    """
    n_rows, n_cols = 25, 27
    x_min, y_min = 0, 0

    # Per-channel offsets so each band has a distinct geometry
    dn_rows, dn_cols = 2, -3
    dx_min, dy_min = -1, 1

    observations = []
    for idx, band in enumerate(channels):
        config = ObservationConfig(
            band=band,
            coordsys=CoordinateSystemConfig(
                x_min=x_min + idx * dx_min,
                y_min=y_min + idx * dy_min,
            ),
            n_rows=n_rows + idx * dn_rows,
            n_cols=n_cols + idx * dn_cols,
        )
        observation = config.make_observation()
        observation.image.fill(0)
        observation.sigma_inv.fill(sigma_inv)
        # Mark all pixels as valid. This was fill(1e4), an apparent
        # copy-paste from the sigma_inv line above; the psf_observations
        # fixture fills the same field with 1 (any nonzero value means
        # valid, so behavior is unchanged).
        observation.mask_inv.fill(1)
        observations.append(observation)
    return g2f.DataD(observations)
@pytest.fixture(scope="module")
def psf_models(channels) -> list[g2f.PsfModel]:
    """Make a two-component fractional Gaussian PSF model per channel.

    Shape parameters differ per component (idx offsets) and per channel
    (idx_chan offsets) so every band has a distinct PSF.
    """
    # Base shape parameters and per-component offsets
    rho, size_x, size_y = 0.12, 1.6, 1.2
    drho, dsize_x, dsize_y = -0.3, 1.1, 1.9
    # Additional per-channel offsets
    drho_chan, dsize_x_chan, dsize_y_chan = 0.03, 0.12, 0.14
    # First component's flux fraction and its per-channel offset
    frac, dfrac = 0.62, -0.08

    n_components = 2
    psf_models = []

    for idx_chan, channel in enumerate(channels.values()):
        frac_chan = frac + idx_chan * dfrac
        config = SourceConfig(
            component_groups={
                "psf": ComponentGroupConfig(
                    components_gauss={
                        str(idx): GaussianComponentConfig(
                            rho=ParameterConfig(value_initial=rho + idx * drho + idx_chan * drho_chan),
                            size_x=ParameterConfig(
                                value_initial=size_x + idx * dsize_x + idx_chan * dsize_x_chan
                            ),
                            size_y=ParameterConfig(
                                value_initial=size_y + idx * dsize_y + idx_chan * dsize_y_chan
                            ),
                            # Only the first component carries the (fixed)
                            # total flux and a free flux fraction; the rest
                            # use the config defaults
                            **(
                                {
                                    "flux": FluxParameterConfig(value_initial=1.0, fixed=True),
                                    "fluxfrac": FluxFractionParameterConfig(
                                        value_initial=frac_chan, fixed=False
                                    ),
                                }
                                if (idx == 0)
                                else {}
                            ),
                        )
                        for idx in range(n_components)
                    },
                    is_fractional=True,
                )
            },
        )
        config.validate()
        # make_psf_model also returns priors, which are unused here
        psf_model, priors = config.make_psf_model(
            [
                component_group.get_fluxes_default(
                    channels=(g2f.Channel.NONE,),
                    component_configs=component_group.get_component_configs(),
                    is_fractional=component_group.is_fractional,
                )
                for component_group in config.component_groups.values()
            ]
        )
        psf_models.append(psf_model)
    return psf_models
@pytest.fixture(scope="module")
def model(channels, data, psf_models) -> g2f.ModelD:
    """Make a single-source exp+dev Sersic model and fill the observation
    images with noisy realizations of the model images.

    Note: this mutates the images of the module-scoped `data` fixture.
    """
    # Base component parameters and per-component offsets
    rho, size_x, size_y, sersicn, flux = 0.4, 1.5, 1.9, 1.0, 4.7
    drho, dsize_x, dsize_y, dsersicn, dflux = -0.9, 2.5, 5.4, 3.0, 13.9

    components_sersic = {}
    fluxes_group = []

    # Linear interpolators fail to compute accurate likelihoods at knot values
    is_linear_interp = (
        g2f.SersicMixComponentIndexParameterD(
            interpolator=SersicComponentConfig().get_interpolator(4)
        ).interptype
        == g2f.InterpType.linear
    )

    for idx, name in enumerate(("exp", "dev")):
        components_sersic[name] = SersicComponentConfig(
            rho=ParameterConfig(value_initial=rho + idx * drho),
            size_x=ParameterConfig(value_initial=size_x + idx * dsize_x),
            size_y=ParameterConfig(value_initial=size_y + idx * dsize_y),
            sersic_index=SersicIndexParameterConfig(
                # Add a small offset since 1.0 and 4.0 are bound to be knots
                value_initial=sersicn + idx * dsersicn + 1e-4 * is_linear_interp,
                fixed=idx == 0,
                prior_mean=None,
            ),
        )
        # Per-channel fluxes; the dflux term only applies to the second
        # component (idx == 1)
        fluxes_comp = {
            channel: flux + idx_channel * dflux * idx for idx_channel, channel in enumerate(channels.values())
        }
        fluxes_group.append(fluxes_comp)

    modelconfig = ModelConfig(
        sources={
            "src": SourceConfig(
                component_groups={
                    "": ComponentGroupConfig(
                        components_sersic=components_sersic,
                        centroids={
                            "default": CentroidConfig(
                                x=ParameterConfig(value_initial=12.14, fixed=True),
                                y=ParameterConfig(value_initial=13.78, fixed=True),
                            )
                        },
                    ),
                }
            ),
        },
    )
    model = modelconfig.make_model([[fluxes_group]], data=data, psf_models=psf_models)
    model.setup_evaluators(g2f.EvaluatorMode.loglike_image)
    model.evaluate()

    # Fixed seed so the noisy images are reproducible across runs
    rng = np.random.default_rng(2)

    # Overwrite each observation image with model + Gaussian noise scaled
    # by the per-pixel sigma
    n_obs = len(model.data)
    for idx_obs in range(n_obs):
        observation = model.data[idx_obs]
        output = model.outputs[idx_obs]
        observation.image.data.flat = (
            output.data.flat + rng.standard_normal(output.data.size) / observation.sigma_inv.data.flat
        )

    return model
@pytest.fixture
def model_func_scope(model) -> g2f.ModelD:
    """Return a function-scoped model sharing the module-scoped model's
    data, PSF models and sources."""
    return g2f.ModelD(data=model.data, psfmodels=model.psfmodels, sources=model.sources)
@pytest.fixture(scope="module")
def psf_observations(psf_models) -> list[g2f.ObservationD]:
    """Render each PSF model into a small observation with a little noise."""
    config = ObservationConfig(n_rows=17, n_cols=19)
    # Fixed seed so the noise is reproducible across runs
    rng = np.random.default_rng(1)

    observations = []
    for psf_model in psf_models:
        observation = config.make_observation()
        # Have to make a duplicate image here because one can only call
        # make_image_gaussians with an owning pointer, whereas
        # observation.image is a reference
        # NOTE(review): this presumably shares the underlying data buffer
        # with observation.image, so writes below reach the observation —
        # confirm against g2.ImageD semantics
        image = g2.ImageD(observation.image.data)
        # Make the kernel centered
        gaussians_source = psf_model.gaussians(g2f.Channel.NONE)
        for idx in range(len(gaussians_source)):
            gaussian_idx = gaussians_source.at(idx)
            gaussian_idx.centroid.x = image.n_cols / 2.0
            gaussian_idx.centroid.y = image.n_rows / 2.0
        # Default-constructed Gaussian kernel (no extra smearing configured)
        gaussians_kernel = g2.Gaussians([g2.Gaussian()])
        make_image_gaussians(
            gaussians_source=gaussians_source,
            gaussians_kernel=gaussians_kernel,
            output=image,
        )
        # Add a small amount of noise so fits are non-trivial
        image.data.flat += 1e-4 * rng.standard_normal(image.data.size)
        observation.mask_inv.fill(1)
        observation.sigma_inv.fill(1e3)
        observations.append(observation)
    return observations
@pytest.fixture(scope="module")
def psf_fit_models(psf_models, psf_observations):
    """Make one single-observation model per PSF, each using a null PSF and
    the PSF model's components as its source."""
    psfmodels_null = [make_psf_model_null()]
    models = []
    for psf_model, observation in zip(psf_models, psf_observations):
        source = g2f.Source(psf_model.components)
        models.append(g2f.ModelD(g2f.DataD([observation]), psfmodels_null, [source]))
    return models
def test_model_evaluation(channels, model, model_func_scope):
    """Exercise Jacobian-mode evaluation against externally-allocated
    buffers and check it reproduces the loglike-mode likelihoods."""
    # Evaluating before setup_evaluators must raise
    with pytest.raises(RuntimeError):
        model_func_scope.evaluate()

    printout = False
    # Freeze the PSF params - they can't be fit anyway
    for m in (model, model_func_scope):
        for psf_model in m.psfmodels:
            params = psf_model.parameters()
            for param in params:
                param.fixed = True

    model.setup_evaluators(print=printout, force=True)
    model.evaluate()

    n_priors = 0
    n_obs = len(model.data)
    n_rows = np.zeros(n_obs, dtype=int)
    n_cols = np.zeros(n_obs, dtype=int)
    datasizes = np.zeros(n_obs, dtype=int)
    ranges_params = [None] * n_obs
    params_free = tuple(get_params_uniq(model_func_scope, fixed=False))

    # There's one extra validation array
    n_params_jac = len(params_free) + 1
    assert n_params_jac > 1

    # Per observation: record pixel counts, and map its free params to
    # 1-based indices within the global free-param tuple (0 is validation)
    for idx_obs in range(n_obs):
        observation = model.data[idx_obs]
        n_rows[idx_obs] = observation.image.n_rows
        n_cols[idx_obs] = observation.image.n_cols
        datasizes[idx_obs] = n_rows[idx_obs] * n_cols[idx_obs]
        params = tuple(get_params_uniq(model, fixed=False, channel=observation.channel))
        n_params_obs = len(params)
        ranges_params_obs = [0] * (n_params_obs + 1)
        for idx_param in range(n_params_obs):
            ranges_params_obs[idx_param + 1] = params_free.index(params[idx_param]) + 1
        ranges_params[idx_obs] = ranges_params_obs

    # Every observation should have the same number of free params
    n_free_first = len(ranges_params[0])
    assert all([len(rp) == n_free_first for rp in ranges_params[1:]])

    jacobians = [None] * n_obs
    residuals = [None] * n_obs
    datasize = np.sum(datasizes) + n_priors
    jacobian = np.zeros((datasize, n_params_jac))
    residual = np.zeros(datasize)

    # Carve per-observation image views out of the flat jacobian/residual
    # buffers so the evaluator writes directly into them
    offset = 0
    for idx_obs in range(n_obs):
        size_obs = datasizes[idx_obs]
        end = offset + size_obs
        shape = (n_rows[idx_obs], n_cols[idx_obs])
        jacobians_obs = [None] * n_params_jac
        for idx_jac in range(n_params_jac):
            jacobians_obs[idx_jac] = g2.ImageD(jacobian[offset:end, idx_jac].view().reshape(shape))
        jacobians[idx_obs] = jacobians_obs
        residuals[idx_obs] = g2.ImageD(residual[offset:end].view().reshape(shape))
        offset = end

    model.setup_evaluators(evaluatormode=g2f.EvaluatorMode.loglike)
    loglike_init = model.evaluate()

    model_func_scope.setup_evaluators(
        evaluatormode=g2f.EvaluatorMode.jacobian,
        outputs=jacobians,
        residuals=residuals,
        print=printout,
    )
    model_func_scope.verify_jacobian()
    loglike_jac = model_func_scope.evaluate()

    # Jacobian-mode likelihoods must match loglike-mode likelihoods
    assert all(np.isclose(loglike_init, loglike_jac))
@pytest.fixture(scope="module")
def psf_models_linear_gaussians(channels, psf_models):
    """Return LinearGaussians built from each PSF model, temporarily freeing
    the first linear (flux) parameter while building."""
    linear_gaussians = []
    for psf_model in psf_models:
        params = psf_model.parameters(paramfilter=g2f.ParamFilter(nonlinear=False, channel=g2f.Channel.NONE))
        param_first = params[0]
        param_first.fixed = False
        linear_gaussians.append(LinearGaussians.make(psf_model, is_psf=True))
        # Restore the parameter to its original (fixed) state
        param_first.fixed = True
    return linear_gaussians
def test_make_psf_source_linear(psf_models, psf_models_linear_gaussians):
    """Verify every PSF Gaussian lands in the linear decomposition."""
    for psf_model, linear_gaussians in zip(psf_models, psf_models_linear_gaussians):
        n_free = len(linear_gaussians.gaussians_free)
        n_fixed = len(linear_gaussians.gaussians_fixed)
        assert len(psf_model.gaussians(g2f.Channel.NONE)) == (n_free + n_fixed)
def test_modeller(model):
    """Fit the model from true and perturbed starting values, with and
    without Gaussian priors, checking likelihoods improve and variances
    are finite; restores parameter values at the end."""
    # For debugging purposes
    printout = False
    # Force to ensure test-order independence
    model.setup_evaluators(evaluatormode=g2f.EvaluatorMode.loglike_image, force=True)
    # Get the model images
    model.evaluate()
    rng = np.random.default_rng(3)

    # Regenerate noisy images from the current model outputs
    for idx_obs, observation in enumerate(model.data):
        output = model.outputs[idx_obs]
        observation.image.data.flat = (
            output.data.flat + rng.standard_normal(output.data.size) / observation.sigma_inv.data.flat
        )

    # Freeze the PSF params - they can't be fit anyway
    for psf_model in model.psfmodels:
        for param in psf_model.parameters():
            param.fixed = True

    params_free = tuple(get_params_uniq(model, fixed=False))
    values_true = tuple(param.value for param in params_free)

    modeller = Modeller()

    # Verify the analytic gradient against finite differences
    dloglike = model.compute_loglike_grad(verify=True, findiff_frac=1e-8, findiff_add=1e-8)
    assert all(np.isfinite(dloglike))

    time_init = time.process_time()
    kwargs_fit = dict(ftol=1e-6, xtol=1e-6)

    # Fit once from the true values, once from transform-space perturbations
    for delta_param in (0, 0.2):
        model = g2f.ModelD(data=model.data, psfmodels=model.psfmodels, sources=model.sources)
        values_init = values_true
        if delta_param != 0:
            for param, value_init in zip(params_free, values_init):
                param.value = value_init
                try:
                    param.value_transformed += delta_param
                except RuntimeError:
                    # Perturb the other way if the first direction is out of bounds
                    param.value_transformed -= delta_param

        model.setup_evaluators(evaluatormode=g2f.EvaluatorMode.loglike)
        loglike_init = np.array(model.evaluate())
        results = modeller.fit_model(model, **kwargs_fit)
        params_best = results.params_best

        # Apply the best-fit (transformed) values to the shared parameters
        for param, value in zip(params_free, params_best):
            param.value_transformed = value

        loglike_noprior = model.evaluate()
        assert np.sum(loglike_noprior) > np.sum(loglike_init)

        errors = modeller.compute_variances(model)
        # TODO: This should check >0, and < (some reasonable value)
        # However, but the scipy least squares does not do a great job
        # optimizing and the loglike_grad isn't even negative...
        assert np.all(np.isfinite(errors))

        if printout:
            print(
                f"got loglike={loglike_noprior} (init={sum(loglike_noprior)})"
                f" from modeller.fit_model in t={time.process_time() - time_init:.3e}, x={params_best},"
                f" results: \n{results}"
            )

        loglike_noprior_sum = sum(loglike_noprior)
        # Refit with unit-width Gaussian priors centered at (or offset from)
        # the best-fit transformed values
        for offset in (0, 1e-6):
            for param, value in zip(params_free, params_best):
                param.value_transformed = value
            priors = tuple(
                g2f.GaussianPrior(param, param.value_transformed + offset, 1.0, transformed=True)
                for param in params_free
            )
            if offset == 0:
                # A prior centered at the current value contributes zero loglike
                for p in priors:
                    assert p.evaluate().loglike == 0
                    assert p.loglike_const_terms[0] == -math.log(math.sqrt(2 * math.pi))
            model_new = g2f.ModelD(
                data=model.data, psfmodels=model.psfmodels, sources=model.sources, priors=priors
            )
            model_new.setup_evaluators(evaluatormode=g2f.EvaluatorMode.loglike)
            loglike_init = sum(loglike_eval for loglike_eval in model_new.evaluate())
            if offset == 0:
                assert np.isclose(loglike_init, loglike_noprior_sum, rtol=1e-10, atol=1e-10)
            else:
                assert loglike_init < loglike_noprior_sum

            time_init = time.process_time()
            results = modeller.fit_model(model_new, **kwargs_fit)
            time_init = time.process_time() - time_init
            # fit_model minimizes cost = -loglike
            loglike_new = -results.result.cost
            for param, value in zip(params_free, results.params_best):
                param.value_transformed = value

            model_new.setup_evaluators(evaluatormode=g2f.EvaluatorMode.loglike)
            loglike_model = sum(loglike_eval for loglike_eval in model_new.evaluate())
            assert np.isclose(loglike_new, loglike_model, rtol=1e-10, atol=1e-10)
            # This should be > 0. TODO: Determine why it isn't always
            assert (loglike_new - loglike_init) > -1e-3

            if printout:
                print(
                    f"got loglike={loglike_new} (first={loglike_noprior})"
                    f" from modeller.fit_model in t={time_init:.3e}, x={results.params_best},"
                    f" results: \n{results}"
                )
            # Adding a suitably-scaled prior far from the truth should always
            # worsen loglikel, but doesn't - why? noise bias? bad convergence?
            # assert (loglike_new >= loglike_noprior) == (offset == 0)

    # Return parameters to original values since they are shared between
    # model instances, which is a useful persistence test.
    for param, value in zip(params_free, values_true):
        param.value = value
def test_psf_model_fit(psf_fit_models):
    """Free the non-integral params of each PSF fit model and verify its
    Jacobian; on failure, plot finite-difference vs analytic derivatives."""
    for model in psf_fit_models:
        params = get_params_uniq(model.sources[0])
        params_freed = set()
        for param in params:
            # Fitting the total flux won't work in a fractional model (yet)
            if isinstance(param, g2f.IntegralParameterD):
                assert param.fixed
            else:
                params_freed.add(param)
                param.fixed = False
        # Necessary whenever parameters are freed/fixed
        model.setup_evaluators(g2f.EvaluatorMode.jacobian, force=True)
        errors = model.verify_jacobian(rtol=5e-4, atol=5e-4, findiff_add=1e-6, findiff_frac=1e-6)
        if errors:
            # Debugging aids for a failed verification
            import matplotlib.pyplot as plt

            print(model.parameters())

            fitinputs = FitInputs.from_model(model)
            model.setup_evaluators(
                evaluatormode=g2f.EvaluatorMode.jacobian,
                outputs=fitinputs.jacobians,
                residuals=fitinputs.residuals,
                print=True,
                force=True,
            )
            model.evaluate(print=True)
            # Column 0 is the validation array and should stay zero;
            # column 1 (the first free param) should be non-trivial
            assert (fitinputs.jacobians[0][0].data == 0).all()
            assert np.sum(np.abs(fitinputs.jacobians[0][1].data)) > 0
            model.setup_evaluators(evaluatormode=g2f.EvaluatorMode.loglike_image)
            model.evaluate()
            outputs = model.outputs
            diffs = [g2.ImageD(img.data.copy()) for img in outputs]
            delta = 1e-5
            # NOTE(review): `param` here is the last parameter from the
            # freeing loop above (loop-variable leakage) — confirm intended
            param.value -= delta
            model.evaluate()
            for diff, output in zip(diffs, outputs):
                diff = (output.data - diff.data) / delta
                jacobian = fitinputs.jacobians[0][1].data
                fig, ax = plt.subplots(1, 2)
                ax[0].imshow(diff)
                ax[1].imshow(jacobian)
                plt.show()
        assert len(errors) == 0
        # Return the params to their original settings
        for param in params_freed:
            param.fixed = True
def test_psf_models_linear_gaussians(data, psf_models_linear_gaussians, psf_observations):
    """Check that linear fits of each PSF model against its observation
    return at least one result per fit method."""
    for gaussians_linear, observation_psf in zip(psf_models_linear_gaussians, psf_observations):
        result = Modeller.fit_gaussians_linear(
            gaussians_linear=gaussians_linear,
            observation=observation_psf,
            fit_methods=fit_methods_linear,
            plot=False,
        )
        assert len(result) > 0
def test_modeller_fit_linear(model):
    """Fit only the linear (flux) params, verify they moved while all other
    params stayed put, then restore the original values."""
    modeller = Modeller()
    values_linear_free = {}
    values_other = {}
    for param in get_params_uniq(model):
        bucket = values_linear_free if (param.free and param.linear) else values_other
        bucket[param] = param.value
    assert modeller.fit_model_linear(model, validate=True) is not None

    for param, value_old in values_linear_free.items():
        assert param.value != value_old
        # Restore so later tests see the original values
        param.value = value_old
    assert all(param.value == value for param, value in values_other.items())