Coverage for python / lsst / pipe / tasks / generateEphemerides.py: 0%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:11 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Generate per-visit solar system ephemerides using Sorcha. 

23 

24Sorcha is an open-source solar system survey simulation tool, which 

25efficiently generates ephemerides for solar system objects from input orbits 

26and observation pointings. It is currently the fastest robust and maintained 

27ephemeris generator, and can fit our use case well. 

28 

29Sorcha is a command line tool, and is not designed for direct use in python. 

30This task creates a temporary directory in which to run Sorcha as designed, 

31providing it with all required input files and reading the Sorcha-generated 

32ephemerides from a csv. While awkward and un-pipetask-like, it works and takes 

33advantage of Sorcha's as-designed speed. 

34 

35Eventually, this should be replaced with adam_core's ephemeris generation 

36tools which propagate forward orbital uncertainty into on-sky ellipses. 

37Doing so will require re-implementing the healpix binning method described 

38in https://arxiv.org/abs/2506.02140. Doing so will not only improve this code 

39but also allow on-sky uncertainties to be used during association. When this is 

40done, mpsky should be modified to do the same. 

41""" 

42 

43__all__ = ["GenerateEphemeridesConfig", "GenerateEphemeridesTask"] 

44 

45 

46from astropy.table import Table 

47import astropy.units as u 

48import glob 

49import numpy as np 

50from importlib import resources 

51import os 

52import pandas as pd 

53from sqlite3 import connect as sqlite_connect 

54from subprocess import Popen, PIPE 

55from tempfile import TemporaryDirectory 

56from textwrap import dedent 

57from time import sleep 

58 

59from lsst.pex.config import Field 

60from lsst.pipe.base import connectionTypes, NoWorkFound, PipelineTask, \ 

61 PipelineTaskConfig, PipelineTaskConnections, Struct 

62import lsst.pipe.tasks 

63from lsst.utils.timer import timeMethod 

64 

65COL_TO_UNIT = {'RATrue_deg': u.deg, 'DecTrue_deg': u.deg, 'phase_deg': u.deg, 'Range_LTC_km': u.km, 

66 'Obj_Sun_x_LTC_km': u.km, 'Obj_Sun_y_LTC_km': u.km, 'Obj_Sun_z_LTC_km': u.km, 

67 'Obj_Sun_vx_LTC_km_s': u.km/u.s, 'Obj_Sun_vy_LTC_km_s': u.km/u.s, 

68 'Obj_Sun_vz_LTC_km_s': u.km/u.s, 'Obs_Sun_x_km': u.km, 'Obs_Sun_y_km': u.km, 

69 'Obs_Sun_z_km': u.km, 'Obs_Sun_vx_km_s': u.km/u.s, 'Obs_Sun_vy_km_s': u.km/u.s, 

70 'Obs_Sun_vz_km_s': u.km/u.s, 'fieldMJD_TAI': u.d, 'fieldJD_TDB': u.d} 

71 

72 

73class GenerateEphemeridesConnections(PipelineTaskConnections, 

74 dimensions=("instrument",)): 

75 

76 visitTable = connectionTypes.Input( 

77 doc="Table summarizing each visit, including ra, dec, and time", 

78 name="preliminary_visit_table", 

79 storageClass="ArrowAstropy", 

80 dimensions={"instrument"}, 

81 ) 

82 mpcorb = connectionTypes.Input( 

83 doc="Minor Planet Center orbit table used for association", 

84 name="mpcorb", 

85 storageClass="ArrowAstropy", 

86 dimensions={}, 

87 ) 

88 # QG generation checks input visits to decide what output visits to expect. 

89 # This dummy per-visit input helps QG generation decide to only expect output 

90 # visits for visits actually present in the run, which this task gets in the 

91 # not-per-visit visitTable input. See discussion on DM-54062 for more details. 

92 dummyPerVisitInput = connectionTypes.Input( 

93 doc="Dummy per-visit input to help trim QG generation", 

94 name="visit_summary", 

95 storageClass="ExposureCatalog", 

96 dimensions={"instrument", "visit"}, 

97 multiple=True, 

98 deferLoad=True, 

99 ) 

100 

101 # The following 9 prequisite inputs are Sorcha's required auxiliary files. 

102 de440s = connectionTypes.PrerequisiteInput( 

103 doc="NAIF DE440 ephemeris file (de440s.bsp)", 

104 name="de440s", 

105 storageClass="SSPAuxiliaryFile", 

106 dimensions={}, 

107 ) 

108 sb441_n16 = connectionTypes.PrerequisiteInput( 

109 doc="NAIF DE440 ephemeris file (sb441_n16.bsp)", 

110 name="sb441_n16", 

111 storageClass="SSPAuxiliaryFile", 

112 dimensions={}, 

113 ) 

114 obsCodes = connectionTypes.PrerequisiteInput( 

115 doc="MPC observatory code file (ObsCodes.json)", 

116 name="obscodes", 

117 storageClass="SSPAuxiliaryFile", 

118 dimensions={}, 

119 ) 

120 linux_p1550p2650 = connectionTypes.PrerequisiteInput( 

121 doc="TODO (linux_p1550p2650.440)", 

122 name="linux_p1550p2650", 

123 storageClass="SSPAuxiliaryFile", 

124 dimensions={}, 

125 ) 

126 pck00010 = connectionTypes.PrerequisiteInput( 

127 doc="orientation of planets, moons, the Sun, and selected asteroids. (pck00010.pck)", 

128 name="pck00010", 

129 storageClass="SSPAuxiliaryFile", 

130 dimensions={}, 

131 ) 

132 earth_latest_high_prec = connectionTypes.PrerequisiteInput( 

133 doc="High-precision Earth orientation parameters (EOP) kernel", 

134 name="earth_latest_high_prec", 

135 storageClass="SSPAuxiliaryFile", 

136 dimensions={}, 

137 ) 

138 earth_620120_250826 = connectionTypes.PrerequisiteInput( 

139 doc="Historical EOP", 

140 name="earth_620120_250826", 

141 storageClass="SSPAuxiliaryFile", 

142 dimensions={}, 

143 ) 

144 earth_2025_250826_2125_predict = connectionTypes.PrerequisiteInput( 

145 doc="Longterm EOP predictions", 

146 name="earth_2025_250826_2125_predict", 

147 storageClass="SSPAuxiliaryFile", 

148 dimensions={}, 

149 ) 

150 naif0012 = connectionTypes.PrerequisiteInput( 

151 doc="Leapsecond tls file", 

152 name="naif0012", 

153 storageClass="SSPAuxiliaryFile", 

154 dimensions={}, 

155 ) 

156 

157 ssObjects = connectionTypes.Output( 

158 doc="Sorcha-provided Solar System objects observable in this detector-visit", 

159 name="preloaded_ss_object_visit", 

160 storageClass="DataFrame", 

161 dimensions=("instrument", "visit"), 

162 multiple=True, 

163 ) 

164 

165 

166class GenerateEphemeridesConfig( 

167 PipelineTaskConfig, 

168 pipelineConnections=GenerateEphemeridesConnections): 

169 observatoryCode = Field( 

170 dtype=str, 

171 doc="IAU Minor Planet Center observer code for queries " 

172 "(default is X05 for Rubin Obs./LSST)", 

173 default='X05' 

174 ) 

175 observatoryFOVRadius = Field( 

176 dtype=float, 

177 doc="The field of view of the observatory (degrees)", 

178 default=2.06, 

179 ) 

180 loggingSleepTime = Field( 

181 dtype=int, 

182 doc="Time (seconds) to sleep between checking Sorcha's progress", 

183 default=60, 

184 ) 

185 

186 

187class GenerateEphemeridesTask(PipelineTask): 

188 """Task to query the Sorcha service and retrieve the solar system objects 

