Coverage for python / lsst / daf / butler / tests / hybrid_butler_registry.py: 0%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:48 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30import contextlib 

31from collections.abc import Iterable, Iterator, Mapping, Sequence 

32from typing import Any 

33 

34from .._collection_type import CollectionType 

35from .._dataset_association import DatasetAssociation 

36from .._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef 

37from .._dataset_type import DatasetType 

38from .._storage_class import StorageClassFactory 

39from .._timespan import Timespan 

40from ..dimensions import ( 

41 DataCoordinate, 

42 DataId, 

43 DimensionElement, 

44 DimensionGroup, 

45 DimensionRecord, 

46 DimensionUniverse, 

47) 

48from ..registry import CollectionArgType, CollectionSummary, Registry, RegistryDefaults 

49from ..registry.queries import ( 

50 DataCoordinateQueryResults, 

51 DatasetQueryResults, 

52 DimensionRecordQueryResults, 

53) 

54from ..registry.sql_registry import SqlRegistry 

55 

56 

class HybridButlerRegistry(Registry):
    """A `Registry` that delegates methods to internal `RemoteButlerRegistry`
    and `SqlRegistry` instances.

    Intended to allow testing of `RemoteButler` before its implementation is
    complete, by delegating unsupported methods to the direct `SqlRegistry`.

    Parameters
    ----------
    direct : `SqlRegistry`
        DirectButler SqlRegistry used to provide methods not yet implemented
        by RemoteButlerRegistry.
    remote : `Registry`
        The RemoteButler Registry implementation we are intending to test.
    """

    def __init__(self, direct: SqlRegistry, remote: Registry):
        # Reads are generally routed to ``_remote`` (the implementation under
        # test) and writes to ``_direct`` (the reference implementation).
        self._direct = direct
        self._remote = remote

76 

77 def isWriteable(self) -> bool: 

78 return self._remote.isWriteable() 

79 

80 @property 

81 def dimensions(self) -> DimensionUniverse: 

82 return self._remote.dimensions 

83 

84 @property 

85 def defaults(self) -> RegistryDefaults: 

86 return self._remote.defaults 

87 

88 @defaults.setter 

89 def defaults(self, value: RegistryDefaults) -> None: 

90 # Make a copy before assigning the value. 

91 # When assigned, it will have finish() called on it -- we don't want to 

92 # intermingle the results of that between Remote and Direct, because 

93 # that could let the Remote side cheat. 

94 copy = RegistryDefaults(value.collections, value.run, value._infer, **value._kwargs) 

95 self._remote.defaults = value 

96 self._direct.defaults = copy 

97 

98 def refresh(self) -> None: 

99 self._direct.refresh() 

100 

101 def refresh_collection_summaries(self) -> None: 

102 self._direct.refresh_collection_summaries() 

103 

104 @contextlib.contextmanager 

105 def caching_context(self) -> Iterator[None]: 

106 with self._direct.caching_context(): 

107 with self._remote.caching_context(): 

108 yield 

109 

110 @contextlib.contextmanager 

111 def transaction(self, *, savepoint: bool = False) -> Iterator[None]: 

112 # RemoteButler doesn't support transactions, and if the direct registry 

113 # enters one its changes are invisible to the remote side. 

114 raise NotImplementedError() 

115 

116 def registerCollection( 

117 self, name: str, type: CollectionType = CollectionType.TAGGED, doc: str | None = None 

118 ) -> bool: 

119 return self._direct.registerCollection(name, type, doc) 

120 

121 def getCollectionType(self, name: str) -> CollectionType: 

122 return self._remote.getCollectionType(name) 

123 

124 def registerRun(self, name: str, doc: str | None = None) -> bool: 

125 return self._direct.registerRun(name, doc) 

126 

127 def removeCollection(self, name: str) -> None: 

128 return self._direct.removeCollection(name) 

129 

130 def getCollectionChain(self, parent: str) -> Sequence[str]: 

131 return self._remote.getCollectionChain(parent) 

132 

133 def setCollectionChain(self, parent: str, children: Any, *, flatten: bool = False) -> None: 

134 return self._direct.setCollectionChain(parent, children, flatten=flatten) 

135 

136 def getCollectionParentChains(self, collection: str) -> set[str]: 

137 return self._remote.getCollectionParentChains(collection) 

138 

