lsst.pipe.tasks g253578fa50+c1a9b1f270
ssobject.py
import pandas as pd
import numpy as np
from . import photfit
from . import util
from . import schema
from .moid import MOIDSolver, earth_orbit_J2000
import argparse
import sys

# The only columns we need from DiaSource.
# TODO DM-53699: These column names should be taken from and/or checked to
# match the DiaSource table definition in sdm_schemas
DIA_COLUMNS = [
    "diaSourceId", "midpointMjdTai", "ra", "dec", "extendedness",
    "band", "psfFlux", "psfFluxErr"
]
DIA_DTYPES = [int, float, float, float, float, str, float, float]


def nJy_to_mag(f_njy):
    """
    Convert flux density in nanoJanskys (nJy) to AB magnitude.

    Parameters
    ----------
    f_njy : float or array-like
        Flux density in nanoJanskys.

    Returns
    -------
    float or array-like
        AB magnitude corresponding to the input flux density.
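
    Examples
    --------
    Illustrative values only: the AB zero point is 3631 Jy (3631e9 nJy),
    which maps to magnitude ~0, and 1000 nJy (1 uJy) maps to ~23.9.

    >>> nJy_to_mag(3631.0e9)   # doctest: +SKIP
    0.0
    >>> nJy_to_mag(1000.0)     # doctest: +SKIP
    23.9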
33 """
34 return 31.4 - 2.5 * np.log10(f_njy)
35
36
37def nJy_err_to_mag_err(f_njy, f_err_njy):
38 """
39 Convert flux error in nanoJanskys to magnitude error.
40
41 Parameters
42 ----------
43 f_njy : float
44 Flux in nanoJanskys.
45 f_err_njy : float
46 Flux error in nanoJanskys.
47
48 Returns
49 -------
50 float
51 Magnitude error.
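
    Notes
    -----
    The constant 1.085736 is 2.5 / ln(10): for small relative errors the
    propagated magnitude error is sigma_m ~= (2.5 / ln 10) * (f_err / f).

    Examples
    --------
    Illustrative value only (a 10% flux error corresponds to about 0.11 mag):

    >>> nJy_err_to_mag_err(1000.0, 100.0)   # doctest: +SKIP
    0.1085736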
52 """
53 return 1.085736 * (f_err_njy / f_njy)
54
55
57 # just verify we didn't screw up something
58 assert sss["ssObjectId"].nunique() == 1

    # Metadata columns
    row["ssObjectId"] = sss["ssObjectId"].iloc[0]
    row["firstObservationMjdTai"] = sss["dia_midpointMjdTai"].min()

    if "discoverySubmissionDate" in row.dtype.names:  # DP2 does not have this field
        # FIXME: here I arbitrarily guess we discover everything 7 days
        # after the first observation; we should really pull this out of the obs_sbn table.
        row["discoverySubmissionDate"] = row["firstObservationMjdTai"] + 7.0
    row["arc"] = np.ptp(sss["dia_midpointMjdTai"])
    row["designation"] = sss["designation"].iloc[0]

    # observation counts
    row["nObs"] = len(sss)

    # per-band entries
    for band in "ugrizy":
        df = sss[sss["dia_band"] == band]

        # set defaults for this band (equivalents of NULL)
        row[f"{band}_Chi2"] = np.nan
        row[f"{band}_G12"] = np.nan
        row[f"{band}_G12Err"] = np.nan
        row[f"{band}_H"] = np.nan
        row[f"{band}_H_{band}_G12_Cov"] = np.nan
        row[f"{band}_HErr"] = np.nan
        row[f"{band}_nObsUsed"] = 0
        row[f"{band}_phaseAngleMin"] = np.nan
        row[f"{band}_phaseAngleMax"] = np.nan

        nBandObs = len(df)
        row[f"{band}_nObs"] = nBandObs
        if nBandObs > 0:
            paMin, paMax = df["phaseAngle"].min(), df["phaseAngle"].max()
            row[f"{band}_phaseAngleMin"] = paMin
            row[f"{band}_phaseAngleMax"] = paMax

        if nBandObs > 1:
            # do the absmag/slope fits, if there are at least two
            # data points
            H, G12, sigmaH, sigmaG12, covHG12, chi2dof, nobsv = photfit.fitHG12(
                df["dia_psfMag"], df["dia_psfMagErr"], df["phaseAngle"], df["topoRange"], df["helioRange"]
            )
            nDof = nBandObs - 2
            # print(provID, band, H, G12, sigmaH, sigmaG12, covHG12,
            #       chi2dof, nobsv)

            # mark if the fit failed
            if np.isnan(G12):
                row[f"{band}_slope_fit_failed"] = True
                # FIXME: if fitting fails, we should revert to simple
                # estimation of H using a fiducial G12 value, storing
                # that G12 as well.

            row[f"{band}_Chi2"] = chi2dof * nDof
            row[f"{band}_G12"] = G12
            row[f"{band}_G12Err"] = sigmaG12
            row[f"{band}_H"] = H
            row[f"{band}_H_{band}_G12_Cov"] = covHG12
            row[f"{band}_HErr"] = sigmaH
            row[f"{band}_nObsUsed"] = nobsv

    # Extendedness
    row["extendednessMin"] = sss["dia_extendedness"].min()
    row["extendednessMax"] = sss["dia_extendedness"].max()
    row["extendednessMedian"] = sss["dia_extendedness"].median()


def compute_ssobject(sss, dia, mpcorb):
    """
    Compute solar system object properties by joining and processing
    SSSource, DiaSource, and MPC orbit data.

    This function takes a pre-grouped SSSource table, joins it with
    DiaSource data, computes per-object quantities, and calculates
    additional orbital parameters like Tisserand J and Minimum Orbit
    Intersection Distance (MOID) with Earth for matching objects.

    Parameters
    ----------
    sss : pandas.DataFrame
        SSSource table, pre-grouped by 'ssObjectId' (sorting by
        'ssObjectId' is sufficient). Contains columns like 'ssObjectId',
        'diaSourceId', etc.
    dia : pandas.DataFrame
        DiaSource table; must include the columns listed in DIA_COLUMNS
        ('diaSourceId', 'psfFlux', 'psfFluxErr', etc.). They are prefixed
        with 'dia_' when joined to the SSSource table.
    mpcorb : pandas.DataFrame
        MPC orbit data with columns like
        'unpacked_primary_provisional_designation', 'q', 'e', 'i',
        'node', 'argperi'.

    Returns
    -------
    numpy.ndarray
        Array of SSObject records with dtype schema.SSObjectDtype,
        containing computed properties for each unique ssObjectId,
        including magnitudes, orbital elements, Tisserand J, and
        MOID-related values.

    Raises
    ------
    AssertionError
        If 'sss' is not pre-grouped by 'ssObjectId', or if DiaSources
        are missing after the join.

    Notes
    -----
    - The function assumes 'sss' is large and avoids internal
      sorting/copying for efficiency.
    - Tisserand J and MOID are computed only for objects matching
      designations in 'mpcorb'.
    - MOID computation uses a MOIDSolver for each matched object.
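
    Examples
    --------
    A minimal sketch of a typical call (table loading is shown in ``main()``
    below; the sort is one way to satisfy the pre-grouping requirement):

    >>> sss = sss.sort_values("ssObjectId").reset_index(drop=True)  # doctest: +SKIP
    >>> obj = compute_ssobject(sss, dia, mpcorb)                    # doctest: +SKIP
    >>> util.struct_to_parquet(obj, "ssobject.parquet")             # doctest: +SKIP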
173 """
174
175 # assert that sss is pre-grouped by ssObjectId
176 assert util.values_grouped(sss["ssObjectId"]), (
177 "SSSource table must be pre-grouped by ssObjectId. "
178 "An easy way to do this is to sort by ssObjectId before calling compute_ssobject(). "
179 "The grouping is required for correct per-object computations, and since SSSource is "
180 "typically large and we want to avoid copies, it's not done internally."
181 )
182
183 # Join the DiaSource parts we're interested in to our SSSource table
184 num = len(sss)
185 dia_tmp = dia[DIA_COLUMNS].add_prefix("dia_") # FIXME: does this cause unnececessary copy?
186 # FIXME: The diaSourceId should really be uint64. But Felis doesn't speak
187 # uint64, but only knows about int64. Yet the pipeline produces uint64
188 # diaSourceId in the dia_source dataset. So we have to cast here to int64
189 # to make the join work (otherwise pyarrow tries to cast to float64, and
190 # the whole thing gloriously explodes).
191 dia_tmp["dia_diaSourceId"] = dia_tmp["dia_diaSourceId"].astype("int64[pyarrow]")
192 sss = sss.merge(dia_tmp, left_on="diaSourceId", right_on="dia_diaSourceId", how="inner")
193 assert num == len(sss), f"{num - len(sss)} DiaSources found missing."
194 del sss["dia_diaSourceId"]
195 del dia_tmp
196
197 # add magnitude columns
198 sss["dia_psfMag"] = nJy_to_mag(sss["dia_psfFlux"])
199 sss["dia_psfMagErr"] = nJy_err_to_mag_err(sss["dia_psfFlux"], sss["dia_psfFluxErr"])
200
201 # Pre-create the empty array
202 totalNumObjects = np.unique(sss["ssObjectId"]).size
203 obj = np.zeros(totalNumObjects, dtype=schema.SSObjectDtype)
204
205 # compute per-object quantities
206 util.group_by([sss], "ssObjectId", compute_ssobject_entry, out=obj)
207
208 #
209 # compute columns that can be efficiently computed in a vector fashon
210 #
211 # Tisserand J
212
213 if mpcorb is not None:
214 # inner join by provisional designation. We allow for some objects to be
215 # missing from mpcorb (this should not happen often, but it did in DP1).
216 # FIXME: at some point require that no objects are missing. I _think_ that
217 # shouldn't happen in normal operations.
218 oidx, midx = util.argjoin(
219 obj["designation"].astype("U"),
220 mpcorb["unpacked_primary_provisional_designation"].to_numpy().astype("U"),
221 )
222 assert np.all(
223 mpcorb["unpacked_primary_provisional_designation"].take(midx)
224 == obj["designation"][oidx].astype("U")
225 )
226 q, e, i, node, argperi = util.unpack(mpcorb["q e i node argperi".split()].take(midx))
227 a = q / (1.0 - e)
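        # For reference, the standard Tisserand parameter with respect to Jupiter
        # (which util.tisserand_jupiter is assumed to implement) is
        #   T_J = a_J / a + 2 * cos(i) * sqrt((a / a_J) * (1 - e**2)),
        # with a_J ~ 5.2 au and i the inclination (heliocentric ecliptic elements
        # are used here as an approximation to Jupiter's orbital plane).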
228 obj["tisserand_J"][oidx] = util.tisserand_jupiter(a, e, i)
229
230 # MOID computation
231 solver = MOIDSolver()
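        # MOID = Minimum Orbit Intersection Distance: the smallest distance between
        # the object's osculating orbit and the Earth's J2000 orbit. The solver is
        # assumed to also return an encounter delta-V estimate and the ecliptic
        # longitude / true anomalies of both orbits at the MOID point, matching
        # the row fields filled in below.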
        for k, el_obj in enumerate(zip(a, e, i, node, argperi)):
            (moid, deltaV, eclon, trueEarth, trueObject) = solver.compute(earth_orbit_J2000(), el_obj)
            row = obj[oidx[k]]
            row["MOIDEarth"] = moid
            row["MOIDEarthDeltaV"] = deltaV
            row["MOIDEarthEclipticLongitude"] = eclon
            row["MOIDEarthTrueAnomaly"] = trueEarth
            row["MOIDEarthTrueAnomalyObject"] = trueObject

    return obj


def main():
245 """
246 CLI entry point for building SSObject table from SSSource,
247 DiaSource, and MPC orbit data.
248 """
249 parser = argparse.ArgumentParser(
250 description="Build SSObject table from SSSource, DiaSource, and MPC orbit Parquet files",
251 formatter_class=argparse.RawDescriptionHelpFormatter,
252 epilog="""
253Examples:
254 ssp-build-ssobject sssource.parquet dia_sources.parquet mpc_orbits.parquet --output ssobject.parquet
255 """,
256 )
257
258 parser.add_argument("sssource_parquet", help="Path to SSSource Parquet file")
259 parser.add_argument("diasource_parquet", help="Path to DiaSource Parquet file")
260 parser.add_argument("mpcorb_parquet", help="Path to MPC orbits Parquet file")
261 parser.add_argument("--output", "-o", required=True, help="Path to output SSObject Parquet file")
262 parser.add_argument(
263 "--reraise",
264 action="store_true",
265 help="Re-raise exceptions instead of exiting gracefully (for debugging)",
266 )
267
268 args = parser.parse_args()
269
270 try:
271 # Load SSSource
272 print(f"Loading SSSource from {args.sssource_parquet}...")
273 sss = pd.read_parquet(args.sssource_parquet, engine="pyarrow", dtype_backend="pyarrow").reset_index(
274 drop=True
275 )
276 num = len(sss)
277 print(f"Loaded {num:,} SSSource rows")
278
        # Load DiaSource with the required columns (DIA_COLUMNS, defined above)
        print(f"Loading DiaSource from {args.diasource_parquet}...")
        dia = pd.read_parquet(
            args.diasource_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=DIA_COLUMNS
        ).reset_index(drop=True)
294 print(f"Loaded {len(dia):,} DiaSource rows")
295
296 # Ensure diaSourceId is uint64
297 assert np.all(dia["diaSourceId"] >= 0)
298 dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")
299
300 # Load MPC orbits
301 mpcorb_columns = [
302 "unpacked_primary_provisional_designation",
303 "a",
304 "q",
305 "e",
306 "i",
307 "node",
308 "argperi",
309 "peri_time",
310 "mean_anomaly",
311 "epoch_mjd",
312 "h",
313 "g",
314 ]
315 print(f"Loading MPC orbits from {args.mpcorb_parquet}...")
316 mpcorb = pd.read_parquet(
317 args.mpcorb_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=mpcorb_columns
318 ).reset_index(drop=True)
319 print(f"Loaded {len(mpcorb):,} MPC orbit rows")
320
321 # Compute SSObject
322 print("Computing SSObject data...")
323 obj = compute_ssobject(sss, dia, mpcorb)
324
325 # Save result
326 print(f"Saving {len(obj):,} SSObject rows to {args.output}...")
327 util.struct_to_parquet(obj, args.output)
328
329 print(f"Success! Created SSObject with {len(obj):,} objects")
330 print(f"Row size: {obj.dtype.itemsize:,} bytes, Total size: {obj.nbytes:,} bytes")
331
332 except Exception as e:
333 print(f"Error: {e}", file=sys.stderr)
334 if args.reraise:
335 raise
336 sys.exit(1)
337
338
339if __name__ == "__main__":
    input_dir = "./analysis/inputs"
    output_dir = "./analysis/outputs"

    #
    # Loads
    #

    # load SSSource
    sss = pd.read_parquet(
        f"{output_dir}/sssource.parquet", engine="pyarrow", dtype_backend="pyarrow"
    ).reset_index(drop=True)
    num = len(sss)

    # load corresponding DiaSource
    dia = pd.read_parquet(
        f"{input_dir}/dia_sources.parquet", engine="pyarrow", dtype_backend="pyarrow", columns=DIA_COLUMNS
    ).reset_index(drop=True)

    # FIXME: I'm not sure why the datatype is int and not uint here.
    # Investigate upstream...
    assert np.all(dia["diaSourceId"] >= 0)
    dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")

    # Load mpcorb
    mpcorb = pd.read_parquet(
        f"{input_dir}/mpc_orbits.parquet",
        engine="pyarrow",
        dtype_backend="pyarrow",
        columns=[
            "unpacked_primary_provisional_designation",
            "a",
            "q",
            "e",
            "i",
            "node",
            "argperi",
            "peri_time",
            "mean_anomaly",
            "epoch_mjd",
            "h",
            "g",
        ],
    ).reset_index(drop=True)

    #
    # Business logic
    #
    obj = compute_ssobject(sss, dia, mpcorb)

    #
    # Save
    #
    util.struct_to_parquet(obj, f"{output_dir}/ssobject.parquet")

    print(f"row_length={obj.dtype.itemsize:,} bytes, rows={len(obj):,}, {obj.nbytes:,} bytes total")