Coverage for python / lsst / ip / isr / linearize.py: 11%

331 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:58 +0000

1# 

2# LSST Data Management System 

3# Copyright 2016 AURA/LSST. 

4# 

5# This product includes software developed by the 

6# LSST Project (http://www.lsst.org/). 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <http://www.lsstcorp.org/LegalNotices/>. 

21# 

22 

23__all__ = [ 

24 "Linearizer", 

25 "LinearizeBase", 

26 "LinearizeLookupTable", 

27 "LinearizeSquared", 

28 "LinearizeProportional", 

29 "LinearizePolynomial", 

30 "LinearizeSpline", 

31 "LinearizeDoubleSpline", 

32 "LinearizeNone", 

33] 

34 

35import abc 

36import numpy as np 

37from scipy.interpolate import Akima1DInterpolator 

38 

39from astropy.table import Table 

40 

41from lsst.pipe.base import Struct 

42from lsst.geom import Box2I, Point2I, Extent2I 

43from .applyLookupTable import applyLookupTable 

44from .calibType import IsrCalib 

45from .isrFunctions import isTrimmedImage 

46 

47 

48class Linearizer(IsrCalib): 

49 """Parameter set for linearization. 

50 

51 These parameters are included in `lsst.afw.cameraGeom.Amplifier`, but 

52 should be accessible externally to allow for testing. 

53 

54 Parameters 

55 ---------- 

56 table : `numpy.array`, optional 

57 Lookup table; a 2-dimensional array of floats: 

58 

59 - one row for each row index (value of coef[0] in the amplifier) 

60 - one column for each image value 

61 

62 To avoid copying the table the last index should vary fastest 

63 (numpy default "C" order) 

64 detector : `lsst.afw.cameraGeom.Detector`, optional 

65 Detector object. Passed to self.fromDetector() on init. 

66 log : `logging.Logger`, optional 

67 Logger to handle messages. 

68 kwargs : `dict`, optional 

69 Other keyword arguments to pass to the parent init. 

70 

71 Raises 

72 ------ 

73 RuntimeError 

74 Raised if the supplied table is not 2D, or if the table has fewer 

75 columns than rows (indicating that the indices are swapped). 

76 

77 Notes 

78 ----- 

79 The linearizer attributes stored are: 

80 

81 hasLinearity : `bool` 

82 Whether a linearity correction is defined for this detector. 

83 override : `bool` 

84 Whether the detector parameters should be overridden. 

85 ampNames : `list` [`str`] 

86 List of amplifier names to correct. 

87 inputGain : `dict` [`str`, `float`] 

88 Dictionary keyed by amplifirer name containing the input 

89 gain from the PTC used to construct this linearizer. 

90 linearityCoeffs : `dict` [`str`, `np.ndarray`] 

91 Coefficients to use in correction. Indexed by amplifier 

92 names. The format of the array depends on the type of 

93 correction to apply. 

94 linearityType : `dict` [`str`, `str`] 

95 Type of correction to use, indexed by amplifier names. 

96 linearityBBox : `dict` [`str`, `lsst.geom.Box2I`] 

97 Bounding box the correction is valid over, indexed by 

98 amplifier names. 

99 fitParams : `dict` [`str`, `np.ndarray`], optional 

100 Linearity fit parameters used to construct the correction 

101 coefficients, indexed as above. 

102 fitParamsErr : `dict` [`str`, `np.ndarray`], optional 

103 Uncertainty values of the linearity fit parameters used to 

104 construct the correction coefficients, indexed as above. 

105 fitChiSq : `dict` [`str`, `float`], optional 

106 Chi-squared value of the linearity fit, indexed as above. 

107 fitResiduals : `dict` [`str`, `np.ndarray`], optional 

108 Residuals of the fit, indexed as above. Used for 

109 calculating photodiode corrections 

110 fitResidualsSigmaMad : `dict` [`str`, `float`], optional 

111 Robust median-absolute-deviation of fit residuals, scaled 

112 by the signal level. 

113 fitResidualsUnmasked : `dict` [`str`, `np.ndarray`], optional 

114 Same as fitResiduals, but all outliers are included and 

115 not masked as nans. 

116 fitResidualsModel : `dict` [`str`, `np.ndarray`], optional 

117 The model count level for each of the fitResiduals. 

118 linearFit : The linear fit to the low flux region of the curve. 

119 [intercept, slope]. 

120 tableData : `np.ndarray`, optional 

121 Lookup table data for the linearity correction. 

122 inputAbscissa : `dict` [`str`, `np.ndarray`], optional 

123 Input abscissa used to construct linearizer (usually photodiode 

124 or exposure time). 

125 inputOrdinate : `dict` [`str`, `np.ndarray`], optional 

126 Input ordinate used to construct linearizer (raw mean counts). 

127 inputMask : `dict` [`str`, `np.ndarray`], optional 

128 Input mask used for the fitting. 

129 inputGroupingIndex : `dict` [`str`, `np.ndarray`], optional 

130 Input grouping index used for fitting. 

131 inputNormalization : `dict` [`str`, `np.ndarray`], optional 

132 Input normalization that was applied to the abscissa for 

133 each pair from the PTC used for the linearization fit. 

134 absoluteReferenceAmplifier : `str`, optional 

135 Amplifier used for the reference for absolute linearization 

136 (DoubleSpline) mode. 

137 

138 Version 1.4 adds ``linearityTurnoff`` and ``linearityMaxSignal``. 

139 Version 1.5 adds ``fitResidualsUnmasked``, ``inputAbscissa``, 

140 ``inputOrdinate``, ``inputMask``, ``inputGroupingIndex``, 

141 ``fitResidualsModel``, and ``inputNormalization``. 

142 Version 1.6 adds ``absoluteReferenceAmplifier``. 

143 Version 1.7 adds ``inputGain``. 

144 """ 

145 _OBSTYPE = "LINEARIZER" 

