Coverage for tests / test_ptcAdjustGainRatios.py: 8%

411 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 09:18 +0000

1# 

2# LSST Data Management System 

3# 

4# Copyright 2008-2017 AURA/LSST. 

5# 

6# This product includes software developed by the 

7# LSST Project (http://www.lsst.org/). 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the LSST License Statement and 

20# the GNU General Public License along with this program. If not, 

21# see <https://www.lsstcorp.org/LegalNotices/>. 

22# 

23"""Test cases for cp_pipe PTC gain adjustment code.""" 

24 

25import copy 

26import logging 

27import unittest 

28import numpy as np 

29from matplotlib.figure import Figure 

30from scipy.interpolate import Akima1DInterpolator 

31 

32import lsst.utils.tests 

33 

34import lsst.afw.cameraGeom 

35from lsst.afw.image import ExposureF 

36from lsst.cp.pipe import CpMeasureGainCorrectionTask 

37from lsst.cp.pipe.ptc import PhotonTransferCurveAdjustGainRatiosTask 

38from lsst.cp.pipe.ptc.cpPtcAdjustGainRatios import _compute_gain_ratios 

39from lsst.cp.pipe.utils import bin_flat 

40from lsst.ip.isr import IsrMockLSST, PhotonTransferCurveDataset 

41from lsst.pipe.base import InMemoryDatasetHandle 

42 

43 

44class PtcAdjustGainRatiosTestCase(lsst.utils.tests.TestCase): 

45 def setUp(self): 

46 mock = IsrMockLSST() 

47 self.camera = mock.getCamera() 

48 self.detector = self.camera[20] 

49 self.amp_names = [amp.getName() for amp in self.detector] 

50 

51 rng = np.random.RandomState(seed=12345) 

52 

53 self.gain_true = {} 

54 for amp_name in self.amp_names: 

55 self.gain_true[amp_name] = float(rng.normal(loc=1.5, scale=0.05, size=1)[0]) 

56 

57 def _gain_correct_flat(self, flat, ptc, adjust_ratios): 

58 """Gain correct a flat to a biased and debiased version. 

59 

60 Parameters 

61 ---------- 

62 flat : `lsst.afw.image.Exposure` 

63 Flat to correct. 

64 ptc : `lsst.ip.isr.PhotonTransferCurveDataset` 

65 PTC dataset with unadjusted gains. 

66 adjust_ratios : `np.ndarray` 

67 Array of adjustment ratios. 

68 

69 Returns 

70 ------- 

71 flat_biased : `lsst.afw.image.Exposure` 

72 Flat with the original unadjusted gains. 

73 flat_debiased : `lsst.afw.image.Exposure` 

74 Flat with adjusted gains. 

75 """ 

76 flat_biased = flat.clone() 

77 flat_debiased = flat.clone() 

78 for i, amp_name in enumerate(self.amp_names): 

79 bbox = self.detector[amp_name].getBBox() 

80 if amp_name in ptc.badAmps: 

81 flat_biased[bbox].image.array[:, :] = np.nan 

82 flat_debiased[bbox].image.array[:, :] = np.nan 

83 continue 

84 

85 flat_biased[bbox].image.array[:, :] *= ptc.gainUnadjusted[amp_name] 

86 flat_debiased[bbox].image.array[:, :] *= (ptc.gainUnadjusted[amp_name] / adjust_ratios[i]) 

87 

88 return flat_biased, flat_debiased 

89 

90 def test_compute_gain_ratios_small_gradient(self): 

91 fixed_amp_num = 2 

92 

93 flat, radial_gradient, planar_gradient = _get_adjusted_flat( 

94 self.detector, 

95 self.gain_true, 

96 10000.0, 

97 0.98, 

98 -0.00005, 

99 0.00001, 

100 ) 

101 

102 ptc = PhotonTransferCurveDataset() 

103 ptc.ampNames = self.amp_names 

104 ptc.badAmps = [] 

105 

106 gain_biased = copy.copy(self.gain_true) 

107 gain_biased[self.amp_names[0]] *= 1.002 

108 gain_biased[self.amp_names[4]] *= 0.998 

109 

110 # These are the gains recorded in the PTC. 

111 for amp_name in self.amp_names: 

112 ptc.gain[amp_name] = gain_biased[amp_name] 

113 ptc.gainUnadjusted[amp_name] = gain_biased[amp_name] 

114 ptc.noise[amp_name] = 10.0 

115 ptc.ptcTurnoff[amp_name] = 50000.0 

116 

117 with self.assertNoLogs(level=logging.WARNING): 

118 binned = bin_flat(ptc, flat.clone(), bin_factor=2, amp_boundary=0, apply_gains=True) 

119 lo, hi = np.nanpercentile(binned["value"], [5.0, 95.0]) 

120 lo *= 0.8 

121 hi *= 1.2 

122 use = (np.isfinite(binned["value"]) & (binned["value"] >= lo) & (binned["value"] <= hi)) 

