Coverage for tests / test_tap_schema.py: 17%
421 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
1# This file is part of felis.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import os
23import shutil
24import tempfile
25import unittest
26from typing import Any
28from sqlalchemy import select
30from felis.datamodel import Schema
31from felis.db.database_context import create_database_context
32from felis.tap_schema import DataLoader, TableManager
# Directory containing this test module; the data files below live under it.
TEST_DIR = os.path.dirname(__file__)
# Schema used by the TableManager tests (see TableManagerTestCase.setUp).
TEST_SALES = os.path.join(TEST_DIR, "data", "sales.yaml")
# Schema exercising the TAP_SCHEMA data loading and column type mapping.
TEST_TAP_SCHEMA = os.path.join(TEST_DIR, "data", "test_tap_schema.yaml")
# Schema containing a composite foreign key (see CompositeKeysTestCase).
TEST_COMPOSITE_KEYS = os.path.join(TEST_DIR, "data", "test_composite_keys.yaml")
class TableManagerTestCase(unittest.TestCase):
    """Test the `TableManager` class."""

    def setUp(self) -> None:
        """Load the sales test schema."""
        with open(TEST_SALES) as stream:
            self.schema = Schema.from_stream(stream)

    def test_create_table_manager(self) -> None:
        """Test the TAP table manager class."""
        manager = TableManager()

        # The standard TAP_SCHEMA tables should have been created.
        self.assertNotEqual(len(manager.metadata.tables), 0)

        # For SQLite (the default), metadata.schema is None but the
        # manager's schema name is still set.
        if manager.apply_schema_to_metadata:
            expected_metadata_schema = manager.schema_name
        else:
            expected_metadata_schema = None
        self.assertEqual(manager.metadata.schema, expected_metadata_schema)
        self.assertEqual(manager.schema_name, "TAP_SCHEMA")  # always set

        # Every standard table name should be resolvable via item access.
        for std_name in manager.get_table_names_std():
            manager[std_name]

        # Creating another table manager after one exists should also work.
        manager = TableManager()

    def test_table_name_postfix(self) -> None:
        """Test the table name postfix."""
        manager = TableManager(table_name_postfix="_test")
        for name in manager.metadata.tables:
            self.assertTrue(name.endswith("_test"))
class DataLoaderTestCase(unittest.TestCase):
    """Test the `DataLoader` class."""

    def setUp(self) -> None:
        """Read the test schema and create a scratch directory."""
        with open(TEST_TAP_SCHEMA) as stream:
            self.schema = Schema.from_stream(stream, context={"id_generation": True})

        self.tmpdir = tempfile.mkdtemp(dir=TEST_DIR)

    def tearDown(self) -> None:
        """Clean up temporary directory."""
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_sqlite(self) -> None:
        """Test the `DataLoader` using an in-memory SQLite database."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as ctx:
            manager.initialize_database(ctx)

            loader = DataLoader(self.schema, manager, db_context=ctx)
            loader.load()

    def test_sql_output(self) -> None:
        """Test printing SQL to stdout and writing SQL to a file."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as ctx:
            # First run: dump the generated SQL to stdout without executing.
            DataLoader(self.schema, manager, ctx, dry_run=True, print_sql=True).load()

            # Second run: capture the generated SQL in a file instead.
            output_path = os.path.join(self.tmpdir, "test_tap_schema_print_sql.sql")
            with open(output_path, "w") as output_file:
                DataLoader(
                    self.schema, manager, ctx, dry_run=True, print_sql=True, output_file=output_file
                ).load()

            self.assertTrue(os.path.exists(output_path))
            with open(output_path) as output_file:
                generated_sql = output_file.read()
            insert_count = generated_sql.count("INSERT INTO")
            self.assertEqual(
                insert_count,
                22,
                f"Expected 22 'INSERT INTO' statements, found {insert_count}",
            )

    def test_unique_keys(self) -> None:
        """Test generation of unique foreign keys."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as ctx:
            manager.initialize_database(ctx)

            DataLoader(self.schema, manager, db_context=ctx, unique_keys=True).load()

            # Every generated key_id should be prefixed with the schema name.
            expected_prefix = f"{self.schema.name}_"

            keys_rows = manager.select(ctx, "keys")
            self.assertGreaterEqual(len(keys_rows), 1)
            for key_row in keys_rows:
                self.assertTrue(key_row["key_id"].startswith(expected_prefix))

            key_columns_rows = manager.select(ctx, "key_columns")
            self.assertGreaterEqual(len(key_columns_rows), 1)
            for key_column_row in key_columns_rows:
                self.assertTrue(key_column_row["key_id"].startswith(expected_prefix))

    def test_select_with_filter(self) -> None:
        """Test selecting rows with a filter."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as ctx:
            manager.initialize_database(ctx)
            DataLoader(self.schema, manager, db_context=ctx, unique_keys=True).load()

            filtered = manager.select(ctx, "columns", "table_name = 'test_schema.table1'")
            self.assertEqual(len(filtered), 16)