146 _SCHEMA = 'Gen3 Linearizer' 

147 _VERSION = 1.7 

148 

149 def __init__(self, table=None, **kwargs): 

150 self.hasLinearity = False 

151 self.override = False 

152 

153 self.ampNames = list() 

154 self.inputGain = dict() 

155 self.linearityCoeffs = dict() 

156 self.linearityType = dict() 

157 self.linearityBBox = dict() 

158 self.inputAbscissa = dict() 

159 self.inputOrdinate = dict() 

160 self.inputMask = dict() 

161 self.inputGroupingIndex = dict() 

162 self.inputNormalization = dict() 

163 self.fitParams = dict() 

164 self.fitParamsErr = dict() 

165 self.fitChiSq = dict() 

166 self.fitResiduals = dict() 

167 self.fitResidualsSigmaMad = dict() 

168 self.fitResidualsUnmasked = dict() 

169 self.fitResidualsModel = dict() 

170 self.linearFit = dict() 

171 self.linearityTurnoff = dict() 

172 self.linearityMaxSignal = dict() 

173 self.absoluteReferenceAmplifier = "" 

174 self.tableData = None 

175 if table is not None: 

176 if len(table.shape) != 2: 

177 raise RuntimeError("table shape = %s; must have two dimensions" % (table.shape,)) 

178 if table.shape[1] < table.shape[0]: 

179 raise RuntimeError("table shape = %s; indices are switched" % (table.shape,)) 

180 self.tableData = np.array(table, order="C") 

181 

182 # The linearizer is always natively in adu because it 

183 # is computed prior to computing gains. 

184 self.linearityUnits = 'adu' 

185 

186 super().__init__(**kwargs) 

187 self.requiredAttributes.update(['hasLinearity', 'override', 

188 'ampNames', 'inputGain', 

189 'linearityCoeffs', 'linearityType', 'linearityBBox', 

190 'fitParams', 'fitParamsErr', 'fitChiSq', 

191 'fitResiduals', 'fitResidualsSigmaMad', 'linearFit', 'tableData', 

192 'units', 'linearityTurnoff', 'linearityMaxSignal', 

193 'fitResidualsUnmasked', 'inputAbscissa', 'inputOrdinate', 

194 'inputMask', 'inputGroupingIndex', 'fitResidualsModel', 

195 'inputNormalization', 'absoluteReferenceAmplifier']) 

196 

197 def updateMetadata(self, setDate=False, **kwargs): 

198 """Update metadata keywords with new values. 

199 

200 This calls the base class's method after ensuring the required 

201 calibration keywords will be saved. 

202 

203 Parameters 

204 ---------- 

205 setDate : `bool`, optional 

206 Update the CALIBDATE fields in the metadata to the current 

207 time. Defaults to False. 

208 kwargs : 

209 Other keyword parameters to set in the metadata. 

210 """ 

211 kwargs['HAS_LINEARITY'] = self.hasLinearity 

212 kwargs['OVERRIDE'] = self.override 

213 kwargs['HAS_TABLE'] = self.tableData is not None 

214 kwargs['LINEARITY_UNITS'] = self.linearityUnits 

215 

216 super().updateMetadata(setDate=setDate, **kwargs) 

217 

218 def fromDetector(self, detector): 

219 """Read linearity parameters from a detector. 

220 

221 Parameters 

222 ---------- 

223 detector : `lsst.afw.cameraGeom.detector` 

224 Input detector with parameters to use. 

225 

226 Returns 

227 ------- 

228 calib : `lsst.ip.isr.Linearizer` 

229 The calibration constructed from the detector. 

230 """ 

231 self._detectorName = detector.getName() 

232 self._detectorSerial = detector.getSerial() 

233 self._detectorId = detector.getId() 

234 self.hasLinearity = True 

235 

236 # Do not translate Threshold, Maximum, Units. 

237 for amp in detector.getAmplifiers(): 

238 ampName = amp.getName() 

239 self.ampNames.append(ampName) 

240 self.linearityType[ampName] = amp.getLinearityType() 

241 self.linearityCoeffs[ampName] = amp.getLinearityCoeffs() 

242 self.linearityBBox[ampName] = amp.getBBox() 

243 

244 # Detector linearizers (legacy) are assumed to be adu units. 

245 self.linearityUnits = 'adu' 

246 

247 return self 

248 

249 @classmethod 

250 def fromDict(cls, dictionary): 

251 """Construct a calibration from a dictionary of properties 

252 

253 Parameters 

254 ---------- 

255 dictionary : `dict` 

256 Dictionary of properties 

257 

258 Returns 

259 ------- 

260 calib : `lsst.ip.isr.Linearity` 

261 Constructed calibration. 

262 

263 Raises 

264 ------ 

265 RuntimeError 

266 Raised if the supplied dictionary is for a different 

267 calibration. 

268 """ 

269 

270 calib = cls() 

271 

272 if calib._OBSTYPE != dictionary['metadata']['OBSTYPE']: 

273 raise RuntimeError(f"Incorrect linearity supplied. Expected {calib._OBSTYPE}, " 

274 f"found {dictionary['metadata']['OBSTYPE']}") 

275 

276 calib.setMetadata(dictionary['metadata']) 

277 

278 calib.hasLinearity = dictionary.get('hasLinearity', 

279 dictionary['metadata'].get('HAS_LINEARITY', False)) 

280 calib.override = dictionary.get('override', True) 

281 

282 # Old linearizers which do not have linearityUnits are 

283 # assumed to be adu because that's all that has been 

284 # supported. 

285 calib.linearityUnits = dictionary.get('linearityUnits', 'adu') 

286 

287 if calib.hasLinearity: 

288 for ampName in dictionary['amplifiers']: 

289 amp = dictionary['amplifiers'][ampName] 

290 calib.ampNames.append(ampName) 