123 binned = binned[use] 

124 

125 adjust_ratios = _compute_gain_ratios( 

126 flat.getDetector(), 

127 binned, 

128 fixed_amp_num, 

129 radial_gradient_n_spline_nodes=3, 

130 ) 

131 

132 # Make two flats with the old adjustment and the new adjustment. 

133 flat_biased, flat_debiased = self._gain_correct_flat(flat, ptc, adjust_ratios) 

134 

135 # Remove the gradients... 

136 flat_biased.image.array[:, :] /= radial_gradient 

137 flat_biased.image.array[:, :] /= planar_gradient 

138 flat_debiased.image.array[:, :] /= radial_gradient 

139 flat_debiased.image.array[:, :] /= planar_gradient 

140 

141 # Insist that the standard deviation decreases after 

142 # removing gradients. 

143 self.assertLess( 

144 np.std(flat_debiased.image.array.ravel()), 

145 np.std(flat_biased.image.array.ravel()), 

146 ) 

147 

148 def test_compute_gain_ratios_large_gradient(self): 

149 fixed_amp_num = 2 

150 

151 flat, radial_gradient, planar_gradient = _get_adjusted_flat( 

152 self.detector, 

153 self.gain_true, 

154 10000.0, 

155 0.6, 

156 -0.00005, 

157 0.00001, 

158 ) 

159 

160 ptc = PhotonTransferCurveDataset() 

161 ptc.ampNames = self.amp_names 

162 ptc.badAmps = [] 

163 

164 gain_biased = copy.copy(self.gain_true) 

165 gain_biased[self.amp_names[0]] *= 1.002 

166 gain_biased[self.amp_names[4]] *= 0.998 

167 

168 # These are the gains recorded in the PTC. 

169 for amp_name in self.amp_names: 

170 ptc.gain[amp_name] = gain_biased[amp_name] 

171 ptc.gainUnadjusted[amp_name] = gain_biased[amp_name] 

172 ptc.noise[amp_name] = 10.0 

173 ptc.ptcTurnoff[amp_name] = 50000.0 

174 

175 with self.assertNoLogs(level=logging.WARNING): 

176 binned = bin_flat(ptc, flat.clone(), bin_factor=2, amp_boundary=0, apply_gains=True) 

177 lo, hi = np.nanpercentile(binned["value"], [5.0, 95.0]) 

178 lo *= 0.8 

179 hi *= 1.2 

180 use = (np.isfinite(binned["value"]) & (binned["value"] >= lo) & (binned["value"] <= hi)) 

181 binned = binned[use] 

182 

183 adjust_ratios = _compute_gain_ratios( 

184 flat.getDetector(), 

185 binned, 

186 fixed_amp_num, 

187 radial_gradient_n_spline_nodes=3, 

188 ) 

189 

190 # Make two flats with the old adjustment and the new adjustment. 

191 flat_biased, flat_debiased = self._gain_correct_flat(flat, ptc, adjust_ratios) 

192 

193 # Remove the gradients... 

194 flat_biased.image.array[:, :] /= radial_gradient 

195 flat_biased.image.array[:, :] /= planar_gradient 

196 flat_debiased.image.array[:, :] /= radial_gradient 

197 flat_debiased.image.array[:, :] /= planar_gradient 

198 

199 # Insist that the standard deviation decreases after 

200 # removing gradients. 

201 self.assertLess( 

202 np.std(flat_debiased.image.array.ravel()), 

203 np.std(flat_biased.image.array.ravel()), 

204 ) 

205 

206 def test_compute_gain_ratios_bad_amp(self): 

207 fixed_amp_num = 2 

208 

209 flat, radial_gradient, planar_gradient = _get_adjusted_flat( 

210 self.detector, 

211 self.gain_true, 

212 10000.0, 

213 0.6, 

214 -0.00005, 

215 0.00001, 

216 bad_amps=[self.amp_names[1]], 

217 ) 

218 

219 ptc = PhotonTransferCurveDataset() 

220 ptc.ampNames = self.amp_names 

221 ptc.badAmps = [self.amp_names[1]] 

222 

223 gain_biased = copy.copy(self.gain_true) 

224 gain_biased[self.amp_names[0]] *= 1.002 

225 gain_biased[self.amp_names[4]] *= 0.998 

226 

227 # These are the gains recorded in the PTC. 

228 for amp_name in self.amp_names: 

229 ptc.gain[amp_name] = gain_biased[amp_name] 

230 ptc.gainUnadjusted[amp_name] = gain_biased[amp_name] 

231 ptc.noise[amp_name] = 10.0 

232 ptc.ptcTurnoff[amp_name] = 50000.0 

233 

234 with self.assertNoLogs(level=logging.WARNING): 

235 binned = bin_flat(ptc, flat.clone(), bin_factor=2, amp_boundary=0, apply_gains=True) 

236 lo, hi = np.nanpercentile(binned["value"], [5.0, 95.0]) 

