Coverage for tests / test_cli.py: 25%

230 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:14 +0000

1# This file is part of felis. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import logging 

23import os 

24import shutil 

25import tempfile 

26import unittest 

27from typing import Any 

28 

29import yaml 

30from sqlalchemy import create_engine, text 

31 

32import felis.tap_schema as tap_schema 

33from felis.datamodel import Schema 

34from felis.metadata import MetaDataBuilder 

35from felis.tests.run_cli import run_cli 

36 

# Directory containing this test module; used to anchor test data paths and
# as the parent for per-test temporary directories.
TEST_DIR = os.path.abspath(os.path.dirname(__file__))
# Primary schema fixture used by most of the CLI tests below.
TEST_YAML = os.path.join(TEST_DIR, "data", "test.yml")
# Schema fixture used by the index create/drop tests.
TEST_SALES_YAML = os.path.join(TEST_DIR, "data", "sales.yaml")

40 

41 

42class CliTestCase(unittest.TestCase): 

43 """Tests for CLI commands.""" 

44 

45 def setUp(self) -> None: 

46 """Set up a temporary directory for tests.""" 

47 self.tmpdir = tempfile.mkdtemp(dir=TEST_DIR) 

48 self.sqlite_url = f"sqlite:///{self.tmpdir}/db.sqlite3" 

49 print(f"Using temporary directory: {self.tmpdir}") 

50 

51 # Clear any existing logging handlers to ensure fresh configuration for 

52 # each test 

53 for handler in logging.root.handlers[:]: 

54 logging.root.removeHandler(handler) 

55 

56 def tearDown(self) -> None: 

57 """Clean up temporary directory.""" 

58 shutil.rmtree(self.tmpdir, ignore_errors=True) 

59 

60 def test_invalid_command(self) -> None: 

61 """Test for invalid command.""" 

62 run_cli(["invalid"], expect_error=True) 

63 

64 def test_help(self) -> None: 

65 """Test for help command.""" 

66 run_cli(["--help"], print_output=True) 

67 

68 def test_create(self) -> None: 

69 """Test for create command.""" 

70 run_cli(["create", f"--engine-url={self.sqlite_url}", TEST_YAML]) 

71 

72 def test_create_with_echo(self) -> None: 

73 """Test for create command.""" 

74 run_cli(["create", "--echo", f"--engine-url={self.sqlite_url}", TEST_YAML]) 

75 

76 def test_create_with_dry_run(self) -> None: 

77 """Test for ``create --dry-run`` command.""" 

78 run_cli(["create", "--schema-name=main", f"--engine-url={self.sqlite_url}", "--dry-run", TEST_YAML]) 

79 

80 def test_create_with_ignore_constraints(self) -> None: 

81 """Test ``--ignore-constraints`` flag of ``create`` command.""" 

82 run_cli( 

83 [ 

84 "create", 

85 "--schema-name=main", 

86 "--ignore-constraints", 

87 f"--engine-url={self.sqlite_url}", 

88 "--dry-run", 

89 TEST_YAML, 

90 ] 

91 ) 

92 

93 def test_validate(self) -> None: 

94 """Test validate command.""" 

95 run_cli(["validate", TEST_YAML]) 

96 

97 def test_validate_with_log_file(self) -> None: 

98 """Test validate command with log file.""" 

99 log_file = os.path.join(self.tmpdir, "validate.log") 

100 run_cli([f"--log-file={log_file}", "validate", TEST_YAML], log_level=logging.DEBUG, print_cmd=True) 

101 if not os.path.exists(log_file): 

102 self.fail("Log file was not created") 

103 if os.path.getsize(log_file) == 0: 

104 self.fail("Log file is empty") 

105 

106 def test_validate_with_id_generation(self) -> None: 

107 """Test that loading a schema with IDs works if ID generation is 

108 enabled. This is the default behavior. 

109 """ 

110 test_yaml = os.path.join(TEST_DIR, "data", "test_id_generation.yaml") 

111 run_cli(["--id-generation", "validate", test_yaml]) 

112 

113 def test_validate_with_id_generation_error(self) -> None: 

114 """Test that loading a schema without IDs fails if ID generation is not 

115 enabled. 

116 """ 

117 test_yaml = os.path.join(TEST_DIR, "data", "test_id_generation.yaml") 