291 # If gain = None on isrFunction.gainContext(), 

292 # the default amp geometry gains are used 

293 # in IsrTask* 

294 calib.inputGain[ampName] = amp.get('inputGain', np.nan) 

295 calib.linearityCoeffs[ampName] = np.array(amp.get('linearityCoeffs', [0.0])) 

296 calib.linearityType[ampName] = amp.get('linearityType', 'None') 

297 calib.linearityBBox[ampName] = amp.get('linearityBBox', None) 

298 

299 calib.inputAbscissa[ampName] = np.array(amp.get('inputAbscissa', [0.0])) 

300 calib.inputOrdinate[ampName] = np.array(amp.get('inputOrdinate', [0.0])) 

301 calib.inputMask[ampName] = np.array(amp.get('inputMask', [False])) 

302 calib.inputGroupingIndex[ampName] = np.array(amp.get('inputGroupingIndex', [0.0])) 

303 calib.inputNormalization[ampName] = np.array(amp.get('inputNormalization', [1.0])) 

304 

305 calib.fitParams[ampName] = np.array(amp.get('fitParams', [0.0])) 

306 calib.fitParamsErr[ampName] = np.array(amp.get('fitParamsErr', [0.0])) 

307 calib.fitChiSq[ampName] = amp.get('fitChiSq', np.nan) 

308 calib.fitResiduals[ampName] = np.array(amp.get('fitResiduals', [0.0])) 

309 calib.fitResidualsSigmaMad[ampName] = np.array(amp.get('fitResidualsSigmaMad', np.nan)) 

310 calib.fitResidualsUnmasked[ampName] = np.array(amp.get('fitResidualsUnmasked', [0.0])) 

311 calib.fitResidualsModel[ampName] = np.array(amp.get('fitResidualsModel', [0.0])) 

312 calib.linearFit[ampName] = np.array(amp.get('linearFit', [0.0])) 

313 

314 calib.linearityTurnoff[ampName] = np.array(amp.get('linearityTurnoff', np.nan)) 

315 calib.linearityMaxSignal[ampName] = np.array(amp.get('linearityMaxSignal', np.nan)) 

316 

317 calib.tableData = dictionary.get('tableData', None) 

318 if calib.tableData: 

319 calib.tableData = np.array(calib.tableData) 

320 

321 calib.absoluteReferenceAmplifier = dictionary.get( 

322 'absoluteReferenceAmplifier', calib.absoluteReferenceAmplifier, 

323 ) 

324 

325 return calib 

326 

327 def toDict(self): 

328 """Return linearity parameters as a dict. 

329 

330 Returns 

331 ------- 

332 outDict : `dict`: 

333 """ 

334 self.updateMetadata() 

335 

336 outDict = {'metadata': self.getMetadata(), 

337 'detectorName': self._detectorName, 

338 'detectorSerial': self._detectorSerial, 

339 'detectorId': self._detectorId, 

340 'hasTable': self.tableData is not None, 

341 'amplifiers': dict(), 

342 'linearityUnits': self.linearityUnits, 

343 } 

344 for ampName in self.linearityType: 

345 outDict['amplifiers'][ampName] = { 

346 'inputGain': self.inputGain[ampName], 

347 'linearityType': self.linearityType[ampName], 

348 'linearityCoeffs': self.linearityCoeffs[ampName].tolist(), 

349 'linearityBBox': self.linearityBBox[ampName], 

350 'inputAbscissa': self.inputAbscissa[ampName].tolist(), 

351 'inputOrdinate': self.inputOrdinate[ampName].tolist(), 

352 'inputMask': self.inputMask[ampName].tolist(), 

353 'inputGroupingIndex': self.inputGroupingIndex[ampName].tolist(), 

354 'inputNormalization': self.inputNormalization[ampName].tolist(), 

355 'fitParams': self.fitParams[ampName].tolist(), 

356 'fitParamsErr': self.fitParamsErr[ampName].tolist(), 

357 'fitChiSq': self.fitChiSq[ampName], 

358 'fitResiduals': self.fitResiduals[ampName].tolist(), 

359 'fitResidualsSigmaMad': self.fitResidualsSigmaMad[ampName], 

360 'fitResidualsUnmasked': self.fitResidualsUnmasked[ampName].tolist(), 

361 'fitResidualsModel': self.fitResidualsModel[ampName].tolist(), 

362 'linearFit': self.linearFit[ampName].tolist(), 

363 'linearityTurnoff': self.linearityTurnoff[ampName], 

364 'linearityMaxSignal': self.linearityMaxSignal[ampName], 

365 } 

366 if self.tableData is not None: 

367 outDict['tableData'] = self.tableData.tolist() 

368 

369 outDict['absoluteReferenceAmplifier'] = self.absoluteReferenceAmplifier 

370 

371 return outDict 

372 

373 @classmethod 

374 def fromTable(cls, tableList): 

375 """Read linearity from a FITS file. 

376 

377 This method uses the `fromDict` method to create the 

378 calibration, after constructing an appropriate dictionary from 

379 the input tables. 

380 

381 Parameters 

382 ---------- 

383 tableList : `list` [`astropy.table.Table`] 

384 afwTable read from input file name. 

385 

386 Returns 

387 ------- 

388 linearity : `~lsst.ip.isr.linearize.Linearizer`` 

389 Linearity parameters. 

390 

391 Notes 

392 ----- 

393 The method reads a FITS file with 1 or 2 extensions. The metadata is 

394 read from the header of extension 1, which must exist. Then the table 

395 is loaded, and the ['AMPLIFIER_NAME', 'TYPE', 'COEFFS', 'BBOX_X0', 

396 'BBOX_Y0', 'BBOX_DX', 'BBOX_DY'] columns are read and used to set each 

397 dictionary by looping over rows. 

398 Extension 2 is then attempted to read in the try block (which only 

399 exists for lookup tables). It has a column named 'LOOKUP_VALUES' that 

400 contains a vector of the lookup entries in each row. 

401 """ 

