Coverage for python / lsst / analysis / tools / tasks / catalogMatch.py: 24%

142 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:23 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ("CatalogMatchConfig", "CatalogMatchTask") 

23 

24 

25import lsst.geom 

26import lsst.pex.config as pexConfig 

27import lsst.pipe.base as pipeBase 

28import numpy as np 

29from astropy.table import Table, hstack, vstack 

30from astropy.time import Time 

31from lsst.pex.config.configurableActions import ConfigurableActionStructField 

32from lsst.pipe.tasks.loadReferenceCatalog import LoadReferenceCatalogTask 

33from lsst.skymap import BaseSkyMap 

34from smatch import Matcher 

35 

36from ..actions.vector import ( 

37 CoaddPlotFlagSelector, 

38 GalaxySelector, 

39 MatchingFlagSelector, 

40 SnSelector, 

41 StarSelector, 

42) 

43from ..interfaces import VectorAction 

44 

45 

46class CatalogMatchConnections( 

47 pipeBase.PipelineTaskConnections, 

48 dimensions=("tract", "skymap"), 

49 defaultTemplates={"targetCatalog": "objectTable_tract", "refCatalog": "ps1_pv3_3pi_20170110"}, 

50): 

51 catalog = pipeBase.connectionTypes.Input( 

52 doc="The tract-wide catalog to make plots from.", 

53 storageClass="ArrowAstropy", 

54 name="{targetCatalog}", 

55 dimensions=("tract", "skymap"), 

56 deferLoad=True, 

57 ) 

58 

59 refCat = pipeBase.connectionTypes.PrerequisiteInput( 

60 doc="The reference catalog to match to loaded input catalog sources.", 

61 name="{refCatalog}", 

62 storageClass="SimpleCatalog", 

63 dimensions=("skypix",), 

64 deferLoad=True, 

65 multiple=True, 

66 ) 

67 

68 skymap = pipeBase.connectionTypes.Input( 

69 doc="The skymap for the tract", 

70 storageClass="SkyMap", 

71 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

72 dimensions=("skymap",), 

73 ) 

74 

75 matchedCatalog = pipeBase.connectionTypes.Output( 

76 doc="Catalog with matched target and reference objects with separations", 

77 name="{targetCatalog}_{refCatalog}_match", 

78 storageClass="ArrowAstropy", 

79 dimensions=("tract", "skymap"), 

80 ) 

81 

82 

83class CatalogMatchConfig(pipeBase.PipelineTaskConfig, pipelineConnections=CatalogMatchConnections): 

84 referenceCatalogLoader = pexConfig.ConfigurableField( 

85 target=LoadReferenceCatalogTask, 

86 doc="Reference catalog loader", 

87 ) 

88 

89 epoch = pexConfig.Field[float]( 

90 doc="Epoch to which reference objects are shifted.", 

91 default=2015.0, 

92 ) 

93 

94 filterNames = pexConfig.ListField[str]( 

95 doc="Physical filter names to persist downstream.", 

96 default=["u", "g", "r", "i", "z", "y"], 

97 ) 

98 

99 selectorBands = pexConfig.ListField[str]( 

100 doc="Band to use when selecting objects, primarily for extendedness.", 

101 default=["i"], 

102 ) 

103 

104 selectorActions = ConfigurableActionStructField[VectorAction]( 

105 doc="Which selectors to use to narrow down the data for QA plotting.", 

106 default={"flagSelector": MatchingFlagSelector()}, 

107 ) 

108 

109 sourceSelectorActions = ConfigurableActionStructField[VectorAction]( 

110 doc="What types of sources to use.", 

111 default={}, 

112 ) 

113 

114 extraColumnSelectors = ConfigurableActionStructField[VectorAction]( 

115 doc="Other selectors that are not used in this task, but whose columns" "may be needed downstream", 

116 default={ 

117 "selector1": SnSelector(), 

118 "selector2": StarSelector(), 

119 "selector3": GalaxySelector(), 

120 "selector4": CoaddPlotFlagSelector(), 

121 }, 

122 ) 

