Coverage for tests / test_versioning.py: 29%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:17 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28import os 

29import os.path 

30import tempfile 

31import unittest 

32 

33from lsst.daf.butler.registry.attributes import DefaultButlerAttributeManager 

34from lsst.daf.butler.registry.databases.sqlite import SqliteDatabase 

35from lsst.daf.butler.registry.interfaces import ( 

36 Database, 

37 IncompatibleVersionError, 

38 VersionedExtension, 

39 VersionTuple, 

40) 

41from lsst.daf.butler.registry.versions import ButlerVersionsManager 

42from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir 

43 

# Absolute path of the directory holding this test module; it is handed to
# makeTestTempDir() in the test case's setUp().
TESTDIR = os.path.abspath(os.path.dirname(__file__))

# Shared schema version constants referenced by the test managers and by the
# expectation tables in the test methods below.
V_1_0_0 = VersionTuple(major=1, minor=0, patch=0)
V_1_0_1 = VersionTuple(major=1, minor=0, patch=1)
V_1_1_0 = VersionTuple(major=1, minor=1, patch=0)
V_2_0_0 = VersionTuple(major=2, minor=0, patch=0)
V_2_0_1 = VersionTuple(major=2, minor=0, patch=1)

52 

53 

class Manager0(VersionedExtension):
    """Test-only versioned extension that declares no schema versions."""

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        """Return an empty list: this extension reports no versions."""
        no_versions: list[VersionTuple] = []
        return no_versions

60 

61 

class Manager1(VersionedExtension):
    """Test-only versioned extension that declares version 1.0.0 only."""

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        """Return a single-element list containing ``V_1_0_0``."""
        supported = [V_1_0_0]
        return supported

68 

69 

class Manager1_1(VersionedExtension):  # noqa: N801
    """Test-only versioned extension that declares version 1.1.0 only."""

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        """Return a single-element list containing ``V_1_1_0``."""
        supported = [V_1_1_0]
        return supported

76 

77 

class Manager2(VersionedExtension):
    """Test-only versioned extension that declares two schema versions.

    The declared versions are ``V_1_0_0`` and ``V_2_0_0``; ``V_1_0_0`` is
    the one returned as the default for a new schema.
    """

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        """Return both declared versions, ``V_1_0_0`` then ``V_2_0_0``."""
        supported = [V_1_0_0, V_2_0_0]
        return supported

    @classmethod
    def _newDefaultSchemaVersion(cls) -> VersionTuple:
        """Return ``V_1_0_0`` as the default version for a new schema."""
        # NOTE(review): presumably consulted by the base class when no
        # explicit version is requested -- confirm against VersionedExtension.
        default_version = V_1_0_0
        return default_version

91 

92 