402 coeffTable = tableList[0] 

403 

404 metadata = coeffTable.meta 

405 inDict = dict() 

406 inDict['metadata'] = metadata 

407 inDict['hasLinearity'] = metadata.get('HAS_LINEARITY', False) 

408 inDict['amplifiers'] = dict() 

409 inDict['linearityUnits'] = metadata.get('LINEARITY_UNITS', 'adu') 

410 

411 for record in coeffTable: 

412 ampName = record['AMPLIFIER_NAME'] 

413 inputGain = record['INPUT_GAIN'] if 'INPUT_GAIN' in record.columns else np.nan 

414 inputAbscissa = record['INP_ABSCISSA'] if 'INP_ABSCISSA' in record.columns else np.array([0.0]) 

415 inputOrdinate = record['INP_ORDINATE'] if 'INP_ORDINATE' in record.columns else np.array([0.0]) 

416 inputMask = record['INP_MASK'] if 'INP_MASK' in record.columns else np.array([False]) 

417 inputGroupingIndex = record['INP_GROUPING_INDEX'] if 'INP_GROUPING_INDEX' in record.columns \ 

418 else np.array([0]) 

419 inputNormalization = record['INP_NORMALIZATION'] if 'INP_NORMALIZATION' in record.columns \ 

420 else np.array([1.0]) 

421 fitParams = record['FIT_PARAMS'] if 'FIT_PARAMS' in record.columns else np.array([0.0]) 

422 fitParamsErr = record['FIT_PARAMS_ERR'] if 'FIT_PARAMS_ERR' in record.columns else np.array([0.0]) 

423 fitChiSq = record['RED_CHI_SQ'] if 'RED_CHI_SQ' in record.columns else np.nan 

424 fitResiduals = record['FIT_RES'] if 'FIT_RES' in record.columns else np.array([0.0]) 

425 fitResidualsSigmaMad = record['FIT_RES_SIGMAD'] if 'FIT_RES_SIGMAD' in record.columns else np.nan 

426 fitResidualsUnmasked = record['FIT_RES_UNMASKED'] \ 

427 if 'FIT_RES_UNMASKED' in record.columns else np.array([0.0]) 

428 fitResidualsModel = record['FIT_RES_MODEL'] \ 

429 if 'FIT_RES_MODEL' in record.columns else np.array([0.0]) 

430 linearFit = record['LIN_FIT'] if 'LIN_FIT' in record.columns else np.array([0.0]) 

431 

432 linearityTurnoff = record['LINEARITY_TURNOFF'] if 'LINEARITY_TURNOFF' in record.columns \ 

433 else np.nan 

434 linearityMaxSignal = record['LINEARITY_MAX_SIGNAL'] if 'LINEARITY_MAX_SIGNAL' in record.columns \ 

435 else np.nan 

436 

437 inDict['amplifiers'][ampName] = { 

438 'inputGain': inputGain, 

439 'linearityType': record['TYPE'], 

440 'linearityCoeffs': record['COEFFS'], 

441 'linearityBBox': Box2I(Point2I(record['BBOX_X0'], record['BBOX_Y0']), 

442 Extent2I(record['BBOX_DX'], record['BBOX_DY'])), 

443 'inputAbscissa': inputAbscissa, 

444 'inputOrdinate': inputOrdinate, 

445 'inputMask': inputMask, 

446 'inputGroupingIndex': inputGroupingIndex, 

447 'inputNormalization': inputNormalization, 

448 'fitParams': fitParams, 

449 'fitParamsErr': fitParamsErr, 

450 'fitChiSq': fitChiSq, 

451 'fitResiduals': fitResiduals, 

452 'fitResidualsSigmaMad': fitResidualsSigmaMad, 

453 'fitResidualsUnmasked': fitResidualsUnmasked, 

454 'fitResidualsModel': fitResidualsModel, 

455 'linearFit': linearFit, 

456 'linearityTurnoff': linearityTurnoff, 

457 'linearityMaxSignal': linearityMaxSignal, 

458 } 

459 

460 if len(tableList) > 1: 

461 tableData = tableList[1] 

462 inDict['tableData'] = [record['LOOKUP_VALUES'] for record in tableData] 

463 

464 if 'ABS_REF_AMP' in coeffTable.columns: 

465 inDict['absoluteReferenceAmplifier'] = str(coeffTable['ABS_REF_AMP'][0]) 

466 else: 

467 inDict['absoluteReferenceAmplifier'] = '' 

468 

469 return cls().fromDict(inDict) 

470 

471 def toTable(self): 

472 """Construct a list of tables containing the information in this 

473 calibration. 

474 

475 The list of tables should create an identical calibration 

476 after being passed to this class's fromTable method. 

477 

478 Returns 

479 ------- 

480 tableList : `list` [`astropy.table.Table`] 

481 List of tables containing the linearity calibration 

482 information. 

483 """ 

484 

485 tableList = [] 

486 self.updateMetadata() 

