Coverage for python / lsst / daf / butler / _registry_shim.py: 41%

142 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:43 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("RegistryShim",) 

31 

32import contextlib 

33from collections.abc import Iterable, Iterator, Mapping, Sequence 

34from typing import TYPE_CHECKING, Any 

35 

36from ._collection_type import CollectionType 

37from ._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef 

38from ._dataset_type import DatasetType 

39from ._exceptions import CalibrationLookupError 

40from ._storage_class import StorageClassFactory 

41from ._timespan import Timespan 

42from .dimensions import ( 

43 DataCoordinate, 

44 DataId, 

45 DimensionElement, 

46 DimensionGroup, 

47 DimensionRecord, 

48 DimensionUniverse, 

49) 

50from .registry._collection_summary import CollectionSummary 

51from .registry._defaults import RegistryDefaults 

52from .registry._exceptions import NoDefaultCollectionError 

53from .registry._registry_base import RegistryBase 

54from .registry.queries._query_common import resolve_collections 

55 

56if TYPE_CHECKING: 

57 from .direct_butler import DirectButler 

58 from .registry._registry import CollectionArgType 

59 from .registry.interfaces import ObsCoreTableManager 

60 

61 

class RegistryShim(RegistryBase):
    """Implementation of `Registry` interface exposed to clients by `Butler`.

    Parameters
    ----------
    butler : `DirectButler`
        Data butler instance.

    Notes
    -----
    This shim implementation of `Registry` forwards all methods to an actual
    Registry instance which is internal to Butler or to Butler methods. Its
    purpose is to provide a stable interface to many client-visible operations
    while we perform re-structuring of Registry and Butler implementations.
    """

    def __init__(self, butler: DirectButler) -> None:
        super().__init__(butler)
        # Keep a direct handle on the Butler's internal registry; nearly every
        # method below is a thin forwarder to it.
        self._registry = butler._registry

    def isWriteable(self) -> bool:
        # Docstring inherited from a base class.
        return self._registry.isWriteable()

    @property
    def dimensions(self) -> DimensionUniverse:
        # Docstring inherited from a base class.
        return self._registry.dimensions

    @property
    def defaults(self) -> RegistryDefaults:
        # Docstring inherited from a base class.
        return self._registry.defaults

    @defaults.setter
    def defaults(self, value: RegistryDefaults) -> None:
        # Docstring inherited from a base class.
        self._registry.defaults = value

    def refresh(self) -> None:
        # Docstring inherited from a base class.
        self._registry.refresh()

    def refresh_collection_summaries(self) -> None:
        # Docstring inherited from a base class.
        self._registry.refresh_collection_summaries()

    def caching_context(self) -> contextlib.AbstractContextManager[None]:
        # Docstring inherited from a base class.
        # NOTE: unlike most methods here, this forwards to the Butler itself
        # rather than to the internal registry.
        return self._butler._caching_context()

    @contextlib.contextmanager
    def transaction(self, *, savepoint: bool = False) -> Iterator[None]:
        # Docstring inherited from a base class.
        with self._registry.transaction(savepoint=savepoint):
            yield

    def resetConnectionPool(self) -> None:
        # Docstring inherited from a base class.
        self._registry.resetConnectionPool()

    def registerCollection(
        self, name: str, type: CollectionType = CollectionType.TAGGED, doc: str | None = None
    ) -> bool:
        # Docstring inherited from a base class.
        return self._registry.registerCollection(name, type, doc)

    def getCollectionType(self, name: str) -> CollectionType:
        # Docstring inherited from a base class.
        return self._registry.getCollectionType(name)

    def registerRun(self, name: str, doc: str | None = None) -> bool:
        # Docstring inherited from a base class.
        return self._registry.registerRun(name, doc)

    def removeCollection(self, name: str) -> None:
        # Docstring inherited from a base class.
        self._registry.removeCollection(name)

    def getCollectionChain(self, parent: str) -> Sequence[str]:
        # Docstring inherited from a base class.
        return self._registry.getCollectionChain(parent)

    def setCollectionChain(self, parent: str, children: Any, *, flatten: bool = False) -> None:
        # Docstring inherited from a base class.
        self._registry.setCollectionChain(parent, children, flatten=flatten)

    def getCollectionParentChains(self, collection: str) -> set[str]:
        # Docstring inherited from a base class.
        return self._registry.getCollectionParentChains(collection)

    def getCollectionDocumentation(self, collection: str) -> str | None:
        # Docstring inherited from a base class.
        return self._registry.getCollectionDocumentation(collection)

    def setCollectionDocumentation(self, collection: str, doc: str | None) -> None:
        # Docstring inherited from a base class.
        self._registry.setCollectionDocumentation(collection, doc)

    def getCollectionSummary(self, collection: str) -> CollectionSummary:
        # Docstring inherited from a base class.
        return self._registry.getCollectionSummary(collection)

    def registerDatasetType(self, datasetType: DatasetType) -> bool:
        # Docstring inherited from a base class.
        return self._registry.registerDatasetType(datasetType)

    def removeDatasetType(self, name: str | tuple[str, ...]) -> None:
        # Docstring inherited from a base class.
        self._registry.removeDatasetType(name)

    def getDatasetType(self, name: str) -> DatasetType:
        # Docstring inherited from a base class.
        return self._registry.getDatasetType(name)

    def supportsIdGenerationMode(self, mode: DatasetIdGenEnum) -> bool:
        # Docstring inherited from a base class.
        return self._registry.supportsIdGenerationMode(mode)

    def findDataset(
        self,
        datasetType: DatasetType | str,
        dataId: DataId | None = None,
        *,
        collections: CollectionArgType | None = None,
        timespan: Timespan | None = None,
        datastore_records: bool = False,
        **kwargs: Any,
    ) -> DatasetRef | None:
        # Docstring inherited from a base class.
        # Unlike the simple forwarders above, this method is implemented on
        # top of the Butler query system rather than delegated to the
        # internal registry.
        if not isinstance(datasetType, DatasetType):
            datasetType = self.getDatasetType(datasetType)

        dataId = DataCoordinate.standardize(
            dataId,
            dimensions=datasetType.dimensions,
            universe=self.dimensions,
            defaults=self.defaults.dataId,
            **kwargs,
        )

        with self._butler.query() as query:
            resolved_collections = resolve_collections(self._butler, collections)
            if not resolved_collections:
                if collections is None:
                    raise NoDefaultCollectionError("No collections provided, and no default collections set")
                else:
                    return None

            if datasetType.isCalibration() and timespan is None:
                # Filter out calibration collections, because with no timespan
                # we have no way of selecting a dataset from them.
                collection_info = self._butler.collections.query_info(
                    resolved_collections, flatten_chains=True
                )
                resolved_collections = [
                    info.name for info in collection_info if info.type != CollectionType.CALIBRATION
                ]
                if not resolved_collections:
                    return None

            # limit(2) is enough to distinguish "unique match" from
            # "ambiguous" without fetching every matching dataset.
            result = query.datasets(datasetType, resolved_collections, find_first=True).limit(2)
            dataset_type_name = result.dataset_type.name
            # Search only on the 'required' dimensions for the dataset type.
            # Any extra values provided by the user are ignored.
            minimal_data_id = DataCoordinate.standardize(
                dataId.subset(datasetType.dimensions.required).required, universe=self.dimensions
            )
            result = result.where(minimal_data_id)
            if (
                datasetType.isCalibration()
                and timespan is not None
                and (timespan.begin is not None or timespan.end is not None)
            ):
                # Constrain calibration lookups to certification periods
                # overlapping the caller-provided timespan.
                timespan_column = query.expression_factory[dataset_type_name].timespan
                result = result.where(timespan_column.overlaps(timespan))

            datasets = list(result)
            if len(datasets) == 1:
                ref = datasets[0]
                if dataId.hasRecords():
                    ref = ref.expanded(dataId)
                # Propagate storage class from user-provided DatasetType, which
                # may not match the definition in the database.
                ref = ref.overrideStorageClass(datasetType.storageClass_name)
                if datastore_records:
                    ref = self._registry.get_datastore_records(ref)
                return ref
            elif len(datasets) == 0:
                return None
            else:
                raise CalibrationLookupError(
                    f"Ambiguous calibration lookup for {datasetType} with timespan {timespan}"
                    f" in collections {resolved_collections}."
                )

    def insertDatasets(
        self,
        datasetType: DatasetType | str,
        dataIds: Iterable[DataId],
        run: str | None = None,
        expand: bool = True,
        idGenerationMode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
    ) -> list[DatasetRef]:
        # Docstring inherited from a base class.
        return self._registry.insertDatasets(datasetType, dataIds, run, expand, idGenerationMode)

    def _importDatasets(
        self, datasets: Iterable[DatasetRef], expand: bool = True, assume_new: bool = False
    ) -> list[DatasetRef]:
        # Docstring inherited from a base class.
        return self._registry._importDatasets(datasets, expand, assume_new)

    def getDataset(self, id: DatasetId) -> DatasetRef | None:
        # Docstring inherited from a base class.
        return self._registry.getDataset(id)

    def _fetch_run_dataset_ids(self, run: str) -> list[DatasetId]:
        # Docstring inherited.
        return self._registry._fetch_run_dataset_ids(run)

    def removeDatasets(self, refs: Iterable[DatasetRef]) -> None:
        # Docstring inherited from a base class.
        self._registry.removeDatasets(refs)

    def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        # Docstring inherited from a base class.
        self._registry.associate(collection, refs)

    def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        # Docstring inherited from a base class.
        self._registry.disassociate(collection, refs)

    def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespan) -> None:
        # Docstring inherited from a base class.
        self._registry.certify(collection, refs, timespan)

    def decertify(
        self,
        collection: str,
        datasetType: str | DatasetType,
        timespan: Timespan,
        *,
        dataIds: Iterable[DataId] | None = None,
    ) -> None:
        # Docstring inherited from a base class.
        self._registry.decertify(collection, datasetType, timespan, dataIds=dataIds)

    def getDatasetLocations(self, ref: DatasetRef) -> Iterable[str]:
        # Docstring inherited from a base class.
        return self._registry.getDatasetLocations(ref)

    def expandDataId(
        self,
        dataId: DataId | None = None,
        *,
        dimensions: Iterable[str] | DimensionGroup | None = None,
        records: Mapping[str, DimensionRecord | None] | None = None,
        withDefaults: bool = True,
        **kwargs: Any,
    ) -> DataCoordinate:
        # Docstring inherited from a base class.
        return self._registry.expandDataId(
            dataId, dimensions=dimensions, records=records, withDefaults=withDefaults, **kwargs
        )

    def insertDimensionData(
        self,
        element: DimensionElement | str,
        *data: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        replace: bool = False,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited from a base class.
        self._registry.insertDimensionData(
            element, *data, conform=conform, replace=replace, skip_existing=skip_existing
        )

    def syncDimensionData(
        self,
        element: DimensionElement | str,
        row: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        update: bool = False,
    ) -> bool | dict[str, Any]:
        # Docstring inherited from a base class.
        return self._registry.syncDimensionData(element, row, conform, update)

    def queryDatasetTypes(
        self,
        expression: Any = ...,
        *,
        missing: list[str] | None = None,
    ) -> Iterable[DatasetType]:
        # Docstring inherited from a base class.
        return self._registry.queryDatasetTypes(expression, missing=missing)

    def queryCollections(
        self,
        expression: Any = ...,
        datasetType: DatasetType | None = None,
        collectionTypes: Iterable[CollectionType] | CollectionType = CollectionType.all(),
        flattenChains: bool = False,
        includeChains: bool | None = None,
    ) -> Sequence[str]:
        # Docstring inherited from a base class.
        return self._registry.queryCollections(
            expression, datasetType, collectionTypes, flattenChains, includeChains
        )

    @property
    def obsCoreTableManager(self) -> ObsCoreTableManager | None:
        # Docstring inherited from a base class.
        return self._registry.obsCoreTableManager

    @property
    def storageClasses(self) -> StorageClassFactory:
        # Docstring inherited from a base class.
        return self._registry.storageClasses