Coverage for python / lsst / pipe / tasks / ssp / sssource.py: 0%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:40 +0000

1from astropy.coordinates import ( 

2 get_body_barycentric_posvel, 

3 solar_system_ephemeris, 

4 SkyCoord, 

5 HeliocentricEclipticIAU76, 

6) 

7from astropy.time import Time 

8import astropy.units as u 

9from functools import partial 

10import numpy as np 

11import pandas as pd 

12 

13from . import util, schema 

14from .photfit import hg_V_mag, phase_angle_deg 

15from .ephem import _aux_compute_ephemerides 

16 

17 

def compute_sssource_entry(sss, assoc, mpcorb, dia):
    """Populate ephemeris-derived SSSource columns for a single object.

    Intended to be invoked per-object via ``util.group_by`` (see the driver
    at the bottom of this file), so ``sss`` and ``assoc`` hold only the rows
    belonging to one solar-system object.

    Parameters
    ----------
    sss : numpy structured array (schema.SSSourceDtype)
        Output rows for this object; modified in place.
    assoc : pandas.DataFrame
        Association rows for this object; ``assoc["dia_index"]`` indexes
        the corresponding rows of ``dia``.
    mpcorb : pandas.DataFrame
        MPC orbit catalog, passed through to ``_aux_compute_ephemerides``.
    dia : pandas.DataFrame
        Full DIA source table; only the rows selected by
        ``assoc["dia_index"]`` are used.

    Notes
    -----
    Positions from ``_aux_compute_ephemerides`` (``xx``, ``obs``) are assumed
    to be barycentric, in au, shaped (3, nobs); velocities (``vv``) in km/s
    -- TODO confirm against ``ephem._aux_compute_ephemerides``.
    """
    kms = u.km / u.s

    # Extract only the subset of observations related to this object.
    dia = dia.iloc[assoc["dia_index"]]

    # Sanity checks: all rows must belong to one and the same object.
    assert np.all(dia["ssObjectId"] == dia["ssObjectId"].iloc[0])
    assert np.all(sss["ssObjectId"] == sss["ssObjectId"][0])
    assert len(dia) == len(sss)

    provID = sss["designation"][0]
    ephTimes = Time(dia["midpointMjdTai"].values, format="mjd", scale="tai")
    eph, (H, G), xx, vv, obs, mu_lon, mu_lat, mu = _aux_compute_ephemerides(provID, ephTimes, mpcorb)

    sss["ephRateRa"] = mu_lon.value
    sss["ephRateDec"] = mu_lat.value
    sss["ephRate"] = mu.value

    # Location/velocity of the Sun (barycentric, DE440 ephemeris).
    with solar_system_ephemeris.set("de440"):
        pos, vel = get_body_barycentric_posvel("sun", ephTimes)
    # BUG FIX: the y component previously read ``pos.z`` (copy-paste error),
    # making hy a duplicate of hz, which corrupted every heliocentric
    # quantity derived below (helio_y, helioRange, helioRangeRate, ephVmag).
    hx, hy, hz = pos.x.to(u.au).value, pos.y.to(u.au).value, pos.z.to(u.au).value
    hvx, hvy, hvz = vel.x.to(kms).value, vel.y.to(kms).value, vel.z.to(kms).value

    # Location/velocity of the observer (X05 = Rubin Observatory site code).
    robs, vobs = util.observatory_barycentric_posvel("X05", ephTimes)
    robs = robs.to(u.au).value
    vobs = vobs.to(kms).value
    # r_obs_sun = np.sqrt((robs * robs).sum(axis=0))

    sss["ephRa"] = eph.ra.deg
    sss["ephDec"] = eph.dec.deg
    obsv = SkyCoord(ra=dia["ra"], dec=dia["dec"], unit="deg", frame="icrs")

    # Observed-minus-ephemeris offsets, in arcseconds; RA offset is scaled
    # by cos(dec) to give a great-circle distance component.
    sss["ephOffsetDec"] = (dia["dec"].to_numpy() - sss["ephDec"]) * 3600
    sss["ephOffsetRa"] = (dia["ra"].to_numpy() - sss["ephRa"]) * np.cos(np.deg2rad(sss["ephDec"])) * 3600
    sss["ephOffset"] = eph.separation(obsv).arcsec

    # Heliocentric position components (object minus Sun).
    sss["helio_x"] = xx[0] - hx
    sss["helio_y"] = xx[1] - hy
    sss["helio_z"] = xx[2] - hz
    sss["helioRange"] = np.sqrt(sss["helio_x"] ** 2 + sss["helio_y"] ** 2 + sss["helio_z"] ** 2)

    # Heliocentric velocity components.
    sss["helio_vx"] = vv[0] - hvx
    sss["helio_vy"] = vv[1] - hvy
    sss["helio_vz"] = vv[2] - hvz
    sss["helio_vtot"] = np.sqrt(sss["helio_vx"] ** 2 + sss["helio_vy"] ** 2 + sss["helio_vz"] ** 2)

    # Heliocentric radial velocity: dot product of velocity
    # and unit position vector.
    sss["helioRangeRate"] = (
        sss["helio_vx"] * sss["helio_x"] + sss["helio_vy"] * sss["helio_y"] + sss["helio_vz"] * sss["helio_z"]
    ) / sss["helioRange"]

    # Topocentric position components (object minus observer).
    sss["topo_x"] = xx[0] - obs[0]
    sss["topo_y"] = xx[1] - obs[1]
    sss["topo_z"] = xx[2] - obs[2]
    sss["topoRange"] = np.sqrt(sss["topo_x"] ** 2 + sss["topo_y"] ** 2 + sss["topo_z"] ** 2)

    # Topocentric velocity components.
    sss["topo_vx"] = vv[0] - vobs[0]
    sss["topo_vy"] = vv[1] - vobs[1]
    sss["topo_vz"] = vv[2] - vobs[2]
    sss["topo_vtot"] = np.sqrt(sss["topo_vx"] ** 2 + sss["topo_vy"] ** 2 + sss["topo_vz"] ** 2)

    # Topocentric radial velocity: dot product of velocity
    # and unit position vector.
    sss["topoRangeRate"] = (
        sss["topo_vx"] * sss["topo_x"] + sss["topo_vy"] * sss["topo_y"] + sss["topo_vz"] * sss["topo_z"]
    ) / sss["topoRange"]

    sss["phaseAngle"] = phase_deg = phase_angle_deg(xx, obs)

    # Predicted V magnitude from the H, G photometric system.
    sss["ephVmag"] = hg_V_mag(H, G, sss["helioRange"], sss["topoRange"], phase_deg)

    # Diagnostic: how far the observations scatter around the ephemeris.
    max_sep = np.max(sss["ephOffset"])
    med_sep = np.median(sss["ephOffset"])
    print(f"{provID}: max/median separation: {max_sep:.4f}, {med_sep:.4f} arcsec")

