Coverage for python / lsst / analysis / tools / tasks / wholeTractImageAnalysis.py: 34%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:36 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ( 

23 "WholeTractImageAnalysisConfig", 

24 "WholeTractImageAnalysisTask", 

25 "MakeBinnedCoaddConfig", 

26 "MakeBinnedCoaddTask", 

27) 

28 

29from collections.abc import Mapping 

30from typing import Any 

31 

32import lsst.pipe.base as pipeBase 

33from lsst.daf.butler import DataCoordinate 

34from lsst.ip.isr.binImageDataTask import binImageData 

35from lsst.pex.config import Field 

36from lsst.pipe.base import ( 

37 InputQuantizedConnection, 

38 OutputQuantizedConnection, 

39 PipelineTask, 

40 PipelineTaskConfig, 

41 PipelineTaskConnections, 

42 QuantumContext, 

43) 

44from lsst.pipe.base import connectionTypes as ct 

45from lsst.skymap import BaseSkyMap 

46 

47from ..interfaces import AnalysisBaseConfig, AnalysisBaseConnections, AnalysisPipelineTask 

48 

49 

50class WholeTractImageAnalysisConnections( 

51 AnalysisBaseConnections, 

52 dimensions=("skymap", "tract", "band"), 

53 defaultTemplates={ 

54 "coaddName": "deep", 

55 }, 

56): 

57 data = ct.Input( 

58 doc="Binned coadd image data to read from the butler.", 

59 name="{coaddName}Coadd_calexp_bin", 

60 storageClass="ExposureF", 

61 deferLoad=True, 

62 dimensions=( 

63 "skymap", 

64 "tract", 

65 "patch", 

66 "band", 

67 ), 

68 multiple=True, 

69 ) 

70 

71 skymap = ct.Input( 

72 doc="The skymap that covers the tract that the data is from.", 

73 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

74 storageClass="SkyMap", 

75 dimensions=("skymap",), 

76 ) 

77 

78 def __init__(self, *, config=None): 

79 """Customize the storageClass for a specific instance. This enables it 

80 to be dynamically set at runtime, allowing the task to work with 

81 different types of image-like data. 

82 

83 Parameters 

84 ---------- 

85 config : `WholeTractImageAnalysisConfig` 

86 A config for `WholeTractImageAnalysisTask`. 

87 """ 

88 super().__init__(config=config) 

89 if config and config.dataStorageClass != self.data.storageClass: 

90 self.data = ct.Input( 

91 name=self.data.name, 

92 doc=self.data.doc, 

93 storageClass=config.dataStorageClass, 

94 dimensions=self.data.dimensions, 

95 deferLoad=self.data.deferLoad, 

96 multiple=self.data.multiple, 

97 ) 

98 

99 

100class WholeTractImageAnalysisConfig( 

101 AnalysisBaseConfig, pipelineConnections=WholeTractImageAnalysisConnections 

102): 

103 dataStorageClass = Field( 

104 default="ExposureF", 

105 dtype=str, 

106 doc=( 

107 "Override the storageClass of the input data. " 

108 "Must be of type `Image`, `MaskedImage` or `Exposure`, or one of their subtypes." 

109 ), 

110 ) 

111 

112 

113class WholeTractImageAnalysisTask(AnalysisPipelineTask): 

114 

115 ConfigClass = WholeTractImageAnalysisConfig 

116 _DefaultName = "wholeTractImageAnalysis" 

117 

118 def runQuantum( 

119 self, 

120 butlerQC: QuantumContext, 

121 inputRefs: InputQuantizedConnection, 

122 outputRefs: OutputQuantizedConnection, 

123 ) -> None: 

124 inputs = butlerQC.get(inputRefs) 

125 dataId = butlerQC.quantum.dataId 

126 plotInfo = self.parsePlotInfo(inputs, dataId) 

127 

128 try: 

129 inputData = inputs.pop("data") 

