Coverage for python / lsst / cp / pipe / cpElectrostaticBF.py: 16%

165 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:52 +0000

1# This file is part of cp_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21# 

22"""Computation of electrostatic solution of brighter-fatter effect impact 

23on pixel distortions""" 

24 

25__all__ = ['ElectrostaticBrighterFatterSolveTask', 

26 'ElectrostaticBrighterFatterSolveConfig'] 

27 

28import numpy as np 

29import lsst.pex.config as pexConfig 

30import lsst.pipe.base as pipeBase 

31import lsst.pipe.base.connectionTypes as cT 

32 

33from .utils import ( 

34 extractCalibDate, 

35 ElectrostaticFit, 

36) 

37from .cpLinearitySolve import ptcLookup 

38from lsst.ip.isr.isrFunctions import symmetrize 

39from lsst.ip.isr import ElectrostaticBrighterFatterDistortionMatrix 

40from lmfit import Parameters, report_fit 

41 

42 

43class ElectrostaticBrighterFatterSolveConnections(pipeBase.PipelineTaskConnections, 

44 dimensions=("instrument", "detector")): 

45 dummy = cT.Input( 

46 name="raw", 

47 doc="Dummy exposure.", 

48 storageClass='Exposure', 

49 dimensions=("instrument", "exposure", "detector"), 

50 multiple=True, 

51 deferLoad=True, 

52 ) 

53 camera = cT.PrerequisiteInput( 

54 name="camera", 

55 doc="Camera associated with this data.", 

56 storageClass="Camera", 

57 dimensions=("instrument", ), 

58 isCalibration=True, 

59 ) 

60 inputPtc = cT.PrerequisiteInput( 

61 name="ptc", 

62 doc="Photon transfer curve dataset.", 

63 storageClass="IsrCalib", 

64 dimensions=("instrument", "detector"), 

65 isCalibration=True, 

66 lookupFunction=ptcLookup, 

67 ) 

68 inputBfPtc = cT.Input( 

69 name="bfPtc", 

70 doc="Input BF PTC dataset.", 

71 storageClass="IsrCalib", 

72 dimensions=("instrument", "detector"), 

73 isCalibration=True, 

74 ) 

75 

76 output = cT.Output( 

77 name="electroBfDistortionMatrix", 

78 doc="Output measured brighter-fatter electrostatic model.", 

79 storageClass="IsrCalib", 

80 dimensions=("instrument", "detector"), 

81 isCalibration=True, 

82 ) 

83 

84 def __init__(self, *, config=None): 

85 if config.useBfPtc: 

86 del self.inputPtc 

87 del self.dummy 

88 else: 

89 del self.inputBfPtc 

90 

91 

92class ElectrostaticBrighterFatterSolveConfig(pipeBase.PipelineTaskConfig, 

93 pipelineConnections=ElectrostaticBrighterFatterSolveConnections): 

94 

95 useBfPtc = pexConfig.Field( 

96 dtype=bool, 

97 doc="Use a BF ptc in a single pipeline?", 

98 default=False, 

99 ) 

100 fitRange = pexConfig.Field( 

101 dtype=int, 

102 doc="Maximum pixel range to compute the electrostatic fit.", 

103 default=8, 

104 ) 

105 fitMethod = pexConfig.Field( 

106 dtype=str, 

107 doc="Minimization technique to fit the electrostatic solution. " 

108 "Should be one of the available fitting methods in " 

109 "`lmfit.minimizer.Minimizer.minimize`. For list of all possible " 

110 "methods see the documentation.", 

111 default="leastsq", 

112 ) 

113 doNormalizeElectrostaticModel = pexConfig.Field( 

114 dtype=bool, 

115 doc="Do you want apply a final normalization to the modeled " 

116 "aMatrix?", 

117 default=False, 

118 ) 

119 doFitNormalizationOffset = pexConfig.Field( 

120 dtype=bool, 

121 doc="Do you want to fit an offset to the a matrix? This caused " 

122 "by long range correlations in the data. Only used if " 

123 "doNormalizeElectrostaticModel.", 

124 default=True, 

125 ) 

