Coverage for tests/test_diff.py: 10%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:49 +0000

1# This file is part of felis. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import unittest 

23 

24from sqlalchemy import create_engine 

25 

26from felis import datamodel as dm 

27from felis.diff import DatabaseDiff, FormattedSchemaDiff, SchemaDiff 

28from felis.metadata import MetaDataBuilder 

29 

30 

class TestSchemaDiff(unittest.TestCase):
    """Test the SchemaDiff class."""

    # NOTE(review): this UCD literal carries a trailing space exactly as found
    # in the source; confirm whether that is intentional.
    _UCD = "meta.id;src;meta.main "

    @staticmethod
    def _make_schema(
        name: str = "schema1", version: str = "1.2.3", description: str = "Schema 1"
    ) -> dm.Schema:
        """Return a schema without tables whose ID is ``#<name>``."""
        return dm.Schema(name=name, id=f"#{name}", version=version, description=description, tables=[])

    @staticmethod
    def _make_table(name: str = "table1", description: str = "Table 1") -> dm.Table:
        """Return a table without columns whose ID is ``#<name>``."""
        return dm.Table(name=name, id=f"#{name}", description=description, columns=[])

    @staticmethod
    def _make_string_column(name: str = "column1", description: str = "Column 1", **extra) -> dm.Column:
        """Return a 256-character string column whose ID is ``#<name>``.

        Additional keyword arguments are passed through to ``dm.Column``.
        """
        return dm.Column(
            name=name, datatype="string", length=256, id=f"#{name}", description=description, **extra
        )

    def _make_schema_with_table(
        self, *columns: dm.Column, table_name: str = "table1", table_description: str = "Table 1"
    ) -> dm.Schema:
        """Return the default schema holding a single table with ``columns``.

        The table and columns are appended after the objects are constructed,
        mirroring how the fixtures were originally assembled in these tests.
        """
        schema = self._make_schema()
        schema.tables.append(self._make_table(table_name, table_description))
        schema.tables[0].columns.extend(columns)
        return schema

    def _diff(self, schema1: dm.Schema, schema2: dm.Schema):
        """Return the ``diff`` attribute of a ``SchemaDiff`` of the two schemas."""
        return SchemaDiff(schema1, schema2).diff

    def test_schema_diff(self) -> None:
        """Test the comparison output generated by the SchemaDiff class."""
        # Two schemas with different values
        schema1 = self._make_schema()
        schema2 = self._make_schema(name="schema2", version="4.5.6", description="Schema 2")
        diff = self._diff(schema1, schema2)
        # Check the key first so a missing entry is reported as a test failure
        # rather than as an AttributeError on None.
        self.assertIn("values_changed", diff)
        self.assertSetEqual(
            set(diff["values_changed"].keys()),
            {f"root['{key}']" for key in ("name", "id", "version", "description")},
        )

        # Call formatted handler function
        FormattedSchemaDiff(schema1, schema2)._handle_values_changed(diff["values_changed"])

        # Table added
        schema2.tables.append(self._make_table())
        diff = self._diff(schema1, schema2)
        self.assertIn("iterable_item_added", diff)
        self.assertIn("root['tables'][0]", diff["iterable_item_added"])

        # Call formatted handler function
        FormattedSchemaDiff(schema1, schema2)._handle_iterable_item_added(diff["iterable_item_added"])

        # Table removed
        schema2.tables.clear()
        schema1.tables.append(self._make_table())
        diff = self._diff(schema1, schema2)
        self.assertIn("iterable_item_removed", diff)
        self.assertIn("root['tables'][0]", diff["iterable_item_removed"])

        # Call formatted handler function
        FormattedSchemaDiff(schema1, schema2)._handle_iterable_item_removed(diff["iterable_item_removed"])

        # Different table descriptions
        schema1 = self._make_schema_with_table()
        schema2 = self._make_schema_with_table(table_description="Table 2")
        diff = self._diff(schema1, schema2)
        path = "root['tables'][0]['description']"
        self.assertIn("values_changed", diff)
        self.assertIn(path, diff["values_changed"])
        self.assertEqual(diff["values_changed"][path]["old_value"], "Table 1")
        self.assertEqual(diff["values_changed"][path]["new_value"], "Table 2")

        # Two different tables
        schema1 = self._make_schema_with_table()
        schema2 = self._make_schema_with_table(table_name="table2", table_description="Table 2")
        diff = self._diff(schema1, schema2)
        self.assertIn("values_changed", diff)
        self.assertSetEqual(
            set(diff["values_changed"].keys()),
            {f"root['tables'][0]['{key}']" for key in ("name", "id", "description")},
        )

        # Two tables with different columns
        schema1 = self._make_schema_with_table()
        schema2 = self._make_schema_with_table(self._make_string_column())
        diff = self._diff(schema1, schema2)
        self.assertIn("iterable_item_added", diff)
        self.assertIn("root['tables'][0]['columns'][0]", diff["iterable_item_added"])

        # Same columns in different order (no diff)
        schema1 = self._make_schema_with_table(
            self._make_string_column("column1", "Column 1"),
            self._make_string_column("column2", "Column 2"),
        )
        schema2 = self._make_schema_with_table(
            self._make_string_column("column2", "Column 2"),
            self._make_string_column("column1", "Column 1"),
        )
        diff = self._diff(schema1, schema2)
        self.assertEqual(len(diff), 0)

        # Same columns with different descriptions
        schema1 = self._make_schema_with_table(self._make_string_column(description="Column 1"))
        schema2 = self._make_schema_with_table(self._make_string_column(description="Column 2"))
        diff = self._diff(schema1, schema2)
        path = "root['tables'][0]['columns'][0]['description']"
        self.assertIn("values_changed", diff)
        self.assertIn(path, diff["values_changed"])
        self.assertEqual(diff["values_changed"][path]["old_value"], "Column 1")
        self.assertEqual(diff["values_changed"][path]["new_value"], "Column 2")

        # Added a field to a column
        schema1 = self._make_schema_with_table(self._make_string_column())
        schema2 = self._make_schema_with_table(self._make_string_column(ivoa_ucd=self._UCD))
        diff = self._diff(schema1, schema2)
        self.assertIn("dictionary_item_added", diff)
        self.assertIn("root['tables'][0]['columns'][0]['ivoa:ucd']", diff["dictionary_item_added"])

        # Call formatted handler function
        FormattedSchemaDiff(schema1, schema2)._handle_dictionary_item_added(diff["dictionary_item_added"])

        # Removed a field from a column
        schema1 = self._make_schema_with_table(self._make_string_column(ivoa_ucd=self._UCD))
        schema2 = self._make_schema_with_table(self._make_string_column())
        diff = self._diff(schema1, schema2)
        self.assertIn("dictionary_item_removed", diff)
        self.assertIn("root['tables'][0]['columns'][0]['ivoa:ucd']", diff["dictionary_item_removed"])

        # Call formatted handler function
        FormattedSchemaDiff(schema1, schema2)._handle_dictionary_item_removed(diff["dictionary_item_removed"])

    def test_index_diff(self) -> None:
        """Test differences in indices between tables."""
        schema1 = self._make_schema_with_table(
            dm.Column(name="column1", datatype="int", id="#column1", description="Column 1")
        )
        schema1.tables[0].indexes.append(
            dm.Index(name="index1", id="#index1", description="Index 1", columns=["column1"])
        )

        schema2 = self._make_schema_with_table(
            dm.Column(name="column2", datatype="int", id="#column2", description="Column 2")
        )
        schema2.tables[0].indexes.append(
            dm.Index(name="index1", id="#index1", description="Index 1", columns=["column2"])
        )

        diff = self._diff(schema1, schema2)
        path = "root['tables'][0]['indexes'][0]['columns'][0]"
        self.assertIn("values_changed", diff)
        self.assertIn(path, diff["values_changed"])
        self.assertEqual(diff["values_changed"][path]["old_value"], "column1")
        self.assertEqual(diff["values_changed"][path]["new_value"], "column2")

        # Print formatted diff to make sure it works for these changes
        FormattedSchemaDiff(schema1, schema2).print()

    def test_print(self) -> None:
        """Check that printing a SchemaDiff of two differing schemas runs."""
        schema1 = self._make_schema()
        schema2 = self._make_schema(name="schema2", version="4.5.6", description="Schema 2")
        SchemaDiff(schema1, schema2).print()

    def test_formatted_print(self) -> None:
        """Check that printing a FormattedSchemaDiff of two differing schemas runs."""
        schema1 = self._make_schema()
        schema2 = self._make_schema(name="schema2", version="4.5.6", description="Schema 2")
        FormattedSchemaDiff(schema1, schema2).print()

    def test_parse_deepdiff_path(self) -> None:
        """Check that a path string is split into its string keys and integer indices."""
        path = "root['tables'][0]['columns'][0]['ivoa:ucd']"
        keys = FormattedSchemaDiff._parse_deepdiff_path(path)
        self.assertListEqual(keys, ["tables", 0, "columns", 0, "ivoa:ucd"])

    def test_get_id_error(self) -> None:
        """Check that ``_get_id`` raises ValueError for this dictionary and key list."""
        id_dict = {"tables": [{"indexes": [{"columns": [{"name": "column1"}, {"name": "column2"}]}]}]}
        keys = ["tables", 0, "indexes", 0, "columns", 0]
        with self.assertRaises(ValueError):
            FormattedSchemaDiff._get_id(id_dict, keys)

