Coverage for python / lsst / obs / hsc / calibs.py: 0%
79 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:44 +0000
1import math
2import numpy as np
4from lsst.pex.config import Field, ListField, ConfigField
5from lsst.pipe.drivers.constructCalibs import CalibCombineConfig, CalibCombineTask
6from lsst.ip.isr.vignette import VignetteConfig
7import lsst.afw.image as afwImage
8import lsst.afw.cameraGeom as afwcg
9import lsst.geom as geom
12class HscFlatCombineConfig(CalibCombineConfig):
13 vignette = ConfigField(dtype=VignetteConfig, doc="Vignetting parameters in focal plane coordinates")
14 badAmpCcdList = ListField(dtype=int, default=[], doc="List of CCD serial numbers for bad amplifiers")
15 badAmpList = ListField(dtype=int, default=[], doc="List of amp serial numbers in CCD")
16 maskPlane = Field(dtype=str, default="NO_DATA", doc="Mask plane to set")
18 def validate(self):
19 CalibCombineConfig.validate(self)
20 if len(self.badAmpCcdList) != len(self.badAmpList):
21 raise RuntimeError("Length of badAmpCcdList and badAmpList don't match")
24class HscFlatCombineTask(CalibCombineTask):
25 """Mask the vignetted area"""
26 ConfigClass = HscFlatCombineConfig
28 def run(self, sensorRefList, *args, **kwargs):
29 """Mask vignetted pixels after combining
31 This returns an Exposure instead of an Image, but upstream shouldn't
32 care, as it just dumps it out via the Butler.
33 """
34 combined = super(HscFlatCombineTask, self).run(sensorRefList, *args, **kwargs)
35 mi = afwImage.makeMaskedImage(combined.getImage())
36 mi.getMask().set(0)
38 # Retrieve the detector
39 # XXX It's unfortunate that we have to read an entire image to get the
40 # detector, but there's no public API in the butler to get the same.
41 image = sensorRefList[0].get("postISRCCD")
42 detector = image.getDetector()
43 del image
45 self.maskVignetting(mi.getMask(), detector)
46 self.maskBadAmps(mi.getMask(), detector)
48 return afwImage.makeExposure(mi)
50 def maskVignetting(self, mask, detector):
51 """Mask vignetted pixels in-place
53 @param mask: Mask image to modify
54 @param detector: Detector for transformation to focal plane coords
55 """
56 mask.addMaskPlane(self.config.maskPlane)
57 bitMask = mask.getPlaneBitMask(self.config.maskPlane)
58 bbox = mask.getBBox(afwImage.LOCAL)
59 w, h = bbox.getDimensions()
60 numCorners = 0 # Number of corners outside radius
61 pixelsToFocalPlane = detector.getTransform(afwcg.PIXELS, afwcg.FOCAL_PLANE)
62 pixelCorners = [geom.Point2D(pos) for pos in bbox.getCorners()]
63 fpCorners = pixelsToFocalPlane.applyForward(pixelCorners)
64 fpCenter = geom.Point2D(self.config.vignette.xCenter, self.config.vignette.yCenter)
65 numCorners = sum(math.hypot(*(fpPos - fpCenter)) > self.config.vignette.radius for fpPos in fpCorners)
66 if numCorners == 0:
67 # Nothing to be masked
68 self.log.info("Detector %d is unvignetted", detector.getId())
69 return
70 if numCorners == 4:
71 # Everything to be masked
72 # We ignore the question of how we're getting any data from a CCD
73 # that's totally vignetted...
74 self.log.warning("Detector %d is completely vignetted", detector.getId())
75 mask |= bitMask
76 return
77 # We have to go pixel by pixel
78 numPixels = bbox.getArea()
79 xx, yy = np.meshgrid(np.arange(0, w, dtype=int), np.arange(0, h, dtype=int))
80 xyDetector = [geom.Point2D(x, y) for x, y in zip(xx.reshape(numPixels), yy.reshape(numPixels))]
81 xyFocalPlane = pixelsToFocalPlane.applyForward(xyDetector)
82 origin = geom.Point2D(self.config.vignette.xCenter, self.config.vignette.yCenter)
83 r2 = np.array([pp.distanceSquared(origin) for pp in xyFocalPlane])
84 isBad = (r2 > self.config.vignette.radius**2).reshape((h, w))
85 self.log.info("Detector %d has %f%% pixels vignetted",
86 detector.getId(), isBad.sum()/float(isBad.size)*100.0)
87 maskArray = mask.getArray()
88 maskArray[yy[isBad], xx[isBad]] |= bitMask
90 def maskBadAmps(self, mask, detector):
91 """Mask bad amplifiers
93 @param mask: Mask image to modify
94 @param detector: Detector for amp locations
95 """
96 mask.addMaskPlane(self.config.maskPlane)
97 bitMask = mask.getPlaneBitMask(self.config.maskPlane)
98 if detector is None:
99 raise RuntimeError("Detector isn't a Ccd")
100 ccdSerial = detector.getId()
101 if ccdSerial not in self.config.badAmpCcdList:
102 return
103 for ccdNum, ampNum in zip(self.config.badAmpCcdList, self.config.badAmpList):
104 if ccdNum != ccdSerial:
105 continue
106 # Mask this amp
107 found = False
108 for amp in detector:
109 if amp.getId() != ampNum:
110 continue
111 found = True
112 box = amp.getDataSec()
113 subMask = mask.Factory(mask, box)
114 subMask |= bitMask
115 break
116 if not found:
117 self.log.warning("Unable to find amp=%s in ccd=%s", ampNum, ccdNum)