Coverage for python / lsst / pipe / tasks / ssp / ssobject.py: 11%
131 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:08 +0000
1import pandas as pd
2import numpy as np
3from . import photfit
4from . import util
5from . import schema
6from .moid import MOIDSolver, earth_orbit_J2000
7import argparse
8import sys
# The only columns we need from DiaSource.
DIA_COLUMNS = [
    "diaSourceId", "midpointMjdTai", "ra", "dec", "extendedness",
    "band", "psfFlux", "psfFluxErr"
]
# Python types parallel to DIA_COLUMNS, one entry per column.
# NOTE(review): not referenced elsewhere in this file -- presumably consumed
# by callers validating/constructing the DiaSource frame; confirm before
# removing.
DIA_DTYPES = [int, float, float, float, float, str, float, float]
def nJy_to_mag(f_njy):
    """
    Convert flux density in nanoJanskys (nJy) to AB magnitude.

    Parameters
    ----------
    f_njy : float or array-like
        Flux density in nanoJanskys.

    Returns
    -------
    float or array-like
        AB magnitude corresponding to the input flux density.
    """
    # 31.4 is the AB magnitude zeropoint for fluxes expressed in nJy
    # (equivalent to m_AB = 8.90 for fluxes in Jy).
    ab_zeropoint_njy = 31.4
    return ab_zeropoint_njy - 2.5 * np.log10(f_njy)
def nJy_err_to_mag_err(f_njy, f_err_njy):
    """
    Convert flux error in nanoJanskys to magnitude error.

    Parameters
    ----------
    f_njy : float
        Flux in nanoJanskys.
    f_err_njy : float
        Flux error in nanoJanskys.

    Returns
    -------
    float
        Magnitude error.
    """
    # First-order error propagation of m = -2.5 log10(f):
    # dm = (2.5 / ln 10) * (df / f), with 2.5 / ln(10) ~= 1.085736.
    relative_flux_error = f_err_njy / f_njy
    return 1.085736 * relative_flux_error
def compute_ssobject_entry(row, sss):
    """
    Fill one SSObject record from the SSSource rows of a single object.

    Parameters
    ----------
    row : numpy record
        One element of a structured array (schema.SSObjectDtype); mutated
        in place.
    sss : pandas.DataFrame
        All SSSource rows for exactly one ssObjectId, already joined with
        'dia_'-prefixed DiaSource columns and the derived 'dia_psfMag' /
        'dia_psfMagErr' magnitude columns.
    """
    # just verify we didn't screw up something
    assert sss["ssObjectId"].nunique() == 1

    # Metadata columns
    row["ssObjectId"] = sss["ssObjectId"].iloc[0]
    row["firstObservationMjdTai"] = sss["dia_midpointMjdTai"].min()

    if "discoverySubmissionDate" in row.dtype.names:  # DP2 does not have this field
        # FIXME: here I arbitrarily guess we discover everything 7 days
        # after first obsv. we should really pull this out of the obs_sbn tbl.
        row["discoverySubmissionDate"] = row["firstObservationMjdTai"] + 7.0
    # arc length: time span between first and last observation (days)
    row["arc"] = np.ptp(sss["dia_midpointMjdTai"])
    row["designation"] = sss["designation"].iloc[0]

    # observation counts
    row["nObs"] = len(sss)

    # per band entries
    for band in "ugrizy":
        df = sss[sss["dia_band"] == band]

        # set defaults for this band (equivalents of NULL)
        row[f"{band}_Chi2"] = np.nan
        row[f"{band}_G12"] = np.nan
        row[f"{band}_G12Err"] = np.nan
        row[f"{band}_H"] = np.nan
        row[f"{band}_H_{band}_G12_Cov"] = np.nan
        row[f"{band}_HErr"] = np.nan
        row[f"{band}_nObsUsed"] = 0
        row[f"{band}_phaseAngleMin"] = np.nan
        row[f"{band}_phaseAngleMax"] = np.nan

        nBandObs = len(df)
        row[f"{band}_nObs"] = nBandObs
        if nBandObs > 0:
            paMin, paMax = df["phaseAngle"].min(), df["phaseAngle"].max()
            row[f"{band}_phaseAngleMin"] = paMin
            row[f"{band}_phaseAngleMax"] = paMax

        if nBandObs > 1:
            # do the absmag/slope fits, if there are at least two
            # data points
            H, G12, sigmaH, sigmaG12, covHG12, chi2dof, nobsv = photfit.fitHG12(
                df["dia_psfMag"], df["dia_psfMagErr"], df["phaseAngle"], df["topoRange"], df["helioRange"]
            )
            # two fitted parameters (H, G12) -> nBandObs - 2 degrees of freedom
            nDof = nBandObs - 2
            # print(provID, band, H, G12, sigmaH, sigmaG12, covHG12,
            #       chi2dof, nobsv)

            # mark if the fit failed
            if np.isnan(G12):
                row[f"{band}_slope_fit_failed"] = True
                # FIXME: if fitting fails, we should revert to simple
                # estimation of H using a fiducial G12 value, storing
                # that G12 as well.

            # NOTE(review): chi2dof * nDof reconstructs the total chi^2 from
            # a reduced chi^2 -- assumes fitHG12 returns chi^2 per degree of
            # freedom; confirm against photfit.
            row[f"{band}_Chi2"] = chi2dof * nDof
            row[f"{band}_G12"] = G12
            row[f"{band}_G12Err"] = sigmaG12
            row[f"{band}_H"] = H
            row[f"{band}_H_{band}_G12_Cov"] = covHG12
            row[f"{band}_HErr"] = sigmaH
            row[f"{band}_nObsUsed"] = nobsv

    # Extendedness
    row["extendednessMin"] = sss["dia_extendedness"].min()
    row["extendednessMax"] = sss["dia_extendedness"].max()
    row["extendednessMedian"] = sss["dia_extendedness"].median()
