Coverage for python / lsst / ip / isr / ampOffset.py: 13%

211 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:55 +0000

1# This file is part of ip_isr. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["AmpOffsetConfig", "AmpOffsetTask"] 

23 

24import warnings 

25 

26import numpy as np 

27 

28from lsst.afw.math import MEANCLIP, StatisticsControl, makeStatistics 

29from lsst.afw.table import SourceTable 

30from lsst.meas.algorithms import SourceDetectionTask, SubtractBackgroundTask 

31from lsst.meas.algorithms.subtractBackground import TooManyMaskedPixelsError 

32from lsst.pex.config import Config, ConfigurableField, Field 

33from lsst.pipe.base import Struct, Task 

34 

35 

36class AmpOffsetConfig(Config): 

37 """Configuration parameters for AmpOffsetTask.""" 

38 

39 def setDefaults(self): 

40 self.background.algorithm = "AKIMA_SPLINE" 

41 self.background.useApprox = False 

42 self.background.ignoredPixelMask = [ 

43 "BAD", 

44 "SAT", 

45 "INTRP", 

46 "CR", 

47 "EDGE", 

48 "DETECTED", 

49 "DETECTED_NEGATIVE", 

50 "SUSPECT", 

51 "NO_DATA", 

52 ] 

53 self.detection.reEstimateBackground = False 

54 

55 # This maintains existing behavior and test values after DM-39796. 

56 self.detection.thresholdType = "stdev" 

57 

58 ampEdgeInset = Field( 

59 doc="Number of pixels the amp edge strip is inset from the amp edge. A thin strip of pixels running " 

60 "parallel to the edge of the amp is used to characterize the average flux level at the amp edge.", 

61 dtype=int, 

62 default=5, 

63 ) 

64 ampEdgeWidth = Field( 

65 doc="Pixel width of the amp edge strip, starting at ampEdgeInset and extending inwards.", 

66 dtype=int, 

67 default=64, 

68 ) 

69 ampEdgeMinFrac = Field( 

70 doc="Minimum allowed fraction of viable pixel rows along an amp edge. No amp offset estimate will be " 

71 "generated for amp edges that do not have at least this fraction of unmasked pixel rows.", 

72 dtype=float, 

73 default=0.5, 

74 ) 

75 ampEdgeMaxOffset = Field( 

76 doc="Maximum allowed amp offset ADU value. If a measured amp offset value is larger than this, the " 

77 "result will be discarded and therefore not used to determine amp pedestal corrections.", 

78 dtype=float, 

79 default=5.0, 

80 ) 

81 doWindowSmoothing = Field( 

82 doc="Smooth amp edge differences by taking a rolling average.", 

83 dtype=bool, 

84 default=True, 

85 ) 

86 ampEdgeWindowFrac = Field( 

87 doc="Fraction of the amp edge lengths utilized as the sliding window for generating rolling average " 

88 "amp offset values. It should be reconfigured for every instrument (HSC, LSSTCam, etc.) and should " 

89 "not exceed 1. If not provided, it defaults to the fraction that recovers the pixel size of the " 

90 "sliding window used in obs_subaru for compatibility with existing HSC data. Only relevant if " 

91 "`doWindowSmoothing` is set to True.", 

92 dtype=float, 

93 default=512 / 4176, 

94 ) 

95 doBackground = Field( 

96 doc="Estimate and subtract background prior to amp offset estimation?", 

97 dtype=bool, 

98 default=True, 

99 ) 

100 background = ConfigurableField( 

101 doc="An initial background estimation step run prior to amp offset calculation.", 

102 target=SubtractBackgroundTask, 

103 ) 

104 backgroundFractionSample = Field( 

105 doc="The fraction of the shorter side of the amplifier used for background binning.", 

106 dtype=float, 

107 default=1.0, 

108 ) 

109 doDetection = Field( 

110 doc="Detect sources and update cloned exposure prior to amp offset estimation?", 

111 dtype=bool, 

112 default=True, 

113 ) 

114 detection = ConfigurableField( 

115 doc="Source detection to add temporary detection footprints prior to amp offset calculation.", 

116 target=SourceDetectionTask, 

117 ) 

