Coverage for python / lsst / cp / pipe / utils.py: 7%
1110 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:22 +0000
1# This file is part of cp_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21#
# Explicit public API of this module (consumed by ``from ... import *``).
# NOTE(review): several public helpers defined below (e.g. fitLeastSq,
# irlsFit) are not listed here; some listed names are defined later in
# the file, outside this view.
__all__ = [
    "ddict2dict",
    "CovFastFourierTransform",
    "getReadNoise",
    "ampOffsetGainRatioFixup",
    "ElectrostaticFit",
    "BoundaryShifts",
    "ElectrostaticCcdGeom",
]
33from astropy.table import Table
34import copy
35import galsim
36import logging
37import numpy as np
38import itertools
39import numpy.polynomial.polynomial as poly
40import warnings
42from scipy.interpolate import Akima1DInterpolator
43from scipy.optimize import leastsq, minimize
44from scipy.stats import median_abs_deviation, norm
45from lmfit import Minimizer
47from lsst.ip.isr import isrMock
48import lsst.afw.cameraGeom
49import lsst.afw.image
50import lsst.afw.math
52from numpy.polynomial.legendre import leggauss
def sigmaClipCorrection(nSigClip):
    """Correct measured sigma to account for clipping.

    If we clip our input data and then measure sigma, then the
    measured sigma is smaller than the true value because real
    points beyond the clip threshold have been removed. This is a
    small (1.5% at nSigClip=3) effect when nSigClip >~ 3, but the
    default parameters for measure crosstalk use nSigClip=2.0.
    This causes the measured sigma to be about 15% smaller than
    real. This formula corrects the issue, for the symmetric case
    (upper clip threshold equal to lower clip threshold).

    Parameters
    ----------
    nSigClip : `float`
        Number of sigma the measurement was clipped by.

    Returns
    -------
    scaleFactor : `float`
        Scale factor to increase the measured sigma by.
    """
    # Probability mass retained inside the symmetric clip window.
    retainedMass = norm.cdf(nSigClip) - norm.cdf(-nSigClip)
    # Ratio of the clipped variance to the true variance of a Gaussian.
    varFactor = 1.0 - (2 * nSigClip * norm.pdf(nSigClip)) / retainedMass
    # Convert the variance deficit into a multiplicative sigma correction.
    return 1.0 / np.sqrt(varFactor)
def calculateWeightedReducedChi2(measured, model, weightsMeasured, nData, nParsModel):
    """Calculate weighted reduced chi2.

    Parameters
    ----------
    measured : `list`
        List with measured data.
    model : `list`
        List with modeled data.
    weightsMeasured : `list`
        List with weights for the measured data.
    nData : `int`
        Number of data points.
    nParsModel : `int`
        Number of parameters in the model.

    Returns
    -------
    redWeightedChi2 : `float`
        Reduced weighted chi2.
    """
    # Weighted residuals; chi2 is their sum of squares divided by the
    # number of degrees of freedom.
    weightedResiduals = (measured - model)*weightsMeasured
    degreesOfFreedom = nData - nParsModel
    return (weightedResiduals*weightedResiduals).sum() / degreesOfFreedom
def makeMockFlats(expTime, gain=1.0, readNoiseElectrons=5, fluxElectrons=1000,
                  randomSeedFlat1=1984, randomSeedFlat2=666, powerLawBfParams=None,
                  expId1=0, expId2=1, ampNames=None):
    """Create a pair of mock flats with isrMock.

    Parameters
    ----------
    expTime : `float`
        Exposure time of the flats.
    gain : `float`, optional
        Gain, in e/ADU.
    readNoiseElectrons : `float`, optional
        Read noise rms, in electrons.
    fluxElectrons : `float`, optional
        Flux of flats, in electrons per second.
    randomSeedFlat1 : `int`, optional
        Random seed for the normal distributions for the mean signal
        and noise (flat1).
    randomSeedFlat2 : `int`, optional
        Random seed for the normal distributions for the mean signal
        and noise (flat2).
    powerLawBfParams : `list`, optional
        Parameters for `galsim.cdmodel.PowerLawCD` to simulate the
        brighter-fatter effect; treated as empty (no BF) if None.
    expId1 : `int`, optional
        Exposure ID for first flat.
    expId2 : `int`, optional
        Exposure ID for second flat.
    ampNames : `list` [`str`], optional
        Names of amplifiers for filling in header information; treated
        as empty if None.

    Returns
    -------
    flatExp1 : `lsst.afw.image.exposure.ExposureF`
        First exposure of flat field pair.
    flatExp2 : `lsst.afw.image.exposure.ExposureF`
        Second exposure of flat field pair.

    Notes
    -----
    The parameters of `galsim.cdmodel.PowerLawCD` are `n, r0, t0, rx,
    tx, r, t, alpha`. For more information about their meaning, see
    the Galsim documentation
    https://galsim-developers.github.io/GalSim/_build/html/_modules/galsim/cdmodel.html # noqa: W505
    and Gruen+15 (1501.02802).

    Example: galsim.cdmodel.PowerLawCD(8, 1.1e-7, 1.1e-7, 1.0e-8,
    1.0e-8, 1.0e-9, 1.0e-9, 2.0)
    """
    # Avoid the mutable-default-argument pitfall: use None sentinels
    # in the signature and substitute fresh empty lists here.
    if powerLawBfParams is None:
        powerLawBfParams = []
    if ampNames is None:
        ampNames = []

    flatFlux = fluxElectrons  # e/s
    flatMean = flatFlux*expTime  # e
    readNoise = readNoiseElectrons  # e

    mockImageConfig = isrMock.IsrMock.ConfigClass()

    mockImageConfig.flatDrop = 0.99999
    mockImageConfig.isTrimmed = True

    flatExp1 = isrMock.FlatMock(config=mockImageConfig).run()
    flatExp2 = flatExp1.clone()
    # NOTE(review): getDimensions() conventionally returns (width,
    # height); the (shapeY, shapeX) names here look swapped relative to
    # that convention, but the (shapeX, shapeY) array shape below then
    # matches numpy's (rows, cols) layout. Confirm against
    # lsst.afw.image before renaming.
    (shapeY, shapeX) = flatExp1.getDimensions()
    flatWidth = np.sqrt(flatMean)  # Poisson width of the flat signal, in e.

    rng1 = np.random.RandomState(randomSeedFlat1)
    flatData1 = rng1.normal(flatMean, flatWidth, (shapeX, shapeY)) + rng1.normal(0.0, readNoise,
                                                                                (shapeX, shapeY))
    rng2 = np.random.RandomState(randomSeedFlat2)
    flatData2 = rng2.normal(flatMean, flatWidth, (shapeX, shapeY)) + rng2.normal(0.0, readNoise,
                                                                                (shapeX, shapeY))
    # Simulate BF with power law model in galsim.
    if len(powerLawBfParams):
        if not len(powerLawBfParams) == 8:
            raise RuntimeError("Wrong number of parameters for `galsim.cdmodel.PowerLawCD`. "
                               f"Expected 8; passed {len(powerLawBfParams)}.")
        cd = galsim.cdmodel.PowerLawCD(*powerLawBfParams)
        tempFlatData1 = galsim.Image(flatData1)
        temp2FlatData1 = cd.applyForward(tempFlatData1)

        tempFlatData2 = galsim.Image(flatData2)
        temp2FlatData2 = cd.applyForward(tempFlatData2)

        flatExp1.image.array[:] = temp2FlatData1.array/gain   # ADU
        flatExp2.image.array[:] = temp2FlatData2.array/gain   # ADU
    else:
        flatExp1.image.array[:] = flatData1/gain   # ADU
        flatExp2.image.array[:] = flatData2/gain   # ADU

    visitInfoExp1 = lsst.afw.image.VisitInfo(exposureTime=expTime)
    visitInfoExp2 = lsst.afw.image.VisitInfo(exposureTime=expTime)

    flatExp1.info.id = expId1
    flatExp1.getInfo().setVisitInfo(visitInfoExp1)
    flatExp2.info.id = expId2
    flatExp2.getInfo().setVisitInfo(visitInfoExp2)

    # Set ISR metadata.
    flatExp1.metadata["LSST ISR UNITS"] = "adu"
    flatExp2.metadata["LSST ISR UNITS"] = "adu"
    for ampName in ampNames:
        key = f"LSST ISR OVERSCAN RESIDUAL SERIAL STDEV {ampName}"
        value = readNoiseElectrons / gain

        flatExp1.metadata[key] = value
        flatExp2.metadata[key] = value

        key = f"LSST ISR OVERSCAN SERIAL MEDIAN {ampName}"
        flatExp1.metadata[key] = 25000.0  # adu
        flatExp2.metadata[key] = 25000.0  # adu

        key = f"LSST ISR AMPOFFSET PEDESTAL {ampName}"
        value = 0.0

        flatExp1.metadata[key] = value
        flatExp2.metadata[key] = value

    return flatExp1, flatExp2
def irlsFit(initialParams, dataX, dataY, function, weightsY=None, weightType='Cauchy', scaleResidual=True):
    """Iteratively reweighted least squares fit.

    This uses the `lsst.cp.pipe.utils.fitLeastSq`, but applies weights
    based on the Cauchy distribution by default. Other weight options
    are implemented. See e.g. Holland and Welsch, 1977,
    doi:10.1080/03610927708827533

    Parameters
    ----------
    initialParams : `list` [`float`]
        Starting parameters.
    dataX : `numpy.array`, (N,)
        Abscissa data.
    dataY : `numpy.array`, (N,)
        Ordinate data.
    function : callable
        Function to fit.
    weightsY : `numpy.array`, (N,), optional
        Weights to apply to the data.
    weightType : `str`, optional
        Type of weighting to use. One of Cauchy, Anderson, bisquare,
        box, Welsch, Huber, logistic, or Fair.
    scaleResidual : `bool`, optional
        If true, the residual is scaled by the sqrt of the Y values.

    Returns
    -------
    polyFit : `list` [`float`]
        Final best fit parameters.
    polyFitErr : `list` [`float`]
        Final errors on fit parameters.
    chiSq : `float`
        Reduced chi squared.
    weightsY : `list` [`float`]
        Final weights used for each point.

    Raises
    ------
    RuntimeError :
        Raised if an unknown weightType string is passed.
    """
    # Fix: use an explicit ``is None`` check. The previous
    # ``if not weightsY`` raised "truth value of an array is ambiguous"
    # whenever a caller passed a numpy array of weights with more than
    # one element.
    if weightsY is None:
        weightsY = np.ones_like(dataX)

    polyFit, polyFitErr, chiSq = fitLeastSq(initialParams, dataX, dataY, function, weightsY=weightsY)
    for iteration in range(10):
        resid = np.abs(dataY - function(polyFit, dataX))
        if scaleResidual:
            resid = resid / np.sqrt(dataY)
        if weightType == 'Cauchy':
            # Use Cauchy weighting. This is a soft weight.
            # At [2, 3, 5, 10] sigma, weights are [.59, .39, .19, .05].
            Z = resid / 2.385
            weightsY = 1.0 / (1.0 + np.square(Z))
        elif weightType == 'Anderson':
            # Anderson+1972 weighting. This is a hard weight.
            # At [2, 3, 5, 10] sigma, weights are [.67, .35, 0.0, 0.0].
            Z = resid / (1.339 * np.pi)
            weightsY = np.where(Z < 1.0, np.sinc(Z), 0.0)
        elif weightType == 'bisquare':
            # Beaton and Tukey (1974) biweight. This is a hard weight.
            # At [2, 3, 5, 10] sigma, weights are [.81, .59, 0.0, 0.0].
            Z = resid / 4.685
            weightsY = np.where(Z < 1.0, 1.0 - np.square(Z), 0.0)
        elif weightType == 'box':
            # Hinich and Talwar (1975). This is a hard weight.
            # At [2, 3, 5, 10] sigma, weights are [1.0, 0.0, 0.0, 0.0].
            weightsY = np.where(resid < 2.795, 1.0, 0.0)
        elif weightType == 'Welsch':
            # Dennis and Welsch (1976). This is a hard weight.
            # At [2, 3, 5, 10] sigma, weights are [.64, .36, .06, 1e-5].
            Z = resid / 2.985
            weightsY = np.exp(-1.0 * np.square(Z))
        elif weightType == 'Huber':
            # Huber (1964) weighting. This is a soft weight.
            # At [2, 3, 5, 10] sigma, weights are [.67, .45, .27, .13].
            Z = resid / 1.345
            weightsY = np.where(Z < 1.0, 1.0, 1 / Z)
        elif weightType == 'logistic':
            # Logistic weighting. This is a soft weight.
            # At [2, 3, 5, 10] sigma, weights are [.56, .40, .24, .12].
            Z = resid / 1.205
            weightsY = np.tanh(Z) / Z
        elif weightType == 'Fair':
            # Fair (1974) weighting. This is a soft weight.
            # At [2, 3, 5, 10] sigma, weights are [.41, .32, .22, .12].
            Z = resid / 1.4
            weightsY = (1.0 / (1.0 + (Z)))
        else:
            raise RuntimeError(f"Unknown weighting type: {weightType}")
        polyFit, polyFitErr, chiSq = fitLeastSq(initialParams, dataX, dataY, function, weightsY=weightsY)

    return polyFit, polyFitErr, chiSq, weightsY
def fitLeastSq(initialParams, dataX, dataY, function, weightsY=None):
    """Fit data and estimate parameter errors with scipy.optimize.leastsq.

    optimize.leastsq returns the fractional covariance matrix. To
    estimate the standard deviation of the fit parameters, multiply
    the entries of this matrix by the unweighted reduced chi squared
    and take the square root of the diagonal elements.

    Parameters
    ----------
    initialParams : `list` [`float`]
        Initial values for fit parameters.
    dataX : `numpy.array`, (N,)
        Data in the abscissa axis.
    dataY : `numpy.array`, (N,)
        Data in the ordinate axis.
    function : callable object (function)
        Function to fit the data with.
    weightsY : `numpy.array`, (N,)
        Weights of the data in the ordinate axis.

    Return
    ------
    pFitSingleLeastSquares : `list` [`float`]
        List with fitted parameters.
    pErrSingleLeastSquares : `list` [`float`]
        List with errors for fitted parameters.
    reducedChiSqSingleLeastSquares : `float`
        Reduced chi squared, unweighted if weightsY is not provided.
    """
    if weightsY is None:
        weightsY = np.ones(len(dataX))

    def weightedResiduals(p, x, y, weightsY=None):
        # Residual vector minimized by leastsq.
        if weightsY is None:
            weightsY = np.ones(len(x))
        return weightsY*(function(p, x) - y)

    fitResult = leastsq(weightedResiduals, initialParams,
                        args=(dataX, dataY, weightsY), full_output=1,
                        epsfcn=0.0001)
    pFit, pCov = fitResult[0], fitResult[1]

    nParams = len(initialParams)
    if pCov is not None and len(dataY) > nParams:
        # Rescale the fractional covariance by the reduced chi squared
        # to obtain parameter variances.
        reducedChiSq = calculateWeightedReducedChi2(dataY, function(pFit, dataX), weightsY, len(dataY),
                                                    nParams)
        pCov *= reducedChiSq
    else:
        # Underdetermined fit or singular covariance: no error estimate.
        pCov = np.zeros((nParams, nParams))
        pCov[:, :] = np.nan
        reducedChiSq = np.nan

    parameterErrors = np.array([np.fabs(pCov[index][index])**0.5 for index in range(len(pFit))])

    return pFit, parameterErrors, reducedChiSq
