Coverage for python / felis / metadata.py: 15%
147 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:42 +0000
1"""Build SQLAlchemy metadata from a Felis schema."""
3# This file is part of felis.
4#
5# Developed for the LSST Data Management System.
6# This product includes software developed by the LSST Project
7# (https://www.lsst.org).
8# See the COPYRIGHT file at the top-level directory of this distribution
9# for details of code ownership.
10#
11# This program is free software: you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation, either version 3 of the License, or
14# (at your option) any later version.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License
22# along with this program. If not, see <https://www.gnu.org/licenses/>.
24from __future__ import annotations
26import logging
27from typing import IO, Any, Literal
29from lsst.utils.iteration import ensure_iterable
30from sqlalchemy import (
31 CheckConstraint,
32 Column,
33 Constraint,
34 ForeignKeyConstraint,
35 Index,
36 MetaData,
37 PrimaryKeyConstraint,
38 Table,
39 TextClause,
40 UniqueConstraint,
41 text,
42)
43from sqlalchemy.dialects import mysql, postgresql
44from sqlalchemy.types import TypeEngine
46from . import datamodel
47from .datamodel import Schema
48from .db import _sqltypes as sqltypes
49from .db._variants import make_variant_dict
50from .db.database_context import is_sqlite_url
51from .types import FelisType
53__all__ = ("MetaDataBuilder", "get_datatype_with_variants")
55logger = logging.getLogger(__name__)
58def _handle_timestamp_column(column_obj: datamodel.Column, variant_dict: dict[str, TypeEngine[Any]]) -> None:
59 """Handle columns with the timestamp datatype.
61 Parameters
62 ----------
63 column_obj
64 The column object representing the timestamp.
65 variant_dict
66 The dictionary of variant overrides for the datatype.
68 Notes
69 -----
70 This function updates the variant dictionary with the appropriate
71 timestamp type for the column object but only if the precision is set.
72 Otherwise, the default timestamp objects defined in the Felis type system
73 will be used instead.
74 """
75 if column_obj.precision is not None:
76 args: Any = [False, column_obj.precision] # Turn off timezone.
77 variant_dict.update({"postgresql": postgresql.TIMESTAMP(*args), "mysql": mysql.DATETIME(*args)})
80def get_datatype_with_variants(column_obj: datamodel.Column) -> TypeEngine:
81 """Use the Felis type system to get a SQLAlchemy datatype with variant
82 overrides from the information in a Felis column object.
84 Parameters
85 ----------
86 column_obj
87 The column object from which to get the datatype.
89 Returns
90 -------
91 `~sqlalchemy.types.TypeEngine`
92 The SQLAlchemy datatype object.
94 Raises
95 ------
96 ValueError
97 Raised if the column has a sized type but no length or if the datatype
98 is invalid.
99 """
100 variant_dict = make_variant_dict(column_obj)
101 felis_type = FelisType.felis_type(column_obj.datatype.value)
102 datatype_fun = getattr(sqltypes, column_obj.datatype.value, None)
103 if datatype_fun is None:
104 raise ValueError(f"Unknown datatype: {column_obj.datatype.value}")
105 args = []
106 if felis_type.is_sized:
107 # Add length argument for size types.
108 if not column_obj.length:
109 raise ValueError(f"Column {column_obj.name} has sized type '{column_obj.datatype}' but no length")
110 args = [column_obj.length]
111 if felis_type.is_timestamp:
112 _handle_timestamp_column(column_obj, variant_dict)
113 return datatype_fun(*args, **variant_dict)
116_VALID_SERVER_DEFAULTS = ("CURRENT_TIMESTAMP", "NOW()", "LOCALTIMESTAMP", "NULL")
119class MetaDataBuilder:
120 """Build a SQLAlchemy metadata object from a Felis schema.
122 Parameters
123 ----------
124 schema
125 The schema object from which to build the SQLAlchemy metadata.
126 apply_schema_to_metadata
127 Whether to apply the schema name to the metadata object.
128 ignore_constraints
129 Whether to ignore constraints when building the metadata.
130 table_name_postfix
131 A string to append to the table names when building the metadata.
132 skip_indexes
133 Skip indexes when building the metadata.
134 """
136 def __init__(
137 self,
138 schema: Schema,
139 apply_schema_to_metadata: bool = True,
140 ignore_constraints: bool = False,
141 table_name_postfix: str = "",
142 skip_indexes: bool = False,
143 ) -> None:
144 """Initialize the metadata builder."""
145 self.schema = schema
146 if not apply_schema_to_metadata:
147 logger.debug("Schema name will not be applied to metadata")
148 self.metadata = MetaData(schema=schema.name if apply_schema_to_metadata else None)
149 self._objects: dict[str, Any] = {}
150 self.ignore_constraints = ignore_constraints
151 self.table_name_postfix = table_name_postfix
152 self.skip_indexes = skip_indexes
154 def build(self) -> MetaData:
155 """Build the SQLAlchemy tables and constraints from the schema.
157 Notes
158 -----
159 This first builds the tables and then makes a second pass to build the
160 constraints. This is necessary because the constraints may reference
161 objects that are not yet created when the tables are built.
163 Returns
164 -------
165 `~sqlalchemy.sql.schema.MetaData`
166 The SQLAlchemy metadata object.
167 """
168 self.build_tables()
169 if not self.skip_indexes:
170 self.build_indexes()
171 else:
172 logger.warning("Ignoring indexes")
173 if not self.ignore_constraints:
174 self.build_constraints()
175 else:
176 logger.warning("Ignoring constraints")
177 return self.metadata
179 def build_tables(self) -> None:
180 """Build the SQLAlchemy tables from the schema."""
181 for table in self.schema.tables:
182 self.build_table(table)
183 if table.primary_key:
184 primary_key = self.build_primary_key(table.primary_key)
185 self._objects[table.id].append_constraint(primary_key)
187 def build_primary_key(self, primary_key_columns: str | list[str]) -> PrimaryKeyConstraint:
188 """Build a SQAlchemy ``PrimaryKeyConstraint`` from a single column ID
189 or a list of them.
191 Parameters
192 ----------
193 primary_key_columns
194 The column ID or list of column IDs from which to build the primary
195 key.
197 Returns
198 -------
199 `~sqlalchemy.sql.schema.PrimaryKeyConstraint`
200 The SQLAlchemy primary key constraint object.
202 Notes
203 -----
204 The ``primary_key_columns`` is a string or a list of strings
205 representing IDs which will be used to find the columnn objects in the
206 builder's internal ID map.
207 """
208 return PrimaryKeyConstraint(
209 *[self._objects[column_id] for column_id in ensure_iterable(primary_key_columns)]
210 )
212 def build_table(self, table_obj: datamodel.Table) -> None:
213 """Build a SQLAlchemy ``Table`` from a Felis table and add it to the
214 metadata.
216 Parameters
217 ----------
218 table_obj
219 The Felis table object from which to build the SQLAlchemy table.
221 Notes
222 -----
223 Several MySQL table options, including the engine and charset, are
224 handled by adding annotations to the table. This is not needed for
225 Postgres, as Felis does not support any table options for this dialect.
226 """
227 # Process mysql table options.
228 optargs = {}
229 if table_obj.mysql_engine:
230 optargs["mysql_engine"] = table_obj.mysql_engine
231 if table_obj.mysql_charset:
232 optargs["mysql_charset"] = table_obj.mysql_charset
234 # Create the SQLAlchemy table object and its columns.
235 name = table_obj.name
236 id = table_obj.id
237 description = table_obj.description
238 columns = [self.build_column(column) for column in table_obj.columns]
239 table = Table(
240 name + self.table_name_postfix,
241 self.metadata,
242 *columns,
243 comment=description,
244 **optargs, # type: ignore[arg-type]
245 )
247 self._objects[id] = table
249 def build_column(self, column_obj: datamodel.Column) -> Column:
250 """Build a SQLAlchemy ``Column`` from a Felis column object.
252 Parameters
253 ----------
254 column_obj
255 The column object from which to build the SQLAlchemy column.
257 Returns
258 -------
259 `~sqlalchemy.sql.schema.Column`
260 The SQLAlchemy column object.
261 """
262 # Get basic column attributes.
263 name = column_obj.name
264 id = column_obj.id
265 description = column_obj.description
266 value = column_obj.value
267 nullable = column_obj.nullable
269 # Get datatype, handling variant overrides such as "mysql:datatype".
270 datatype = get_datatype_with_variants(column_obj)
272 # Set autoincrement, depending on if it was provided explicitly.
273 autoincrement: Literal["auto"] | bool = (
274 column_obj.autoincrement if column_obj.autoincrement is not None else "auto"
275 )
277 server_default: str | TextClause | None = None
278 if value is not None:
279 server_default = str(value)
280 if server_default in _VALID_SERVER_DEFAULTS or not isinstance(value, str):
281 # If the server default is a valid keyword or not a string,
282 # use it as is.
283 server_default = text(server_default)
285 if server_default is not None:
286 logger.debug(f"Column '{id}' has default value: {server_default}")
288 column: Column = Column(
289 name,
290 datatype,
291 comment=description,
292 autoincrement=autoincrement,
293 nullable=nullable,
294 server_default=server_default,
295 )
297 self._objects[id] = column
299 return column
301 def build_constraints(self) -> None:
302 """Build the SQLAlchemy constraints from the Felis schema and append
303 them to the associated table in the metadata.
305 Notes
306 -----
307 This is performed as a separate step after building the tables so that
308 all the referenced objects in the constraints will be present and can
309 be looked up by their ID.
310 """
311 for table_obj in self.schema.tables:
312 table = self._objects[table_obj.id]
313 for constraint_obj in table_obj.constraints:
314 constraint = self.build_constraint(constraint_obj)
315 table.append_constraint(constraint)
317 def build_constraint(self, constraint_obj: datamodel.Constraint) -> Constraint:
318 """Build a SQLAlchemy ``Constraint`` from a Felis constraint.
320 Parameters
321 ----------
322 constraint_obj
323 The Felis object from which to build the constraint.
325 Returns
326 -------
327 `~sqlalchemy.sql.schema.Constraint`
328 The SQLAlchemy constraint object.
330 Raises
331 ------
332 ValueError
333 If the constraint type is not recognized.
334 TypeError
335 If the constraint object is not the expected type.
336 """
337 args: dict[str, Any] = {
338 "name": constraint_obj.name or None,
339 "comment": constraint_obj.description or None,
340 "deferrable": constraint_obj.deferrable or None,
341 "initially": constraint_obj.initially or None,
342 }
344 constraint: Constraint
346 if isinstance(constraint_obj, datamodel.ForeignKeyConstraint):
347 fk_obj: datamodel.ForeignKeyConstraint = constraint_obj
348 columns = [self._objects[column_id] for column_id in fk_obj.columns]
349 refcolumns = [self._objects[column_id] for column_id in fk_obj.referenced_columns]
350 if constraint_obj.on_delete is not None:
351 args["ondelete"] = constraint_obj.on_delete
352 if constraint_obj.on_update is not None:
353 args["onupdate"] = constraint_obj.on_update
354 constraint = ForeignKeyConstraint(columns, refcolumns, **args)
355 elif isinstance(constraint_obj, datamodel.CheckConstraint):
356 check_obj: datamodel.CheckConstraint = constraint_obj
357 expression = check_obj.expression
358 constraint = CheckConstraint(expression, **args)
359 elif isinstance(constraint_obj, datamodel.UniqueConstraint):
360 uniq_obj: datamodel.UniqueConstraint = constraint_obj
361 columns = [self._objects[column_id] for column_id in uniq_obj.columns]
362 constraint = UniqueConstraint(*columns, **args)
363 else:
364 raise ValueError(f"Unknown constraint type: {type(constraint_obj)}")
366 self._objects[constraint_obj.id] = constraint
368 return constraint
370 def build_index(self, index_obj: datamodel.Index) -> Index:
371 """Build a SQLAlchemy ``Index`` from a Felis `~felis.datamodel.Index`.
373 Parameters
374 ----------
375 index_obj
376 The Felis object from which to build the SQLAlchemy index.
378 Returns
379 -------
380 `~sqlalchemy.sql.schema.Index`
381 The SQLAlchemy index object.
382 """
383 columns = [self._objects[c_id] for c_id in (index_obj.columns if index_obj.columns else [])]
384 expressions = index_obj.expressions if index_obj.expressions else []
385 index = Index(index_obj.name, *columns, *expressions)
386 self._objects[index_obj.id] = index
387 return index
389 def build_indexes(self) -> None:
390 """Build the SQLAlchemy indexes from the Felis schema and add them to
391 the associated table in the metadata.
392 """
393 for table in self.schema.tables:
394 md_table = self._objects.get(table.id, None)
395 if md_table is None:
396 raise KeyError(f"Table with ID '{table.id}' not found in objects map")
397 if not isinstance(md_table, Table):
398 raise TypeError(f"Expected Table object, got {type(md_table)}")
399 indexes = [self.build_index(index) for index in table.indexes]
400 for index in indexes:
401 index._set_parent(md_table)
402 md_table.indexes.add(index)
405def create_metadata(
406 felis_file: IO[str],
407 schema_name: str | None = None,
408 id_generation: bool = True,
409 ignore_constraints: bool = False,
410 skip_indexes: bool = False,
411 engine_url: str | None = None,
412) -> MetaData:
413 """Create SQLAlchemy metadata from a Felis schema file.
415 Parameters
416 ----------
417 felis_file
418 The Felis schema file to read.
419 schema_name
420 Optional schema name to override the one in the file.
421 id_generation
422 Whether to generate IDs for all objects in the schema that do not have
423 them.
424 ignore_constraints
425 Whether to ignore constraints when building metadata.
426 skip_indexes
427 Whether to skip creating indexes when building metadata.
428 engine_url
429 Engine URL to determine if SQLite-specific handling is needed.
431 Returns
432 -------
433 MetaData
434 The SQLAlchemy metadata object with proper schema handling.
435 """
436 schema = Schema.from_stream(felis_file, context={"id_generation": id_generation})
437 if schema_name:
438 logger.info(f"Overriding schema name with: {schema_name}")
439 schema.name = schema_name
441 # Determine if we need SQLite-specific handling
442 apply_schema = True
443 if engine_url:
444 if is_sqlite_url(engine_url):
445 apply_schema = False
446 logger.debug("SQLite detected: schema name will not be applied to metadata")
448 return MetaDataBuilder(
449 schema,
450 ignore_constraints=ignore_constraints,
451 skip_indexes=skip_indexes,
452 apply_schema_to_metadata=apply_schema,
453 ).build()