Coverage for python / lsst / cp / pipe / cpQuadNotch.py: 0%

231 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:59 +0000

1# This file is part of cp_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21import numpy as np 

22 

23import lsst.afw.detection as afwDetect 

24import lsst.pex.config as pexConfig 

25import lsst.pipe.base as pipeBase 

26import lsst.pipe.base.connectionTypes as cT 

27 

28from astropy.stats import sigma_clipped_stats 

29from astropy.table import Table 

30from scipy.optimize import curve_fit 

31 

32 

33__all__ = ["QuadNotchExtractConfig", "QuadNotchExtractTask", 

34 "QuadNotchMergeConfig", "QuadNotchMergeTask"] 

35 

36 

37class QuadNotchExtractConnections(pipeBase.PipelineTaskConnections, 

38 dimensions=("instrument", "exposure", "detector")): 

39 inputExp = cT.Input( 

40 name="post_isr_quadnotch", 

41 doc="Input ISR-processed exposures to combine.", 

42 storageClass="Exposure", 

43 dimensions=("instrument", "exposure", "detector"), 

44 multiple=False, 

45 deferLoad=False, 

46 ) 

47 camera = cT.PrerequisiteInput( 

48 name="camera", 

49 storageClass="Camera", 

50 doc="Input camera to construct complete exposures.", 

51 dimensions=["instrument"], 

52 isCalibration=True, 

53 ) 

54 

55 outputData = cT.Output( 

56 name="quadNotchSingle", 

57 doc="Output quad-notch analysis.", 

58 storageClass="ArrowAstropy", 

59 dimensions=("instrument", "exposure", "detector"), 

60 ) 

61 

62 

63class QuadNotchExtractConfig(pipeBase.PipelineTaskConfig, 

64 pipelineConnections=QuadNotchExtractConnections): 

65 """Configuration for quad-notch processing.""" 

66 nSigma = pexConfig.Field( 

67 dtype=float, 

68 default=2.0, 

69 doc="Significance of detected objects.", 

70 ) 

71 nPixMin = pexConfig.Field( 

72 dtype=int, 

73 default=5, 

74 doc="Minimum area for a detected object.", 

75 ) 

76 detectionType = pexConfig.ChoiceField( 

77 dtype=str, 

78 doc="Which kind of threshold to use for detection.", 

79 default="VALUE", 

80 allowed={ 

81 "STDEV": "afwDetect.STDEV", 

82 "VALUE": "afwDetect.VALUE, using manual STDEV calculation.", 

83 }, 

84 ) 

85 grow = pexConfig.Field( 

86 dtype=int, 

87 default=0, 

88 doc="If non-zero, grow footprints by this amount.", 

89 ) 

90 xWindow = pexConfig.Field( 

91 dtype=int, 

92 default=0, 

93 doc="Window around the central object detected that we will " 

94 "look for the quad fluxes in.", 

95 ) 

96 yWindow = pexConfig.Field( 

97 dtype=int, 

98 default=50, 

99 doc="Additional widening along the y-axis." 

100 ) 

101 xGauge = pexConfig.Field( 

102 dtype=float, 

103 default=1.75, 

104 doc="Scale multiplier of the FWHM in the x-direction for each band.", 

105 ) 

106 threshold = pexConfig.Field( 

107 dtype=float, 

108 default=1.2e5, 

109 doc="Threshold above which fluxes are measured.", 

110 ) 

111 targetReplacements = pexConfig.DictField( 

112 keytype=str, 

113 itemtype=str, 

114 doc="Dictionary of target names with replacements; Any not specified will not be changed.", 

115 default={ 

116 "MU-COL": "HD38666" 

117 } 

118 ) 

119 

120 

121class QuadNotchExtractTask(pipeBase.PipelineTask): 

122 """Task to measure quad-notch data.""" 

123 

124 ConfigClass = QuadNotchExtractConfig 

125 _DefaultName = "quadNotchExtract" 

126 

127 FLAG_SUCCESS = 0x0 

128 FLAG_UNKNOWN_ERROR = 0x1 

129 FLAG_NO_FOOTPRINT_FOUND = 0x2 

