lsst.pipe.tasks g2c21b0017a+4f59a27f16
Loading...
Searching...
No Matches
consolidateSsTables.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["ConsolidateSsTablesConfig", "ConsolidateSsTablesTask", "ConsolidateSsTablesConnections"]
23
24import astropy.table as tb
25import astropy.units as u
26import numpy as np
27import warnings
28
29import lsst.pex.config as pexConfig
30import lsst.pipe.base as pipeBase
31import lsst.pipe.base.connectionTypes as cT
32from lsst.utils.timer import timeMethod
33from .ssp.ssobject import DIA_COLUMNS, compute_ssobject
34
35warnings.filterwarnings("ignore")
36
37
39 pipeBase.PipelineTaskConnections,
40 dimensions=("skymap",),
41 defaultTemplates={"coaddName": "goodSeeing", "fakesType": ""},
42):
43 inputCatalogs = cT.Input(
44 doc="associated ssSources from all tract-patches.",
45 name="{fakesType}{coaddName}Diff_assocSsSrcTable",
46 storageClass="ArrowAstropy",
47 dimensions=("tract", "patch"),
48 multiple=True,
49 )
50 mpcorb = cT.Input(
51 doc="Minor Planet Center orbit table used for association",
52 name="mpcorb",
53 storageClass="ArrowAstropy",
54 dimensions=(),
55 )
56 ssSourceTable = cT.Output(
57 doc="",
58 name="{fakesType}{coaddName}Diff_ssSrcTable",
59 storageClass="ArrowAstropy",
60 dimensions=(),
61 )
62 ssObjectTable = cT.Output(
63 doc="",
64 name="{fakesType}{coaddName}Diff_ssObjTable",
65 storageClass="ArrowAstropy",
66 dimensions=(),
67 )
68
69
70class ConsolidateSsTablesConfig(
71 pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateSsTablesConnections
72):
73 """Config for ConsolidateSsTablesTask"""
74
75 hg12FixedG12 = pexConfig.Field(
76 dtype=float,
77 default=None,
78 optional=True,
79 doc="If set, fix the G12 slope parameter to this value "
80 "and only fit H. If None (default), both H and G12 "
81 "are fit.",
82 )
83
84 hg12MagSigmaFloor = pexConfig.Field(
85 dtype=float,
86 default=0.0,
87 doc="Systematic magnitude error floor (mag) added in "
88 "quadrature to measurement errors before HG12 "
89 "fitting.",
90 )
91
92 hg12NSigmaClip = pexConfig.Field(
93 dtype=float,
94 default=None,
95 optional=True,
96 doc="If set, reject outliers beyond this many sigma "
97 "after an initial robust fit. If None, no clipping.",
98 )
99
100
101class ConsolidateSsTablesTask(pipeBase.PipelineTask):
102 """Consolidate per-patch ssSource tables into a single table.
103 Create ssObject table
104 TODO (DM-49451): Fit per-object parameters
105 TODO (DM-49453): Generate MPCORB table.
106 """
107
108 ConfigClass = ConsolidateSsTablesConfig
109 _DefaultName = "consolidateSsTables"
110
111 @timeMethod
112 def run(self, inputCatalogs, mpcorb):
113 """Concatenate per-patch ssSource tables.
114 Generate ssObject table.
115
116 Parameters
117 ----------
118 inputCatalogs `list` of `astropy.table.Table`:
119 All per-patch ssSource Tables
120
121 Returns
122 -------
123 output : `lsst.pipe.base.Struct`
124 Results struct with attributes:
125
126 ``sSourceTable``
127 Table of ssSources.
128 (`astropy.table.Table`)
129 ``ssObjectTable``
130 Table of ssObjects
131 (`astropy.table.Table`).
132 """
133 self.log.info("Concatenating %s per-patch ssSource Tables", len(inputCatalogs))
134 ssSourceTable = tb.vstack(inputCatalogs)
135 self.log.info(
136 f"Done. {len(ssSourceTable)} observations, {np.unique(ssSourceTable['ssObjectId']).size} objects."
137 )
138
139 # Compatibility for pre RFC-1138 ss_source_associated tables.
140 # To be removed in DM-53466.
141 if "heliocentricDist" in ssSourceTable.colnames:
142 arr = ssSourceTable["ssObjectId"] + 0x20000000_00000000 # leading whitespace
143 arr_s8 = arr.byteswap().view(arr.dtype.newbyteorder()).view("S8")
144 ssSourceTable["designation"] = np.char.lstrip(arr_s8)
145 # Distances → convert km → AU
146 au_in_km = (1 * u.au).to(u.km).value
147 ssSourceTable.rename_column("topocentricDist", "topoRange")
148 ssSourceTable["topoRange"] /= au_in_km
149 ssSourceTable.rename_column("heliocentricDist", "helioRange")
150 ssSourceTable["helioRange"] /= au_in_km
151
152 ssSourceTable.rename_column("heliocentricX", "helio_x")
153 ssSourceTable["helio_x"] /= au_in_km
154 ssSourceTable.rename_column("heliocentricY", "helio_y")
155 ssSourceTable["helio_y"] /= au_in_km
156 ssSourceTable.rename_column("heliocentricZ", "helio_z")
157 ssSourceTable["helio_z"] /= au_in_km
158
159 ssSourceTable.rename_column("topocentricX", "topo_x")
160 ssSourceTable["topo_x"] /= au_in_km
161 ssSourceTable.rename_column("topocentricY", "topo_y")
162 ssSourceTable["topo_y"] /= au_in_km
163 ssSourceTable.rename_column("topocentricZ", "topo_z")
164 ssSourceTable["topo_z"] /= au_in_km
165
166 # Velocities (no unit change, just renaming)
167 ssSourceTable.rename_column("heliocentricVX", "helio_vx")
168 ssSourceTable.rename_column("heliocentricVY", "helio_vy")
169 ssSourceTable.rename_column("heliocentricVZ", "helio_vz")
170
171 ssSourceTable.rename_column("topocentricVX", "topo_vx")
172 ssSourceTable.rename_column("topocentricVY", "topo_vy")
173 ssSourceTable.rename_column("topocentricVZ", "topo_vz")
174
175 # the rest
176 ssSourceTable.rename_column("residualRa", "ephOffsetRa")
177 ssSourceTable.rename_column("residualDec", "ephOffsetDec")
178 ssSourceTable.rename_column("eclipticLambda", "eclLambda")
179 ssSourceTable.rename_column("eclipticBeta", "eclBeta")
180 ssSourceTable.rename_column("galacticL", "galLon")
181 ssSourceTable.rename_column("galacticB", "galLat")
182
183 # if we're loading the old-style catalog, we require packed_primary_provisional_designation
184 # to be in the mpcorb schema (and we'll pretend that it's actually unpacked)
185 if mpcorb is not None:
186 mpcorb["unpacked_primary_provisional_designation"] = mpcorb[
187 "packed_primary_provisional_designation"
188 ]
189
190 if mpcorb is not None:
191 self.log.info(f"mpcorb loaded ({len(mpcorb)} objects, {len(mpcorb.columns)} columns)")
192 else:
193 self.log.info("mpcorb not loaded.")
194
195 # extract the DiaSource subset and remove it from ssSourceTable
196 diaSource = tb.Table()
197 for c in DIA_COLUMNS:
198 src = c if c == "diaSourceId" else f"DIA_{c}"
199 diaSource[c] = ssSourceTable[src]
200 if src != "diaSourceId":
201 del ssSourceTable[src]
202
203 # build the SSObject table
204 self.log.info(
205 "HG12 fit config: hg12FixedG12=%s, "
206 "hg12MagSigmaFloor=%s, hg12NSigmaClip=%s",
207 self.config.hg12FixedG12,
208 self.config.hg12MagSigmaFloor,
209 self.config.hg12NSigmaClip,
210 )
211 ssSourceTable.sort("ssObjectId")
212 mpcorb = mpcorb.to_pandas() if isinstance(mpcorb, tb.Table) else mpcorb
213 ssObjectTable = compute_ssobject(
214 ssSourceTable.to_pandas(), diaSource.to_pandas(), mpcorb,
215 fixedG12=self.config.hg12FixedG12,
216 magSigmaFloor=self.config.hg12MagSigmaFloor,
217 nSigmaClip=self.config.hg12NSigmaClip,
218 )
219 ssObjectTable = tb.Table(ssObjectTable)
220
221 return pipeBase.Struct(
222 ssSourceTable=ssSourceTable,
223 ssObjectTable=ssObjectTable,
224 )