Coverage for python / lsst / cp / pipe / cpQuadNotch.py: 0%

224 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:22 +0000

1# This file is part of cp_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21import numpy as np 

22 

23import lsst.afw.detection as afwDetect 

24import lsst.pex.config as pexConfig 

25import lsst.pipe.base as pipeBase 

26import lsst.pipe.base.connectionTypes as cT 

27 

28from astropy.stats import sigma_clipped_stats 

29from astropy.table import Table 

30from scipy.optimize import curve_fit 

31 

32 

33__all__ = ["QuadNotchExtractConfig", "QuadNotchExtractTask", 

34 "QuadNotchMergeConfig", "QuadNotchMergeTask"] 

35 

36 

37class QuadNotchExtractConnections(pipeBase.PipelineTaskConnections, 

38 dimensions=("instrument", "exposure", "detector")): 

39 inputExp = cT.Input( 

40 name="post_isr_quadnotch", 

41 doc="Input ISR-processed exposures to combine.", 

42 storageClass="Exposure", 

43 dimensions=("instrument", "exposure", "detector"), 

44 multiple=False, 

45 deferLoad=False, 

46 ) 

47 camera = cT.PrerequisiteInput( 

48 name="camera", 

49 storageClass="Camera", 

50 doc="Input camera to construct complete exposures.", 

51 dimensions=["instrument"], 

52 isCalibration=True, 

53 ) 

54 

55 outputData = cT.Output( 

56 name="quadNotchSingle", 

57 doc="Output quad-notch analysis.", 

58 storageClass="ArrowAstropy", 

59 dimensions=("instrument", "exposure", "detector"), 

60 ) 

61 

62 

63class QuadNotchExtractConfig(pipeBase.PipelineTaskConfig, 

64 pipelineConnections=QuadNotchExtractConnections): 

65 """Configuration for quad-notch processing.""" 

66 nSigma = pexConfig.Field( 

67 dtype=float, 

68 default=2.0, 

69 doc="Significance of detected objects.", 

70 ) 

71 nPixMin = pexConfig.Field( 

72 dtype=int, 

73 default=5, 

74 doc="Minimum area for a detected object.", 

75 ) 

76 grow = pexConfig.Field( 

77 dtype=int, 

78 default=0, 

79 doc="If non-zero, grow footprints by this amount.", 

80 ) 

81 xWindow = pexConfig.Field( 

82 dtype=int, 

83 default=0, 

84 doc="Window around the central object detected that we will " 

85 "look for the quad fluxes in.", 

86 ) 

87 yWindow = pexConfig.Field( 

88 dtype=int, 

89 default=50, 

90 doc="Additional widening along the y-axis." 

91 ) 

92 xGauge = pexConfig.Field( 

93 dtype=float, 

94 default=1.75, 

95 doc="Scale multiplier of the FWHM in the x-direction for each band.", 

96 ) 

97 threshold = pexConfig.Field( 

98 dtype=float, 

99 default=1.2e5, 

100 doc="Threshold above which fluxes are measured.", 

101 ) 

102 targetReplacements = pexConfig.DictField( 

103 keytype=str, 

104 itemtype=str, 

105 doc="Dictionary of target names with replacements; Any not specified will not be changed.", 

106 default={ 

107 "MU-COL": "HD38666" 

108 } 

109 ) 

110 

111 

112class QuadNotchExtractTask(pipeBase.PipelineTask): 

113 """Task to measure quad-notch data.""" 

114 

115 ConfigClass = QuadNotchExtractConfig 

116 _DefaultName = "quadNotchExtract" 

117 

118 FLAG_SUCCESS = 0x0 

119 FLAG_UNKNOWN_ERROR = 0x1 

120 FLAG_NO_FOOTPRINT_FOUND = 0x2 

121 

122 def __init__(self, **kwargs): 

123 super().__init__(**kwargs) 

124 

125 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

126 inputs = butlerQC.get(inputRefs) 

