Coverage for tests / test_tap_schema.py: 16%
454 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:42 +0000
1# This file is part of felis.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import os
23import shutil
24import tempfile
25import unittest
26from typing import Any
28from sqlalchemy import select
30from felis.datamodel import Schema
31from felis.db.database_context import create_database_context
32from felis.tap_schema import DataLoader, TableManager
# Directory containing this test module; test data files live under data/.
TEST_DIR = os.path.dirname(__file__)
# Simple sales schema used by the TableManager tests.
TEST_SALES = os.path.join(TEST_DIR, "data", "sales.yaml")
# Schema exercising all TAP_SCHEMA column datatypes, keys, etc.
TEST_TAP_SCHEMA = os.path.join(TEST_DIR, "data", "test_tap_schema.yaml")
# Schema with a composite (multi-column) foreign key.
TEST_COMPOSITE_KEYS = os.path.join(TEST_DIR, "data", "test_composite_keys.yaml")
class TableManagerTestCase(unittest.TestCase):
    """Test the `TableManager` class."""

    def setUp(self) -> None:
        """Load the sales test schema used by these tests."""
        with open(TEST_SALES) as stream:
            self.schema = Schema.from_stream(stream)

    def test_create_table_manager(self) -> None:
        """Test the TAP table manager class."""
        manager = TableManager()
        name = manager.schema_name

        # The manager should have populated its metadata with tables.
        self.assertNotEqual(len(manager.metadata.tables), 0)
        # For SQLite (default), metadata.schema is None but schema_name is set.
        expected_metadata_schema = name if manager.apply_schema_to_metadata else None
        self.assertEqual(manager.metadata.schema, expected_metadata_schema)
        self.assertEqual(manager.schema_name, "TAP_SCHEMA")  # schema_name should always be set
        # Every standard table name must be resolvable via item lookup.
        for std_name in manager.get_table_names_std():
            manager[std_name]

        # Creating a second table manager must work when one already exists.
        manager = TableManager()

    def test_table_name_postfix(self) -> None:
        """Check that the postfix is appended to every table name."""
        manager = TableManager(table_name_postfix="_test")
        for name in manager.metadata.tables:
            self.assertTrue(name.endswith("_test"))
class DataLoaderTestCase(unittest.TestCase):
    """Test the `DataLoader` class."""

    def setUp(self) -> None:
        """Load the TAP_SCHEMA test schema and create a scratch directory."""
        with open(TEST_TAP_SCHEMA) as schema_stream:
            self.schema = Schema.from_stream(schema_stream, context={"id_generation": True})
        self.tmpdir = tempfile.mkdtemp(dir=TEST_DIR)

    def tearDown(self) -> None:
        """Remove the scratch directory."""
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_sqlite(self) -> None:
        """Test the `DataLoader` using an in-memory SQLite database."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as db_ctx:
            manager.initialize_database(db_ctx)
            DataLoader(self.schema, manager, db_context=db_ctx).load()

    def test_sql_output(self) -> None:
        """Test printing SQL to stdout and writing SQL to a file."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as db_ctx:
            # Dry run that echoes the generated SQL to stdout.
            DataLoader(self.schema, manager, db_ctx, dry_run=True, print_sql=True).load()

            # Dry run again, this time capturing the SQL in a file.
            sql_path = os.path.join(self.tmpdir, "test_tap_schema_print_sql.sql")
            with open(sql_path, "w") as sql_file:
                DataLoader(
                    self.schema, manager, db_ctx, dry_run=True, print_sql=True, output_file=sql_file
                ).load()

            self.assertTrue(os.path.exists(sql_path))
            with open(sql_path) as sql_file:
                sql_data = sql_file.read()
            insert_count = sql_data.count("INSERT INTO")
            self.assertEqual(
                insert_count,
                22,
                f"Expected 22 'INSERT INTO' statements, found {insert_count}",
            )

    def test_unique_keys(self) -> None:
        """Test generation of unique foreign keys."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as db_ctx:
            manager.initialize_database(db_ctx)
            DataLoader(self.schema, manager, db_context=db_ctx, unique_keys=True).load()

            # With unique_keys on, every key_id is prefixed by the schema name.
            keys_data = manager.select(db_ctx, "keys")
            self.assertGreaterEqual(len(keys_data), 1)
            for row in keys_data:
                self.assertTrue(row["key_id"].startswith(f"{self.schema.name}_"))

            key_columns_data = manager.select(db_ctx, "key_columns")
            self.assertGreaterEqual(len(key_columns_data), 1)
            for row in key_columns_data:
                self.assertTrue(row["key_id"].startswith(f"{self.schema.name}_"))

    def test_select_with_filter(self) -> None:
        """Test selecting rows with a filter."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as db_ctx:
            manager.initialize_database(db_ctx)
            DataLoader(self.schema, manager, db_context=db_ctx, unique_keys=True).load()

            matching = manager.select(db_ctx, "columns", "table_name = 'test_schema.table1'")
            self.assertEqual(len(matching), 16)
