Coverage for python / lsst / pipe / tasks / schemaUtils.py: 12%
92 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:21 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:21 +0000
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Utilities for working with sdm_schemas.
"""
# ``convertTableToSdmSchema`` is defined in this module but was missing
# from the export list; add it so ``from schemaUtils import *`` works.
__all__ = ("convertDataFrameToSdmSchema", "convertTableToSdmSchema",
           "readSdmSchemaFile", "dropEmptyColumns", "make_empty_catalog",
           "checkSdmSchemaColumns", "checkDataFrameAgainstSdmSchema")
29from collections.abc import Mapping
30import os
32import felis.datamodel
33import numpy as np
34import pandas as pd
35from astropy.table import Table
# Mapping from Felis column data types to pandas dtype names.
# The first entry in the returned mapping is for nullable columns,
# the second entry is for non-nullable columns.
_dtype_map: Mapping[felis.datamodel.DataType, tuple[str, str]] = {
    felis.datamodel.DataType.double: ("float64", "float64"),  # Cassandra utilities need np.nan not pd.NA
    felis.datamodel.DataType.float: ("float32", "float32"),  # Cassandra utilities need np.nan not pd.NA
    felis.datamodel.DataType.timestamp: ("datetime64[ms]", "datetime64[ms]"),
    # Capitalized integer names ("Int64", ...) and "boolean" are pandas
    # nullable extension dtypes; lower-case names are plain numpy dtypes.
    felis.datamodel.DataType.long: ("Int64", "int64"),
    felis.datamodel.DataType.int: ("Int32", "int32"),
    felis.datamodel.DataType.short: ("Int16", "int16"),
    felis.datamodel.DataType.byte: ("Int8", "int8"),
    felis.datamodel.DataType.binary: ("object", "object"),
    felis.datamodel.DataType.char: ("object", "object"),
    felis.datamodel.DataType.text: ("object", "object"),
    felis.datamodel.DataType.string: ("object", "object"),
    felis.datamodel.DataType.unicode: ("object", "object"),
    felis.datamodel.DataType.boolean: ("boolean", "bool"),
}
def column_dtype(felis_type: felis.datamodel.DataType, nullable: bool = False) -> str:
    """Return the Pandas data type for a given Felis column type.

    Parameters
    ----------
    felis_type : `felis.datamodel.DataType`
        Felis type, one of the enums defined in the `felis.datamodel` module.
    nullable : `bool`, optional
        If `True`, return the dtype suitable for a nullable column (e.g.
        the pandas extension dtypes such as ``"Int64"``); otherwise return
        the dtype for a non-nullable column.

    Returns
    -------
    column_dtype : `str`
        Name of a dtype that can be used for columns in Pandas.

    Raises
    ------
    TypeError
        Raised if the type cannot be handled.
    """
    try:
        nullable_dtype, non_nullable_dtype = _dtype_map[felis_type]
    except KeyError as e:
        # Chain the original KeyError so the lookup failure is visible.
        raise TypeError(f"Unexpected Felis type: {felis_type}") from e
    return nullable_dtype if nullable else non_nullable_dtype
def readSdmSchemaFile(schemaFile: str):
    """Read a schema file in YAML format.

    Parameters
    ----------
    schemaFile : `str`
        Fully specified path to the file to be read.  Environment
        variables in the path are expanded before reading.

    Returns
    -------
    schemaTable : dict[str, felis.datamodel.Table]
        A dict of the schemas in the given table defined in the specified file.

    Raises
    ------
    ValueError
        If the schema file can't be parsed.
    """
    expandedPath = os.path.expandvars(schemaFile)
    parsedSchema = felis.datamodel.Schema.from_uri(expandedPath, context={"id_generation": True})
    # Index the parsed tables by their names for convenient lookup.
    return {tableDef.name: tableDef for tableDef in parsedSchema.tables}
def checkSdmSchemaColumns(schema, colNames, tableName):
    """Check whether the supplied column names exist in the schema.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Schema`]
        Dictionary of Schemas from ``sdm_schemas`` containing the table
        definition to use.
    colNames : `list` of `str`
        Names of the columns to check for in the table.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    missing : `list` of `str`
        All column names that are not in the schema.
    """
    table = schema[tableName]
    # Build a set once so each membership test is O(1) instead of
    # rescanning the column-name list for every requested column.
    names = {columnDef.name for columnDef in table.columns}
    return [col for col in colNames if col not in names]
def checkDataFrameAgainstSdmSchema(schema, sourceTable, tableName):
    """Verify that a table conforms to the supplied schema.

    This method uses the table definitions in ``sdm_schemas`` to load the
    schema.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Schema`]
        Dictionary of Schemas from ``sdm_schemas`` containing the table
        definition to use.
    sourceTable : `pandas.DataFrame`
        The input table to check.
    tableName : `str`
        Name of the table in the schema to use.

    Raises
    ------
    ValueError
        Raised if a schema column is absent from ``sourceTable`` or has a
        dtype that differs from the schema's dtype.
    """
    tableDef = schema[tableName]

    for column in tableDef.columns:
        expected = column_dtype(column.datatype, nullable=column.nullable)
        # Guard clause: the column must exist before its dtype can be checked.
        if column.name not in sourceTable.columns:
            raise ValueError(f"Column {column.name} is missing from the table.")
        actual = sourceTable[column.name].dtype
        if actual != expected:
            raise ValueError(f"Column {column.name} dtype {actual}"
                             f" does not match schema dtype of {expected}")
def convertDataFrameToSdmSchema(schema, sourceTable, tableName, skipIndex=False):
    """Force a table to conform to the schema defined by the SDM schema.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Schema`]
        Dictionary of Schemas from ``sdm_schemas`` containing the table
        definition to use.
    sourceTable : `pandas.DataFrame`
        The input table to convert.  The input is not modified.
    tableName : `str`
        Name of the table in the schema to use.
    skipIndex : `bool`, optional
        If `True`, do not restore the input table's index on the result.

    Returns
    -------
    `pandas.DataFrame`
        A table with the correct schema and data copied from
        the input ``sourceTable``.
    """
    if sourceTable.empty:
        # Bug fix: the result of make_empty_catalog was previously
        # discarded; return the correctly-typed empty catalog instead.
        return make_empty_catalog(schema, tableName)
    table = schema[tableName]

    data = {}
    nSrc = len(sourceTable)
    # Determine the index column name(s), handling a possible MultiIndex.
    if skipIndex:
        indexNames = None
    elif len(sourceTable.index.names) == 1:
        indexNames = sourceTable.index.name
    else:
        indexNames = sourceTable.index.names
    if indexNames:
        # Bug fix: reset_index(inplace=True) mutated the caller's
        # DataFrame; rebind to a reset copy instead.
        sourceTable = sourceTable.reset_index()

    for columnDef in table.columns:
        dtype = column_dtype(columnDef.datatype, nullable=columnDef.nullable)
        if columnDef.name in sourceTable.columns:
            col = sourceTable[columnDef.name]
            if not columnDef.nullable:
                # Non-nullable columns cannot hold missing values; fill
                # them with zero before casting.
                col = col.fillna(0)
            data[columnDef.name] = pd.Series(col, dtype=dtype,
                                             index=sourceTable.index)
        else:
            if columnDef.nullable:
                try:
                    data[columnDef.name] = pd.Series([pd.NA]*nSrc, dtype=dtype, index=sourceTable.index)
                except TypeError:
                    # Plain float dtypes reject pd.NA; fall back to NaN.
                    data[columnDef.name] = pd.Series([np.nan]*nSrc, dtype=dtype, index=sourceTable.index)
            else:
                data[columnDef.name] = pd.Series([0]*nSrc, dtype=dtype, index=sourceTable.index)
    df = pd.DataFrame(data)
    if indexNames:
        df.set_index(indexNames, drop=True, inplace=True)
    return df
def convertTableToSdmSchema(schema, sourceTable, tableName):
    """Force an Astropy table to conform to the schema defined by the SDM schema.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Schema`]
        Dictionary of Schemas from ``sdm_schemas`` containing the table
        definition to use.
    sourceTable : `astropy.table.Table`
        The input table to convert.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    `astropy.table.Table`
        A table with the correct schema and data copied from
        the input ``sourceTable``.
    """
    table = schema[tableName]

    data = {}
    nSrc = len(sourceTable)

    for columnDef in table.columns:
        dtype = column_dtype(columnDef.datatype, nullable=columnDef.nullable)
        if columnDef.name in sourceTable.columns:
            # Lower-case the dtype name so pandas extension dtype names
            # ("Int64", ...) map to plain numpy dtype names for astropy.
            data[columnDef.name] = Table.Column(sourceTable[columnDef.name], dtype=dtype.lower())
        else:
            if columnDef.nullable:
                try:
                    data[columnDef.name] = Table.Column([pd.NA]*nSrc, dtype=object)
                except TypeError:
                    # Bug fix: this fallback previously referenced the
                    # nonexistent ``pd.nan``; NaN lives in numpy.
                    data[columnDef.name] = Table.Column([np.nan]*nSrc, dtype=dtype)
            else:
                data[columnDef.name] = Table.Column([0]*nSrc, dtype=dtype)
    return Table(data)
def dropEmptyColumns(schema, sourceTable, tableName):
    """Drop empty columns that are nullable.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Schema`]
        Dictionary of Schemas from ``sdm_schemas`` containing the table
        definition to use.
    sourceTable : `pandas.DataFrame`
        The input table to remove missing data columns from.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    `pandas.DataFrame`
        The table with columns that are missing and nullable dropped.
    """
    tableDef = schema[tableName]

    # Columns the schema allows to be null.
    nullable = {colDef.name for colDef in tableDef.columns if colDef.nullable}
    # Columns that contain no data at all.
    allNull = sourceTable.isnull().all()
    empty = set(allNull[allNull].index)
    # Only drop a column when it is both entirely empty and nullable.
    return sourceTable.drop(columns=list(empty & nullable))
def make_empty_catalog(schema, tableName):
    """Make an empty catalog for a table with a given name.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Schema`]
        Dictionary of Schemas from ``sdm_schemas`` containing the table
        definition to use.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    catalog : `pandas.DataFrame`
        An empty catalog.
    """
    tableDef = schema[tableName]

    # One zero-length, correctly-typed Series per schema column.
    columns = {}
    for colDef in tableDef.columns:
        pandasType = column_dtype(colDef.datatype, nullable=colDef.nullable)
        columns[colDef.name] = pd.Series(dtype=pandasType)
    return pd.DataFrame(columns)