Coverage for python / lsst / multiprofit / modeller.py: 13%

518 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:58 +0000

1# This file is part of multiprofit. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = [ 

23 "InvalidProposalError", 

24 "fit_methods_linear", 

25 "LinearGaussians", 

26 "make_image_gaussians", 

27 "make_psf_model_null", 

28 "FitInputsBase", 

29 "FitInputsDummy", 

30 "ModelFitConfig", 

31 "FitResult", 

32 "Modeller", 

33] 

34 

35from abc import ABC, abstractmethod 

36from collections.abc import Sequence 

37import logging 

38import sys 

39import time 

40from typing import Any, ClassVar, Iterable, TypeAlias 

41 

42import lsst.gauss2d as g2 

43import lsst.gauss2d.fit as g2f 

44import lsst.pex.config as pexConfig 

45import numpy as np 

46import pydantic 

47import scipy.optimize as spopt 

48 

49from .model_utils import make_image_gaussians, make_psf_model_null 

50from .utils import arbitrary_allowed_config, frozen_arbitrary_allowed_config, get_params_uniq 

51 

52_has_py_11_plus = sys.version_info >= (3, 11, 0) 

53if _has_py_11_plus: 53 ↛ 56line 53 didn't jump to line 56 because the condition on line 53 was always true

54 from typing import Self 

55else: 

56 from typing import TypeVar 

57 

58 Self = TypeVar("Self", bound="LinearGaussians") # type: ignore 

59 

60try: 

61 # TODO: try importlib.util.find_spec 

62 from fastnnls import fnnls # noqa 

63 

64 has_fastnnls = True 

65except ImportError: 

66 has_fastnnls = False 

67 

68try: 

69 # TODO: try importlib.util.find_spec 

70 import pygmo as pg # noqa 

71 

72 has_pygmo = True 

73except ImportError: 

74 has_pygmo = False 

75 

76 

77Model: TypeAlias = g2f.ModelD | g2f.ModelF 

78 

79 

80class InvalidProposalError(ValueError): 

81 """Error for an invalid parameter proposal.""" 

82 

83 

84fit_methods_linear = { 

85 "scipy.optimize.nnls": {}, 

86 "scipy.optimize.lsq_linear": {"bounds": (1e-5, np.inf), "method": "bvls"}, 

87 "numpy.linalg.lstsq": {"rcond": 1e-3}, 

88} 

89if has_fastnnls: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true

90 fit_methods_linear["fastnnls.fnnls"] = {} 

91 

92 

93class LinearGaussians(pydantic.BaseModel): 

94 """Helper for linear least-squares fitting of Gaussian mixtures.""" 

95 

96 model_config: ClassVar[pydantic.ConfigDict] = frozen_arbitrary_allowed_config 

97 

98 gaussians_fixed: g2.Gaussians = pydantic.Field(title="Fixed Gaussian components") 

99 gaussians_free: tuple[tuple[g2.Gaussians, g2f.ParameterD], ...] = pydantic.Field( 

100 title="Free Gaussian components" 

101 ) 

102 

103 @staticmethod 

104 def make( 

105 component_mixture: g2f.ComponentMixture, 

106 channel: g2f.Channel = None, 

107 is_psf: bool = False, 

108 ) -> Self: 

109 """Make a LinearGaussians from a ComponentMixture. 

110 

111 Parameters 

112 ---------- 

113 component_mixture 

114 A component mixture to initialize Gaussians from. 

115 channel 

116 The channel all Gaussians are applicable for. 

117 is_psf 

118 Whether the components are a smoothing kernel. 

119 

120 Returns 

121 ------- 

122 lineargaussians 

123 A LinearGaussians instance initialized with the appropriate 

124 fixed/free gaussians. 

125 """ 

126 if channel is None: 

127 channel = g2f.Channel.NONE 

128 components = component_mixture.components 

129 if len(components) == 0: 

130 raise ValueError(f"Can't get linear Source from {component_mixture=} with no components") 

131 

132 gaussians_free = [] 

133 gaussians_fixed = [] 

134 

135 for component in components: 

136 gaussians: g2.Gaussians = component.gaussians(channel) 

137 # TODO: Support multi-Gaussian components if sensible 

138 # The challenge would be in mapping linear param values back onto 

139 # non-linear IntegralModels 

140 if is_psf: 

141 n_g = len(gaussians) 

142 if n_g != 1: 

143 raise ValueError(f"{component=} has {gaussians=} of len {n_g=}!=1") 

144 param_fluxes = component.parameters(paramfilter=g2f.ParamFilter(nonlinear=False, channel=channel)) 

145 if len(param_fluxes) != 1: 

146 raise ValueError(f"Can't make linear source from {component=} with {len(param_fluxes)=}") 

147 param_flux: g2f.ParameterD = param_fluxes[0] 

148 if param_flux.fixed: 

149 gaussians_fixed.append(gaussians.at(0)) 

150 else: 

151 gaussians_free.append((gaussians, param_flux)) 

152 

153 return LinearGaussians( 

154 gaussians_fixed=g2.Gaussians(gaussians_fixed), gaussians_free=tuple(gaussians_free) 

155 ) 

156 

157 

158class FitInputsBase(ABC): 

159 """Interface for inputs to a model fit.""" 

160 

161 @abstractmethod 

162 def validate_for_model(self, model: Model) -> list[str]: 

163 """Check that this FitInputs is valid for a Model. 

164 

165 Parameters 

166 ---------- 

167 model 

168 The model to validate with. 

169 

170 Returns 

171 ------- 

172 errors 

173 A list of validation errors, if any. 

174 """ 

175 

176 

177class FitInputsDummy(FitInputsBase): 

178 """A dummy FitInputs that always fails to validate. 

179 

180 This class can be used to initialize a FitInputsBase that may be 

181 reassigned to a non-dummy derived instance in a loop. 

182 """ 

183 

184 def validate_for_model(self, model: Model) -> list[str]: 

185 return [ 

186 "This is a dummy FitInputs and will never validate", 

187 ] 

188 

189 

190if not _has_py_11_plus: 190 ↛ 191line 190 didn't jump to line 191 because the condition on line 190 was never true

191 Self = TypeVar("Self", bound="FitInputs") # type: ignore 

192 

193 

194class FitInputs(FitInputsBase, pydantic.BaseModel): 

195 """Model fit inputs for gauss2dfit.""" 

196 

197 model_config: ClassVar[pydantic.ConfigDict] = arbitrary_allowed_config 