def fitBootstrap(initialParams, dataX, dataY, function, weightsY=None, confidenceSigma=1.):
    """Do a fit using least squares and bootstrap to estimate parameter errors.

    The bootstrap error bars are calculated by fitting 100 random data sets.

    Parameters
    ----------
    initialParams : `list` [`float`]
        Initial values for fit parameters.
    dataX : `numpy.array`, (N,)
        Data in the abscissa axis.
    dataY : `numpy.array`, (N,)
        Data in the ordinate axis.
    function : callable object (function)
        Function to fit the data with.
    weightsY : `numpy.array`, (N,), optional.
        Weights of the data in the ordinate axis.
    confidenceSigma : `float`, optional.
        Number of sigmas that determine confidence interval for the
        bootstrap errors.

    Return
    ------
    pFitBootstrap : `list` [`float`]
        List with fitted parameters.
    pErrBootstrap : `list` [`float`]
        List with errors for fitted parameters.
    reducedChiSqBootstrap : `float`
        Reduced chi squared, unweighted if weightsY is not provided.
    """
    if weightsY is None:
        weightsY = np.ones(len(dataX))

    def errFunc(p, x, y, weightsY):
        # Weighted residual vector minimized by leastsq.
        if weightsY is None:
            weightsY = np.ones(len(x))
        return weightsY*(function(p, x) - y)

    # Initial fit to the un-perturbed data.
    pFit, _ = leastsq(errFunc, initialParams, args=(dataX, dataY, weightsY), full_output=0)

    # Residuals of that fit set the scale of the bootstrap perturbations.
    residuals = errFunc(pFit, dataX, dataY, weightsY)

    # Generate and fit 100 perturbed realizations of the data.
    bootstrapPars = []
    for _ in range(100):
        randomDelta = np.random.normal(0., np.fabs(residuals), len(dataY))
        randomFit, _ = leastsq(errFunc, initialParams,
                               args=(dataX, dataY + randomDelta, weightsY), full_output=0)
        bootstrapPars.append(randomFit)
    bootstrapPars = np.array(bootstrapPars)

    # Central value and confidence interval from the bootstrap sample.
    pFitBootstrap = np.mean(bootstrapPars, 0)
    pErrBootstrap = confidenceSigma*np.std(bootstrapPars, 0)

    reducedChiSq = calculateWeightedReducedChi2(dataY, function(pFitBootstrap, dataX), weightsY,
                                                len(dataY), len(initialParams))
    return pFitBootstrap, pErrBootstrap, reducedChiSq
def funcPolynomial(pars, x):
    """Polynomial function definition.

    Parameters
    ----------
    pars : `list`
        Polynomial coefficients. Its length determines the polynomial order.
    x : `numpy.array`, (N,)
        Abscissa array.

    Returns
    -------
    y : `numpy.array`, (N,)
        Ordinate array after evaluating polynomial of order
        len(pars)-1 at `x`.
    """
    coefficients = list(pars)
    return poly.polyval(x, coefficients)
def funcAstier(pars, x):
    """Single brighter-fatter parameter model for PTC; Equation 16 of
    Astier+19.

    Model:

    C_{00}(mu) = frac{1}{2 a_{00} g**2} * [exp(2 a_{00} mu g ) - 1]
                 + n_{00} / g**2

    Parameters
    ----------
    pars : `list`
        Parameters of the model: a00 (brighter-fatter), gain (e/ADU),
        and noise (e^2).
    x : `numpy.array`, (N,)
        Signal mu (ADU).

    Returns
    -------
    y : `numpy.array`, (N,)
        C_00 (variance) in ADU^2.
    """
    a00, gain, noiseSquared = pars
    gainSquared = gain*gain
    # Exponential brighter-fatter term plus the read-noise floor.
    bfTerm = 0.5/(a00*gainSquared)*(np.exp(2*a00*x*gain) - 1)
    return bfTerm + noiseSquared/gainSquared  # C_00
def ptcRolloffModel(params, mu):
    """Piece-wise exponential saturation roll-off model of the PTC.

    Parameters
    ----------
    params : `list`
        Parameters of the model: muTurnoff (adu), tau (rolloff sharpness).
    mu : `numpy.array`, (N,)
        Signal mu (ADU).

    Returns
    -------
    y : `numpy.array`, (N,)
        Difference in variance in ADU^2.
    """
    muTurnoff, tau = params
    # Exponential decay beyond the turnoff; zero below it.
    rolloff = np.exp(-(mu - muTurnoff) / tau) - 1
    return np.where(mu < muTurnoff, 0, rolloff)
def funcAstierWithRolloff(pars, x):
    """Single brighter-fatter parameter model for PTC; Equation 16 of
    Astier+19 with a piece-wise exponential model for the PTC roll-off
    caused by saturation.

    The nominal turnoff is calculated beforehand, and we extend the PTC
    fit to include signal values up to 5% above the nominally computed
    turnoff.

    Model:

    C_{00}(mu) = funcAstier - np.where(
        x < muTurnoff,
        0,
        np.exp(-(x - muTurnoff) / tau) - 1,
    )

    Parameters
    ----------
    pars : `list`
        Parameters of the model: a00 (brighter-fatter), gain (e/ADU),
        and noise (e^2), muTurnoff (adu), tau (rolloff sharpness).
    x : `numpy.array`, (N,)
        Signal mu (ADU).

    Returns
    -------
    y : `numpy.array`, (N,)
        C_00 (variance) in ADU^2.
    """
    # Split the parameter vector: the trailing two entries belong to
    # the roll-off model, the rest to the Astier+19 PTC model.
    astierPars, rolloffPars = pars[:-2], pars[-2:]
    return funcAstier(astierPars, x) - ptcRolloffModel(rolloffPars, x)  # C_00
def arrangeFlatsByExpTime(exposureList, exposureIdList, log=None):
    """Arrange exposures by exposure time.

    Parameters
    ----------
    exposureList : `list` [`lsst.pipe.base.connections.DeferredDatasetRef`]
        Input list of exposure references.
    exposureIdList : `list` [`int`]
        List of exposure ids as obtained by dataId[`exposure`].
    log : `lsst.utils.logging.LsstLogAdapter`, optional
        Log object.

    Returns
    -------
    flatsAtExpTime : `dict` [`float`,
            `list`[(`lsst.pipe.base.connections.DeferredDatasetRef`,
                `int`)]]
        Dictionary that groups references to flat-field exposures
        (and their IDs) that have the same exposure time (seconds).
    """
    assert len(exposureList) == len(exposureIdList), "Different lengths for exp. list and exp. ID lists"
    flatsAtExpTime = {}
    for expRef, expId in zip(exposureList, exposureIdList):
        # Only the visitInfo component is needed to bin by time.
        expTime = expRef.get(component='visitInfo').exposureTime
        if log is not None and not np.isfinite(expTime):
            log.warning("Exposure %d has non-finite exposure time.", expId)
        flatsAtExpTime.setdefault(expTime, []).append((expRef, expId))

    return flatsAtExpTime
def arrangeFlatsByExpFlux(exposureList, exposureIdList, fluxKeyword, log=None):
    """Arrange exposures by exposure flux.

    Parameters
    ----------
    exposureList : `list` [`lsst.pipe.base.connections.DeferredDatasetRef`]
        Input list of exposure references.
    exposureIdList : `list` [`int`]
        List of exposure ids as obtained by dataId[`exposure`].
    fluxKeyword : `str`
        Header keyword that contains the flux per exposure.
    log : `lsst.utils.logging.LsstLogAdapter`, optional
        Log object.

    Returns
    -------
    flatsAtExpFlux : `dict` [`float`,
            `list`[(`lsst.pipe.base.connections.DeferredDatasetRef`,
                `int`)]]
        Dictionary that groups references to flat-field exposures
        (and their IDs) that have the same flux.
    """
    assert len(exposureList) == len(exposureIdList), "Different lengths for exp. list and exp. ID lists"
    flatsAtExpFlux = {}
    for expRef, expId in zip(exposureList, exposureIdList):
        # Pull the flux from the exposure header; a missing keyword is
        # tolerated here and rejected later when pairing exposures.
        try:
            expFlux = expRef.get().getMetadata()[fluxKeyword]
        except KeyError:
            expFlux = None
        if expFlux is None:
            if log is not None:
                log.warning("Exposure %d does not have valid header keyword %s.", expId, fluxKeyword)
            expFlux = np.nan
        flatsAtExpFlux.setdefault(expFlux, []).append((expRef, expId))

    return flatsAtExpFlux
def arrangeFlatsByExpId(exposureList, exposureIdList):
    """Arrange exposures by exposure ID.

    There is no guarantee that this will properly group exposures,
    but allows a sequence of flats that have different
    illumination (despite having the same exposure time) to be
    processed.

    Parameters
    ----------
    exposureList : `list`[`lsst.pipe.base.connections.DeferredDatasetRef`]
        Input list of exposure references.
    exposureIdList : `list`[`int`]
        List of exposure ids as obtained by dataId[`exposure`].

    Returns
    -------
    flatsAtExpId : `dict` [`float`,
            `list`[(`lsst.pipe.base.connections.DeferredDatasetRef`,
                `int`)]]
        Dictionary that groups references to flat-field exposures (and their
        IDs) sequentially by their exposure id.

    Notes
    -----
    This algorithm sorts the input exposure references by their exposure
    id, and then assigns each pair of exposure references (exp_j, exp_{j+1})
    to pair k, such that 2*k = j, where j is the python index of one of the
    exposure references (starting from zero). A trailing unpaired exposure
    (odd-length input) is dropped, so only fully populated pairs are
    returned.
    """
    flatsAtExpId = {}
    assert len(exposureList) == len(exposureIdList), "Different lengths for exp. list and exp. ID lists"
    # Sort exposures by expIds, which are in the second list `exposureIdList`.
    sortedExposures = sorted(zip(exposureList, exposureIdList), key=lambda pair: pair[1])

    for jPair, expTuple in enumerate(sortedExposures):
        if (jPair + 1) % 2:
            kPair = jPair // 2
            # Fix: look up the partner BEFORE mutating the output dict.
            # Previously the first append had already happened when the
            # IndexError fired for an odd-length input, leaving a
            # dangling single-element "pair" in the result.
            try:
                partner = sortedExposures[jPair + 1]
            except IndexError:
                continue
            listAtExpId = flatsAtExpId.setdefault(kPair, [])
            listAtExpId.append(expTuple)
            listAtExpId.append(partner)

    return flatsAtExpId
def extractCalibDate(calib):
    """Extract common calibration metadata values that will be written to
    output header.

    Parameters
    ----------
    calib : `lsst.afw.image.Exposure` or `lsst.ip.isr.IsrCalib`
        Calibration to pull date information from.

    Returns
    -------
    dateString : `str`
        Calibration creation date string to add to header.
    """
    if not hasattr(calib, "getMetadata"):
        return "Unknown Unknown"

    metadata = calib.getMetadata()
    # Newer calibs use CALIB_CREATION_*; fall back to the older
    # CALIB_CREATE_* spelling otherwise.
    if 'CALIB_CREATION_DATE' in metadata:
        dateKey, timeKey = "CALIB_CREATION_DATE", "CALIB_CREATION_TIME"
    else:
        dateKey, timeKey = "CALIB_CREATE_DATE", "CALIB_CREATE_TIME"
    return " ".join((metadata.get(dateKey, "Unknown"), metadata.get(timeKey, "Unknown")))
