Coverage for python / lsst / pipe / tasks / mpSkyEphemerisQueryDRP.py: 0%

84 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:53 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Solar System Object Query to MPSky in place of a internal Rubin solar 

23system object caching/retrieval code. 

24 

25Will compute the location for of known SSObjects within a visit-detector combination. 

26""" 

27 

28__all__ = ["MPSkyEphemerisQueryDRPConfig", "MPSkyEphemerisQueryDRPTask"] 

29 

30 

31import os 

32import pandas as pd 

33import numpy as np 

34import pyarrow as pa 

35import requests 

36 

37from lsst.geom import SpherePoint, degrees 

38import lsst.pex.config as pexConfig 

39from lsst.pipe.tasks.associationUtils import obj_id_to_ss_object_id 

40from lsst.sphgeom import ConvexPolygon 

41from lsst.utils.timer import timeMethod 

42 

43from lsst.pipe.base import connectionTypes, NoWorkFound, PipelineTask, \ 

44 PipelineTaskConfig, PipelineTaskConnections, Struct 

45 

46 

47class MPSkyEphemerisQueryDRPConnections(PipelineTaskConnections, 

48 dimensions=("instrument", 

49 "visit", "detector")): 

50 finalVisitSummary = connectionTypes.Input( 

51 doc="Summary of visit information including ra, dec, and time", 

52 name="finalVisitSummary", 

53 storageClass="ExposureCatalog", 

54 dimensions={"instrument", "visit"}, 

55 ) 

56 

57 ssObjects = connectionTypes.Output( 

58 doc="MPSky-provided Solar System objects observable in this detector-visit", 

59 name="preloaded_DRP_SsObjects", 

60 storageClass="DataFrame", 

61 dimensions=("instrument", "visit", "detector"), 

62 ) 

63 

64 

65class MPSkyEphemerisQueryDRPConfig( 

66 PipelineTaskConfig, 

67 pipelineConnections=MPSkyEphemerisQueryDRPConnections): 

68 observerCode = pexConfig.Field( 

69 dtype=str, 

70 doc="IAU Minor Planet Center observer code for queries " 

71 "(default is X05 for Rubin Obs./LSST)", 

72 default='X05' 

73 ) 

74 queryBufferRadiusDegrees = pexConfig.Field( 

75 dtype=float, 

76 doc="Buffer radius in degrees added to detector bounding circle for ephemeris " 

77 "cone search. Defaults to 10 deg/day * 30 minutes", 

78 default=0.208 

79 ) 

80 mpSkyRequestTimeoutSeconds = pexConfig.Field( 

81 dtype=float, 

82 doc="Time in seconds to wait for mpSky request before failing ", 

83 default=1.0 

84 ) 

85 mpSkyFallbackURL = pexConfig.Field( 

86 dtype=str, 

87 doc="mpSky default URL if MP_SKY_URL environment variable unset", 

88 default="http://sdfiana014.sdf.slac.stanford.edu:3666/ephemerides/" 

89 ) 

90 

91 

92class MPSkyEphemerisQueryDRPTask(PipelineTask): 

93 """Task to query the MPSky service and retrieve the solar system objects 

94 that are observable within the input visit. 

95 """ 

96 ConfigClass = MPSkyEphemerisQueryDRPConfig 

97 _DefaultName = "mpSkyEphemerisQueryDRP" 

98 

99 def runQuantum( 

100 self, 

101 butlerQC, 

102 inputRefs, 

103 outputRefs, 

104 ): 

105 """Do butler IO and transform to provide in memory 

106 objects for tasks `~Task.run` method. 

107 

108 Parameters 

109 ---------- 

110 butlerQC : `QuantumContext` 

111 A butler which is specialized to operate in the context of a 

112 `lsst.daf.butler.Quantum`. 

113 inputRefs : `InputQuantizedConnection` 

114 Datastructure whose attribute names are the names that identify 