487 catalog = Table([{'AMPLIFIER_NAME': ampName, 

488 'INPUT_GAIN': self.inputGain[ampName], 

489 'TYPE': self.linearityType[ampName], 

490 'COEFFS': self.linearityCoeffs[ampName], 

491 'BBOX_X0': self.linearityBBox[ampName].getMinX(), 

492 'BBOX_Y0': self.linearityBBox[ampName].getMinY(), 

493 'BBOX_DX': self.linearityBBox[ampName].getWidth(), 

494 'BBOX_DY': self.linearityBBox[ampName].getHeight(), 

495 'INP_ABSCISSA': self.inputAbscissa[ampName], 

496 'INP_ORDINATE': self.inputOrdinate[ampName], 

497 'INP_MASK': self.inputMask[ampName], 

498 'INP_GROUPING_INDEX': self.inputGroupingIndex[ampName], 

499 'INP_NORMALIZATION': self.inputNormalization[ampName], 

500 'FIT_PARAMS': self.fitParams[ampName], 

501 'FIT_PARAMS_ERR': self.fitParamsErr[ampName], 

502 'RED_CHI_SQ': self.fitChiSq[ampName], 

503 'FIT_RES': self.fitResiduals[ampName], 

504 'FIT_RES_SIGMAD': self.fitResidualsSigmaMad[ampName], 

505 'FIT_RES_UNMASKED': self.fitResidualsUnmasked[ampName], 

506 'FIT_RES_MODEL': self.fitResidualsModel[ampName], 

507 'LIN_FIT': self.linearFit[ampName], 

508 'LINEARITY_TURNOFF': self.linearityTurnoff[ampName], 

509 'LINEARITY_MAX_SIGNAL': self.linearityMaxSignal[ampName], 

510 'ABS_REF_AMP': self.absoluteReferenceAmplifier, 

511 } for ampName in self.ampNames]) 

512 catalog.meta = self.getMetadata().toDict() 

513 tableList.append(catalog) 

514 

515 if self.tableData is not None: 

516 catalog = Table([{'LOOKUP_VALUES': value} for value in self.tableData]) 

517 tableList.append(catalog) 

518 return tableList 

519 

520 def getLinearityTypeByName(self, linearityTypeName): 

521 """Determine the linearity class to use from the type name. 

522 

523 Parameters 

524 ---------- 

525 linearityTypeName : str 

526 String name of the linearity type that is needed. 

527 

528 Returns 

529 ------- 

530 linearityType : `~lsst.ip.isr.linearize.LinearizeBase` 

531 The appropriate linearity class to use. If no matching class 

532 is found, `None` is returned. 

533 """ 

534 for t in [LinearizeLookupTable, 

535 LinearizeSquared, 

536 LinearizePolynomial, 

537 LinearizeProportional, 

538 LinearizeSpline, 

539 LinearizeDoubleSpline, 

540 LinearizeNone]: 

541 if t.LinearityType == linearityTypeName: 

542 return t 

543 return None 

544 

545 def validate(self, detector=None, amplifier=None): 

546 """Validate linearity for a detector/amplifier. 

547 

548 Parameters 

549 ---------- 

550 detector : `lsst.afw.cameraGeom.Detector`, optional 

551 Detector to validate, along with its amplifiers. 

552 amplifier : `lsst.afw.cameraGeom.Amplifier`, optional 

553 Single amplifier to validate. 

554 

555 Raises 

556 ------ 

557 RuntimeError 

558 Raised if there is a mismatch in linearity parameters, and 

559 the cameraGeom parameters are not being overridden. 

560 """ 

561 amplifiersToCheck = [] 

562 if detector: 

563 if self._detectorName != detector.getName(): 

564 raise RuntimeError("Detector names don't match: %s != %s" % 

565 (self._detectorName, detector.getName())) 

566 if int(self._detectorId) != int(detector.getId()): 

567 raise RuntimeError("Detector IDs don't match: %s != %s" % 

568 (int(self._detectorId), int(detector.getId()))) 

569 # TODO: DM-38778: This check fails on LATISS due to an 

570 # error in the camera configuration. 

571 # if self._detectorSerial != detector.getSerial(): 

572 # raise RuntimeError( 

573 # "Detector serial numbers don't match: %s != %s" % 

574 # (self._detectorSerial, detector.getSerial())) 

575 if len(detector.getAmplifiers()) != len(self.linearityCoeffs.keys()): 

576 raise RuntimeError("Detector number of amps = %s does not match saved value %s" % 

577 (len(detector.getAmplifiers()), 

578 len(self.linearityCoeffs.keys()))) 

579 amplifiersToCheck.extend(detector.getAmplifiers()) 

580 

581 if amplifier: 

582 amplifiersToCheck.extend(amplifier) 

583 

584 for amp in amplifiersToCheck: 

585 ampName = amp.getName() 

586 if ampName not in self.linearityCoeffs.keys(): 

587 raise RuntimeError("Amplifier %s is not in linearity data" % 

588 (ampName, )) 

589 if amp.getLinearityType() != self.linearityType[ampName]: 

590 if self.override: 

591 self.log.debug("Overriding amplifier defined linearityType (%s) for %s", 

592 self.linearityType[ampName], ampName) 

593 else: 

594 raise RuntimeError("Amplifier %s type %s does not match saved value %s" % 

595 (ampName, amp.getLinearityType(), self.linearityType[ampName])) 

596 if (amp.getLinearityCoeffs().shape != self.linearityCoeffs[ampName].shape or not 

597 np.allclose(amp.getLinearityCoeffs(), self.linearityCoeffs[ampName], equal_nan=True)): 

598 if self.override: 

599 self.log.debug("Overriding amplifier defined linearityCoeffs (%s) for %s", 

600 self.linearityCoeffs[ampName], ampName) 

601 else: 

602 raise RuntimeError("Amplifier %s coeffs %s does not match saved value %s" % 

603 (ampName, amp.getLinearityCoeffs(), self.linearityCoeffs[ampName])) 

604 

605 def applyLinearity(self, image, detector=None, log=None, gains=None): 

606 """Apply the linearity to an image. 

607 

608 If the linearity parameters are populated, use those, 

609 otherwise use the values from the detector. 

610 

611 Parameters 

612 ---------- 

613 image : `~lsst.afw.image.image` 

614 Image to correct. 

615 detector : `~lsst.afw.cameraGeom.detector`, optional 

616 Detector to use to determine exposure trimmed state. If 

617 supplied, but no other linearity information is provided 

618 by the calibration, then the static solution stored in the 

619 detector will be used. 

620 log : `~logging.Logger`, optional 

621 Log object to use for logging. 

622 gains : `dict` [`str`, `float`], optional 

623 Dictionary of amp name to gain. If this is provided then 

624 linearity terms will be converted from adu to electrons. 

625 Only used for Spline linearity corrections. 

626 """ 

