Coverage for python / lsst / daf / butler / tests / _dummyRegistry.py: 25%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:55 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("DummyRegistry",) 

30 

31from collections.abc import Iterable, Iterator 

32from typing import Any 

33 

34from lsst.daf.butler import DimensionUniverse, ddl 

35from lsst.daf.butler.registry.bridge.ephemeral import EphemeralDatastoreRegistryBridge 

36from lsst.daf.butler.registry.interfaces import ( 

37 Database, 

38 DatabaseInsertMode, 

39 DatasetIdRef, 

40 DatasetRecordStorageManager, 

41 DatastoreRegistryBridge, 

42 DatastoreRegistryBridgeManager, 

43 OpaqueTableStorage, 

44 OpaqueTableStorageManager, 

45 StaticTablesContext, 

46 VersionTuple, 

47) 

48 

49from ..datastore import DatastoreTransaction 

50 

51 

52class DummyOpaqueTableStorage(OpaqueTableStorage): 

53 def __init__(self, name: str, spec: ddl.TableSpec) -> None: 

54 super().__init__(name=name) 

55 self._rows: list[dict] = [] 

56 self._spec = spec 

57 

58 def insert(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None: 

59 # Docstring inherited from OpaqueTableStorage. 

60 self._insert(*data, transaction=transaction, insert_mode=DatabaseInsertMode.INSERT) 

61 

62 def replace(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None: 

63 # Docstring inherited from OpaqueTableStorage. 

64 self._insert(*data, transaction=transaction, insert_mode=DatabaseInsertMode.REPLACE) 

65 

66 def ensure(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None: 

67 # Docstring inherited from OpaqueTableStorage. 

68 self._insert(*data, transaction=transaction, insert_mode=DatabaseInsertMode.ENSURE) 

69 

70 def _insert( 

71 self, 

72 *data: dict, 

73 transaction: DatastoreTransaction | None = None, 

74 insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT, 

75 ) -> None: 

76 uniqueConstraints = list(self._spec.unique) 

77 uniqueConstraints.append(tuple(field.name for field in self._spec.fields if field.primaryKey)) 

78 for d in data: 

79 skipping = False 

80 for constraint in uniqueConstraints: 

81 matching = list(self.fetch(**{k: d[k] for k in constraint})) 

82 if len(matching) != 0: 

83 match insert_mode: 

84 case DatabaseInsertMode.INSERT: 

85 raise RuntimeError( 

86 f"Unique constraint {constraint} violation in external table {self.name}." 

87 ) 

88 case DatabaseInsertMode.ENSURE: 

89 # Row already exists. Skip. 

90 skipping = True 

91 case DatabaseInsertMode.REPLACE: 

92 # Should try to put these rows back on transaction 

93 # rollback... 

94 self.delete([], *matching) 

95 case _: 

96 raise ValueError(f"Unrecognized insert mode: {insert_mode}.") 

97 

98 if skipping: 

99 continue 

100 self._rows.append(d) 

101 if transaction is not None: 

102 transaction.registerUndo("insert", self.delete, [], d) 

103 

104 def fetch(self, **where: Any) -> Iterator[dict]: 

105 # Docstring inherited from OpaqueTableStorage. 

106 where = where.copy() # May need to modify it. 

107 

108 # Can support an IN operator if given list. 

109 wherein = {} 

110 for k in list(where): 

111 if isinstance(where[k], tuple | list | set): 

112 wherein[k] = set(where[k]) 

113 del where[k] 

114 

115 for d in self._rows: 

116 if all(d[k] == v for k, v in where.items()): 

117 if wherein: 

118 match = True 

119 for k, v in wherein.items(): 

120 if d[k] not in v: 

121 match = False 

122 break 

123 if match: 

124 yield d 

125 else: 

126 yield d 

127 

128 def delete(self, columns: Iterable[str], *rows: dict) -> None: 

129 # Docstring inherited from OpaqueTableStorage. 

130 kept_rows = [] 

131 for table_row in self._rows: 

132 for where_row in rows: 

133 if all(table_row[k] == v for k, v in where_row.items()): 

134 break 

135 else: 

136 kept_rows.append(table_row) 

137 self._rows = kept_rows 

138 

139 

140class DummyOpaqueTableStorageManager(OpaqueTableStorageManager): 

141 def __init__(self, registry_schema_version: VersionTuple | None = None) -> None: 

142 super().__init__(registry_schema_version=registry_schema_version) 

143 self._storages: dict[str, DummyOpaqueTableStorage] = {} 

144 

145 def clone(self, db: Database) -> OpaqueTableStorageManager: 

146 return self 

147 

148 @classmethod 

149 def initialize( 

150 cls, db: Database, context: StaticTablesContext, registry_schema_version: VersionTuple | None = None 

151 ) -> OpaqueTableStorageManager: 

152 # Docstring inherited from OpaqueTableStorageManager. 

153 # Not used, but needed to satisfy ABC requirement. 

154 return cls(registry_schema_version=registry_schema_version) 

155 

156 def get(self, name: str) -> OpaqueTableStorage | None: 

157 # Docstring inherited from OpaqueTableStorageManager. 

158 return self._storages.get(name) 

159 

160 def register(self, name: str, spec: ddl.TableSpec) -> OpaqueTableStorage: 

161 # Docstring inherited from OpaqueTableStorageManager. 

162 return self._storages.setdefault(name, DummyOpaqueTableStorage(name, spec)) 

163 

164 @classmethod 

165 def currentVersions(cls) -> list[VersionTuple]: 

166 # Docstring inherited from VersionedExtension. 

167 return [] 

168 

169 

170class DummyDatastoreRegistryBridgeManager(DatastoreRegistryBridgeManager): 

171 def __init__( 

172 self, 

173 opaque: OpaqueTableStorageManager, 

174 universe: DimensionUniverse, 

175 registry_schema_version: VersionTuple | None = None, 

176 ): 

177 super().__init__( 

178 opaque=opaque, 

179 universe=universe, 

180 registry_schema_version=registry_schema_version, 

181 ) 

182 self._bridges: dict[str, EphemeralDatastoreRegistryBridge] = {} 

183 

184 def clone(self, *, db: Database, opaque: OpaqueTableStorageManager) -> DatastoreRegistryBridgeManager: 

185 return DummyDatastoreRegistryBridgeManager( 

186 opaque=opaque, 

187 universe=self.universe, 

188 registry_schema_version=self._registry_schema_version, 

189 ) 

190 

191 @classmethod 

192 def initialize( 

193 cls, 

194 db: Database, 

195 context: StaticTablesContext, 

196 *, 

197 opaque: OpaqueTableStorageManager, 

198 datasets: type[DatasetRecordStorageManager], 

199 universe: DimensionUniverse, 

200 registry_schema_version: VersionTuple | None = None, 

201 ) -> DatastoreRegistryBridgeManager: 

202 # Docstring inherited from DatastoreRegistryBridgeManager 

203 # Not used, but needed to satisfy ABC requirement. 

204 return cls( 

205 opaque=opaque, 

206 universe=universe, 

207 registry_schema_version=registry_schema_version, 

208 ) 

209 

210 def refresh(self) -> None: 

211 # Docstring inherited from DatastoreRegistryBridgeManager 

212 pass 

213 

214 def register(self, name: str, *, ephemeral: bool = False) -> DatastoreRegistryBridge: 

215 # Docstring inherited from DatastoreRegistryBridgeManager 

216 return self._bridges.setdefault(name, EphemeralDatastoreRegistryBridge(name)) 

217 

218 def findDatastores(self, ref: DatasetIdRef) -> Iterable[str]: 

219 # Docstring inherited from DatastoreRegistryBridgeManager 

220 for name, bridge in self._bridges.items(): 

221 if ref in bridge: 

222 yield name 

223 

224 @classmethod 

225 def currentVersions(cls) -> list[VersionTuple]: 

226 # Docstring inherited from VersionedExtension. 

227 return [] 

228 

229 

230class DummyRegistry: 

231 """Dummy Registry, for Datastore test purposes.""" 

232 

233 def __init__(self) -> None: 

234 self._opaque = DummyOpaqueTableStorageManager() 

235 self.dimensions = DimensionUniverse() 

236 self._datastoreBridges = DummyDatastoreRegistryBridgeManager(self._opaque, self.dimensions, None) 

237 

238 def getDatastoreBridgeManager(self) -> DatastoreRegistryBridgeManager: 

239 return self._datastoreBridges