Coverage for tests/test_cli.py: 27%

193 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:42 +0000

1# This file is part of felis. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import logging 

23import os 

24import shutil 

25import tempfile 

26import unittest 

27from typing import Any 

28 

29import yaml 

30from sqlalchemy import create_engine, text 

31 

32import felis.tap_schema as tap_schema 

33from felis.datamodel import Schema 

34from felis.metadata import MetaDataBuilder 

35from felis.tests.run_cli import run_cli 

36 

# Directory containing this test module; used to anchor all test data paths
# and the per-test temporary directories created in setUp().
TEST_DIR = os.path.abspath(os.path.dirname(__file__))
# Primary schema file exercised by most CLI tests.
TEST_YAML = os.path.join(TEST_DIR, "data", "test.yml")
# Sales schema used by the index creation/drop tests.
TEST_SALES_YAML = os.path.join(TEST_DIR, "data", "sales.yaml")

40 

41 

42class CliTestCase(unittest.TestCase): 

43 """Tests for CLI commands.""" 

44 

45 def setUp(self) -> None: 

46 """Set up a temporary directory for tests.""" 

47 self.tmpdir = tempfile.mkdtemp(dir=TEST_DIR) 

48 self.sqlite_url = f"sqlite:///{self.tmpdir}/db.sqlite3" 

49 print(f"Using temporary directory: {self.tmpdir}") 

50 

51 # Clear any existing logging handlers to ensure fresh configuration for 

52 # each test 

53 for handler in logging.root.handlers[:]: 

54 logging.root.removeHandler(handler) 

55 

56 def tearDown(self) -> None: 

57 """Clean up temporary directory.""" 

58 shutil.rmtree(self.tmpdir, ignore_errors=True) 

59 

60 def test_invalid_command(self) -> None: 

61 """Test for invalid command.""" 

62 run_cli(["invalid"], expect_error=True) 

63 

64 def test_help(self) -> None: 

65 """Test for help command.""" 

66 run_cli(["--help"], print_output=True) 

67 

68 def test_create(self) -> None: 

69 """Test for create command.""" 

70 run_cli(["create", f"--engine-url={self.sqlite_url}", TEST_YAML]) 

71 

72 def test_create_with_echo(self) -> None: 

73 """Test for create command.""" 

74 run_cli(["create", "--echo", f"--engine-url={self.sqlite_url}", TEST_YAML]) 

75 

76 def test_create_with_dry_run(self) -> None: 

77 """Test for ``create --dry-run`` command.""" 

78 run_cli(["create", "--schema-name=main", f"--engine-url={self.sqlite_url}", "--dry-run", TEST_YAML]) 

79 

80 def test_create_with_ignore_constraints(self) -> None: 

81 """Test ``--ignore-constraints`` flag of ``create`` command.""" 

82 run_cli( 

83 [ 

84 "create", 

85 "--schema-name=main", 

86 "--ignore-constraints", 

87 f"--engine-url={self.sqlite_url}", 

88 "--dry-run", 

89 TEST_YAML, 

90 ] 

91 ) 

92 

93 def test_validate(self) -> None: 

94 """Test validate command.""" 

95 run_cli(["validate", TEST_YAML]) 

96 

97 def test_validate_with_log_file(self) -> None: 

98 """Test validate command with log file.""" 

99 log_file = os.path.join(self.tmpdir, "validate.log") 

100 run_cli([f"--log-file={log_file}", "validate", TEST_YAML], log_level=logging.DEBUG, print_cmd=True) 

101 if not os.path.exists(log_file): 

102 self.fail("Log file was not created") 

103 if os.path.getsize(log_file) == 0: 

104 self.fail("Log file is empty") 

105 

106 def test_validate_with_id_generation(self) -> None: 

107 """Test that loading a schema with IDs works if ID generation is 

108 enabled. This is the default behavior. 

109 """ 

110 test_yaml = os.path.join(TEST_DIR, "data", "test_id_generation.yaml") 

111 run_cli(["--id-generation", "validate", test_yaml]) 

112 

113 def test_validate_with_id_generation_error(self) -> None: 

114 """Test that loading a schema without IDs fails if ID generation is not 

115 enabled. 

116 """ 

117 test_yaml = os.path.join(TEST_DIR, "data", "test_id_generation.yaml") 

118 run_cli(["--no-id-generation", "validate", test_yaml], expect_error=True) 

119 

120 def test_validate_with_extra_checks(self) -> None: 

121 """Test schema validation flags.""" 

122 run_cli( 

123 [ 

124 "validate", 

125 "--check-description", 

126 "--check-tap-principal", 

127 "--check-tap-table-indexes", 

128 TEST_YAML, 

129 ] 

130 ) 