127 

128 inputs["inputDims"] = dict(inputRefs.inputExp.dataId.required) 

129 

130 outputs = self.run(**inputs) 

131 butlerQC.put(outputs, outputRefs) 

132 

133 def run(self, inputExp, camera, inputDims): 

134 """Quadnotch extraction task. 

135 

136 Parameters 

137 ---------- 

138 inputExp : `lsst.afw.image.Exposure` 

139 Input exposure to do analysis on. 

140 camera : `lsst.afw.cameraGeom.Camera` 

141 Camera geometry to use. 

142 inputDims: `dict` 

143 Dictionary of input dimensions. 

144 

145 Returns 

146 ------- 

147 outputData : `astropy.table.Table` 

148 Extracted data for this exposure. 

149 """ 

150 row = {} 

151 detector = inputExp.getDetector() 

152 

153 # Get visitInfo based values: 

154 row['airmass'] = inputExp.visitInfo.boresightAirmass 

155 row['azimuth'] = inputExp.visitInfo.boresightAzAlt[0].asDegrees() 

156 row['altitude'] = inputExp.visitInfo.boresightAzAlt[1].asDegrees() 

157 # Cast to isoformat so we don't try to persist an object 

158 # in the output table. 

159 row['date'] = inputExp.visitInfo.date.toPython().isoformat() 

160 row['hourangle'] = inputExp.visitInfo.boresightHourAngle.asDegrees() 

161 row['expTime'] = inputExp.visitInfo.exposureTime 

162 row['target'] = inputExp.visitInfo.object 

163 if row['target'] in self.config.targetReplacements.keys(): 

164 row['target'] = self.config.targetReplacments[row['target']] 

165 

166 # These can be gotten from the butler, but are also in the header. 

167 metadata = inputExp.getMetadata() 

168 row['day_obs'] = int(metadata.get("DAYOBS", 0)) 

169 row['exposureId'] = inputDims['exposure'] 

170 

171 # This can be retrieved from consDB, but not really, because 

172 # we're not supposed to access consDB in pipetasks. 

173 row['mount_jitter'] = np.nan 

174 

175 # Also from visitInfo, but this is where things start to happen. 

176 target_amp = 3 

177 centered = False 

178 

179 match inputExp.visitInfo.observationReason: 

180 case "x_offset_50": 

181 target_amp = 2 

182 centered = True 

183 case "x_offset_-50": 

184 target_amp = 4 

185 centered = True 

186 case "x_offset_0": 

187 target_amp = 3 

188 centered = True 

189 case _: 

190 pass 

191 

192 row['target_amp'] = target_amp 

193 row['centered'] = centered 

194 row['flags'] = self.FLAG_SUCCESS 

195 

196 bbox = detector[target_amp].getBBox() 

197 # ampImage = inputExp[bbox] # unused error 

198 # I think this gets returned on failure? 

199 

200 fpSet = self.findObjects(inputExp) 

201 if len(fpSet.getFootprints()) == 0: 

202 rowAddendum = self._returnFailure(self.FLAG_NO_FOOTPRINT_FOUND) 

203 else: 

204 starCentroid, centerX = self.getCutoutLocation(inputExp, fpSet) 

205 

206 # If we failed to find a center earlier, find one now. 

207 # This does not handle the case where the commanded offset 

208 # resulted in a bad exposure. 

209 while centered is False: 

210 if centerX < bbox.minX: 

211 target_amp -= 1 

212 bbox = detector[target_amp].getBBox() 

213 row['target_amp'] = target_amp 

214 elif centerX > bbox.maxX: 

215 target_amp += 1 

216 bbox = detector[target_amp].getBBox() 

217 row['target_amp'] = target_amp 

218 else: 

219 centered = True 

220 

221 cutout = inputExp[bbox] 

222 

223 rowAddendum = self.getAdvancedFluxes(cutout) 

224 

225 # Combine base row with updates 

226 row.update(rowAddendum) 