130 

131 def __init__(self, **kwargs): 

132 super().__init__(**kwargs) 

133 

134 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

135 inputs = butlerQC.get(inputRefs) 

136 

137 inputs["inputDims"] = dict(inputRefs.inputExp.dataId.required) 

138 

139 outputs = self.run(**inputs) 

140 butlerQC.put(outputs, outputRefs) 

141 

142 def run(self, inputExp, camera, inputDims): 

143 """Quadnotch extraction task. 

144 

145 Parameters 

146 ---------- 

147 inputExp : `lsst.afw.image.Exposure` 

148 Input exposure to do analysis on. 

149 camera : `lsst.afw.cameraGeom.Camera` 

150 Camera geometry to use. 

151 inputDims: `dict` 

152 Dictionary of input dimensions. 

153 

154 Returns 

155 ------- 

156 outputData : `astropy.table.Table` 

157 Extracted data for this exposure. 

158 """ 

159 row = {} 

160 detector = inputExp.getDetector() 

161 

162 # Get visitInfo based values: 

163 row['airmass'] = inputExp.visitInfo.boresightAirmass 

164 row['azimuth'] = inputExp.visitInfo.boresightAzAlt[0].asDegrees() 

165 row['altitude'] = inputExp.visitInfo.boresightAzAlt[1].asDegrees() 

166 # Cast to isoformat so we don't try to persist an object 

167 # in the output table. 

168 row['date'] = inputExp.visitInfo.date.toPython().isoformat() 

169 row['hourangle'] = inputExp.visitInfo.boresightHourAngle.asDegrees() 

170 row['expTime'] = inputExp.visitInfo.exposureTime 

171 row['target'] = inputExp.visitInfo.object 

172 if row['target'] in self.config.targetReplacements.keys(): 

173 row['target'] = self.config.targetReplacments[row['target']] 

174 

175 # These can be gotten from the butler, but are also in the header. 

176 metadata = inputExp.getMetadata() 

177 row['day_obs'] = int(metadata.get("DAYOBS", 0)) 

178 row['exposureId'] = inputDims['exposure'] 

179 

180 # This can be retrieved from consDB, but not really, because 

181 # we're not supposed to access consDB in pipetasks. 

182 row['mount_jitter'] = np.nan 

183 

184 # Also from visitInfo, but this is where things start to happen. 

185 target_amp = 3 

186 centered = False 

187 

188 match inputExp.visitInfo.observationReason: 

189 case "x_offset_50": 

190 target_amp = 2 

191 centered = True 

192 case "x_offset_-50": 

193 target_amp = 4 

194 centered = True 

195 case "x_offset_0": 

196 target_amp = 3 

197 centered = True 

198 case _: 

199 pass 

200 

201 row['target_amp'] = target_amp 

202 row['centered'] = centered 

203 row['flag'] = self.FLAG_SUCCESS 

204 

205 bbox = detector[target_amp].getBBox() 

206 # ampImage = inputExp[bbox] # unused error 

207 # I think this gets returned on failure? 

208 

209 fpSet = self.findObjects(inputExp) 

210 if len(fpSet.getFootprints()) == 0: 

211 rowAddendum = self._returnFailure(self.FLAG_NO_FOOTPRINT_FOUND) 

212 else: 

213 starCentroid, centerX = self.getCutoutLocation(inputExp, fpSet) 

214 

215 # If we failed to find a center earlier, find one now. 

216 # This does not handle the case where the commanded offset 

217 # resulted in a bad exposure. 

218 while centered is False: 

219 if centerX < bbox.minX: 

220 target_amp -= 1 

221 bbox = detector[target_amp].getBBox() 

222 row['target_amp'] = target_amp 

223 elif centerX > bbox.maxX: 

224 target_amp += 1 

225 bbox = detector[target_amp].getBBox() 

226 row['target_amp'] = target_amp 

227 else: 

228 centered = True 

229 

230 cutout = inputExp[bbox] 

231 

232 rowAddendum = self.getAdvancedFluxes(cutout) 

233 

234 # Combine base row with updates 

235 row.update(rowAddendum) 

236 outputTable = Table([row]) 