131 

132 def test_create_with_initialize_and_drop_error(self) -> None: 

133 """Test that initialize and drop can't be used together.""" 

134 run_cli(["create", "--initialize", "--drop", TEST_YAML], expect_error=True) 

135 

136 def test_load_tap_schema(self) -> None: 

137 """Test load-tap-schema command.""" 

138 # Create the TAP_SCHEMA database. 

139 tap_schema_path = tap_schema.TableManager.get_tap_schema_std_path() 

140 run_cli(["--id-generation", "create", f"--engine-url={self.sqlite_url}", tap_schema_path]) 

141 

142 # Load the TAP_SCHEMA data. 

143 run_cli(["load-tap-schema", f"--engine-url={self.sqlite_url}", TEST_YAML]) 

144 

145 def test_load_tap_schema_with_dry_run_and_output_file(self) -> None: 

146 """Test load-tap-schema command with dry run and output file.""" 

147 output_sql = os.path.join(self.tmpdir, "tap_schema.sql") 

148 run_cli( 

149 [ 

150 "load-tap-schema", 

151 "--engine-url=mysql://", 

152 "--dry-run", 

153 "--tap-schema-index=1", 

154 "--tap-tables-postfix=11", 

155 "--force-unbounded-arraysize", 

156 f"--output-file={output_sql}", 

157 TEST_YAML, 

158 ] 

159 ) 

160 if not os.path.exists(output_sql): 

161 self.fail("Output SQL file was not created") 

162 if os.path.getsize(output_sql) == 0: 

163 self.fail("Output SQL file is empty") 

164 

165 def test_init_tap_schema(self) -> None: 

166 """Test init-tap-schema command.""" 

167 run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"]) 

168 

169 def test_init_tap_schema_mock(self) -> None: 

170 """Test init-tap-schema command with a mock URL, which should throw 

171 an error, as this is not supported. 

172 """ 

173 run_cli(["init-tap-schema", "sqlite://"], expect_error=True) 

174 

175 def test_init_tap_schema_with_extensions(self) -> None: 

176 """Test init-tap-schema command with default extensions.""" 

177 run_cli( 

178 [ 

179 "init-tap-schema", 

180 f"--engine-url={self.sqlite_url}", 

181 "--extensions", 

182 "resource://felis/config/tap_schema/tap_schema_extensions.yaml", 

183 ] 

184 ) 

185 

186 def test_init_tap_schema_with_custom_extensions(self) -> None: 

187 """Test init-tap-schema command with custom extensions file.""" 

188 extensions_file = os.path.join(self.tmpdir, "custom_extensions.yaml") 

189 extensions_content = """ 

190 name: TAP_SCHEMA 

191 tables: 

192 - name: schemas 

193 columns: 

194 - name: field1 

195 datatype: char 

196 length: 64 

197 description: A custom field 

198 """ 

199 with open(extensions_file, "w") as f: 

200 f.write(extensions_content) 

201 

202 run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}", "--extensions", extensions_file]) 

203 

204 def test_diff(self) -> None: 

205 """Test for ``diff`` command.""" 

206 test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml") 

207 test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml") 

208 

209 run_cli(["diff", test_diff1, test_diff2]) 

210 

211 def test_diff_database(self) -> None: 

212 """Test for ``diff`` command with database.""" 

213 test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml") 

214 test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml") 

215 

216 engine = create_engine(self.sqlite_url) 

217 metadata_db = MetaDataBuilder(Schema.from_uri(test_diff1), apply_schema_to_metadata=False).build() 

218 metadata_db.create_all(engine) 

219 engine.dispose() 

220 

221 run_cli(["diff", f"--engine-url={self.sqlite_url}", test_diff2]) 

222 

223 def test_diff_alembic(self) -> None: 

224 """Test for ``diff`` command with ``--alembic`` comparator option.""" 

225 test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml") 

226 test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml") 

227 run_cli(["diff", "--comparator", "alembic", test_diff1, test_diff2], print_output=True) 

228 

229 def test_diff_error(self) -> None: 

230 """Test for ``diff`` command with bad arguments.""" 

231 test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml") 

232 run_cli(["diff", test_diff1], expect_error=True) 

233 

234 def test_diff_error_on_change(self) -> None: 

235 """Test for ``diff`` command with ``--error-on-change`` flag.""" 

236 test_diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml") 

237 test_diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml") 

238 run_cli(["diff", "--error-on-change", test_diff1, test_diff2], expect_error=True, print_output=True) 

239 

240 def test_dump_yaml(self) -> None: 

241 """Test for ``dump`` command with YAML output.""" 