227 outputTable = Table([row]) 

228 

229 return pipeBase.Struct( 

230 outputData=outputTable, 

231 ) 

232 

233 @staticmethod 

234 def _returnFailure(flag): 

235 row = {} 

236 row['flux'] = [] 

237 row['cdf95'] = [] 

238 row['centroids'] = [] 

239 row['background'] = [] 

240 row['flags'] = flag 

241 # This doesn't return everything it should 

242 return row 

243 

244 def findObjects(self, exposure): 

245 """Quick detection method. 

246 

247 Parameters 

248 ---------- 

249 exposure : `lsst.afw.image.Exposure` 

250 The exposure to detect objects in. 

251 

252 Returns 

253 ------- 

254 footprints : `lsst.afw.detection.FootprintSet` 

255 Footprints of detections 

256 

257 Notes 

258 ----- 

259 This comes from summit_utils/utils.py/detectObjectsInExp. 

260 """ 

261 exposureCopy = exposure.clone() 

262 median = np.nanmedian(exposureCopy.image.array) 

263 exposureCopy.image -= median 

264 

265 threshold = afwDetect.Threshold(self.config.nSigma, afwDetect.Threshold.STDEV) 

266 footPrintSet = afwDetect.FootprintSet(exposureCopy.getMaskedImage(), 

267 threshold, 

268 "DETECTED", 

269 self.config.nPixMin) 

270 if self.config.grow > 0: 

271 isotropic = True 

272 footPrintSet = afwDetect.FootprintSet(footPrintSet, self.config.grow, isotropic) 

273 

274 return footPrintSet 

275 

276 def getCutoutLocation(self, inputExp, fpSet): 

277 """Search footprint list for the brightest object. 

278 

279 Parameters 

280 ---------- 

281 inputExp : `lsst.afw.image.Exposure` 

282 Exposure to study. 

283 fpSet : `lsst.afw.detection.FootprintSet` 

284 List of detected footprints. 

285 

286 Returns 

287 ------- 

288 centroid : `lsst.geom.Point2D` 

289 The centroid of the brightest object. 

290 centerX : `float` 

291 X-axis coordinate of the center of the object. 

292 """ 

293 median = np.nanmedian(inputExp.image.array) 

294 centerOfMass_numerator = 0 

295 centerOfMass_denominator = 0 

296 

297 fluxes = [] 

298 centroids = [] 

299 for fp in fpSet.getFootprints(): 

300 if fp.getArea() < self.config.nPixMin: 

301 continue 

302 centroid = fp.getCentroid() 

303 height = fp.getBBox().height 

304 flux = fp.getSpans().flatten(inputExp.image.array - median).sum() 

305 centerOfMass_numerator += centroid[0]*flux*height 

306 centerOfMass_denominator += flux*height 

307 

308 fluxes.append(flux) 

309 centroids.append(centroid) 

310 

311 # Take the largest flux centroid as the central star. This 

312 # should probably have shape checks. 

313 starCentroidIndex = np.array(fluxes).argmax() 

314 starCentroid = centroids[starCentroidIndex] 

315 centerX = centerOfMass_numerator / centerOfMass_denominator 

316 

317 return starCentroid, centerX 

318 

319 def getAdvancedFluxes(self, exp): 

320 """Measure the fluxes within the expected four spectral bandpasses. 

321 

322 Parameters 

323 ---------- 

324 exp : `lsst.afw.image.Exposure` 

325 Single amp cutout to consider. 

326 

327 Returns 

328 ------- 

329 row : `dict` 

330 Addendum to the output row. 

331 """ 

332 # This method removes the plots that were in the original. 

333 row = {} 

334 

335 fpSet = self.findObjects(exp) 

336 if (fpLen := len(fpSet.getFootprints())) < 4: 

337 if fpLen == 0: 

338 return self._returnFailure(self.FLAG_NO_FOOTPRINT_FOUND) 

339 else: 