699class CovFastFourierTransform:
700 """A class to compute (via FFT) the nearby pixels correlation function.
702 Implements appendix of Astier+19.
704 Parameters
705 ----------
706 diff : `numpy.array`
707 Image where to calculate the covariances (e.g., the difference
708 image of two flats).
709 w : `numpy.array`
710 Weight image (mask): it should consist of 1's (good pixel) and
711 0's (bad pixels).
712 fftShape : `tuple`
713 2d-tuple with the shape of the FFT
714 maxRangeCov : `int`
715 Maximum range for the covariances.
716 """
718 def __init__(self, diff, w, fftShape, maxRangeCov):
719 # check that the zero padding implied by "fft_shape"
720 # is large enough for the required correlation range
721 assert fftShape[0] > diff.shape[0]+maxRangeCov+1
722 assert fftShape[1] > diff.shape[1]+maxRangeCov+1
723 # for some reason related to numpy.fft.rfftn,
724 # the second dimension should be even, so
725 if fftShape[1]%2 == 1:
726 fftShape = (fftShape[0], fftShape[1]+1)
727 tIm = np.fft.rfft2(diff*w, fftShape)
728 tMask = np.fft.rfft2(w, fftShape)
729 # sum of "squares"
730 self.pCov = np.fft.irfft2(tIm*tIm.conjugate())
731 # sum of values
732 self.pMean = np.fft.irfft2(tIm*tMask.conjugate())
733 # number of w!=0 pixels.
734 self.pCount = np.fft.irfft2(tMask*tMask.conjugate())
736 def cov(self, dx, dy):
737 """Covariance for dx,dy averaged with dx,-dy if both non zero.
739 Implements appendix of Astier+19.
741 Parameters
742 ----------
743 dx : `int`
744 Lag in x
745 dy : `int`
746 Lag in y
748 Returns
749 -------
750 0.5*(cov1+cov2): `float`
751 Covariance at (dx, dy) lag
752 npix1+npix2 : `int`
753 Number of pixels used in covariance calculation.
755 Raises
756 ------
757 ValueError if number of pixels for a given lag is 0.
758 """
759 # compensate rounding errors
760 nPix1 = int(round(self.pCount[dy, dx]))
761 if nPix1 == 0:
762 raise ValueError(f"Could not compute covariance term {dy}, {dx}, as there are no good pixels.")
763 cov1 = self.pCov[dy, dx]/nPix1-self.pMean[dy, dx]*self.pMean[-dy, -dx]/(nPix1*nPix1)
764 if (dx == 0 or dy == 0):
765 return cov1, nPix1
766 nPix2 = int(round(self.pCount[-dy, dx]))
767 if nPix2 == 0:
768 raise ValueError("Could not compute covariance term {dy}, {dx} as there are no good pixels.")
769 cov2 = self.pCov[-dy, dx]/nPix2-self.pMean[-dy, dx]*self.pMean[dy, -dx]/(nPix2*nPix2)
770 return 0.5*(cov1+cov2), nPix1+nPix2
772 def reportCovFastFourierTransform(self, maxRange):
773 """Produce a list of tuples with covariances.
775 Implements appendix of Astier+19.
777 Parameters
778 ----------
779 maxRange : `int`
780 Maximum range of covariances.
782 Returns
783 -------
784 tupleVec : `list`
785 List with covariance tuples.
786 """
787 tupleVec = []
788 # (dy,dx) = (0,0) has to be first
789 for dy in range(maxRange+1):
790 for dx in range(maxRange+1):
791 cov, npix = self.cov(dx, dy)
792 if (dx == 0 and dy == 0):
793 var = cov
794 tupleVec.append((dx, dy, var, cov, npix))
795 return tupleVec
def getFitDataFromCovariances(i, j, mu, fullCov, fullCovModel, fullCovSqrtWeights, gain=1.0,
                              divideByMu=False, returnMasked=False):
    """Get measured signal and covariance, cov model, weights, and mask at
    covariance lag (i, j).

    Parameters
    ----------
    i : `int`
        Lag for covariance matrix.
    j : `int`
        Lag for covariance matrix.
    mu : `list`
        Mean signal values.
    fullCov : `list` of `numpy.array`
        Measured covariance matrices at each mean signal level in mu.
    fullCovModel : `list` of `numpy.array`
        List of modeled covariances at each mean signal level in mu.
    fullCovSqrtWeights : `list` of `numpy.array`
        List of square root of measured covariances at each mean
        signal level in mu.
    gain : `float`, optional
        Gain, in e-/ADU. If other than 1.0 (default), the returned
        quantities will be in electrons or powers of electrons.
    divideByMu : `bool`, optional
        Divide returned covariance, model, and weights by the mean
        signal mu?
    returnMasked : `bool`, optional
        Use mask (based on weights) in returned arrays (mu,
        covariance, and model)?

    Returns
    -------
    mu : `numpy.array`
        List of signal values at (i, j).
    covariance : `numpy.array`
        Covariance at (i, j) at each mean signal mu value (fullCov[:, i, j]).
    covarianceModel : `numpy.array`
        Covariance model at (i, j).
    weights : `numpy.array`
        Weights at (i, j).
    maskFromWeights : `numpy.array`, optional
        Boolean mask of the covariance at (i,j), where the weights
        differ from 0.
    """
    gainSquared = gain**2
    # Slice out lag (i, j) from every input; covariances scale by
    # gain^2 (ADU^2 -> e-^2), weights by the inverse.
    mu = np.array(mu)
    covariance = np.array(fullCov)[:, i, j]*gainSquared
    covarianceModel = np.array(fullCovModel)[:, i, j]*gainSquared
    weights = np.array(fullCovSqrtWeights)[:, i, j]/gainSquared

    # Zero weight marks an unusable point.
    maskFromWeights = weights != 0
    if returnMasked:
        mu = mu[maskFromWeights]
        covariance = covariance[maskFromWeights]
        covarianceModel = covarianceModel[maskFromWeights]
        weights = weights[maskFromWeights]

    if divideByMu:
        covariance /= mu
        covarianceModel /= mu
        weights *= mu
    return mu, covariance, covarianceModel, weights, maskFromWeights
def symmetrize(inputArray):
    """Copy array over 4 quadrants prior to convolution.

    Parameters
    ----------
    inputArray : `numpy.array`
        Input array to symmetrize.

    Returns
    -------
    aSym : `numpy.array`
        Symmetrized array.
    """
    nCols, nRows = inputArray.shape[-1], inputArray.shape[-2]
    # The symmetrized array mirrors the input about both trailing axes,
    # sharing the central row/column.
    symShape = list(inputArray.shape)
    symShape[-1] = 2*nCols - 1
    symShape[-2] = 2*nRows - 1
    aSym = np.empty(tuple(symShape))
    aSym[..., nRows-1:, nCols-1:] = inputArray      # lower-right quadrant
    aSym[..., nRows-1:, nCols-1::-1] = inputArray   # lower-left (x-mirrored)
    aSym[..., nRows-1::-1, nCols-1::-1] = inputArray  # upper-left (both mirrored)
    aSym[..., nRows-1::-1, nCols-1:] = inputArray   # upper-right (y-mirrored)

    return aSym
def ddict2dict(d):
    """Convert nested default dictionaries to regular dictionaries.

    This is needed to prevent yaml persistence issues.

    Parameters
    ----------
    d : `defaultdict`
        A possibly nested set of `defaultdict`.

    Returns
    -------
    dict : `dict`
        A possibly nested set of `dict`.
    """
    # Recursively convert nested mappings in place, then wrap the top
    # level in a plain dict.
    for key, value in d.items():
        if isinstance(value, dict):
            d[key] = ddict2dict(value)
    return dict(d)
class Pol2D:
    """2D Polynomial Regression.

    Parameters
    ----------
    x : numpy.ndarray
        Input array for the x-coordinate.
    y : numpy.ndarray
        Input array for the y-coordinate.
    z : numpy.ndarray
        Input array for the dependent variable.
    order : int
        Order of the polynomial.
    w : numpy.ndarray, optional
        Weight array for weighted regression. Default is None.

    Notes
    -----
    Ported from by https://gitlab.in2p3.fr/astier/bfptc P. Astier.

    Example:
    >>> x = np.array([1, 2, 3])
    >>> y = np.array([4, 5, 6])
    >>> z = np.array([7, 8, 9])
    >>> order = 2
    >>> poly_reg = Pol2D(x, y, z, order)
    >>> result = poly_reg.eval(2.5, 5.5)
    """
    def __init__(self, x, y, z, order, w=None):
        """
        orderx : `int`
            Effective order in the x-direction.
        ordery : `int`
            Effective order in the y-direction.
        coeff : `numpy.ndarray`
            Coefficients of the polynomial regression.
        """
        # Clamp the requested order so the design matrix is not
        # underdetermined along either axis of the input grid.
        self.orderx = min(order, x.shape[0] - 1)
        self.ordery = min(order, x.shape[1] - 1)
        designMatrix = self.monomials(x.ravel(), y.ravel())
        if w is None:
            solution = np.linalg.lstsq(designMatrix, z.ravel(), rcond=None)
        else:
            # Scale each row of the design matrix and the targets by
            # the weights before solving.
            weightedDesign = (w.ravel() * designMatrix.T).T
            solution = np.linalg.lstsq(weightedDesign, z.ravel() * w.ravel(), rcond=None)
        self.coeff = solution[0]

    def monomials(self, x, y):
        """Generate the monomials matrix for the given x and y.

        Parameters
        ----------
        x : numpy.ndarray
            Input array for the x-coordinate.
        y : numpy.ndarray
            Input array for the y-coordinate.

        Returns
        -------
        G : numpy.ndarray
            Monomials matrix.
        """
        nTerms = (self.orderx + 1) * (self.ordery + 1)
        designMatrix = np.zeros(x.shape + (nTerms,))
        powerPairs = itertools.product(range(self.orderx + 1), range(self.ordery + 1))
        for column, (powX, powY) in enumerate(powerPairs):
            designMatrix[..., column] = x**powX * y**powY
        return designMatrix

    def eval(self, x, y):
        """Evaluate the polynomial at the given x and y coordinates.

        Parameters
        ----------
        x : `float`
            x-coordinate for evaluation.
        y : `float`
            y-coordinate for evaluation.

        Returns
        -------
        result : `float`
            Result of the polynomial evaluation.
        """
        return np.dot(self.monomials(x, y), self.coeff)