118 applyWeights = Field( 

119 doc="Weights the amp offset calculation by the length of the interface between amplifiers. Applying " 

120 "weights does not affect outcomes for amplifiers in a 2D grid with square-shaped amplifiers or in " 

121 "any 1D layout on a detector, regardless of whether the amplifiers are square.", 

122 dtype=bool, 

123 default=True, 

124 ) 

125 doApplyAmpOffset = Field( 

126 doc="Apply amp offset corrections to the input exposure?", 

127 dtype=bool, 

128 default=False, 

129 ) 

130 

131 def validate(self): 

132 super().validate() 

133 if self.ampEdgeWindowFrac > 1: 

134 raise RuntimeError( 

135 f"The amp edge window fraction ('ampEdgeWindowFrac') is set to {self.ampEdgeWindowFrac}. " 

136 "Values greater than 1 lead to post-convolution complications in getInterfaceOffset. " 

137 "Please modify this value to be 1 or less." 

138 ) 

139 

140 

141class AmpOffsetTask(Task): 

142 """Calculate and apply amp offset corrections to an exposure.""" 

143 

144 ConfigClass = AmpOffsetConfig 

145 _DefaultName = "isrAmpOffset" 

146 

147 def __init__(self, *args, **kwargs): 

148 super().__init__(*args, **kwargs) 

149 # Always load background subtask, even if doBackground=False; 

150 # this allows for default plane bit masks to be defined. 

151 self.makeSubtask("background") 

152 if self.config.doDetection: 

153 self.makeSubtask("detection") 

154 # Initialize all of the instance variables here. 

155 self.shortAmpSide = 0 

156 

157 def run(self, exposure): 

158 """Calculate amp offset values, determine corrective pedestals for each 

159 amp, and update the input exposure in-place. 

160 

161 Parameters 

162 ---------- 

163 exposure: `lsst.afw.image.Exposure` 

164 Exposure to be corrected for amp offsets. 

165 """ 

166 

167 # Generate an exposure clone to work on and establish the bit mask. 

168 exp = exposure.clone() 

169 bitMask = exp.mask.getPlaneBitMask(self.background.config.ignoredPixelMask) 

170 amps = exp.getDetector().getAmplifiers() 

171 

172 # Check that all amps have the same geometry. 

173 ampDims = [amp.getBBox().getDimensions() for amp in amps] 

174 if not all(dim == ampDims[0] for dim in ampDims): 

175 raise RuntimeError("All amps should have the same geometry.") 

176 else: 

177 # The zeroth amp is representative of all amps in the detector. 

178 self.ampDims = ampDims[0] 

179 # Dictionary mapping side numbers to interface lengths. 

180 # See `getAmpAssociations()` for details about sides. 

181 self.interfaceLengthLookupBySide = {i: self.ampDims[i % 2] for i in range(4)} 

182 

183 # Determine amplifier geometry. 

184 ampWidths = {amp.getBBox().getWidth() for amp in amps} 

185 ampHeights = {amp.getBBox().getHeight() for amp in amps} 

186 if len(ampWidths) > 1 or len(ampHeights) > 1: 

187 raise NotImplementedError( 

188 "Amp offset correction is not yet implemented for detectors with differing amp sizes." 

189 ) 

190 

191 # Assuming all the amps have the same geometry. 

192 self.shortAmpSide = np.min(ampDims[0]) 

193 

194 # Check that the edge width and inset are not too large. 

195 if self.config.ampEdgeWidth >= self.shortAmpSide - 2 * self.config.ampEdgeInset: 

196 raise RuntimeError( 

197 f"The edge width ({self.config.ampEdgeWidth}) plus insets ({self.config.ampEdgeInset}) " 

198 f"exceed the amp's short side ({self.shortAmpSide}). This setup leads to incorrect results." 

199 ) 

200 

201 # Fit and subtract background. 

202 if self.config.doBackground: 

203 maskedImage = exp.getMaskedImage() 

204 # Assuming all the detectors are the same. 

205 nX = exp.getWidth() // (self.shortAmpSide * self.config.backgroundFractionSample) + 1 

206 nY = exp.getHeight() // (self.shortAmpSide * self.config.backgroundFractionSample) + 1 

207 # This ensures that the `binSize` is as large as possible, 

