Coverage for tests / test_linearize.py: 10%
193 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:28 +0000
1# This file is part of ip_isr.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import unittest
24import logging
25import numpy as np
27import lsst.utils.tests
28import lsst.utils
29import lsst.afw.image as afwImage
30import lsst.afw.math as afwMath
31import lsst.afw.cameraGeom as cameraGeom
32from lsst.afw.geom.testUtils import BoxGrid
33from lsst.afw.image.testUtils import makeRampImage
34from lsst.ip.isr import applyLookupTable, Linearizer
37def referenceImage(image, detector, linearityType, inputData, table=None):
38 """Generate a reference linearization.
40 Parameters
41 ----------
42 image: `lsst.afw.image.Image`
43 Image to linearize.
44 detector: `lsst.afw.cameraGeom.Detector`
45 Detector this image is from.
46 linearityType: `str`
47 Type of linearity to apply.
48 inputData: `numpy.array`
49 An array of values for the linearity correction.
50 table: `numpy.array`, optional
51 An optional lookup table to use.
53 Returns
54 -------
55 outImage: `lsst.afw.image.Image`
56 The output linearized image.
57 numOutOfRange: `int`
58 The number of values that could not be linearized.
60 Raises
61 ------
62 RuntimeError :
63 Raised if an invalid linearityType is supplied.
64 """
65 numOutOfRange = 0
66 for ampIdx, amp in enumerate(detector.getAmplifiers()):
67 ampIdx = (ampIdx // 3, ampIdx % 3)
68 bbox = amp.getBBox()
69 imageView = image.Factory(image, bbox)
71 if linearityType == 'Squared':
72 sqCoeff = inputData[ampIdx]
73 array = imageView.getArray()
75 array[:] = array + sqCoeff*array**2
76 elif linearityType == 'LookupTable':
77 rowInd, colIndOffset = inputData[ampIdx]
78 rowInd = int(rowInd)
79 tableRow = table[rowInd, :]
80 numOutOfRange += applyLookupTable(imageView, tableRow, colIndOffset)
81 elif linearityType == 'Polynomial':
82 coeffs = inputData[ampIdx]
83 array = imageView.getArray()
84 summation = np.zeros_like(array)
85 for index, coeff in enumerate(coeffs):
86 summation += coeff*np.power(array, (index + 2))
87 array += summation
88 elif linearityType == 'Spline':
89 centers, values = np.split(inputData, 2) # This uses the full data
90 # Note that we are using the slow afw AKIMA_SPLINE interpolator
91 # to offset the data, but using the equivalent but faster scipy
92 # Akima1DInterpolator to correct the data.
93 interp = afwMath.makeInterpolate(centers.tolist(), values.tolist(),
94 afwMath.stringToInterpStyle('AKIMA_SPLINE'))
95 array = imageView.getArray()
96 delta = interp.interpolate(array.flatten())
97 array -= np.array(delta).reshape(array.shape)
98 elif linearityType == 'DoubleSpline':
99 nNodes1 = int(inputData[0])
100 nNodes2 = int(inputData[1])
102 centers1, values1 = np.split(inputData[2: 2 + 2*nNodes1], 2)
103 centers2, values2 = np.split(inputData[2 + 2*nNodes1: 2 + 2*nNodes1 + 2*nNodes2], 2)
104 interp1 = afwMath.makeInterpolate(centers1.tolist(), values1.tolist(),
105 afwMath.stringToInterpStyle('AKIMA_SPLINE'))
106 interp2 = afwMath.makeInterpolate(centers2.tolist(), values2.tolist(),
107 afwMath.stringToInterpStyle('AKIMA_SPLINE'))
108 array = imageView.getArray()
109 delta1 = interp1.interpolate(array.ravel())
110 array -= np.asarray(delta1).reshape(array.shape)
111 delta2 = interp2.interpolate(array.ravel())
112 array -= np.asarray(delta2).reshape(array.shape)
113 else:
114 raise RuntimeError(f"Unknown linearity: {linearityType}")
115 return image, numOutOfRange
118class LinearizeTestCase(lsst.utils.tests.TestCase):
119 """Unit tests for linearizers.
120 """
122 def setUp(self):
123 # This uses the same arbitrary values used in previous tests.
124 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-31, 22), lsst.geom.Extent2I(100, 85))
125 self.ampArrangement = (2, 3)
126 self.numAmps = self.ampArrangement[0]*self.ampArrangement[1]
128 # Squared Parameters
129 self.sqCoeffs = np.array([[0, 5e-6, 2.5e-5], [1e-5, 1.1e-6, 2.1e-6]], dtype=float)
131 # Lookup Table Parameters
132 self.colIndOffsets = np.array([[0, -50, 2.5], [37, 1, -3]], dtype=float)
133 self.rowInds = np.array([[0, 1, 4], [3, 5, 2]])
134 # This creates a 2x3 array (matching the amplifiers) that contains a
135 # 2x1 array containing [colIndOffset_i, rowInd_i].
136 self.lookupIndices = np.transpose(np.stack((self.rowInds, self.colIndOffsets), axis=0),
137 axes=[1, 2, 0])
139 rng = np.random.Generator(np.random.MT19937(1))
140 self.table = rng.normal(scale=55, size=(self.numAmps, 2500))
141 self.assertLess(np.max(self.rowInds), self.numAmps, "error in test conditions; invalid row index")
143 # Polynomial Parameters: small perturbation on Squared
144 self.polyCoeffs = np.array([[[0, 1e-7], [5e-6, 1e-7], [2.5e-5, 1e-7]],
145 [[1e-5, 1e-7], [1.1e-6, 1e-7], [2.1e-6, 1e-7]]], dtype=float)
147 # Spline coefficients: should match a 1e-6 Squared solution
148 self.splineCoeffs = np.array([0.0, 1000, 2000, 3000, 4000, 5000,
149 0.0, 1.0, 4.0, 9.0, 16.0, 25.0])
151 # Double spline coefficients.
152 self.doubleSplineCoeffs = np.asarray(
153 [
154 5, # Number of nodes in first spline.
155 6, # Number of nodes in second spline.
156 0.0, 2000., 3000., 4000., 5000., # Nodes for first spline.
157 0.0, 0.01, 0.02, 0.03, 0.04, # Values for first spline.
158 0.0, 1000, 2000, 3000, 4000, 5000, # Nodes for second spline.
159 0.0, 1.0, 4.0, 9.0, 16.0, 25.0, # Values for second spline.
160 0.0, 0.0, # Extra filler.
161 ],
162 )
164 self.log = logging.getLogger("lsst.ip.isr.testLinearizer")
166 def tearDown(self):
167 # destroy LSST objects so memory test passes.
168 self.bbox = None
169 self.detector = None
171 def compareResults(self, linearizedImage, linearizedOutOfRange, linearizedCount, linearizedAmps,
172 referenceImage, referenceOutOfRange, referenceCount, referenceAmps):
173 """Run assert tests on results.
175 Parameters
176 ----------
177 linearizedImage : `lsst.afw.image.Image`
178 Corrected image.
179 linearizedOutOfRange : `int`
180 Number of measured out-of-range pixels.
181 linearizedCount : `int`
182 Number of amplifiers that should be linearized.
183 linearizedAmps : `int`
184 Total number of amplifiers checked.
185 referenceImage : `lsst.afw.image.Image`
186 Truth image to compare against.
187 referenceOutOfRange : `int`
188 Number of expected out-of-range-pixels.
189 referenceCount : `int`
190 Number of amplifiers that are expected to be linearized.
191 referenceAmps : `int`
192 Expected number of amplifiers checked.
193 """
194 self.assertImagesAlmostEqual(linearizedImage, referenceImage)
195 self.assertEqual(linearizedOutOfRange, referenceOutOfRange)
196 self.assertEqual(linearizedCount, referenceCount)
197 self.assertEqual(linearizedAmps, referenceAmps)
199 def testBasics(self):
200 """Test basic linearization functionality.
201 """
202 for imageClass in (afwImage.ImageF, afwImage.ImageD):
203 inImage = makeRampImage(bbox=self.bbox, start=-5, stop=2500, imageClass=imageClass)
205 for linearityType in ('Squared', 'LookupTable', 'Polynomial', 'Spline', 'DoubleSpline'):
206 detector = self.makeDetector(linearityType)
207 table = None
208 inputData = {'Squared': self.sqCoeffs,
209 'LookupTable': self.lookupIndices,
210 'Polynomial': self.polyCoeffs,
211 'Spline': self.splineCoeffs,
212 'DoubleSpline': self.doubleSplineCoeffs}[linearityType]
213 if linearityType == 'LookupTable':
214 table = np.array(self.table, dtype=inImage.getArray().dtype)
215 linearizer = Linearizer(detector=detector, table=table)
217 measImage = inImage.Factory(inImage, True)
218 result = linearizer.applyLinearity(measImage, detector=detector, log=self.log)
219 refImage, refNumOutOfRange = referenceImage(inImage.Factory(inImage, True),
220 detector, linearityType, inputData, table)
222 # This is necessary for the same tests to be used on
223 # all types. The first amplifier has 0.0 for the
224 # coefficient, which should be tested (it has a log
225 # message), but we are not linearizing an amplifier
226 # with no correction, so it fails the test that
227 # numLinearized == numAmps.
228 zeroLinearity = 1 if linearityType == 'Squared' else 0
230 self.compareResults(measImage, result.numOutOfRange, result.numLinearized, result.numAmps,
231 refImage, refNumOutOfRange, self.numAmps - zeroLinearity, self.numAmps)
233 # Test a stand alone linearizer. This ignores validate checks.
234 measImage = inImage.Factory(inImage, True)
235 storedLinearizer = self.makeLinearizer(linearityType)
236 storedResult = storedLinearizer.applyLinearity(measImage, log=self.log)
238 self.compareResults(measImage, storedResult.numOutOfRange, storedResult.numLinearized,
239 storedResult.numAmps,
240 refImage, refNumOutOfRange, self.numAmps - zeroLinearity, self.numAmps)
242 # "Save to yaml" and test again
243 storedDict = storedLinearizer.toDict()
244 storedLinearizer = Linearizer().fromDict(storedDict)
246 measImage = inImage.Factory(inImage, True)
247 storedResult = storedLinearizer.applyLinearity(measImage, log=self.log)
249 self.compareResults(measImage, storedResult.numOutOfRange, storedResult.numLinearized,
250 storedResult.numAmps,
251 refImage, refNumOutOfRange, self.numAmps - zeroLinearity, self.numAmps)
253 # "Save to fits" and test again
254 storedTable = storedLinearizer.toTable()
255 storedLinearizer = Linearizer().fromTable(storedTable)
257 measImage = inImage.Factory(inImage, True)
258 storedResult = storedLinearizer.applyLinearity(measImage, log=self.log)
260 self.compareResults(measImage, storedResult.numOutOfRange, storedResult.numLinearized,
261 storedResult.numAmps,
262 refImage, refNumOutOfRange, self.numAmps - zeroLinearity, self.numAmps)
264 # Use a gain and test again
265 measImage = inImage.Factory(inImage, True)
266 storedLinearizer = self.makeLinearizer(linearityType)
267 gains = {key: 1.0 for key in storedLinearizer.linearityType.keys()}
268 storedResult = storedLinearizer.applyLinearity(measImage, log=self.log, gains=gains)
270 self.compareResults(measImage, storedResult.numOutOfRange, storedResult.numLinearized,
271 storedResult.numAmps,
272 refImage, refNumOutOfRange, self.numAmps - zeroLinearity, self.numAmps)
274 def makeDetector(self, linearityType, bbox=None):
275 """Generate a fake detector for the test.
277 Parameters
278 ----------
279 linearityType : `str`
280 Which linearity to assign to the detector's cameraGeom.
281 bbox : `lsst.geom.Box2I`, optional
282 Bounding box to use for the detector.
284 Returns
285 -------
286 detBuilder : `lsst.afw.cameraGeom.Detector`
287 The fake detector.
288 """
289 bbox = bbox if bbox is not None else self.bbox
290 numAmps = self.ampArrangement
292 detName = "det_a"
293 detId = 1
294 detSerial = "123"
295 orientation = cameraGeom.Orientation()
296 pixelSize = lsst.geom.Extent2D(1, 1)
298 camBuilder = cameraGeom.Camera.Builder("fakeCam")
299 detBuilder = camBuilder.add(detName, detId)
300 detBuilder.setSerial(detSerial)
301 detBuilder.setBBox(bbox)
302 detBuilder.setOrientation(orientation)
303 detBuilder.setPixelSize(pixelSize)
305 boxArr = BoxGrid(box=bbox, numColRow=numAmps)
306 for i in range(numAmps[0]):
307 for j in range(numAmps[1]):
308 ampInfo = cameraGeom.Amplifier.Builder()
309 ampInfo.setName("amp %d_%d" % (i + 1, j + 1))
310 ampInfo.setBBox(boxArr[i, j])
311 ampInfo.setLinearityType(linearityType)
312 if linearityType == 'Squared':
313 ampInfo.setLinearityCoeffs([self.sqCoeffs[i, j]])
314 elif linearityType == 'LookupTable':
315 # setLinearityCoeffs is picky about getting a mixed
316 # int/float list.
317 ampInfo.setLinearityCoeffs(np.array([self.rowInds[i, j], self.colIndOffsets[i, j],
318 0, 0], dtype=float))
319 elif linearityType == 'Polynomial':
320 ampInfo.setLinearityCoeffs(self.polyCoeffs[i, j])
321 elif linearityType == 'Spline':
322 ampInfo.setLinearityCoeffs(self.splineCoeffs)
323 elif linearityType == 'DoubleSpline':
324 ampInfo.setLinearityCoeffs(self.doubleSplineCoeffs)
325 detBuilder.append(ampInfo)
327 return detBuilder
329 def makeLinearizer(self, linearityType, bbox=None):
330 """Construct a linearizer with the test coefficients.
332 Parameters
333 ----------
334 linearityType : `str`
335 Type of linearity to use. The coefficients are set by the
336 setUp method.
337 bbox : `lsst.geom.Box2I`
338 Bounding box for the full detector. Used to assign
339 amp-based bounding boxes.
341 Returns
342 -------
343 linearizer : `lsst.ip.isr.Linearizer`
344 A fully constructed, persistable linearizer.
345 """
346 bbox = bbox if bbox is not None else self.bbox
347 numAmps = self.ampArrangement
348 boxArr = BoxGrid(box=bbox, numColRow=numAmps)
349 linearizer = Linearizer()
350 linearizer.hasLinearity = True
352 for i in range(numAmps[0]):
353 for j in range(numAmps[1]):
354 ampName = f"amp {i+1}_{j+1}"
355 ampBox = boxArr[i, j]
356 linearizer.ampNames.append(ampName)
358 if linearityType == 'Squared':
359 linearizer.linearityCoeffs[ampName] = np.array([self.sqCoeffs[i, j]])
360 elif linearityType == 'LookupTable':
361 linearizer.linearityCoeffs[ampName] = np.array(self.lookupIndices[i, j])
362 linearizer.tableData = self.table
363 elif linearityType == 'Polynomial':
364 linearizer.linearityCoeffs[ampName] = np.array(self.polyCoeffs[i, j])
365 elif linearityType == 'Spline':
366 linearizer.linearityCoeffs[ampName] = np.array(self.splineCoeffs)
367 elif linearityType == 'DoubleSpline':
368 linearizer.linearityCoeffs[ampName] = np.asarray(self.doubleSplineCoeffs)
370 linearizer.inputGain[ampName] = 1.0
371 linearizer.inputTurnoff[ampName] = 100_000.0
372 linearizer.linearityType[ampName] = linearityType
373 linearizer.linearityBBox[ampName] = ampBox
374 linearizer.inputAbscissa[ampName] = np.array([])
375 linearizer.inputOrdinate[ampName] = np.array([])
376 linearizer.inputMask[ampName] = np.array([], dtype=np.bool_)
377 linearizer.inputGroupingIndex[ampName] = np.array([], dtype=np.int64)
378 linearizer.inputNormalization[ampName] = np.array([])
379 linearizer.fitParams[ampName] = np.array([])
380 linearizer.fitParamsErr[ampName] = np.array([])
381 linearizer.fitChiSq[ampName] = np.nan
382 linearizer.fitResiduals[ampName] = np.array([])
383 linearizer.fitResidualsSigmaMad[ampName] = np.nan
384 linearizer.fitResidualsUnmasked[ampName] = np.array([])
385 linearizer.fitResidualsModel[ampName] = np.array([])
386 linearizer.linearFit[ampName] = np.array([])
387 linearizer.linearityTurnoff[ampName] = np.nan
388 linearizer.linearityMaxSignal[ampName] = np.nan
390 return linearizer
393class MemoryTester(lsst.utils.tests.MemoryTestCase):
394 pass
397def setup_module(module):
398 lsst.utils.tests.init()
401if __name__ == "__main__": 401 ↛ 402line 401 didn't jump to line 402 because the condition on line 401 was never true
402 lsst.utils.tests.init()
403 unittest.main()