class AstierSplineLinearityFitter:
    """Class to fit the Astier spline linearity model.

    This is a spline fit with photodiode data based on a model
    from Pierre Astier, referenced in June 2023 from
    https://me.lsst.eu/astier/bot/7224D/model_nonlin.py

    This model fits a spline with (optional) nuisance parameters
    to allow for different linearity coefficients with different
    photodiode settings. The minimization is a least-squares
    fit with the residual of
    Sum[(mu_i - S(mu_i) - O)/(k_j * D_i) - 1]**2, where S(mu_i)
    is an Akima Spline function of mu_i, the observed flat-pair
    mean; D_i is the photo-diode measurement corresponding to
    that flat-pair; and k_j is a constant of proportionality
    which is over index j as it is allowed to
    be different based on different photodiode settings (e.g.
    CCOBCURR); and O is a constant offset to allow for light
    leaks (and is only fit if fit_offset=True). In
    addition, if config.doSplineFitTemperature is True then
    the fit will adjust mu such that
    mu = mu_input*(1 + alpha*(temperature_scaled))
    and temperature_scaled = T - T_ref. Finally, if
    config.doSplineFitTemporal is True then the fit will
    further adjust mu such that
    mu = mu_input*(1 + beta*(mjd_scaled))
    and mjd_scaled = mjd - mjd_ref.

    The fit has additional constraints to ensure that the spline
    goes through the (0, 0) point, as well as a normalization
    condition so that the average of the spline over the full
    range is 0. The normalization ensures that the spline only
    fits deviations from linearity, rather than the linear
    function itself which is degenerate with the gain.

    Parameters
    ----------
    nodes : `np.ndarray` (N,)
        Array of spline node locations.
    grouping_values : `np.ndarray` (M,)
        Array of values to group values for different proportionality
        constants (e.g. CCOBCURR).
    pd : `np.ndarray` (M,)
        Array of photodiode measurements.
    mu : `np.ndarray` (M,)
        Array of flat mean values.
    mask : `np.ndarray` (M,), optional
        Input mask (True is good point, False is bad point).
    log : `logging.logger`, optional
        Logger object to use for logging.
    fit_offset : `bool`, optional
        Fit a constant offset to allow for light leaks?
    fit_weights : `bool`, optional
        Fit for the weight parameters?
    weight_pars_start : `list` [ `float` ]
        Iterable of 2 weight parameters for weighed fit. These will
        be used as input if the weight parameters are not fit.
    fit_temperature : `bool`, optional
        Fit for temperature scaling?
    temperature_scaled : `np.ndarray` (M,), optional
        Input scaled temperature values (T - T_ref).
    max_signal_nearly_linear : `float`, optional
        Maximum signal that we are confident the input is nearly
        linear. This is used both for regularization, and for
        fitting the raw slope. Usually set to the ptc turnoff,
        above which we allow the spline to significantly deviate
        and do not demand the deviation to average to zero.
    fit_temporal : `bool`, optional
        Fit for temporal scaling?
    mjd_scaled : `np.ndarray` (M,), optional
        Input scaled mjd values (mjd - mjd_ref).
    max_frac_correction : `float`, optional
        Maximum fractional correction.
    max_correction : `float`, optional
        Maximum correction (unscaled).
    """
    def __init__(
        self,
        nodes,
        grouping_values,
        pd,
        mu,
        mask=None,
        log=None,
        fit_offset=True,
        fit_weights=False,
        # NOTE(review): mutable default; it is only read (never mutated)
        # here, but a tuple would be safer.
        weight_pars_start=[1.0, 0.0],
        fit_temperature=False,
        temperature_scaled=None,
        max_signal_nearly_linear=None,
        fit_temporal=False,
        mjd_scaled=None,
        max_frac_correction=0.25,
        max_correction=np.inf,
    ):
        self._pd = pd
        self._mu = mu
        self._grouping_values = grouping_values
        self.log = log if log else logging.getLogger(__name__)
        self._fit_offset = fit_offset
        self._fit_weights = fit_weights
        self._weight_pars_start = weight_pars_start
        self._fit_temperature = fit_temperature
        self._fit_temporal = fit_temporal
        self._max_frac_correction = max_frac_correction
        self._max_correction = max_correction

        self._nodes = nodes
        # The constraints below force the spline through (0, 0), so the
        # first node must be at 0 and the nodes must be strictly increasing.
        if nodes[0] != 0.0:
            raise ValueError("First node must be 0.0")
        if not np.all(np.diff(nodes) > 0):
            raise ValueError("Nodes must be sorted with no repeats.")

        # Find the group indices.
        u_group_values = np.unique(self._grouping_values)
        self.ngroup = len(u_group_values)

        self.group_indices = []
        for i in range(self.ngroup):
            self.group_indices.append(np.where(self._grouping_values == u_group_values[i])[0])

        # Weight values. Outliers will be set to 0.
        if mask is None:
            _mask = np.ones(len(mu), dtype=np.bool_)
        else:
            _mask = mask
        self._w = self.compute_weights(self._weight_pars_start, self._mu, _mask)

        if temperature_scaled is None:
            temperature_scaled = np.zeros(len(self._mu))
        else:
            if len(np.atleast_1d(temperature_scaled)) != len(self._mu):
                raise ValueError("temperature_scaled must be the same length as input mu.")
        self._temperature_scaled = temperature_scaled

        if mjd_scaled is None:
            mjd_scaled = np.zeros(len(self._mu))
        else:
            if len(np.atleast_1d(mjd_scaled)) != len(self._mu):
                raise ValueError("mjd_scaled must be the same length as input mu.")
        self._mjd_scaled = mjd_scaled

        # Values to regularize spline fit.
        if max_signal_nearly_linear is None:
            max_signal_nearly_linear = self._mu[self.mask].max()
        self._max_signal_nearly_linear = max_signal_nearly_linear
        # Grid over which the spline is required to average to zero.
        self._x_regularize = np.linspace(0.0, self._max_signal_nearly_linear, 100)

        # Set up the indices for the fit parameters.  The optional
        # parameter groups are appended, in this fixed order, after the
        # spline values and group constants.
        self.par_indices = {
            "values": np.arange(len(self._nodes)),
            "groups": len(self._nodes) + np.arange(self.ngroup),
            "offset": np.zeros(0, dtype=np.int64),
            "weight_pars": np.zeros(0, dtype=np.int64),
            "temperature_coeff": np.zeros(0, dtype=np.int64),
            "temporal_coeff": np.zeros(0, dtype=np.int64),
        }
        if self._fit_offset:
            self.par_indices["offset"] = np.arange(1) + (
                len(self.par_indices["values"])
                + len(self.par_indices["groups"])
            )
        if self._fit_weights:
            self.par_indices["weight_pars"] = np.arange(2) + (
                len(self.par_indices["values"])
                + len(self.par_indices["groups"])
                + len(self.par_indices["offset"])
            )
        if self._fit_temperature:
            self.par_indices["temperature_coeff"] = np.arange(1) + (
                len(self.par_indices["values"])
                + len(self.par_indices["groups"])
                + len(self.par_indices["offset"])
                + len(self.par_indices["weight_pars"])
            )
        if self._fit_temporal:
            self.par_indices["temporal_coeff"] = np.arange(1) + (
                len(self.par_indices["values"])
                + len(self.par_indices["groups"])
                + len(self.par_indices["offset"])
                + len(self.par_indices["weight_pars"])
                + len(self.par_indices["temperature_coeff"])
            )

    @staticmethod
    def compute_weights(weight_pars, mu, mask):
        """Compute fit weights for each point.

        Parameters
        ----------
        weight_pars : iterable of `float`
            Two weight parameters (constant term and 1/mu term).
        mu : `np.ndarray`
            Array of flat mean values.
        mask : `np.ndarray`
            Boolean mask; masked-out points get weight 0.

        Returns
        -------
        w : `np.ndarray`
            Weight for each point.
        """
        w = 1./np.sqrt(weight_pars[0]**2. + weight_pars[1]**2./mu)
        w[~mask] = 0.0

        return w

    def estimate_p0(self):
        """Estimate initial fit parameters.

        Returns
        -------
        p0 : `np.ndarray`
            Parameter array, with spline values (one for each node) followed
            by proportionality constants (one for each group); one extra
            for the offset O (if fit_offset was set to True); two extra
            for the weights (if fit_weights was set to True); one
            extra for the temperature coefficient (if fit_temperature was
            set to True); and one extra for the temporal coefficient (if
            fit_temporal was set to True).
        """
        npt = (len(self.par_indices["values"])
               + len(self.par_indices["groups"])
               + len(self.par_indices["offset"])
               + len(self.par_indices["weight_pars"])
               + len(self.par_indices["temperature_coeff"])
               + len(self.par_indices["temporal_coeff"]))
        p0 = np.zeros(npt)

        # Do a simple linear fit for each group.
        for i, indices in enumerate(self.group_indices):
            mask = self.mask[indices]
            mu = self._mu[indices][mask]
            pd = self._pd[indices][mask]
            # Only fit the slope where the response is believed linear.
            to_fit = (mu < self._max_signal_nearly_linear)
            # If we have no points (low PTC turnoff) then
            # we just use the first half for our initial value.
            if to_fit.sum() == 0:
                to_fit = (mu < np.median(mu))
            linfit = np.polyfit(pd[to_fit], mu[to_fit], 1)
            p0[self.par_indices["groups"][i]] = linfit[0]

        # Look at the residuals...
        ratio_model = self.compute_ratio_model(
            self._nodes,
            self.group_indices,
            self.par_indices,
            p0,
            self._pd,
            self._mu,
            self._temperature_scaled,
            self._mjd_scaled,
        )
        # ...and adjust the linear parameters accordingly.
        p0[self.par_indices["groups"]] *= np.median(ratio_model[self.mask])

        # Re-compute the residuals.
        ratio_model2 = self.compute_ratio_model(
            self._nodes,
            self.group_indices,
            self.par_indices,
            p0,
            self._pd,
            self._mu,
            self._temperature_scaled,
            self._mjd_scaled,
        )

        # And compute a first guess of the spline nodes by averaging the
        # model ratio of the points that fall near each node.
        bins = np.clip(np.searchsorted(self._nodes, self._mu[self.mask]), 0, len(self._nodes) - 1)
        tot_arr = np.zeros(len(self._nodes))
        n_arr = np.zeros(len(self._nodes), dtype=int)
        np.add.at(tot_arr, bins, ratio_model2[self.mask])
        np.add.at(n_arr, bins, 1)

        ratio = np.ones(len(self._nodes))
        ratio[n_arr > 0] = tot_arr[n_arr > 0]/n_arr[n_arr > 0]
        # The first node is pinned to no deviation (spline through 0, 0).
        ratio[0] = 1.0
        p0[self.par_indices["values"]] = (ratio - 1) * self._nodes

        if self._fit_offset:
            p0[self.par_indices["offset"]] = 0.0

        if self._fit_weights:
            p0[self.par_indices["weight_pars"]] = self._weight_pars_start

        return p0

    @staticmethod
    def compute_ratio_model(
        nodes,
        group_indices,
        par_indices,
        pars,
        pd,
        mu,
        temperature_scaled,
        mjd_scaled,
        return_spline=False,
    ):
        """Compute the ratio model values.

        Parameters
        ----------
        nodes : `np.ndarray` (M,)
            Array of node positions.
        group_indices : `list` [`np.ndarray`]
            List of group indices, one array for each group.
        par_indices : `dict`
            Dictionary showing which indices in the pars belong to
            each set of fit values.
        pars : `np.ndarray`
            Parameter array, with spline values (one for each node) followed
            by proportionality constants (one for each group); one extra
            for the offset O (if fit_offset was set to True); two extra
            for the weights (if fit_weights was set to True); and one
            extra for the temperature coefficient (if fit_temperature was
            set to True).
        pd : `np.ndarray` (N,)
            Array of photodiode measurements.
        mu : `np.ndarray` (N,)
            Array of flat means.
        temperature_scaled : `np.ndarray` (N,)
            Array of scaled temperature values.
        mjd_scaled : `np.ndarray` (N,)
            Array of scaled mjd values.
        return_spline : `bool`, optional
            Return the spline interpolation as well as the model ratios?

        Returns
        -------
        ratio_models : `np.ndarray` (N,)
            Model ratio, (mu_i - S(mu_i) - O)/(k_j * D_i)
        spl : `scipy.interpolate.Akima1DInterpolator`
            Spline interpolator (returned if return_spline=True).
        """
        spl = Akima1DInterpolator(nodes, pars[par_indices["values"]], method="akima")

        # Check if we want to do just the left or both with temp scale.
        if len(par_indices["temperature_coeff"]) == 1:
            mu_corr = mu*(1. + pars[par_indices["temperature_coeff"]]*temperature_scaled)
        else:
            mu_corr = mu

        if len(par_indices["temporal_coeff"]) == 1:
            mu_corr = mu_corr*(1. + pars[par_indices["temporal_coeff"]]*mjd_scaled)

        # Spline evaluation is clipped to the node range; beyond the last
        # node the deviation is held at its final value.
        numerator = mu_corr - spl(np.clip(mu_corr, nodes[0], nodes[-1]))
        if len(par_indices["offset"]) == 1:
            numerator -= pars[par_indices["offset"][0]]
        # Scale each photodiode measurement by its group constant k_j.
        denominator = pd.copy()
        ngroup = len(group_indices)
        kj = pars[par_indices["groups"]]
        for j in range(ngroup):
            denominator[group_indices[j]] *= kj[j]

        if return_spline:
            return numerator / denominator, spl
        else:
            return numerator / denominator

    def fit(self, p0, min_iter=3, max_iter=20, max_rejection_per_iteration=5, n_sigma_clip=5.0):
        """
        Perform iterative fit for linear + spline model with offsets.

        Parameters
        ----------
        p0 : `np.ndarray`
            Initial parameter array, with spline values (one for each node)
            followed by proportionality constants (one for each group); one
            extra for the offset O (if fit_offset was set to True); two extra
            for the weights (if fit_weights was set to True); one
            extra for the temperature coefficient (if fit_temperature was
            set to True); and one extra for the temporal coefficient (if
            fit_temporal was set to True).
        min_iter : `int`, optional
            Minimum number of fit iterations.
        max_iter : `int`, optional
            Maximum number of fit iterations.
        max_rejection_per_iteration : `int`, optional
            Maximum number of points to reject per iteration.
        n_sigma_clip : `float`, optional
            Number of sigma to do clipping in each iteration.

        Returns
        -------
        params : `np.ndarray`
            Best-fit parameter array, same layout as ``p0``.
        """
        init_params = p0
        for k in range(max_iter):
            # The instance itself is the residual function (see __call__).
            params, cov_params, _, msg, ierr = leastsq(
                self,
                init_params,
                full_output=True,
                ftol=1e-5,
                maxfev=12000,
            )
            init_params = params.copy()

            # We need to cut off the constraints at the end (there are more
            # residuals than data points.)
            res = self(params)[: len(self._w)]
            std_res = median_abs_deviation(res[self.good_points], scale="normal")
            sample = len(self.good_points)

            # We don't want to reject too many outliers at once.
            if sample > max_rejection_per_iteration:
                sres = np.sort(np.abs(res))
                cut = max(sres[-max_rejection_per_iteration], std_res*n_sigma_clip)
            else:
                cut = std_res*n_sigma_clip

            # Rejected points get weight 0 and stay rejected in later
            # iterations.
            outliers = np.abs(res) > cut
            self._w[outliers] = 0
            if outliers.sum() != 0:
                self.log.info(
                    "After iteration %d there are %d outliers (of %d).",
                    k,
                    outliers.sum(),
                    sample,
                )
            elif k >= min_iter:
                self.log.info("After iteration %d there are no more outliers.", k)
                break

        return params

    @property
    def mask(self):
        # Boolean mask of currently good (non-zero weight) points.
        return (self._w > 0)

    @property
    def good_points(self):
        # Indices of points with non-zero weight.
        return self.mask.nonzero()[0]

    def compute_chisq_dof(self, pars):
        """Compute the chi-squared per degree of freedom for a set of pars.

        Parameters
        ----------
        pars : `np.ndarray`
            Parameter array.

        Returns
        -------
        chisq_dof : `float`
            Chi-squared per degree of freedom.
        """
        resids = self(pars)[0: len(self.mask)]
        chisq = np.sum(resids[self.mask]**2.)
        # NOTE(review): the spline node values are not counted against the
        # degrees of freedom here -- confirm this is intentional (the
        # constraint terms partly absorb them).
        dof = self.mask.sum() - self.ngroup
        if self._fit_temporal:
            dof -= 1
        if self._fit_temperature:
            dof -= 1
        if self._fit_offset:
            dof -= 1
        if self._fit_weights:
            dof -= 2

        return chisq/dof

    def __call__(self, pars):
        """Compute the residual vector for least-squares fitting.

        Parameters
        ----------
        pars : `np.ndarray`
            Parameter array (see ``estimate_p0`` for the layout).

        Returns
        -------
        resid : `np.ndarray`
            Weighted per-point residuals followed by the constraint terms
            (spline normalization, zero-point, optional weight prior, and
            the maximum-correction penalty).
        """
        ratio_model, spl = self.compute_ratio_model(
            self._nodes,
            self.group_indices,
            self.par_indices,
            pars,
            self._pd,
            self._mu,
            self._temperature_scaled,
            self._mjd_scaled,
            return_spline=True,
        )

        _mask = self.mask
        # Update the weights if we are fitting them.
        if self._fit_weights:
            self._w = self.compute_weights(pars[self.par_indices["weight_pars"]], self._mu, _mask)
        resid = self._w*(ratio_model - 1.0)

        # Ensure masked points have 0 residual.
        resid[~_mask] = 0.0

        # The spline must average to zero over the nearly-linear range.
        constraint = [1e3 * np.mean(spl(np.clip(self._x_regularize, self._nodes[0], self._nodes[-1])))]
        # 0 should transform to 0
        constraint.append(spl(0)*1e10)
        # Use a Jeffreys prior on the weight if we are fitting it.
        if self._fit_weights:
            # This factor ensures that log(fact * w) is negative.
            fact = 1e-3 / self._w.max()
            # We only add non-zero weights to the constraint array.
            log_w = np.sqrt(-2.*np.log(fact*self._w[self._w > 0]))
            constraint = np.hstack([constraint, log_w])

        # Don't let it get to >5% correction.
        values = pars[self.par_indices["values"]]
        if np.abs(values[-1])/self._nodes[-1] > self._max_frac_correction \
                or np.abs(values[-1]) > self._max_correction:
            extra_constraint = 1e10
        else:
            extra_constraint = 0

        return np.hstack([resid, constraint, extra_constraint])
def getReadNoise(exposure, ampName, taskMetadata=None, log=None):
    """Get readout noise for an amp from ISR metadata.

    The now-standard header attached to the exposure itself is
    checked first. Failing that, the ISR TaskMetadata is searched.
    If neither source has the value, a warning is logged and NaN
    is returned.

    Parameters
    ----------
    exposure : `lsst.afw.image.Exposure`
        Exposure to check for read noise first.
    ampName : `str`
        Amplifier name.
    taskMetadata : `lsst.pipe.base.TaskMetadata`, optional
        List of exposures metadata from ISR for this exposure.
    log : `logging.logger`, optional
        Log for messages.

    Returns
    -------
    readNoise : `float`
        The read noise for this set of exposure/amplifier.
    """
    metadata = exposure.getMetadata()

    # Preferred location: the exposure's own header.
    exposureKey = f"LSST ISR OVERSCAN RESIDUAL SERIAL STDEV {ampName}"
    if exposureKey in metadata:
        return metadata[exposureKey]

    # Fall back to the ISR task metadata, when supplied.
    if taskMetadata and "isr" in taskMetadata:
        taskKey = f"RESIDUAL STDEV {ampName}"
        if taskKey in taskMetadata["isr"]:
            return taskMetadata["isr"][taskKey]

    if log is None:
        log = logging.getLogger(__name__)
    log.warning("Median readout noise from ISR metadata for amp %s "
                "could not be found.", ampName)
    return np.nan
def ampOffsetGainRatioFixup(ptc, minAdu, maxAdu, log=None):
    """Apply gain ratio fixup using amp offsets.

    A reference amplifier (the good amp whose gain is closest to the
    median) is chosen, and every other amp's gain is rescaled so that
    the gain ratios agree with the ratios measured from the amp-offset
    deltas. The corrections are then renormalized so their median is
    1.0, leaving the detector-average gain unchanged.

    Parameters
    ----------
    ptc : `lsst.ip.isr.PhotonTransferCurveDataset`
        Input PTC. Will be modified in place.
    minAdu : `float`
        Minimum number of ADU in mean of amplifier to use for
        computing gain ratio fixup.
    maxAdu : `float`
        Maximum number of ADU in mean of amplifier to use for
        computing gain ratio fixup.
    log : `lsst.utils.logging.LsstLogAdapter`, optional
        Log object.
    """
    if log is None:
        log = logging.getLogger(__name__)

    # Amps with a finite gain are usable; fully bad amps are NaN.
    gainArray = np.array([ptc.gain[ampName] for ampName in ptc.ampNames])
    good, = np.where(np.isfinite(gainArray))

    # The fixup only makes sense with more than one good amp.
    if len(good) <= 1:
        log.warning("Cannot apply ampOffsetGainRatioFixup with fewer than 2 good amplifiers.")
        return

    # Reference amp: the good amp whose gain sits at the median of the
    # sorted gain distribution.
    order = np.argsort(gainArray[good])
    refIndex = good[order[int(0.5*len(good))]]
    refName = ptc.ampNames[refIndex]

    log.info("Using amplifier %s as the reference for doLinearityGainRatioFixup.", refName)

    # First pass: compute a correction factor for each non-reference amp.
    corrections = {}
    for ampName in ptc.ampNames:
        if not np.isfinite(ptc.gain[ampName]) or ampName == refName:
            continue

        # Gain ratio implied by the PTC fits.
        ptcRatio = ptc.gain[ampName] / ptc.gain[refName]

        # Gain ratio implied by the amp-offset deltas, restricted to
        # well-measured points in the requested signal range.
        deltas = ptc.ampOffsets[ampName] - ptc.ampOffsets[refName]
        use = (
            (ptc.expIdMask[ampName])
            & (np.isfinite(deltas))
            & (ptc.finalMeans[ampName] >= minAdu)
            & (ptc.finalMeans[ampName] <= maxAdu)
            & (np.isfinite(ptc.finalMeans[refName]))
            & (ptc.expIdMask[refName])
        )
        if use.sum() < 3:
            log.warning("Not enough good amp offset measurements to fix up amp %s "
                        "gains from amp ratios.", ampName)
            continue

        offsetRatios = 1. / (deltas / ptc.finalMeans[refName] + 1.0)
        corrections[ampName] = np.median(offsetRatios[use]) / ptcRatio

    # The reference amplifier must be included in the renormalization;
    # by construction its correction factor is 1.0 at this point.
    corrections[refName] = 1.0

    # Renormalize so the median correction is 1.0 and the average gain
    # of the detector is unchanged. This guards against the reference
    # amp being skewed in terms of offsets even though it has the
    # median gain.
    medCorrection = np.median(list(corrections.values()))

    # Second pass: apply the normalized corrections.
    for ampName in ptc.ampNames:
        if ampName not in corrections:
            continue

        correction = corrections[ampName] / medCorrection
        newGain = ptc.gain[ampName] * correction
        log.info(
            "Adjusting gain from amplifier %s by factor of %.5f (from %.5f to %.5f)",
            ampName,
            correction,
            ptc.gain[ampName],
            newGain,
        )
        # Copying the value should not be necessary, but we record
        # it just in case.
        ptc.gainUnadjusted[ampName] = ptc.gain[ampName]
        ptc.gain[ampName] = newGain