def compute_ssobject(sss, dia, mpcorb):
    """
    Compute solar system object properties by joining and processing
    SSSource, DiaSource, and MPC orbit data.

    This function takes a pre-grouped SSSource table, joins it with
    DiaSource data, computes per-object quantities, and calculates
    additional orbital parameters like Tisserand J and Minimum Orbit
    Intersection Distance (MOID) with Earth for matching objects.

    Parameters
    ----------
    sss : pandas.DataFrame
        SSSource table, pre-grouped by 'ssObjectId'. Must be sorted by
        'ssObjectId' for correct grouping. Contains columns like
        'ssObjectId', 'diaSourceId', etc.
    dia : pandas.DataFrame
        DiaSource table with columns prefixed as 'dia_' in the join.
        Must include 'dia_diaSourceId', 'dia_psfFlux', 'dia_psfFluxErr',
        etc.
    mpcorb : pandas.DataFrame or None
        MPC orbit data with columns like
        'unpacked_primary_provisional_designation', 'q', 'e', 'i',
        'node', 'argperi'. If None, Tisserand J and MOID are skipped.

    Returns
    -------
    numpy.ndarray
        Array of ssObject records with dtype schema.SSObjectDtype,
        containing computed properties for each unique ssObjectId,
        including magnitudes, orbital elements, Tisserand J, and
        MOID-related values.

    Raises
    ------
    AssertionError
        If 'sss' is not pre-grouped by 'ssObjectId', or if DiaSources
        are missing after join.

    Notes
    -----
    - The function assumes 'sss' is large and avoids internal
      sorting/copying for efficiency.
    - Tisserand J and MOID are computed only for objects matching
      designations in 'mpcorb'.
    - MOID computation uses a MOIDSolver for each matched object.
    """

    # assert that sss is pre-grouped by ssObjectId
    assert util.values_grouped(sss["ssObjectId"]), (
        "SSSource table must be pre-grouped by ssObjectId. "
        "An easy way to do this is to sort by ssObjectId before calling compute_ssobject(). "
        "The grouping is required for correct per-object computations, and since SSSource is "
        "typically large and we want to avoid copies, it's not done internally."
    )

    # Join the DiaSource parts we're interested in to our SSSource table
    num = len(sss)
    dia_tmp = dia[DIA_COLUMNS].add_prefix("dia_")  # FIXME: does this cause unnececessary copy?
    # FIXME: The diaSourceId should really be uint64. But Felis doesn't speak
    # uint64, but only knows about int64. Yet the pipeline produces uint64
    # diaSourceId in the dia_source dataset. So we have to cast here to int64
    # to make the join work (otherwise pyarrow tries to cast to float64, and
    # the whole thing gloriously explodes).
    dia_tmp["dia_diaSourceId"] = dia_tmp["dia_diaSourceId"].astype("int64[pyarrow]")
    sss = sss.merge(dia_tmp, left_on="diaSourceId", right_on="dia_diaSourceId", how="inner")
    assert num == len(sss), f"{num - len(sss)} DiaSources found missing."
    del sss["dia_diaSourceId"]
    del dia_tmp

    # add magnitude columns
    sss["dia_psfMag"] = nJy_to_mag(sss["dia_psfFlux"])
    sss["dia_psfMagErr"] = nJy_err_to_mag_err(sss["dia_psfFlux"], sss["dia_psfFluxErr"])

    # Pre-create the empty array
    totalNumObjects = np.unique(sss["ssObjectId"]).size
    obj = np.zeros(totalNumObjects, dtype=schema.SSObjectDtype)

    # compute per-object quantities
    util.group_by([sss], "ssObjectId", compute_ssobject_entry, out=obj)

    #
    # compute columns that can be efficiently computed in a vector fashon
    #
    # Tisserand J
    if mpcorb is not None:
        # inner join by provisional designation. We allow for some objects to be
        # missing from mpcorb (this should not happen often, but it did in DP1).
        # FIXME: at some point require that no objects are missing. I _think_ that
        # shouldn't happen in normal operations.
        oidx, midx = util.argjoin(
            obj["designation"].astype("U"),
            mpcorb["unpacked_primary_provisional_designation"].to_numpy().astype("U"),
        )
        assert np.all(
            mpcorb["unpacked_primary_provisional_designation"].take(midx)
            == obj["designation"][oidx].astype("U")
        )
        # note: named 'incl' (not 'i') so a loop index can't shadow the
        # inclination array below.
        q, e, incl, node, argperi = util.unpack(mpcorb["q e i node argperi".split()].take(midx))
        a = q / (1.0 - e)
        obj["tisserand_J"][oidx] = util.tisserand_jupiter(a, e, incl)

        # MOID computation
        solver = MOIDSolver()
        # Earth's orbit is loop-invariant; compute it once.
        earth_elements = earth_orbit_J2000()
        for k, el_obj in enumerate(zip(a, e, incl, node, argperi)):
            (moid, deltaV, eclon, trueEarth, trueObject) = solver.compute(earth_elements, el_obj)
            # indexing a structured array with an int yields a view; the
            # assignments below write through to obj
            row = obj[oidx[k]]
            row["MOIDEarth"] = moid
            row["MOIDEarthDeltaV"] = deltaV
            row["MOIDEarthEclipticLongitude"] = eclon
            row["MOIDEarthTrueAnomaly"] = trueEarth
            row["MOIDEarthTrueAnomalyObject"] = trueObject

    return obj