208 # preventing background subtraction from inadvertently removing the 

209 # amp offset signature. Here it's set to the shorter dimension of 

210 # the amplifier by default (`backgroundFractionSample` = 1), which 

211 # seems reasonable. 

212 try: 

213 bg = self.background.fitBackground(maskedImage, nx=int(nX), ny=int(nY)) 

214 except TooManyMaskedPixelsError: 

215 self.log.warning( 

216 "Background estimation failed due to too many masked pixels; " 

217 "proceeding without background subtraction." 

218 ) 

219 else: 

220 bgImage = bg.getImageF( 

221 self.background.config.algorithm, 

222 self.background.config.undersampleStyle, 

223 ) 

224 maskedImage -= bgImage 

225 

226 # Detect sources and update cloned exposure mask planes in-place. 

227 if self.config.doDetection: 

228 schema = SourceTable.makeMinimalSchema() 

229 table = SourceTable.make(schema) 

230 # Detection sigma, used for smoothing and to grow detections, is 

231 # normally measured from the PSF of the exposure. As the PSF hasn't 

232 # been measured at this stage of processing, sigma is instead 

233 # set to an approximate value here (which should be sufficient). 

234 _ = self.detection.run(table=table, exposure=exp, sigma=2) 

235 

236 # Set up amp offset inputs. 

237 im = exp.image 

238 im.array[(exp.mask.array & bitMask) > 0] = np.nan 

239 

240 # Obtain association and offset matrices. 

241 associations, sides = self.getAmpAssociations(amps) 

242 ampOffsets, interfaceOffsetDict = self.getAmpOffsets(im, amps, associations, sides) 

243 

244 # Safety check: do any pixels remain for amp offset estimation? 

245 if (exp.mask.array & bitMask).all(): 

246 log_fn = self.log.warning if self.config.doApplyAmpOffset else self.log.info 

247 log_fn( 

248 "All pixels masked: cannot calculate any amp offset corrections. " 

249 "All pedestals are being set to zero." 

250 ) 

251 pedestals = np.zeros(len(amps)) 

252 else: 

253 # If least-squares minimization fails, convert NaNs to zeroes, 

254 # ensuring that no values are erroneously added/subtracted. 

255 pedestals = np.nan_to_num(np.linalg.lstsq(associations, ampOffsets, rcond=None)[0]) 

256 

257 metadata = exposure.getMetadata() # Exposure metadata. 

258 self.metadata["AMPOFFSET_PEDESTALS"] = {} # Task metadata. 

259 ampNames = [amp.getName() for amp in amps] 

260 

261 # Add the amp interface offsets to the exposure metadata. 

262 for interfaceId, interfaceOffset in interfaceOffsetDict.items(): 

263 metadata.set( 

264 f"LSST ISR AMPOFFSET INTERFACEOFFSET {interfaceId}", 

265 float(interfaceOffset), 

266 f"Raw amp interface offset calculated for {interfaceId}", 

267 ) 

268 

269 for ampName, amp, pedestal in zip(ampNames, amps, pedestals): 

270 # Add the amp pedestal to the exposure metadata. 

271 metadata.set( 

272 f"LSST ISR AMPOFFSET PEDESTAL {ampName}", 

273 float(pedestal), 

274 f"Pedestal level calculated for amp {ampName}", 

275 ) 

276 if self.config.doApplyAmpOffset and pedestal != 0.0: 

277 ampIm = exposure.image[amp.getBBox()].array 

278 ampIm -= pedestal 

279 # Add the amp pedestal to the "Task" metadata as well. 

280 # Needed for Sasquatch/Chronograf! 

281 self.metadata["AMPOFFSET_PEDESTALS"][ampName] = float(pedestal) 

282 if self.config.doApplyAmpOffset: 

283 status = "subtracted from exposure" 

284 metadata.set("LSST ISR AMPOFFSET PEDESTAL SUBTRACTED", True, "Amp pedestals have been subtracted") 

285 else: 

286 status = "not subtracted from exposure" 

287 metadata.set( 

288 "LSST ISR AMPOFFSET PEDESTAL SUBTRACTED", False, "Amp pedestals have not been subtracted" 

289 ) 

