Coverage for tests/test_tap_schema_postgres.py: 29%
86 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:37 +0000
1# This file is part of felis.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import gc
23import os
24import unittest
26from sqlalchemy import MetaData
27from sqlalchemy.engine import create_engine
28from sqlalchemy.exc import SQLAlchemyError
29from sqlalchemy.schema import CreateSchema
31from felis.datamodel import Schema
32from felis.db.database_context import create_database_context
33from felis.metadata import MetaDataBuilder
34from felis.tap_schema import DataLoader, TableManager
36try:
37 from testing.postgresql import Postgresql
38except ImportError:
39 Postgresql = None
# Absolute path of the directory containing this test module; the data files
# below are resolved relative to it so the tests do not depend on the current
# working directory.
TEST_DIR = os.path.abspath(os.path.dirname(__file__))
# Schema loaded in setUp() and inserted into TAP_SCHEMA by the tests.
TEST_SALES = os.path.join(TEST_DIR, "data", "sales.yaml")
# TAP_SCHEMA definition using non-standard schema and column names.
TEST_TAP_SCHEMA_NONSTD = os.path.join(TEST_DIR, "data", "test_tap_schema_nonstandard.yaml")
class TestTapSchemaPostgresql(unittest.TestCase):
    """Test TAP_SCHEMA for PostgreSQL.

    Every test runs against a temporary PostgreSQL server started in
    `setUp`. The tests are skipped when the ``testing.postgresql`` package
    is not installed.
    """

    def setUp(self) -> None:
        """Set up a local PostgreSQL database and a test schema."""
        # Skip the test if the testing.postgresql package is not installed.
        if Postgresql is None:
            self.skipTest("testing.postgresql not installed")

        # Start a PostgreSQL database for testing.
        self.postgresql = Postgresql()
        # Register the shutdown immediately: cleanups run after tearDown()
        # and also when a later step of setUp() raises, in which case
        # tearDown() itself is never called and the server would otherwise
        # be left running.
        self.addCleanup(self.postgresql.stop)
        self.engine = create_engine(self.postgresql.url())

        # Setup a test schema.
        self.test_schema = Schema.from_uri(TEST_SALES)

    def test_create_metadata(self) -> None:
        """Test using a schema that was created by using the TAP_SCHEMA tables
        by the `~felis.tap_schema.TableManager`.
        """
        engine_url = str(self.engine.url)

        # Create the TAP_SCHEMA database.
        mgr = TableManager(engine_url=engine_url)
        with create_database_context(engine_url, mgr.metadata) as db_ctx:
            mgr.initialize_database(db_ctx)

            # Load the test data into the database.
            loader = DataLoader(self.test_schema, mgr, db_ctx, tap_schema_index=1)
            loader.load()
        # Context manager will handle cleanup

    def test_reflect_database(self) -> None:
        """Test reflecting an existing PostgreSQL TAP_SCHEMA database into a
        `~felis.tap_schema.TableManager`.
        """
        # NOTE: there is deliberately no blanket ``except Exception`` around
        # this test. Such a handler would also catch the AssertionError
        # raised by the assertions and ``self.fail`` calls below and hide the
        # original traceback; unexpected exceptions are left to the test
        # runner to report.

        # Build the TAP_SCHEMA database independently of the TableManager.
        schema = TableManager.load_schema_resource()
        md = MetaDataBuilder(schema).build()
        with self.engine.connect() as conn:
            trans = conn.begin()
            try:
                print(f"Creating schema '{schema.name}'")
                conn.execute(CreateSchema(schema.name, if_not_exists=False))
                trans.commit()
            except SQLAlchemyError as e:
                trans.rollback()
                self.fail(f"Failed to create schema: {e}")
            try:
                print(f"Creating tables in schema: {md.schema}")
                md.create_all(self.engine)
            except SQLAlchemyError as e:
                self.fail(f"Failed to create database: {e}")

        # Reflect the existing database into a TableManager.
        engine_url = str(self.engine.url)
        with create_database_context(engine_url, md) as db_ctx:
            mgr = TableManager(engine_url=engine_url, db_context=db_ctx)
            self.assertIsNotNone(mgr.metadata)
            self.assertGreater(len(mgr.metadata.tables), 0)
            # Table keys are qualified with the schema name; strip it so the
            # names can be compared with the standard TAP_SCHEMA table names.
            table_names = {
                table_name.replace(f"{schema.name}.", "") for table_name in mgr.metadata.tables.keys()
            }
            self.assertEqual(table_names, set(TableManager.get_table_names_std()))

            # See if test data can be loaded successfully using the
            # existing database
            loader = DataLoader(self.test_schema, mgr, db_ctx, tap_schema_index=1)
            loader.load()

    def test_nonstandard_names(self) -> None:
        """Test the TAP table manager class with non-standard names for the
        schema and columns, which are present in the test YAML file used
        to create the TAP_SCHEMA database.
        """
        with open(TEST_TAP_SCHEMA_NONSTD) as file:
            sch = Schema.from_stream(file, context={"id_generation": True})
        md = MetaDataBuilder(sch).build()
        engine_url = str(self.engine.url)

        # Create the non-standard TAP_SCHEMA tables in the database.
        with create_database_context(engine_url, md) as ctx:
            ctx.initialize()
            ctx.create_all()

        postfix = "11"
        # Create a context for reflection with the existing database
        with create_database_context(engine_url, md) as reflect_ctx:
            mgr = TableManager(
                engine_url=engine_url,
                db_context=reflect_ctx,
                table_name_postfix=postfix,
                schema_name=sch.name,
            )
            for table_name in mgr.get_table_names_std():
                table = mgr[table_name]
                self.assertEqual(table.name, f"{table_name}{postfix}".replace(f"{sch.name}", ""))

    def test_bad_engine(self) -> None:
        """Test the TableManager class with an invalid engine."""
        bad_url = "postgresql+psycopg2://fake_user:fake_password@fake_host:5555"
        # Create metadata for reflection attempt
        md = MetaData(schema="TAP_SCHEMA")
        with create_database_context(bad_url, md) as db_ctx:
            with self.assertRaises(SQLAlchemyError):
                # Reflection will fail when trying to connect
                TableManager(db_context=db_ctx)

    def tearDown(self) -> None:
        """Tear down the test case.

        Only the engine is disposed here; the PostgreSQL server is stopped
        afterwards by the cleanup registered in `setUp`.
        """
        gc.collect()
        self.engine.dispose()
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()