Coverage for tests/test_metadata.py: 10%
145 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:42 +0000
1# This file is part of felis.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import os
23import unittest
25import yaml
26from sqlalchemy import (
27 CheckConstraint,
28 Constraint,
29 ForeignKeyConstraint,
30 Index,
31 MetaData,
32 PrimaryKeyConstraint,
33 UniqueConstraint,
34)
36from felis import datamodel as dm
37from felis.datamodel import Schema
38from felis.db.database_context import create_database_context
39from felis.metadata import MetaDataBuilder, get_datatype_with_variants
# Absolute path of the directory containing this test module, so that the
# test data is located independently of the current working directory.
TESTDIR = os.path.abspath(os.path.dirname(__file__))

# YAML schema file that every test case in this module loads.
TEST_YAML = os.path.join(TESTDIR, "data", "sales.yaml")
class MetaDataTestCase(unittest.TestCase):
    """Test creation of SQLAlchemy metadata from a Felis schema."""

    def setUp(self) -> None:
        """Load the YAML test data."""
        with open(TEST_YAML, encoding="utf-8") as data:
            self.yaml_data = yaml.safe_load(data)

    @staticmethod
    def _sorted_indexes(indexes: set[Index]) -> list[Index]:
        """Return the indexes as a list sorted by name."""
        return sorted(indexes, key=lambda index: index.name)

    @staticmethod
    def _sorted_constraints(constraints: set[Constraint]) -> list[Constraint]:
        """Return the constraints as a list sorted by name, with the
        `PrimaryKeyConstraint` objects filtered out.
        """
        return sorted(
            (c for c in constraints if not isinstance(c, PrimaryKeyConstraint)),
            key=lambda c: c.name,
        )

    def _assert_constraints_match(self, expected: set[Constraint], actual: set[Constraint]) -> None:
        """Assert that the reflected constraints match those from the builder.

        Parameters
        ----------
        expected
            Constraints of a table in the metadata produced by the builder.
        actual
            Constraints of the same table in the reflected metadata.
        """
        # Either both tables have constraints or neither does.
        self.assertEqual(bool(expected), bool(actual), "Constraints not created correctly")
        if not expected:
            return

        self.assertEqual(len(expected), len(actual))
        # Sets are unordered, so pair the constraints up by sorted name.
        pairs = zip(self._sorted_constraints(expected), self._sorted_constraints(actual))
        for md_constraint, md_db_constraint in pairs:
            self.assertEqual(md_constraint.name, md_db_constraint.name)
            self.assertEqual(md_constraint.deferrable, md_db_constraint.deferrable)
            self.assertEqual(md_constraint.initially, md_db_constraint.initially)
            self.assertEqual(type(md_constraint), type(md_db_constraint), "Constraint types do not match")
            if isinstance(md_constraint, ForeignKeyConstraint) and isinstance(
                md_db_constraint, ForeignKeyConstraint
            ):
                self.assertEqual(md_constraint.referred_table.name, md_db_constraint.referred_table.name)
                self.assertEqual(md_constraint.column_keys, md_db_constraint.column_keys)
                self.assertEqual(md_constraint.ondelete, md_db_constraint.ondelete)
                self.assertEqual(md_constraint.onupdate, md_db_constraint.onupdate)
            elif isinstance(md_constraint, UniqueConstraint) and isinstance(
                md_db_constraint, UniqueConstraint
            ):
                self.assertEqual(md_constraint.columns.keys(), md_db_constraint.columns.keys())
            elif isinstance(md_constraint, CheckConstraint) and isinstance(
                md_db_constraint, CheckConstraint
            ):
                self.assertEqual(str(md_constraint.sqltext), str(md_db_constraint.sqltext))

    def _assert_indexes_match(self, expected: set[Index], actual: set[Index]) -> None:
        """Assert that the reflected indexes match those from the builder.

        Parameters
        ----------
        expected
            Indexes of a table in the metadata produced by the builder.
        actual
            Indexes of the same table in the reflected metadata.
        """
        # Either both tables have indexes or neither does.
        self.assertEqual(bool(expected), bool(actual), "Indexes not created correctly")
        if not expected:
            return

        md_indexes = self._sorted_indexes(expected)
        md_db_indexes = self._sorted_indexes(actual)
        self.assertEqual(len(md_indexes), len(md_db_indexes))
        # Iterate the sorted lists, not the raw sets, so that each index is
        # compared against its same-named counterpart.
        for md_index, md_db_index in zip(md_indexes, md_db_indexes):
            self.assertEqual(md_index.name, md_db_index.name)
            self.assertEqual(md_index.columns.keys(), md_db_index.columns.keys())

    def test_create_all(self) -> None:
        """Create all tables in the schema using the metadata object and a
        SQLite connection.

        Check that the reflected metadata matches that created by the builder.
        """
        schema = Schema.model_validate(self.yaml_data)
        schema.name = "main"
        builder = MetaDataBuilder(schema)
        md = builder.build()

        with create_database_context("sqlite:///:memory:", md) as db_ctx:
            db_ctx.create_all()

            md_db = MetaData()
            # Close the connection used for reflection once it is done
            # instead of leaving it open for the rest of the test.
            with db_ctx.engine.connect() as connection:
                md_db.reflect(connection, schema=schema.name)

            self.assertEqual(md_db.tables.keys(), md.tables.keys())

            for md_table_name, md_table in md.tables.items():
                md_db_table = md_db.tables[md_table_name]

                self.assertEqual(md_table.columns.keys(), md_db_table.columns.keys())
                for md_column_name, md_column in md_table.columns.items():
                    md_db_column = md_db_table.columns[md_column_name]
                    self.assertEqual(type(md_column.type), type(md_db_column.type))
                    self.assertEqual(md_column.nullable, md_db_column.nullable)
                    self.assertEqual(md_column.primary_key, md_db_column.primary_key)

                self._assert_constraints_match(md_table.constraints, md_db_table.constraints)
                self._assert_indexes_match(md_table.indexes, md_db_table.indexes)

    def test_builder(self) -> None:
        """Test that the information in the metadata object created by the
        builder matches the data in the Felis schema used to create it.
        """
        sch = Schema.model_validate(self.yaml_data)
        bld = MetaDataBuilder(sch, apply_schema_to_metadata=False)
        md = bld.build()

        self.assertEqual(len(sch.tables), len(md.tables))
        self.assertEqual([table.name for table in sch.tables], list(md.tables.keys()))
        for table in sch.tables:
            md_table = md.tables[table.name]
            self.assertEqual(table.name, md_table.name)
            self.assertEqual(len(table.columns), len(md_table.columns))

            for column in table.columns:
                md_table_column = md_table.columns[column.name]
                datatype = get_datatype_with_variants(column)
                self.assertEqual(type(datatype), type(md_table_column.type))
                if column.nullable is not None:
                    self.assertEqual(column.nullable, md_table_column.nullable)

            for constraint in table.constraints:
                md_constraint = next(
                    (mdc for mdc in md_table.constraints if mdc.name == constraint.name), None
                )
                self.assertIsNotNone(
                    md_constraint, f"Constraint {constraint.name} missing from table {table.name}"
                )
                if isinstance(constraint, dm.ForeignKeyConstraint):
                    self.assertIsInstance(md_constraint, ForeignKeyConstraint)
                    # Must be assertEqual: assertTrue would treat the second
                    # list as the failure message and compare nothing.
                    self.assertEqual(
                        sorted(sch[column_id].name for column_id in constraint.columns),
                        sorted(md_constraint.columns.keys()),
                    )
                elif isinstance(constraint, dm.UniqueConstraint):
                    self.assertEqual(
                        sorted(sch[column_id].name for column_id in constraint.columns),
                        sorted(md_constraint.columns.keys()),
                    )
                elif isinstance(constraint, dm.CheckConstraint):
                    self.assertEqual(constraint.expression, str(md_constraint.sqltext))

            for index in table.indexes:
                md_index = next((mdi for mdi in md_table.indexes if mdi.name == index.name), None)
                self.assertIsNotNone(md_index, f"Index {index.name} missing from table {table.name}")
                self.assertEqual(
                    sorted(sch[column_id].name for column_id in index.columns),
                    sorted(md_index.columns.keys()),
                )

            if table.primary_key:
                # The primary key is either a single column ID or a list of
                # column IDs.
                if isinstance(table.primary_key, str):
                    primary_keys = [sch[table.primary_key].name]
                else:
                    primary_keys = [sch[pk].name for pk in table.primary_key]
                for primary_key in primary_keys:
                    self.assertTrue(md_table.columns[primary_key].primary_key)

    def test_timestamp(self) -> None:
        """Test that the `timestamp` datatype is created correctly."""
        for precision in [None, 6]:
            # Report each precision separately so that a failure for one value
            # does not hide the result for the other.
            with self.subTest(precision=precision):
                col = dm.Column(
                    **{
                        "name": "timestamp_test",
                        "id": "#timestamp_test",
                        "datatype": "timestamp",
                        "precision": precision,
                    }
                )
                datatype = get_datatype_with_variants(col)
                variant_dict = datatype._variant_mapping
                self.assertIn("mysql", variant_dict)
                self.assertIn("postgresql", variant_dict)
                pg_timestamp = variant_dict["postgresql"]
                self.assertEqual(pg_timestamp.timezone, False)
                self.assertEqual(pg_timestamp.precision, precision)
                mysql_timestamp = variant_dict["mysql"]
                self.assertEqual(mysql_timestamp.timezone, False)
                self.assertEqual(mysql_timestamp.fsp, precision)

    def test_ignore_constraints(self) -> None:
        """Test that constraints are not created when the
        ``ignore_constraints`` flag is set on the metadata builder.
        """
        schema = Schema.model_validate(self.yaml_data)
        schema.name = "main"
        builder = MetaDataBuilder(schema, ignore_constraints=True)
        md = builder.build()
        for table in md.tables.values():
            # Primary key constraints are excluded from the check.
            non_primary_key_constraints = [
                c for c in table.constraints if not isinstance(c, PrimaryKeyConstraint)
            ]
            self.assertEqual(
                len(non_primary_key_constraints),
                0,
                msg=f"Table {table.name} has non-primary key constraints defined",
            )

    def test_table_name_postfix(self) -> None:
        """Test that table name postfixes are correctly applied."""
        schema = Schema.model_validate(self.yaml_data)
        schema.name = "main"
        builder = MetaDataBuilder(schema, table_name_postfix="_test")
        md = builder.build()
        for table in md.tables.values():
            self.assertTrue(table.name.endswith("_test"))

    def test_fk_actions(self) -> None:
        """Test that foreign key constraints with on delete and on update
        actions are created correctly.
        """
        schema = Schema.model_validate(self.yaml_data)
        schema.name = "main"
        builder = MetaDataBuilder(schema)
        md = builder.build()

        for table in md.tables.values():
            for constraint in table.constraints:
                if isinstance(constraint, ForeignKeyConstraint):
                    self.assertEqual(constraint.ondelete, "SET NULL")
                    self.assertEqual(constraint.onupdate, "CASCADE")
# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()