Coverage for python / lsst / cp / pipe / cpQuadNotch.py: 0%
224 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:52 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:52 +0000
1# This file is part of cp_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21import numpy as np
23import lsst.afw.detection as afwDetect
24import lsst.pex.config as pexConfig
25import lsst.pipe.base as pipeBase
26import lsst.pipe.base.connectionTypes as cT
28from astropy.stats import sigma_clipped_stats
29from astropy.table import Table
30from scipy.optimize import curve_fit
33__all__ = ["QuadNotchExtractConfig", "QuadNotchExtractTask",
34 "QuadNotchMergeConfig", "QuadNotchMergeTask"]
37class QuadNotchExtractConnections(pipeBase.PipelineTaskConnections,
38 dimensions=("instrument", "exposure", "detector")):
39 inputExp = cT.Input(
40 name="post_isr_quadnotch",
41 doc="Input ISR-processed exposures to combine.",
42 storageClass="Exposure",
43 dimensions=("instrument", "exposure", "detector"),
44 multiple=False,
45 deferLoad=False,
46 )
47 camera = cT.PrerequisiteInput(
48 name="camera",
49 storageClass="Camera",
50 doc="Input camera to construct complete exposures.",
51 dimensions=["instrument"],
52 isCalibration=True,
53 )
55 outputData = cT.Output(
56 name="quadNotchSingle",
57 doc="Output quad-notch analysis.",
58 storageClass="ArrowAstropy",
59 dimensions=("instrument", "exposure", "detector"),
60 )
63class QuadNotchExtractConfig(pipeBase.PipelineTaskConfig,
64 pipelineConnections=QuadNotchExtractConnections):
65 """Configuration for quad-notch processing."""
66 nSigma = pexConfig.Field(
67 dtype=float,
68 default=2.0,
69 doc="Significance of detected objects.",
70 )
71 nPixMin = pexConfig.Field(
72 dtype=int,
73 default=5,
74 doc="Minimum area for a detected object.",
75 )
76 grow = pexConfig.Field(
77 dtype=int,
78 default=0,
79 doc="If non-zero, grow footprints by this amount.",
80 )
81 xWindow = pexConfig.Field(
82 dtype=int,
83 default=0,
84 doc="Window around the central object detected that we will "
85 "look for the quad fluxes in.",
86 )
87 yWindow = pexConfig.Field(
88 dtype=int,
89 default=50,
90 doc="Additional widening along the y-axis."
91 )
92 xGauge = pexConfig.Field(
93 dtype=float,
94 default=1.75,
95 doc="Scale multiplier of the FWHM in the x-direction for each band.",
96 )
97 threshold = pexConfig.Field(
98 dtype=float,
99 default=1.2e5,
100 doc="Threshold above which fluxes are measured.",
101 )
102 targetReplacements = pexConfig.DictField(
103 keytype=str,
104 itemtype=str,
105 doc="Dictionary of target names with replacements; Any not specified will not be changed.",
106 default={
107 "MU-COL": "HD38666"
108 }
109 )
112class QuadNotchExtractTask(pipeBase.PipelineTask):
113 """Task to measure quad-notch data."""
115 ConfigClass = QuadNotchExtractConfig
116 _DefaultName = "quadNotchExtract"
118 FLAG_SUCCESS = 0x0
119 FLAG_UNKNOWN_ERROR = 0x1
120 FLAG_NO_FOOTPRINT_FOUND = 0x2
122 def __init__(self, **kwargs):
123 super().__init__(**kwargs)
125 def runQuantum(self, butlerQC, inputRefs, outputRefs):
126 inputs = butlerQC.get(inputRefs)
128 inputs["inputDims"] = dict(inputRefs.inputExp.dataId.required)
130 outputs = self.run(**inputs)
131 butlerQC.put(outputs, outputRefs)
133 def run(self, inputExp, camera, inputDims):
134 """Quadnotch extraction task.
136 Parameters
137 ----------
138 inputExp : `lsst.afw.image.Exposure`
139 Input exposure to do analysis on.
140 camera : `lsst.afw.cameraGeom.Camera`
141 Camera geometry to use.
142 inputDims: `dict`
143 Dictionary of input dimensions.
145 Returns
146 -------
147 outputData : `astropy.table.Table`
148 Extracted data for this exposure.
149 """
150 row = {}
151 detector = inputExp.getDetector()
153 # Get visitInfo based values:
154 row['airmass'] = inputExp.visitInfo.boresightAirmass
155 row['azimuth'] = inputExp.visitInfo.boresightAzAlt[0].asDegrees()
156 row['altitude'] = inputExp.visitInfo.boresightAzAlt[1].asDegrees()
157 # Cast to isoformat so we don't try to persist an object
158 # in the output table.
159 row['date'] = inputExp.visitInfo.date.toPython().isoformat()
160 row['hourangle'] = inputExp.visitInfo.boresightHourAngle.asDegrees()
161 row['expTime'] = inputExp.visitInfo.exposureTime
162 row['target'] = inputExp.visitInfo.object
163 if row['target'] in self.config.targetReplacements.keys():
164 row['target'] = self.config.targetReplacments[row['target']]
166 # These can be gotten from the butler, but are also in the header.
167 metadata = inputExp.getMetadata()
168 row['day_obs'] = int(metadata.get("DAYOBS", 0))
169 row['exposureId'] = inputDims['exposure']
171 # This can be retrieved from consDB, but not really, because
172 # we're not supposed to access consDB in pipetasks.
173 row['mount_jitter'] = np.nan
175 # Also from visitInfo, but this is where things start to happen.
176 target_amp = 3
177 centered = False
179 match inputExp.visitInfo.observationReason:
180 case "x_offset_50":
181 target_amp = 2
182 centered = True
183 case "x_offset_-50":
184 target_amp = 4
185 centered = True
186 case "x_offset_0":
187 target_amp = 3
188 centered = True
189 case _:
190 pass
192 row['target_amp'] = target_amp
193 row['centered'] = centered
194 row['flags'] = self.FLAG_SUCCESS
196 bbox = detector[target_amp].getBBox()
197 # ampImage = inputExp[bbox] # unused error
198 # I think this gets returned on failure?
200 fpSet = self.findObjects(inputExp)
201 if len(fpSet.getFootprints()) == 0:
202 rowAddendum = self._returnFailure(self.FLAG_NO_FOOTPRINT_FOUND)
203 else:
204 starCentroid, centerX = self.getCutoutLocation(inputExp, fpSet)
206 # If we failed to find a center earlier, find one now.
207 # This does not handle the case where the commanded offset
208 # resulted in a bad exposure.
209 while centered is False:
210 if centerX < bbox.minX:
211 target_amp -= 1
212 bbox = detector[target_amp].getBBox()
213 row['target_amp'] = target_amp
214 elif centerX > bbox.maxX:
215 target_amp += 1
216 bbox = detector[target_amp].getBBox()
217 row['target_amp'] = target_amp
218 else:
219 centered = True
221 cutout = inputExp[bbox]
223 rowAddendum = self.getAdvancedFluxes(cutout)
225 # Combine base row with updates
226 row.update(rowAddendum)
227 outputTable = Table([row])
229 return pipeBase.Struct(
230 outputData=outputTable,
231 )
233 @staticmethod
234 def _returnFailure(flag):
235 row = {}
236 row['flux'] = []
237 row['cdf95'] = []
238 row['centroids'] = []
239 row['background'] = []
240 row['flags'] = flag
241 # This doesn't return everything it should
242 return row
244 def findObjects(self, exposure):
245 """Quick detection method.
247 Parameters
248 ----------
249 exposure : `lsst.afw.image.Exposure`
250 The exposure to detect objects in.
252 Returns
253 -------
254 footprints : `lsst.afw.detection.FootprintSet`
255 Footprints of detections
257 Notes
258 -----
259 This comes from summit_utils/utils.py/detectObjectsInExp.
260 """
261 exposureCopy = exposure.clone()
262 median = np.nanmedian(exposureCopy.image.array)
263 exposureCopy.image -= median
265 threshold = afwDetect.Threshold(self.config.nSigma, afwDetect.Threshold.STDEV)
266 footPrintSet = afwDetect.FootprintSet(exposureCopy.getMaskedImage(),
267 threshold,
268 "DETECTED",
269 self.config.nPixMin)
270 if self.config.grow > 0:
271 isotropic = True
272 footPrintSet = afwDetect.FootprintSet(footPrintSet, self.config.grow, isotropic)
274 return footPrintSet
276 def getCutoutLocation(self, inputExp, fpSet):
277 """Search footprint list for the brightest object.
279 Parameters
280 ----------
281 inputExp : `lsst.afw.image.Exposure`
282 Exposure to study.
283 fpSet : `lsst.afw.detection.FootprintSet`
284 List of detected footprints.
286 Returns
287 -------
288 centroid : `lsst.geom.Point2D`
289 The centroid of the brightest object.
290 centerX : `float`
291 X-axis coordinate of the center of the object.
292 """
293 median = np.nanmedian(inputExp.image.array)
294 centerOfMass_numerator = 0
295 centerOfMass_denominator = 0
297 fluxes = []
298 centroids = []
299 for fp in fpSet.getFootprints():
300 if fp.getArea() < self.config.nPixMin:
301 continue
302 centroid = fp.getCentroid()
303 height = fp.getBBox().height
304 flux = fp.getSpans().flatten(inputExp.image.array - median).sum()
305 centerOfMass_numerator += centroid[0]*flux*height
306 centerOfMass_denominator += flux*height
308 fluxes.append(flux)
309 centroids.append(centroid)
311 # Take the largest flux centroid as the central star. This
312 # should probably have shape checks.
313 starCentroidIndex = np.array(fluxes).argmax()
314 starCentroid = centroids[starCentroidIndex]
315 centerX = centerOfMass_numerator / centerOfMass_denominator
317 return starCentroid, centerX
319 def getAdvancedFluxes(self, exp):
320 """Measure the fluxes within the expected four spectral bandpasses.
322 Parameters
323 ----------
324 exp : `lsst.afw.image.Exposure`
325 Single amp cutout to consider.
327 Returns
328 -------
329 row : `dict`
330 Addendum to the output row.
331 """
332 # This method removes the plots that were in the original.
333 row = {}
335 fpSet = self.findObjects(exp)
336 if (fpLen := len(fpSet.getFootprints())) < 4:
337 if fpLen == 0:
338 return self._returnFailure(self.FLAG_NO_FOOTPRINT_FOUND)
339 else:
340 return self._returnFailure(self.FLAG_UNKNOWN_ERROR)
342 centroids = []
343 bboxes = []
344 for fp in fpSet.getFootprints():
345 centroid = fp.getCentroid()
346 ampBBox = exp.getBBox()
347 # TODO: DM-52259 This should be configurable
348 if centroid[1] < ampBBox.getMaxY() - 100:
349 centroids.append(centroid)
350 bboxes.append(fp.getBBox())
351 if len(bboxes) < 4:
352 return self._returnFailure(self.FLAG_UNKNOWN_ERROR)
354 center_guess_x = np.array(centroids).T[0] - exp.image.getX0()
355 left = np.zeros(4)
356 right = np.zeros(4)
357 for idx in range(4):
358 left[idx] = bboxes[idx].minY - exp.image.getY0()
359 right[idx] = bboxes[idx].maxY - exp.image.getY0()
361 bin_edges = [int(left[0] - 2*self.config.yWindow),
362 int(left[1] + right[0]) // 2,
363 int(left[2] + right[1]) // 2,
364 int(left[3] + right[2]) // 2,
365 int(right[3] + self.config.yWindow)]
367 # now find midpoint in y, estimate midpoint for x
368 x_centers = np.zeros(4, dtype=int)
369 x_min = np.zeros(4, dtype=int)
370 x_max = np.zeros(4, dtype=int)
372 alt_centroids = []
373 fwhms = np.zeros(4, dtype=int)
374 fit_parameters = []
376 for idx in range(4):
377 mid_y = int((bin_edges[idx + 1] - bin_edges[idx])/2. + bin_edges[idx])
378 # TODO: DM-52259 This should be configurable
379 x_data = np.median(exp.image.array[mid_y - 5:mid_y + 5, :], axis=0)
380 alt_x_vals = np.arange(len(x_data))
382 x_norm = (x_data - x_data.min())/(x_data.max() - x_data.min())
383 theta, covariance = curve_fit(self._d_gaussian,
384 alt_x_vals,
385 x_norm,
386 p0=[0.5, center_guess_x[idx], 3, 20],
387 bounds=(0, [1, 400, 25, 100]),
388 max_nfev=10_000)
389 fit_parameters.append(theta)
390 x_centers[idx] = int(theta[1])
391 alt_centroids.append([x_centers[idx], mid_y])
392 if self.config.xWindow == 0:
393 scale = 2*np.sqrt(2*np.log(2))
394 fwhm_1 = int(scale * np.abs(theta[2]))
395 fwhm_2 = int(scale * np.abs(theta[3]))
396 fwhm = int(theta[0]*fwhm_1 + 2*(1-theta[0])*fwhm_2)
398 fwhms[idx] = 2*int(self.config.xGauge*fwhm) + 20
400 if self.config.xWindow != 0:
401 fwhms = self.config.xWindow*np.ones(4, dtype=int)
402 max_width = np.max(fwhms)
404 x_min = (x_centers - int(max_width/2))
405 x_max = (x_centers + int(max_width/2))
407 # Use center to get both data and background boxes:
408 flag = self.FLAG_SUCCESS
409 flux = []
410 background = []
411 percentiles = []
412 counts_above_threshold = []
413 ranks = np.arange(60, 101)
415 for idx in range(4):
416 # Background first:
417 # TODO: DM-52259 This should be configurable
418 left = exp.image.array[bin_edges[idx]:bin_edges[idx + 1],
419 x_min[idx] - 10:x_min[idx]]
420 right = exp.image.array[bin_edges[idx]:bin_edges[idx + 1],
421 x_max[idx]: x_max[idx] + 10]
422 background_box = np.concatenate((left, right), axis=1)
423 _, background_vec, _ = sigma_clipped_stats(background_box, axis=1)
425 # Define aperture box:
426 signal_box = exp.image.array[bin_edges[idx]:bin_edges[idx + 1],
427 x_min[idx]:x_max[idx]]
428 corrected = signal_box - background_vec[:, np.newaxis]
430 background.append(np.mean(background_vec))
431 flux.append(np.sum(corrected))
432 counts_above_threshold.append(np.sum(corrected > self.config.threshold))
434 if bin_edges[0] <= 0 or signal_box.size == 0:
435 percentiles.append(np.zeros_like(ranks))
436 flag = self.FLAG_UNKNOWN_ERROR | self.FLAG_NO_FOOTPRINT_FOUND
437 else:
438 percentiles.append(np.percentile(corrected, ranks))
440 row['flux'] = flux
441 row['percentiles'] = percentiles
442 row['centroids'] = alt_centroids
443 row['background'] = np.mean(np.array(background)) # This is mean of means of bkg_vec.
444 row['flag'] = flag
445 row['fwhm'] = fwhms
446 row['counts_above_threshold'] = counts_above_threshold
448 return row
450 @staticmethod
451 def _d_gaussian(x, a, mean, sigma_1, sigma_2):
452 """Double Gaussian function.
454 Parameters
455 ----------
456 x : `float` or `np.array`
457 Position to evalueate the double Gaussian.
458 a : `float`
459 Amplitude of the first component. The second component is 1-a.
460 mean : `mean`
461 Mean for both components of the double Gaussian.
462 sigma_1 : `float`
463 Standard deviation for the first component.
464 sigma_2 : `float`
465 Standard deviation for the second component.
467 Returns
468 -------
469 value : `float` or `np.array`
470 The double Gaussian value.
471 """
472 return a*np.exp(-(x-mean)**2/(2*sigma_1**2)) + (1-a)*np.exp(-(x-mean)**2/(2*sigma_2**2))
475class QuadNotchMergeConnections(pipeBase.PipelineTaskConnections,
476 dimensions=("instrument", "detector")):
477 inputData = cT.Input(
478 name="quadNotchSingle",
479 doc="Quad-notch measurements from individual exposures.",
480 storageClass="ArrowAstropy",
481 dimensions=("instrument", "exposure", "detector"),
482 multiple=True,
483 deferLoad=False,
484 )
485 outputData = cT.Output(
486 name="quadNotchCombined",
487 doc="Output combined quad-notch analysis.",
488 storageClass="ArrowAstropy",
489 dimensions=("instrument", "detector"),
490 )
493class QuadNotchMergeConfig(pipeBase.PipelineTaskConfig,
494 pipelineConnections=QuadNotchMergeConnections):
495 """Configuration for quad-notch processing."""
496 nSigma = pexConfig.Field(
497 dtype=float,
498 default=2.0,
499 doc="This is a dummy parameter that isn't used, but I didn't want no config here.",
500 )
503class QuadNotchMergeTask(pipeBase.PipelineTask):
504 """Task to measure quad-notch data."""
506 ConfigClass = QuadNotchMergeConfig
507 _DefaultName = "quadNotchMerge"
509 def __init__(self, **kwargs):
510 super().__init__(**kwargs)
512 def run(self, inputData, camera):
513 """
514 """
515 rows = []
516 for dataset in inputData:
517 rows.append(dataset[0])
519 outputTable = Table(rows)
520 return pipeBase.Struct(
521 outputData=outputTable,
522 )