Coverage for python / lsst / pipe / tasks / consolidateSsTables.py: 0%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:52 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["ConsolidateSsTablesConfig", "ConsolidateSsTablesTask", "ConsolidateSsTablesConnections"] 

23 

24import astropy.table as tb 

25import astropy.units as u 

26import numpy as np 

27import warnings 

28 

29import lsst.pex.config as pexConfig 

30import lsst.pipe.base as pipeBase 

31import lsst.pipe.base.connectionTypes as cT 

32from lsst.utils.timer import timeMethod 

33from .ssp.ssobject import DIA_COLUMNS, compute_ssobject 

34 

35warnings.filterwarnings("ignore") 

36 

37 

38class ConsolidateSsTablesConnections( 

39 pipeBase.PipelineTaskConnections, 

40 dimensions=("skymap",), 

41 defaultTemplates={"coaddName": "goodSeeing", "fakesType": ""}, 

42): 

43 inputCatalogs = cT.Input( 

44 doc="associated ssSources from all tract-patches.", 

45 name="{fakesType}{coaddName}Diff_assocSsSrcTable", 

46 storageClass="ArrowAstropy", 

47 dimensions=("tract", "patch"), 

48 multiple=True, 

49 ) 

50 mpcorb = cT.Input( 

51 doc="Minor Planet Center orbit table used for association", 

52 name="mpcorb", 

53 storageClass="ArrowAstropy", 

54 dimensions=(), 

55 ) 

56 ssSourceTable = cT.Output( 

57 doc="", 

58 name="{fakesType}{coaddName}Diff_ssSrcTable", 

59 storageClass="ArrowAstropy", 

60 dimensions=(), 

61 ) 

62 ssObjectTable = cT.Output( 

63 doc="", 

64 name="{fakesType}{coaddName}Diff_ssObjTable", 

65 storageClass="ArrowAstropy", 

66 dimensions=(), 

67 ) 

68 

69 

70class ConsolidateSsTablesConfig( 

71 pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateSsTablesConnections 

72): 

73 """Config for ConsolidateSsTablesTask""" 

74 

75 hg12FixedG12 = pexConfig.Field( 

76 dtype=float, 

77 default=None, 

78 optional=True, 

79 doc="If set, fix the G12 slope parameter to this value " 

80 "and only fit H. If None (default), both H and G12 " 

81 "are fit.", 

82 ) 

83 

84 hg12MagSigmaFloor = pexConfig.Field( 

85 dtype=float, 

86 default=0.0, 

87 doc="Systematic magnitude error floor (mag) added in " 

88 "quadrature to measurement errors before HG12 " 

89 "fitting.", 

90 ) 

91 

92 hg12NSigmaClip = pexConfig.Field( 

93 dtype=float, 

94 default=None, 

95 optional=True, 

96 doc="If set, reject outliers beyond this many sigma " 

97 "after an initial robust fit. If None, no clipping.", 

98 ) 

99 

100 

101class ConsolidateSsTablesTask(pipeBase.PipelineTask): 

102 """Consolidate per-patch ssSource tables into a single table. 

103 Create ssObject table 

104 TODO (DM-49451): Fit per-object parameters 

105 TODO (DM-49453): Generate MPCORB table. 

106 """ 

107 

108 ConfigClass = ConsolidateSsTablesConfig 

109 _DefaultName = "consolidateSsTables" 

110 

111 @timeMethod 

112 def run(self, inputCatalogs, mpcorb): 

113 """Concatenate per-patch ssSource tables. 

114 Generate ssObject table. 

115 

116 Parameters 

117 ---------- 

118 inputCatalogs `list` of `astropy.table.Table`: 

119 All per-patch ssSource Tables 

120 

121 Returns 

122 ------- 

123 output : `lsst.pipe.base.Struct` 

124 Results struct with attributes: 

125 

126 ``sSourceTable`` 

127 Table of ssSources. 

128 (`astropy.table.Table`) 

129 ``ssObjectTable`` 

130 Table of ssObjects 

131 (`astropy.table.Table`). 

132 """ 

133 self.log.info("Concatenating %s per-patch ssSource Tables", len(inputCatalogs)) 

134 ssSourceTable = tb.vstack(inputCatalogs) 

135 self.log.info( 

136 f"Done. {len(ssSourceTable)} observations, {np.unique(ssSourceTable['ssObjectId']).size} objects." 

137 ) 

138 

139 # Compatibility for pre RFC-1138 ss_source_associated tables. 

140 # To be removed in DM-53466. 

141 if "heliocentricDist" in ssSourceTable.colnames: 

