Coverage for python / lsst / pipe / tasks / consolidateSsTables.py: 0%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 09:04 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["ConsolidateSsTablesConfig", "ConsolidateSsTablesTask", "ConsolidateSsTablesConnections"] 

23 

24import astropy.table as tb 

25import astropy.units as u 

26import numpy as np 

27import warnings 

28 

29import lsst.pipe.base as pipeBase 

30import lsst.pipe.base.connectionTypes as cT 

31from lsst.utils.timer import timeMethod 

32from .ssp.ssobject import DIA_COLUMNS, compute_ssobject 

33 

# Suppress ALL warnings module-wide.
# NOTE(review): this blanket filter also silences warnings from every other
# module in the process once this file is imported — consider narrowing it to
# the specific categories/messages this task triggers. TODO confirm intent.
warnings.filterwarnings("ignore")

35 

36 

class ConsolidateSsTablesConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("skymap",),
    defaultTemplates={"coaddName": "goodSeeing", "fakesType": ""},
):
    """Butler connections for `ConsolidateSsTablesTask`.

    Inputs are the per-patch associated ssSource tables and the MPC orbit
    table; outputs are the consolidated ssSource table and the derived
    ssObject table, both with no dimensions (one per skymap run).
    """

    inputCatalogs = cT.Input(
        doc="associated ssSources from all tract-patches.",
        name="{fakesType}{coaddName}Diff_assocSsSrcTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch"),
        multiple=True,
    )
    mpcorb = cT.Input(
        doc="Minor Planet Center orbit table used for association",
        name="mpcorb",
        storageClass="ArrowAstropy",
        dimensions=(),
    )
    ssSourceTable = cT.Output(
        # Was doc="": an empty doc string leaves the output undocumented in
        # pipeline introspection tools.
        doc="Single consolidated table of ssSources from all tract-patches.",
        name="{fakesType}{coaddName}Diff_ssSrcTable",
        storageClass="ArrowAstropy",
        dimensions=(),
    )
    ssObjectTable = cT.Output(
        # Was doc="": see note on ssSourceTable above.
        doc="Table of per-object (ssObject) quantities derived from the "
        "consolidated ssSource table.",
        name="{fakesType}{coaddName}Diff_ssObjTable",
        storageClass="ArrowAstropy",
        dimensions=(),
    )

67 

68 

class ConsolidateSsTablesConfig(
    pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateSsTablesConnections
):
    """Config for ConsolidateSsTablesTask.

    Defines no task-specific options; it exists to bind
    `ConsolidateSsTablesConnections` to the task.
    """

73 

74 

