Coverage for python / lsst / analysis / tools / tasks / propertyMapAnalysis.py: 41%

75 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:09 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

# Public API of this module, listed in definition order.
__all__ = [
    "PerTractPropertyMapAnalysisConfig",
    "PerTractPropertyMapAnalysisTask",
    "SurveyWidePropertyMapAnalysisConfig",
    "SurveyWidePropertyMapAnalysisTask",
]

29 

30from typing import Any, Mapping, Union 

31 

32from lsst.daf.butler import DataCoordinate 

33from lsst.pex.config import ChoiceField, DictField, Field, ListField 

34from lsst.pipe.base import ( 

35 InputQuantizedConnection, 

36 OutputQuantizedConnection, 

37 QuantumContext, 

38 connectionTypes, 

39) 

40from lsst.skymap import BaseSkyMap 

41 

42from ..interfaces import AnalysisBaseConfig, AnalysisBaseConnections, AnalysisPipelineTask 

43 

44 

class PerTractPropertyMapAnalysisConnections(
    AnalysisBaseConnections,
    dimensions=("skymap", "band", "tract"),
    defaultTemplates={"outputName": "propertyMapTract"},
):
    """Connections for per-tract property map analysis.

    In addition to the static ``skymap`` input, one deferred-load input
    connection is created dynamically for every map tool configured in
    ``config.atools``.
    """

    skymap = connectionTypes.Input(
        doc="The skymap that covers the tract that the data is from.",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Human-readable labels for the reduction operation encoded in
        # each configured map name.
        longNameByOperation = {
            "min": "Minimum",
            "max": "Maximum",
            "mean": "Mean",
            "weighted_mean": "Weighted mean",
            "sum": "Sum",
        }

        # Dynamically add one input connection per configured map. The
        # configured names look like "<coadd>Coadd_<property>_map_<op>".
        for mapName in config.atools.fieldNames:
            prefix, operation = mapName.split("_map_")
            coaddKind, rawProperty = prefix.split("Coadd_")
            readableProperty = rawProperty.replace("_", " ")
            longOperation = longNameByOperation[operation]
            connection = connectionTypes.Input(
                doc=f"{longOperation}-value map of {readableProperty} for {coaddKind} coadd",
                name=mapName,
                storageClass="HealSparseMap",
                dimensions=("skymap", "band", "tract"),
                multiple=False,
                deferLoad=True,
            )
            setattr(self, mapName, connection)

87 

class PerTractPropertyMapAnalysisConfig(
    AnalysisBaseConfig, pipelineConnections=PerTractPropertyMapAnalysisConnections
):
    """Config for the per-tract property map analysis task."""

    # Disallowed keys are supplied by the plotting code itself, so user
    # values for them would conflict.
    projectionKwargs = DictField(
        keytype=str,
        itemtype=float,
        doc="Keyword arguments to use in the GnomonicSkyproj call, e.g. n_grid_lon. "
        "The following keys are not permitted: 'ax', 'lon_0', 'lat_0', and 'extent'. "
        "See https://skyproj.readthedocs.io/en/latest/modules.html#skyproj.skyproj.GnomonicSkyproj",
        default={},
        keyCheck=lambda key: key not in ("ax", "lon_0", "lat_0", "extent"),
    )

    zoomFactors = ListField(
        dtype=float,
        doc="Two-element list of zoom factors to use when plotting the maps.",
        default=[1.8, 5],
    )

    colorbarKwargs = DictField(
        keytype=str,
        itemtype=str,
        doc="Keyword arguments to pass to the colorbar except for 'orientation' and 'location'.",
        default={"cmap": "viridis"},
        keyCheck=lambda key: key not in ("orientation", "location"),
    )

    publicationStyle = Field[bool](
        doc="Make a simplified figure layout for publication use?", default=False
    )

117 

class PerTractPropertyMapAnalysisTask(AnalysisPipelineTask):
    """Task for analyzing and plotting per-tract property maps."""

    ConfigClass = PerTractPropertyMapAnalysisConfig
    _DefaultName = "perTractPropertyMapAnalysisTask"

    def parsePlotInfo(
        self, inputs: Mapping[str, Any], dataId: DataCoordinate | None, connectionNames: list[str]
    ) -> Mapping[str, Union[Mapping[str, str], str, int]]:
        """Parse the inputs and dataId to get the information needed to add to
        the figure.

        Parameters
        ----------
        inputs: `dict`
            The inputs to the task
        dataId: `~lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionNames: `list` [`str`]
            Name of the input connections to use for determining table names.

        Returns
        -------
        plotInfo : `dict`
            A dictionary containing the information needed to add to the
            figure.

        Notes
        -----
        We customized this method to fit our needs, because our analyses are
        not 1-1 with datasettypes. We analyze multiple connections/datasettypes
        at once, thus the table names are not the same for all connections.
        """
        plotInfo = {
            # Record the dataset type name separately for each connection.
            "tableNames": {
                name: inputs[name].ref.datasetType.name for name in connectionNames
            },
            # Every input shares the same run, so the first one suffices.
            "run": inputs[connectionNames[0]].ref.run,
        }

        # The dataId information is identical for all connections.
        self._populatePlotInfoWithDataId(plotInfo, dataId)
        return plotInfo

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        # Docstring inherited.
        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId

        # Every input other than the skymap itself is a property map.
        mapKeys = [key for key in inputs if key != "skymap"]
        tractInfo = inputs["skymap"][dataId["tract"]]

        plotInfo = self.parsePlotInfo(inputs, dataId, mapKeys)
        outputs = self.run(data=inputs, tractInfo=tractInfo, plotConfig=self.config, plotInfo=plotInfo)
        butlerQC.put(outputs, outputRefs)

185 

186 

class SurveyWidePropertyMapAnalysisConnections(
    AnalysisBaseConnections,
    dimensions=("skymap", "band"),
    defaultTemplates={"outputName": "propertyMapSurvey"},
):
    """Connections for survey-wide property map analysis.

    One deferred-load input connection is created dynamically for every
    consolidated map tool configured in ``config.atools``.
    """

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Human-readable labels for the reduction operation encoded in
        # each configured map name.
        longNameByOperation = {
            "min": "Minimum",
            "max": "Maximum",
            "mean": "Mean",
            "weighted_mean": "Weighted mean",
            "sum": "Sum",
        }

        # Dynamically add one input connection per configured map. The
        # names look like "<coadd>Coadd_<property>_consolidated_map_<op>".
        for mapName in config.atools.fieldNames:
            prefix, operation = mapName.split("_consolidated_map_")
            coaddKind, rawProperty = prefix.split("Coadd_")
            readableProperty = rawProperty.replace("_", " ")
            longOperation = longNameByOperation[operation]
            connection = connectionTypes.Input(
                doc=f"{longOperation}-value consolidated map of {readableProperty} for {coaddKind} coadd",
                name=mapName,
                storageClass="HealSparseMap",
                dimensions=("skymap", "band"),
                multiple=False,
                deferLoad=True,
            )
            setattr(self, mapName, connection)

222 

class SurveyWidePropertyMapAnalysisConfig(
    AnalysisBaseConfig, pipelineConnections=SurveyWidePropertyMapAnalysisConnections
):
    """Config for the survey-wide property map analysis task."""

    # Note: Gnomonic projection is excluded here because `GnomonicSkyproj` must
    # have the central lon/lat set (defaults to 0/0) which makes it useful for
    # plotting individual tracts but not for survey-wide maps.
    projection = ChoiceField[str](
        doc="The projection to use for plotting the map. "
        "See https://skyproj.readthedocs.io/en/latest/projections.html",
        default="McBryde",
        allowed={
            "McBryde": "McBryde",
            "Mollweide": "Mollweide",
            "Cylindrical": "Cylindrical",
            "Laea": "Laea",
            "Hammer": "Hammer",
            "EqualEarth": "EqualEarth",
            "ObliqueMollweide": "ObliqueMollweide",
            "Albers": "Albers",
        },
    )

    # 'ax' is supplied by the plotting code itself, so it is disallowed.
    projectionKwargs = DictField(
        keytype=str,
        itemtype=float,
        doc="Keyword arguments to use in the projection call, e.g. lon_0. The key 'ax' is not permitted. "
        "See https://skyproj.readthedocs.io/en/latest/projections.html",
        default={},
        keyCheck=lambda key: key != "ax",
    )

    autozoom = Field[bool](
        doc="Automatically zooms in on the RA/Dec range of the map to make better use of its resolution; "
        "otherwise, the map is displayed within the full-sky domain.",
        default=True,
    )

    colorbarKwargs = DictField(
        keytype=str,
        itemtype=str,
        doc="Keyword arguments to pass to the colorbar.",
        default={"orientation": "horizontal", "location": "top", "cmap": "viridis"},
    )
    # TODO: Mixed types will be allowed after DM-47937. You can then add things
    # like "aspect": 20, rather than just strings.

    publicationStyle = Field[bool](
        doc="Make a simplified figure layout for publication use?", default=False
    )

273 

274 

class SurveyWidePropertyMapAnalysisTask(AnalysisPipelineTask):
    """Task for analyzing and plotting survey-wide property maps."""

    ConfigClass = SurveyWidePropertyMapAnalysisConfig
    _DefaultName = "surveyWidePropertyMapAnalysisTask"

    def parsePlotInfo(
        self, inputs: Mapping[str, Any], dataId: DataCoordinate | None, connectionNames: list[str]
    ) -> Mapping[str, Union[Mapping[str, str], str, int]]:
        """Parse the inputs and dataId to get the information needed to add to
        the figure.

        Parameters
        ----------
        inputs: `dict`
            The inputs to the task
        dataId: `~lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionNames: `list` [`str`]
            Name of the input connections to use for determining table names.

        Returns
        -------
        plotInfo : `dict`
            A dictionary containing the information needed to add to the
            figure.

        Notes
        -----
        We customized this method to fit our needs, because our analyses are
        not 1-1 with datasettypes. We analyze multiple connections/datasettypes
        at once, thus the table names are not the same for all connections.
        """
        plotInfo = {
            # Record the dataset type name separately for each connection.
            "tableNames": {
                name: inputs[name].ref.datasetType.name for name in connectionNames
            },
            # Every input shares the same run, so the first one suffices.
            "run": inputs[connectionNames[0]].ref.run,
        }

        # The dataId information is identical for all connections.
        self._populatePlotInfoWithDataId(plotInfo, dataId)
        return plotInfo

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        # Docstring inherited.
        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId

        # All inputs here are property maps; use every connection name.
        plotInfo = self.parsePlotInfo(inputs, dataId, list(inputs.keys()))
        outputs = self.run(data=inputs, plotConfig=self.config, plotInfo=plotInfo)
        butlerQC.put(outputs, outputRefs)