189 that are observable within the input visit. 

190 """ 

191 ConfigClass = GenerateEphemeridesConfig 

192 _DefaultName = "GenerateEphemerides" 

193 

194 def runQuantum( 

195 self, 

196 butlerQC, 

197 inputRefs, 

198 outputRefs, 

199 ): 

200 """Do butler IO and transform to provide in memory 

201 objects for tasks `~Task.run` method. 

202 

203 Parameters 

204 ---------- 

205 butlerQC : `QuantumContext` 

206 A butler which is specialized to operate in the context of a 

207 `lsst.daf.butler.Quantum`. 

208 inputRefs : `InputQuantizedConnection` 

209 Datastructure whose attribute names are the names that identify 

210 connections defined in corresponding `PipelineTaskConnections` 

211 class. The values of these attributes are the 

212 `lsst.daf.butler.DatasetRef` objects associated with the defined 

213 input/prerequisite connections. 

214 outputRefs : `OutputQuantizedConnection` 

215 Datastructure whose attribute names are the names that identify 

216 connections defined in corresponding `PipelineTaskConnections` 

217 class. The values of these attributes are the 

218 `lsst.daf.butler.DatasetRef` objects associated with the defined 

219 output connections. 

220 """ 

221 inputs = butlerQC.get(inputRefs) 

222 del inputs["dummyPerVisitInput"] 

223 outputs = self.run(**inputs) 

224 for ref in outputRefs.ssObjects: 

225 dataId = ref.dataId 

226 if dataId['visit'] in outputs.ssObjects: 

227 ephemeris_visit = outputs.ssObjects[dataId['visit']] 

228 butlerQC.put(ephemeris_visit, ref) 

229 

230 @timeMethod 

231 def run(self, visitTable, mpcorb, de440s, sb441_n16, obsCodes, linux_p1550p2650, pck00010, 

232 earth_latest_high_prec, earth_620120_250826, earth_2025_250826_2125_predict, naif0012): 

233 """Parse the information on the current visit and retrieve the 

