23 'MatchTractCatalogSubConfig',
'MatchTractCatalogSubTask',
24 'MatchTractCatalogConfig',
'MatchTractCatalogTask'
29import lsst.pipe.base
as pipeBase
30import lsst.pipe.base.connectionTypes
as cT
33from abc
import ABC, abstractmethod
37from typing
import Tuple, Set
40MatchTractCatalogBaseTemplates = {
41 "name_input_cat_ref":
"truth_summary",
42 "name_input_cat_target":
"objectTable_tract",
47 pipeBase.PipelineTaskConnections,
48 dimensions=(
"tract",
"skymap"),
49 defaultTemplates=MatchTractCatalogBaseTemplates,
52 doc=
"Reference object catalog to match from",
53 name=
"{name_input_cat_ref}",
54 storageClass=
"ArrowAstropy",
55 dimensions=(
"tract",
"skymap"),
58 cat_target = cT.Input(
59 doc=
"Target object catalog to match",
60 name=
"{name_input_cat_target}",
61 storageClass=
"ArrowAstropy",
62 dimensions=(
"tract",
"skymap"),
66 doc=
"Input definition of geometry/bbox and projection/wcs for coadded exposures",
67 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
68 storageClass=
"SkyMap",
69 dimensions=(
"skymap",),
71 cat_output_ref = cT.Output(
72 doc=
"Reference matched catalog with indices of target matches",
73 name=
"match_ref_{name_input_cat_ref}_{name_input_cat_target}",
74 storageClass=
"ArrowAstropy",
75 dimensions=(
"tract",
"skymap"),
77 cat_output_target = cT.Output(
78 doc=
"Target matched catalog with indices of reference matches",
79 name=
"match_target_{name_input_cat_ref}_{name_input_cat_target}",
80 storageClass=
"ArrowAstropy",
81 dimensions=(
"tract",
"skymap"),
85 if config.refcat_sharding_type !=
"tract":
86 if config.refcat_sharding_type ==
"none":
91 storageClass=old.storageClass,
93 deferLoad=old.deferLoad,
96 raise NotImplementedError(f
"{config.refcat_sharding_type=} not implemented")
97 if config.target_sharding_type !=
"tract":
98 if config.target_sharding_type ==
"none":
103 storageClass=old.storageClass,
105 deferLoad=old.deferLoad,
108 raise NotImplementedError(f
"{config.target_sharding_type=} not implemented")
112 """Config class for the MatchTractCatalogSubTask to define methods returning
113 values that depend on multiple config settings.
118 raise NotImplementedError()
123 raise NotImplementedError()
127 """An abstract interface for subtasks of MatchTractCatalogTask to match
128 two tract object catalogs.
133 Additional arguments to be passed to the `lsst.pipe.base.Task`
136 ConfigClass = MatchTractCatalogSubConfig
144 catalog_ref: pd.DataFrame | astropy.table.Table,
145 catalog_target: pd.DataFrame | astropy.table.Table,
146 wcs: afwGeom.SkyWcs =
None,
147 ) -> pipeBase.Struct:
148 """Match sources in a reference tract catalog with a target catalog.
152 catalog_ref : `pandas.DataFrame` | `astropy.table.Table`
153 A reference catalog to match objects/sources from.
154 catalog_target : `pandas.DataFrame` | `astropy.table.Table`
155 A target catalog to match reference objects/sources to.
156 wcs : `lsst.afw.image.SkyWcs`
157 A coordinate system to convert catalog positions to sky coordinates.
161 retStruct : `lsst.pipe.base.Struct`
162 A struct with output_ref and output_target attribute containing the
163 output matched catalogs.
165 raise NotImplementedError()
169 pipeBase.PipelineTaskConfig,
170 pipelineConnections=MatchTractCatalogConnections,
172 """Configure a MatchTractCatalogTask, including a configurable matching subtask.
174 match_tract_catalog = pexConfig.ConfigurableField(
175 target=MatchTractCatalogSubTask,
176 doc=
"Task to match sources in a reference tract catalog with a target catalog",
178 refcat_sharding_type = pexConfig.ChoiceField[str](
179 doc=
"The type of sharding (spatial splitting) for the reference catalog",
180 allowed={
"tract":
"Tract-based shards",
"none":
"No sharding at all"},
183 target_sharding_type = pexConfig.ChoiceField[str](
184 doc=
"The type of sharding (spatial splitting) for the target catalog",
185 allowed={
"tract":
"Tract-based shards",
"none":
"No sharding at all"},
190 """Get the set of input columns required for matching.
194 columns_ref : `set` [`str`]
195 The set of required input catalog column names.
196 columns_target : `set` [`str`]
197 The set of required target catalog column names.
202 except AttributeError
as err:
203 raise RuntimeError(f
'{__class__}.match_tract_catalog must have columns_in_ref and'
204 f
' columns_in_target attributes: {err}')
from None
205 return set(columns_ref), set(columns_target)
209 """Match sources in a reference tract catalog with those in a target catalog.
211 ConfigClass = MatchTractCatalogConfig
212 _DefaultName =
"MatchTractCatalog"
215 super().
__init__(initInputs=initInputs, **kwargs)
216 self.makeSubtask(
"match_tract_catalog")
219 inputs = butlerQC.get(inputRefs)
220 columns_ref, columns_target = self.config.get_columns_in()
221 skymap = inputs.pop(
"skymap")
224 catalog_ref=inputs[
'cat_ref'].get(parameters={
'columns': columns_ref}),
225 catalog_target=inputs[
'cat_target'].get(parameters={
'columns': columns_target}),
226 wcs=skymap[butlerQC.quantum.dataId[
"tract"]].wcs,
228 butlerQC.put(outputs, outputRefs)
232 catalog_ref: pd.DataFrame | astropy.table.Table,
233 catalog_target: pd.DataFrame | astropy.table.Table,
234 wcs: afwGeom.SkyWcs =
None,
235 ) -> pipeBase.Struct:
236 """Match sources in a reference tract catalog with a target catalog.
240 catalog_ref : `pandas.DataFrame` | `astropy.table.Table`
241 A reference catalog to match objects/sources from.
242 catalog_target : `pandas.DataFrame` | `astropy.table.Table`
243 A target catalog to match reference objects/sources to.
244 wcs : `lsst.afw.image.SkyWcs`
245 A coordinate system to convert catalog positions to sky coordinates,
250 retStruct : `lsst.pipe.base.Struct`
251 A struct with output_ref and output_target attribute containing the
252 output matched catalogs.
254 output = self.match_tract_catalog.run(catalog_ref, catalog_target, wcs=wcs)
255 if output.exceptions:
256 self.log.warning(
'Exceptions: %s', output.exceptions)
257 retStruct = pipeBase.Struct(cat_output_ref=output.cat_output_ref,
258 cat_output_target=output.cat_output_target)
Tuple[Set, Set] get_columns_in(self)
__init__(self, *, config=None)
Set[str] columns_in_target(self)
Set[str] columns_in_ref(self)
pipeBase.Struct run(self, pd.DataFrame|astropy.table.Table catalog_ref, pd.DataFrame|astropy.table.Table catalog_target, afwGeom.SkyWcs wcs=None)
runQuantum(self, butlerQC, inputRefs, outputRefs)
pipeBase.Struct run(self, pd.DataFrame|astropy.table.Table catalog_ref, pd.DataFrame|astropy.table.Table catalog_target, afwGeom.SkyWcs wcs=None)
__init__(self, initInputs, **kwargs)