Coverage for python / lsst / pipe / tasks / consolidateSsTables.py: 0%
73 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:21 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:21 +0000
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Names exported by ``from <this module> import *``: the config, task, and
# connections classes defined below.
__all__ = ["ConsolidateSsTablesConfig", "ConsolidateSsTablesTask", "ConsolidateSsTablesConnections"]
24import astropy.table as tb
25import astropy.units as u
26import numpy as np
27import warnings
29import lsst.pipe.base as pipeBase
30import lsst.pipe.base.connectionTypes as cT
31from lsst.utils.timer import timeMethod
32from .ssp.ssobject import DIA_COLUMNS, compute_ssobject
# NOTE(review): this runs at import time and installs an "ignore" filter that
# matches every warning category, so warnings emitted by any code executed
# after this module is imported are suppressed too, not only those raised
# here. Consider scoping it with ``warnings.catch_warnings()`` where needed.
warnings.filterwarnings("ignore")
class ConsolidateSsTablesConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("skymap",),
    defaultTemplates={"coaddName": "goodSeeing", "fakesType": ""},
):
    """Connections for ConsolidateSsTablesTask.

    One quantum per skymap gathers the per tract-patch associated ssSource
    tables plus the dimensionless ``mpcorb`` table, and writes two
    dimensionless outputs: the consolidated ssSource table and the ssObject
    table.
    """

    # Inputs -----------------------------------------------------------------
    inputCatalogs = cT.Input(
        doc="associated ssSources from all tract-patches.",
        name="{fakesType}{coaddName}Diff_assocSsSrcTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch"),
        multiple=True,
    )
    mpcorb = cT.Input(
        doc="Minor Planet Center orbit table used for association",
        name="mpcorb",
        storageClass="ArrowAstropy",
        dimensions=(),
    )

    # Outputs ----------------------------------------------------------------
    ssSourceTable = cT.Output(
        doc="Consolidated ssSource table, concatenated from all tract-patches.",
        name="{fakesType}{coaddName}Diff_ssSrcTable",
        storageClass="ArrowAstropy",
        dimensions=(),
    )
    ssObjectTable = cT.Output(
        doc="ssObject table computed from the consolidated ssSources.",
        name="{fakesType}{coaddName}Diff_ssObjTable",
        storageClass="ArrowAstropy",
        dimensions=(),
    )
class ConsolidateSsTablesConfig(
    pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateSsTablesConnections
):
    """Config for ConsolidateSsTablesTask.

    Declares no fields of its own; it only binds the task to
    `ConsolidateSsTablesConnections`.
    """
