Coverage for python/lsst/pipe/tasks/ssp/ssobject.py: 12% (134 statements)
« prev ^ index » next — coverage.py v7.13.5, created at 2026-05-07 08:38 +0000
1import pandas as pd
2import numpy as np
3from functools import partial
4from . import photfit
5from . import util
6from . import schema
7from .moid import MOIDSolver, earth_orbit
8import argparse
9import sys
# The only columns we need from DiaSource.
# TODO DM-53699: These column names should be taken from and/or checked to
# match the DiaSource table definition in sdm_schemas
DIA_COLUMNS = [
    "diaSourceId", "midpointMjdTai", "ra", "dec", "extendedness",
    "band", "psfFlux", "psfFluxErr"
]
# Python-level types for DIA_COLUMNS, one entry per column above, in the
# same order.
DIA_DTYPES = [int, float, float, float, float, str, float, float]
def nJy_to_mag(f_njy):
    """
    Convert a flux density in nanoJanskys (nJy) to an AB magnitude.

    Parameters
    ----------
    f_njy : float or array-like
        Flux density in nanoJanskys.

    Returns
    -------
    float or array-like
        AB magnitude(s) corresponding to the input flux density.
    """
    # m_AB = -2.5*log10(f / 3631 Jy); with f expressed in nJy the
    # zero point works out to 31.4.
    log_flux = np.log10(f_njy)
    return 31.4 - 2.5 * log_flux
def nJy_err_to_mag_err(f_njy, f_err_njy):
    """
    Convert a flux uncertainty in nanoJanskys to a magnitude uncertainty.

    Parameters
    ----------
    f_njy : float
        Flux in nanoJanskys.
    f_err_njy : float
        Flux error in nanoJanskys.

    Returns
    -------
    float
        Magnitude error.
    """
    # 1.085736 ~= 2.5 / ln(10): first-order error propagation of
    # m = -2.5*log10(f) gives sigma_m = (2.5/ln 10) * sigma_f / f.
    relative_err = f_err_njy / f_njy
    return 1.085736 * relative_err
def compute_ssobject_entry(
    row, sss, fixedG12=None, magSigmaFloor=0.0, nSigmaClip=None,
):
    """
    Fill a single SSObject record in place from one object's SSSource rows.

    Parameters
    ----------
    row : numpy structured scalar
        Destination record (a view into the output array); fields are
        assigned in place.
    sss : pandas.DataFrame
        SSSource rows (with joined DiaSource columns prefixed ``dia_``)
        belonging to exactly one ssObjectId.
    fixedG12 : float, optional
        If given, hold G12 fixed at this value in the phase-curve fit.
    magSigmaFloor : float, optional
        Magnitude-error floor applied in the fit.
    nSigmaClip : float, optional
        Sigma-clipping threshold used by the fit, if any.
    """
    # Sanity check: the group must contain a single object only.
    assert sss["ssObjectId"].nunique() == 1

    # -- metadata columns
    row["ssObjectId"] = sss["ssObjectId"].iloc[0]
    row["firstObservationMjdTai"] = sss["dia_midpointMjdTai"].min()
    if "discoverySubmissionDate" in row.dtype.names:  # DP2 does not have this field
        # FIXME: arbitrary guess that discovery happens 7 days after the
        # first observation; this should really come from the obs_sbn table.
        row["discoverySubmissionDate"] = row["firstObservationMjdTai"] + 7.0
    row["arc"] = np.ptp(sss["dia_midpointMjdTai"])
    row["designation"] = sss["designation"].iloc[0]

    # -- observation counts
    row["nObs"] = len(sss)

    # -- per-band quantities
    for band in "ugrizy":
        band_df = sss[sss["dia_band"] == band]

        # NULL-equivalent defaults for this band.
        for suffix in ("Chi2", "G12", "G12Err", "H", "HErr",
                       "phaseAngleMin", "phaseAngleMax"):
            row[f"{band}_{suffix}"] = np.nan
        row[f"{band}_H_{band}_G12_Cov"] = np.nan
        row[f"{band}_nObsUsed"] = 0

        nBandObs = len(band_df)
        row[f"{band}_nObs"] = nBandObs
        if nBandObs > 0:
            row[f"{band}_phaseAngleMin"] = band_df["phaseAngle"].min()
            row[f"{band}_phaseAngleMax"] = band_df["phaseAngle"].max()

            if nBandObs > 1:
                # Fit absolute magnitude and slope; needs at least two points.
                H, G12, sigmaH, sigmaG12, covHG12, chi2dof, nobsv = photfit.fitHG12(
                    band_df["dia_psfMag"], band_df["dia_psfMagErr"],
                    band_df["phaseAngle"], band_df["topoRange"], band_df["helioRange"],
                    fixedG12=fixedG12, magSigmaFloor=magSigmaFloor,
                    nSigmaClip=nSigmaClip,
                )
                # NOTE(review): degrees of freedom are computed from the raw
                # per-band count, not the post-clipping count ``nobsv`` --
                # confirm this is intended.
                nDof = nBandObs - (1 if fixedG12 is not None else 2)

                if np.isnan(G12):
                    # Mark that the slope fit did not converge.
                    row[f"{band}_slope_fit_failed"] = True
                    # FIXME: on failure, fall back to a simple H estimate
                    # using a fiducial G12 value, and store that G12 too.

                row[f"{band}_Chi2"] = chi2dof * nDof
                row[f"{band}_G12"] = G12
                row[f"{band}_G12Err"] = sigmaG12
                row[f"{band}_H"] = H
                row[f"{band}_H_{band}_G12_Cov"] = covHG12
                row[f"{band}_HErr"] = sigmaH
                row[f"{band}_nObsUsed"] = nobsv

    # -- extendedness statistics
    extendedness = sss["dia_extendedness"]
    row["extendednessMin"] = extendedness.min()
    row["extendednessMax"] = extendedness.max()
    row["extendednessMedian"] = extendedness.median()