245 

246 

class TestDatabaseDiff(unittest.TestCase):
    """Test the DatabaseDiff class."""

    def test_database_diff(self) -> None:
        """Test the comparison output generated by the DatabaseDiff class."""
        # Two tables with different columns
        schema1 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
        schema1.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
        schema1.tables[0].columns.append(
            dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 1")
        )

        schema2 = dm.Schema(name="schema1", id="#schema1", version="1.2.3", description="Schema 1", tables=[])
        schema2.tables.append(dm.Table(name="table1", id="#table1", description="Table 1", columns=[]))
        schema2.tables[0].columns.append(
            dm.Column(name="column1", datatype="string", length=256, id="#column1", description="Column 1")
        )
        schema2.tables[0].columns.append(
            dm.Column(name="column2", datatype="string", length=256, id="#column2", description="Column 2")
        )

        metadata_db = MetaDataBuilder(schema1, apply_schema_to_metadata=False).build()
        engine = create_engine("sqlite:///:memory:")
        # Register disposal right away so the engine is released even when an
        # assertion or a later call in this test fails.
        self.addCleanup(engine.dispose)
        metadata_db.create_all(engine)

        db_diff = DatabaseDiff(schema2, engine)
        db_diff.print()

        # Fail with a clear assertion instead of an IndexError on an empty diff.
        self.assertGreater(len(db_diff.diff), 0)
        self.assertEqual(db_diff.diff[0][0], "add_column")