Coverage for python / lsst / pipe / tasks / match_tract_catalog.py: 0%
72 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:38 +0000
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Public names exported by this module.
__all__ = [
    "MatchTractCatalogSubConfig",
    "MatchTractCatalogSubTask",
    "MatchTractCatalogConfig",
    "MatchTractCatalogTask",
]
27import lsst.afw.geom as afwGeom
28import lsst.pex.config as pexConfig
29import lsst.pipe.base as pipeBase
30import lsst.pipe.base.connectionTypes as cT
31from lsst.skymap import BaseSkyMap
33from abc import ABC, abstractmethod
35import astropy.table
36import pandas as pd
37from typing import Tuple, Set
# Default values for the ``{name_input_cat_ref}`` and
# ``{name_input_cat_target}`` placeholders that appear in the dataset type
# names of the connections defined in this module.
MatchTractCatalogBaseTemplates = dict(
    name_input_cat_ref="truth_summary",
    name_input_cat_target="objectTable_tract",
)
class MatchTractCatalogConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("tract", "skymap"),
    defaultTemplates=MatchTractCatalogBaseTemplates,
):
    """Connections for matching a reference catalog to a target catalog.

    Both input catalogs are declared with ``("tract", "skymap")`` dimensions
    by default. The constructor re-declares either input with no dimensions
    when the corresponding ``*_sharding_type`` config value is ``"none"``.
    """

    cat_ref = cT.Input(
        name="{name_input_cat_ref}",
        doc="Reference object catalog to match from",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
        deferLoad=True,
    )
    cat_target = cT.Input(
        name="{name_input_cat_target}",
        doc="Target object catalog to match",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
        deferLoad=True,
    )
    skymap = cT.Input(
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
    cat_output_ref = cT.Output(
        name="match_ref_{name_input_cat_ref}_{name_input_cat_target}",
        doc="Reference matched catalog with indices of target matches",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )
    cat_output_target = cT.Output(
        name="match_target_{name_input_cat_ref}_{name_input_cat_target}",
        doc="Target matched catalog with indices of reference matches",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )

    def __init__(self, *, config=None):
        """Adjust the input connections to the configured sharding types.

        Parameters
        ----------
        config
            Task configuration; its ``refcat_sharding_type`` and
            ``target_sharding_type`` values are read here.

        Raises
        ------
        NotImplementedError
            Raised if either sharding type is neither ``"tract"`` nor
            ``"none"``.
        """
        def without_dimensions(connection):
            # Rebuild the same input connection, declared with no dimensions.
            return cT.Input(
                doc=connection.doc,
                name=connection.name,
                storageClass=connection.storageClass,
                dimensions=(),
                deferLoad=connection.deferLoad,
            )

        # "tract" is the class-level default and needs no adjustment.
        if config.refcat_sharding_type == "none":
            self.cat_ref = without_dimensions(self.cat_ref)
        elif config.refcat_sharding_type != "tract":
            raise NotImplementedError(f"{config.refcat_sharding_type=} not implemented")

        if config.target_sharding_type == "none":
            self.cat_target = without_dimensions(self.cat_target)
        elif config.target_sharding_type != "tract":
            raise NotImplementedError(f"{config.target_sharding_type=} not implemented")
class MatchTractCatalogSubConfig(pexConfig.Config):
    """Base config for subtasks of MatchTractCatalogTask.

    Subclasses provide the properties below, whose values may depend on
    several config settings at once.
    """

    # NOTE(review): ``abstractmethod`` is only enforced when the metaclass
    # derives from ``abc.ABCMeta``; confirm whether that holds for
    # ``pexConfig.Config``. Either way the base implementations raise.

    @property
    @abstractmethod
    def columns_in_ref(self) -> Set[str]:
        """Reference catalog column names needed by the subtask."""
        raise NotImplementedError

    @property
    @abstractmethod
    def columns_in_target(self) -> Set[str]:
        """Target catalog column names needed by the subtask."""
        raise NotImplementedError
class MatchTractCatalogSubTask(pipeBase.Task, ABC):
    """An abstract interface for subtasks of MatchTractCatalogTask to match
    two tract object catalogs.

    Parameters
    ----------
    **kwargs
        Additional arguments to be passed to the `lsst.pipe.base.Task`
        constructor.
    """
    ConfigClass = MatchTractCatalogSubConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @abstractmethod
    def run(
        self,
        catalog_ref: pd.DataFrame | astropy.table.Table,
        catalog_target: pd.DataFrame | astropy.table.Table,
        wcs: afwGeom.SkyWcs | None = None,
    ) -> pipeBase.Struct:
        """Match sources in a reference tract catalog with a target catalog.

        Parameters
        ----------
        catalog_ref : `pandas.DataFrame` | `astropy.table.Table`
            A reference catalog to match objects/sources from.
        catalog_target : `pandas.DataFrame` | `astropy.table.Table`
            A target catalog to match reference objects/sources to.
        wcs : `lsst.afw.geom.SkyWcs`, optional
            A coordinate system to convert catalog positions to sky
            coordinates.

        Returns
        -------
        retStruct : `lsst.pipe.base.Struct`
            A struct with the attributes that `MatchTractCatalogTask.run`
            reads from it:

            ``cat_output_ref``
                The output matched reference catalog.
            ``cat_output_target``
                The output matched target catalog.
            ``exceptions``
                Problems encountered while matching; the calling task logs
                this value as a warning when it is truthy.

        Raises
        ------
        NotImplementedError
            Always raised by this base implementation; subclasses must
            override this method.
        """
        raise NotImplementedError()
class MatchTractCatalogConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=MatchTractCatalogConnections,
):
    """Configuration for MatchTractCatalogTask.

    Holds the configurable matching subtask and the sharding type of each
    input catalog.
    """

    match_tract_catalog = pexConfig.ConfigurableField(
        doc="Task to match sources in a reference tract catalog with a target catalog",
        target=MatchTractCatalogSubTask,
    )
    refcat_sharding_type = pexConfig.ChoiceField[str](
        default="tract",
        allowed={"tract": "Tract-based shards", "none": "No sharding at all"},
        doc="The type of sharding (spatial splitting) for the reference catalog",
    )
    target_sharding_type = pexConfig.ChoiceField[str](
        default="tract",
        allowed={"tract": "Tract-based shards", "none": "No sharding at all"},
        doc="The type of sharding (spatial splitting) for the target catalog",
    )

    def get_columns_in(self) -> Tuple[Set, Set]:
        """Return the input columns that the matching subtask requires.

        Returns
        -------
        columns_ref : `set` [`str`]
            Required reference catalog column names.
        columns_target : `set` [`str`]
            Required target catalog column names.

        Raises
        ------
        RuntimeError
            Raised if reading ``columns_in_ref`` or ``columns_in_target``
            from the subtask config raises `AttributeError`.
        """
        try:
            required_ref = self.match_tract_catalog.columns_in_ref
            required_target = self.match_tract_catalog.columns_in_target
        except AttributeError as err:
            # The original error text is embedded in the message, so the
            # exception chain is deliberately suppressed.
            raise RuntimeError(f'{__class__}.match_tract_catalog must have columns_in_ref and'
                               f' columns_in_target attributes: {err}') from None
        # Copy into fresh sets regardless of the container type returned.
        return set(required_ref), set(required_target)