139 def getCollectionDocumentation(self, collection: str) -> str | None: 

140 return self._remote.getCollectionDocumentation(collection) 

141 

142 def setCollectionDocumentation(self, collection: str, doc: str | None) -> None: 

143 return self._direct.setCollectionDocumentation(collection, doc) 

144 

145 def getCollectionSummary(self, collection: str) -> CollectionSummary: 

146 return self._remote.getCollectionSummary(collection) 

147 

148 def registerDatasetType(self, datasetType: DatasetType) -> bool: 

149 # We need to make sure that dataset type universe is the same as 

150 # direct registry universe. 

151 if datasetType.dimensions.universe is self._remote.dimensions: 

152 datasetType = DatasetType( 

153 datasetType.name, 

154 datasetType.dimensions.names, 

155 datasetType.storageClass, 

156 universe=self._direct.dimensions, 

157 isCalibration=datasetType.isCalibration(), 

158 ) 

159 return self._direct.registerDatasetType(datasetType) 

160 

161 def removeDatasetType(self, name: str | tuple[str, ...]) -> None: 

162 return self._direct.removeDatasetType(name) 

163 

164 def getDatasetType(self, name: str) -> DatasetType: 

165 return self._remote.getDatasetType(name) 

166 

167 def supportsIdGenerationMode(self, mode: DatasetIdGenEnum) -> bool: 

168 return self._direct.supportsIdGenerationMode(mode) 

169 

170 def findDataset( 

171 self, 

172 datasetType: DatasetType | str, 

173 dataId: DataId | None = None, 

174 *, 

175 collections: CollectionArgType | None = None, 

176 timespan: Timespan | None = None, 

177 datastore_records: bool = False, 

178 **kwargs: Any, 

179 ) -> DatasetRef | None: 

180 return self._remote.findDataset( 

181 datasetType, 

182 dataId, 

183 collections=collections, 

184 timespan=timespan, 

185 datastore_records=datastore_records, 

186 **kwargs, 

187 ) 

188 

189 def insertDatasets( 

190 self, 

191 datasetType: DatasetType | str, 

192 dataIds: Iterable[DataId], 

193 run: str | None = None, 

194 expand: bool = True, 

195 idGenerationMode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE, 

196 ) -> list[DatasetRef]: 

197 return self._direct.insertDatasets(datasetType, dataIds, run, expand, idGenerationMode) 

198 

199 def _importDatasets( 

200 self, datasets: Iterable[DatasetRef], expand: bool = True, assume_new: bool = False 

201 ) -> list[DatasetRef]: 

202 return self._direct._importDatasets(datasets, expand, assume_new) 

203 

204 def getDataset(self, id: DatasetId) -> DatasetRef | None: 

205 return self._remote.getDataset(id) 

206 

207 def _fetch_run_dataset_ids(self, run: str) -> list[DatasetId]: 

208 return self._direct._fetch_run_dataset_ids(run) 

209 

210 def removeDatasets(self, refs: Iterable[DatasetRef]) -> None: 

211 return self._direct.removeDatasets(refs) 

212 

213 def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None: 

214 return self._direct.associate(collection, refs) 

215 

216 def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None: 

217 return self._direct.disassociate(collection, refs) 

218 

219 def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespan) -> None: 

220 return self._direct.certify(collection, refs, timespan) 

221 

222 def decertify( 

223 self, 

224 collection: str, 

225 datasetType: str | DatasetType, 

226 timespan: Timespan, 

227 *, 

228 dataIds: Iterable[DataId] | None = None, 

229 ) -> None: 

230 return self._direct.decertify(collection, datasetType, timespan, dataIds=dataIds) 

231 

232 def getDatasetLocations(self, ref: DatasetRef) -> Iterable[str]: 

233 return self._direct.getDatasetLocations(ref) 

234 

235 def expandDataId( 

236 self, 

237 dataId: DataId | None = None, 

238 *, 

239 dimensions: Iterable[str] | DimensionGroup | None = None, 

240 records: Mapping[str, DimensionRecord | None] | None = None, 

241 withDefaults: bool = True, 

242 **kwargs: Any, 

243 ) -> DataCoordinate: 

244 return self._remote.expandDataId( 

245 dataId, dimensions=dimensions, records=records, withDefaults=withDefaults, **kwargs 

246 ) 

247 

