Coverage for python / lsst / analysis / tools / tasks / wholeTractImageAnalysis.py: 33%
84 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:53 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:53 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22__all__ = (
23 "WholeTractImageAnalysisConfig",
24 "WholeTractImageAnalysisTask",
25 "MakeBinnedCoaddConfig",
26 "MakeBinnedCoaddTask",
27)
29from typing import Any, Mapping
31import lsst.pipe.base as pipeBase
32from lsst.daf.butler import DataCoordinate
33from lsst.ip.isr.binImageDataTask import binImageData
34from lsst.pex.config import Field
35from lsst.pipe.base import (
36 InputQuantizedConnection,
37 OutputQuantizedConnection,
38 PipelineTask,
39 PipelineTaskConfig,
40 PipelineTaskConnections,
41 QuantumContext,
42)
43from lsst.pipe.base import connectionTypes as ct
44from lsst.skymap import BaseSkyMap
46from ..interfaces import AnalysisBaseConfig, AnalysisBaseConnections, AnalysisPipelineTask
49class WholeTractImageAnalysisConnections(
50 AnalysisBaseConnections,
51 dimensions=("skymap", "tract", "band"),
52 defaultTemplates={
53 "coaddName": "deep",
54 },
55):
56 data = ct.Input(
57 doc="Binned coadd image data to read from the butler.",
58 name="{coaddName}Coadd_calexp_bin",
59 storageClass="ExposureF",
60 deferLoad=True,
61 dimensions=(
62 "skymap",
63 "tract",
64 "patch",
65 "band",
66 ),
67 multiple=True,
68 )
70 skymap = ct.Input(
71 doc="The skymap that covers the tract that the data is from.",
72 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
73 storageClass="SkyMap",
74 dimensions=("skymap",),
75 )
77 def __init__(self, *, config=None):
78 """Customize the storageClass for a specific instance. This enables it
79 to be dynamically set at runtime, allowing the task to work with
80 different types of image-like data.
82 Parameters
83 ----------
84 config : `WholeTractImageAnalysisConfig`
85 A config for `WholeTractImageAnalysisTask`.
86 """
87 super().__init__(config=config)
88 if config and config.dataStorageClass != self.data.storageClass:
89 self.data = ct.Input(
90 name=self.data.name,
91 doc=self.data.doc,
92 storageClass=config.dataStorageClass,
93 dimensions=self.data.dimensions,
94 deferLoad=self.data.deferLoad,
95 multiple=self.data.multiple,
96 )
99class WholeTractImageAnalysisConfig(
100 AnalysisBaseConfig, pipelineConnections=WholeTractImageAnalysisConnections
101):
102 dataStorageClass = Field(
103 default="ExposureF",
104 dtype=str,
105 doc=(
106 "Override the storageClass of the input data. "
107 "Must be of type `Image`, `MaskedImage` or `Exposure`, or one of their subtypes."
108 ),
109 )
112class WholeTractImageAnalysisTask(AnalysisPipelineTask):
114 ConfigClass = WholeTractImageAnalysisConfig
115 _DefaultName = "wholeTractImageAnalysis"
117 def runQuantum(
118 self,
119 butlerQC: QuantumContext,
120 inputRefs: InputQuantizedConnection,
121 outputRefs: OutputQuantizedConnection,
122 ) -> None:
123 inputs = butlerQC.get(inputRefs)
124 dataId = butlerQC.quantum.dataId
125 plotInfo = self.parsePlotInfo(inputs, dataId)
127 try:
128 inputData = inputs.pop("data")
129 except KeyError:
130 raise RuntimeError("'data' is a required input connection, but is not defined.")
132 keyedData = dict()
133 if "Exposure" in self.config.dataStorageClass:
134 inputNames = {"mask"}
135 inputNames.update(self.collectInputNames())
136 for inputName in inputNames:
137 keyedData[inputName] = dict()
138 for handle in inputData:
139 keyedData[inputName][handle.dataId["patch"]] = handle.get(component=inputName)
140 elif "Image" in self.config.dataStorageClass:
141 keyedData["image"] = dict()
142 for handle in inputData:
143 image = handle.get()
144 keyedData["image"][handle.dataId["patch"]] = image
145 else:
146 raise TypeError("'data' must be of type Image, MaskedImage, Exposure, or one of their subtypes")
148 outputs = self.run(
149 data=keyedData,
150 plotInfo=plotInfo,
151 tractId=dataId["tract"],
152 skymap=inputs["skymap"],
153 bands=dataId["band"],
154 )
156 self.putByBand(butlerQC, outputs, outputRefs)
158 def parsePlotInfo(
159 self, inputs: Mapping[str, Any] | None, dataId: DataCoordinate | None, connectionName: str = "data"
160 ) -> Mapping[str, str]:
161 """Parse the inputs and dataId to get the information needed to
162 to add to the figure. The parent class parsePlotInfo cannot be
163 used becuase it assumes a single input dataset, as opposed to the
164 multiple datasets used by this analysis task.
166 Parameters
167 ----------
168 inputs: `dict`
169 The inputs to the task
170 dataCoordinate: `lsst.daf.butler.DataCoordinate`
171 The dataId that the task is being run on.
172 connectionName: `str`, optional
173 Name of the input connection to use for determining table name.
175 Returns
176 -------
177 plotInfo : `dict`
178 """
180 if inputs is None:
181 tableName = ""
182 run = ""
183 else:
184 tableName = inputs[connectionName][0].ref.datasetType.name
185 run = inputs[connectionName][0].ref.run
187 # Initialize the plot info dictionary
188 plotInfo = {"tableName": tableName, "run": run}
190 self._populatePlotInfoWithDataId(plotInfo, dataId)
191 return plotInfo
194class MakeBinnedCoaddConnections(
195 PipelineTaskConnections,
196 dimensions=("skymap", "tract", "patch", "band"),
197 defaultTemplates={"coaddName": "deep"},
198):
200 coadd = ct.Input(
201 doc="Input coadd image data to bin.",
202 name="{coaddName}Coadd_calexp",
203 storageClass="ExposureF",
204 dimensions=("skymap", "tract", "patch", "band"),
205 deferLoad=True,
206 )
207 skymap = ct.Input(
208 doc="The skymap that covers the tract that the data is from.",
209 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
210 storageClass="SkyMap",
211 dimensions=("skymap",),
212 )
213 binnedCoadd = ct.Output(
214 doc="Binned coadd image data.",
215 name="{coaddName}Coadd_calexp_bin",
216 storageClass="ExposureF",
217 dimensions=("skymap", "tract", "patch", "band"),
218 )
220 def __init__(self, *, config=None):
221 """Customize the storageClass for a specific instance.
222 This enables it to be dynamically set at runtime, allowing
223 the task to work with different types of image-like data.
225 Parameters
226 ----------
227 config : `MakeBinnedCoaddConfig`
228 A config for `MakeBinnedCoaddTask`.
229 """
230 super().__init__(config=config)
231 if config and config.coaddStorageClass != self.coadd.storageClass:
232 self.coadd = ct.Input(
233 name=self.coadd.name,
234 doc=self.coadd.doc,
235 storageClass=config.coaddStorageClass,
236 dimensions=self.coadd.dimensions,
237 deferLoad=self.coadd.deferLoad,
238 )
239 self.binnedCoadd = ct.Output(
240 name=self.binnedCoadd.name,
241 doc=self.binnedCoadd.doc,
242 storageClass=config.coaddStorageClass,
243 dimensions=self.binnedCoadd.dimensions,
244 )
247class MakeBinnedCoaddConfig(PipelineTaskConfig, pipelineConnections=MakeBinnedCoaddConnections):
248 """Config for MakeBinnedCoaddTask"""
250 doBinInnerBBox = Field[bool](
251 doc=(
252 "Retrieve and bin the coadd image data within the patch Inner Bounding Box, ",
253 "thereby excluding the regions that overlap neighboring patches.",
254 ),
255 default=False,
256 )
257 binFactor = Field[int](
258 doc="Binning factor applied to both spatial dimensions.",
259 default=8,
260 check=lambda x: x > 1,
261 )
262 coaddStorageClass = Field(
263 default="ExposureF",
264 dtype=str,
265 doc=(
266 "Override the storageClass of the input and binned coadd image data. "
267 "Must be of type `Image`, `MaskedImage`, or `Exposure`, or one of their subtypes."
268 ),
269 )
272class MakeBinnedCoaddTask(PipelineTask):
274 ConfigClass = MakeBinnedCoaddConfig
275 _DefaultName = "makeBinnedCoadd"
277 def runQuantum(
278 self,
279 butlerQC: QuantumContext,
280 inputRefs: InputQuantizedConnection,
281 outputRefs: OutputQuantizedConnection,
282 ) -> None:
283 """Takes coadd image data and bins it by the factor specified in
284 self.config.binFactor. This task uses the binImageData function
285 defined in ip_isr, but adds the option to only retrieve and bin the
286 data contained within the patch's inner bounding box.
288 Parameters
289 ----------
290 butlerQC : `lsst.pipe.base.QuantumContext`
291 A butler which is specialized to operate in the context of a
292 `lsst.daf.butler.Quantum`.
293 inputRefs : `lsst.pipe.base.InputQuantizedConnection`
294 Data structure containing named attributes 'coadd' and 'skymap'.
295 The values of these attributes are the corresponding
296 `lsst.daf.butler.DatasetRef` objects defined in the corresponding
297 `PipelineTaskConnections` class.
298 outputRefs : `lsst.pipe.base.OutputQuantizedConnection`
299 Datastructure containing named attribute 'binnedCoadd'.
300 The value of this attribute is the corresponding
301 `lsst.daf.butler.DatasetRef` object defined in the corresponding
302 `PipelineTaskConnections` class.
303 """
305 inputs = butlerQC.get(inputRefs)
306 coaddRef = inputs["coadd"]
308 if self.config.doBinInnerBBox:
309 skymap = inputs["skymap"]
310 tractId = butlerQC.quantum.dataId["tract"]
311 patchId = butlerQC.quantum.dataId["patch"]
312 tractInfo = skymap.generateTract(tractId)
313 bbox = tractInfo.getPatchInfo(patchId).getInnerBBox()
315 coadd = coaddRef.get(parameters={"bbox": bbox})
316 else:
317 coadd = coaddRef.get()
319 binnedCoadd = binImageData(coadd, self.config.binFactor)
321 butlerQC.put(pipeBase.Struct(binnedCoadd=binnedCoadd), outputRefs)