627 if log is None: 

628 log = self.log 

629 if detector and not self.hasLinearity: 

630 self.fromDetector(detector) 

631 

632 self.validate(detector) 

633 

634 # Check if the image is trimmed. 

635 isTrimmed = None 

636 if detector: 

637 isTrimmed = isTrimmedImage(image, detector) 

638 

639 numAmps = 0 

640 numLinearized = 0 

641 numOutOfRange = 0 

642 for ampName in self.linearityType.keys(): 

643 linearizer = self.getLinearityTypeByName(self.linearityType[ampName]) 

644 numAmps += 1 

645 

646 if gains and self.linearityUnits == 'adu': 

647 gainValue = gains[ampName] 

648 else: 

649 gainValue = 1.0 

650 

651 # If the gain is NaN, then this is a no-op 

652 if np.isnan(gainValue): 

653 log.warning("Gain value for amp %s is NaN, skipping " 

654 "linearization.", ampName) 

655 continue 

656 

657 if linearizer is not None: 

658 match isTrimmed: 

659 case True: 

660 bbox = detector[ampName].getBBox() 

661 case False: 

662 bbox = detector[ampName].getRawBBox() 

663 case None: 

664 bbox = self.linearityBBox[ampName] 

665 

666 ampView = image.Factory(image, bbox) 

667 success, outOfRange = linearizer()(ampView, **{'coeffs': self.linearityCoeffs[ampName], 

668 'table': self.tableData, 

669 'log': self.log, 

670 'gain': gainValue}) 

671 numOutOfRange += outOfRange 

672 if success: 

673 numLinearized += 1 

674 elif log is not None: 

675 log.warning("Amplifier %s did not linearize.", 

676 ampName) 

677 return Struct( 

678 numAmps=numAmps, 

679 numLinearized=numLinearized, 

680 numOutOfRange=numOutOfRange 

681 ) 

682 

683 

684class LinearizeBase(metaclass=abc.ABCMeta): 

685 """Abstract base class functor for correcting non-linearity. 

686 

687 Subclasses must define ``__call__`` and set class variable 

688 LinearityType to a string that will be used for linearity type in 

689 the cameraGeom.Amplifier.linearityType field. 

690 

691 All linearity corrections should be defined in terms of an 

692 additive correction, such that: 

693 

694 corrected_value = uncorrected_value + f(uncorrected_value) 

695 """ 

696 LinearityType = None # linearity type, a string used for AmpInfoCatalogs 

697 

698 @abc.abstractmethod 

699 def __call__(self, image, **kwargs): 

700 """Correct non-linearity. 

701 

702 Parameters 

703 ---------- 

704 image : `lsst.afw.image.Image` 

705 Image to be corrected 

706 kwargs : `dict` 

707 Dictionary of parameter keywords: 

708 

709 ``coeffs`` 

710 Coefficient vector (`list` or `np.ndarray`). 

711 ``table`` 

712 Lookup table data (`np.ndarray`). 

713 ``log`` 

714 Logger to handle messages (`logging.Logger`). 

715 

716 Returns 

717 ------- 

718 output : `bool` 

719 If `True`, a correction was applied successfully. 

720 

721 Raises 

722 ------ 

723 RuntimeError: 

724 Raised if the linearity type listed in the 

725 detector does not match the class type. 

726 """ 

727 pass 

728 

729 

730class LinearizeLookupTable(LinearizeBase): 

731 """Correct non-linearity with a persisted lookup table. 

732 

733 The lookup table consists of entries such that given 

734 "coefficients" c0, c1: 

735 

736 for each i,j of image: 

737 rowInd = int(c0) 

738 colInd = int(c1 + uncorrImage[i,j]) 

739 corrImage[i,j] = uncorrImage[i,j] + table[rowInd, colInd] 

740 

741 - c0: row index; used to identify which row of the table to use 

742 (typically one per amplifier, though one can have multiple 

743 amplifiers use the same table) 

744 - c1: column index offset; added to the uncorrected image value 

745 before truncation; this supports tables that can handle 

746 negative image values; also, if the c1 ends with .5 then 

747 the nearest index is used instead of truncating to the 

748 next smaller index 

749 """ 

750 LinearityType = "LookupTable" 

751 

752 def __call__(self, image, **kwargs): 

753 """Correct for non-linearity. 

754 

755 Parameters 

756 ---------- 

757 image : `lsst.afw.image.Image` 

758 Image to be corrected 

759 kwargs : `dict` 

760 Dictionary of parameter keywords: 

761 

762 ``coeffs`` 

763 Columnation vector (`list` or `np.ndarray`). 

764 ``table`` 

765 Lookup table data (`np.ndarray`). 

766 ``log`` 

767 Logger to handle messages (`logging.Logger`). 

768 

769 Returns 

770 ------- 

771 output : `tuple` [`bool`, `int`] 

772 If true, a correction was applied successfully. The 

773 integer indicates the number of pixels that were 

774 uncorrectable by being out of range. 

775 

776 Raises 

777 ------ 

778 RuntimeError: 

779 Raised if the requested row index is out of the table 

780 bounds. 

781 """ 

782 numOutOfRange = 0 

783 

784 rowInd, colIndOffset = kwargs['coeffs'][0:2] 

785 table = kwargs['table'] 

786 log = kwargs['log'] 

787 

788 numTableRows = table.shape[0] 

789 rowInd = int(rowInd) 

790 if rowInd < 0 or rowInd > numTableRows: 

791 raise RuntimeError("LinearizeLookupTable rowInd=%s not in range[0, %s)" % 

792 (rowInd, numTableRows)) 

793 tableRow = np.array(table[rowInd, :], dtype=image.getArray().dtype) 

794 

