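"""Build the SSObject table from SSSource, DiaSource, and MPC orbit data.

Joins SSSource with the needed DiaSource columns, fits per-band HG12
photometry, and computes Tisserand J and Earth MOID for objects matched
against the MPC orbit catalog.
"""
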

import argparse
import sys
from functools import partial

import numpy as np
import pandas as pd

from . import photfit
from . import schema
from . import util
from .moid import MOIDSolver, earth_orbit


# The only columns we need from DiaSource.
# TODO DM-53699: These column names should be taken from and/or checked to
# match the DiaSource table definition in sdm_schemas.
DIA_COLUMNS = [
    "diaSourceId", "midpointMjdTai", "ra", "dec", "extendedness",
    "band", "psfFlux", "psfFluxErr"
]
DIA_DTYPES = [int, float, float, float, float, str, float, float]



def nJy_to_mag(f_njy):
    """
    Convert flux density in nanoJanskys (nJy) to AB magnitude.

    Parameters
    ----------
    f_njy : float or array-like
        Flux density in nanoJanskys.

    Returns
    -------
    float or array-like
        AB magnitude corresponding to the input flux density.
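
    Examples
    --------
    A quick sanity check (illustrative value, not part of the pipeline):
    1 Jy is 1e9 nJy and has AB magnitude 8.9 by definition, which the
    rounded zero point of 31.4 reproduces to a fraction of a millimag.

    >>> round(float(nJy_to_mag(1e9)), 3)
    8.9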

34 """ 

35 return 31.4 - 2.5 * np.log10(f_njy) 

36 

37 

def nJy_err_to_mag_err(f_njy, f_err_njy):
    """
    Convert flux error in nanoJanskys to magnitude error.

    Parameters
    ----------
    f_njy : float
        Flux in nanoJanskys.
    f_err_njy : float
        Flux error in nanoJanskys.

    Returns
    -------
    float
        Magnitude error.
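
    Notes
    -----
    The constant 1.085736 is 2.5 / ln(10), i.e. the standard first-order
    propagation of a fractional flux error to a magnitude error.

    Examples
    --------
    A 10% flux error maps to roughly 0.109 mag (illustrative values):

    >>> round(float(nJy_err_to_mag_err(1000.0, 100.0)), 4)
    0.1086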

53 """ 

54 return 1.085736 * (f_err_njy / f_njy) 

55 

56 

def compute_ssobject_entry(
    row, sss, fixedG12=None, magSigmaFloor=0.0, nSigmaClip=None,
):

    # just a sanity check that we didn't screw something up
    assert sss["ssObjectId"].nunique() == 1

    # Metadata columns
    row["ssObjectId"] = sss["ssObjectId"].iloc[0]
    row["firstObservationMjdTai"] = sss["dia_midpointMjdTai"].min()

    if "discoverySubmissionDate" in row.dtype.names:  # DP2 does not have this field
        # FIXME: here we arbitrarily guess that everything is discovered 7 days
        # after the first observation; this should really be pulled out of the
        # obs_sbn table.
        row["discoverySubmissionDate"] = row["firstObservationMjdTai"] + 7.0
    row["arc"] = np.ptp(sss["dia_midpointMjdTai"])
    row["designation"] = sss["designation"].iloc[0]

    # observation counts
    row["nObs"] = len(sss)

    # per-band entries
    for band in "ugrizy":
        df = sss[sss["dia_band"] == band]

        # set defaults for this band (equivalents of NULL)
        row[f"{band}_Chi2"] = np.nan
        row[f"{band}_G12"] = np.nan
        row[f"{band}_G12Err"] = np.nan
        row[f"{band}_H"] = np.nan
        row[f"{band}_H_{band}_G12_Cov"] = np.nan
        row[f"{band}_HErr"] = np.nan
        row[f"{band}_nObsUsed"] = 0
        row[f"{band}_phaseAngleMin"] = np.nan
        row[f"{band}_phaseAngleMax"] = np.nan

        nBandObs = len(df)
        row[f"{band}_nObs"] = nBandObs
        if nBandObs > 0:
            paMin, paMax = df["phaseAngle"].min(), df["phaseAngle"].max()
            row[f"{band}_phaseAngleMin"] = paMin
            row[f"{band}_phaseAngleMax"] = paMax

        if nBandObs > 1:
            # do the absmag/slope fits, if there are at least two data points
            H, G12, sigmaH, sigmaG12, covHG12, chi2dof, nobsv = photfit.fitHG12(
                df["dia_psfMag"], df["dia_psfMagErr"],
                df["phaseAngle"], df["topoRange"], df["helioRange"],
                fixedG12=fixedG12, magSigmaFloor=magSigmaFloor,
                nSigmaClip=nSigmaClip,
            )
            nDof = nBandObs - (1 if fixedG12 is not None else 2)
            # print(provID, band, H, G12, sigmaH, sigmaG12, covHG12,
            #       chi2dof, nobsv)

            # mark if the fit failed
            if np.isnan(G12):
                row[f"{band}_slope_fit_failed"] = True
                # FIXME: if fitting fails, we should revert to simple
                # estimation of H using a fiducial G12 value, storing
                # that G12 as well.

            row[f"{band}_Chi2"] = chi2dof * nDof
            row[f"{band}_G12"] = G12
            row[f"{band}_G12Err"] = sigmaG12
            row[f"{band}_H"] = H
            row[f"{band}_H_{band}_G12_Cov"] = covHG12
            row[f"{band}_HErr"] = sigmaH
            row[f"{band}_nObsUsed"] = nobsv

    # Extendedness
    row["extendednessMin"] = sss["dia_extendedness"].min()
    row["extendednessMax"] = sss["dia_extendedness"].max()
    row["extendednessMedian"] = sss["dia_extendedness"].median()



def compute_ssobject(
    sss, dia, mpcorb, fixedG12=None, magSigmaFloor=0.0,
    nSigmaClip=None,
):

137 """ 

138 Compute solar system object properties by joining and processing 

139 SSSource, DiaSource, and MPC orbit data. 

140 

141 This function takes a pre-grouped SSSource table, joins it with 

142 DiaSource data, computes per-object quantities, and calculates 

143 additional orbital parameters like Tisserand J and Minimum Orbit 

144 Intersection Distance (MOID) with Earth for matching objects. 

145 

146 Parameters 

147 ---------- 

148 sss : pandas.DataFrame 

149 SSSource table, pre-grouped by 'ssObjectId'. Must be sorted by 

150 'ssObjectId' for correct grouping. Contains columns like 

151 'ssObjectId', 'diaSourceId', etc. 

152 dia : pandas.DataFrame 

153 DiaSource table with columns prefixed as 'dia_' in the join. 

154 Must include 'dia_diaSourceId', 'dia_psfFlux', 'dia_psfFluxErr', 

155 etc. 

156 mpcorb : pandas.DataFrame 

157 MPC orbit data with columns like 

158 'unpacked_primary_provisional_designation', 'q', 'e', 'i', 

159 'node', 'argperi'. 

160 

161 Returns 

162 ------- 

163 numpy.ndarray 

164 Array of ssObject records with dtype schema.ssObjectDtype, 

165 containing computed properties for each unique ssObjectId, 

166 including magnitudes, orbital elements, Tisserand J, and 

167 MOID-related values. 

168 

169 Raises 

170 ------ 

171 AssertionError 

172 If 'sss' is not pre-grouped by 'ssObjectId', or if DiaSources 

173 are missing after join. 

174 

175 Notes 

176 ----- 

177 - The function assumes 'sss' is large and avoids internal 

178 sorting/copying for efficiency. 

179 - Tisserand J and MOID are computed only for objects matching 

180 designations in 'mpcorb'. 

181 - MOID computation uses a MOIDSolver for each matched object. 
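
    Examples
    --------
    A minimal calling sketch (assuming the three tables have been loaded
    as in ``main()`` below):

    >>> sss = sss.sort_values("ssObjectId", ignore_index=True)  # doctest: +SKIP
    >>> obj = compute_ssobject(sss, dia, mpcorb)  # doctest: +SKIP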

182 """ 

183 

184 # assert that sss is pre-grouped by ssObjectId 

185 assert util.values_grouped(sss["ssObjectId"]), ( 

186 "SSSource table must be pre-grouped by ssObjectId. " 

187 "An easy way to do this is to sort by ssObjectId before calling compute_ssobject(). " 

188 "The grouping is required for correct per-object computations, and since SSSource is " 

189 "typically large and we want to avoid copies, it's not done internally." 

190 ) 

191 

192 # Join the DiaSource parts we're interested in to our SSSource table 

193 num = len(sss) 

194 dia_tmp = dia[DIA_COLUMNS].add_prefix("dia_") # FIXME: does this cause unnececessary copy? 

195 # FIXME: The diaSourceId should really be uint64. But Felis doesn't speak 

196 # uint64, but only knows about int64. Yet the pipeline produces uint64 

197 # diaSourceId in the dia_source dataset. So we have to cast here to int64 

198 # to make the join work (otherwise pyarrow tries to cast to float64, and 

199 # the whole thing gloriously explodes). 

200 dia_tmp["dia_diaSourceId"] = dia_tmp["dia_diaSourceId"].astype("int64[pyarrow]") 

201 sss = sss.merge(dia_tmp, left_on="diaSourceId", right_on="dia_diaSourceId", how="inner") 

202 assert num == len(sss), f"{num - len(sss)} DiaSources found missing." 

203 del sss["dia_diaSourceId"] 

204 del dia_tmp 

205 

206 # add magnitude columns 

207 sss["dia_psfMag"] = nJy_to_mag(sss["dia_psfFlux"]) 

208 sss["dia_psfMagErr"] = nJy_err_to_mag_err(sss["dia_psfFlux"], sss["dia_psfFluxErr"]) 

209 

210 # Pre-create the empty array 

211 totalNumObjects = np.unique(sss["ssObjectId"]).size 

212 obj = np.zeros(totalNumObjects, dtype=schema.SSObjectDtype) 

213 

214 # compute per-object quantities 

215 callback = partial( 

216 compute_ssobject_entry, fixedG12=fixedG12, 

217 magSigmaFloor=magSigmaFloor, nSigmaClip=nSigmaClip, 

218 ) 

219 util.group_by([sss], "ssObjectId", callback, out=obj) 

220 

221 # 

222 # compute columns that can be efficiently computed in a vector fashon 

223 # 

224 # Tisserand J 

225 

226 if mpcorb is not None: 

227 # inner join by provisional designation. We allow for some objects to be 

228 # missing from mpcorb (this should not happen often, but it did in DP1). 

229 # FIXME: at some point require that no objects are missing. I _think_ that 

230 # shouldn't happen in normal operations. 

231 oidx, midx = util.argjoin( 

232 obj["designation"].astype("U"), 

233 mpcorb["unpacked_primary_provisional_designation"].to_numpy().astype("U"), 

234 ) 

235 assert np.all( 

236 mpcorb["unpacked_primary_provisional_designation"].take(midx) 

237 == obj["designation"][oidx].astype("U") 

238 ) 

239 q, e, i, node, argperi, epoch_mjd = util.unpack( 

240 mpcorb["q e i node argperi epoch_mjd".split()].take(midx) 

241 ) 

242 a = q / (1.0 - e) 

243 obj["tisserand_J"][oidx] = util.tisserand_jupiter(a, e, i) 

244 

245 # MOID computation 

246 solver = MOIDSolver() 

247 for i, el_obj in enumerate(zip(a, e, i, node, argperi)): 

248 earth = earth_orbit(epoch_mjd[i]) 

249 (moid, deltaV, eclon, trueEarth, trueObject) = solver.compute(earth, el_obj) 

250 row = obj[oidx[i]] 

251 row["MOIDEarth"] = moid 

252 row["MOIDEarthDeltaV"] = deltaV 

253 row["MOIDEarthEclipticLongitude"] = eclon 

254 row["MOIDEarthTrueAnomaly"] = trueEarth 

255 row["MOIDEarthTrueAnomalyObject"] = trueObject 

256 

257 return obj 



def main():
    """
    CLI entry point for building the SSObject table from SSSource,
    DiaSource, and MPC orbit data.
    """

    parser = argparse.ArgumentParser(
        description="Build SSObject table from SSSource, DiaSource, and MPC orbit Parquet files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  ssp-build-ssobject sssource.parquet dia_sources.parquet mpc_orbits.parquet --output ssobject.parquet
        """,
    )

    parser.add_argument("sssource_parquet", help="Path to SSSource Parquet file")
    parser.add_argument("diasource_parquet", help="Path to DiaSource Parquet file")
    parser.add_argument("mpcorb_parquet", help="Path to MPC orbits Parquet file")
    parser.add_argument("--output", "-o", required=True, help="Path to output SSObject Parquet file")
    parser.add_argument(
        "--reraise",
        action="store_true",
        help="Re-raise exceptions instead of exiting gracefully (for debugging)",
    )

    args = parser.parse_args()

    try:
        # Load SSSource
        print(f"Loading SSSource from {args.sssource_parquet}...")
        sss = pd.read_parquet(args.sssource_parquet, engine="pyarrow", dtype_backend="pyarrow").reset_index(
            drop=True
        )
        num = len(sss)
        print(f"Loaded {num:,} SSSource rows")

        # Load DiaSource with the required columns (DIA_COLUMNS, defined above)
        print(f"Loading DiaSource from {args.diasource_parquet}...")
        dia = pd.read_parquet(
            args.diasource_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=DIA_COLUMNS
        ).reset_index(drop=True)
        print(f"Loaded {len(dia):,} DiaSource rows")

        # Ensure diaSourceId is uint64
        assert np.all(dia["diaSourceId"] >= 0)
        dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")

        # Load MPC orbits
        mpcorb_columns = [
            "unpacked_primary_provisional_designation",
            "a",
            "q",
            "e",
            "i",
            "node",
            "argperi",
            "peri_time",
            "mean_anomaly",
            "epoch_mjd",
            "h",
            "g",
        ]
        print(f"Loading MPC orbits from {args.mpcorb_parquet}...")
        mpcorb = pd.read_parquet(
            args.mpcorb_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=mpcorb_columns
        ).reset_index(drop=True)
        print(f"Loaded {len(mpcorb):,} MPC orbit rows")

        # Compute SSObject
        print("Computing SSObject data...")
        obj = compute_ssobject(sss, dia, mpcorb)

        # Save the result
        print(f"Saving {len(obj):,} SSObject rows to {args.output}...")
        util.struct_to_parquet(obj, args.output)

        print(f"Success! Created SSObject with {len(obj):,} objects")
        print(f"Row size: {obj.dtype.itemsize:,} bytes, Total size: {obj.nbytes:,} bytes")

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        if args.reraise:
            raise
        sys.exit(1)



if __name__ == "__main__":
    # Ad-hoc driver with hard-coded analysis paths (see main() above for
    # the CLI version).
    input_dir = "./analysis/inputs"
    output_dir = "./analysis/outputs"

    #
    # Loads
    #

    # load SSSource
    sss = pd.read_parquet(
        f"{output_dir}/sssource.parquet", engine="pyarrow", dtype_backend="pyarrow"
    ).reset_index(drop=True)
    num = len(sss)

    # load corresponding DiaSource
    dia = pd.read_parquet(
        f"{input_dir}/dia_sources.parquet", engine="pyarrow", dtype_backend="pyarrow", columns=DIA_COLUMNS
    ).reset_index(drop=True)

    # FIXME: I'm not sure why the datatype is int and not uint here.
    # Investigate upstream...
    assert np.all(dia["diaSourceId"] >= 0)
    dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")

    # Load mpcorb
    mpcorb = pd.read_parquet(
        f"{input_dir}/mpc_orbits.parquet",
        engine="pyarrow",
        dtype_backend="pyarrow",
        columns=[
            "unpacked_primary_provisional_designation",
            "a",
            "q",
            "e",
            "i",
            "node",
            "argperi",
            "peri_time",
            "mean_anomaly",
            "epoch_mjd",
            "h",
            "g",
        ],
    ).reset_index(drop=True)

    #
    # Business logic
    #
    obj = compute_ssobject(sss, dia, mpcorb)

    #
    # Save
    #
    util.struct_to_parquet(obj, f"{output_dir}/ssobject.parquet")

    print(f"row_length={obj.dtype.itemsize:,} bytes, rows={len(obj):,}, {obj.nbytes:,} bytes total")