198 

199 jacobian: np.ndarray = pydantic.Field(None, title="The full Jacobian array") 

200 jacobians: list[list[g2.ImageD]] = pydantic.Field( 

201 title="Jacobian arrays (views) for each observation", 

202 ) 

203 outputs_prior: tuple[g2.ImageD, ...] = pydantic.Field( 

204 title="Jacobian arrays (views) for each free parameter's prior", 

205 ) 

206 residual: np.ndarray = pydantic.Field(title="The full residual (chi) array") 

207 residuals: list[g2.ImageD] = pydantic.Field( 

208 default_factory=list, 

209 title="Residual (chi) arrays (views) for each observation", 

210 ) 

211 residuals_prior: g2.ImageD = pydantic.Field( 

212 title="Shared residual array for all Prior instances", 

213 ) 

214 

215 @classmethod 

216 def get_sizes( 

217 cls, 

218 model: Model, 

219 ) -> tuple[int, int, int, np.ndarray]: 

220 """Initialize Jacobian and residual arrays for a model. 

221 

222 Parameters 

223 ---------- 

224 model : `lsst.gauss2d.fit.Model` 

225 The model to initialize arrays for. 

226 

227 Returns 

228 ------- 

229 n_obs 

230 The number of observations initialized. 

231 n_params_jac 

232 The number of Jacobian matrix columns, which is the number of free 

233 parameters plus one validation column. 

234 n_prior_residuals 

235 The number of residual array values required for priors. 

236 shapes 

237 An ndarray containing the number of rows and columns for each 

238 observation in rows. 

239 """ 

240 priors = model.priors 

241 n_prior_residuals = sum(len(p) for p in priors) 

242 params_free = tuple(get_params_uniq(model, fixed=False)) 

243 n_params_free = len(params_free) 

244 # gauss2d_fit reserves the zeroth index of the jacobian array for 

245 # validation, i.e. it can be used to dump terms for fixed params 

246 n_params_jac = n_params_free + 1 

247 if not (n_params_jac > 1): 

248 raise ValueError("Can't fit model with no free parameters") 

249 

250 n_obs = len(model.data) 

251 shapes = np.zeros((n_obs, 2), dtype=int) 

252 ranges_params = [None] * n_obs 

253 

254 for idx_obs in range(n_obs): 

255 observation = model.data[idx_obs] 

256 shapes[idx_obs, :] = (observation.image.n_rows, observation.image.n_cols) 

257 # Get the free parameter indices for each observation 

258 params = tuple(get_params_uniq(model, fixed=False, channel=observation.channel)) 

259 n_params_obs = len(params) 

260 ranges_params_obs = [0] * (n_params_obs + 1) 

261 for idx_param in range(n_params_obs): 

262 ranges_params_obs[idx_param + 1] = params_free.index(params[idx_param]) + 1 

263 ranges_params[idx_obs] = ranges_params_obs 

264 

265 n_free_first = len(ranges_params[0]) 

266 # Ensure that there are the same number of free parameters in each obs 

267 # They don't need to be the same set, but the counts should equal 

268 # (this assumption may be violated by future IntegralModels - TBD) 

269 assert all([len(rp) == n_free_first for rp in ranges_params[1:]]) 

270 

271 return n_obs, n_params_jac, n_prior_residuals, shapes 

272 

273 @classmethod 

274 def from_model( 

275 cls, 

276 model: Model, 

277 ) -> Self: 

278 """Initialize Jacobian and residual arrays for a model. 

279 

280 Parameters 

281 ---------- 

282 model : `gauss2d.fit.Model` 

283 The model to initialize arrays for. 

284 """ 

285 n_obs, n_params_jac, n_prior_residuals, shapes = cls.get_sizes(model) 

286 n_pixels_cumsum = np.cumsum(np.prod(shapes, axis=1)) 

287 n_pixels_total = n_pixels_cumsum[-1] 

288 size_data = n_pixels_total + n_prior_residuals 

289 shape_jacobian = (size_data, n_params_jac) 

290 jacobian = np.zeros(shape_jacobian) 

291 jacobians = [None] * n_obs 

292 outputs_prior = [None] * n_params_jac 

293 for idx in range(n_params_jac): 

294 outputs_prior[idx] = g2.ImageD(jacobian[n_pixels_total:, idx].reshape((1, n_prior_residuals))) 

295 

296 residual = np.zeros(size_data) 

297 residuals = [None] * n_obs 

298 residuals_prior = g2.ImageD(residual[n_pixels_total:].reshape(1, n_prior_residuals)) 

299 

300 offset = 0 

301 for idx_obs in range(n_obs): 

302 shape = shapes[idx_obs, :] 

303 size_obs = shape[0] * shape[1] 

304 end = offset + size_obs 

305 jacobians_obs = [None] * n_params_jac 

306 for idx_jac in range(n_params_jac): 

307 jacobians_obs[idx_jac] = g2.ImageD(jacobian[offset:end, idx_jac].reshape(shape)) 

308 jacobians[idx_obs] = jacobians_obs 

309 residuals[idx_obs] = g2.ImageD(residual[offset:end].reshape(shape)) 

310 offset = end 

311 if offset != n_pixels_cumsum[idx_obs]: 

312 raise RuntimeError(f"Assigned {offset=} data points != {n_pixels_cumsum[idx_obs]=}") 

313 return cls( 

314 jacobian=jacobian, 

315 jacobians=jacobians, 

316 residual=residual, 

317 residuals=residuals, 

318 outputs_prior=tuple(outputs_prior), 

319 residuals_prior=residuals_prior, 

320 ) 

321 

322 def validate_for_model(self, model: Model) -> list[str]: 

323 n_obs, n_params_jac, n_prior_residuals, shapes = self.get_sizes(model) 

324 n_pixels_total = np.sum(np.prod(shapes, axis=1)) 

325 size_data = n_pixels_total + n_prior_residuals 

326 shape_jacobian = (size_data, n_params_jac) 

327 

328 errors = [] 

329 

330 if self.jacobian.shape != shape_jacobian: 

331 errors.append(f"{self.jacobian.shape=} != {shape_jacobian=}") 

332 

333 if len(self.jacobians) != n_obs: 

334 errors.append(f"{len(self.jacobians)=} != {n_obs=}") 

335 

336 if len(self.residuals) != n_obs: 

337 errors.append(f"{len(self.residuals)=} != {n_obs=}") 

