Coverage for python / lsst / analysis / tools / tasks / wholeTractImageAnalysis.py: 33%

84 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:23 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

# Public API of this module: the whole-tract image analysis task/config pair
# and the coadd-binning task/config pair that feeds it.
__all__ = (
    "WholeTractImageAnalysisConfig",
    "WholeTractImageAnalysisTask",
    "MakeBinnedCoaddConfig",
    "MakeBinnedCoaddTask",
)

28 

29from typing import Any, Mapping 

30 

31import lsst.pipe.base as pipeBase 

32from lsst.daf.butler import DataCoordinate 

33from lsst.ip.isr.binImageDataTask import binImageData 

34from lsst.pex.config import Field 

35from lsst.pipe.base import ( 

36 InputQuantizedConnection, 

37 OutputQuantizedConnection, 

38 PipelineTask, 

39 PipelineTaskConfig, 

40 PipelineTaskConnections, 

41 QuantumContext, 

42) 

43from lsst.pipe.base import connectionTypes as ct 

44from lsst.skymap import BaseSkyMap 

45 

46from ..interfaces import AnalysisBaseConfig, AnalysisBaseConnections, AnalysisPipelineTask 

47 

48 

class WholeTractImageAnalysisConnections(
    AnalysisBaseConnections,
    dimensions=("skymap", "tract", "band"),
    defaultTemplates={
        "coaddName": "deep",
    },
):
    """Connections for `WholeTractImageAnalysisTask`: per-patch binned coadd
    images (``multiple=True``, deferred load) plus the skymap covering the
    tract being analyzed.
    """

    data = ct.Input(
        doc="Binned coadd image data to read from the butler.",
        name="{coaddName}Coadd_calexp_bin",
        storageClass="ExposureF",
        deferLoad=True,
        dimensions=(
            "skymap",
            "tract",
            "patch",
            "band",
        ),
        multiple=True,
    )

    skymap = ct.Input(
        doc="The skymap that covers the tract that the data is from.",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    def __init__(self, *, config=None):
        """Customize the storageClass for a specific instance. This enables it
        to be dynamically set at runtime, allowing the task to work with
        different types of image-like data.

        Parameters
        ----------
        config : `WholeTractImageAnalysisConfig`
            A config for `WholeTractImageAnalysisTask`.
        """
        super().__init__(config=config)
        # Rebuild the ``data`` connection only when the configured storage
        # class differs from the class-level default ("ExposureF"); all other
        # connection attributes are carried over unchanged.
        if config and config.dataStorageClass != self.data.storageClass:
            self.data = ct.Input(
                name=self.data.name,
                doc=self.data.doc,
                storageClass=config.dataStorageClass,
                dimensions=self.data.dimensions,
                deferLoad=self.data.deferLoad,
                multiple=self.data.multiple,
            )

97 

98 

class WholeTractImageAnalysisConfig(
    AnalysisBaseConfig, pipelineConnections=WholeTractImageAnalysisConnections
):
    """Config for `WholeTractImageAnalysisTask`."""

    # Use the modern generic ``Field[str]`` form for consistency with the
    # other Field declarations in this module (``Field[bool]``/``Field[int]``
    # in MakeBinnedCoaddConfig); behavior is identical to dtype=str.
    dataStorageClass = Field[str](
        default="ExposureF",
        doc=(
            "Override the storageClass of the input data. "
            "Must be of type `Image`, `MaskedImage` or `Exposure`, or one of their subtypes."
        ),
    )

110 

111 

class WholeTractImageAnalysisTask(AnalysisPipelineTask):
    """Run analysis tools on full-tract mosaics assembled from per-patch
    binned coadd images.
    """

    ConfigClass = WholeTractImageAnalysisConfig
    _DefaultName = "wholeTractImageAnalysis"

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId
        plotInfo = self.parsePlotInfo(inputs, dataId)

        try:
            handles = inputs.pop("data")
        except KeyError:
            raise RuntimeError("'data' is a required input connection, but is not defined.")

        storageClass = self.config.dataStorageClass
        if "Exposure" in storageClass:
            # Exposure-like inputs: fetch each requested component (always
            # including the mask plane) for every patch, keyed by patch id.
            componentNames = {"mask"}
            componentNames.update(self.collectInputNames())
            keyedData = {
                component: {
                    handle.dataId["patch"]: handle.get(component=component)
                    for handle in handles
                }
                for component in componentNames
            }
        elif "Image" in storageClass:
            # Plain image-like inputs: a single per-patch mapping of images.
            keyedData = {
                "image": {handle.dataId["patch"]: handle.get() for handle in handles}
            }
        else:
            raise TypeError("'data' must be of type Image, MaskedImage, Exposure, or one of their subtypes")

        outputs = self.run(
            data=keyedData,
            plotInfo=plotInfo,
            tractId=dataId["tract"],
            skymap=inputs["skymap"],
            bands=dataId["band"],
        )

        self.putByBand(butlerQC, outputs, outputRefs)

    def parsePlotInfo(
        self, inputs: Mapping[str, Any] | None, dataId: DataCoordinate | None, connectionName: str = "data"
    ) -> Mapping[str, str]:
        """Parse the inputs and dataId to get the information needed to
        add to the figure. The parent class parsePlotInfo cannot be used
        because it assumes a single input dataset, as opposed to the
        multiple datasets used by this analysis task.

        Parameters
        ----------
        inputs: `dict`
            The inputs to the task
        dataCoordinate: `lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionName: `str`, optional
            Name of the input connection to use for determining table name.

        Returns
        -------
        plotInfo : `dict`
        """
        if inputs is None:
            tableName = ""
            run = ""
        else:
            # The connection is ``multiple=True``; take the dataset name and
            # run collection from the first handle's reference.
            firstRef = inputs[connectionName][0].ref
            tableName = firstRef.datasetType.name
            run = firstRef.run

        plotInfo = {"tableName": tableName, "run": run}
        self._populatePlotInfoWithDataId(plotInfo, dataId)
        return plotInfo

192 

193 

class MakeBinnedCoaddConnections(
    PipelineTaskConnections,
    dimensions=("skymap", "tract", "patch", "band"),
    defaultTemplates={"coaddName": "deep"},
):
    """Connections for `MakeBinnedCoaddTask`: one coadd in, its binned
    counterpart out, plus the skymap for optional inner-bbox trimming.
    """

    coadd = ct.Input(
        doc="Input coadd image data to bin.",
        name="{coaddName}Coadd_calexp",
        storageClass="ExposureF",
        dimensions=("skymap", "tract", "patch", "band"),
        deferLoad=True,
    )
    skymap = ct.Input(
        doc="The skymap that covers the tract that the data is from.",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
    binnedCoadd = ct.Output(
        doc="Binned coadd image data.",
        name="{coaddName}Coadd_calexp_bin",
        storageClass="ExposureF",
        dimensions=("skymap", "tract", "patch", "band"),
    )

    def __init__(self, *, config=None):
        """Customize the storageClass for a specific instance.
        This enables it to be dynamically set at runtime, allowing
        the task to work with different types of image-like data.

        Parameters
        ----------
        config : `MakeBinnedCoaddConfig`
            A config for `MakeBinnedCoaddTask`.
        """
        super().__init__(config=config)
        # When the configured storage class differs from the declared
        # default, rebuild BOTH the input and output connections so the
        # binned output is stored with the same storage class it was read as.
        if config and config.coaddStorageClass != self.coadd.storageClass:
            self.coadd = ct.Input(
                name=self.coadd.name,
                doc=self.coadd.doc,
                storageClass=config.coaddStorageClass,
                dimensions=self.coadd.dimensions,
                deferLoad=self.coadd.deferLoad,
            )
            self.binnedCoadd = ct.Output(
                name=self.binnedCoadd.name,
                doc=self.binnedCoadd.doc,
                storageClass=config.coaddStorageClass,
                dimensions=self.binnedCoadd.dimensions,
            )

245 

246 

class MakeBinnedCoaddConfig(PipelineTaskConfig, pipelineConnections=MakeBinnedCoaddConnections):
    """Config for MakeBinnedCoaddTask"""

    doBinInnerBBox = Field[bool](
        # BUG FIX: the original wrote doc=("...", "...",) with trailing
        # commas, making ``doc`` a tuple of strings rather than a single
        # implicitly-concatenated string. Field docs must be a str.
        doc=(
            "Retrieve and bin the coadd image data within the patch Inner Bounding Box, "
            "thereby excluding the regions that overlap neighboring patches."
        ),
        default=False,
    )
    binFactor = Field[int](
        doc="Binning factor applied to both spatial dimensions.",
        default=8,
        # Binning by 1 would be a no-op; require a factor of at least 2.
        check=lambda x: x > 1,
    )
    # Modern generic Field[str] form, consistent with the fields above;
    # identical in behavior to Field(dtype=str, ...).
    coaddStorageClass = Field[str](
        default="ExposureF",
        doc=(
            "Override the storageClass of the input and binned coadd image data. "
            "Must be of type `Image`, `MaskedImage`, or `Exposure`, or one of their subtypes."
        ),
    )

270 

271 

class MakeBinnedCoaddTask(PipelineTask):
    """Bin a coadd image by a configurable factor, optionally restricted to
    the patch's inner bounding box.
    """

    ConfigClass = MakeBinnedCoaddConfig
    _DefaultName = "makeBinnedCoadd"

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        """Takes coadd image data and bins it by the factor specified in
        self.config.binFactor. This task uses the binImageData function
        defined in ip_isr, but adds the option to only retrieve and bin the
        data contained within the patch's inner bounding box.

        Parameters
        ----------
        butlerQC : `lsst.pipe.base.QuantumContext`
            A butler which is specialized to operate in the context of a
            `lsst.daf.butler.Quantum`.
        inputRefs : `lsst.pipe.base.InputQuantizedConnection`
            Data structure containing named attributes 'coadd' and 'skymap'.
            The values of these attributes are the corresponding
            `lsst.daf.butler.DatasetRef` objects defined in the corresponding
            `PipelineTaskConnections` class.
        outputRefs : `lsst.pipe.base.OutputQuantizedConnection`
            Datastructure containing named attribute 'binnedCoadd'.
            The value of this attribute is the corresponding
            `lsst.daf.butler.DatasetRef` object defined in the corresponding
            `PipelineTaskConnections` class.
        """
        inputs = butlerQC.get(inputRefs)
        coaddHandle = inputs["coadd"]

        if self.config.doBinInnerBBox:
            # Restrict the read to the patch's inner bounding box so that
            # overlap regions shared with neighboring patches are excluded.
            quantumDataId = butlerQC.quantum.dataId
            tractInfo = inputs["skymap"].generateTract(quantumDataId["tract"])
            patchInfo = tractInfo.getPatchInfo(quantumDataId["patch"])
            coadd = coaddHandle.get(parameters={"bbox": patchInfo.getInnerBBox()})
        else:
            coadd = coaddHandle.get()

        binned = binImageData(coadd, self.config.binFactor)

        butlerQC.put(pipeBase.Struct(binnedCoadd=binned), outputRefs)