Coverage for tests / test_linearize.py: 10%

193 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:35 +0000

1# This file is part of ip_isr. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import unittest 

23 

24import logging 

25import numpy as np 

26 

27import lsst.utils.tests 

28import lsst.utils 

29import lsst.afw.image as afwImage 

30import lsst.afw.math as afwMath 

31import lsst.afw.cameraGeom as cameraGeom 

32from lsst.afw.geom.testUtils import BoxGrid 

33from lsst.afw.image.testUtils import makeRampImage 

34from lsst.ip.isr import applyLookupTable, Linearizer 

35 

36 

37def referenceImage(image, detector, linearityType, inputData, table=None): 

38 """Generate a reference linearization. 

39 

40 Parameters 

41 ---------- 

42 image: `lsst.afw.image.Image` 

43 Image to linearize. 

44 detector: `lsst.afw.cameraGeom.Detector` 

45 Detector this image is from. 

46 linearityType: `str` 

47 Type of linearity to apply. 

48 inputData: `numpy.array` 

49 An array of values for the linearity correction. 

50 table: `numpy.array`, optional 

51 An optional lookup table to use. 

52 

53 Returns 

54 ------- 

55 outImage: `lsst.afw.image.Image` 

56 The output linearized image. 

57 numOutOfRange: `int` 

58 The number of values that could not be linearized. 

59 

60 Raises 

61 ------ 

62 RuntimeError : 

63 Raised if an invalid linearityType is supplied. 

64 """ 

65 numOutOfRange = 0 

66 for ampIdx, amp in enumerate(detector.getAmplifiers()): 

67 ampIdx = (ampIdx // 3, ampIdx % 3) 

68 bbox = amp.getBBox() 

69 imageView = image.Factory(image, bbox) 

70 

71 if linearityType == 'Squared': 

72 sqCoeff = inputData[ampIdx] 

73 array = imageView.getArray() 

74 

75 array[:] = array + sqCoeff*array**2 

76 elif linearityType == 'LookupTable': 

77 rowInd, colIndOffset = inputData[ampIdx] 

78 rowInd = int(rowInd) 

79 tableRow = table[rowInd, :] 

80 numOutOfRange += applyLookupTable(imageView, tableRow, colIndOffset) 

81 elif linearityType == 'Polynomial': 

82 coeffs = inputData[ampIdx] 

83 array = imageView.getArray() 

84 summation = np.zeros_like(array) 

85 for index, coeff in enumerate(coeffs): 

86 summation += coeff*np.power(array, (index + 2)) 

87 array += summation 

88 elif linearityType == 'Spline': 

89 centers, values = np.split(inputData, 2) # This uses the full data 

90 # Note that we are using the slow afw AKIMA_SPLINE interpolator 

91 # to offset the data, but using the equivalent but faster scipy 

92 # Akima1DInterpolator to correct the data. 

93 interp = afwMath.makeInterpolate(centers.tolist(), values.tolist(), 

94 afwMath.stringToInterpStyle('AKIMA_SPLINE')) 

95 array = imageView.getArray() 

96 delta = interp.interpolate(array.flatten()) 

97 array -= np.array(delta).reshape(array.shape) 

98 elif linearityType == 'DoubleSpline': 

99 nNodes1 = int(inputData[0]) 

100 nNodes2 = int(inputData[1]) 

101 

102 centers1, values1 = np.split(inputData[2: 2 + 2*nNodes1], 2) 

103 centers2, values2 = np.split(inputData[2 + 2*nNodes1: 2 + 2*nNodes1 + 2*nNodes2], 2) 

104 interp1 = afwMath.makeInterpolate(centers1.tolist(), values1.tolist(), 

105 afwMath.stringToInterpStyle('AKIMA_SPLINE')) 

106 interp2 = afwMath.makeInterpolate(centers2.tolist(), values2.tolist(), 

107 afwMath.stringToInterpStyle('AKIMA_SPLINE')) 

108 array = imageView.getArray() 

109 delta1 = interp1.interpolate(array.ravel()) 

110 array -= np.asarray(delta1).reshape(array.shape) 

111 delta2 = interp2.interpolate(array.ravel()) 

112 array -= np.asarray(delta2).reshape(array.shape) 

113 else: 

114 raise RuntimeError(f"Unknown linearity: {linearityType}") 

115 return image, numOutOfRange 

116 

117 

118class LinearizeTestCase(lsst.utils.tests.TestCase): 

119 """Unit tests for linearizers. 

120 """ 

121 

122 def setUp(self): 

123 # This uses the same arbitrary values used in previous tests. 

124 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-31, 22), lsst.geom.Extent2I(100, 85)) 

125 self.ampArrangement = (2, 3) 