class ConsolidateSsTablesTask(pipeBase.PipelineTask):
    """Consolidate per-patch ssSource tables into a single table.
    Create ssObject table

    TODO (DM-49451): Fit per-object parameters
    TODO (DM-49453): Generate MPCORB table.
    """

    ConfigClass = ConsolidateSsTablesConfig
    _DefaultName = "consolidateSsTables"

    # Pre RFC-1138 columns that are renamed AND converted from km to AU,
    # as (old name, new name) pairs.  To be removed in DM-53466.
    _LEGACY_DISTANCE_COLUMNS = (
        ("topocentricDist", "topoRange"),
        ("heliocentricDist", "helioRange"),
        ("heliocentricX", "helio_x"),
        ("heliocentricY", "helio_y"),
        ("heliocentricZ", "helio_z"),
        ("topocentricX", "topo_x"),
        ("topocentricY", "topo_y"),
        ("topocentricZ", "topo_z"),
    )

    # Pre RFC-1138 columns that are only renamed (velocities keep their
    # units), as (old name, new name) pairs.  To be removed in DM-53466.
    _LEGACY_RENAMED_COLUMNS = (
        ("heliocentricVX", "helio_vx"),
        ("heliocentricVY", "helio_vy"),
        ("heliocentricVZ", "helio_vz"),
        ("topocentricVX", "topo_vx"),
        ("topocentricVY", "topo_vy"),
        ("topocentricVZ", "topo_vz"),
        ("residualRa", "ephOffsetRa"),
        ("residualDec", "ephOffsetDec"),
        ("eclipticLambda", "eclLambda"),
        ("eclipticBeta", "eclBeta"),
        ("galacticL", "galLon"),
        ("galacticB", "galLat"),
    )

    @timeMethod
    def run(self, inputCatalogs, mpcorb):
        """Concatenate per-patch ssSource tables.
        Generate ssObject table.

        Parameters
        ----------
        inputCatalogs : `list` of `astropy.table.Table`
            All per-patch ssSource Tables. They are stacked into a new
            table; the inputs themselves are not modified.
        mpcorb : `astropy.table.Table` or `None`
            Orbit table handed on to ``compute_ssobject``. A
            `~astropy.table.Table` is converted to pandas first; any other
            value (including `None`) is passed through unchanged. When the
            input ssSources use the pre RFC-1138 schema, an
            ``unpacked_primary_provisional_designation`` column is added to
            this table in place.

        Returns
        -------
        output : `lsst.pipe.base.Struct`
            Results struct with attributes:

            ``ssSourceTable``
                Table of ssSources, sorted by ``ssObjectId`` and with the
                ``DIA_``-prefixed DiaSource columns removed
                (`astropy.table.Table`).
            ``ssObjectTable``
                Table of ssObjects
                (`astropy.table.Table`).
        """
        self.log.info("Concatenating %s per-patch ssSource Tables", len(inputCatalogs))
        ssSourceTable = tb.vstack(inputCatalogs)
        self.log.info(
            "Done. %s observations, %s objects.",
            len(ssSourceTable),
            np.unique(ssSourceTable["ssObjectId"]).size,
        )

        # Compatibility for pre RFC-1138 ss_source_associated tables.
        # To be removed in DM-53466.
        if "heliocentricDist" in ssSourceTable.colnames:
            self._convertLegacySchema(ssSourceTable, mpcorb)

        if mpcorb is not None:
            self.log.info("mpcorb loaded (%s objects, %s columns)", len(mpcorb), len(mpcorb.columns))
        else:
            self.log.info("mpcorb not loaded.")

        # extract the DiaSource subset and remove it from ssSourceTable
        # (diaSourceId is kept in both tables).
        diaSource = tb.Table()
        for c in DIA_COLUMNS:
            src = c if c == "diaSourceId" else f"DIA_{c}"
            diaSource[c] = ssSourceTable[src]
            if src != "diaSourceId":
                del ssSourceTable[src]

        # build the SSObject table
        # NOTE(review): diaSource was extracted before this sort, so its rows
        # are in the pre-sort order while ssSourceTable is ordered by
        # ssObjectId; presumably compute_ssobject matches the two on
        # diaSourceId rather than by row position -- confirm.
        ssSourceTable.sort("ssObjectId")
        mpcorb = mpcorb.to_pandas() if isinstance(mpcorb, tb.Table) else mpcorb
        ssObjectTable = compute_ssobject(
            ssSourceTable.to_pandas(), diaSource.to_pandas(), mpcorb
        )
        ssObjectTable = tb.Table(ssObjectTable)

        return pipeBase.Struct(
            ssSourceTable=ssSourceTable,
            ssObjectTable=ssObjectTable,
        )

    def _convertLegacySchema(self, ssSourceTable, mpcorb):
        """Convert a pre RFC-1138 ssSource table to the current schema.

        Both arguments are modified in place. To be removed in DM-53466.

        Parameters
        ----------
        ssSourceTable : `astropy.table.Table`
            Concatenated ssSource table using the old column names. A
            ``designation`` column is added, distance columns are renamed
            and divided by the number of km in one AU, and the remaining
            legacy columns are renamed.
        mpcorb : `astropy.table.Table` or `None`
            If not `None`, gains an
            ``unpacked_primary_provisional_designation`` column copied from
            ``packed_primary_provisional_designation``.
        """
        # Put 0x20 (ASCII space) into the most significant byte, reinterpret
        # the byte-reversed 8-byte integers as 8-character strings, and strip
        # the leading whitespace.
        # NOTE(review): assumes ssObjectId holds the designation packed as
        # ASCII bytes with a zero top byte -- confirm.
        arr = ssSourceTable["ssObjectId"] + 0x20000000_00000000  # leading whitespace
        arr_s8 = arr.byteswap().view(arr.dtype.newbyteorder()).view("S8")
        ssSourceTable["designation"] = np.char.lstrip(arr_s8)

        # Distances → convert km → AU
        au_in_km = (1 * u.au).to(u.km).value
        for oldName, newName in self._LEGACY_DISTANCE_COLUMNS:
            ssSourceTable.rename_column(oldName, newName)
            ssSourceTable[newName] /= au_in_km

        # Velocities and the rest (no unit change, just renaming)
        for oldName, newName in self._LEGACY_RENAMED_COLUMNS:
            ssSourceTable.rename_column(oldName, newName)

        # if we're loading the old-style catalog, we require packed_primary_provisional_designation
        # to be in the mpcorb schema (and we'll pretend that it's actually unpacked)
        if mpcorb is not None:
            mpcorb["unpacked_primary_provisional_designation"] = mpcorb[
                "packed_primary_provisional_designation"
            ]