"""Utilities for working with sdm_schemas."""
# Public API of this module (names exported by ``from module import *``).
__all__ = (
    "convertDataFrameToSdmSchema",
    "readSdmSchemaFile",
    "dropEmptyColumns",
    "make_empty_catalog",
    "checkSdmSchemaColumns",
    "checkDataFrameAgainstSdmSchema",
)
from collections.abc import Mapping
import os

import felis.datamodel
import numpy as np
import pandas as pd
from astropy.table import Table
# Maps each Felis column type to a pair of Pandas dtype names:
# ``(dtype for nullable columns, dtype for non-nullable columns)``.
# The capitalized integer names and "boolean" are the Pandas extension dtypes
# that can hold missing values; the lowercase names are the plain NumPy dtypes.
_dtype_map: Mapping[felis.datamodel.DataType, tuple[str, str]] = {
    felis.datamodel.DataType.double: ("float64", "float64"),
    felis.datamodel.DataType.float: ("float32", "float32"),
    felis.datamodel.DataType.timestamp: ("datetime64[ms]", "datetime64[ms]"),
    felis.datamodel.DataType.long: ("Int64", "int64"),
    felis.datamodel.DataType.int: ("Int32", "int32"),
    felis.datamodel.DataType.short: ("Int16", "int16"),
    felis.datamodel.DataType.byte: ("Int8", "int8"),
    felis.datamodel.DataType.binary: ("object", "object"),
    felis.datamodel.DataType.char: ("object", "object"),
    felis.datamodel.DataType.text: ("object", "object"),
    felis.datamodel.DataType.string: ("object", "object"),
    felis.datamodel.DataType.unicode: ("object", "object"),
    felis.datamodel.DataType.boolean: ("boolean", "bool"),
}
def column_dtype(felis_type: felis.datamodel.DataType, nullable=False) -> str:
    """Return Pandas data type for a given Felis column type.

    Parameters
    ----------
    felis_type : `felis.datamodel.DataType`
        Felis type, one of the enums defined in `felis.datamodel` module.
    nullable : `bool`, optional
        If `True`, return the dtype registered for nullable columns (the
        first entry of the ``_dtype_map`` pair); otherwise return the dtype
        registered for non-nullable columns (the second entry).

    Returns
    -------
    column_dtype : `str`
        Type name that can be used for columns in Pandas.

    Raises
    ------
    TypeError
        Raised if the type cannot be handled.
    """
    # NOTE(review): the ``try``/``except`` lines were missing from the
    # recovered listing; ``KeyError`` is assumed because the only failing
    # operation is the mapping lookup.
    try:
        nullable_dtype, plain_dtype = _dtype_map[felis_type]
    except KeyError:
        raise TypeError(f"Unexpected Felis type: {felis_type}")
    return nullable_dtype if nullable else plain_dtype
def readSdmSchemaFile(schemaFile: str):
    """Read a schema file in YAML format.

    Parameters
    ----------
    schemaFile : `str`
        Fully specified path to the file to be read. Environment variables
        in the path are expanded before the file is opened.

    Returns
    -------
    schemaTable : `dict` [`str`, `felis.datamodel.Table`]
        The tables defined in the specified file, keyed by table name.

    Notes
    -----
    No exception is caught here: if the schema file can't be parsed, the
    error raised by `felis.datamodel.Schema.from_uri` propagates to the
    caller.
    """
    schemaFile = os.path.expandvars(schemaFile)
    felis_schema = felis.datamodel.Schema.from_uri(schemaFile, context={"id_generation": True})
    return {singleTable.name: singleTable for singleTable in felis_schema.tables}
def checkSdmSchemaColumns(schema, colNames, tableName):
    """Check if supplied column names exist in the schema.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Dictionary of table definitions from ``sdm_schemas``, keyed by table
        name, containing the table definition to use.
    colNames : `list` of `str`
        Names of the columns to check for in the table.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    missing : `list` of `str`
        All column names that are not in the schema, in the order they
        appear in ``colNames``.
    """
    table = schema[tableName]

    # A set gives constant-time membership tests for long column lists.
    names = {columnDef.name for columnDef in table.columns}
    return [col for col in colNames if col not in names]