126 nImageChargePairs = pexConfig.Field( 

127 dtype=int, 

128 doc="Number of image charge pairs to use when computing " 

129 "Gauss's law. The larger number, the better, and an " 

130 "odd number is preferred.", 

131 default=11, 

132 ) 

133 initialParametersDict = pexConfig.DictField( 

134 keytype=str, 

135 itemtype=float, 

136 doc="Initial fit parameters, should contain `thickness`, " 

137 "`pixelsize`, `zq`, zsh`, `zsv`, `a`, `b`, `alpha`, " 

138 " and `beta`. See the class docstring for descriptions " 

139 " and units of each parameter.", 

140 default={ 

141 'thickness': 100.0, 

142 'pixelsize': 10.0, 

143 'zq': 1.0, 

144 'zsh': 2.0, 

145 'zsv': 3.0, 

146 'a': 2.0, 

147 'b': 2.0, 

148 'alpha': 1.0, 

149 'beta': 0.0, 

150 }, 

151 ) 

152 parametersToVary = pexConfig.DictField( 

153 keytype=str, 

154 itemtype=bool, 

155 doc="Dictionary of parameters and booleans which will configure " 

156 "if the parameter is allowed to vary in the fit, should contain " 

157 "`thickness`,`pixelsize`, `zq`, zsh`, `zsv`, `a`, `b`, `alpha`, " 

158 "and `beta`. If False, the parameter will be fixed to the initial " 

159 "value set in initialParameterDict. See the class docstring for " 

160 "descriptions and units of each parameter.", 

161 default={ 

162 'thickness': False, 

163 'pixelsize': False, 

164 'zq': True, 

165 'zsh': True, 

166 'zsv': True, 

167 'a': True, 

168 'b': True, 

169 'alpha': True, 

170 'beta': True, 

171 }, 

172 ) 

173 

174 

175class ElectrostaticBrighterFatterSolveTask(pipeBase.PipelineTask): 

176 """Find the complete electrostatic solution to the given PTC. 

177 """ 

178 

179 ConfigClass = ElectrostaticBrighterFatterSolveConfig 

180 _DefaultName = 'cpElectrostaticBfSolve' 

181 

182 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

183 """Ensure that the input and output dimensions are passed along. 

184 

185 Parameters 

186 ---------- 

187 butlerQC : `lsst.daf.butler.QuantumContext` 

188 Butler to operate on. 

189 inputRefs : `lsst.pipe.base.InputQuantizedConnection` 

190 Input data refs to load. 

191 ouptutRefs : `lsst.pipe.base.OutputQuantizedConnection` 

192 Output data refs to persist. 

193 """ 

194 inputs = butlerQC.get(inputRefs) 

195 

196 # Use the dimensions to set 

197 # electroBfDistortionMatrix/provenance 

198 # information. 

199 if self.config.useBfPtc: 

200 inputs["inputDims"] = dict(inputRefs.inputBfPtc.dataId.required) 

201 inputs["inputPtc"] = inputs["inputBfPtc"] 

202 inputs["dummy"] = [] 

203 del inputs["inputBfPtc"] 

204 else: 

205 inputs["inputDims"] = dict(inputRefs.inputPtc.dataId.required) 

206 

207 # Add calibration provenance info to header. 

208 kwargs = dict() 

209 reference = getattr(inputRefs, "inputPtc", None) 

210 

211 if reference is not None and hasattr(reference, "run"): 

212 runKey = "PTC_RUN" 

213 runValue = reference.run 

214 idKey = "PTC_UUID" 

215 idValue = str(reference.id) 

216 dateKey = "PTC_DATE" 

217 calib = inputs.get("inputPtc", None) 

218 dateValue = extractCalibDate(calib) 

219 

220 kwargs[runKey] = runValue 

221 kwargs[idKey] = idValue 

222 kwargs[dateKey] = dateValue 

223 

224 self.log.info("Using " + str(reference.run)) 

225 

226 outputs = self.run(**inputs) 

227 outputs.output.updateMetadata(setDate=False, **kwargs) 

228 

229 butlerQC.put(outputs, outputRefs) 

230 

231 def run(self, inputPtc, dummy, camera, inputDims): 