class SchemaVersioningTestCase(unittest.TestCase):
    """Tests for schema versioning classes."""

    def setUp(self) -> None:
        """Create the per-test temporary directory."""
        self.root = makeTestTempDir(TESTDIR)

    def tearDown(self) -> None:
        """Remove the per-test temporary directory."""
        removeTestTempDir(self.root)

    def makeEmptyDatabase(self, origin: int = 0) -> Database:
        """Create an SQLite database backed by a new file under ``self.root``.

        Parameters
        ----------
        origin : `int`, optional
            Value forwarded to `SqliteDatabase.fromEngine`.

        Returns
        -------
        db : `Database`
            The new database; ``db.dispose`` is registered as a test cleanup.
        """
        # mkstemp() returns an open OS-level file descriptor along with the
        # path. Only the path is needed here, so close the descriptor right
        # away instead of leaking one per database created.
        fd, filename = tempfile.mkstemp(dir=self.root, suffix=".sqlite3")
        os.close(fd)
        engine = SqliteDatabase.makeEngine(filename=filename)
        db = SqliteDatabase.fromEngine(engine=engine, origin=origin)
        self.addCleanup(db.dispose)
        return db

    def test_new_schema(self) -> None:
        """Test for creating new database schema."""
        # Check that managers know what schema versions they can make.
        Manager1.checkNewSchemaVersion(V_1_0_0)
        Manager2.checkNewSchemaVersion(V_1_0_0)
        Manager2.checkNewSchemaVersion(V_2_0_0)
        with self.assertRaises(IncompatibleVersionError):
            Manager1.checkNewSchemaVersion(V_1_0_1)
        with self.assertRaises(IncompatibleVersionError):
            Manager1.checkNewSchemaVersion(V_1_1_0)
        with self.assertRaises(IncompatibleVersionError):
            Manager2.checkNewSchemaVersion(V_1_0_1)

        # Each entry is ((requested, expected) for Manager1,
        # (requested, expected) for Manager2); a requested version of None
        # means no explicit version is passed to the manager.
        manager_versions = (
            ((None, V_1_0_0), (None, V_1_0_0)),
            ((V_1_0_0, V_1_0_0), (V_1_0_0, V_1_0_0)),
            ((None, V_1_0_0), (V_2_0_0, V_2_0_0)),
        )

        for (v1, result1), (v2, result2) in manager_versions:
            # subTest makes a failure report identify the offending
            # combination, as test_compatibility already does.
            with self.subTest(v1=v1, v2=v2):
                # This is roughly what RegistryManagerTypes.makeRepo does.
                if v1 is not None:
                    Manager1.checkNewSchemaVersion(v1)
                if v2 is not None:
                    Manager2.checkNewSchemaVersion(v2)
                manager0 = Manager0()
                manager1 = Manager1(registry_schema_version=v1)
                manager2 = Manager2(registry_schema_version=v2)
                self.assertEqual(manager1.newSchemaVersion(), result1)
                self.assertEqual(manager2.newSchemaVersion(), result2)

                database = self.makeEmptyDatabase()
                with database.declareStaticTables(create=True) as context:
                    attributes = DefaultButlerAttributeManager.initialize(database, context)

                vmgr = ButlerVersionsManager(attributes)
                vmgr.storeManagersConfig({"manager0": manager0, "manager1": manager1, "manager2": manager2})

                # Manager0 declares no versions, so only its "config:" entry
                # is expected and no "version:" entry.
                attr_dict = dict(attributes.items())
                expected = {
                    "config:registry.managers.manager0": Manager0.extensionName(),
                    "config:registry.managers.manager1": Manager1.extensionName(),
                    "config:registry.managers.manager2": Manager2.extensionName(),
                    f"version:{Manager1.extensionName()}": str(result1),
                    f"version:{Manager2.extensionName()}": str(result2),
                }
                self.assertEqual(attr_dict, expected)

    def test_existing_schema(self) -> None:
        """Test for reading manager versions from existing database."""
        # Same layout as in test_new_schema: one (requested, expected) pair
        # per manager.
        manager_versions = (
            ((None, V_1_0_0), (None, V_1_0_0)),
            ((V_1_0_0, V_1_0_0), (V_1_0_0, V_1_0_0)),
        )

        for (v1, result1), (v2, result2) in manager_versions:
            # subTest makes a failure report identify the offending
            # combination, as test_compatibility already does.
            with self.subTest(v1=v1, v2=v2):
                # This is roughly what RegistryManagerTypes.loadRepo does.
                if v1 is not None:
                    Manager1.checkNewSchemaVersion(v1)
                if v2 is not None:
                    Manager2.checkNewSchemaVersion(v2)
                manager0 = Manager0()
                manager1 = Manager1(registry_schema_version=v1)
                manager2 = Manager2(registry_schema_version=v2)

                # Create new schema first.
                database = self.makeEmptyDatabase()
                with database.declareStaticTables(create=True) as context:
                    attributes = DefaultButlerAttributeManager.initialize(database, context)

                vmgr = ButlerVersionsManager(attributes)
                vmgr.storeManagersConfig({"manager0": manager0, "manager1": manager1, "manager2": manager2})

                # Switch to reading existing manager configs/versions.
                with database.declareStaticTables(create=False) as context:
                    attributes = DefaultButlerAttributeManager.initialize(database, context)

                vmgr = ButlerVersionsManager(attributes)
                vmgr.checkManagersConfig({"manager0": Manager0, "manager1": Manager1, "manager2": Manager2})
                versions = vmgr.managerVersions()

                Manager1.checkCompatibility(result1, database.isWriteable())
                Manager2.checkCompatibility(result2, database.isWriteable())

                # Make manager instances using versions from registry.
                manager0 = Manager0(registry_schema_version=versions.get("manager0"))
                manager1 = Manager1(registry_schema_version=versions.get("manager1"))
                manager2 = Manager2(registry_schema_version=versions.get("manager2"))
                self.assertIsNone(manager0._registry_schema_version)
                self.assertEqual(manager1._registry_schema_version, result1)
                self.assertEqual(manager2._registry_schema_version, result2)

    def test_compatibility(self) -> None:
        """Test for version compatibility rules."""
        # Manager, version, update, compatible
        compat_matrix = (
            (Manager0, V_1_0_0, False, True),
            (Manager0, V_1_0_0, True, True),
            (Manager1, V_1_0_0, False, True),
            (Manager1, V_1_0_0, True, True),
            (Manager1, V_1_0_1, False, True),
            (Manager1, V_1_0_1, True, True),
            (Manager1, V_1_1_0, False, False),
            (Manager1, V_1_1_0, True, False),
            (Manager1, V_2_0_0, False, False),
            (Manager1, V_2_0_0, True, False),
            (Manager1_1, V_1_0_0, False, True),
            (Manager1_1, V_1_0_0, True, False),
            (Manager1_1, V_1_0_1, False, True),
            (Manager1_1, V_1_0_1, True, False),
            (Manager1_1, V_1_1_0, False, True),
            (Manager1_1, V_1_1_0, True, True),
            (Manager2, V_1_0_0, False, True),
            (Manager2, V_1_0_0, True, True),
            (Manager2, V_1_0_1, False, True),
            (Manager2, V_1_0_1, True, True),
            (Manager2, V_1_1_0, False, False),
            (Manager2, V_1_1_0, True, False),
            (Manager2, V_2_0_0, False, True),
            (Manager2, V_2_0_0, True, True),
        )

        for Manager, version, update, compatible in compat_matrix:
            with self.subTest(test=repr((Manager, version, update, compatible))):
                if compatible:
                    # Must not raise for a compatible combination.
                    Manager.checkCompatibility(version, update)
                else:
                    with self.assertRaises(IncompatibleVersionError):
                        Manager.checkCompatibility(version, update)

238 

239 

# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()