Coverage for tests / test_tap_schema.py: 16%

454 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:14 +0000

1# This file is part of felis. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import os 

23import shutil 

24import tempfile 

25import unittest 

26from typing import Any 

27 

28from sqlalchemy import select 

29 

30from felis.datamodel import Schema 

31from felis.db.database_context import create_database_context 

32from felis.tap_schema import DataLoader, TableManager 

33 

# Directory containing this test module; used to resolve test data paths.
TEST_DIR = os.path.dirname(__file__)
# Schema used by the TableManager tests.
TEST_SALES = os.path.join(TEST_DIR, "data", "sales.yaml")
# Schema used by most TAP_SCHEMA data-loading tests.
TEST_TAP_SCHEMA = os.path.join(TEST_DIR, "data", "test_tap_schema.yaml")
# Schema exercising composite (multi-column) foreign keys.
TEST_COMPOSITE_KEYS = os.path.join(TEST_DIR, "data", "test_composite_keys.yaml")

38 

39 

class TableManagerTestCase(unittest.TestCase):
    """Test the `TableManager` class."""

    def setUp(self) -> None:
        """Load the sales test schema used by these tests."""
        with open(TEST_SALES) as schema_file:
            self.schema = Schema.from_stream(schema_file)

    def test_create_table_manager(self) -> None:
        """Test the TAP table manager class."""
        manager = TableManager()
        tap_schema_name = manager.schema_name

        # The manager must have built a non-empty set of TAP_SCHEMA tables.
        self.assertGreater(len(manager.metadata.tables), 0)
        # For SQLite (default), metadata.schema is None but schema_name is set.
        if manager.apply_schema_to_metadata:
            self.assertEqual(manager.metadata.schema, tap_schema_name)
        else:
            self.assertIsNone(manager.metadata.schema)
        # schema_name should always be set.
        self.assertEqual(manager.schema_name, "TAP_SCHEMA")
        # Every standard table name must be resolvable through the manager.
        for std_name in manager.get_table_names_std():
            manager[std_name]

        # Make sure that creating a new table manager works when one has
        # already been created.
        manager = TableManager()

    def test_table_name_postfix(self) -> None:
        """Every table name should carry the configured postfix."""
        manager = TableManager(table_name_postfix="_test")
        for name in manager.metadata.tables:
            self.assertTrue(name.endswith("_test"))

72 

73 