152def _find_row(rows: list[dict[str, Any]], column_name: str, value: str) -> dict[str, Any]:
153 next_row = next(
154 (row for row in rows if row[column_name] == value),
155 None,
156 )
157 assert next_row is not None
158 assert isinstance(next_row, dict)
159 return next_row
class TapSchemaSqliteSetup:
    """Set up the TAP_SCHEMA SQLite database for testing.

    Parameters
    ----------
    test_file_path
        Path to the TAP_SCHEMA test file.
    context
        Context for the schema. Default is an empty dictionary.
    """

    def __init__(self, test_file_path: str, context: dict | None = None) -> None:
        # Use a None sentinel rather than a mutable ``{}`` default, which
        # would be shared (and mutable) across every instantiation.
        if context is None:
            context = {}
        with open(test_file_path) as test_file:
            self._schema = Schema.from_stream(test_file, context=context)

        mgr = TableManager()
        # Create context manager but don't enter it yet - tests will do that
        self._mgr = mgr
        self._metadata = mgr.metadata

    @property
    def schema(self) -> Schema:
        """Return the schema."""
        return self._schema

    @property
    def mgr(self) -> TableManager:
        """Return the table manager."""
        return self._mgr

    @property
    def metadata(self) -> Any:
        """Return the metadata."""
        return self._metadata