290 ampPedestalReport = ", ".join( 

291 [f"{ampName}: {ampPedestal:.4f}" for (ampName, ampPedestal) in zip(ampNames, pedestals)] 

292 ) 

293 self.log.info(f"Amp pedestal values ({status}): {ampPedestalReport}") 

294 

295 return Struct(pedestals=pedestals) 

296 

297 def getAmpAssociations(self, amps): 

298 """Determine amp geometry and amp associations from a list of 

299 amplifiers. 

300 

301 Parse an input list of amplifiers to determine the layout of amps 

302 within a detector, and identify all amp sides (i.e., the 

303 horizontal and vertical junctions between amps). 

304 

305 Returns a matrix with a shape corresponding to the geometry of the amps 

306 in the detector. 

307 

308 Parameters 

309 ---------- 

310 amps : `list` [`lsst.afw.cameraGeom.Amplifier`] 

311 List of amplifier objects used to deduce associations. 

312 

313 Returns 

314 ------- 

315 ampAssociations : `numpy.ndarray` 

316 An N x N matrix (N = number of amplifiers) that illustrates the 

317 connections between amplifiers within the detector layout. Each row 

318 and column index corresponds to the ampIds of a specific pair of 

319 amplifiers, and the matrix elements indicate their associations as 

320 follows: 

321 

322 * 0: No association 

323 * -1: Association exists (direction specified in the ampSides 

324 matrix) 

325 * n >= 1: Diagonal elements indicate the number of neighboring 

326 amplifiers for the corresponding ampId==row==column number. 

327 

328 ampSides : `numpy.ndarray` 

329 An N x N matrix (N = the number of amplifiers) representing the amp 

330 side information corresponding to the `ampAssociations` 

331 matrix. The elements are integers defined as below: 

332 

333 * -1: No side due to no association or the same amp (diagonals) 

334 * 0: Side on the bottom 

335 * 1: Side on the right 

336 * 2: Side on the top 

337 * 3: Side on the left 

338 """ 

339 xCenters = [amp.getBBox().getCenterX() for amp in amps] 

340 yCenters = [amp.getBBox().getCenterY() for amp in amps] 

341 xIndices = np.ceil(xCenters / np.min(xCenters) / 2).astype(int) - 1 

342 yIndices = np.ceil(yCenters / np.min(yCenters) / 2).astype(int) - 1 

343 

344 nAmps = len(amps) 

345 ampIds = np.zeros((len(set(yIndices)), len(set(xIndices))), dtype=int) 

346 

347 for ampId, xIndex, yIndex in zip(np.arange(nAmps), xIndices, yIndices): 

348 ampIds[yIndex, xIndex] = ampId 

349 

350 ampAssociations = np.zeros((nAmps, nAmps), dtype=int) 

351 ampSides = np.full_like(ampAssociations, -1) 

352 

353 for ampId in ampIds.ravel(): 

354 neighbors, sides = self.getNeighbors(ampIds, ampId) 

355 interfaceWeights = ( 

356 1 

357 if not self.config.applyWeights 

358 else np.array([self.interfaceLengthLookupBySide[side] for side in sides]) 

359 ) 

360 ampAssociations[ampId, neighbors] = -1 * interfaceWeights 

361 ampSides[ampId, neighbors] = sides 

362 ampAssociations[ampId, ampId] = -ampAssociations[ampId].sum() 

363 

364 if ampAssociations.sum() != 0: 

365 raise RuntimeError("The `ampAssociations` array does not sum to zero.") 

366 

367 if not np.all(ampAssociations == ampAssociations.T): 

368 raise RuntimeError("The `ampAssociations` is not symmetric about the diagonal.") 

369 

370 with np.printoptions(linewidth=200): 

371 self.log.debug("amp associations:\n%s", ampAssociations) 

372 self.log.debug("amp sides:\n%s", ampSides) 

373 

374 return ampAssociations, ampSides 

375 

376 def getNeighbors(self, ampIds, ampId): 

377 """Get the neighbor amplifiers and their sides for a given 

378 amplifier. 

379 

380 Parameters 

381 ---------- 

382 ampIds : `numpy.ndarray` 

383 Matrix with amp side association information. 

384 ampId : `int` 

385 The amplifier ID for which neighbor amplifiers and side IDs 

386 are to be found. 

387 

388 Returns 

389 ------- 

390 neighbors : `list` [`int`] 

391 List of neighbor amplifier IDs. 

392 sides : `list` [`int`] 

393 List of side IDs, with each ID corresponding to its respective 

394 neighbor amplifier. 

395 """ 