232 """Fit the PTC A MATRIX into a vectorized a matrix form 

233 based on a complete electrostatic solution. 

234 

235 Parameters 

236 ---------- 

237 inputPtc : `lsst.ip.isr.PhotonTransferCurveDataset` 

238 PTC data containing per-amplifier covariance measurements. 

239 dummy : `lsst.afw.image.Exposure` 

240 The exposure used to select the appropriate PTC dataset. 

241 In almost all circumstances, one of the input exposures 

242 used to generate the PTC dataset is the best option. 

243 camera : `lsst.afw.cameraGeom.Camera` 

244 Camera to use for camera geometry information. 

245 inputDims : `lsst.daf.butler.DataCoordinate` or `dict` 

246 DataIds to use to populate the output calibration. 

247 

248 Returns 

249 ------- 

250 results : `lsst.pipe.base.Struct` 

251 The resulst struct containing: 

252 

253 ``output`` 

254 Resulting Brighter-Fatter electrostatic solution 

255 (`lsst.ip.isr.ElectrostaticBrighterFatterDistortionMatrix`). 

256 """ 

257 detector = camera[inputDims['detector']] 

258 

259 inputRange = int(inputPtc.covMatrixSideFullCovFit) 

260 fitRange = int(self.config.fitRange) 

261 

262 if not inputPtc.ptcFitType.startswith("FULLCOVARIANCE"): 

263 raise ValueError( 

264 "ptcFitType must be FULLCOVARIANCE* to solve for electrostatic solution." 

265 ) 

266 if fitRange > inputRange: 

267 raise ValueError( 

268 "Cannot compute the electrostatic solution if " 

269 "int(inputPtc.covMatrixSide) < fitRange." 

270 ) 

271 

272 # Initialize the output calibration 

273 electroBfDistortionMatrix = ElectrostaticBrighterFatterDistortionMatrix( 

274 camera=camera, 

275 detectorId=detector.getId(), 

276 inputRange=inputRange, 

277 fitRange=fitRange, 

278 ) 

279 

280 # Inherit data + metadata 

281 electroBfDistortionMatrix.updateMetadataFromExposures([inputPtc]) 

282 

283 badAmps = inputPtc.badAmps 

284 electroBfDistortionMatrix.badAmps = badAmps 

285 electroBfDistortionMatrix.gain = inputPtc.gain 

286 

287 aMatrixDict = inputPtc.aMatrix 

288 aMatrixList = np.array([m for _, m in aMatrixDict.items() if _ not in badAmps]) 

289 

290 nGoodAmps = len(detector.getAmplifiers()) - len(badAmps) 

291 if nGoodAmps == 0: 

292 self.log.warning("The entire detector is bad and cannot generate a " 

293 "detector solution.") 

294 return pipeBase.Struct( 

295 output=electroBfDistortionMatrix, 

296 ) 

297 elif nGoodAmps < 2: 

298 # If the general uncertainty is one, the measurement 

299 # uncertainties along the axes are sqrt(2), and sqrt(8) 

300 # in (0,0) (because the slope of C00 is fitted). 

301 # 

302 # This sets variances at (1, 2, 8) for the three groups. 

303 # Then the number of replicas (when going to 4 quadrants) 

304 # are (4, 2, 1) for the same three groups. 

305 # 

306 # The effective variances are then in the ratios (1/4, 1, 8) 

307 # or (1, 4, 32). 

308 self.log.warning("Not enough good amplifiers in this detector " 

309 "to confidently solve. Setting aMatrixSigma " 

310 "to default.") 

311 aMatrix = np.mean(aMatrixList, axis=0) 

312 aMatrixSigma = np.ones_like(aMatrix, dtype=np.float64) 

313 aMatrixSigma[0, :] = 2.0 

314 aMatrixSigma[:, 0] = 2.0 

315 aMatrixSigma[0, 0] = np.sqrt(32) 

316 

317 else: 

318 # Do a quick sigma-clipped mean 

319 errors = aMatrixList - np.mean(aMatrixList, axis=0) 

320 sigmaErrors = errors / np.std(aMatrixList, axis=0) 

321 badValues = np.argwhere(np.abs(sigmaErrors) > 3) 

322 aMatrixList[badValues] = np.nan 

