Coverage for python / lsst / dax / apdb / schema_model.py: 66%
179 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:35 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
# Public names exported by this module.
__all__ = [
    "CheckConstraint",
    "Column",
    "Constraint",
    "ExtraDataTypes",
    "ForeignKeyConstraint",
    "Index",
    "Schema",
    "Table",
    "UniqueConstraint",
]
36import dataclasses
37from collections.abc import Iterable, Mapping, MutableMapping
38from enum import Enum
39from typing import Any
41import felis.datamodel
# Shorthand for a read-only mapping with string keys and arbitrary values.
_Mapping = Mapping[str, Any]
class ExtraDataTypes(Enum):
    """Extra column data types needed by dax_apdb on top of the Felis ones."""

    UUID = "uuid"
# Any column data type: either a Felis data type or one of the dax_apdb extras.
DataTypes = felis.datamodel.DataType | ExtraDataTypes
def _strip_keys(map: _Mapping, keys: Iterable[str]) -> _Mapping:
    """Build a new dictionary from ``map`` leaving out the listed ``keys``.

    The input mapping is not modified; keys that are listed but absent from
    ``map`` are silently ignored.
    """
    excluded = frozenset(keys)
    remaining = {}
    for name, item in map.items():
        if name not in excluded:
            remaining[name] = item
    return remaining
def _make_iterable(obj: str | Iterable[str]) -> Iterable[str]:
    """Iterate over strings, treating a lone string as a single item.

    A plain string is yielded whole rather than character by character; any
    other iterable has its items yielded one by one.
    """
    if not isinstance(obj, str):
        yield from obj
        return
    yield obj
# Size in bytes of a single element of each data type; `Column.size`
# multiplies this by the column length when a length is defined.
_data_type_size: Mapping[DataTypes, int] = {
    felis.datamodel.DataType.boolean: 1,
    felis.datamodel.DataType.byte: 1,
    felis.datamodel.DataType.short: 2,
    felis.datamodel.DataType.int: 4,
    felis.datamodel.DataType.long: 8,
    felis.datamodel.DataType.float: 4,
    felis.datamodel.DataType.double: 8,
    felis.datamodel.DataType.char: 1,
    # Character types below are approximations, the actual size depends on
    # the character set.
    felis.datamodel.DataType.string: 2,
    felis.datamodel.DataType.unicode: 2,
    felis.datamodel.DataType.text: 2,
    felis.datamodel.DataType.binary: 1,
    # Timestamp size may differ between backends.
    felis.datamodel.DataType.timestamp: 8,
    ExtraDataTypes.UUID: 16,
}
# Pandas dtype names for each Felis data type. In every tuple the first
# entry is the dtype used for nullable columns and the second entry is the
# dtype used for non-nullable columns.
_dtype_map: Mapping[felis.datamodel.DataType, tuple[str, str]] = {
    felis.datamodel.DataType.double: ("float64", "float64"),
    felis.datamodel.DataType.float: ("float32", "float32"),
    felis.datamodel.DataType.timestamp: ("datetime64[ms]", "datetime64[ms]"),
    felis.datamodel.DataType.long: ("Int64", "int64"),
    felis.datamodel.DataType.int: ("Int32", "int32"),
    felis.datamodel.DataType.short: ("Int16", "int16"),
    felis.datamodel.DataType.byte: ("Int8", "int8"),
    felis.datamodel.DataType.binary: ("object", "object"),
    felis.datamodel.DataType.char: ("object", "object"),
    felis.datamodel.DataType.text: ("object", "object"),
    felis.datamodel.DataType.string: ("object", "object"),
    felis.datamodel.DataType.unicode: ("object", "object"),
    felis.datamodel.DataType.boolean: ("boolean", "bool"),
}
@dataclasses.dataclass
class Column:
    """Description of a single column of a table."""

    name: str
    """Name of the column."""

    id: str
    """Felis identifier of the column."""

    datatype: DataTypes
    """Data type of the column, a member of one of the `DataTypes` enums."""

    length: int | None = None
    """Length of a string/binary column, `None` if not specified."""

    nullable: bool = True
    """`True` if the column accepts NULL values."""

    value: Any = None
    """Default value of the column, may be `None`."""

    autoincrement: bool | None = None
    """Auto-increment flag, `None` means that it was not specified."""

    description: str | None = None
    """Description of the column."""

    annotations: Mapping[str, Any] = dataclasses.field(default_factory=dict)
    """Any extra annotations attached to the column."""

    table: Table | None = None
    """Table that owns this column, normally not `None`."""

    @classmethod
    def from_felis(cls, dm_column: felis.datamodel.Column) -> Column:
        """Build a `Column` from its Felis definition.

        Parameters
        ----------
        dm_column : `felis.datamodel.Column`
            Felis column definition.

        Returns
        -------
        column : `Column`
            Converted column definition.
        """
        # Unspecified nullability in Felis is treated as nullable.
        nullable = dm_column.nullable
        if nullable is None:
            nullable = True

        # Every Felis field that is not mapped to a dedicated attribute is
        # preserved as an annotation.
        consumed = ("name", "id", "datatype", "length", "nullable", "value", "autoincrement", "description")
        extras = _strip_keys(dict(dm_column), consumed)

        return cls(
            name=dm_column.name,
            id=dm_column.id,
            datatype=dm_column.datatype,
            length=dm_column.length,
            nullable=nullable,
            value=dm_column.value,
            autoincrement=dm_column.autoincrement,
            description=dm_column.description,
            annotations=extras,
        )

    def clone(self) -> Column:
        """Return a copy of this column which is detached from any table."""
        return dataclasses.replace(self, table=None)

    def size(self) -> int:
        """Return size in bytes of this column.

        Returns
        -------
        size : `int`
            Size in bytes for this column, typically represents in-memory size
            of the corresponding data type. May or may not be the same as
            storage size or wire-level protocol size.
        """
        element_size = _data_type_size[self.datatype]
        if self.length is None:
            return element_size
        return element_size * self.length

    @property
    def pandas_type(self) -> str:
        """Type of this column in pandas.DataFrame (`str`)."""
        # UUID columns are never converted to pandas, so only Felis types are
        # expected here.
        assert isinstance(self.datatype, felis.datamodel.DataType)
        # TODO: Existing data (in Cassandra) has NULLs in some non-nullable
        # columns. To avoid errors in such cases every column is treated as
        # nullable for now; this should be revisited at some later time.
        nullable_dtype, _ = _dtype_map[self.datatype]
        return nullable_dtype