795 numOutOfRange += applyLookupTable(image, tableRow, colIndOffset) 

796 

797 if numOutOfRange > 0 and log is not None: 

798 log.warning("%s pixels were out of range of the linearization table", 

799 numOutOfRange) 

800 if numOutOfRange < image.getArray().size: 

801 return True, numOutOfRange 

802 else: 

803 return False, numOutOfRange 

804 

805 

806class LinearizePolynomial(LinearizeBase): 

807 """Correct non-linearity with a polynomial mode. 

808 

809 .. code-block:: 

810 

811 corrImage = uncorrImage + sum_i c_i uncorrImage^(2 + i) 

812 

813 where ``c_i`` are the linearity coefficients for each amplifier. 

814 Lower order coefficients are not included as they duplicate other 

815 calibration parameters: 

816 

817 ``k0`` 

818 A coefficient multiplied by ``uncorrImage**0`` is equivalent to 

819 bias level. Irrelevant for correcting non-linearity. 

820 ``k1`` 

821 A coefficient multiplied by ``uncorrImage**1`` is proportional 

822 to the gain. Not necessary for correcting non-linearity. 

823 """ 

824 LinearityType = "Polynomial" 

825 

826 def __call__(self, image, **kwargs): 

827 """Correct non-linearity. 

828 

829 Parameters 

830 ---------- 

831 image : `lsst.afw.image.Image` 

832 Image to be corrected 

833 kwargs : `dict` 

834 Dictionary of parameter keywords: 

835 

836 ``coeffs`` 

837 Coefficient vector (`list` or `np.ndarray`). 

838 If the order of the polynomial is n, this list 

839 should have a length of n-1 ("k0" and "k1" are 

840 not needed for the correction). 

841 ``log`` 

842 Logger to handle messages (`logging.Logger`). 

843 

844 Returns 

845 ------- 

846 output : `tuple` [`bool`, `int`] 

847 If true, a correction was applied successfully. The 

848 integer indicates the number of pixels that were 

849 uncorrectable by being out of range. 

850 """ 

851 if not np.any(np.isfinite(kwargs['coeffs'])): 

852 return False, 0 

853 if not np.any(kwargs['coeffs']): 

854 return False, 0 

855 

856 ampArray = image.getArray() 

857 correction = np.zeros_like(ampArray) 

858 for order, coeff in enumerate(kwargs['coeffs'], start=2): 

859 correction += coeff * np.power(ampArray, order) 

860 ampArray += correction 

861 

862 return True, 0 

863 

864 

865class LinearizeSquared(LinearizeBase): 

866 """Correct non-linearity with a squared model. 

867 

868 corrImage = uncorrImage + c0*uncorrImage^2 

869 

870 where c0 is linearity coefficient 0 for each amplifier. 

871 """ 

872 LinearityType = "Squared" 

873 

874 def __call__(self, image, **kwargs): 

875 """Correct for non-linearity. 

876 

877 Parameters 

878 ---------- 

879 image : `lsst.afw.image.Image` 

880 Image to be corrected 

881 kwargs : `dict` 

882 Dictionary of parameter keywords: 

883 

884 ``coeffs`` 

885 Coefficient vector (`list` or `np.ndarray`). 

886 ``log`` 

887 Logger to handle messages (`logging.Logger`). 

888 

889 Returns 

890 ------- 

891 output : `tuple` [`bool`, `int`] 

892 If true, a correction was applied successfully. The 

893 integer indicates the number of pixels that were 

894 uncorrectable by being out of range. 

895 """ 

896 

897 sqCoeff = kwargs['coeffs'][0] 

898 if sqCoeff != 0: 

899 ampArr = image.getArray() 

900 ampArr *= (1 + sqCoeff*ampArr) 

901 return True, 0 

902 else: 

903 return False, 0 

904 

905 

906class LinearizeSpline(LinearizeBase): 

907 """Correct non-linearity with a spline model. 

908 

909 corrImage = uncorrImage - Spline(coeffs, uncorrImage) 

910 

911 Notes 

912 ----- 

913 

914 The spline fit calculates a correction as a function of the 

915 expected linear flux term. Because of this, the correction needs 

916 to be subtracted from the observed flux. 

917 

918 """ 

919 LinearityType = "Spline" 

920 

921 def __call__(self, image, **kwargs): 

922 """Correct for non-linearity. 

923 

924 Parameters 

925 ---------- 

926 image : `lsst.afw.image.Image` 

927 Image to be corrected 

928 kwargs : `dict` 

929 Dictionary of parameter keywords: 

930 

931 ``coeffs`` 

932 Coefficient vector (`list` or `np.ndarray`). 

933 ``log`` 

934 Logger to handle messages (`logging.Logger`). 

935 ``gain`` 

936 Gain value to apply. 

937 

938 Returns 

939 ------- 

940 output : `tuple` [`bool`, `int`] 

941 If true, a correction was applied successfully. The 

942 integer indicates the number of pixels that were 

943 uncorrectable by being out of range. 

944 """ 

945 splineCoeff = kwargs['coeffs'] 

946 gain = kwargs.get('gain', 1.0) 

947 centers, values = np.split(splineCoeff, 2) 

948 # Note that we must do a copy here because the input array 

949 # may not be volatile memory. 

950 centers = gain * centers 

951 values = gain * values 

952 # If the spline is not anchored at zero, remove the offset 

953 # found at the lowest flux available, and add an anchor at 

954 # flux=0.0 if there's no entry at that point. 

955 if values[0] != 0: 

956 offset = values[0] 

957 values -= offset 

958 if centers[0] != 0.0: 

959 centers = np.concatenate(([0.0], centers)) 

960 values = np.concatenate(([0.0], values)) 

961 

962 ampArr = image.array 

963 

964 if np.any(~np.isfinite(centers)) or np.any(~np.isfinite(values)): 

965 # This cannot be used; turns everything into nans. 

