Coverage for python / lsst / meas / extensions / multiprofit / utils.py: 12%

52 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 17:43 +0000

1# This file is part of meas_extensions_multiprofit. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from collections import defaultdict 

23from typing import Iterable 

24 

25import lsst.afw.detection as afwDetect 

26import lsst.afw.image as afwImage 

27import lsst.geom as geom 

28import numpy as np 

29 

30 

31def defaultdictNested(): 

32 """Get a nested defaultdict with defaultdict default value. 

33 

34 Returns 

35 ------- 

36 defaultdict : `defaultdict` 

37 A `defaultdict` with `defaultdict` default values. 

38 """ 

39 return defaultdict(defaultdictNested) 

40 

41 

42def get_all_subclasses(cls, children_first: bool = True): 

43 """Return all subclasses of a class recursively. 

44 

45 Parameters 

46 ---------- 

47 children_first 

48 If true, return child (direct subclasses) first, followed by their 

49 children (recursively). Otherwise, return each direct child followed 

50 by its own descendants first. 

51 """ 

52 subclasses = {c: None for c in cls.__subclasses__()} 

53 subclasses_return = subclasses.copy() if children_first else {} 

54 for subclass in subclasses: 

55 if children_first: 

56 subclasses_return[subclass] = None 

57 for subsubclass in get_all_subclasses(subclass): 

58 subclasses_return[subsubclass] = None 

59 return list(subclasses_return) 

60 

61 

62# TODO: Allow addition to existing image 

63def get_spanned_image( 

64 exposure: afwImage.Exposure, 

65 footprint: afwDetect.Footprint = None, 

66 bbox: geom.Box2I | None = None, 

67 spans: np.ndarray | None = None, 

68 get_sig_inv: bool = False, 

69 calibrate: bool = True, 

70) -> tuple[np.ndarray, geom.Box2I, np.ndarray]: 

71 """Get an image masked by its spanset. 

72 

73 Parameters 

74 ---------- 

75 exposure 

76 An exposure to extract arrays from. 

77 footprint 

78 The footprint to get spans/bboxes from. Not needed if both of 

79 `bbox` and `spans` are provided. 

80 bbox 

81 The bounding box to subset the exposure with. 

82 Defaults to the footprint's bbox. 

83 spans 

84 A spanset array (inverse mask/selection). 

85 Defaults to the footprint's spans. 

86 get_sig_inv 

87 Whether to get the inverse variance and return its square root. 

88 calibrate 

89 Whether to calibrate the image; set to False if already calibrated. 

90 

91 Returns 

92 ------- 

93 image 

94 The image array, with masked pixels set to zero. 

95 bbox 

96 The bounding box used to subset the exposure. 

97 sig_inv 

98 The inverse sigma array, with masked pixels set to zero. 

99 Set to None if `get_sig_inv` is False. 

100 """ 

101 bbox_is_none = bbox is None 

102 if bbox_is_none: 

103 bbox = footprint.getBBox() 

104 if not (bbox.getHeight() > 0 and bbox.getWidth() > 0): 

105 return None, bbox 

106 if spans is None: 

107 spans = footprint.getSpans().asArray() 

108 sig_inv = afwImage.ImageF(bbox) if get_sig_inv else None 

109 img = afwImage.ImageF(bbox) 

110 img.array[:] = np.nan 

111 if footprint is None: 

112 maskedIm = exposure.maskedImage.subset(bbox) 

113 if not calibrate: 

114 img = maskedIm.image.array 

115 sig_inv.array[spans] = 1 / np.sqrt(maskedIm.variance.array[spans]) 

116 else: 

117 img.array[spans] = footprint.getImageArray() 

118 if get_sig_inv: 

119 # footprint.getVarianceArray() returns zeros 

120 variance = exposure.variance[bbox] 

121 if not calibrate: 

122 sig_inv.array[spans] = 1 / np.sqrt(variance.array[spans]) 

123 if calibrate: 

124 # Have to calibrate with the original image 

125 maskedIm = afwImage.MaskedImageF( 

126 image=exposure.image[bbox], 

127 variance=variance if get_sig_inv else None, 

128 ) 

129 if calibrate: 

130 maskedIm = exposure.photoCalib.calibrateImage(maskedIm) 

131 if footprint is None: 

132 img = maskedIm.image.array 

133 else: 

134 # Apply the calibration to the deblended footprint 

135 # ... hopefully it's multiplicative enough 

136 img.array[spans] *= maskedIm.image.array[spans] / exposure.image[bbox].array[spans] 

137 img = img.array 

138 if get_sig_inv: 

139 sig_inv.array[spans] = 1 / np.sqrt(maskedIm.variance.array[spans]) 

140 # Should not happen but does with footprints having nans 

141 sig_inv.array[~(sig_inv.array >= 0)] = 0 

142 

143 return np.array(img, dtype="float64"), bbox, np.array(sig_inv.array, dtype="float64") 

144 

145 

146def join_and_filter(separator: str, items: Iterable[str], exclusion: str | None = None) -> str: 

147 """Join an iterable of items by a separator, filtering out an exclusion. 

148 

149 Parameters 

150 ---------- 

151 separator 

152 The separator to join items by. 

153 items 

154 Items to join. 

155 exclusion 

156 The pattern to exclude. 

157 

158 Returns 

159 ------- 

160 joined 

161 The joined string. 

162 """ 

163 return separator.join(filter(exclusion, items))