118 run_cli(["--no-id-generation", "validate", test_yaml], expect_error=True) 

119 

120 def test_validate_with_extra_checks(self) -> None: 

121 """Test schema validation flags.""" 

122 run_cli( 

123 [ 

124 "validate", 

125 "--check-description", 

126 "--check-tap-principal", 

127 "--check-tap-table-indexes", 

128 TEST_YAML, 

129 ] 

130 ) 

131 

132 def test_create_with_initialize_and_drop_error(self) -> None: 

133 """Test that initialize and drop can't be used together.""" 

134 run_cli(["create", "--initialize", "--drop", TEST_YAML], expect_error=True) 

135 

136 def test_load_tap_schema(self) -> None: 

137 """Test load-tap-schema command.""" 

138 # Create the TAP_SCHEMA database. 

139 tap_schema_path = tap_schema.TableManager.get_tap_schema_std_path() 

140 run_cli(["--id-generation", "create", f"--engine-url={self.sqlite_url}", tap_schema_path]) 

141 

142 # Load the TAP_SCHEMA data. 

143 run_cli(["load-tap-schema", f"--engine-url={self.sqlite_url}", TEST_YAML]) 

144 

145 def test_load_tap_schema_with_dry_run_and_output_file(self) -> None: 

146 """Test load-tap-schema command with dry run and output file.""" 

147 output_sql = os.path.join(self.tmpdir, "tap_schema.sql") 

148 run_cli( 

149 [ 

150 "load-tap-schema", 

151 "--engine-url=mysql://", 

152 "--dry-run", 

153 "--tap-schema-index=1", 

154 "--tap-tables-postfix=11", 

155 "--force-unbounded-arraysize", 

156 f"--output-file={output_sql}", 

157 TEST_YAML, 

158 ] 

159 ) 

160 if not os.path.exists(output_sql): 

161 self.fail("Output SQL file was not created") 

162 if os.path.getsize(output_sql) == 0: 

163 self.fail("Output SQL file is empty") 

164 

165 def test_init_tap_schema(self) -> None: 

166 """Test init-tap-schema command.""" 

167 run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"]) 

168 

169 def test_init_tap_schema_mock(self) -> None: 

170 """Test init-tap-schema command with a mock URL, which should throw 

171 an error, as this is not supported. 

172 """ 

173 run_cli(["init-tap-schema", "sqlite://"], expect_error=True) 

174 

175 def test_init_tap_schema_with_extensions(self) -> None: 

176 """Test init-tap-schema command with default extensions.""" 

177 run_cli( 

178 [ 

179 "init-tap-schema", 

180 f"--engine-url={self.sqlite_url}", 

181 "--extensions", 

182 "resource://felis/config/tap_schema/tap_schema_extensions.yaml", 

183 ] 

184 ) 

185 

186 def test_init_tap_schema_with_custom_extensions(self) -> None: 

187 """Test init-tap-schema command with custom extensions file.""" 

188 extensions_file = os.path.join(self.tmpdir, "custom_extensions.yaml") 

189 extensions_content = """ 

190 name: TAP_SCHEMA 

191 tables: 

192 - name: schemas 

193 columns: 

194 - name: field1 

195 datatype: char 

196 length: 64 

197 description: A custom field 

198 """ 

199 with open(extensions_file, "w") as f: 

200 f.write(extensions_content) 

201 

202 run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}", "--extensions", extensions_file]) 

203 

204 def test_diff(self) -> None: 

205 """Test for ``diff`` command.""" 

206 test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml") 

207 test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml") 

208 

209 run_cli(["diff", test_diff1, test_diff2]) 

210 

211 def test_diff_database(self) -> None: 

212 """Test for ``diff`` command with database.""" 

213 test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml") 

214 test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml") 

215 

216 engine = create_engine(self.sqlite_url) 

217 metadata_db = MetaDataBuilder(Schema.from_uri(test_diff1), apply_schema_to_metadata=False).build() 

218 metadata_db.create_all(engine) 

219 engine.dispose() 

220 

221 run_cli(["diff", f"--engine-url={self.sqlite_url}", test_diff2]) 

222 

223 def test_diff_alembic(self) -> None: 

224 """Test for ``diff`` command with ``--alembic`` comparator option.""" 