class DataLoaderTestCase(unittest.TestCase):
    """Test the `DataLoader` class."""

    def setUp(self) -> None:
        """Load the TAP_SCHEMA test schema and create a scratch directory."""
        with open(TEST_TAP_SCHEMA) as schema_file:
            self.schema = Schema.from_stream(schema_file, context={"id_generation": True})
        self.tmpdir = tempfile.mkdtemp(dir=TEST_DIR)

    def tearDown(self) -> None:
        """Clean up temporary directory."""
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_sqlite(self) -> None:
        """Test the `DataLoader` using an in-memory SQLite database."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as db_ctx:
            manager.initialize_database(db_ctx)
            DataLoader(self.schema, manager, db_context=db_ctx).load()

    def test_sql_output(self) -> None:
        """Test printing SQL to stdout and writing SQL to a file."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as db_ctx:
            # First pass: dry run that only prints the SQL.
            DataLoader(self.schema, manager, db_ctx, dry_run=True, print_sql=True).load()

            # Second pass: same dry run, but capture the SQL into a file.
            sql_path = os.path.join(self.tmpdir, "test_tap_schema_print_sql.sql")
            with open(sql_path, "w") as sql_file:
                DataLoader(
                    self.schema, manager, db_ctx, dry_run=True, print_sql=True, output_file=sql_file
                ).load()

            self.assertTrue(os.path.exists(sql_path))
            with open(sql_path) as sql_file:
                insert_count = sql_file.read().count("INSERT INTO")
            self.assertEqual(
                insert_count,
                22,
                f"Expected 22 'INSERT INTO' statements, found {insert_count}",
            )

    def test_unique_keys(self) -> None:
        """Test generation of unique foreign keys."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as db_ctx:
            manager.initialize_database(db_ctx)
            DataLoader(self.schema, manager, db_context=db_ctx, unique_keys=True).load()

            # Every key and key-column row must be prefixed by the schema name.
            expected_prefix = f"{self.schema.name}_"

            keys_data = manager.select(db_ctx, "keys")
            self.assertGreaterEqual(len(keys_data), 1)
            for key_row in keys_data:
                self.assertTrue(key_row["key_id"].startswith(expected_prefix))

            key_columns_data = manager.select(db_ctx, "key_columns")
            self.assertGreaterEqual(len(key_columns_data), 1)
            for key_column_row in key_columns_data:
                self.assertTrue(key_column_row["key_id"].startswith(expected_prefix))

    def test_select_with_filter(self) -> None:
        """Test selecting rows with a filter."""
        manager = TableManager()
        with create_database_context("sqlite:///:memory:", manager.metadata) as db_ctx:
            manager.initialize_database(db_ctx)
            DataLoader(self.schema, manager, db_context=db_ctx, unique_keys=True).load()

            rows = manager.select(db_ctx, "columns", "table_name = 'test_schema.table1'")
            self.assertEqual(len(rows), 16)

150 

151 

152def _find_row(rows: list[dict[str, Any]], column_name: str, value: str) -> dict[str, Any]: 

153 next_row = next( 

154 (row for row in rows if row[column_name] == value), 

155 None, 

156 ) 

157 assert next_row is not None 

158 assert isinstance(next_row, dict) 

159 return next_row 

160 

161 

class TapSchemaSqliteSetup:
    """Set up the TAP_SCHEMA SQLite database for testing.

    Parameters
    ----------
    test_file_path
        Path to the TAP_SCHEMA test file.

    context
        Context for the schema. Default is an empty dictionary.
    """

    def __init__(self, test_file_path: str, context: dict | None = None) -> None:
        # Avoid a mutable default argument (shared dict across calls);
        # fall back to a fresh empty dict per instantiation.
        if context is None:
            context = {}
        with open(test_file_path) as test_file:
            self._schema = Schema.from_stream(test_file, context=context)

        mgr = TableManager()
        # Create context manager but don't enter it yet - tests will do that
        self._mgr = mgr
        self._metadata = mgr.metadata

    @property
    def schema(self) -> Schema:
        """Return the schema."""
        return self._schema

    @property
    def mgr(self) -> TableManager:
        """Return the table manager."""
        return self._mgr

    @property
    def metadata(self) -> Any:
        """Return the metadata."""
        return self._metadata

198 

class TapSchemaDataTest(unittest.TestCase):
    """Test the validity of generated TAP SCHEMA data."""

    def setUp(self) -> None:
        """Set up the test case."""
        # Load the test schema with ID generation enabled so the loader can
        # populate the TAP_SCHEMA tables.
        self.tap_schema_setup = TapSchemaSqliteSetup(TEST_TAP_SCHEMA, context={"id_generation": True})

    def test_schemas(self) -> None:
        """Check the single row loaded into the ``schemas`` table."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema,
                self.tap_schema_setup.mgr,
                db_context=db_ctx,
                tap_schema_index=2,
            )
            loader.load()

            schemas_table = self.tap_schema_setup.mgr["schemas"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(schemas_table))
                schema_data = [row._asdict() for row in result]

            # Exactly one schema was loaded.
            self.assertEqual(len(schema_data), 1)

            schema = schema_data[0]
            self.assertEqual(schema["schema_name"], "test_schema")
            self.assertEqual(schema["description"], "Test schema")
            self.assertEqual(schema["utype"], "Schema")
            # schema_index comes from the tap_schema_index argument above.
            self.assertEqual(schema["schema_index"], 2)

    def test_tables(self) -> None:
        """Check the rows loaded into the ``tables`` table."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            tables_table = self.tap_schema_setup.mgr["tables"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(tables_table))
                table_data = [row._asdict() for row in result]

            # The test schema defines two tables.
            self.assertEqual(len(table_data), 2)

            table = table_data[0]
            assert isinstance(table, dict)
            self.assertEqual(table["schema_name"], "test_schema")
            self.assertEqual(table["table_name"], f"{self.tap_schema_setup.schema.name}.table1")
            self.assertEqual(table["table_type"], "table")
            self.assertEqual(table["utype"], "Table")
            self.assertEqual(table["description"], "Test table 1")
            self.assertEqual(table["table_index"], 2)

    def test_columns(self) -> None:
        """Check datatype/arraysize mapping for every column of table1."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            columns_table = self.tap_schema_setup.mgr["columns"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(columns_table))
                column_data = [row._asdict() for row in result]

            # Restrict to columns belonging to table1 of the test schema.
            table1_rows = [
                row for row in column_data if row["table_name"] == f"{self.tap_schema_setup.schema.name}.table1"
            ]
            self.assertNotEqual(len(table1_rows), 0)

            # Scalar numeric/boolean types: VOTable datatype set, no arraysize.
            boolean_col = _find_row(table1_rows, "column_name", "boolean_field")
            self.assertEqual(boolean_col["datatype"], "boolean")
            self.assertEqual(boolean_col["arraysize"], None)

            byte_col = _find_row(table1_rows, "column_name", "byte_field")
            self.assertEqual(byte_col["datatype"], "unsignedByte")
            self.assertEqual(byte_col["arraysize"], None)

            short_col = _find_row(table1_rows, "column_name", "short_field")
            self.assertEqual(short_col["datatype"], "short")
            self.assertEqual(short_col["arraysize"], None)

            int_col = _find_row(table1_rows, "column_name", "int_field")
            self.assertEqual(int_col["datatype"], "int")
            self.assertEqual(int_col["arraysize"], None)

            float_col = _find_row(table1_rows, "column_name", "float_field")
            self.assertEqual(float_col["datatype"], "float")
            self.assertEqual(float_col["arraysize"], None)

            double_col = _find_row(table1_rows, "column_name", "double_field")
            self.assertEqual(double_col["datatype"], "double")
            self.assertEqual(double_col["arraysize"], None)

            long_col = _find_row(table1_rows, "column_name", "long_field")
            self.assertEqual(long_col["datatype"], "long")
            self.assertEqual(long_col["arraysize"], None)

            # Bounded variable-length types get "N*" arraysize.
            unicode_col = _find_row(table1_rows, "column_name", "unicode_field")
            self.assertEqual(unicode_col["datatype"], "unicodeChar")
            self.assertEqual(unicode_col["arraysize"], "128*")

            binary_col = _find_row(table1_rows, "column_name", "binary_field")
            self.assertEqual(binary_col["datatype"], "unsignedByte")
            self.assertEqual(binary_col["arraysize"], "1024*")

            # Timestamp: char with xtype plus full TAP column metadata.
            ts = _find_row(table1_rows, "column_name", "timestamp_field")
            self.assertEqual(ts["datatype"], "char")
            self.assertEqual(ts["xtype"], "timestamp")
            self.assertEqual(ts["description"], "Timestamp field")
            self.assertEqual(ts["utype"], "Obs:Timestamp")
            self.assertEqual(ts["unit"], "s")
            self.assertEqual(ts["ucd"], "time.epoch")
            self.assertEqual(ts["principal"], 1)
            self.assertEqual(ts["std"], 1)
            self.assertEqual(ts["column_index"], 42)
            self.assertEqual(ts["size"], None)
            self.assertEqual(ts["arraysize"], "*")

            # Fixed-length char keeps an exact arraysize with no "*".
            char_col = _find_row(table1_rows, "column_name", "char_field")
            self.assertEqual(char_col["datatype"], "char")
            self.assertEqual(char_col["arraysize"], "64")

            str_col = _find_row(table1_rows, "column_name", "string_field")
            self.assertEqual(str_col["datatype"], "char")
            self.assertEqual(str_col["arraysize"], "256*")

            # Unbounded text maps to char with "*" arraysize.
            txt_col = _find_row(table1_rows, "column_name", "text_field")
            self.assertEqual(txt_col["datatype"], "char")
            self.assertEqual(txt_col["arraysize"], "*")

    def test_keys(self) -> None:
        """Check the single foreign key loaded into the ``keys`` table."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema,
                self.tap_schema_setup.mgr,
                db_context=db_ctx,
                tap_schema_index=2,
            )
            loader.load()

            keys_table = self.tap_schema_setup.mgr["keys"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(keys_table))
                key_data = [row._asdict() for row in result]

            self.assertEqual(len(key_data), 1)

            key = key_data[0]
            assert isinstance(key, dict)

            self.assertEqual(key["key_id"], "fk_table1_to_table2")
            self.assertEqual(key["from_table"], f"{self.tap_schema_setup.schema.name}.table1")
            self.assertEqual(key["target_table"], f"{self.tap_schema_setup.schema.name}.table2")
            self.assertEqual(key["description"], "Foreign key from table1 to table2")
            self.assertEqual(key["utype"], "ForeignKey")

    def test_key_columns(self) -> None:
        """Check the column pair loaded into the ``key_columns`` table."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema,
                self.tap_schema_setup.mgr,
                db_context=db_ctx,
                tap_schema_index=2,
            )
            loader.load()

            key_columns_table = self.tap_schema_setup.mgr["key_columns"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(key_columns_table))
                key_column_data = [row._asdict() for row in result]

            self.assertEqual(len(key_column_data), 1)

            key_column = key_column_data[0]
            assert isinstance(key_column, dict)

            self.assertEqual(key_column["key_id"], "fk_table1_to_table2")
            self.assertEqual(key_column["from_column"], "fk")
            self.assertEqual(key_column["target_column"], "id")

    def test_bad_table_name(self) -> None:
        """Test getting a bad TAP_SCHEMA table name."""
        with self.assertRaises(KeyError):
            self.tap_schema_setup.mgr["bad_table"]

