Coverage for python / lsst / ap / association / mpSkyEphemerisQuery.py: 25%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 09:06 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Solar System Object Query to MPSky in place of a internal Rubin solar 

23system object caching/retrieval code. 

24 

25Will compute the location for of known SSObjects within a visit-detector combination. 

26""" 

27 

28__all__ = ["MPSkyEphemerisQueryConfig", "MPSkyEphemerisQueryTask"] 

29 

30 

31import numpy as np 

32import os 

33import pyarrow as pa 

34import requests 

35 

36from astropy.table import Table, MaskedColumn 

37import astropy.units as u 

38 

39from lsst.ap.association.utils import getMidpointFromTimespan 

40from lsst.geom import SpherePoint 

41import lsst.pex.config as pexConfig 

42from lsst.pipe.tasks.associationUtils import obj_id_to_ss_object_id 

43from lsst.utils.timer import timeMethod 

44 

45from lsst.pipe.base import connectionTypes, NoWorkFound, PipelineTask, \ 

46 PipelineTaskConfig, PipelineTaskConnections, Struct 

47 

48ELEMENTS_LIST = "q,e,inc,node,argPeri,t_p_MJD_TDB,epochMJD_TDB".split(',') 

49ELEMENTS_FORMAT = " ".join(["{:> 11f}"] * len(ELEMENTS_LIST)) 

50NAMING_MAP = {'inc': 'incl', 'argPeri': 'peri', 't_p_MJD_TDB': 't_p', 'epochMJD_TDB': 'epoch', 

51 'q': 'q', 'e': 'e', 'node': 'node'} 

52 

53ELEMENTS_LIST_MPC_ORBITS = ['designation', 'id', 'packed_primary_provisional_designation', 

54 'unpacked_primary_provisional_designation', 'mpc_orb_jsonb', 

55 'created_at', 'updated_at', 'orbit_type_int', 'u_param', 'nopp', 

56 'arc_length_total', 'arc_length_sel', 'nobs_total', 'nobs_total_sel', 

57 'a', 'q', 'e', 'i', 'node', 'argperi', 'peri_time', 'yarkovsky', 'srp', 

58 'a1', 'a2', 'a3', 'dt', 'mean_anomaly', 'period', 'mean_motion', 'a_unc', 

59 'q_unc', 'e_unc', 'i_unc', 'node_unc', 'argperi_unc', 'peri_time_unc', 

60 'yarkovsky_unc', 'srp_unc', 'a1_unc', 'a2_unc', 'a3_unc', 'dt_unc', 

61 'mean_anomaly_unc', 'period_unc', 'mean_motion_unc', 'epoch_mjd', 'h', 'g', 

62 'not_normalized_rms', 'normalized_rms', 'earth_moid', 'fitting_datetime'] 

63 

64# Units for orbital element columns (MPC orbits schema) 

65ELEMENTS_MPC_ORBITS_UNITS = { 

66 'a': u.AU, 'q': u.AU, 'e': u.dimensionless_unscaled, 

67 'i': u.deg, 'node': u.deg, 'argperi': u.deg, 

68 'peri_time': u.d, 'mean_anomaly': u.deg, 

69 'period': u.d, 'mean_motion': u.deg/u.d, 

70 'a_unc': u.AU, 'q_unc': u.AU, 'e_unc': u.dimensionless_unscaled, 

71 'i_unc': u.deg, 'node_unc': u.deg, 'argperi_unc': u.deg, 

72 'peri_time_unc': u.d, 'mean_anomaly_unc': u.deg, 

73 'period_unc': u.d, 'mean_motion_unc': u.deg/u.d, 

74 'earth_moid': u.AU, 'epoch_mjd': u.d, 

75 'arc_length_total': u.d, 'arc_length_sel': u.d, 

76 'h': u.mag, 'not_normalized_rms': u.arcsec, 'normalized_rms': u.dimensionless_unscaled, 

77} 

78 

79# Units for classic orbital elements schema 

80ELEMENTS_UNITS = { 

81 'q': u.AU, 'e': u.dimensionless_unscaled, 

82 'incl': u.deg, 'node': u.deg, 'peri': u.deg, 

83 't_p': u.d, 'epoch': u.d, 

84} 

85 

86 

87class MPSkyEphemerisQueryConnections(PipelineTaskConnections, 

88 dimensions=("instrument", 

89 "group", "detector")): 

90 

91 predictedRegionTime = connectionTypes.Input( 

92 doc="The predicted exposure region and time", 

93 name="regionTimeInfo", 

94 storageClass="RegionTimeInfo", 

95 dimensions={"instrument", "group", "detector"}, 

96 ) 

97 

98 ssObjects = connectionTypes.Output( 

99 doc="MPSky-provided Solar System objects observable in this detector-visit", 

100 name="preloaded_SsObjects", 

101 storageClass="ArrowAstropy", 

102 dimensions=("instrument", "group", "detector"), 

103 ) 

104 

105 

106class MPSkyEphemerisQueryConfig( 

107 PipelineTaskConfig, 

108 pipelineConnections=MPSkyEphemerisQueryConnections): 

109 observerCode = pexConfig.Field( 

110 dtype=str, 

111 doc="IAU Minor Planet Center observer code for queries " 

112 "(default is X05 for Rubin Obs./LSST)", 

113 default='X05' 

114 ) 

115 queryBufferRadiusDegrees = pexConfig.Field( 

116 dtype=float, 

117 doc="Buffer radius in degrees added to detector bounding circle for ephemeris " 

118 "cone search. Defaults to 10 deg/day * 30 minutes", 

119 default=0.208 

120 ) 

121 mpSkyRequestTimeoutSeconds = pexConfig.Field( 

122 dtype=float, 

123 doc="Time in seconds to wait for mpSky request before failing ", 

124 default=10.0 

125 ) 

126 mpSkyFallbackURL = pexConfig.Field( 

127 dtype=str, 

128 doc="mpSky default URL if MP_SKY_URL environment variable unset", 

129 default="http://sdfiana014.sdf.slac.stanford.edu:3666/ephemerides/" 

130 ) 

131 mpcorb_prefix = pexConfig.Field( 

132 dtype=str, 

133 doc="Column prefix to indicate MPCORB columns for downstream tasks.", 

134 default="MPCORB_" 

135 ) 

136 

137 

138class MPSkyEphemerisQueryTask(PipelineTask): 

139 """Task to query the MPSky service and retrieve the solar system objects 