152def _find_row(rows: list[dict[str, Any]], column_name: str, value: str) -> dict[str, Any]:
153 next_row = next(
154 (row for row in rows if row[column_name] == value),
155 None,
156 )
157 assert next_row is not None
158 assert isinstance(next_row, dict)
159 return next_row
class TapSchemaSqliteSetup:
    """Set up the TAP_SCHEMA SQLite database for testing.

    Parameters
    ----------
    test_file_path
        Path to the TAP_SCHEMA test file.
    context
        Context for the schema. ``None`` (the default) means an empty
        context.
    """

    def __init__(self, test_file_path: str, context: dict | None = None) -> None:
        # Use None instead of a mutable default argument ({}); a shared
        # default dict would leak state across instances if ever mutated.
        if context is None:
            context = {}
        with open(test_file_path) as test_file:
            self._schema = Schema.from_stream(test_file, context=context)

        mgr = TableManager()
        # Create the table manager up front; tests enter the database
        # context themselves.
        self._mgr = mgr
        self._metadata = mgr.metadata

    @property
    def schema(self) -> Schema:
        """Return the schema."""
        return self._schema

    @property
    def mgr(self) -> TableManager:
        """Return the table manager."""
        return self._mgr

    @property
    def metadata(self) -> Any:
        """Return the metadata."""
        return self._metadata