@dataclasses.dataclass
class Index:
    """Description of a table index."""

    name: str
    """Name of the index, may be an empty string."""

    id: str
    """Felis identifier of the index."""

    columns: list[Column] = dataclasses.field(default_factory=list)
    """Columns of the index; at least one of ``columns`` and ``expressions``
    must be non-empty.
    """

    expressions: list[str] = dataclasses.field(default_factory=list)
    """Expressions of the index; at least one of ``columns`` and
    ``expressions`` must be non-empty.
    """

    description: str | None = None
    """Description of the index."""

    annotations: Mapping[str, Any] = dataclasses.field(default_factory=dict)
    """Any extra annotations attached to the index."""

    @classmethod
    def from_felis(cls, dm_index: felis.datamodel.Index, columns: Mapping[str, Column]) -> Index:
        """Build an `Index` from its Felis definition.

        Parameters
        ----------
        dm_index : `felis.datamodel.Index`
            Felis index definition.
        columns : `~collections.abc.Mapping` [`str`, `Column`]
            Mapping of column ID to `Column` instance.

        Returns
        -------
        index : `Index`
            Converted index definition.
        """
        # Felis may leave either list unset, treat that as an empty list.
        column_ids = dm_index.columns or []
        index_columns = [columns[column_id] for column_id in column_ids]
        index_expressions = dm_index.expressions or []

        # Felis fields without a dedicated attribute become annotations.
        consumed = ("name", "id", "columns", "expressions", "description")
        extras = _strip_keys(dict(dm_index), consumed)

        return cls(
            name=dm_index.name,
            id=dm_index.id,
            columns=index_columns,
            expressions=index_expressions,
            description=dm_index.description,
            annotations=extras,
        )
