Coverage for python / felis / tap_schema.py: 22%
241 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
1"""Provides utilities for creating and populating the TAP_SCHEMA database."""
3# This file is part of felis.
4#
5# Developed for the LSST Data Management System.
6# This product includes software developed by the LSST Project
7# (https://www.lsst.org).
8# See the COPYRIGHT file at the top-level directory of this distribution
9# for details of code ownership.
10#
11# This program is free software: you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation, either version 3 of the License, or
14# (at your option) any later version.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License
22# along with this program. If not, see <https://www.gnu.org/licenses/>.
24import csv
25import io
26import logging
27import os
28import re
29from typing import IO, Any
31from lsst.resources import ResourcePath
32from sqlalchemy import MetaData, Table, select, text
33from sqlalchemy.exc import SQLAlchemyError
34from sqlalchemy.sql.dml import Insert
36from . import datamodel
37from .datamodel import Constraint, Schema
38from .db.database_context import DatabaseContext, is_sqlite_url
39from .metadata import MetaDataBuilder
40from .types import FelisType
42__all__ = ["DataLoader", "MetadataInserter", "TableManager"]
44logger = logging.getLogger(__name__)
class TableManager:
    """Manage TAP_SCHEMA table definitions and access.

    This class provides a streamlined interface for managing TAP_SCHEMA tables,
    automatically handling dialect-specific requirements and providing
    consistent access to TAP_SCHEMA tables through a dictionary-like interface.

    Parameters
    ----------
    engine_url
        Database engine URL for automatic dialect detection and schema
        handling.
    db_context
        Optional database context for reflecting existing TAP_SCHEMA tables.
        If None, loads from internal YAML schema.
    schema_name
        The name of the schema to use for TAP_SCHEMA tables.
        Defaults to "TAP_SCHEMA".
    table_name_postfix
        A string to append to standard table names for customization.
    extensions_path
        Path to additional TAP_SCHEMA table definitions.

    Notes
    -----
    The TableManager automatically detects SQLite vs. schema-supporting
    databases and handles schema application appropriately.
    """

    _TABLE_NAMES_STD = ["schemas", "tables", "columns", "keys", "key_columns"]
    """The standard table names for the TAP_SCHEMA tables."""

    _SCHEMA_NAME_STD = "TAP_SCHEMA"
    """The standard schema name for the TAP_SCHEMA tables."""

    def __init__(
        self,
        engine_url: str | None = None,
        db_context: DatabaseContext | None = None,
        schema_name: str | None = None,
        table_name_postfix: str = "",
        extensions_path: str | None = None,
    ):
        """Initialize the table manager."""
        self.table_name_postfix = table_name_postfix
        self.schema_name = schema_name or self._SCHEMA_NAME_STD
        self.extensions_path = extensions_path

        # The Felis schema object is only populated when loading from the YAML
        # resource. Initialize it here so the ``schema`` property returns None
        # (as documented) instead of raising AttributeError when the tables
        # were reflected from an existing database.
        self._schema: Schema | None = None

        # Automatic dialect detection from engine URL. SQLite does not support
        # named schemas, so the schema name is only applied to the metadata
        # for schema-supporting databases.
        if engine_url is not None:
            self.apply_schema_to_metadata = not is_sqlite_url(engine_url)
        else:
            # Default case: assume SQLite
            engine_url = "sqlite:///:memory:"
            self.apply_schema_to_metadata = False

        if db_context is not None:
            if table_name_postfix != "":
                logger.warning(
                    "Table name postfix '%s' will be ignored when reflecting TAP_SCHEMA database",
                    table_name_postfix,
                )
            logger.debug(
                "Reflecting TAP_SCHEMA database from existing database at %s",
                db_context.engine.url._replace(password="***"),
            )
            self._reflect_from_database(db_context)
        else:
            self._load_from_yaml()

        self._create_table_map()
        self._check_tables()

    def _load_from_yaml(self) -> None:
        """Load TAP_SCHEMA from YAML resources and build metadata."""
        # Load the base schema
        self._schema = self.load_schema_resource()

        # Override schema name if specified
        if self.schema_name != self._SCHEMA_NAME_STD:
            self._schema.name = self.schema_name
        else:
            self.schema_name = self._schema.name

        # Apply any extensions
        self._apply_extensions()

        # Build metadata using streamlined approach
        self._metadata = MetaDataBuilder(
            self._schema,
            apply_schema_to_metadata=self.apply_schema_to_metadata,
            table_name_postfix=self.table_name_postfix,
        ).build()

        logger.debug("Loaded TAP_SCHEMA '%s' from YAML resource", self.schema_name)

    def _reflect_from_database(self, db_context: DatabaseContext) -> None:
        """Reflect TAP_SCHEMA tables from an existing database.

        Parameters
        ----------
        db_context
            The database context to use for reflection.

        Raises
        ------
        sqlalchemy.exc.SQLAlchemyError
            If reflecting the tables from the database fails.
        """
        self._metadata = MetaData(schema=self.schema_name if self.apply_schema_to_metadata else None)
        try:
            self._metadata.reflect(bind=db_context.engine)
        except SQLAlchemyError as e:
            logger.error("Error reflecting TAP_SCHEMA database: %s", e)
            raise

    def _apply_extensions(self) -> None:
        """Apply extensions from a YAML file to the TAP_SCHEMA schema.

        This method loads extension column definitions from a YAML file and
        adds them to the appropriate TAP_SCHEMA tables.
        """
        if not self.extensions_path:
            return

        logger.info("Loading TAP_SCHEMA extensions from: %s", self.extensions_path)
        extensions_schema = Schema.from_uri(self.extensions_path, context={"id_generation": True})

        if not extensions_schema.tables:
            logger.warning("Extensions schema does not contain any tables, no extensions applied")
            return

        extension_count = 0
        # Map extension table name -> its extra columns for quick lookup.
        extension_tables = {table.name: table.columns for table in extensions_schema.tables}

        for table in self.schema.tables:
            extension_columns = extension_tables.get(table.name)
            if extension_columns:
                # Append the extension columns after the standard ones.
                table.columns = list(table.columns) + list(extension_columns)
                extension_count += len(extension_columns)
                logger.debug("Added %d extension columns to table '%s'", len(extension_columns), table.name)

        logger.info("Applied %d extension columns to TAP_SCHEMA", extension_count)

    def __getitem__(self, table_name: str) -> Table:
        """Get one of the TAP_SCHEMA tables by its standard TAP_SCHEMA name.

        Parameters
        ----------
        table_name
            The name of the table to get.

        Returns
        -------
        Table
            The table with the given name.

        Raises
        ------
        KeyError
            If the table name is not a known TAP_SCHEMA table.

        Notes
        -----
        This implements array semantics for the table manager, allowing
        tables to be accessed by their standard TAP_SCHEMA names.
        """
        if table_name not in self._table_map:
            raise KeyError(f"Table '{table_name}' not found in TAP_SCHEMA")
        return self.metadata.tables[self._table_map[table_name]]

    @property
    def schema(self) -> Schema | None:
        """Get the TAP_SCHEMA schema.

        Returns
        -------
        Schema or None
            The TAP_SCHEMA schema.

        Notes
        -----
        This will only be set if the TAP_SCHEMA schema was loaded from a
        Felis package resource. In the case where the TAP_SCHEMA schema was
        reflected from an existing database, this will be None.
        """
        return self._schema

    @property
    def metadata(self) -> MetaData:
        """Get the metadata for the TAP_SCHEMA tables.

        Returns
        -------
        `~sqlalchemy.sql.schema.MetaData`
            The metadata for the TAP_SCHEMA tables.

        Notes
        -----
        This will either be the metadata that was reflected from an existing
        database or the metadata that was loaded from a Felis package resource.
        """
        return self._metadata

    @classmethod
    def get_tap_schema_std_path(cls) -> str:
        """Get the path to the standard TAP_SCHEMA schema resource.

        Returns
        -------
        str
            The path to the standard TAP_SCHEMA schema resource.
        """
        return os.path.join(os.path.dirname(__file__), "config", "tap_schema", "tap_schema_std.yaml")

    @classmethod
    def get_tap_schema_std_resource(cls) -> ResourcePath:
        """Get the standard TAP_SCHEMA schema resource.

        Returns
        -------
        `~lsst.resources.ResourcePath`
            The standard TAP_SCHEMA schema resource.
        """
        return ResourcePath("resource://felis/config/tap_schema/tap_schema_std.yaml")

    @classmethod
    def get_table_names_std(cls) -> list[str]:
        """Get the standard table names for the TAP_SCHEMA tables.

        Returns
        -------
        list
            The standard table names for the TAP_SCHEMA tables.
        """
        # Return a copy so callers cannot mutate the class-level list.
        return list(cls._TABLE_NAMES_STD)

    @classmethod
    def get_schema_name_std(cls) -> str:
        """Get the standard schema name for the TAP_SCHEMA tables.

        Returns
        -------
        str
            The standard schema name for the TAP_SCHEMA tables.
        """
        return cls._SCHEMA_NAME_STD

    @classmethod
    def load_schema_resource(cls) -> Schema:
        """Load the standard TAP_SCHEMA schema from a Felis package
        resource into a Felis `~felis.datamodel.Schema`.

        Returns
        -------
        Schema
            The TAP_SCHEMA schema.
        """
        rp = cls.get_tap_schema_std_resource()
        return Schema.from_uri(rp, context={"id_generation": True})

    def _load_schema(self) -> None:
        """Load the TAP_SCHEMA schema from a Felis package resource."""
        self._schema = self.load_schema_resource()

    def _create_table_map(self) -> None:
        """Create a mapping of standard table names to the table names modified
        with a postfix, as well as the prepended schema name if it is set.

        Notes
        -----
        This is a private method that is called during initialization, allowing
        us to use table names like ``schemas11`` such as those used by the CADC
        TAP library instead of the standard table names. It also maps between
        the standard table names and those with the schema name prepended like
        SQLAlchemy uses. The mapping is stored in ``self._table_map``.
        """
        self._table_map = {
            table_name: (
                f"{self.schema_name + '.' if self.apply_schema_to_metadata else ''}"
                f"{table_name}{self.table_name_postfix}"
            )
            for table_name in TableManager.get_table_names_std()
        }
        # Use lazy %s formatting, consistent with the other log calls here.
        logger.debug("Created TAP_SCHEMA table map: %s", self._table_map)

    def _check_tables(self) -> None:
        """Check that there is a valid mapping to each standard table.

        Raises
        ------
        KeyError
            If a table is missing from the table map.
        """
        for table_name in TableManager.get_table_names_std():
            # __getitem__ raises KeyError if the mapping is incomplete.
            self[table_name]

    def initialize_database(self, db_context: DatabaseContext) -> None:
        """Initialize a database with the TAP_SCHEMA tables.

        Parameters
        ----------
        db_context
            The database context to use to create the tables.
        """
        logger.info("Creating TAP_SCHEMA database '%s'", self.schema_name)
        db_context.initialize()
        db_context.create_all()

    def select(
        self,
        db_context: DatabaseContext,
        table_name: str,
        filter_condition: str = "",
    ) -> list[dict[str, Any]]:
        """Select all rows from a TAP_SCHEMA table with an optional filter
        condition.

        Parameters
        ----------
        db_context
            The database context to use to connect to the database.
        table_name
            The name of the table to select from.
        filter_condition
            The filter condition as a string. If empty, no filter will be
            applied.

        Returns
        -------
        list
            A list of dictionaries containing the rows from the table.
        """
        table = self[table_name]
        query = select(table)
        if filter_condition:
            query = query.where(text(filter_condition))
        with db_context.engine.connect() as connection:
            result = connection.execute(query)
            rows = [dict(row._mapping) for row in result]
        return rows