389 

390 

class ForceUnboundArraySizeTest(unittest.TestCase):
    """Test that arraysize for appropriate types is set to '*' when the
    ``force_unbounded_arraysize`` context flag is set to ``True``.
    """

    def setUp(self) -> None:
        """Build the TAP_SCHEMA setup with the unbounded-arraysize flag on."""
        self.tap_schema_setup = TapSchemaSqliteSetup(
            TEST_TAP_SCHEMA, context={"id_generation": True, "force_unbounded_arraysize": True}
        )

    def test_force_unbounded_arraysize(self) -> None:
        """Test that unbounded arraysize is set to None."""
        setup = self.tap_schema_setup
        with create_database_context("sqlite:///:memory:", setup.metadata) as db_ctx:
            setup.mgr.initialize_database(db_ctx)
            DataLoader(setup.schema, setup.mgr, db_context=db_ctx, tap_schema_index=2).load()

            with db_ctx.engine.connect() as connection:
                query_result = connection.execute(select(setup.mgr["columns"]))
                all_rows = [row._asdict() for row in query_result]

            # Only the variable-length fields of table1 are expected to be
            # forced to an unbounded ('*') arraysize.
            table1_name = f"{setup.schema.name}.table1"
            unbounded_fields = {"string_field", "text_field", "unicode_field", "binary_field"}
            for row in all_rows:
                if row["table_name"] != table1_name:
                    continue
                if row["column_name"] in unbounded_fields:
                    self.assertEqual(row["arraysize"], "*")

