Coverage for python / lsst / pipe / tasks / ssp / ssobject.py: 11%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:08 +0000

1import pandas as pd 

2import numpy as np 

3from . import photfit 

4from . import util 

5from . import schema 

6from .moid import MOIDSolver, earth_orbit_J2000 

7import argparse 

8import sys 

9 

# The only columns we need from DiaSource.

DIA_COLUMNS = [
    "diaSourceId", "midpointMjdTai", "ra", "dec", "extendedness",
    "band", "psfFlux", "psfFluxErr"
]
# Python-level dtypes for DIA_COLUMNS, in the same order.
# NOTE(review): not referenced anywhere in this file's visible code —
# presumably consumed by external callers; confirm before removing.
DIA_DTYPES = [int, float, float, float, float, str, float, float]

17 

18 

def nJy_to_mag(f_njy):
    """
    Convert flux density in nanoJanskys (nJy) to AB magnitude.

    Parameters
    ----------
    f_njy : float or array-like
        Flux density in nanoJanskys.

    Returns
    -------
    float or array-like
        AB magnitude corresponding to the input flux density.
    """
    # AB zeropoint expressed in nJy: m_AB = -2.5*log10(f/nJy) + 31.4
    return -2.5 * np.log10(f_njy) + 31.4

34 

35 

def nJy_err_to_mag_err(f_njy, f_err_njy):
    """
    Convert flux error in nanoJanskys to magnitude error.

    Uses first-order error propagation: dm/df = 2.5 / (ln(10) * f),
    so sigma_m = (2.5 / ln 10) * sigma_f / f.

    Parameters
    ----------
    f_njy : float
        Flux in nanoJanskys.
    f_err_njy : float
        Flux error in nanoJanskys.

    Returns
    -------
    float
        Magnitude error.
    """
    # Exact propagation factor 2.5/ln(10) ~= 1.0857362, replacing the
    # previously hard-coded, truncated literal 1.085736.
    return (2.5 / np.log(10.0)) * (f_err_njy / f_njy)

53 

54 

def compute_ssobject_entry(row, sss):
    """
    Fill one SSObject output record from the observations of a single object.

    Parameters
    ----------
    row : numpy structured record
        One element of the output SSObject array; mutated in place.
        Must expose the fields written below (see row.dtype.names check).
    sss : pandas.DataFrame
        SSSource rows (already joined with the dia_* DiaSource columns)
        belonging to exactly one ssObjectId.
    """
    # just verify we didn't screw up something
    assert sss["ssObjectId"].nunique() == 1

    # Metadata columns
    row["ssObjectId"] = sss["ssObjectId"].iloc[0]
    row["firstObservationMjdTai"] = sss["dia_midpointMjdTai"].min()

    if "discoverySubmissionDate" in row.dtype.names:  # DP2 does not have this field
        # FIXME: here I arbitrarily guess we discover everything 7 days
        # after first obsv. we should really pull this out of the obs_sbn tbl.
        row["discoverySubmissionDate"] = row["firstObservationMjdTai"] + 7.0
    # observed arc: span (days) between first and last observation
    row["arc"] = np.ptp(sss["dia_midpointMjdTai"])
    row["designation"] = sss["designation"].iloc[0]

    # observation counts
    row["nObs"] = len(sss)

    # per band entries
    for band in "ugrizy":
        df = sss[sss["dia_band"] == band]

        # set defaults for this band (equivalents of NULL)
        row[f"{band}_Chi2"] = np.nan
        row[f"{band}_G12"] = np.nan
        row[f"{band}_G12Err"] = np.nan
        row[f"{band}_H"] = np.nan
        row[f"{band}_H_{band}_G12_Cov"] = np.nan
        row[f"{band}_HErr"] = np.nan
        row[f"{band}_nObsUsed"] = 0
        row[f"{band}_phaseAngleMin"] = np.nan
        row[f"{band}_phaseAngleMax"] = np.nan

        nBandObs = len(df)
        row[f"{band}_nObs"] = nBandObs
        if nBandObs > 0:
            paMin, paMax = df["phaseAngle"].min(), df["phaseAngle"].max()
            row[f"{band}_phaseAngleMin"] = paMin
            row[f"{band}_phaseAngleMax"] = paMax

        if nBandObs > 1:
            # do the absmag/slope fits, if there are at least two
            # data points
            H, G12, sigmaH, sigmaG12, covHG12, chi2dof, nobsv = photfit.fitHG12(
                df["dia_psfMag"], df["dia_psfMagErr"], df["phaseAngle"], df["topoRange"], df["helioRange"]
            )
            nDof = nBandObs - 2
            # print(provID, band, H, G12, sigmaH, sigmaG12, covHG12,
            #       chi2dof, nobsv)

            # mark if the fit failed
            if np.isnan(G12):
                row[f"{band}_slope_fit_failed"] = True
                # FIXME: if fitting fails, we should revert to simple
                # estimation of H using a fiducial G12 value, storing
                # that G12 as well.

            # photfit returns chi2 per degree of freedom; store total chi2
            row[f"{band}_Chi2"] = chi2dof * nDof
            row[f"{band}_G12"] = G12
            row[f"{band}_G12Err"] = sigmaG12
            row[f"{band}_H"] = H
            row[f"{band}_H_{band}_G12_Cov"] = covHG12
            row[f"{band}_HErr"] = sigmaH
            row[f"{band}_nObsUsed"] = nobsv

    # Extendedness
    row["extendednessMin"] = sss["dia_extendedness"].min()
    row["extendednessMax"] = sss["dia_extendedness"].max()
    row["extendednessMedian"] = sss["dia_extendedness"].median()

