Coverage for python / lsst / cp / pipe / utils.py: 7%
1181 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:35 +0000
1# This file is part of cp_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21#
23__all__ = [
24 "ddict2dict",
25 "CovFastFourierTransform",
26 "getReadNoise",
27 "ampOffsetGainRatioFixup",
28 "ElectrostaticFit",
29 "BoundaryShifts",
30 "ElectrostaticCcdGeom",
31]
33from astropy.table import Table
34import copy
35import galsim
36import logging
37import numpy as np
38import itertools
39import numpy.polynomial.polynomial as poly
40import warnings
42from scipy.interpolate import Akima1DInterpolator
43from scipy.optimize import leastsq, minimize
44from scipy.stats import median_abs_deviation, norm
45from lmfit import Minimizer
47from lsst.ip.isr import isrMock
48import lsst.afw.cameraGeom
49import lsst.afw.image
50import lsst.afw.math
52from numpy.polynomial.legendre import leggauss
55def sigmaClipCorrection(nSigClip):
56 """Correct measured sigma to account for clipping.
58 If we clip our input data and then measure sigma, then the
59 measured sigma is smaller than the true value because real
60 points beyond the clip threshold have been removed. This is a
61 small (1.5% at nSigClip=3) effect when nSigClip >~ 3, but the
62 default parameters for measure crosstalk use nSigClip=2.0.
63 This causes the measured sigma to be about 15% smaller than
64 real. This formula corrects the issue, for the symmetric case
65 (upper clip threshold equal to lower clip threshold).
67 Parameters
68 ----------
69 nSigClip : `float`
70 Number of sigma the measurement was clipped by.
72 Returns
73 -------
74 scaleFactor : `float`
75 Scale factor to increase the measured sigma by.
76 """
77 varFactor = 1.0 - (2 * nSigClip * norm.pdf(nSigClip)) / (norm.cdf(nSigClip) - norm.cdf(-nSigClip))
78 return 1.0 / np.sqrt(varFactor)
81def calculateWeightedReducedChi2(measured, model, weightsMeasured, nData, nParsModel):
82 """Calculate weighted reduced chi2.
84 Parameters
85 ----------
86 measured : `list`
87 List with measured data.
88 model : `list`
89 List with modeled data.
90 weightsMeasured : `list`
91 List with weights for the measured data.
92 nData : `int`
93 Number of data points.
94 nParsModel : `int`
95 Number of parameters in the model.
97 Returns
98 -------
99 redWeightedChi2 : `float`
100 Reduced weighted chi2.
101 """
102 wRes = (measured - model)*weightsMeasured
103 return ((wRes*wRes).sum())/(nData-nParsModel)
106def makeMockFlats(expTime, gain=1.0, readNoiseElectrons=5, fluxElectrons=1000,
107 randomSeedFlat1=1984, randomSeedFlat2=666, powerLawBfParams=[],
108 expId1=0, expId2=1, ampNames=[]):
109 """Create a pair or mock flats with isrMock.
111 Parameters
112 ----------
113 expTime : `float`
114 Exposure time of the flats.
115 gain : `float`, optional
116 Gain, in e/ADU.
117 readNoiseElectrons : `float`, optional
118 Read noise rms, in electrons.
119 fluxElectrons : `float`, optional
120 Flux of flats, in electrons per second.
121 randomSeedFlat1 : `int`, optional
122 Random seed for the normal distrubutions for the mean signal
123 and noise (flat1).
124 randomSeedFlat2 : `int`, optional
125 Random seed for the normal distrubutions for the mean signal
126 and noise (flat2).
127 powerLawBfParams : `list`, optional
128 Parameters for `galsim.cdmodel.PowerLawCD` to simulate the
129 brightter-fatter effect.
130 expId1 : `int`, optional
131 Exposure ID for first flat.
132 expId2 : `int`, optional
133 Exposure ID for second flat.
134 ampNames : `list` [`str`], optional
135 Names of amplifiers for filling in header information.
137 Returns
138 -------
139 flatExp1 : `lsst.afw.image.exposure.ExposureF`
140 First exposure of flat field pair.
141 flatExp2 : `lsst.afw.image.exposure.ExposureF`
142 Second exposure of flat field pair.
144 Notes
145 -----
146 The parameters of `galsim.cdmodel.PowerLawCD` are `n, r0, t0, rx,
147 tx, r, t, alpha`. For more information about their meaning, see
148 the Galsim documentation
149 https://galsim-developers.github.io/GalSim/_build/html/_modules/galsim/cdmodel.html # noqa: W505
150 and Gruen+15 (1501.02802).
152 Example: galsim.cdmodel.PowerLawCD(8, 1.1e-7, 1.1e-7, 1.0e-8,
153 1.0e-8, 1.0e-9, 1.0e-9, 2.0)
154 """
155 flatFlux = fluxElectrons # e/s
156 flatMean = flatFlux*expTime # e
157 readNoise = readNoiseElectrons # e
159 mockImageConfig = isrMock.IsrMock.ConfigClass()
161 mockImageConfig.flatDrop = 0.99999
162 mockImageConfig.isTrimmed = True
164 flatExp1 = isrMock.FlatMock(config=mockImageConfig).run()
165 flatExp2 = flatExp1.clone()
166 (shapeY, shapeX) = flatExp1.getDimensions()
167 flatWidth = np.sqrt(flatMean)
169 rng1 = np.random.RandomState(randomSeedFlat1)
170 flatData1 = rng1.normal(flatMean, flatWidth, (shapeX, shapeY)) + rng1.normal(0.0, readNoise,
171 (shapeX, shapeY))
172 rng2 = np.random.RandomState(randomSeedFlat2)
173 flatData2 = rng2.normal(flatMean, flatWidth, (shapeX, shapeY)) + rng2.normal(0.0, readNoise,
174 (shapeX, shapeY))
175 # Simulate BF with power law model in galsim
176 if len(powerLawBfParams):
177 if not len(powerLawBfParams) == 8:
178 raise RuntimeError("Wrong number of parameters for `galsim.cdmodel.PowerLawCD`. "
179 f"Expected 8; passed {len(powerLawBfParams)}.")
180 cd = galsim.cdmodel.PowerLawCD(*powerLawBfParams)
181 tempFlatData1 = galsim.Image(flatData1)
182 temp2FlatData1 = cd.applyForward(tempFlatData1)
184 tempFlatData2 = galsim.Image(flatData2)
185 temp2FlatData2 = cd.applyForward(tempFlatData2)
187 flatExp1.image.array[:] = temp2FlatData1.array/gain # ADU
188 flatExp2.image.array[:] = temp2FlatData2.array/gain # ADU
189 else:
190 flatExp1.image.array[:] = flatData1/gain # ADU
191 flatExp2.image.array[:] = flatData2/gain # ADU
193 visitInfoExp1 = lsst.afw.image.VisitInfo(exposureTime=expTime)
194 visitInfoExp2 = lsst.afw.image.VisitInfo(exposureTime=expTime)
196 flatExp1.info.id = expId1
197 flatExp1.getInfo().setVisitInfo(visitInfoExp1)
198 flatExp2.info.id = expId2
199 flatExp2.getInfo().setVisitInfo(visitInfoExp2)
201 # Set ISR metadata.
202 flatExp1.metadata["LSST ISR UNITS"] = "adu"
203 flatExp2.metadata["LSST ISR UNITS"] = "adu"
204 for ampName in ampNames:
205 key = f"LSST ISR OVERSCAN RESIDUAL SERIAL STDEV {ampName}"
206 value = readNoiseElectrons / gain
208 flatExp1.metadata[key] = value
209 flatExp2.metadata[key] = value
211 key = f"LSST ISR OVERSCAN SERIAL MEDIAN {ampName}"
212 flatExp1.metadata[key] = 25000.0 # adu
213 flatExp2.metadata[key] = 25000.0 # adu
215 key = f"LSST ISR AMPOFFSET PEDESTAL {ampName}"
216 value = 0.0
218 flatExp1.metadata[key] = value
219 flatExp2.metadata[key] = value
221 return flatExp1, flatExp2
224def irlsFit(initialParams, dataX, dataY, function, weightsY=None, weightType='Cauchy', scaleResidual=True):
225 """Iteratively reweighted least squares fit.
227 This uses the `lsst.cp.pipe.utils.fitLeastSq`, but applies weights
228 based on the Cauchy distribution by default. Other weight options
229 are implemented. See e.g. Holland and Welsch, 1977,
230 doi:10.1080/03610927708827533
232 Parameters
233 ----------
234 initialParams : `list` [`float`]
235 Starting parameters.
236 dataX : `numpy.array`, (N,)
237 Abscissa data.
238 dataY : `numpy.array`, (N,)
239 Ordinate data.
240 function : callable
241 Function to fit.
242 weightsY : `numpy.array`, (N,)
243 Weights to apply to the data.
244 weightType : `str`, optional
245 Type of weighting to use. One of Cauchy, Anderson, bisquare,
246 box, Welsch, Huber, logistic, or Fair.
247 scaleResidual : `bool`, optional
248 If true, the residual is scaled by the sqrt of the Y values.
250 Returns
251 -------
252 polyFit : `list` [`float`]
253 Final best fit parameters.
254 polyFitErr : `list` [`float`]
255 Final errors on fit parameters.
256 chiSq : `float`
257 Reduced chi squared.
258 weightsY : `list` [`float`]
259 Final weights used for each point.
261 Raises
262 ------
263 RuntimeError :
264 Raised if an unknown weightType string is passed.
265 """
266 if not weightsY:
267 weightsY = np.ones_like(dataX)
269 polyFit, polyFitErr, chiSq = fitLeastSq(initialParams, dataX, dataY, function, weightsY=weightsY)
270 for iteration in range(10):
271 resid = np.abs(dataY - function(polyFit, dataX))
272 if scaleResidual:
273 resid = resid / np.sqrt(dataY)
274 if weightType == 'Cauchy':
275 # Use Cauchy weighting. This is a soft weight.
276 # At [2, 3, 5, 10] sigma, weights are [.59, .39, .19, .05].
277 Z = resid / 2.385
278 weightsY = 1.0 / (1.0 + np.square(Z))
279 elif weightType == 'Anderson':
280 # Anderson+1972 weighting. This is a hard weight.
281 # At [2, 3, 5, 10] sigma, weights are [.67, .35, 0.0, 0.0].
282 Z = resid / (1.339 * np.pi)
283 weightsY = np.where(Z < 1.0, np.sinc(Z), 0.0)
284 elif weightType == 'bisquare':
285 # Beaton and Tukey (1974) biweight. This is a hard weight.
286 # At [2, 3, 5, 10] sigma, weights are [.81, .59, 0.0, 0.0].
287 Z = resid / 4.685
288 weightsY = np.where(Z < 1.0, 1.0 - np.square(Z), 0.0)
289 elif weightType == 'box':
290 # Hinich and Talwar (1975). This is a hard weight.
291 # At [2, 3, 5, 10] sigma, weights are [1.0, 0.0, 0.0, 0.0].
292 weightsY = np.where(resid < 2.795, 1.0, 0.0)
293 elif weightType == 'Welsch':
294 # Dennis and Welsch (1976). This is a hard weight.
295 # At [2, 3, 5, 10] sigma, weights are [.64, .36, .06, 1e-5].
296 Z = resid / 2.985
297 weightsY = np.exp(-1.0 * np.square(Z))
298 elif weightType == 'Huber':
299 # Huber (1964) weighting. This is a soft weight.
300 # At [2, 3, 5, 10] sigma, weights are [.67, .45, .27, .13].
301 Z = resid / 1.345
302 weightsY = np.where(Z < 1.0, 1.0, 1 / Z)
303 elif weightType == 'logistic':
304 # Logistic weighting. This is a soft weight.
305 # At [2, 3, 5, 10] sigma, weights are [.56, .40, .24, .12].
306 Z = resid / 1.205
307 weightsY = np.tanh(Z) / Z
308 elif weightType == 'Fair':
309 # Fair (1974) weighting. This is a soft weight.
310 # At [2, 3, 5, 10] sigma, weights are [.41, .32, .22, .12].
311 Z = resid / 1.4
312 weightsY = (1.0 / (1.0 + (Z)))
313 else:
314 raise RuntimeError(f"Unknown weighting type: {weightType}")
315 polyFit, polyFitErr, chiSq = fitLeastSq(initialParams, dataX, dataY, function, weightsY=weightsY)
317 return polyFit, polyFitErr, chiSq, weightsY
320def fitLeastSq(initialParams, dataX, dataY, function, weightsY=None):
321 """Do a fit and estimate the parameter errors using using
322 scipy.optimize.leastq.
324 optimize.leastsq returns the fractional covariance matrix. To
325 estimate the standard deviation of the fit parameters, multiply
326 the entries of this matrix by the unweighted reduced chi squared
327 and take the square root of the diagonal elements.
329 Parameters
330 ----------
331 initialParams : `list` [`float`]
332 initial values for fit parameters.
333 dataX : `numpy.array`, (N,)
334 Data in the abscissa axis.
335 dataY : `numpy.array`, (N,)
336 Data in the ordinate axis.
337 function : callable object (function)
338 Function to fit the data with.
339 weightsY : `numpy.array`, (N,)
340 Weights of the data in the ordinate axis.
342 Return
343 ------
344 pFitSingleLeastSquares : `list` [`float`]
345 List with fitted parameters.
346 pErrSingleLeastSquares : `list` [`float`]
347 List with errors for fitted parameters.
349 reducedChiSqSingleLeastSquares : `float`
350 Reduced chi squared, unweighted if weightsY is not provided.
351 """
352 if weightsY is None:
353 weightsY = np.ones(len(dataX))
355 def errFunc(p, x, y, weightsY=None):
356 if weightsY is None:
357 weightsY = np.ones(len(x))
358 return (function(p, x) - y)*weightsY
360 pFit, pCov, infoDict, errMessage, success = leastsq(errFunc, initialParams,
361 args=(dataX, dataY, weightsY), full_output=1,
362 epsfcn=0.0001)
364 if (len(dataY) > len(initialParams)) and pCov is not None:
365 reducedChiSq = calculateWeightedReducedChi2(dataY, function(pFit, dataX), weightsY, len(dataY),
366 len(initialParams))
367 pCov *= reducedChiSq
368 else:
369 pCov = np.zeros((len(initialParams), len(initialParams)))
370 pCov[:, :] = np.nan
371 reducedChiSq = np.nan
373 errorVec = []
374 for i in range(len(pFit)):
375 errorVec.append(np.fabs(pCov[i][i])**0.5)
377 pFitSingleLeastSquares = pFit
378 pErrSingleLeastSquares = np.array(errorVec)
380 return pFitSingleLeastSquares, pErrSingleLeastSquares, reducedChiSq
383def fitBootstrap(initialParams, dataX, dataY, function, weightsY=None, confidenceSigma=1.):
384 """Do a fit using least squares and bootstrap to estimate parameter errors.
386 The bootstrap error bars are calculated by fitting 100 random data sets.
388 Parameters
389 ----------
390 initialParams : `list` [`float`]
391 initial values for fit parameters.
392 dataX : `numpy.array`, (N,)
393 Data in the abscissa axis.
394 dataY : `numpy.array`, (N,)
395 Data in the ordinate axis.
396 function : callable object (function)
397 Function to fit the data with.
398 weightsY : `numpy.array`, (N,), optional.
399 Weights of the data in the ordinate axis.
400 confidenceSigma : `float`, optional.
401 Number of sigmas that determine confidence interval for the
402 bootstrap errors.
404 Return
405 ------
406 pFitBootstrap : `list` [`float`]
407 List with fitted parameters.
408 pErrBootstrap : `list` [`float`]
409 List with errors for fitted parameters.
410 reducedChiSqBootstrap : `float`
411 Reduced chi squared, unweighted if weightsY is not provided.
412 """
413 if weightsY is None:
414 weightsY = np.ones(len(dataX))
416 def errFunc(p, x, y, weightsY):
417 if weightsY is None:
418 weightsY = np.ones(len(x))
419 return (function(p, x) - y)*weightsY
421 # Fit first time
422 pFit, _ = leastsq(errFunc, initialParams, args=(dataX, dataY, weightsY), full_output=0)
424 # Get the stdev of the residuals
425 residuals = errFunc(pFit, dataX, dataY, weightsY)
426 # 100 random data sets are generated and fitted
427 pars = []
428 for i in range(100):
429 randomDelta = np.random.normal(0., np.fabs(residuals), len(dataY))
430 randomDataY = dataY + randomDelta
431 randomFit, _ = leastsq(errFunc, initialParams,
432 args=(dataX, randomDataY, weightsY), full_output=0)
433 pars.append(randomFit)
434 pars = np.array(pars)
435 meanPfit = np.mean(pars, 0)
437 # confidence interval for parameter estimates
438 errPfit = confidenceSigma*np.std(pars, 0)
439 pFitBootstrap = meanPfit
440 pErrBootstrap = errPfit
442 reducedChiSq = calculateWeightedReducedChi2(dataY, function(pFitBootstrap, dataX), weightsY, len(dataY),
443 len(initialParams))
444 return pFitBootstrap, pErrBootstrap, reducedChiSq
447def funcPolynomial(pars, x):
448 """Polynomial function definition
449 Parameters
450 ----------
451 params : `list`
452 Polynomial coefficients. Its length determines the polynomial order.
454 x : `numpy.array`, (N,)
455 Abscisa array.
457 Returns
458 -------
459 y : `numpy.array`, (N,)
460 Ordinate array after evaluating polynomial of order
461 len(pars)-1 at `x`.
462 """
463 return poly.polyval(x, [*pars])
466def funcAstier(pars, x):
467 """Single brighter-fatter parameter model for PTC; Equation 16 of
468 Astier+19.
470 Model:
472 C_{00}(mu) = frac{1}{2 a_{00} g**2} * [exp(2 a_{00} mu g ) - 1]
473 + n_{00} / g**2
475 Parameters
476 ----------
477 params : `list`
478 Parameters of the model: a00 (brightter-fatter), gain (e/ADU),
479 and noise (e^2).
480 x : `numpy.array`, (N,)
481 Signal mu (ADU).
483 Returns
484 -------
485 y : `numpy.array`, (N,)
486 C_00 (variance) in ADU^2.
487 """
488 a00, gain, noiseSquared = pars
489 return 0.5/(a00*gain*gain)*(np.exp(2*a00*x*gain)-1) + noiseSquared/(gain*gain) # C_00
492def ptcRolloffModel(params, mu):
493 """Piece-wise exponential saturation roll-off model of the PTC.
495 Parameters
496 ----------
497 params : `list`
498 Parameters of the model: muTurnoff (adu), tau (rolloff sharpness).
499 mu : `numpy.array`, (N,)
500 Signal mu (ADU).
502 Returns
503 -------
504 y : `numpy.array`, (N,)
505 Difference in variance in ADU^2.
506 """
507 muTurnoff, tau = params
508 return np.where(mu < muTurnoff, 0, np.exp(-(mu - muTurnoff) / tau) - 1)
511def funcAstierWithRolloff(pars, x):
512 """Single brighter-fatter parameter model for PTC; Equation 16 of
513 Astier+19 with an piece-wise exponential model for the PTC roll-off
514 of the PTC caused by saturation.
516 The nominal turnoff is calculated beforehand, and we extend the PTC
517 fit to include signal values up to 5% above the nominally computed
518 turnoff.
520 Model:
522 C_{00}(mu) = funcAstier - np.where(
523 x < muTurnoff,
524 0,
525 np.exp(-(x - muTurnoff) / tau) - 1,
526 )
528 Parameters
529 ----------
530 params : `list`
531 Parameters of the model: a00 (brightter-fatter), gain (e/ADU),
532 and noise (e^2), muTurnoff (adu), tau (rolloff sharpness).
533 x : `numpy.array`, (N,)
534 Signal mu (ADU).
536 Returns
537 -------
538 y : `numpy.array`, (N,)
539 C_00 (variance) in ADU^2.
540 """
541 # Initial computation of Astier+19 (Eqn 19).
542 originalModelPars = pars[:-2]
543 ptcRolloffModelPars = pars[-2:]
544 model = funcAstier(originalModelPars, x)
546 return model - ptcRolloffModel(ptcRolloffModelPars, x) # C_00
549def arrangeFlatsByExpTime(exposureList, exposureIdList, log=None):
550 """Arrange exposures by exposure time.
552 Parameters
553 ----------
554 exposureList : `list` [`lsst.pipe.base.connections.DeferredDatasetRef`]
555 Input list of exposure references.
556 exposureIdList : `list` [`int`]
557 List of exposure ids as obtained by dataId[`exposure`].
558 log : `lsst.utils.logging.LsstLogAdapter`, optional
559 Log object.
561 Returns
562 ------
563 flatsAtExpTime : `dict` [`float`,
564 `list`[(`lsst.pipe.base.connections.DeferredDatasetRef`,
565 `int`)]]
566 Dictionary that groups references to flat-field exposures
567 (and their IDs) that have the same exposure time (seconds).
568 """
569 flatsAtExpTime = {}
570 assert len(exposureList) == len(exposureIdList), "Different lengths for exp. list and exp. ID lists"
571 for expRef, expId in zip(exposureList, exposureIdList):
572 expTime = expRef.get(component='visitInfo').exposureTime
573 if not np.isfinite(expTime) and log is not None:
574 log.warning("Exposure %d has non-finite exposure time.", expId)
575 listAtExpTime = flatsAtExpTime.setdefault(expTime, [])
576 listAtExpTime.append((expRef, expId))
578 return flatsAtExpTime
581def arrangeFlatsByExpFlux(exposureList, exposureIdList, fluxKeyword, log=None):
582 """Arrange exposures by exposure flux.
584 Parameters
585 ----------
586 exposureList : `list` [`lsst.pipe.base.connections.DeferredDatasetRef`]
587 Input list of exposure references.
588 exposureIdList : `list` [`int`]
589 List of exposure ids as obtained by dataId[`exposure`].
590 fluxKeyword : `str`
591 Header keyword that contains the flux per exposure.
592 log : `lsst.utils.logging.LsstLogAdapter`, optional
593 Log object.
595 Returns
596 -------
597 flatsAtFlux : `dict` [`float`,
598 `list`[(`lsst.pipe.base.connections.DeferredDatasetRef`,
599 `int`)]]
600 Dictionary that groups references to flat-field exposures
601 (and their IDs) that have the same flux.
602 """
603 flatsAtExpFlux = {}
604 assert len(exposureList) == len(exposureIdList), "Different lengths for exp. list and exp. ID lists"
605 for expRef, expId in zip(exposureList, exposureIdList):
606 # Get flux from header, assuming it is in the metadata.
607 try:
608 expFlux = expRef.get().getMetadata()[fluxKeyword]
609 except KeyError:
610 # If it's missing from the header, continue; it will
611 # be caught and rejected when pairing exposures.
612 expFlux = None
613 if expFlux is None:
614 if log is not None:
615 log.warning("Exposure %d does not have valid header keyword %s.", expId, fluxKeyword)
616 expFlux = np.nan
617 listAtExpFlux = flatsAtExpFlux.setdefault(expFlux, [])
618 listAtExpFlux.append((expRef, expId))
620 return flatsAtExpFlux
623def arrangeFlatsByExpId(exposureList, exposureIdList):
624 """Arrange exposures by exposure ID.
626 There is no guarantee that this will properly group exposures,
627 but allows a sequence of flats that have different
628 illumination (despite having the same exposure time) to be
629 processed.
631 Parameters
632 ----------
633 exposureList : `list`[`lsst.pipe.base.connections.DeferredDatasetRef`]
634 Input list of exposure references.
635 exposureIdList : `list`[`int`]
636 List of exposure ids as obtained by dataId[`exposure`].
638 Returns
639 ------
640 flatsAtExpId : `dict` [`float`,
641 `list`[(`lsst.pipe.base.connections.DeferredDatasetRef`,
642 `int`)]]
643 Dictionary that groups references to flat-field exposures (and their
644 IDs) sequentially by their exposure id.
646 Notes
647 -----
649 This algorithm sorts the input exposure references by their exposure
650 id, and then assigns each pair of exposure references (exp_j, exp_{j+1})
651 to pair k, such that 2*k = j, where j is the python index of one of the
652 exposure references (starting from zero). By checking for the IndexError
653 while appending, we can ensure that there will only ever be fully
654 populated pairs.
655 """
656 flatsAtExpId = {}
657 assert len(exposureList) == len(exposureIdList), "Different lengths for exp. list and exp. ID lists"
658 # Sort exposures by expIds, which are in the second list `exposureIdList`.
659 sortedExposures = sorted(zip(exposureList, exposureIdList), key=lambda pair: pair[1])
661 for jPair, expTuple in enumerate(sortedExposures):
662 if (jPair + 1) % 2:
663 kPair = jPair // 2
664 listAtExpId = flatsAtExpId.setdefault(kPair, [])
665 try:
666 listAtExpId.append(expTuple)
667 listAtExpId.append(sortedExposures[jPair + 1])
668 except IndexError:
669 pass
671 return flatsAtExpId
674def extractCalibDate(calib):
675 """Extract common calibration metadata values that will be written to
676 output header.
678 Parameters
679 ----------
680 calib : `lsst.afw.image.Exposure` or `lsst.ip.isr.IsrCalib`
681 Calibration to pull date information from.
683 Returns
684 -------
685 dateString : `str`
686 Calibration creation date string to add to header.
687 """
688 if hasattr(calib, "getMetadata"):
689 if 'CALIB_CREATION_DATE' in calib.getMetadata():
690 return " ".join((calib.getMetadata().get("CALIB_CREATION_DATE", "Unknown"),
691 calib.getMetadata().get("CALIB_CREATION_TIME", "Unknown")))
692 else:
693 return " ".join((calib.getMetadata().get("CALIB_CREATE_DATE", "Unknown"),
694 calib.getMetadata().get("CALIB_CREATE_TIME", "Unknown")))
695 else:
696 return "Unknown Unknown"
699class CovFastFourierTransform:
700 """A class to compute (via FFT) the nearby pixels correlation function.
702 Implements appendix of Astier+19.
704 Parameters
705 ----------
706 diff : `numpy.array`
707 Image where to calculate the covariances (e.g., the difference
708 image of two flats).
709 w : `numpy.array`
710 Weight image (mask): it should consist of 1's (good pixel) and
711 0's (bad pixels).
712 fftShape : `tuple`
713 2d-tuple with the shape of the FFT
714 maxRangeCov : `int`
715 Maximum range for the covariances.
716 """
718 def __init__(self, diff, w, fftShape, maxRangeCov):
719 # check that the zero padding implied by "fft_shape"
720 # is large enough for the required correlation range
721 assert fftShape[0] > diff.shape[0]+maxRangeCov+1
722 assert fftShape[1] > diff.shape[1]+maxRangeCov+1
723 # for some reason related to numpy.fft.rfftn,
724 # the second dimension should be even, so
725 if fftShape[1]%2 == 1:
726 fftShape = (fftShape[0], fftShape[1]+1)
727 tIm = np.fft.rfft2(diff*w, fftShape)
728 tMask = np.fft.rfft2(w, fftShape)
729 # sum of "squares"
730 self.pCov = np.fft.irfft2(tIm*tIm.conjugate())
731 # sum of values
732 self.pMean = np.fft.irfft2(tIm*tMask.conjugate())
733 # number of w!=0 pixels.
734 self.pCount = np.fft.irfft2(tMask*tMask.conjugate())
736 def cov(self, dx, dy):
737 """Covariance for dx,dy averaged with dx,-dy if both non zero.
739 Implements appendix of Astier+19.
741 Parameters
742 ----------
743 dx : `int`
744 Lag in x
745 dy : `int`
746 Lag in y
748 Returns
749 -------
750 0.5*(cov1+cov2): `float`
751 Covariance at (dx, dy) lag
752 npix1+npix2 : `int`
753 Number of pixels used in covariance calculation.
755 Raises
756 ------
757 ValueError if number of pixels for a given lag is 0.
758 """
759 # compensate rounding errors
760 nPix1 = int(round(self.pCount[dy, dx]))
761 if nPix1 == 0:
762 raise ValueError(f"Could not compute covariance term {dy}, {dx}, as there are no good pixels.")
763 cov1 = self.pCov[dy, dx]/nPix1-self.pMean[dy, dx]*self.pMean[-dy, -dx]/(nPix1*nPix1)
764 if (dx == 0 or dy == 0):
765 return cov1, nPix1
766 nPix2 = int(round(self.pCount[-dy, dx]))
767 if nPix2 == 0:
768 raise ValueError("Could not compute covariance term {dy}, {dx} as there are no good pixels.")
769 cov2 = self.pCov[-dy, dx]/nPix2-self.pMean[-dy, dx]*self.pMean[dy, -dx]/(nPix2*nPix2)
770 return 0.5*(cov1+cov2), nPix1+nPix2
772 def reportCovFastFourierTransform(self, maxRange):
773 """Produce a list of tuples with covariances.
775 Implements appendix of Astier+19.
777 Parameters
778 ----------
779 maxRange : `int`
780 Maximum range of covariances.
782 Returns
783 -------
784 tupleVec : `list`
785 List with covariance tuples.
786 """
787 tupleVec = []
788 # (dy,dx) = (0,0) has to be first
789 for dy in range(maxRange+1):
790 for dx in range(maxRange+1):
791 cov, npix = self.cov(dx, dy)
792 if (dx == 0 and dy == 0):
793 var = cov
794 tupleVec.append((dx, dy, var, cov, npix))
795 return tupleVec
798def getFitDataFromCovariances(i, j, mu, fullCov, fullCovModel, fullCovSqrtWeights, gain=1.0,
799 divideByMu=False, returnMasked=False):
800 """Get measured signal and covariance, cov model, weigths, and mask at
801 covariance lag (i, j).
803 Parameters
804 ----------
805 i : `int`
806 Lag for covariance matrix.
807 j : `int`
808 Lag for covariance matrix.
809 mu : `list`
810 Mean signal values.
811 fullCov : `list` of `numpy.array`
812 Measured covariance matrices at each mean signal level in mu.
813 fullCovSqrtWeights : `list` of `numpy.array`
814 List of square root of measured covariances at each mean
815 signal level in mu.
816 fullCovModel : `list` of `numpy.array`
817 List of modeled covariances at each mean signal level in mu.
818 gain : `float`, optional
819 Gain, in e-/ADU. If other than 1.0 (default), the returned
820 quantities will be in electrons or powers of electrons.
821 divideByMu : `bool`, optional
822 Divide returned covariance, model, and weights by the mean
823 signal mu?
824 returnMasked : `bool`, optional
825 Use mask (based on weights) in returned arrays (mu,
826 covariance, and model)?
828 Returns
829 -------
830 mu : `numpy.array`
831 list of signal values at (i, j).
832 covariance : `numpy.array`
833 Covariance at (i, j) at each mean signal mu value (fullCov[:, i, j]).
834 covarianceModel : `numpy.array`
835 Covariance model at (i, j).
836 weights : `numpy.array`
837 Weights at (i, j).
838 maskFromWeights : `numpy.array`, optional
839 Boolean mask of the covariance at (i,j), where the weights
840 differ from 0.
841 """
842 mu = np.array(mu)
843 fullCov = np.array(fullCov)
844 fullCovModel = np.array(fullCovModel)
845 fullCovSqrtWeights = np.array(fullCovSqrtWeights)
846 covariance = fullCov[:, i, j]*(gain**2)
847 covarianceModel = fullCovModel[:, i, j]*(gain**2)
848 weights = fullCovSqrtWeights[:, i, j]/(gain**2)
850 maskFromWeights = weights != 0
851 if returnMasked:
852 weights = weights[maskFromWeights]
853 covarianceModel = covarianceModel[maskFromWeights]
854 mu = mu[maskFromWeights]
855 covariance = covariance[maskFromWeights]
857 if divideByMu:
858 covariance /= mu
859 covarianceModel /= mu
860 weights *= mu
861 return mu, covariance, covarianceModel, weights, maskFromWeights
864def symmetrize(inputArray):
865 """ Copy array over 4 quadrants prior to convolution.
867 Parameters
868 ----------
869 inputarray : `numpy.array`
870 Input array to symmetrize.
872 Returns
873 -------
874 aSym : `numpy.array`
875 Symmetrized array.
876 """
877 targetShape = list(inputArray.shape)
878 r1, r2 = inputArray.shape[-1], inputArray.shape[-2]
879 targetShape[-1] = 2*r1-1
880 targetShape[-2] = 2*r2-1
881 aSym = np.ndarray(tuple(targetShape))
882 aSym[..., r2-1:, r1-1:] = inputArray
883 aSym[..., r2-1:, r1-1::-1] = inputArray
884 aSym[..., r2-1::-1, r1-1::-1] = inputArray
885 aSym[..., r2-1::-1, r1-1:] = inputArray
887 return aSym
890def ddict2dict(d):
891 """Convert nested default dictionaries to regular dictionaries.
893 This is needed to prevent yaml persistence issues.
895 Parameters
896 ----------
897 d : `defaultdict`
898 A possibly nested set of `defaultdict`.
900 Returns
901 -------
902 dict : `dict`
903 A possibly nested set of `dict`.
904 """
905 for k, v in d.items():
906 if isinstance(v, dict):
907 d[k] = ddict2dict(v)
908 return dict(d)
911class Pol2D:
912 """2D Polynomial Regression.
914 Parameters
915 ----------
916 x : numpy.ndarray
917 Input array for the x-coordinate.
918 y : numpy.ndarray
919 Input array for the y-coordinate.
920 z : numpy.ndarray
921 Input array for the dependent variable.
922 order : int
923 Order of the polynomial.
924 w : numpy.ndarray, optional
925 Weight array for weighted regression. Default is None.
927 Notes
928 -----
929 Ported from by https://gitlab.in2p3.fr/astier/bfptc P. Astier.
931 Example:
932 >>> x = np.array([1, 2, 3])
933 >>> y = np.array([4, 5, 6])
934 >>> z = np.array([7, 8, 9])
935 >>> order = 2
936 >>> poly_reg = Pol2D(x, y, z, order)
937 >>> result = poly_reg.eval(2.5, 5.5)
938 """
939 def __init__(self, x, y, z, order, w=None):
940 """
941 orderx : `int`
942 Effective order in the x-direction.
943 ordery : `int`
944 Effective order in the y-direction.
945 coeff : `numpy.ndarray`
946 Coefficients of the polynomial regression.
947 """
948 self.orderx = min(order, x.shape[0] - 1)
949 self.ordery = min(order, x.shape[1] - 1)
950 G = self.monomials(x.ravel(), y.ravel())
951 if w is None:
952 self.coeff, _, rank, _ = np.linalg.lstsq(G, z.ravel(), rcond=None)
953 else:
954 self.coeff, _, rank, _ = np.linalg.lstsq((w.ravel() * G.T).T, z.ravel() * w.ravel(), rcond=None)
956 def monomials(self, x, y):
957 """
958 Generate the monomials matrix for the given x and y.
960 Parameters
961 ----------
962 x : numpy.ndarray
963 Input array for the x-coordinate.
964 y : numpy.ndarray
965 Input array for the y-coordinate.
967 Returns
968 -------
969 G : numpy.ndarray
970 Monomials matrix.
971 """
972 ncols = (self.orderx + 1) * (self.ordery + 1)
973 G = np.zeros(x.shape + (ncols,))
974 ij = itertools.product(range(self.orderx + 1), range(self.ordery + 1))
975 for k, (i, j) in enumerate(ij):
976 G[..., k] = x**i * y**j
977 return G
979 def eval(self, x, y):
980 """
981 Evaluate the polynomial at the given x and y coordinates.
983 Parameters
984 ----------
985 x : `float`
986 x-coordinate for evaluation.
987 y : `float`
988 y-coordinate for evaluation.
990 Returns
991 -------
992 result : `float`
993 Result of the polynomial evaluation.
994 """
995 G = self.monomials(x, y)
996 return np.dot(G, self.coeff)
999class AstierSplineLinearityFitter:
1000 """Class to fit the Astier spline linearity model.
1002 This is a spline fit with photodiode data based on a model
1003 from Pierre Astier, referenced in June 2023 from
1004 https://me.lsst.eu/astier/bot/7224D/model_nonlin.py
1006 This model fits a spline with (optional) nuisance parameters
1007 to allow for different linearity coefficients with different
1008 photodiode settings. The minimization is a least-squares
1009 fit with the residual of
1010 Sum[(S(mu_i) + mu_i)/(k_j * D_i - O) - 1]**2, where S(mu_i)
1011 is an Akima Spline function of mu_i, the observed flat-pair
1012 mean; D_j is the photo-diode measurement corresponding to
1013 that flat-pair; and k_j is a constant of proportionality
1014 which is over index j as it is allowed to
1015 be different based on different photodiode settings (e.g.
1016 CCOBCURR); and O is a constant offset to allow for light
1017 leaks (and is only fit if fit_offset=True). In
1018 addition, if config.doSplineFitTemperature is True then
1019 the fit will adjust mu such that
1020 mu = mu_input*(1 + alpha*(temperature_scaled))
1021 and temperature_scaled = T - T_ref. Finally, if
1022 config.doSplineFitTemporal is True then the fit will
1023 further adjust mu such that
1024 mu = mu_input*(1 + beta*(mjd_scaled))
1025 and mjd_scaled = mjd - mjd_ref.
1027 The fit has additional constraints to ensure that the spline
1028 goes through the (0, 0) point, as well as a normalization
1029 condition so that the average of the spline over the full
1030 range is 0. The normalization ensures that the spline only
1031 fits deviations from linearity, rather than the linear
1032 function itself which is degenerate with the gain.
1034 Parameters
1035 ----------
1036 nodes : `np.ndarray` (N,)
1037 Array of spline node locations.
1038 grouping_values : `np.ndarray` (M,)
1039 Array of values to group values for different proportionality
1040 constants (e.g. CCOBCURR).
1041 pd : `np.ndarray` (M,)
1042 Array of photodiode measurements.
1043 mu : `np.ndarray` (M,)
1044 Array of flat mean values.
1045 mask : `np.ndarray` (M,), optional
1046 Input mask (True is good point, False is bad point).
1047 log : `logging.logger`, optional
1048 Logger object to use for logging.
1049 fit_offset : `bool`, optional
1050 Fit a constant offset to allow for light leaks?
1051 fit_weights : `bool`, optional
1052 Fit for the weight parameters?
1053 weight_pars_start : `list` [ `float` ]
1054 Iterable of 2 weight parameters for weighed fit. These will
1055 be used as input if the weight parameters are not fit.
1056 fit_temperature : `bool`, optional
1057 Fit for temperature scaling?
1058 temperature_scaled : `np.ndarray` (M,), optional
1059 Input scaled temperature values (T - T_ref).
1060 max_signal_nearly_linear : `float`, optional
1061 Maximum signal that we are confident the input is nearly
1062 linear. This is used both for regularization, and for
1063 fitting the raw slope. Usually set to the ptc turnoff,
1064 above which we allow the spline to significantly deviate
1065 and do not demand the deviation to average to zero.
1066 fit_temporal : `bool`, optional
1067 Fit for temporal scaling?
1068 mjd_scaled : `np.ndarray` (M,), optional
1069 Input scaled mjd values (mjd - mjd_ref).
1070 max_frac_correction : `float`, optional
1071 Maximum fractional correction.
1072 max_correction : `float`, optional
1073 Maximum correction (unscaled).
1074 """
1075 def __init__(
1076 self,
1077 nodes,
1078 grouping_values,
1079 pd,
1080 mu,
1081 mask=None,
1082 log=None,
1083 fit_offset=True,
1084 fit_weights=False,
1085 weight_pars_start=[1.0, 0.0],
1086 fit_temperature=False,
1087 temperature_scaled=None,
1088 max_signal_nearly_linear=None,
1089 fit_temporal=False,
1090 mjd_scaled=None,
1091 max_frac_correction=0.25,
1092 max_correction=np.inf,
1093 ):
1094 self._pd = np.asarray(pd).copy()
1095 self._mu = np.asarray(mu).copy()
1096 self._grouping_values = grouping_values
1097 self.log = log if log else logging.getLogger(__name__)
1098 self._fit_offset = fit_offset
1099 self._fit_weights = fit_weights
1100 self._weight_pars_start = weight_pars_start
1101 self._fit_temperature = fit_temperature
1102 self._fit_temporal = fit_temporal
1103 self._max_frac_correction = max_frac_correction
1104 self._max_correction = max_correction
1106 self._nodes = nodes
1107 if nodes[0] != 0.0:
1108 raise ValueError("First node must be 0.0")
1109 if not np.all(np.diff(nodes) > 0):
1110 raise ValueError("Nodes must be sorted with no repeats.")
1112 # Find the group indices.
1113 u_group_values = np.unique(self._grouping_values)
1114 self.ngroup = len(u_group_values)
1116 self.group_indices = []
1117 for i in range(self.ngroup):
1118 self.group_indices.append(np.where(self._grouping_values == u_group_values[i])[0])
1120 # Weight values. Outliers will be set to 0.
1121 if mask is None:
1122 _mask = np.ones(len(mu), dtype=np.bool_)
1123 else:
1124 _mask = mask
1125 self._w = self.compute_weights(self._weight_pars_start, self._mu, _mask)
1127 if temperature_scaled is None:
1128 temperature_scaled = np.zeros(len(self._mu))
1129 else:
1130 if len(np.atleast_1d(temperature_scaled)) != len(self._mu):
1131 raise ValueError("temperature_scaled must be the same length as input mu.")
1132 self._temperature_scaled = temperature_scaled
1134 if mjd_scaled is None:
1135 mjd_scaled = np.zeros(len(self._mu))
1136 else:
1137 if len(np.atleast_1d(mjd_scaled)) != len(self._mu):
1138 raise ValueError("mjd_scaled must be the same length as input mu.")
1139 self._mjd_scaled = mjd_scaled
1141 # Values to regularize spline fit.
1142 if max_signal_nearly_linear is None:
1143 max_signal_nearly_linear = self._mu[self.mask].max()
1144 self._max_signal_nearly_linear = max_signal_nearly_linear
1145 self._x_regularize = np.linspace(0.0, self._max_signal_nearly_linear, 100)
1147 # Set up the indices for the fit parameters.
1148 self.par_indices = {
1149 "values": np.arange(len(self._nodes)),
1150 "groups": len(self._nodes) + np.arange(self.ngroup),
1151 "offset": np.zeros(0, dtype=np.int64),
1152 "weight_pars": np.zeros(0, dtype=np.int64),
1153 "temperature_coeff": np.zeros(0, dtype=np.int64),
1154 "temporal_coeff": np.zeros(0, dtype=np.int64),
1155 }
1156 if self._fit_offset:
1157 self.par_indices["offset"] = np.arange(1) + (
1158 len(self.par_indices["values"])
1159 + len(self.par_indices["groups"])
1160 )
1161 if self._fit_weights:
1162 self.par_indices["weight_pars"] = np.arange(2) + (
1163 len(self.par_indices["values"])
1164 + len(self.par_indices["groups"])
1165 + len(self.par_indices["offset"])
1166 )
1167 if self._fit_temperature:
1168 self.par_indices["temperature_coeff"] = np.arange(1) + (
1169 len(self.par_indices["values"])
1170 + len(self.par_indices["groups"])
1171 + len(self.par_indices["offset"])
1172 + len(self.par_indices["weight_pars"])
1173 )
1174 if self._fit_temporal:
1175 self.par_indices["temporal_coeff"] = np.arange(1) + (
1176 len(self.par_indices["values"])
1177 + len(self.par_indices["groups"])
1178 + len(self.par_indices["offset"])
1179 + len(self.par_indices["weight_pars"])
1180 + len(self.par_indices["temperature_coeff"])
1181 )
1183 self._npt = (
1184 len(self.par_indices["values"])
1185 + len(self.par_indices["groups"])
1186 + len(self.par_indices["offset"])
1187 + len(self.par_indices["weight_pars"])
1188 + len(self.par_indices["temperature_coeff"])
1189 + len(self.par_indices["temporal_coeff"])
1190 )
1192 @staticmethod
1193 def compute_weights(weight_pars, mu, mask):
1194 w = 1./np.sqrt(weight_pars[0]**2. + weight_pars[1]**2./mu)
1195 w[~mask] = 0.0
1197 return w
1199 def estimate_p0(self, use_all_for_normalization=False):
1200 """Estimate initial fit parameters.
1202 Parameters
1203 ----------
1204 use_all_for_normalization : `bool`, optional
1205 Use all the points (not just below turnoff) for normalization;
1206 this is for compatibility with the old linearizer fits.
1208 Returns
1209 -------
1210 p0 : `np.ndarray`
1211 Parameter array, with spline values (one for each node) followed
1212 by proportionality constants (one for each group); one extra
1213 for the offset O (if fit_offset was set to True); two extra
1214 for the weights (if fit_weights was set to True); one
1215 extra for the temperature coefficient (if fit_temperature was
1216 set to True); and one extra for the temporal coefficient (if
1217 fit_temporal was set to True).
1218 """
1219 p0 = np.zeros(self._npt)
1221 # We adjust this slightly because it increases the stability
1222 # of the initialization of the fit parameters.
1223 max_signal_nearly_linear = self._max_signal_nearly_linear
1224 self._max_signal_nearly_linear *= 0.8
1226 # Do a simple linear fit for each group.
1227 for i, indices in enumerate(self.group_indices):
1228 mask = self.mask[indices]
1229 if mask.sum() == 0:
1230 # There are no points in this group; we can ignore it.
1231 p0[self.par_indices["groups"][i]] = 1.0
1232 continue
1233 mu = self._mu[indices][mask]
1234 pd = self._pd[indices][mask]
1235 to_fit = (mu < self._max_signal_nearly_linear)
1236 # If we have no points now (low PTC turnoff) then
1237 # we just use the first 25% of the points in the group.
1238 if to_fit.sum() == 0:
1239 to_fit = (mu < np.nanpercentile(mu, 25.0))
1240 linfit = np.polyfit(pd[to_fit], mu[to_fit], 1)
1241 p0[self.par_indices["groups"][i]] = linfit[0]
1243 # This re-fit will be skipped for compatibility.
1244 # if use_all_for_normalization is set.
1245 if not use_all_for_normalization:
1246 params, cov_params, _, _, _ = leastsq(
1247 self._group_minfunc,
1248 p0[self.par_indices["groups"]],
1249 full_output=True,
1250 ftol=1e-5,
1251 maxfev=12000,
1252 )
1254 p0[self.par_indices["groups"]] = params
1256 # Look at the residuals...
1257 ratio_model = self.compute_ratio_model(
1258 self._nodes,
1259 self.group_indices,
1260 self.par_indices,
1261 p0,
1262 self._pd,
1263 self._mu,
1264 self._temperature_scaled,
1265 self._mjd_scaled,
1266 )
1267 # ...and adjust the linear parameters accordingly.
1268 if use_all_for_normalization:
1269 ok = self.mask
1270 else:
1271 ok = (self.mask & (self._mu < self._max_signal_nearly_linear))
1272 p0[self.par_indices["groups"]] *= np.median(ratio_model[ok])
1274 # Re-compute the residuals.
1275 ratio_model2 = self.compute_ratio_model(
1276 self._nodes,
1277 self.group_indices,
1278 self.par_indices,
1279 p0,
1280 self._pd,
1281 self._mu,
1282 self._temperature_scaled,
1283 self._mjd_scaled,
1284 )
1286 # And compute a first guess of the spline nodes.
1287 bins = np.clip(np.searchsorted(self._nodes, self._mu[self.mask]), 0, len(self._nodes) - 1)
1288 tot_arr = np.zeros(len(self._nodes))
1289 n_arr = np.zeros(len(self._nodes), dtype=int)
1290 np.add.at(tot_arr, bins, ratio_model2[self.mask])
1291 np.add.at(n_arr, bins, 1)
1293 ratio = np.ones(len(self._nodes))
1294 ratio[n_arr > 0] = tot_arr[n_arr > 0]/n_arr[n_arr > 0]
1295 ratio[0] = 1.0
1296 p0[self.par_indices["values"]] = (ratio - 1) * self._nodes
1298 if self._fit_offset:
1299 p0[self.par_indices["offset"]] = 0.0
1301 if self._fit_weights:
1302 p0[self.par_indices["weight_pars"]] = self._weight_pars_start
1304 # Restore the correct value
1305 self._max_signal_nearly_linear = max_signal_nearly_linear
1307 return p0
1309 @staticmethod
1310 def compute_ratio_model(
1311 nodes,
1312 group_indices,
1313 par_indices,
1314 pars,
1315 pd,
1316 mu,
1317 temperature_scaled,
1318 mjd_scaled,
1319 return_spline=False,
1320 ):
1321 """Compute the ratio model values.
1323 Parameters
1324 ----------
1325 nodes : `np.ndarray` (M,)
1326 Array of node positions.
1327 group_indices : `list` [`np.ndarray`]
1328 List of group indices, one array for each group.
1329 par_indices : `dict`
1330 Dictionary showing which indices in the pars belong to
1331 each set of fit values.
1332 pars : `np.ndarray`
1333 Parameter array, with spline values (one for each node) followed
1334 by proportionality constants (one for each group); one extra
1335 for the offset O (if fit_offset was set to True); two extra
1336 for the weights (if fit_weights was set to True); and one
1337 extra for the temperature coefficient (if fit_temperature was
1338 set to True).
1339 pd : `np.ndarray` (N,)
1340 Array of photodiode measurements.
1341 mu : `np.ndarray` (N,)
1342 Array of flat means.
1343 temperature_scaled : `np.ndarray` (N,)
1344 Array of scaled temperature values.
1345 mjd_scaled : `np.ndarray` (N,)
1346 Array of scaled mjd values.
1347 return_spline : `bool`, optional
1348 Return the spline interpolation as well as the model ratios?
1350 Returns
1351 -------
1352 ratio_models : `np.ndarray` (N,)
1353 Model ratio, (mu_i - S(mu_i) - O)/(k_j * D_i)
1354 spl : `scipy.interpolate.Akima1DInterpolator`
1355 Spline interpolator (returned if return_spline=True).
1356 """
1357 spl = Akima1DInterpolator(nodes, pars[par_indices["values"]], method="akima")
1359 # Check if we want to do just the left or both with temp scale.
1360 if len(par_indices["temperature_coeff"]) == 1:
1361 mu_corr = mu*(1. + pars[par_indices["temperature_coeff"]]*temperature_scaled)
1362 else:
1363 mu_corr = mu
1365 if len(par_indices["temporal_coeff"]) == 1:
1366 mu_corr = mu_corr*(1. + pars[par_indices["temporal_coeff"]]*mjd_scaled)
1368 numerator = mu_corr - spl(np.clip(mu_corr, nodes[0], nodes[-1]))
1369 if len(par_indices["offset"]) == 1:
1370 numerator -= pars[par_indices["offset"][0]]
1371 denominator = pd.copy()
1372 ngroup = len(group_indices)
1373 kj = pars[par_indices["groups"]]
1374 for j in range(ngroup):
1375 denominator[group_indices[j]] *= kj[j]
1377 if return_spline:
1378 return numerator / denominator, spl
1379 else:
1380 return numerator / denominator
1382 def fit(
1383 self,
1384 p0,
1385 min_iter=3,
1386 max_iter=20,
1387 max_rejection_per_iteration=5,
1388 n_sigma_clip=5.0,
1389 n_outer_iter=1,
1390 ):
1391 """
1392 Perform iterative fit for linear + spline model with offsets.
1394 Parameters
1395 ----------
1396 p0 : `np.ndarray`
1397 Initial parameter array, with spline values (one for each node)
1398 followed by proportionality constants (one for each group); one
1399 extra for the offset O (if fit_offset was set to True); two extra
1400 for the weights (if fit_weights was set to True); one
1401 extra for the temperature coefficient (if fit_temperature was
1402 set to True); and one extra for the temporal coefficient (if
1403 fit_temporal was set to True).
1404 min_iter : `int`, optional
1405 Minimum number of fit iterations.
1406 max_iter : `int`, optional
1407 Maximum number of fit iterations.
1408 max_rejection_per_iteration : `int`, optional
1409 Maximum number of points to reject per iteration.
1410 n_sigma_clip : `float`, optional
1411 Number of sigma to do clipping in each iteration.
1412 n_outer_iter : `int`, optional
1413 Number of "outer" iterations, where things are reset.
1414 """
1415 init_params = p0
1416 for j in range(n_outer_iter):
1417 self.log.info("Starting outer iteration %d", j)
1418 for k in range(max_iter):
1419 params, cov_params, _, msg, ierr = leastsq(
1420 self,
1421 init_params,
1422 full_output=True,
1423 ftol=1e-5,
1424 maxfev=12000,
1425 )
1426 init_params = params.copy()
1428 # We need to cut off the constraints at the end (there are more
1429 # residuals than data points.)
1430 res = self(params)[: len(self._w)]
1431 std_res = median_abs_deviation(res[self.good_points], scale="normal")
1432 sample = len(self.good_points)
1434 # We don't want to reject too many outliers at once.
1435 if sample > max_rejection_per_iteration:
1436 sres = np.sort(np.abs(res))
1437 cut = max(sres[-max_rejection_per_iteration], std_res*n_sigma_clip)
1438 else:
1439 cut = std_res*n_sigma_clip
1441 outliers = np.abs(res) > cut
1442 self._w[outliers] = 0
1443 if outliers.sum() != 0:
1444 self.log.info(
1445 "After iteration %d there are %d outliers (of %d).",
1446 k,
1447 outliers.sum(),
1448 sample,
1449 )
1450 elif k >= min_iter:
1451 self.log.info("After iteration %d there are no more outliers.", k)
1452 break
1454 # Reset for next "outer" iteration, if doing them.
1455 if n_outer_iter > 1:
1456 if j == 0:
1457 params_accum = params.copy()
1458 mu_orig = self._mu.copy()
1459 pd_orig = self._pd.copy()
1460 else:
1461 if self._fit_temperature:
1462 a = params_accum[self.par_indices["temperature_coeff"]]
1463 b = params[self.par_indices["temperature_coeff"]]
1464 params_accum[self.par_indices["temperature_coeff"]] = (a + b + a * b)
1466 if self._fit_temporal:
1467 a = params_accum[self.par_indices["temporal_coeff"]]
1468 b = params[self.par_indices["temporal_coeff"]]
1469 params_accum[self.par_indices["temporal_coeff"]] = (a + b + a * b)
1471 slope_mean = np.mean(params[self.par_indices["groups"]])
1472 adjustment = params[self.par_indices["groups"]] / slope_mean
1473 params_accum[self.par_indices["groups"]] *= adjustment
1475 if self._fit_offset:
1476 params_accum[self.par_indices["offset"]] += params[self.par_indices["offset"]]
1478 # The new linearizer values should simply be a replacement
1479 # because these are the better fit parameters we are
1480 # interested in.
1481 params_accum[self.par_indices["values"]] = params[self.par_indices["values"]]
1483 # Adjust the mu and pd values according to the previous fit.
1484 slope_mean = np.mean(params[self.par_indices["groups"]])
1486 if self._fit_temperature:
1487 self._mu *= (1.0
1488 + params[self.par_indices["temperature_coeff"]] * self._temperature_scaled)
1489 if self._fit_temporal:
1490 self._mu *= (1.0
1491 + params[self.par_indices["temporal_coeff"]] * self._mjd_scaled)
1492 for gi, group_index in enumerate(self.group_indices):
1493 self._pd[group_index] *= (params[self.par_indices["groups"][gi]] / slope_mean)
1494 if self._fit_offset:
1495 self._mu -= params[self.par_indices["offset"]]
1497 # Set parameters for next outer fit iteration.
1498 init_params = self.estimate_p0(use_all_for_normalization=True)
1500 if n_outer_iter > 1:
1501 # When doing multiple outer iterations, we want to return the
1502 # "accumulated" parameters, and reset the input values.
1503 params = params_accum
1505 # Reset input values
1506 self._mu = mu_orig
1507 self._pd = pd_orig
1509 return params
1511 @property
1512 def mask(self):
1513 return (self._w > 0)
1515 @property
1516 def good_points(self):
1517 return self.mask.nonzero()[0]
1519 def compute_chisq_dof(self, pars):
1520 """Compute the chi-squared per degree of freedom for a set of pars.
1522 Parameters
1523 ----------
1524 pars : `np.ndarray`
1525 Parameter array.
1527 Returns
1528 -------
1529 chisq_dof : `float`
1530 Chi-squared per degree of freedom.
1531 """
1532 resids = self(pars)[0: len(self.mask)]
1533 chisq = np.sum(resids[self.mask]**2.)
1534 dof = self.mask.sum() - self.ngroup
1535 if self._fit_temporal:
1536 dof -= 1
1537 if self._fit_temperature:
1538 dof -= 1
1539 if self._fit_offset:
1540 dof -= 1
1541 if self._fit_weights:
1542 dof -= 2
1544 return chisq/dof
1546 def __call__(self, pars):
1548 ratio_model, spl = self.compute_ratio_model(
1549 self._nodes,
1550 self.group_indices,
1551 self.par_indices,
1552 pars,
1553 self._pd,
1554 self._mu,
1555 self._temperature_scaled,
1556 self._mjd_scaled,
1557 return_spline=True,
1558 )
1560 _mask = self.mask
1561 # Update the weights if we are fitting them.
1562 if self._fit_weights:
1563 self._w = self.compute_weights(pars[self.par_indices["weight_pars"]], self._mu, _mask)
1564 resid = self._w*(ratio_model - 1.0)
1566 # Ensure masked points have 0 residual.
1567 resid[~_mask] = 0.0
1569 constraint = [1e3 * np.mean(spl(np.clip(self._x_regularize, self._nodes[0], self._nodes[-1])))]
1570 # 0 should transform to 0
1571 constraint.append(spl(0)*1e10)
1572 # Use a Jeffreys prior on the weight if we are fitting it.
1573 if self._fit_weights:
1574 # This factor ensures that log(fact * w) is negative.
1575 fact = 1e-3 / self._w.max()
1576 # We only add non-zero weights to the constraint array.
1577 log_w = np.sqrt(-2.*np.log(fact*self._w[self._w > 0]))
1578 constraint = np.hstack([constraint, log_w])
1580 # Don't let it get to >5% correction.
1581 values = pars[self.par_indices["values"]]
1582 if np.abs(values[-1])/self._nodes[-1] > self._max_frac_correction \
1583 or np.abs(values[-1]) > self._max_correction:
1584 extra_constraint = 1e10
1585 else:
1586 extra_constraint = 0
1588 return np.hstack([resid, constraint, extra_constraint])
1590 def _group_minfunc(self, group_pars):
1591 """Minimization function for initial group parameters.
1593 Parameters
1594 ----------
1595 group_pars : `np.ndarray`
1596 Array of slope parameters.
1597 """
1598 full_pars = np.zeros(self._npt)
1599 full_pars[self.par_indices["groups"]] = group_pars
1601 ratio_model = self.compute_ratio_model(
1602 self._nodes,
1603 self.group_indices,
1604 self.par_indices,
1605 full_pars,
1606 self._pd,
1607 self._mu,
1608 self._temperature_scaled,
1609 self._mjd_scaled,
1610 return_spline=False,
1611 )
1612 wt = self._w.copy()
1613 wt[self._mu > self._max_signal_nearly_linear] = 0.0
1614 return wt*(ratio_model - 1.0)
1617def getReadNoise(exposure, ampName, taskMetadata=None, log=None):
1618 """Gets readout noise for an amp from ISR metadata.
1620 If possible, this attempts to get the now-standard headers
1621 added to the exposure itself. If not found there, the ISR
1622 TaskMetadata is searched. If neither of these has the value,
1623 warn and set the read noise to NaN.
1625 Parameters
1626 ----------
1627 exposure : `lsst.afw.image.Exposure`
1628 Exposure to check for read noise first.
1629 ampName : `str`
1630 Amplifier name.
1631 taskMetadata : `lsst.pipe.base.TaskMetadata`, optional
1632 List of exposures metadata from ISR for this exposure.
1633 log : `logging.logger`, optional
1634 Log for messages.
1636 Returns
1637 -------
1638 readNoise : `float`
1639 The read noise for this set of exposure/amplifier.
1640 """
1641 exposureMetadata = exposure.getMetadata()
1643 # Try from the exposure first.
1644 expectedKey = f"LSST ISR OVERSCAN RESIDUAL SERIAL STDEV {ampName}"
1645 if expectedKey in exposureMetadata:
1646 return exposureMetadata[expectedKey]
1648 # If not, try getting it from the task metadata.
1649 if taskMetadata:
1650 expectedKey = f"RESIDUAL STDEV {ampName}"
1651 if "isr" in taskMetadata:
1652 if expectedKey in taskMetadata["isr"]:
1653 return taskMetadata["isr"][expectedKey]
1655 log = log if log else logging.getLogger(__name__)
1656 log.warning("Median readout noise from ISR metadata for amp %s "
1657 "could not be found.", ampName)
1658 return np.nan
1661def ampOffsetGainRatioFixup(ptc, minAdu, maxAdu, log=None):
1662 """Apply gain ratio fixup using amp offsets.
1664 Parameters
1665 ----------
1666 ptc : `lsst.ip.isr.PhotonTransferCurveDataset`
1667 Input PTC. Will be modified in place.
1668 minAdu : `float`
1669 Minimum number of ADU in mean of amplifier to use for
1670 computing gain ratio fixup.
1671 maxAdu : `float`
1672 Maximum number of ADU in mean of amplifier to use for
1673 computing gain ratio fixup.
1674 log : `lsst.utils.logging.LsstLogAdapter`, optional
1675 Log object.
1676 """
1677 # We need to find the reference amplifier.
1678 # Find an amp near the middle to use as a pivot.
1679 if log is None:
1680 log = logging.getLogger(__name__)
1682 # We first check for which amps have gain measurements
1683 # (fully bad amps are filled with NaN for gain.)
1684 gainArray = np.zeros(len(ptc.ampNames))
1685 for i, ampName in enumerate(ptc.ampNames):
1686 gainArray[i] = ptc.gain[ampName]
1687 good, = np.where(np.isfinite(gainArray))
1689 if len(good) > 1:
1690 # This only works with more than 1 good amp.
1692 # We sort the gains and take the one that is closest
1693 # to the median to use as the reference amplifier for
1694 # gain ratios.
1695 st = np.argsort(gainArray[good])
1696 midAmp = good[st[int(0.5*len(good))]]
1697 midAmpName = ptc.ampNames[midAmp]
1699 log.info("Using amplifier %s as the reference for doLinearityGainRatioFixup.", midAmpName)
1701 # First pass, we need to compute the corrections.
1702 corrections = {}
1703 for ampName in ptc.ampNames:
1704 if not np.isfinite(ptc.gain[ampName]) or ampName == midAmpName:
1705 continue
1707 ratioPtc = ptc.gain[ampName] / ptc.gain[midAmpName]
1709 deltas = ptc.ampOffsets[ampName] - ptc.ampOffsets[midAmpName]
1710 use = (
1711 (ptc.expIdMask[ampName])
1712 & (np.isfinite(deltas))
1713 & (ptc.finalMeans[ampName] >= minAdu)
1714 & (ptc.finalMeans[ampName] <= maxAdu)
1715 & (np.isfinite(ptc.finalMeans[midAmpName]))
1716 & (ptc.expIdMask[midAmpName])
1717 )
1718 if use.sum() < 3:
1719 log.warning("Not enough good amp offset measurements to fix up amp %s "
1720 "gains from amp ratios.", ampName)
1721 continue
1723 ratios = 1. / (deltas / ptc.finalMeans[midAmpName] + 1.0)
1724 ratio = np.median(ratios[use])
1725 corrections[ampName] = ratio / ratioPtc
1727 # For the final correction, we need to make sure that the
1728 # reference amplifier is included. By definition, it has a
1729 # correction factor of 1.0 before any final fix.
1730 corrections[midAmpName] = 1.0
1732 # Adjust the median correction to be 1.0 so we do not
1733 # change the gain of the detector on average.
1734 # This is needed in case the reference amplifier is
1735 # skewed in terms of offsets even though it has the median
1736 # gain.
1737 medCorrection = np.median([corrections[key] for key in corrections])
1739 for ampName in ptc.ampNames:
1740 if ampName not in corrections:
1741 continue
1743 correction = corrections[ampName] / medCorrection
1744 newGain = ptc.gain[ampName] * correction
1745 log.info(
1746 "Adjusting gain from amplifier %s by factor of %.5f (from %.5f to %.5f)",
1747 ampName,
1748 correction,
1749 ptc.gain[ampName],
1750 newGain,
1751 )
1752 # Copying the value should not be necessary, but we record
1753 # it just in case.
1754 ptc.gainUnadjusted[ampName] = ptc.gain[ampName]
1755 ptc.gain[ampName] = newGain
1756 else:
1757 log.warning("Cannot apply ampOffsetGainRatioFixup with fewer than 2 good amplifiers.")
1760class FlatGradientFitter:
1761 """Class to fit various large-scale flat-field gradients.
1763 This fitter will take arrays of x/y/value and (by default) fit a radial
1764 gradient, using a spline function at the specified nodes. The fitter
1765 will also fit out a nuisance parameter for the ratio of the ITL/E2V
1766 throughput (though in general this could work for any such focal plane).
1767 The focal plane origin is set by fp_centroid_x and fp_centroid_y which
1768 is the center of the radial gradient, though it may be modified if
1769 fit_centroid is True. In addition, the fitter may fit a linear gradient
1770 in x/y if fit_gradient is True. The "pivot" of the gradient is at
1771 fp_centroid_x/fp_centroid_y.
1773 Parameters
1774 ----------
1775 nodes : `np.ndarray`
1776 Array of spline nodes for radial spline.
1777 x : `np.ndarray`
1778 Array of x values for points to fit (mm).
1779 y : `np.ndarray`
1780 Array of y values for points to fit (mm).
1781 value : `np.ndarray`
1782 Array of values describing the flat field to fit the gradient.
1783 itl_indices : `np.ndarray`
1784 Array of indices corresponding to the ITL detectors.
1785 constrain_zero : `bool`, optional
1786 Constrain the outermost radial spline value to 0?
1787 fit_centroid : `bool`, optional
1788 Fit an additional centroid offset?
1789 fit_gradient : `bool`, optional
1790 Fit an additional plane gradient?
1791 fp_centroid_x : `float`, optional
1792 Focal plane centroid x (mm).
1793 fp_centroid_y : `float`, optional
1794 Focal plane centroid y (mm).
1795 """
1796 def __init__(
1797 self,
1798 nodes,
1799 x,
1800 y,
1801 value,
1802 itl_indices,
1803 constrain_zero=True,
1804 fit_centroid=False,
1805 fit_gradient=False,
1806 fp_centroid_x=0.0,
1807 fp_centroid_y=0.0
1808 ):
1809 self._nodes = nodes
1810 self._x = x
1811 self._y = y
1812 self._value = value
1813 self._itl_indices = itl_indices
1814 self._fp_centroid_x = fp_centroid_x
1815 self._fp_centroid_y = fp_centroid_y
1817 self._constrain_zero = constrain_zero
1819 self._fit_centroid = fit_centroid
1820 self._fit_gradient = fit_gradient
1822 self.indices = {"spline": np.arange(len(nodes))}
1823 npar = len(nodes)
1825 self._fit_itl_ratio = False
1826 if len(itl_indices) > 0:
1827 self._fit_itl_ratio = True
1828 self.indices["itl_ratio"] = npar
1829 npar += 1
1831 radius = np.sqrt((self._x - self._fp_centroid_x)**2. + (self._y - self._fp_centroid_y)**2.)
1833 if fit_centroid:
1834 self.indices["centroid_delta"] = np.arange(2) + npar
1835 npar += 2
1836 else:
1837 self._radius = radius
1839 if fit_gradient:
1840 self.indices["gradient"] = np.arange(2) + npar
1841 npar += 2
1843 self._npar = npar
1845 self._bounds = [(None, None)]*npar
1847 if self._constrain_zero:
1848 self._bounds[self.indices["spline"][-1]] = (0.0, 0.0)
1850 def compute_p0(self, itl_ratio=None):
1851 """Compute initial guess for fit parameters.
1853 Returns
1854 -------
1855 pars : `np.ndarray`
1856 Array of first guess fit parameters.
1857 """
1858 pars = np.zeros(self._npar)
1859 value = self._value.copy()
1861 if itl_ratio is not None and self._fit_itl_ratio:
1862 value[self._itl_indices] /= itl_ratio
1864 # Initial spline values
1865 radius = np.sqrt((self._x - self._fp_centroid_x)**2. + (self._y - self._fp_centroid_y)**2.)
1866 for i, index in enumerate(self.indices["spline"]):
1867 if i == 0:
1868 low = self._nodes[i]
1869 else:
1870 low = (self._nodes[i - 1] + self._nodes[i])/2.
1871 if i == (len(self._nodes) - 1):
1872 high = self._nodes[i]
1873 else:
1874 high = (self._nodes[i] + self._nodes[i + 1])/2.
1875 u = ((radius > low) & (radius < high))
1876 if u.sum() == 0:
1877 pars[index] = 0.0
1878 else:
1879 pars[index] = np.median(value[u])
1881 if self._constrain_zero:
1882 pars[self.indices["spline"][-1]] = 0.0
1884 spl = Akima1DInterpolator(self._nodes, pars[self.indices["spline"]], method="akima")
1885 model = spl(radius)
1886 resid_ratio = value / model
1888 if self._fit_itl_ratio:
1889 if itl_ratio is not None:
1890 pars[self.indices["itl_ratio"]] = itl_ratio
1891 else:
1892 e2v_indices = np.delete(np.arange(len(self._value)), self._itl_indices)
1894 itl_inner = radius[self._itl_indices] < 0.8*np.max(radius)
1895 e2v_inner = radius[e2v_indices] < 0.8*np.max(radius)
1897 itl_median = np.nanmedian(resid_ratio[self._itl_indices][itl_inner])
1898 e2v_median = np.nanmedian(resid_ratio[e2v_indices][e2v_inner])
1900 pars[self.indices["itl_ratio"]] = itl_median / e2v_median
1902 if self._fit_centroid:
1903 pars[self.indices["centroid_delta"]] = [0.0, 0.0]
1905 if self._fit_gradient:
1906 resid_ratio = self._value / model
1908 ok = (np.isfinite(resid_ratio) & (model > 0.5))
1910 fit = np.polyfit(self._x[ok], resid_ratio[ok], 1)
1911 pars[self.indices["gradient"][0]] = fit[0]
1913 fit = np.polyfit(self._y[ok], resid_ratio[ok], 1)
1914 pars[self.indices["gradient"][1]] = fit[0]
1916 return pars
1918 def fit(self, p0, freeze_itl_ratio=False, fit_eps=1e-8, fit_gtol=1e-10):
1919 """Do a non-linear minimization to fit the parameters.
1921 Parameters
1922 ----------
1923 p0 : `np.ndarray`
1924 Array of initial parameter estimates.
1925 freeze_itl_ratio : `bool`, optional
1926 Freeze the ITL ratio in the fit?
1927 fit_eps : `float`, optional
1928 Value of ``eps`` to send to the scipy minimizer.
1929 fit_gtol : `float`, optional
1930 Value of ``gtol`` to send to the scipy minimizer.
1932 Returns
1933 -------
1934 pars : `np.ndarray`
1935 Array of parameters. Use ``fitter.indices`` for the
1936 dictionary to map parameters to subsets.
1937 """
1938 bounds = self._bounds
1939 if freeze_itl_ratio and self._fit_itl_ratio:
1940 ind = self.indices["itl_ratio"]
1941 par = p0[ind]
1942 bounds[ind] = (par, par)
1944 res = minimize(
1945 self,
1946 p0,
1947 method="L-BFGS-B",
1948 jac=False,
1949 bounds=bounds,
1950 options={
1951 "maxfun": 10000,
1952 "maxiter": 10000,
1953 "maxcor": 20,
1954 "eps": fit_eps,
1955 "gtol": fit_gtol,
1956 },
1957 callback=None,
1958 )
1959 pars = res.x
1961 return pars
1963 def compute_model(self, pars):
1964 """Compute the model given a set of parameters.
1966 Parameters
1967 ----------
1968 pars : `np.ndarray`
1969 Parameter array to compute model.
1971 Returns
1972 -------
1973 model_array : `np.ndarray`
1974 Array of model parameters at the input x/y.
1975 """
1976 spl = Akima1DInterpolator(self._nodes, pars[self.indices["spline"]], method="akima")
1977 if self._fit_centroid:
1978 centroid_delta = pars[self.indices["centroid_delta"]]
1979 centroid_x = self._fp_centroid_x + centroid_delta[0]
1980 centroid_y = self._fp_centroid_y + centroid_delta[1]
1981 radius = np.sqrt((self._x - centroid_x)**2. + (self._y - centroid_y)**2.)
1982 else:
1983 radius = self._radius
1985 model = spl(np.clip(radius, self._nodes[0], self._nodes[-1]))
1986 if self._fit_itl_ratio:
1987 model[self._itl_indices] *= pars[self.indices["itl_ratio"]]
1989 if self._fit_gradient:
1990 a, b = pars[self.indices["gradient"]]
1991 gradient = 1 + a*(self._x - self._fp_centroid_x) + b*(self._y - self._fp_centroid_y)
1992 model /= gradient
1994 return model
1996 def __call__(self, pars):
1997 """Compute the cost function for a set of parameters.
1999 Parameters
2000 ----------
2001 pars : `np.ndarray`
2002 Parameter array to compute model.
2004 Returns
2005 -------
2006 cost : `float`
2007 Cost value computed from the absolute deviation.
2008 """
2010 model = self.compute_model(pars)
2012 absdev = np.abs(self._value - model)
2013 t = np.sum(absdev.astype(np.float64))
2015 return t
2018class FlatGainRatioFitter:
2019 """Class to fit amplifier gain ratios, removing a simple gradient.
2021 This fitter will take arrays of x/y/amp_num/value and fit amplifier
2022 gain ratios, using one amplifier as the fixed point. The fitter
2023 uses a low-order chebyshev polynomial to fit out the gradient, with
2024 amp ratios on top of this.
2026 Parameters
2027 ----------
2028 bbox : `lsst.geom.Box2I`
2029 Bounding box for Chebyshev polynomial gradient.
2030 order : `int`
2031 Order of Chebyshev polynomial.
2032 x : `np.ndarray`
2033 Array of x values for points to fit (detector pixels).
2034 y : `np.ndarray`
2035 Array of y values for points to fit (detector pixels).
2036 amp_index : `np.ndarray`
2037 Array of amp numbers associated with each x/y pair.
2038 value : `np.ndarray`
2039 Flat value at each x/y pair.
2040 amps : `np.ndarray`
2041 Array of unique amplifier numbers that will be parameterized.
2042 Any of these amps that does not have any data with the same
2043 amp_index will be set to 1.0.
2044 fixed_amp_index : `int`
2045 Amplifier number to keep fixed.
2046 """
2047 def __init__(self, bbox, order, x, y, amp_index, value, amps, fixed_amp_index):
2048 self._bbox = bbox
2049 self._order = order
2050 self._x = x.astype(np.float64)
2051 self._y = y.astype(np.float64)
2052 self._amp_index = amp_index
2053 self._value = value.astype(np.float64)
2054 self._fixed_amp_index = fixed_amp_index
2056 self.indices = {"chebyshev": np.arange((order + 1) * (order + 1))}
2057 npar = len(self.indices["chebyshev"])
2059 self._amps = amps
2060 self._n_amp = len(self._amps)
2062 self.indices["amp_pars"] = np.arange(self._n_amp) + npar
2063 npar += self._n_amp
2065 self._npar = npar
2067 self._amp_indices = {}
2068 for i in range(self._n_amp):
2069 amp_index = self._amps[i]
2070 self._amp_indices[amp_index] = (self._amp_index == amp_index)
2072 def fit(self, n_iter=10, nsig_clip=5.0):
2073 """Fit the amp ratio parameters.
2075 This uses an iterative fit, where it fits a Chebyshev gradient,
2076 computes amp ratios, and re-fits the gradient.
2078 Parameters
2079 ----------
2080 n_iter : `int`, optional
2081 Number of iterations for fit.
2082 nsig_clip : `float`, optional
2083 Number of sigma in gain correction distribution to clip when
2084 fitting out the Chebyshev gradient.
2086 Returns
2087 -------
2088 pars : `np.ndarray`
2089 Chebyshev parameters and amp offset parameters.
2090 """
2091 value = self._value.copy()
2093 control = lsst.afw.math.ChebyshevBoundedFieldControl()
2094 control.orderX = self._order
2095 control.orderY = self._order
2096 control.triangular = False
2098 pars = np.zeros(self._npar)
2100 pars[self.indices["amp_pars"]] = 1.0
2102 field_use = np.ones(len(value), dtype=np.bool_)
2104 for i in range(n_iter):
2105 field = lsst.afw.math.ChebyshevBoundedField.fit(
2106 self._bbox,
2107 self._x[field_use],
2108 self._y[field_use],
2109 value[field_use],
2110 control,
2111 )
2113 pars[self.indices["chebyshev"][:]] = field.getCoefficients().ravel()
2115 ratio = self._value / field.evaluate(self._x, self._y)
2117 fixed_med = np.median(ratio[self._amp_indices[self._fixed_amp_index]])
2118 ratio /= fixed_med
2120 pars[self.indices["amp_pars"][self._fixed_amp_index]] = 1.0
2122 value = self._value.copy()
2124 for j in range(self._n_amp):
2125 amp_index = self._amps[j]
2127 if np.sum(self._amp_indices[amp_index]) == 0:
2128 continue
2130 pars[self.indices["amp_pars"][j]] = np.median(ratio[self._amp_indices[amp_index]])
2132 amp_pars = pars[self.indices["amp_pars"]]
2133 med = np.median(amp_pars)
2134 sig = median_abs_deviation(amp_pars, scale="normal")
2135 amp_gradient_bad, = np.where(np.abs(amp_pars - med) > nsig_clip * sig)
2136 field_use[:] = True
2137 for agb in amp_gradient_bad:
2138 field_use[self._amp_indices[agb]] = False
2140 return pars
2142 def compute_model(self, pars):
2143 """Compute the gradient/amp ratio model for a given set of parameters.
2145 Parameters
2146 ----------
2147 pars : `np.ndarray`
2148 Chebyshev parameters and amp offset parameters.
2150 Returns
2151 -------
2152 model : `np.ndarray`
2153 The model at each x/y pair.
2154 """
2155 field = lsst.afw.math.ChebyshevBoundedField(
2156 self._bbox,
2157 pars[self.indices["chebyshev"]].reshape(self._order + 1, self._order + 1),
2158 )
2159 model = field.evaluate(self._x, self._y)
2161 for i in range(self._n_amp):
2162 amp_index = self._amps[i]
2164 model[self._amp_indices[amp_index]] *= pars[self.indices["amp_pars"][i]]
2166 return model
2168 def __call__(self, pars):
2169 """Compute the cost function for a set of parameters.
2171 Parameters
2172 ----------
2173 pars : `np.ndarray`
2174 Chebyshev parameters and amp offset parameters.
2176 Returns
2177 -------
2178 cost : `float`
2179 Cost value computed from the absolute deviation.
2180 """
2181 model = self.compute_model(pars)
2183 absdev = np.abs(self._value - model)
2184 t = np.sum(absdev.astype(np.float64))
2186 return t
2189def bin_focal_plane(
2190 exposure_handle_dict,
2191 detector_boundary,
2192 bin_factor,
2193 defect_handle_dict={},
2194 include_itl_flag=True,
2195):
2196 """Bin all the detectors into the full focal plane.
2198 This function reads in images; takes a simple average if there
2199 are more than one input per detector; excludes detector edges;
2200 and bins according to the bin factor. The output is a struct
2201 with focal plane coordinates, detector numbers, and a flag
2202 if the detector is an ITL detector.
2204 Parameters
2205 ----------
2206 exposure_handle_dict : `dict`
2207 Dict keyed by detector (`int`), each element is a list
2208 of `lsst.daf.butler.DeferredDatasetHandle` that will be averaged.
2209 detector_boundary : `int`
2210 Boundary of the detector to ignore (pixels).
2211 bin_factor : `int`
2212 Binning factor. Detectors will be cropped (after applying the
2213 ``detector_boundary``) such that there are no partially
2214 covered binned pixels.
2215 defect_handle_dict : `dict`, optional
2216 Dict keyed by detector (`int`), each element is a
2217 `lsst.daf.butler.DeferredDatasetHandle` for defects to be applied.
2218 include_itl_flag : `bool`, optional
2219 Include a flag for which detectors are ITL?
2221 Returns
2222 -------
2223 binned : `astropy.table.Table`
2224 Table with focal plane positions at the center of each bin
2225 (``xf``, ``yf``); average image values (``value``); and detector
2226 number (``detector``).
2227 """
2228 xf_arrays = []
2229 yf_arrays = []
2230 value_arrays = []
2231 detector_arrays = []
2232 itl_arrays = []
2234 for det in exposure_handle_dict.keys():
2235 flat = exposure_handle_dict[det].get()
2236 defect_handle = defect_handle_dict.get(det, None)
2237 if defect_handle is not None:
2238 defects = defect_handle.get()
2239 else:
2240 defects = None
2242 detector = flat.getDetector()
2244 # Mask out defects if we have them.
2245 if defects is not None:
2246 for defect in defects:
2247 flat.image[defect.getBBox()].array[:, :] = np.nan
2249 # Mask NO_DATA pixels if we have them.
2250 no_data = ((flat.mask.array[:, :] & flat.mask.getPlaneBitMask("NO_DATA")) > 0)
2251 flat.image.array[no_data] = np.nan
2253 # Bin the image, avoiding the boundary and the masked pixels.
2254 # We also make sure we are using an integral number of
2255 # steps to avoid partially covered binned pixels.
2257 arr = flat.image.array
2259 n_step_y = (arr.shape[0] - (2 * detector_boundary)) // bin_factor
2260 y_min = detector_boundary
2261 y_max = bin_factor * n_step_y + y_min
2262 n_step_x = (arr.shape[1] - (2 * detector_boundary)) // bin_factor
2263 x_min = detector_boundary
2264 x_max = bin_factor * n_step_x + x_min
2266 arr = arr[y_min: y_max, x_min: x_max]
2267 binned = arr.reshape((n_step_y, bin_factor, n_step_x, bin_factor))
2268 with warnings.catch_warnings():
2269 warnings.filterwarnings("ignore", r"Mean of empty")
2270 binned = np.nanmean(binned, axis=1)
2271 binned = np.nanmean(binned, axis=2)
2273 xx = np.arange(binned.shape[1]) * bin_factor + bin_factor / 2. + x_min
2274 yy = np.arange(binned.shape[0]) * bin_factor + bin_factor / 2. + y_min
2275 x, y = np.meshgrid(xx, yy)
2276 x = x.ravel()
2277 y = y.ravel()
2278 value = binned.ravel()
2280 # Transform to focal plane coordinates.
2281 transform = detector.getTransform(lsst.afw.cameraGeom.PIXELS, lsst.afw.cameraGeom.FOCAL_PLANE)
2282 xy = np.vstack((x, y))
2283 xf, yf = np.vsplit(transform.getMapping().applyForward(xy), 2)
2284 xf = xf.ravel()
2285 yf = yf.ravel()
2287 if include_itl_flag:
2288 is_itl = np.zeros(len(value), dtype=np.bool_)
2289 # We use this check so that ITL matches ITL science detectors,
2290 # ITL_WF wavefront detectors, and pseudoITL test detectors.
2291 is_itl[:] = ("ITL" in detector.getPhysicalType())
2293 xf_arrays.append(xf)
2294 yf_arrays.append(yf)
2295 value_arrays.append(value)
2296 detector_arrays.append(np.full_like(xf, det, dtype=np.int32))
2297 if include_itl_flag:
2298 itl_arrays.append(is_itl)
2300 xf = np.concatenate(xf_arrays)
2301 yf = np.concatenate(yf_arrays)
2302 value = np.concatenate(value_arrays)
2303 detector = np.concatenate(detector_arrays)
2305 binned = Table(
2306 np.zeros(
2307 len(xf),
2308 dtype=[
2309 ("xf", "f8"),
2310 ("yf", "f8"),
2311 ("value", "f8"),
2312 ("detector", "i4"),
2313 ],
2314 )
2315 )
2316 binned["xf"] = xf
2317 binned["yf"] = yf
2318 binned["value"] = value
2319 binned["detector"] = detector
2321 if include_itl_flag:
2322 binned["itl"] = np.concatenate(itl_arrays).astype(np.bool_)
2324 return binned
2327def bin_flat(ptc, exposure, bin_factor=8, amp_boundary=20, apply_gains=True, gain_ratios=None):
2328 """Bin a flat image, being careful with amplifier edges.
2330 This will optionally apply gains, and apply any gain
2331 ratios.
2333 Parameters
2334 ----------
2335 ptc : `lsst.ip.isr.PhotonTransferCurveDatasets`
2336 PTC dataset with relevant gains.
2337 exposure : `lsst.afw.image.Exposure`
2338 Exposure to bin.
2339 bin_factor : `int`, optional
2340 Binning factor.
2341 amp_boundary : `int`, optional
2342 Boundary around each amp to ignore in binning.
2343 apply_gains : `bool`, optional
2344 Apply gains before binning?
2345 gain_ratios : `np.ndarray`, optional
2346 Array of gain ratios to apply.
2348 Returns
2349 -------
2350 binned : `astropy.table.Table`
2351 Table with detector coordinates at the center of each bin
2352 (``xd``, ``yd``); average image values (``value``); and amplifier
2353 index (``amp_index``).
2354 """
2355 detector = exposure.getDetector()
2357 for i, amp_name in enumerate(ptc.ampNames):
2358 bbox = detector[amp_name].getBBox()
2359 if amp_name in ptc.badAmps:
2360 exposure[bbox].image.array[:, :] = np.nan
2361 continue
2363 if apply_gains:
2364 exposure[bbox].image.array[:, :] *= ptc.gainUnadjusted[amp_name]
2365 if gain_ratios is not None:
2366 exposure[bbox].image.array[:, :] /= gain_ratios[i]
2368 # Next we bin the detector, avoiding amp edges.
2369 xd_arrays = []
2370 yd_arrays = []
2371 value_arrays = []
2372 amp_arrays = []
2374 for i, amp in enumerate(detector):
2375 arr = exposure[amp.getBBox()].image.array
2377 n_step_y = (arr.shape[0] - (2 * amp_boundary)) // bin_factor
2378 y_min = amp_boundary
2379 y_max = bin_factor * n_step_y + y_min
2380 n_step_x = (arr.shape[1] - (2 * amp_boundary)) // bin_factor
2381 x_min = amp_boundary
2382 x_max = bin_factor * n_step_x + x_min
2384 arr = arr[y_min: y_max, x_min: x_max]
2385 binned = arr.reshape((n_step_y, bin_factor, n_step_x, bin_factor))
2386 with warnings.catch_warnings():
2387 warnings.filterwarnings("ignore", r"Mean of empty")
2388 binned = np.nanmean(binned, axis=1)
2389 binned = np.nanmean(binned, axis=2)
2391 xx = np.arange(binned.shape[1]) * bin_factor + bin_factor / 2. + x_min
2392 yy = np.arange(binned.shape[0]) * bin_factor + bin_factor / 2. + y_min
2393 x, y = np.meshgrid(xx, yy)
2394 x = x.ravel() + amp.getBBox().getBeginX()
2395 y = y.ravel() + amp.getBBox().getBeginY()
2396 value = binned.ravel()
2398 xd_arrays.append(x)
2399 yd_arrays.append(y)
2400 value_arrays.append(value)
2401 amp_arrays.append(np.full(len(x), i))
2403 xd = np.concatenate(xd_arrays)
2404 yd = np.concatenate(yd_arrays)
2405 value = np.concatenate(value_arrays)
2406 amp_index = np.concatenate(amp_arrays)
2408 binned = np.zeros(
2409 len(xd),
2410 dtype=[
2411 ("xd", "f8"),
2412 ("yd", "f8"),
2413 ("value", "f8"),
2414 ("amp_index", "i4"),
2415 ],
2416 )
2417 binned["xd"] = xd
2418 binned["yd"] = yd
2419 binned["value"] = value
2420 binned["amp_index"] = amp_index
2422 return Table(data=binned)
2425class ElectrostaticFit():
2426 """
2427 Class to handle the electrostatic fit of area coefficients.
2429 This class manages the fitting of electrostatic model parameters to
2430 measured area change coefficients. The actual electrostatic calculations
2431 are performed by the ElectrostaticCcdGeom class. The fit is performed
2432 using the lmfit Minimizer, and the model includes normalization
2433 parameters (alpha, beta) as well as physical CCD parameters.
2435 Attributes
2436 ----------
2437 fitRange : int
2438 Range of pixels to fit.
2439 inputRange : int
2440 Range of input data.
2441 doFitNormalizationOffset : bool
2442 Whether to fit an offset parameter.
2443 nImageChargePairs : int
2444 Number of image charge pairs used in the electrostatic calculation.
2445 fitMethod : str
2446 Minimization method used by lmfit.
2447 aMatrix : np.ndarray
2448 Measured area change matrix.
2449 aMatrixSigma : np.ndarray
2450 Uncertainty matrix for area changes.
2451 sqrtWeights : np.ndarray
2452 Weights for fitting, inverse of aMatrixSigma.
2453 params : lmfit.Parameters
2454 Fit parameters.
2455 conversionWeights : tuple, (np.ndarray, np.ndarray), optional
2456 Tuple containing depths and their associated probabilities.
2457 """
2459 def __init__(
2460 self,
2461 initialParams,
2462 fitMethod,
2463 aMatrix,
2464 aMatrixSigma,
2465 fitRange,
2466 doFitNormalizationOffset,
2467 nImageChargePairs,
2468 conversionWeights=None,
2469 ):
2470 """
2471 Initialize the ElectrostaticFit class.
2473 Parameters
2474 ----------
2475 initialParams : lmfit.Parameters
2476 Initial fit parameters.
2477 fitMethod : str
2478 Minimization method for lmfit.
2479 aMatrix : np.ndarray
2480 Measured area change matrix.
2481 aMatrixSigma : np.ndarray
2482 Uncertainty matrix for area changes.
2483 fitRange : int
2484 Range of pixels to fit.
2485 doFitNormalizationOffset : bool
2486 Whether to fit an offset parameter.
2487 nImageChargePairs : int
2488 Number of image charge pairs for electrostatic calculation.
2489 conversionWeights : tuple of (np.ndarray, np.ndarray), optional
2490 Tuple containing depths and their associated probabilities.
2491 """
2492 self.fitRange = fitRange
2493 self.inputRange = aMatrix.shape[0]
2494 self.doFitNormalizationOffset = doFitNormalizationOffset
2495 self.nImageChargePairs = nImageChargePairs
2496 self.fitMethod = fitMethod
2498 if self.fitRange != 0 and self.inputRange < self.fitRange:
2499 self.fitRange = self.inputRange
2501 self.aMatrix = aMatrix[0:fitRange, 0:fitRange]
2502 self.aMatrixSigma = aMatrixSigma[0:fitRange, 0:fitRange]
2503 self.sqrtWeights = 1.0 / aMatrixSigma
2505 self.params = initialParams
2507 self.conversionWeights = conversionWeights
2509 def fit(self, max_nfev=20000, epsfcn=1.0e-8, ftol=1.0e-8, xtol=1.0e-8, gtol=0.0):
2510 """Do the fit.
2512 Parameters
2513 ----------
2514 max_nfev : `float`, optional
2515 Max number of function evaluations.
2516 epsfcn : `float`, optional
2517 A variable used in determining a suitable step length for the
2518 forward-difference approximation of the Jacobian.
2519 ftol : `float`, optional
2520 Relative error desired in the sum of squares.
2521 xtol : `float`, optional
2522 Relative error desired in the approximate solution.
2523 gtol : `float`, optional
2524 Orthogonality desired between the function vector and the columns
2525 of the Jacobian.
2526 """
2527 minner = Minimizer(
2528 self.computeWeightedResidual,
2529 self.params,
2530 )
2532 result = minner.minimize(
2533 method=self.fitMethod,
2534 max_nfev=max_nfev,
2535 epsfcn=epsfcn,
2536 ftol=ftol,
2537 xtol=xtol,
2538 gtol=gtol,
2539 )
2541 return result
2543 def getParamsDict(self):
2544 """
2545 Return a copy of the free parameters vector as a dictionary.
2546 """
2547 return self.params.valuesdict()
2549 def computeWeightedResidual(self, params=None):
2550 self.params = params
2551 m = self.model(params, conversionWeights=self.conversionWeights)
2552 w = self.sqrtWeights[:self.fitRange, :self.fitRange]
2553 y = self.aMatrix[:self.fitRange, :self.fitRange]
2554 # Multiply two 2-d arrays term by term:
2555 weightedResiduals = (w * (m - y))
2556 # The result has the same size as both of them.
2557 return weightedResiduals
2559 def model(self, params, conversionWeights=None):
2560 m = self.rawModel(params, conversionWeights=conversionWeights)
2561 alpha = params['alpha']
2562 beta = params['beta']
2563 return alpha * m + beta
2565 def rawModel(self, params, conversionWeights):
2566 # Get all parameters as a dictionary:
2567 parameterDict = self.getParamsDict()
2568 del parameterDict['alpha']
2569 del parameterDict['beta']
2570 del parameterDict['zsh_minus_zq']
2571 del parameterDict['zsv_minus_zq']
2573 # Push them into the electrostatic calculator
2574 c = ElectrostaticCcdGeom(parameterDict)
2576 # Compute the observables:
2577 # If you change the routine called here, also
2578 # change it in BoundaryShifts.__init__()
2579 zf = parameterDict["thickness"]
2580 zsh = parameterDict["zsh"]
2581 zsv = parameterDict["zsv"]
2583 if conversionWeights is None:
2584 conversionWeights = (np.array([0.0]), np.array([1.0]))
2586 m = None
2587 (d, p) = conversionWeights
2589 # Zero depths lower than the end of drift
2590 too_low = ((zf - d) < zsh) | ((zf - d) < zsv)
2591 p[too_low] = 0
2592 p /= p.sum() # Normalize to 1.
2594 # Check if the conversion weights were
2595 # computed properly.
2596 if np.all(p == 0):
2597 # This is bad, and no work to be done.
2598 raise RuntimeError("All conversion depth probabilities are zero. "
2599 "Cannot compute electrostatic solution.")
2600 for (depth, prob) in zip(d, p):
2601 if prob == 0:
2602 continue
2603 if m is None:
2604 m = prob * c.evalAreaChangeSidesFast(
2605 self.fitRange,
2606 nImageChargePairs=self.nImageChargePairs,
2607 zf=zf - depth,
2608 )
2609 else:
2610 m = m + prob * c.evalAreaChangeSidesFast(
2611 self.fitRange,
2612 nImageChargePairs=self.nImageChargePairs,
2613 zf=zf - depth,
2614 )
2616 m = m[:self.fitRange, :self.fitRange]
2618 return m
2620 def normalizeModel(self, m):
2621 """
2622 Compute optimal normalization and offset for the model.
2624 The overall normalization is a linear parameter. This
2625 method computes the optimal value given the other
2626 parameters.
2627 """
2628 fr = m.shape[0]
2629 sqrtw = self.sqrtWeights[:fr, :fr]
2630 w = sqrtw ** 2
2631 y = self.aMatrix[:fr, :fr]
2632 if self.doFitNormalizationOffset:
2633 sxx = (w * m * m).sum()
2634 sx = (w * m).sum()
2635 s1 = w.sum()
2636 sxy = (w * m * y).sum()
2637 sy = (w * y).sum()
2638 d = sxx * s1 - sx * sx
2639 a = (s1 * sxy - sx * sy) / d
2640 b = (-sx * sxy + sxx * sy) / d
2641 else:
2642 # Just scale
2643 a = (w * y * m).sum() / (w * m * m).sum()
2644 b = 0
2645 return a, b
2647 def computePixelDistortions(self, conversionWeights=None):
2648 """
2649 Compute pixel distortions using a probability distribution of
2650 conversion depths.
2652 If provided, conversionWeights is expected to be a tuple of
2653 (depth, probability). The routine computes the model corresponding
2654 to this probability distribution. If conversionWeights is not
2655 provided, then [(0, 1.)] is used as the distribution.
2657 Parameters
2658 ----------
2659 conversionWeights : tuple of (np.ndarray, np.ndarray), optional
2660 Tuple containing depths and their associated probabilities.
2662 Returns
2663 -------
2664 BoundaryShifts
2665 The computed boundary shifts for the pixel.
2666 """
2667 zf = self.params["thickness"].value
2668 zsh = self.params["zsh"].value
2669 zsv = self.params["zsv"].value
2670 if conversionWeights is None:
2671 conversionWeights = (np.array([0.0]), np.array([1.0]))
2673 r = None
2674 (d, p) = conversionWeights
2676 # Zero depths lower than the end of drift
2677 too_low = ((zf - d) < zsh) | ((zf - d) < zsv)
2678 p[too_low] = 0
2679 p /= p.sum() # Normalize to 1.
2680 for (depth, prob) in zip(d, p):
2681 if prob == 0:
2682 continue
2683 if r is None:
2684 r = prob * BoundaryShifts(
2685 electrostaticFit=self,
2686 zf=zf - depth,
2687 )
2688 else:
2689 r = r + prob * BoundaryShifts(
2690 electrostaticFit=self,
2691 zf=zf - depth,
2692 )
2694 return r
2697class BoundaryShifts:
2698 """
2699 Class to compute and store boundary shift values for CCD pixels based on
2700 electrostatic field models.
2702 This class calculates the shifts at the North, South, East, and West
2703 boundaries of each pixel, as well as the area change at the pixel sides,
2704 using parameters from an electrostatic fit and a specified integration
2705 depth `zf`. The shifts are computed using fast integration methods from
2706 the ElectrostaticCcdGeom model.
2708 Attributes
2709 ----------
2710 aN : np.ndarray
2711 Array of North boundary shifts for each pixel.
2712 aS : np.ndarray
2713 Array of South boundary shifts for each pixel.
2714 aE : np.ndarray
2715 Array of East boundary shifts for each pixel.
2716 aW : np.ndarray
2717 Array of West boundary shifts for each pixel.
2718 ath : np.ndarray
2719 Array of area changes at the pixel sides, including beta.
2720 athMinusBeta : np.ndarray
2721 Array of area changes at the pixel sides, excluding beta.
2722 """
2724 def __init__(self, electrostaticFit, zf):
2725 if zf <= 0:
2726 raise RuntimeError(
2727 f"Cannot integrate below the bottom of the pixel, zf={zf}."
2728 )
2730 paramDict = electrostaticFit.getParamsDict()
2731 del paramDict['alpha']
2732 del paramDict['beta']
2733 del paramDict['zsh_minus_zq']
2734 del paramDict['zsv_minus_zq']
2736 c = ElectrostaticCcdGeom(paramDict)
2737 fr = electrostaticFit.fitRange
2738 ii, jj = np.meshgrid(list(range(fr)), list(range(fr)))
2739 ii = ii.flatten()
2740 jj = jj.flatten()
2741 self.aN = np.ndarray((fr, fr))
2742 self.aS = np.zeros_like(self.aN)
2743 self.aE = np.zeros_like(self.aN)
2744 self.aW = np.zeros_like(self.aN)
2745 self.ath = np.zeros_like(self.aN)
2746 alpha = electrostaticFit.params['alpha'].value
2747 beta = electrostaticFit.params['beta'].value
2749 # alpha*rawModel+beta is the description of the measurements
2750 # We should not apply beta to the outcome, because beta is meant to
2751 # accomodate some long-range contamination (non-electrostatic)
2752 # of the covariance measurements.
2753 iMax, jMax = fr, fr
2754 self.aN = -alpha * c.integrateEyFieldFast(iMax, jMax, 1, zf=zf)
2755 self.aS = -alpha * c.integrateEyFieldFast(iMax, jMax, -1, zf=zf)
2756 self.aW = -alpha * c.integrateExFieldFast(iMax, jMax, -1, zf=zf)
2757 self.aE = -alpha * c.integrateExFieldFast(iMax, jMax, 1, zf=zf)
2758 self.ath = alpha * c.evalAreaChangeSidesFast(fr, zf=zf) + beta
2760 self.athMinusBeta = self.ath - beta
2762 def __rmul__(self, factor):
2763 """
2764 """
2765 res = copy.deepcopy(self)
2766 res.aN *= factor
2767 res.aS *= factor
2768 res.aE *= factor
2769 res.aW *= factor
2770 res.ath *= factor
2771 res.athMinusBeta *= factor
2772 return res
2774 def __add__(self, other):
2775 """
2776 """
2777 res = copy.deepcopy(self)
2778 res.aN += other.aN
2779 res.aS += other.aS
2780 res.aE += other.aE
2781 res.aW += other.aW
2782 res.ath += other.ath
2783 res.athMinusBeta += other.athMinusBeta
2784 return res
2787def calcEFieldCoulomb(xv, xqv):
2788 """
2789 xv = where, xqv = charge location.
2790 both should be numpy arrays.
2791 if qv is multi-d, the routine assumes that the
2792 physical coordinates (x,y,z) are patrolled by
2793 the last index
2794 """
2795 d = xv - xqv
2796 r3 = np.power((d**2).sum(axis=-1), 1.5)
2797 # anything more clever ?
2798 # of course d/r3 does not work
2799 return (d.T / r3.T).T
2802class ElectrostaticCcdGeom():
2803 def __init__(self, paramDict):
2804 """
2805 Dictionary of parameters containing the following keys:
2806 parameters : (all in microns)
2807 zq : altitude of the burried channel (microns)
2808 zsh : vertex altitude for horizontal boundaries
2809 zsv : vertex altitude for vertical boundaries
2810 a, b : size of the rectangular charge source
2811 thickness : thickness
2812 pixelsize : pixel size
2813 """
2814 zq = paramDict['zq']
2815 zsh = paramDict['zsh']
2816 zsv = paramDict['zsv']
2817 a = paramDict['a']
2818 b = paramDict['b']
2819 thickness = paramDict['thickness']
2820 pixelsize = paramDict['pixelsize']
2822 self.zq = np.fabs(zq)
2823 self.zsh = np.fabs(zsh)
2824 self.zsv = np.fabs(zsv)
2825 self.b = np.fabs(b)
2826 self.a = np.fabs(a)
2827 self.thickness = np.fabs(thickness)
2828 self.pixelsize = pixelsize
2830 self.nStepsZ = 100
2831 self.nStepsXY = 20
2833 # yields a ~ 1% precision of the field at z~10
2834 # if compared to the uniform sheet model (eField
2835 self.nChargeElements = 3
2837 # Set up weights
2838 x, w = leggauss(self.nStepsXY)
2839 self.integrationWeights = w*0.5
2841 # Abcissa refer to [-1, 1], we want [0, self.pixelsize]
2842 self.xyOffsets = (x+1) * 0.5 * self.pixelsize
2844 def calcEFieldCoulombChargeSheet(self, xv, xqv):
2845 """
2846 xv = where (the last index should address x,y,z.
2847 xqv = charge location
2848 Both Should be numpy arrays.
2849 if xv is multi-d, the routine assumes
2850 that the physical coordinates (x,y,z) are patrolled by the last index.
2851 Returns the electric field from a unitely charged horizontal rectangle
2852 centered at x_q of size 2a * 2b.
2853 The returned electric field assumes 4*pi*epsilon=1.
2854 """
2855 # Use Durand page 244 tome 1
2856 # four corners :
2857 x1 = xqv + np.array([self.a, self.b, 0])
2858 x2 = xqv + np.array([-self.a, self.b, 0])
2859 x3 = xqv + np.array([-self.a, -self.b, 0])
2860 x4 = xqv + np.array([self.a, -self.b, 0])
2862 # Distances to the four corners
2863 d1 = np.sqrt(((xv-x1)**2).sum(axis=-1))
2864 d2 = np.sqrt(((xv-x2)**2).sum(axis=-1))
2865 d3 = np.sqrt(((xv-x3)**2).sum(axis=-1))
2866 d4 = np.sqrt(((xv-x4)**2).sum(axis=-1))
2868 # Reserve the returned array
2869 r = np.ndarray(xv.shape)
2870 xvOffset = xv[..., 0] - xqv[0]
2871 yvOffset = xv[..., 1] - xqv[1]
2873 # Ex
2874 # note : if a or b goes to 0, the log is 0 and the denominator (last
2875 # line) is zero as well. So some expansion would be required
2876 ao = yvOffset + self.b
2877 bo = yvOffset - self.b
2878 co = xvOffset + self.a
2879 do = xvOffset - self.a
2880 # Ex (eq 105)
2881 r[..., 0] = np.log((d4+ao) * (d2+bo) / (d3+ao) / (d1+bo))
2882 # Ey (eq 106)
2883 r[..., 1] = np.log((d2+co) * (d4+do) / (d3+co) / (d1+do))
2884 # point source approximation
2885 # ret[...,2] = (4*self.a*self.b)*calcEFieldCoulomb(X,X_q)[...,2]
2886 # full expression for ez : p 244
2887 # ez (eq 111 is only valid if the x and y are "inside")
2888 # there is a discussion of the general case around Fig VI-18.
2889 z = xv[..., 2] - xqv[2]
2890 # it ressembles equation 111 but I flipped two signs
2891 r[..., 2] = (np.arctan(do * bo / z / d1)
2892 - np.arctan(bo * co / z / d2)
2893 + np.arctan(co * ao / z / d3)
2894 - np.arctan(ao * do / z / d4))
2896 # seems OK both "inside" and "outside"
2897 return r / (4 * self.a * self.b)
2899 def integrateEField(self, xv, component, z0, zf, nImageChargePairs=11):
2900 """
2901 Integrate transverse E Field along Z at point X (2 coordinates, last
2902 index). The coordinate of the field is given by component (0,
2903 or 1). at point X from the point charge The computation uses
2904 the dipole series trick. The number of dipoles is an optional
2905 argument. Odd numbers are better for what we are doing here.
2907 """
2908 # The integral of the field (x/r^3 dz from z1 to z2) reads
2909 # x/rho**2*(z2/r2-z1/r1) with rho2 = x**2+y**2
2910 # x/rho2 does not change when going through image sources
2911 # so we use them as arguments, dz1 and dz2 z{begin,end}--Xq[2]
2912 # just for test: if z0==zf, then return the field value
2913 if z0 != zf:
2914 def integral(rho2, x, dz1, dz2):
2915 r1 = np.sqrt(rho2 + dz1**2)
2916 r2 = np.sqrt(rho2 + dz2**2)
2917 return x * (dz2/r2 - dz1/r1) / rho2
2918 else: # see the comment above: return the value, not the integral.
2919 def integral(rho2, x, dz1, dz2):
2920 """
2921 x/r**3
2922 """
2923 r = np.sqrt(rho2 + dz1**2)
2924 vals = x / r**3
2925 return vals
2927 # Check validity
2928 if component not in [0, 1]:
2929 raise RuntimeError("Can only integrate electric field along component 0 or 1, "
2930 f"not {component}.")
2932 # Reserve the result array
2933 result = np.zeros(xv.shape[:-1])
2934 zqp = self.zq
2935 zqm = -zqp
2937 # 1st dipole
2938 # Generate a set of point charges to emulate
2939 # an extended distribution (size 2a*2b)
2940 xStep = (2 * self.a) / self.nChargeElements
2941 yStep = (2 * self.b) / self.nChargeElements
2942 xqs = -self.a + (np.linspace(0, self.nChargeElements-1, self.nChargeElements) + 0.5) * xStep
2943 yqs = -self.b + (np.linspace(0, self.nChargeElements-1, self.nChargeElements) + 0.5) * yStep
2944 for xq in xqs:
2945 for yq in yqs:
2946 dx = xv[..., 0] - xq
2947 dy = xv[..., 1] - yq
2948 dxv = [dx, dy]
2949 rho2 = dx**2 + dy**2
2950 result += integral(rho2, dxv[component], z0 - zqp, zf - zqp)
2951 # Construct image charge, change sign of z and q
2952 result -= integral(rho2, dxv[component], z0 - zqm, zf - zqm)
2953 result /= (self.nChargeElements * self.nChargeElements)
2955 # Remaining dipoles : no more extended charge
2956 # The (x,y) charge coordinates are 0, and common to all images:
2957 rho2 = xv[..., 0]**2 + xv[..., 1]**2
2958 xi = xv[..., component]
2959 for i in range(1, nImageChargePairs):
2960 odd = i%2
2961 if odd:
2962 ztmp = (2*self.thickness) - zqm
2963 zqm = (2*self.thickness) - zqp
2964 zqp = ztmp
2965 else:
2966 ztmp = -zqm
2967 zqm = -zqp
2968 zqp = ztmp
2969 result += integral(rho2, xi, z0 - zqp, zf - zqp)
2970 result -= integral(rho2, xi, z0 - zqm, zf - zqm)
2972 # 8.85418781e-12 (F/m) * 1e-6 (microns/m) / 1.602e-19 (C/electron)
2973 # = 55.26958
2974 # eps_r_Si = 12, so eps = 55.27*12 = 663.23 el/V/um
2975 # This routine hence returns the field sourced by -1 electron
2976 result *= 1 / (4*np.pi*660)
2978 return result
2980 def eField(self, xv, nImageChargePairs=11):
2981 """
2982 Field at point xv from the point charge
2983 if xv is multi-dimensional, x,y,z should be represented
2984 by the last index ([0:3]).
2985 The computation uses the dipole series trick. The number
2986 of dipoles is an optional argument. Odd numbers are better
2987 for what we are doing here.
2988 """
2989 # Put the center of the source pixel at (x, y) = (0, 0)
2990 # This assumption is relied on in Eval_Eth and Eval_Etv
2991 xqv = np.array([0, 0, self.zq])
2993 # Image charge w.r.t. the parallel clock lines
2994 xqvImage = np.array([xqv[0], xqv[1], -xqv[2]])
2996 # Split the calculation in 2 parts: approximation
2997 # when far from the source, image method when near.
2998 rho = np.sqrt(xv[..., 0]**2 + xv[..., 1]**2)
3000 # Field near the charge cloud
3001 # First dipole
3002 # The points lower than half the thickness of the pixel
3003 # are considered "close"
3004 pointsNearChargeCloudIdxs = (rho/self.thickness < 2)
3005 xvNear = xv[pointsNearChargeCloudIdxs]
3006 eFieldNear = self.calcEFieldCoulombChargeSheet(xvNear, xqv)
3007 eFieldNear -= self.calcEFieldCoulombChargeSheet(xvNear, xqvImage)
3009 # Next dipoles
3010 for i in range(1, nImageChargePairs):
3011 if (i % 2): # Is odd?
3012 xqv[2] = 2 * self.thickness - xqv[2]
3013 xqvImage[2] = 2 * self.thickness - xqvImage[2]
3014 eFieldNear += calcEFieldCoulomb(xvNear, xqvImage)
3015 eFieldNear -= calcEFieldCoulomb(xvNear, xqv)
3016 else:
3017 xqv[2] = -xqv[2]
3018 xqvImage[2] = -xqvImage[2]
3019 eFieldNear += calcEFieldCoulomb(xvNear, xqv)
3020 eFieldNear -= calcEFieldCoulomb(xvNear, xqvImage)
3022 # Field far from the charge cloud
3023 xvFar = xv[~pointsNearChargeCloudIdxs]
3024 rhoFar = rho[~pointsNearChargeCloudIdxs]
3026 # Jon Pumplin, Am. Jour. Phys. 37,7 (1969), eq 7
3027 # When changing coordinate system (shift along z), cos -> sin.
3028 # And since this only applies far from the source, the latter
3029 # can be regarded as point-like.
3030 # I checked the continuity over the separation point.
3031 factor = -np.sin(np.pi * self.zq / self.thickness)
3032 factor *= np.sin(np.pi * xvFar[..., 2] / self.thickness)
3033 factor *= np.exp(-np.pi * rhoFar / self.thickness)
3034 factor *= np.sqrt(8 / rhoFar / self.thickness)
3035 factor *= (-np.pi / self.thickness - 0.5/rhoFar)
3036 eFieldFar = np.zeros_like(xvFar)
3037 eFieldFar[..., 0] = xvFar[..., 0] / (rhoFar*factor)
3038 eFieldFar[..., 1] = xvFar[..., 1] / (rhoFar*factor)
3040 # aggregate the results
3041 eField = np.zeros_like(xv)
3042 eField[pointsNearChargeCloudIdxs] = eFieldNear
3043 eField[~pointsNearChargeCloudIdxs] = eFieldFar
3045 # epsilon0 = 55 el/V/micron
3046 # 8.85418781e-12 (F/m) * 1e-6 (microns/m) / 1.602e-19 (C/electron)
3047 # = 55.26958
3048 # eps_r_Si = 12, so eps = 55.27*12 = 663.23 el/V/um
3049 # This routine hence returns the field sourced by -1 electron
3050 eField *= 1 / (4*np.pi*660)
3052 return eField
3054 def integrateExFieldFast(self, iMax, jMax, leftOrRight, zf=None, nImageChargePairs=11):
3055 """
3056 Computes the integrals of Ex along z from
3057 self.zsh to zf. The returned array is 2d of
3058 shape (iMax, jMax).
3059 """
3060 zf = self.thickness if zf is None else zf
3061 assert zf > self.zsv
3062 ii, jj = np.indices((iMax, jMax))
3063 yy = (jj - 0.5)[:, :, np.newaxis] * self.pixelsize + self.xyOffsets[np.newaxis, np.newaxis, :]
3064 xx = (ii + 0.5 * leftOrRight) * self.pixelsize
3065 xx = np.broadcast_to(xx[..., None], yy.shape)
3066 xv = np.stack([xx, yy], axis=-1)
3068 # By definition of zsh, we integrate from zsv to zf,
3069 # and divide by the pixel size to be consistent with Eval_ET{v,h}
3070 w = np.broadcast_to(self.integrationWeights, yy.shape)
3071 integral = np.sum(
3072 w * self.integrateEField(xv, 0, self.zsv, zf,
3073 nImageChargePairs=nImageChargePairs),
3074 axis=2,
3075 )
3076 return leftOrRight * integral/self.pixelsize
3078 def integrateEyFieldFast(self, iMax, jMax, topOrBottom, zf=None, nImageChargePairs=11):
3079 """
3080 Computes the integral of Ey along z from
3081 self.zsh to zf. The returned array is
3082 2d [0:iMax, 0:jMax].
3083 """
3084 zf = self.thickness if zf is None else zf
3085 # assert zf > self.zsh
3086 ii, jj = np.indices((iMax, jMax))
3087 xx = (ii - 0.5)[:, :, np.newaxis] * self.pixelsize + self.xyOffsets[np.newaxis, np.newaxis, :]
3088 yy = (jj + 0.5*topOrBottom) * self.pixelsize
3089 yy = np.broadcast_to(yy[..., None], xx.shape)
3090 xv = np.stack([xx, yy], axis=-1)
3092 # By definition of zsh, we integrate from zsh to zf,
3093 # and divide by the pixel size to be consistent with Eval_ET{v,h}
3094 # integrate
3095 w = np.broadcast_to(self.integrationWeights, yy.shape) # Add leading dimensions
3096 integral = np.sum(
3097 w * self.integrateEField(xv, 1, self.zsh, zf,
3098 nImageChargePairs=nImageChargePairs),
3099 axis=2,
3100 )
3101 return topOrBottom * integral / self.pixelsize
3103 def eFieldTransverseToHorizontalPixelEdgeGrid(self, i, j, topOrBottom, zf=None):
3104 """
3105 Returns the field transverse to the horizontal pixel boundary.
3106 return a 2d array of shifts at evenly spaced points in x and z.
3107 normalized in units of pixel size, for a unit charge.
3108 The returned array has 3 indices:
3109 [along the pixel side, along the drift,E-field coordinate].
3110 The resutl is multiplied by the z- and x- steps, so that the
3111 sum is the averge over x, divided by the pixel size.
3112 """
3113 assert np.abs(topOrBottom) == 1
3114 zf = self.thickness if zf is None else zf
3116 # by definition of zsh, we integrate from zsh to zf:
3117 zStep = (zf - self.zsh) / self.nStepsZ
3118 xyStep = self.pixelsize / self.nStepsXY
3120 z = self.zsh + (np.linspace(0, self.nStepsZ - 1, num=self.nStepsZ) + 0.5) * zStep
3121 x = (i - 0.5)*self.pixelsize + (np.linspace(0, self.nStepsXY - 1, self.nStepsXY) + 0.5) * xyStep
3123 xx, zz = np.meshgrid(x, z)
3124 yy = np.ones(xx.shape) * (j + 0.5*topOrBottom) * self.pixelsize
3125 xv = np.array([xx, yy, zz]).T
3127 return self.eField(xv) * zStep * xyStep
3129 def averageHorizontalBoundaryShift(self, i, j, topOrBottom, zf=None):
3130 """
3131 Integrate the field transverse to the horizontal pixel boundary
3132 """
3133 sum = self.eFieldTransverseToHorizontalPixelEdgeGrid(i, j, topOrBottom, zf)[..., 1].sum()
3134 # we want the integral over z and the average over x,
3135 # divided by the pixel size, with a sign that defines
3136 # if in moves inside or outside. Here is it:
3137 return sum * (topOrBottom / (self.pixelsize**2))
3139 def corner_shift_h2(self, i, j, topOrBottom, zf=None): # Do we even need this??
3140 E = self.eFieldTransverseToHorizontalPixelEdgeGrid(i, j, topOrBottom, zf)[..., 1]
3141 # Integrate over z (multiplication by the
3142 # step done in the calling routine)
3143 intz = E.sum(axis=1)
3144 x = range(intz.shape[0])
3145 p = np.polyfit(x, intz, 1)
3146 return p[0] * -0.5 + p[1], p[0] * (x[-1] + 0.5) + p[1]
3148 def eFieldTransverseToVerticalPixelEdgeGrid(self, i, j, leftOrRight, zf=None):
3149 """
3150 Returns the field transverse to the vertical pixel boundary.
3151 return a 3d array of shifts at evenly spaced points in y and z.
3152 Ex,y,z is indexed by the last index.
3153 """
3154 # Determine the depth of the photon conversion
3155 zf = self.thickness if zf is None else zf
3157 # The source charge is at x,y=0
3158 zStep = (zf-self.zsv) / (self.nStepsZ)
3159 xyStep = self.pixelsize / (self.nStepsXY)
3161 z = self.zsv + (np.linspace(0, self.nStepsZ - 1, self.nStepsZ) + 0.5) * zStep
3162 y = (j - 0.5)*self.pixelsize + (np.linspace(0, self.nStepsXY - 1, self.nStepsXY) + 0.5) * xyStep
3164 yy, zz = np.meshgrid(y, z)
3165 xx = np.ones(yy.shape) * (i + 0.5*leftOrRight) * self.pixelsize
3166 xv = np.array([xx, yy, zz]).T
3168 return self.eField(xv) * zStep * xyStep
3170 def averageVerticalBoundaryShift(self, i, j, leftOrRight, zf=None):
3171 """
3172 Average shift of the vertical boundary of pixel (i j).
3173 """
3174 sum = self.eFieldTransverseToVerticalPixelEdgeGrid(i, j, leftOrRight, zf)[..., 0].sum()
3175 # we want the integral over z and the average over x,
3176 # divided by the pixel size, with a sign that defines
3177 # if in moves inside or outside. Here is it:
3178 return sum * (leftOrRight / (self.pixelsize**2))
3180 def dxdy(self, iMax, zf=None):
3181 """
3182 corner shifts calculations. The returned array are larger by 1
3183 than iMax, because there are more corners than pixels
3184 """
3185 horiztonalShifts = np.ndarray((iMax, iMax))
3186 verticalShifts = np.ndarray((iMax, iMax))
3187 for i in range(iMax):
3188 for j in range(iMax):
3189 horiztonalShifts[i, j] = self.averageHorizontalBoundaryShift(i + 0.5, j, 1, zf)
3190 verticalShifts[i, j] = self.averageVerticalBoundaryShift(i, j + 0.5, 1, zf)
3192 # Parametrize the corner shifts of iMax pixels: iMax+1
3193 # corners in each direction
3194 dx = np.zeros((iMax+1, iMax+1))
3195 dy = dx + 0.
3196 dx[1:, 1:] = verticalShifts
3197 dx[0, 1:] = -verticalShifts[0, :] # leftmost column
3198 dx[:, 0] = dx[:, 1] # bottom row
3199 dy[1:, 1:] = horiztonalShifts
3200 dy[1:, 0] = -horiztonalShifts[:, 0] # bottom row
3201 dy[0, :] = dy[1, :] # leftmost column
3203 return dx, dy
3205 def evalAreaChangeCorners(self, iMax, zf=None):
3206 """
3207 pixel area alterations computed through corner shifts
3208 """
3209 dx, dy = self.dxdy(iMax, zf)
3210 areaChange = dx[1:, 1:] - dx[:-1, 1:] + dx[1:, :-1] - dx[:-1, :-1]
3211 areaChange += dy[1:, 1:] - dy[1:, :-1] + dy[:-1, 1:] - dy[:-1, :-1]
3212 return -0.5*areaChange
3214 def evalAreaChangeSidesFast(self, iMax, zf=None, nImageChargePairs=11):
3215 """
3216 Same as EvalAreaChangeSides, but uses direct integration.
3217 it evaluates the divergence of the discrete boundary
3218 displacement field.
3219 This routine groups the calls to the field computing routines.
3220 """
3221 horiztonalShifts = np.ndarray((iMax, iMax+1))
3222 verticalShifts = np.zeros_like(horiztonalShifts.T)
3223 horiztonalShifts[:, 1:] = self.integrateEyFieldFast(iMax, iMax, 1, zf,
3224 nImageChargePairs=nImageChargePairs)
3225 verticalShifts[1:, :] = self.integrateExFieldFast(iMax, iMax, 1, zf,
3226 nImageChargePairs=nImageChargePairs)
3228 # special case for [0,j] and [i,0] (they have two opposite values)
3229 horiztonalShifts[:, 0] = -horiztonalShifts[:, 1]
3230 verticalShifts[0, :] = -verticalShifts[1, :]
3232 # the divergence
3233 areaChange = verticalShifts[1:, :]
3234 areaChange -= verticalShifts[:-1, :]
3235 areaChange += horiztonalShifts[:, 1:]
3236 areaChange -= horiztonalShifts[:, :-1]
3237 areaChange *= -1 # Pixel area is decreased
3239 return areaChange