234 observable solar system objects from Sorcha. 

235 

236 Parameters 

237 ---------- 

238 visitSummary : `lsst.afw.table.ExposureCatalog` 

239 Has rows with .getVisitInfo, which give the center and time of exposure 

240 

241 mpcorb, de440s, sb441_n16, obsCodes, linux_p1550p2650, pck00010, 

242 earth_latest_high_prec, earth_620120_250826, earth_2025_250826_2125_predict, 

243 naif0012 : `lsst.pipe.tasks.sspAuxiliaryFile.SSPAuxiliaryFile`s 

244 Minor Planet Center orbit table used for association 

245 

246 Returns 

247 ------- 

248 result : `lsst.pipe.base.Struct` 

249 Results struct with components: 

250 

251 - ``ssObjects`` : `pandas.DataFrame` 

252 DataFrame containing Solar System Objects near the detector 

253 footprint as retrieved by Sorcha. The columns are as follows: 

254 

255 ``Name`` 

256 object name (`str`) 

257 ``ra`` 

258 RA in decimal degrees (`float`) 

259 ``dec`` 

260 DEC in decimal degrees (`float`) 

261 

262 """ 

263 if len(visitTable) == 0: 

264 raise NoWorkFound("No visits input!") 

265 visitTable = visitTable.to_pandas() 

266 ( 

267 visits, epochMJD, fieldRA, fieldDec, visitTime 

268 ) = visitTable[ 

269 ['visitId', 'expMidptMJD', 'ra', 'dec', 'expTime'] 

270 ].values.T 

271 

272 # Confused about seconds/days units here 

273 n = len(epochMJD) 

274 inputVisits = pd.DataFrame({ 

275 "observationMidpointMJD": epochMJD, 

276 "fieldRA": fieldRA, 

277 "fieldDec": fieldDec, 

278 "observationId": visits, 

279 "visitTime": np.ones(n) * visitTime, 

280 "observationStartMJD": epochMJD - (visitTime / 2) / 86400.0, 

281 "visitExposureTime": visitTime, 

282 

283 # The following columns are required by Socha but only used after ephemeris generation 

284 "filter": ["r"] * n, # Let's call V-band "r-band" to play nicely with Sorcha... 

285 "seeingFwhmGeom": [0.01] * n, 

286 "seeingFwhmEff": [0.01] * n, 

287 "fiveSigmaDepth": [9999] * n, 

288 "rotSkyPos": [0] * n, 

289 }) 

290 

291 desig_dict = {un: pack for pack, un in mpcorb[['packed_primary_provisional_designation', 

292 'unpacked_primary_provisional_designation']]} 

293 

294 inputOrbits = Table(mpcorb[['unpacked_primary_provisional_designation', 'q', 'e', 'i', 

295 'node', 'argperi', 'peri_time', 'epoch_mjd']]) 

296 

297 inputOrbits.rename_columns( 

298 ['unpacked_primary_provisional_designation', 'epoch_mjd', 'i', 'argperi', 'peri_time'], 

299 ['ObjID', 'epochMJD_TDB', 'inc', 'argPeri', 't_p_MJD_TDB'] 

300 ) 

301 inputOrbits['FORMAT'] = 'COM' 

302 # Colors exactly like jake's prep_input_colors 

303 inputColors = inputOrbits[["ObjID"]].copy() 

304 inputColors["H_r"] = mpcorb['h'] 

305 inputColors["GS"] = 0.15 # Traditional 

306 

307 eph_str = resources.files(lsst.pipe.tasks).parents[3].joinpath("data/eph.ini").read_text() 

308 eph_str = eph_str.replace("{OBSCODE}", self.config.observatoryCode) 

309 eph_str = eph_str.replace("{FOV}", str(self.config.observatoryFOVRadius)) 

310 

311 with TemporaryDirectory() as tmpdirname: 

312 self.log.info(f'temp dir: {tmpdirname}') 

313 

314 # Observations SQLite 

315 conn = sqlite_connect(f'{tmpdirname}/pointings.db') 

316 inputVisits.to_sql('observations', conn, if_exists='replace', index=False) 

317 conn.close() 

318 

319 with open(f'{tmpdirname}/eph.ini', 'w') as ephFile: 

320 ephFile.write(eph_str) 

321 

322 cache = f'{tmpdirname}/sorcha_cache/' 

323 self.log.info('making cache') 

324 os.mkdir(cache) 

325 for filename, fileref in [ 

326 ('de440s.bsp', de440s), 

327 ('sb441-n16.bsp', sb441_n16), 

328 ('ObsCodes.json', obsCodes), 

329 ('linux_p1550p2650.440', linux_p1550p2650), 

330 ('pck00010.pck', pck00010), 

331 ('earth_latest_high_prec.bpc', earth_latest_high_prec), 

332 ('earth_620120_250826.bpc', earth_620120_250826), 

333 ('earth_2025_250826_2125_predict.bpc', earth_2025_250826_2125_predict), 

334 ('naif0012.tls', naif0012), 

335 ]: 

336 self.log.info(f'writing {filename}') 

337 with open(cache + filename, 'wb') as file: 

338 file.write(fileref.fileContents.read()) 

339 

340 abspath = f'{tmpdirname}/sorcha_cache/' 

341 split = 79 

342 n_iter = int(len(str(abspath)) / split) # Number of splits required 

343 for n in range(n_iter, 0, -1): 

344 abspath = abspath[:split * n] + "+' '" + abspath[split * n:] 

345 

346 meta_kernel_text = dedent(f"""\ 