237 

238 return pipeBase.Struct( 

239 outputData=outputTable, 

240 ) 

241 

242 @staticmethod 

243 def _returnFailure(flag): 

244 row = {} 

245 row['flux'] = [np.nan, np.nan, np.nan, np.nan] 

246 row['percentiles'] = np.zeros((4, 41)) 

247 row['centroids'] = [(0, 0), (0, 0), (0, 0), (0, 0)] 

248 row['background'] = np.nan 

249 row['flag'] = flag 

250 row['fwhm'] = [0, 0, 0, 0] 

251 row['counts_above_threshold'] = [0, 0, 0, 0] 

252 # This doesn't return everything it should 

253 return row 

254 

255 def findObjects(self, exposure): 

256 """Quick detection method. 

257 

258 Parameters 

259 ---------- 

260 exposure : `lsst.afw.image.Exposure` 

261 The exposure to detect objects in. 

262 

263 Returns 

264 ------- 

265 footprints : `lsst.afw.detection.FootprintSet` 

266 Footprints of detections 

267 

268 Notes 

269 ----- 

270 This comes from summit_utils/utils.py/detectObjectsInExp. 

271 """ 

272 exposureCopy = exposure.clone() 

273 median = np.nanmedian(exposureCopy.image.array) 

274 exposureCopy.image -= median 

275 

276 if self.config.detectionType == "VALUE": 

277 stdev = np.nanstd(exposureCopy.image.array) 

278 threshold = afwDetect.Threshold(self.config.nSigma * stdev, 

279 afwDetect.Threshold.VALUE) 

280 else: 

281 threshold = afwDetect.Threshold(self.config.nSigma, 

282 afwDetect.Threshold.STDEV) 

283 

284 footPrintSet = afwDetect.FootprintSet(exposureCopy.getMaskedImage(), 

285 threshold, 

286 "DETECTED", 

287 self.config.nPixMin) 

288 if self.config.grow > 0: 

289 isotropic = True 

290 footPrintSet = afwDetect.FootprintSet(footPrintSet, self.config.grow, isotropic) 

291 

292 return footPrintSet 

293 

294 def getCutoutLocation(self, inputExp, fpSet): 

295 """Search footprint list for the brightest object. 

296 

297 Parameters 

298 ---------- 

299 inputExp : `lsst.afw.image.Exposure` 

300 Exposure to study. 

301 fpSet : `lsst.afw.detection.FootprintSet` 

302 List of detected footprints. 

303 

304 Returns 

305 ------- 

306 centroid : `lsst.geom.Point2D` 

307 The centroid of the brightest object. 

308 centerX : `float` 

309 X-axis coordinate of the center of the object. 

310 """ 

311 median = np.nanmedian(inputExp.image.array) 

312 centerOfMass_numerator = 0 

313 centerOfMass_denominator = 0 

314 

315 fluxes = [] 

316 centroids = [] 

317 for fp in fpSet.getFootprints(): 

318 if fp.getArea() < self.config.nPixMin: 

319 continue 

320 centroid = fp.getCentroid() 

321 height = fp.getBBox().height 

322 flux = fp.getSpans().flatten(inputExp.image.array - median).sum() 

323 centerOfMass_numerator += centroid[0]*flux*height 

324 centerOfMass_denominator += flux*height 

325 

326 fluxes.append(flux) 

327 centroids.append(centroid) 

328 

329 # Take the largest flux centroid as the central star. This 

330 # should probably have shape checks. 

331 starCentroidIndex = np.array(fluxes).argmax() 

332 starCentroid = centroids[starCentroidIndex] 

333 centerX = centerOfMass_numerator / centerOfMass_denominator 

334 

335 return starCentroid, centerX 

336 

337 def getAdvancedFluxes(self, exp): 

338 """Measure the fluxes within the expected four spectral bandpasses. 

339 

340 Parameters 

341 ---------- 

342 exp : `lsst.afw.image.Exposure` 

343 Single amp cutout to consider. 

344 

345 Returns 

346 ------- 

347 row : `dict` 

348 Addendum to the output row. 

349 """ 