338 

339 if not errors: 

340 for idx_obs in range(n_obs): 

341 shape_obs = shapes[idx_obs, :] 

342 jacobian_obs = self.jacobians[idx_obs] 

343 n_jacobian_obs = len(jacobian_obs) 

344 if n_jacobian_obs != n_params_jac: 

345 errors.append(f"len(self.jacobians[{idx_obs}])={n_jacobian_obs} != {n_params_jac=}") 

346 else: 

347 for idx_jac in range(n_jacobian_obs): 

348 if not all(jacobian_obs[idx_jac].shape == shape_obs): 

349 errors.append(f"{jacobian_obs[idx_jac].shape=} != {shape_obs=}") 

350 if not all(self.residuals[idx_obs].shape == shape_obs): 

351 errors.append(f"{self.residuals[idx_obs].shape=} != {shape_obs=}") 

352 

353 shape_residual_prior = [1, n_prior_residuals] 

354 if len(self.outputs_prior) != n_params_jac: 

355 errors.append(f"{len(self.outputs_prior)=} != {n_params_jac=}") 

356 elif n_prior_residuals > 0: 

357 for idx in range(n_params_jac): 

358 if self.outputs_prior[idx].shape != shape_residual_prior: 

359 errors.append(f"{self.outputs_prior[idx].shape=} != {shape_residual_prior=}") 

360 

361 if n_prior_residuals > 0: 

362 if self.residuals_prior.shape != shape_residual_prior: 

363 errors.append(f"{self.residuals_prior.shape=} != {shape_residual_prior=}") 

364 

365 return errors 

366 

367 

368class ModelFitConfig(pexConfig.Config): 

369 """Configuration for model fitting.""" 

370 

371 eval_residual = pexConfig.Field[bool]( 

372 doc="Whether to evaluate the residual every iteration before the Jacobian, which can improve " 

373 "performance if most steps do not call the Jacobian function. Must be set to True if the " 

374 "optimizer does not always evaluate the residual first, before the Jacobian.", 

375 default=True, 

376 ) 

377 fit_linear_iter = pexConfig.Field[int]( 

378 doc="The number of iterations to wait before performing a linear fit during optimization." 

379 " Default 0 disables the feature.", 

380 default=0, 

381 ) 

382 optimization_library = pexConfig.ChoiceField[str]( 

383 doc="The optimization library to use when fitting", 

384 allowed={ 

385 "pygmo": "Pygmo2", 

386 "scipy": "scipy.optimize", 

387 }, 

388 default="scipy", 

389 ) 

390 

391 def validate(self) -> None: 

392 if not self.fit_linear_iter >= 0: 

393 raise ValueError(f"{self.fit_linear_iter=} must be >=0") 

394 

395 

396class FitResult(pydantic.BaseModel): 

397 """Results from a Modeller fit, including metadata.""" 

398 

399 model_config: ClassVar[pydantic.ConfigDict] = arbitrary_allowed_config 

400 

401 chisq_best: float = pydantic.Field(default=0, title="The chi-squared (sum) of the best-fit parameters") 

402 # TODO: Why does setting default=ModelFitConfig() cause a circular import? 

403 config: ModelFitConfig = pydantic.Field(None, title="The configuration for fitting") 

404 inputs: FitInputs | None = pydantic.Field(None, title="The fit input arrays") 

405 result: Any | None = pydantic.Field( 

406 None, 

407 title="The result object of the fit, directly from the optimizer", 

408 ) 

409 params: tuple[g2f.ParameterD, ...] | None = pydantic.Field( 

410 None, 

411 title="The parameter instances corresponding to params_best", 

412 ) 

413 params_best: tuple[float, ...] | None = pydantic.Field( 

414 None, 

415 title="The best-fit parameter array (un-transformed)", 

416 ) 

417 params_free_missing: tuple[g2f.ParameterD, ...] | None = pydantic.Field( 

418 None, 

419 title="Free parameters that were fixed during fitting - usually an" 

420 " IntegralParameterD for a band with missing data", 

421 ) 

422 n_eval_resid: int = pydantic.Field(0, title="Total number of self-reported residual function evaluations") 

423 n_eval_func: int = pydantic.Field( 

424 0, title="Total number of optimizer-reported fitness function evaluations" 

425 ) 

426 n_eval_jac: int = pydantic.Field( 

427 0, title="Total number of optimizer-reported Jacobian function evaluations" 

428 ) 

429 time_eval: float = pydantic.Field(0, title="Total runtime spent in model/Jacobian evaluation") 

430 time_run: float = pydantic.Field(0, title="Total runtime spent in fitting, excluding initial setup") 

431 

432 

433def set_params(params: Iterable[g2f.ParameterD], params_new: Iterable[float], model_loglike: Model): 

434 """Set new parameter values from an optimizer proposal. 

435 

436 Parameters 

437 ---------- 

438 params 

439 An iterable of ParameterD instances. 

440 params_new 

441 An iterable of new untransformed values for params. 

442 model_loglike 

443 A model instance configured to compute the log-likelihood. 

444 

445 Raises 

446 ------ 

447 InvalidProposalError 

448 Raised if a new value is nan, or if a RuntimeError is raised when 

449 setting the new value. 

450 RuntimeError 

451 Raised if the new transformed value is not finite. 

452 """ 

453 try: 

454 for param, value in zip(params, params_new, strict=True): 

455 if np.isnan(value): 

456 raise InvalidProposalError( 

457 f"optimizer for {model_loglike=} proposed non-finite {value=} for {param=}" 

458 ) 

459 param.value_transformed = value 

460 if not np.isfinite(param.value): 

461 raise RuntimeError(f"{param=} set to (transformed) non-finite {value=}") 

462 except RuntimeError as e: 

463 raise InvalidProposalError(f"optimizer for {model_loglike=} proposal generated error={e}") 

464 

465 

466def residual_scipy( 

467 params_new: np.ndarray, 

468 model_jacobian: Model, 

469 model_loglike: Model, 

470 params: tuple[g2f.ParameterD], 

471 result: FitResult, 

472 jacobian: np.ndarray | None, 

473 never_evaluate_jacobian: bool = False, 

474 return_loglike: bool = False, 

475) -> np.ndarray: 