130 except KeyError: 

131 raise RuntimeError("'data' is a required input connection, but is not defined.") 

132 

133 keyedData = dict() 

134 if "Exposure" in self.config.dataStorageClass: 

135 inputNames = {"mask"} 

136 inputNames.update(self.collectInputNames()) 

137 for inputName in inputNames: 

138 keyedData[inputName] = dict() 

139 for handle in inputData: 

140 keyedData[inputName][handle.dataId["patch"]] = handle.get(component=inputName) 

141 elif "Image" in self.config.dataStorageClass: 

142 keyedData["image"] = dict() 

143 for handle in inputData: 

144 image = handle.get() 

145 keyedData["image"][handle.dataId["patch"]] = image 

146 else: 

147 raise TypeError("'data' must be of type Image, MaskedImage, Exposure, or one of their subtypes") 

148 

149 outputs = self.run( 

150 data=keyedData, 

151 plotInfo=plotInfo, 

152 tractId=dataId["tract"], 

153 skymap=inputs["skymap"], 

154 bands=dataId["band"], 

155 ) 

156 

157 self.putByBand(butlerQC, outputs, outputRefs) 

158 

159 def parsePlotInfo( 

160 self, inputs: Mapping[str, Any] | None, dataId: DataCoordinate | None, connectionName: str = "data" 

161 ) -> Mapping[str, str]: 

162 """Parse the inputs and dataId to get the information needed to 

163 to add to the figure. The parent class parsePlotInfo cannot be 

164 used becuase it assumes a single input dataset, as opposed to the 

165 multiple datasets used by this analysis task. 

166 

167 Parameters 

168 ---------- 

169 inputs: `dict` 

170 The inputs to the task 

171 dataCoordinate: `lsst.daf.butler.DataCoordinate` 

172 The dataId that the task is being run on. 

173 connectionName: `str`, optional 

174 Name of the input connection to use for determining table name. 

175 

176 Returns 

177 ------- 

178 plotInfo : `dict` 

179 """ 

180 

181 if inputs is None: 

182 tableName = "" 

183 run = "" 

184 else: 

185 tableName = inputs[connectionName][0].ref.datasetType.name 

186 run = inputs[connectionName][0].ref.run 

187 

188 # Initialize the plot info dictionary 

189 plotInfo = {"tableName": tableName, "run": run} 

190 

191 self._populatePlotInfoWithDataId(plotInfo, dataId) 

192 return plotInfo 

193 

194 

195class MakeBinnedCoaddConnections( 

196 PipelineTaskConnections, 

197 dimensions=("skymap", "tract", "patch", "band"), 

198 defaultTemplates={"coaddName": "deep"}, 

199): 

200 

201 coadd = ct.Input( 

202 doc="Input coadd image data to bin.", 

203 name="{coaddName}Coadd_calexp", 

204 storageClass="ExposureF", 

205 dimensions=("skymap", "tract", "patch", "band"), 

206 deferLoad=True, 

207 ) 

208 skymap = ct.Input( 

209 doc="The skymap that covers the tract that the data is from.", 

210 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

211 storageClass="SkyMap", 

212 dimensions=("skymap",), 

213 ) 

214 binnedCoadd = ct.Output( 

215 doc="Binned coadd image data.", 

216 name="{coaddName}Coadd_calexp_bin", 

217 storageClass="ExposureF", 

218 dimensions=("skymap", "tract", "patch", "band"), 

219 ) 

220 

221 def __init__(self, *, config=None): 

222 """Customize the storageClass for a specific instance. 

223 This enables it to be dynamically set at runtime, allowing 

224 the task to work with different types of image-like data. 

225 

226 Parameters 

227 ---------- 

228 config : `MakeBinnedCoaddConfig` 

229 A config for `MakeBinnedCoaddTask`. 

230 """ 

231 super().__init__(config=config) 

232 if config and config.coaddStorageClass != self.coadd.storageClass: 