237 lo *= 0.8 

238 hi *= 1.2 

239 use = (np.isfinite(binned["value"]) & (binned["value"] >= lo) & (binned["value"] <= hi)) 

240 binned = binned[use] 

241 

242 adjust_ratios = _compute_gain_ratios( 

243 flat.getDetector(), 

244 binned, 

245 fixed_amp_num, 

246 radial_gradient_n_spline_nodes=3, 

247 ) 

248 

249 # Make two flats with the old adjustment and the new adjustment. 

250 flat_biased, flat_debiased = self._gain_correct_flat(flat, ptc, adjust_ratios) 

251 

252 # Remove the gradients... 

253 flat_biased.image.array[:, :] /= radial_gradient 

254 flat_biased.image.array[:, :] /= planar_gradient 

255 flat_debiased.image.array[:, :] /= radial_gradient 

256 flat_debiased.image.array[:, :] /= planar_gradient 

257 

258 # Insist that the standard deviation decreases after 

259 # removing gradients. 

260 # Note that this contains nans which must be filtered. 

261 self.assertLess( 

262 np.nanstd(flat_debiased.image.array.ravel()), 

263 np.nanstd(flat_biased.image.array.ravel()), 

264 ) 

265 

266 def test_compute_gain_ratios_weird_amp(self): 

267 fixed_amp_num = 2 

268 

269 # The weird amps have a large offset but are not known 

270 # as bad. 

271 flat, radial_gradient, planar_gradient = _get_adjusted_flat( 

272 self.detector, 

273 self.gain_true, 

274 10000.0, 

275 0.6, 

276 -0.00005, 

277 0.00001, 

278 bad_amps=[self.amp_names[1]], 

279 ) 

280 

281 ptc = PhotonTransferCurveDataset() 

282 ptc.ampNames = self.amp_names 

283 

284 gain_biased = copy.copy(self.gain_true) 

285 gain_biased[self.amp_names[0]] *= 1.002 

286 gain_biased[self.amp_names[4]] *= 0.998 

287 

288 # These are the gains recorded in the PTC. 

289 for amp_name in self.amp_names: 

290 ptc.gain[amp_name] = gain_biased[amp_name] 

291 ptc.gainUnadjusted[amp_name] = gain_biased[amp_name] 

292 ptc.noise[amp_name] = 10.0 

293 ptc.ptcTurnoff[amp_name] = 50000.0 

294 

295 max_fractional_gain_ratio = 0.05 

296 with self.assertLogs(level=logging.WARNING) as cm: 

297 binned = bin_flat(ptc, flat.clone(), bin_factor=2, amp_boundary=0, apply_gains=True) 

298 lo, hi = np.nanpercentile(binned["value"], [5.0, 95.0]) 

299 lo *= 0.8 

300 hi *= 1.2 

301 use = (np.isfinite(binned["value"]) & (binned["value"] >= lo) & (binned["value"] <= hi)) 

302 binned = binned[use] 

303 

304 adjust_ratios = _compute_gain_ratios( 

305 flat.getDetector(), 

306 binned, 

307 fixed_amp_num, 

308 radial_gradient_n_spline_nodes=3, 

309 max_fractional_gain_ratio=max_fractional_gain_ratio, 

310 ) 

311 self.assertIn("Found bad amp", cm.output[0]) 

312 

313 np.testing.assert_array_less(np.abs(adjust_ratios - 1.0), max_fractional_gain_ratio) 

314 

315 # Make two flats with the old adjustment and the new adjustment. 

316 flat_biased, flat_debiased = self._gain_correct_flat(flat, ptc, adjust_ratios) 

317 

318 # Remove the gradients... 

319 flat_biased.image.array[:, :] /= radial_gradient 

320 flat_biased.image.array[:, :] /= planar_gradient 

321 flat_debiased.image.array[:, :] /= radial_gradient 

322 flat_debiased.image.array[:, :] /= planar_gradient 

323 

324 # Insist that the standard deviation decreases after 

325 # removing gradients. 

326 # Note that this contains nans which must be filtered. 

327 self.assertLess( 

328 np.nanstd(flat_debiased.image.array.ravel()), 

329 np.nanstd(flat_biased.image.array.ravel()), 

330 ) 

331 

332 def test_task_run(self): 

333 levels = np.linspace(500.0, 25000.0, 40) 

334 

335 flat_handles = [] 

336 exp_id_pairs = [] 

337 for i, level in enumerate(levels): 

338 exp_id_1 = 100 + 2*i 

339 exp_id_2 = exp_id_1 + 1 

340 

341 data_id_1 = { 

342 "detector": self.detector.getId(), 

343 "exposure": exp_id_1, 

344 } 

345 data_id_2 = { 

346 "detector": self.detector.getId(), 

347 "exposure": exp_id_2, 

348 } 

349 