1629class FlatGradientFitter:
1630 """Class to fit various large-scale flat-field gradients.
1632 This fitter will take arrays of x/y/value and (by default) fit a radial
1633 gradient, using a spline function at the specified nodes. The fitter
1634 will also fit out a nuisance parameter for the ratio of the ITL/E2V
1635 throughput (though in general this could work for any such focal plane).
1636 The focal plane origin is set by fp_centroid_x and fp_centroid_y which
1637 is the center of the radial gradient, though it may be modified if
1638 fit_centroid is True. In addition, the fitter may fit a linear gradient
1639 in x/y if fit_gradient is True. The "pivot" of the gradient is at
1640 fp_centroid_x/fp_centroid_y.
1642 Parameters
1643 ----------
1644 nodes : `np.ndarray`
1645 Array of spline nodes for radial spline.
1646 x : `np.ndarray`
1647 Array of x values for points to fit (mm).
1648 y : `np.ndarray`
1649 Array of y values for points to fit (mm).
1650 value : `np.ndarray`
1651 Array of values describing the flat field to fit the gradient.
1652 itl_indices : `np.ndarray`
1653 Array of indices corresponding to the ITL detectors.
1654 constrain_zero : `bool`, optional
1655 Constrain the outermost radial spline value to 0?
1656 fit_centroid : `bool`, optional
1657 Fit an additional centroid offset?
1658 fit_gradient : `bool`, optional
1659 Fit an additional plane gradient?
1660 fp_centroid_x : `float`, optional
1661 Focal plane centroid x (mm).
1662 fp_centroid_y : `float`, optional
1663 Focal plane centroid y (mm).
1664 """
1665 def __init__(
1666 self,
1667 nodes,
1668 x,
1669 y,
1670 value,
1671 itl_indices,
1672 constrain_zero=True,
1673 fit_centroid=False,
1674 fit_gradient=False,
1675 fp_centroid_x=0.0,
1676 fp_centroid_y=0.0
1677 ):
1678 self._nodes = nodes
1679 self._x = x
1680 self._y = y
1681 self._value = value
1682 self._itl_indices = itl_indices
1683 self._fp_centroid_x = fp_centroid_x
1684 self._fp_centroid_y = fp_centroid_y
1686 self._constrain_zero = constrain_zero
1688 self._fit_centroid = fit_centroid
1689 self._fit_gradient = fit_gradient
1691 self.indices = {"spline": np.arange(len(nodes))}
1692 npar = len(nodes)
1694 self._fit_itl_ratio = False
1695 if len(itl_indices) > 0:
1696 self._fit_itl_ratio = True
1697 self.indices["itl_ratio"] = npar
1698 npar += 1
1700 radius = np.sqrt((self._x - self._fp_centroid_x)**2. + (self._y - self._fp_centroid_y)**2.)
1702 if fit_centroid:
1703 self.indices["centroid_delta"] = np.arange(2) + npar
1704 npar += 2
1705 else:
1706 self._radius = radius
1708 if fit_gradient:
1709 self.indices["gradient"] = np.arange(2) + npar
1710 npar += 2
1712 self._npar = npar
1714 self._bounds = [(None, None)]*npar
1716 if self._constrain_zero:
1717 self._bounds[self.indices["spline"][-1]] = (0.0, 0.0)
1719 def compute_p0(self, itl_ratio=None):
1720 """Compute initial guess for fit parameters.
1722 Returns
1723 -------
1724 pars : `np.ndarray`
1725 Array of first guess fit parameters.
1726 """
1727 pars = np.zeros(self._npar)
1728 value = self._value.copy()
1730 if itl_ratio is not None and self._fit_itl_ratio:
1731 value[self._itl_indices] /= itl_ratio
1733 # Initial spline values
1734 radius = np.sqrt((self._x - self._fp_centroid_x)**2. + (self._y - self._fp_centroid_y)**2.)
1735 for i, index in enumerate(self.indices["spline"]):
1736 if i == 0:
1737 low = self._nodes[i]
1738 else:
1739 low = (self._nodes[i - 1] + self._nodes[i])/2.
1740 if i == (len(self._nodes) - 1):
1741 high = self._nodes[i]
1742 else:
1743 high = (self._nodes[i] + self._nodes[i + 1])/2.
1744 u = ((radius > low) & (radius < high))
1745 if u.sum() == 0:
1746 pars[index] = 0.0
1747 else:
1748 pars[index] = np.median(value[u])
1750 if self._constrain_zero:
1751 pars[self.indices["spline"][-1]] = 0.0
1753 spl = Akima1DInterpolator(self._nodes, pars[self.indices["spline"]], method="akima")
1754 model = spl(radius)
1755 resid_ratio = value / model
1757 if self._fit_itl_ratio:
1758 if itl_ratio is not None:
1759 pars[self.indices["itl_ratio"]] = itl_ratio
1760 else:
1761 e2v_indices = np.delete(np.arange(len(self._value)), self._itl_indices)
1763 itl_inner = radius[self._itl_indices] < 0.8*np.max(radius)
1764 e2v_inner = radius[e2v_indices] < 0.8*np.max(radius)
1766 itl_median = np.nanmedian(resid_ratio[self._itl_indices][itl_inner])
1767 e2v_median = np.nanmedian(resid_ratio[e2v_indices][e2v_inner])
1769 pars[self.indices["itl_ratio"]] = itl_median / e2v_median
1771 if self._fit_centroid:
1772 pars[self.indices["centroid_delta"]] = [0.0, 0.0]
1774 if self._fit_gradient:
1775 resid_ratio = self._value / model
1777 ok = (np.isfinite(resid_ratio) & (model > 0.5))
1779 fit = np.polyfit(self._x[ok], resid_ratio[ok], 1)
1780 pars[self.indices["gradient"][0]] = fit[0]
1782 fit = np.polyfit(self._y[ok], resid_ratio[ok], 1)
1783 pars[self.indices["gradient"][1]] = fit[0]
1785 return pars
1787 def fit(self, p0, freeze_itl_ratio=False, fit_eps=1e-8, fit_gtol=1e-10):
1788 """Do a non-linear minimization to fit the parameters.
1790 Parameters
1791 ----------
1792 p0 : `np.ndarray`
1793 Array of initial parameter estimates.
1794 freeze_itl_ratio : `bool`, optional
1795 Freeze the ITL ratio in the fit?
1796 fit_eps : `float`, optional
1797 Value of ``eps`` to send to the scipy minimizer.
1798 fit_gtol : `float`, optional
1799 Value of ``gtol`` to send to the scipy minimizer.
1801 Returns
1802 -------
1803 pars : `np.ndarray`
1804 Array of parameters. Use ``fitter.indices`` for the
1805 dictionary to map parameters to subsets.
1806 """
1807 bounds = self._bounds
1808 if freeze_itl_ratio and self._fit_itl_ratio:
1809 ind = self.indices["itl_ratio"]
1810 par = p0[ind]
1811 bounds[ind] = (par, par)
1813 res = minimize(
1814 self,
1815 p0,
1816 method="L-BFGS-B",
1817 jac=False,
1818 bounds=bounds,
1819 options={
1820 "maxfun": 10000,
1821 "maxiter": 10000,
1822 "maxcor": 20,
1823 "eps": fit_eps,
1824 "gtol": fit_gtol,
1825 },
1826 callback=None,
1827 )
1828 pars = res.x
1830 return pars
1832 def compute_model(self, pars):
1833 """Compute the model given a set of parameters.
1835 Parameters
1836 ----------
1837 pars : `np.ndarray`
1838 Parameter array to compute model.
1840 Returns
1841 -------
1842 model_array : `np.ndarray`
1843 Array of model parameters at the input x/y.
1844 """
1845 spl = Akima1DInterpolator(self._nodes, pars[self.indices["spline"]], method="akima")
1846 if self._fit_centroid:
1847 centroid_delta = pars[self.indices["centroid_delta"]]
1848 centroid_x = self._fp_centroid_x + centroid_delta[0]
1849 centroid_y = self._fp_centroid_y + centroid_delta[1]
1850 radius = np.sqrt((self._x - centroid_x)**2. + (self._y - centroid_y)**2.)
1851 else:
1852 radius = self._radius
1854 model = spl(np.clip(radius, self._nodes[0], self._nodes[-1]))
1855 if self._fit_itl_ratio:
1856 model[self._itl_indices] *= pars[self.indices["itl_ratio"]]
1858 if self._fit_gradient:
1859 a, b = pars[self.indices["gradient"]]
1860 gradient = 1 + a*(self._x - self._fp_centroid_x) + b*(self._y - self._fp_centroid_y)
1861 model /= gradient
1863 return model
1865 def __call__(self, pars):
1866 """Compute the cost function for a set of parameters.
1868 Parameters
1869 ----------
1870 pars : `np.ndarray`
1871 Parameter array to compute model.
1873 Returns
1874 -------
1875 cost : `float`
1876 Cost value computed from the absolute deviation.
1877 """
1879 model = self.compute_model(pars)
1881 absdev = np.abs(self._value - model)
1882 t = np.sum(absdev.astype(np.float64))
1884 return t
class FlatGainRatioFitter:
    """Class to fit amplifier gain ratios, removing a simple gradient.

    This fitter will take arrays of x/y/amp_num/value and fit amplifier
    gain ratios, using one amplifier as the fixed point. The fitter
    uses a low-order chebyshev polynomial to fit out the gradient, with
    amp ratios on top of this.

    Parameters
    ----------
    bbox : `lsst.geom.Box2I`
        Bounding box for Chebyshev polynomial gradient.
    order : `int`
        Order of Chebyshev polynomial.
    x : `np.ndarray`
        Array of x values for points to fit (detector pixels).
    y : `np.ndarray`
        Array of y values for points to fit (detector pixels).
    amp_index : `np.ndarray`
        Array of amp numbers associated with each x/y pair.
    value : `np.ndarray`
        Flat value at each x/y pair.
    amps : `np.ndarray`
        Array of unique amplifier numbers that will be parameterized.
        Any of these amps that does not have any data with the same
        amp_index will be set to 1.0.
    fixed_amp_index : `int`
        Amplifier number to keep fixed.
    """
    def __init__(self, bbox, order, x, y, amp_index, value, amps, fixed_amp_index):
        self._bbox = bbox
        self._order = order
        self._x = x.astype(np.float64)
        self._y = y.astype(np.float64)
        self._amp_index = amp_index
        self._value = value.astype(np.float64)
        self._fixed_amp_index = fixed_amp_index

        # Parameter layout: Chebyshev coefficients first, then one ratio
        # parameter per amplifier.
        self.indices = {"chebyshev": np.arange((order + 1) * (order + 1))}
        npar = len(self.indices["chebyshev"])

        self._amps = amps
        self._n_amp = len(self._amps)

        self.indices["amp_pars"] = np.arange(self._n_amp) + npar
        npar += self._n_amp

        self._npar = npar

        # Boolean selection of the data points belonging to each amp,
        # keyed by amplifier number.
        self._amp_indices = {}
        for i in range(self._n_amp):
            amp_index = self._amps[i]
            self._amp_indices[amp_index] = (self._amp_index == amp_index)

    def fit(self, n_iter=10, nsig_clip=5.0):
        """Fit the amp ratio parameters.

        This uses an iterative fit, where it fits a Chebyshev gradient,
        computes amp ratios, and re-fits the gradient.

        Parameters
        ----------
        n_iter : `int`, optional
            Number of iterations for fit.
        nsig_clip : `float`, optional
            Number of sigma in gain correction distribution to clip when
            fitting out the Chebyshev gradient.

        Returns
        -------
        pars : `np.ndarray`
            Chebyshev parameters and amp offset parameters.
        """
        value = self._value.copy()

        control = lsst.afw.math.ChebyshevBoundedFieldControl()
        control.orderX = self._order
        control.orderY = self._order
        control.triangular = False

        pars = np.zeros(self._npar)

        # Start all amp ratios at unity.
        pars[self.indices["amp_pars"]] = 1.0

        field_use = np.ones(len(value), dtype=np.bool_)

        for i in range(n_iter):
            # Fit the smooth gradient using only the currently accepted
            # points.
            field = lsst.afw.math.ChebyshevBoundedField.fit(
                self._bbox,
                self._x[field_use],
                self._y[field_use],
                value[field_use],
                control,
            )

            pars[self.indices["chebyshev"][:]] = field.getCoefficients().ravel()

            # Per-point ratio to the gradient model, normalized so the
            # fixed amp has unit median ratio.
            ratio = self._value / field.evaluate(self._x, self._y)

            fixed_med = np.median(ratio[self._amp_indices[self._fixed_amp_index]])
            ratio /= fixed_med

            # NOTE(review): amp_pars is indexed here by the amp *number*
            # (fixed_amp_index), and below by position j; these agree only
            # if amps == arange(n_amp). Confirm against callers.
            pars[self.indices["amp_pars"][self._fixed_amp_index]] = 1.0

            value = self._value.copy()

            for j in range(self._n_amp):
                amp_index = self._amps[j]

                # Amps with no data points keep their current (unity)
                # ratio.
                if np.sum(self._amp_indices[amp_index]) == 0:
                    continue

                pars[self.indices["amp_pars"][j]] = np.median(ratio[self._amp_indices[amp_index]])

            # Exclude amps whose ratio is an outlier from the next
            # gradient fit.
            amp_pars = pars[self.indices["amp_pars"]]
            med = np.median(amp_pars)
            sig = median_abs_deviation(amp_pars, scale="normal")
            amp_gradient_bad, = np.where(np.abs(amp_pars - med) > nsig_clip * sig)
            field_use[:] = True
            for agb in amp_gradient_bad:
                # NOTE(review): agb is a position in amp_pars but is used
                # as an amplifier-number key; again assumes
                # amps == arange(n_amp).
                field_use[self._amp_indices[agb]] = False

        return pars

    def compute_model(self, pars):
        """Compute the gradient/amp ratio model for a given set of parameters.

        Parameters
        ----------
        pars : `np.ndarray`
            Chebyshev parameters and amp offset parameters.

        Returns
        -------
        model : `np.ndarray`
            The model at each x/y pair.
        """
        field = lsst.afw.math.ChebyshevBoundedField(
            self._bbox,
            pars[self.indices["chebyshev"]].reshape(self._order + 1, self._order + 1),
        )
        model = field.evaluate(self._x, self._y)

        # Scale the gradient by each amp's ratio parameter.
        for i in range(self._n_amp):
            amp_index = self._amps[i]

            model[self._amp_indices[amp_index]] *= pars[self.indices["amp_pars"][i]]

        return model

    def __call__(self, pars):
        """Compute the cost function for a set of parameters.

        Parameters
        ----------
        pars : `np.ndarray`
            Chebyshev parameters and amp offset parameters.

        Returns
        -------
        cost : `float`
            Cost value computed from the absolute deviation.
        """
        model = self.compute_model(pars)

        # L1 cost: robust against outlier points.
        absdev = np.abs(self._value - model)
        t = np.sum(absdev.astype(np.float64))

        return t