350 # This method removes the plots that were in the original. 

351 row = {} 

352 

353 fpSet = self.findObjects(exp) 

354 if (fpLen := len(fpSet.getFootprints())) < 4: 

355 if fpLen == 0: 

356 return self._returnFailure(self.FLAG_NO_FOOTPRINT_FOUND) 

357 else: 

358 return self._returnFailure(self.FLAG_UNKNOWN_ERROR) 

359 

360 centroids = [] 

361 bboxes = [] 

362 for fp in fpSet.getFootprints(): 

363 centroid = fp.getCentroid() 

364 ampBBox = exp.getBBox() 

365 # TODO: DM-52259 This should be configurable 

366 if centroid[1] < ampBBox.getMaxY() - 100: 

367 centroids.append(centroid) 

368 bboxes.append(fp.getBBox()) 

369 if len(bboxes) < 4: 

370 return self._returnFailure(self.FLAG_UNKNOWN_ERROR) 

371 

372 center_guess_x = np.array(centroids).T[0] - exp.image.getX0() 

373 left = np.zeros(4) 

374 right = np.zeros(4) 

375 for idx in range(4): 

376 left[idx] = bboxes[idx].minY - exp.image.getY0() 

377 right[idx] = bboxes[idx].maxY - exp.image.getY0() 

378 

379 bin_edges = [int(left[0] - 2*self.config.yWindow), 

380 int(left[1] + right[0]) // 2, 

381 int(left[2] + right[1]) // 2, 

382 int(left[3] + right[2]) // 2, 

383 int(right[3] + self.config.yWindow)] 

384 

385 # now find midpoint in y, estimate midpoint for x 

386 x_centers = np.zeros(4, dtype=int) 

387 x_min = np.zeros(4, dtype=int) 

388 x_max = np.zeros(4, dtype=int) 

389 

390 alt_centroids = [] 

391 fwhms = np.zeros(4, dtype=int) 

392 fit_parameters = [] 

393 

394 for idx in range(4): 

395 mid_y = int((bin_edges[idx + 1] - bin_edges[idx])/2. + bin_edges[idx]) 

396 # TODO: DM-52259 This should be configurable 

397 x_data = np.median(exp.image.array[mid_y - 5:mid_y + 5, :], axis=0) 

398 alt_x_vals = np.arange(len(x_data)) 

399 

400 x_norm = (x_data - x_data.min())/(x_data.max() - x_data.min()) 

401 

402 theta, covariance = curve_fit(self._d_gaussian, 

403 alt_x_vals, 

404 x_norm, 

405 p0=[0.5, center_guess_x[idx], 3, 20], 

406 bounds=(0, [1, 400, 25, 100]), 

407 max_nfev=10_000) 

408 fit_parameters.append(theta) 

409 x_centers[idx] = int(theta[1]) 

410 alt_centroids.append([x_centers[idx], mid_y]) 

411 if self.config.xWindow == 0: 

412 scale = 2*np.sqrt(2*np.log(2)) 

413 fwhm_1 = int(scale * np.abs(theta[2])) 

414 fwhm_2 = int(scale * np.abs(theta[3])) 

415 fwhm = int(theta[0]*fwhm_1 + 2*(1-theta[0])*fwhm_2) 

416 

417 fwhms[idx] = 2*int(self.config.xGauge*fwhm) + 20 

418 

419 if self.config.xWindow != 0: 

420 fwhms = self.config.xWindow*np.ones(4, dtype=int) 

421 max_width = np.max(fwhms) 

422 

423 x_min = (x_centers - int(max_width/2)) 

424 x_max = (x_centers + int(max_width/2)) 

425 

426 # Use center to get both data and background boxes: 

427 flag = self.FLAG_SUCCESS 

428 flux = [] 

429 background = [] 

430 percentiles = [] 

431 counts_above_threshold = [] 

432 ranks = np.arange(60, 101) 

433 

434 for idx in range(4): 

435 # Background first: 

436 # TODO: DM-52259 This should be configurable 

437 left = exp.image.array[bin_edges[idx]:bin_edges[idx + 1], 

438 x_min[idx] - 10:x_min[idx]] 

