Coverage for python / lsst / cp / verify / verifyLinearity.py: 25%

140 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:55 +0000

1# This file is part of cp_verify. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22__all__ = [ 

23 "CpVerifyLinearityConnections", 

24 "CpVerifyLinearityConfig", 

25 "CpVerifyLinearityTask", 

26 "CpvLinearitySolveConfig", 

27 "CpvLinearitySolveTask", 

28] 

29 

30import lsst.cp.pipe as cpPipe 

31import lsst.pipe.base as pipeBase 

32 

33import numpy as np 

34import lsst.pex.config as pexConfig 

35 

36import lsst.pipe.base.connectionTypes as cT 

37from .verifyCalib import CpVerifyCalibConfig, CpVerifyCalibTask, CpVerifyCalibConnections 

38 

39 

40class CpVerifyLinearityConnections(CpVerifyCalibConnections, 

41 dimensions={"instrument", "detector"}, 

42 defaultTemplates={}): 

43 inputCalib = cT.Input( 

44 name="calib", 

45 doc="Input calib to calculate statistics for.", 

46 storageClass="Linearizer", 

47 dimensions=("instrument", "detector"), 

48 isCalibration=True 

49 ) 

50 

51 

52class CpVerifyLinearityConfig(CpVerifyCalibConfig, 

53 pipelineConnections=CpVerifyLinearityConnections): 

54 """Inherits from base CpVerifyCalibConfig.""" 

55 

56 maxResidualThresholdSpline = pexConfig.Field( 

57 dtype=float, 

58 doc="Maximum percentage for linearity residuals, if spline", 

59 default=1.0, 

60 ) 

61 expectedQuadraticCoeffPolynomial = pexConfig.Field( 

62 dtype=float, 

63 doc="Expected amplitude of second-order non-linearity coefficient, if polynomial.", 

64 default=1e-6, 

65 ) 

66 maxResidualThresholdTable = pexConfig.Field( 

67 dtype=float, 

68 doc="Maximum percentage for linearity residuals, if lookup table.", 

69 default=1.0, 

70 ) 

71 

72 def setDefaults(self): 

73 super().setDefaults() 

74 self.stageName = 'LINEARITY' 

75 

76 

77class CpVerifyLinearityTask(CpVerifyCalibTask): 

78 """Linearity verification sub-class, implementing the verify method. 

79 """ 

80 ConfigClass = CpVerifyLinearityConfig 

81 _DefaultName = 'cpVerifyLinearity' 

82 

83 def detectorStatistics(self, inputCalib, camera=None): 

84 """Calculate detector level statistics from the calibration. 

85 

86 Parameters 

87 ---------- 

88 inputCalib : `lsst.ip.isr.IsrCalib` 

89 The calibration to verify. 

90 camera : `lsst.afw.cameraGeom.Camera`, optional 

91 Input camera to get detectors from. 

92 

93 Returns 

94 ------- 

95 outputStatistics : `dict` [`str`, scalar] 

96 A dictionary of the statistics measured and their values. 

97 """ 

98 return {} 

99 

100 def amplifierStatistics(self, inputCalib, camera=None): 

101 """Calculate detector level statistics from the calibration. 

102 

103 Parameters 

104 ---------- 

105 inputCalib : `lsst.ip.isr.IsrCalib` 

106 The calibration to verify. 

107 camera : `lsst.afw.cameraGeom.Camera`, optional 

108 Input camera to get detectors from. 

109 

110 Returns 

111 ------- 

112 outputStatistics : `dict` [`str`, scalar] 

113 A dictionary of the statistics measured and their values. 

114 """ 

115 calibMetadata = inputCalib.getMetadata() 

116 detId = calibMetadata['DETECTOR'] 

117 detector = camera[detId] 

118 outputStatistics = {amp.getName(): {} for amp in detector} 

119 for amp in detector: 

120 ampName = amp.getName() 

121 outputStatistics[ampName]['FIT_PARAMS'] = inputCalib.fitParams[ampName].tolist() 

122 outputStatistics[ampName]['FIT_PARAMS_ERR'] = inputCalib.fitParamsErr[ampName].tolist() 

123 outputStatistics[ampName]['FIT_RESIDUALS'] = inputCalib.fitResiduals[ampName].tolist() 

124 outputStatistics[ampName]['LINEAR_FIT'] = inputCalib.linearFit[ampName].tolist() 

125 outputStatistics[ampName]['LINEARITY_COEFFS'] = inputCalib.linearityCoeffs[ampName].tolist() 

126 outputStatistics[ampName]['LINEARITY_TYPE'] = str(inputCalib.linearityType[ampName]) 

127 if inputCalib.linearityType[ampName] == 'LookupTable': 

128 outputStatistics[ampName]['TABLE_DATA'] = inputCalib.tableData[ampName].tolist() 

129 return outputStatistics 

130 

131 def verify(self, calib, statisticsDict, camera=None): 

