Coverage for python / lsst / ap / association / exportDiaCatalogs.py: 57%

52 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:12 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Load DiaSources and DiaObjects by patch and tract for reassociation. 

23""" 

24 

25 

26import astropy.units 

27 

28from lsst.daf.base import DateTime 

29import lsst.daf.butler as dafButler 

30import lsst.dax.apdb as daxApdb 

31import lsst.geom 

32import lsst.pex.config as pexConfig 

33import lsst.pipe.base as pipeBase 

34import lsst.pipe.base.connectionTypes as connTypes 

35from lsst.pipe.base.utils import RegionTimeInfo 

36from lsst.skymap import BaseSkyMap 

37import lsst.sphgeom 

38 

39from lsst.ap.association.loadDiaCatalogs import LoadDiaCatalogsTask 

40 

41__all__ = ("ExportDiaCatalogsTask", "ExportDiaCatalogsConfig") 

42 

43 

class ExportDiaCatalogsConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("tract", "patch", "skymap")):
    """Butler connections for `ExportDiaCatalogsTask`.

    One quantum is defined per (tract, patch, skymap). The inputs are the
    skymap and the template coadds for the patch; the outputs are the
    DiaObject, DiaSource and (optionally) DiaForcedSource tables.

    Parameters
    ----------
    config : `ExportDiaCatalogsConfig`
        Task configuration. When ``config.doLoadForcedSources`` is false the
        ``diaForcedSources`` output connection is removed.
    """

    skyMap = connTypes.Input(
        doc="Geometry of the tracts and patches that the coadds are defined on.",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        dimensions=("skymap",),
        storageClass="SkyMap",
    )
    coaddExposures = connTypes.Input(
        # Adjacent literals are concatenated verbatim, so each fragment must
        # carry its own trailing space.
        doc="Coadds for the current patch and tract. There may be multiple "
            "bands but we will only use the dataId of the first that is found. "
            "Used to constrain the quantum graph and only attempt to load data "
            "from the APDB for patches with templates.",
        dimensions=("tract", "patch", "skymap", "band"),
        storageClass="ExposureF",
        name="template_coadd",
        multiple=True,
        deferLoad=True,
    )
    diaObjects = connTypes.Output(
        doc="DiaObjects preloaded from the APDB.",
        name="apdb_export_diaObjects",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch"),
    )
    diaSources = connTypes.Output(
        doc="DiaSources preloaded from the APDB.",
        name="apdb_export_diaSources",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch"),
    )
    diaForcedSources = connTypes.Output(
        doc="DiaForcedSources preloaded from the APDB.",
        name="apdb_export_diaForcedSources",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch"),
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        # Drop the forced-source output when the config disables loading it.
        if not config.doLoadForcedSources:
            del self.diaForcedSources

87 

88 

class ExportDiaCatalogsConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=ExportDiaCatalogsConnections):
    """Configuration for `ExportDiaCatalogsTask`.
    """

    # No usable default: the field is non-optional and defaults to None, so
    # it has to be set explicitly.
    apdb_config_url = pexConfig.Field(
        doc="A config file specifying the APDB and its connection parameters, "
            "typically written by the apdb-cli command-line utility. "
            "The database must already be initialized.",
        dtype=str,
        optional=False,
        default=None,
    )

    # Non-negative padding, in arcseconds per the field's own doc string.
    angleMargin = pexConfig.RangeField(
        dtype=float,
        min=0,
        default=2,
        doc="Padding to add to the radius of the bounding circle (arcseconds)",
    )

    # Deprecated switch; it also decides whether the diaForcedSources output
    # connection is kept.
    doLoadForcedSources = pexConfig.Field(
        doc="Load forced DiaSource history from the APDB? "
            "This should only be turned off for debugging purposes.",
        dtype=bool,
        default=True,
        deprecated="Added to allow disabling forced sources for performance "
                   "reasons during the ops rehearsal. "
                   "It is expected to be removed.",
    )

116 

117 

class ExportDiaCatalogsTask(LoadDiaCatalogsTask):
    """Retrieve DiaObjects and associated DiaSources from the Apdb for a
    single patch of a tract.
    """
    ConfigClass = ExportDiaCatalogsConfig
    _DefaultName = "exportDiaCatalogs"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # NOTE(review): the parent initializer may already open an Apdb
        # handle; it is (re)assigned here from this task's config.
        self.apdb = daxApdb.Apdb.from_uri(self.config.apdb_config_url)
        # NOTE(review): sets an attribute on the Apdb instance; confirm that
        # this really controls how far back sources are read.
        self.apdb.read_sources_months = 12

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        loaded = butlerQC.get(inputRefs)

        # Only the tract/patch ids are needed, so the data ID of the first
        # coadd handle is enough; the coadd itself is never read here.
        patchDataId = loaded["coaddExposures"][0].dataId
        regionTime = self._makeRegionTime(
            loaded["skyMap"],
            patchDataId["tract"],
            patchDataId["patch"],
        )

        butlerQC.put(self.run(regionTime), outputRefs)

    @staticmethod
    def _makeRegionTime(skymap, tract, patch):
        """Build the sky region and timespan used to query the APDB.

        The region is the convex polygon spanned by the corners of the
        patch's inner bounding box; the timespan starts at the current time
        and lasts 30 seconds.

        Parameters
        ----------
        skymap : `lsst.skymap.SkyMap`
            Geometry of the tracts and patches the coadds are defined on.
        tract : `int`
            Tract id number.
        patch : `int`
            Patch id number.

        Returns
        -------
        regionTime : `lsst.pipe.base.utils.RegionTimeInfo`
            A serializable container for a sky region and timespan.
        """
        tractInfo = skymap[tract]
        tractWcs = tractInfo.getWcs()
        innerBox = lsst.geom.Box2D(tractInfo[patch].getInnerBBox())
        vertices = [corner.getVector()
                    for corner in tractWcs.pixelToSky(innerBox.getCorners())]

        start = DateTime.now().toAstropy()
        window = dafButler.Timespan(begin=start,
                                    end=start + 30*astropy.units.second)
        return RegionTimeInfo(region=lsst.sphgeom.ConvexPolygon(vertices),
                              timespan=window)