Coverage for python / lsst / pipe / tasks / match_tract_catalog.py: 0%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:17 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

# Names exported by ``from <this module> import *``.
__all__ = [
    "MatchTractCatalogSubConfig",
    "MatchTractCatalogSubTask",
    "MatchTractCatalogConfig",
    "MatchTractCatalogTask",
]

26 

27import lsst.afw.geom as afwGeom 

28import lsst.pex.config as pexConfig 

29import lsst.pipe.base as pipeBase 

30import lsst.pipe.base.connectionTypes as cT 

31from lsst.skymap import BaseSkyMap 

32 

33from abc import ABC, abstractmethod 

34 

35import astropy.table 

36import pandas as pd 

37from typing import Tuple, Set 

38 

39 

# Default connection-name templates for MatchTractCatalogConnections: by
# default the reference catalog is "truth_summary" and the target catalog is
# "objectTable_tract"; the output dataset names are built from both.
MatchTractCatalogBaseTemplates = dict(
    name_input_cat_ref="truth_summary",
    name_input_cat_target="objectTable_tract",
)

44 

45 

class MatchTractCatalogConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("tract", "skymap"),
    defaultTemplates=MatchTractCatalogBaseTemplates,
):
    """Butler connections for MatchTractCatalogTask.

    Both input catalogs are declared per tract. Setting the config field
    ``refcat_sharding_type`` (or ``target_sharding_type``) to ``"none"``
    replaces the corresponding input with a dimensionless connection of the
    same name, storage class and load mode.
    """

    cat_ref = cT.Input(
        doc="Reference object catalog to match from",
        name="{name_input_cat_ref}",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
        deferLoad=True,
    )
    cat_target = cT.Input(
        doc="Target object catalog to match",
        name="{name_input_cat_target}",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
        deferLoad=True,
    )
    skymap = cT.Input(
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
    cat_output_ref = cT.Output(
        doc="Reference matched catalog with indices of target matches",
        name="match_ref_{name_input_cat_ref}_{name_input_cat_target}",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )
    cat_output_target = cT.Output(
        doc="Target matched catalog with indices of reference matches",
        name="match_target_{name_input_cat_ref}_{name_input_cat_target}",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )

    def __init__(self, *, config=None):
        # Pair each input catalog connection with the config field selecting
        # its sharding. The reference catalog is checked first, so an
        # unsupported reference value is reported before the target's.
        for connection_name, field_name in (
            ("cat_ref", "refcat_sharding_type"),
            ("cat_target", "target_sharding_type"),
        ):
            sharding = getattr(config, field_name)
            if sharding == "tract":
                # The class-level declaration is already tract-sharded.
                continue
            if sharding != "none":
                raise NotImplementedError(f"config.{field_name}={sharding!r} not implemented")
            # Swap in an otherwise identical input that has no dimensions.
            template = getattr(self, connection_name)
            setattr(
                self,
                connection_name,
                cT.Input(
                    doc=template.doc,
                    name=template.name,
                    storageClass=template.storageClass,
                    dimensions=(),
                    deferLoad=template.deferLoad,
                ),
            )

109 

110 

class MatchTractCatalogSubConfig(pexConfig.Config):
    """Base config for MatchTractCatalogSubTask implementations.

    Subclasses provide ``columns_in_ref`` and ``columns_in_target``, whose
    values may depend on several config settings.

    Notes
    -----
    Unlike `MatchTractCatalogSubTask`, this class does not derive from
    `abc.ABC`, so ``abstractmethod`` may not block instantiation. Reading a
    property that a subclass has not overridden raises `NotImplementedError`.
    """

    @property
    @abstractmethod
    def columns_in_ref(self) -> Set[str]:
        """Names of the reference catalog columns required for matching."""
        raise NotImplementedError

    @property
    @abstractmethod
    def columns_in_target(self) -> Set[str]:
        """Names of the target catalog columns required for matching."""
        raise NotImplementedError

124 

125 

class MatchTractCatalogSubTask(pipeBase.Task, ABC):
    """An abstract interface for subtasks of MatchTractCatalogTask to match
    two tract object catalogs.

    Parameters
    ----------
    **kwargs
        Additional arguments to be passed to the `lsst.pipe.base.Task`
        constructor.
    """
    ConfigClass = MatchTractCatalogSubConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @abstractmethod
    def run(
        self,
        catalog_ref: pd.DataFrame | astropy.table.Table,
        catalog_target: pd.DataFrame | astropy.table.Table,
        wcs: afwGeom.SkyWcs | None = None,
    ) -> pipeBase.Struct:
        """Match sources in a reference tract catalog with a target catalog.

        Parameters
        ----------
        catalog_ref : `pandas.DataFrame` | `astropy.table.Table`
            A reference catalog to match objects/sources from.
        catalog_target : `pandas.DataFrame` | `astropy.table.Table`
            A target catalog to match reference objects/sources to.
        wcs : `lsst.afw.geom.SkyWcs`, optional
            A coordinate system to convert catalog positions to sky
            coordinates, if needed by the implementation.

        Returns
        -------
        retStruct : `lsst.pipe.base.Struct`
            A struct with attributes:

            ``cat_output_ref``
                The output matched reference catalog.
            ``cat_output_target``
                The output matched target catalog.
            ``exceptions``
                Information about any exceptions encountered during matching.
                `MatchTractCatalogTask.run` logs it as a warning when it is
                truthy, so implementations should always set it (e.g. to an
                empty container).
        """
        raise NotImplementedError()

166 

167 

class MatchTractCatalogConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=MatchTractCatalogConnections,
):
    """Configure a MatchTractCatalogTask, including a configurable matching subtask.

    ``refcat_sharding_type`` and ``target_sharding_type`` select whether each
    input catalog is read per tract (``"tract"``) or as a single catalog with
    no dimensions (``"none"``).
    """
    match_tract_catalog = pexConfig.ConfigurableField(
        target=MatchTractCatalogSubTask,
        doc="Task to match sources in a reference tract catalog with a target catalog",
    )
    refcat_sharding_type = pexConfig.ChoiceField[str](
        doc="The type of sharding (spatial splitting) for the reference catalog",
        allowed={"tract": "Tract-based shards", "none": "No sharding at all"},
        default="tract",
    )
    target_sharding_type = pexConfig.ChoiceField[str](
        doc="The type of sharding (spatial splitting) for the target catalog",
        allowed={"tract": "Tract-based shards", "none": "No sharding at all"},
        default="tract",
    )

    def get_columns_in(self) -> Tuple[Set[str], Set[str]]:
        """Get the set of input columns required for matching.

        Returns
        -------
        columns_ref : `set` [`str`]
            The set of required reference catalog column names.
        columns_target : `set` [`str`]
            The set of required target catalog column names.

        Raises
        ------
        RuntimeError
            Raised if the ``match_tract_catalog`` subtask config does not
            provide ``columns_in_ref`` and ``columns_in_target``.
        """
        try:
            subconfig = self.match_tract_catalog
            columns_ref = subconfig.columns_in_ref
            columns_target = subconfig.columns_in_target
        except AttributeError as err:
            # Name the actual (possibly derived) config class rather than the
            # class that defines this method; the original error text is
            # embedded, so the chained traceback is suppressed.
            raise RuntimeError(
                f"{type(self).__name__}.match_tract_catalog must have columns_in_ref and"
                f" columns_in_target attributes: {err}"
            ) from None
        return set(columns_ref), set(columns_target)