423 

class CompositeKeysTestCase(unittest.TestCase):
    """Test the handling of composite foreign keys."""

    def setUp(self) -> None:
        """Set up the test case.

        Loads the composite-key test schema into an in-memory TAP_SCHEMA
        database and captures the resulting ``keys`` and ``key_columns``
        rows for the tests to inspect.
        """
        self.tap_schema_setup = TapSchemaSqliteSetup(TEST_COMPOSITE_KEYS, context={"id_generation": True})

        # Set up the data in a context manager
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            # Fetch the keys and key_columns data from the TAP_SCHEMA tables.
            keys_table = self.tap_schema_setup.mgr["keys"]
            key_columns_table = self.tap_schema_setup.mgr["key_columns"]
            with db_ctx.engine.connect() as connection:
                key_columns_result = connection.execute(select(key_columns_table))
                self.key_columns_data = [row._asdict() for row in key_columns_result]

                keys_result = connection.execute(select(keys_table))
                self.keys_data = [row._asdict() for row in keys_result]

    def test_keys(self) -> None:
        """Test that composite keys are handled correctly by inspecting the
        data in the generated TAP_SCHEMA ``keys`` table.
        """
        # A single composite foreign key should produce exactly one keys row.
        self.assertEqual(len(self.keys_data), 1)

        self.assertEqual(
            self.keys_data[0],
            {
                "key_id": "fk_composite",
                "from_table": "test_composite_keys.table1",
                "target_table": "test_composite_keys.table2",
                "utype": "ForeignKey",
                "description": "Composite foreign key from table1 to table2",
            },
        )

    def test_key_columns(self) -> None:
        """Test that composite keys are handled correctly by inspecting the
        data in the generated TAP_SCHEMA ``key_columns`` table.
        """
        # The composite key spans two columns, so exactly two rows result.
        self.assertEqual(len(self.key_columns_data), 2)

        key_columns_row1 = self.key_columns_data[0]
        self.assertEqual(
            key_columns_row1, {"key_id": "fk_composite", "from_column": "id1", "target_column": "id1"}
        )

        key_columns_row2 = self.key_columns_data[1]
        self.assertEqual(
            key_columns_row2, {"key_id": "fk_composite", "from_column": "id2", "target_column": "id2"}
        )