350 flat, radial_gradient, planar_gradient = _get_adjusted_flat( 

351 self.detector, 

352 self.gain_true, 

353 levels[i], 

354 0.6, 

355 -0.00005, 

356 0.00001, 

357 ) 

358 flat_handles.append(InMemoryDatasetHandle(flat.clone(), dataId=data_id_1)) 

359 flat_handles.append(InMemoryDatasetHandle(flat.clone(), dataId=data_id_2)) 

360 

361 exp_id_pairs.append([exp_id_1, exp_id_2]) 

362 

363 ptc = PhotonTransferCurveDataset() 

364 ptc.ampNames = self.amp_names 

365 ptc.badAmps = [] 

366 

367 gain_biased = copy.copy(self.gain_true) 

368 gain_biased[self.amp_names[0]] *= 1.002 

369 gain_biased[self.amp_names[4]] *= 0.998 

370 

371 # These are the gains recorded in the PTC. 

372 for amp_name in self.amp_names: 

373 ptc.gain[amp_name] = gain_biased[amp_name] 

374 ptc.gainUnadjusted[amp_name] = gain_biased[amp_name] 

375 ptc.noise[amp_name] = 10.0 

376 ptc.ptcTurnoff[amp_name] = 50000.0 

377 

378 ptc.rawMeans[amp_name] = levels 

379 ptc.finalMeans[amp_name] = levels 

380 

381 ptc.inputExpIdPairs[amp_name] = exp_id_pairs 

382 

383 # Add in one noisy amp which will not be considered 

384 # to be the reference amp. 

385 ptc.noise[self.amp_names[-1]] = 15.0 

386 # And add in one low turnoff amp. 

387 ptc.ptcTurnoff[self.amp_names[-2]] = 10000.0 

388 

389 config = PhotonTransferCurveAdjustGainRatiosTask.ConfigClass() 

390 config.bin_factor = 2 

391 config.n_flat = 10 

392 config.amp_boundary = 0 

393 config.radial_gradient_n_spline_nodes = 3 

394 config.n_flat = 10 

395 

396 task = PhotonTransferCurveAdjustGainRatiosTask(config=config) 

397 struct = task.run(exposures=flat_handles, input_ptc=ptc) 

398 

399 output_ptc = struct.output_ptc 

400 summary = struct.output_adjust_summary 

401 

402 self.assertGreater(summary["mean_adu"].min(), config.min_adu) 

403 self.assertLess(summary["mean_adu"].max(), config.max_adu) 

404 

405 self.assertIn("fixed_amp_index", summary.meta.keys()) 

406 self.assertIn("fixed_amp_name", summary.meta.keys()) 

407 self.assertIn("median_correction", summary.meta.keys()) 

408 

409 # In this idealized case, the summary values should be the same 

410 # for every flux level. 

411 adjust_ratios = np.zeros(len(self.amp_names)) 

412 for i, amp_name in enumerate(self.amp_names): 

413 np.testing.assert_array_almost_equal( 

414 summary[f"{amp_name}_gain_ratio"][0], 

415 summary[f"{amp_name}_gain_ratio"], 

416 ) 

417 adjust_ratios[i] = output_ptc.gainUnadjusted[amp_name] / output_ptc.gain[amp_name] 

418 self.assertFloatsAlmostEqual( 

419 adjust_ratios[i], 

420 summary[f"{amp_name}_gain_ratio"] / summary.meta["median_correction"], 

421 rtol=1e-6, 

422 ) 

423 

424 # Confirm this actually works. 

425 flat = flat_handles[0].get() 

426 flat_biased, flat_debiased = self._gain_correct_flat(flat, ptc, adjust_ratios) 

427 

428 # Remove the gradients... 

429 flat_biased.image.array[:, :] /= radial_gradient 

430 flat_biased.image.array[:, :] /= planar_gradient 

431 flat_debiased.image.array[:, :] /= radial_gradient 

432 flat_debiased.image.array[:, :] /= planar_gradient 

433 

434 # Insist that the standard deviation decreases after 

435 # removing gradients. 

436 self.assertLess( 

437 np.std(flat_debiased.image.array.ravel()), 

438 np.std(flat_biased.image.array.ravel()), 

439 ) 

440 

441 def test_task_run_bad_amp(self): 

442 levels = np.linspace(500.0, 25000.0, 40) 

443 

444 bad_amps = [self.amp_names[1]] 

445 

446 flat_handles = [] 

447 exp_id_pairs = [] 

448 for i, level in enumerate(levels): 

449 exp_id_1 = 100 + 2*i 

450 exp_id_2 = exp_id_1 + 1 

451 

452 data_id_1 = { 

453 "detector": self.detector.getId(), 

454 "exposure": exp_id_1, 

455 } 

456 data_id_2 = { 

457 "detector": self.detector.getId(), 

458 "exposure": exp_id_2, 

459 } 

460 

461 flat, radial_gradient, planar_gradient = _get_adjusted_flat( 

462 self.detector, 

463 self.gain_true, 

464 levels[i], 

465 0.6, 

466 -0.00005, 

467 0.00001, 

468 bad_amps=bad_amps, 

469 ) 