def compute_ssobject(
    sss, dia, mpcorb, fixedG12=None, magSigmaFloor=0.0,
    nSigmaClip=None,
):
    """
    Compute solar system object properties by joining and processing
    SSSource, DiaSource, and MPC orbit data.

    This function takes a pre-grouped SSSource table, joins it with
    DiaSource data, computes per-object quantities, and calculates
    additional orbital parameters like Tisserand J and Minimum Orbit
    Intersection Distance (MOID) with Earth for matching objects.

    Parameters
    ----------
    sss : pandas.DataFrame
        SSSource table, pre-grouped by 'ssObjectId'. Must be sorted by
        'ssObjectId' for correct grouping. Contains columns like
        'ssObjectId', 'diaSourceId', etc.
    dia : pandas.DataFrame
        DiaSource table with columns prefixed as 'dia_' in the join.
        Must include 'dia_diaSourceId', 'dia_psfFlux', 'dia_psfFluxErr',
        etc.
    mpcorb : pandas.DataFrame
        MPC orbit data with columns like
        'unpacked_primary_provisional_designation', 'q', 'e', 'i',
        'node', 'argperi'. May be None to skip the orbit-derived columns.
    fixedG12 : float, optional
        If given, hold G12 fixed at this value in the per-band fits.
    magSigmaFloor : float, optional
        Magnitude-error floor passed to the per-band fits.
    nSigmaClip : float, optional
        Sigma-clipping threshold passed to the per-band fits.

    Returns
    -------
    numpy.ndarray
        Array of ssObject records with dtype schema.SSObjectDtype,
        containing computed properties for each unique ssObjectId,
        including magnitudes, orbital elements, Tisserand J, and
        MOID-related values.

    Raises
    ------
    AssertionError
        If 'sss' is not pre-grouped by 'ssObjectId', or if DiaSources
        are missing after join.

    Notes
    -----
    - The function assumes 'sss' is large and avoids internal
      sorting/copying for efficiency.
    - Tisserand J and MOID are computed only for objects matching
      designations in 'mpcorb'.
    - MOID computation uses a MOIDSolver for each matched object.
    """
    # assert that sss is pre-grouped by ssObjectId
    assert util.values_grouped(sss["ssObjectId"]), (
        "SSSource table must be pre-grouped by ssObjectId. "
        "An easy way to do this is to sort by ssObjectId before calling compute_ssobject(). "
        "The grouping is required for correct per-object computations, and since SSSource is "
        "typically large and we want to avoid copies, it's not done internally."
    )

    # Join the DiaSource columns we're interested in onto the SSSource table
    num = len(sss)
    dia_tmp = dia[DIA_COLUMNS].add_prefix("dia_")  # FIXME: does this cause an unnecessary copy?
    # FIXME: The diaSourceId should really be uint64. But Felis doesn't speak
    # uint64, but only knows about int64. Yet the pipeline produces uint64
    # diaSourceId in the dia_source dataset. So we have to cast here to int64
    # to make the join work (otherwise pyarrow tries to cast to float64, and
    # the whole thing gloriously explodes).
    dia_tmp["dia_diaSourceId"] = dia_tmp["dia_diaSourceId"].astype("int64[pyarrow]")
    sss = sss.merge(dia_tmp, left_on="diaSourceId", right_on="dia_diaSourceId", how="inner")
    assert num == len(sss), f"{num - len(sss)} DiaSources found missing."
    del sss["dia_diaSourceId"]
    del dia_tmp

    # add magnitude columns
    sss["dia_psfMag"] = nJy_to_mag(sss["dia_psfFlux"])
    sss["dia_psfMagErr"] = nJy_err_to_mag_err(sss["dia_psfFlux"], sss["dia_psfFluxErr"])

    # Pre-create the empty output array
    totalNumObjects = np.unique(sss["ssObjectId"]).size
    obj = np.zeros(totalNumObjects, dtype=schema.SSObjectDtype)

    # compute per-object quantities
    callback = partial(
        compute_ssobject_entry, fixedG12=fixedG12,
        magSigmaFloor=magSigmaFloor, nSigmaClip=nSigmaClip,
    )
    util.group_by([sss], "ssObjectId", callback, out=obj)

    #
    # compute columns that can be efficiently computed in a vectorized fashion
    #
    if mpcorb is not None:
        # inner join by provisional designation. We allow for some objects to be
        # missing from mpcorb (this should not happen often, but it did in DP1).
        # FIXME: at some point require that no objects are missing. I _think_ that
        # shouldn't happen in normal operations.
        oidx, midx = util.argjoin(
            obj["designation"].astype("U"),
            mpcorb["unpacked_primary_provisional_designation"].to_numpy().astype("U"),
        )
        assert np.all(
            mpcorb["unpacked_primary_provisional_designation"].take(midx)
            == obj["designation"][oidx].astype("U")
        )
        # Name the inclination `incl` (not `i`): the MOID loop below uses an
        # enumerate index, and the previous code rebound `i` to that index,
        # shadowing the inclination array. It only worked by accident because
        # zip() evaluated its arguments before the loop variable was rebound.
        q, e, incl, node, argperi, epoch_mjd = util.unpack(
            mpcorb["q e i node argperi epoch_mjd".split()].take(midx)
        )
        a = q / (1.0 - e)

        # Tisserand parameter with respect to Jupiter
        obj["tisserand_J"][oidx] = util.tisserand_jupiter(a, e, incl)

        # MOID computation, one matched object at a time
        solver = MOIDSolver()
        for k, el_obj in enumerate(zip(a, e, incl, node, argperi)):
            earth = earth_orbit(epoch_mjd[k])
            (moid, deltaV, eclon, trueEarth, trueObject) = solver.compute(earth, el_obj)
            row = obj[oidx[k]]
            row["MOIDEarth"] = moid
            row["MOIDEarthDeltaV"] = deltaV
            row["MOIDEarthEclipticLongitude"] = eclon
            row["MOIDEarthTrueAnomaly"] = trueEarth
            row["MOIDEarthTrueAnomalyObject"] = trueObject

    return obj
