Coverage for python/lsst/daf/butler/tests/hybrid_butler.py: 0%

130 statements  

coverage.py v7.13.5, created at 2026-04-26 08:48 +0000

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

from collections.abc import Collection, Iterable, Iterator, Sequence
from contextlib import AbstractContextManager, contextmanager
from types import EllipsisType
from typing import Any, TextIO, cast

from lsst.resources import ResourcePath, ResourcePathExpression

from .._butler import Butler, _DeprecatedDefault
from .._butler_collections import ButlerCollections
from .._butler_metrics import ButlerMetrics
from .._dataset_existence import DatasetExistence
from .._dataset_provenance import DatasetProvenance
from .._dataset_ref import DatasetId, DatasetRef
from .._dataset_type import DatasetType
from .._deferredDatasetHandle import DeferredDatasetHandle
from .._file_dataset import FileDataset
from .._limited_butler import LimitedButler
from .._query_all_datasets import QueryAllDatasetsParameters
from .._storage_class import StorageClass
from .._timespan import Timespan
from ..datastore import DatasetRefURIs, FileTransferSource
from ..dimensions import DataCoordinate, DataId, DimensionElement, DimensionRecord, DimensionUniverse
from ..direct_butler import DirectButler
from ..queries import Query
from ..registry import CollectionArgType, Registry
from ..remote_butler import RemoteButler
from ..transfers import RepoExportContext
from .hybrid_butler_collections import HybridButlerCollections
from .hybrid_butler_registry import HybridButlerRegistry


class HybridButler(Butler):
    """A `Butler` that delegates methods to internal RemoteButler and
    DirectButler instances. Intended to allow testing of RemoteButler before
    its implementation is complete, by delegating unsupported methods to
    DirectButler.
    """
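    # A minimal usage sketch (illustrative only; ``remote_butler`` and
    # ``direct_butler`` are assumed to be pre-constructed clients of the
    # same test repository and are not defined in this module):
    #
    #     butler = HybridButler(remote_butler, direct_butler)
    #     ref = butler.put(obj, "some_dataset_type", data_id, run="test_run")
    #     retrieved = butler.get(ref)  # served through the RemoteButler path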

    _remote_butler: RemoteButler
    _direct_butler: DirectButler
    _registry: Registry

    def __new__(cls, remote_butler: RemoteButler, direct_butler: DirectButler) -> HybridButler:
        self = cast(HybridButler, super().__new__(cls))
        self._remote_butler = remote_butler
        self._direct_butler = direct_butler
        self._datastore = direct_butler._datastore
        self._registry = HybridButlerRegistry(direct_butler._registry, remote_butler.registry)
        # Force shared metrics.
        self._metrics = self._direct_butler._metrics
        self._remote_butler._metrics = self._metrics
        return self

    def isWriteable(self) -> bool:
        return self._remote_butler.isWriteable()

    def _caching_context(self) -> AbstractContextManager[None]:
        return self._direct_butler._caching_context()

    def transaction(self) -> AbstractContextManager[None]:
        return self._direct_butler.transaction()

    @contextmanager
    def record_metrics(self, metrics: ButlerMetrics | None = None) -> Iterator[ButlerMetrics]:
        if metrics is None:
            # Share same metrics in both butlers.
            metrics = ButlerMetrics()

        with self._direct_butler.record_metrics(metrics), self._remote_butler.record_metrics(metrics):
            yield metrics
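    # Methods that mutate the repository (put, ingest, import/export, run
    # removal, dataset pruning) delegate to DirectButler; retrieval,
    # existence-check, and query methods delegate to RemoteButler so that its
    # read path is the one exercised by the shared test suite.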

    def put(
        self,
        obj: Any,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        run: str | None = None,
        provenance: DatasetProvenance | None = None,
        **kwargs: Any,
    ) -> DatasetRef:
        return self._direct_butler.put(
            obj, datasetRefOrType, dataId, run=run, provenance=provenance, **kwargs
        )

    def getDeferred(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        parameters: dict | None = None,
        collections: Any = None,
        storageClass: str | StorageClass | None = None,
        **kwargs: Any,
    ) -> DeferredDatasetHandle:
        return self._remote_butler.getDeferred(
            datasetRefOrType,
            dataId,
            parameters=parameters,
            collections=collections,
            storageClass=storageClass,
            **kwargs,
        )

    def get(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        parameters: dict[str, Any] | None = None,
        collections: Any = None,
        storageClass: StorageClass | str | None = None,
        **kwargs: Any,
    ) -> Any:
        return self._remote_butler.get(
            datasetRefOrType,
            dataId,
            parameters=parameters,
            collections=collections,
            storageClass=storageClass,
            **kwargs,
        )

    def getURIs(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        predict: bool = False,
        collections: Any = None,
        run: str | None = None,
        **kwargs: Any,
    ) -> DatasetRefURIs:
        return self._remote_butler.getURIs(
            datasetRefOrType, dataId, predict=predict, collections=collections, run=run, **kwargs
        )

    def getURI(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        predict: bool = False,
        collections: Any = None,
        run: str | None = None,
        **kwargs: Any,
    ) -> ResourcePath:
        return self._remote_butler.getURI(
            datasetRefOrType, dataId, predict=predict, collections=collections, run=run, **kwargs
        )

    def get_dataset_type(self, name: str) -> DatasetType:
        return self._remote_butler.get_dataset_type(name)

    def get_dataset(
        self,
        id: DatasetId | str,
        *,
        storage_class: str | StorageClass | None = None,
        dimension_records: bool = False,
        datastore_records: bool = False,
    ) -> DatasetRef | None:
        return self._remote_butler.get_dataset(
            id,
            storage_class=storage_class,
            dimension_records=dimension_records,
            datastore_records=datastore_records,
        )

    def get_many_datasets(self, ids: Iterable[DatasetId | str]) -> list[DatasetRef]:
        return self._remote_butler.get_many_datasets(ids)

    def find_dataset(
        self,
        dataset_type: DatasetType | str,
        data_id: DataId | None = None,
        *,
        collections: str | Sequence[str] | None = None,
        timespan: Timespan | None = None,
        storage_class: str | StorageClass | None = None,
        dimension_records: bool = False,
        datastore_records: bool = False,
        **kwargs: Any,
    ) -> DatasetRef | None:
        return self._remote_butler.find_dataset(
            dataset_type,
            data_id,
            collections=collections,
            timespan=timespan,
            storage_class=storage_class,
            dimension_records=dimension_records,
            datastore_records=datastore_records,
            **kwargs,
        )

    def retrieve_artifacts_zip(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        overwrite: bool = True,
    ) -> ResourcePath:
        return self._remote_butler.retrieve_artifacts_zip(refs, destination, overwrite)

    def retrieveArtifacts(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        transfer: str = "auto",
        preserve_path: bool = True,
        overwrite: bool = False,
    ) -> list[ResourcePath]:
        return self._remote_butler.retrieveArtifacts(refs, destination, transfer, preserve_path, overwrite)

    def exists(
        self,
        dataset_ref_or_type: DatasetRef | DatasetType | str,
        /,
        data_id: DataId | None = None,
        *,
        full_check: bool = True,
        collections: Any = None,
        **kwargs: Any,
    ) -> DatasetExistence:
        return self._remote_butler.exists(
            dataset_ref_or_type, data_id, full_check=full_check, collections=collections, **kwargs
        )

    def _exists_many(
        self,
        refs: Iterable[DatasetRef],
        /,
        *,
        full_check: bool = True,
    ) -> dict[DatasetRef, DatasetExistence]:
        return self._remote_butler._exists_many(refs, full_check=full_check)

    def removeRuns(
        self,
        names: Iterable[str],
        unstore: bool | type[_DeprecatedDefault] = _DeprecatedDefault,
        *,
        unlink_from_chains: bool = False,
    ) -> None:
        return self._direct_butler.removeRuns(names, unstore, unlink_from_chains=unlink_from_chains)

    def ingest_zip(
        self,
        zip_file: ResourcePathExpression,
        transfer: str = "auto",
        *,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        return self._direct_butler.ingest_zip(
            zip_file,
            transfer=transfer,
            transfer_dimensions=transfer_dimensions,
            dry_run=dry_run,
            skip_existing=skip_existing,
        )

    def ingest(
        self,
        *datasets: FileDataset,
        transfer: str | None = "auto",
        record_validation_info: bool = True,
        skip_existing: bool = False,
    ) -> None:
        return self._direct_butler.ingest(
            *datasets,
            transfer=transfer,
            record_validation_info=record_validation_info,
            skip_existing=skip_existing,
        )

    def export(
        self,
        *,
        directory: str | None = None,
        filename: str | None = None,
        format: str | None = None,
        transfer: str | None = None,
    ) -> AbstractContextManager[RepoExportContext]:
        return self._direct_butler.export(
            directory=directory, filename=filename, format=format, transfer=transfer
        )

    def import_(
        self,
        *,
        directory: ResourcePathExpression | None = None,
        filename: ResourcePathExpression | TextIO | None = None,
        format: str | None = None,
        transfer: str | None = None,
        skip_dimensions: set | None = None,
        record_validation_info: bool = True,
        without_datastore: bool = False,
    ) -> None:
        self._direct_butler.import_(
            directory=directory,
            filename=filename,
            format=format,
            transfer=transfer,
            skip_dimensions=skip_dimensions,
            record_validation_info=record_validation_info,
            without_datastore=without_datastore,
        )

    def transfer_dimension_records_from(
        self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef | DataCoordinate]
    ) -> None:
        return self._direct_butler.transfer_dimension_records_from(source_butler, source_refs)

    def transfer_from(
        self,
        source_butler: LimitedButler,
        source_refs: Iterable[DatasetRef],
        transfer: str = "auto",
        skip_missing: bool = True,
        register_dataset_types: bool = False,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
    ) -> Collection[DatasetRef]:
        return self._direct_butler.transfer_from(
            source_butler,
            source_refs,
            transfer,
            skip_missing,
            register_dataset_types,
            transfer_dimensions,
            dry_run,
        )

    def validateConfiguration(
        self,
        logFailures: bool = False,
        datasetTypeNames: Iterable[str] | None = None,
        ignore: Iterable[str] | None = None,
    ) -> None:
        return self._direct_butler.validateConfiguration(logFailures, datasetTypeNames, ignore)

    @property
    def run(self) -> str | None:
        return self._remote_butler.run

    @property
    def registry(self) -> Registry:
        return self._registry

    def query(self) -> AbstractContextManager[Query]:
        return self._remote_butler.query()

    def clone(
        self,
        *,
        collections: CollectionArgType | None | EllipsisType = ...,
        run: str | None | EllipsisType = ...,
        inferDefaults: bool | EllipsisType = ...,
        dataId: dict[str, str] | EllipsisType = ...,
        metrics: ButlerMetrics | None = None,
    ) -> HybridButler:
        remote_butler = self._remote_butler.clone(
            collections=collections, run=run, inferDefaults=inferDefaults, dataId=dataId, metrics=metrics
        )
        direct_butler = self._direct_butler.clone(
            collections=collections, run=run, inferDefaults=inferDefaults, dataId=dataId, metrics=metrics
        )
        return HybridButler(remote_butler, direct_butler)

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        return self._direct_butler.pruneDatasets(
            refs, disassociate=disassociate, unstore=unstore, tags=tags, purge=purge
        )

    @property
    def dimensions(self) -> DimensionUniverse:
        return self._remote_butler.dimensions

    def _extract_all_dimension_records_from_data_ids(
        self,
        source_butler: LimitedButler | Butler,
        data_ids: set[DataCoordinate],
        allowed_elements: frozenset[DimensionElement],
    ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
        return self._direct_butler._extract_all_dimension_records_from_data_ids(
            source_butler, data_ids, allowed_elements
        )

    def _expand_data_ids(self, data_ids: Iterable[DataCoordinate]) -> list[DataCoordinate]:
        return self._remote_butler._expand_data_ids(data_ids)

    @property
    def collection_chains(self) -> ButlerCollections:
        return HybridButlerCollections(self)

    @property
    def collections(self) -> ButlerCollections:
        return HybridButlerCollections(self)

    def _query_all_datasets_by_page(
        self, args: QueryAllDatasetsParameters
    ) -> AbstractContextManager[Iterator[list[DatasetRef]]]:
        return self._remote_butler._query_all_datasets_by_page(args)

    @property
    def _file_transfer_source(self) -> FileTransferSource:
        return self._remote_butler._file_transfer_source

    def close(self) -> None:
        self._direct_butler.close()
        self._remote_butler.close()