# Coverage-report residue (coverage.py v7.13.5, created 2026-05-07 08:14 +0000):
# tests/test_cli.py measured at 25% of 230 statements; navigation chrome removed.
# This file is part of felis.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import os
import shutil
import tempfile
import unittest
from typing import Any

import yaml
from sqlalchemy import create_engine, text

import felis.tap_schema as tap_schema
from felis.datamodel import Schema
from felis.metadata import MetaDataBuilder
from felis.tests.run_cli import run_cli
# Directory containing this test module; test data files live in its
# ``data`` subdirectory.
TEST_DIR = os.path.abspath(os.path.dirname(__file__))
# Primary test schema used by most CLI tests.
TEST_YAML = os.path.join(TEST_DIR, "data", "test.yml")
# Secondary schema used by the index creation/drop tests.
TEST_SALES_YAML = os.path.join(TEST_DIR, "data", "sales.yaml")
class CliTestCase(unittest.TestCase):
    """Tests for CLI commands."""

    def setUp(self) -> None:
        """Create a scratch directory and SQLite URL for each test."""
        self.tmpdir = tempfile.mkdtemp(dir=TEST_DIR)
        self.sqlite_url = f"sqlite:///{self.tmpdir}/db.sqlite3"
        print(f"Using temporary directory: {self.tmpdir}")

        # Drop any handlers installed by earlier tests so that every test
        # starts from a fresh logging configuration.
        for handler in list(logging.root.handlers):
            logging.root.removeHandler(handler)

    def tearDown(self) -> None:
        """Remove the scratch directory."""
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_invalid_command(self) -> None:
        """An unknown subcommand should be reported as an error."""
        run_cli(["invalid"], expect_error=True)

    def test_help(self) -> None:
        """The top-level ``--help`` option should succeed."""
        run_cli(["--help"], print_output=True)

    def test_create(self) -> None:
        """The ``create`` command should run against SQLite."""
        run_cli(["create", f"--engine-url={self.sqlite_url}", TEST_YAML])

    def test_create_with_echo(self) -> None:
        """The ``create`` command should accept the ``--echo`` flag."""
        run_cli(["create", "--echo", f"--engine-url={self.sqlite_url}", TEST_YAML])

    def test_create_with_dry_run(self) -> None:
        """The ``create`` command should accept the ``--dry-run`` flag."""
        run_cli(["create", "--schema-name=main", f"--engine-url={self.sqlite_url}", "--dry-run", TEST_YAML])

    def test_create_with_ignore_constraints(self) -> None:
        """The ``create`` command should accept ``--ignore-constraints``."""
        args = [
            "create",
            "--schema-name=main",
            "--ignore-constraints",
            f"--engine-url={self.sqlite_url}",
            "--dry-run",
            TEST_YAML,
        ]
        run_cli(args)

    def test_validate(self) -> None:
        """The ``validate`` command should accept a valid schema."""
        run_cli(["validate", TEST_YAML])

    def test_validate_with_log_file(self) -> None:
        """The ``--log-file`` option should produce a non-empty log file."""
        log_path = os.path.join(self.tmpdir, "validate.log")
        run_cli([f"--log-file={log_path}", "validate", TEST_YAML], log_level=logging.DEBUG, print_cmd=True)
        if not os.path.exists(log_path):
            self.fail("Log file was not created")
        if os.path.getsize(log_path) == 0:
            self.fail("Log file is empty")

    def test_validate_with_id_generation(self) -> None:
        """Validating a schema should succeed when ID generation is enabled
        (the default behavior).
        """
        schema_path = os.path.join(TEST_DIR, "data", "test_id_generation.yaml")
        run_cli(["--id-generation", "validate", schema_path])

    def test_validate_with_id_generation_error(self) -> None:
        """Validating a schema lacking IDs should fail when ID generation is
        disabled.
        """
        schema_path = os.path.join(TEST_DIR, "data", "test_id_generation.yaml")
        run_cli(["--no-id-generation", "validate", schema_path], expect_error=True)

    def test_validate_with_extra_checks(self) -> None:
        """All of the extra validation flags should be accepted together."""
        args = [
            "validate",
            "--check-description",
            "--check-tap-principal",
            "--check-tap-table-indexes",
            TEST_YAML,
        ]
        run_cli(args)

    def test_create_with_initialize_and_drop_error(self) -> None:
        """``--initialize`` and ``--drop`` are mutually exclusive."""
        run_cli(["create", "--initialize", "--drop", TEST_YAML], expect_error=True)

    def test_load_tap_schema(self) -> None:
        """The ``load-tap-schema`` command should populate a TAP_SCHEMA
        database created from the standard schema.
        """
        # Build the TAP_SCHEMA tables first from the packaged standard schema.
        std_path = tap_schema.TableManager.get_tap_schema_std_path()
        run_cli(["--id-generation", "create", f"--engine-url={self.sqlite_url}", std_path])

        # Then load the test schema's TAP_SCHEMA rows.
        run_cli(["load-tap-schema", f"--engine-url={self.sqlite_url}", TEST_YAML])

    def test_load_tap_schema_with_dry_run_and_output_file(self) -> None:
        """A dry-run ``load-tap-schema`` should still write a non-empty SQL
        output file.
        """
        sql_path = os.path.join(self.tmpdir, "tap_schema.sql")
        args = [
            "load-tap-schema",
            "--engine-url=mysql://",
            "--dry-run",
            "--tap-schema-index=1",
            "--tap-tables-postfix=11",
            "--force-unbounded-arraysize",
            f"--output-file={sql_path}",
            TEST_YAML,
        ]
        run_cli(args)
        if not os.path.exists(sql_path):
            self.fail("Output SQL file was not created")
        if os.path.getsize(sql_path) == 0:
            self.fail("Output SQL file is empty")

    def test_init_tap_schema(self) -> None:
        """The ``init-tap-schema`` command should run against SQLite."""
        run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"])

    def test_init_tap_schema_mock(self) -> None:
        """An in-memory (mock) URL is not supported and should fail."""
        run_cli(["init-tap-schema", "sqlite://"], expect_error=True)

    def test_init_tap_schema_with_extensions(self) -> None:
        """The packaged default extensions resource should be accepted."""
        args = [
            "init-tap-schema",
            f"--engine-url={self.sqlite_url}",
            "--extensions",
            "resource://felis/config/tap_schema/tap_schema_extensions.yaml",
        ]
        run_cli(args)

    def test_init_tap_schema_with_custom_extensions(self) -> None:
        """A user-provided extensions file should be accepted."""
        extensions_path = os.path.join(self.tmpdir, "custom_extensions.yaml")
        # NOTE(review): the exact whitespace of this literal was reconstructed;
        # PyYAML accepts a uniformly indented top-level mapping, so any
        # consistent indentation parses identically.
        extensions_content = """
        name: TAP_SCHEMA
        tables:
        - name: schemas
          columns:
          - name: field1
            datatype: char
            length: 64
            description: A custom field
        """
        with open(extensions_path, "w") as f:
            f.write(extensions_content)

        run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}", "--extensions", extensions_path])

    def test_diff(self) -> None:
        """The ``diff`` command should compare two schema files."""
        diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml")
        diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml")

        run_cli(["diff", diff1, diff2])

    def test_diff_database(self) -> None:
        """The ``diff`` command should compare a schema file against a live
        database.
        """
        diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml")
        diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml")

        # Materialize the first schema into SQLite so the diff has a real
        # database to inspect.
        engine = create_engine(self.sqlite_url)
        db_metadata = MetaDataBuilder(Schema.from_uri(diff1), apply_schema_to_metadata=False).build()
        db_metadata.create_all(engine)
        engine.dispose()

        run_cli(["diff", f"--engine-url={self.sqlite_url}", diff2])

    def test_diff_alembic(self) -> None:
        """The ``diff`` command should accept ``--comparator alembic``."""
        diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml")
        diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml")
        run_cli(["diff", "--comparator", "alembic", diff1, diff2], print_output=True)

    def test_diff_error(self) -> None:
        """The ``diff`` command with only one schema argument should fail."""
        diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml")
        run_cli(["diff", diff1], expect_error=True)

    def test_diff_error_on_change(self) -> None:
        """``diff --error-on-change`` should fail when the schemas differ."""
        diff1 = os.path.join(TEST_DIR, "data", "test_diff1.yaml")
        diff2 = os.path.join(TEST_DIR, "data", "test_diff2.yaml")
        run_cli(["diff", "--error-on-change", diff1, diff2], expect_error=True, print_output=True)

    def test_dump_yaml(self) -> None:
        """The ``dump`` command should write a YAML file."""
        with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml") as temp_file:
            run_cli(["dump", TEST_YAML, temp_file.name], print_output=True)

    @classmethod
    def _check_strip_ids(cls, obj: Any) -> None:
        """Recursively verify that ``obj`` contains no ``@id`` keys.

        Raises ``ValueError`` on the first ``@id`` key found; used to check
        the output of the ``--strip-ids`` option of the ``dump`` command.
        """
        if isinstance(obj, dict):
            for key, value in obj.items():
                if key == "@id":
                    raise ValueError("Found forbidden key '@id'")
                cls._check_strip_ids(value)
        elif isinstance(obj, list):
            for element in obj:
                cls._check_strip_ids(element)

    def test_dump_yaml_with_strip_ids(self) -> None:
        """``dump --strip-ids`` should remove every ``@id`` key (YAML)."""
        with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml") as temp_file:
            run_cli(["dump", "--strip-ids", TEST_YAML, temp_file.name], print_output=True)
            dumped = temp_file.read().decode("utf-8")
            try:
                # Reparse the dump and walk it looking for '@id' keys.
                self._check_strip_ids(yaml.safe_load(dumped))
            except ValueError:
                self.fail("Dumped YAML contains forbidden key '@id'")

    def test_dump_json(self) -> None:
        """The ``dump`` command should write a JSON file."""
        with tempfile.NamedTemporaryFile(delete=True, suffix=".json") as temp_file:
            run_cli(["dump", TEST_YAML, temp_file.name], print_output=True)

    def test_dump_with_dereference_resources_and_sort_columns(self) -> None:
        """``dump`` with both ``--dereference-resources`` and
        ``--sort-columns`` should inline referenced columns and sort them.
        """
        # Source schema with columns deliberately out of alphabetical order.
        source_schema_content = """
name: base_schema
tables:
- name: base_table
  columns:
  - name: zebra_col
    datatype: string
    length: 32
  - name: alpha_col
    datatype: int
  - name: middle_col
    datatype: float
"""
        source_path = os.path.join(self.tmpdir, "base_schema.yaml")
        with open(source_path, "w") as f:
            f.write(source_schema_content.strip())

        # Referencing schema that pulls the columns in via columnRefs.
        ref_schema_content = f"""
name: derived_schema
resources:
  base_schema:
    uri: {source_path}
tables:
- name: derived_table
  columnRefs:
    base_schema:
      base_table:
        zebra_col:
        alpha_col:
        middle_col:
"""
        ref_path = os.path.join(self.tmpdir, "derived_schema.yaml")
        with open(ref_path, "w") as f:
            f.write(ref_schema_content.strip())

        with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml", dir=self.tmpdir) as temp_file:
            run_cli(
                [
                    "dump",
                    "--dereference-resources",
                    "--sort-columns",
                    ref_path,
                    temp_file.name,
                ],
                print_output=True,
            )
            dumped = yaml.safe_load(temp_file.read().decode("utf-8"))

        for table in dumped.get("tables", []):
            # Dereferencing must remove every columnRefs entry...
            self.assertNotIn("columnRefs", table)
            # ...and the inlined columns must be present and sorted by name.
            columns = table.get("columns", [])
            self.assertGreater(len(columns), 0)
            names = [col["name"] for col in columns]
            self.assertEqual(names, sorted(names))

    def test_dump_json_with_strip_ids(self) -> None:
        """``dump --strip-ids`` should remove every ``@id`` key (JSON)."""
        with tempfile.NamedTemporaryFile(delete=True, suffix=".json") as temp_file:
            run_cli(["dump", "--strip-ids", TEST_YAML, temp_file.name], print_output=True)
            dumped = temp_file.read().decode("utf-8")
            try:
                # JSON is a subset of YAML, so safe_load parses it as well.
                self._check_strip_ids(yaml.safe_load(dumped))
            except ValueError:
                self.fail("Dumped YAML contains forbidden key '@id'")

    @classmethod
    def _check_columns_sorted(cls, data: dict[str, Any]) -> None:
        """Assert that every table's columns are sorted alphabetically by
        name.
        """
        for table in data.get("tables", []):
            names = [col["name"] for col in table.get("columns", [])]
            assert names == sorted(names), f"Columns not sorted in table {table.get('name')}: {names}"

    def test_dump_yaml_with_sort_columns(self) -> None:
        """``dump --sort-columns`` should sort columns in YAML output."""
        with tempfile.NamedTemporaryFile(delete=True, suffix=".yaml") as temp_file:
            run_cli(["dump", "--sort-columns", TEST_YAML, temp_file.name], print_output=True)
            self._check_columns_sorted(yaml.safe_load(temp_file.read().decode("utf-8")))

    def test_dump_json_with_sort_columns(self) -> None:
        """``dump --sort-columns`` should sort columns in JSON output."""
        with tempfile.NamedTemporaryFile(delete=True, suffix=".json") as temp_file:
            run_cli(["dump", "--sort-columns", TEST_YAML, temp_file.name], print_output=True)
            self._check_columns_sorted(yaml.safe_load(temp_file.read().decode("utf-8")))

    def test_dump_with_invalid_file_extension_error(self) -> None:
        """``dump`` to a file with an unknown extension should fail."""
        run_cli(["dump", TEST_YAML, "out.bad"], expect_error=True)

    def test_create_and_drop_indexes(self) -> None:
        """Create and drop indexes using CLI commands with SQLite.

        The existence of the indexes is not checked on the database because
        other test cases cover that functionality sufficiently.
        """
        # Build the database without any indexes.
        run_cli(["create", "--skip-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML])

        # Create the indexes via the CLI.
        run_cli(["create-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"])

        # Creating them a second time must not raise.
        run_cli(["create-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"])

        # Finally drop them via the CLI.
        run_cli(["drop-indexes", f"--engine-url={self.sqlite_url}", TEST_SALES_YAML, "--schema-name", "main"])

    def test_generate_and_load_sql(self) -> None:
        """Generate SQL DDL from a schema and execute it on SQLite."""
        sql_path = os.path.join(self.tmpdir, "generated.sql")

        try:
            # Write the DDL to a file using a mock (in-memory) connection.
            run_cli(
                [
                    "create",
                    "--engine-url=sqlite://",
                    f"--output-file={sql_path}",
                    f"{TEST_YAML}",
                ]
            )

            self.assertTrue(os.path.exists(sql_path), "Generated SQL file should exist")

            with open(sql_path) as f:
                sql = f.read()

            self.assertGreater(len(sql.strip()), 0, "Generated SQL should not be empty")

            # Run the generated DDL against a real database.  SQLite executes
            # one statement at a time, so split the script on semicolons.
            engine = create_engine(self.sqlite_url)
            with engine.connect() as connection:
                with connection.begin():
                    for statement in (stmt.strip() for stmt in sql.split(";")):
                        if statement:  # Skip empty fragments.
                            connection.execute(text(statement))

            # Verify that every table declared in the schema now exists.
            with engine.connect() as connection:
                schema = Schema.from_uri(TEST_YAML, context={"id_generation": True})
                expected_table_names = {table.name for table in schema.tables}

                result = connection.execute(text("SELECT name FROM sqlite_master WHERE type='table'"))
                created_table_names = {row[0] for row in result.fetchall()}

                self.assertTrue(
                    expected_table_names.issubset(created_table_names),
                    f"Missing tables: {expected_table_names - created_table_names}. "
                    f"Expected: {sorted(expected_table_names)}, "
                    f"Created: {sorted(created_table_names)}",
                )

            engine.dispose()

        except Exception as e:
            # NOTE(review): catching Exception and re-raising via self.fail
            # hides the original traceback; kept to preserve behavior.
            self.fail(f"Test failed with exception: {e}")
class ColumnRefsTestCase(unittest.TestCase):
    """Test handling of column references in CLI."""

    def setUp(self) -> None:
        """Create a scratch directory and write the two schema files used by
        the column-reference tests.
        """
        self.temp_dir = tempfile.mkdtemp(dir=TEST_DIR)
        self.sqlite_url = f"sqlite:///{self.temp_dir}/db.sqlite3"

        # Schema that provides the columns to be referenced.
        source_schema_content = """
name: source_schema
tables:
- name: source_table
  columns:
  - name: ref_col1
    datatype: int
  - name: ref_col2
    datatype: string
    length: 64
  - name: ref_col3
    datatype: float
"""
        source_schema_path = os.path.join(self.temp_dir, "source_schema.yaml")
        with open(source_schema_path, "w") as f:
            f.write(source_schema_content.strip())

        # Schema that references those columns, including an override and a
        # rename (col3 -> ref_col3).  The {resource_path} placeholder is
        # substituted below with the source schema's location.
        ref_schema_content = """
name: ref_schema
resources:
  source_schema:
    uri: {resource_path}
tables:
- name: ref_table
  columnRefs:
    source_schema:
      source_table:
        ref_col1:
        ref_col2:
          overrides:
            tap:column_index: 15
        col3:
          ref_name: ref_col3
"""
        self.ref_schema_path = os.path.join(self.temp_dir, "ref_schema.yaml")
        ref_content = ref_schema_content.format(resource_path=source_schema_path)
        with open(self.ref_schema_path, "w") as f:
            f.write(ref_content.strip())

    def tearDown(self) -> None:
        """Remove the scratch directory."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_validate_with_column_ref_index_increment(self) -> None:
        """A valid ``--column-ref-index-increment`` value should be
        accepted.
        """
        run_cli(
            [
                "--column-ref-index-increment=1",
                "validate",
                self.ref_schema_path,
            ]
        )

    def test_validate_with_column_ref_index_increment_error(self) -> None:
        """A negative ``--column-ref-index-increment`` value should be
        rejected.
        """
        run_cli(
            [
                "--column-ref-index-increment=-1",
                "validate",
                self.ref_schema_path,
            ],
            expect_error=True,
        )

    def test_load_tap_schema_with_column_refs(self) -> None:
        """``load-tap-schema`` should handle a schema containing column
        references.
        """
        # Initialize an empty TAP_SCHEMA database.
        run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"])

        # Load the schema that pulls columns in via columnRefs.
        run_cli(
            [
                "load-tap-schema",
                f"--engine-url={self.sqlite_url}",
                self.ref_schema_path,
            ]
        )

    def test_load_tap_schema_with_column_ref_index_increment(self) -> None:
        """``load-tap-schema`` should honor the column reference index
        increment option.
        """
        # Initialize an empty TAP_SCHEMA database.
        run_cli(["init-tap-schema", f"--engine-url={self.sqlite_url}"])

        # Load with a column reference index increment applied.
        run_cli(
            [
                "--column-ref-index-increment=1",
                "load-tap-schema",
                f"--engine-url={self.sqlite_url}",
                self.ref_schema_path,
            ]
        )
if __name__ == "__main__":
    # Allow running this module directly as a script.
    unittest.main()