Coverage for python / lsst / ap / association / loadDiaCatalogs.py: 35%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:39 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Task for pre-loading DiaSources and DiaObjects within ap_pipe. 

23""" 

24 

25import pandas as pd 

26 

27import lsst.dax.apdb as daxApdb 

28import lsst.geom 

29import lsst.pex.config as pexConfig 

30import lsst.pipe.base as pipeBase 

31import lsst.pipe.base.connectionTypes as connTypes 

32import lsst.sphgeom 

33 

34from lsst.utils.timer import timeMethod, duration_from_timeMethod 

35 

36from lsst.ap.association.utils import getMidpointFromTimespan, paddedRegion, readSchemaFromApdb 

37from lsst.pipe.tasks.schemaUtils import convertDataFrameToSdmSchema 

38 

39__all__ = ("LoadDiaCatalogsTask", "LoadDiaCatalogsConfig") 

40 

41 

class LoadDiaCatalogsConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("instrument", "group", "detector"),
):
    """Butler connections used when preloading APDB catalogs.

    Declares one input (the predicted region and timespan of the exposure)
    and three outputs (the preloaded DiaObject, DiaSource and
    DiaForcedSource catalogs). Every connection uses the
    ``instrument``, ``group``, ``detector`` dimensions.
    """

    # Input: predicted sky region and timespan of the exposure.
    regionTime = connTypes.Input(
        name="regionTimeInfo",
        storageClass="RegionTimeInfo",
        dimensions=("instrument", "group", "detector"),
        doc="The predicted exposure region and time",
    )

    # Output: DiaObjects read from the APDB.
    diaObjects = connTypes.Output(
        name="preloaded_diaObjects",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "group", "detector"),
        doc="DiaObjects preloaded from the APDB.",
    )

    # Output: DiaSources read from the APDB.
    diaSources = connTypes.Output(
        name="preloaded_diaSources",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "group", "detector"),
        doc="DiaSources preloaded from the APDB.",
    )

    # Output: DiaForcedSources read from the APDB.
    diaForcedSources = connTypes.Output(
        name="preloaded_diaForcedSources",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "group", "detector"),
        doc="DiaForcedSources preloaded from the APDB.",
    )

68 

69 

class LoadDiaCatalogsConfig(pipeBase.PipelineTaskConfig,
                            pipelineConnections=LoadDiaCatalogsConnections):
    """Config class for LoadDiaCatalogsTask.
    """
    # Location of the APDB configuration; required (no usable default).
    apdb_config_url = pexConfig.Field(
        dtype=str,
        default=None,
        optional=False,
        doc="A config file specifying the APDB and its connection parameters, "
            "typically written by the apdb-cli command-line utility. "
            "The database must already be initialized.",
    )

    # Deprecated pixel-based padding, superseded by ``angleMargin``.
    pixelMargin = pexConfig.RangeField(
        doc="Padding to add to all 4 edges of the bounding box (pixels)",
        dtype=int,
        default=250,
        min=0,
        deprecated="This config has been replaced by `angleMargin`. "
                   "Will be removed after v28.",
    )
    # Angular padding, in arcseconds, applied to the region being queried.
    angleMargin = pexConfig.RangeField(
        doc="Padding to add to the radius of the bounding circle (arcseconds)",
        dtype=float,
        default=20,
        min=0,
    )
    # Deprecated switch that allows skipping the forced-source query.
    doLoadForcedSources = pexConfig.Field(
        dtype=bool,
        default=True,
        deprecated="Added to allow disabling forced sources for performance "
                   "reasons during the ops rehearsal. "
                   "It is expected to be removed.",
        doc="Load forced DiaSource history from the APDB? "
            "This should only be turned off for debugging purposes.",
    )

106 

107 

class LoadDiaCatalogsTask(pipeBase.PipelineTask):
    """Retrieve DiaObjects and associated DiaSources from the Apdb given an
    input exposure.
    """
    ConfigClass = LoadDiaCatalogsConfig
    _DefaultName = "loadDiaCatalogs"

    def __init__(self, **kwargs):
        """Construct the task and create the Apdb client from
        ``config.apdb_config_url``.
        """
        super().__init__(**kwargs)
        self.apdb = daxApdb.Apdb.from_uri(self.config.apdb_config_url)

    @timeMethod
    def run(self, regionTime):
        """Preload all DiaObjects and DiaSources from the Apdb given the
        current exposure.

        Parameters
        ----------
        regionTime : `lsst.pipe.base.utils.RegionTimeInfo`
            A serializable container for a sky region and timespan.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Results struct with components.

            - ``diaObjects`` : DiaObjects loaded for the input region padded
              by ``angleMargin``, as returned by `loadDiaObjects`.
            - ``diaSources`` : DiaSources loaded for the same padded region
              and the loaded DiaObjects, as returned by `loadDiaSources`.
            - ``diaForcedSources`` : DiaForcedSources for the loaded
              DiaObjects, as returned by `loadDiaForcedSources`. If
              ``doLoadForcedSources`` is `False` this is an empty
              `pandas.DataFrame` with only the ``diaObjectId`` and
              ``diaForcedSourceId`` columns.

        Notes
        -----
        Exceptions raised by the individual loaders are not caught here and
        propagate to the caller; the ``finally`` clauses only make sure the
        timing bookkeeping and log messages are still attempted.

        The duration of each load is stored in the task metadata under
        ``loadDiaObjectsDuration``, ``loadDiaSourcesDuration`` and
        ``loadDiaForcedSourcesDuration``. The last one is set to ``-1`` when
        forced sources are not loaded.
        """
        # angleMargin is configured in arcseconds; convert to degrees.
        region = paddedRegion(regionTime.region,
                              lsst.sphgeom.Angle.fromDegrees(self.config.angleMargin/3600.))
        schema = readSchemaFromApdb(self.apdb)

        try:
            # This is the first database query.
            try:
                diaObjects = self.loadDiaObjects(region, schema)
            finally:
                self.metadata["loadDiaObjectsDuration"] = duration_from_timeMethod(
                    self.metadata, "loadDiaObjects", clock="Utc")
                self.log.verbose("DiaObjects: Took %.4f seconds", self.metadata["loadDiaObjectsDuration"])

            # Load diaSources and forced sources up to the time of the exposure
            # The timespan may include significant padding, so use the midpoint to
            # avoid missing valid recent diaSources.
            visitTime = getMidpointFromTimespan(regionTime.timespan)

            try:
                diaSources = self.loadDiaSources(diaObjects, region, visitTime, schema)
            finally:
                self.metadata["loadDiaSourcesDuration"] = duration_from_timeMethod(
                    self.metadata, "loadDiaSources", clock="Utc")
                self.log.verbose("DiaSources: Took %.4f seconds", self.metadata["loadDiaSourcesDuration"])

            if self.config.doLoadForcedSources:
                try:
                    diaForcedSources = self.loadDiaForcedSources(diaObjects, region, visitTime, schema)
                finally:
                    self.metadata["loadDiaForcedSourcesDuration"] = duration_from_timeMethod(
                        self.metadata, "loadDiaForcedSources", clock="Utc")
                    self.log.verbose("DiaForcedSources: Took %.4f seconds",
                                     self.metadata["loadDiaForcedSourcesDuration"])
            else:
                diaForcedSources = pd.DataFrame(columns=["diaObjectId", "diaForcedSourceId"])
                # Sentinel marking that the forced-source query was skipped.
                self.metadata["loadDiaForcedSourcesDuration"] = -1
        finally:
            # Loki can add up the three individual times, but a combined log puts less load on the server.
            # max(0, ...) keeps the -1 "skipped" sentinel out of the total.
            self.log.verbose("All catalogs: Took %.4f seconds",
                             self.metadata.get("loadDiaObjectsDuration", 0)
                             + self.metadata.get("loadDiaSourcesDuration", 0)
                             + max(0, self.metadata.get("loadDiaForcedSourcesDuration", 0))
                             )

        return pipeBase.Struct(
            diaObjects=diaObjects,
            diaSources=diaSources,
            diaForcedSources=diaForcedSources)

    @timeMethod
    def loadDiaObjects(self, region, schema):
        """Load DiaObjects from the Apdb that lie within a sky region.

        Parameters
        ----------
        region : `sphgeom.Region`
            Region of interest.
        schema : 'dict' of `lsst.dax.apdb.apdbSchema.ApdbSchema`
            A dict of the schemas in the apdb.

        Returns
        -------
        diaObjects : `pandas.DataFrame`
            DiaObjects returned by the Apdb for ``region``, with at most one
            row per ``diaObjectId``, converted to the ``DiaObject`` table
            schema.
        """
        diaObjects = self.apdb.getDiaObjects(region)

        # Keep diaObjectId as a column as well as the index.
        diaObjects.set_index("diaObjectId", drop=False, inplace=True)
        if diaObjects.index.has_duplicates:
            self.log.warning(
                "Duplicate DiaObjects loaded from the Apdb. This may cause "
                "downstream pipeline issues. Dropping duplicated rows")
            # Keep the first row seen for each diaObjectId. A boolean mask
            # is used instead of ``groupby(...).first()`` because the latter
            # takes the first non-null value per column and so can combine
            # values from different duplicate rows.
            diaObjects = diaObjects[~diaObjects.index.duplicated(keep="first")]
        self.log.info("Loaded %i DiaObjects", len(diaObjects))

        return convertDataFrameToSdmSchema(schema, diaObjects, tableName="DiaObject", skipIndex=True)

    @timeMethod
    def loadDiaSources(self, diaObjects, region, dateTime, schema):
        """Load DiaSources from the Apdb for a region and a set of
        DiaObjects.

        Parameters
        ----------
        diaObjects : `pandas.DataFrame`
            DiaObjects loaded from the Apdb; its ``diaObjectId`` column is
            passed to the Apdb query.
        region : `sphgeom.Region`
            Region of interest.
        dateTime : `astropy.time.Time`
            Time of the current visit
        schema : 'dict' of `lsst.dax.apdb.apdbSchema.ApdbSchema`
            A dict of the schemas in the apdb.

        Returns
        -------
        DiaSources : `pandas.DataFrame`
            DiaSources returned by the Apdb, with at most one row per
            (``diaObjectId``, ``band``, ``diaSourceId``) combination,
            converted to the ``DiaSource`` table schema.
        """
        diaSources = self.apdb.getDiaSources(region, diaObjects.loc[:, "diaObjectId"], dateTime)

        # Keep the key columns as columns as well as index levels.
        diaSources.set_index(["diaObjectId", "band", "diaSourceId"],
                             drop=False,
                             inplace=True)
        if diaSources.index.has_duplicates:
            self.log.warning(
                "Duplicate DiaSources loaded from the Apdb. This may cause "
                "downstream pipeline issues. Dropping duplicated rows")
            # Keep the first row seen for each index entry. Masking keeps
            # the MultiIndex intact, so no reset/re-index step is needed,
            # and never mixes values from different duplicate rows.
            diaSources = diaSources[~diaSources.index.duplicated(keep="first")]
        self.log.info("Loaded %i DiaSources", len(diaSources))

        return convertDataFrameToSdmSchema(schema, diaSources, tableName="DiaSource", skipIndex=True)

    @timeMethod
    def loadDiaForcedSources(self, diaObjects, region, dateTime, schema):
        """Load DiaForcedSources from the Apdb for a set of DiaObjects.

        Parameters
        ----------
        diaObjects : `pandas.DataFrame`
            DiaObjects loaded from the Apdb; its ``diaObjectId`` column is
            passed to the Apdb query.
        region : `sphgeom.Region`
            Region of interest.
        dateTime : `astropy.time.Time`
            Time of the current visit
        schema : 'dict' of `lsst.dax.apdb.apdbSchema.ApdbSchema`
            A dict of the schemas in the apdb.

        Returns
        -------
        diaForcedSources : `pandas.DataFrame`
            DiaForcedSources returned by the Apdb, with at most one row per
            (``diaObjectId``, ``diaForcedSourceId``) pair, converted to the
            ``DiaForcedSource`` table schema. The Apdb is not queried when
            ``diaObjects`` is empty.
        """

        if len(diaObjects) == 0:
            # If no diaObjects are available return an empty DataFrame with
            # the columns used for indexing later in AssociationTask.
            diaForcedSources = pd.DataFrame(columns=["diaObjectId",
                                                     "diaForcedSourceId"])
        else:
            diaForcedSources = self.apdb.getDiaForcedSources(
                region,
                diaObjects.loc[:, "diaObjectId"],
                dateTime)

        # Keep the key columns as columns as well as index levels.
        diaForcedSources.set_index(["diaObjectId", "diaForcedSourceId"],
                                   drop=False,
                                   inplace=True)
        if diaForcedSources.index.has_duplicates:
            self.log.warning(
                "Duplicate DiaForcedSources loaded from the Apdb. This may "
                "cause downstream pipeline issues. Dropping duplicated rows.")
            # Keep the first row seen for each index entry. Masking keeps
            # the MultiIndex intact, so no reset/re-index step is needed,
            # and never mixes values from different duplicate rows.
            diaForcedSources = diaForcedSources[~diaForcedSources.index.duplicated(keep="first")]
        # The empty placeholder frame has no "visit" column, so guard on it.
        nVisits = 0 if diaForcedSources.empty else len(set(diaForcedSources["visit"]))
        self.log.info("Loaded %i DiaForcedSources from %i visits", len(diaForcedSources), nVisits)

        return convertDataFrameToSdmSchema(schema, diaForcedSources, tableName="DiaForcedSource",
                                           skipIndex=True)