242 with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml") as temp_file: 

243 run_cli(["dump", TEST_YAML, temp_file.name], print_output=True) 

244 

245 @classmethod 

246 def _check_strip_ids(cls, obj: Any) -> None: 

247 """ 

248 Recursively check that a dict/list structure has no attributes with key 

249 '@id'. Raises a ValueError if any '@id' key is found. This is used to 

250 check the output of the `--strip-ids` option in the `dump` command. 

251 """ 

252 if isinstance(obj, dict): 

253 for k, v in obj.items(): 

254 if k == "@id": 

255 raise ValueError("Found forbidden key '@id'") 

256 cls._check_strip_ids(v) 

257 elif isinstance(obj, list): 

258 for item in obj: 

259 cls._check_strip_ids(item) 

260 

261 def test_dump_yaml_with_strip_ids(self) -> None: 

262 """Test for ``dump`` command with YAML output and stripped IDs.""" 

263 with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml") as temp_file: 

264 run_cli(["dump", "--strip-ids", TEST_YAML, temp_file.name], print_output=True) 

265 dumped_data = temp_file.read().decode("utf-8") 

266 try: 

267 # Load the dumped YAML data to check for '@id' keys. 

268 data = yaml.safe_load(dumped_data) 

269 self._check_strip_ids(data) 

270 except ValueError: 

271 self.fail("Dumped YAML contains forbidden key '@id'") 

272 

273 def test_dump_json(self) -> None: 

274 """Test for ``dump`` command with JSON output.""" 

275 with tempfile.NamedTemporaryFile(delete=True, suffix=".json") as temp_file: 

276 run_cli(["dump", TEST_YAML, temp_file.name], print_output=True) 

277 

278 def test_dump_json_with_strip_ids(self) -> None: 

279 """Test for ``dump`` command with JSON output.""" 

280 with tempfile.NamedTemporaryFile(delete=True, suffix=".json") as temp_file: 

281 run_cli(["dump", "--strip-ids", TEST_YAML, temp_file.name], print_output=True) 

282 dumped_data = temp_file.read().decode("utf-8") 

283 try: 

284 # Load the dumped YAML data to check for '@id' keys. 

285 data = yaml.safe_load(dumped_data) 

286 self._check_strip_ids(data) 

287 except ValueError: 

288 self.fail("Dumped YAML contains forbidden key '@id'") 

289 

290 def test_dump_with_invalid_file_extension_error(self) -> None: 

291 """Test for ``dump`` command with JSON output.""" 

292 run_cli(["dump", TEST_YAML, "out.bad"], expect_error=True) 

293 

294 def test_create_and_drop_indexes(self) -> None: 

295 """Test creating and dropping indexes using CLI commands with 

296 SQLite; no checking for the existence of the indexes is done on the 

297 database because other test cases cover that functionality 

298 sufficiently. 

299 """ 

300 # Create database without indexes 