470 flat_handles.append(InMemoryDatasetHandle(flat.clone(), dataId=data_id_1)) 

471 flat_handles.append(InMemoryDatasetHandle(flat.clone(), dataId=data_id_2)) 

472 

473 exp_id_pairs.append([exp_id_1, exp_id_2]) 

474 

475 ptc = PhotonTransferCurveDataset() 

476 ptc.ampNames = self.amp_names 

477 ptc.badAmps = bad_amps 

478 

479 gain_biased = copy.copy(self.gain_true) 

480 gain_biased[self.amp_names[0]] *= 1.002 

481 gain_biased[self.amp_names[4]] *= 0.998 

482 

483 # These are the gains recorded in the PTC. 

484 for amp_name in self.amp_names: 

485 ptc.gain[amp_name] = gain_biased[amp_name] 

486 ptc.gainUnadjusted[amp_name] = gain_biased[amp_name] 

487 ptc.noise[amp_name] = 10.0 

488 ptc.ptcTurnoff[amp_name] = 50000.0 

489 

490 ptc.rawMeans[amp_name] = levels 

491 ptc.finalMeans[amp_name] = levels 

492 

493 ptc.inputExpIdPairs[amp_name] = exp_id_pairs 

494 

495 config = PhotonTransferCurveAdjustGainRatiosTask.ConfigClass() 

496 config.bin_factor = 2 

497 config.n_flat = 10 

498 config.amp_boundary = 0 

499 config.radial_gradient_n_spline_nodes = 3 

500 # Make sure the code works when we have fewer flats. 

501 config.n_flat = 100 

502 

503 task = PhotonTransferCurveAdjustGainRatiosTask(config=config) 

504 struct = task.run(exposures=flat_handles, input_ptc=ptc) 

505 

506 output_ptc = struct.output_ptc 

507 summary = struct.output_adjust_summary 

508 

509 self.assertGreater(summary["mean_adu"].min(), config.min_adu) 

510 self.assertLess(summary["mean_adu"].max(), config.max_adu) 

511 

512 self.assertIn("fixed_amp_index", summary.meta.keys()) 

513 self.assertIn("fixed_amp_name", summary.meta.keys()) 

514 self.assertIn("median_correction", summary.meta.keys()) 

515 

516 # In this idealized case, the summary values should be the same 

517 # for every flux level. 

518 adjust_ratios = np.zeros(len(self.amp_names)) 

519 for i, amp_name in enumerate(self.amp_names): 

520 np.testing.assert_array_almost_equal( 

521 summary[f"{amp_name}_gain_ratio"][0], 

522 summary[f"{amp_name}_gain_ratio"], 

523 ) 

524 adjust_ratios[i] = output_ptc.gainUnadjusted[amp_name] / output_ptc.gain[amp_name] 

525 self.assertFloatsAlmostEqual( 

526 adjust_ratios[i], 

527 summary[f"{amp_name}_gain_ratio"] / summary.meta["median_correction"], 

528 rtol=1e-6, 

529 ) 

530 

531 # Confirm this actually works. 

532 flat = flat_handles[0].get() 

533 flat_biased, flat_debiased = self._gain_correct_flat(flat, ptc, adjust_ratios) 

534 

535 # Remove the gradients... 

536 flat_biased.image.array[:, :] /= radial_gradient 

537 flat_biased.image.array[:, :] /= planar_gradient 

538 flat_debiased.image.array[:, :] /= radial_gradient 

539 flat_debiased.image.array[:, :] /= planar_gradient 

540 

541 # Insist that the standard deviation decreases after 

542 # removing gradients. 

543 self.assertLess( 

544 np.nanstd(flat_debiased.image.array.ravel()), 

545 np.nanstd(flat_biased.image.array.ravel()), 

546 ) 

547 

548 

549class MeasureGainCorrectionTestCase(lsst.utils.tests.TestCase): 

550 def setUp(self): 

551 mock = IsrMockLSST() 

552 self.camera = mock.getCamera() 

553 self.detector = self.camera[20] 

554 self.amp_names = [amp.getName() for amp in self.detector] 

555 

556 self.n_radial_nodes = 10 

557 

558 self.gain_ref = {} 

559 for amp_name in self.amp_names: 

560 self.gain_ref[amp_name] = 1.0 

561 

562 self.ptc = PhotonTransferCurveDataset() 

563 self.ptc.ampNames = self.amp_names 

564 self.ptc.badAmps = [] 

565 

566 for amp_name in self.amp_names: 

567 self.ptc.gain[amp_name] = self.gain_ref[amp_name] 

568 self.ptc.noise[amp_name] = 10.0 

569 self.ptc.ptcTurnoff[amp_name] = 50000.0 

570 

