Coverage for python / lsst / ip / isr / linearize.py: 11%
334 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:30 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:30 +0000
1#
2# LSST Data Management System
3# Copyright 2016 AURA/LSST.
4#
5# This product includes software developed by the
6# LSST Project (http://www.lsst.org/).
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the LSST License Statement and
19# the GNU General Public License along with this program. If not,
20# see <http://www.lsstcorp.org/LegalNotices/>.
21#
23__all__ = [
24 "Linearizer",
25 "LinearizeBase",
26 "LinearizeLookupTable",
27 "LinearizeSquared",
28 "LinearizeProportional",
29 "LinearizePolynomial",
30 "LinearizeSpline",
31 "LinearizeDoubleSpline",
32 "LinearizeNone",
33]
35import abc
36import numpy as np
37from scipy.interpolate import Akima1DInterpolator
39from astropy.table import Table
41from lsst.pipe.base import Struct
42from lsst.geom import Box2I, Point2I, Extent2I
43from .applyLookupTable import applyLookupTable
44from .calibType import IsrCalib
45from .isrFunctions import isTrimmedImage
48class Linearizer(IsrCalib):
49 """Parameter set for linearization.
51 These parameters are included in `lsst.afw.cameraGeom.Amplifier`, but
52 should be accessible externally to allow for testing.
54 Parameters
55 ----------
56 table : `numpy.array`, optional
57 Lookup table; a 2-dimensional array of floats:
59 - one row for each row index (value of coef[0] in the amplifier)
60 - one column for each image value
62 To avoid copying the table the last index should vary fastest
63 (numpy default "C" order)
64 detector : `lsst.afw.cameraGeom.Detector`, optional
65 Detector object. Passed to self.fromDetector() on init.
66 log : `logging.Logger`, optional
67 Logger to handle messages.
68 kwargs : `dict`, optional
69 Other keyword arguments to pass to the parent init.
71 Raises
72 ------
73 RuntimeError
74 Raised if the supplied table is not 2D, or if the table has fewer
75 columns than rows (indicating that the indices are swapped).
77 Notes
78 -----
79 The linearizer attributes stored are:
81 hasLinearity : `bool`
82 Whether a linearity correction is defined for this detector.
83 override : `bool`
84 Whether the detector parameters should be overridden.
85 ampNames : `list` [`str`]
86 List of amplifier names to correct.
87 inputGain : `dict` [`str`, `float`]
88 Dictionary keyed by amplifirer name containing the input
89 gain from the PTC used to construct this linearizer.
90 inputTurnoff : `dict` [`str`, `float`]
91 Dictionary keyed by amplifier name containing the input
92 turnoff from the PTC used to construct this linearizer.
93 linearityCoeffs : `dict` [`str`, `np.ndarray`]
94 Coefficients to use in correction. Indexed by amplifier
95 names. The format of the array depends on the type of
96 correction to apply.
97 linearityType : `dict` [`str`, `str`]
98 Type of correction to use, indexed by amplifier names.
99 linearityBBox : `dict` [`str`, `lsst.geom.Box2I`]
100 Bounding box the correction is valid over, indexed by
101 amplifier names.
102 fitParams : `dict` [`str`, `np.ndarray`], optional
103 Linearity fit parameters used to construct the correction
104 coefficients, indexed as above.
105 fitParamsErr : `dict` [`str`, `np.ndarray`], optional
106 Uncertainty values of the linearity fit parameters used to
107 construct the correction coefficients, indexed as above.
108 fitChiSq : `dict` [`str`, `float`], optional
109 Chi-squared value of the linearity fit, indexed as above.
110 fitResiduals : `dict` [`str`, `np.ndarray`], optional
111 Residuals of the fit, indexed as above. Used for
112 calculating photodiode corrections
113 fitResidualsSigmaMad : `dict` [`str`, `float`], optional
114 Robust median-absolute-deviation of fit residuals, scaled
115 by the signal level.
116 fitResidualsUnmasked : `dict` [`str`, `np.ndarray`], optional
117 Same as fitResiduals, but all outliers are included and
118 not masked as nans.
119 fitResidualsModel : `dict` [`str`, `np.ndarray`], optional
120 The model count level for each of the fitResiduals.
121 linearFit : The linear fit to the low flux region of the curve.
122 [intercept, slope].
123 tableData : `np.ndarray`, optional
124 Lookup table data for the linearity correction.
125 inputAbscissa : `dict` [`str`, `np.ndarray`], optional
126 Input abscissa used to construct linearizer (usually photodiode
127 or exposure time).
128 inputOrdinate : `dict` [`str`, `np.ndarray`], optional
129 Input ordinate used to construct linearizer (raw mean counts).
130 inputMask : `dict` [`str`, `np.ndarray`], optional
131 Input mask used for the fitting.
132 inputGroupingIndex : `dict` [`str`, `np.ndarray`], optional
133 Input grouping index used for fitting.
134 inputNormalization : `dict` [`str`, `np.ndarray`], optional
135 Input normalization that was applied to the abscissa for
136 each pair from the PTC used for the linearization fit.
137 absoluteReferenceAmplifier : `str`, optional
138 Amplifier used for the reference for absolute linearization
139 (DoubleSpline) mode.
141 Version 1.4 adds ``linearityTurnoff`` and ``linearityMaxSignal``.
142 Version 1.5 adds ``fitResidualsUnmasked``, ``inputAbscissa``,
143 ``inputOrdinate``, ``inputMask``, ``inputGroupingIndex``,
144 ``fitResidualsModel``, and ``inputNormalization``.
145 Version 1.6 adds ``absoluteReferenceAmplifier``.
146 Version 1.7 adds ``inputGain``.
147 Version 1.8 adds ``inputTurnoff``.
148 """
149 _OBSTYPE = "LINEARIZER"
150 _SCHEMA = 'Gen3 Linearizer'
151 _VERSION = 1.8
153 def __init__(self, table=None, **kwargs):
154 self.hasLinearity = False
155 self.override = False
157 self.ampNames = list()
158 self.inputGain = dict()
159 self.inputTurnoff = dict()
160 self.linearityCoeffs = dict()
161 self.linearityType = dict()
162 self.linearityBBox = dict()
163 self.inputAbscissa = dict()
164 self.inputOrdinate = dict()
165 self.inputMask = dict()
166 self.inputGroupingIndex = dict()
167 self.inputNormalization = dict()
168 self.fitParams = dict()
169 self.fitParamsErr = dict()
170 self.fitChiSq = dict()
171 self.fitResiduals = dict()
172 self.fitResidualsSigmaMad = dict()
173 self.fitResidualsUnmasked = dict()
174 self.fitResidualsModel = dict()
175 self.linearFit = dict()
176 self.linearityTurnoff = dict()
177 self.linearityMaxSignal = dict()
178 self.absoluteReferenceAmplifier = ""
179 self.tableData = None
180 if table is not None:
181 if len(table.shape) != 2:
182 raise RuntimeError("table shape = %s; must have two dimensions" % (table.shape,))
183 if table.shape[1] < table.shape[0]:
184 raise RuntimeError("table shape = %s; indices are switched" % (table.shape,))
185 self.tableData = np.array(table, order="C")
187 # The linearizer is always natively in adu because it
188 # is computed prior to computing gains.
189 self.linearityUnits = 'adu'
191 super().__init__(**kwargs)
192 self.requiredAttributes.update(['hasLinearity', 'override',
193 'ampNames', 'inputGain', 'inputTurnoff',
194 'linearityCoeffs', 'linearityType', 'linearityBBox',
195 'fitParams', 'fitParamsErr', 'fitChiSq',
196 'fitResiduals', 'fitResidualsSigmaMad', 'linearFit', 'tableData',
197 'units', 'linearityTurnoff', 'linearityMaxSignal',
198 'fitResidualsUnmasked', 'inputAbscissa', 'inputOrdinate',
199 'inputMask', 'inputGroupingIndex', 'fitResidualsModel',
200 'inputNormalization', 'absoluteReferenceAmplifier'])
202 def updateMetadata(self, setDate=False, **kwargs):
203 """Update metadata keywords with new values.
205 This calls the base class's method after ensuring the required
206 calibration keywords will be saved.
208 Parameters
209 ----------
210 setDate : `bool`, optional
211 Update the CALIBDATE fields in the metadata to the current
212 time. Defaults to False.
213 kwargs :
214 Other keyword parameters to set in the metadata.
215 """
216 kwargs['HAS_LINEARITY'] = self.hasLinearity
217 kwargs['OVERRIDE'] = self.override
218 kwargs['HAS_TABLE'] = self.tableData is not None
219 kwargs['LINEARITY_UNITS'] = self.linearityUnits
221 super().updateMetadata(setDate=setDate, **kwargs)
223 def fromDetector(self, detector):
224 """Read linearity parameters from a detector.
226 Parameters
227 ----------
228 detector : `lsst.afw.cameraGeom.detector`
229 Input detector with parameters to use.
231 Returns
232 -------
233 calib : `lsst.ip.isr.Linearizer`
234 The calibration constructed from the detector.
235 """
236 self._detectorName = detector.getName()
237 self._detectorSerial = detector.getSerial()
238 self._detectorId = detector.getId()
239 self.hasLinearity = True
241 # Do not translate Threshold, Maximum, Units.
242 for amp in detector.getAmplifiers():
243 ampName = amp.getName()
244 self.ampNames.append(ampName)
245 self.linearityType[ampName] = amp.getLinearityType()
246 self.linearityCoeffs[ampName] = amp.getLinearityCoeffs()
247 self.linearityBBox[ampName] = amp.getBBox()
249 # Detector linearizers (legacy) are assumed to be adu units.
250 self.linearityUnits = 'adu'
252 return self
254 @classmethod
255 def fromDict(cls, dictionary):
256 """Construct a calibration from a dictionary of properties
258 Parameters
259 ----------
260 dictionary : `dict`
261 Dictionary of properties
263 Returns
264 -------
265 calib : `lsst.ip.isr.Linearity`
266 Constructed calibration.
268 Raises
269 ------
270 RuntimeError
271 Raised if the supplied dictionary is for a different
272 calibration.
273 """
275 calib = cls()
277 if calib._OBSTYPE != dictionary['metadata']['OBSTYPE']:
278 raise RuntimeError(f"Incorrect linearity supplied. Expected {calib._OBSTYPE}, "
279 f"found {dictionary['metadata']['OBSTYPE']}")
281 calib.setMetadata(dictionary['metadata'])
283 calib.hasLinearity = dictionary.get('hasLinearity',
284 dictionary['metadata'].get('HAS_LINEARITY', False))
285 calib.override = dictionary.get('override', True)
287 # Old linearizers which do not have linearityUnits are
288 # assumed to be adu because that's all that has been
289 # supported.
290 calib.linearityUnits = dictionary.get('linearityUnits', 'adu')
292 if calib.hasLinearity:
293 for ampName in dictionary['amplifiers']:
294 amp = dictionary['amplifiers'][ampName]
295 calib.ampNames.append(ampName)
296 # If gain = None on isrFunction.gainContext(),
297 # the default amp geometry gains are used
298 # in IsrTask*
299 calib.inputGain[ampName] = amp.get('inputGain', np.nan)
300 calib.inputTurnoff[ampName] = amp.get('inputTurnoff', np.inf)
301 calib.linearityCoeffs[ampName] = np.array(amp.get('linearityCoeffs', [0.0]))
302 calib.linearityType[ampName] = amp.get('linearityType', 'None')
303 calib.linearityBBox[ampName] = amp.get('linearityBBox', None)
305 calib.inputAbscissa[ampName] = np.array(amp.get('inputAbscissa', [0.0]))
306 calib.inputOrdinate[ampName] = np.array(amp.get('inputOrdinate', [0.0]))
307 calib.inputMask[ampName] = np.array(amp.get('inputMask', [False]))
308 calib.inputGroupingIndex[ampName] = np.array(amp.get('inputGroupingIndex', [0.0]))
309 calib.inputNormalization[ampName] = np.array(amp.get('inputNormalization', [1.0]))
311 calib.fitParams[ampName] = np.array(amp.get('fitParams', [0.0]))
312 calib.fitParamsErr[ampName] = np.array(amp.get('fitParamsErr', [0.0]))
313 calib.fitChiSq[ampName] = amp.get('fitChiSq', np.nan)
314 calib.fitResiduals[ampName] = np.array(amp.get('fitResiduals', [0.0]))
315 calib.fitResidualsSigmaMad[ampName] = np.array(amp.get('fitResidualsSigmaMad', np.nan))
316 calib.fitResidualsUnmasked[ampName] = np.array(amp.get('fitResidualsUnmasked', [0.0]))
317 calib.fitResidualsModel[ampName] = np.array(amp.get('fitResidualsModel', [0.0]))
318 calib.linearFit[ampName] = np.array(amp.get('linearFit', [0.0]))
320 calib.linearityTurnoff[ampName] = np.array(amp.get('linearityTurnoff', np.nan))
321 calib.linearityMaxSignal[ampName] = np.array(amp.get('linearityMaxSignal', np.nan))
323 calib.tableData = dictionary.get('tableData', None)
324 if calib.tableData:
325 calib.tableData = np.array(calib.tableData)
327 calib.absoluteReferenceAmplifier = dictionary.get(
328 'absoluteReferenceAmplifier', calib.absoluteReferenceAmplifier,
329 )
331 return calib
333 def toDict(self):
334 """Return linearity parameters as a dict.
336 Returns
337 -------
338 outDict : `dict`:
339 """
340 self.updateMetadata()
342 outDict = {'metadata': self.getMetadata(),
343 'detectorName': self._detectorName,
344 'detectorSerial': self._detectorSerial,
345 'detectorId': self._detectorId,
346 'hasTable': self.tableData is not None,
347 'amplifiers': dict(),
348 'linearityUnits': self.linearityUnits,
349 }
350 for ampName in self.linearityType:
351 outDict['amplifiers'][ampName] = {
352 'inputGain': self.inputGain[ampName],
353 'inputTurnoff': self.inputTurnoff[ampName],
354 'linearityType': self.linearityType[ampName],
355 'linearityCoeffs': self.linearityCoeffs[ampName].tolist(),
356 'linearityBBox': self.linearityBBox[ampName],
357 'inputAbscissa': self.inputAbscissa[ampName].tolist(),
358 'inputOrdinate': self.inputOrdinate[ampName].tolist(),
359 'inputMask': self.inputMask[ampName].tolist(),
360 'inputGroupingIndex': self.inputGroupingIndex[ampName].tolist(),
361 'inputNormalization': self.inputNormalization[ampName].tolist(),
362 'fitParams': self.fitParams[ampName].tolist(),
363 'fitParamsErr': self.fitParamsErr[ampName].tolist(),
364 'fitChiSq': self.fitChiSq[ampName],
365 'fitResiduals': self.fitResiduals[ampName].tolist(),
366 'fitResidualsSigmaMad': self.fitResidualsSigmaMad[ampName],
367 'fitResidualsUnmasked': self.fitResidualsUnmasked[ampName].tolist(),
368 'fitResidualsModel': self.fitResidualsModel[ampName].tolist(),
369 'linearFit': self.linearFit[ampName].tolist(),
370 'linearityTurnoff': self.linearityTurnoff[ampName],
371 'linearityMaxSignal': self.linearityMaxSignal[ampName],
372 }
373 if self.tableData is not None:
374 outDict['tableData'] = self.tableData.tolist()
376 outDict['absoluteReferenceAmplifier'] = self.absoluteReferenceAmplifier
378 return outDict
380 @classmethod
381 def fromTable(cls, tableList):
382 """Read linearity from a FITS file.
384 This method uses the `fromDict` method to create the
385 calibration, after constructing an appropriate dictionary from
386 the input tables.
388 Parameters
389 ----------
390 tableList : `list` [`astropy.table.Table`]
391 afwTable read from input file name.
393 Returns
394 -------
395 linearity : `~lsst.ip.isr.linearize.Linearizer``
396 Linearity parameters.
398 Notes
399 -----
400 The method reads a FITS file with 1 or 2 extensions. The metadata is
401 read from the header of extension 1, which must exist. Then the table
402 is loaded, and the ['AMPLIFIER_NAME', 'TYPE', 'COEFFS', 'BBOX_X0',
403 'BBOX_Y0', 'BBOX_DX', 'BBOX_DY'] columns are read and used to set each
404 dictionary by looping over rows.
405 Extension 2 is then attempted to read in the try block (which only
406 exists for lookup tables). It has a column named 'LOOKUP_VALUES' that
407 contains a vector of the lookup entries in each row.
408 """
409 coeffTable = tableList[0]
411 metadata = coeffTable.meta
412 inDict = dict()
413 inDict['metadata'] = metadata
414 inDict['hasLinearity'] = metadata.get('HAS_LINEARITY', False)
415 inDict['amplifiers'] = dict()
416 inDict['linearityUnits'] = metadata.get('LINEARITY_UNITS', 'adu')
418 for record in coeffTable:
419 ampName = record['AMPLIFIER_NAME']
420 inputGain = record['INPUT_GAIN'] if 'INPUT_GAIN' in record.columns else np.nan
421 inputTurnoff = record['INPUT_TURNOFF'] if 'INPUT_TURNOFF' in record.columns else np.inf
422 inputAbscissa = record['INP_ABSCISSA'] if 'INP_ABSCISSA' in record.columns else np.array([0.0])
423 inputOrdinate = record['INP_ORDINATE'] if 'INP_ORDINATE' in record.columns else np.array([0.0])
424 inputMask = record['INP_MASK'] if 'INP_MASK' in record.columns else np.array([False])
425 inputGroupingIndex = record['INP_GROUPING_INDEX'] if 'INP_GROUPING_INDEX' in record.columns \
426 else np.array([0])
427 inputNormalization = record['INP_NORMALIZATION'] if 'INP_NORMALIZATION' in record.columns \
428 else np.array([1.0])
429 fitParams = record['FIT_PARAMS'] if 'FIT_PARAMS' in record.columns else np.array([0.0])
430 fitParamsErr = record['FIT_PARAMS_ERR'] if 'FIT_PARAMS_ERR' in record.columns else np.array([0.0])
431 fitChiSq = record['RED_CHI_SQ'] if 'RED_CHI_SQ' in record.columns else np.nan
432 fitResiduals = record['FIT_RES'] if 'FIT_RES' in record.columns else np.array([0.0])
433 fitResidualsSigmaMad = record['FIT_RES_SIGMAD'] if 'FIT_RES_SIGMAD' in record.columns else np.nan
434 fitResidualsUnmasked = record['FIT_RES_UNMASKED'] \
435 if 'FIT_RES_UNMASKED' in record.columns else np.array([0.0])
436 fitResidualsModel = record['FIT_RES_MODEL'] \
437 if 'FIT_RES_MODEL' in record.columns else np.array([0.0])
438 linearFit = record['LIN_FIT'] if 'LIN_FIT' in record.columns else np.array([0.0])
440 linearityTurnoff = record['LINEARITY_TURNOFF'] if 'LINEARITY_TURNOFF' in record.columns \
441 else np.nan
442 linearityMaxSignal = record['LINEARITY_MAX_SIGNAL'] if 'LINEARITY_MAX_SIGNAL' in record.columns \
443 else np.nan
445 inDict['amplifiers'][ampName] = {
446 'inputGain': inputGain,
447 'inputTurnoff': inputTurnoff,
448 'linearityType': record['TYPE'],
449 'linearityCoeffs': record['COEFFS'],
450 'linearityBBox': Box2I(Point2I(record['BBOX_X0'], record['BBOX_Y0']),
451 Extent2I(record['BBOX_DX'], record['BBOX_DY'])),
452 'inputAbscissa': inputAbscissa,
453 'inputOrdinate': inputOrdinate,
454 'inputMask': inputMask,
455 'inputGroupingIndex': inputGroupingIndex,
456 'inputNormalization': inputNormalization,
457 'fitParams': fitParams,
458 'fitParamsErr': fitParamsErr,
459 'fitChiSq': fitChiSq,
460 'fitResiduals': fitResiduals,
461 'fitResidualsSigmaMad': fitResidualsSigmaMad,
462 'fitResidualsUnmasked': fitResidualsUnmasked,
463 'fitResidualsModel': fitResidualsModel,
464 'linearFit': linearFit,
465 'linearityTurnoff': linearityTurnoff,
466 'linearityMaxSignal': linearityMaxSignal,
467 }
469 if len(tableList) > 1:
470 tableData = tableList[1]
471 inDict['tableData'] = [record['LOOKUP_VALUES'] for record in tableData]
473 if 'ABS_REF_AMP' in coeffTable.columns:
474 inDict['absoluteReferenceAmplifier'] = str(coeffTable['ABS_REF_AMP'][0])
475 else:
476 inDict['absoluteReferenceAmplifier'] = ''
478 return cls().fromDict(inDict)
480 def toTable(self):
481 """Construct a list of tables containing the information in this
482 calibration.
484 The list of tables should create an identical calibration
485 after being passed to this class's fromTable method.
487 Returns
488 -------
489 tableList : `list` [`astropy.table.Table`]
490 List of tables containing the linearity calibration
491 information.
492 """
494 tableList = []
495 self.updateMetadata()
496 catalog = Table([{'AMPLIFIER_NAME': ampName,
497 'INPUT_GAIN': self.inputGain[ampName],
498 'INPUT_TURNOFF': self.inputTurnoff[ampName],
499 'TYPE': self.linearityType[ampName],
500 'COEFFS': self.linearityCoeffs[ampName],
501 'BBOX_X0': self.linearityBBox[ampName].getMinX(),
502 'BBOX_Y0': self.linearityBBox[ampName].getMinY(),
503 'BBOX_DX': self.linearityBBox[ampName].getWidth(),
504 'BBOX_DY': self.linearityBBox[ampName].getHeight(),
505 'INP_ABSCISSA': self.inputAbscissa[ampName],
506 'INP_ORDINATE': self.inputOrdinate[ampName],
507 'INP_MASK': self.inputMask[ampName],
508 'INP_GROUPING_INDEX': self.inputGroupingIndex[ampName],
509 'INP_NORMALIZATION': self.inputNormalization[ampName],
510 'FIT_PARAMS': self.fitParams[ampName],
511 'FIT_PARAMS_ERR': self.fitParamsErr[ampName],
512 'RED_CHI_SQ': self.fitChiSq[ampName],
513 'FIT_RES': self.fitResiduals[ampName],
514 'FIT_RES_SIGMAD': self.fitResidualsSigmaMad[ampName],
515 'FIT_RES_UNMASKED': self.fitResidualsUnmasked[ampName],
516 'FIT_RES_MODEL': self.fitResidualsModel[ampName],
517 'LIN_FIT': self.linearFit[ampName],
518 'LINEARITY_TURNOFF': self.linearityTurnoff[ampName],
519 'LINEARITY_MAX_SIGNAL': self.linearityMaxSignal[ampName],
520 'ABS_REF_AMP': self.absoluteReferenceAmplifier,
521 } for ampName in self.ampNames])
522 catalog.meta = self.getMetadata().toDict()
523 tableList.append(catalog)
525 if self.tableData is not None:
526 catalog = Table([{'LOOKUP_VALUES': value} for value in self.tableData])
527 tableList.append(catalog)
528 return tableList
530 def getLinearityTypeByName(self, linearityTypeName):
531 """Determine the linearity class to use from the type name.
533 Parameters
534 ----------
535 linearityTypeName : str
536 String name of the linearity type that is needed.
538 Returns
539 -------
540 linearityType : `~lsst.ip.isr.linearize.LinearizeBase`
541 The appropriate linearity class to use. If no matching class
542 is found, `None` is returned.
543 """
544 for t in [LinearizeLookupTable,
545 LinearizeSquared,
546 LinearizePolynomial,
547 LinearizeProportional,
548 LinearizeSpline,
549 LinearizeDoubleSpline,
550 LinearizeNone]:
551 if t.LinearityType == linearityTypeName:
552 return t
553 return None
555 def validate(self, detector=None, amplifier=None):
556 """Validate linearity for a detector/amplifier.
558 Parameters
559 ----------
560 detector : `lsst.afw.cameraGeom.Detector`, optional
561 Detector to validate, along with its amplifiers.
562 amplifier : `lsst.afw.cameraGeom.Amplifier`, optional
563 Single amplifier to validate.
565 Raises
566 ------
567 RuntimeError
568 Raised if there is a mismatch in linearity parameters, and
569 the cameraGeom parameters are not being overridden.
570 """
571 amplifiersToCheck = []
572 if detector:
573 if self._detectorName != detector.getName():
574 raise RuntimeError("Detector names don't match: %s != %s" %
575 (self._detectorName, detector.getName()))
576 if int(self._detectorId) != int(detector.getId()):
577 raise RuntimeError("Detector IDs don't match: %s != %s" %
578 (int(self._detectorId), int(detector.getId())))
579 # TODO: DM-38778: This check fails on LATISS due to an
580 # error in the camera configuration.
581 # if self._detectorSerial != detector.getSerial():
582 # raise RuntimeError(
583 # "Detector serial numbers don't match: %s != %s" %
584 # (self._detectorSerial, detector.getSerial()))
585 if len(detector.getAmplifiers()) != len(self.linearityCoeffs.keys()):
586 raise RuntimeError("Detector number of amps = %s does not match saved value %s" %
587 (len(detector.getAmplifiers()),
588 len(self.linearityCoeffs.keys())))
589 amplifiersToCheck.extend(detector.getAmplifiers())
591 if amplifier:
592 amplifiersToCheck.extend(amplifier)
594 for amp in amplifiersToCheck:
595 ampName = amp.getName()
596 if ampName not in self.linearityCoeffs.keys():
597 raise RuntimeError("Amplifier %s is not in linearity data" %
598 (ampName, ))
599 if amp.getLinearityType() != self.linearityType[ampName]:
600 if self.override:
601 self.log.debug("Overriding amplifier defined linearityType (%s) for %s",
602 self.linearityType[ampName], ampName)
603 else:
604 raise RuntimeError("Amplifier %s type %s does not match saved value %s" %
605 (ampName, amp.getLinearityType(), self.linearityType[ampName]))
606 if (amp.getLinearityCoeffs().shape != self.linearityCoeffs[ampName].shape or not
607 np.allclose(amp.getLinearityCoeffs(), self.linearityCoeffs[ampName], equal_nan=True)):
608 if self.override:
609 self.log.debug("Overriding amplifier defined linearityCoeffs (%s) for %s",
610 self.linearityCoeffs[ampName], ampName)
611 else:
612 raise RuntimeError("Amplifier %s coeffs %s does not match saved value %s" %
613 (ampName, amp.getLinearityCoeffs(), self.linearityCoeffs[ampName]))
615 def applyLinearity(self, image, detector=None, log=None, gains=None):
616 """Apply the linearity to an image.
618 If the linearity parameters are populated, use those,
619 otherwise use the values from the detector.
621 Parameters
622 ----------
623 image : `~lsst.afw.image.image`
624 Image to correct.
625 detector : `~lsst.afw.cameraGeom.detector`, optional
626 Detector to use to determine exposure trimmed state. If
627 supplied, but no other linearity information is provided
628 by the calibration, then the static solution stored in the
629 detector will be used.
630 log : `~logging.Logger`, optional
631 Log object to use for logging.
632 gains : `dict` [`str`, `float`], optional
633 Dictionary of amp name to gain. If this is provided then
634 linearity terms will be converted from adu to electrons.
635 Only used for Spline linearity corrections.
636 """
637 if log is None:
638 log = self.log
639 if detector and not self.hasLinearity:
640 self.fromDetector(detector)
642 self.validate(detector)
644 # Check if the image is trimmed.
645 isTrimmed = None
646 if detector:
647 isTrimmed = isTrimmedImage(image, detector)
649 numAmps = 0
650 numLinearized = 0
651 numOutOfRange = 0
652 for ampName in self.linearityType.keys():
653 linearizer = self.getLinearityTypeByName(self.linearityType[ampName])
654 numAmps += 1
656 if gains and self.linearityUnits == 'adu':
657 gainValue = gains[ampName]
658 else:
659 gainValue = 1.0
661 # If the gain is NaN, then this is a no-op
662 if np.isnan(gainValue):
663 log.warning("Gain value for amp %s is NaN, skipping "
664 "linearization.", ampName)
665 continue
667 if linearizer is not None:
668 match isTrimmed:
669 case True:
670 bbox = detector[ampName].getBBox()
671 case False:
672 bbox = detector[ampName].getRawBBox()
673 case None:
674 bbox = self.linearityBBox[ampName]
676 ampView = image.Factory(image, bbox)
677 success, outOfRange = linearizer()(ampView, **{'coeffs': self.linearityCoeffs[ampName],
678 'table': self.tableData,
679 'log': self.log,
680 'gain': gainValue})
681 numOutOfRange += outOfRange
682 if success:
683 numLinearized += 1
684 elif log is not None:
685 log.warning("Amplifier %s did not linearize.",
686 ampName)
687 return Struct(
688 numAmps=numAmps,
689 numLinearized=numLinearized,
690 numOutOfRange=numOutOfRange
691 )
694class LinearizeBase(metaclass=abc.ABCMeta):
695 """Abstract base class functor for correcting non-linearity.
697 Subclasses must define ``__call__`` and set class variable
698 LinearityType to a string that will be used for linearity type in
699 the cameraGeom.Amplifier.linearityType field.
701 All linearity corrections should be defined in terms of an
702 additive correction, such that:
704 corrected_value = uncorrected_value + f(uncorrected_value)
705 """
706 LinearityType = None # linearity type, a string used for AmpInfoCatalogs
708 @abc.abstractmethod
709 def __call__(self, image, **kwargs):
710 """Correct non-linearity.
712 Parameters
713 ----------
714 image : `lsst.afw.image.Image`
715 Image to be corrected
716 kwargs : `dict`
717 Dictionary of parameter keywords:
719 ``coeffs``
720 Coefficient vector (`list` or `np.ndarray`).
721 ``table``
722 Lookup table data (`np.ndarray`).
723 ``log``
724 Logger to handle messages (`logging.Logger`).
726 Returns
727 -------
728 output : `bool`
729 If `True`, a correction was applied successfully.
731 Raises
732 ------
733 RuntimeError:
734 Raised if the linearity type listed in the
735 detector does not match the class type.
736 """
737 pass
740class LinearizeLookupTable(LinearizeBase):
741 """Correct non-linearity with a persisted lookup table.
743 The lookup table consists of entries such that given
744 "coefficients" c0, c1:
746 for each i,j of image:
747 rowInd = int(c0)
748 colInd = int(c1 + uncorrImage[i,j])
749 corrImage[i,j] = uncorrImage[i,j] + table[rowInd, colInd]
751 - c0: row index; used to identify which row of the table to use
752 (typically one per amplifier, though one can have multiple
753 amplifiers use the same table)
754 - c1: column index offset; added to the uncorrected image value
755 before truncation; this supports tables that can handle
756 negative image values; also, if the c1 ends with .5 then
757 the nearest index is used instead of truncating to the
758 next smaller index
759 """
760 LinearityType = "LookupTable"
762 def __call__(self, image, **kwargs):
763 """Correct for non-linearity.
765 Parameters
766 ----------
767 image : `lsst.afw.image.Image`
768 Image to be corrected
769 kwargs : `dict`
770 Dictionary of parameter keywords:
772 ``coeffs``
773 Columnation vector (`list` or `np.ndarray`).
774 ``table``
775 Lookup table data (`np.ndarray`).
776 ``log``
777 Logger to handle messages (`logging.Logger`).
779 Returns
780 -------
781 output : `tuple` [`bool`, `int`]
782 If true, a correction was applied successfully. The
783 integer indicates the number of pixels that were
784 uncorrectable by being out of range.
786 Raises
787 ------
788 RuntimeError:
789 Raised if the requested row index is out of the table
790 bounds.
791 """
792 numOutOfRange = 0
794 rowInd, colIndOffset = kwargs['coeffs'][0:2]
795 table = kwargs['table']
796 log = kwargs['log']
798 numTableRows = table.shape[0]
799 rowInd = int(rowInd)
800 if rowInd < 0 or rowInd > numTableRows:
801 raise RuntimeError("LinearizeLookupTable rowInd=%s not in range[0, %s)" %
802 (rowInd, numTableRows))
803 tableRow = np.array(table[rowInd, :], dtype=image.getArray().dtype)
805 numOutOfRange += applyLookupTable(image, tableRow, colIndOffset)
807 if numOutOfRange > 0 and log is not None:
808 log.warning("%s pixels were out of range of the linearization table",
809 numOutOfRange)
810 if numOutOfRange < image.getArray().size:
811 return True, numOutOfRange
812 else:
813 return False, numOutOfRange
816class LinearizePolynomial(LinearizeBase):
817 """Correct non-linearity with a polynomial mode.
819 .. code-block::
821 corrImage = uncorrImage + sum_i c_i uncorrImage^(2 + i)
823 where ``c_i`` are the linearity coefficients for each amplifier.
824 Lower order coefficients are not included as they duplicate other
825 calibration parameters:
827 ``k0``
828 A coefficient multiplied by ``uncorrImage**0`` is equivalent to
829 bias level. Irrelevant for correcting non-linearity.
830 ``k1``
831 A coefficient multiplied by ``uncorrImage**1`` is proportional
832 to the gain. Not necessary for correcting non-linearity.
833 """
834 LinearityType = "Polynomial"
836 def __call__(self, image, **kwargs):
837 """Correct non-linearity.
839 Parameters
840 ----------
841 image : `lsst.afw.image.Image`
842 Image to be corrected
843 kwargs : `dict`
844 Dictionary of parameter keywords:
846 ``coeffs``
847 Coefficient vector (`list` or `np.ndarray`).
848 If the order of the polynomial is n, this list
849 should have a length of n-1 ("k0" and "k1" are
850 not needed for the correction).
851 ``log``
852 Logger to handle messages (`logging.Logger`).
854 Returns
855 -------
856 output : `tuple` [`bool`, `int`]
857 If true, a correction was applied successfully. The
858 integer indicates the number of pixels that were
859 uncorrectable by being out of range.
860 """
861 if not np.any(np.isfinite(kwargs['coeffs'])):
862 return False, 0
863 if not np.any(kwargs['coeffs']):
864 return False, 0
866 ampArray = image.getArray()
867 correction = np.zeros_like(ampArray)
868 for order, coeff in enumerate(kwargs['coeffs'], start=2):
869 correction += coeff * np.power(ampArray, order)
870 ampArray += correction
872 return True, 0
875class LinearizeSquared(LinearizeBase):
876 """Correct non-linearity with a squared model.
878 corrImage = uncorrImage + c0*uncorrImage^2
880 where c0 is linearity coefficient 0 for each amplifier.
881 """
882 LinearityType = "Squared"
884 def __call__(self, image, **kwargs):
885 """Correct for non-linearity.
887 Parameters
888 ----------
889 image : `lsst.afw.image.Image`
890 Image to be corrected
891 kwargs : `dict`
892 Dictionary of parameter keywords:
894 ``coeffs``
895 Coefficient vector (`list` or `np.ndarray`).
896 ``log``
897 Logger to handle messages (`logging.Logger`).
899 Returns
900 -------
901 output : `tuple` [`bool`, `int`]
902 If true, a correction was applied successfully. The
903 integer indicates the number of pixels that were
904 uncorrectable by being out of range.
905 """
907 sqCoeff = kwargs['coeffs'][0]
908 if sqCoeff != 0:
909 ampArr = image.getArray()
910 ampArr *= (1 + sqCoeff*ampArr)
911 return True, 0
912 else:
913 return False, 0
916class LinearizeSpline(LinearizeBase):
917 """Correct non-linearity with a spline model.
919 corrImage = uncorrImage - Spline(coeffs, uncorrImage)
921 Notes
922 -----
924 The spline fit calculates a correction as a function of the
925 expected linear flux term. Because of this, the correction needs
926 to be subtracted from the observed flux.
928 """
929 LinearityType = "Spline"
931 def __call__(self, image, **kwargs):
932 """Correct for non-linearity.
934 Parameters
935 ----------
936 image : `lsst.afw.image.Image`
937 Image to be corrected
938 kwargs : `dict`
939 Dictionary of parameter keywords:
941 ``coeffs``
942 Coefficient vector (`list` or `np.ndarray`).
943 ``log``
944 Logger to handle messages (`logging.Logger`).
945 ``gain``
946 Gain value to apply.
948 Returns
949 -------
950 output : `tuple` [`bool`, `int`]
951 If true, a correction was applied successfully. The
952 integer indicates the number of pixels that were
953 uncorrectable by being out of range.
954 """
955 splineCoeff = kwargs['coeffs']
956 gain = kwargs.get('gain', 1.0)
957 centers, values = np.split(splineCoeff, 2)
958 # Note that we must do a copy here because the input array
959 # may not be volatile memory.
960 centers = gain * centers
961 values = gain * values
962 # If the spline is not anchored at zero, remove the offset
963 # found at the lowest flux available, and add an anchor at
964 # flux=0.0 if there's no entry at that point.
965 if values[0] != 0:
966 offset = values[0]
967 values -= offset
968 if centers[0] != 0.0:
969 centers = np.concatenate(([0.0], centers))
970 values = np.concatenate(([0.0], values))
972 ampArr = image.array
974 if np.any(~np.isfinite(centers)) or np.any(~np.isfinite(values)):
975 # This cannot be used; turns everything into nans.
976 ampArr[:] = np.nan
978 return False, 0
979 else:
980 interp = Akima1DInterpolator(centers, values, method="akima")
981 # Clip to avoid extrapolation and hitting the top end.
982 ampArr -= interp(np.clip(ampArr, centers[0], centers[-1] - 0.1))
984 return True, 0
987class LinearizeDoubleSpline(LinearizeBase):
988 """Correct non-linearity with a spline model.
990 corrImage1 = uncorrImage - Spline1(coeffs, uncorrImage)
991 corrImage = corrImage1 - Spline2(coeffs, corrImage1)
993 Notes
994 -----
996 The spline fit calculates a correction as a function of the
997 expected linear flux term. Because of this, the correction needs
998 to be subtracted from the observed flux.
1000 """
1001 LinearityType = "DoubleSpline"
1003 def __call__(self, image, **kwargs):
1004 """Correct for non-linearity.
1006 Parameters
1007 ----------
1008 image : `lsst.afw.image.Image`
1009 Image to be corrected
1010 kwargs : `dict`
1011 Dictionary of parameter keywords:
1013 ``coeffs``
1014 Coefficient vector (`list` or `np.ndarray`).
1015 ``log``
1016 Logger to handle messages (`logging.Logger`).
1017 ``gain``
1018 Gain value to apply.
1020 Returns
1021 -------
1022 output : `tuple` [`bool`, `int`]
1023 If true, a correction was applied successfully. The
1024 integer indicates the number of pixels that were
1025 uncorrectable by being out of range.
1026 """
1027 coeffs = kwargs['coeffs']
1028 gain = kwargs.get('gain', 1.0)
1030 # The coeffs have [nNodes1, nNodes2, nodes1, values1, nodes2, values2]
1032 nNodes1 = int(coeffs[0])
1033 nNodes2 = int(coeffs[1])
1035 ampArr = image.array
1037 if nNodes1 > 0:
1038 splineCoeff1 = coeffs[2: 2 + 2*nNodes1]
1039 centers1, values1 = np.split(splineCoeff1, 2)
1041 if np.any(~np.isfinite(centers1)) or np.any(~np.isfinite(values1)):
1042 # Bad linearizer.
1043 ampArr[:] = np.nan
1044 return False, 0
1046 if gain != 1.0:
1047 centers1 = centers1 * gain
1048 values1 = values1 * gain
1049 if nNodes2 > 0:
1050 splineCoeff2 = coeffs[2 + 2*nNodes1: 2 + 2*nNodes1 + 2*nNodes2]
1051 centers2, values2 = np.split(splineCoeff2, 2)
1053 if np.any(~np.isfinite(centers2)) or np.any(~np.isfinite(values2)):
1054 # Bad linearizer.
1055 ampArr[:] = np.nan
1056 return False, 0
1058 if gain != 1.0:
1059 centers2 = centers2 * gain
1060 values2 = values2 * gain
1062 # Note the double-spline will always be anchored at zero.
1064 if nNodes1 > 0:
1065 interp1 = Akima1DInterpolator(centers1, values1, method="akima")
1066 # Clip to avoid extrapolation and hitting the top end.
1067 ampArr -= interp1(np.clip(ampArr, centers1[0], centers1[-1] - 0.01))
1068 if nNodes2 > 0:
1069 interp2 = Akima1DInterpolator(centers2, values2, method="akima")
1070 # Clip to avoid extrapolation and hitting the top end.
1071 ampArr -= interp2(np.clip(ampArr, centers2[0], centers2[-1] - 0.01))
1073 return True, 0
1076class LinearizeProportional(LinearizeBase):
1077 """Do not correct non-linearity.
1078 """
1079 LinearityType = "Proportional"
1081 def __call__(self, image, **kwargs):
1082 """Do not correct for non-linearity.
1084 Parameters
1085 ----------
1086 image : `lsst.afw.image.Image`
1087 Image to be corrected
1088 kwargs : `dict`
1089 Dictionary of parameter keywords:
1091 ``coeffs``
1092 Coefficient vector (`list` or `np.ndarray`).
1093 ``log``
1094 Logger to handle messages (`logging.Logger`).
1096 Returns
1097 -------
1098 output : `tuple` [`bool`, `int`]
1099 If true, a correction was applied successfully. The
1100 integer indicates the number of pixels that were
1101 uncorrectable by being out of range.
1102 """
1103 return True, 0
1106class LinearizeNone(LinearizeBase):
1107 """Do not correct non-linearity.
1108 """
1109 LinearityType = "None"
1111 def __call__(self, image, **kwargs):
1112 """Do not correct for non-linearity.
1114 Parameters
1115 ----------
1116 image : `lsst.afw.image.Image`
1117 Image to be corrected
1118 kwargs : `dict`
1119 Dictionary of parameter keywords:
1121 ``coeffs``
1122 Coefficient vector (`list` or `np.ndarray`).
1123 ``log``
1124 Logger to handle messages (`logging.Logger`).
1126 Returns
1127 -------
1128 output : `tuple` [`bool`, `int`]
1129 If true, a correction was applied successfully. The
1130 integer indicates the number of pixels that were
1131 uncorrectable by being out of range.
1132 """
1133 return True, 0