101 

102 

if __name__ == "__main__":
    # Driver script: associate Rubin DIA sources with MPC observations,
    # resolve designations, and build the SSSource output table.
    input_dir = "./analysis/inputs"
    output_dir = "./analysis/outputs"

    # DIA sources; reset to a clean 0..N-1 RangeIndex so that positional
    # indices stored in the association table remain valid.
    dia = pd.read_parquet(
        f"{input_dir}/dia_sources.parquet", engine="pyarrow", dtype_backend="pyarrow"
    ).reset_index(drop=True)
    # DEBUG: while debugging, remove some indices and resort the array
    dia = dia.sample(frac=0.9, random_state=42).reset_index(drop=True)

    # MPC observations (obs_sbn) table.
    det = pd.read_parquet(
        f"{input_dir}/obs_sbn.parquet", engine="pyarrow", dtype_backend="pyarrow"
    ).reset_index()

    # DEBUG: cut this down to a much smaller table
    sampled_provids = det["provid"].drop_duplicates().sample(10, random_state=42)
    det = det[det["provid"].isin(sampled_provids)].reset_index()
    print(len(det))

    # FIXME: this will have to check if the ID's are IAU-style
    # (with string prefixes)
    det["obssubid"] = det["obssubid"].astype(int)
    # Keep only the columns the association/output steps need.
    det = det[
        [
            "trksub",
            "obssubid",
            "provid",
            "permid",
            "submission_id",
            "ra",
            "dec",
            "obstime",
            "designation_asterisk",
        ]
    ].copy()

    # verify types didn't get mangled somewhere along the way
    # from the database to here
    expect_dtypes = dict(
        trksub="string[pyarrow]",
        obssubid="int64",
        provid="string[pyarrow]",
        permid="string[pyarrow]",
        submission_id="string[pyarrow]",
        ra="double[pyarrow]",
        dec="double[pyarrow]",
        obstime="timestamp[us][pyarrow]",
        designation_asterisk="bool[pyarrow]",
    )

    for col in det.columns:
        assert det[col].dtype == expect_dtypes[col]

    # Create the association side table: joins DIA sources to MPC rows on
    # diaSourceId == obssubid; "dia_index" records the positional index of
    # each matched row back into `dia`.
    assoc = (
        dia[["diaSourceId"]]
        .reset_index()
        .merge(det.add_prefix("mpc_"), left_on="diaSourceId", right_on="mpc_obssubid", how="inner")
    )
    assoc.rename(columns={"index": "dia_index"}, inplace=True)

    # verify all went well: dia_index must round-trip to the same diaSourceId
    assert np.all(dia["diaSourceId"].iloc[assoc["dia_index"]].to_numpy() == assoc["diaSourceId"].to_numpy())

    # verify contents of the association table
    util.assoc_validate(dia, assoc)

    totalNumObs = len(assoc)

    # Lookup tables: permanent-number -> provisional designation, and
    # secondary -> primary provisional designation mappings.
    numid = pd.read_parquet(
        f"{input_dir}/numbered_identifications.parquet",
        engine="pyarrow",
        columns=["permid", "unpacked_primary_provisional_designation"],
        dtype_backend="pyarrow",
    ).reset_index(drop=True)
    curid = pd.read_parquet(
        f"{input_dir}/current_identifications.parquet",
        engine="pyarrow",
        dtype_backend="pyarrow",
        columns=[
            "unpacked_primary_provisional_designation",
            "unpacked_secondary_provisional_designation",
            "packed_primary_provisional_designation",
        ],
    ).reset_index(drop=True)

    # First step: some numbered objects in `obs_sbn` don't have their
    # provID set. Restore it.
    # A left merge keeps len(df) == len(assoc) with matching RangeIndex,
    # so the index-aligned .where() below fills only the missing provids.
    df = assoc[["mpc_provid", "mpc_permid"]].merge(numid, left_on="mpc_permid", right_on="permid", how="left")
    assert len(df) == len(assoc)

    assoc["mpc_provid"] = assoc["mpc_provid"].where(
        assoc["mpc_provid"].notna(), df["unpacked_primary_provisional_designation"]
    )

    assert not assoc["mpc_provid"].isna().any()
    assert len(assoc) == totalNumObs

    # Second step: update provisional designations with the primary ones.

    # NOTE(review): this merge is how="inner", so `df` can be shorter than
    # `assoc` (provids that are not secondary designations drop out), and
    # the index-aligned assignments below would then leave NaNs / misalign
    # rows. Presumably every provid here appears as a secondary designation
    # in `curid` -- confirm, or switch to a left merge.
    df = assoc[["mpc_provid"]].merge(
        curid, left_on="mpc_provid", right_on="unpacked_secondary_provisional_designation", how="inner"
    )
    assoc["mpc_provid"] = df["unpacked_primary_provisional_designation"]
    assoc["mpc_packed"] = df["packed_primary_provisional_designation"]

    assert len(assoc) == totalNumObs

    # sort the association table by object, so per-object groups are contiguous
    assoc.sort_values(["mpc_provid"], inplace=True)

    # create the output array for SSSource
    sss = np.zeros(totalNumObs, dtype=schema.SSSourceDtype)
    # NOTE(review): no-op expression -- looks like a leftover notebook cell
    # that displayed the record size; harmless but could be removed.
    sss.dtype.itemsize, len(sss), f"{sss.nbytes:,}"

    #
    # construct SSSource -- start with easily vectorizable columns
    #
    sss["diaSourceId"] = assoc["diaSourceId"].values
    sss["ssObjectId"] = util.packed_ascii_to_uint64_le(assoc["mpc_packed"])
    sss["designation"] = assoc["mpc_provid"]

    # Observed coordinates/times for the rows in assoc order.
    df = dia[["ra", "dec", "midpointMjdTai"]].iloc[assoc["dia_index"]]
    ra, dec, t = (
        df["ra"].to_numpy(),
        df["dec"].to_numpy(),
        Time(df["midpointMjdTai"].to_numpy(), format="mjd", scale="tai"),
    )

    sss["elongation"] = util.solar_elongation_ndarray(ra, dec, t)

    # FIXME: verify these coordinate transforms replicate IAU76 at JPL
    p = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, distance=1 * u.au, frame="hcrs")
    ecl = p.transform_to(HeliocentricEclipticIAU76)
    p = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, distance=1 * u.au, frame="icrs")
    gal = p.transform_to("galactic")

    sss["eclLambda"] = ecl.lon
    sss["eclBeta"] = ecl.lat
    sss["galLon"] = gal.l
    sss["galLat"] = gal.b

    # MPC orbit catalog, needed for the per-object ephemeris computation.
    mpcorb = pd.read_parquet(
        f"{input_dir}/mpc_orbits.parquet",
        engine="pyarrow",
        dtype_backend="pyarrow",
        columns=[
            "unpacked_primary_provisional_designation",
            "packed_primary_provisional_designation",
            "a",
            "q",
            "e",
            "i",
            "node",
            "argperi",
            "peri_time",
            "mean_anomaly",
            "epoch_mjd",
            "h",
            "g",
        ],
    ).reset_index(drop=True)

    # Fill the remaining (per-object) columns, one object at a time.
    util.group_by([sss, assoc], "ssObjectId", partial(compute_sssource_entry, mpcorb=mpcorb, dia=dia))

    totalNumObjects = np.unique(sss["ssObjectId"]).size
    print(f"{totalNumObjects:,} unique objects with {len(sss):,} total observations.")

    util.struct_to_parquet(sss, f"{output_dir}/sssource.parquet")