Coverage for python / lsst / ap / association / exportDiaCatalogs.py: 57%
52 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:06 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:06 +0000
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Load DiaSources and DiaObjects by patch and tract for reassociation.
23"""
26import astropy.units
28from lsst.daf.base import DateTime
29import lsst.daf.butler as dafButler
30import lsst.dax.apdb as daxApdb
31import lsst.geom
32import lsst.pex.config as pexConfig
33import lsst.pipe.base as pipeBase
34import lsst.pipe.base.connectionTypes as connTypes
35from lsst.pipe.base.utils import RegionTimeInfo
36from lsst.skymap import BaseSkyMap
37import lsst.sphgeom
39from lsst.ap.association.loadDiaCatalogs import LoadDiaCatalogsTask
41__all__ = ("ExportDiaCatalogsTask", "ExportDiaCatalogsConfig")
class ExportDiaCatalogsConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("tract", "patch", "skymap")):
    """Butler connections for exporting DIA catalogs from the APDB.

    Quanta are defined per ``("tract", "patch", "skymap")``. The inputs are
    the skymap and the template coadds of the patch; the outputs are the
    DiaObject, DiaSource and (optionally) DiaForcedSource catalogs.

    Parameters
    ----------
    config : `ExportDiaCatalogsConfig`
        Task configuration. Although the signature defaults it to `None`,
        a config is required here because its ``doLoadForcedSources`` field
        is read unconditionally.
    """

    skyMap = connTypes.Input(
        doc="Geometry of the tracts and patches that the coadds are defined on.",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        dimensions=("skymap",),
        storageClass="SkyMap",
    )
    # Loaded lazily (deferLoad): this file only reads the handles' dataId.
    coaddExposures = connTypes.Input(
        doc="Coadds for the current patch and tract. There may be multiple "
            "bands but we will only use the dataId of the first that is found. "
            "Used to constrain the quantum graph and only attempt to load data "
            "from the APDB for patches with templates.",
        dimensions=("tract", "patch", "skymap", "band"),
        storageClass="ExposureF",
        name="template_coadd",
        multiple=True,
        deferLoad=True,
    )
    diaObjects = connTypes.Output(
        doc="DiaObjects preloaded from the APDB.",
        name="apdb_export_diaObjects",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch"),
    )
    diaSources = connTypes.Output(
        doc="DiaSources preloaded from the APDB.",
        name="apdb_export_diaSources",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch"),
    )
    diaForcedSources = connTypes.Output(
        doc="DiaForcedSources preloaded from the APDB.",
        name="apdb_export_diaForcedSources",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch"),
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        # Drop the forced-source output entirely when the config disables
        # loading it, so no such dataset is expected from the task.
        if not config.doLoadForcedSources:
            del self.diaForcedSources
class ExportDiaCatalogsConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=ExportDiaCatalogsConnections):
    """Configuration for ExportDiaCatalogsTask.
    """

    # Location of the APDB configuration; mandatory (no usable default).
    apdb_config_url = pexConfig.Field(
        doc="A config file specifying the APDB and its connection parameters, "
            "typically written by the apdb-cli command-line utility. "
            "The database must already be initialized.",
        dtype=str,
        default=None,
        optional=False,
    )
    # Non-negative padding in arcseconds. It is not read anywhere in this
    # file; presumably consumed by inherited loading code -- TODO confirm.
    angleMargin = pexConfig.RangeField(
        doc="Padding to add to the radius of the bounding circle (arcseconds)",
        dtype=float,
        default=2,
        min=0,
    )
    # Also controls whether the diaForcedSources output connection exists.
    doLoadForcedSources = pexConfig.Field(
        doc="Load forced DiaSource history from the APDB? "
            "This should only be turned off for debugging purposes.",
        dtype=bool,
        default=True,
        deprecated="Added to allow disabling forced sources for performance "
                   "reasons during the ops rehearsal. "
                   "It is expected to be removed.",
    )
class ExportDiaCatalogsTask(LoadDiaCatalogsTask):
    """Export DIA catalogs from the APDB for a single patch of a tract.

    The sky region is the inner bounding box of the patch, taken from the
    skymap; the catalogs themselves are loaded by the inherited ``run``
    method.
    """

    ConfigClass = ExportDiaCatalogsConfig
    _DefaultName = "exportDiaCatalogs"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # NOTE(review): the parent class may already create ``self.apdb``;
        # that is not visible from this file.
        apdbUri = self.config.apdb_config_url
        self.apdb = daxApdb.Apdb.from_uri(apdbUri)
        # NOTE(review): sets an attribute on the Apdb instance after it is
        # built; confirm that the Apdb implementation actually honors it.
        self.apdb.read_sources_months = 12

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Build the region/time for this quantum, run, and store outputs.

        Parameters
        ----------
        butlerQC
            Quantum context used to fetch the inputs and store the outputs.
        inputRefs
            References to the input datasets of this quantum.
        outputRefs
            References to the output datasets of this quantum.
        """
        quantumInputs = butlerQC.get(inputRefs)

        skyMap = quantumInputs.pop("skyMap")
        templateHandles = quantumInputs.pop("coaddExposures")
        # Only the data ID of the first template is used; the coadd pixels
        # themselves are never read here.
        firstDataId = templateHandles[0].dataId
        tractId = firstDataId["tract"]
        patchId = firstDataId["patch"]

        regionTime = self._makeRegionTime(skyMap, tractId, patchId)
        results = self.run(regionTime)
        butlerQC.put(results, outputRefs)

    @staticmethod
    def _makeRegionTime(skymap, tract, patch):
        """Build the sky region and timespan used to query the APDB.

        Parameters
        ----------
        skymap : `lsst.skymap.SkyMap`
            Geometry of the tracts and patches the coadds are defined on.
        tract : `int`
            Tract id number.
        patch : `int`
            Patch id number.

        Returns
        -------
        regionTime : `lsst.pipe.base.utils.RegionTimeInfo`
            Container holding a convex polygon built from the corners of
            the patch inner bounding box, and a 30-second timespan that
            begins at the current time.
        """
        tractInfo = skymap[tract]
        tractWcs = tractInfo.getWcs()
        innerBBox = tractInfo[patch].getInnerBBox()

        # Convert the pixel corners of the inner box to sky positions, then
        # to the unit vectors that the polygon constructor takes.
        pixelCorners = lsst.geom.Box2D(innerBBox).getCorners()
        skyCorners = tractWcs.pixelToSky(pixelCorners)
        vertices = []
        for corner in skyCorners:
            vertices.append(corner.getVector())
        region = lsst.sphgeom.ConvexPolygon(vertices)

        start = DateTime.now().toAstropy()
        stop = start + 30*astropy.units.second
        window = dafButler.Timespan(begin=start, end=stop)
        return RegionTimeInfo(region=region, timespan=window)