476 """Compute the residual for a scipy optimizer. 

477 

478 Parameters 

479 ---------- 

480 params_new 

481 An array of new parameter values. 

482 model_jacobian 

483 A model instance configured to compute the Jacobian. 

484 model_loglike 

485 A model instance configured to compute the log-likelihood. 

486 params 

487 A tuple of the free parameters. The length and order must be identical 

488 to params_new. 

489 result 

490 A FitResult instance to update. 

491 jacobian 

492 The Jacobian array. Unused in this function. 

493 never_evaluate_jacobian 

494 If True, the jacobian will never be evaluated, taking precedence 

495 over result.config.eval_residual. 

496 return_loglike 

497 If False, will return the negative of the residual instead of the 

498 log-likelihood. 

499 

500 Returns 

501 ------- 

502 The log-likehood if return_loglike, otherwise the negative of the 

503 residual from result.inputs.residual. kwargs are for the convenience of 

504 libraries other than scipy and will not be changed by scipy itself. 

505 

506 Notes 

507 ----- 

508 Scipy requires that this function have the same args as the jacobian 

509 function (jacobian_scipy), so unused args must not be removed. 

510 

511 Scipy generally calls this function every iteration, but only conditionally 

512 calls the jacobian_scipy function (e.g. if the proposal is accepted). If 

513 users expect proposals to (almost) always be accepted, it is more efficient 

514 to compute the Jacobian here (and skip evaluating it again when 

515 jacobian_scipy is called), because evaluating model_jacobian also updates 

516 the residual array, and so there is no need to evaluate model_loglike. 

517 

518 To summarize, if never_evaluate_jacobian or config_fit.eval_residual: 

519 There is ALWAYS one call to model_jacobian.evaluate, 

520 and ZERO calls to model_loglike.evaluate. 

521 else: 

522 There is always one call to model_loglike.evaluate, 

523 and MAYBE one call to model_jacobian.evaluate. 

524 

525 """ 

526 set_params(params, params_new, model_loglike) 

527 config_fit = result.config 

528 fit_linear_iter = config_fit.fit_linear_iter 

529 if (fit_linear_iter > 0) and ((result.n_eval_resid + 1) % fit_linear_iter == 0): 

530 Modeller.fit_model_linear(model_loglike, ratio_min=1e-6) 

531 time_init = time.process_time() 

532 

533 if never_evaluate_jacobian or config_fit.eval_residual: 

534 try: 

535 loglike = model_loglike.evaluate() 

536 except Exception: 

537 loglike = None 

538 result.n_eval_resid += 1 

539 else: 

540 loglike = model_jacobian.evaluate() 

541 result.n_eval_jac += 1 

542 result.time_eval += time.process_time() - time_init 

543 return loglike if return_loglike else -result.inputs.residual 

544 

545 

546def jacobian_scipy( 

547 params_new: np.ndarray, 

548 model_jacobian: Model, 

549 model_loglike: Model, 

550 params: tuple[g2f.ParameterD], 

551 result: FitResult, 

552 jacobian: np.ndarray, 

553 always_evaluate_jacobian: bool = False, 

554) -> np.ndarray: 

555 """Compute the Jacobian for a scipy optimizer. 

556 

557 Parameters 

558 ---------- 

559 params_new 

560 An array of new parameter values. Unused here. 

561 model_jacobian 

562 A model instance configured to compute the Jacobian. 

563 model_loglike 

564 A model instance configured to compute the log-likelihood. Unused here. 

565 params 

566 A tuple of the free parameters. Unused here. 

567 result 

568 A FitResult instance to update. 

569 jacobian 

570 The Jacobian array. Unused in this function. 

571 always_evaluate_jacobian 

572 If True, the jacobian will always be evaluated, taking precedence 

573 over result.config.eval_residual. 

574 

575 Returns 

576 ------- 

577 A reference to jacobian, whose values may have been updated. 

578 

579 Notes 

580 ----- 

581 Scipy requires that this function have the same args as the residual 

582 function (residual_scipy), so unused args must not be removed. kwargs are 

583 for the convenience of libraries other than scipy and will not be changed 

584 by scipy itself. 

585 

586 Parameter objects and new values are unused here as they will have already 

587 been set by the residual funciton. 

588 

589 Scipy generally does not call this function every iteration. If it is 

590 configured to skip evaluating the Jacobian, it is presumed to have been 

591 updated by the residual function already. 

592 """ 

593 if always_evaluate_jacobian or result.config.eval_residual: 

594 time_init = time.process_time() 

595 model_jacobian.evaluate() 

596 result.time_eval += time.process_time() - time_init 

597 result.n_eval_jac += 1 

598 return jacobian 

599 

600 

601if has_pygmo: 601 ↛ 602line 601 didn't jump to line 602 because the condition on line 601 was never true

602 class PygmoUDP: 

603 """A Pygmo User-Defined Problem for a MultiProFit model. 

604 

605 Pygmo optimizers take a class with a fitness function 

606 (i.e. the negative log-likelihood, although one could use some other 

607 arbitrary fitness function if it made snese to do so), with a 

608 fitness function and a gradient function returning the derivative of 

609 the fitness w.r.t. each free parameters. 

610 

611 Pygmo optimizers do not appear to use the full residual array or 

612 Jacobian the way scipy optimizers do. The gradient of the 

613 log-likelihood is cheaper to compute than the full Jacobian; however, 

614 using only the gradient of the fitness may cause slower convergence. 

615 

616 Parameters 

617 ---------- 

618 params 

619 A tuple of the free parameters. 

620 model_loglike 

621 A model configured to compute the log-likelihood. 

622 model_loglike_grad 

623 A model configured to compute the gradient of the 

624 log-likelihood w.r.t. each free parameter. 

625 bounds_lower 

626 A tuple of the lower bounds of the transformed value for each 

627 free parameter in params. 

628 bounds_upper 

629 A tuple of the upper bounds of the transformed value for each 

630 free parameter in params. 

631 result 

632 A result object to update and read configuration from. 

633 """ 

634 

635 def __init__( 

636 self, 

637 params: tuple[g2f.ParameterD], 

638 model_loglike: Model, 

639 model_loglike_grad: Model, 

640 bounds_lower: tuple[float], 

641 bounds_upper: tuple[float], 

642 result: FitResult, 

643 ): 

644 self.params = params 

645 self.model_loglike = model_loglike 

646 self.model_loglike_grad = model_loglike_grad 

647 self.bounds_lower = bounds_lower 

648 self.bounds_upper = bounds_upper 

649 self.result = result 

650 

651 def fitness(self, x): 