def bin_focal_plane(
    exposure_handle_dict,
    detector_boundary,
    bin_factor,
    defect_handle_dict=None,
    include_itl_flag=True,
):
    """Bin all the detectors into the full focal plane.

    This function reads in one image per detector; masks defects and
    NO_DATA pixels; excludes detector edges; and bins according to the
    bin factor. The output is a table with focal plane coordinates,
    binned values, detector numbers, and (optionally) a flag if the
    detector is an ITL detector.

    Parameters
    ----------
    exposure_handle_dict : `dict`
        Dict keyed by detector (`int`), each element is a
        `lsst.daf.butler.DeferredDatasetHandle` for the exposure to bin.
    detector_boundary : `int`
        Boundary of the detector to ignore (pixels).
    bin_factor : `int`
        Binning factor. Detectors will be cropped (after applying the
        ``detector_boundary``) such that there are no partially
        covered binned pixels.
    defect_handle_dict : `dict`, optional
        Dict keyed by detector (`int`), each element is a
        `lsst.daf.butler.DeferredDatasetHandle` for defects to be applied.
    include_itl_flag : `bool`, optional
        Include a flag for which detectors are ITL?

    Returns
    -------
    binned : `astropy.table.Table`
        Table with focal plane positions at the center of each bin
        (``xf``, ``yf``); average image values (``value``); and detector
        number (``detector``).
    """
    # Use None as the default to avoid a shared mutable default argument.
    if defect_handle_dict is None:
        defect_handle_dict = {}

    xf_arrays = []
    yf_arrays = []
    value_arrays = []
    detector_arrays = []
    itl_arrays = []

    for det in exposure_handle_dict.keys():
        flat = exposure_handle_dict[det].get()
        defect_handle = defect_handle_dict.get(det, None)
        if defect_handle is not None:
            defects = defect_handle.get()
        else:
            defects = None

        detector = flat.getDetector()

        # Mask out defects if we have them.
        if defects is not None:
            for defect in defects:
                flat.image[defect.getBBox()].array[:, :] = np.nan

        # Mask NO_DATA pixels if we have them.
        no_data = ((flat.mask.array[:, :] & flat.mask.getPlaneBitMask("NO_DATA")) > 0)
        flat.image.array[no_data] = np.nan

        # Bin the image, avoiding the boundary and the masked pixels.
        # We also make sure we are using an integral number of
        # steps to avoid partially covered binned pixels.

        arr = flat.image.array

        n_step_y = (arr.shape[0] - (2 * detector_boundary)) // bin_factor
        y_min = detector_boundary
        y_max = bin_factor * n_step_y + y_min
        n_step_x = (arr.shape[1] - (2 * detector_boundary)) // bin_factor
        x_min = detector_boundary
        x_max = bin_factor * n_step_x + x_min

        arr = arr[y_min: y_max, x_min: x_max]
        binned = arr.reshape((n_step_y, bin_factor, n_step_x, bin_factor))
        with warnings.catch_warnings():
            # All-NaN bins are expected where defects were masked.
            warnings.filterwarnings("ignore", r"Mean of empty")
            binned = np.nanmean(binned, axis=1)
            binned = np.nanmean(binned, axis=2)

        # Bin centers, in pixel coordinates of the original image.
        xx = np.arange(binned.shape[1]) * bin_factor + bin_factor / 2. + x_min
        yy = np.arange(binned.shape[0]) * bin_factor + bin_factor / 2. + y_min
        x, y = np.meshgrid(xx, yy)
        x = x.ravel()
        y = y.ravel()
        value = binned.ravel()

        # Transform to focal plane coordinates.
        transform = detector.getTransform(lsst.afw.cameraGeom.PIXELS, lsst.afw.cameraGeom.FOCAL_PLANE)
        xy = np.vstack((x, y))
        xf, yf = np.vsplit(transform.getMapping().applyForward(xy), 2)
        xf = xf.ravel()
        yf = yf.ravel()

        if include_itl_flag:
            is_itl = np.zeros(len(value), dtype=np.bool_)
            # We use this check so that ITL matches ITL science detectors,
            # ITL_WF wavefront detectors, and pseudoITL test detectors.
            is_itl[:] = ("ITL" in detector.getPhysicalType())

        xf_arrays.append(xf)
        yf_arrays.append(yf)
        value_arrays.append(value)
        detector_arrays.append(np.full_like(xf, det, dtype=np.int32))
        if include_itl_flag:
            itl_arrays.append(is_itl)

    xf = np.concatenate(xf_arrays)
    yf = np.concatenate(yf_arrays)
    value = np.concatenate(value_arrays)
    detector = np.concatenate(detector_arrays)

    binned = Table(
        np.zeros(
            len(xf),
            dtype=[
                ("xf", "f8"),
                ("yf", "f8"),
                ("value", "f8"),
                ("detector", "i4"),
            ],
        )
    )
    binned["xf"] = xf
    binned["yf"] = yf
    binned["value"] = value
    binned["detector"] = detector

    if include_itl_flag:
        binned["itl"] = np.concatenate(itl_arrays).astype(np.bool_)

    return binned
def bin_flat(ptc, exposure, bin_factor=8, amp_boundary=20, apply_gains=True, gain_ratios=None):
    """Bin a flat image, being careful with amplifier edges.

    This will optionally apply gains, and apply any gain
    ratios.

    Parameters
    ----------
    ptc : `lsst.ip.isr.PhotonTransferCurveDatasets`
        PTC dataset with relevant gains.
    exposure : `lsst.afw.image.Exposure`
        Exposure to bin.
    bin_factor : `int`, optional
        Binning factor.
    amp_boundary : `int`, optional
        Boundary around each amp to ignore in binning.
    apply_gains : `bool`, optional
        Apply gains before binning?
    gain_ratios : `np.ndarray`, optional
        Array of gain ratios to apply; indexed in the order of
        ``ptc.ampNames``.

    Returns
    -------
    binned : `astropy.table.Table`
        Table with detector coordinates at the center of each bin
        (``xd``, ``yd``); average image values (``value``); and amplifier
        index (``amp_index``).

    Notes
    -----
    The input ``exposure`` is modified in place: bad-amp pixels are set
    to NaN and, if requested, gains/gain ratios are multiplied into the
    image.
    """
    detector = exposure.getDetector()

    # Apply per-amp scalings in place; exposure[bbox] shares pixels with
    # the parent exposure.
    for i, amp_name in enumerate(ptc.ampNames):
        bbox = detector[amp_name].getBBox()
        if amp_name in ptc.badAmps:
            # Bad amps are NaN-ed so they fall out of the nanmean below.
            exposure[bbox].image.array[:, :] = np.nan
            continue

        if apply_gains:
            exposure[bbox].image.array[:, :] *= ptc.gainUnadjusted[amp_name]
        if gain_ratios is not None:
            exposure[bbox].image.array[:, :] /= gain_ratios[i]

    # Next we bin the detector, avoiding amp edges.
    xd_arrays = []
    yd_arrays = []
    value_arrays = []
    amp_arrays = []

    # NOTE(review): amp_index here follows detector iteration order, while
    # the gain loop above follows ptc.ampNames order — assumed consistent;
    # confirm for the instruments in use.
    for i, amp in enumerate(detector):
        arr = exposure[amp.getBBox()].image.array

        # Crop to an integral number of bins inside the amp boundary so
        # there are no partially covered binned pixels.
        n_step_y = (arr.shape[0] - (2 * amp_boundary)) // bin_factor
        y_min = amp_boundary
        y_max = bin_factor * n_step_y + y_min
        n_step_x = (arr.shape[1] - (2 * amp_boundary)) // bin_factor
        x_min = amp_boundary
        x_max = bin_factor * n_step_x + x_min

        arr = arr[y_min: y_max, x_min: x_max]
        binned = arr.reshape((n_step_y, bin_factor, n_step_x, bin_factor))
        with warnings.catch_warnings():
            # All-NaN bins (e.g. bad amps) are expected; nanmean warns.
            warnings.filterwarnings("ignore", r"Mean of empty")
            binned = np.nanmean(binned, axis=1)
            binned = np.nanmean(binned, axis=2)

        # Bin centers in amp-local pixel coordinates.
        xx = np.arange(binned.shape[1]) * bin_factor + bin_factor / 2. + x_min
        yy = np.arange(binned.shape[0]) * bin_factor + bin_factor / 2. + y_min
        x, y = np.meshgrid(xx, yy)
        # Shift to detector coordinates using the amp origin.
        x = x.ravel() + amp.getBBox().getBeginX()
        y = y.ravel() + amp.getBBox().getBeginY()
        value = binned.ravel()

        xd_arrays.append(x)
        yd_arrays.append(y)
        value_arrays.append(value)
        amp_arrays.append(np.full(len(x), i))

    xd = np.concatenate(xd_arrays)
    yd = np.concatenate(yd_arrays)
    value = np.concatenate(value_arrays)
    amp_index = np.concatenate(amp_arrays)

    # Assemble a structured array, then wrap it in an astropy Table.
    binned = np.zeros(
        len(xd),
        dtype=[
            ("xd", "f8"),
            ("yd", "f8"),
            ("value", "f8"),
            ("amp_index", "i4"),
        ],
    )
    binned["xd"] = xd
    binned["yd"] = yd
    binned["value"] = value
    binned["amp_index"] = amp_index

    return Table(data=binned)
class ElectrostaticFit():
    """
    Class to handle the electrostatic fit of area coefficients.

    This class manages the fitting of electrostatic model parameters to
    measured area change coefficients. The actual electrostatic calculations
    are performed by the ElectrostaticCcdGeom class. The fit is performed
    using the lmfit Minimizer, and the model includes normalization
    parameters (alpha, beta) as well as physical CCD parameters.

    Attributes
    ----------
    fitRange : int
        Range of pixels to fit.
    inputRange : int
        Range of input data.
    doFitNormalizationOffset : bool
        Whether to fit an offset parameter.
    nImageChargePairs : int
        Number of image charge pairs used in the electrostatic calculation.
    fitMethod : str
        Minimization method used by lmfit.
    aMatrix : np.ndarray
        Measured area change matrix.
    aMatrixSigma : np.ndarray
        Uncertainty matrix for area changes.
    sqrtWeights : np.ndarray
        Weights for fitting, inverse of aMatrixSigma.
    params : lmfit.Parameters
        Fit parameters.
    """

    def __init__(
        self,
        initialParams,
        fitMethod,
        aMatrix,
        aMatrixSigma,
        fitRange,
        doFitNormalizationOffset,
        nImageChargePairs,
    ):
        """
        Initialize the ElectrostaticFit class.

        Parameters
        ----------
        initialParams : lmfit.Parameters
            Initial fit parameters.
        fitMethod : str
            Minimization method for lmfit.
        aMatrix : np.ndarray
            Measured area change matrix.
        aMatrixSigma : np.ndarray
            Uncertainty matrix for area changes.
        fitRange : int
            Range of pixels to fit.
        doFitNormalizationOffset : bool
            Whether to fit an offset parameter.
        nImageChargePairs : int
            Number of image charge pairs for electrostatic calculation.
        """
        self.fitRange = fitRange
        self.inputRange = aMatrix.shape[0]
        self.doFitNormalizationOffset = doFitNormalizationOffset
        self.nImageChargePairs = nImageChargePairs
        self.fitMethod = fitMethod

        # Clamp the fit range to the available data.
        if self.fitRange != 0 and self.inputRange < self.fitRange:
            print("INFO : truncating input data at %d" % self.inputRange)
            self.fitRange = self.inputRange

        # Slice with the possibly-truncated range so the stored matrices
        # stay consistent with self.fitRange.  (NumPy slicing clips
        # out-of-range bounds, so this matches the previous behavior.)
        self.aMatrix = aMatrix[0:self.fitRange, 0:self.fitRange]
        self.aMatrixSigma = aMatrixSigma[0:self.fitRange, 0:self.fitRange]
        # Keep full-size weights; they are re-sliced at the point of use.
        self.sqrtWeights = 1.0 / aMatrixSigma

        self.params = initialParams

    def fit(self):
        """
        Run the minimization and return the lmfit result.

        Returns
        -------
        result : `lmfit.minimizer.MinimizerResult`
            Result of the minimization.
        """
        minner = Minimizer(
            self.computeWeightedResidual,
            self.params,
        )

        result = minner.minimize(
            method=self.fitMethod,
            max_nfev=20000,
            epsfcn=1.0e-8,
            ftol=1.0e-8,
            xtol=1.0e-8,
            gtol=0.0,
        )

        return result

    def getParamsDict(self):
        """
        Return a copy of the free parameters vector as a dictionary.
        """
        return self.params.valuesdict()

    def computeWeightedResidual(self, params=None):
        """
        Compute the weighted residual matrix for the minimizer.

        Parameters
        ----------
        params : `lmfit.Parameters`, optional
            Parameters to evaluate; if not provided, the currently
            stored parameters are used.

        Returns
        -------
        weightedResiduals : `np.ndarray`
            Weighted residuals over the fitted sub-matrix.
        """
        # Keep the stored parameters in sync with the minimizer, but do
        # not clobber them when called without an argument.
        if params is not None:
            self.params = params
        m = self.model(self.params)
        w = self.sqrtWeights[:self.fitRange, :self.fitRange]
        y = self.aMatrix[:self.fitRange, :self.fitRange]
        # Multiply two 2-d arrays term by term; the result has the same
        # size as both of them.
        weightedResiduals = (w * (m - y))
        return weightedResiduals

    def model(self, params):
        """
        Evaluate the normalized model: alpha * rawModel + beta.
        """
        m = self.rawModel(params)
        alpha = params['alpha']
        beta = params['beta']
        return alpha * m + beta

    def rawModel(self, params):
        """
        Evaluate the un-normalized electrostatic model over the fit range.
        """
        # Get all parameters as a dictionary, keeping only the physical
        # CCD parameters expected by the electrostatic calculator:
        parameterDict = self.getParamsDict()
        del parameterDict['alpha']
        del parameterDict['beta']
        del parameterDict['zsh_minus_zq']
        del parameterDict['zsv_minus_zq']

        # Push them into the electrostatic calculator
        c = ElectrostaticCcdGeom(parameterDict)

        # Compute the observables:
        # If you change the routine called here, also
        # change it in BoundaryShifts.__init__()
        m = c.evalAreaChangeSidesFast(
            self.fitRange,
            nImageChargePairs=self.nImageChargePairs,
        )
        m = m[:self.fitRange, :self.fitRange]

        return m

    def normalizeModel(self, m):
        """
        Compute optimal normalization and offset for the model.

        The overall normalization is a linear parameter. This
        method computes the optimal value given the other
        parameters.

        Parameters
        ----------
        m : `np.ndarray`
            Model matrix to normalize against the stored data.

        Returns
        -------
        a, b : `float`
            Scale and offset (b is 0 when no offset is fitted).
        """
        fr = m.shape[0]
        sqrtw = self.sqrtWeights[:fr, :fr]
        w = sqrtw ** 2
        y = self.aMatrix[:fr, :fr]
        if self.doFitNormalizationOffset:
            # Weighted linear least squares for y = a*m + b.
            sxx = (w * m * m).sum()
            sx = (w * m).sum()
            s1 = w.sum()
            sxy = (w * m * y).sum()
            sy = (w * y).sum()
            d = sxx * s1 - sx * sx
            a = (s1 * sxy - sx * sy) / d
            b = (-sx * sxy + sxx * sy) / d
        else:
            # Just scale
            a = (w * y * m).sum() / (w * m * m).sum()
            b = 0
        return a, b

    def computePixelDistortions(self, conversionWeights=None):
        """
        Compute pixel distortions using a probability distribution of
        conversion depths.

        If provided, conversionWeights is expected to be a tuple of
        (depth, probability). The routine computes the model corresponding
        to this probability distribution. If conversionWeights is not
        provided, then [(0, 1.)] is used as the distribution.

        Parameters
        ----------
        conversionWeights : tuple of (np.ndarray, np.ndarray), optional
            Tuple containing depths and their associated probabilities.

        Returns
        -------
        BoundaryShifts
            The computed boundary shifts for the pixel.
        """
        zf = self.params["thickness"].value
        zsh = self.params["zsh"].value
        zsv = self.params["zsv"].value
        if conversionWeights is None:
            conversionWeights = (np.array([0.0]), np.array([1.0]))

        r = None
        (d, p) = conversionWeights
        # Work on a float copy so the caller's probability array is not
        # mutated in place by the zeroing/normalization below.
        p = np.array(p, dtype=np.float64)

        # Zero depths lower than the end of drift
        too_low = ((zf - d) < zsh) | ((zf - d) < zsv)
        p[too_low] = 0
        # NOTE(review): if every depth is below the end of drift, the sum
        # is zero, the division yields NaNs, and the return value is None.
        p /= p.sum()  # Normalize to 1.
        for (depth, prob) in zip(d, p):
            if prob == 0:
                continue
            if r is None:
                r = prob * BoundaryShifts(
                    electrostaticFit=self,
                    zf=zf - depth
                )
            else:
                r = r + prob * BoundaryShifts(
                    electrostaticFit=self,
                    zf=zf - depth
                )

        return r