571 def _check_corrected_ratios( 

572 self, 

573 ref_flat, 

574 input_flat, 

575 gain_adjust_truth, 

576 gain_correction, 

577 check_sig_corr=False, 

578 gain_adjust_atol=1e-5, 

579 ): 

580 """ 

581 Check corrected ratio images for uniformity. 

582 

583 Parameters 

584 ---------- 

585 ref_flat : `lsst.afw.image.Exposure` 

586 Reference flat. 

587 input_flat : `lsst.afw.image.Exposure` 

588 Input flat. 

589 gain_adjust_truth : `dict` [`str`, `float`] 

590 Dictionary of gains used; adjustment is relative to gain_ref. 

591 gain_correction : `lsst.ip.isr.GainCorrection` 

592 Gain correction object. 

593 check_sig_corr : `bool`, optional 

594 Check the value of the corrected ratio sigma? 

595 gain_adjust_atol : `float`, optional 

596 Tolerance for testing gain adjustments. 

597 """ 

598 ratio = input_flat.image.array / ref_flat.image.array 

599 

600 flat_corr = input_flat.clone() 

601 for i, amp in enumerate(self.detector): 

602 flat_corr[amp.getBBox()].image.array /= gain_correction.gainAdjustments[i] 

603 

604 ratio_corr = flat_corr.image.array / ref_flat.image.array 

605 

606 self.assertLess(np.nanstd(ratio_corr), np.nanstd(ratio)) 

607 

608 if check_sig_corr: 

609 self.assertLess(np.std(ratio_corr), 1e-6) 

610 

611 gain_adjustments_truth = np.asarray( 

612 [self.gain_ref[amp.getName()] / gain_adjust_truth[amp.getName()] for amp in self.detector] 

613 ) 

614 

615 finite = np.isfinite(gain_adjustments_truth) 

616 self.assertFloatsAlmostEqual( 

617 gain_correction.gainAdjustments[finite], 

618 gain_adjustments_truth[finite], 

619 atol=gain_adjust_atol, 

620 ) 

621 

622 def test_task_run_matched_gradient(self): 

623 # This test shows what happens if we have matched 

624 # gradients between the reference flat and the 

625 # input flat. 

626 

627 ref_flat, _, _ = _get_adjusted_flat( 

628 self.detector, 

629 self.gain_ref, 

630 1.0, 

631 0.98, 

632 0.0002, 

633 -0.0002, 

634 ) 

635 

636 gain_adj = copy.copy(self.gain_ref) 

637 gain_adj[self.amp_names[3]] = 1.002 

638 gain_adj[self.amp_names[6]] = 0.998 

639 

640 input_flat, _, _ = _get_adjusted_flat( 

641 self.detector, 

642 gain_adj, 

643 1.0, 

644 0.98, 

645 0.0002, 

646 -0.0002, 

647 ) 

648 

649 # Do the first fit with no additional gradient fitting. 

650 config = CpMeasureGainCorrectionTask.ConfigClass() 

651 config.radial_gradient_n_spline_nodes = 3 

652 config.bin_factor = 2 

653 config.amp_boundary = 0 

654 config.chebyshev_gradient_order = 0 

655 config.do_remove_radial_gradient = False 

656 

657 task = CpMeasureGainCorrectionTask(config=config) 

658 struct = task.run( 

659 input_reference_flat=ref_flat, 

660 input_reference_ptc=self.ptc, 

661 input_flat=input_flat, 

662 ) 

663 

664 self._check_corrected_ratios( 

665 ref_flat, 

666 input_flat, 

667 gain_adj, 

668 struct.output_gain_correction, 

669 check_sig_corr=True, 

670 ) 

671 

672 # Second fit with defaults. 

673 config = CpMeasureGainCorrectionTask.ConfigClass() 

674 config.radial_gradient_n_spline_nodes = 3 

675 config.bin_factor = 2 

676 config.amp_boundary = 0 

677 config.chebyshev_gradient_order = 1 

678 config.do_remove_radial_gradient = True 

679 

680 task = CpMeasureGainCorrectionTask(config=config) 

681 struct = task.run( 

682 input_reference_flat=ref_flat, 

683 input_reference_ptc=self.ptc, 

684 input_flat=input_flat, 

685 ) 

686 

687 self._check_corrected_ratios( 

688 ref_flat, 

689 input_flat, 

690 gain_adj, 

691 struct.output_gain_correction, 

692 gain_adjust_atol=3e-3, 

693 ) 

694 

695 self.assertIsInstance(struct.output_flat_ratio_plot, Figure) 

696 

697 def test_task_run_mismatched_spline(self): 

698 # This test shows what happens if we have matched 

699 # gradients but different spline between the reference flat and the 

700 # input flat. 

701 

702 ref_flat, _, _ = _get_adjusted_flat( 

703 self.detector, 

704 self.gain_ref, 

705 1.0, 

706 0.98, 

707 0.0002, 

708 -0.0002, 

709 ) 

710 

711 gain_adj = copy.copy(self.gain_ref) 

712 gain_adj[self.amp_names[3]] = 1.002 