966 ampArr[:] = np.nan 

967 

968 return False, 0 

969 else: 

970 interp = Akima1DInterpolator(centers, values, method="akima") 

971 # Clip to avoid extrapolation and hitting the top end. 

972 ampArr -= interp(np.clip(ampArr, centers[0], centers[-1] - 0.1)) 

973 

974 return True, 0 

975 

976 

977class LinearizeDoubleSpline(LinearizeBase): 

978 """Correct non-linearity with a spline model. 

979 

980 corrImage1 = uncorrImage - Spline1(coeffs, uncorrImage) 

981 corrImage = corrImage1 - Spline2(coeffs, corrImage1) 

982 

983 Notes 

984 ----- 

985 

986 The spline fit calculates a correction as a function of the 

987 expected linear flux term. Because of this, the correction needs 

988 to be subtracted from the observed flux. 

989 

990 """ 

991 LinearityType = "DoubleSpline" 

992 

993 def __call__(self, image, **kwargs): 

994 """Correct for non-linearity. 

995 

996 Parameters 

997 ---------- 

998 image : `lsst.afw.image.Image` 

999 Image to be corrected 

1000 kwargs : `dict` 

1001 Dictionary of parameter keywords: 

1002 

1003 ``coeffs`` 

1004 Coefficient vector (`list` or `np.ndarray`). 

1005 ``log`` 

1006 Logger to handle messages (`logging.Logger`). 

1007 ``gain`` 

1008 Gain value to apply. 

1009 

1010 Returns 

1011 ------- 

1012 output : `tuple` [`bool`, `int`] 

1013 If true, a correction was applied successfully. The 

1014 integer indicates the number of pixels that were 

1015 uncorrectable by being out of range. 

1016 """ 

1017 coeffs = kwargs['coeffs'] 

1018 gain = kwargs.get('gain', 1.0) 

1019 

1020 # The coeffs have [nNodes1, nNodes2, nodes1, values1, nodes2, values2] 

1021 

1022 nNodes1 = int(coeffs[0]) 

1023 nNodes2 = int(coeffs[1]) 

1024 

1025 ampArr = image.array 

1026 

1027 if nNodes1 > 0: 

1028 splineCoeff1 = coeffs[2: 2 + 2*nNodes1] 

1029 centers1, values1 = np.split(splineCoeff1, 2) 

1030 

1031 if np.any(~np.isfinite(centers1)) or np.any(~np.isfinite(values1)): 

1032 # Bad linearizer. 

1033 ampArr[:] = np.nan 

1034 return False, 0 

1035 

1036 if gain != 1.0: 

1037 centers1 = centers1 * gain 

1038 values1 = values1 * gain 

1039 if nNodes2 > 0: 

1040 splineCoeff2 = coeffs[2 + 2*nNodes1: 2 + 2*nNodes1 + 2*nNodes2] 

1041 centers2, values2 = np.split(splineCoeff2, 2) 

1042 

1043 if np.any(~np.isfinite(centers2)) or np.any(~np.isfinite(values2)): 

1044 # Bad linearizer. 

1045 ampArr[:] = np.nan 

1046 return False, 0 

1047 

1048 if gain != 1.0: 

1049 centers2 = centers2 * gain 

1050 values2 = values2 * gain 

1051 

1052 # Note the double-spline will always be anchored at zero. 

1053 

1054 if nNodes1 > 0: 

1055 interp1 = Akima1DInterpolator(centers1, values1, method="akima") 

1056 # Clip to avoid extrapolation and hitting the top end. 

1057 ampArr -= interp1(np.clip(ampArr, centers1[0], centers1[-1] - 0.01)) 

1058 if nNodes2 > 0: 

1059 interp2 = Akima1DInterpolator(centers2, values2, method="akima") 

1060 # Clip to avoid extrapolation and hitting the top end. 

1061 ampArr -= interp2(np.clip(ampArr, centers2[0], centers2[-1] - 0.01)) 

1062 

1063 return True, 0 

1064 

1065 

1066class LinearizeProportional(LinearizeBase): 

1067 """Do not correct non-linearity. 

1068 """ 

1069 LinearityType = "Proportional" 

1070 

1071 def __call__(self, image, **kwargs): 

1072 """Do not correct for non-linearity. 

1073 

1074 Parameters 

1075 ---------- 

1076 image : `lsst.afw.image.Image` 

1077 Image to be corrected 

1078 kwargs : `dict` 

1079 Dictionary of parameter keywords: 

1080 

1081 ``coeffs`` 

1082 Coefficient vector (`list` or `np.ndarray`). 

1083 ``log`` 

1084 Logger to handle messages (`logging.Logger`). 

1085 

1086 Returns 

1087 ------- 

1088 output : `tuple` [`bool`, `int`] 

1089 If true, a correction was applied successfully. The 

1090 integer indicates the number of pixels that were 

1091 uncorrectable by being out of range. 

1092 """ 

1093 return True, 0 

1094 

1095 

1096class LinearizeNone(LinearizeBase): 

1097 """Do not correct non-linearity. 

1098 """ 

1099 LinearityType = "None" 

1100 

1101 def __call__(self, image, **kwargs): 

1102 """Do not correct for non-linearity. 

1103 

1104 Parameters 

1105 ---------- 

1106 image : `lsst.afw.image.Image` 

1107 Image to be corrected 

1108 kwargs : `dict` 

1109 Dictionary of parameter keywords: 

1110 

1111 ``coeffs`` 

1112 Coefficient vector (`list` or `np.ndarray`). 

1113 ``log`` 

1114 Logger to handle messages (`logging.Logger`). 

1115 

1116 Returns 

1117 ------- 

1118 output : `tuple` [`bool`, `int`] 

1119 If true, a correction was applied successfully. The 

1120 integer indicates the number of pixels that were 

1121 uncorrectable by being out of range. 

1122 """ 

1123 return True, 0