Coverage for python / lsst / ap / association / mpSkyEphemerisQuery.py: 25%
109 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:18 +0000
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Solar System Object Query to MPSky in place of a internal Rubin solar
23system object caching/retrieval code.
25Will compute the location for of known SSObjects within a visit-detector combination.
26"""
28__all__ = ["MPSkyEphemerisQueryConfig", "MPSkyEphemerisQueryTask"]
31import numpy as np
32import os
33import pyarrow as pa
34import requests
36from astropy.table import Table, MaskedColumn
37import astropy.units as u
39from lsst.ap.association.utils import getMidpointFromTimespan
40from lsst.geom import SpherePoint
41import lsst.pex.config as pexConfig
42from lsst.pipe.tasks.associationUtils import obj_id_to_ss_object_id
43from lsst.utils.timer import timeMethod
45from lsst.pipe.base import connectionTypes, NoWorkFound, PipelineTask, \
46 PipelineTaskConfig, PipelineTaskConnections, Struct
48ELEMENTS_LIST = "q,e,inc,node,argPeri,t_p_MJD_TDB,epochMJD_TDB".split(',')
49ELEMENTS_FORMAT = " ".join(["{:> 11f}"] * len(ELEMENTS_LIST))
50NAMING_MAP = {'inc': 'incl', 'argPeri': 'peri', 't_p_MJD_TDB': 't_p', 'epochMJD_TDB': 'epoch',
51 'q': 'q', 'e': 'e', 'node': 'node'}
53ELEMENTS_LIST_MPC_ORBITS = ['designation', 'id', 'packed_primary_provisional_designation',
54 'unpacked_primary_provisional_designation', 'mpc_orb_jsonb',
55 'created_at', 'updated_at', 'orbit_type_int', 'u_param', 'nopp',
56 'arc_length_total', 'arc_length_sel', 'nobs_total', 'nobs_total_sel',
57 'a', 'q', 'e', 'i', 'node', 'argperi', 'peri_time', 'yarkovsky', 'srp',
58 'a1', 'a2', 'a3', 'dt', 'mean_anomaly', 'period', 'mean_motion', 'a_unc',
59 'q_unc', 'e_unc', 'i_unc', 'node_unc', 'argperi_unc', 'peri_time_unc',
60 'yarkovsky_unc', 'srp_unc', 'a1_unc', 'a2_unc', 'a3_unc', 'dt_unc',
61 'mean_anomaly_unc', 'period_unc', 'mean_motion_unc', 'epoch_mjd', 'h', 'g',
62 'not_normalized_rms', 'normalized_rms', 'earth_moid', 'fitting_datetime']
64# Units for orbital element columns (MPC orbits schema)
65ELEMENTS_MPC_ORBITS_UNITS = {
66 'a': u.AU, 'q': u.AU, 'e': u.dimensionless_unscaled,
67 'i': u.deg, 'node': u.deg, 'argperi': u.deg,
68 'peri_time': u.d, 'mean_anomaly': u.deg,
69 'period': u.d, 'mean_motion': u.deg/u.d,
70 'a_unc': u.AU, 'q_unc': u.AU, 'e_unc': u.dimensionless_unscaled,
71 'i_unc': u.deg, 'node_unc': u.deg, 'argperi_unc': u.deg,
72 'peri_time_unc': u.d, 'mean_anomaly_unc': u.deg,
73 'period_unc': u.d, 'mean_motion_unc': u.deg/u.d,
74 'earth_moid': u.AU, 'epoch_mjd': u.d,
75 'arc_length_total': u.d, 'arc_length_sel': u.d,
76 'h': u.mag, 'not_normalized_rms': u.arcsec, 'normalized_rms': u.dimensionless_unscaled,
77}
79# Units for classic orbital elements schema
80ELEMENTS_UNITS = {
81 'q': u.AU, 'e': u.dimensionless_unscaled,
82 'incl': u.deg, 'node': u.deg, 'peri': u.deg,
83 't_p': u.d, 'epoch': u.d,
84}
87class MPSkyEphemerisQueryConnections(PipelineTaskConnections,
88 dimensions=("instrument",
89 "group", "detector")):
91 predictedRegionTime = connectionTypes.Input(
92 doc="The predicted exposure region and time",
93 name="regionTimeInfo",
94 storageClass="RegionTimeInfo",
95 dimensions={"instrument", "group", "detector"},
96 )
98 ssObjects = connectionTypes.Output(
99 doc="MPSky-provided Solar System objects observable in this detector-visit",
100 name="preloaded_SsObjects",
101 storageClass="ArrowAstropy",
102 dimensions=("instrument", "group", "detector"),
103 )
106class MPSkyEphemerisQueryConfig(
107 PipelineTaskConfig,
108 pipelineConnections=MPSkyEphemerisQueryConnections):
109 observerCode = pexConfig.Field(
110 dtype=str,
111 doc="IAU Minor Planet Center observer code for queries "
112 "(default is X05 for Rubin Obs./LSST)",
113 default='X05'
114 )
115 queryBufferRadiusDegrees = pexConfig.Field(
116 dtype=float,
117 doc="Buffer radius in degrees added to detector bounding circle for ephemeris "
118 "cone search. Defaults to 10 deg/day * 30 minutes",
119 default=0.208
120 )
121 mpSkyRequestTimeoutSeconds = pexConfig.Field(
122 dtype=float,
123 doc="Time in seconds to wait for mpSky request before failing ",
124 default=10.0
125 )
126 mpSkyFallbackURL = pexConfig.Field(
127 dtype=str,
128 doc="mpSky default URL if MP_SKY_URL environment variable unset",
129 default="http://sdfiana014.sdf.slac.stanford.edu:3666/ephemerides/"
130 )
131 mpcorb_prefix = pexConfig.Field(
132 dtype=str,
133 doc="Column prefix to indicate MPCORB columns for downstream tasks.",
134 default="MPCORB_"
135 )
138class MPSkyEphemerisQueryTask(PipelineTask):
139 """Task to query the MPSky service and retrieve the solar system objects
140 that are observable within the input visit.
141 """
142 ConfigClass = MPSkyEphemerisQueryConfig
143 _DefaultName = "mpSkyEphemerisQuery"
145 @timeMethod
146 def run(self, predictedRegionTime):
147 """Parse the information on the current visit and retrieve the
148 observable solar system objects from MPSky.
150 Parameters
151 ----------
152 predictedRegionTime : `lsst.pipe.base.utils.RegionTimeInfo`
153 Predicted footprint and timespan of the exposure
155 Returns
156 -------
157 result : `lsst.pipe.base.Struct`
158 Results struct with components:
160 - ``ssObjects`` : `astropy.table.Table`
161 Table containing Solar System Objects near the detector
162 footprint as retrieved by MPSky. The columns are as follows:
164 ``Name``
165 object name (`str`)
166 ``ra``
167 RA in decimal degrees (`float`, degrees)
168 ``dec``
169 DEC in decimal degrees (`float`, degrees)
170 ``obj_X_poly``, ``obj_Y_poly``, ``obj_Z_poly``
171 Chebyshev coefficients for object path
172 ``obs_X_poly``, ``obs_Y_poly``, ``obs_Z_poly``
173 Chebyshev coefficients for observer path
174 ``t_min``
175 Lower time bound for polynomials
176 ``t_max``
177 Upper time bound for polynomials
178 """
179 # Get detector center and timespan midpoint from predictedRegionTime.
180 region = predictedRegionTime.region
181 timespan = predictedRegionTime.timespan
182 expCenter = SpherePoint(region.getBoundingCircle().getCenter())
183 expRadius = region.getBoundingCircle().getOpeningAngle().asDegrees()
184 expMidPointEPOCH = getMidpointFromTimespan(timespan, allowUnbounded=False).mjd
186 # MPSky service query
187 try:
188 mpSkyURL = os.environ['MP_SKY_URL']
189 except KeyError:
190 self.log.warning("MP_SKY_URL is not defined. Falling back to %s.", self.config.mpSkyFallbackURL)
191 mpSkyURL = self.config.mpSkyFallbackURL
192 mpSkySsObjects, elements = self._mpSkyConeSearch(expCenter, expMidPointEPOCH,
193 expRadius + self.config.queryBufferRadiusDegrees,
194 mpSkyURL)
195 if elements is not None:
196 if 'mpc_orb_jsonb' in elements.colnames:
197 for element in ELEMENTS_LIST_MPC_ORBITS:
198 mpSkySsObjects[self.config.mpcorb_prefix + element] = elements[element]
199 else:
200 for element in ELEMENTS_LIST:
201 mpSkySsObjects[self.config.mpcorb_prefix + NAMING_MAP[element]] = elements[element]
203 # Fill masked values with NaN for float columns (equivalent to DataFrame.fillna)
204 for colname in mpSkySsObjects.colnames:
205 col = mpSkySsObjects[colname]
206 if hasattr(col, 'filled') and col.dtype.kind == 'f':
207 mpSkySsObjects[colname] = col.filled(np.nan)
209 return Struct(
210 ssObjects=mpSkySsObjects,
211 )
213 def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius, mpSkyURL):
214 """Query MPSky ephemeris service for objects near the expected detector position
216 Parameters
217 ----------
218 expCenter : `lsst.geom.SpherePoint`
219 Center of search cone
220 epochMJD : `float`
221 Epoch of cone search, (MJD in UTC).
222 queryRadius : `float`
223 Radius of the cone search in degrees.
224 mpSkyURL : `str`
225 URL to query for MPSky.
227 Returns
228 -------
229 mpSkySsObjects : `astropy.table.Table`
230 Table with Solar System Object information and RA/DEC position
231 within the visit.
232 elements : `astropy.table.Table` or `None`
233 Orbital elements of ssObjects (to pass to alerts)
234 """
235 fieldRA = expCenter.getRa().asDegrees()
236 fieldDec = expCenter.getDec().asDegrees()
238 params = {
239 "t": epochMJD,
240 "ra": fieldRA,
241 "dec": fieldDec,
242 "radius": queryRadius,
243 "return_elements": "extended"
244 }
246 response = None
247 try:
248 response = requests.get(mpSkyURL, params=params, timeout=self.config.mpSkyRequestTimeoutSeconds)
249 response.raise_for_status()
250 with pa.input_stream(memoryview(response.content)) as fp:
251 fp.seek(0)
252 p = pa.ipc.read_tensor(fp)
253 op = pa.ipc.read_tensor(fp)
254 with pa.ipc.open_stream(fp) as reader:
255 schema = reader.schema
256 r = next(reader)
258 # Construct an elements astropy Table
259 if "q" in schema.names:
260 if "mpc_orb_jsonb" in schema.names:
261 colnames = ELEMENTS_LIST_MPC_ORBITS
262 units_map = ELEMENTS_MPC_ORBITS_UNITS
263 else:
264 colnames = ELEMENTS_LIST
265 units_map = {NAMING_MAP[k]: v for k, v in ELEMENTS_UNITS.items()}
267 # Columns that are timestamps/datetimes — store as masked strings
268 DATETIME_COLS = {'created_at', 'fitting_datetime', 'updated_at'}
269 # Columns that should be stored as plain strings (object dtype in arrow)
270 STRING_COLS = {'designation', 'id', 'packed_primary_provisional_designation',
271 'unpacked_primary_provisional_designation', 'mpc_orb_jsonb'}
273 elements = Table()
274 for col in colnames:
275 data = r[col].to_numpy(zero_copy_only=False)
276 if col in DATETIME_COLS:
277 # Store as masked empty strings — TODO (DM-53952): handle as datetime
278 elements[col] = MaskedColumn(
279 np.full(len(data), "", dtype=str), mask=np.ones(len(data), dtype=bool)
280 )
281 elif col in STRING_COLS:
282 # Ensure clean string dtype, not object
283 elements[col] = np.array(data, dtype=str)
284 else:
285 unit = units_map.get(col, None)
286 if unit is None:
287 elements[col] = MaskedColumn(data)
288 else:
289 elements[col] = MaskedColumn(data, unit=unit)
290 else:
291 elements = None
293 object_polynomial = p.to_numpy()
294 observer_polynomial = op.to_numpy()
296 n = len(r["name"])
297 vmag_data = r["Vmag"].to_numpy() if 'Vmag' in r else np.full(n, np.nan)
299 mpSkySsObjects = Table({
300 'ObjID': r["name"].to_numpy(zero_copy_only=False).astype(str),
301 'ra': MaskedColumn(r["ra"].to_numpy(), unit=u.deg),
302 'dec': MaskedColumn(r["dec"].to_numpy(), unit=u.deg),
303 'tmin': MaskedColumn(r["tmin"].to_numpy(), unit=u.d),
304 'tmax': MaskedColumn(r["tmax"].to_numpy(), unit=u.d),
305 'trailedSourceMagTrue': MaskedColumn(vmag_data, unit=u.mag),
306 'obj_x_poly': [poly[0] for poly in object_polynomial.T],
307 'obj_y_poly': [poly[1] for poly in object_polynomial.T],
308 'obj_z_poly': [poly[2] for poly in object_polynomial.T],
309 'obs_x_poly': [observer_polynomial.T[0] for _ in range(n)],
310 'obs_y_poly': [observer_polynomial.T[1] for _ in range(n)],
311 'obs_z_poly': [observer_polynomial.T[2] for _ in range(n)],
312 'Err(arcsec)': MaskedColumn(np.full(n, 2.0), unit=u.arcsec),
313 })
315 if 'packed_primary_provisional_designation' in elements.colnames:
316 desigs = elements['packed_primary_provisional_designation'].data
317 else:
318 desigs = mpSkySsObjects['ObjID'].data
319 mpSkySsObjects['ssObjectId'] = [obj_id_to_ss_object_id(v) for v in desigs]
321 nFound = len(mpSkySsObjects)
323 if nFound == 0:
324 self.log.info("No Solar System objects found for visit.")
325 else:
326 self.log.info("%d Solar System Objects in visit", nFound)
327 except requests.RequestException as e:
328 body = getattr(response, "text", "<No response>")
329 message = f"Query to the remote ephemerides service failed. Got response {body}"
330 self.log.error(message)
331 raise NoWorkFound(f"{message}: {e}") from e
333 return mpSkySsObjects, elements