340 return self._returnFailure(self.FLAG_UNKNOWN_ERROR) 

341 

342 centroids = [] 

343 bboxes = [] 

344 for fp in fpSet.getFootprints(): 

345 centroid = fp.getCentroid() 

346 ampBBox = exp.getBBox() 

347 # TODO: DM-52259 This should be configurable 

348 if centroid[1] < ampBBox.getMaxY() - 100: 

349 centroids.append(centroid) 

350 bboxes.append(fp.getBBox()) 

351 if len(bboxes) < 4: 

352 return self._returnFailure(self.FLAG_UNKNOWN_ERROR) 

353 

354 center_guess_x = np.array(centroids).T[0] - exp.image.getX0() 

355 left = np.zeros(4) 

356 right = np.zeros(4) 

357 for idx in range(4): 

358 left[idx] = bboxes[idx].minY - exp.image.getY0() 

359 right[idx] = bboxes[idx].maxY - exp.image.getY0() 

360 

361 bin_edges = [int(left[0] - 2*self.config.yWindow), 

362 int(left[1] + right[0]) // 2, 

363 int(left[2] + right[1]) // 2, 

364 int(left[3] + right[2]) // 2, 

365 int(right[3] + self.config.yWindow)] 

366 

367 # now find midpoint in y, estimate midpoint for x 

368 x_centers = np.zeros(4, dtype=int) 

369 x_min = np.zeros(4, dtype=int) 

370 x_max = np.zeros(4, dtype=int) 

371 

372 alt_centroids = [] 

373 fwhms = np.zeros(4, dtype=int) 

374 fit_parameters = [] 

375 

376 for idx in range(4): 

377 mid_y = int((bin_edges[idx + 1] - bin_edges[idx])/2. + bin_edges[idx]) 

378 # TODO: DM-52259 This should be configurable 

379 x_data = np.median(exp.image.array[mid_y - 5:mid_y + 5, :], axis=0) 

380 alt_x_vals = np.arange(len(x_data)) 

381 

382 x_norm = (x_data - x_data.min())/(x_data.max() - x_data.min()) 

383 theta, covariance = curve_fit(self._d_gaussian, 

384 alt_x_vals, 

385 x_norm, 

386 p0=[0.5, center_guess_x[idx], 3, 20], 

387 bounds=(0, [1, 400, 25, 100]), 

388 max_nfev=10_000) 

389 fit_parameters.append(theta) 

390 x_centers[idx] = int(theta[1]) 

391 alt_centroids.append([x_centers[idx], mid_y]) 

392 if self.config.xWindow == 0: 

393 scale = 2*np.sqrt(2*np.log(2)) 

394 fwhm_1 = int(scale * np.abs(theta[2])) 

395 fwhm_2 = int(scale * np.abs(theta[3])) 

396 fwhm = int(theta[0]*fwhm_1 + 2*(1-theta[0])*fwhm_2) 

397 

398 fwhms[idx] = 2*int(self.config.xGauge*fwhm) + 20 

399 

400 if self.config.xWindow != 0: 

401 fwhms = self.config.xWindow*np.ones(4, dtype=int) 

402 max_width = np.max(fwhms) 

403 

404 x_min = (x_centers - int(max_width/2)) 

405 x_max = (x_centers + int(max_width/2)) 

406 

407 # Use center to get both data and background boxes: 

408 flag = self.FLAG_SUCCESS 

409 flux = [] 

410 background = [] 

411 percentiles = [] 

412 counts_above_threshold = [] 

413 ranks = np.arange(60, 101) 

414 

415 for idx in range(4): 

416 # Background first: 

417 # TODO: DM-52259 This should be configurable 

418 left = exp.image.array[bin_edges[idx]:bin_edges[idx + 1], 

419 x_min[idx] - 10:x_min[idx]] 

420 right = exp.image.array[bin_edges[idx]:bin_edges[idx + 1], 

421 x_max[idx]: x_max[idx] + 10] 