396 m, n = ampIds.shape 

397 r, c = np.ravel(np.where(ampIds == ampId)) 

398 neighbors, sides = [], [] 

399 sideLookup = { 

400 0: (r + 1, c), 

401 1: (r, c + 1), 

402 2: (r - 1, c), 

403 3: (r, c - 1), 

404 } 

405 for side, (row, column) in sideLookup.items(): 

406 if 0 <= row < m and 0 <= column < n: 

407 neighbors.append(ampIds[row][column]) 

408 sides.append(side) 

409 return neighbors, sides 

410 

411 def getAmpOffsets(self, im, amps, associations, sides): 

412 """Calculate the amp offsets for all amplifiers. 

413 

414 Parameters 

415 ---------- 

416 im : `lsst.afw.image._image.ImageF` 

417 Amplifier image to extract data from. 

418 amps : `list` [`lsst.afw.cameraGeom.Amplifier`] 

419 List of amplifier objects. 

420 associations : numpy.ndarray 

421 An N x N matrix containing amp association information, where N is 

422 the number of amplifiers. 

423 sides : numpy.ndarray 

424 An N x N matrix containing amp side information, where N is the 

425 number of amplifiers. 

426 

427 Returns 

428 ------- 

429 ampOffsets : `numpy.ndarray` 

430 1D float array containing the calculated amp offsets for all 

431 amplifiers (optionally weighted by interface length). 

432 interfaceOffsetDict : `dict` [`str`, `float`] 

433 Dictionary mapping interface IDs to their corresponding raw 

434 (uncapped) offset values. 

435 """ 

436 ampOffsets = np.zeros(len(amps)) 

437 ampsEdges = self.getAmpEdges(im, amps, sides) 

438 ampsNames = [amp.getName() for amp in amps] 

439 interfaceOffsetLookup = {} 

440 

441 interfaceIds = [] 

442 interfaceOffsetOriginals = [] 

443 ampEdgeGoodFracs = [] 

444 minFracFails = [] 

445 maxOffsetFails = [] 

446 for ampId, ampAssociations in enumerate(associations): 

447 ampNeighbors = np.ravel(np.where(ampAssociations < 0)) 

448 for ampNeighbor in ampNeighbors: 

449 ampSide = sides[ampId][ampNeighbor] 

450 interfaceWeight = ( 

451 1 if not self.config.applyWeights else self.interfaceLengthLookupBySide[ampSide] 

452 ) 

453 edgeA = ampsEdges[ampId][ampSide] 

454 edgeB = ampsEdges[ampNeighbor][(ampSide + 2) % 4] 

455 if ampId < ampNeighbor: 

456 ( 

457 interfaceId, 

458 interfaceOffset, 

459 interfaceOffsetOriginal, 

460 ampEdgeGoodFrac, 

461 minFracFail, 

462 maxOffsetFail, 

463 ) = self.getInterfaceOffset(ampsNames[ampId], ampsNames[ampNeighbor], edgeA, edgeB) 

464 interfaceIds.append(interfaceId) 

465 interfaceOffsetOriginals.append(interfaceOffsetOriginal) 

466 ampEdgeGoodFracs.append(ampEdgeGoodFrac) 

467 minFracFails.append(minFracFail) 

468 maxOffsetFails.append(maxOffsetFail) 

469 interfaceOffsetLookup[f"{ampId:02d}:{ampNeighbor:02d}"] = interfaceOffset 

470 else: 

471 interfaceOffset = -interfaceOffsetLookup[f"{ampNeighbor:02d}:{ampId:02d}"] 

472 ampOffsets[ampId] += interfaceWeight * interfaceOffset 

473 if interfaceOffsetOriginals: 

474 self.log.debug( 

475 "Raw (uncapped) amp offset values for all interfaces: %s", 

476 ", ".join( 

477 [ 

478 f"{interfaceId}={interfaceOffset:0.2f}" 

479 for interfaceId, interfaceOffset in zip(interfaceIds, interfaceOffsetOriginals) 

480 ] 

481 ), 

482 ) 