class BoundaryShifts:
    """
    Class to compute and store boundary shift values for CCD pixels based on
    electrostatic field models.

    This class calculates the shifts at the North, South, East, and West
    boundaries of each pixel, as well as the area change at the pixel sides,
    using parameters from an electrostatic fit and a specified integration
    depth `zf`. The shifts are computed using fast integration methods from
    the ElectrostaticCcdGeom model.

    Attributes
    ----------
    aN : np.ndarray
        Array of North boundary shifts for each pixel.
    aS : np.ndarray
        Array of South boundary shifts for each pixel.
    aE : np.ndarray
        Array of East boundary shifts for each pixel.
    aW : np.ndarray
        Array of West boundary shifts for each pixel.
    ath : np.ndarray
        Array of area changes at the pixel sides, including beta.
    athMinusBeta : np.ndarray
        Array of area changes at the pixel sides, excluding beta.
    """

    def __init__(self, electrostaticFit, zf):
        """
        Parameters
        ----------
        electrostaticFit : `ElectrostaticFit`
            Fit providing the physical parameters and alpha/beta.
        zf : `float`
            Integration depth (microns); must be positive.

        Raises
        ------
        RuntimeError
            If ``zf`` is not positive.
        """
        if zf <= 0:
            raise RuntimeError(
                f"Cannot integrate below the bottom of the pixel, zf={zf}."
            )

        # Keep only the physical CCD parameters expected by the
        # electrostatic calculator.
        paramDict = electrostaticFit.getParamsDict()
        del paramDict['alpha']
        del paramDict['beta']
        del paramDict['zsh_minus_zq']
        del paramDict['zsv_minus_zq']

        c = ElectrostaticCcdGeom(paramDict)
        fr = electrostaticFit.fitRange
        alpha = electrostaticFit.params['alpha'].value
        beta = electrostaticFit.params['beta'].value

        # alpha*rawModel+beta is the description of the measurements
        # We should not apply beta to the outcome, because beta is meant to
        # accomodate some long-range contamination (non-electrostatic)
        # of the covariance measurements.
        # (The previous version pre-allocated these arrays and built an
        # unused meshgrid; both were dead code and have been removed.)
        iMax, jMax = fr, fr
        self.aN = -alpha * c.integrateEyFieldFast(iMax, jMax, 1, zf=zf)
        self.aS = -alpha * c.integrateEyFieldFast(iMax, jMax, -1, zf=zf)
        self.aW = -alpha * c.integrateExFieldFast(iMax, jMax, -1, zf=zf)
        self.aE = -alpha * c.integrateExFieldFast(iMax, jMax, 1, zf=zf)
        self.ath = alpha * c.evalAreaChangeSidesFast(fr, zf=zf) + beta

        self.athMinusBeta = self.ath - beta

    def __rmul__(self, factor):
        """
        Return a new BoundaryShifts with all arrays scaled by ``factor``.
        """
        res = copy.deepcopy(self)
        res.aN *= factor
        res.aS *= factor
        res.aE *= factor
        res.aW *= factor
        res.ath *= factor
        res.athMinusBeta *= factor
        return res

    def __add__(self, other):
        """
        Return a new BoundaryShifts with arrays summed element-wise.
        """
        res = copy.deepcopy(self)
        res.aN += other.aN
        res.aS += other.aS
        res.aE += other.aE
        res.aW += other.aW
        res.ath += other.ath
        res.athMinusBeta += other.athMinusBeta
        return res
def calcEFieldCoulomb(xv, xqv):
    """Coulomb field of a unit point charge (4*pi*epsilon = 1).

    Parameters
    ----------
    xv : `np.ndarray`
        Evaluation point(s). If multi-dimensional, the physical
        coordinates (x, y, z) must be indexed by the last axis.
    xqv : `np.ndarray`
        Location of the point charge. If multi-dimensional, the
        physical coordinates are indexed by the last axis.

    Returns
    -------
    field : `np.ndarray`
        Field vector(s) with the same shape as the broadcast of
        ``xv - xqv``; components along the last axis.
    """
    separation = xv - xqv
    rCubed = ((separation**2).sum(axis=-1)) ** 1.5
    # Transpose so that broadcasting divides each (x, y, z) component
    # by r^3 (a plain separation / rCubed would not broadcast).
    return (separation.T / rCubed.T).T