485 

486 

class ColumnRefsTestCase(unittest.TestCase):
    """Test case that column references are applied correctly in TAP_SCHEMA."""

    def setUp(self) -> None:
        """Set up the test case.

        Writes a source schema and a referencing schema to a temporary
        directory, then builds the TAP_SCHEMA setup from the referencing
        schema with column-ref index incrementing enabled.
        """
        self.temp_dir = tempfile.mkdtemp()

        # Write out source schema file
        source_schema_content = """
name: source_schema
tables:
- name: source_table
  columns:
  - name: ref_col1
    datatype: int
  - name: ref_col2
    datatype: string
    length: 64
  - name: ref_col3
    datatype: float
"""
        source_schema_path = os.path.join(self.temp_dir, "source_schema.yaml")
        with open(source_schema_path, "w") as f:
            f.write(source_schema_content.strip())

        # Write out referencing schema file
        ref_schema_content = """
name: ref_schema
resources:
  source_schema:
    uri: {resource_path}
tables:
- name: ref_table
  columnRefs:
    source_schema:
      source_table:
        ref_col1:
        ref_col2:
        col3:
          ref_name: ref_col3
"""
        ref_schema_path = os.path.join(self.temp_dir, "ref_schema.yaml")
        ref_content = ref_schema_content.format(resource_path=source_schema_path)
        with open(ref_schema_path, "w") as f:
            f.write(ref_content.strip())

        self.tap_schema_setup = TapSchemaSqliteSetup(
            ref_schema_path, context={"id_generation": True, "column_ref_index_increment": 1}
        )

    def tearDown(self) -> None:
        """Clean up temporary directory."""
        # Without this, every test run leaked the mkdtemp() directory.
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_column_refs(self) -> None:
        """Test that column references are processed correctly."""
        with create_database_context("sqlite:///:memory:", self.tap_schema_setup.metadata) as db_ctx:
            # Create the TAP_SCHEMA database and load the data
            self.tap_schema_setup.mgr.initialize_database(db_ctx)
            loader = DataLoader(
                self.tap_schema_setup.schema, self.tap_schema_setup.mgr, db_context=db_ctx, tap_schema_index=2
            )
            loader.load()

            columns_table = self.tap_schema_setup.mgr["columns"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(columns_table))
                rows = [row._asdict() for row in result]
                self.assertEqual(len(rows), 3)
                column_names = [column_data["column_name"] for column_data in rows]
                self.assertEqual(set(column_names), {"ref_col1", "ref_col2", "col3"})
                for column_data in rows:
                    # Check the generated indices of the referenced columns
                    if column_data["column_name"] == "ref_col1":
                        self.assertEqual(column_data["column_index"], 1)
                    elif column_data["column_name"] == "ref_col2":
                        self.assertEqual(column_data["column_index"], 2)
                    elif column_data["column_name"] == "col3":
                        self.assertEqual(column_data["column_index"], 3)
                    else:
                        self.fail(f"Unexpected column name: {column_data['column_name']}")