483 quartile_summary = np.nanpercentile(interfaceOffsetOriginals, [0, 25, 50, 75, 100]) 

484 self.log.info( 

485 "Raw amp offset quartile summary for all interfaces (min, Q1, Q2, Q3, max): " 

486 "%.4f, %.4f, %.4f, %.4f, %.4f", 

487 *quartile_summary, 

488 ) 

489 log_fn = self.log.warning if self.config.doApplyAmpOffset else self.log.info 

490 if any(minFracFails): 

491 log_fn( 

492 "The fraction of unmasked edge pixels for the following amp interfaces is below the " 

493 "configured threshold (%s): %s", 

494 self.config.ampEdgeMinFrac, 

495 ", ".join( 

496 [ 

497 f"{interfaceId} ({ampEdgeGoodFrac:0.2f})" 

498 for interfaceId, ampEdgeGoodFrac, minFracFail in zip( 

499 interfaceIds, ampEdgeGoodFracs, minFracFails 

500 ) 

501 if minFracFail 

502 ] 

503 ), 

504 ) 

505 if any(maxOffsetFails): 

506 log_fn( 

507 "Absolute amp offsets exceed the configured maximum (%s) and have been set to zero for the " 

508 "following amp interfaces: %s", 

509 self.config.ampEdgeMaxOffset, 

510 ", ".join( 

511 [ 

512 f"{interfaceId}={np.abs(interfaceOffset):0.2f}" 

513 for interfaceId, interfaceOffset, maxOffsetFail in zip( 

514 interfaceIds, interfaceOffsetOriginals, maxOffsetFails 

515 ) 

516 if maxOffsetFail 

517 ] 

518 ), 

519 ) 

520 

521 # Pair each interface ID with its corresponding original offset. 

522 interfaceOffsetDict = dict(zip(interfaceIds, interfaceOffsetOriginals)) 

523 

524 return ampOffsets, interfaceOffsetDict 

525 

526 def getAmpEdges(self, im, amps, ampSides): 

527 """Calculate the amp edges for all amplifiers. 

528 

529 Parameters 

530 ---------- 

531 im : `lsst.afw.image._image.ImageF` 

532 Amplifier image to extract data from. 

533 amps : `list` [`lsst.afw.cameraGeom.Amplifier`] 

534 List of amplifier objects. 

535 ampSides : `numpy.ndarray` 

536 An N x N matrix containing amp side information, where N is the 

537 number of amplifiers. 

538 

539 Returns 

540 ------- 

541 ampEdges : `dict` [`int`, `dict` [`int`, `numpy.ndarray`]] 

542 A dictionary containing amp edge(s) for each amplifier, 

543 corresponding to one or more potential sides, where each edge is 

544 associated with a side. The outer dictionary has integer keys 

545 representing amplifier IDs, and the inner dictionary has integer 

546 keys representing side IDs for each amplifier and values that are 

547 1D arrays of floats representing the median values of strips from 

548 the amp image, referred to as an "amp edge": 

549 {ampID: {sideID: numpy.ndarray}, ...} 

550 """ 

551 ampEdgeOuter = self.config.ampEdgeInset + self.config.ampEdgeWidth 

552 ampEdges = {} 

553 slice_map = { 

554 0: (slice(-ampEdgeOuter, -self.config.ampEdgeInset), slice(None)), 

555 1: (slice(None), slice(-ampEdgeOuter, -self.config.ampEdgeInset)), 

556 2: (slice(self.config.ampEdgeInset, ampEdgeOuter), slice(None)), 

557 3: (slice(None), slice(self.config.ampEdgeInset, ampEdgeOuter)), 

558 } 

559 for ampId, (amp, ampSides) in enumerate(zip(amps, ampSides)): 

560 ampEdges[ampId] = {} 

561 ampIm = im[amp.getBBox()].array 

562 # Loop over identified sides. 

563 for ampSide in ampSides: 

564 if ampSide < 0: 

565 continue 

566 strip = ampIm[slice_map[ampSide]] 

567 # Catch warnings to prevent all-NaN slice RuntimeWarning. 

