22__all__ = [
"ConsolidateSsTablesConfig",
"ConsolidateSsTablesTask",
"ConsolidateSsTablesConnections"]
24import astropy.table
as tb
25import astropy.units
as u
30import lsst.pipe.base
as pipeBase
31import lsst.pipe.base.connectionTypes
as cT
32from lsst.utils.timer
import timeMethod
33from .ssp.ssobject
import DIA_COLUMNS, compute_ssobject
35warnings.filterwarnings(
"ignore")
39 pipeBase.PipelineTaskConnections,
40 dimensions=(
"skymap",),
41 defaultTemplates={
"coaddName":
"goodSeeing",
"fakesType":
""},
43 inputCatalogs = cT.Input(
44 doc=
"associated ssSources from all tract-patches.",
45 name=
"{fakesType}{coaddName}Diff_assocSsSrcTable",
46 storageClass=
"ArrowAstropy",
47 dimensions=(
"tract",
"patch"),
51 doc=
"Minor Planet Center orbit table used for association",
53 storageClass=
"ArrowAstropy",
56 ssSourceTable = cT.Output(
58 name=
"{fakesType}{coaddName}Diff_ssSrcTable",
59 storageClass=
"ArrowAstropy",
62 ssObjectTable = cT.Output(
64 name=
"{fakesType}{coaddName}Diff_ssObjTable",
65 storageClass=
"ArrowAstropy",
70class ConsolidateSsTablesConfig(
71 pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateSsTablesConnections
73 """Config for ConsolidateSsTablesTask"""
75 hg12FixedG12 = pexConfig.Field(
79 doc=
"If set, fix the G12 slope parameter to this value "
80 "and only fit H. If None (default), both H and G12 "
84 hg12MagSigmaFloor = pexConfig.Field(
87 doc=
"Systematic magnitude error floor (mag) added in "
88 "quadrature to measurement errors before HG12 "
92 hg12NSigmaClip = pexConfig.Field(
96 doc=
"If set, reject outliers beyond this many sigma "
97 "after an initial robust fit. If None, no clipping.",
101class ConsolidateSsTablesTask(pipeBase.PipelineTask):
102 """Consolidate per-patch ssSource tables into a single table.
103 Create ssObject table
104 TODO (DM-49451): Fit per-object parameters
105 TODO (DM-49453): Generate MPCORB table.
108 ConfigClass = ConsolidateSsTablesConfig
109 _DefaultName =
"consolidateSsTables"
112 def run(self, inputCatalogs, mpcorb):
113 """Concatenate per-patch ssSource tables.
114 Generate ssObject table.
118 inputCatalogs `list` of `astropy.table.Table`:
119 All per-patch ssSource Tables
123 output : `lsst.pipe.base.Struct`
124 Results struct with attributes:
128 (`astropy.table.Table`)
131 (`astropy.table.Table`).
133 self.log.info(
"Concatenating %s per-patch ssSource Tables", len(inputCatalogs))
134 ssSourceTable = tb.vstack(inputCatalogs)
136 f
"Done. {len(ssSourceTable)} observations, {np.unique(ssSourceTable['ssObjectId']).size} objects."
141 if "heliocentricDist" in ssSourceTable.colnames:
142 arr = ssSourceTable[
"ssObjectId"] + 0x20000000_00000000
143 arr_s8 = arr.byteswap().view(arr.dtype.newbyteorder()).view(
"S8")
144 ssSourceTable[
"designation"] = np.char.lstrip(arr_s8)
146 au_in_km = (1 * u.au).to(u.km).value
147 ssSourceTable.rename_column(
"topocentricDist",
"topoRange")
148 ssSourceTable[
"topoRange"] /= au_in_km
149 ssSourceTable.rename_column(
"heliocentricDist",
"helioRange")
150 ssSourceTable[
"helioRange"] /= au_in_km
152 ssSourceTable.rename_column(
"heliocentricX",
"helio_x")
153 ssSourceTable[
"helio_x"] /= au_in_km
154 ssSourceTable.rename_column(
"heliocentricY",
"helio_y")
155 ssSourceTable[
"helio_y"] /= au_in_km
156 ssSourceTable.rename_column(
"heliocentricZ",
"helio_z")
157 ssSourceTable[
"helio_z"] /= au_in_km
159 ssSourceTable.rename_column(
"topocentricX",
"topo_x")
160 ssSourceTable[
"topo_x"] /= au_in_km
161 ssSourceTable.rename_column(
"topocentricY",
"topo_y")
162 ssSourceTable[
"topo_y"] /= au_in_km
163 ssSourceTable.rename_column(
"topocentricZ",
"topo_z")
164 ssSourceTable[
"topo_z"] /= au_in_km
167 ssSourceTable.rename_column(
"heliocentricVX",
"helio_vx")
168 ssSourceTable.rename_column(
"heliocentricVY",
"helio_vy")
169 ssSourceTable.rename_column(
"heliocentricVZ",
"helio_vz")
171 ssSourceTable.rename_column(
"topocentricVX",
"topo_vx")
172 ssSourceTable.rename_column(
"topocentricVY",
"topo_vy")
173 ssSourceTable.rename_column(
"topocentricVZ",
"topo_vz")
176 ssSourceTable.rename_column(
"residualRa",
"ephOffsetRa")
177 ssSourceTable.rename_column(
"residualDec",
"ephOffsetDec")
178 ssSourceTable.rename_column(
"eclipticLambda",
"eclLambda")
179 ssSourceTable.rename_column(
"eclipticBeta",
"eclBeta")
180 ssSourceTable.rename_column(
"galacticL",
"galLon")
181 ssSourceTable.rename_column(
"galacticB",
"galLat")
185 if mpcorb
is not None:
186 mpcorb[
"unpacked_primary_provisional_designation"] = mpcorb[
187 "packed_primary_provisional_designation"
190 if mpcorb
is not None:
191 self.log.info(f
"mpcorb loaded ({len(mpcorb)} objects, {len(mpcorb.columns)} columns)")
193 self.log.info(
"mpcorb not loaded.")
196 diaSource = tb.Table()
197 for c
in DIA_COLUMNS:
198 src = c
if c ==
"diaSourceId" else f
"DIA_{c}"
199 diaSource[c] = ssSourceTable[src]
200 if src !=
"diaSourceId":
201 del ssSourceTable[src]
205 "HG12 fit config: hg12FixedG12=%s, "
206 "hg12MagSigmaFloor=%s, hg12NSigmaClip=%s",
207 self.config.hg12FixedG12,
208 self.config.hg12MagSigmaFloor,
209 self.config.hg12NSigmaClip,
211 ssSourceTable.sort(
"ssObjectId")
212 mpcorb = mpcorb.to_pandas()
if isinstance(mpcorb, tb.Table)
else mpcorb
213 ssObjectTable = compute_ssobject(
214 ssSourceTable.to_pandas(), diaSource.to_pandas(), mpcorb,
215 fixedG12=self.config.hg12FixedG12,
216 magSigmaFloor=self.config.hg12MagSigmaFloor,
217 nSigmaClip=self.config.hg12NSigmaClip,
219 ssObjectTable = tb.Table(ssObjectTable)
221 return pipeBase.Struct(
222 ssSourceTable=ssSourceTable,
223 ssObjectTable=ssObjectTable,