class TapSchemaDataTest(unittest.TestCase):
    """Test the validity of generated TAP SCHEMA data."""

    def setUp(self) -> None:
        """Set up the test case."""
        self.tap_schema_setup = TapSchemaSqliteSetup(TEST_TAP_SCHEMA, context={"id_generation": True})

    def test_schemas(self) -> None:
        """Check the row loaded into the TAP_SCHEMA ``schemas`` table."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema,
                self.tap_schema_setup.mgr,
                db_context=db_ctx,
                tap_schema_index=2,
            )
            loader.load()

            schemas_table = self.tap_schema_setup.mgr["schemas"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(schemas_table))
                schema_data = [row._asdict() for row in result]

            # Exactly one schema row should have been loaded.
            self.assertEqual(len(schema_data), 1)

            schema = schema_data[0]
            self.assertEqual(schema["schema_name"], "test_schema")
            self.assertEqual(schema["description"], "Test schema")
            self.assertEqual(schema["utype"], "Schema")
            # schema_index comes from the tap_schema_index argument above.
            self.assertEqual(schema["schema_index"], 2)

    def test_tables(self) -> None:
        """Check the rows loaded into the TAP_SCHEMA ``tables`` table."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            tables_table = self.tap_schema_setup.mgr["tables"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(tables_table))
                table_data = [row._asdict() for row in result]

            # The test schema defines two tables.
            self.assertEqual(len(table_data), 2)

            table = table_data[0]
            assert isinstance(table, dict)
            self.assertEqual(table["schema_name"], "test_schema")
            self.assertEqual(table["table_name"], f"{self.tap_schema_setup.schema.name}.table1")
            self.assertEqual(table["table_type"], "table")
            self.assertEqual(table["utype"], "Table")
            self.assertEqual(table["description"], "Test table 1")
            self.assertEqual(table["table_index"], 2)

    def test_columns(self) -> None:
        """Check datatype/arraysize mapping of every column of table1 in the
        TAP_SCHEMA ``columns`` table.
        """
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            columns_table = self.tap_schema_setup.mgr["columns"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(columns_table))
                column_data = [row._asdict() for row in result]

            table1_rows = [
                row for row in column_data if row["table_name"] == f"{self.tap_schema_setup.schema.name}.table1"
            ]
            self.assertNotEqual(len(table1_rows), 0)

            # Scalar types map to a VOTable datatype with no arraysize.
            boolean_col = _find_row(table1_rows, "column_name", "boolean_field")
            self.assertEqual(boolean_col["datatype"], "boolean")
            self.assertEqual(boolean_col["arraysize"], None)

            byte_col = _find_row(table1_rows, "column_name", "byte_field")
            self.assertEqual(byte_col["datatype"], "unsignedByte")
            self.assertEqual(byte_col["arraysize"], None)

            short_col = _find_row(table1_rows, "column_name", "short_field")
            self.assertEqual(short_col["datatype"], "short")
            self.assertEqual(short_col["arraysize"], None)

            int_col = _find_row(table1_rows, "column_name", "int_field")
            self.assertEqual(int_col["datatype"], "int")
            self.assertEqual(int_col["arraysize"], None)

            float_col = _find_row(table1_rows, "column_name", "float_field")
            self.assertEqual(float_col["datatype"], "float")
            self.assertEqual(float_col["arraysize"], None)

            double_col = _find_row(table1_rows, "column_name", "double_field")
            self.assertEqual(double_col["datatype"], "double")
            self.assertEqual(double_col["arraysize"], None)

            long_col = _find_row(table1_rows, "column_name", "long_field")
            self.assertEqual(long_col["datatype"], "long")
            self.assertEqual(long_col["arraysize"], None)

            # Variable-length types get an arraysize with a '*' suffix.
            unicode_col = _find_row(table1_rows, "column_name", "unicode_field")
            self.assertEqual(unicode_col["datatype"], "unicodeChar")
            self.assertEqual(unicode_col["arraysize"], "128*")

            binary_col = _find_row(table1_rows, "column_name", "binary_field")
            self.assertEqual(binary_col["datatype"], "unsignedByte")
            self.assertEqual(binary_col["arraysize"], "1024*")

            # Timestamp column carries the full set of TAP_SCHEMA metadata.
            ts = _find_row(table1_rows, "column_name", "timestamp_field")
            self.assertEqual(ts["datatype"], "char")
            self.assertEqual(ts["xtype"], "timestamp")
            self.assertEqual(ts["description"], "Timestamp field")
            self.assertEqual(ts["utype"], "Obs:Timestamp")
            self.assertEqual(ts["unit"], "s")
            self.assertEqual(ts["ucd"], "time.epoch")
            self.assertEqual(ts["principal"], 1)
            self.assertEqual(ts["std"], 1)
            self.assertEqual(ts["column_index"], 42)
            self.assertEqual(ts["size"], None)
            self.assertEqual(ts["arraysize"], "*")

            # Fixed-length char keeps an exact arraysize, no '*'.
            char_col = _find_row(table1_rows, "column_name", "char_field")
            self.assertEqual(char_col["datatype"], "char")
            self.assertEqual(char_col["arraysize"], "64")

            str_col = _find_row(table1_rows, "column_name", "string_field")
            self.assertEqual(str_col["datatype"], "char")
            self.assertEqual(str_col["arraysize"], "256*")

            txt_col = _find_row(table1_rows, "column_name", "text_field")
            self.assertEqual(txt_col["datatype"], "char")
            self.assertEqual(txt_col["arraysize"], "*")

    def test_keys(self) -> None:
        """Check the row loaded into the TAP_SCHEMA ``keys`` table."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema,
                self.tap_schema_setup.mgr,
                db_context=db_ctx,
                tap_schema_index=2,
            )
            loader.load()

            keys_table = self.tap_schema_setup.mgr["keys"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(keys_table))
                key_data = [row._asdict() for row in result]

            self.assertEqual(len(key_data), 1)

            key = key_data[0]
            assert isinstance(key, dict)

            self.assertEqual(key["key_id"], "fk_table1_to_table2")
            self.assertEqual(key["from_table"], f"{self.tap_schema_setup.schema.name}.table1")
            self.assertEqual(key["target_table"], f"{self.tap_schema_setup.schema.name}.table2")
            self.assertEqual(key["description"], "Foreign key from table1 to table2")
            self.assertEqual(key["utype"], "ForeignKey")

    def test_key_columns(self) -> None:
        """Check the row loaded into the TAP_SCHEMA ``key_columns`` table."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema,
                self.tap_schema_setup.mgr,
                db_context=db_ctx,
                tap_schema_index=2,
            )
            loader.load()

            key_columns_table = self.tap_schema_setup.mgr["key_columns"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(key_columns_table))
                key_column_data = [row._asdict() for row in result]

            self.assertEqual(len(key_column_data), 1)

            key_column = key_column_data[0]
            assert isinstance(key_column, dict)

            self.assertEqual(key_column["key_id"], "fk_table1_to_table2")
            self.assertEqual(key_column["from_column"], "fk")
            self.assertEqual(key_column["target_column"], "id")

    def test_bad_table_name(self) -> None:
        """Test getting a bad TAP_SCHEMA table name."""
        with self.assertRaises(KeyError):
            self.tap_schema_setup.mgr["bad_table"]