126 self.numAmps = self.ampArrangement[0]*self.ampArrangement[1] 

127 

128 # Squared Parameters 

129 self.sqCoeffs = np.array([[0, 5e-6, 2.5e-5], [1e-5, 1.1e-6, 2.1e-6]], dtype=float) 

130 

131 # Lookup Table Parameters 

132 self.colIndOffsets = np.array([[0, -50, 2.5], [37, 1, -3]], dtype=float) 

133 self.rowInds = np.array([[0, 1, 4], [3, 5, 2]]) 

134 # This creates a 2x3 array (matching the amplifiers) that contains a 

135 # 2x1 array containing [colIndOffset_i, rowInd_i]. 

136 self.lookupIndices = np.transpose(np.stack((self.rowInds, self.colIndOffsets), axis=0), 

137 axes=[1, 2, 0]) 

138 

139 rng = np.random.Generator(np.random.MT19937(1)) 

140 self.table = rng.normal(scale=55, size=(self.numAmps, 2500)) 

141 self.assertLess(np.max(self.rowInds), self.numAmps, "error in test conditions; invalid row index") 

142 

143 # Polynomial Parameters: small perturbation on Squared 

144 self.polyCoeffs = np.array([[[0, 1e-7], [5e-6, 1e-7], [2.5e-5, 1e-7]], 

145 [[1e-5, 1e-7], [1.1e-6, 1e-7], [2.1e-6, 1e-7]]], dtype=float) 

146 

147 # Spline coefficients: should match a 1e-6 Squared solution 

148 self.splineCoeffs = np.array([0.0, 1000, 2000, 3000, 4000, 5000, 

149 0.0, 1.0, 4.0, 9.0, 16.0, 25.0]) 

150 

151 # Double spline coefficients. 

152 self.doubleSplineCoeffs = np.asarray( 

153 [ 

154 5, # Number of nodes in first spline. 

155 6, # Number of nodes in second spline. 

156 0.0, 2000., 3000., 4000., 5000., # Nodes for first spline. 

157 0.0, 0.01, 0.02, 0.03, 0.04, # Values for first spline. 

158 0.0, 1000, 2000, 3000, 4000, 5000, # Nodes for second spline. 

159 0.0, 1.0, 4.0, 9.0, 16.0, 25.0, # Values for second spline. 

160 0.0, 0.0, # Extra filler. 

161 ], 

162 ) 

163 

164 self.log = logging.getLogger("lsst.ip.isr.testLinearizer") 

165 

166 def tearDown(self): 

167 # destroy LSST objects so memory test passes. 

168 self.bbox = None 

169 self.detector = None 

170 

171 def compareResults(self, linearizedImage, linearizedOutOfRange, linearizedCount, linearizedAmps, 

172 referenceImage, referenceOutOfRange, referenceCount, referenceAmps): 

173 """Run assert tests on results. 

174 

175 Parameters 

176 ---------- 

177 linearizedImage : `lsst.afw.image.Image` 

178 Corrected image. 

179 linearizedOutOfRange : `int` 

180 Number of measured out-of-range pixels. 

181 linearizedCount : `int` 

182 Number of amplifiers that should be linearized. 

183 linearizedAmps : `int` 

184 Total number of amplifiers checked. 

185 referenceImage : `lsst.afw.image.Image` 

186 Truth image to compare against. 

187 referenceOutOfRange : `int` 

188 Number of expected out-of-range-pixels. 

189 referenceCount : `int` 

190 Number of amplifiers that are expected to be linearized. 

191 referenceAmps : `int` 

192 Expected number of amplifiers checked. 

193 """ 

194 self.assertImagesAlmostEqual(linearizedImage, referenceImage) 

195 self.assertEqual(linearizedOutOfRange, referenceOutOfRange) 

196 self.assertEqual(linearizedCount, referenceCount) 

197 self.assertEqual(linearizedAmps, referenceAmps) 

198 

199 def testBasics(self): 

200 """Test basic linearization functionality. 

201 """ 

202 for imageClass in (afwImage.ImageF, afwImage.ImageD): 

203 inImage = makeRampImage(bbox=self.bbox, start=-5, stop=2500, imageClass=imageClass) 

204 

205 for linearityType in ('Squared', 'LookupTable', 'Polynomial', 'Spline', 'DoubleSpline'): 

206 detector = self.makeDetector(linearityType) 

207 table = None 

208 inputData = {'Squared': self.sqCoeffs, 

209 'LookupTable': self.lookupIndices, 

210 'Polynomial': self.polyCoeffs, 

211 'Spline': self.splineCoeffs, 

212 'DoubleSpline': self.doubleSplineCoeffs}[linearityType] 

213 if linearityType == 'LookupTable': 

