Coverage for python / lsst / analysis / tools / tasks / catalogMatch.py: 24%
142 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:09 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:09 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22__all__ = ("CatalogMatchConfig", "CatalogMatchTask")
25import lsst.geom
26import lsst.pex.config as pexConfig
27import lsst.pipe.base as pipeBase
28import numpy as np
29from astropy.table import Table, hstack, vstack
30from astropy.time import Time
31from lsst.pex.config.configurableActions import ConfigurableActionStructField
32from lsst.pipe.tasks.loadReferenceCatalog import LoadReferenceCatalogTask
33from lsst.skymap import BaseSkyMap
34from smatch import Matcher
36from ..actions.vector import (
37 CoaddPlotFlagSelector,
38 GalaxySelector,
39 MatchingFlagSelector,
40 SnSelector,
41 StarSelector,
42)
43from ..interfaces import VectorAction
46class CatalogMatchConnections(
47 pipeBase.PipelineTaskConnections,
48 dimensions=("tract", "skymap"),
49 defaultTemplates={"targetCatalog": "objectTable_tract", "refCatalog": "ps1_pv3_3pi_20170110"},
50):
51 catalog = pipeBase.connectionTypes.Input(
52 doc="The tract-wide catalog to make plots from.",
53 storageClass="ArrowAstropy",
54 name="{targetCatalog}",
55 dimensions=("tract", "skymap"),
56 deferLoad=True,
57 )
59 refCat = pipeBase.connectionTypes.PrerequisiteInput(
60 doc="The reference catalog to match to loaded input catalog sources.",
61 name="{refCatalog}",
62 storageClass="SimpleCatalog",
63 dimensions=("skypix",),
64 deferLoad=True,
65 multiple=True,
66 )
68 skymap = pipeBase.connectionTypes.Input(
69 doc="The skymap for the tract",
70 storageClass="SkyMap",
71 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
72 dimensions=("skymap",),
73 )
75 matchedCatalog = pipeBase.connectionTypes.Output(
76 doc="Catalog with matched target and reference objects with separations",
77 name="{targetCatalog}_{refCatalog}_match",
78 storageClass="ArrowAstropy",
79 dimensions=("tract", "skymap"),
80 )
83class CatalogMatchConfig(pipeBase.PipelineTaskConfig, pipelineConnections=CatalogMatchConnections):
84 referenceCatalogLoader = pexConfig.ConfigurableField(
85 target=LoadReferenceCatalogTask,
86 doc="Reference catalog loader",
87 )
89 epoch = pexConfig.Field[float](
90 doc="Epoch to which reference objects are shifted.",
91 default=2015.0,
92 )
94 filterNames = pexConfig.ListField[str](
95 doc="Physical filter names to persist downstream.",
96 default=["u", "g", "r", "i", "z", "y"],
97 )
99 selectorBands = pexConfig.ListField[str](
100 doc="Band to use when selecting objects, primarily for extendedness.",
101 default=["i"],
102 )
104 selectorActions = ConfigurableActionStructField[VectorAction](
105 doc="Which selectors to use to narrow down the data for QA plotting.",
106 default={"flagSelector": MatchingFlagSelector()},
107 )
109 sourceSelectorActions = ConfigurableActionStructField[VectorAction](
110 doc="What types of sources to use.",
111 default={},
112 )
114 extraColumnSelectors = ConfigurableActionStructField[VectorAction](
115 doc="Other selectors that are not used in this task, but whose columns" "may be needed downstream",
116 default={
117 "selector1": SnSelector(),
118 "selector2": StarSelector(),
119 "selector3": GalaxySelector(),
120 "selector4": CoaddPlotFlagSelector(),
121 },
122 )
124 idColumn = pexConfig.Field[str](
125 doc="Unique ID column for the target catalog.",
126 default="objectId",
127 )
129 extraColumns = pexConfig.ListField[str](
130 doc="Other catalog columns to persist to downstream tasks",
131 default=["patch", "ebv"],
132 )
134 extraPerBandColumns = pexConfig.ListField[str](
135 doc="Other columns to load that should be loaded for each band individually.",
136 default=["cModelFlux"],
137 )
139 matchRadius = pexConfig.Field[float](
140 doc="The radius to use for matching, in arcsecs.",
141 default=1.0,
142 )
144 targetRaColumn = pexConfig.Field[str](
145 doc="RA column name for the target (being matched) catalog.",
146 default="coord_ra",
147 )
149 targetDecColumn = pexConfig.Field[str](
150 doc="Dec column name for the target (being matched) catalog.",
151 default="coord_dec",
152 )
154 refRaColumn = pexConfig.Field[str](
155 doc="RA column name for the reference (being matched to) catalog.",
156 default="ra",
157 )
159 refDecColumn = pexConfig.Field[str](
160 doc="Dec column name for the reference (being matched to) catalog.",
161 default="dec",
162 )
164 raColumn = pexConfig.Field[str](
165 doc="RA column.",
166 default="coord_ra",
167 deprecated="This field was replaced with targetRaColumn and is unused. Will be removed after v27.",
168 )
170 decColumn = pexConfig.Field[str](
171 doc="Dec column.",
172 default="coord_dec",
173 deprecated="This field was replaced with targetDecColumn and is unused. Will be removed after v27.",
174 )
176 patchColumn = pexConfig.Field[str](doc="Patch column.", default="patch")
178 matchesRefCat = pexConfig.Field[bool](
179 doc="Is the catalog being matched to stored as a reference catalog?",
180 default=False,
181 )
183 returnNonMatches = pexConfig.Field[bool](
184 doc="Return the rows of the reference catalog that didn't get matched?",
185 default=False,
186 )
188 def setDefaults(self):
189 super().setDefaults()
190 self.referenceCatalogLoader.doReferenceSelection = False
191 self.referenceCatalogLoader.doApplyColorTerms = False
194class CatalogMatchTask(pipeBase.PipelineTask):
195 """The base task for matching catalogs. Figures out which columns
196 it needs to grab for the downstream tasks and then matches the
197 two tables together and returns the matched and joined table
198 including the extra columns.
199 """
201 ConfigClass = CatalogMatchConfig
202 _DefaultName = "analysisToolsCatalogMatch"
204 def runQuantum(self, butlerQC, inputRefs, outputRefs):
205 """Implemented in the inherited tasks"""
206 pass
208 def run(self, *, targetCatalog, refCatalog, bands):
209 """Takes the two catalogs and returns the matched one.
211 Parameters
212 ----------
213 `targetCatalog` : astropy.table.Table
214 The catalog to be matched
215 `refCatalog` : astropy.table.Table
216 The catalog to be matched to
217 `bands` : list
218 A list of bands to apply the selectors in
220 Returns
221 -------
222 `matchedCatalog` : astropy.table.Table
224 Notes
225 -----
226 Performs an RA/Dec match that returns the closest match
227 within the match radius which defaults to 1.0 arcsecond.
228 Applies the suffix, _target, to the catalog being matched
229 and _ref to the reference catalog being matched to.
230 """
231 # Apply the selectors to the catalog
232 mask = np.ones(len(targetCatalog), dtype=bool)
233 for selector in self.config.sourceSelectorActions:
234 for band in self.config.selectorBands:
235 mask &= selector(targetCatalog, band=band).astype(bool)
237 targetCatalog = targetCatalog[mask]
239 if (len(targetCatalog) == 0) or (len(refCatalog)) == 0:
240 refMatchIndices = np.array([], dtype=np.int64)
241 targetMatchIndices = np.array([], dtype=np.int64)
242 dists = np.array([], dtype=np.float64)
243 else:
244 # Run the matcher.
246 # This all assumes that everything is in degrees.
247 # Which I think is okay, but the current task
248 # allows different units. Need to configure match
249 # radius, either in this task or a subtask.
251 targetRas = np.asarray(targetCatalog[self.config.targetRaColumn])
252 targetDecs = np.asarray(targetCatalog[self.config.targetDecColumn])
253 targetMask = np.isfinite(targetRas) & np.isfinite(targetDecs)
255 # Get rid of entries in the refCat with non-finite RA/Dec values.
256 refRas = np.asarray(refCatalog[self.config.refRaColumn])
257 refDecs = np.asarray(refCatalog[self.config.refDecColumn])
258 refMask = np.isfinite(refRas) & np.isfinite(refDecs)
260 refCatalog = refCatalog[refMask]
261 with Matcher(refRas[refMask], refDecs[refMask]) as m:
262 idx, refMatchIndices, targetMatchIndices, dists = m.query_radius(
263 targetRas[targetMask],
264 targetDecs[targetMask],
265 self.config.matchRadius / 3600.0,
266 return_indices=True,
267 )
269 # Convert degrees to arcseconds.
270 dists *= 3600.0
272 targetCatalogMatched = targetCatalog[targetMatchIndices]
273 refCatalogMatched = refCatalog[refMatchIndices]
275 targetCols = targetCatalogMatched.columns.copy()
276 for col in targetCols:
277 targetCatalogMatched.rename_column(col, col + "_target")
278 refCols = refCatalogMatched.columns.copy()
279 for col in refCols:
280 refCatalogMatched.rename_column(col, col + "_ref")
282 if self.config.returnNonMatches:
283 unmatchedIndices = list(set(np.arange(0, len(refCatalog))) - set(refMatchIndices))
284 refCatalogNotMatched = refCatalog[unmatchedIndices]
285 # We need to set the relevant flag columns to
286 # true or false so that they make it through the
287 # selectors even though the none matched sources
288 # don't have values for those columns.
289 trueFlagCols = []
290 falseFlagCols = []
291 for selectorAction in [self.config.selectorActions, self.config.extraColumnSelectors]:
292 for selector in selectorAction:
293 try:
294 for flag in selector.selectWhenTrue:
295 trueFlagCols.append(flag)
296 for flag in selector.selectWhenFalse:
297 falseFlagCols.append(flag)
298 except AttributeError:
299 continue
300 for col in refCols:
301 refCatalogNotMatched.rename_column(col, col + "_ref")
302 for col in targetCols:
303 refCatalogNotMatched[col] = [np.nan] * len(refCatalogNotMatched)
304 for col in trueFlagCols:
305 refCatalogNotMatched[col] = [True] * len(refCatalogNotMatched)
306 for col in falseFlagCols:
307 refCatalogNotMatched[col] = [False] * len(refCatalogNotMatched)
309 if self.config.matchesRefCat:
310 for i, band in enumerate(bands):
311 refCatalogMatched[band + "_mag_ref"] = refCatalogMatched["refMag_ref"][:, i]
312 refCatalogMatched[band + "_magErr_ref"] = refCatalogMatched["refMagErr_ref"][:, i]
313 refCatalogMatched.remove_column("refMag_ref")
314 refCatalogMatched.remove_column("refMagErr_ref")
316 if self.config.returnNonMatches:
317 for i, band in enumerate(bands):
318 refCatalogNotMatched[band + "_mag_ref"] = refCatalogNotMatched["refMag_ref"][:, i]
319 refCatalogNotMatched[band + "_magErr_ref"] = refCatalogNotMatched["refMagErr_ref"][:, i]
320 refCatalogNotMatched.remove_column("refMag_ref")
321 refCatalogNotMatched.remove_column("refMagErr_ref")
323 tMatched = hstack([targetCatalogMatched, refCatalogMatched], join_type="exact")
324 tMatched["matchDistance"] = dists
326 if self.config.returnNonMatches:
327 refCatalogNotMatched["matchDistance"] = [np.nan] * len(refCatalogNotMatched)
328 tMatched = vstack([tMatched, refCatalogNotMatched])
330 return pipeBase.Struct(matchedCatalog=tMatched)
332 def prepColumns(self, bands):
333 """Get all the columns needed for downstream tasks.
334 Both those from the selectors and those specified in the
335 config options.
336 """
338 bandColumns = []
339 for band in bands:
340 for col in self.config.extraPerBandColumns:
341 bandColumns.append(band + "_" + col)
343 columns = (
344 [
345 self.config.idColumn,
346 self.config.targetRaColumn,
347 self.config.targetDecColumn,
348 ]
349 + self.config.extraColumns.list()
350 + bandColumns
351 )
353 if self.config.patchColumn != "":
354 columns.append(self.config.patchColumn)
356 selectorBands = list(set(list(bands) + self.config.selectorBands.list()))
357 for selectorAction in [
358 self.config.selectorActions,
359 self.config.sourceSelectorActions,
360 self.config.extraColumnSelectors,
361 ]:
362 for selector in selectorAction:
363 for band in selectorBands:
364 selectorSchema = selector.getFormattedInputSchema(band=band)
365 columns += [s[0] for s in selectorSchema]
367 return columns
369 def _loadRefCat(self, loaderTask, tractInfo, epoch=None):
370 """Load the reference catalog that covers the
371 catalog that is to be matched to.
373 Parameters
374 ----------
375 `loaderTask` :
376 lsst.pipe.tasks.loadReferenceCatalog.loadReferenceCatalogTask
377 `tractInfo` : lsst.skymap.tractInfo.ExplicitTractInfo
378 The tract information to get the sky location from
379 `epoch` : `astropy.time.Time`
380 The epoch to which the reference catalog will be shifted.
382 Returns
383 -------
384 `loadedRefCat` : astropy.table.Table
385 The reference catalog that covers the input catalog.
386 """
387 boundingCircle = tractInfo.getOuterSkyPolygon().getBoundingCircle()
388 center = lsst.geom.SpherePoint(boundingCircle.getCenter())
389 radius = boundingCircle.getOpeningAngle()
391 if epoch is None:
392 epoch = Time(self.config.epoch, format="decimalyear")
394 # This is always going to return degrees.
395 try:
396 loadedRefCat = loaderTask.getSkyCircleCatalog(
397 center, radius, self.config.filterNames, epoch=epoch
398 )
399 except RuntimeError as e:
400 raise pipeBase.NoWorkFound(e)
402 return Table(loadedRefCat)