140 that are observable within the input visit. 

141 """ 

142 ConfigClass = MPSkyEphemerisQueryConfig 

143 _DefaultName = "mpSkyEphemerisQuery" 

144 

145 @timeMethod 

146 def run(self, predictedRegionTime): 

147 """Parse the information on the current visit and retrieve the 

148 observable solar system objects from MPSky. 

149 

150 Parameters 

151 ---------- 

152 predictedRegionTime : `lsst.pipe.base.utils.RegionTimeInfo` 

153 Predicted footprint and timespan of the exposure 

154 

155 Returns 

156 ------- 

157 result : `lsst.pipe.base.Struct` 

158 Results struct with components: 

159 

160 - ``ssObjects`` : `astropy.table.Table` 

161 Table containing Solar System Objects near the detector 

162 footprint as retrieved by MPSky. The columns are as follows: 

163 

164 ``Name`` 

165 object name (`str`) 

166 ``ra`` 

167 RA in decimal degrees (`float`, degrees) 

168 ``dec`` 

169 DEC in decimal degrees (`float`, degrees) 

170 ``obj_X_poly``, ``obj_Y_poly``, ``obj_Z_poly`` 

171 Chebyshev coefficients for object path 

172 ``obs_X_poly``, ``obs_Y_poly``, ``obs_Z_poly`` 