214 table = np.array(self.table, dtype=inImage.getArray().dtype) 

215 linearizer = Linearizer(detector=detector, table=table) 

216 

217 measImage = inImage.Factory(inImage, True) 

218 result = linearizer.applyLinearity(measImage, detector=detector, log=self.log) 

219 refImage, refNumOutOfRange = referenceImage(inImage.Factory(inImage, True), 

220 detector, linearityType, inputData, table) 

221 

222 # This is necessary for the same tests to be used on 

223 # all types. The first amplifier has 0.0 for the 

224 # coefficient, which should be tested (it has a log 

225 # message), but we are not linearizing an amplifier 

226 # with no correction, so it fails the test that 

227 # numLinearized == numAmps. 

228 zeroLinearity = 1 if linearityType == 'Squared' else 0 

229 

230 self.compareResults(measImage, result.numOutOfRange, result.numLinearized, result.numAmps, 

231 refImage, refNumOutOfRange, self.numAmps - zeroLinearity, self.numAmps) 

232 

233 # Test a stand alone linearizer. This ignores validate checks. 

234 measImage = inImage.Factory(inImage, True) 

235 storedLinearizer = self.makeLinearizer(linearityType) 

236 storedResult = storedLinearizer.applyLinearity(measImage, log=self.log) 

237 

238 self.compareResults(measImage, storedResult.numOutOfRange, storedResult.numLinearized, 

239 storedResult.numAmps, 

240 refImage, refNumOutOfRange, self.numAmps - zeroLinearity, self.numAmps) 

241 

242 # "Save to yaml" and test again 

243 storedDict = storedLinearizer.toDict() 

244 storedLinearizer = Linearizer().fromDict(storedDict) 

245 

246 measImage = inImage.Factory(inImage, True) 

247 storedResult = storedLinearizer.applyLinearity(measImage, log=self.log) 

248 

249 self.compareResults(measImage, storedResult.numOutOfRange, storedResult.numLinearized, 

250 storedResult.numAmps, 

251 refImage, refNumOutOfRange, self.numAmps - zeroLinearity, self.numAmps) 

252 

253 # "Save to fits" and test again 

254 storedTable = storedLinearizer.toTable() 

255 storedLinearizer = Linearizer().fromTable(storedTable) 

256 

257 measImage = inImage.Factory(inImage, True) 

258 storedResult = storedLinearizer.applyLinearity(measImage, log=self.log) 

259 

260 self.compareResults(measImage, storedResult.numOutOfRange, storedResult.numLinearized, 

261 storedResult.numAmps, 

262 refImage, refNumOutOfRange, self.numAmps - zeroLinearity, self.numAmps) 

263 

264 # Use a gain and test again 

265 measImage = inImage.Factory(inImage, True) 

266 storedLinearizer = self.makeLinearizer(linearityType) 

267 gains = {key: 1.0 for key in storedLinearizer.linearityType.keys()} 

268 storedResult = storedLinearizer.applyLinearity(measImage, log=self.log, gains=gains) 

269 

270 self.compareResults(measImage, storedResult.numOutOfRange, storedResult.numLinearized, 

271 storedResult.numAmps, 

272 refImage, refNumOutOfRange, self.numAmps - zeroLinearity, self.numAmps) 

273 

274 def makeDetector(self, linearityType, bbox=None): 

275 """Generate a fake detector for the test. 

276 

277 Parameters 

278 ---------- 

279 linearityType : `str` 

280 Which linearity to assign to the detector's cameraGeom. 

281 bbox : `lsst.geom.Box2I`, optional 

282 Bounding box to use for the detector. 

283 

284 Returns 

285 ------- 

286 detBuilder : `lsst.afw.cameraGeom.Detector` 

287 The fake detector. 

288 """ 

289 bbox = bbox if bbox is not None else self.bbox 

290 numAmps = self.ampArrangement 

291 

292 detName = "det_a" 

293 detId = 1 

294 detSerial = "123" 

295 orientation = cameraGeom.Orientation() 

296 pixelSize = lsst.geom.Extent2D(1, 1) 

297 

298 camBuilder = cameraGeom.Camera.Builder("fakeCam") 

299 detBuilder = camBuilder.add(detName, detId) 

300 detBuilder.setSerial(detSerial) 

301 detBuilder.setBBox(bbox) 

302 detBuilder.setOrientation(orientation) 

303 detBuilder.setPixelSize(pixelSize) 

304 

305 boxArr = BoxGrid(box=bbox, numColRow=numAmps) 

306 for i in range(numAmps[0]): 

307 for j in range(numAmps[1]): 

308 ampInfo = cameraGeom.Amplifier.Builder() 

309 ampInfo.setName("amp %d_%d" % (i + 1, j + 1)) 