@dataclasses.dataclass
class Constraint:
    """Base class for constraint descriptions; actual constraints are
    instances of one of its subclasses.
    """

    name: str | None
    """Name of the constraint."""

    id: str
    """Felis identifier of the constraint."""

    deferrable: bool = False
    """If `True` the constraint is declared as deferrable."""

    initially: str | None = None
    """Value of the ``INITIALLY`` clause, only used if ``deferrable`` is
    `True`.
    """

    description: str | None = None
    """Description of the constraint."""

    annotations: Mapping[str, Any] = dataclasses.field(default_factory=dict)
    """Any extra annotations attached to the constraint."""

    @classmethod
    def from_felis(cls, dm_constr: felis.datamodel.Constraint, columns: Mapping[str, Column]) -> Constraint:
        """Build a constraint of the matching subclass from its Felis
        definition.

        Parameters
        ----------
        dm_constr : `felis.datamodel.Constraint`
            Felis constraint definition.
        columns : `~collections.abc.Mapping` [`str`, `Column`]
            Mapping of column ID to `Column` instance.

        Returns
        -------
        constraint : `Constraint`
            Converted constraint definition, an instance of `UniqueConstraint`,
            `ForeignKeyConstraint` or `CheckConstraint`.

        Raises
        ------
        TypeError
            Raised if ``dm_constr`` is not one of the supported Felis
            constraint types.
        """
        # Select the target class and the attributes that are specific to it.
        # The type is checked before anything else is read from ``dm_constr``.
        target: type[Constraint]
        specific: dict[str, Any]
        if isinstance(dm_constr, felis.datamodel.UniqueConstraint):
            target = UniqueConstraint
            specific = {"columns": [columns[c] for c in dm_constr.columns]}
        elif isinstance(dm_constr, felis.datamodel.ForeignKeyConstraint):
            target = ForeignKeyConstraint
            specific = {
                "columns": [columns[c] for c in dm_constr.columns],
                "referenced_columns": [columns[c] for c in dm_constr.referenced_columns],
            }
        elif isinstance(dm_constr, felis.datamodel.CheckConstraint):
            target = CheckConstraint
            specific = {"expression": dm_constr.expression}
        else:
            raise TypeError(f"Unexpected constraint type: {dm_constr}")

        # Everything that is mapped to a dedicated attribute (common ones plus
        # the type-specific ones) is excluded from annotations.
        consumed = ["name", "id", "type", "deferrable", "initially", "description", *specific]
        return target(
            name=dm_constr.name,
            id=dm_constr.id,
            deferrable=dm_constr.deferrable,
            initially=dm_constr.initially,
            description=dm_constr.description,
            annotations=_strip_keys(dict(dm_constr), consumed),
            **specific,
        )
@dataclasses.dataclass
class UniqueConstraint(Constraint):
    """Unique constraint on a set of table columns."""

    columns: list[Column] = dataclasses.field(default_factory=list)
    """Columns covered by this constraint, all of them belong to the same
    table as the constraint itself.
    """
@dataclasses.dataclass
class ForeignKeyConstraint(Constraint):
    """Foreign key constraint between columns of two tables."""

    columns: list[Column] = dataclasses.field(default_factory=list)
    """Columns covered by this constraint, all of them belong to the same
    table as the constraint itself.
    """

    referenced_columns: list[Column] = dataclasses.field(default_factory=list)
    """Referenced columns, there must be as many of them as there are in the
    ``columns`` list. All of them must belong to the same table, which is
    different from the table of this constraint.
    """

    onupdate: str | None = None
    """Action to take when parent table columns are updated. Typical values
    are CASCADE, DELETE and RESTRICT.
    """

    ondelete: str | None = None
    """Action to take when parent table columns are deleted. Typical values
    are CASCADE, DELETE and RESTRICT.
    """

    @property
    def referenced_table(self) -> Table:
        """Table that owns the referenced columns (`Table`)."""
        referenced = self.referenced_columns
        assert len(referenced) > 0, "column list cannot be empty"
        # All referenced columns share one table, so the first one is enough.
        owner = referenced[0].table
        assert owner is not None, "foreign key column must have table defined"
        return owner
@dataclasses.dataclass
class CheckConstraint(Constraint):
    """Check constraint defined by an expression."""

    expression: str = ""
    """Expression involving one or more columns of the table, must not be
    empty.
    """