115 connections defined in corresponding `PipelineTaskConnections` 

116 class. The values of these attributes are the 

117 `lsst.daf.butler.DatasetRef` objects associated with the defined 

118 input/prerequisite connections. 

119 outputRefs : `OutputQuantizedConnection` 

120 Datastructure whose attribute names are the names that identify 

121 connections defined in corresponding `PipelineTaskConnections` 

122 class. The values of these attributes are the 

123 `lsst.daf.butler.DatasetRef` objects associated with the defined 

124 output connections. 

125 """ 

126 inputs = butlerQC.get(inputRefs) 

127 detector = butlerQC.quantum.dataId["detector"] 

128 outputs = self.run(detector, **inputs) 

129 butlerQC.put(outputs, outputRefs) 

130 

131 @timeMethod 

132 def run(self, detector, finalVisitSummary): 

133 """Parse the information on the current visit and retrieve the 

134 observable solar system objects from MPSky. 

135 

136 Parameters 

137 ---------- 

138 finalVisitSummary : `lsst.afw.table.ExposureCatalog` 

139 visitInfo including center and time of exposure 

140 

141 Returns 

142 ------- 

143 result : `lsst.pipe.base.Struct` 

144 Results struct with components: 

145 

146 - ``ssObjects`` : `pandas.DataFrame` 

147 DataFrame containing Solar System Objects near the detector 

148 footprint as retrieved by MPSky. The columns are as follows: 

149 

150 ``Name`` 

151 object name (`str`) 

152 ``ra`` 

153 RA in decimal degrees (`float`) 

154 ``dec`` 

155 DEC in decimal degrees (`float`) 

156 ``obj_X_poly``, ``obj_Y_poly``, ``obj_Z_poly`` 

157 Chebyshev coefficients for object path 

158 ``obs_X_poly``, ``obs_Y_poly``, ``obs_Z_poly`` 

159 Chebyshev coefficients for observer path 

160 ``t_min`` 

161 Lower time bound for polynomials 

162 ``t_max`` 

163 Upper time bound for polynomials 

164 """ 

165 row = finalVisitSummary.find(detector) 

166 visitInfo = row.visitInfo 

167 ra, dec = row['ra'], row['dec'] 

168 corners = np.vstack([row['raCorners'], row['decCorners']]).T 

169 corner_coords = [] 

170 for corner in corners: 

171 corner_coords.append(SpherePoint(corner[0], corner[1], units=degrees).getVector()) 

172 detectorPolygon = ConvexPolygon(corner_coords) 

173 expRadius = detectorPolygon.getBoundingCircle().getOpeningAngle().asDegrees() 

174 expMidPointEPOCH = visitInfo.date.toAstropy().mjd 

175 

176 # MPSky service query 

177 mpSkyURL = os.environ.get('MP_SKY_URL', self.config.mpSkyFallbackURL) 

178 mpSkySsObjects = self._mpSkyConeSearch(ra, dec, expMidPointEPOCH, 

179 expRadius + self.config.queryBufferRadiusDegrees, mpSkyURL) 

180 return Struct( 

181 ssObjects=mpSkySsObjects, 

182 ) 

183 

184 def read_mp_sky_response(self, response): 

185 """Extract ephemerides from an MPSky web request 

186 

187 Parameters 

188 ---------- 

189 response : `requests.Response` 

190 MPSky message 

191 

192 Returns 

193 ------- 

194 objID : `np.ndarray` 

195 Designations of nearby objects 

196 ra : `np.ndarray` 

197 Array of object right ascensions 

198 dec : `np.ndarray` 

199 Array of object declinations 

200 object_polynomial : `np.ndarray`, (N,M) 

201 Array of object cartesian position polynomials 

202 observer_polynomial : `np.ndarray`, (N,M) 

203 Array of observer cartesian position polynomials 

204 t_min : `np.ndarray` 