310 ampInfo.setBBox(boxArr[i, j]) 

311 ampInfo.setLinearityType(linearityType) 

312 if linearityType == 'Squared': 

313 ampInfo.setLinearityCoeffs([self.sqCoeffs[i, j]]) 

314 elif linearityType == 'LookupTable': 

315 # setLinearityCoeffs is picky about getting a mixed 

316 # int/float list. 

317 ampInfo.setLinearityCoeffs(np.array([self.rowInds[i, j], self.colIndOffsets[i, j], 

318 0, 0], dtype=float)) 

319 elif linearityType == 'Polynomial': 

320 ampInfo.setLinearityCoeffs(self.polyCoeffs[i, j]) 

321 elif linearityType == 'Spline': 

322 ampInfo.setLinearityCoeffs(self.splineCoeffs) 

323 elif linearityType == 'DoubleSpline': 

324 ampInfo.setLinearityCoeffs(self.doubleSplineCoeffs) 

325 detBuilder.append(ampInfo) 

326 

327 return detBuilder 

328 

329 def makeLinearizer(self, linearityType, bbox=None): 

330 """Construct a linearizer with the test coefficients. 

331 

332 Parameters 

333 ---------- 

334 linearityType : `str` 

335 Type of linearity to use. The coefficients are set by the 

336 setUp method. 

337 bbox : `lsst.geom.Box2I` 

338 Bounding box for the full detector. Used to assign 

339 amp-based bounding boxes. 

340 

341 Returns 

342 ------- 

343 linearizer : `lsst.ip.isr.Linearizer` 

344 A fully constructed, persistable linearizer. 

345 """ 

346 bbox = bbox if bbox is not None else self.bbox 

347 numAmps = self.ampArrangement 

348 boxArr = BoxGrid(box=bbox, numColRow=numAmps) 

349 linearizer = Linearizer() 

350 linearizer.hasLinearity = True 

351 

352 for i in range(numAmps[0]): 

353 for j in range(numAmps[1]): 

354 ampName = f"amp {i+1}_{j+1}" 

355 ampBox = boxArr[i, j] 

356 linearizer.ampNames.append(ampName) 

357 

358 if linearityType == 'Squared': 

359 linearizer.linearityCoeffs[ampName] = np.array([self.sqCoeffs[i, j]]) 

360 elif linearityType == 'LookupTable': 

361 linearizer.linearityCoeffs[ampName] = np.array(self.lookupIndices[i, j]) 

362 linearizer.tableData = self.table 

363 elif linearityType == 'Polynomial': 

364 linearizer.linearityCoeffs[ampName] = np.array(self.polyCoeffs[i, j]) 

365 elif linearityType == 'Spline': 

366 linearizer.linearityCoeffs[ampName] = np.array(self.splineCoeffs) 

367 elif linearityType == 'DoubleSpline': 

368 linearizer.linearityCoeffs[ampName] = np.asarray(self.doubleSplineCoeffs) 

369 

370 linearizer.inputGain[ampName] = 1.0 

371 linearizer.inputTurnoff[ampName] = 100_000.0 

372 linearizer.linearityType[ampName] = linearityType 

373 linearizer.linearityBBox[ampName] = ampBox 

374 linearizer.inputAbscissa[ampName] = np.array([]) 

375 linearizer.inputOrdinate[ampName] = np.array([]) 

376 linearizer.inputMask[ampName] = np.array([], dtype=np.bool_) 

377 linearizer.inputGroupingIndex[ampName] = np.array([], dtype=np.int64) 

378 linearizer.inputNormalization[ampName] = np.array([]) 

379 linearizer.fitParams[ampName] = np.array([]) 

380 linearizer.fitParamsErr[ampName] = np.array([]) 

381 linearizer.fitChiSq[ampName] = np.nan 

382 linearizer.fitResiduals[ampName] = np.array([]) 

383 linearizer.fitResidualsSigmaMad[ampName] = np.nan 

384 linearizer.fitResidualsUnmasked[ampName] = np.array([]) 

385 linearizer.fitResidualsModel[ampName] = np.array([]) 

386 linearizer.linearFit[ampName] = np.array([]) 

387 linearizer.linearityTurnoff[ampName] = np.nan 

388 linearizer.linearityMaxSignal[ampName] = np.nan 

389 

390 return linearizer 

391 

392 

393class MemoryTester(lsst.utils.tests.MemoryTestCase): 

394 pass 

395 

396 

397def setup_module(module): 

398 lsst.utils.tests.init() 

399 

400 

401if __name__ == "__main__": 401 ↛ 402line 401 didn't jump to line 402 because the condition on line 401 was never true

402 lsst.utils.tests.init() 

403 unittest.main()