def main():
    """
    CLI entry point for building SSObject table from SSSource,
    DiaSource, and MPC orbit data.

    Loads the three input Parquet files, runs compute_ssobject(), and
    writes the resulting SSObject table to the --output path. On error,
    prints to stderr and exits with status 1 unless --reraise is given.
    """
    parser = argparse.ArgumentParser(
        description="Build SSObject table from SSSource, DiaSource, and MPC orbit Parquet files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  ssp-build-ssobject sssource.parquet dia_sources.parquet mpc_orbits.parquet --output ssobject.parquet
        """,
    )

    parser.add_argument("sssource_parquet", help="Path to SSSource Parquet file")
    parser.add_argument("diasource_parquet", help="Path to DiaSource Parquet file")
    parser.add_argument("mpcorb_parquet", help="Path to MPC orbits Parquet file")
    parser.add_argument("--output", "-o", required=True, help="Path to output SSObject Parquet file")
    parser.add_argument(
        "--reraise",
        action="store_true",
        help="Re-raise exceptions instead of exiting gracefully (for debugging)",
    )

    args = parser.parse_args()

    try:
        # Load SSSource
        print(f"Loading SSSource from {args.sssource_parquet}...")
        sss = pd.read_parquet(args.sssource_parquet, engine="pyarrow", dtype_backend="pyarrow").reset_index(
            drop=True
        )
        num = len(sss)
        print(f"Loaded {num:,} SSSource rows")

        # Load DiaSource with required columns (reuse the module-level list
        # so the CLI and compute_ssobject() can never drift apart)
        print(f"Loading DiaSource from {args.diasource_parquet}...")
        dia = pd.read_parquet(
            args.diasource_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=DIA_COLUMNS
        ).reset_index(drop=True)
        print(f"Loaded {len(dia):,} DiaSource rows")

        # Ensure diaSourceId is uint64
        assert np.all(dia["diaSourceId"] >= 0)
        dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")

        # Load MPC orbits
        mpcorb_columns = [
            "unpacked_primary_provisional_designation",
            "a",
            "q",
            "e",
            "i",
            "node",
            "argperi",
            "peri_time",
            "mean_anomaly",
            "epoch_mjd",
            "h",
            "g",
        ]
        print(f"Loading MPC orbits from {args.mpcorb_parquet}...")
        mpcorb = pd.read_parquet(
            args.mpcorb_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=mpcorb_columns
        ).reset_index(drop=True)
        print(f"Loaded {len(mpcorb):,} MPC orbit rows")

        # Compute SSObject
        print("Computing SSObject data...")
        obj = compute_ssobject(sss, dia, mpcorb)

        # Save result
        print(f"Saving {len(obj):,} SSObject rows to {args.output}...")
        util.struct_to_parquet(obj, args.output)

        print(f"Success! Created SSObject with {len(obj):,} objects")
        print(f"Row size: {obj.dtype.itemsize:,} bytes, Total size: {obj.nbytes:,} bytes")

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        if args.reraise:
            raise
        sys.exit(1)
if __name__ == "__main__":
    # NOTE(review): this ad-hoc analysis driver reads from fixed local paths
    # and bypasses the argparse-based main() above -- consider consolidating
    # on main() once the analysis workflow stabilizes.
    input_dir = "./analysis/inputs"
    output_dir = "./analysis/outputs"

    #
    # Loads
    #

    # load SSObject
    sss = pd.read_parquet(
        f"{output_dir}/sssource.parquet", engine="pyarrow", dtype_backend="pyarrow"
    ).reset_index(drop=True)
    num = len(sss)

    # load corresponding DiaSource
    dia = pd.read_parquet(
        f"{input_dir}/dia_sources.parquet", engine="pyarrow", dtype_backend="pyarrow", columns=DIA_COLUMNS
    ).reset_index(drop=True)

    # FIXME: I'm not sure why the datatype is int and not uint here.
    # Investigate upstream...
    assert np.all(dia["diaSourceId"] >= 0)
    dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")

    # Load mpcorb
    mpcorb = pd.read_parquet(
        f"{input_dir}/mpc_orbits.parquet",
        engine="pyarrow",
        dtype_backend="pyarrow",
        columns=[
            "unpacked_primary_provisional_designation",
            "a",
            "q",
            "e",
            "i",
            "node",
            "argperi",
            "peri_time",
            "mean_anomaly",
            "epoch_mjd",
            "h",
            "g",
        ],
    ).reset_index(drop=True)

    #
    # Business logic
    #
    obj = compute_ssobject(sss, dia, mpcorb)

    #
    # Save
    #
    util.struct_to_parquet(obj, f"{output_dir}/ssobject.parquet")

    print(f"row_length={obj.dtype.itemsize:,} bytes, rows={len(obj):,}, {obj.nbytes:,} bytes total")