Coverage for python / lsst / analysis / tools / tasks / propertyMapAnalysis.py: 42%
76 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:36 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
# Public API of this module: the config/task pair for each of the two
# analysis flavours (survey-wide and per-tract).
__all__ = [
    "SurveyWidePropertyMapAnalysisConfig",
    "SurveyWidePropertyMapAnalysisTask",
    "PerTractPropertyMapAnalysisConfig",
    "PerTractPropertyMapAnalysisTask",
]
30from collections.abc import Mapping
31from typing import Any
33from lsst.daf.butler import DataCoordinate
34from lsst.pex.config import ChoiceField, DictField, Field, ListField
35from lsst.pipe.base import (
36 InputQuantizedConnection,
37 OutputQuantizedConnection,
38 QuantumContext,
39 connectionTypes,
40)
41from lsst.skymap import BaseSkyMap
43from ..interfaces import AnalysisBaseConfig, AnalysisBaseConnections, AnalysisPipelineTask
class PerTractPropertyMapAnalysisConnections(
    AnalysisBaseConnections,
    dimensions=("skymap", "band", "tract"),
    defaultTemplates={"outputName": "propertyMapTract"},
):
    """Connections for the per-tract property map analysis.

    In addition to the static ``skymap`` input, one deferred-load
    ``HealSparseMap`` input is registered for every entry of
    ``config.atools.fieldNames``. Each entry is used both as the connection
    attribute name and as the dataset type name.
    """

    skymap = connectionTypes.Input(
        doc="The skymap that covers the tract that the data is from.",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Human-readable label for each operation suffix a map name can carry.
        longNames = {
            "min": "Minimum",
            "max": "Maximum",
            "mean": "Mean",
            "weighted_mean": "Weighted mean",
            "sum": "Sum",
        }

        # Register an input connection for every map configured to run. Names
        # are expected to look like "<coadd>Coadd_<property>_map_<operation>";
        # anything else fails on the unpacking or the label lookup below.
        for mapName in config.atools.fieldNames:
            prefix, operation = mapName.split("_map_")
            coadd, prop = prefix.split("Coadd_")
            prop = prop.replace("_", " ")
            label = longNames[operation]
            mapConnection = connectionTypes.Input(
                doc=f"{label}-value map of {prop} for {coadd} coadd",
                name=mapName,
                storageClass="HealSparseMap",
                dimensions=("skymap", "band", "tract"),
                multiple=False,
                deferLoad=True,
            )
            setattr(self, mapName, mapConnection)
class PerTractPropertyMapAnalysisConfig(
    AnalysisBaseConfig, pipelineConnections=PerTractPropertyMapAnalysisConnections
):
    """Configuration for the per-tract property map analysis.

    The fields below hold the projection keyword arguments, zoom factors,
    colorbar keyword arguments and layout switch described in their ``doc``
    strings.
    """

    projectionKwargs = DictField(
        keytype=str,
        itemtype=float,
        doc="Keyword arguments to use in the GnomonicSkyproj call, e.g. n_grid_lon. "
        "The following keys are not permitted: 'ax', 'lon_0', 'lat_0', and 'extent'. "
        "See https://skyproj.readthedocs.io/en/latest/modules.html#skyproj.skyproj.GnomonicSkyproj",
        default={},
        # Reject the keys listed as not permitted in the doc string.
        keyCheck=lambda k: k not in ("ax", "lon_0", "lat_0", "extent"),
    )

    zoomFactors = ListField(
        dtype=float,
        doc="Two-element list of zoom factors to use when plotting the maps.",
        default=[1.8, 5],
        # The doc promises exactly two elements; enforce it at config
        # validation so a wrong-length override is rejected up front.
        length=2,
    )

    colorbarKwargs = DictField(
        keytype=str,
        itemtype=str,
        doc="Keyword arguments to pass to the colorbar except for 'orientation' and 'location'.",
        default={"cmap": "viridis"},
        # Reject the two keys the doc string excludes.
        keyCheck=lambda k: k not in ("orientation", "location"),
    )

    publicationStyle = Field[bool](doc="Make a simplified figure layout for publication use?", default=False)
class PerTractPropertyMapAnalysisTask(AnalysisPipelineTask):
    """Task that passes the property maps of a single tract, together with
    that tract's skymap entry, to ``run``.
    """

    ConfigClass = PerTractPropertyMapAnalysisConfig
    _DefaultName = "perTractPropertyMapAnalysisTask"

    def parsePlotInfo(
        self, inputs: Mapping[str, Any], dataId: DataCoordinate | None, connectionNames: list[str]
    ) -> Mapping[str, Mapping[str, str] | str | int]:
        """Collect the information that gets added to the figure.

        Parameters
        ----------
        inputs : `dict`
            The inputs to the task, keyed by connection name.
        dataId : `~lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionNames : `list` [`str`]
            Names of the input connections whose table names are recorded.

        Returns
        -------
        plotInfo : `dict`
            A dictionary with the run (under ``"run"``), the dataset type
            name of every requested connection (under ``"tableNames"``,
            keyed by connection name), and the entries added from the dataId.

        Notes
        -----
        This override exists because the analyses here are not 1-1 with
        dataset types: several connections/dataset types are analyzed at
        once, so a single shared table name would not describe them.
        """
        # All connections come from the same run; the first is representative.
        run = inputs[connectionNames[0]].ref.run

        # One table name per connection, keyed by the connection name.
        tableNames = {
            connection: inputs[connection].ref.datasetType.name for connection in connectionNames
        }

        plotInfo = {"tableNames": tableNames, "run": run}

        # The dataId information is the same for every connection.
        self._populatePlotInfoWithDataId(plotInfo, dataId)

        return plotInfo

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        # Docstring inherited.

        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId

        # Look up this quantum's tract in the skymap input.
        tractInfo = inputs["skymap"][dataId["tract"]]

        # Every input apart from the skymap is one of the property maps.
        mapKeys = [connection for connection in inputs if connection != "skymap"]
        plotInfo = self.parsePlotInfo(inputs, dataId, mapKeys)

        outputs = self.run(
            data=inputs,
            tractInfo=tractInfo,
            plotConfig=self.config,
            plotInfo=plotInfo,
        )
        butlerQC.put(outputs, outputRefs)
class SurveyWidePropertyMapAnalysisConnections(
    AnalysisBaseConnections,
    dimensions=("skymap", "band"),
    defaultTemplates={"outputName": "propertyMapSurvey"},
):
    """Connections for the survey-wide property map analysis.

    One deferred-load ``HealSparseMap`` input is registered for every entry
    of ``config.atools.fieldNames``. Each entry is used both as the
    connection attribute name and as the dataset type name.
    """

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Human-readable label for each operation suffix a map name can carry.
        longNames = {
            "min": "Minimum",
            "max": "Maximum",
            "mean": "Mean",
            "weighted_mean": "Weighted mean",
            "sum": "Sum",
        }

        # Register an input connection for every map configured to run. Names
        # are expected to look like
        # "<coadd>Coadd_<property>_consolidated_map_<operation>"; anything
        # else fails on the unpacking or the label lookup below.
        for mapName in config.atools.fieldNames:
            prefix, operation = mapName.split("_consolidated_map_")
            coadd, prop = prefix.split("Coadd_")
            prop = prop.replace("_", " ")
            label = longNames[operation]
            mapConnection = connectionTypes.Input(
                doc=f"{label}-value consolidated map of {prop} for {coadd} coadd",
                name=mapName,
                storageClass="HealSparseMap",
                dimensions=("skymap", "band"),
                multiple=False,
                deferLoad=True,
            )
            setattr(self, mapName, mapConnection)
class SurveyWidePropertyMapAnalysisConfig(
    AnalysisBaseConfig, pipelineConnections=SurveyWidePropertyMapAnalysisConnections
):
    """Configuration for the survey-wide property map analysis.

    The fields below hold the projection choice and its keyword arguments,
    the auto-zoom switch, colorbar keyword arguments and layout switch
    described in their ``doc`` strings.
    """

    # Note: the Gnomonic projection is deliberately not offered here.
    # `GnomonicSkyproj` needs its central lon/lat set (defaults to 0/0), which
    # suits plotting individual tracts but not survey-wide maps.
    projection = ChoiceField[str](
        doc="The projection to use for plotting the map. "
        "See https://skyproj.readthedocs.io/en/latest/projections.html",
        default="McBryde",
        # Each allowed value is described by its own name.
        allowed={
            "McBryde": "McBryde",
            "Mollweide": "Mollweide",
            "Cylindrical": "Cylindrical",
            "Laea": "Laea",
            "Hammer": "Hammer",
            "EqualEarth": "EqualEarth",
            "ObliqueMollweide": "ObliqueMollweide",
            "Albers": "Albers",
        },
    )

    projectionKwargs = DictField(
        keytype=str,
        itemtype=float,
        doc="Keyword arguments to use in the projection call, e.g. lon_0. The key 'ax' is not permitted. "
        "See https://skyproj.readthedocs.io/en/latest/projections.html",
        default={},
        # 'ax' is the only key that is rejected.
        keyCheck=lambda k: k != "ax",
    )

    autozoom = Field[bool](
        doc="Automatically zooms in on the RA/Dec range of the map to make better use of its resolution; "
        "otherwise, the map is displayed within the full-sky domain.",
        default=True,
    )

    # TODO: Mixed value types will be allowed after DM-47937. Entries such as
    # "aspect": 20 can then be added, rather than just strings.
    colorbarKwargs = DictField(
        keytype=str,
        itemtype=str,
        doc="Keyword arguments to pass to the colorbar.",
        default={"orientation": "horizontal", "location": "top", "cmap": "viridis"},
    )

    publicationStyle = Field[bool](doc="Make a simplified figure layout for publication use?", default=False)
class SurveyWidePropertyMapAnalysisTask(AnalysisPipelineTask):
    """Task that passes the survey-wide (consolidated) property maps to
    ``run``.
    """

    ConfigClass = SurveyWidePropertyMapAnalysisConfig
    _DefaultName = "surveyWidePropertyMapAnalysisTask"

    def parsePlotInfo(
        self, inputs: Mapping[str, Any], dataId: DataCoordinate | None, connectionNames: list[str]
    ) -> Mapping[str, Mapping[str, str] | str | int]:
        """Collect the information that gets added to the figure.

        Parameters
        ----------
        inputs : `dict`
            The inputs to the task, keyed by connection name.
        dataId : `~lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionNames : `list` [`str`]
            Names of the input connections whose table names are recorded.

        Returns
        -------
        plotInfo : `dict`
            A dictionary with the run (under ``"run"``), the dataset type
            name of every requested connection (under ``"tableNames"``,
            keyed by connection name), and the entries added from the dataId.

        Notes
        -----
        This override exists because the analyses here are not 1-1 with
        dataset types: several connections/dataset types are analyzed at
        once, so a single shared table name would not describe them.
        """
        # All connections come from the same run; the first is representative.
        run = inputs[connectionNames[0]].ref.run

        # One table name per connection, keyed by the connection name.
        tableNames = {
            connection: inputs[connection].ref.datasetType.name for connection in connectionNames
        }

        plotInfo = {"tableNames": tableNames, "run": run}

        # The dataId information is the same for every connection.
        self._populatePlotInfoWithDataId(plotInfo, dataId)

        return plotInfo

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        # Docstring inherited.

        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId

        # Every input is one of the property maps, so all of them are listed.
        connectionNames = list(inputs)
        plotInfo = self.parsePlotInfo(inputs, dataId, connectionNames)

        outputs = self.run(
            data=inputs,
            plotConfig=self.config,
            plotInfo=plotInfo,
        )
        butlerQC.put(outputs, outputRefs)