652 loglike = residual_scipy( 

653 x, 

654 model_jacobian=self.model_loglike_grad, 

655 model_loglike=self.model_loglike, 

656 params=self.params, 

657 result=self.result, 

658 jacobian=None, 

659 never_evaluate_jacobian=True, 

660 return_loglike=True, 

661 ) 

662 return [-sum(loglike),] 

663 

664 def get_bounds(self): 

665 return self.bounds_lower, self.bounds_upper 

666 

667 def gradient(self, x): 

668 set_params(params=self.params, params_new=x, model_loglike=self.model_loglike) 

669 time_init = time.process_time() 

670 loglike_grad = -np.array(self.model_loglike_grad.compute_loglike_grad()) 

671 self.result.time_eval += time.process_time() - time_init 

672 self.result.n_eval_jac += 1 

673 return loglike_grad 

674 

675 def __deepcopy__(self, memo): 

676 """Make a deep copy of a model with a shallow copy of the data 

677 which should not be duplicated. 

678 

679 Pygmo optimizers always make at least one copy of this class, and 

680 some (like particle swarm) will make many more. The input data 

681 must be shallow copies, both to avoid excess memory usage and 

682 because Model instances cannot be deep copied. 

683 """ 

684 fitinputs = FitInputs.from_model(self.model_loglike) 

685 model_loglike, model_loglike_grad = (g2f.ModelD( 

686 data=model.data, 

687 psfmodels=model.psfmodels, 

688 sources=model.sources, 

689 priors=model.priors, 

690 ) for model in (self.model_loglike, self.model_loglike_grad)) 

691 model_loglike.setup_evaluators(evaluatormode=g2f.EvaluatorMode.loglike) 

692 model_loglike_grad.setup_evaluators(evaluatormode=g2f.EvaluatorMode.loglike_grad) 

693 

694 copied = self.__class__( 

695 params=self.params, 

696 model_loglike=model_loglike, 

697 model_loglike_grad=model_loglike_grad, 

698 bounds_lower=self.bounds_lower, 

699 bounds_upper=self.bounds_upper, 

700 result=FitResult(inputs=fitinputs, config=self.result.config), 

701 ) 

702 memo[id(self)] = copied 

703 return copied 

704 

705 

706class Modeller: 

707 """Fit lsst.gauss2d.fit Model instances using Python optimizers. 

708 

709 Parameters 

710 ---------- 

711 logger : `logging.Logger` 

712 The logger. Defaults to calling `_getlogger`. 

713 """ 

714 

715 def __init__(self, logger: logging.Logger | None = None) -> None: 

716 if logger is None: 

717 logger = self._get_logger() 

718 self.logger = logger 

719 

720 @staticmethod 

721 def _get_logger() -> logging.Logger: 

722 logger = logging.getLogger(__name__) 

723 return logger 

724 

725 @staticmethod 

726 def compute_variances( 

727 model: Model, use_diag_only: bool = False, use_svd: bool = False, **kwargs: Any 

728 ) -> np.ndarray: 

729 """Compute model free parameter variances from the inverse Hessian. 

730 

731 Parameters 

732 ---------- 

733 model 

734 The model to compute parameter variances for. 

735 use_diag_only 

736 Whether to use diagonal terms only, i.e. ignore covariance. 

737 use_svd 

738 Whether to use singular value decomposition to compute the inverse 

739 Hessian. 

740 **kwargs 

741 Additional keyword arguments to pass to model.compute_hessian. 

742 

743 Returns 

744 ------- 

745 variances 

746 The free parameter variances. 

747 """ 

748 hessian = model.compute_hessian(**kwargs).data 

749 if use_diag_only: 

750 return -1 / np.diag(hessian) 

751 if use_svd: 

752 u, s, v = np.linalg.svd(-hessian) 

753 inverse = np.dot(v.transpose(), np.dot(np.diag(s**-1), u.transpose())) 

754 else: 

755 inverse = np.linalg.inv(-hessian) 

756 return np.diag(inverse) 

757 

758 @staticmethod 

759 def fit_gaussians_linear( 

760 gaussians_linear: LinearGaussians, 

761 observation: g2f.ObservationD, 

762 psf_model: g2f.PsfModel = None, 

763 fit_methods: dict[str, dict[str, Any]] | None = None, 

764 plot: bool = False, 

765 ) -> dict[str, FitResult]: 

766 """Fit normalizations for a Gaussian mixture model. 

767 

768 Parameters 

769 ---------- 

770 gaussians_linear 

771 The Gaussian components - fixed or otherwise - to fit. 

772 observation 

773 The observation to fit against. 

774 psf_model 

775 A PSF model for the observation, if fitting sources. 

776 fit_methods 

777 A dictionary of fitting methods to employ, keyed by method name, 

778 with a value of a dict of options (kwargs) to pass on. Default 

779 is "scipy.optimize.nnls". 

780 plot 

781 Whether to generate fit residual/diagnostic plots. 

782 

783 Returns 

784 ------- 

785 results 

786 Fit results for each method, keyed by the fit method name. 

787 """ 

788 if psf_model is None: 

789 psf_model = make_psf_model_null() 

790 if fit_methods is None: 

791 fit_methods = {"scipy.optimize.nnls": fit_methods_linear["scipy.optimize.nnls"]} 

792 else: 

793 for fit_method in fit_methods: 

794 if fit_method not in fit_methods_linear: 

795 raise ValueError(f"Unknown linear {fit_method=}") 

796 n_params = len(gaussians_linear.gaussians_free) 

797 if not (n_params > 0): 

798 raise ValueError(f"!({len(gaussians_linear.gaussians_free)=}>0); can't fit with no free params") 

799 image = observation.image 

800 shape = image.shape 

801 coordsys = image.coordsys 

802 

803 mask_inv = observation.mask_inv.data 

804 sigma_inv = observation.sigma_inv.data 

805 bad = ~(sigma_inv > 0) 

806 n_bad = np.sum(bad) 

807 if n_bad > 0: 

808 mask_inv &= ~bad 

809 

810 sigma_inv = sigma_inv[mask_inv] 

811 size = np.sum(mask_inv) 

812 

813 gaussians_psf = psf_model.gaussians(g2f.Channel.NONE) 

814 if len(gaussians_linear.gaussians_fixed) > 0: 

815 image_fixed = make_image_gaussians( 

816 gaussians_source=gaussians_linear.gaussians_fixed, 

817 gaussians_kernel=gaussians_psf, 

818 n_rows=shape[0], 

819 n_cols=shape[1], 

820 ).data 