225 test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml") 

226 test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml") 

227 run_cli(["diff", "--comparator", "alembic", test_diff1, test_diff2], print_output=True) 

228 

229 def test_diff_error(self) -> None: 

230 """Test for ``diff`` command with bad arguments.""" 

231 test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml") 

232 run_cli(["diff", test_diff1], expect_error=True) 

233 

234 def test_diff_error_on_change(self) -> None: 

235 """Test for ``diff`` command with ``--error-on-change`` flag.""" 

236 test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml") 

237 test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml") 

238 run_cli(["diff", "--error-on-change", test_diff1, test_diff2], expect_error=True, print_output=True) 

239 

240 def test_dump_yaml(self) -> None: 

241 """Test for ``dump`` command with YAML output.""" 

242 with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml") as temp_file: 

243 run_cli(["dump", TEST_YAML, temp_file.name], print_output=True) 

244 

245 @classmethod 

246 def _check_strip_ids(cls, obj: Any) -> None: 

247 """ 

248 Recursively check that a dict/list structure has no attributes with key 

249 '@id'. Raises a ValueError if any '@id' key is found. This is used to 

250 check the output of the `--strip-ids` option in the `dump` command. 

251 """ 

252 if isinstance(obj, dict): 

253 for k, v in obj.items(): 

254 if k == "@id": 

255 raise ValueError("Found forbidden key '@id'") 

256 cls._check_strip_ids(v) 

257 elif isinstance(obj, list): 

258 for item in obj: 

259 cls._check_strip_ids(item) 

260 

261 def test_dump_yaml_with_strip_ids(self) -> None: 

262 """Test for ``dump`` command with YAML output and stripped IDs.""" 

263 with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml") as temp_file: 

264 run_cli(["dump", "--strip-ids", TEST_YAML, temp_file.name], print_output=True) 

265 dumped_data = temp_file.read().decode("utf-8") 

266 try: 

267 # Load the dumped YAML data to check for '@id' keys. 

268 data = yaml.safe_load(dumped_data) 

269 self._check_strip_ids(data) 

270 except ValueError: 

271 self.fail("Dumped YAML contains forbidden key '@id'") 

272 

273 def test_dump_json(self) -> None: 

274 """Test for ``dump`` command with JSON output.""" 

275 with tempfile.NamedTemporaryFile(delete=True, suffix=".json") as temp_file: 

276 run_cli(["dump", TEST_YAML, temp_file.name], print_output=True) 

277 

278 def test_dump_with_dereference_resources_and_sort_columns(self) -> None: 

279 """Test dump with both --dereference-resources and --sort-columns.""" 

280 # Define a source schema with columns in non-alphabetical order 

281 source_schema_content = """ 

282name: base_schema 

283tables: 

284- name: base_table 

285 columns: 

286 - name: zebra_col 

287 datatype: string 

288 length: 32 

289 - name: alpha_col 

290 datatype: int 

291 - name: middle_col 

292 datatype: float 

293""" 

294 source_path = os.path.join(self.tmpdir, "base_schema.yaml") 

295 with open(source_path, "w") as f: 

296 f.write(source_schema_content.strip()) 

297 

298 # Define a referencing schema that pulls columns via columnRefs 

299 ref_schema_content = f""" 

300name: derived_schema 

301resources: 

302 base_schema: 

303 uri: {source_path} 

304tables: 

305- name: derived_table 

306 columnRefs: 

307 base_schema: 

308 base_table: 

309 zebra_col: 

310 alpha_col: 

311 middle_col: 

312""" 

313 ref_path = os.path.join(self.tmpdir, "derived_schema.yaml") 

314 with open(ref_path, "w") as f: 

315 f.write(ref_schema_content.strip()) 

316 

317 with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml", dir=self.tmpdir) as temp_file: 

318 run_cli( 

319 [ 

320 "dump", 

321 "--dereference-resources", 

322 "--sort-columns", 

323 ref_path, 

324 temp_file.name, 

325 ], 

326 print_output=True, 

327 ) 

328 dumped_data = temp_file.read().decode("utf-8") 

329 data = yaml.safe_load(dumped_data) 

330 

331 # Verify resources are dereferenced (no columnRefs remain) 

332 for table in data.get("tables", []): 

