Coverage for tests / test_diff.py: 10%
143 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
1# This file is part of felis.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import unittest
24from sqlalchemy import create_engine
26from felis import datamodel as dm
27from felis.diff import DatabaseDiff, FormattedSchemaDiff, SchemaDiff
28from felis.metadata import MetaDataBuilder
31class TestSchemaDiff(unittest.TestCase):
32 """Test the SchemaDiff class."""
34 def _diff(self, schema1, schema2):
35 return SchemaDiff(schema1, schema2).diff
37 def test_schema_diff(self) -> None:
38 """Test the comparison output generated by the SchemaDiff class."""
39 # Two schemas with different values
40 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
41 schema2 = dm.Schema(name="schema2", id="#schema2", version="4.5.6", description="Schema 2", tables=[])
42 diff = self._diff(schema1, schema2)
43 self.assertSetEqual(
44 set(diff.get("values_changed").keys()),
45 set(f"root['{key}']" for key in ["name", "id", "version", "description"]),
46 )
48 # Call formatted handler function
49 FormattedSchemaDiff(schema1, schema2)._handle_values_changed(diff["values_changed"])
51 # Table added
52 schema2.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
53 diff = self._diff(schema1, schema2)
54 self.assertIn("iterable_item_added", diff)
55 self.assertIn("root['tables'][0]", diff["iterable_item_added"])
57 # Call formatted handler function
58 FormattedSchemaDiff(schema1, schema2)._handle_iterable_item_added(diff["iterable_item_added"])
60 # Table removed
61 schema2.tables.clear()
62 schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
63 diff = self._diff(schema1, schema2)
64 self.assertIn("iterable_item_removed", diff)
65 self.assertIn("root['tables'][0]", diff["iterable_item_removed"])
67 # Call formatted handler function
68 FormattedSchemaDiff(schema1, schema2)._handle_iterable_item_removed(diff["iterable_item_removed"])
70 # Different table descriptions
71 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
72 schema2 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
73 schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
74 schema2.tables.append(dm.Table(name="table1", id="#table1", description="Table 2", columns=[]))
75 diff = self._diff(schema1, schema2)
76 self.assertIn("values_changed", diff)
77 self.assertIn("root['tables'][0]['description']", diff["values_changed"])
78 old_value = diff["values_changed"]["root['tables'][0]['description']"]["old_value"]
79 new_value = diff["values_changed"]["root['tables'][0]['description']"]["new_value"]
80 self.assertEqual(old_value, "Table 1")
81 self.assertEqual(new_value, "Table 2")
83 # Two different tables
84 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
85 schema2 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
86 schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
87 schema2.tables.append(dm.Table(name="table2", id="#table2", description="Table 2", columns=[]))
88 diff = self._diff(schema1, schema2)
89 self.assertSetEqual(
90 set(diff.get("values_changed").keys()),
91 set(f"root['tables'][0]['{key}']" for key in ["name", "id", "description"]),
92 )
94 # Two tables with different columns
95 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
96 schema2 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
97 schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
98 schema2.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
99 schema2.tables[0].columns.append(
100 dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 1")
101 )
102 diff = self._diff(schema1, schema2)
103 self.assertIn("iterable_item_added", diff)
104 self.assertIn("root['tables'][0]['columns'][0]", diff["iterable_item_added"])
106 # Same columns in different order (no diff)
107 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
108 schema2 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
109 schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
110 schema2.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
111 schema1.tables[0].columns.append(
112 dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 1")
113 )
114 schema1.tables[0].columns.append(
115 dm.Column(name="column2", datatype="string", length=256, id="#column2", description="Column 2")
116 )
117 schema2.tables[0].columns.append(
118 dm.Column(name="column2", datatype="string", length=256, id="#column2", description="Column 2")
119 )
120 schema2.tables[0].columns.append(
121 dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 1")
122 )
123 diff = self._diff(schema1, schema2)
124 self.assertEqual(len(diff), 0)
126 # Same columns with different descriptions
127 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
128 schema2 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
129 schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
130 schema2.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
131 schema1.tables[0].columns.append(
132 dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 1")
133 )
134 schema2.tables[0].columns.append(
135 dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 2")
136 )
137 diff = self._diff(schema1, schema2)
138 self.assertIn("values_changed", diff)
139 self.assertIn("root['tables'][0]['columns'][0]['description']", diff["values_changed"])
140 old_value = diff["values_changed"]["root['tables'][0]['columns'][0]['description']"]["old_value"]
141 new_value = diff["values_changed"]["root['tables'][0]['columns'][0]['description']"]["new_value"]
142 self.assertEqual(old_value, "Column 1")
143 self.assertEqual(new_value, "Column 2")
145 # Added a field to a column
146 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
147 schema2 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
148 schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
149 schema2.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
150 schema1.tables[0].columns.append(
151 dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 1")
152 )
153 schema2.tables[0].columns.append(
154 dm.Column(
155 name="column1",
156 datatype="string",
157 length=256,
158 id="#column1",
159 description="Column 1",
160 ivoa_ucd="meta.id;src;meta.main ",
161 )
162 )
163 diff = self._diff(schema1, schema2)
164 self.assertIn("dictionary_item_added", diff)
165 self.assertIn("root['tables'][0]['columns'][0]['ivoa_ucd']", diff["dictionary_item_added"])
167 # Call formatted handler function
168 FormattedSchemaDiff(schema1, schema2)._handle_dictionary_item_added(diff["dictionary_item_added"])
170 # Removed a field from a column
171 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
172 schema2 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
173 schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
174 schema2.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
175 schema1.tables[0].columns.append(
176 dm.Column(
177 name="column1",
178 datatype="string",
179 length=256,
180 id="#column1",
181 description="Column 1",
182 ivoa_ucd="meta.id;src;meta.main ",
183 )
184 )
185 schema2.tables[0].columns.append(
186 dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 1")
187 )
188 diff = self._diff(schema1, schema2)
189 self.assertIn("dictionary_item_removed", diff)
190 self.assertIn("root['tables'][0]['columns'][0]['ivoa_ucd']", diff["dictionary_item_removed"])
192 # Call formatted handler function
193 FormattedSchemaDiff(schema1, schema2)._handle_dictionary_item_removed(diff["dictionary_item_removed"])
195 def test_index_diff(self) -> None:
196 """Test differences in indices between tables."""
197 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
198 schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
199 schema1.tables[0].columns.append(
200 dm.Column(name="column1", datatype="int", id="#column1", description="Column 1")
201 )
202 schema1.tables[0].indexes.append(
203 dm.Index(name="index1", id="#index1", description="Index 1", columns=["column1"])
204 )
206 schema2 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
207 schema2.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
208 schema2.tables[0].columns.append(
209 dm.Column(name="column2", datatype="int", id="#column2", description="Column 2")
210 )
211 schema2.tables[0].indexes.append(
212 dm.Index(name="index1", id="#index1", description="Index 1", columns=["column2"])
213 )
214 diff = self._diff(schema1, schema2)
215 self.assertIn("values_changed", diff)
216 self.assertIn("root['tables'][0]['indexes'][0]['columns'][0]", diff["values_changed"])
217 new_value = diff["values_changed"]["root['tables'][0]['indexes'][0]['columns'][0]"]["new_value"]
218 old_value = diff["values_changed"]["root['tables'][0]['indexes'][0]['columns'][0]"]["old_value"]
219 self.assertEqual(old_value, "column1")
220 self.assertEqual(new_value, "column2")
222 # Print formatted diff to make sure it works for these changes
223 FormattedSchemaDiff(schema1, schema2).print()
225 def test_print(self) -> None:
226 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
227 schema2 = dm.Schema(name="schema2", id="#schema2", version="4.5.6", description="Schema 2", tables=[])
228 SchemaDiff(schema1, schema2).print()
230 def test_formatted_print(self) -> None:
231 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
232 schema2 = dm.Schema(name="schema2", id="#schema2", version="4.5.6", description="Schema 2", tables=[])
233 FormattedSchemaDiff(schema1, schema2).print()
235 def test_parse_deepdiff_path(self) -> None:
236 path = "root['tables'][0]['columns'][0]['ivoa_ucd']"
237 keys = FormattedSchemaDiff._parse_deepdiff_path(path)
238 self.assertListEqual(keys, ["tables", 0, "columns", 0, "ivoa_ucd"])
240 def test_get_id_error(self) -> None:
241 id_dict = {"tables": [{"indexes": [{"columns": [{"name": "column1"}, {"name": "column2"}]}]}]}
242 keys = ["tables", 0, "indexes", 0, "columns", 0]
243 with self.assertRaises(ValueError):
244 FormattedSchemaDiff._get_id(id_dict, keys)
247class TestDatabaseDiff(unittest.TestCase):
248 """Test the DatabaseDiff class."""
250 def test_database_diff(self) -> None:
251 """Test the comparison output generated by the DatabaseDiff class."""
252 # Two tables with different columns
253 schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
254 schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
255 schema1.tables[0].columns.append(
256 dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 1")
257 )
259 schema2 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
260 schema2.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
261 schema2.tables[0].columns.append(
262 dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 1")
263 )
264 schema2.tables[0].columns.append(
265 dm.Column(name="column2", datatype="string", length=256, id="#column2", description="Column 2")
266 )
268 metadata_db = MetaDataBuilder(schema1, apply_schema_to_metadata=False).build()
269 engine = create_engine("sqlite:///:memory:")
270 metadata_db.create_all(engine)
272 db_diff = DatabaseDiff(schema2, engine)
273 db_diff.print()
275 self.assertEqual(db_diff.diff[0][0], "add_column")
277 engine.dispose()