821 image_fixed = image_fixed[mask_inv] 

822 else: 

823 image_fixed = None 

824 

825 x = np.zeros((size, n_params)) 

826 

827 params = [None] * n_params 

828 for idx_param, (gaussians_free, param) in enumerate(gaussians_linear.gaussians_free): 

829 image_free = make_image_gaussians( 

830 gaussians_source=gaussians_free, 

831 gaussians_kernel=gaussians_psf, 

832 n_rows=shape[0], 

833 n_cols=shape[1], 

834 coordsys=coordsys, 

835 ).data 

836 x[:, idx_param] = ((image_free if mask_inv is None else image_free[mask_inv]) * sigma_inv).flat 

837 params[idx_param] = param 

838 

839 y = observation.image.data 

840 if plot: 

841 import matplotlib.pyplot as plt 

842 

843 plt.imshow(y, origin="lower") 

844 plt.show() 

845 if mask_inv is not None: 

846 y = y[mask_inv] 

847 if image_fixed is not None: 

848 y -= image_fixed 

849 y = (y * sigma_inv).flat 

850 

851 results = {} 

852 

853 for fit_method, kwargs in fit_methods.items(): 

854 kwargs = kwargs if kwargs is not None else fit_methods_linear[fit_method] 

855 if fit_method == "scipy.optimize.nnls": 

856 values = spopt.nnls(x, y)[0] 

857 elif fit_method == "scipy.optimize.lsq_linear": 

858 values = spopt.lsq_linear(x, y, **kwargs).x 

859 elif fit_method == "numpy.linalg.lstsq": 

860 values = np.linalg.lstsq(x, y, **kwargs)[0] 

861 elif fit_method == "fastnnls.fnnls": 

862 y = x.T.dot(y) 

863 x = x.T.dot(x) 

864 values = fnnls(x, y) 

865 else: 

866 raise RuntimeError(f"Unknown linear {fit_method=} not caught earlier (logic error)") 

867 results[fit_method] = values 

868 return results 

869 

870 def fit_model( 

871 self, 

872 model: Model, 

873 fitinputs: FitInputs | None = None, 

874 printout: bool = False, 

875 config: ModelFitConfig | None = None, 

876 **kwargs: Any, 

877 ) -> FitResult: 

878 """Fit a model with a nonlinear optimizer. 

879 

880 Parameters 

881 ---------- 

882 model 

883 The model to fit. 

884 fitinputs 

885 An existing FitInputs with jacobian/residual arrays to reuse. 

886 printout 

887 Whether to print diagnostic information. 

888 config 

889 Configuration settings for model fitting. 

890 **kwargs 

891 Keyword arguments to pass to the optimizer. 

892 

893 Returns 

894 ------- 

895 result 

896 The results from running the fitter. 

897 

898 Notes 

899 ----- 

900 The only supported fitter is scipy.optimize.least_squares. 

901 """ 

902 if config is None: 

903 config = ModelFitConfig() 

904 config.validate() 

905 

906 use_pygmo = config.optimization_library == "pygmo" 

907 model_loglike = g2f.ModelD( 

908 data=model.data, 

909 psfmodels=model.psfmodels, 

910 sources=model.sources, 

911 priors=model.priors, 

912 ) if (use_pygmo or config.eval_residual) else None 

913 

914 if use_pygmo: 

915 model.setup_evaluators(g2f.EvaluatorMode.loglike_grad, force=True) 

916 model_loglike.setup_evaluators(g2f.EvaluatorMode.loglike, force=True) 

917 else: 

918 if fitinputs is None: 

919 fitinputs = FitInputs.from_model(model) 

920 else: 

921 errors = fitinputs.validate_for_model(model) 

922 if errors: 

923 newline = "\n" 

924 raise ValueError(f"fitinputs validation got errors:\n{newline.join(errors)}") 

925 model.setup_evaluators( 

926 evaluatormode=g2f.EvaluatorMode.jacobian, 

927 outputs=fitinputs.jacobians, 

928 residuals=fitinputs.residuals, 

929 outputs_prior=fitinputs.outputs_prior, 

930 residuals_prior=fitinputs.residuals_prior, 

931 print=printout, 

932 force=True, 

933 ) 

934 

935 params_psf_free = [] 

936 for psfmodel in model.psfmodels: 

937 params_psf_free.extend(get_params_uniq(psfmodel, fixed=False)) 

938 if params_psf_free: 

939 params_psf_free = {k: None for k in params_psf_free} 

940 raise ValueError( 

941 f"Model has free PSF model params: {list(params_psf_free.keys())}." 

942 f" All PSF model parameters must be fixed before fitting." 

943 ) 

944 

945 offsets_params = dict(model.offsets_parameters()) 

946 params_offsets = {v: k for (k, v) in offsets_params.items()} 

947 params_free = tuple(params_offsets[idx] for idx in range(1, len(offsets_params) + 1)) 

948 params_free_sorted_all = tuple(get_params_uniq(model, fixed=False)) 

949 params_free_sorted = [] 

950 params_free_sorted_missing = [] 

951 

952 # If we were forced to drop an observation, re-generate the modeller 

953 # Only integral parameters should be missing 

954 for param in params_free_sorted_all: 

955 if param in params_offsets.values(): 

956 params_free_sorted.append(param) 

957 else: 

958 if not isinstance(param, g2f.IntegralParameterD): 

959 raise RuntimeError(f"non-integral {param=} missing from {offsets_params=}") 

960 param.limits = g2f.LimitsD(param.min, param.max) 

961 param.value = param.min 

962 param.fixed = True 

963 params_free_sorted_missing.append(param) 

964 

965 try: 

966 if not use_pygmo: 

967 if params_free_sorted_missing: 

968 fitinputs = FitInputs.from_model(model) 

969 params_free_sorted = tuple(params_free_sorted) 

970 model.setup_evaluators( 

971 evaluatormode=g2f.EvaluatorMode.jacobian, 

972 outputs=fitinputs.jacobians, 

973 residuals=fitinputs.residuals, 

974 outputs_prior=fitinputs.outputs_prior, 

975 residuals_prior=fitinputs.residuals_prior, 

976 print=printout, 

977 force=True, 

978 ) 

979 else: 

980 params_free_sorted = params_free_sorted_all 

981 if config.eval_residual: 