142 arr = ssSourceTable["ssObjectId"] + 0x20000000_00000000 # leading whitespace 

143 arr_s8 = arr.byteswap().view(arr.dtype.newbyteorder()).view("S8") 

144 ssSourceTable["designation"] = np.char.lstrip(arr_s8) 

145 # Distances → convert km → AU 

146 au_in_km = (1 * u.au).to(u.km).value 

147 ssSourceTable.rename_column("topocentricDist", "topoRange") 

148 ssSourceTable["topoRange"] /= au_in_km 

149 ssSourceTable.rename_column("heliocentricDist", "helioRange") 

150 ssSourceTable["helioRange"] /= au_in_km 

151 

152 ssSourceTable.rename_column("heliocentricX", "helio_x") 

153 ssSourceTable["helio_x"] /= au_in_km 

154 ssSourceTable.rename_column("heliocentricY", "helio_y") 

155 ssSourceTable["helio_y"] /= au_in_km 

156 ssSourceTable.rename_column("heliocentricZ", "helio_z") 

157 ssSourceTable["helio_z"] /= au_in_km 

158 

159 ssSourceTable.rename_column("topocentricX", "topo_x") 

160 ssSourceTable["topo_x"] /= au_in_km 

161 ssSourceTable.rename_column("topocentricY", "topo_y") 

162 ssSourceTable["topo_y"] /= au_in_km 

163 ssSourceTable.rename_column("topocentricZ", "topo_z") 

164 ssSourceTable["topo_z"] /= au_in_km 

165 

166 # Velocities (no unit change, just renaming) 

167 ssSourceTable.rename_column("heliocentricVX", "helio_vx") 

168 ssSourceTable.rename_column("heliocentricVY", "helio_vy") 

169 ssSourceTable.rename_column("heliocentricVZ", "helio_vz") 

170 

171 ssSourceTable.rename_column("topocentricVX", "topo_vx") 

172 ssSourceTable.rename_column("topocentricVY", "topo_vy") 

173 ssSourceTable.rename_column("topocentricVZ", "topo_vz") 

174 

175 # the rest 

176 ssSourceTable.rename_column("residualRa", "ephOffsetRa") 

177 ssSourceTable.rename_column("residualDec", "ephOffsetDec") 

178 ssSourceTable.rename_column("eclipticLambda", "eclLambda") 

179 ssSourceTable.rename_column("eclipticBeta", "eclBeta") 

180 ssSourceTable.rename_column("galacticL", "galLon") 

181 ssSourceTable.rename_column("galacticB", "galLat") 

182 

183 # if we're loading the old-style catalog, we require packed_primary_provisional_designation 

184 # to be in the mpcorb schema (and we'll pretend that it's actually unpacked) 

185 if mpcorb is not None: 

186 mpcorb["unpacked_primary_provisional_designation"] = mpcorb[ 

187 "packed_primary_provisional_designation" 

188 ] 

189 

190 if mpcorb is not None: 

191 self.log.info(f"mpcorb loaded ({len(mpcorb)} objects, {len(mpcorb.columns)} columns)") 

192 else: 

193 self.log.info("mpcorb not loaded.") 

194 

195 # extract the DiaSource subset and remove it from ssSourceTable 

196 diaSource = tb.Table() 

197 for c in DIA_COLUMNS: 

198 src = c if c == "diaSourceId" else f"DIA_{c}" 

199 diaSource[c] = ssSourceTable[src] 

200 if src != "diaSourceId": 

201 del ssSourceTable[src] 

202 

203 # build the SSObject table 

204 self.log.info( 

205 "HG12 fit config: hg12FixedG12=%s, " 

206 "hg12MagSigmaFloor=%s, hg12NSigmaClip=%s", 

207 self.config.hg12FixedG12, 

208 self.config.hg12MagSigmaFloor, 

209 self.config.hg12NSigmaClip, 

210 ) 

211 ssSourceTable.sort("ssObjectId") 

212 mpcorb = mpcorb.to_pandas() if isinstance(mpcorb, tb.Table) else mpcorb 

213 ssObjectTable = compute_ssobject( 

214 ssSourceTable.to_pandas(), diaSource.to_pandas(), mpcorb, 

215 fixedG12=self.config.hg12FixedG12, 

216 magSigmaFloor=self.config.hg12MagSigmaFloor, 

217 nSigmaClip=self.config.hg12NSigmaClip, 

218 ) 

219 ssObjectTable = tb.Table(ssObjectTable) 

220 

221 return pipeBase.Struct( 

222 ssSourceTable=ssSourceTable, 

223 ssObjectTable=ssObjectTable, 

224 )