333 self.assertNotIn("columnRefs", table) 

334 # Verify columns are present and sorted alphabetically 

335 columns = table.get("columns", []) 

336 self.assertGreater(len(columns), 0) 

337 names = [col["name"] for col in columns] 

338 self.assertEqual(names, sorted(names)) 

339 

340 def test_dump_json_with_strip_ids(self) -> None: 

341 """Test for ``dump`` command with JSON output.""" 

342 with tempfile.NamedTemporaryFile(delete=True, suffix=".json") as temp_file: 

343 run_cli(["dump", "--strip-ids", TEST_YAML, temp_file.name], print_output=True) 

344 dumped_data = temp_file.read().decode("utf-8") 

345 try: 

346 # Load the dumped YAML data to check for '@id' keys. 

347 data = yaml.safe_load(dumped_data) 

348 self._check_strip_ids(data) 

349 except ValueError: 

350 self.fail("Dumped YAML contains forbidden key '@id'") 

351 

352 @classmethod 

353 def _check_columns_sorted(cls, data: dict[str, Any]) -> None: 

354 """Check that columns in each table are sorted alphabetically by 

355 name. 

356 """ 

357 for table in data.get("tables", []): 

358 columns = table.get("columns", []) 

359 names = [col["name"] for col in columns] 

360 assert names == sorted(names), f"Columns not sorted in table {table.get('name')}: {names}" 

361 

362 def test_dump_yaml_with_sort_columns(self) -> None: 

363 """Test for ``dump`` command with YAML output and sorted columns.""" 

364 with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml") as temp_file: 

365 run_cli(["dump", "--sort-columns", TEST_YAML, temp_file.name], print_output=True) 

366 dumped_data = temp_file.read().decode("utf-8") 

367 data = yaml.safe_load(dumped_data) 

368 self._check_columns_sorted(data) 

369 

370 def test_dump_json_with_sort_columns(self) -> None: 

371 """Test for ``dump`` command with JSON output and sorted columns.""" 

372 with tempfile.NamedTemporaryFile(delete=True, suffix=".json") as temp_file: 

373 run_cli(["dump", "--sort-columns", TEST_YAML, temp_file.name], print_output=True) 

374 dumped_data = temp_file.read().decode("utf-8") 

375 data = yaml.safe_load(dumped_data) 

376 self._check_columns_sorted(data) 

377 

378 def test_dump_with_invalid_file_extension_error(self) -> None: 

379 """Test for ``dump`` command with JSON output.""" 

380 run_cli(["dump", TEST_YAML, "out.bad"], expect_error=True) 

381 

382 def test_create_and_drop_indexes(self) -> None: 

383 """Test creating and dropping indexes using CLI commands with 

384 SQLite; no checking for the existence of the indexes is done on the 

385 database because other test cases cover that functionality 

386 sufficiently. 

387 """ 

388 # Create database without indexes 