class TapSchemaDataTest(unittest.TestCase):
    """Test the validity of generated TAP SCHEMA data."""

    def setUp(self) -> None:
        """Set up the test case."""
        self.tap_schema_setup = TapSchemaSqliteSetup(TEST_TAP_SCHEMA, context={"id_generation": True})

    def _load_table(self, table_name: str) -> list[dict[str, Any]]:
        """Load the schema into a fresh in-memory SQLite database and
        return the rows of the given TAP_SCHEMA table as dictionaries.

        Every test previously repeated this create/initialize/load/select
        boilerplate; it is factored out here so each test only states its
        assertions.
        """
        setup = self.tap_schema_setup
        with create_database_context("sqlite:///:memory:", setup.metadata) as db_ctx:
            setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                setup.schema,
                setup.mgr,
                db_context=db_ctx,
                tap_schema_index=2,
            )
            loader.load()

            table = setup.mgr[table_name]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(table))
                return [row._asdict() for row in result]

    def test_schemas(self) -> None:
        """Check the contents of the TAP_SCHEMA ``schemas`` table."""
        schema_data = self._load_table("schemas")

        self.assertEqual(len(schema_data), 1)

        schema = schema_data[0]
        self.assertEqual(schema["schema_name"], "test_schema")
        self.assertEqual(schema["description"], "Test schema")
        self.assertEqual(schema["utype"], "Schema")
        self.assertEqual(schema["schema_index"], 2)

    def test_tables(self) -> None:
        """Check the contents of the TAP_SCHEMA ``tables`` table."""
        table_data = self._load_table("tables")

        self.assertEqual(len(table_data), 2)

        table = table_data[0]
        assert isinstance(table, dict)
        self.assertEqual(table["schema_name"], "test_schema")
        self.assertEqual(table["table_name"], f"{self.tap_schema_setup.schema.name}.table1")
        self.assertEqual(table["table_type"], "table")
        self.assertEqual(table["utype"], "Table")
        self.assertEqual(table["description"], "Test table 1")
        self.assertEqual(table["table_index"], 2)

    def test_columns(self) -> None:
        """Check the type mapping in the TAP_SCHEMA ``columns`` table."""
        column_data = self._load_table("columns")

        table1_rows = [
            row for row in column_data if row["table_name"] == f"{self.tap_schema_setup.schema.name}.table1"
        ]
        self.assertNotEqual(len(table1_rows), 0)

        boolean_col = _find_row(table1_rows, "column_name", "boolean_field")
        self.assertEqual(boolean_col["datatype"], "boolean")
        self.assertEqual(boolean_col["arraysize"], None)

        byte_col = _find_row(table1_rows, "column_name", "byte_field")
        self.assertEqual(byte_col["datatype"], "unsignedByte")
        self.assertEqual(byte_col["arraysize"], None)

        short_col = _find_row(table1_rows, "column_name", "short_field")
        self.assertEqual(short_col["datatype"], "short")
        self.assertEqual(short_col["arraysize"], None)

        int_col = _find_row(table1_rows, "column_name", "int_field")
        self.assertEqual(int_col["datatype"], "int")
        self.assertEqual(int_col["arraysize"], None)

        float_col = _find_row(table1_rows, "column_name", "float_field")
        self.assertEqual(float_col["datatype"], "float")
        self.assertEqual(float_col["arraysize"], None)

        double_col = _find_row(table1_rows, "column_name", "double_field")
        self.assertEqual(double_col["datatype"], "double")
        self.assertEqual(double_col["arraysize"], None)

        long_col = _find_row(table1_rows, "column_name", "long_field")
        self.assertEqual(long_col["datatype"], "long")
        self.assertEqual(long_col["arraysize"], None)

        unicode_col = _find_row(table1_rows, "column_name", "unicode_field")
        self.assertEqual(unicode_col["datatype"], "unicodeChar")
        self.assertEqual(unicode_col["arraysize"], "128*")

        binary_col = _find_row(table1_rows, "column_name", "binary_field")
        self.assertEqual(binary_col["datatype"], "unsignedByte")
        self.assertEqual(binary_col["arraysize"], "1024*")

        ts = _find_row(table1_rows, "column_name", "timestamp_field")
        self.assertEqual(ts["datatype"], "char")
        self.assertEqual(ts["xtype"], "timestamp")
        self.assertEqual(ts["description"], "Timestamp field")
        self.assertEqual(ts["utype"], "Obs:Timestamp")
        self.assertEqual(ts["unit"], "s")
        self.assertEqual(ts["ucd"], "time.epoch")
        self.assertEqual(ts["principal"], 1)
        self.assertEqual(ts["std"], 1)
        self.assertEqual(ts["column_index"], 42)
        self.assertEqual(ts["size"], None)
        self.assertEqual(ts["arraysize"], "*")

        char_col = _find_row(table1_rows, "column_name", "char_field")
        self.assertEqual(char_col["datatype"], "char")
        self.assertEqual(char_col["arraysize"], "64")

        str_col = _find_row(table1_rows, "column_name", "string_field")
        self.assertEqual(str_col["datatype"], "char")
        self.assertEqual(str_col["arraysize"], "256*")

        txt_col = _find_row(table1_rows, "column_name", "text_field")
        self.assertEqual(txt_col["datatype"], "char")
        self.assertEqual(txt_col["arraysize"], "*")

    def test_keys(self) -> None:
        """Check the contents of the TAP_SCHEMA ``keys`` table."""
        key_data = self._load_table("keys")

        self.assertEqual(len(key_data), 1)

        key = key_data[0]
        assert isinstance(key, dict)

        self.assertEqual(key["key_id"], "fk_table1_to_table2")
        self.assertEqual(key["from_table"], f"{self.tap_schema_setup.schema.name}.table1")
        self.assertEqual(key["target_table"], f"{self.tap_schema_setup.schema.name}.table2")
        self.assertEqual(key["description"], "Foreign key from table1 to table2")
        self.assertEqual(key["utype"], "ForeignKey")

    def test_key_columns(self) -> None:
        """Check the contents of the TAP_SCHEMA ``key_columns`` table."""
        key_column_data = self._load_table("key_columns")

        self.assertEqual(len(key_column_data), 1)

        key_column = key_column_data[0]
        assert isinstance(key_column, dict)

        self.assertEqual(key_column["key_id"], "fk_table1_to_table2")
        self.assertEqual(key_column["from_column"], "fk")
        self.assertEqual(key_column["target_column"], "id")

    def test_bad_table_name(self) -> None:
        """Test getting a bad TAP_SCHEMA table name."""
        with self.assertRaises(KeyError):
            self.tap_schema_setup.mgr["bad_table"]