class DataLoader:
    """Load data into the TAP_SCHEMA tables.

    Parameters
    ----------
    schema
        The Felis ``Schema`` to load into the TAP_SCHEMA tables.
    mgr
        The table manager that contains the TAP_SCHEMA tables.
    db_context
        The database context to use to connect to the database.
    tap_schema_index
        The index of the schema in the TAP_SCHEMA database.
    output_file
        The file object to write the SQL statements to. If None, file output
        will be suppressed.
    print_sql
        If True, print the SQL statements that will be executed.
    dry_run
        If True, the data will not be loaded into the database.
    unique_keys
        If True, prepend the schema name to the key name to make it unique
        when loading data into the keys and key_columns tables.
    """

    def __init__(
        self,
        schema: Schema,
        mgr: TableManager,
        db_context: DatabaseContext,
        tap_schema_index: int = 0,
        output_file: IO[str] | None = None,
        print_sql: bool = False,
        dry_run: bool = False,
        unique_keys: bool = False,
    ):
        self.schema = schema
        self.mgr = mgr
        self._db_context = db_context
        self.tap_schema_index = tap_schema_index
        self.inserts: list[Insert] = []
        self.output_file = output_file
        self.print_sql = print_sql
        self.dry_run = dry_run
        self.unique_keys = unique_keys

    def load(self) -> None:
        """Load the schema data into the TAP_SCHEMA tables.

        Notes
        -----
        This will generate inserts for the data, print the SQL statements if
        requested, save the SQL statements to a file if requested, and load the
        data into the database if not in dry run mode. These are done as
        sequential operations rather than for each insert. The logic is that
        the user may still want the complete SQL output to be printed or saved
        to a file even if loading into the database causes errors. If there are
        errors when inserting into the database, the SQLAlchemy error message
        should indicate which SQL statement caused the error.
        """
        self._generate_all_inserts()
        if self.print_sql:
            # Print to stdout.
            self._print_sql()
        if self.output_file:
            # Print to an output file.
            self._write_sql_to_file()
        if not self.dry_run:
            # Execute the inserts if not in dry run mode.
            self._execute_inserts()
        else:
            logger.info("Dry run - skipped loading into database")

    def _insert_schemas(self) -> None:
        """Insert the schema data into the ``schemas`` table."""
        schema_record = {
            "schema_name": self.schema.name,
            "utype": self.schema.votable_utype,
            "description": self.schema.description,
            "schema_index": self.tap_schema_index,
        }
        self._insert("schemas", schema_record)

    def _get_table_name(self, table: datamodel.Table) -> str:
        """Get the name of the table with the schema name prepended.

        Parameters
        ----------
        table
            The table to get the name for.

        Returns
        -------
        str
            The name of the table with the schema name prepended.
        """
        return f"{self.schema.name}.{table.name}"

    def _insert_tables(self) -> None:
        """Insert the table data into the ``tables`` table."""
        for table in self.schema.tables:
            table_record = {
                "schema_name": self.schema.name,
                "table_name": self._get_table_name(table),
                "table_type": "table",
                "utype": table.votable_utype,
                "description": table.description,
                # TAP_SCHEMA requires a table_index; default to 0 when unset.
                "table_index": 0 if table.tap_table_index is None else table.tap_table_index,
            }
            self._insert("tables", table_record)

    def _insert_columns(self) -> None:
        """Insert the column data into the ``columns`` table."""
        for table in self.schema.tables:
            for column in table.columns:
                felis_type = FelisType.felis_type(column.datatype.value)
                arraysize = str(column.votable_arraysize) if column.votable_arraysize else None
                size = DataLoader._get_size(column)
                indexed = DataLoader._is_indexed(column, table)
                tap_column_index = column.tap_column_index
                # Prefer the IVOA unit; fall back to the FITS TUNIT value.
                unit = column.ivoa_unit or column.fits_tunit

                column_record = {
                    "table_name": self._get_table_name(table),
                    "column_name": column.name,
                    "datatype": felis_type.votable_name,
                    "arraysize": arraysize,
                    "size": size,
                    "xtype": column.votable_xtype,
                    "description": column.description,
                    "utype": column.votable_utype,
                    "unit": unit,
                    "ucd": column.ivoa_ucd,
                    "indexed": indexed,
                    "principal": column.tap_principal,
                    "std": column.tap_std,
                    "column_index": tap_column_index,
                }
                self._insert("columns", column_record)

    def _get_key(self, constraint: Constraint) -> str:
        """Get the key name for a constraint.

        Parameters
        ----------
        constraint
            The constraint to get the key name for.

        Returns
        -------
        str
            The key name for the constraint.

        Notes
        -----
        This will prepend the name of the schema to the key name if the
        `unique_keys` attribute is set to True. Otherwise, it will just return
        the name of the constraint.
        """
        if self.unique_keys:
            key_id = f"{self.schema.name}_{constraint.name}"
            logger.debug("Generated unique key_id: %s -> %s", constraint.name, key_id)
        else:
            key_id = constraint.name
        return key_id

    def _insert_keys(self) -> None:
        """Insert the foreign keys into the ``keys`` and ``key_columns``
        tables.

        Raises
        ------
        ValueError
            If a foreign key constraint has a different number of columns
            and referenced columns.
        """
        for table in self.schema.tables:
            for constraint in table.constraints:
                if isinstance(constraint, datamodel.ForeignKeyConstraint):
                    ###########################################################
                    # Handle keys table
                    ###########################################################
                    referenced_column = self.schema.find_object_by_id(
                        constraint.referenced_columns[0], datamodel.Column
                    )
                    referenced_table = self.schema.get_table_by_column(referenced_column)
                    key_id = self._get_key(constraint)
                    key_record = {
                        "key_id": key_id,
                        "from_table": self._get_table_name(table),
                        "target_table": self._get_table_name(referenced_table),
                        "description": constraint.description,
                        "utype": constraint.votable_utype,
                    }
                    self._insert("keys", key_record)

                    ###########################################################
                    # Handle key_columns table
                    ###########################################################
                    # Loop over the corresponding columns and referenced
                    # columns and insert a record for each pair. This is
                    # necessary for proper handling of composite keys.
                    # strict=True guards against a malformed constraint with
                    # mismatched column lists, which would otherwise be
                    # silently truncated.
                    for from_column_id, target_column_id in zip(
                        constraint.columns, constraint.referenced_columns, strict=True
                    ):
                        from_column = self.schema.find_object_by_id(from_column_id, datamodel.Column)
                        target_column = self.schema.find_object_by_id(target_column_id, datamodel.Column)
                        key_columns_record = {
                            "key_id": key_id,
                            "from_column": from_column.name,
                            "target_column": target_column.name,
                        }
                        self._insert("key_columns", key_columns_record)

    def _generate_all_inserts(self) -> None:
        """Generate the inserts for all the data."""
        self.inserts.clear()
        self._insert_schemas()
        self._insert_tables()
        self._insert_columns()
        self._insert_keys()
        logger.debug("Generated %d insert statements", len(self.inserts))

    def _execute_inserts(self) -> None:
        """Load the `~felis.datamodel.Schema` data into the TAP_SCHEMA
        tables.
        """
        try:
            # A single transaction so a failed insert rolls back everything.
            with self._db_context.engine.begin() as connection:
                for insert in self.inserts:
                    connection.execute(insert)
        except Exception as e:
            logger.error("Error loading data into database: %s", e)
            raise

    def _compiled_inserts(self) -> list[str]:
        """Compile the inserts to SQL.

        Returns
        -------
        list
            A list of the compiled insert statements.
        """
        return [
            str(
                insert.compile(
                    dialect=self._db_context.dialect,
                    # Inline the bound values so the SQL is self-contained.
                    compile_kwargs={"literal_binds": True},
                ),
            )
            for insert in self.inserts
        ]

    def _print_sql(self) -> None:
        """Print the generated inserts to stdout."""
        for insert_str in self._compiled_inserts():
            print(insert_str + ";")

    def _write_sql_to_file(self) -> None:
        """Write the generated insert statements to a file.

        Raises
        ------
        ValueError
            If no output file was specified.
        """
        if not self.output_file:
            raise ValueError("No output file specified")
        for insert_str in self._compiled_inserts():
            self.output_file.write(insert_str + ";" + "\n")

    def _insert(self, table_name: str, record: list[Any] | dict[str, Any]) -> None:
        """Generate an insert statement for a record.

        Parameters
        ----------
        table_name
            The name of the table to insert the record into.
        record
            The record to insert into the table.
        """
        table = self.mgr[table_name]
        insert_statement = table.insert().values(record)
        self.inserts.append(insert_statement)

    @staticmethod
    def _get_size(column: datamodel.Column) -> int | None:
        """Get the size of the column.

        Parameters
        ----------
        column
            The column to get the size for.

        Returns
        -------
        int or None
            The size of the column or None if not applicable.
        """
        arraysize = column.votable_arraysize

        if not arraysize:
            return None

        arraysize_str = str(arraysize)
        # Fixed-length arraysize, e.g. "64".
        if arraysize_str.isdigit():
            return int(arraysize_str)

        # Bounded variable-length arraysize, e.g. "64*". If the regex
        # matched, group 1 is guaranteed to be present.
        match = re.match(r"^([0-9]+)\*$", arraysize_str)
        if match:
            return int(match.group(1))

        # Unbounded ("*") or multi-dimensional arraysize has no single size.
        return None

    @staticmethod
    def _is_indexed(column: datamodel.Column, table: datamodel.Table) -> int:
        """Check if the column is indexed in the table.

        Parameters
        ----------
        column
            The column to check.
        table
            The table to check.

        Returns
        -------
        int
            1 if the column is indexed, 0 otherwise.
        """
        # A single-column primary key counts as indexed.
        if isinstance(table.primary_key, str) and table.primary_key == column.id:
            return 1
        # Only single-column indexes mark a column as indexed in TAP_SCHEMA.
        for index in table.indexes:
            if index.columns and len(index.columns) == 1 and index.columns[0] == column.id:
                return 1
        return 0