def main():
    """
    CLI entry point for building SSObject table from SSSource,
    DiaSource, and MPC orbit data.

    Parses command-line arguments, loads the three Parquet inputs,
    runs compute_ssobject(), and writes the result to the --output
    path. On error, prints the exception to stderr and exits with
    status 1 (or re-raises when --reraise is given).
    """
    parser = argparse.ArgumentParser(
        description="Build SSObject table from SSSource, DiaSource, and MPC orbit Parquet files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  ssp-build-ssobject sssource.parquet dia_sources.parquet mpc_orbits.parquet --output ssobject.parquet
    """,
    )

    parser.add_argument("sssource_parquet", help="Path to SSSource Parquet file")
    parser.add_argument("diasource_parquet", help="Path to DiaSource Parquet file")
    parser.add_argument("mpcorb_parquet", help="Path to MPC orbits Parquet file")
    parser.add_argument("--output", "-o", required=True, help="Path to output SSObject Parquet file")
    parser.add_argument(
        "--reraise",
        action="store_true",
        help="Re-raise exceptions instead of exiting gracefully (for debugging)",
    )

    args = parser.parse_args()

    try:
        # Load SSSource
        print(f"Loading SSSource from {args.sssource_parquet}...")
        sss = pd.read_parquet(args.sssource_parquet, engine="pyarrow", dtype_backend="pyarrow").reset_index(
            drop=True
        )
        print(f"Loaded {len(sss):,} SSSource rows")

        # Load DiaSource restricted to the module-level column list, so it
        # stays in sync with what compute_ssobject() expects (DM-53699).
        print(f"Loading DiaSource from {args.diasource_parquet}...")
        dia = pd.read_parquet(
            args.diasource_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=DIA_COLUMNS
        ).reset_index(drop=True)
        print(f"Loaded {len(dia):,} DiaSource rows")

        # Ensure diaSourceId is uint64 (Parquet stores it as int64)
        assert np.all(dia["diaSourceId"] >= 0)
        dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")

        # Load MPC orbits
        mpcorb_columns = [
            "unpacked_primary_provisional_designation",
            "a",
            "q",
            "e",
            "i",
            "node",
            "argperi",
            "peri_time",
            "mean_anomaly",
            "epoch_mjd",
            "h",
            "g",
        ]
        print(f"Loading MPC orbits from {args.mpcorb_parquet}...")
        mpcorb = pd.read_parquet(
            args.mpcorb_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=mpcorb_columns
        ).reset_index(drop=True)
        print(f"Loaded {len(mpcorb):,} MPC orbit rows")

        # Compute SSObject
        print("Computing SSObject data...")
        obj = compute_ssobject(sss, dia, mpcorb)

        # Save result
        print(f"Saving {len(obj):,} SSObject rows to {args.output}...")
        util.struct_to_parquet(obj, args.output)

        print(f"Success! Created SSObject with {len(obj):,} objects")
        print(f"Row size: {obj.dtype.itemsize:,} bytes, Total size: {obj.nbytes:,} bytes")

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        if args.reraise:
            raise
        sys.exit(1)
if __name__ == "__main__":
    # Delegate to the argparse-based CLI entry point. The previous body was
    # a leftover analysis script with hard-coded ./analysis input/output
    # paths that duplicated main()'s logic and bypassed the CLI entirely.
    main()