class ConsolidateSsTablesTask(pipeBase.PipelineTask):
    """Consolidate per-patch ssSource tables into a single table.
    Create ssObject table
    TODO (DM-49451): Fit per-object parameters
    TODO (DM-49453): Generate MPCORB table.
    """

    ConfigClass = ConsolidateSsTablesConfig
    _DefaultName = "consolidateSsTables"

    # Old-style (pre RFC-1138) column name -> new name, for columns whose
    # values must also be converted from km to AU.
    _KM_TO_AU_RENAMES = (
        ("topocentricDist", "topoRange"),
        ("heliocentricDist", "helioRange"),
        ("heliocentricX", "helio_x"),
        ("heliocentricY", "helio_y"),
        ("heliocentricZ", "helio_z"),
        ("topocentricX", "topo_x"),
        ("topocentricY", "topo_y"),
        ("topocentricZ", "topo_z"),
    )
    # Old-style column name -> new name, pure renames (no unit change).
    _PLAIN_RENAMES = (
        ("heliocentricVX", "helio_vx"),
        ("heliocentricVY", "helio_vy"),
        ("heliocentricVZ", "helio_vz"),
        ("topocentricVX", "topo_vx"),
        ("topocentricVY", "topo_vy"),
        ("topocentricVZ", "topo_vz"),
        ("residualRa", "ephOffsetRa"),
        ("residualDec", "ephOffsetDec"),
        ("eclipticLambda", "eclLambda"),
        ("eclipticBeta", "eclBeta"),
        ("galacticL", "galLon"),
        ("galacticB", "galLat"),
    )

    @timeMethod
    def run(self, inputCatalogs, mpcorb):
        """Concatenate per-patch ssSource tables.
        Generate ssObject table.

        Parameters
        ----------
        inputCatalogs : `list` of `astropy.table.Table`
            All per-patch ssSource Tables.
        mpcorb : `astropy.table.Table` or `None`
            Minor Planet Center orbit table used for association, or
            `None` when not available.

        Returns
        -------
        output : `lsst.pipe.base.Struct`
            Results struct with attributes:

            ``ssSourceTable``
                Table of ssSources.
                (`astropy.table.Table`)
            ``ssObjectTable``
                Table of ssObjects
                (`astropy.table.Table`).
        """
        self.log.info("Concatenating %s per-patch ssSource Tables", len(inputCatalogs))
        ssSourceTable = tb.vstack(inputCatalogs)
        # Lazy %-args for consistency with the log call above (the original
        # used an eager f-string here).
        self.log.info(
            "Done. %s observations, %s objects.",
            len(ssSourceTable),
            np.unique(ssSourceTable["ssObjectId"]).size,
        )

        # Compatibility for pre RFC-1138 ss_source_associated tables.
        # To be removed in DM-53466.
        if "heliocentricDist" in ssSourceTable.colnames:
            # Synthesize the designation string from the int64 ssObjectId:
            # adding 0x20 in the top byte turns a leading NUL into an ASCII
            # space ("leading whitespace"), the byteswap/view reinterprets the
            # integer bytes as 8-char byte strings (assumes a little-endian
            # host — TODO confirm), and lstrip drops the padding space.
            arr = ssSourceTable["ssObjectId"] + 0x20000000_00000000  # leading whitespace
            arr_s8 = arr.byteswap().view(arr.dtype.newbyteorder()).view("S8")
            ssSourceTable["designation"] = np.char.lstrip(arr_s8)

            # Distances → convert km → AU (table-driven to avoid twelve
            # near-identical rename/divide statement pairs; order preserved).
            au_in_km = (1 * u.au).to(u.km).value
            for old, new in self._KM_TO_AU_RENAMES:
                ssSourceTable.rename_column(old, new)
                ssSourceTable[new] /= au_in_km

            # Velocities and the rest (no unit change, just renaming).
            for old, new in self._PLAIN_RENAMES:
                ssSourceTable.rename_column(old, new)

            # if we're loading the old-style catalog, we require
            # packed_primary_provisional_designation to be in the mpcorb
            # schema (and we'll pretend that it's actually unpacked)
            if mpcorb is not None:
                mpcorb["unpacked_primary_provisional_designation"] = mpcorb[
                    "packed_primary_provisional_designation"
                ]

        if mpcorb is not None:
            self.log.info(
                "mpcorb loaded (%s objects, %s columns)", len(mpcorb), len(mpcorb.columns)
            )
        else:
            self.log.info("mpcorb not loaded.")

        # Extract the DiaSource subset and remove it from ssSourceTable
        # (diaSourceId is shared, so it is copied but not deleted).
        diaSource = tb.Table()
        for c in DIA_COLUMNS:
            src = c if c == "diaSourceId" else f"DIA_{c}"
            diaSource[c] = ssSourceTable[src]
            if src != "diaSourceId":
                del ssSourceTable[src]

        # Build the SSObject table from the per-source data (compute_ssobject
        # consumes pandas DataFrames).
        ssSourceTable.sort("ssObjectId")
        mpcorb = mpcorb.to_pandas() if isinstance(mpcorb, tb.Table) else mpcorb
        ssObjectTable = compute_ssobject(
            ssSourceTable.to_pandas(), diaSource.to_pandas(), mpcorb
        )
        ssObjectTable = tb.Table(ssObjectTable)

        return pipeBase.Struct(
            ssSourceTable=ssSourceTable,
            ssObjectTable=ssObjectTable,
        )