@dataclasses.dataclass
class Table:
    """Description of a single table schema."""

    name: str
    """Table name."""

    id: str
    """Felis ID for this table."""

    columns: list[Column]
    """List of Column instances."""

    primary_key: list[Column]
    """List of Column that constitute a primary key, may be empty."""

    constraints: list[Constraint]
    """List of Constraint instances, can be empty."""

    indexes: list[Index]
    """List of Index instances, can be empty."""

    description: str | None = None
    """Table description."""

    annotations: Mapping[str, Any] = dataclasses.field(default_factory=dict)
    """Additional annotations for this table."""

    def __post_init__(self) -> None:
        """Update all columns to point to this table."""
        for column in self.columns:
            column.table = self

    @classmethod
    def from_felis(cls, dm_table: felis.datamodel.Table, columns: Mapping[str, Column]) -> Table:
        """Convert Felis table definition into instance of this class.

        Parameters
        ----------
        dm_table : `felis.datamodel.Table`
            Felis table definition.
        columns : `~collections.abc.Mapping` [`str`, `Column`]
            Mapping of column ID to `Column` instance.

        Returns
        -------
        table : `Table`
            Converted table definition.
        """
        table_columns = [columns[c.id] for c in dm_table.columns]
        # Primary key may be missing, a single column ID, or a list of IDs.
        if dm_table.primary_key:
            pk_columns = [columns[c] for c in _make_iterable(dm_table.primary_key)]
        else:
            pk_columns = []
        constraints = [Constraint.from_felis(constr, columns) for constr in dm_table.constraints]
        indices = [Index.from_felis(dm_idx, columns) for dm_idx in dm_table.indexes]
        # Fields that are converted to dedicated attributes must not be
        # repeated in annotations. The primary key is read above via the
        # ``primary_key`` attribute, so it has to be excluded under that name;
        # the ``primaryKey`` spelling is excluded too in case the source
        # mapping uses it.
        consumed = [
            "name",
            "id",
            "columns",
            "primary_key",
            "primaryKey",
            "constraints",
            "indexes",
            "description",
        ]
        table = cls(
            name=dm_table.name,
            id=dm_table.id,
            columns=table_columns,
            primary_key=pk_columns,
            constraints=constraints,
            indexes=indices,
            description=dm_table.description,
            annotations=_strip_keys(dict(dm_table), consumed),
        )
        return table
@dataclasses.dataclass
class Schema:
    """Complete schema description, collection of tables."""

    name: str
    """Schema name."""

    id: str
    """Felis ID for this schema."""

    tables: list[Table]
    """Collection of table definitions."""

    version: felis.datamodel.SchemaVersion | None = None
    """Schema version description."""

    description: str | None = None
    """Schema description."""

    annotations: Mapping[str, Any] = dataclasses.field(default_factory=dict)
    """Additional annotations for this schema."""

    @classmethod
    def from_felis(cls, dm_schema: felis.datamodel.Schema) -> Schema:
        """Convert felis schema definition to instance of this class.

        Parameters
        ----------
        dm_schema : `felis.datamodel.Schema`
            Felis schema definition.

        Returns
        -------
        schema : `Schema`
            Converted schema definition.
        """
        # Convert all columns first, they are shared by tables, indices and
        # constraints which look them up by column ID.
        columns: MutableMapping[str, Column] = {}
        for dm_table in dm_schema.tables:
            for dm_column in dm_table.columns:
                column = Column.from_felis(dm_column)
                columns[column.id] = column

        tables = [Table.from_felis(dm_table, columns) for dm_table in dm_schema.tables]

        # Felis allows version to be a plain string, normalize it to
        # SchemaVersion.
        version: felis.datamodel.SchemaVersion | None
        if isinstance(dm_schema.version, str):
            version = felis.datamodel.SchemaVersion(current=dm_schema.version)
        else:
            version = dm_schema.version

        # Version is stored in its own attribute, so like other converted
        # fields it is excluded from annotations.
        consumed = ["name", "id", "tables", "version", "description"]
        schema = cls(
            name=dm_schema.name,
            id=dm_schema.id,
            tables=tables,
            version=version,
            description=dm_schema.description,
            annotations=_strip_keys(dict(dm_schema), consumed),
        )
        return schema