206 

207 

class MatchTractCatalogTask(pipeBase.PipelineTask):
    """Match sources in a reference tract catalog with those in a target catalog.

    The matching itself is delegated to the configurable
    ``match_tract_catalog`` subtask.
    """
    ConfigClass = MatchTractCatalogConfig
    _DefaultName = "MatchTractCatalog"

    def __init__(self, initInputs=None, **kwargs):
        super().__init__(initInputs=initInputs, **kwargs)
        self.makeSubtask("match_tract_catalog")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Read the inputs for one quantum, match them, and write the outputs."""
        inputs = butlerQC.get(inputRefs)
        # Only the columns the matching subtask declares are read.
        columns_ref, columns_target = self.config.get_columns_in()
        skymap = inputs.pop("skymap")

        outputs = self.run(
            # The catalog inputs are deferred handles, loaded here with a
            # column selection.
            catalog_ref=inputs['cat_ref'].get(parameters={'columns': columns_ref}),
            catalog_target=inputs['cat_target'].get(parameters={'columns': columns_target}),
            # The task's quanta are per tract even when an input is unsharded.
            wcs=skymap[butlerQC.quantum.dataId["tract"]].wcs,
        )
        butlerQC.put(outputs, outputRefs)

    def run(
        self,
        catalog_ref: pd.DataFrame | astropy.table.Table,
        catalog_target: pd.DataFrame | astropy.table.Table,
        wcs: afwGeom.SkyWcs | None = None,
    ) -> pipeBase.Struct:
        """Match sources in a reference tract catalog with a target catalog.

        Parameters
        ----------
        catalog_ref : `pandas.DataFrame` | `astropy.table.Table`
            A reference catalog to match objects/sources from.
        catalog_target : `pandas.DataFrame` | `astropy.table.Table`
            A target catalog to match reference objects/sources to.
        wcs : `lsst.afw.geom.SkyWcs`, optional
            A coordinate system to convert catalog positions to sky coordinates,
            if necessary.

        Returns
        -------
        retStruct : `lsst.pipe.base.Struct`
            A struct with ``cat_output_ref`` and ``cat_output_target``
            attributes containing the output matched catalogs.
        """
        output = self.match_tract_catalog.run(catalog_ref, catalog_target, wcs=wcs)
        # The subtask interface does not strictly guarantee an ``exceptions``
        # attribute, so tolerate its absence instead of failing after a
        # successful match.
        exceptions = getattr(output, "exceptions", None)
        if exceptions:
            self.log.warning('Exceptions: %s', exceptions)
        return pipeBase.Struct(
            cat_output_ref=output.cat_output_ref,
            cat_output_target=output.cat_output_target,
        )