248 def insertDimensionData( 

249 self, 

250 element: DimensionElement | str, 

251 *data: Mapping[str, Any] | DimensionRecord, 

252 conform: bool = True, 

253 replace: bool = False, 

254 skip_existing: bool = False, 

255 ) -> None: 

256 return self._direct.insertDimensionData( 

257 element, *data, conform=conform, replace=replace, skip_existing=skip_existing 

258 ) 

259 

260 def syncDimensionData( 

261 self, 

262 element: DimensionElement | str, 

263 row: Mapping[str, Any] | DimensionRecord, 

264 conform: bool = True, 

265 update: bool = False, 

266 ) -> bool | dict[str, Any]: 

267 return self._direct.syncDimensionData(element, row, conform, update) 

268 

269 def queryDatasetTypes( 

270 self, 

271 expression: Any = ..., 

272 *, 

273 missing: list[str] | None = None, 

274 ) -> Iterable[DatasetType]: 

275 return self._remote.queryDatasetTypes(expression, missing=missing) 

276 

277 def queryCollections( 

278 self, 

279 expression: Any = ..., 

280 datasetType: DatasetType | None = None, 

281 collectionTypes: Iterable[CollectionType] | CollectionType = CollectionType.all(), 

282 flattenChains: bool = False, 

283 includeChains: bool | None = None, 

284 ) -> Sequence[str]: 

285 return self._remote.queryCollections( 

286 expression, datasetType, collectionTypes, flattenChains, includeChains 

287 ) 

288 

289 def queryDatasets( 

290 self, 

291 datasetType: Any, 

292 *, 

293 collections: CollectionArgType | None = None, 

294 dimensions: Iterable[str] | None = None, 

295 dataId: DataId | None = None, 

296 where: str = "", 

297 findFirst: bool = False, 

298 bind: Mapping[str, Any] | None = None, 

299 check: bool = True, 

300 **kwargs: Any, 

301 ) -> DatasetQueryResults: 

302 return self._remote.queryDatasets( 

303 datasetType, 

304 collections=collections, 

305 dimensions=dimensions, 

306 dataId=dataId, 

307 where=where, 

308 findFirst=findFirst, 

309 bind=bind, 

310 check=check, 

311 **kwargs, 

312 ) 

313 

314 def queryDataIds( 

315 self, 

316 dimensions: DimensionGroup | Iterable[str] | str, 

317 *, 

318 dataId: DataId | None = None, 

319 datasets: Any = None, 

320 collections: CollectionArgType | None = None, 

321 where: str = "", 

322 bind: Mapping[str, Any] | None = None, 

323 check: bool = True, 

324 **kwargs: Any, 

325 ) -> DataCoordinateQueryResults: 

326 return self._remote.queryDataIds( 

327 dimensions, 

328 dataId=dataId, 

329 datasets=datasets, 

330 collections=collections, 

331 where=where, 

332 bind=bind, 

333 check=check, 

334 **kwargs, 

335 ) 

336 

337 def queryDimensionRecords( 

338 self, 

339 element: DimensionElement | str, 

340 *, 

341 dataId: DataId | None = None, 

342 datasets: Any = None, 

343 collections: CollectionArgType | None = None, 

344 where: str = "", 

345 bind: Mapping[str, Any] | None = None, 

346 check: bool = True, 

347 **kwargs: Any, 

348 ) -> DimensionRecordQueryResults: 

349 return self._remote.queryDimensionRecords( 

350 element, 

351 dataId=dataId, 

352 datasets=datasets, 

353 collections=collections, 

354 where=where, 

355 bind=bind, 

356 check=check, 

357 **kwargs, 

358 ) 

359 

360 def queryDatasetAssociations( 

361 self, 

362 datasetType: str | DatasetType, 

363 collections: CollectionArgType | None = ..., 

364 *, 

365 collectionTypes: Iterable[CollectionType] = CollectionType.all(), 

366 flattenChains: bool = False, 

367 ) -> Iterator[DatasetAssociation]: 

368 return self._remote.queryDatasetAssociations( 

369 datasetType, collections, collectionTypes=collectionTypes, flattenChains=flattenChains 

370 ) 

371 

372 @property 

373 def storageClasses(self) -> StorageClassFactory: 

374 return self._remote.storageClasses 

375 

376 @storageClasses.setter 

377 def storageClasses(self, value: StorageClassFactory) -> None: 

378 raise NotImplementedError()