124 

125 

def compute_ssobject(sss, dia, mpcorb):
    """
    Compute solar system object properties by joining and processing
    SSSource, DiaSource, and MPC orbit data.

    This function takes a pre-grouped SSSource table, joins it with
    DiaSource data, computes per-object quantities, and calculates
    additional orbital parameters like Tisserand J and Minimum Orbit
    Intersection Distance (MOID) with Earth for matching objects.

    Parameters
    ----------
    sss : pandas.DataFrame
        SSSource table, pre-grouped by 'ssObjectId'. Must be sorted by
        'ssObjectId' for correct grouping. Contains columns like
        'ssObjectId', 'diaSourceId', etc.
    dia : pandas.DataFrame
        DiaSource table with columns prefixed as 'dia_' in the join.
        Must include 'dia_diaSourceId', 'dia_psfFlux', 'dia_psfFluxErr',
        etc.
    mpcorb : pandas.DataFrame
        MPC orbit data with columns like
        'unpacked_primary_provisional_designation', 'q', 'e', 'i',
        'node', 'argperi'.

    Returns
    -------
    numpy.ndarray
        Array of ssObject records with dtype schema.ssObjectDtype,
        containing computed properties for each unique ssObjectId,
        including magnitudes, orbital elements, Tisserand J, and
        MOID-related values.

    Raises
    ------
    AssertionError
        If 'sss' is not pre-grouped by 'ssObjectId', or if DiaSources
        are missing after join.

    Notes
    -----
    - The function assumes 'sss' is large and avoids internal
      sorting/copying for efficiency.
    - Tisserand J and MOID are computed only for objects matching
      designations in 'mpcorb'.
    - MOID computation uses a MOIDSolver for each matched object.
    """

    # assert that sss is pre-grouped by ssObjectId
    assert util.values_grouped(sss["ssObjectId"]), (
        "SSSource table must be pre-grouped by ssObjectId. "
        "An easy way to do this is to sort by ssObjectId before calling compute_ssobject(). "
        "The grouping is required for correct per-object computations, and since SSSource is "
        "typically large and we want to avoid copies, it's not done internally."
    )

    # Join the DiaSource parts we're interested in to our SSSource table
    num = len(sss)
    dia_tmp = dia[DIA_COLUMNS].add_prefix("dia_")  # FIXME: does this cause unnececessary copy?
    # FIXME: The diaSourceId should really be uint64. But Felis doesn't speak
    # uint64, but only knows about int64. Yet the pipeline produces uint64
    # diaSourceId in the dia_source dataset. So we have to cast here to int64
    # to make the join work (otherwise pyarrow tries to cast to float64, and
    # the whole thing gloriously explodes).
    dia_tmp["dia_diaSourceId"] = dia_tmp["dia_diaSourceId"].astype("int64[pyarrow]")
    sss = sss.merge(dia_tmp, left_on="diaSourceId", right_on="dia_diaSourceId", how="inner")
    assert num == len(sss), f"{num - len(sss)} DiaSources found missing."
    del sss["dia_diaSourceId"]
    del dia_tmp

    # add magnitude columns
    sss["dia_psfMag"] = nJy_to_mag(sss["dia_psfFlux"])
    sss["dia_psfMagErr"] = nJy_err_to_mag_err(sss["dia_psfFlux"], sss["dia_psfFluxErr"])

    # Pre-create the empty array
    totalNumObjects = np.unique(sss["ssObjectId"]).size
    obj = np.zeros(totalNumObjects, dtype=schema.SSObjectDtype)

    # compute per-object quantities
    util.group_by([sss], "ssObjectId", compute_ssobject_entry, out=obj)

    #
    # compute columns that can be efficiently computed in a vector fashion
    #
    # Tisserand J

    if mpcorb is not None:
        # inner join by provisional designation. We allow for some objects to be
        # missing from mpcorb (this should not happen often, but it did in DP1).
        # FIXME: at some point require that no objects are missing. I _think_ that
        # shouldn't happen in normal operations.
        oidx, midx = util.argjoin(
            obj["designation"].astype("U"),
            mpcorb["unpacked_primary_provisional_designation"].to_numpy().astype("U"),
        )
        assert np.all(
            mpcorb["unpacked_primary_provisional_designation"].take(midx)
            == obj["designation"][oidx].astype("U")
        )
        # BUGFIX: name the inclination column `incl` (was `i`); the MOID loop
        # below used to rebind `i` as its loop index while also reading the
        # inclination array inside zip() -- it only worked because zip()
        # evaluates its arguments before the rebinding. Distinct names remove
        # the shadowing hazard.
        q, e, incl, node, argperi = util.unpack(mpcorb["q e i node argperi".split()].take(midx))
        a = q / (1.0 - e)  # semi-major axis from perihelion distance and eccentricity
        obj["tisserand_J"][oidx] = util.tisserand_jupiter(a, e, incl)

        # MOID computation
        solver = MOIDSolver()
        for k, el_obj in enumerate(zip(a, e, incl, node, argperi)):
            (moid, deltaV, eclon, trueEarth, trueObject) = solver.compute(earth_orbit_J2000(), el_obj)
            row = obj[oidx[k]]
            row["MOIDEarth"] = moid
            row["MOIDEarthDeltaV"] = deltaV
            row["MOIDEarthEclipticLongitude"] = eclon
            row["MOIDEarthTrueAnomaly"] = trueEarth
            row["MOIDEarthTrueAnomalyObject"] = trueObject

    return obj

