Coverage for tests / test_metadata.py: 10%

145 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:14 +0000

1# This file is part of felis. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import os 

23import unittest 

24 

25import yaml 

26from sqlalchemy import ( 

27 CheckConstraint, 

28 Constraint, 

29 ForeignKeyConstraint, 

30 Index, 

31 MetaData, 

32 PrimaryKeyConstraint, 

33 UniqueConstraint, 

34) 

35 

36from felis import datamodel as dm 

37from felis.datamodel import Schema 

38from felis.db.database_context import create_database_context 

39from felis.metadata import MetaDataBuilder, get_datatype_with_variants 

40 

# Absolute path of the directory that contains this test module.
TESTDIR = os.path.dirname(os.path.abspath(__file__))
# Location of the ``sales.yaml`` schema fixture under the ``data`` directory.
TEST_YAML = os.path.join(TESTDIR, "data", "sales.yaml")

43 

44 

class MetaDataTestCase(unittest.TestCase):
    """Test creation of SQLAlchemy metadata from a Felis schema."""

    def setUp(self) -> None:
        """Load the YAML test data into ``self.yaml_data``."""
        with open(TEST_YAML) as data:
            self.yaml_data = yaml.safe_load(data)

    def test_create_all(self) -> None:
        """Create all tables in the schema using the metadata object and a
        SQLite connection.

        Check that the reflected metadata matches that created by the builder:
        table names, column names/types/nullability/primary-key flags,
        non-primary-key constraints and indexes.
        """

        def _sorted_indexes(indexes: set[Index]) -> list[Index]:
            """Return the indexes sorted by name."""
            return sorted(indexes, key=lambda i: i.name)

        def _sorted_constraints(constraints: set[Constraint]) -> list[Constraint]:
            """Return the constraints sorted by name, with the
            `PrimaryKeyConstraint` objects filtered out.
            """
            return sorted(
                [c for c in constraints if not isinstance(c, PrimaryKeyConstraint)], key=lambda c: c.name
            )

        schema = Schema.model_validate(self.yaml_data)
        schema.name = "main"
        builder = MetaDataBuilder(schema)
        md = builder.build()

        with create_database_context("sqlite:///:memory:", md) as db_ctx:
            db_ctx.create_all()

            # Reflect what was actually created in the database. The
            # connection is only needed for reflection, so release it as soon
            # as reflection is done instead of leaving it open.
            md_db = MetaData()
            with db_ctx.engine.connect() as connection:
                md_db.reflect(connection, schema=schema.name)

            self.assertEqual(md_db.tables.keys(), md.tables.keys())

            for md_table_name, md_table in md.tables.items():
                md_db_table = md_db.tables[md_table_name]

                # Columns: same names, and per column the same type class,
                # nullability and primary-key flag.
                self.assertEqual(md_table.columns.keys(), md_db_table.columns.keys())
                for md_column_name in md_table.columns.keys():
                    md_column = md_table.columns[md_column_name]
                    md_db_column = md_db_table.columns[md_column_name]
                    self.assertEqual(type(md_column.type), type(md_db_column.type))
                    self.assertEqual(md_column.nullable, md_db_column.nullable)
                    self.assertEqual(md_column.primary_key, md_db_column.primary_key)

                # Constraints: either both tables have some or neither does.
                self.assertEqual(
                    bool(md_table.constraints),
                    bool(md_db_table.constraints),
                    "Constraints not created correctly",
                )
                if md_table.constraints:
                    self.assertEqual(len(md_table.constraints), len(md_db_table.constraints))
                    md_constraints = _sorted_constraints(md_table.constraints)
                    md_db_constraints = _sorted_constraints(md_db_table.constraints)
                    for md_constraint, md_db_constraint in zip(md_constraints, md_db_constraints):
                        self.assertEqual(md_constraint.name, md_db_constraint.name)
                        self.assertEqual(md_constraint.deferrable, md_db_constraint.deferrable)
                        self.assertEqual(md_constraint.initially, md_db_constraint.initially)
                        self.assertEqual(
                            type(md_constraint), type(md_db_constraint), "Constraint types do not match"
                        )
                        # The paired isinstance checks narrow both objects to
                        # the same constraint class before comparing the
                        # attributes specific to that class.
                        if isinstance(md_constraint, ForeignKeyConstraint) and isinstance(
                            md_db_constraint, ForeignKeyConstraint
                        ):
                            md_fk: ForeignKeyConstraint = md_constraint
                            md_db_fk: ForeignKeyConstraint = md_db_constraint
                            self.assertEqual(md_fk.referred_table.name, md_db_fk.referred_table.name)
                            self.assertEqual(md_fk.column_keys, md_db_fk.column_keys)
                            self.assertEqual(md_fk.ondelete, md_db_fk.ondelete)
                            self.assertEqual(md_fk.onupdate, md_db_fk.onupdate)
                        elif isinstance(md_constraint, UniqueConstraint) and isinstance(
                            md_db_constraint, UniqueConstraint
                        ):
                            md_uniq: UniqueConstraint = md_constraint
                            md_db_uniq: UniqueConstraint = md_db_constraint
                            self.assertEqual(md_uniq.columns.keys(), md_db_uniq.columns.keys())
                        elif isinstance(md_constraint, CheckConstraint) and isinstance(
                            md_db_constraint, CheckConstraint
                        ):
                            md_check: CheckConstraint = md_constraint
                            md_db_check: CheckConstraint = md_db_constraint
                            self.assertEqual(str(md_check.sqltext), str(md_db_check.sqltext))

                # Indexes: either both tables have some or neither does.
                self.assertEqual(
                    bool(md_table.indexes),
                    bool(md_db_table.indexes),
                    "Indexes not created correctly",
                )
                if md_table.indexes:
                    md_indexes = _sorted_indexes(md_table.indexes)
                    md_db_indexes = _sorted_indexes(md_db_table.indexes)
                    self.assertEqual(len(md_indexes), len(md_db_indexes))
                    # Pair the name-sorted lists: the raw index collections
                    # are treated as sets by the helper above, so zipping them
                    # directly would pair indexes in an arbitrary order.
                    for md_index, md_db_index in zip(md_indexes, md_db_indexes):
                        self.assertEqual(md_index.name, md_db_index.name)
                        self.assertEqual(md_index.columns.keys(), md_db_index.columns.keys())

    def test_builder(self) -> None:
        """Test that the information in the metadata object created by the
        builder matches the data in the Felis schema used to create it.
        """
        sch = Schema.model_validate(self.yaml_data)
        bld = MetaDataBuilder(sch, apply_schema_to_metadata=False)
        md = bld.build()

        self.assertEqual(len(sch.tables), len(md.tables))
        self.assertEqual([table.name for table in sch.tables], list(md.tables.keys()))
        for table in sch.tables:
            md_table = md.tables[table.name]
            self.assertEqual(table.name, md_table.name)
            self.assertEqual(len(table.columns), len(md_table.columns))

            for column in table.columns:
                md_table_column = md_table.columns[column.name]
                datatype = get_datatype_with_variants(column)
                self.assertEqual(type(datatype), type(md_table_column.type))
                # Nullability is only compared when the schema sets it.
                if column.nullable is not None:
                    self.assertEqual(column.nullable, md_table_column.nullable)

            for constraint in table.constraints:
                matching_constraints = [mdc for mdc in md_table.constraints if mdc.name == constraint.name]
                # Fail with a readable message rather than an IndexError when
                # the builder did not produce the constraint at all.
                self.assertTrue(
                    matching_constraints,
                    f"Constraint {constraint.name} not found on table {table.name}",
                )
                md_constraint = matching_constraints[0]
                # Schema constraints reference columns by ID; ``sch[...]``
                # looks up the schema object so names can be compared.
                if isinstance(constraint, dm.ForeignKeyConstraint):
                    self.assertIsInstance(md_constraint, ForeignKeyConstraint)
                    self.assertEqual(
                        sorted([sch[column_id].name for column_id in constraint.columns]),
                        sorted(md_constraint.columns.keys()),
                    )
                elif isinstance(constraint, dm.UniqueConstraint):
                    self.assertEqual(
                        sorted([sch[column_id].name for column_id in constraint.columns]),
                        sorted(md_constraint.columns.keys()),
                    )
                elif isinstance(constraint, dm.CheckConstraint):
                    self.assertEqual(constraint.expression, str(md_constraint.sqltext))

            for index in table.indexes:
                matching_indexes = [mdi for mdi in md_table.indexes if mdi.name == index.name]
                self.assertTrue(matching_indexes, f"Index {index.name} not found on table {table.name}")
                md_index = matching_indexes[0]
                self.assertEqual(
                    sorted([sch[column_id].name for column_id in index.columns]),
                    sorted(md_index.columns.keys()),
                )

            if table.primary_key:
                # The primary key may be a single column ID or several IDs.
                if isinstance(table.primary_key, str):
                    primary_keys = [sch[table.primary_key].name]
                else:
                    primary_keys = [sch[pk].name for pk in table.primary_key]
                for primary_key in primary_keys:
                    self.assertTrue(md_table.columns[primary_key].primary_key)

    def test_timestamp(self) -> None:
        """Test that the `timestamp` datatype is created correctly.

        Runs once without a precision and once with a precision of 6, and
        checks the ``postgresql`` and ``mysql`` variants of the datatype.
        """
        for precision in [None, 6]:
            # A sub-test per precision reports which value failed.
            with self.subTest(precision=precision):
                col = dm.Column(
                    **{
                        "name": "timestamp_test",
                        "id": "#timestamp_test",
                        "datatype": "timestamp",
                        "precision": precision,
                    }
                )
                datatype = get_datatype_with_variants(col)
                # NOTE(review): relies on the private ``_variant_mapping``
                # attribute of the returned type object.
                variant_dict = datatype._variant_mapping
                self.assertIn("mysql", variant_dict)
                self.assertIn("postgresql", variant_dict)
                pg_timestamp = variant_dict["postgresql"]
                self.assertEqual(pg_timestamp.timezone, False)
                self.assertEqual(pg_timestamp.precision, precision)
                mysql_timestamp = variant_dict["mysql"]
                self.assertEqual(mysql_timestamp.timezone, False)
                self.assertEqual(mysql_timestamp.fsp, precision)

    def test_ignore_constraints(self) -> None:
        """Test that constraints are not created when the
        ``ignore_constraints`` flag is set on the metadata builder.

        `PrimaryKeyConstraint` objects are excluded from the check.
        """
        schema = Schema.model_validate(self.yaml_data)
        schema.name = "main"
        builder = MetaDataBuilder(schema, ignore_constraints=True)
        md = builder.build()
        for table in md.tables.values():
            non_primary_key_constraints = [
                c for c in table.constraints if not isinstance(c, PrimaryKeyConstraint)
            ]
            self.assertEqual(
                len(non_primary_key_constraints),
                0,
                msg=f"Table {table.name} has non-primary key constraints defined",
            )

    def test_table_name_postfix(self) -> None:
        """Test that table name postfixes are correctly applied."""
        schema = Schema.model_validate(self.yaml_data)
        schema.name = "main"
        builder = MetaDataBuilder(schema, table_name_postfix="_test")
        md = builder.build()
        for table in md.tables.values():
            self.assertTrue(table.name.endswith("_test"))

    def test_fk_actions(self) -> None:
        """Test that foreign key constraints with on delete and on update
        actions are created correctly.

        Every foreign key in the built metadata must have ``SET NULL`` as its
        on-delete action and ``CASCADE`` as its on-update action.
        """
        schema = Schema.model_validate(self.yaml_data)
        schema.name = "main"
        builder = MetaDataBuilder(schema)
        md = builder.build()

        for table in md.tables.values():
            for constraint in table.constraints:
                if isinstance(constraint, ForeignKeyConstraint):
                    self.assertEqual(constraint.ondelete, "SET NULL")
                    self.assertEqual(constraint.onupdate, "CASCADE")

257 

258 

# Run the test cases in this module when it is executed as a script.
if "__main__" == __name__:
    unittest.main()