439 right = exp.image.array[bin_edges[idx]:bin_edges[idx + 1], 

440 x_max[idx]: x_max[idx] + 10] 

441 background_box = np.concatenate((left, right), axis=1) 

442 _, background_vec, _ = sigma_clipped_stats(background_box, axis=1) 

443 

444 # Define aperture box: 

445 signal_box = exp.image.array[bin_edges[idx]:bin_edges[idx + 1], 

446 x_min[idx]:x_max[idx]] 

447 corrected = signal_box - background_vec[:, np.newaxis] 

448 

449 background.append(np.mean(background_vec)) 

450 flux.append(np.sum(corrected)) 

451 counts_above_threshold.append(np.sum(corrected > self.config.threshold)) 

452 

453 if bin_edges[0] <= 0 or signal_box.size == 0: 

454 percentiles.append(np.zeros_like(ranks)) 

455 flag = self.FLAG_UNKNOWN_ERROR | self.FLAG_NO_FOOTPRINT_FOUND 

456 else: 

457 percentiles.append(np.percentile(corrected, ranks)) 

458 

459 row['flux'] = flux 

460 row['percentiles'] = percentiles 

461 row['centroids'] = alt_centroids 

462 row['background'] = np.mean(np.array(background)) # This is mean of means of bkg_vec. 

463 row['flag'] = flag 

464 row['fwhm'] = fwhms 

465 row['counts_above_threshold'] = counts_above_threshold 

466 

467 return row 

468 

469 @staticmethod 

470 def _d_gaussian(x, a, mean, sigma_1, sigma_2): 

471 """Double Gaussian function. 

472 

473 Parameters 

474 ---------- 

475 x : `float` or `np.array` 

476 Position to evalueate the double Gaussian. 

477 a : `float` 

478 Amplitude of the first component. The second component is 1-a. 

479 mean : `mean` 

480 Mean for both components of the double Gaussian. 

481 sigma_1 : `float` 

482 Standard deviation for the first component. 

483 sigma_2 : `float` 

484 Standard deviation for the second component. 

485 

486 Returns 

487 ------- 

488 value : `float` or `np.array` 

489 The double Gaussian value. 

490 """ 

491 return a*np.exp(-(x-mean)**2/(2*sigma_1**2)) + (1-a)*np.exp(-(x-mean)**2/(2*sigma_2**2)) 

492 

493 

494class QuadNotchMergeConnections(pipeBase.PipelineTaskConnections, 

495 dimensions=("instrument", "detector")): 

496 inputData = cT.Input( 

497 name="quadNotchSingle", 

498 doc="Quad-notch measurements from individual exposures.", 

499 storageClass="ArrowAstropy", 

500 dimensions=("instrument", "exposure", "detector"), 

501 multiple=True, 

502 deferLoad=True, 

503 ) 

504 outputData = cT.Output( 

505 name="quadNotchCombined", 

506 doc="Output combined quad-notch analysis.", 

507 storageClass="ArrowAstropy", 

508 dimensions=("instrument", "detector"), 

509 ) 

510 

511 

512class QuadNotchMergeConfig(pipeBase.PipelineTaskConfig, 

513 pipelineConnections=QuadNotchMergeConnections): 

514 """Configuration for quad-notch processing.""" 

515 nSigma = pexConfig.Field( 

516 dtype=float, 

517 default=2.0, 

518 doc="This is a dummy parameter that isn't used, but I didn't want no config here.", 

519 ) 

520 

521 

522class QuadNotchMergeTask(pipeBase.PipelineTask): 

523 """Task to measure quad-notch data.""" 

524 

525 ConfigClass = QuadNotchMergeConfig 

526 _DefaultName = "quadNotchMerge" 

527 

528 def __init__(self, **kwargs): 

529 super().__init__(**kwargs) 

530 

531 def run(self, inputData): 

532 """ 

533 """ 

534 rows = [] 

535 

536 for dataset in inputData: 

537 data = dataset.get() 

538 rows.append(data[0]) 

539 

540 outputTable = Table(rows) 

541 return pipeBase.Struct( 

542 outputData=outputTable, 

543 )