389 run_cli(["create", "--skip-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML]) 

390 

391 # Create the indexes using CLI 

392 run_cli( 

393 ["create-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"] 

394 ) 

395 

396 # Create the indexes again; should not cause an error 

397 run_cli( 

398 ["create-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"] 

399 ) 

400 

401 # Drop the indexes using CLI 

402 run_cli(["drop-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"]) 

403 

404 def test_generate_and_load_sql(self) -> None: 

405 """Test generating SQL and then executing it on a SQLite database.""" 

406 generated_sql = os.path.join(self.tmpdir, "generated.sql") 

407 

408 try: 

409 # Generate SQL DDL from schema using mock connection 

410 run_cli( 

411 [ 

412 "create", 

413 "--engine-url=sqlite://", 

414 f"--output-file={generated_sql}", 

415 f"{TEST_YAML}", 

416 ] 

417 ) 

418 

419 # Verify the SQL file was generated 

420 self.assertTrue(os.path.exists(generated_sql), "Generated SQL file should exist") 

421 

422 # Read the generated SQL 

423 with open(generated_sql) as f: 

424 sql = f.read() 

425 

426 # Verify SQL content is not empty 

427 self.assertGreater(len(sql.strip()), 0, "Generated SQL should not be empty") 

428 

429 # Execute the SQL against a real database 

430 engine = create_engine(self.sqlite_url) 

431 with engine.connect() as connection: 

432 with connection.begin(): 

433 # Split SQL into individual statements for execution since 

434 # SQLite can only execute one statement at a time 

435 statements = [stmt.strip() for stmt in sql.split(";") if stmt.strip()] 

436 for statement in statements: 

437 if statement: # Skip empty statements 

438 connection.execute(text(statement)) 

439 

440 # Verify that all expected tables were actually created 

441 with engine.connect() as connection: 

442 # Load the schema to get expected table names 

443 schema = Schema.from_uri(TEST_YAML, context={"id_generation": True}) 

444 expected_table_names = {table.name for table in schema.tables} 

445 

446 # Get all tables that were created in the database 

447 result = connection.execute(text("SELECT name FROM sqlite_master WHERE type='table'")) 

448 created_table_names = {row[0] for row in result.fetchall()} 

449 

450 # Verify all expected tables were created 

451 self.assertTrue( 

452 expected_table_names.issubset(created_table_names), 

453 f"Missing tables: {expected_table_names - created_table_names}. " 

454 f"Expected: {sorted(expected_table_names)}, " 

455 f"Created: {sorted(created_table_names)}", 

456 ) 

457 

458 engine.dispose() 

459 

460 except Exception as e: 

461 self.fail(f"Test failed with exception: {e}") 

462 

463 

class ColumnRefsTestCase(unittest.TestCase):
    """Test handling of column references in CLI."""

    def _write_schema(self, filename: str, content: str) -> str:
        """Write stripped schema text into the temp dir and return its path."""
        path = os.path.join(self.temp_dir, filename)
        with open(path, "w") as out:
            out.write(content.strip())
        return path

    def setUp(self) -> None:
        """Set up a temporary directory for tests."""
        self.temp_dir = tempfile.mkdtemp(dir=TEST_DIR)
        self.sqlite_url = f"sqlite:///{self.temp_dir}/db.sqlite3"

        # Write out source schema file
        src_yaml = """
name: source_schema
tables:
- name: source_table
  columns:
  - name: ref_col1
    datatype: int
  - name: ref_col2
    datatype: string
    length: 64
  - name: ref_col3
    datatype: float
"""
        src_path = self._write_schema("source_schema.yaml", src_yaml)

        # Write out referencing schema file, pointing its resource URI at the
        # source schema just written above.
        ref_yaml = """
name: ref_schema
resources:
  source_schema:
    uri: {resource_path}
tables:
- name: ref_table
  columnRefs:
    source_schema:
      source_table:
        ref_col1:
        ref_col2:
          overrides:
            tap:column_index: 15
        col3:
          ref_name: ref_col3
"""
        self.ref_schema_path = self._write_schema(
            "ref_schema.yaml", ref_yaml.format(resource_path=src_path)
        )

    def tearDown(self) -> None:
        """Clean up temporary directory."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_validate_with_column_ref_index_increment(self) -> None:
        """Test that passing a valid value for column reference index increment
        works.
        """
        run_cli(["--column-ref-index-increment=1", "validate", self.ref_schema_path])

    def test_validate_with_column_ref_index_increment_error(self) -> None:
        """Test that passing an invalid value for column reference index raises
        an error.
        """
        args = ["--column-ref-index-increment=-1", "validate", self.ref_schema_path]
        run_cli(args, expect_error=True)

    def test_load_tap_schema_with_column_refs(self) -> None:
        """Test load-tap-schema command with column reference index
        increment.
        """
        # Create the TAP_SCHEMA database
        run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"])

        # Load the TAP_SCHEMA data that includes column references
        run_cli(["load-tap-schema", f"--engine-url={self.sqlite_url}", self.ref_schema_path])

    def test_load_tap_schema_with_column_ref_index_increment(self) -> None:
        """Test load-tap-schema command with column reference index
        increment.
        """
        # Create the TAP_SCHEMA database
        run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"])

        # Load the TAP_SCHEMA data that includes column reference index
        # increment
        run_cli(
            [
                "--column-ref-index-increment=1",
                "load-tap-schema",
                f"--engine-url={self.sqlite_url}",
                self.ref_schema_path,
            ]
        )

575 

576 

# Allow running this test module directly (e.g. ``python test_cli.py``).
if __name__ == "__main__":
    unittest.main()