class MetadataInserter:
    """Insert TAP_SCHEMA self-description rows into the database.

    Parameters
    ----------
    mgr
        The table manager that contains the TAP_SCHEMA tables.
    db_context
        The database context for connecting to the TAP_SCHEMA database.
    """

    def __init__(self, mgr: TableManager, db_context: DatabaseContext):
        """Initialize the metadata inserter.

        Parameters
        ----------
        mgr
            The table manager representing the TAP_SCHEMA tables.
        db_context
            The database context for connecting to the database.
        """
        self._mgr = mgr
        self._db_context = db_context

    @staticmethod
    def _read_csv_rows(csv_bytes: bytes) -> tuple[list[str], list[dict[str, str | None]]]:
        """Parse UTF-8 CSV bytes into headers and row dictionaries.

        Parameters
        ----------
        csv_bytes
            The raw CSV content, with a header line first.

        Returns
        -------
        tuple
            The list of header names and the list of row dictionaries keyed
            by header, where the ``\\N`` NULL marker is mapped to None.
        """
        text_stream = io.TextIOWrapper(io.BytesIO(csv_bytes), encoding="utf-8")
        reader = csv.reader(text_stream)
        headers = next(reader)
        rows = [
            {key: None if value == "\\N" else value for key, value in zip(headers, row)}
            for row in reader
        ]
        return headers, rows

    def insert_metadata(self) -> None:
        """Insert the TAP_SCHEMA metadata into the database.

        Reads one packaged CSV resource per standard TAP_SCHEMA table and
        bulk-inserts its rows within a single transaction.
        """
        with self._db_context.engine.begin() as conn:
            for table_name in self._mgr.get_table_names_std():
                table = self._mgr[table_name]
                csv_bytes = ResourcePath(f"resource://felis/config/tap_schema/{table_name}.csv").read()
                headers, rows = self._read_csv_rows(csv_bytes)
                logger.debug(
                    "Inserting %d rows into table '%s' with headers: %s",
                    len(rows),
                    table_name,
                    headers,
                )
                if rows:
                    conn.execute(table.insert(), rows)
                else:
                    # SQLAlchemy rejects an executemany call with an empty
                    # parameter list, so skip tables whose CSV has no rows.
                    logger.warning("No rows found in CSV resource for table '%s'", table_name)