Coverage for python / lsst / analysis / tools / tasks / catalogMatch.py: 24%
142 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:36 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22__all__ = ("CatalogMatchConfig", "CatalogMatchTask")
25import numpy as np
26from astropy.table import Table, hstack, vstack
27from astropy.time import Time
28from smatch import Matcher
30import lsst.geom
31import lsst.pex.config as pexConfig
32import lsst.pipe.base as pipeBase
33from lsst.pex.config.configurableActions import ConfigurableActionStructField
34from lsst.pipe.tasks.loadReferenceCatalog import LoadReferenceCatalogTask
35from lsst.skymap import BaseSkyMap
37from ..actions.vector import (
38 CoaddPlotFlagSelector,
39 GalaxySelector,
40 MatchingFlagSelector,
41 SnSelector,
42 StarSelector,
43)
44from ..interfaces import VectorAction
47class CatalogMatchConnections(
48 pipeBase.PipelineTaskConnections,
49 dimensions=("tract", "skymap"),
50 defaultTemplates={"targetCatalog": "objectTable_tract", "refCatalog": "ps1_pv3_3pi_20170110"},
51):
52 catalog = pipeBase.connectionTypes.Input(
53 doc="The tract-wide catalog to make plots from.",
54 storageClass="ArrowAstropy",
55 name="{targetCatalog}",
56 dimensions=("tract", "skymap"),
57 deferLoad=True,
58 )
60 refCat = pipeBase.connectionTypes.PrerequisiteInput(
61 doc="The reference catalog to match to loaded input catalog sources.",
62 name="{refCatalog}",
63 storageClass="SimpleCatalog",
64 dimensions=("skypix",),
65 deferLoad=True,
66 multiple=True,
67 )
69 skymap = pipeBase.connectionTypes.Input(
70 doc="The skymap for the tract",
71 storageClass="SkyMap",
72 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
73 dimensions=("skymap",),
74 )
76 matchedCatalog = pipeBase.connectionTypes.Output(
77 doc="Catalog with matched target and reference objects with separations",
78 name="{targetCatalog}_{refCatalog}_match",
79 storageClass="ArrowAstropy",
80 dimensions=("tract", "skymap"),
81 )
84class CatalogMatchConfig(pipeBase.PipelineTaskConfig, pipelineConnections=CatalogMatchConnections):
85 referenceCatalogLoader = pexConfig.ConfigurableField(
86 target=LoadReferenceCatalogTask,
87 doc="Reference catalog loader",
88 )
90 epoch = pexConfig.Field[float](
91 doc="Epoch to which reference objects are shifted.",
92 default=2015.0,
93 )
95 filterNames = pexConfig.ListField[str](
96 doc="Physical filter names to persist downstream.",
97 default=["u", "g", "r", "i", "z", "y"],
98 )
100 selectorBands = pexConfig.ListField[str](
101 doc="Band to use when selecting objects, primarily for extendedness.",
102 default=["i"],
103 )
105 selectorActions = ConfigurableActionStructField[VectorAction](
106 doc="Which selectors to use to narrow down the data for QA plotting.",
107 default={"flagSelector": MatchingFlagSelector()},
108 )
110 sourceSelectorActions = ConfigurableActionStructField[VectorAction](
111 doc="What types of sources to use.",
112 default={},
113 )
115 extraColumnSelectors = ConfigurableActionStructField[VectorAction](
116 doc="Other selectors that are not used in this task, but whose columns" "may be needed downstream",
117 default={
118 "selector1": SnSelector(),
119 "selector2": StarSelector(),
120 "selector3": GalaxySelector(),
121 "selector4": CoaddPlotFlagSelector(),
122 },
123 )
125 idColumn = pexConfig.Field[str](
126 doc="Unique ID column for the target catalog.",
127 default="objectId",
128 )
130 extraColumns = pexConfig.ListField[str](
131 doc="Other catalog columns to persist to downstream tasks",
132 default=["patch", "ebv"],
133 )
135 extraPerBandColumns = pexConfig.ListField[str](
136 doc="Other columns to load that should be loaded for each band individually.",
137 default=["cModelFlux"],
138 )
140 matchRadius = pexConfig.Field[float](
141 doc="The radius to use for matching, in arcsecs.",
142 default=1.0,
143 )
145 targetRaColumn = pexConfig.Field[str](
146 doc="RA column name for the target (being matched) catalog.",
147 default="coord_ra",
148 )
150 targetDecColumn = pexConfig.Field[str](
151 doc="Dec column name for the target (being matched) catalog.",
152 default="coord_dec",
153 )
155 refRaColumn = pexConfig.Field[str](
156 doc="RA column name for the reference (being matched to) catalog.",
157 default="ra",
158 )
160 refDecColumn = pexConfig.Field[str](
161 doc="Dec column name for the reference (being matched to) catalog.",
162 default="dec",
163 )
165 raColumn = pexConfig.Field[str](
166 doc="RA column.",
167 default="coord_ra",
168 deprecated="This field was replaced with targetRaColumn and is unused. Will be removed after v27.",
169 )
171 decColumn = pexConfig.Field[str](
172 doc="Dec column.",
173 default="coord_dec",
174 deprecated="This field was replaced with targetDecColumn and is unused. Will be removed after v27.",
175 )
177 patchColumn = pexConfig.Field[str](doc="Patch column.", default="patch")
179 matchesRefCat = pexConfig.Field[bool](
180 doc="Is the catalog being matched to stored as a reference catalog?",
181 default=False,
182 )
184 returnNonMatches = pexConfig.Field[bool](
185 doc="Return the rows of the reference catalog that didn't get matched?",
186 default=False,
187 )
189 def setDefaults(self):
190 super().setDefaults()
191 self.referenceCatalogLoader.doReferenceSelection = False
192 self.referenceCatalogLoader.doApplyColorTerms = False
195class CatalogMatchTask(pipeBase.PipelineTask):
196 """The base task for matching catalogs. Figures out which columns
197 it needs to grab for the downstream tasks and then matches the
198 two tables together and returns the matched and joined table
199 including the extra columns.
200 """
202 ConfigClass = CatalogMatchConfig
203 _DefaultName = "analysisToolsCatalogMatch"
205 def runQuantum(self, butlerQC, inputRefs, outputRefs):
206 """Implemented in the inherited tasks"""
207 pass
209 def run(self, *, targetCatalog, refCatalog, bands):
210 """Takes the two catalogs and returns the matched one.
212 Parameters
213 ----------
214 `targetCatalog` : astropy.table.Table
215 The catalog to be matched
216 `refCatalog` : astropy.table.Table
217 The catalog to be matched to
218 `bands` : list
219 A list of bands to apply the selectors in
221 Returns
222 -------
223 `matchedCatalog` : astropy.table.Table
225 Notes
226 -----
227 Performs an RA/Dec match that returns the closest match
228 within the match radius which defaults to 1.0 arcsecond.
229 Applies the suffix, _target, to the catalog being matched
230 and _ref to the reference catalog being matched to.
231 """
232 # Apply the selectors to the catalog
233 mask = np.ones(len(targetCatalog), dtype=bool)
234 for selector in self.config.sourceSelectorActions:
235 for band in self.config.selectorBands:
236 mask &= selector(targetCatalog, band=band).astype(bool)
238 targetCatalog = targetCatalog[mask]
240 if (len(targetCatalog) == 0) or (len(refCatalog)) == 0:
241 refMatchIndices = np.array([], dtype=np.int64)
242 targetMatchIndices = np.array([], dtype=np.int64)
243 dists = np.array([], dtype=np.float64)
244 else:
245 # Run the matcher.
247 # This all assumes that everything is in degrees.
248 # Which I think is okay, but the current task
249 # allows different units. Need to configure match
250 # radius, either in this task or a subtask.
252 targetRas = np.asarray(targetCatalog[self.config.targetRaColumn])
253 targetDecs = np.asarray(targetCatalog[self.config.targetDecColumn])
254 targetMask = np.isfinite(targetRas) & np.isfinite(targetDecs)
256 # Get rid of entries in the refCat with non-finite RA/Dec values.
257 refRas = np.asarray(refCatalog[self.config.refRaColumn])
258 refDecs = np.asarray(refCatalog[self.config.refDecColumn])
259 refMask = np.isfinite(refRas) & np.isfinite(refDecs)
261 refCatalog = refCatalog[refMask]
262 with Matcher(refRas[refMask], refDecs[refMask]) as m:
263 idx, refMatchIndices, targetMatchIndices, dists = m.query_radius(
264 targetRas[targetMask],
265 targetDecs[targetMask],
266 self.config.matchRadius / 3600.0,
267 return_indices=True,
268 )
270 # Convert degrees to arcseconds.
271 dists *= 3600.0
273 targetCatalogMatched = targetCatalog[targetMatchIndices]
274 refCatalogMatched = refCatalog[refMatchIndices]
276 targetCols = targetCatalogMatched.columns.copy()
277 for col in targetCols:
278 targetCatalogMatched.rename_column(col, col + "_target")
279 refCols = refCatalogMatched.columns.copy()
280 for col in refCols:
281 refCatalogMatched.rename_column(col, col + "_ref")
283 if self.config.returnNonMatches:
284 unmatchedIndices = list(set(np.arange(0, len(refCatalog))) - set(refMatchIndices))
285 refCatalogNotMatched = refCatalog[unmatchedIndices]
286 # We need to set the relevant flag columns to
287 # true or false so that they make it through the
288 # selectors even though the none matched sources
289 # don't have values for those columns.
290 trueFlagCols = []
291 falseFlagCols = []
292 for selectorAction in [self.config.selectorActions, self.config.extraColumnSelectors]:
293 for selector in selectorAction:
294 try:
295 for flag in selector.selectWhenTrue:
296 trueFlagCols.append(flag)
297 for flag in selector.selectWhenFalse:
298 falseFlagCols.append(flag)
299 except AttributeError:
300 continue
301 for col in refCols:
302 refCatalogNotMatched.rename_column(col, col + "_ref")
303 for col in targetCols:
304 refCatalogNotMatched[col] = [np.nan] * len(refCatalogNotMatched)
305 for col in trueFlagCols:
306 refCatalogNotMatched[col] = [True] * len(refCatalogNotMatched)
307 for col in falseFlagCols:
308 refCatalogNotMatched[col] = [False] * len(refCatalogNotMatched)
310 if self.config.matchesRefCat:
311 for i, band in enumerate(bands):
312 refCatalogMatched[band + "_mag_ref"] = refCatalogMatched["refMag_ref"][:, i]
313 refCatalogMatched[band + "_magErr_ref"] = refCatalogMatched["refMagErr_ref"][:, i]
314 refCatalogMatched.remove_column("refMag_ref")
315 refCatalogMatched.remove_column("refMagErr_ref")
317 if self.config.returnNonMatches:
318 for i, band in enumerate(bands):
319 refCatalogNotMatched[band + "_mag_ref"] = refCatalogNotMatched["refMag_ref"][:, i]
320 refCatalogNotMatched[band + "_magErr_ref"] = refCatalogNotMatched["refMagErr_ref"][:, i]
321 refCatalogNotMatched.remove_column("refMag_ref")
322 refCatalogNotMatched.remove_column("refMagErr_ref")
324 tMatched = hstack([targetCatalogMatched, refCatalogMatched], join_type="exact")
325 tMatched["matchDistance"] = dists
327 if self.config.returnNonMatches:
328 refCatalogNotMatched["matchDistance"] = [np.nan] * len(refCatalogNotMatched)
329 tMatched = vstack([tMatched, refCatalogNotMatched])
331 return pipeBase.Struct(matchedCatalog=tMatched)
333 def prepColumns(self, bands):
334 """Get all the columns needed for downstream tasks.
335 Both those from the selectors and those specified in the
336 config options.
337 """
339 bandColumns = []
340 for band in bands:
341 for col in self.config.extraPerBandColumns:
342 bandColumns.append(band + "_" + col)
344 columns = (
345 [
346 self.config.idColumn,
347 self.config.targetRaColumn,
348 self.config.targetDecColumn,
349 ]
350 + self.config.extraColumns.list()
351 + bandColumns
352 )
354 if self.config.patchColumn != "":
355 columns.append(self.config.patchColumn)
357 selectorBands = list(set(list(bands) + self.config.selectorBands.list()))
358 for selectorAction in [
359 self.config.selectorActions,
360 self.config.sourceSelectorActions,
361 self.config.extraColumnSelectors,
362 ]:
363 for selector in selectorAction:
364 for band in selectorBands:
365 selectorSchema = selector.getFormattedInputSchema(band=band)
366 columns += [s[0] for s in selectorSchema]
368 return columns
370 def _loadRefCat(self, loaderTask, tractInfo, epoch=None):
371 """Load the reference catalog that covers the
372 catalog that is to be matched to.
374 Parameters
375 ----------
376 `loaderTask` :
377 lsst.pipe.tasks.loadReferenceCatalog.loadReferenceCatalogTask
378 `tractInfo` : lsst.skymap.tractInfo.ExplicitTractInfo
379 The tract information to get the sky location from
380 `epoch` : `astropy.time.Time`
381 The epoch to which the reference catalog will be shifted.
383 Returns
384 -------
385 `loadedRefCat` : astropy.table.Table
386 The reference catalog that covers the input catalog.
387 """
388 boundingCircle = tractInfo.getOuterSkyPolygon().getBoundingCircle()
389 center = lsst.geom.SpherePoint(boundingCircle.getCenter())
390 radius = boundingCircle.getOpeningAngle()
392 if epoch is None:
393 epoch = Time(self.config.epoch, format="decimalyear")
395 # This is always going to return degrees.
396 try:
397 loadedRefCat = loaderTask.getSkyCircleCatalog(
398 center, radius, self.config.filterNames, epoch=epoch
399 )
400 except RuntimeError as e:
401 raise pipeBase.NoWorkFound(e)
403 return Table(loadedRefCat)