Coverage for python / lsst / pipe / tasks / schemaUtils.py: 12%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:45 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23"""Utilities for working with sdm_schemas. 

24""" 

# Names exported by a star-import of this module.
__all__ = (
    "convertDataFrameToSdmSchema",
    "readSdmSchemaFile",
    "dropEmptyColumns",
    "make_empty_catalog",
    "checkSdmSchemaColumns",
    "checkDataFrameAgainstSdmSchema",
)

28 

29from collections.abc import Mapping 

30import os 

31 

32import felis.datamodel 

33import numpy as np 

34import pandas as pd 

35from astropy.table import Table 

36 

37 

# Pandas dtype names for each Felis data type, stored as a pair:
# element 0 is used for nullable columns, element 1 for non-nullable ones.
_dtype_map: Mapping[felis.datamodel.DataType, tuple[str, str]] = {
    # Floating point keeps the plain dtype even when nullable:
    # Cassandra utilities need np.nan not pd.NA.
    felis.datamodel.DataType.double: ("float64", "float64"),
    felis.datamodel.DataType.float: ("float32", "float32"),
    felis.datamodel.DataType.timestamp: ("datetime64[ms]", "datetime64[ms]"),
    # Integers: capitalized name when nullable, lowercase name otherwise.
    felis.datamodel.DataType.long: ("Int64", "int64"),
    felis.datamodel.DataType.int: ("Int32", "int32"),
    felis.datamodel.DataType.short: ("Int16", "int16"),
    felis.datamodel.DataType.byte: ("Int8", "int8"),
    # Binary and every string-like type are stored as generic objects.
    felis.datamodel.DataType.binary: ("object", "object"),
    felis.datamodel.DataType.char: ("object", "object"),
    felis.datamodel.DataType.text: ("object", "object"),
    felis.datamodel.DataType.string: ("object", "object"),
    felis.datamodel.DataType.unicode: ("object", "object"),
    felis.datamodel.DataType.boolean: ("boolean", "bool"),
}

55 

56 

def column_dtype(felis_type: felis.datamodel.DataType, nullable=False) -> str:
    """Look up the Pandas dtype name that corresponds to a Felis column type.

    Parameters
    ----------
    felis_type : `felis.datamodel.DataType`
        Felis type, one of the enums defined in the `felis.datamodel` module.
    nullable : `bool`, optional
        If `True`, return the dtype registered for nullable columns;
        otherwise return the dtype registered for non-nullable columns.

    Returns
    -------
    column_dtype : `str`
        Name of a dtype that can be used for columns in Pandas.

    Raises
    ------
    TypeError
        Raised if ``felis_type`` has no entry in the dtype mapping.
    """
    try:
        nullable_name, plain_name = _dtype_map[felis_type]
    except KeyError:
        raise TypeError(f"Unexpected Felis type: {felis_type}")
    if nullable:
        return nullable_name
    return plain_name

79 

80 

def readSdmSchemaFile(schemaFile: str):
    """Load a YAML schema file and index its tables by name.

    Parameters
    ----------
    schemaFile : `str`
        Fully specified path to the file to be read. Environment variables
        in the path are expanded before the file is opened.

    Returns
    -------
    schemaTable : `dict` [`str`, `felis.datamodel.Table`]
        The table definitions found in the schema file, keyed by table name.

    Raises
    ------
    ValueError
        If the schema file can't be parsed.
    """
    resolvedPath = os.path.expandvars(schemaFile)
    parsedSchema = felis.datamodel.Schema.from_uri(resolvedPath, context={"id_generation": True})
    return {tableDef.name: tableDef for tableDef in parsedSchema.tables}

106 

107 

def checkSdmSchemaColumns(schema, colNames, tableName):
    """Report which of the supplied column names are absent from a schema table.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Table definitions from ``sdm_schemas``, keyed by table name.
    colNames : `list` of `str`
        Names of the columns to check for in the table.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    missing : `list` of `str`
        All entries of ``colNames`` that are not columns of the table, in
        the order given. A name that is repeated in ``colNames`` is
        repeated in the result.

    Raises
    ------
    KeyError
        Raised if ``tableName`` is not a key of ``schema``.
    """
    # Build the lookup set once so each membership test is O(1).
    knownNames = {columnDef.name for columnDef in schema[tableName].columns}
    return [col for col in colNames if col not in knownNames]

133 

134 

def checkDataFrameAgainstSdmSchema(schema, sourceTable, tableName):
    """Verify that a DataFrame has every schema column with the expected dtype.

    The table definition is taken from ``sdm_schemas``. Only the columns
    defined for the schema table are examined; additional columns in
    ``sourceTable`` are not checked.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Table definitions from ``sdm_schemas``, keyed by table name.
    sourceTable : `pandas.DataFrame`
        The input table to check.
    tableName : `str`
        Name of the table in the schema to use.

    Raises
    ------
    ValueError
        Raised if a schema column is missing from ``sourceTable`` or is
        present with a dtype that differs from the schema's dtype.
    """
    for columnDef in schema[tableName].columns:
        # Resolve the expected dtype first so an unsupported schema type is
        # reported even when the column is absent from the table.
        expected = column_dtype(columnDef.datatype, nullable=columnDef.nullable)
        if columnDef.name not in sourceTable.columns:
            raise ValueError(f"Column {columnDef.name} is missing from the table.")
        actual = sourceTable[columnDef.name].dtype
        if actual != expected:
            raise ValueError(f"Column {columnDef.name} dtype {sourceTable[columnDef.name].dtype}"
                             f" does not match schema dtype of {expected}")

160 

161 