982 model_loglike.setup_evaluators( 

983 evaluatormode=g2f.EvaluatorMode.loglike, 

984 residuals=fitinputs.residuals, 

985 residuals_prior=fitinputs.residuals_prior, 

986 ) 

987 

988 jac = fitinputs.jacobian[:, 1:] 

989 # Assert that this is a view, otherwise this won't work 

990 assert id(jac.base) == id(fitinputs.jacobian) 

991 

992 n_params_free = len(params_free) 

993 bounds = ([None] * n_params_free, [None] * n_params_free) 

994 params_init = [None] * n_params_free 

995 

996 for idx, param in enumerate(params_free): 

997 limits = param.limits 

998 # If the transform has more restrictive limits, use those 

999 if hasattr(param.transform, "limits"): 

1000 limits_transform = param.transform.limits 

1001 n_within = limits.check(limits_transform.min) + limits.check(limits_transform.min) 

1002 if n_within == 2: 

1003 limits = limits_transform 

1004 elif n_within != 0: 

1005 raise ValueError( 

1006 f"{param=} {param.limits=} and {param.transform.limits=}" 

1007 f" intersect; one must be a subset of the other" 

1008 ) 

1009 bounds[0][idx] = param.transform.forward(limits.min) 

1010 bounds[1][idx] = param.transform.forward(limits.max) 

1011 if not limits.check(param.value): 

1012 raise RuntimeError(f"{param=}.value_transformed={param.value} not within {limits=}") 

1013 params_init[idx] = param.value_transformed 

1014 

1015 results = FitResult(inputs=fitinputs, config=config) 

1016 time_init = time.process_time() 

1017 if use_pygmo: 

1018 uda = pg.nlopt("lbfgs") 

1019 uda.ftol_abs = 1e-4 

1020 algo = pg.algorithm(uda) 

1021 

1022 # pygmo seems to make proposals right at the limits 

1023 # parameter limits are currently set as untransformed values 

1024 # and sometimes the proposal exceeds those when transformed 

1025 bounds_lower = tuple(np.nextafter(x, np.inf) for x in bounds[0]) 

1026 bounds_upper = tuple(np.nextafter(x, -np.inf) for x in bounds[1]) 

1027 

1028 udp = PygmoUDP( 

1029 params=params_free, 

1030 model_loglike=model_loglike, 

1031 model_loglike_grad=model, 

1032 bounds_lower=bounds_lower, 

1033 bounds_upper=bounds_upper, 

1034 result=results, 

1035 ) 

1036 

1037 # if the initial value was at one of the bounds, reset it to 

1038 # one percent of the range away from the bound 

1039 for idx, value_init in enumerate(params_init): 

1040 bound_lower = bounds_lower[idx] 

1041 bound_upper = bounds_upper[idx] 

1042 if value_init >= bound_upper: 

1043 params_init[idx] = bound_lower + 0.99*(bound_upper - bound_lower) 

1044 elif value_init <= bound_lower: 

1045 params_init[idx] = bound_lower + 0.01*(bound_upper - bound_lower) 

1046 

1047 problem = pg.problem(udp) 

1048 pop = pg.population(prob=problem, size=0) 

1049 pop.push_back(np.array(params_init)) 

1050 result_opt = algo.evolve(pop) 

1051 x_best = result_opt.champion_x 

1052 results.n_eval_func = pop.problem.get_fevals() 

1053 results.n_eval_jac = pop.problem.get_gevals() 

1054 results.chisq_best = 2*result_opt.champion_f 

1055 else: 

1056 # The initial evaluate will fill in jac for the next line 

1057 # _ll_init is assigned just for convenient debugging 

1058 _ll_init = model.evaluate() # noqa: F841 

1059 x_scale_jac_clipped = np.clip(1.0 / (np.sum(jac**2, axis=0) ** 0.5), 1e-5, 1e19) 

1060 result_opt = spopt.least_squares( 

1061 residual_scipy, 

1062 params_init, 

1063 jac=jacobian_scipy, 

1064 bounds=bounds, 

1065 args=(model, model_loglike, params_free, results, jac), 

1066 x_scale=x_scale_jac_clipped, 

1067 **kwargs, 

1068 ) 

1069 x_best = result_opt.x 

1070 results.n_eval_func = result_opt.nfev 

1071 results.n_eval_jac = result_opt.njev if result_opt.njev else 0 

1072 results.chisq_best = 2*result_opt.cost 

1073 

1074 results.time_run = time.process_time() - time_init 

1075 results.result = result_opt 

1076 if params_free_sorted_missing: 

1077 params_best = [] 

1078 for param in params_free_sorted_all: 

1079 if param in params_free_sorted_missing: 

1080 params_best.append(param.value) 

1081 param.fixed = False 

1082 else: 

1083 params_best.append(x_best[offsets_params[param] - 1]) 

1084 results.params_best = tuple(params_best) 

1085 results.params = params_free_sorted_all 

1086 else: 

1087 results.params_best = tuple( 

1088 x_best[offsets_params[param] - 1] for param in params_free_sorted 

1089 ) 

1090 results.params = params_free_sorted 

1091 results.params_free_missing = tuple(params_free_sorted_missing) 

1092 except Exception as e: 

1093 # Any missing params we fixed must be set free again 

1094 for param in params_free_sorted_missing: 

1095 param.fixed = False 

1096 raise e 

1097 

1098 return results 

1099 

1100 # TODO: change to staticmethod if requiring py3.10+ 

1101 @classmethod 

1102 def fit_model_linear( 

1103 cls, 

1104 model: Model, 

1105 idx_obs: int | Sequence[int] | None = None, 

1106 ratio_min: float = 0, 

1107 validate: bool = False, 

1108 limits_interval_min: float = 0.01, 

1109 limits_interval_max: float = 1.0, 

1110 ) -> tuple[np.ndarray, np.ndarray]: 