301 run_cli(["create", "--skip-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML]) 

302 

303 # Create the indexes using CLI 

304 run_cli( 

305 ["create-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"] 

306 ) 

307 

308 # Create the indexes again; should not cause an error 

309 run_cli( 

310 ["create-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"] 

311 ) 

312 

313 # Drop the indexes using CLI 

314 run_cli(["drop-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"]) 

315 

316 def test_generate_and_load_sql(self) -> None: 

317 """Test generating SQL and then executing it on a SQLite database.""" 

318 generated_sql = os.path.join(self.tmpdir, "generated.sql") 

319 

320 try: 

321 # Generate SQL DDL from schema using mock connection 

322 run_cli( 

323 [ 

324 "create", 

325 "--engine-url=sqlite://", 

326 f"--output-file={generated_sql}", 

327 f"{TEST_YAML}", 

328 ] 

329 ) 

330 

331 # Verify the SQL file was generated 

332 self.assertTrue(os.path.exists(generated_sql), "Generated SQL file should exist") 

333 

334 # Read the generated SQL 

335 with open(generated_sql) as f: 

336 sql = f.read() 

337 

338 # Verify SQL content is not empty 

339 self.assertGreater(len(sql.strip()), 0, "Generated SQL should not be empty") 

340 

341 # Execute the SQL against a real database 

342 engine = create_engine(self.sqlite_url) 

343 with engine.connect() as connection: 

344 with connection.begin(): 

345 # Split SQL into individual statements for execution since 

346 # SQLite can only execute one statement at a time 

347 statements = [stmt.strip() for stmt in sql.split(";") if stmt.strip()] 

348 for statement in statements: 

349 if statement: # Skip empty statements 

350 connection.execute(text(statement)) 

351 

352 # Verify that all expected tables were actually created 

353 with engine.connect() as connection: 

354 # Load the schema to get expected table names 

355 schema = Schema.from_uri(TEST_YAML, context={"id_generation": True}) 

356 expected_table_names = {table.name for table in schema.tables} 

357 

358 # Get all tables that were created in the database 

359 result = connection.execute(text("SELECT name FROM sqlite_master WHERE type='table'")) 

360 created_table_names = {row[0] for row in result.fetchall()} 

361 

362 # Verify all expected tables were created 

363 self.assertTrue( 

364 expected_table_names.issubset(created_table_names), 

365 f"Missing tables: {expected_table_names - created_table_names}. " 

366 f"Expected: {sorted(expected_table_names)}, " 

367 f"Created: {sorted(created_table_names)}", 

368 ) 

369 

370 engine.dispose() 

371 

372 except Exception as e: 

373 self.fail(f"Test failed with exception: {e}") 

374 

375 

376class ColumnRefsTestCase(unittest.TestCase): 

377 """Test handling of column references in CLI.""" 

378 

379 def setUp(self) -> None: 

380 """Set up a temporary directory for tests.""" 

381 self.temp_dir = tempfile.mkdtemp(dir=TEST_DIR) 

382 self.sqlite_url = f"sqlite:///{self.temp_dir}/db.sqlite3" 

383 

384 # Write out source schema file 

385 source_schema_content = """ 

386name: source_schema 

387tables: 

388- name: source_table 

389 columns: 

390 - name: ref_col1 

391 datatype: int 

392 - name: ref_col2 

393 datatype: string 

394 length: 64 

395 - name: ref_col3 

396 datatype: float 

397""" 

398 source_schema_path = os.path.join(self.temp_dir, "source_schema.yaml") 

399 with open(source_schema_path, "w") as f: 

400 f.write(source_schema_content.strip()) 

401 

402 # Write out referencing schema file 

403 ref_schema_content = """ 

404name: ref_schema 

405resources: 

406 source_schema: 

407 uri: {resource_path} 

408tables: 

409- name: ref_table 

410 columnRefs: 

411 source_schema: 

412 source_table: 

413 ref_col1: 

414 ref_col2: 

415 overrides: 

416 tap:column_index: 15 

417 col3: 

418 ref_name: ref_col3 

419""" 

420 self.ref_schema_path = os.path.join(self.temp_dir, "ref_schema.yaml") 

421 ref_content = ref_schema_content.format(resource_path=source_schema_path) 

422 with open(self.ref_schema_path, "w") as f: 

423 f.write(ref_content.strip()) 

424 

425 def tearDown(self) -> None: 

426 """Clean up temporary directory.""" 

427 shutil.rmtree(self.temp_dir, ignore_errors=True) 

428 

429 def test_validate_with_column_ref_index_increment(self) -> None: 

430 """Test that passing a valid value for column reference index increment 

431 works. 

432 """ 

433 run_cli( 

434 [ 

435 "--column-ref-index-increment=1", 

436 "validate", 

437 self.ref_schema_path, 

438 ] 

439 ) 

440 

441 def test_validate_with_column_ref_index_increment_error(self) -> None: 

442 """Test that passing an invalid value for column reference index raises 

443 an error. 

444 """ 

445 run_cli( 

446 [ 

447 "--column-ref-index-increment=-1", 

448 "validate", 

449 self.ref_schema_path, 

450 ], 

451 expect_error=True, 

452 ) 

453 

454 def test_load_tap_schema_with_column_refs(self) -> None: 

455 """Test load-tap-schema command with column reference index 

456 increment. 

457 """ 

458 # Create the TAP_SCHEMA database 

459 run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"]) 

460 

461 # Load the TAP_SCHEMA data that includes column references 

462 run_cli( 

463 [ 

464 "load-tap-schema", 

465 f"--engine-url={self.sqlite_url}", 

466 self.ref_schema_path, 

467 ] 

468 ) 

469 

470 def test_load_tap_schema_with_column_ref_index_increment(self) -> None: 

471 """Test load-tap-schema command with column reference index 

472 increment. 

473 """ 

474 # Create the TAP_SCHEMA database 

475 run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"]) 

476 

477 # Load the TAP_SCHEMA data that includes column reference index 

478 # increment 

479 run_cli( 

480 [ 

481 "--column-ref-index-increment=1", 

482 "load-tap-schema", 

483 f"--engine-url={self.sqlite_url}", 

484 self.ref_schema_path, 

485 ] 

486 ) 

487 

488 

# Allow running this test module directly (e.g. ``python test_cli.py``).
if __name__ == "__main__":
    unittest.main()