Coverage for python / lsst / daf / butler / tests / _dummyRegistry.py: 25%
110 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ("DummyRegistry",)
31from collections.abc import Iterable, Iterator
32from typing import Any
34from lsst.daf.butler import DimensionUniverse, ddl
35from lsst.daf.butler.registry.bridge.ephemeral import EphemeralDatastoreRegistryBridge
36from lsst.daf.butler.registry.interfaces import (
37 Database,
38 DatabaseInsertMode,
39 DatasetIdRef,
40 DatasetRecordStorageManager,
41 DatastoreRegistryBridge,
42 DatastoreRegistryBridgeManager,
43 OpaqueTableStorage,
44 OpaqueTableStorageManager,
45 StaticTablesContext,
46 VersionTuple,
47)
49from ..datastore import DatastoreTransaction
52class DummyOpaqueTableStorage(OpaqueTableStorage):
53 def __init__(self, name: str, spec: ddl.TableSpec) -> None:
54 super().__init__(name=name)
55 self._rows: list[dict] = []
56 self._spec = spec
58 def insert(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
59 # Docstring inherited from OpaqueTableStorage.
60 self._insert(*data, transaction=transaction, insert_mode=DatabaseInsertMode.INSERT)
62 def replace(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
63 # Docstring inherited from OpaqueTableStorage.
64 self._insert(*data, transaction=transaction, insert_mode=DatabaseInsertMode.REPLACE)
66 def ensure(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
67 # Docstring inherited from OpaqueTableStorage.
68 self._insert(*data, transaction=transaction, insert_mode=DatabaseInsertMode.ENSURE)
70 def _insert(
71 self,
72 *data: dict,
73 transaction: DatastoreTransaction | None = None,
74 insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
75 ) -> None:
76 uniqueConstraints = list(self._spec.unique)
77 uniqueConstraints.append(tuple(field.name for field in self._spec.fields if field.primaryKey))
78 for d in data:
79 skipping = False
80 for constraint in uniqueConstraints:
81 matching = list(self.fetch(**{k: d[k] for k in constraint}))
82 if len(matching) != 0:
83 match insert_mode:
84 case DatabaseInsertMode.INSERT:
85 raise RuntimeError(
86 f"Unique constraint {constraint} violation in external table {self.name}."
87 )
88 case DatabaseInsertMode.ENSURE:
89 # Row already exists. Skip.
90 skipping = True
91 case DatabaseInsertMode.REPLACE:
92 # Should try to put these rows back on transaction
93 # rollback...
94 self.delete([], *matching)
95 case _:
96 raise ValueError(f"Unrecognized insert mode: {insert_mode}.")
98 if skipping:
99 continue
100 self._rows.append(d)
101 if transaction is not None:
102 transaction.registerUndo("insert", self.delete, [], d)
104 def fetch(self, **where: Any) -> Iterator[dict]:
105 # Docstring inherited from OpaqueTableStorage.
106 where = where.copy() # May need to modify it.
108 # Can support an IN operator if given list.
109 wherein = {}
110 for k in list(where):
111 if isinstance(where[k], tuple | list | set):
112 wherein[k] = set(where[k])
113 del where[k]
115 for d in self._rows:
116 if all(d[k] == v for k, v in where.items()):
117 if wherein:
118 match = True
119 for k, v in wherein.items():
120 if d[k] not in v:
121 match = False
122 break
123 if match:
124 yield d
125 else:
126 yield d
128 def delete(self, columns: Iterable[str], *rows: dict) -> None:
129 # Docstring inherited from OpaqueTableStorage.
130 kept_rows = []
131 for table_row in self._rows:
132 for where_row in rows:
133 if all(table_row[k] == v for k, v in where_row.items()):
134 break
135 else:
136 kept_rows.append(table_row)
137 self._rows = kept_rows
140class DummyOpaqueTableStorageManager(OpaqueTableStorageManager):
141 def __init__(self, registry_schema_version: VersionTuple | None = None) -> None:
142 super().__init__(registry_schema_version=registry_schema_version)
143 self._storages: dict[str, DummyOpaqueTableStorage] = {}
145 def clone(self, db: Database) -> OpaqueTableStorageManager:
146 return self
148 @classmethod
149 def initialize(
150 cls, db: Database, context: StaticTablesContext, registry_schema_version: VersionTuple | None = None
151 ) -> OpaqueTableStorageManager:
152 # Docstring inherited from OpaqueTableStorageManager.
153 # Not used, but needed to satisfy ABC requirement.
154 return cls(registry_schema_version=registry_schema_version)
156 def get(self, name: str) -> OpaqueTableStorage | None:
157 # Docstring inherited from OpaqueTableStorageManager.
158 return self._storages.get(name)
160 def register(self, name: str, spec: ddl.TableSpec) -> OpaqueTableStorage:
161 # Docstring inherited from OpaqueTableStorageManager.
162 return self._storages.setdefault(name, DummyOpaqueTableStorage(name, spec))
164 @classmethod
165 def currentVersions(cls) -> list[VersionTuple]:
166 # Docstring inherited from VersionedExtension.
167 return []
170class DummyDatastoreRegistryBridgeManager(DatastoreRegistryBridgeManager):
171 def __init__(
172 self,
173 opaque: OpaqueTableStorageManager,
174 universe: DimensionUniverse,
175 registry_schema_version: VersionTuple | None = None,
176 ):
177 super().__init__(
178 opaque=opaque,
179 universe=universe,
180 registry_schema_version=registry_schema_version,
181 )
182 self._bridges: dict[str, EphemeralDatastoreRegistryBridge] = {}
184 def clone(self, *, db: Database, opaque: OpaqueTableStorageManager) -> DatastoreRegistryBridgeManager:
185 return DummyDatastoreRegistryBridgeManager(
186 opaque=opaque,
187 universe=self.universe,
188 registry_schema_version=self._registry_schema_version,
189 )
191 @classmethod
192 def initialize(
193 cls,
194 db: Database,
195 context: StaticTablesContext,
196 *,
197 opaque: OpaqueTableStorageManager,
198 datasets: type[DatasetRecordStorageManager],
199 universe: DimensionUniverse,
200 registry_schema_version: VersionTuple | None = None,
201 ) -> DatastoreRegistryBridgeManager:
202 # Docstring inherited from DatastoreRegistryBridgeManager
203 # Not used, but needed to satisfy ABC requirement.
204 return cls(
205 opaque=opaque,
206 universe=universe,
207 registry_schema_version=registry_schema_version,
208 )
210 def refresh(self) -> None:
211 # Docstring inherited from DatastoreRegistryBridgeManager
212 pass
214 def register(self, name: str, *, ephemeral: bool = False) -> DatastoreRegistryBridge:
215 # Docstring inherited from DatastoreRegistryBridgeManager
216 return self._bridges.setdefault(name, EphemeralDatastoreRegistryBridge(name))
218 def findDatastores(self, ref: DatasetIdRef) -> Iterable[str]:
219 # Docstring inherited from DatastoreRegistryBridgeManager
220 for name, bridge in self._bridges.items():
221 if ref in bridge:
222 yield name
224 @classmethod
225 def currentVersions(cls) -> list[VersionTuple]:
226 # Docstring inherited from VersionedExtension.
227 return []
230class DummyRegistry:
231 """Dummy Registry, for Datastore test purposes."""
233 def __init__(self) -> None:
234 self._opaque = DummyOpaqueTableStorageManager()
235 self.dimensions = DimensionUniverse()
236 self._datastoreBridges = DummyDatastoreRegistryBridgeManager(self._opaque, self.dimensions, None)
238 def getDatastoreBridgeManager(self) -> DatastoreRegistryBridgeManager:
239 return self._datastoreBridges