1111 """Fit a model's linear parameters (integrals). 

1112 

1113 Parameters 

1114 ---------- 

1115 model 

1116 The model to fit parameters for. 

1117 idx_obs 

1118 An index or sequence of indices of observations to fit. 

1119 The default is to fit all observations. 

1120 ratio_min 

1121 The minimum ratio of the previous value to set any parameter to. 

1122 This can prevent setting parameters to zero. 

1123 validate 

1124 If True, check that the model log-likelihood improves and restore 

1125 the original parameter values if not. 

1126 limits_interval_min 

1127 A value 0<=x<limits_interval_max<=1 specifying the lower bound to 

1128 clip parameter values to, as a ratio of each parameter's limits. 

1129 limits_interval_max 

1130 A value 0<=limits_interval_min<x<=1 specifying the upper bound to 

1131 clip parameter values to, as a ratio of each parameter's limits. 

1132 

1133 Returns 

1134 ------- 

1135 loglike_init 

1136 The initial log likelihood if validate is True, otherwise None. 

1137 loglike_final 

1138 The post-fit log likelihood if validate is True, otherwise None. 

1139 

1140 Notes 

1141 ----- 

1142 The purpose of limits_interval is to slightly offset parameters from 

1143 the extrema of their limits. This is typically most useful for 

1144 integral parameters with a minimum of zero, which might otherwise be 

1145 stuck at zero in a subsequent nonlinear fit. 

1146 """ 

1147 if ( 

1148 not (0 <= limits_interval_min <= 1) 

1149 or not (0 <= limits_interval_max <= 1) 

1150 or not (limits_interval_min < limits_interval_max) 

1151 ): 

1152 raise ValueError(f"Must have 0 <= {limits_interval_min} < {limits_interval_max} <= 1") 

1153 n_data = len(model.data) 

1154 n_sources = len(model.sources) 

1155 if n_sources != 1: 

1156 raise ValueError("fit_model_linear does not yet support models with >1 sources") 

1157 if idx_obs is not None: 

1158 if isinstance(idx_obs, int): 

1159 if not ((idx_obs >= 0) and (idx_obs < n_data)): 

1160 raise ValueError(f"{idx_obs=} not >=0 and < {len(model.data)=}") 

1161 indices = range(idx_obs, idx_obs + 1) 

1162 else: 

1163 if len(set(idx_obs)) != len(idx_obs): 

1164 raise ValueError(f"{idx_obs=} has duplicate values") 

1165 indices = tuple(idx_obs) 

1166 if not all(((idx_obs >= 0) and (idx_obs < n_data) for idx_obs in indices)): 

1167 raise ValueError(f"idx_obs={indices} has values not >=0 and < {len(model.data)=}") 

1168 else: 

1169 indices = range(n_data) 

1170 

1171 if validate: 

1172 model.setup_evaluators(evaluatormode=g2f.EvaluatorMode.loglike) 

1173 loglike_init = model.evaluate() 

1174 else: 

1175 loglike_init = None 

1176 values_init = {} 

1177 values_new = {} 

1178 

1179 for idx_obs in indices: 

1180 obs = model.data[idx_obs] 

1181 gaussians_linear = LinearGaussians.make(model.sources[0], channel=obs.channel) 

1182 result = cls.fit_gaussians_linear(gaussians_linear, obs, psf_model=model.psfmodels[idx_obs]) 

1183 values = list(result.values())[0] 

1184 

1185 for (_, parameter), ratio in zip(gaussians_linear.gaussians_free, values): 

1186 values_init[parameter] = float(parameter.value) 

1187 if not (ratio >= ratio_min): 

1188 ratio = ratio_min 

1189 value_new = max(ratio * parameter.value, parameter.limits.min) 

1190 values_new[parameter] = value_new 

1191 

1192 for parameter, value in values_new.items(): 

1193 value_min, value_max = parameter.limits.min, parameter.limits.max 

1194 min_is_inf = value_min == -np.inf 

1195 max_is_inf = value_max == np.inf 

1196 if min_is_inf: 

1197 if max_is_inf: 

1198 parameter.value = value 

1199 continue 

1200 if not value < value_max: 

1201 value = value_max - 1e-5 

1202 elif max_is_inf: 

1203 if not value > value_min: 

1204 value = value_min + 1e-5 

1205 else: 

1206 limits_interval = parameter.limits.max - parameter.limits.min 

1207 value = np.clip( 

1208 value, 

1209 value_min + limits_interval_min * limits_interval, 

1210 value_min + limits_interval_max * limits_interval, 

1211 ) 

1212 parameter.value = value 

1213 

1214 if validate: 

1215 loglike_new = model.evaluate() 

1216 if not (sum(loglike_new) > sum(loglike_init)): 

1217 for parameter, value in values_init.items(): 

1218 parameter.value = value 

1219 else: 

1220 loglike_new = None 

1221 return loglike_init, loglike_new 

1222 

1223 @staticmethod 

1224 def make_components_linear( 

1225 component_mixture: g2f.ComponentMixture, 

1226 ) -> list[g2f.GaussianComponent]: 

1227 """Make a list of fixed Gaussian components from a ComponentMixture. 

1228 

1229 Parameters 

1230 ---------- 

1231 component_mixture 

1232 A component mixture to create a component list for. 

1233 

1234 Returns 

1235 ------- 

1236 gaussians 

1237 A list of Gaussians components with fixed parameters and values 

1238 matching those in the original component mixture. 

1239 """ 

1240 components = component_mixture.components 

1241 if len(components) == 0: 

1242 raise ValueError(f"Can't get linear Source from {component_mixture=} with no components") 

1243 components_new = [None] * len(components) 

1244 for idx, component in enumerate(components): 

1245 gaussians = component.gaussians(g2f.Channel.NONE) 

1246 # TODO: Support multi-Gaussian components if sensible 

1247 # The challenge would be in mapping linear param values back onto 

1248 # non-linear IntegralModels 

1249 n_g = len(gaussians) 

1250 if not n_g == 1: 

1251 raise ValueError(f"{component=} has {gaussians=} of len {n_g=}!=1") 

1252 gaussian = gaussians.at(0) 

1253 component_new = g2f.GaussianComponent( 

1254 g2f.GaussianParametricEllipse( 

1255 g2f.SigmaXParameterD(gaussian.ellipse.sigma_x, fixed=True), 

1256 g2f.SigmaYParameterD(gaussian.ellipse.sigma_y, fixed=True), 

1257 g2f.RhoParameterD(gaussian.ellipse.rho, fixed=True), 

1258 ), 

1259 g2f.CentroidParameters( 

1260 g2f.CentroidXParameterD(gaussian.centroid.x, fixed=True), 

1261 g2f.CentroidYParameterD(gaussian.centroid.y, fixed=True), 

1262 ), 

1263 g2f.LinearIntegralModel( 

1264 [ 

1265 (g2f.Channel.NONE, g2f.IntegralParameterD(gaussian.integral.value)), 

1266 ] 

1267 ), 

1268 ) 

1269 components_new[idx] = component_new 

1270 return components_new