422 background_box = np.concatenate((left, right), axis=1) 

423 _, background_vec, _ = sigma_clipped_stats(background_box, axis=1) 

424 

425 # Define aperture box: 

426 signal_box = exp.image.array[bin_edges[idx]:bin_edges[idx + 1], 

427 x_min[idx]:x_max[idx]] 

428 corrected = signal_box - background_vec[:, np.newaxis] 

429 

430 background.append(np.mean(background_vec)) 

431 flux.append(np.sum(corrected)) 

432 counts_above_threshold.append(np.sum(corrected > self.config.threshold)) 

433 

434 if bin_edges[0] <= 0 or signal_box.size == 0: 

435 percentiles.append(np.zeros_like(ranks)) 

436 flag = self.FLAG_UNKNOWN_ERROR | self.FLAG_NO_FOOTPRINT_FOUND 

437 else: 

438 percentiles.append(np.percentile(corrected, ranks)) 

439 

440 row['flux'] = flux 

441 row['percentiles'] = percentiles 

442 row['centroids'] = alt_centroids 

443 row['background'] = np.mean(np.array(background)) # This is mean of means of bkg_vec. 

444 row['flag'] = flag 

445 row['fwhm'] = fwhms 

446 row['counts_above_threshold'] = counts_above_threshold 

447 

448 return row 

449 

450 @staticmethod 

451 def _d_gaussian(x, a, mean, sigma_1, sigma_2): 

452 """Double Gaussian function. 

453 

454 Parameters 

455 ---------- 

456 x : `float` or `np.array` 

457 Position to evalueate the double Gaussian. 

458 a : `float` 

459 Amplitude of the first component. The second component is 1-a. 

460 mean : `mean` 

461 Mean for both components of the double Gaussian. 

462 sigma_1 : `float` 

463 Standard deviation for the first component. 

464 sigma_2 : `float` 

465 Standard deviation for the second component. 

466 

467 Returns 

468 ------- 

469 value : `float` or `np.array` 

470 The double Gaussian value. 

471 """ 

472 return a*np.exp(-(x-mean)**2/(2*sigma_1**2)) + (1-a)*np.exp(-(x-mean)**2/(2*sigma_2**2)) 

473 

474 

475class QuadNotchMergeConnections(pipeBase.PipelineTaskConnections, 

476 dimensions=("instrument", "detector")): 

477 inputData = cT.Input( 

478 name="quadNotchSingle", 

479 doc="Quad-notch measurements from individual exposures.", 

480 storageClass="ArrowAstropy", 

481 dimensions=("instrument", "exposure", "detector"), 

482 multiple=True, 

483 deferLoad=False, 

484 ) 

485 outputData = cT.Output( 

486 name="quadNotchCombined", 

487 doc="Output combined quad-notch analysis.", 

488 storageClass="ArrowAstropy", 

489 dimensions=("instrument", "detector"), 

490 ) 

491 

492 

493class QuadNotchMergeConfig(pipeBase.PipelineTaskConfig, 

494 pipelineConnections=QuadNotchMergeConnections): 

495 """Configuration for quad-notch processing.""" 

496 nSigma = pexConfig.Field( 

497 dtype=float, 

498 default=2.0, 

499 doc="This is a dummy parameter that isn't used, but I didn't want no config here.", 

500 ) 

501 

502 

503class QuadNotchMergeTask(pipeBase.PipelineTask): 

504 """Task to measure quad-notch data.""" 

505 

506 ConfigClass = QuadNotchMergeConfig 

507 _DefaultName = "quadNotchMerge" 

508 

509 def __init__(self, **kwargs): 

510 super().__init__(**kwargs) 

511 

512 def run(self, inputData, camera): 

513 """ 

514 """ 

515 rows = [] 

516 for dataset in inputData: 

517 rows.append(dataset[0]) 

518 

519 outputTable = Table(rows) 

520 return pipeBase.Struct( 

521 outputData=outputTable, 

522 )