class MatchTractCatalogTask(pipeBase.PipelineTask):
    """Match sources in a reference tract catalog with those in a target catalog.

    The matching itself is delegated to the ``match_tract_catalog`` subtask.
    """
    ConfigClass = MatchTractCatalogConfig
    _DefaultName = "MatchTractCatalog"

    def __init__(self, initInputs, **kwargs):
        super().__init__(initInputs=initInputs, **kwargs)
        self.makeSubtask("match_tract_catalog")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the inputs for one quantum, run the match and store outputs.

        Only the columns reported by ``self.config.get_columns_in()`` are
        requested from the two deferred-load catalog handles. The WCS passed
        to `run` is that of the quantum's tract, looked up in the skymap.
        """
        inputs = butlerQC.get(inputRefs)
        columns_ref, columns_target = self.config.get_columns_in()
        skymap = inputs.pop("skymap")

        outputs = self.run(
            catalog_ref=inputs['cat_ref'].get(parameters={'columns': columns_ref}),
            catalog_target=inputs['cat_target'].get(parameters={'columns': columns_target}),
            wcs=skymap[butlerQC.quantum.dataId["tract"]].wcs,
        )
        butlerQC.put(outputs, outputRefs)

    def run(
        self,
        catalog_ref: pd.DataFrame | astropy.table.Table,
        catalog_target: pd.DataFrame | astropy.table.Table,
        wcs: afwGeom.SkyWcs | None = None,
    ) -> pipeBase.Struct:
        """Match sources in a reference tract catalog with a target catalog.

        Parameters
        ----------
        catalog_ref : `pandas.DataFrame` | `astropy.table.Table`
            A reference catalog to match objects/sources from.
        catalog_target : `pandas.DataFrame` | `astropy.table.Table`
            A target catalog to match reference objects/sources to.
        wcs : `lsst.afw.geom.SkyWcs`, optional
            A coordinate system to convert catalog positions to sky coordinates,
            if necessary.

        Returns
        -------
        retStruct : `lsst.pipe.base.Struct`
            A struct with ``cat_output_ref`` and ``cat_output_target``
            attributes containing the output matched catalogs.
        """
        output = self.match_tract_catalog.run(catalog_ref, catalog_target, wcs=wcs)
        # The subtask interface does not promise an ``exceptions`` attribute,
        # so treat a missing one the same as "nothing to report".
        exceptions = getattr(output, "exceptions", None)
        if exceptions:
            self.log.warning('Exceptions: %s', exceptions)
        retStruct = pipeBase.Struct(cat_output_ref=output.cat_output_ref,
                                    cat_output_target=output.cat_output_target)
        return retStruct