class ForceUnboundArraySizeTest(unittest.TestCase):
    """Test that arraysize for appropriate types is set to '*' when the
    ``force_unbounded_arraysize`` context flag is set to ``True``.
    """

    def setUp(self) -> None:
        """Set up the test case."""
        self.tap_schema_setup = TapSchemaSqliteSetup(
            TEST_TAP_SCHEMA, context={"id_generation": True, "force_unbounded_arraysize": True}
        )

    def test_force_unbounded_arraysize(self) -> None:
        """Test that variable-length columns get an unbounded arraysize of
        '*' (with no length prefix) when the flag is enabled.
        """
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            columns_table = self.tap_schema_setup.mgr["columns"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(columns_table))
                column_data = [row._asdict() for row in result]

            table1_rows = [
                row for row in column_data if row["table_name"] == f"{self.tap_schema_setup.schema.name}.table1"
            ]
            for row in table1_rows:
                # Only the variable-length column types are affected.
                if row["column_name"] in ["string_field", "text_field", "unicode_field", "binary_field"]:
                    self.assertEqual(row["arraysize"], "*")
class CompositeKeysTestCase(unittest.TestCase):
    """Test the handling of composite foreign keys."""

    def setUp(self) -> None:
        """Load the composite-keys schema and populate TAP_SCHEMA data.

        The ``keys`` and ``key_columns`` rows are fetched eagerly here so
        the test methods only need to inspect ``self.keys_data`` and
        ``self.key_columns_data``.
        """
        self.tap_schema_setup = TapSchemaSqliteSetup(TEST_COMPOSITE_KEYS, context={"id_generation": True})

        # Set up the data in a context manager
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            # Fetch the keys and key_columns data from the TAP_SCHEMA tables.
            keys_table = self.tap_schema_setup.mgr["keys"]
            key_columns_table = self.tap_schema_setup.mgr["key_columns"]
            with db_ctx.engine.connect() as connection:
                key_columns_result = connection.execute(select(key_columns_table))
                self.key_columns_data = [row._asdict() for row in key_columns_result]

                keys_result = connection.execute(select(keys_table))
                self.keys_data = [row._asdict() for row in keys_result]

    def test_keys(self) -> None:
        """Test that composite keys are handled correctly by inspecting the
        data in the generated TAP_SCHEMA ``keys`` table.
        """
        # Debug print() calls removed: they polluted the test runner output.
        self.assertEqual(len(self.keys_data), 1)

        self.assertEqual(
            self.keys_data[0],
            {
                "key_id": "fk_composite",
                "from_table": "test_composite_keys.table1",
                "target_table": "test_composite_keys.table2",
                "utype": "ForeignKey",
                "description": "Composite foreign key from table1 to table2",
            },
        )

    def test_key_columns(self) -> None:
        """Test that composite keys are handled correctly by inspecting the
        data in the generated TAP_SCHEMA ``key_columns`` table.
        """
        # A composite key over two columns yields two key_columns rows.
        self.assertEqual(len(self.key_columns_data), 2)

        key_columns_row1 = self.key_columns_data[0]
        self.assertEqual(
            key_columns_row1, {"key_id": "fk_composite", "from_column": "id1", "target_column": "id1"}
        )

        key_columns_row2 = self.key_columns_data[1]
        self.assertEqual(
            key_columns_row2, {"key_id": "fk_composite", "from_column": "id2", "target_column": "id2"}
        )
