Coverage for tests / test_versioning.py: 29%
110 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:49 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28import os
29import os.path
30import tempfile
31import unittest
33from lsst.daf.butler.registry.attributes import DefaultButlerAttributeManager
34from lsst.daf.butler.registry.databases.sqlite import SqliteDatabase
35from lsst.daf.butler.registry.interfaces import (
36 Database,
37 IncompatibleVersionError,
38 VersionedExtension,
39 VersionTuple,
40)
41from lsst.daf.butler.registry.versions import ButlerVersionsManager
42from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
44TESTDIR = os.path.abspath(os.path.dirname(__file__))
46# Assorted version numbers used throughout the tests
47V_1_0_0 = VersionTuple(major=1, minor=0, patch=0)
48V_1_0_1 = VersionTuple(major=1, minor=0, patch=1)
49V_1_1_0 = VersionTuple(major=1, minor=1, patch=0)
50V_2_0_0 = VersionTuple(major=2, minor=0, patch=0)
51V_2_0_1 = VersionTuple(major=2, minor=0, patch=1)
54class Manager0(VersionedExtension):
55 """Versioned extension implementation for tests."""
57 @classmethod
58 def currentVersions(cls) -> list[VersionTuple]:
59 return []
62class Manager1(VersionedExtension):
63 """Versioned extension implementation for tests."""
65 @classmethod
66 def currentVersions(cls) -> list[VersionTuple]:
67 return [V_1_0_0]
70class Manager1_1(VersionedExtension): # noqa: N801
71 """Versioned extension implementation for tests."""
73 @classmethod
74 def currentVersions(cls) -> list[VersionTuple]:
75 return [V_1_1_0]
78class Manager2(VersionedExtension):
79 """Versioned extension implementation for tests.
81 This extension supports two schema versions.
82 """
84 @classmethod
85 def currentVersions(cls) -> list[VersionTuple]:
86 return [V_1_0_0, V_2_0_0]
88 @classmethod
89 def _newDefaultSchemaVersion(cls) -> VersionTuple:
90 return V_1_0_0
93class SchemaVersioningTestCase(unittest.TestCase):
94 """Tests for schema versioning classes."""
96 def setUp(self):
97 self.root = makeTestTempDir(TESTDIR)
99 def tearDown(self):
100 removeTestTempDir(self.root)
102 def makeEmptyDatabase(self, origin: int = 0) -> Database:
103 _, filename = tempfile.mkstemp(dir=self.root, suffix=".sqlite3")
104 engine = SqliteDatabase.makeEngine(filename=filename)
105 db = SqliteDatabase.fromEngine(engine=engine, origin=origin)
106 self.addCleanup(db.dispose)
107 return db
109 def test_new_schema(self) -> None:
110 """Test for creating new database schema."""
111 # Check that managers know what schema versions they can make.
112 Manager1.checkNewSchemaVersion(V_1_0_0)
113 Manager2.checkNewSchemaVersion(V_1_0_0)
114 Manager2.checkNewSchemaVersion(V_2_0_0)
115 with self.assertRaises(IncompatibleVersionError):
116 Manager1.checkNewSchemaVersion(V_1_0_1)
117 with self.assertRaises(IncompatibleVersionError):
118 Manager1.checkNewSchemaVersion(V_1_1_0)
119 with self.assertRaises(IncompatibleVersionError):
120 Manager2.checkNewSchemaVersion(V_1_0_1)
122 manager_versions = (
123 ((None, V_1_0_0), (None, V_1_0_0)),
124 ((V_1_0_0, V_1_0_0), (V_1_0_0, V_1_0_0)),
125 ((None, V_1_0_0), (V_2_0_0, V_2_0_0)),
126 )
128 for (v1, result1), (v2, result2) in manager_versions:
129 # This is roughly what RegistryManagerTypes.makeRepo does.
130 if v1 is not None:
131 Manager1.checkNewSchemaVersion(v1)
132 if v2 is not None:
133 Manager2.checkNewSchemaVersion(v2)
134 manager0 = Manager0()
135 manager1 = Manager1(registry_schema_version=v1)
136 manager2 = Manager2(registry_schema_version=v2)
137 self.assertEqual(manager1.newSchemaVersion(), result1)
138 self.assertEqual(manager2.newSchemaVersion(), result2)
140 database = self.makeEmptyDatabase()
141 with database.declareStaticTables(create=True) as context:
142 attributes = DefaultButlerAttributeManager.initialize(database, context)
144 vmgr = ButlerVersionsManager(attributes)
145 vmgr.storeManagersConfig({"manager0": manager0, "manager1": manager1, "manager2": manager2})
147 attr_dict = dict(attributes.items())
148 expected = {
149 "config:registry.managers.manager0": Manager0.extensionName(),
150 "config:registry.managers.manager1": Manager1.extensionName(),
151 "config:registry.managers.manager2": Manager2.extensionName(),
152 f"version:{Manager1.extensionName()}": str(result1),
153 f"version:{Manager2.extensionName()}": str(result2),
154 }
155 self.assertEqual(attr_dict, expected)
157 def test_existing_schema(self) -> None:
158 """Test for reading manager versions from existing database."""
159 manager_versions = (
160 ((None, V_1_0_0), (None, V_1_0_0)),
161 ((V_1_0_0, V_1_0_0), (V_1_0_0, V_1_0_0)),
162 )
164 for (v1, result1), (v2, result2) in manager_versions:
165 # This is roughly what RegistryManagerTypes.loadRepo does.
166 if v1 is not None:
167 Manager1.checkNewSchemaVersion(v1)
168 if v2 is not None:
169 Manager2.checkNewSchemaVersion(v2)
170 manager0 = Manager0()
171 manager1 = Manager1(registry_schema_version=v1)
172 manager2 = Manager2(registry_schema_version=v2)
174 # Create new schema first.
175 database = self.makeEmptyDatabase()
176 with database.declareStaticTables(create=True) as context:
177 attributes = DefaultButlerAttributeManager.initialize(database, context)
179 vmgr = ButlerVersionsManager(attributes)
180 vmgr.storeManagersConfig({"manager0": manager0, "manager1": manager1, "manager2": manager2})
182 # Switch to reading existing manager configs/versions.
183 with database.declareStaticTables(create=False) as context:
184 attributes = DefaultButlerAttributeManager.initialize(database, context)
186 vmgr = ButlerVersionsManager(attributes)
187 vmgr.checkManagersConfig({"manager0": Manager0, "manager1": Manager1, "manager2": Manager2})
188 versions = vmgr.managerVersions()
190 Manager1.checkCompatibility(result1, database.isWriteable())
191 Manager2.checkCompatibility(result2, database.isWriteable())
193 # Make manager instances using versions from registry.
194 manager0 = Manager0(registry_schema_version=versions.get("manager0"))
195 manager1 = Manager1(registry_schema_version=versions.get("manager1"))
196 manager2 = Manager2(registry_schema_version=versions.get("manager2"))
197 self.assertIsNone(manager0._registry_schema_version)
198 self.assertEqual(manager1._registry_schema_version, result1)
199 self.assertEqual(manager2._registry_schema_version, result2)
201 def test_compatibility(self) -> None:
202 """Test for version compatibility rules."""
203 # Manager, version, update, compatible
204 compat_matrix = (
205 (Manager0, V_1_0_0, False, True),
206 (Manager0, V_1_0_0, True, True),
207 (Manager1, V_1_0_0, False, True),
208 (Manager1, V_1_0_0, True, True),
209 (Manager1, V_1_0_1, False, True),
210 (Manager1, V_1_0_1, True, True),
211 (Manager1, V_1_1_0, False, False),
212 (Manager1, V_1_1_0, True, False),
213 (Manager1, V_2_0_0, False, False),
214 (Manager1, V_2_0_0, True, False),
215 (Manager1_1, V_1_0_0, False, True),
216 (Manager1_1, V_1_0_0, True, False),
217 (Manager1_1, V_1_0_1, False, True),
218 (Manager1_1, V_1_0_1, True, False),
219 (Manager1_1, V_1_1_0, False, True),
220 (Manager1_1, V_1_1_0, True, True),
221 (Manager2, V_1_0_0, False, True),
222 (Manager2, V_1_0_0, True, True),
223 (Manager2, V_1_0_1, False, True),
224 (Manager2, V_1_0_1, True, True),
225 (Manager2, V_1_1_0, False, False),
226 (Manager2, V_1_1_0, True, False),
227 (Manager2, V_2_0_0, False, True),
228 (Manager2, V_2_0_0, True, True),
229 )
231 for Manager, version, update, compatible in compat_matrix:
232 with self.subTest(test=repr((Manager, version, update, compatible))):
233 if compatible:
234 Manager.checkCompatibility(version, update)
235 else:
236 with self.assertRaises(IncompatibleVersionError):
237 Manager.checkCompatibility(version, update)
240if __name__ == "__main__":
241 unittest.main()