323 aMatrix = np.nanmean(aMatrixList, axis=0) 

324 aMatrixSigma = np.nanstd(aMatrixList, axis=0) 

325 

326 # Ensure we have numpy arrays in 64-bit float precision 

327 aMatrix = np.asarray(aMatrix, dtype=np.float64) 

328 aMatrixSigma = np.asarray(aMatrixSigma, dtype=np.float64) 

329 

330 # Set initial parameters using config 

331 thickness = np.float64(self.config.initialParametersDict['thickness']) 

332 pixelsize = np.float64(self.config.initialParametersDict['pixelsize']) 

333 zq = np.float64(self.config.initialParametersDict['zq']) 

334 zsh = np.float64(self.config.initialParametersDict['zsh']) 

335 zsv = np.float64(self.config.initialParametersDict['zsv']) 

336 a = np.float64(self.config.initialParametersDict['a']) 

337 b = np.float64(self.config.initialParametersDict['b']) 

338 alpha = np.float64(self.config.initialParametersDict['alpha']) 

339 beta = np.float64(self.config.initialParametersDict['beta']) 

340 

341 initialParams = Parameters() 

342 initialParams.add( 

343 "thickness", 

344 value=thickness, 

345 min=0, 

346 max=1.25*thickness, 

347 vary=self.config.parametersToVary["thickness"], 

348 ) 

349 initialParams.add( 

350 "pixelsize", 

351 value=pixelsize, 

352 min=0.5*np.abs(pixelsize), 

353 max=1.5*np.abs(pixelsize), 

354 vary=self.config.parametersToVary["pixelsize"], 

355 ) 

356 initialParams.add( 

357 "zq", 

358 value=zq, 

359 vary=self.config.parametersToVary["zq"], 

360 min=0.0, 

361 max=0.5*thickness, 

362 ) 

363 # These nuisance parameters ensure that 

364 # (zsh > zq) & (zsv > zq) 

365 initialParams.add( 

366 "zsh_minus_zq", 

367 value=zsh - zq, 

368 vary=self.config.parametersToVary["zsh"], 

369 min=1.0e-12, 

370 max=0.1*thickness, 

371 ) 

372 initialParams.add( 

373 "zsh", 

374 vary=self.config.parametersToVary["zsh"], 

375 min=0.0, 

376 max=0.5*thickness, 

377 expr="zq + zsh_minus_zq" if self.config.parametersToVary["zsh"] else f"{zsh}", 

378 ) 

379 initialParams.add( 

380 "zsv_minus_zq", 

381 value=zsv - zq, 

382 vary=self.config.parametersToVary["zsv"], 

383 min=1.0e-12, 

384 max=0.1*thickness, 

385 ) 

386 initialParams.add( 

387 "zsv", 

388 vary=self.config.parametersToVary["zsv"], 

389 min=0.0, 

390 max=0.5*thickness, 

391 expr="zq + zsv_minus_zq" if self.config.parametersToVary["zsv"] else f"{zsv}", 

392 ) 

393 initialParams.add( 

394 "a", 

395 value=a, 

396 vary=self.config.parametersToVary["a"], 

397 min=1.0e-5, 

398 max=0.35*pixelsize, 

399 ) 

400 initialParams.add( 

401 "b", 

402 value=b, 

403 vary=self.config.parametersToVary["b"], 

404 min=1.0e-5, 

405 max=0.35*pixelsize, 

406 ) 

407 initialParams.add( 

408 "alpha", 

409 value=alpha, 

410 vary=self.config.parametersToVary["alpha"], 

411 min=-10.0, 

412 max=10.0, 

413 ) 

414 initialParams.add( 

415 "beta", 

416 value=beta, 

417 vary=self.config.parametersToVary["beta"], 

418 min=-10.0, 

419 max=10.0, 

420 ) 

421 

422 # Compute the electrostatic fit 

423 electrostaticFit = ElectrostaticFit( 

424 initialParams=initialParams, 

425 fitMethod=self.config.fitMethod, 

426 aMatrix=aMatrix, 

427 aMatrixSigma=aMatrixSigma, 

428 fitRange=fitRange, 

429 doFitNormalizationOffset=self.config.doFitNormalizationOffset, 

430 nImageChargePairs=self.config.nImageChargePairs, 

431 ) 