def convertDataFrameToSdmSchema(schema, sourceTable, tableName, skipIndex=False):
    """Force a table to conform to the schema defined by the SDM schema.

    Columns present in both the schema and ``sourceTable`` are cast to the
    schema dtype; schema columns absent from ``sourceTable`` are created
    and filled with a null value (nullable columns) or zero (non-nullable
    columns). Columns of ``sourceTable`` that are not in the schema are
    not copied. The input table itself is not modified.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Table definitions from ``sdm_schemas``, keyed by table name.
    sourceTable : `pandas.DataFrame`
        The input table to convert.
    tableName : `str`
        Name of the table in the schema to use.
    skipIndex : `bool`, optional
        If `True`, do not move the index of ``sourceTable`` into columns
        before conversion and do not re-apply it afterwards; the output
        columns are then simply built on the input's index. An unnamed
        single-level index is always treated this way.

    Returns
    -------
    `pandas.DataFrame`
        A table with the correct schema and data copied from
        the input ``sourceTable``.
    """
    # Zero-row inputs need no special case: the loop below already builds
    # correctly typed empty columns and keeps the index handling intact.
    table = schema[tableName]

    # Decide which index name(s) must be restored on the output. A single
    # level yields one name (or None when unnamed); a MultiIndex yields all
    # of its level names.
    if skipIndex:
        indexNames = None
    elif len(sourceTable.index.names) == 1:
        indexNames = sourceTable.index.name
    else:
        indexNames = sourceTable.index.names
    if indexNames:
        # Turn the index levels into ordinary columns so they are converted
        # like any other column. Use the non-inplace form so the caller's
        # DataFrame is left untouched.
        sourceTable = sourceTable.reset_index()

    nSrc = len(sourceTable)
    data = {}
    for columnDef in table.columns:
        dtype = column_dtype(columnDef.datatype, nullable=columnDef.nullable)
        if columnDef.name in sourceTable.columns:
            col = sourceTable[columnDef.name]
            if not columnDef.nullable:
                # Non-nullable dtypes cannot hold missing values.
                col = col.fillna(0)
            data[columnDef.name] = pd.Series(col, dtype=dtype, index=sourceTable.index)
        elif columnDef.nullable:
            try:
                data[columnDef.name] = pd.Series([pd.NA]*nSrc, dtype=dtype, index=sourceTable.index)
            except TypeError:
                # Some dtypes reject pd.NA on construction; fall back to NaN.
                data[columnDef.name] = pd.Series([np.nan]*nSrc, dtype=dtype, index=sourceTable.index)
        else:
            data[columnDef.name] = pd.Series([0]*nSrc, dtype=dtype, index=sourceTable.index)

    df = pd.DataFrame(data)
    if indexNames:
        df.set_index(indexNames, drop=True, inplace=True)
    return df

216 

217 

def convertTableToSdmSchema(schema, sourceTable, tableName):
    """Force an Astropy table to conform to the schema defined by the SDM schema.

    Columns present in both the schema and ``sourceTable`` are cast to the
    schema dtype. Schema columns absent from ``sourceTable`` are created:
    nullable ones as object columns filled with `pandas.NA`, non-nullable
    ones as zero-filled columns of the schema dtype. Columns of
    ``sourceTable`` that are not in the schema are not copied.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Table definitions from ``sdm_schemas``, keyed by table name.
    sourceTable : `astropy.table.Table`
        The input table to convert.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    `astropy.table.Table`
        A table with the correct schema and data copied from
        the input ``sourceTable``.
    """
    table = schema[tableName]

    data = {}
    nSrc = len(sourceTable)

    for columnDef in table.columns:
        # Always request the non-nullable dtype name for Astropy columns.
        # It equals the lowercased nullable name for every mapped type
        # except "boolean", which is a pandas extension dtype name rather
        # than a NumPy one.
        numpyDtype = column_dtype(columnDef.datatype, nullable=False)
        if columnDef.name in sourceTable.columns:
            data[columnDef.name] = Table.Column(sourceTable[columnDef.name], dtype=numpyDtype)
        elif columnDef.nullable:
            try:
                data[columnDef.name] = Table.Column([pd.NA]*nSrc, dtype=object)
            except TypeError:
                # Defensive fallback if pd.NA cannot be stored.
                data[columnDef.name] = Table.Column([np.nan]*nSrc, dtype=numpyDtype)
        else:
            data[columnDef.name] = Table.Column([0]*nSrc, dtype=numpyDtype)
    return Table(data)

254 

255 

def dropEmptyColumns(schema, sourceTable, tableName):
    """Remove columns that hold only nulls and are nullable in the schema.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Table definitions from ``sdm_schemas``, keyed by table name.
    sourceTable : `pandas.DataFrame`
        The input table to remove missing data columns from.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    `pandas.DataFrame`
        A new table without the columns that are entirely null and marked
        nullable in the schema. All-null columns that the schema marks as
        non-nullable, or that the schema does not define, are kept.
    """
    nullableNames = {
        columnDef.name for columnDef in schema[tableName].columns if columnDef.nullable
    }
    # Boolean Series indexed by column name: True where every value is null.
    allNull = sourceTable.isnull().all()
    emptyNames = allNull[allNull].index.tolist()
    toDrop = list(nullableNames.intersection(emptyNames))
    return sourceTable.drop(columns=toDrop)

280 

281 

def make_empty_catalog(schema, tableName):
    """Build a zero-row catalog whose columns follow a schema table.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Table definitions from ``sdm_schemas``, keyed by table name.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    catalog : `pandas.DataFrame`
        An empty catalog with one column per schema column, each created
        with the dtype that `column_dtype` reports for it.
    """
    emptyColumns = {}
    for columnDef in schema[tableName].columns:
        pandasDtype = column_dtype(columnDef.datatype, nullable=columnDef.nullable)
        emptyColumns[columnDef.name] = pd.Series(dtype=pandasDtype)
    return pd.DataFrame(emptyColumns)