565 

class TableManagerExtensionsTestCase(unittest.TestCase):
    """Test the `TableManager` class with extensions."""

    def setUp(self) -> None:
        """Set up the test case."""
        self.tmpdir = tempfile.mkdtemp(dir=TEST_DIR)

        # Write an extensions file adding two columns to ``schemas`` and one
        # to ``tables``; most tests below reuse this fixture.
        self.extensions_path = os.path.join(self.tmpdir, "test_extensions.yaml")
        extensions_content = """
name: test_extensions
description: Test TAP_SCHEMA extensions

tables:
  - name: schemas
    description: Extensions to schemas table
    columns:
      - name: owner_id
        datatype: char
        length: 32
        nullable: true
        description: "Owner identifier"
      - name: read_anon
        datatype: int
        nullable: true
        description: "Anon read flag"

  - name: tables
    description: Extensions to tables table
    columns:
      - name: api_created
        datatype: int
        nullable: true
        description: "API created flag"
"""
        with open(self.extensions_path, "w") as f:
            f.write(extensions_content)

    def tearDown(self) -> None:
        """Clean up temporary directory."""
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_extensions_applied(self) -> None:
        """Extension columns should appear on the extended tables."""
        mgr = TableManager(extensions_path=self.extensions_path)

        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)
        self.assertIn("read_anon", schemas_table.c)

        tables_table = mgr["tables"]
        self.assertIn("api_created", tables_table.c)

    def test_extensions_column_count(self) -> None:
        """Extensions should add exactly the expected number of columns."""
        mgr_without = TableManager()
        mgr_with = TableManager(extensions_path=self.extensions_path)

        # Two columns added to schemas, one to tables (see setUp fixture).
        schemas_before = len(mgr_without["schemas"].c)
        schemas_after = len(mgr_with["schemas"].c)
        self.assertEqual(schemas_after, schemas_before + 2)

        tables_before = len(mgr_without["tables"].c)
        tables_after = len(mgr_with["tables"].c)
        self.assertEqual(tables_after, tables_before + 1)

    def test_extensions_with_data_loader(self) -> None:
        """Loading schema data should work when extension columns exist."""
        mgr = TableManager(extensions_path=self.extensions_path)
        with create_database_context("sqlite:///:memory:", mgr.metadata) as db_ctx:
            mgr.initialize_database(db_ctx)

            with open(TEST_TAP_SCHEMA) as test_file:
                schema = Schema.from_stream(test_file, context={"id_generation": True})

            loader = DataLoader(schema, mgr, db_context=db_ctx)
            loader.load()

            schemas_table = mgr["schemas"]
            with db_ctx.engine.connect() as connection:
                result = connection.execute(select(schemas_table))
                row = result.fetchone()
                # The loaded row should carry the extension columns.
                self.assertIn("owner_id", row._fields)
                self.assertIn("read_anon", row._fields)

    def test_invalid_extensions_file(self) -> None:
        """A nonexistent extensions path should raise ``ValueError``."""
        invalid_path = os.path.join(self.tmpdir, "nonexistent.yaml")

        with self.assertRaises(ValueError):
            TableManager(extensions_path=invalid_path)

    def test_empty_extensions(self) -> None:
        """An extensions file with an empty table list should be accepted."""
        empty_extensions_path = os.path.join(self.tmpdir, "empty_extensions.yaml")
        with open(empty_extensions_path, "w") as f:
            f.write("name: empty_extensions\ntables: []\n")

        mgr = TableManager(extensions_path=empty_extensions_path)
        self.assertIsNotNone(mgr["schemas"])

    def test_extensions_with_null_table_extensions(self) -> None:
        """Tables with empty column lists are skipped; others still apply."""
        null_extensions_path = os.path.join(self.tmpdir, "null_extensions.yaml")
        with open(null_extensions_path, "w") as f:
            f.write("""
name: null_extensions
tables:
  - name: schemas
    columns: []
  - name: tables
    columns: []
  - name: columns
    columns:
      - name: test_col
        datatype: int
""")

        mgr = TableManager(extensions_path=null_extensions_path)

        columns_table = mgr["columns"]
        self.assertIn("test_col", columns_table.c)

        # schemas got no extension columns from this fixture.
        schemas_table = mgr["schemas"]
        self.assertNotIn("owner_id", schemas_table.c)

    def test_extensions_invalid_column_missing_name(self) -> None:
        """A column entry missing ``name`` should raise ``KeyError``."""
        invalid_name_path = os.path.join(self.tmpdir, "invalid_name.yaml")
        with open(invalid_name_path, "w") as f:
            f.write("""
            name: invalid_name
            tables:
              - name: schemas
                columns:
                  - datatype: int
                    description: "Missing name"
                  - name: some_column
                    datatype: int
            """)
        with self.assertRaises(KeyError):
            TableManager(extensions_path=invalid_name_path)

    def test_extensions_column_id_auto_generation(self) -> None:
        """A column without an explicit ``@id`` should still be applied."""
        auto_id_path = os.path.join(self.tmpdir, "auto_id.yaml")
        with open(auto_id_path, "w") as f:
            f.write("""
name: auto_id
tables:
  - name: schemas
    columns:
      - name: auto_id
        datatype: int
        nullable: false
        description: "Column with auto_id"
""")

        mgr = TableManager(extensions_path=auto_id_path)
        schemas_table = mgr["schemas"]
        self.assertIn("auto_id", schemas_table.c)

    def test_extensions_column_id_preserved(self) -> None:
        """A column with an explicit ``@id`` should still be applied."""
        explicit_id_path = os.path.join(self.tmpdir, "explicit_id.yaml")
        with open(explicit_id_path, "w") as f:
            f.write("""
name: explicit_id
tables:
  - name: schemas
    columns:
      - name: explicit_id
        datatype: int
        "@id": "#custom.id.path"
""")

        mgr = TableManager(extensions_path=explicit_id_path)
        schemas_table = mgr["schemas"]
        self.assertIn("explicit_id", schemas_table.c)

    def test_extensions_multiple_tables_extended(self) -> None:
        """Extensions spanning four standard tables should all be applied."""
        multi_table_path = os.path.join(self.tmpdir, "multi_table.yaml")
        with open(multi_table_path, "w") as f:
            f.write("""
name: multi_table
tables:
  - name: schemas
    columns:
      - name: schema_ext1
        datatype: int
      - name: schema_ext2
        datatype: int
  - name: tables
    columns:
      - name: table_ext1
        datatype: int
      - name: table_ext2
        datatype: double
  - name: columns
    columns:
      - name: col_ext1
        datatype: int
  - name: keys
    columns:
      - name: key_ext1
        datatype: char
        length: 128
""")

        mgr = TableManager(extensions_path=multi_table_path)

        schemas_table = mgr["schemas"]
        self.assertIn("schema_ext1", schemas_table.c)
        self.assertIn("schema_ext2", schemas_table.c)

        tables_table = mgr["tables"]
        self.assertIn("table_ext1", tables_table.c)
        self.assertIn("table_ext2", tables_table.c)

        columns_table = mgr["columns"]
        self.assertIn("col_ext1", columns_table.c)

        keys_table = mgr["keys"]
        self.assertIn("key_ext1", keys_table.c)

    def test_extensions_nonexistent_table_skipped(self) -> None:
        """Extensions naming an unknown table are ignored, not fatal."""
        nonexistent_table_path = os.path.join(self.tmpdir, "nonexistent_table.yaml")
        with open(nonexistent_table_path, "w") as f:
            f.write("""
name: test_extensions_nonexistent_table
tables:
  - name: schemas
    columns:
      - name: valid_ext
        datatype: int
  - name: nonexistent_table
    columns:
      - name: should_be_ignored
        datatype: int
""")

        mgr = TableManager(extensions_path=nonexistent_table_path)
        schemas_table = mgr["schemas"]
        self.assertIn("valid_ext", schemas_table.c)

    def test_extensions_column_properties_preserved(self) -> None:
        """A column declaring every supported property should be applied."""
        full_props_path = os.path.join(self.tmpdir, "full_props.yaml")
        with open(full_props_path, "w") as f:
            f.write("""
name: full_props
tables:
  - name: schemas
    columns:
      - name: full_property_column
        datatype: char
        length: 64
        nullable: false
        description: "Column with all properties"
        "@id": "#tap_schema.schemas.full_property_column"
""")

        mgr = TableManager(extensions_path=full_props_path)
        schemas_table = mgr["schemas"]
        self.assertIn("full_property_column", schemas_table.c)

    def test_extensions_apply_schema_to_metadata_true(self) -> None:
        """Extensions should apply with a non-SQLite engine URL configured."""
        mgr = TableManager(
            engine_url="postgresql://user:pass@localhost/db", extensions_path=self.extensions_path
        )
        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)

    def test_extensions_apply_schema_to_metadata_false(self) -> None:
        """Extensions should apply with the default (SQLite) configuration."""
        mgr = TableManager(extensions_path=self.extensions_path)

        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)
        self.assertIn("read_anon", schemas_table.c)

    def test_extensions_with_table_name_postfix(self) -> None:
        """Extensions should combine with a table name postfix."""
        mgr = TableManager(extensions_path=self.extensions_path, table_name_postfix="_custom")

        schemas_table = mgr["schemas"]
        self.assertIn("owner_id", schemas_table.c)

    def test_extensions_metadata_builder_called(self) -> None:
        """The manager should still build all standard tables' metadata."""
        mgr = TableManager(extensions_path=self.extensions_path)

        self.assertIsNotNone(mgr._metadata)

        table_names = list(mgr.metadata.tables.keys())
        found_schemas = any("schemas" in name for name in table_names)
        found_tables = any("tables" in name and "schemas" not in name for name in table_names)

        self.assertTrue(found_schemas, f"No schemas table found in {table_names}")
        self.assertTrue(found_tables, f"No tables table found in {table_names}")

    def test_extensions_preserve_original_columns(self) -> None:
        """Original standard columns must survive alongside extensions."""
        mgr = TableManager(extensions_path=self.extensions_path)

        schemas_table = mgr["schemas"]
        column_names = [col.name for col in schemas_table.columns]

        self.assertIn("schema_name", column_names)
        self.assertIn("owner_id", column_names)
        self.assertIn("read_anon", column_names)

    def test_no_extensions_path_provided(self) -> None:
        """No extensions path means no extension columns."""
        mgr = TableManager(extensions_path=None)
        schemas_table = mgr["schemas"]
        self.assertNotIn("owner_id", schemas_table.c)

    def test_extensions_path_empty_string(self) -> None:
        """An empty-string path is treated as no extensions."""
        mgr = TableManager(extensions_path="")
        schemas_table = mgr["schemas"]
        self.assertNotIn("owner_id", schemas_table.c)

    def test_extensions_file_not_found(self) -> None:
        """A missing extensions file should raise ``ValueError``."""
        nonexistent_path = os.path.join(self.tmpdir, "does_not_exist.yaml")
        with self.assertRaises(ValueError):
            TableManager(extensions_path=nonexistent_path)

877 

878 

# Allow running this test module directly (e.g. ``python test_tap_schema.py``).
if __name__ == "__main__":
    unittest.main()