123 

124 idColumn = pexConfig.Field[str]( 

125 doc="Unique ID column for the target catalog.", 

126 default="objectId", 

127 ) 

128 

129 extraColumns = pexConfig.ListField[str]( 

130 doc="Other catalog columns to persist to downstream tasks", 

131 default=["patch", "ebv"], 

132 ) 

133 

134 extraPerBandColumns = pexConfig.ListField[str]( 

135 doc="Other columns to load that should be loaded for each band individually.", 

136 default=["cModelFlux"], 

137 ) 

138 

139 matchRadius = pexConfig.Field[float]( 

140 doc="The radius to use for matching, in arcsecs.", 

141 default=1.0, 

142 ) 

143 

144 targetRaColumn = pexConfig.Field[str]( 

145 doc="RA column name for the target (being matched) catalog.", 

146 default="coord_ra", 

147 ) 

148 

149 targetDecColumn = pexConfig.Field[str]( 

150 doc="Dec column name for the target (being matched) catalog.", 

151 default="coord_dec", 

152 ) 

153 

154 refRaColumn = pexConfig.Field[str]( 

155 doc="RA column name for the reference (being matched to) catalog.", 

156 default="ra", 

157 ) 

158 

159 refDecColumn = pexConfig.Field[str]( 

160 doc="Dec column name for the reference (being matched to) catalog.", 

161 default="dec", 

162 ) 

163 

164 raColumn = pexConfig.Field[str]( 

165 doc="RA column.", 

166 default="coord_ra", 

167 deprecated="This field was replaced with targetRaColumn and is unused. Will be removed after v27.", 

168 ) 

169 

170 decColumn = pexConfig.Field[str]( 

171 doc="Dec column.", 

172 default="coord_dec", 

173 deprecated="This field was replaced with targetDecColumn and is unused. Will be removed after v27.", 

174 ) 

175 

176 patchColumn = pexConfig.Field[str](doc="Patch column.", default="patch") 

177 

178 matchesRefCat = pexConfig.Field[bool]( 

179 doc="Is the catalog being matched to stored as a reference catalog?", 

180 default=False, 

181 ) 

182 

183 returnNonMatches = pexConfig.Field[bool]( 

184 doc="Return the rows of the reference catalog that didn't get matched?", 

185 default=False, 

186 ) 

187 

188 def setDefaults(self): 

189 super().setDefaults() 

190 self.referenceCatalogLoader.doReferenceSelection = False 

191 self.referenceCatalogLoader.doApplyColorTerms = False 

192 

193 

194class CatalogMatchTask(pipeBase.PipelineTask): 

195 """The base task for matching catalogs. Figures out which columns 

196 it needs to grab for the downstream tasks and then matches the 

197 two tables together and returns the matched and joined table 

198 including the extra columns. 

199 """ 

200 

201 ConfigClass = CatalogMatchConfig 

202 _DefaultName = "analysisToolsCatalogMatch" 

203 

204 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

205 """Implemented in the inherited tasks""" 

206 pass 

207 

208 def run(self, *, targetCatalog, refCatalog, bands): 

209 """Takes the two catalogs and returns the matched one. 

210 

211 Parameters 

212 ---------- 

213 `targetCatalog` : astropy.table.Table 

214 The catalog to be matched 

215 `refCatalog` : astropy.table.Table 

216 The catalog to be matched to 

217 `bands` : list 

218 A list of bands to apply the selectors in 

219 

220 Returns 

221 ------- 

222 `matchedCatalog` : astropy.table.Table 

223 

224 Notes 

225 ----- 

226 Performs an RA/Dec match that returns the closest match 

227 within the match radius which defaults to 1.0 arcsecond. 

228 Applies the suffix, _target, to the catalog being matched 

229 and _ref to the reference catalog being matched to. 

230 """ 

