Coverage for python / lsst / cp / pipe / cpQuadNotch.py: 0%
231 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:34 +0000
1# This file is part of cp_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21import numpy as np
23import lsst.afw.detection as afwDetect
24import lsst.pex.config as pexConfig
25import lsst.pipe.base as pipeBase
26import lsst.pipe.base.connectionTypes as cT
28from astropy.stats import sigma_clipped_stats
29from astropy.table import Table
30from scipy.optimize import curve_fit
33__all__ = ["QuadNotchExtractConfig", "QuadNotchExtractTask",
34 "QuadNotchMergeConfig", "QuadNotchMergeTask"]
37class QuadNotchExtractConnections(pipeBase.PipelineTaskConnections,
38 dimensions=("instrument", "exposure", "detector")):
39 inputExp = cT.Input(
40 name="post_isr_quadnotch",
41 doc="Input ISR-processed exposures to combine.",
42 storageClass="Exposure",
43 dimensions=("instrument", "exposure", "detector"),
44 multiple=False,
45 deferLoad=False,
46 )
47 camera = cT.PrerequisiteInput(
48 name="camera",
49 storageClass="Camera",
50 doc="Input camera to construct complete exposures.",
51 dimensions=["instrument"],
52 isCalibration=True,
53 )
55 outputData = cT.Output(
56 name="quadNotchSingle",
57 doc="Output quad-notch analysis.",
58 storageClass="ArrowAstropy",
59 dimensions=("instrument", "exposure", "detector"),
60 )
63class QuadNotchExtractConfig(pipeBase.PipelineTaskConfig,
64 pipelineConnections=QuadNotchExtractConnections):
65 """Configuration for quad-notch processing."""
66 nSigma = pexConfig.Field(
67 dtype=float,
68 default=2.0,
69 doc="Significance of detected objects.",
70 )
71 nPixMin = pexConfig.Field(
72 dtype=int,
73 default=5,
74 doc="Minimum area for a detected object.",
75 )
76 detectionType = pexConfig.ChoiceField(
77 dtype=str,
78 doc="Which kind of threshold to use for detection.",
79 default="VALUE",
80 allowed={
81 "STDEV": "afwDetect.STDEV",
82 "VALUE": "afwDetect.VALUE, using manual STDEV calculation.",
83 },
84 )
85 grow = pexConfig.Field(
86 dtype=int,
87 default=0,
88 doc="If non-zero, grow footprints by this amount.",
89 )
90 xWindow = pexConfig.Field(
91 dtype=int,
92 default=0,
93 doc="Window around the central object detected that we will "
94 "look for the quad fluxes in.",
95 )
96 yWindow = pexConfig.Field(
97 dtype=int,
98 default=50,
99 doc="Additional widening along the y-axis."
100 )
101 xGauge = pexConfig.Field(
102 dtype=float,
103 default=1.75,
104 doc="Scale multiplier of the FWHM in the x-direction for each band.",
105 )
106 threshold = pexConfig.Field(
107 dtype=float,
108 default=1.2e5,
109 doc="Threshold above which fluxes are measured.",
110 )
111 targetReplacements = pexConfig.DictField(
112 keytype=str,
113 itemtype=str,
114 doc="Dictionary of target names with replacements; Any not specified will not be changed.",
115 default={
116 "MU-COL": "HD38666"
117 }
118 )
121class QuadNotchExtractTask(pipeBase.PipelineTask):
122 """Task to measure quad-notch data."""
124 ConfigClass = QuadNotchExtractConfig
125 _DefaultName = "quadNotchExtract"
127 FLAG_SUCCESS = 0x0
128 FLAG_UNKNOWN_ERROR = 0x1
129 FLAG_NO_FOOTPRINT_FOUND = 0x2
131 def __init__(self, **kwargs):
132 super().__init__(**kwargs)
134 def runQuantum(self, butlerQC, inputRefs, outputRefs):
135 inputs = butlerQC.get(inputRefs)
137 inputs["inputDims"] = dict(inputRefs.inputExp.dataId.required)
139 outputs = self.run(**inputs)
140 butlerQC.put(outputs, outputRefs)
142 def run(self, inputExp, camera, inputDims):
143 """Quadnotch extraction task.
145 Parameters
146 ----------
147 inputExp : `lsst.afw.image.Exposure`
148 Input exposure to do analysis on.
149 camera : `lsst.afw.cameraGeom.Camera`
150 Camera geometry to use.
151 inputDims: `dict`
152 Dictionary of input dimensions.
154 Returns
155 -------
156 outputData : `astropy.table.Table`
157 Extracted data for this exposure.
158 """
159 row = {}
160 detector = inputExp.getDetector()
162 # Get visitInfo based values:
163 row['airmass'] = inputExp.visitInfo.boresightAirmass
164 row['azimuth'] = inputExp.visitInfo.boresightAzAlt[0].asDegrees()
165 row['altitude'] = inputExp.visitInfo.boresightAzAlt[1].asDegrees()
166 # Cast to isoformat so we don't try to persist an object
167 # in the output table.
168 row['date'] = inputExp.visitInfo.date.toPython().isoformat()
169 row['hourangle'] = inputExp.visitInfo.boresightHourAngle.asDegrees()
170 row['expTime'] = inputExp.visitInfo.exposureTime
171 row['target'] = inputExp.visitInfo.object
172 if row['target'] in self.config.targetReplacements.keys():
173 row['target'] = self.config.targetReplacments[row['target']]
175 # These can be gotten from the butler, but are also in the header.
176 metadata = inputExp.getMetadata()
177 row['day_obs'] = int(metadata.get("DAYOBS", 0))
178 row['exposureId'] = inputDims['exposure']
180 # This can be retrieved from consDB, but not really, because
181 # we're not supposed to access consDB in pipetasks.
182 row['mount_jitter'] = np.nan
184 # Also from visitInfo, but this is where things start to happen.
185 target_amp = 3
186 centered = False
188 match inputExp.visitInfo.observationReason:
189 case "x_offset_50":
190 target_amp = 2
191 centered = True
192 case "x_offset_-50":
193 target_amp = 4
194 centered = True
195 case "x_offset_0":
196 target_amp = 3
197 centered = True
198 case _:
199 pass
201 row['target_amp'] = target_amp
202 row['centered'] = centered
203 row['flag'] = self.FLAG_SUCCESS
205 bbox = detector[target_amp].getBBox()
206 # ampImage = inputExp[bbox] # unused error
207 # I think this gets returned on failure?
209 fpSet = self.findObjects(inputExp)
210 if len(fpSet.getFootprints()) == 0:
211 rowAddendum = self._returnFailure(self.FLAG_NO_FOOTPRINT_FOUND)
212 else:
213 starCentroid, centerX = self.getCutoutLocation(inputExp, fpSet)
215 # If we failed to find a center earlier, find one now.
216 # This does not handle the case where the commanded offset
217 # resulted in a bad exposure.
218 while centered is False:
219 if centerX < bbox.minX:
220 target_amp -= 1
221 bbox = detector[target_amp].getBBox()
222 row['target_amp'] = target_amp
223 elif centerX > bbox.maxX:
224 target_amp += 1
225 bbox = detector[target_amp].getBBox()
226 row['target_amp'] = target_amp
227 else:
228 centered = True
230 cutout = inputExp[bbox]
232 rowAddendum = self.getAdvancedFluxes(cutout)
234 # Combine base row with updates
235 row.update(rowAddendum)
236 outputTable = Table([row])
238 return pipeBase.Struct(
239 outputData=outputTable,
240 )
242 @staticmethod
243 def _returnFailure(flag):
244 row = {}
245 row['flux'] = [np.nan, np.nan, np.nan, np.nan]
246 row['percentiles'] = np.zeros((4, 41))
247 row['centroids'] = [(0, 0), (0, 0), (0, 0), (0, 0)]
248 row['background'] = np.nan
249 row['flag'] = flag
250 row['fwhm'] = [0, 0, 0, 0]
251 row['counts_above_threshold'] = [0, 0, 0, 0]
252 # This doesn't return everything it should
253 return row
255 def findObjects(self, exposure):
256 """Quick detection method.
258 Parameters
259 ----------
260 exposure : `lsst.afw.image.Exposure`
261 The exposure to detect objects in.
263 Returns
264 -------
265 footprints : `lsst.afw.detection.FootprintSet`
266 Footprints of detections
268 Notes
269 -----
270 This comes from summit_utils/utils.py/detectObjectsInExp.
271 """
272 exposureCopy = exposure.clone()
273 median = np.nanmedian(exposureCopy.image.array)
274 exposureCopy.image -= median
276 if self.config.detectionType == "VALUE":
277 stdev = np.nanstd(exposureCopy.image.array)
278 threshold = afwDetect.Threshold(self.config.nSigma * stdev,
279 afwDetect.Threshold.VALUE)
280 else:
281 threshold = afwDetect.Threshold(self.config.nSigma,
282 afwDetect.Threshold.STDEV)
284 footPrintSet = afwDetect.FootprintSet(exposureCopy.getMaskedImage(),
285 threshold,
286 "DETECTED",
287 self.config.nPixMin)
288 if self.config.grow > 0:
289 isotropic = True
290 footPrintSet = afwDetect.FootprintSet(footPrintSet, self.config.grow, isotropic)
292 return footPrintSet
294 def getCutoutLocation(self, inputExp, fpSet):
295 """Search footprint list for the brightest object.
297 Parameters
298 ----------
299 inputExp : `lsst.afw.image.Exposure`
300 Exposure to study.
301 fpSet : `lsst.afw.detection.FootprintSet`
302 List of detected footprints.
304 Returns
305 -------
306 centroid : `lsst.geom.Point2D`
307 The centroid of the brightest object.
308 centerX : `float`
309 X-axis coordinate of the center of the object.
310 """
311 median = np.nanmedian(inputExp.image.array)
312 centerOfMass_numerator = 0
313 centerOfMass_denominator = 0
315 fluxes = []
316 centroids = []
317 for fp in fpSet.getFootprints():
318 if fp.getArea() < self.config.nPixMin:
319 continue
320 centroid = fp.getCentroid()
321 height = fp.getBBox().height
322 flux = fp.getSpans().flatten(inputExp.image.array - median).sum()
323 centerOfMass_numerator += centroid[0]*flux*height
324 centerOfMass_denominator += flux*height
326 fluxes.append(flux)
327 centroids.append(centroid)
329 # Take the largest flux centroid as the central star. This
330 # should probably have shape checks.
331 starCentroidIndex = np.array(fluxes).argmax()
332 starCentroid = centroids[starCentroidIndex]
333 centerX = centerOfMass_numerator / centerOfMass_denominator
335 return starCentroid, centerX
337 def getAdvancedFluxes(self, exp):
338 """Measure the fluxes within the expected four spectral bandpasses.
340 Parameters
341 ----------
342 exp : `lsst.afw.image.Exposure`
343 Single amp cutout to consider.
345 Returns
346 -------
347 row : `dict`
348 Addendum to the output row.
349 """
350 # This method removes the plots that were in the original.
351 row = {}
353 fpSet = self.findObjects(exp)
354 if (fpLen := len(fpSet.getFootprints())) < 4:
355 if fpLen == 0:
356 return self._returnFailure(self.FLAG_NO_FOOTPRINT_FOUND)
357 else:
358 return self._returnFailure(self.FLAG_UNKNOWN_ERROR)
360 centroids = []
361 bboxes = []
362 for fp in fpSet.getFootprints():
363 centroid = fp.getCentroid()
364 ampBBox = exp.getBBox()
365 # TODO: DM-52259 This should be configurable
366 if centroid[1] < ampBBox.getMaxY() - 100:
367 centroids.append(centroid)
368 bboxes.append(fp.getBBox())
369 if len(bboxes) < 4:
370 return self._returnFailure(self.FLAG_UNKNOWN_ERROR)
372 center_guess_x = np.array(centroids).T[0] - exp.image.getX0()
373 left = np.zeros(4)
374 right = np.zeros(4)
375 for idx in range(4):
376 left[idx] = bboxes[idx].minY - exp.image.getY0()
377 right[idx] = bboxes[idx].maxY - exp.image.getY0()
379 bin_edges = [int(left[0] - 2*self.config.yWindow),
380 int(left[1] + right[0]) // 2,
381 int(left[2] + right[1]) // 2,
382 int(left[3] + right[2]) // 2,
383 int(right[3] + self.config.yWindow)]
385 # now find midpoint in y, estimate midpoint for x
386 x_centers = np.zeros(4, dtype=int)
387 x_min = np.zeros(4, dtype=int)
388 x_max = np.zeros(4, dtype=int)
390 alt_centroids = []
391 fwhms = np.zeros(4, dtype=int)
392 fit_parameters = []
394 for idx in range(4):
395 mid_y = int((bin_edges[idx + 1] - bin_edges[idx])/2. + bin_edges[idx])
396 # TODO: DM-52259 This should be configurable
397 x_data = np.median(exp.image.array[mid_y - 5:mid_y + 5, :], axis=0)
398 alt_x_vals = np.arange(len(x_data))
400 x_norm = (x_data - x_data.min())/(x_data.max() - x_data.min())
402 theta, covariance = curve_fit(self._d_gaussian,
403 alt_x_vals,
404 x_norm,
405 p0=[0.5, center_guess_x[idx], 3, 20],
406 bounds=(0, [1, 400, 25, 100]),
407 max_nfev=10_000)
408 fit_parameters.append(theta)
409 x_centers[idx] = int(theta[1])
410 alt_centroids.append([x_centers[idx], mid_y])
411 if self.config.xWindow == 0:
412 scale = 2*np.sqrt(2*np.log(2))
413 fwhm_1 = int(scale * np.abs(theta[2]))
414 fwhm_2 = int(scale * np.abs(theta[3]))
415 fwhm = int(theta[0]*fwhm_1 + 2*(1-theta[0])*fwhm_2)
417 fwhms[idx] = 2*int(self.config.xGauge*fwhm) + 20
419 if self.config.xWindow != 0:
420 fwhms = self.config.xWindow*np.ones(4, dtype=int)
421 max_width = np.max(fwhms)
423 x_min = (x_centers - int(max_width/2))
424 x_max = (x_centers + int(max_width/2))
426 # Use center to get both data and background boxes:
427 flag = self.FLAG_SUCCESS
428 flux = []
429 background = []
430 percentiles = []
431 counts_above_threshold = []
432 ranks = np.arange(60, 101)
434 for idx in range(4):
435 # Background first:
436 # TODO: DM-52259 This should be configurable
437 left = exp.image.array[bin_edges[idx]:bin_edges[idx + 1],
438 x_min[idx] - 10:x_min[idx]]
439 right = exp.image.array[bin_edges[idx]:bin_edges[idx + 1],
440 x_max[idx]: x_max[idx] + 10]
441 background_box = np.concatenate((left, right), axis=1)
442 _, background_vec, _ = sigma_clipped_stats(background_box, axis=1)
444 # Define aperture box:
445 signal_box = exp.image.array[bin_edges[idx]:bin_edges[idx + 1],
446 x_min[idx]:x_max[idx]]
447 corrected = signal_box - background_vec[:, np.newaxis]
449 background.append(np.mean(background_vec))
450 flux.append(np.sum(corrected))
451 counts_above_threshold.append(np.sum(corrected > self.config.threshold))
453 if bin_edges[0] <= 0 or signal_box.size == 0:
454 percentiles.append(np.zeros_like(ranks))
455 flag = self.FLAG_UNKNOWN_ERROR | self.FLAG_NO_FOOTPRINT_FOUND
456 else:
457 percentiles.append(np.percentile(corrected, ranks))
459 row['flux'] = flux
460 row['percentiles'] = percentiles
461 row['centroids'] = alt_centroids
462 row['background'] = np.mean(np.array(background)) # This is mean of means of bkg_vec.
463 row['flag'] = flag
464 row['fwhm'] = fwhms
465 row['counts_above_threshold'] = counts_above_threshold
467 return row
469 @staticmethod
470 def _d_gaussian(x, a, mean, sigma_1, sigma_2):
471 """Double Gaussian function.
473 Parameters
474 ----------
475 x : `float` or `np.array`
476 Position to evalueate the double Gaussian.
477 a : `float`
478 Amplitude of the first component. The second component is 1-a.
479 mean : `mean`
480 Mean for both components of the double Gaussian.
481 sigma_1 : `float`
482 Standard deviation for the first component.
483 sigma_2 : `float`
484 Standard deviation for the second component.
486 Returns
487 -------
488 value : `float` or `np.array`
489 The double Gaussian value.
490 """
491 return a*np.exp(-(x-mean)**2/(2*sigma_1**2)) + (1-a)*np.exp(-(x-mean)**2/(2*sigma_2**2))
494class QuadNotchMergeConnections(pipeBase.PipelineTaskConnections,
495 dimensions=("instrument", "detector")):
496 inputData = cT.Input(
497 name="quadNotchSingle",
498 doc="Quad-notch measurements from individual exposures.",
499 storageClass="ArrowAstropy",
500 dimensions=("instrument", "exposure", "detector"),
501 multiple=True,
502 deferLoad=True,
503 )
504 outputData = cT.Output(
505 name="quadNotchCombined",
506 doc="Output combined quad-notch analysis.",
507 storageClass="ArrowAstropy",
508 dimensions=("instrument", "detector"),
509 )
512class QuadNotchMergeConfig(pipeBase.PipelineTaskConfig,
513 pipelineConnections=QuadNotchMergeConnections):
514 """Configuration for quad-notch processing."""
515 nSigma = pexConfig.Field(
516 dtype=float,
517 default=2.0,
518 doc="This is a dummy parameter that isn't used, but I didn't want no config here.",
519 )
522class QuadNotchMergeTask(pipeBase.PipelineTask):
523 """Task to measure quad-notch data."""
525 ConfigClass = QuadNotchMergeConfig
526 _DefaultName = "quadNotchMerge"
528 def __init__(self, **kwargs):
529 super().__init__(**kwargs)
531 def run(self, inputData):
532 """
533 """
534 rows = []
536 for dataset in inputData:
537 data = dataset.get()
538 rows.append(data[0])
540 outputTable = Table(rows)
541 return pipeBase.Struct(
542 outputData=outputTable,
543 )