22__all__ = [
"ConsolidateSsTablesConfig",
"ConsolidateSsTablesTask",
"ConsolidateSsTablesConnections"]
24import astropy.table
as tb
25import astropy.units
as u
29import lsst.pipe.base
as pipeBase
30import lsst.pipe.base.connectionTypes
as cT
31from lsst.utils.timer
import timeMethod
32from .ssp.ssobject
import DIA_COLUMNS, compute_ssobject
34warnings.filterwarnings(
"ignore")
38 pipeBase.PipelineTaskConnections,
39 dimensions=(
"skymap",),
40 defaultTemplates={
"coaddName":
"goodSeeing",
"fakesType":
""},
42 inputCatalogs = cT.Input(
43 doc=
"associated ssSources from all tract-patches.",
44 name=
"{fakesType}{coaddName}Diff_assocSsSrcTable",
45 storageClass=
"ArrowAstropy",
46 dimensions=(
"tract",
"patch"),
50 doc=
"Minor Planet Center orbit table used for association",
52 storageClass=
"ArrowAstropy",
55 ssSourceTable = cT.Output(
57 name=
"{fakesType}{coaddName}Diff_ssSrcTable",
58 storageClass=
"ArrowAstropy",
61 ssObjectTable = cT.Output(
63 name=
"{fakesType}{coaddName}Diff_ssObjTable",
64 storageClass=
"ArrowAstropy",
69class ConsolidateSsTablesConfig(
70 pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateSsTablesConnections
72 """Config for ConsolidateSsTablesTask"""
75class ConsolidateSsTablesTask(pipeBase.PipelineTask):
76 """Consolidate per-patch ssSource tables into a single table.
78 TODO (DM-49451): Fit per-object parameters
79 TODO (DM-49453): Generate MPCORB table.
82 ConfigClass = ConsolidateSsTablesConfig
83 _DefaultName =
"consolidateSsTables"
86 def run(self, inputCatalogs, mpcorb):
87 """Concatenate per-patch ssSource tables.
88 Generate ssObject table.
92 inputCatalogs `list` of `astropy.table.Table`:
93 All per-patch ssSource Tables
97 output : `lsst.pipe.base.Struct`
98 Results struct with attributes:
102 (`astropy.table.Table`)
105 (`astropy.table.Table`).
107 self.log.info(
"Concatenating %s per-patch ssSource Tables", len(inputCatalogs))
108 ssSourceTable = tb.vstack(inputCatalogs)
110 f
"Done. {len(ssSourceTable)} observations, {np.unique(ssSourceTable['ssObjectId']).size} objects."
115 if "heliocentricDist" in ssSourceTable.colnames:
116 arr = ssSourceTable[
"ssObjectId"] + 0x20000000_00000000
117 arr_s8 = arr.byteswap().view(arr.dtype.newbyteorder()).view(
"S8")
118 ssSourceTable[
"designation"] = np.char.lstrip(arr_s8)
120 au_in_km = (1 * u.au).to(u.km).value
121 ssSourceTable.rename_column(
"topocentricDist",
"topoRange")
122 ssSourceTable[
"topoRange"] /= au_in_km
123 ssSourceTable.rename_column(
"heliocentricDist",
"helioRange")
124 ssSourceTable[
"helioRange"] /= au_in_km
126 ssSourceTable.rename_column(
"heliocentricX",
"helio_x")
127 ssSourceTable[
"helio_x"] /= au_in_km
128 ssSourceTable.rename_column(
"heliocentricY",
"helio_y")
129 ssSourceTable[
"helio_y"] /= au_in_km
130 ssSourceTable.rename_column(
"heliocentricZ",
"helio_z")
131 ssSourceTable[
"helio_z"] /= au_in_km
133 ssSourceTable.rename_column(
"topocentricX",
"topo_x")
134 ssSourceTable[
"topo_x"] /= au_in_km
135 ssSourceTable.rename_column(
"topocentricY",
"topo_y")
136 ssSourceTable[
"topo_y"] /= au_in_km
137 ssSourceTable.rename_column(
"topocentricZ",
"topo_z")
138 ssSourceTable[
"topo_z"] /= au_in_km
141 ssSourceTable.rename_column(
"heliocentricVX",
"helio_vx")
142 ssSourceTable.rename_column(
"heliocentricVY",
"helio_vy")
143 ssSourceTable.rename_column(
"heliocentricVZ",
"helio_vz")
145 ssSourceTable.rename_column(
"topocentricVX",
"topo_vx")
146 ssSourceTable.rename_column(
"topocentricVY",
"topo_vy")
147 ssSourceTable.rename_column(
"topocentricVZ",
"topo_vz")
150 ssSourceTable.rename_column(
"residualRa",
"ephOffsetRa")
151 ssSourceTable.rename_column(
"residualDec",
"ephOffsetDec")
152 ssSourceTable.rename_column(
"eclipticLambda",
"eclLambda")
153 ssSourceTable.rename_column(
"eclipticBeta",
"eclBeta")
154 ssSourceTable.rename_column(
"galacticL",
"galLon")
155 ssSourceTable.rename_column(
"galacticB",
"galLat")
159 if mpcorb
is not None:
160 mpcorb[
"unpacked_primary_provisional_designation"] = mpcorb[
161 "packed_primary_provisional_designation"
164 if mpcorb
is not None:
165 self.log.info(f
"mpcorb loaded ({len(mpcorb)} objects, {len(mpcorb.columns)} columns)")
167 self.log.info(
"mpcorb not loaded.")
170 diaSource = tb.Table()
171 for c
in DIA_COLUMNS:
172 src = c
if c ==
"diaSourceId" else f
"DIA_{c}"
173 diaSource[c] = ssSourceTable[src]
174 if src !=
"diaSourceId":
175 del ssSourceTable[src]
178 ssSourceTable.sort(
"ssObjectId")
179 mpcorb = mpcorb.to_pandas()
if isinstance(mpcorb, tb.Table)
else mpcorb
180 ssObjectTable = compute_ssobject(
181 ssSourceTable.to_pandas(), diaSource.to_pandas(), mpcorb
183 ssObjectTable = tb.Table(ssObjectTable)
185 return pipeBase.Struct(
186 ssSourceTable=ssSourceTable,
187 ssObjectTable=ssObjectTable,