lsst.pipe.tasks gcf00bf066d+02752e367a
sssource.py
from astropy.coordinates import (
    get_body_barycentric_posvel,
    solar_system_ephemeris,
    SkyCoord,
    HeliocentricEclipticIAU76,
)
from astropy.time import Time
import astropy.units as u
from functools import partial
import numpy as np
import pandas as pd

from . import util, schema
from .photfit import hg_V_mag, phase_angle_deg
from .ephem import _aux_compute_ephemerides


def compute_sssource_entry(sss, assoc, mpcorb, dia):
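    """Fill in the SSSource columns for a single solar system object.

    ``sss`` and ``assoc`` are the per-object slices of the SSSource record
    array and of the DIA/MPC association table (the function is invoked once
    per ``ssObjectId`` group by ``util.group_by``); ``mpcorb`` is the MPC
    orbit catalog and ``dia`` the full DIA source table. Results are written
    into ``sss`` in place.
    """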

    kms = u.km / u.s

    # extract only the subset of observations related to this object
    dia = dia.iloc[assoc["dia_index"]]

    # just verify we didn't screw up something
    assert np.all(dia["ssObjectId"] == dia["ssObjectId"].iloc[0])
    assert np.all(sss["ssObjectId"] == sss["ssObjectId"][0])
    assert len(dia) == len(sss)

    provID = sss["designation"][0]
    ephTimes = Time(dia["midpointMjdTai"].values, format="mjd", scale="tai")
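    # _aux_compute_ephemerides is expected (judging from how the values are
    # used below) to return the predicted sky positions (eph), the (H, G)
    # photometric parameters, the object's barycentric position xx [au] and
    # velocity vv [km/s], the observer's barycentric position obs [au], and
    # the on-sky rates (mu_lon, mu_lat, mu).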
    eph, (H, G), xx, vv, obs, mu_lon, mu_lat, mu = _aux_compute_ephemerides(provID, ephTimes, mpcorb)

    sss["ephRateRa"] = mu_lon.value
    sss["ephRateDec"] = mu_lat.value
    sss["ephRate"] = mu.value

    # location/velocity of the Sun
    with solar_system_ephemeris.set("de440"):
        pos, vel = get_body_barycentric_posvel("sun", ephTimes)
    hx, hy, hz = pos.x.to(u.au).value, pos.y.to(u.au).value, pos.z.to(u.au).value
    hvx, hvy, hvz = vel.x.to(kms).value, vel.y.to(kms).value, vel.z.to(kms).value

    # location/velocity of the observer
    robs, vobs = util.observatory_barycentric_posvel("X05", ephTimes)
    robs = robs.to(u.au).value
    vobs = vobs.to(u.km / u.s).value
    # r_obs_sun = np.sqrt((robs * robs).sum(axis=0))

    sss["ephRa"] = eph.ra.deg
    sss["ephDec"] = eph.dec.deg
    obsv = SkyCoord(ra=dia["ra"], dec=dia["dec"], unit="deg", frame="icrs")

    sss["ephOffsetDec"] = (dia["dec"].to_numpy() - sss["ephDec"]) * 3600
    sss["ephOffsetRa"] = (dia["ra"].to_numpy() - sss["ephRa"]) * np.cos(np.deg2rad(sss["ephDec"])) * 3600
    sss["ephOffset"] = eph.separation(obsv).arcsec

    # Compute heliocentric position components
    sss["helio_x"] = xx[0] - hx
    sss["helio_y"] = xx[1] - hy
    sss["helio_z"] = xx[2] - hz
    sss["helioRange"] = np.sqrt(sss["helio_x"] ** 2 + sss["helio_y"] ** 2 + sss["helio_z"] ** 2)

    # Compute heliocentric velocity components
    sss["helio_vx"] = vv[0] - hvx
    sss["helio_vy"] = vv[1] - hvy
    sss["helio_vz"] = vv[2] - hvz
    sss["helio_vtot"] = np.sqrt(sss["helio_vx"] ** 2 + sss["helio_vy"] ** 2 + sss["helio_vz"] ** 2)

    # Compute heliocentric radial velocity: dot product of velocity
    # and unit position vector
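    # (i.e. d|r|/dt = (r . v) / |r|; the same expression is used for the
    # topocentric range rate below)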
72 sss["helioRangeRate"] = (
73 sss["helio_vx"] * sss["helio_x"] + sss["helio_vy"] * sss["helio_y"] + sss["helio_vz"] * sss["helio_z"]
74 ) / sss["helioRange"]
75
76 # Compute topocentric position components
77 sss["topo_x"] = xx[0] - obs[0]
78 sss["topo_y"] = xx[1] - obs[1]
79 sss["topo_z"] = xx[2] - obs[2]
80 sss["topoRange"] = np.sqrt(sss["topo_x"] ** 2 + sss["topo_y"] ** 2 + sss["topo_z"] ** 2)
81
82 # Compute topocentric velocity components
83 sss["topo_vx"] = vv[0] - vobs[0]
84 sss["topo_vy"] = vv[1] - vobs[1]
85 sss["topo_vz"] = vv[2] - vobs[2]
86 sss["topo_vtot"] = np.sqrt(sss["topo_vx"] ** 2 + sss["topo_vy"] ** 2 + sss["topo_vz"] ** 2)
87
88 # Compute topocentric radial velocity: dot product of velocity
89 # and unit position vector
90 sss["topoRangeRate"] = (
91 sss["topo_vx"] * sss["topo_x"] + sss["topo_vy"] * sss["topo_y"] + sss["topo_vz"] * sss["topo_z"]
92 ) / sss["topoRange"]
93
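    # Phase angle is the Sun-object-observer angle; ephVmag presumably follows
    # the IAU (H, G) phase-function system, roughly
    #   V = H + 5*log10(r * Delta) - 2.5*log10((1 - G)*Phi1(alpha) + G*Phi2(alpha)),
    # with r = helioRange [au], Delta = topoRange [au], and alpha the phase
    # angle (the exact form lives in photfit.hg_V_mag).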
94 sss["phaseAngle"] = phase_deg = phase_angle_deg(xx, obs)
95
96 sss["ephVmag"] = hg_V_mag(H, G, sss["helioRange"], sss["topoRange"], phase_deg)
97
98 max_sep = np.max(sss["ephOffset"])
99 med_sep = np.median(sss["ephOffset"])
100 print(f"{provID}: max/median separation: {max_sep:.4f}, {med_sep:.4f} arcsec")
101
102
103if __name__ == "__main__":
104 input_dir = "./analysis/inputs"
105 output_dir = "./analysis/outputs"
106
107 dia = pd.read_parquet(
108 f"{input_dir}/dia_sources.parquet", engine="pyarrow", dtype_backend="pyarrow"
109 ).reset_index(drop=True)
110 # DEBUG: while debugging, remove some indices and resort the array
111 dia = dia.sample(frac=0.9, random_state=42).reset_index(drop=True)
112
113 det = pd.read_parquet(
114 f"{input_dir}/obs_sbn.parquet", engine="pyarrow", dtype_backend="pyarrow"
115 ).reset_index()
116
117 # DEBUG: cut this down to a much smaller table
118 sampled_provids = det["provid"].drop_duplicates().sample(10, random_state=42)
    det = det[det["provid"].isin(sampled_provids)].reset_index(drop=True)
    print(len(det))

    # FIXME: this will have to check if the IDs are IAU-style
    # (with string prefixes)
    det["obssubid"] = det["obssubid"].astype(int)
    det = det[
        [
            "trksub",
            "obssubid",
            "provid",
            "permid",
            "submission_id",
            "ra",
            "dec",
            "obstime",
            "designation_asterisk",
        ]
    ].copy()

    # verify types didn't get mangled somewhere along the way
    # from the database to here
    expect_dtypes = dict(
        trksub="string[pyarrow]",
        obssubid="int64",
        provid="string[pyarrow]",
        permid="string[pyarrow]",
        submission_id="string[pyarrow]",
        ra="double[pyarrow]",
        dec="double[pyarrow]",
        obstime="timestamp[us][pyarrow]",
        designation_asterisk="bool[pyarrow]",
    )

    for col in det.columns:
        assert det[col].dtype == expect_dtypes[col]

    # create the association side table
    assoc = (
        dia[["diaSourceId"]]
        .reset_index()
        .merge(det.add_prefix("mpc_"), left_on="diaSourceId", right_on="mpc_obssubid", how="inner")
    )
    assoc.rename(columns={"index": "dia_index"}, inplace=True)

    # verify all went well
    assert np.all(dia["diaSourceId"].iloc[assoc["dia_index"]].to_numpy() == assoc["diaSourceId"].to_numpy())

    # verify contents of the association table
    util.assoc_validate(dia, assoc)

    totalNumObs = len(assoc)

    numid = pd.read_parquet(
        f"{input_dir}/numbered_identifications.parquet",
        engine="pyarrow",
        columns=["permid", "unpacked_primary_provisional_designation"],
        dtype_backend="pyarrow",
    ).reset_index(drop=True)
    curid = pd.read_parquet(
        f"{input_dir}/current_identifications.parquet",
        engine="pyarrow",
        dtype_backend="pyarrow",
        columns=[
            "unpacked_primary_provisional_designation",
            "unpacked_secondary_provisional_designation",
            "packed_primary_provisional_designation",
        ],
    ).reset_index(drop=True)

    # First step: some numbered objects in `obs_sbn` don't have their
    # provID set. Restore it.
    df = assoc[["mpc_provid", "mpc_permid"]].merge(numid, left_on="mpc_permid", right_on="permid", how="left")
    assert len(df) == len(assoc)

    assoc["mpc_provid"] = assoc["mpc_provid"].where(
        assoc["mpc_provid"].notna(), df["unpacked_primary_provisional_designation"]
    )

    assert not assoc["mpc_provid"].isna().any()
    assert len(assoc) == totalNumObs

    # Second step: update provisional designations with the primary ones.

    df = assoc[["mpc_provid"]].merge(
        curid, left_on="mpc_provid", right_on="unpacked_secondary_provisional_designation", how="inner"
    )
    assoc["mpc_provid"] = df["unpacked_primary_provisional_designation"]
    assoc["mpc_packed"] = df["packed_primary_provisional_designation"]

    assert len(assoc) == totalNumObs

    # sort the association table by object
    assoc.sort_values(["mpc_provid"], inplace=True)

    # create the output array for SSSource
    sss = np.zeros(totalNumObs, dtype=schema.SSSourceDtype)
    print(f"SSSource: {len(sss):,} rows, {sss.dtype.itemsize} bytes/row, {sss.nbytes:,} bytes total")

    #
    # construct SSSource -- start with easily vectorizable columns
    #
    sss["diaSourceId"] = assoc["diaSourceId"].values
222 sss["ssObjectId"] = util.packed_ascii_to_uint64_le(assoc["mpc_packed"])
223 sss["designation"] = assoc["mpc_provid"]
224
225 df = dia[["ra", "dec", "midpointMjdTai"]].iloc[assoc["dia_index"]]
226 ra, dec, t = (
227 df["ra"].to_numpy(),
228 df["dec"].to_numpy(),
229 Time(df["midpointMjdTai"].to_numpy(), format="mjd", scale="tai"),
230 )
231
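    # solar elongation: the angular distance between the Sun and the object as
    # seen from the observer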
232 sss["elongation"] = util.solar_elongation_ndarray(ra, dec, t)
233
234 # FIXME: verify these coordinate transforms replicate IAU76 at JPL
235 p = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, distance=1 * u.au, frame="hcrs")
236 ecl = p.transform_to(HeliocentricEclipticIAU76)
237 p = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, distance=1 * u.au, frame="icrs")
238 gal = p.transform_to("galactic")
239
240 sss["eclLambda"] = ecl.lon
241 sss["eclBeta"] = ecl.lat
242 sss["galLon"] = gal.l
243 sss["galLat"] = gal.b
244
245 mpcorb = pd.read_parquet(
246 f"{input_dir}/mpc_orbits.parquet",
247 engine="pyarrow",
248 dtype_backend="pyarrow",
249 columns=[
250 "unpacked_primary_provisional_designation",
251 "packed_primary_provisional_designation",
252 "a",
253 "q",
254 "e",
255 "i",
256 "node",
257 "argperi",
258 "peri_time",
259 "mean_anomaly",
260 "epoch_mjd",
261 "h",
262 "g",
263 ],
264 ).reset_index(drop=True)
265
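    # util.group_by presumably splits sss and assoc into matching per-ssObjectId
    # groups and calls compute_sssource_entry on each group, filling sss in place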
    util.group_by([sss, assoc], "ssObjectId", partial(compute_sssource_entry, mpcorb=mpcorb, dia=dia))

    totalNumObjects = np.unique(sss["ssObjectId"]).size
    print(f"{totalNumObjects:,} unique objects with {len(sss):,} total observations.")

    util.struct_to_parquet(sss, f"{output_dir}/sssource.parquet")