def checkDataFrameAgainstSdmSchema(schema, sourceTable, tableName):
    """Check that a table conforms to the supplied schema.

    This function uses the table definitions in ``sdm_schemas`` to verify
    that every column of the named schema table is present in
    ``sourceTable`` with the dtype that `column_dtype` reports for it. The
    input table is not modified. Columns of ``sourceTable`` that are not in
    the schema are ignored.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Dictionary of table definitions from ``sdm_schemas``, keyed by table
        name, containing the table definition to use.
    sourceTable : `pandas.DataFrame`
        The input table to check.
    tableName : `str`
        Name of the table in the schema to use.

    Raises
    ------
    ValueError
        Raised if a schema column is missing from ``sourceTable`` or if its
        dtype does not match the schema dtype.
    """
    table = schema[tableName]

    for columnDef in table.columns:
        name = columnDef.name
        dtype = column_dtype(columnDef.datatype, nullable=columnDef.nullable)
        if name not in sourceTable.columns:
            raise ValueError(f"Column {name} is missing from the table.")
        actual = sourceTable[name].dtype
        if actual != dtype:
            raise ValueError(f"Column {name} dtype {actual}"
                             f" does not match schema dtype of {dtype}")
def convertDataFrameToSdmSchema(schema, sourceTable, tableName, skipIndex=False):
    """Force a table to conform to the schema defined by the SDM schema.

    Columns named in the schema are copied from ``sourceTable`` and cast to
    the schema dtype; schema columns absent from ``sourceTable`` are created
    and filled with a missing-value marker (nullable columns) or zero
    (non-nullable columns). Columns of ``sourceTable`` that are not in the
    schema are dropped.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Dictionary of table definitions from ``sdm_schemas``, keyed by table
        name, containing the table definition to use.
    sourceTable : `pandas.DataFrame`
        The input table to convert. It is not modified.
    tableName : `str`
        Name of the table in the schema to use.
    skipIndex : `bool`, optional
        NOTE(review): the handling of this flag was not recoverable from the
        source listing. As reconstructed here, `True` leaves the index of
        ``sourceTable`` untouched instead of converting it as a column and
        restoring it afterwards.

    Returns
    -------
    sourceTable : `pandas.DataFrame`
        A table with the correct schema and data copied from
        the input ``sourceTable``.
    """
    if sourceTable.empty:
        # NOTE(review): the statement under this guard was missing from the
        # listing; returning an empty, correctly typed catalog is assumed.
        return make_empty_catalog(schema, tableName)
    table = schema[tableName]

    data = {}
    nSrc = len(sourceTable)

    # Move a named index into ordinary columns so it is converted like any
    # other column; it is restored as the index at the end.
    # NOTE(review): the ``if skipIndex`` branch and the guards around
    # reset_index/set_index are reconstructed.
    if skipIndex:
        indexNames = None
    elif len(sourceTable.index.names) == 1:
        indexNames = sourceTable.index.name
    else:
        indexNames = sourceTable.index.names
    if indexNames is not None:
        # Work on a copy rather than resetting the caller's index in place.
        sourceTable = sourceTable.reset_index()

    for columnDef in table.columns:
        name = columnDef.name
        dtype = column_dtype(columnDef.datatype, nullable=columnDef.nullable)
        if name in sourceTable.columns:
            col = sourceTable[name]
            if not columnDef.nullable:
                # NOTE(review): this statement was missing from the listing.
                # Filling with zero mirrors the default used below for absent
                # non-nullable columns and lets the integer cast succeed.
                col = col.fillna(0)
            data[name] = pd.Series(col, dtype=dtype, index=sourceTable.index)
        elif columnDef.nullable:
            try:
                data[name] = pd.Series([pd.NA]*nSrc, dtype=dtype, index=sourceTable.index)
            except (TypeError, ValueError):
                # Plain NumPy float dtypes cannot hold pd.NA; use NaN instead.
                data[name] = pd.Series([np.nan]*nSrc, dtype=dtype, index=sourceTable.index)
        else:
            data[name] = pd.Series([0]*nSrc, dtype=dtype, index=sourceTable.index)
    df = pd.DataFrame(data)
    if indexNames is not None:
        df.set_index(indexNames, drop=True, inplace=True)
    return df