class ColumnRefsTestCase(unittest.TestCase):
    """Test case that column references are applied correctly in TAP_SCHEMA."""

    def setUp(self) -> None:
        """Set up the test case.

        Writes a source schema and a referencing schema into a temporary
        directory and builds the TAP_SCHEMA setup from the referencing one.
        """
        self.temp_dir = tempfile.mkdtemp()

        # Write out source schema file
        source_schema_content = """
name: source_schema
tables:
- name: source_table
  columns:
  - name: ref_col1
    datatype: int
  - name: ref_col2
    datatype: string
    length: 64
  - name: ref_col3
    datatype: float
"""
        source_schema_path = os.path.join(self.temp_dir, "source_schema.yaml")
        with open(source_schema_path, "w") as f:
            f.write(source_schema_content.strip())

        # Write out referencing schema file
        ref_schema_content = """
name: ref_schema
resources:
  source_schema:
    uri: {resource_path}
tables:
- name: ref_table
  columnRefs:
    source_schema:
      source_table:
        ref_col1:
        ref_col2:
        col3:
          ref_name: ref_col3
"""
        ref_schema_path = os.path.join(self.temp_dir, "ref_schema.yaml")
        ref_content = ref_schema_content.format(resource_path=source_schema_path)
        with open(ref_schema_path, "w") as f:
            f.write(ref_content.strip())

        self.tap_schema_setup = TapSchemaSqliteSetup(
            ref_schema_path, context={"id_generation": True, "column_ref_index_increment": 1}
        )

    def tearDown(self) -> None:
        """Remove the temporary directory created in setUp.

        Added to fix a resource leak: the directory from
        ``tempfile.mkdtemp()`` was never cleaned up, so repeated test runs
        accumulated temp directories.
        """
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_column_refs(self) -> None:
        """Test that column references are processed correctly."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            # Create the TAP_SCHEMA database and load the data
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            columns_table = self.tap_schema_setup.mgr["columns"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(columns_table))
                rows = [row._asdict() for row in result]
                self.assertEqual(len(rows), 3)
                column_names = [column_data["column_name"] for column_data in rows]
                self.assertEqual(set(column_names), {"ref_col1", "ref_col2", "col3"})
                for column_data in rows:
                    # Check the generated indices of the referenced columns
                    if column_data["column_name"] == "ref_col1":
                        self.assertEqual(column_data["column_index"], 1)
                    elif column_data["column_name"] == "ref_col2":
                        self.assertEqual(column_data["column_index"], 2)
                    elif column_data["column_name"] == "col3":
                        self.assertEqual(column_data["column_index"], 3)
                    else:
                        self.fail(f"Unexpected column name: {column_data['column_name']}")
class TableManagerExtensionsTestCase(unittest.TestCase):
    """Test the `TableManager` class with extensions."""

    def setUp(self) -> None:
        """Set up the test case."""
        self.tmpdir = tempfile.mkdtemp(dir=TEST_DIR)

        # Extensions file used by most tests below: adds two columns to the
        # standard "schemas" table and one to the "tables" table.
        self.extensions_path = os.path.join(self.tmpdir, "test_extensions.yaml")
        extensions_content = """