class ForceUnboundArraySizeTest(unittest.TestCase):
    """Test that arraysize for appropriate types is set to ``'*'`` when the
    ``force_unbounded_arraysize`` context flag is set to ``True``.
    """

    def setUp(self) -> None:
        """Set up the test case."""
        # Enable the flag under test alongside id generation.
        self.tap_schema_setup = TapSchemaSqliteSetup(
            TEST_TAP_SCHEMA, context={"id_generation": True, "force_unbounded_arraysize": True}
        )

    def test_force_unbounded_arraysize(self) -> None:
        """Test that variable-size columns get an unbounded arraysize of '*'."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            # Read back the generated TAP_SCHEMA columns data.
            columns_table = self.tap_schema_setup.mgr["columns"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(columns_table))
                column_data = [row._asdict() for row in result]

            table1_rows = [
                row for row in column_data if row["table_name"] == f"{self.tap_schema_setup.schema.name}.table1"
            ]
            # Only the variable-size column types should be forced to "*".
            for row in table1_rows:
                if row["column_name"] in ["string_field", "text_field", "unicode_field", "binary_field"]:
                    self.assertEqual(row["arraysize"], "*")
class CompositeKeysTestCase(unittest.TestCase):
    """Test the handling of composite foreign keys."""

    def setUp(self) -> None:
        """Set up the test case."""
        self.tap_schema_setup = TapSchemaSqliteSetup(TEST_COMPOSITE_KEYS, context={"id_generation": True})

        # Load the data within a database context and capture the rows of
        # the keys and key_columns tables for the tests to inspect.
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            # Fetch the keys and key_columns data from the TAP_SCHEMA tables.
            keys_table = self.tap_schema_setup.mgr["keys"]
            key_columns_table = self.tap_schema_setup.mgr["key_columns"]
            with db_ctx.engine.connect() as connection:
                key_columns_result = connection.execute(select(key_columns_table))
                self.key_columns_data = [row._asdict() for row in key_columns_result]

                keys_result = connection.execute(select(keys_table))
                self.keys_data = [row._asdict() for row in keys_result]

    def test_keys(self) -> None:
        """Test that composite keys are handled correctly by inspecting the
        data in the generated TAP_SCHEMA ``keys`` table.
        """
        # Leftover debug print statements removed; the assertion message on
        # failure already shows the offending data.
        self.assertEqual(len(self.keys_data), 1)

        self.assertEqual(
            self.keys_data[0],
            {
                "key_id": "fk_composite",
                "from_table": "test_composite_keys.table1",
                "target_table": "test_composite_keys.table2",
                "utype": "ForeignKey",
                "description": "Composite foreign key from table1 to table2",
            },
        )

    def test_key_columns(self) -> None:
        """Test that composite keys are handled correctly by inspecting the
        data in the generated TAP_SCHEMA ``key_columns`` table.
        """
        self.assertEqual(len(self.key_columns_data), 2)

        key_columns_row1 = self.key_columns_data[0]
        self.assertEqual(
            key_columns_row1, {"key_id": "fk_composite", "from_column": "id1", "target_column": "id1"}
        )

        key_columns_row2 = self.key_columns_data[1]
        self.assertEqual(
            key_columns_row2, {"key_id": "fk_composite", "from_column": "id2", "target_column": "id2"}
        )
class TableManagerExtensionsTestCase(unittest.TestCase):
    """Test the `TableManager` class with extensions."""

    def setUp(self) -> None:
        """Set up the test case."""
        self.tmpdir = tempfile.mkdtemp(dir=TEST_DIR)

        # Default extensions file used by most tests in this case.
        self.extensions_path = self._write_yaml(
            "test_extensions.yaml",
            """
name: test_extensions
description: Test TAP_SCHEMA extensions

tables:
  - name: schemas
    description: Extensions to schemas table
    columns:
      - name: owner_id
        datatype: char
        length: 32
        nullable: true
        description: "Owner identifier"
      - name: read_anon
        datatype: int
        nullable: true
        description: "Anon read flag"

  - name: tables
    description: Extensions to tables table
    columns:
      - name: api_created
        datatype: int
        nullable: true
        description: "API created flag"