def convertTableToSdmSchema(schema, sourceTable, tableName):
    """Force an Astropy table to conform to the schema defined by the SDM schema.

    Columns named in the schema are copied from ``sourceTable`` and cast to
    the schema dtype; schema columns absent from ``sourceTable`` are created
    and filled with a missing-value marker (nullable columns) or zero
    (non-nullable columns). Columns of ``sourceTable`` that are not in the
    schema are dropped.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Dictionary of table definitions from ``sdm_schemas``, keyed by table
        name, containing the table definition to use.
    sourceTable : `astropy.table.Table`
        The input table to convert.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    `astropy.table.Table`
        A table with the correct schema and data copied from
        the input ``sourceTable``.
    """
    table = schema[tableName]

    data = {}
    nSrc = len(sourceTable)

    for columnDef in table.columns:
        name = columnDef.name
        # Lower-casing turns the Pandas extension names ("Int64", ...) into
        # the NumPy dtype names that Astropy columns understand.
        dtype = column_dtype(columnDef.datatype, nullable=columnDef.nullable).lower()
        if name in sourceTable.columns:
            data[name] = Table.Column(sourceTable[name], dtype=dtype)
        elif columnDef.nullable:
            # NOTE(review): the try/except lines were missing from the
            # listing; the exception type is assumed.
            try:
                data[name] = Table.Column([pd.NA]*nSrc, dtype=object)
            except TypeError:
                # Fixed: the original used ``pd.nan``, which does not exist.
                data[name] = Table.Column([np.nan]*nSrc, dtype=dtype)
        else:
            data[name] = Table.Column([0]*nSrc, dtype=dtype)
    # NOTE(review): the return statement was missing from the listing; the
    # documented return type is an Astropy table built from these columns.
    return Table(data)
def dropEmptyColumns(schema, sourceTable, tableName):
    """Drop empty columns that are nullable.

    A column is dropped only if every value in it is null and the schema
    marks a column of the same name as nullable. Columns that are not in
    the schema are always kept.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Dictionary of table definitions from ``sdm_schemas``, keyed by table
        name, containing the table definition to use.
    sourceTable : `pandas.DataFrame`
        The input table to remove missing data columns from. It is not
        modified.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    correctedTable : `pandas.DataFrame`
        The table with columns that are missing and nullable dropped.
    """
    table = schema[tableName]

    nullableNames = {columnDef.name for columnDef in table.columns if columnDef.nullable}
    # Boolean Series indexed by column name: True where every value is null.
    allNull = sourceTable.isnull().all()
    # Walk the columns in table order so the drop list is deterministic.
    dropColumns = [name for name in allNull[allNull].index if name in nullableNames]
    return sourceTable.drop(columns=dropColumns)
def make_empty_catalog(schema, tableName):
    """Make an empty catalog for a table with a given name.

    Parameters
    ----------
    schema : `dict` [`str`, `felis.datamodel.Table`]
        Dictionary of table definitions from ``sdm_schemas``, keyed by table
        name, containing the table definition to use.
    tableName : `str`
        Name of the table in the schema to use.

    Returns
    -------
    catalog : `pandas.DataFrame`
        A zero-row table with one column per schema column, each with the
        dtype that `column_dtype` reports for it.
    """
    table = schema[tableName]

    data = {
        columnDef.name: pd.Series(dtype=column_dtype(columnDef.datatype, nullable=columnDef.nullable))
        for columnDef in table.columns
    }
    return pd.DataFrame(data)
convertTableToSdmSchema(schema, sourceTable, tableName)
checkDataFrameAgainstSdmSchema(schema, sourceTable, tableName)
str column_dtype(felis.datamodel.DataType felis_type, nullable=False)
checkSdmSchemaColumns(schema, colNames, tableName)
dropEmptyColumns(schema, sourceTable, tableName)
convertDataFrameToSdmSchema(schema, sourceTable, tableName, skipIndex=False)
readSdmSchemaFile(str schemaFile)
make_empty_catalog(schema, tableName)