name: test_extensions
description: Test TAP_SCHEMA extensions

tables:
  - name: schemas
    description: Extensions to schemas table
    columns:
      - name: owner_id
        datatype: char
        length: 32
        nullable: true
        description: "Owner identifier"
      - name: read_anon
        datatype: int
        nullable: true
        description: "Anon read flag"

  - name: tables
    description: Extensions to tables table
    columns:
      - name: api_created
        datatype: int
        nullable: true
        description: "API created flag"
"""
        with open(self.extensions_path, "w") as f:
            f.write(extensions_content)

    def tearDown(self) -> None:
        """Clean up temporary directory."""
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_extensions_applied(self) -> None:
        """Extension columns should appear on the extended tables."""
        mgr = TableManager(extensions_path=self.extensions_path)

        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)
        self.assertIn("read_anon", schemas_table.c)

        tables_table = mgr["tables"]
        self.assertIn("api_created", tables_table.c)

    def test_extensions_column_count(self) -> None:
        """Extensions should add exactly the declared number of columns."""
        mgr_without = TableManager()
        mgr_with = TableManager(extensions_path=self.extensions_path)

        schemas_before = len(mgr_without["schemas"].c)
        schemas_after = len(mgr_with["schemas"].c)
        self.assertEqual(schemas_after, schemas_before + 2)

        tables_before = len(mgr_without["tables"].c)
        tables_after = len(mgr_with["tables"].c)
        self.assertEqual(tables_after, tables_before + 1)

    def test_extensions_with_data_loader(self) -> None:
        """Loading data into an extended TAP_SCHEMA should work and the
        result rows should include the extension columns."""
        mgr = TableManager(extensions_path=self.extensions_path)
        with create_database_context("sqlite:///:memory:", mgr.metadata) as db_ctx:
            mgr.initialize_database(db_ctx)

            with open(TEST_TAP_SCHEMA) as test_file:
                schema = Schema.from_stream(test_file, context={"id_generation": True})

            loader = DataLoader(schema, mgr, db_context=db_ctx)
            loader.load()

            schemas_table = mgr["schemas"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(schemas_table))
                row = result.fetchone()
                self.assertIn("owner_id", row._fields)
                self.assertIn("read_anon", row._fields)

    def test_invalid_extensions_file(self) -> None:
        """A nonexistent extensions path should raise ValueError."""
        invalid_path = os.path.join(self.tmpdir, "nonexistent.yaml")

        with self.assertRaises(ValueError):
            TableManager(extensions_path=invalid_path)

    def test_empty_extensions(self) -> None:
        """An extensions file with an empty tables list should be accepted."""
        empty_extensions_path = os.path.join(self.tmpdir, "empty_extensions.yaml")
        with open(empty_extensions_path, "w") as f:
            f.write("name: empty_extensions\ntables: []\n")

        mgr = TableManager(extensions_path=empty_extensions_path)
        self.assertIsNotNone(mgr["schemas"])

    def test_extensions_with_null_table_extensions(self) -> None:
        """Tables with empty column lists should be skipped harmlessly."""
        null_extensions_path = os.path.join(self.tmpdir, "null_extensions.yaml")
        with open(null_extensions_path, "w") as f:
            f.write("""
