Coverage for python / lsst / analysis / tools / tasks / propertyMapAnalysis.py: 41%
75 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:09 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:09 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
23__all__ = [
24 "SurveyWidePropertyMapAnalysisConfig",
25 "SurveyWidePropertyMapAnalysisTask",
26 "PerTractPropertyMapAnalysisConfig",
27 "PerTractPropertyMapAnalysisTask",
28]
30from typing import Any, Mapping, Union
32from lsst.daf.butler import DataCoordinate
33from lsst.pex.config import ChoiceField, DictField, Field, ListField
34from lsst.pipe.base import (
35 InputQuantizedConnection,
36 OutputQuantizedConnection,
37 QuantumContext,
38 connectionTypes,
39)
40from lsst.skymap import BaseSkyMap
42from ..interfaces import AnalysisBaseConfig, AnalysisBaseConnections, AnalysisPipelineTask
class PerTractPropertyMapAnalysisConnections(
    AnalysisBaseConnections,
    dimensions=("skymap", "band", "tract"),
    defaultTemplates={"outputName": "propertyMapTract"},
):
    """Connections for the per-tract property map analysis.

    In addition to the static ``skymap`` input, one ``HealSparseMap`` input
    connection is attached to the instance for every entry of
    ``config.atools.fieldNames``.

    Notes
    -----
    Each field name is parsed as
    ``<coaddName>Coadd_<property>_map_<operation>``, where ``<operation>``
    must be one of ``min``, ``max``, ``mean``, ``weighted_mean`` or ``sum``.
    """

    skymap = connectionTypes.Input(
        doc="The skymap that covers the tract that the data is from.",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Human-readable labels for the operation suffix of a map name; they
        # are only used to build the connection documentation strings.
        longNames = {
            "min": "Minimum",
            "max": "Maximum",
            "mean": "Mean",
            "weighted_mean": "Weighted mean",
            "sum": "Sum",
        }

        # One input connection per map that is configured to run.
        for mapName in config.atools.fieldNames:
            # Split off the operation first, then the coadd name; what is
            # left in between is the property.
            prefix, operation = mapName.split("_map_")
            coadd, rawProperty = prefix.split("Coadd_")
            readableProperty = rawProperty.replace("_", " ")
            label = longNames[operation]

            connection = connectionTypes.Input(
                doc=f"{label}-value map of {readableProperty} for {coadd} coadd",
                name=mapName,
                storageClass="HealSparseMap",
                dimensions=("skymap", "band", "tract"),
                multiple=False,
                deferLoad=True,
            )
            setattr(self, mapName, connection)
class PerTractPropertyMapAnalysisConfig(
    AnalysisBaseConfig, pipelineConnections=PerTractPropertyMapAnalysisConnections
):
    """Configuration for the per-tract property map analysis task."""

    # Extra keyword arguments for the projection; the keys rejected by
    # ``keyCheck`` are the ones listed as not permitted in the field doc.
    projectionKwargs = DictField(
        doc="Keyword arguments to use in the GnomonicSkyproj call, e.g. n_grid_lon. "
        "The following keys are not permitted: 'ax', 'lon_0', 'lat_0', and 'extent'. "
        "See https://skyproj.readthedocs.io/en/latest/modules.html#skyproj.skyproj.GnomonicSkyproj",
        keytype=str,
        itemtype=float,
        default={},
        keyCheck=lambda key: key not in ("ax", "lon_0", "lat_0", "extent"),
    )

    # Zoom levels used when plotting the maps.
    zoomFactors = ListField(
        doc="Two-element list of zoom factors to use when plotting the maps.",
        dtype=float,
        default=[1.8, 5],
    )

    # Colorbar options; 'orientation' and 'location' are rejected by
    # ``keyCheck``.
    colorbarKwargs = DictField(
        doc="Keyword arguments to pass to the colorbar except for 'orientation' and 'location'.",
        keytype=str,
        itemtype=str,
        default={"cmap": "viridis"},
        keyCheck=lambda key: key not in ("orientation", "location"),
    )

    # Switch for the simplified, publication-oriented figure layout.
    publicationStyle = Field[bool](
        doc="Make a simplified figure layout for publication use?",
        default=False,
    )
class PerTractPropertyMapAnalysisTask(AnalysisPipelineTask):
    """Pipeline task that analyzes the property maps of a single tract."""

    ConfigClass = PerTractPropertyMapAnalysisConfig
    _DefaultName = "perTractPropertyMapAnalysisTask"

    def parsePlotInfo(
        self, inputs: Mapping[str, Any], dataId: DataCoordinate | None, connectionNames: list[str]
    ) -> Mapping[str, Union[Mapping[str, str], str, int]]:
        """Collect the information that is added to the figure.

        Parameters
        ----------
        inputs : `dict`
            The inputs to the task. Every entry named in ``connectionNames``
            must expose a ``ref`` attribute carrying ``run`` and
            ``datasetType.name``.
        dataId : `~lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionNames : `list` [`str`]
            Names of the input connections to use for determining table
            names.

        Returns
        -------
        plotInfo : `dict`
            Dictionary holding ``"tableNames"`` (connection name to dataset
            type name), ``"run"``, and the dataId information.

        Notes
        -----
        This method is customized because the analyses here are not 1-1 with
        dataset types: several connections/dataset types are analyzed at
        once, so each connection has its own table name.
        """
        # All connections share the same run, so the first one is used.
        run = inputs[connectionNames[0]].ref.run

        # Table name of every connection, keyed by connection name.
        tableNames = {
            connection: inputs[connection].ref.datasetType.name for connection in connectionNames
        }

        plotInfo = {"tableNames": tableNames, "run": run}

        # The dataId information is the same for all connections.
        self._populatePlotInfoWithDataId(plotInfo, dataId)

        return plotInfo

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        # Docstring inherited.

        inputs = butlerQC.get(inputRefs)
        skymap = inputs["skymap"]
        dataId = butlerQC.quantum.dataId
        tractInfo = skymap[dataId["tract"]]

        # Every input other than the skymap is treated as a property map.
        mapKeys = []
        for inputName in inputs:
            if inputName != "skymap":
                mapKeys.append(inputName)

        plotInfo = self.parsePlotInfo(inputs, dataId, mapKeys)
        outputs = self.run(
            data=inputs,
            tractInfo=tractInfo,
            plotConfig=self.config,
            plotInfo=plotInfo,
        )
        butlerQC.put(outputs, outputRefs)
class SurveyWidePropertyMapAnalysisConnections(
    AnalysisBaseConnections,
    dimensions=("skymap", "band"),
    defaultTemplates={"outputName": "propertyMapSurvey"},
):
    """Connections for the survey-wide property map analysis.

    One ``HealSparseMap`` input connection is attached to the instance for
    every entry of ``config.atools.fieldNames``.

    Notes
    -----
    Each field name is parsed as
    ``<coaddName>Coadd_<property>_consolidated_map_<operation>``, where
    ``<operation>`` must be one of ``min``, ``max``, ``mean``,
    ``weighted_mean`` or ``sum``.
    """

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Human-readable labels for the operation suffix of a map name; they
        # are only used to build the connection documentation strings.
        longNames = {
            "min": "Minimum",
            "max": "Maximum",
            "mean": "Mean",
            "weighted_mean": "Weighted mean",
            "sum": "Sum",
        }

        # One input connection per map that is configured to run.
        for mapName in config.atools.fieldNames:
            # Split off the operation first, then the coadd name; what is
            # left in between is the property.
            prefix, operation = mapName.split("_consolidated_map_")
            coadd, rawProperty = prefix.split("Coadd_")
            readableProperty = rawProperty.replace("_", " ")
            label = longNames[operation]

            connection = connectionTypes.Input(
                doc=f"{label}-value consolidated map of {readableProperty} for {coadd} coadd",
                name=mapName,
                storageClass="HealSparseMap",
                dimensions=("skymap", "band"),
                multiple=False,
                deferLoad=True,
            )
            setattr(self, mapName, connection)
class SurveyWidePropertyMapAnalysisConfig(
    AnalysisBaseConfig, pipelineConnections=SurveyWidePropertyMapAnalysisConnections
):
    """Configuration for the survey-wide property map analysis task."""

    # Note: Gnomonic projection is excluded here because `GnomonicSkyproj` must
    # have the central lon/lat set (defaults to 0/0) which makes it useful for
    # plotting individual tracts but not for survey-wide maps.
    projection = ChoiceField[str](
        doc="The projection to use for plotting the map. "
        "See https://skyproj.readthedocs.io/en/latest/projections.html",
        # Each allowed value is described by its own name.
        allowed={
            "McBryde": "McBryde",
            "Mollweide": "Mollweide",
            "Cylindrical": "Cylindrical",
            "Laea": "Laea",
            "Hammer": "Hammer",
            "EqualEarth": "EqualEarth",
            "ObliqueMollweide": "ObliqueMollweide",
            "Albers": "Albers",
        },
        default="McBryde",
    )

    # Extra keyword arguments for the projection; 'ax' is rejected by
    # ``keyCheck``.
    projectionKwargs = DictField(
        doc="Keyword arguments to use in the projection call, e.g. lon_0. The key 'ax' is not permitted. "
        "See https://skyproj.readthedocs.io/en/latest/projections.html",
        keytype=str,
        itemtype=float,
        default={},
        keyCheck=lambda key: key not in ("ax",),
    )

    # Choice between zooming on the map's RA/Dec range and a full-sky view.
    autozoom = Field[bool](
        doc="Automatically zooms in on the RA/Dec range of the map to make better use of its resolution; "
        "otherwise, the map is displayed within the full-sky domain.",
        default=True,
    )

    # Colorbar options; unlike the projection kwargs, no key is rejected.
    colorbarKwargs = DictField(
        doc="Keyword arguments to pass to the colorbar.",
        keytype=str,
        itemtype=str,
        default={"orientation": "horizontal", "location": "top", "cmap": "viridis"},
    )
    # TODO: Mixed types will be allowed after DM-47937. You can then add things
    # like "aspect": 20, rather than just strings.

    # Switch for the simplified, publication-oriented figure layout.
    publicationStyle = Field[bool](
        doc="Make a simplified figure layout for publication use?",
        default=False,
    )
class SurveyWidePropertyMapAnalysisTask(AnalysisPipelineTask):
    """Pipeline task that analyzes survey-wide (consolidated) property maps."""

    ConfigClass = SurveyWidePropertyMapAnalysisConfig
    _DefaultName = "surveyWidePropertyMapAnalysisTask"

    def parsePlotInfo(
        self, inputs: Mapping[str, Any], dataId: DataCoordinate | None, connectionNames: list[str]
    ) -> Mapping[str, Union[Mapping[str, str], str, int]]:
        """Collect the information that is added to the figure.

        Parameters
        ----------
        inputs : `dict`
            The inputs to the task. Every entry named in ``connectionNames``
            must expose a ``ref`` attribute carrying ``run`` and
            ``datasetType.name``.
        dataId : `~lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionNames : `list` [`str`]
            Names of the input connections to use for determining table
            names.

        Returns
        -------
        plotInfo : `dict`
            Dictionary holding ``"tableNames"`` (connection name to dataset
            type name), ``"run"``, and the dataId information.

        Notes
        -----
        This method is customized because the analyses here are not 1-1 with
        dataset types: several connections/dataset types are analyzed at
        once, so each connection has its own table name.
        """
        # All connections share the same run, so the first one is used.
        run = inputs[connectionNames[0]].ref.run

        # Table name of every connection, keyed by connection name.
        tableNames = {
            connection: inputs[connection].ref.datasetType.name for connection in connectionNames
        }

        plotInfo = {"tableNames": tableNames, "run": run}

        # The dataId information is the same for all connections.
        self._populatePlotInfoWithDataId(plotInfo, dataId)

        return plotInfo

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        # Docstring inherited.

        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId

        # Every input is a property map here, so all of them are used.
        mapKeys = list(inputs)

        plotInfo = self.parsePlotInfo(inputs, dataId, mapKeys)
        outputs = self.run(
            data=inputs,
            plotConfig=self.config,
            plotInfo=plotInfo,
        )
        butlerQC.put(outputs, outputRefs)