132 """Verify that the calibration meets the verification criteria. 

133 

134 Parameters 

135 ---------- 

136 inputCalib : `lsst.ip.isr.IsrCalib` 

137 The calibration to verify. 

138 statisticsDictionary : `dict` [`str`, `dict` [`str`, scalar]], 

139 Dictionary of measured statistics. The inner dictionary 

140 should have keys that are statistic names (`str`) with 

141 values that are some sort of scalar (`int` or `float` are 

142 the mostly likely types). 

143 camera : `lsst.afw.cameraGeom.Camera`, optional 

144 Input camera to get detectors from. 

145 

146 Returns 

147 ------- 

148 outputStatistics : `dict` [`str`, `dict` [`str`, `bool`]] 

149 A dictionary indexed by the amplifier name, containing 

150 dictionaries of the verification criteria. 

151 success : `bool` 

152 A boolean indicating whether all tests have passed. 

153 """ 

154 verifyStats = {} 

155 success = True 

156 calibMetadata = calib.getMetadata() 

157 detId = calibMetadata['DETECTOR'] 

158 detector = camera[detId] 

159 verifyAmpStats = {} 

160 for amp in detector: 

161 verify = {} 

162 ampName = amp.getName() 

163 linearityType = calib.linearityType[ampName] 

164 measuredCoeffs = calib.linearityCoeffs[ampName] 

165 if linearityType == 'Spline': 

166 binCenters, values = np.split(calib.linearityCoeffs[ampName], 2) 

167 maxError = max(abs(values/binCenters))*100 

168 verify['MAX_RESIDUAL_ERROR'] = bool(maxError <= self.config.maxResidualThresholdSpline) 

169 elif linearityType == 'Squared': 

170 c0 = np.abs(measuredCoeffs[2]) 

171 verify['MAX_RESIDUAL_ERROR'] = bool(c0 <= self.config.expectedQuadraticCoeffPolynomial) 

172 elif linearityType == 'Polynomial': 

173 epsilon = np.sqrt(self.config.expectedQuadraticCoeffPolynomial) 

174 coeffs = np.abs(measuredCoeffs[2:]) 

175 # coeffs[0] is now the quadratic term. Scale higher-order terms 

176 # with epsilon. 

177 thresholds = [coeffs[0]*epsilon**order for order, c in enumerate(coeffs)] 

178 thresholds[0] = epsilon**2 

179 verify['MAX_RESIDUAL_ERROR'] = bool(np.all(coeffs <= thresholds)) 

180 elif linearityType == 'LookupTable': 

181 # If 'LookupTable', linearityCoeffs is of the form {'C10': 

182 # array([0, 0]), 'C11': array([1, 0]), ... } 

183 indexTableAmp, offset = calib.linearityCoeffs[ampName] 

184 # Indices of second axis of table is flux range, up to 

185 # 2**18 ADU 

186 indices = np.arange(1, len(calib.tableData.T)) + offset 

187 # Look at correction (what the table provides) divided by 

188 # signal 

189 delta = calib.tableData[indexTableAmp, :][1:] / indices 

190 maxError = np.max(np.abs(delta)) 

191 verify['MAX_RESIDUAL_ERROR'] = bool(maxError <= self.config.maxResidualThresholdTable) 

192 else: 

193 # 'None' type found. Dummy linearizer. 

194 self.log.warning("Dummy linearizer found (type: `None`).") 

195 verify['MAX_RESIDUAL_ERROR'] = False 

196 

197 # Overall success among all tests for this amp. 

198 verify['SUCCESS'] = bool(np.all(list(verify.values()))) 

199 if verify['SUCCESS'] is False: 

200 success = False 

201 

202 verifyAmpStats[ampName] = verify 

203 

204 # Loop over amps to make a detector summary. 

205 verifyDetStats = {'MAX_RESIDUAL_ERROR': []} 

206 for amp in verifyStats: 

207 for testName in verifyStats[amp]: 

208 if testName == 'SUCCESS': 

209 continue 

210 verifyDetStats[testName].append(verifyStats[amp][testName]) 

211 

212 # VerifyDetStatsFinal has final boolean test over all amps 

213 verifyDetStatsFinal = {} 

214 for testName in verifyDetStats: 

215 testBool = bool(np.all(list(verifyDetStats[testName]))) 

216 # Save the tests that failed 

217 if not testBool: 

218 verifyDetStatsFinal[testName] = bool(np.all(list(verifyDetStats[testName]))) 

219 return {'AMP': verifyAmpStats, 

220 'DET': verifyDetStatsFinal}, bool(success) 

221 

222 def repackStats(self, statisticsDict, dimensions): 

223 # docstring inherited 

224 rows = {} 

225 rowList = [] 

226 matrixRowList = [] 

227 

228 rowBase = { 

229 "instrument": dimensions["instrument"], 

230 "detector": dimensions["detector"], 

231 } 

232 

233 # AMP results 

234 for ampName, stats in statisticsDict["AMP"].items(): 

235 coeffs = stats.pop("LINEARITY_COEFFS") 

236 if stats["LINEARITY_TYPE"] == 'spline': 