name: null_extensions
tables:
  - name: schemas
    columns: []
  - name: tables
    columns: []
  - name: columns
    columns:
      - name: test_col
        datatype: int
""")

        mgr = TableManager(extensions_path=null_extensions_path)

        columns_table = mgr["columns"]
        self.assertIn("test_col", columns_table.c)

        # The "schemas" table had no extension columns in this file.
        schemas_table = mgr["schemas"]
        self.assertNotIn("owner_id", schemas_table.c)

    def test_extensions_invalid_column_missing_name(self) -> None:
        """A column entry without a name should raise KeyError."""
        invalid_name_path = os.path.join(self.tmpdir, "invalid_name.yaml")
        with open(invalid_name_path, "w") as f:
            f.write("""
            name: invalid_name
            tables:
            - name: schemas
              columns:
              - datatype: int
                description: "Missing name"
              - name: some_column
                datatype: int
            """)
        with self.assertRaises(KeyError):
            TableManager(extensions_path=invalid_name_path)

    def test_extensions_column_id_auto_generation(self) -> None:
        """A column without an explicit "@id" should still be added."""
        auto_id_path = os.path.join(self.tmpdir, "auto_id.yaml")
        with open(auto_id_path, "w") as f:
            f.write("""
name: auto_id
tables:
  - name: schemas
    columns:
      - name: auto_id
        datatype: int
        nullable: false
        description: "Column with auto_id"
""")

        mgr = TableManager(extensions_path=auto_id_path)
        schemas_table = mgr["schemas"]
        self.assertIn("auto_id", schemas_table.c)

    def test_extensions_column_id_preserved(self) -> None:
        """A column with an explicit "@id" should be accepted as-is."""
        explicit_id_path = os.path.join(self.tmpdir, "explicit_id.yaml")
        with open(explicit_id_path, "w") as f:
            f.write("""
name: explicit_id
tables:
  - name: schemas
    columns:
      - name: explicit_id
        datatype: int
        "@id": "#custom.id.path"
""")

        mgr = TableManager(extensions_path=explicit_id_path)
        schemas_table = mgr["schemas"]
        self.assertIn("explicit_id", schemas_table.c)

    def test_extensions_multiple_tables_extended(self) -> None:
        """Extensions spanning several standard tables should all apply."""
        multi_table_path = os.path.join(self.tmpdir, "multi_table.yaml")
        with open(multi_table_path, "w") as f:
            f.write("""
name: multi_table
tables:
  - name: schemas
    columns:
      - name: schema_ext1
        datatype: int
      - name: schema_ext2
        datatype: int
  - name: tables
    columns:
      - name: table_ext1
        datatype: int
      - name: table_ext2
        datatype: double
  - name: columns
    columns:
      - name: col_ext1
        datatype: int
  - name: keys
    columns:
      - name: key_ext1
        datatype: char
        length: 128
""")

        mgr = TableManager(extensions_path=multi_table_path)

        schemas_table = mgr["schemas"]
        self.assertIn("schema_ext1", schemas_table.c)
        self.assertIn("schema_ext2", schemas_table.c)

        tables_table = mgr["tables"]
        self.assertIn("table_ext1", tables_table.c)
        self.assertIn("table_ext2", tables_table.c)

        columns_table = mgr["columns"]
        self.assertIn("col_ext1", columns_table.c)

        keys_table = mgr["keys"]
        self.assertIn("key_ext1", keys_table.c)

    def test_extensions_nonexistent_table_skipped(self) -> None:
        """Extensions naming a table that is not part of TAP_SCHEMA should be
        ignored while valid extensions still apply."""
        nonexistent_table_path = os.path.join(self.tmpdir, "nonexistent_table.yaml")
        with open(nonexistent_table_path, "w") as f:
            f.write("""