432 

433 # Do the fit 

434 result = electrostaticFit.fit() 

435 

436 # Check if fit was successful 

437 if not result.success: 

438 raise RuntimeError(f"Fit was not successful: {result.message}") 

439 

440 # Save the fit 

441 finalParams = result.params 

442 finalParamsDict = finalParams.valuesdict() 

443 

444 # No longer need these nusiance variables 

445 if 'zsh_minus_zq' in finalParamsDict: 

446 del finalParamsDict['zsh_minus_zq'] 

447 if 'zsv_minus_zq' in finalParamsDict: 

448 del finalParamsDict['zsv_minus_zq'] 

449 

450 fitParamNames = list(finalParamsDict.keys()) 

451 freeFitParamNames = result.var_names 

452 electroBfDistortionMatrix.fitParamNames = fitParamNames 

453 electroBfDistortionMatrix.freeFitParamNames = freeFitParamNames 

454 electroBfDistortionMatrix.fitParams = finalParamsDict 

455 fitParamErrors = dict() 

456 for fitParamName in fitParamNames: 

457 if fitParamName in freeFitParamNames: 

458 fitParamErrors[fitParamName] = finalParams[fitParamName].stderr 

459 else: 

460 fitParamErrors[fitParamName] = 0.0 

461 electroBfDistortionMatrix.fitParamErrors = fitParamErrors 

462 

463 electroBfDistortionMatrix.fitChi2 = result.chisqr 

464 electroBfDistortionMatrix.fitReducedChi2 = result.redchi 

465 electroBfDistortionMatrix.fitParamCovMatrix = result.covar 

466 

467 # Compute the final model 

468 aMatrixModel = electrostaticFit.model(result.params) 

469 

470 # Optional: 

471 # Perform the final model normalization 

472 modelNormalization = [1.0, 0.0] 

473 if self.config.doNormalizeElectrostaticModel: 

474 m, o = electrostaticFit.normalizeModel(aMatrixModel) 

475 modelNormalization = [m, o] 

476 aMatrixModel = m*aMatrixModel + o 

477 self.log.info( 

478 "Normalization (factor, offset) for amp %s: (%.3f, %.3f)", m, o 

479 ) 

480 

481 # Save the original data and the final model. 

482 electroBfDistortionMatrix.aMatrix = aMatrix 

483 electroBfDistortionMatrix.aMatrixSigma = aMatrixSigma 

484 electroBfDistortionMatrix.aMatrixModel = aMatrixModel 

485 electroBfDistortionMatrix.aMatrixSum = symmetrize(aMatrix).sum() 

486 electroBfDistortionMatrix.aMatrixModelSum = symmetrize(aMatrixModel).sum() 

487 electroBfDistortionMatrix.modelNormalization = modelNormalization 

488 

489 # Fit result information 

490 self.log.info(report_fit(result)) 

491 

492 # TODO: add conversion depth probability distribution 

493 # for each band/wavelength. Compute boundary shifts 

494 # for a single electron 

495 pd = electrostaticFit.computePixelDistortions(conversionWeights=None) 

496 

497 aN, aS, aE, aW = (pd.aN, pd.aS, pd.aE, pd.aW) 

498 ath = pd.ath 

499 athMinusBeta = pd.athMinusBeta 

500 usedPixels = np.zeros_like(aN, dtype=bool) 

501 usedPixels[:fitRange, :fitRange] = True 

502 

503 electroBfDistortionMatrix.usedPixels = usedPixels 

504 electroBfDistortionMatrix.ath = ath 

505 electroBfDistortionMatrix.athMinusBeta = athMinusBeta 

506 electroBfDistortionMatrix.aN = aN 

507 electroBfDistortionMatrix.aS = aS 

508 electroBfDistortionMatrix.aE = aE 

509 electroBfDistortionMatrix.aW = aW 

510 

511 # Optional: Check for validity 

512 # if self.config.doCheckValidity: 

513 # # Todo: 

514 # pass 

515 

516 return pipeBase.Struct( 

517 output=electroBfDistortionMatrix, 

518 )