Coverage for python / lsst / ap / association / loadDiaCatalogs.py: 35%
83 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 09:05 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 09:05 +0000
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Task for pre-loading DiaObjects, DiaSources, and DiaForcedSources from the
APDB within ap_pipe.
"""
25import pandas as pd
27import lsst.dax.apdb as daxApdb
28import lsst.geom
29import lsst.pex.config as pexConfig
30import lsst.pipe.base as pipeBase
31import lsst.pipe.base.connectionTypes as connTypes
32import lsst.sphgeom
34from lsst.utils.timer import timeMethod, duration_from_timeMethod
36from lsst.ap.association.utils import getMidpointFromTimespan, paddedRegion, readSchemaFromApdb
37from lsst.pipe.tasks.schemaUtils import convertDataFrameToSdmSchema
# Public names of this module; LoadDiaCatalogsConnections is not listed.
__all__ = ("LoadDiaCatalogsTask", "LoadDiaCatalogsConfig")
class LoadDiaCatalogsConnections(pipeBase.PipelineTaskConnections,
                                 dimensions=("instrument", "group", "detector")):
    """Butler connections for `LoadDiaCatalogsTask`.

    One input (the predicted sky region and time of the exposure) and three
    catalog outputs preloaded from the APDB. Every connection uses the
    ``instrument``, ``group`` and ``detector`` dimensions.
    """

    # Input: where and when the exposure is predicted to be observed.
    regionTime = connTypes.Input(
        name="regionTimeInfo",
        storageClass="RegionTimeInfo",
        dimensions=("instrument", "group", "detector"),
        doc="The predicted exposure region and time",
    )

    # Outputs: the three catalogs read back from the APDB.
    diaObjects = connTypes.Output(
        name="preloaded_diaObjects",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "group", "detector"),
        doc="DiaObjects preloaded from the APDB.",
    )
    diaSources = connTypes.Output(
        name="preloaded_diaSources",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "group", "detector"),
        doc="DiaSources preloaded from the APDB.",
    )
    diaForcedSources = connTypes.Output(
        name="preloaded_diaForcedSources",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "group", "detector"),
        doc="DiaForcedSources preloaded from the APDB.",
    )
class LoadDiaCatalogsConfig(pipeBase.PipelineTaskConfig,
                            pipelineConnections=LoadDiaCatalogsConnections):
    """Config class for LoadDiaCatalogsTask.
    """
    apdb_config_url = pexConfig.Field(
        dtype=str,
        default=None,
        optional=False,
        doc="A config file specifying the APDB and its connection parameters, "
            "typically written by the apdb-cli command-line utility. "
            "The database must already be initialized.",
    )

    # Deprecated: the task pads the search region by ``angleMargin``; this
    # field is kept only so existing configs that set it still load.
    pixelMargin = pexConfig.RangeField(
        doc="Padding to add to all 4 edges of the bounding box (pixels)",
        dtype=int,
        default=250,
        min=0,
        # The two literals are concatenated, so the separator must be
        # explicit; without it the message read "`angleMargin`Will ...".
        deprecated="This config has been replaced by `angleMargin`. "
                   "Will be removed after v28.",
    )
    angleMargin = pexConfig.RangeField(
        doc="Padding to add to the radius of the bounding circle (arcseconds)",
        dtype=float,
        default=20,
        min=0,
    )
    doLoadForcedSources = pexConfig.Field(
        dtype=bool,
        default=True,
        deprecated="Added to allow disabling forced sources for performance "
                   "reasons during the ops rehearsal. "
                   "It is expected to be removed.",
        doc="Load forced DiaSource history from the APDB? "
            "This should only be turned off for debugging purposes.",
    )
class LoadDiaCatalogsTask(pipeBase.PipelineTask):
    """Retrieve DiaObjects, and their associated DiaSources and
    DiaForcedSources, from the APDB for the region and time of an exposure.
    """
    ConfigClass = LoadDiaCatalogsConfig
    _DefaultName = "loadDiaCatalogs"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # One APDB handle per task instance, reused by every query in run().
        self.apdb = daxApdb.Apdb.from_uri(self.config.apdb_config_url)

    @timeMethod
    def run(self, regionTime):
        """Preload DiaObjects, DiaSources and (optionally) DiaForcedSources
        from the APDB for the current exposure.

        Parameters
        ----------
        regionTime : `lsst.pipe.base.utils.RegionTimeInfo`
            A serializable container for a sky region and timespan.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Results struct with components.

            - ``diaObjects`` : Complete set of DiaObjects covering the input
              region padded by ``angleMargin``. DataFrame is indexed by
              the ``diaObjectId`` column. (`pandas.DataFrame`)
            - ``diaSources`` : DiaSources covering the padded region, read up
              to the midpoint of the exposure timespan. DataFrame is indexed
              by ``diaObjectId``, ``band``, ``diaSourceId`` columns.
              (`pandas.DataFrame`)
            - ``diaForcedSources`` : Forced photometry at the loaded
              DiaObjects, read up to the midpoint of the exposure timespan.
              How far back in time this reaches is decided by the APDB, not
              by this task. Indexed by ``diaObjectId``,
              ``diaForcedSourceId``; empty when ``doLoadForcedSources`` is
              `False`. (`pandas.DataFrame`)

        Raises
        ------
        RuntimeError
            Raised if the Database query failed to load DiaObjects.
        """
        region = paddedRegion(regionTime.region,
                              lsst.sphgeom.Angle.fromDegrees(self.config.angleMargin/3600.))
        schema = readSchemaFromApdb(self.apdb)

        try:
            # This is the first database query.
            try:
                diaObjects = self.loadDiaObjects(region, schema)
            finally:
                self.metadata["loadDiaObjectsDuration"] = duration_from_timeMethod(
                    self.metadata, "loadDiaObjects", clock="Utc")
                self.log.verbose("DiaObjects: Took %.4f seconds", self.metadata["loadDiaObjectsDuration"])

            # Load diaSources and forced sources up to the time of the exposure
            # The timespan may include significant padding, so use the midpoint to
            # avoid missing valid recent diaSources.
            visitTime = getMidpointFromTimespan(regionTime.timespan)

            try:
                diaSources = self.loadDiaSources(diaObjects, region, visitTime, schema)
            finally:
                self.metadata["loadDiaSourcesDuration"] = duration_from_timeMethod(
                    self.metadata, "loadDiaSources", clock="Utc")
                self.log.verbose("DiaSources: Took %.4f seconds", self.metadata["loadDiaSourcesDuration"])

            if self.config.doLoadForcedSources:
                try:
                    diaForcedSources = self.loadDiaForcedSources(diaObjects, region, visitTime, schema)
                finally:
                    self.metadata["loadDiaForcedSourcesDuration"] = duration_from_timeMethod(
                        self.metadata, "loadDiaForcedSources", clock="Utc")
                    self.log.verbose("DiaForcedSources: Took %.4f seconds",
                                     self.metadata["loadDiaForcedSourcesDuration"])
            else:
                # Produce the same indexed, SDM-converted empty catalog that
                # loadDiaForcedSources returns when there is nothing to load,
                # so downstream code sees one format regardless of config.
                diaForcedSources = pd.DataFrame(columns=["diaObjectId", "diaForcedSourceId"])
                diaForcedSources.set_index(["diaObjectId", "diaForcedSourceId"],
                                           drop=False,
                                           inplace=True)
                diaForcedSources = convertDataFrameToSdmSchema(schema, diaForcedSources,
                                                               tableName="DiaForcedSource",
                                                               skipIndex=True)
                # Sentinel meaning "not run"; clamped to 0 in the total below.
                self.metadata["loadDiaForcedSourcesDuration"] = -1
        finally:
            # Loki can add up the three individual times, but a combined log puts less load on the server.
            self.log.verbose("All catalogs: Took %.4f seconds",
                             self.metadata.get("loadDiaObjectsDuration", 0)
                             + self.metadata.get("loadDiaSourcesDuration", 0)
                             + max(0, self.metadata.get("loadDiaForcedSourcesDuration", 0))
                             )

        return pipeBase.Struct(
            diaObjects=diaObjects,
            diaSources=diaSources,
            diaForcedSources=diaForcedSources)

    @timeMethod
    def loadDiaObjects(self, region, schema):
        """Load the DiaObjects that fall within a sky region.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Region of interest.
        schema : `dict` of `lsst.dax.apdb.apdbSchema.ApdbSchema`
            A dict of the schemas in the apdb.

        Returns
        -------
        diaObjects : `pandas.DataFrame`
            DiaObjects loaded from the Apdb within ``region``, with duplicate
            ``diaObjectId`` rows removed (first occurrence kept). The frame is
            indexed by ``diaObjectId`` (also kept as a column) before being
            converted to the ``DiaObject`` SDM schema.
        """
        diaObjects = self.apdb.getDiaObjects(region)

        diaObjects.set_index("diaObjectId", drop=False, inplace=True)
        if diaObjects.index.has_duplicates:
            self.log.warning(
                "Duplicate DiaObjects loaded from the Apdb. This may cause "
                "downstream pipeline issues. Dropping duplicated rows")
            # Keep the first complete row per diaObjectId, in its original
            # position. groupby(...).first() would instead take the first
            # non-null value of each column separately, splicing fields from
            # different duplicate rows together (and re-sorting the rows).
            diaObjects = diaObjects[~diaObjects.index.duplicated(keep="first")].copy()
        self.log.info("Loaded %i DiaObjects", len(diaObjects))

        return convertDataFrameToSdmSchema(schema, diaObjects, tableName="DiaObject", skipIndex=True)

    @timeMethod
    def loadDiaSources(self, diaObjects, region, dateTime, schema):
        """Load DiaSources from the Apdb based on their diaObjectId or
        location.

        Parameters
        ----------
        diaObjects : `pandas.DataFrame`
            DiaObjects loaded from the Apdb; their ``diaObjectId`` column is
            passed to the query.
        region : `lsst.sphgeom.Region`
            Region of interest.
        dateTime : `astropy.time.Time`
            Time of the current visit.
        schema : `dict` of `lsst.dax.apdb.apdbSchema.ApdbSchema`
            A dict of the schemas in the apdb.

        Returns
        -------
        diaSources : `pandas.DataFrame`
            DiaSources loaded from the Apdb for ``region`` and ``diaObjects``,
            indexed by ``diaObjectId``, ``band``, ``diaSourceId`` (also kept
            as columns) and converted to the ``DiaSource`` SDM schema.
            Duplicate index entries are dropped, keeping the first.
        """
        diaSources = self.apdb.getDiaSources(region, diaObjects.loc[:, "diaObjectId"], dateTime)
        if diaSources is None:
            # NOTE: the Apdb API documents a `None` return when reading of
            # DiaSource history is disabled in the APDB configuration; treat
            # it as "nothing loaded" instead of failing on set_index below.
            diaSources = pd.DataFrame(columns=["diaObjectId", "band", "diaSourceId"])

        diaSources.set_index(["diaObjectId", "band", "diaSourceId"],
                             drop=False,
                             inplace=True)
        if diaSources.index.has_duplicates:
            self.log.warning(
                "Duplicate DiaSources loaded from the Apdb. This may cause "
                "downstream pipeline issues. Dropping duplicated rows")
            # Keep whole first rows; the MultiIndex is preserved as-is, so no
            # reset/re-index is needed.
            diaSources = diaSources[~diaSources.index.duplicated(keep="first")].copy()
        self.log.info("Loaded %i DiaSources", len(diaSources))

        return convertDataFrameToSdmSchema(schema, diaSources, tableName="DiaSource", skipIndex=True)

    @timeMethod
    def loadDiaForcedSources(self, diaObjects, region, dateTime, schema):
        """Load DiaForcedSources for the given DiaObjects from the Apdb.

        Parameters
        ----------
        diaObjects : `pandas.DataFrame`
            DiaObjects loaded from the Apdb; their ``diaObjectId`` column is
            passed to the query. If empty, the Apdb is not queried.
        region : `lsst.sphgeom.Region`
            Region of interest.
        dateTime : `astropy.time.Time`
            Time of the current visit.
        schema : `dict` of `lsst.dax.apdb.apdbSchema.ApdbSchema`
            A dict of the schemas in the apdb.

        Returns
        -------
        diaForcedSources : `pandas.DataFrame`
            DiaForcedSources loaded from the Apdb, indexed by
            ``diaObjectId``, ``diaForcedSourceId`` (also kept as columns) and
            converted to the ``DiaForcedSource`` SDM schema. Duplicate index
            entries are dropped, keeping the first.
        """
        diaForcedSources = None
        if len(diaObjects) > 0:
            diaForcedSources = self.apdb.getDiaForcedSources(
                region,
                diaObjects.loc[:, "diaObjectId"],
                dateTime)
        if diaForcedSources is None:
            # Either there were no DiaObjects to look up, or the Apdb returned
            # None (documented when forced-source history reading is
            # disabled). Return an empty DataFrame with the columns used for
            # indexing later in AssociationTask.
            diaForcedSources = pd.DataFrame(columns=["diaObjectId", "diaForcedSourceId"])

        diaForcedSources.set_index(["diaObjectId", "diaForcedSourceId"],
                                   drop=False,
                                   inplace=True)
        if diaForcedSources.index.has_duplicates:
            self.log.warning(
                "Duplicate DiaForcedSources loaded from the Apdb. This may "
                "cause downstream pipeline issues. Dropping duplicated rows.")
            # Keep whole first rows; the MultiIndex is preserved as-is.
            diaForcedSources = diaForcedSources[
                ~diaForcedSources.index.duplicated(keep="first")].copy()
        # An empty placeholder frame has no "visit" column, hence the guard.
        nVisits = 0 if diaForcedSources.empty else diaForcedSources["visit"].nunique()
        self.log.info("Loaded %i DiaForcedSources from %i visits", len(diaForcedSources), nVisits)

        return convertDataFrameToSdmSchema(schema, diaForcedSources, tableName="DiaForcedSource",
                                           skipIndex=True)