Coverage for python / lsst / obs / hsc / calibs.py: 0%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:59 +0000

1import math 

2import numpy as np 

3 

4from lsst.pex.config import Field, ListField, ConfigField 

5from lsst.pipe.drivers.constructCalibs import CalibCombineConfig, CalibCombineTask 

6from lsst.ip.isr.vignette import VignetteConfig 

7import lsst.afw.image as afwImage 

8import lsst.afw.cameraGeom as afwcg 

9import lsst.geom as geom 

10 

11 

12class HscFlatCombineConfig(CalibCombineConfig): 

13 vignette = ConfigField(dtype=VignetteConfig, doc="Vignetting parameters in focal plane coordinates") 

14 badAmpCcdList = ListField(dtype=int, default=[], doc="List of CCD serial numbers for bad amplifiers") 

15 badAmpList = ListField(dtype=int, default=[], doc="List of amp serial numbers in CCD") 

16 maskPlane = Field(dtype=str, default="NO_DATA", doc="Mask plane to set") 

17 

18 def validate(self): 

19 CalibCombineConfig.validate(self) 

20 if len(self.badAmpCcdList) != len(self.badAmpList): 

21 raise RuntimeError("Length of badAmpCcdList and badAmpList don't match") 

22 

23 

24class HscFlatCombineTask(CalibCombineTask): 

25 """Mask the vignetted area""" 

26 ConfigClass = HscFlatCombineConfig 

27 

28 def run(self, sensorRefList, *args, **kwargs): 

29 """Mask vignetted pixels after combining 

30 

31 This returns an Exposure instead of an Image, but upstream shouldn't 

32 care, as it just dumps it out via the Butler. 

33 """ 

34 combined = super(HscFlatCombineTask, self).run(sensorRefList, *args, **kwargs) 

35 mi = afwImage.makeMaskedImage(combined.getImage()) 

36 mi.getMask().set(0) 

37 

38 # Retrieve the detector 

39 # XXX It's unfortunate that we have to read an entire image to get the 

40 # detector, but there's no public API in the butler to get the same. 

41 image = sensorRefList[0].get("postISRCCD") 

42 detector = image.getDetector() 

43 del image 

44 

45 self.maskVignetting(mi.getMask(), detector) 

46 self.maskBadAmps(mi.getMask(), detector) 

47 

48 return afwImage.makeExposure(mi) 

49 

50 def maskVignetting(self, mask, detector): 

51 """Mask vignetted pixels in-place 

52 

53 @param mask: Mask image to modify 

54 @param detector: Detector for transformation to focal plane coords 

55 """ 

56 mask.addMaskPlane(self.config.maskPlane) 

57 bitMask = mask.getPlaneBitMask(self.config.maskPlane) 

58 bbox = mask.getBBox(afwImage.LOCAL) 

59 w, h = bbox.getDimensions() 

60 numCorners = 0 # Number of corners outside radius 

61 pixelsToFocalPlane = detector.getTransform(afwcg.PIXELS, afwcg.FOCAL_PLANE) 

62 pixelCorners = [geom.Point2D(pos) for pos in bbox.getCorners()] 

63 fpCorners = pixelsToFocalPlane.applyForward(pixelCorners) 

64 fpCenter = geom.Point2D(self.config.vignette.xCenter, self.config.vignette.yCenter) 

65 numCorners = sum(math.hypot(*(fpPos - fpCenter)) > self.config.vignette.radius for fpPos in fpCorners) 

66 if numCorners == 0: 

67 # Nothing to be masked 

68 self.log.info("Detector %d is unvignetted", detector.getId()) 

69 return 

70 if numCorners == 4: 

71 # Everything to be masked 

72 # We ignore the question of how we're getting any data from a CCD 

73 # that's totally vignetted... 

74 self.log.warning("Detector %d is completely vignetted", detector.getId()) 

75 mask |= bitMask 

76 return 

77 # We have to go pixel by pixel 

78 numPixels = bbox.getArea() 

79 xx, yy = np.meshgrid(np.arange(0, w, dtype=int), np.arange(0, h, dtype=int)) 

80 xyDetector = [geom.Point2D(x, y) for x, y in zip(xx.reshape(numPixels), yy.reshape(numPixels))] 

81 xyFocalPlane = pixelsToFocalPlane.applyForward(xyDetector) 

82 origin = geom.Point2D(self.config.vignette.xCenter, self.config.vignette.yCenter) 

83 r2 = np.array([pp.distanceSquared(origin) for pp in xyFocalPlane]) 

84 isBad = (r2 > self.config.vignette.radius**2).reshape((h, w)) 

85 self.log.info("Detector %d has %f%% pixels vignetted", 

86 detector.getId(), isBad.sum()/float(isBad.size)*100.0) 

87 maskArray = mask.getArray() 

88 maskArray[yy[isBad], xx[isBad]] |= bitMask 

89 

90 def maskBadAmps(self, mask, detector): 

91 """Mask bad amplifiers 

92 

93 @param mask: Mask image to modify 

94 @param detector: Detector for amp locations 

95 """ 

96 mask.addMaskPlane(self.config.maskPlane) 

97 bitMask = mask.getPlaneBitMask(self.config.maskPlane) 

98 if detector is None: 

99 raise RuntimeError("Detector isn't a Ccd") 

100 ccdSerial = detector.getId() 

101 if ccdSerial not in self.config.badAmpCcdList: 

102 return 

103 for ccdNum, ampNum in zip(self.config.badAmpCcdList, self.config.badAmpList): 

104 if ccdNum != ccdSerial: 

105 continue 

106 # Mask this amp 

107 found = False 

108 for amp in detector: 

109 if amp.getId() != ampNum: 

110 continue 

111 found = True 

112 box = amp.getDataSec() 

113 subMask = mask.Factory(mask, box) 

114 subMask |= bitMask 

115 break 

116 if not found: 

117 self.log.warning("Unable to find amp=%s in ccd=%s", ampNum, ccdNum)