347 \\begindata 

348 

349 PATH_VALUES = ('{abspath}') 

350 

351 PATH_SYMBOLS = ('A') 

352 

353 KERNELS_TO_LOAD=( 

354 '$A/naif0012.tls', 

355 '$A/earth_620120_250826.bpc', 

356 '$A/earth_2025_250826_2125_predict.bpc', 

357 '$A/pck00010.pck', 

358 '$A/de440s.bsp', 

359 '$A/earth_latest_high_prec.bpc', 

360 ) 

361 

362 \\begintext 

363 """) 

364 with open(f'{tmpdirname}/sorcha_cache/meta_kernel.txt', 'w') as meta_kernel_file: 

365 meta_kernel_file.write(meta_kernel_text) 

366 self.log.info('Sorcha process begun') 

367 

368 # Orbits and colors 

369 inputOrbits.write(f'{tmpdirname}/orbits.csv', format='ascii.csv') 

370 inputColors.write(f'{tmpdirname}/colors.csv', format='ascii.csv') 

371 

372 # FIXME: this is a workaround for breakage in sbpy 0.5.0 caused 

373 # by astropy 7.2.0. DM-53467 once sbpy is updated. 

374 try: 

375 import sbpy.photometry # noqa: F401 

376 sorcha_run = ["sorcha", "run"] 

377 except ImportError: 

378 import textwrap 

379 self.log.info("WARNING: Running sorcha-run patched for sbpy issues.") 

380 sorcha_run = ["python", "-c", textwrap.dedent(""" 