""",
        )

    def _write_yaml(self, filename: str, content: str) -> str:
        """Write ``content`` to ``filename`` in the temp dir and return the
        file's full path. Factors out the open/write boilerplate repeated by
        every test that needs its own extensions file.
        """
        path = os.path.join(self.tmpdir, filename)
        with open(path, "w") as f:
            f.write(content)
        return path

    def tearDown(self) -> None:
        """Clean up temporary directory."""
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_extensions_applied(self) -> None:
        """Extension columns appear on the extended tables."""
        mgr = TableManager(extensions_path=self.extensions_path)

        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)
        self.assertIn("read_anon", schemas_table.c)

        tables_table = mgr["tables"]
        self.assertIn("api_created", tables_table.c)

    def test_extensions_column_count(self) -> None:
        """Extensions add exactly the declared number of columns."""
        mgr_without = TableManager()
        mgr_with = TableManager(extensions_path=self.extensions_path)

        schemas_before = len(mgr_without["schemas"].c)
        schemas_after = len(mgr_with["schemas"].c)
        self.assertEqual(schemas_after, schemas_before + 2)

        tables_before = len(mgr_without["tables"].c)
        tables_after = len(mgr_with["tables"].c)
        self.assertEqual(tables_after, tables_before + 1)

    def test_extensions_with_data_loader(self) -> None:
        """Loaded rows include the extension columns."""
        mgr = TableManager(extensions_path=self.extensions_path)
        with create_database_context("sqlite:///:memory:", mgr.metadata) as db_ctx:
            mgr.initialize_database(db_ctx)

            with open(TEST_TAP_SCHEMA) as test_file:
                schema = Schema.from_stream(test_file, context={"id_generation": True})

            loader = DataLoader(schema, mgr, db_context=db_ctx)
            loader.load()

            schemas_table = mgr["schemas"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(schemas_table))
                row = result.fetchone()
                # Guard against an empty result set before touching _fields;
                # fetchone() returns None when no rows were loaded.
                self.assertIsNotNone(row)
                self.assertIn("owner_id", row._fields)
                self.assertIn("read_anon", row._fields)

    def test_invalid_extensions_file(self) -> None:
        """A missing extensions file raises ValueError."""
        invalid_path = os.path.join(self.tmpdir, "nonexistent.yaml")

        with self.assertRaises(ValueError):
            TableManager(extensions_path=invalid_path)

    def test_empty_extensions(self) -> None:
        """An extensions file with no tables is accepted."""
        empty_extensions_path = self._write_yaml(
            "empty_extensions.yaml", "name: empty_extensions\ntables: []\n"
        )

        mgr = TableManager(extensions_path=empty_extensions_path)
        self.assertIsNotNone(mgr["schemas"])

    def test_extensions_with_null_table_extensions(self) -> None:
        """Tables with empty column lists are left unchanged."""
        null_extensions_path = self._write_yaml(
            "null_extensions.yaml",
            """
name: null_extensions
tables:
  - name: schemas
    columns: []
  - name: tables
    columns: []
  - name: columns
    columns:
      - name: test_col
        datatype: int
""",
        )

        mgr = TableManager(extensions_path=null_extensions_path)

        columns_table = mgr["columns"]
        self.assertIn("test_col", columns_table.c)

        schemas_table = mgr["schemas"]
        self.assertNotIn("owner_id", schemas_table.c)

    def test_extensions_invalid_column_missing_name(self) -> None:
        """A column definition without a name raises KeyError."""
        invalid_name_path = self._write_yaml(
            "invalid_name.yaml",
            """
name: invalid_name
tables:
  - name: schemas
    columns:
      - datatype: int
        description: "Missing name"
      - name: some_column
        datatype: int
""",
        )
        with self.assertRaises(KeyError):
            TableManager(extensions_path=invalid_name_path)

    def test_extensions_column_id_auto_generation(self) -> None:
        """A column without an explicit "@id" is still applied."""
        auto_id_path = self._write_yaml(
            "auto_id.yaml",
            """
name: auto_id
tables:
  - name: schemas
    columns:
      - name: auto_id
        datatype: int
        nullable: false
        description: "Column with auto_id"
""",
        )

        mgr = TableManager(extensions_path=auto_id_path)
        schemas_table = mgr["schemas"]
        self.assertIn("auto_id", schemas_table.c)

    def test_extensions_column_id_preserved(self) -> None:
        """A column with an explicit "@id" is applied."""
        explicit_id_path = self._write_yaml(
            "explicit_id.yaml",
            """
name: explicit_id
tables:
  - name: schemas
    columns:
      - name: explicit_id
        datatype: int
        "@id": "#custom.id.path"
""",
        )

        mgr = TableManager(extensions_path=explicit_id_path)
        schemas_table = mgr["schemas"]
        self.assertIn("explicit_id", schemas_table.c)

    def test_extensions_multiple_tables_extended(self) -> None:
        """Several TAP_SCHEMA tables can be extended at once."""
        multi_table_path = self._write_yaml(
            "multi_table.yaml",
            """
name: multi_table
tables:
  - name: schemas
    columns:
      - name: schema_ext1
        datatype: int
      - name: schema_ext2
        datatype: int
  - name: tables
    columns:
      - name: table_ext1
        datatype: int
      - name: table_ext2
        datatype: double
  - name: columns
    columns:
      - name: col_ext1
        datatype: int
  - name: keys
    columns:
      - name: key_ext1
        datatype: char
        length: 128