713 gain_adj[self.amp_names[6]] = 0.998 

714 

715 input_flat, _, _ = _get_adjusted_flat( 

716 self.detector, 

717 gain_adj, 

718 1.0, 

719 0.97, 

720 0.0002, 

721 -0.0002, 

722 ) 

723 

724 # Fit with defaults. 

725 config = CpMeasureGainCorrectionTask.ConfigClass() 

726 config.radial_gradient_n_spline_nodes = 3 

727 config.bin_factor = 2 

728 config.amp_boundary = 0 

729 config.chebyshev_gradient_order = 1 

730 config.do_remove_radial_gradient = True 

731 

732 task = CpMeasureGainCorrectionTask(config=config) 

733 struct = task.run( 

734 input_reference_flat=ref_flat, 

735 input_reference_ptc=self.ptc, 

736 input_flat=input_flat, 

737 ) 

738 

739 self._check_corrected_ratios( 

740 ref_flat, 

741 input_flat, 

742 gain_adj, 

743 struct.output_gain_correction, 

744 gain_adjust_atol=3e-3, 

745 ) 

746 

747 def test_task_run_mismatched_gradient(self): 

748 # This test shows what happens if we have matched 

749 # spline but different gradient between the reference flat and the 

750 # input flat. 

751 

752 ref_flat, _, _ = _get_adjusted_flat( 

753 self.detector, 

754 self.gain_ref, 

755 1.0, 

756 0.98, 

757 0.0002, 

758 -0.0002, 

759 ) 

760 

761 gain_adj = copy.copy(self.gain_ref) 

762 gain_adj[self.amp_names[3]] = 1.002 

763 gain_adj[self.amp_names[6]] = 0.998 

764 

765 input_flat, _, _ = _get_adjusted_flat( 

766 self.detector, 

767 gain_adj, 

768 1.0, 

769 0.98, 

770 0.0004, 

771 -0.0003, 

772 ) 

773 

774 # Fit with defaults. 

775 config = CpMeasureGainCorrectionTask.ConfigClass() 

776 config.radial_gradient_n_spline_nodes = 3 

777 config.bin_factor = 2 

778 config.amp_boundary = 0 

779 config.chebyshev_gradient_order = 1 

780 config.do_remove_radial_gradient = True 

781 

782 task = CpMeasureGainCorrectionTask(config=config) 

783 struct = task.run( 

784 input_reference_flat=ref_flat, 

785 input_reference_ptc=self.ptc, 

786 input_flat=input_flat, 

787 ) 

788 

789 self._check_corrected_ratios( 

790 ref_flat, 

791 input_flat, 

792 gain_adj, 

793 struct.output_gain_correction, 

794 gain_adjust_atol=3e-3, 

795 ) 

796 

797 def test_task_run_mismatched_both(self): 

798 # This test shows what happens if we have matched 

799 # spline and different gradient between the reference flat and the 

800 # input flat. 

801 

802 ref_flat, _, _ = _get_adjusted_flat( 

803 self.detector, 

804 self.gain_ref, 

805 1.0, 

806 0.98, 

807 0.0002, 

808 -0.0002, 

809 ) 

810 

811 gain_adj = copy.copy(self.gain_ref) 

812 gain_adj[self.amp_names[3]] = 1.002 

813 gain_adj[self.amp_names[6]] = 0.998 

814 

815 input_flat, _, _ = _get_adjusted_flat( 

816 self.detector, 

817 gain_adj, 

818 1.0, 

819 0.97, 

820 0.0004, 

821 -0.0003, 

822 ) 

823 

824 # Fit with defaults. 

825 config = CpMeasureGainCorrectionTask.ConfigClass() 

826 config.radial_gradient_n_spline_nodes = 3 

827 config.bin_factor = 2 

828 config.amp_boundary = 0 

829 config.chebyshev_gradient_order = 1 

830 config.do_remove_radial_gradient = True 

831 

832 task = CpMeasureGainCorrectionTask(config=config) 

833 struct = task.run( 

834 input_reference_flat=ref_flat, 

835 input_reference_ptc=self.ptc, 

836 input_flat=input_flat, 

837 ) 

838 

839 self._check_corrected_ratios( 

840 ref_flat, 

841 input_flat, 

842 gain_adj, 

843 struct.output_gain_correction, 

844 gain_adjust_atol=3e-3, 

845 ) 

846 

847 def test_task_run_mismatched_both_bad_amp(self): 

848 # This test shows what happens if we have matched 

849 # spline and different gradient between the reference flat and the 

850 # input flat. 

851 

852 ref_flat, _, _ = _get_adjusted_flat( 

853 self.detector, 

854 self.gain_ref, 

855 1.0, 

856 0.98, 

857 0.0002, 

858 -0.0002, 

859 ) 

860 

861 gain_adj = copy.copy(self.gain_ref) 

862 gain_adj[self.amp_names[3]] = 1.002 