568 with warnings.catch_warnings(): 

569 warnings.filterwarnings("ignore", r"All-NaN (slice|axis) encountered") 

570 ampEdges[ampId][ampSide] = np.nanmedian(strip, axis=ampSide % 2) # 1D median strip 

571 return ampEdges 

572 

573 def getInterfaceOffset(self, ampNameA, ampNameB, edgeA, edgeB): 

574 """Calculate the amp offset for a given interface between two 

575 amplifiers. 

576 

577 Parameters 

578 ---------- 

579 ampNameA : str 

580 Name of the first amplifier. 

581 ampNameB : str 

582 Name of the second amplifier. 

583 edgeA : numpy.ndarray 

584 Amp edge for the first amplifier. 

585 edgeB : numpy.ndarray 

586 Amp edge for the second amplifier. 

587 

588 Returns 

589 ------- 

590 interfaceId : str 

591 Identifier for the interface between amps A and B, formatted as 

592 "A-B", where A and B are the respective amp names. 

593 interfaceOffset : float 

594 The calculated amp offset value for the given interface between 

595 amps A and B. 

596 interfaceOffsetOriginal : float 

597 The original calculated amp offset value for the given interface 

598 between amps A and B. 

599 ampEdgeGoodFrac : float 

600 Fraction of viable pixel rows along the amp edge. 

601 minFracFail : bool 

602 True if the fraction of unmasked pixel rows is below the 

603 ampEdgeMinFrac threshold. 

604 maxOffsetFail : bool 

605 True if the absolute offset value exceeds the ampEdgeMaxOffset 

606 threshold. 

607 """ 

608 interfaceId = f"{ampNameA}-{ampNameB}" 

609 if np.all(np.isnan(edgeA)) or np.all(np.isnan(edgeB)): 

610 self.log.warning(f"Amp interface {interfaceId} is fully masked; setting amp offset to zero.") 

611 return interfaceId, 0.0, 0.0, 0.0, True, False 

612 

613 sctrl = StatisticsControl() 

614 # NOTE: Taking the difference with the order below fixes the sign flip 

615 # in the B matrix. 

616 edgeDiff = edgeA - edgeB 

617 if self.config.doWindowSmoothing: 

618 # Compute rolling averages. 

619 window = int(self.config.ampEdgeWindowFrac * len(edgeDiff)) 

620 edgeDiffSum = np.convolve(np.nan_to_num(edgeDiff), np.ones(window), "same") 

621 edgeDiffNum = np.convolve(~np.isnan(edgeDiff), np.ones(window), "same") 

622 edgeDiffAvg = edgeDiffSum / np.clip(edgeDiffNum, 1, None) 

623 else: 

624 # Directly use the difference. 

625 edgeDiffAvg = edgeDiff.copy() 

626 edgeDiffAvg[np.isnan(edgeDiff)] = np.nan 

627 # Take clipped mean of rolling average data as amp offset value. 

628 interfaceOffset = makeStatistics(edgeDiffAvg, MEANCLIP, sctrl).getValue() 

629 interfaceOffsetOriginal = interfaceOffset 

630 ampEdgeGoodFrac = 1 - (np.sum(np.isnan(edgeDiffAvg)) / len(edgeDiffAvg)) 

631 

632 # Perform a couple of do-no-harm safety checks: 

633 # a) The fraction of unmasked pixel rows is > ampEdgeMinFrac, 

634 # b) The absolute offset ADU value is < ampEdgeMaxOffset. 

635 minFracFail = ampEdgeGoodFrac < self.config.ampEdgeMinFrac 

636 maxOffsetFail = np.abs(interfaceOffset) > self.config.ampEdgeMaxOffset 

637 if minFracFail or maxOffsetFail: 

638 interfaceOffset = 0 

639 self.log.debug( 

640 f"amp interface '{interfaceId}': " 

641 f"viable edge difference frac = {ampEdgeGoodFrac:.2f}, " 

642 f"amp offset = {interfaceOffsetOriginal:.3f}" 

643 ) 

644 return ( 

645 interfaceId, 

646 interfaceOffset, 

647 interfaceOffsetOriginal, 

648 ampEdgeGoodFrac, 

649 minFracFail, 

650 maxOffsetFail, 

651 )