231 # Apply the selectors to the catalog 

232 mask = np.ones(len(targetCatalog), dtype=bool) 

233 for selector in self.config.sourceSelectorActions: 

234 for band in self.config.selectorBands: 

235 mask &= selector(targetCatalog, band=band).astype(bool) 

236 

237 targetCatalog = targetCatalog[mask] 

238 

239 if (len(targetCatalog) == 0) or (len(refCatalog)) == 0: 

240 refMatchIndices = np.array([], dtype=np.int64) 

241 targetMatchIndices = np.array([], dtype=np.int64) 

242 dists = np.array([], dtype=np.float64) 

243 else: 

244 # Run the matcher. 

245 

246 # This all assumes that everything is in degrees. 

247 # Which I think is okay, but the current task 

248 # allows different units. Need to configure match 

249 # radius, either in this task or a subtask. 

250 

251 targetRas = np.asarray(targetCatalog[self.config.targetRaColumn]) 

252 targetDecs = np.asarray(targetCatalog[self.config.targetDecColumn]) 

253 targetMask = np.isfinite(targetRas) & np.isfinite(targetDecs) 

254 

255 # Get rid of entries in the refCat with non-finite RA/Dec values. 

256 refRas = np.asarray(refCatalog[self.config.refRaColumn]) 

257 refDecs = np.asarray(refCatalog[self.config.refDecColumn]) 

258 refMask = np.isfinite(refRas) & np.isfinite(refDecs) 

259 

260 refCatalog = refCatalog[refMask] 

261 with Matcher(refRas[refMask], refDecs[refMask]) as m: 

262 idx, refMatchIndices, targetMatchIndices, dists = m.query_radius( 

263 targetRas[targetMask], 

264 targetDecs[targetMask], 

265 self.config.matchRadius / 3600.0, 

266 return_indices=True, 

267 ) 

268 

269 # Convert degrees to arcseconds. 

270 dists *= 3600.0 

271 

272 targetCatalogMatched = targetCatalog[targetMatchIndices] 

273 refCatalogMatched = refCatalog[refMatchIndices] 

274 

275 targetCols = targetCatalogMatched.columns.copy() 

276 for col in targetCols: 

277 targetCatalogMatched.rename_column(col, col + "_target") 

278 refCols = refCatalogMatched.columns.copy() 

279 for col in refCols: 

280 refCatalogMatched.rename_column(col, col + "_ref") 

281 

282 if self.config.returnNonMatches: 

283 unmatchedIndices = list(set(np.arange(0, len(refCatalog))) - set(refMatchIndices)) 

284 refCatalogNotMatched = refCatalog[unmatchedIndices] 

285 # We need to set the relevant flag columns to 

286 # true or false so that they make it through the 

287 # selectors even though the none matched sources 

288 # don't have values for those columns. 

289 trueFlagCols = [] 

290 falseFlagCols = [] 

291 for selectorAction in [self.config.selectorActions, self.config.extraColumnSelectors]: 

292 for selector in selectorAction: 

293 try: 

294 for flag in selector.selectWhenTrue: 

295 trueFlagCols.append(flag) 

296 for flag in selector.selectWhenFalse: 

297 falseFlagCols.append(flag) 

298 except AttributeError: 

299 continue 

300 for col in refCols: 

301 refCatalogNotMatched.rename_column(col, col + "_ref") 

302 for col in targetCols: 

303 refCatalogNotMatched[col] = [np.nan] * len(refCatalogNotMatched) 

304 for col in trueFlagCols: 

305 refCatalogNotMatched[col] = [True] * len(refCatalogNotMatched) 

306 for col in falseFlagCols: 

307 refCatalogNotMatched[col] = [False] * len(refCatalogNotMatched) 

308 

309 if self.config.matchesRefCat: 

310 for i, band in enumerate(bands): 