237 centers, values = np.split(coeffs, 2) 

238 centers = centers.tolist() 

239 values = values.tolist() 

240 else: 

241 values = coeffs 

242 centers = np.full_like(values, np.nan).tolist() 

243 

244 rows[ampName] = {} 

245 rows[ampName].update(rowBase) 

246 rows[ampName]["amplifier"] = ampName 

247 rows[ampName][f"{self.config.stageName}_CENTERS"] = centers 

248 rows[ampName][f"{self.config.stageName}_COEFFS"] = values 

249 for testName, value in stats.items(): 

250 rows[ampName][f"{self.config.stageName}_{testName}"] = value 

251 

252 # VERIFY results 

253 rows["detector"] = rowBase 

254 for ampName, stats in statisticsDict["VERIFY"]["AMP"].items(): 

255 for testName, value in stats.items(): 

256 rows[ampName][f"{self.config.stageName}_{testName}"] = value 

257 

258 for ampName, stats in rows.items(): 

259 rowList.append(stats) 

260 

261 return rowList, matrixRowList 

262 

263 

264# Subclass the linearity classess so that the linearizer 

265# is a regular Input instead of a PrerequisiteInput 

266class CpvLinearitySolveConnections(pipeBase.PipelineTaskConnections, 

267 dimensions=("instrument", "detector")): 

268 dummy = cT.Input( 

269 name="raw", 

270 doc="Dummy exposure.", 

271 storageClass='Exposure', 

272 dimensions=("instrument", "exposure", "detector"), 

273 multiple=True, 

274 deferLoad=True, 

275 ) 

276 camera = cT.PrerequisiteInput( 

277 name="camera", 

278 doc="Camera Geometry definition.", 

279 storageClass="Camera", 

280 dimensions=("instrument", ), 

281 isCalibration=True, 

282 ) 

283 inputPtc = cT.Input( 

284 name="ptc", 

285 doc="Input PTC dataset.", 

286 storageClass="PhotonTransferCurveDataset", 

287 dimensions=("instrument", "detector"), 

288 isCalibration=True, 

289 ) 

290 inputPhotodiodeCorrection = cT.Input( 

291 name="cpvPdCorrection", 

292 doc="Input photodiode correction.", 

293 storageClass="IsrCalib", 

294 dimensions=("instrument", ), 

295 isCalibration=True, 

296 ) 

297 

298 outputLinearizer = cT.Output( 

299 name="cpvLinearity", 

300 doc="Output linearity measurements.", 

301 storageClass="Linearizer", 

302 dimensions=("instrument", "detector"), 

303 isCalibration=True, 

304 ) 

305 

306 def __init__(self, *, config=None): 

307 super().__init__(config=config) 

308 

309 if config.applyPhotodiodeCorrection is not True: 

310 self.inputs.discard("inputPhotodiodeCorrection") 

311 

312 

313class CpvLinearitySolveConfig(cpPipe.LinearitySolveConfig, 

314 pipelineConnections=CpvLinearitySolveConnections): 

315 pass 

316 

317 

318class CpvLinearitySolveTask(cpPipe.LinearitySolveTask): 

319 

320 ConfigClass = CpvLinearitySolveConfig 

321 _DefaultName = "cpvLinearityTask" 

322 

323 pass 

324 

325 

326class CpvPhotodiodeCorrectionConnections(pipeBase.PipelineTaskConnections, 

327 dimensions=("instrument",)): 

328 

329 camera = cT.PrerequisiteInput( 

330 name="camera", 

331 doc="Camera Geometry definition.", 

332 storageClass="Camera", 

333 dimensions=("instrument", ), 

334 isCalibration=True, 

335 ) 

336 

337 inputPtc = cT.Input( 

338 name="ptc", 

339 doc="Input PTC dataset.", 

340 storageClass="PhotonTransferCurveDataset", 

341 dimensions=("instrument", "detector"), 

342 multiple=True, 

343 isCalibration=True, 

344 ) 

345 

346 inputLinearizer = cT.Input( 

347 name="unCorrectedLinearizer", 

348 doc="Raw linearizers that have not been corrected.", 

349 storageClass="Linearizer", 

350 dimensions=("instrument", "detector"), 

351 multiple=True, 

352 isCalibration=True, 

353 ) 

354 

355 outputPhotodiodeCorrection = cT.Output( 

356 name="pdCorrection", 

357 doc="Correction of photodiode systematic error.", 

358 storageClass="IsrCalib", 

359 dimensions=("instrument", ), 

360 isCalibration=True, 

361 ) 

362 

363 

364class CpvPhotodiodeCorrectionConfig(cpPipe.PhotodiodeCorrectionConfig, 

365 pipelineConnections=CpvPhotodiodeCorrectionConnections): 

366 pass 

367 

368 

369class CpvPhotodiodeCorrectionTask(cpPipe.PhotodiodeCorrectionTask): 

370 

371 ConfigClass = CpvPhotodiodeCorrectionConfig 

372 _DefaultName = "cpvPdCorrTask" 

373 

374 pass