2615class ElectrostaticCcdGeom():
2616 def __init__(self, paramDict):
2617 """
2618 Dictionary of parameters containing the following keys:
2619 parameters : (all in microns)
2620 zq : altitude of the burried channel (microns)
2621 zsh : vertex altitude for horizontal boundaries
2622 zsv : vertex altitude for vertical boundaries
2623 a, b : size of the rectangular charge source
2624 thickness : thickness
2625 pixelsize : pixel size
2626 """
2627 zq = paramDict['zq']
2628 zsh = paramDict['zsh']
2629 zsv = paramDict['zsv']
2630 a = paramDict['a']
2631 b = paramDict['b']
2632 thickness = paramDict['thickness']
2633 pixelsize = paramDict['pixelsize']
2635 self.zq = np.fabs(zq)
2636 self.zsh = np.fabs(zsh)
2637 self.zsv = np.fabs(zsv)
2638 self.b = np.fabs(b)
2639 self.a = np.fabs(a)
2640 self.thickness = np.fabs(thickness)
2641 self.pixelsize = pixelsize
2643 self.nStepsZ = 100
2644 self.nStepsXY = 20
2646 # yields a ~ 1% precision of the field at z~10
2647 # if compared to the uniform sheet model (eField
2648 self.nChargeElements = 3
2650 # Set up weights
2651 x, w = leggauss(self.nStepsXY)
2652 self.integrationWeights = w*0.5
2654 # Abcissa refer to [-1, 1], we want [0, self.pixelsize]
2655 self.xyOffsets = (x+1) * 0.5 * self.pixelsize
    def calcEFieldCoulombChargeSheet(self, xv, xqv):
        """
        Electric field of a uniformly charged horizontal rectangle.

        xv = where (the last index should address x,y,z).
        xqv = charge location (center of the rectangle).
        Both should be numpy arrays.
        If xv is multi-d, the routine assumes
        that the physical coordinates (x,y,z) are patrolled by the last index.
        Returns the electric field from a unit-charge horizontal rectangle
        centered at xqv, of size 2a * 2b.
        The returned electric field assumes 4*pi*epsilon=1.
        """
        # Use Durand page 244 tome 1
        # four corners :
        x1 = xqv + np.array([self.a, self.b, 0])
        x2 = xqv + np.array([-self.a, self.b, 0])
        x3 = xqv + np.array([-self.a, -self.b, 0])
        x4 = xqv + np.array([self.a, -self.b, 0])

        # Distances to the four corners
        d1 = np.sqrt(((xv-x1)**2).sum(axis=-1))
        d2 = np.sqrt(((xv-x2)**2).sum(axis=-1))
        d3 = np.sqrt(((xv-x3)**2).sum(axis=-1))
        d4 = np.sqrt(((xv-x4)**2).sum(axis=-1))

        # Reserve the returned array (filled component by component below)
        r = np.ndarray(xv.shape)
        xvOffset = xv[..., 0] - xqv[0]
        yvOffset = xv[..., 1] - xqv[1]

        # Ex
        # note : if a or b goes to 0, the log is 0 and the denominator (last
        # line) is zero as well. So some expansion would be required
        ao = yvOffset + self.b
        bo = yvOffset - self.b
        co = xvOffset + self.a
        do = xvOffset - self.a
        # Ex (eq 105)
        r[..., 0] = np.log((d4+ao) * (d2+bo) / (d3+ao) / (d1+bo))
        # Ey (eq 106)
        r[..., 1] = np.log((d2+co) * (d4+do) / (d3+co) / (d1+do))
        # point source approximation
        # ret[...,2] = (4*self.a*self.b)*calcEFieldCoulomb(X,X_q)[...,2]
        # full expression for ez : p 244
        # ez (eq 111 is only valid if the x and y are "inside")
        # there is a discussion of the general case around Fig VI-18.
        z = xv[..., 2] - xqv[2]
        # it ressembles equation 111 but I flipped two signs
        r[..., 2] = (np.arctan(do * bo / z / d1)
                     - np.arctan(bo * co / z / d2)
                     + np.arctan(co * ao / z / d3)
                     - np.arctan(ao * do / z / d4))

        # seems OK both "inside" and "outside"
        # Divide by the rectangle area to normalize to unit total charge.
        return r / (4 * self.a * self.b)
    def integrateEField(self, xv, component, z0, zf, nImageChargePairs=11):
        """
        Integrate transverse E Field along Z at point X (2 coordinates, last
        index). The coordinate of the field is given by component (0,
        or 1). at point X from the point charge The computation uses
        the dipole series trick. The number of dipoles is an optional
        argument. Odd numbers are better for what we are doing here.

        Parameters
        ----------
        xv : `np.ndarray`
            Evaluation points; the last axis holds (x, y).
        component : `int`
            Field component to integrate: 0 for Ex, 1 for Ey.
        z0, zf : `float`
            Integration bounds along z. If z0 == zf the field value
            (not the integral) is returned.
        nImageChargePairs : `int`, optional
            Number of image-charge dipole pairs in the series.
        """
        # The integral of the field (x/r^3 dz from z1 to z2) reads
        # x/rho**2*(z2/r2-z1/r1) with rho2 = x**2+y**2
        # x/rho2 does not change when going through image sources
        # so we use them as arguments, dz1 and dz2 z{begin,end}--Xq[2]
        # just for test: if z0==zf, then return the field value
        if z0 != zf:
            def integral(rho2, x, dz1, dz2):
                r1 = np.sqrt(rho2 + dz1**2)
                r2 = np.sqrt(rho2 + dz2**2)
                return x * (dz2/r2 - dz1/r1) / rho2
        else:  # see the comment above: return the value, not the integral.
            def integral(rho2, x, dz1, dz2):
                """
                x/r**3
                """
                r = np.sqrt(rho2 + dz1**2)
                vals = x / r**3
                return vals

        # Check validity
        if component not in [0, 1]:
            raise RuntimeError("Can only integrate electric field along component 0 or 1, "
                               f"not {component}.")

        # Reserve the result array
        result = np.zeros(xv.shape[:-1])
        # zqp/zqm track the z positions of the charge and its image;
        # they are reflected repeatedly in the dipole loop below.
        zqp = self.zq
        zqm = -zqp

        # 1st dipole
        # Generate a set of point charges to emulate
        # an extended distribution (size 2a*2b)
        xStep = (2 * self.a) / self.nChargeElements
        yStep = (2 * self.b) / self.nChargeElements
        xqs = -self.a + (np.linspace(0, self.nChargeElements-1, self.nChargeElements) + 0.5) * xStep
        yqs = -self.b + (np.linspace(0, self.nChargeElements-1, self.nChargeElements) + 0.5) * yStep
        for xq in xqs:
            for yq in yqs:
                dx = xv[..., 0] - xq
                dy = xv[..., 1] - yq
                dxv = [dx, dy]
                rho2 = dx**2 + dy**2
                result += integral(rho2, dxv[component], z0 - zqp, zf - zqp)
                # Construct image charge, change sign of z and q
                result -= integral(rho2, dxv[component], z0 - zqm, zf - zqm)
        # Average over the grid of point charges.
        result /= (self.nChargeElements * self.nChargeElements)

        # Remaining dipoles : no more extended charge
        # The (x,y) charge coordinates are 0, and common to all images:
        rho2 = xv[..., 0]**2 + xv[..., 1]**2
        xi = xv[..., component]
        for i in range(1, nImageChargePairs):
            # Alternate reflections about the backside (odd) and the
            # clock-line plane (even); the swap order matters.
            odd = i%2
            if odd:
                ztmp = (2*self.thickness) - zqm
                zqm = (2*self.thickness) - zqp
                zqp = ztmp
            else:
                ztmp = -zqm
                zqm = -zqp
                zqp = ztmp
            result += integral(rho2, xi, z0 - zqp, zf - zqp)
            result -= integral(rho2, xi, z0 - zqm, zf - zqm)

        # 8.85418781e-12 (F/m) * 1e-6 (microns/m) / 1.602e-19 (C/electron)
        # = 55.26958
        # eps_r_Si = 12, so eps = 55.27*12 = 663.23 el/V/um
        # This routine hence returns the field sourced by -1 electron
        result *= 1 / (4*np.pi*660)

        return result
    def eField(self, xv, nImageChargePairs=11):
        """
        Field at point xv from the point charge
        if xv is multi-dimensional, x,y,z should be represented
        by the last index ([0:3]).
        The computation uses the dipole series trick. The number
        of dipoles is an optional argument. Odd numbers are better
        for what we are doing here.
        """
        # Put the center of the source pixel at (x, y) = (0, 0)
        # This assumption is relied on in Eval_Eth and Eval_Etv
        xqv = np.array([0, 0, self.zq])

        # Image charge w.r.t. the parallel clock lines
        xqvImage = np.array([xqv[0], xqv[1], -xqv[2]])

        # Split the calculation in 2 parts: approximation
        # when far from the source, image method when near.
        rho = np.sqrt(xv[..., 0]**2 + xv[..., 1]**2)

        # Field near the charge cloud
        # First dipole
        # Points with rho less than twice the thickness
        # are considered "close"
        pointsNearChargeCloudIdxs = (rho/self.thickness < 2)
        xvNear = xv[pointsNearChargeCloudIdxs]
        eFieldNear = self.calcEFieldCoulombChargeSheet(xvNear, xqv)
        eFieldNear -= self.calcEFieldCoulombChargeSheet(xvNear, xqvImage)

        # Next dipoles: reflect alternately about the backside (odd i)
        # and the clock-line plane (even i); xqv/xqvImage are updated in
        # place, so the order of operations here matters.
        for i in range(1, nImageChargePairs):
            if (i % 2):  # Is odd?
                xqv[2] = 2 * self.thickness - xqv[2]
                xqvImage[2] = 2 * self.thickness - xqvImage[2]
                eFieldNear += calcEFieldCoulomb(xvNear, xqvImage)
                eFieldNear -= calcEFieldCoulomb(xvNear, xqv)
            else:
                xqv[2] = -xqv[2]
                xqvImage[2] = -xqvImage[2]
                eFieldNear += calcEFieldCoulomb(xvNear, xqv)
                eFieldNear -= calcEFieldCoulomb(xvNear, xqvImage)

        # Field far from the charge cloud
        xvFar = xv[~pointsNearChargeCloudIdxs]
        rhoFar = rho[~pointsNearChargeCloudIdxs]

        # Jon Pumplin, Am. Jour. Phys. 37,7 (1969), eq 7
        # When changing coordinate system (shift along z), cos -> sin.
        # And since this only applies far from the source, the latter
        # can be regarded as point-like.
        # I checked the continuity over the separation point.
        factor = -np.sin(np.pi * self.zq / self.thickness)
        factor *= np.sin(np.pi * xvFar[..., 2] / self.thickness)
        factor *= np.exp(-np.pi * rhoFar / self.thickness)
        factor *= np.sqrt(8 / rhoFar / self.thickness)
        factor *= (-np.pi / self.thickness - 0.5/rhoFar)
        eFieldFar = np.zeros_like(xvFar)
        eFieldFar[..., 0] = xvFar[..., 0] / (rhoFar*factor)
        eFieldFar[..., 1] = xvFar[..., 1] / (rhoFar*factor)

        # aggregate the results
        eField = np.zeros_like(xv)
        eField[pointsNearChargeCloudIdxs] = eFieldNear
        eField[~pointsNearChargeCloudIdxs] = eFieldFar

        # epsilon0 = 55 el/V/micron
        # 8.85418781e-12 (F/m) * 1e-6 (microns/m) / 1.602e-19 (C/electron)
        # = 55.26958
        # eps_r_Si = 12, so eps = 55.27*12 = 663.23 el/V/um
        # This routine hence returns the field sourced by -1 electron
        eField *= 1 / (4*np.pi*660)

        return eField
    def integrateExFieldFast(self, iMax, jMax, leftOrRight, zf=None, nImageChargePairs=11):
        """
        Computes the integrals of Ex along z from
        self.zsv to zf (the previous docstring said zsh; the code
        integrates from zsv, the vertical-boundary vertex altitude).
        The returned array is 2d of shape (iMax, jMax).

        leftOrRight is +1 for the right boundary, -1 for the left
        boundary. zf defaults to the detector thickness.
        """
        zf = self.thickness if zf is None else zf
        # NOTE(review): assert is stripped under python -O; consider
        # raising instead if this precondition matters in production.
        assert zf > self.zsv
        ii, jj = np.indices((iMax, jMax))
        # Gauss-Legendre sample points along the boundary (y direction).
        yy = (jj - 0.5)[:, :, np.newaxis] * self.pixelsize + self.xyOffsets[np.newaxis, np.newaxis, :]
        xx = (ii + 0.5 * leftOrRight) * self.pixelsize
        xx = np.broadcast_to(xx[..., None], yy.shape)
        xv = np.stack([xx, yy], axis=-1)

        # By definition of zsv, we integrate from zsv to zf,
        # and divide by the pixel size to be consistent with Eval_ET{v,h}
        w = np.broadcast_to(self.integrationWeights, yy.shape)
        integral = np.sum(
            w * self.integrateEField(xv, 0, self.zsv, zf,
                                     nImageChargePairs=nImageChargePairs),
            axis=2,
        )
        return leftOrRight * integral/self.pixelsize
    def integrateEyFieldFast(self, iMax, jMax, topOrBottom, zf=None, nImageChargePairs=11):
        """
        Computes the integral of Ey along z from
        self.zsh to zf. The returned array is
        2d [0:iMax, 0:jMax].

        topOrBottom is +1 for the top boundary, -1 for the bottom
        boundary. zf defaults to the detector thickness.
        """
        zf = self.thickness if zf is None else zf
        # assert zf > self.zsh
        ii, jj = np.indices((iMax, jMax))
        # Gauss-Legendre sample points along the boundary (x direction).
        xx = (ii - 0.5)[:, :, np.newaxis] * self.pixelsize + self.xyOffsets[np.newaxis, np.newaxis, :]
        yy = (jj + 0.5*topOrBottom) * self.pixelsize
        yy = np.broadcast_to(yy[..., None], xx.shape)
        xv = np.stack([xx, yy], axis=-1)

        # By definition of zsh, we integrate from zsh to zf,
        # and divide by the pixel size to be consistent with Eval_ET{v,h}
        # integrate
        w = np.broadcast_to(self.integrationWeights, yy.shape)  # Add leading dimensions
        integral = np.sum(
            w * self.integrateEField(xv, 1, self.zsh, zf,
                                     nImageChargePairs=nImageChargePairs),
            axis=2,
        )
        return topOrBottom * integral / self.pixelsize
    def eFieldTransverseToHorizontalPixelEdgeGrid(self, i, j, topOrBottom, zf=None):
        """
        Returns the field transverse to the horizontal pixel boundary.
        return a 2d array of shifts at evenly spaced points in x and z.
        normalized in units of pixel size, for a unit charge.
        The returned array has 3 indices:
        [along the pixel side, along the drift, E-field coordinate].
        The result is multiplied by the z- and x- steps, so that the
        sum is the average over x, divided by the pixel size.
        """
        assert np.abs(topOrBottom) == 1
        zf = self.thickness if zf is None else zf

        # by definition of zsh, we integrate from zsh to zf:
        zStep = (zf - self.zsh) / self.nStepsZ
        xyStep = self.pixelsize / self.nStepsXY

        # Midpoint sampling along z and along the boundary (x).
        z = self.zsh + (np.linspace(0, self.nStepsZ - 1, num=self.nStepsZ) + 0.5) * zStep
        x = (i - 0.5)*self.pixelsize + (np.linspace(0, self.nStepsXY - 1, self.nStepsXY) + 0.5) * xyStep

        xx, zz = np.meshgrid(x, z)
        # The boundary itself sits at constant y.
        yy = np.ones(xx.shape) * (j + 0.5*topOrBottom) * self.pixelsize
        xv = np.array([xx, yy, zz]).T

        return self.eField(xv) * zStep * xyStep
2942 def averageHorizontalBoundaryShift(self, i, j, topOrBottom, zf=None):
2943 """
2944 Integrate the field transverse to the horizontal pixel boundary
2945 """
2946 sum = self.eFieldTransverseToHorizontalPixelEdgeGrid(i, j, topOrBottom, zf)[..., 1].sum()
2947 # we want the integral over z and the average over x,
2948 # divided by the pixel size, with a sign that defines
2949 # if in moves inside or outside. Here is it:
2950 return sum * (topOrBottom / (self.pixelsize**2))
2952 def corner_shift_h2(self, i, j, topOrBottom, zf=None): # Do we even need this??
2953 E = self.eFieldTransverseToHorizontalPixelEdgeGrid(i, j, topOrBottom, zf)[..., 1]
2954 # Integrate over z (multiplication by the
2955 # step done in the calling routine)
2956 intz = E.sum(axis=1)
2957 x = range(intz.shape[0])
2958 p = np.polyfit(x, intz, 1)
2959 return p[0] * -0.5 + p[1], p[0] * (x[-1] + 0.5) + p[1]
2961 def eFieldTransverseToVerticalPixelEdgeGrid(self, i, j, leftOrRight, zf=None):
2962 """
2963 Returns the field transverse to the vertical pixel boundary.
2964 return a 3d array of shifts at evenly spaced points in y and z.
2965 Ex,y,z is indexed by the last index.
2966 """
2967 # Determine the depth of the photon conversion
2968 zf = self.thickness if zf is None else zf
2970 # The source charge is at x,y=0
2971 zStep = (zf-self.zsv) / (self.nStepsZ)
2972 xyStep = self.pixelsize / (self.nStepsXY)
2974 z = self.zsv + (np.linspace(0, self.nStepsZ - 1, self.nStepsZ) + 0.5) * zStep
2975 y = (j - 0.5)*self.pixelsize + (np.linspace(0, self.nStepsXY - 1, self.nStepsXY) + 0.5) * xyStep
2977 yy, zz = np.meshgrid(y, z)
2978 xx = np.ones(yy.shape) * (i + 0.5*leftOrRight) * self.pixelsize
2979 xv = np.array([xx, yy, zz]).T
2981 return self.eField(xv) * zStep * xyStep
2983 def averageVerticalBoundaryShift(self, i, j, leftOrRight, zf=None):
2984 """
2985 Average shift of the vertical boundary of pixel (i j).
2986 """
2987 sum = self.eFieldTransverseToVerticalPixelEdgeGrid(i, j, leftOrRight, zf)[..., 0].sum()
2988 # we want the integral over z and the average over x,
2989 # divided by the pixel size, with a sign that defines
2990 # if in moves inside or outside. Here is it:
2991 return sum * (leftOrRight / (self.pixelsize**2))
2993 def dxdy(self, iMax, zf=None):
2994 """
2995 corner shifts calculations. The returned array are larger by 1
2996 than iMax, because there are more corners than pixels
2997 """
2998 horiztonalShifts = np.ndarray((iMax, iMax))
2999 verticalShifts = np.ndarray((iMax, iMax))
3000 for i in range(iMax):
3001 for j in range(iMax):
3002 horiztonalShifts[i, j] = self.averageHorizontalBoundaryShift(i + 0.5, j, 1, zf)
3003 verticalShifts[i, j] = self.averageVerticalBoundaryShift(i, j + 0.5, 1, zf)
3005 # Parametrize the corner shifts of iMax pixels: iMax+1
3006 # corners in each direction
3007 dx = np.zeros((iMax+1, iMax+1))
3008 dy = dx + 0.
3009 dx[1:, 1:] = verticalShifts
3010 dx[0, 1:] = -verticalShifts[0, :] # leftmost column
3011 dx[:, 0] = dx[:, 1] # bottom row
3012 dy[1:, 1:] = horiztonalShifts
3013 dy[1:, 0] = -horiztonalShifts[:, 0] # bottom row
3014 dy[0, :] = dy[1, :] # leftmost column
3016 return dx, dy
3018 def evalAreaChangeCorners(self, iMax, zf=None):
3019 """
3020 pixel area alterations computed through corner shifts
3021 """
3022 dx, dy = self.dxdy(iMax, zf)
3023 areaChange = dx[1:, 1:] - dx[:-1, 1:] + dx[1:, :-1] - dx[:-1, :-1]
3024 areaChange += dy[1:, 1:] - dy[1:, :-1] + dy[:-1, 1:] - dy[:-1, :-1]
3025 return -0.5*areaChange
3027 def evalAreaChangeSidesFast(self, iMax, zf=None, nImageChargePairs=11):
3028 """
3029 Same as EvalAreaChangeSides, but uses direct integration.
3030 it evaluates the divergence of the discrete boundary
3031 displacement field.
3032 This routine groups the calls to the field computing routines.
3033 """
3034 horiztonalShifts = np.ndarray((iMax, iMax+1))
3035 verticalShifts = np.zeros_like(horiztonalShifts.T)
3036 horiztonalShifts[:, 1:] = self.integrateEyFieldFast(iMax, iMax, 1, zf,
3037 nImageChargePairs=nImageChargePairs)
3038 verticalShifts[1:, :] = self.integrateExFieldFast(iMax, iMax, 1, zf,
3039 nImageChargePairs=nImageChargePairs)
3041 # special case for [0,j] and [i,0] (they have two opposite values)
3042 horiztonalShifts[:, 0] = -horiztonalShifts[:, 1]
3043 verticalShifts[0, :] = -verticalShifts[1, :]
3045 # the divergence
3046 areaChange = verticalShifts[1:, :]
3047 areaChange -= verticalShifts[:-1, :]
3048 areaChange += horiztonalShifts[:, 1:]
3049 areaChange -= horiztonalShifts[:, :-1]
3050 areaChange *= -1 # Pixel area is decreased
3052 return areaChange