173 Chebyshev coefficients for observer path 

174 ``t_min`` 

175 Lower time bound for polynomials 

176 ``t_max`` 

177 Upper time bound for polynomials 

178 """ 

179 # Get detector center and timespan midpoint from predictedRegionTime. 

180 region = predictedRegionTime.region 

181 timespan = predictedRegionTime.timespan 

182 expCenter = SpherePoint(region.getBoundingCircle().getCenter()) 

183 expRadius = region.getBoundingCircle().getOpeningAngle().asDegrees() 

184 expMidPointEPOCH = getMidpointFromTimespan(timespan, allowUnbounded=False).mjd 

185 

186 # MPSky service query 

187 try: 

188 mpSkyURL = os.environ['MP_SKY_URL'] 

189 except KeyError: 

190 self.log.warning("MP_SKY_URL is not defined. Falling back to %s.", self.config.mpSkyFallbackURL) 

191 mpSkyURL = self.config.mpSkyFallbackURL 

192 mpSkySsObjects, elements = self._mpSkyConeSearch(expCenter, expMidPointEPOCH, 

193 expRadius + self.config.queryBufferRadiusDegrees, 

194 mpSkyURL) 

195 if elements is not None: 

196 if 'mpc_orb_jsonb' in elements.colnames: 

197 for element in ELEMENTS_LIST_MPC_ORBITS: 

198 mpSkySsObjects[self.config.mpcorb_prefix + element] = elements[element] 

199 else: 

200 for element in ELEMENTS_LIST: 

201 mpSkySsObjects[self.config.mpcorb_prefix + NAMING_MAP[element]] = elements[element] 

202 

203 # Fill masked values with NaN for float columns (equivalent to DataFrame.fillna) 

204 for colname in mpSkySsObjects.colnames: 

205 col = mpSkySsObjects[colname] 

206 if hasattr(col, 'filled') and col.dtype.kind == 'f': 

207 mpSkySsObjects[colname] = col.filled(np.nan) 

208 

209 return Struct( 

210 ssObjects=mpSkySsObjects, 

211 ) 

212 

213 def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius, mpSkyURL): 

214 """Query MPSky ephemeris service for objects near the expected detector position 

215 

216 Parameters 

217 ---------- 

218 expCenter : `lsst.geom.SpherePoint` 

219 Center of search cone 

220 epochMJD : `float` 

221 Epoch of cone search, (MJD in UTC). 

222 queryRadius : `float` 

223 Radius of the cone search in degrees. 

224 mpSkyURL : `str` 

225 URL to query for MPSky. 

226 

227 Returns 

228 ------- 

229 mpSkySsObjects : `astropy.table.Table` 

230 Table with Solar System Object information and RA/DEC position 

231 within the visit. 

232 elements : `astropy.table.Table` or `None` 

233 Orbital elements of ssObjects (to pass to alerts) 