205 Lower time bound for polynomials 

206 t_max : `np.ndarray` 

207 Upper time bound for polynomials 

208 

209 """ 

210 with pa.input_stream(memoryview(response.content)) as stream: 

211 stream.seek(0) 

212 object_polynomial = pa.ipc.read_tensor(stream).to_numpy() 

213 observer_polynomial = pa.ipc.read_tensor(stream).to_numpy() 

214 with pa.ipc.open_stream(stream) as reader: 

215 columns = next(reader) 

216 objID = columns["name"].to_numpy(zero_copy_only=False) 

217 ra = columns["ra"].to_numpy() 

218 dec = columns["dec"].to_numpy() 

219 t_min = columns["tmin"].to_numpy() 

220 t_max = columns["tmax"].to_numpy() 

221 return objID, ra, dec, object_polynomial, observer_polynomial, t_min, t_max 

222 

223 def _mpSkyConeSearch(self, fieldRA, fieldDec, epochMJD, queryRadius, mpSkyURL): 

224 """Query MPSky ephemeris service for objects near the expected detector position 

225 

226 Parameters 

227 ---------- 

228 expCenter : `lsst.geom.SpherePoint` 

229 Center of search cone 

230 epochMJD : `float` 

231 Epoch of cone search, (MJD in UTC). 

232 queryRadius : `float` 

233 Radius of the cone search in degrees. 

234 mpSkyURL : `str` 

235 URL to query for MPSky. 

236 

237 Returns 

238 ------- 

239 mpSkySsObjects : `pandas.DataFrame` 

240 DataFrame with Solar System Object information and RA/DEC position 

241 within the visit. 

242 """ 

243 params = { 

244 "t": epochMJD, 

245 "ra": fieldRA, 

246 "dec": fieldDec, 

247 "radius": queryRadius 

248 } 

249 

250 try: 

251 response = requests.get(mpSkyURL, params=params, timeout=self.config.mpSkyRequestTimeoutSeconds) 

252 response.raise_for_status() 

253 response = self.read_mp_sky_response(response) 

254 objID, ra, dec, object_polynomial, observer_polynomial, tmin, tmax = response 

255 

256 mpSkySsObjects = pd.DataFrame() 

257 mpSkySsObjects['ObjID'] = objID 

258 mpSkySsObjects['ra'] = ra 

259 mpSkySsObjects['obj_x_poly'] = [poly[0] for poly in object_polynomial.T] 

260 mpSkySsObjects['obj_y_poly'] = [poly[1] for poly in object_polynomial.T] 

261 mpSkySsObjects['obj_z_poly'] = [poly[2] for poly in object_polynomial.T] 

262 mpSkySsObjects['obs_x_poly'] = [observer_polynomial.T[0] for 

263 i in range(len(mpSkySsObjects))] 

264 mpSkySsObjects['obs_y_poly'] = [observer_polynomial.T[1] for 

265 i in range(len(mpSkySsObjects))] 

266 mpSkySsObjects['obs_z_poly'] = [observer_polynomial.T[2] for 

267 i in range(len(mpSkySsObjects))] 

268 mpSkySsObjects['tmin'] = tmin 

269 mpSkySsObjects['tmax'] = tmax 

270 mpSkySsObjects['dec'] = dec 

271 mpSkySsObjects['Err(arcsec)'] = 2 

272 mpSkySsObjects['ssObjectId'] = [obj_id_to_ss_object_id(v) for v in mpSkySsObjects['ObjID'].values] 

273 nFound = len(mpSkySsObjects) 

274 

275 if nFound == 0: 

276 self.log.info("No Solar System objects found for visit.") 

277 else: 

278 self.log.info("%d Solar System Objects in visit", nFound) 

279 except requests.RequestException as e: 

280 raise NoWorkFound(f"Failed to connect to the remote ephemerides service: {e}") from e 

281 

282 return mpSkySsObjects