381 import re, sys 

382 print("WARNING: Running sorcha-run patched for sbpy issues.", file=sys.stderr) 

383 

384 import astropy.units.utils 

385 import astropy.units.docgen 

386 astropy.units.utils.generate_unit_summary = \ 

387 astropy.units.docgen.generate_unit_summary 

388 

389 from sorcha_cmdline.run import main 

390 sys.exit(main()) 

391 """)] 

392 proc = Popen( 

393 sorcha_run 

394 + [ 

395 "-c", f"{tmpdirname}/eph.ini", 

396 "-o", f"{tmpdirname}/", 

397 "--ob", f"{tmpdirname}/orbits.csv", 

398 "-p", f"{tmpdirname}/colors.csv", 

399 "--pd", f"{tmpdirname}/pointings.db", 

400 "--ew", f"{tmpdirname}/ephemeris", 

401 "--ar", f"{tmpdirname}/sorcha_cache/", 

402 "-t", "sorcha_output", 

403 ], 

404 stdout=PIPE, 

405 stderr=PIPE, 

406 text=True 

407 ) 

408 # As long as Sorcha is running, print its output or 'still running' messages 

409 outfile = None 

410 n_iter = 0 

411 while proc.poll() is None: 

412 sleep(self.config.loggingSleepTime) 

413 n_iter += 1 

414 if outfile is None: 

415 outfile = open(glob.glob(f'{tmpdirname}/sorcha_output*.log')[0], 'r') 

416 new_text = outfile.read() 

417 if len(new_text) > 0: 

418 self.log.info(new_text) 

419 else: 

420 seconds_elapsed = n_iter * self.config.loggingSleepTime 

421 self.log.info(f"Sorcha is still running ({seconds_elapsed} seconds)") 

422 

423 stdout, stderr = proc.communicate() 

424 self.log.info(f"Sorcha STDOUT:\n {stdout}") 

425 self.log.info(f"Sorcha STDERR:\n {stderr}") 

426 

427 eph_path = f'{tmpdirname}/sorcha_output.csv' 

428 if not os.path.exists(eph_path): 

429 raise FileNotFoundError( 

430 " Sorcha did not create sorcha_output. Check STDOUT/STDERR above. " 

431 f"Directory contents:\n{os.listdir(tmpdirname)}" 

432 ) 

433 

434 # Return Sorcha output directly 

435 ephemeris = Table.read(eph_path, format='ascii.csv') 

436 ephemeris['packed_desig'] = np.array([desig_dict[k] for k in ephemeris['ObjID']]) 

437 for col in COL_TO_UNIT: 

438 ephemeris[col] *= COL_TO_UNIT[col] 

439 

440 perVisitSsObjects = {group['FieldID'][0]: group for group in ephemeris.group_by("FieldID").groups} 

441 for v in visits: 

442 if v not in perVisitSsObjects: 

443 perVisitSsObjects[v] = ephemeris[:0] 

444 return Struct(ssObjects=perVisitSsObjects)