241 

242 

def main():
    """
    CLI entry point for building SSObject table from SSSource,
    DiaSource, and MPC orbit data.

    Reads the three input Parquet files given on the command line, runs
    compute_ssobject(), and writes the result to the path given by
    --output. On error, prints the message to stderr and exits with
    status 1 (or re-raises when --reraise is given).
    """
    parser = argparse.ArgumentParser(
        description="Build SSObject table from SSSource, DiaSource, and MPC orbit Parquet files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    ssp-build-ssobject sssource.parquet dia_sources.parquet mpc_orbits.parquet --output ssobject.parquet
        """,
    )

    parser.add_argument("sssource_parquet", help="Path to SSSource Parquet file")
    parser.add_argument("diasource_parquet", help="Path to DiaSource Parquet file")
    parser.add_argument("mpcorb_parquet", help="Path to MPC orbits Parquet file")
    parser.add_argument("--output", "-o", required=True, help="Path to output SSObject Parquet file")
    parser.add_argument(
        "--reraise",
        action="store_true",
        help="Re-raise exceptions instead of exiting gracefully (for debugging)",
    )

    args = parser.parse_args()

    try:
        # Load SSSource
        print(f"Loading SSSource from {args.sssource_parquet}...")
        sss = pd.read_parquet(args.sssource_parquet, engine="pyarrow", dtype_backend="pyarrow").reset_index(
            drop=True
        )
        num = len(sss)
        print(f"Loaded {num:,} SSSource rows")

        # Load DiaSource with required columns. Reuse the module-level
        # DIA_COLUMNS constant instead of duplicating the column list inline
        # (keeps this loader consistent with compute_ssobject()).
        print(f"Loading DiaSource from {args.diasource_parquet}...")
        dia = pd.read_parquet(
            args.diasource_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=DIA_COLUMNS
        ).reset_index(drop=True)
        print(f"Loaded {len(dia):,} DiaSource rows")

        # Ensure diaSourceId is uint64
        assert np.all(dia["diaSourceId"] >= 0)
        dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")

        # Load MPC orbits
        mpcorb_columns = [
            "unpacked_primary_provisional_designation",
            "a",
            "q",
            "e",
            "i",
            "node",
            "argperi",
            "peri_time",
            "mean_anomaly",
            "epoch_mjd",
            "h",
            "g",
        ]
        print(f"Loading MPC orbits from {args.mpcorb_parquet}...")
        mpcorb = pd.read_parquet(
            args.mpcorb_parquet, engine="pyarrow", dtype_backend="pyarrow", columns=mpcorb_columns
        ).reset_index(drop=True)
        print(f"Loaded {len(mpcorb):,} MPC orbit rows")

        # Compute SSObject
        print("Computing SSObject data...")
        obj = compute_ssobject(sss, dia, mpcorb)

        # Save result
        print(f"Saving {len(obj):,} SSObject rows to {args.output}...")
        util.struct_to_parquet(obj, args.output)

        print(f"Success! Created SSObject with {len(obj):,} objects")
        print(f"Row size: {obj.dtype.itemsize:,} bytes, Total size: {obj.nbytes:,} bytes")

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        if args.reraise:
            raise
        sys.exit(1)

336 

337 

if __name__ == "__main__":
    # NOTE(review): this driver bypasses main() and the argparse CLI above,
    # using hard-coded ./analysis paths instead — presumably leftover
    # development/analysis code; confirm whether it should call main().
    input_dir = "./analysis/inputs"
    output_dir = "./analysis/outputs"

    #
    # Loads
    #

    # load SSSource (note: read from output_dir — it is itself a pipeline product)
    sss = pd.read_parquet(
        f"{output_dir}/sssource.parquet", engine="pyarrow", dtype_backend="pyarrow"
    ).reset_index(drop=True)
    num = len(sss)

    # load corresponding DiaSource
    dia = pd.read_parquet(
        f"{input_dir}/dia_sources.parquet", engine="pyarrow", dtype_backend="pyarrow", columns=DIA_COLUMNS
    ).reset_index(drop=True)

    # FIXME: I'm not sure why the datatype is int and not uint here.
    # Investigate upstream...
    assert np.all(dia["diaSourceId"] >= 0)
    dia["diaSourceId"] = dia["diaSourceId"].astype("uint64[pyarrow]")

    # Load mpcorb
    mpcorb = pd.read_parquet(
        f"{input_dir}/mpc_orbits.parquet",
        engine="pyarrow",
        dtype_backend="pyarrow",
        columns=[
            "unpacked_primary_provisional_designation",
            "a",
            "q",
            "e",
            "i",
            "node",
            "argperi",
            "peri_time",
            "mean_anomaly",
            "epoch_mjd",
            "h",
            "g",
        ],
    ).reset_index(drop=True)

    #
    # Business logic
    #
    obj = compute_ssobject(sss, dia, mpcorb)

    #
    # Save
    #
    util.struct_to_parquet(obj, f"{output_dir}/ssobject.parquet")

    print(f"row_length={obj.dtype.itemsize:,} bytes, rows={len(obj):,}, {obj.nbytes:,} bytes total")