234 """ 

235 fieldRA = expCenter.getRa().asDegrees() 

236 fieldDec = expCenter.getDec().asDegrees() 

237 

238 params = { 

239 "t": epochMJD, 

240 "ra": fieldRA, 

241 "dec": fieldDec, 

242 "radius": queryRadius, 

243 "return_elements": "extended" 

244 } 

245 

246 response = None 

247 try: 

248 response = requests.get(mpSkyURL, params=params, timeout=self.config.mpSkyRequestTimeoutSeconds) 

249 response.raise_for_status() 

250 with pa.input_stream(memoryview(response.content)) as fp: 

251 fp.seek(0) 

252 p = pa.ipc.read_tensor(fp) 

253 op = pa.ipc.read_tensor(fp) 

254 with pa.ipc.open_stream(fp) as reader: 

255 schema = reader.schema 

256 r = next(reader) 

257 

258 # Construct an elements astropy Table 

259 if "q" in schema.names: 

260 if "mpc_orb_jsonb" in schema.names: 

261 colnames = ELEMENTS_LIST_MPC_ORBITS 

262 units_map = ELEMENTS_MPC_ORBITS_UNITS 

263 else: 

264 colnames = ELEMENTS_LIST 

265 units_map = {NAMING_MAP[k]: v for k, v in ELEMENTS_UNITS.items()} 

266 

267 # Columns that are timestamps/datetimes — store as masked strings 

268 DATETIME_COLS = {'created_at', 'fitting_datetime', 'updated_at'} 

269 # Columns that should be stored as plain strings (object dtype in arrow) 

270 STRING_COLS = {'designation', 'id', 'packed_primary_provisional_designation', 

271 'unpacked_primary_provisional_designation', 'mpc_orb_jsonb'} 

272 

273 elements = Table() 

274 for col in colnames: 

275 data = r[col].to_numpy(zero_copy_only=False) 

276 if col in DATETIME_COLS: 

277 # Store as masked empty strings — TODO (DM-53952): handle as datetime 

278 elements[col] = MaskedColumn( 

279 np.full(len(data), "", dtype=str), mask=np.ones(len(data), dtype=bool) 

280 ) 

281 elif col in STRING_COLS: 

282 # Ensure clean string dtype, not object 

283 elements[col] = np.array(data, dtype=str) 

284 else: 

285 unit = units_map.get(col, None) 

286 if unit is None: 

287 elements[col] = MaskedColumn(data) 

288 else: 

289 elements[col] = MaskedColumn(data, unit=unit) 

290 else: 

291 elements = None 

292 

293 object_polynomial = p.to_numpy() 

294 observer_polynomial = op.to_numpy() 

295 

296 n = len(r["name"]) 

297 vmag_data = r["Vmag"].to_numpy() if 'Vmag' in r else np.full(n, np.nan) 

298 

299 mpSkySsObjects = Table({ 

300 'ObjID': r["name"].to_numpy(zero_copy_only=False).astype(str), 

301 'ra': MaskedColumn(r["ra"].to_numpy(), unit=u.deg), 

302 'dec': MaskedColumn(r["dec"].to_numpy(), unit=u.deg), 

303 'tmin': MaskedColumn(r["tmin"].to_numpy(), unit=u.d), 

304 'tmax': MaskedColumn(r["tmax"].to_numpy(), unit=u.d), 

305 'trailedSourceMagTrue': MaskedColumn(vmag_data, unit=u.mag), 

306 'obj_x_poly': [poly[0] for poly in object_polynomial.T], 

307 'obj_y_poly': [poly[1] for poly in object_polynomial.T], 

308 'obj_z_poly': [poly[2] for poly in object_polynomial.T], 

309 'obs_x_poly': [observer_polynomial.T[0] for _ in range(n)], 

310 'obs_y_poly': [observer_polynomial.T[1] for _ in range(n)], 

311 'obs_z_poly': [observer_polynomial.T[2] for _ in range(n)], 

312 'Err(arcsec)': MaskedColumn(np.full(n, 2.0), unit=u.arcsec), 

313 }) 

314 

315 if 'packed_primary_provisional_designation' in elements.colnames: 

316 desigs = elements['packed_primary_provisional_designation'].data 

317 else: 

318 desigs = mpSkySsObjects['ObjID'].data 

319 mpSkySsObjects['ssObjectId'] = [obj_id_to_ss_object_id(v) for v in desigs] 

320 

321 nFound = len(mpSkySsObjects) 

322 

323 if nFound == 0: 

324 self.log.info("No Solar System objects found for visit.") 

325 else: 

326 self.log.info("%d Solar System Objects in visit", nFound) 

327 except requests.RequestException as e: 

328 body = getattr(response, "text", "<No response>") 

329 message = f"Query to the remote ephemerides service failed. Got response {body}" 

330 self.log.error(message) 

331 raise NoWorkFound(f"{message}: {e}") from e 

332 

333 return mpSkySsObjects, elements