Coverage for python/lsst/analysis/tools/tasks/propertyMapAnalysis.py: 42%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:27 +0000

# This file is part of analysis_tools.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

21from __future__ import annotations 

22 

23__all__ = [ 

24 "SurveyWidePropertyMapAnalysisConfig", 

25 "SurveyWidePropertyMapAnalysisTask", 

26 "PerTractPropertyMapAnalysisConfig", 

27 "PerTractPropertyMapAnalysisTask", 

28] 

29 

30from collections.abc import Mapping 

31from typing import Any 

32 

33from lsst.daf.butler import DataCoordinate 

34from lsst.pex.config import ChoiceField, DictField, Field, ListField 

35from lsst.pipe.base import ( 

36 InputQuantizedConnection, 

37 OutputQuantizedConnection, 

38 QuantumContext, 

39 connectionTypes, 

40) 

41from lsst.skymap import BaseSkyMap 

42 

43from ..interfaces import AnalysisBaseConfig, AnalysisBaseConnections, AnalysisPipelineTask 

44 

45 

class PerTractPropertyMapAnalysisConnections(
    AnalysisBaseConnections,
    dimensions=("skymap", "band", "tract"),
    defaultTemplates={"outputName": "propertyMapTract"},
):
    """Connections for per-tract property map analysis.

    One input connection is created dynamically at construction time for
    every map tool configured in ``config.atools``.
    """

    skymap = connectionTypes.Input(
        doc="The skymap that covers the tract that the data is from.",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Human-readable names for the reduction operations encoded in the
        # configured tool names.
        longOperationNames = {
            "min": "Minimum",
            "max": "Maximum",
            "mean": "Mean",
            "weighted_mean": "Weighted mean",
            "sum": "Sum",
        }

        # Create one input connection per configured map. Tool names follow
        # the pattern "<coadd>Coadd_<property>_map_<operation>".
        for toolName in config.atools.fieldNames:
            prefix, operation = toolName.split("_map_")
            coadd, rawProperty = prefix.split("Coadd_")
            prettyProperty = rawProperty.replace("_", " ")
            longName = longOperationNames[operation]
            mapConnection = connectionTypes.Input(
                doc=f"{longName}-value map of {prettyProperty} for {coadd} coadd",
                name=toolName,
                storageClass="HealSparseMap",
                dimensions=("skymap", "band", "tract"),
                multiple=False,
                deferLoad=True,
            )
            setattr(self, toolName, mapConnection)

87 

88 

class PerTractPropertyMapAnalysisConfig(
    AnalysisBaseConfig, pipelineConnections=PerTractPropertyMapAnalysisConnections
):
    """Configuration for plotting per-tract property maps."""

    # Extra keyword arguments forwarded to the Gnomonic projection; keys the
    # task sets itself are rejected by keyCheck.
    projectionKwargs = DictField(
        keytype=str,
        itemtype=float,
        doc="Keyword arguments to use in the GnomonicSkyproj call, e.g. n_grid_lon. "
        "The following keys are not permitted: 'ax', 'lon_0', 'lat_0', and 'extent'. "
        "See https://skyproj.readthedocs.io/en/latest/modules.html#skyproj.skyproj.GnomonicSkyproj",
        default={},
        keyCheck=lambda k: k not in ("ax", "lon_0", "lat_0", "extent"),
    )

    # Zoom levels for the zoomed-in panels of the tract figure.
    zoomFactors = ListField(
        dtype=float,
        doc="Two-element list of zoom factors to use when plotting the maps.",
        default=[1.8, 5],
    )

    # Colorbar styling; orientation/location are controlled by the task and
    # therefore rejected by keyCheck.
    colorbarKwargs = DictField(
        keytype=str,
        itemtype=str,
        doc="Keyword arguments to pass to the colorbar except for 'orientation' and 'location'.",
        default={"cmap": "viridis"},
        keyCheck=lambda k: k not in ("orientation", "location"),
    )

    publicationStyle = Field[bool](doc="Make a simplified figure layout for publication use?", default=False)

117 

118 

class PerTractPropertyMapAnalysisTask(AnalysisPipelineTask):
    """Analysis task for plotting per-tract property maps."""

    ConfigClass = PerTractPropertyMapAnalysisConfig
    _DefaultName = "perTractPropertyMapAnalysisTask"

    def parsePlotInfo(
        self, inputs: Mapping[str, Any], dataId: DataCoordinate | None, connectionNames: list[str]
    ) -> Mapping[str, Mapping[str, str] | str | int]:
        """Parse the inputs and dataId to get the information needed to add to
        the figure.

        Parameters
        ----------
        inputs: `dict`
            The inputs to the task
        dataId: `~lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionNames: `list` [`str`]
            Name of the input connections to use for determining table names.

        Returns
        -------
        plotInfo : `dict`
            A dictionary containing the information needed to add to the
            figure.

        Notes
        -----
        We customized this method to fit our needs, because our analyses are
        not 1-1 with datasettypes. We analyze multiple connections/datasettypes
        at once, thus the table names are not the same for all connections.
        """
        plotInfo = {
            # Each connection has its own dataset type, so record the table
            # name per connection.
            "tableNames": {
                connectionName: inputs[connectionName].ref.datasetType.name
                for connectionName in connectionNames
            },
            # All connections share the same run, so the first one is
            # representative. Guard against an empty connection list (no map
            # tools configured) rather than raising an IndexError.
            "run": inputs[connectionNames[0]].ref.run if connectionNames else "",
        }

        # Add the dataId information, which is the same for all connections.
        self._populatePlotInfoWithDataId(plotInfo, dataId)

        return plotInfo

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        # Docstring inherited.

        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId
        tractInfo = inputs["skymap"][dataId["tract"]]
        # Every input other than the skymap is a property map to analyze.
        mapKeys = [key for key in inputs if key != "skymap"]

        plotInfo = self.parsePlotInfo(inputs, dataId, mapKeys)
        outputs = self.run(data=inputs, tractInfo=tractInfo, plotConfig=self.config, plotInfo=plotInfo)
        butlerQC.put(outputs, outputRefs)

186 

187 

class SurveyWidePropertyMapAnalysisConnections(
    AnalysisBaseConnections,
    dimensions=("skymap", "band"),
    defaultTemplates={"outputName": "propertyMapSurvey"},
):
    """Connections for survey-wide property map analysis.

    One input connection is created dynamically at construction time for
    every consolidated map tool configured in ``config.atools``.
    """

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Human-readable names for the reduction operations encoded in the
        # configured tool names.
        longOperationNames = {
            "min": "Minimum",
            "max": "Maximum",
            "mean": "Mean",
            "weighted_mean": "Weighted mean",
            "sum": "Sum",
        }

        # Create one input connection per configured map. Tool names follow
        # the pattern "<coadd>Coadd_<property>_consolidated_map_<operation>".
        for toolName in config.atools.fieldNames:
            prefix, operation = toolName.split("_consolidated_map_")
            coadd, rawProperty = prefix.split("Coadd_")
            prettyProperty = rawProperty.replace("_", " ")
            longName = longOperationNames[operation]
            mapConnection = connectionTypes.Input(
                doc=f"{longName}-value consolidated map of {prettyProperty} for {coadd} coadd",
                name=toolName,
                storageClass="HealSparseMap",
                dimensions=("skymap", "band"),
                multiple=False,
                deferLoad=True,
            )
            setattr(self, toolName, mapConnection)

222 

223 

class SurveyWidePropertyMapAnalysisConfig(
    AnalysisBaseConfig, pipelineConnections=SurveyWidePropertyMapAnalysisConnections
):
    """Configuration for plotting survey-wide property maps."""

    # Gnomonic projection is deliberately excluded: `GnomonicSkyproj` must
    # have its central lon/lat set (defaults to 0/0), which suits plotting
    # individual tracts but not survey-wide maps.
    projection = ChoiceField[str](
        doc="The projection to use for plotting the map. "
        "See https://skyproj.readthedocs.io/en/latest/projections.html",
        default="McBryde",
        allowed={
            name: name
            for name in (
                "McBryde",
                "Mollweide",
                "Cylindrical",
                "Laea",
                "Hammer",
                "EqualEarth",
                "ObliqueMollweide",
                "Albers",
            )
        },
    )

    # Extra keyword arguments forwarded to the projection; 'ax' is set by
    # the task itself and therefore rejected by keyCheck.
    projectionKwargs = DictField(
        keytype=str,
        itemtype=float,
        doc="Keyword arguments to use in the projection call, e.g. lon_0. The key 'ax' is not permitted. "
        "See https://skyproj.readthedocs.io/en/latest/projections.html",
        default={},
        keyCheck=lambda k: k != "ax",
    )

    autozoom = Field[bool](
        doc="Automatically zooms in on the RA/Dec range of the map to make better use of its resolution; "
        "otherwise, the map is displayed within the full-sky domain.",
        default=True,
    )

    # TODO: Mixed value types will be allowed after DM-47937; until then only
    # string values can be passed here (e.g. not "aspect": 20).
    colorbarKwargs = DictField(
        keytype=str,
        itemtype=str,
        doc="Keyword arguments to pass to the colorbar.",
        default={"orientation": "horizontal", "location": "top", "cmap": "viridis"},
    )

    publicationStyle = Field[bool](doc="Make a simplified figure layout for publication use?", default=False)

274 

275 

class SurveyWidePropertyMapAnalysisTask(AnalysisPipelineTask):
    """Analysis task for plotting survey-wide property maps."""

    ConfigClass = SurveyWidePropertyMapAnalysisConfig
    _DefaultName = "surveyWidePropertyMapAnalysisTask"

    def parsePlotInfo(
        self, inputs: Mapping[str, Any], dataId: DataCoordinate | None, connectionNames: list[str]
    ) -> Mapping[str, Mapping[str, str] | str | int]:
        """Parse the inputs and dataId to get the information needed to add to
        the figure.

        Parameters
        ----------
        inputs: `dict`
            The inputs to the task
        dataId: `~lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionNames: `list` [`str`]
            Name of the input connections to use for determining table names.

        Returns
        -------
        plotInfo : `dict`
            A dictionary containing the information needed to add to the
            figure.

        Notes
        -----
        We customized this method to fit our needs, because our analyses are
        not 1-1 with datasettypes. We analyze multiple connections/datasettypes
        at once, thus the table names are not the same for all connections.
        """
        plotInfo = {
            # Each connection has its own dataset type, so record the table
            # name per connection.
            "tableNames": {
                connectionName: inputs[connectionName].ref.datasetType.name
                for connectionName in connectionNames
            },
            # All connections share the same run, so the first one is
            # representative. Guard against an empty connection list (no map
            # tools configured) rather than raising an IndexError.
            "run": inputs[connectionNames[0]].ref.run if connectionNames else "",
        }

        # Add the dataId information, which is the same for all connections.
        self._populatePlotInfoWithDataId(plotInfo, dataId)

        return plotInfo

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        # Docstring inherited.

        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId

        plotInfo = self.parsePlotInfo(inputs, dataId, list(inputs.keys()))
        outputs = self.run(data=inputs, plotConfig=self.config, plotInfo=plotInfo)
        butlerQC.put(outputs, outputRefs)