name: test_extensions_nonexistent_table
tables:
  - name: schemas
    columns:
      - name: valid_ext
        datatype: int
  - name: nonexistent_table
    columns:
      - name: should_be_ignored
        datatype: int
""")

        mgr = TableManager(extensions_path=nonexistent_table_path)
        schemas_table = mgr["schemas"]
        self.assertIn("valid_ext", schemas_table.c)

    def test_extensions_column_properties_preserved(self) -> None:
        """A column declaring every supported property should be accepted."""
        full_props_path = os.path.join(self.tmpdir, "full_props.yaml")
        with open(full_props_path, "w") as f:
            f.write("""
name: full_props
tables:
  - name: schemas
    columns:
      - name: full_property_column
        datatype: char
        length: 64
        nullable: false
        description: "Column with all properties"
        "@id": "#tap_schema.schemas.full_property_column"
""")

        mgr = TableManager(extensions_path=full_props_path)
        schemas_table = mgr["schemas"]
        self.assertIn("full_property_column", schemas_table.c)

    def test_extensions_apply_schema_to_metadata_true(self) -> None:
        """Extensions should apply when a non-SQLite engine URL causes the
        schema name to be applied to the metadata."""
        mgr = TableManager(
            engine_url="postgresql://user:pass@localhost/db", extensions_path=self.extensions_path
        )
        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)

    def test_extensions_apply_schema_to_metadata_false(self) -> None:
        """Extensions should apply with the default (SQLite) configuration."""
        mgr = TableManager(extensions_path=self.extensions_path)

        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)
        self.assertIn("read_anon", schemas_table.c)

    def test_extensions_with_table_name_postfix(self) -> None:
        """Extensions should combine with a table name postfix."""
        mgr = TableManager(extensions_path=self.extensions_path, table_name_postfix="_custom")

        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)

    def test_extensions_metadata_builder_called(self) -> None:
        """The manager's metadata should contain the standard tables when
        extensions are in use."""
        mgr = TableManager(extensions_path=self.extensions_path)

        self.assertIsNotNone(mgr._metadata)

        table_names = list(mgr.metadata.tables.keys())
        found_schemas = any("schemas" in name for name in table_names)
        found_tables = any("tables" in name and "schemas" not in name for name in table_names)

        self.assertTrue(found_schemas, f"No schemas table found in {table_names}")
        self.assertTrue(found_tables, f"No tables table found in {table_names}")

    def test_extensions_preserve_original_columns(self) -> None:
        """Standard columns must survive alongside the extension columns."""
        mgr = TableManager(extensions_path=self.extensions_path)

        schemas_table = mgr["schemas"]
        column_names = [col.name for col in schemas_table.columns]

        self.assertIn("schema_name", column_names)
        self.assertIn("owner_id", column_names)
        self.assertIn("read_anon", column_names)

    def test_no_extensions_path_provided(self) -> None:
        """An explicit None extensions path yields the plain TAP_SCHEMA."""
        mgr = TableManager(extensions_path=None)
        schemas_table = mgr["schemas"]
        self.assertNotIn("owner_id", schemas_table.c)

    def test_extensions_path_empty_string(self) -> None:
        """An empty-string extensions path is treated as no extensions."""
        mgr = TableManager(extensions_path="")
        schemas_table = mgr["schemas"]
        self.assertNotIn("owner_id", schemas_table.c)

    def test_extensions_file_not_found(self) -> None:
        """A path to a missing file should raise ValueError.

        NOTE(review): effectively duplicates test_invalid_extensions_file
        above — consider consolidating.
        """
        nonexistent_path = os.path.join(self.tmpdir, "does_not_exist.yaml")
        with self.assertRaises(ValueError):
            TableManager(extensions_path=nonexistent_path)
# Allow running this test module directly (restored after the coverage-report
# annotation garbled the guard line).
if __name__ == "__main__":
    unittest.main()