863 gain_adj[self.amp_names[6]] = 0.998 

864 gain_adj[self.amp_names[0]] = np.nan 

865 

866 input_flat, _, _ = _get_adjusted_flat( 

867 self.detector, 

868 gain_adj, 

869 1.0, 

870 0.97, 

871 0.0004, 

872 -0.0003, 

873 ) 

874 

875 # Fit with defaults. 

876 config = CpMeasureGainCorrectionTask.ConfigClass() 

877 config.radial_gradient_n_spline_nodes = 3 

878 config.bin_factor = 2 

879 config.amp_boundary = 0 

880 config.chebyshev_gradient_order = 1 

881 config.do_remove_radial_gradient = True 

882 

883 task = CpMeasureGainCorrectionTask(config=config) 

884 struct = task.run( 

885 input_reference_flat=ref_flat, 

886 input_reference_ptc=self.ptc, 

887 input_flat=input_flat, 

888 ) 

889 

890 self._check_corrected_ratios( 

891 ref_flat, 

892 input_flat, 

893 gain_adj, 

894 struct.output_gain_correction, 

895 gain_adjust_atol=3e-3, 

896 ) 

897 

898 

899def _get_adjusted_flat( 

900 detector, 

901 gain_dict, 

902 normalization, 

903 radial_gradient_outer_value, 

904 planar_gradient_x, 

905 planar_gradient_y, 

906 bad_amps=[], 

907 n_radial_nodes=10, 

908): 

909 """Get an adjusted flat. 

910 

911 Parameters 

912 ---------- 

913 detector : `lsst.afw.cameraGeom.Detector` 

914 Detector object. 

915 gain_dict : `dict` [`float`] 

916 Dictionary of amp names to gains. 

917 normalization : `float` 

918 Normalization value. 

919 radial_gradient_outer_value : `float` 

920 Value at max radius (should be < 1.0). 

921 planar_gradient_x : `float` 

922 Planar gradient x slope. 

923 planar_gradient_y : `float` 

924 Planar gradient y slope. 

925 bad_amps : `list` [`str`], optional 

926 List of bad amp names. 

927 n_radial_nodes : `int`, optional 

928 Number of radial nodes. 

929 

930 Returns 

931 ------- 

932 flat : `lsst.afw.image.Exposure` 

933 Flat image generated. 

934 radial_gradient : `np.ndarray` 

935 Radial gradient image applied. 

936 planar_gradient : `np.ndarray` 

937 Planar gradient image applied. 

938 """ 

939 flat = ExposureF(detector.getBBox()) 

940 flat.setDetector(detector) 

941 

942 xx = np.arange(flat.image.array.shape[1], dtype=np.float64) 

943 yy = np.arange(flat.image.array.shape[0], dtype=np.float64) 

944 x, y = np.meshgrid(xx, yy) 

945 x = x.ravel() 

946 y = y.ravel() 

947 

948 transform = detector.getTransform(lsst.afw.cameraGeom.PIXELS, lsst.afw.cameraGeom.FOCAL_PLANE) 

949 xy = np.vstack((x, y)) 

950 xf, yf = np.vsplit(transform.getMapping().applyForward(xy), 2) 

951 xf = xf.ravel() 

952 yf = yf.ravel() 

953 

954 radius = np.sqrt(xf**2. + yf**2.) 

955 

956 nodes = np.linspace(radius.min(), radius.max(), n_radial_nodes) 

957 spline_values = np.linspace(1.0, radial_gradient_outer_value, n_radial_nodes) 

958 

959 spl = Akima1DInterpolator(nodes, spline_values, method="akima") 

960 radial_gradient = spl(radius).reshape(flat.image.array.shape) 

961 flat.image.array[:, :] = radial_gradient.copy() 

962 

963 flat.image.array[:, :] *= normalization 

964 

965 # Add a bit of a planar gradient; approx +/- 1%. 

966 planar_gradient = ( 

967 1 

968 - 0.00005 * (x - detector.getBBox().getCenter().getX()) 

969 + 0.00001 * (y - detector.getBBox().getCenter().getY()) 

970 ).reshape(flat.image.array.shape) 

971 

972 flat.image.array[:, :] *= planar_gradient 

973 

974 for amp in detector: 

975 flat.image[amp.getBBox()].array[:, :] /= gain_dict[amp.getName()] 

976 

977 # Make the bad amps goofy. 

978 for bad_amp in bad_amps: 

979 flat.image[detector[bad_amp].getBBox()].array[:, :] *= 0.1 

980 

981 return flat, radial_gradient, planar_gradient 

982 

983 

984class TestMemory(lsst.utils.tests.MemoryTestCase): 

985 pass 

986 

987 

988def setup_module(module): 

989 lsst.utils.tests.init() 

990 

991 

992if __name__ == "__main__": 992 ↛ 993line 992 didn't jump to line 993 because the condition on line 992 was never true

993 lsst.utils.tests.init() 

994 unittest.main()