311 refCatalogMatched[band + "_mag_ref"] = refCatalogMatched["refMag_ref"][:, i] 

312 refCatalogMatched[band + "_magErr_ref"] = refCatalogMatched["refMagErr_ref"][:, i] 

313 refCatalogMatched.remove_column("refMag_ref") 

314 refCatalogMatched.remove_column("refMagErr_ref") 

315 

316 if self.config.returnNonMatches: 

317 for i, band in enumerate(bands): 

318 refCatalogNotMatched[band + "_mag_ref"] = refCatalogNotMatched["refMag_ref"][:, i] 

319 refCatalogNotMatched[band + "_magErr_ref"] = refCatalogNotMatched["refMagErr_ref"][:, i] 

320 refCatalogNotMatched.remove_column("refMag_ref") 

321 refCatalogNotMatched.remove_column("refMagErr_ref") 

322 

323 tMatched = hstack([targetCatalogMatched, refCatalogMatched], join_type="exact") 

324 tMatched["matchDistance"] = dists 

325 

326 if self.config.returnNonMatches: 

327 refCatalogNotMatched["matchDistance"] = [np.nan] * len(refCatalogNotMatched) 

328 tMatched = vstack([tMatched, refCatalogNotMatched]) 

329 

330 return pipeBase.Struct(matchedCatalog=tMatched) 

331 

332 def prepColumns(self, bands): 

333 """Get all the columns needed for downstream tasks. 

334 Both those from the selectors and those specified in the 

335 config options. 

336 """ 

337 

338 bandColumns = [] 

339 for band in bands: 

340 for col in self.config.extraPerBandColumns: 

341 bandColumns.append(band + "_" + col) 

342 

343 columns = ( 

344 [ 

345 self.config.idColumn, 

346 self.config.targetRaColumn, 

347 self.config.targetDecColumn, 

348 ] 

349 + self.config.extraColumns.list() 

350 + bandColumns 

351 ) 

352 

353 if self.config.patchColumn != "": 

354 columns.append(self.config.patchColumn) 

355 

356 selectorBands = list(set(list(bands) + self.config.selectorBands.list())) 

357 for selectorAction in [ 

358 self.config.selectorActions, 

359 self.config.sourceSelectorActions, 

360 self.config.extraColumnSelectors, 

361 ]: 

362 for selector in selectorAction: 

363 for band in selectorBands: 

364 selectorSchema = selector.getFormattedInputSchema(band=band) 

365 columns += [s[0] for s in selectorSchema] 

366 

367 return columns 

368 

369 def _loadRefCat(self, loaderTask, tractInfo, epoch=None): 

370 """Load the reference catalog that covers the 

371 catalog that is to be matched to. 

372 

373 Parameters 

374 ---------- 

375 `loaderTask` : 

376 lsst.pipe.tasks.loadReferenceCatalog.loadReferenceCatalogTask 

377 `tractInfo` : lsst.skymap.tractInfo.ExplicitTractInfo 

378 The tract information to get the sky location from 

379 `epoch` : `astropy.time.Time` 

380 The epoch to which the reference catalog will be shifted. 

381 

382 Returns 

383 ------- 

384 `loadedRefCat` : astropy.table.Table 

385 The reference catalog that covers the input catalog. 

386 """ 

387 boundingCircle = tractInfo.getOuterSkyPolygon().getBoundingCircle() 

388 center = lsst.geom.SpherePoint(boundingCircle.getCenter()) 

389 radius = boundingCircle.getOpeningAngle() 

390 

391 if epoch is None: 

392 epoch = Time(self.config.epoch, format="decimalyear") 

393 

394 # This is always going to return degrees. 

395 try: 

396 loadedRefCat = loaderTask.getSkyCircleCatalog( 

397 center, radius, self.config.filterNames, epoch=epoch 

398 ) 

399 except RuntimeError as e: 

400 raise pipeBase.NoWorkFound(e) 

401 

402 return Table(loadedRefCat)