""",
        )

        mgr = TableManager(extensions_path=multi_table_path)

        schemas_table = mgr["schemas"]
        self.assertIn("schema_ext1", schemas_table.c)
        self.assertIn("schema_ext2", schemas_table.c)

        tables_table = mgr["tables"]
        self.assertIn("table_ext1", tables_table.c)
        self.assertIn("table_ext2", tables_table.c)

        columns_table = mgr["columns"]
        self.assertIn("col_ext1", columns_table.c)

        keys_table = mgr["keys"]
        self.assertIn("key_ext1", keys_table.c)

    def test_extensions_nonexistent_table_skipped(self) -> None:
        """Extensions for unknown tables are ignored, not fatal."""
        nonexistent_table_path = self._write_yaml(
            "nonexistent_table.yaml",
            """
name: test_extensions_nonexistent_table
tables:
  - name: schemas
    columns:
      - name: valid_ext
        datatype: int
  - name: nonexistent_table
    columns:
      - name: should_be_ignored
        datatype: int
""",
        )

        mgr = TableManager(extensions_path=nonexistent_table_path)
        schemas_table = mgr["schemas"]
        self.assertIn("valid_ext", schemas_table.c)

    def test_extensions_column_properties_preserved(self) -> None:
        """A fully-specified extension column is applied."""
        full_props_path = self._write_yaml(
            "full_props.yaml",
            """
name: full_props
tables:
  - name: schemas
    columns:
      - name: full_property_column
        datatype: char
        length: 64
        nullable: false
        description: "Column with all properties"
        "@id": "#tap_schema.schemas.full_property_column"
""",
        )

        mgr = TableManager(extensions_path=full_props_path)
        schemas_table = mgr["schemas"]
        self.assertIn("full_property_column", schemas_table.c)

    def test_extensions_apply_schema_to_metadata_true(self) -> None:
        """Extensions work with an engine URL that applies the schema."""
        mgr = TableManager(
            engine_url="postgresql://user:pass@localhost/db", extensions_path=self.extensions_path
        )
        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)

    def test_extensions_apply_schema_to_metadata_false(self) -> None:
        """Extensions work with the default (SQLite) configuration."""
        mgr = TableManager(extensions_path=self.extensions_path)

        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)
        self.assertIn("read_anon", schemas_table.c)

    def test_extensions_with_table_name_postfix(self) -> None:
        """Extensions combine with a table name postfix."""
        mgr = TableManager(extensions_path=self.extensions_path, table_name_postfix="_custom")

        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)

    def test_extensions_metadata_builder_called(self) -> None:
        """The metadata builder runs and produces the standard tables."""
        mgr = TableManager(extensions_path=self.extensions_path)

        self.assertIsNotNone(mgr._metadata)

        table_names = list(mgr.metadata.tables.keys())
        found_schemas = any("schemas" in name for name in table_names)
        found_tables = any("tables" in name and "schemas" not in name for name in table_names)

        self.assertTrue(found_schemas, f"No schemas table found in {table_names}")
        self.assertTrue(found_tables, f"No tables table found in {table_names}")

    def test_extensions_preserve_original_columns(self) -> None:
        """Original columns survive alongside the extensions."""
        mgr = TableManager(extensions_path=self.extensions_path)

        schemas_table = mgr["schemas"]
        column_names = [col.name for col in schemas_table.columns]

        self.assertIn("schema_name", column_names)
        self.assertIn("owner_id", column_names)
        self.assertIn("read_anon", column_names)

    def test_no_extensions_path_provided(self) -> None:
        """No extensions path leaves the tables unextended."""
        mgr = TableManager(extensions_path=None)
        schemas_table = mgr["schemas"]
        self.assertNotIn("owner_id", schemas_table.c)

    def test_extensions_path_empty_string(self) -> None:
        """An empty extensions path leaves the tables unextended."""
        mgr = TableManager(extensions_path="")
        schemas_table = mgr["schemas"]
        self.assertNotIn("owner_id", schemas_table.c)

    def test_extensions_file_not_found(self) -> None:
        """A nonexistent extensions file raises ValueError."""
        nonexistent_path = os.path.join(self.tmpdir, "does_not_exist.yaml")
        with self.assertRaises(ValueError):
            TableManager(extensions_path=nonexistent_path)
# Run the tests when the module is executed directly.
if __name__ == "__main__":
    unittest.main()