233 self.coadd = ct.Input( 

234 name=self.coadd.name, 

235 doc=self.coadd.doc, 

236 storageClass=config.coaddStorageClass, 

237 dimensions=self.coadd.dimensions, 

238 deferLoad=self.coadd.deferLoad, 

239 ) 

240 self.binnedCoadd = ct.Output( 

241 name=self.binnedCoadd.name, 

242 doc=self.binnedCoadd.doc, 

243 storageClass=config.coaddStorageClass, 

244 dimensions=self.binnedCoadd.dimensions, 

245 ) 

246 

247 

248class MakeBinnedCoaddConfig(PipelineTaskConfig, pipelineConnections=MakeBinnedCoaddConnections): 

249 """Config for MakeBinnedCoaddTask""" 

250 

251 doBinInnerBBox = Field[bool]( 

252 doc=( 

253 "Retrieve and bin the coadd image data within the patch Inner Bounding Box, ", 

254 "thereby excluding the regions that overlap neighboring patches.", 

255 ), 

256 default=False, 

257 ) 

258 binFactor = Field[int]( 

259 doc="Binning factor applied to both spatial dimensions.", 

260 default=8, 

261 check=lambda x: x > 1, 

262 ) 

263 coaddStorageClass = Field( 

264 default="ExposureF", 

265 dtype=str, 

266 doc=( 

267 "Override the storageClass of the input and binned coadd image data. " 

268 "Must be of type `Image`, `MaskedImage`, or `Exposure`, or one of their subtypes." 

269 ), 

270 ) 

271 

272 

273class MakeBinnedCoaddTask(PipelineTask): 

274 

275 ConfigClass = MakeBinnedCoaddConfig 

276 _DefaultName = "makeBinnedCoadd" 

277 

278 def runQuantum( 

279 self, 

280 butlerQC: QuantumContext, 

281 inputRefs: InputQuantizedConnection, 

282 outputRefs: OutputQuantizedConnection, 

283 ) -> None: 

284 """Takes coadd image data and bins it by the factor specified in 

285 self.config.binFactor. This task uses the binImageData function 

286 defined in ip_isr, but adds the option to only retrieve and bin the 

287 data contained within the patch's inner bounding box. 

288 

289 Parameters 

290 ---------- 

291 butlerQC : `lsst.pipe.base.QuantumContext` 

292 A butler which is specialized to operate in the context of a 

293 `lsst.daf.butler.Quantum`. 

294 inputRefs : `lsst.pipe.base.InputQuantizedConnection` 

295 Data structure containing named attributes 'coadd' and 'skymap'. 

296 The values of these attributes are the corresponding 

297 `lsst.daf.butler.DatasetRef` objects defined in the corresponding 

298 `PipelineTaskConnections` class. 

299 outputRefs : `lsst.pipe.base.OutputQuantizedConnection` 

300 Datastructure containing named attribute 'binnedCoadd'. 

301 The value of this attribute is the corresponding 

302 `lsst.daf.butler.DatasetRef` object defined in the corresponding 

303 `PipelineTaskConnections` class. 

304 """ 

305 

306 inputs = butlerQC.get(inputRefs) 

307 coaddRef = inputs["coadd"] 

308 

309 if self.config.doBinInnerBBox: 

310 skymap = inputs["skymap"] 

311 tractId = butlerQC.quantum.dataId["tract"] 

312 patchId = butlerQC.quantum.dataId["patch"] 

313 tractInfo = skymap.generateTract(tractId) 

314 bbox = tractInfo.getPatchInfo(patchId).getInnerBBox() 

315 

316 coadd = coaddRef.get(parameters={"bbox": bbox}) 

317 else: 

318 coadd = coaddRef.get() 

319 

320 binnedCoadd = binImageData(coadd, self.config.binFactor) 

321 

322 butlerQC.put(pipeBase.Struct(binnedCoadd=binnedCoadd), outputRefs)