Coverage for python/lsst/daf/butler/remote_butler/_remote_butler.py: 0%

290 statements  

coverage.py v7.13.5, created at 2026-04-14 23:36 +0000

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("RemoteButler",)

import logging
import uuid
from collections.abc import Collection, Iterable, Iterator, Sequence
from contextlib import AbstractContextManager, contextmanager
from types import EllipsisType
from typing import TYPE_CHECKING, Any, TextIO, cast

from deprecated.sphinx import deprecated

from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import (
    ArtifactIndexInfo,
    ZipIndex,
    determine_destination_for_retrieved_artifact,
    retrieve_and_zip,
    unpack_zips,
)
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.iteration import chunk_iterable

from .._butler import Butler, _DeprecatedDefault
from .._butler_collections import ButlerCollections
from .._butler_metrics import ButlerMetrics
from .._dataset_existence import DatasetExistence
from .._dataset_ref import DatasetId, DatasetRef
from .._dataset_type import DatasetType
from .._deferredDatasetHandle import DeferredDatasetHandle
from .._exceptions import DatasetNotFoundError
from .._query_all_datasets import QueryAllDatasetsParameters
from .._storage_class import StorageClass, StorageClassFactory
from .._utilities.locked_object import LockedObject
from ..datastore import DatasetRefURIs, DatastoreConfig
from ..datastore.cache_manager import AbstractDatastoreCacheManager, DatastoreCacheManager
from ..dimensions import DataCoordinate, DataIdValue, DimensionConfig, DimensionUniverse, SerializedDataId
from ..queries import Query
from ..queries.tree import make_column_literal
from ..registry import CollectionArgType, NoDefaultCollectionError, Registry, RegistryDefaults
from ..registry.expand_data_ids import expand_data_ids
from ._collection_args import convert_collection_arg_to_glob_string_list
from ._defaults import DefaultsHolder
from ._get import convert_http_url_to_resource_path, get_dataset_as_python_object
from ._http_connection import RemoteButlerHttpConnection, parse_model, quote_path_variable
from ._query_driver import RemoteQueryDriver
from ._query_results import convert_dataset_ref_results, read_query_results
from ._ref_utils import apply_storage_class_override, normalize_dataset_type_name, simplify_dataId
from ._registry import RemoteButlerRegistry
from ._remote_butler_collections import RemoteButlerCollections
from ._remote_file_transfer_source import RemoteFileTransferSource
from .server_models import (
    CollectionList,
    FileInfoPayload,
    FindDatasetRequestModel,
    FindDatasetResponseModel,
    GetDatasetTypeResponseModel,
    GetFileByDataIdRequestModel,
    GetFileResponseModel,
    GetManyDatasetsRequestModel,
    GetManyDatasetsResponseModel,
    GetUniverseResponseModel,
    QueryAllDatasetsRequestModel,
)

if TYPE_CHECKING:
    from .._dataset_provenance import DatasetProvenance
    from .._file_dataset import FileDataset
    from .._limited_butler import LimitedButler
    from .._timespan import Timespan
    from ..dimensions import DataId
    from ..transfers import RepoExportContext


_LOG = logging.getLogger(__name__)


class RemoteButler(Butler):  # numpydoc ignore=PR02
    """A `Butler` that can be used to connect through a remote server.

    Parameters
    ----------
    options : `ButlerInstanceOptions`
        Default values and other settings for the Butler instance.
    connection : `RemoteButlerHttpConnection`
        Connection to the Butler server.
    cache : `RemoteButlerCache`
        Cache of data shared between multiple RemoteButler instances connected
        to the same server.
    use_disabled_datastore_cache : `bool`, optional
        If `True`, a datastore cache manager will be created with a default
        disabled state which can be enabled by the environment. If `False`,
        a cache manager will be constructed from the default local
        configuration, likely caching by default, but only for specific
        storage classes.

    Notes
    -----
    Instead of using this constructor, most users should use either
    `Butler.from_config` or `RemoteButlerFactory`.
    """
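    # A minimal usage sketch, assuming the named repository is served by a
    # Butler server; the repository label "my-remote-repo", the collection,
    # the dataset type, and the data ID below are all hypothetical.
    #
    #     from lsst.daf.butler import Butler
    #
    #     butler = Butler.from_config("my-remote-repo", collections="LSSTCam/defaults")
    #     calexp = butler.get("calexp", instrument="LSSTCam", visit=123456, detector=42)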

    _registry_defaults: DefaultsHolder
    _connection: RemoteButlerHttpConnection
    _cache: RemoteButlerCache
    _registry: RemoteButlerRegistry
    _datastore_cache_manager: AbstractDatastoreCacheManager | None
    _use_disabled_datastore_cache: bool

    # This is __new__ instead of __init__ because we have to support
    # instantiation via the legacy constructor Butler.__new__(), which
    # reads the configuration and selects which subclass to instantiate. The
    # interaction between __new__ and __init__ is kind of wacky in Python. If
    # we were using __init__ here, __init__ would be called twice (once when
    # the RemoteButler instance is constructed inside Butler.from_config(), and
    # a second time with the original arguments to Butler() when the instance
    # is returned from Butler.__new__()).
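    # A minimal sketch of that interaction (plain Python, not butler code):
    # when ``__new__`` on a base class returns an instance of a subclass,
    # Python still calls that instance's ``__init__`` with the arguments
    # originally passed to the base class call.
    #
    #     class Base:
    #         def __new__(cls, *args, **kwargs):
    #             if cls is Base:
    #                 return Sub(*args, **kwargs)  # Sub.__init__ runs here...
    #             return super().__new__(cls)
    #
    #     class Sub(Base):
    #         def __init__(self, *args, **kwargs):
    #             print("initializing")  # ...and again when Base(...) returns this instance.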

    def __new__(
        cls,
        *,
        connection: RemoteButlerHttpConnection,
        defaults: RegistryDefaults,
        cache: RemoteButlerCache,
        use_disabled_datastore_cache: bool = True,
        metrics: ButlerMetrics | None = None,
    ) -> RemoteButler:
        self = cast(RemoteButler, super().__new__(cls))
        self.storageClasses = StorageClassFactory()

        self._connection = connection
        self._cache = cache
        self._datastore_cache_manager = None
        self._use_disabled_datastore_cache = use_disabled_datastore_cache
        self._metrics = metrics if metrics is not None else ButlerMetrics()

        self._registry_defaults = DefaultsHolder(defaults)
        self._registry = RemoteButlerRegistry(self, self._registry_defaults, self._connection)
        defaults.finish(self._registry)

        return self

    def isWriteable(self) -> bool:
        # Docstring inherited.
        return False

    @property
    @deprecated(
        "Please use 'collections' instead. collection_chains will be removed after v28.",
        version="v28",
        category=FutureWarning,
    )
    def collection_chains(self) -> ButlerCollections:
        """Object with methods for modifying collection chains."""
        return self.collections

    @property
    def collections(self) -> ButlerCollections:
        """Object with methods for modifying and querying collections."""
        return RemoteButlerCollections(self._registry_defaults, self._connection)

    @property
    def dimensions(self) -> DimensionUniverse:
        # Docstring inherited.
        with self._cache.access() as cache:
            if cache.dimensions is not None:
                return cache.dimensions

        response = self._connection.get("universe")
        model = parse_model(response, GetUniverseResponseModel)

        config = DimensionConfig.from_simple(model.universe)
        universe = DimensionUniverse(config)
        with self._cache.access() as cache:
            if cache.dimensions is None:
                cache.dimensions = universe
            return cache.dimensions

    @property
    def _cache_manager(self) -> AbstractDatastoreCacheManager:
        """Cache manager to use when reading files from the butler."""
        # RemoteButler does not get any cache configuration from the server.
        # Either create a disabled cache manager which can be enabled via the
        # environment, or create a cache manager from the default FileDatastore
        # config. This will not work properly if the defaults for
        # DatastoreConfig no longer include the cache.
        if self._datastore_cache_manager is None:
            datastore_config = DatastoreConfig()
            if not self._use_disabled_datastore_cache and "cached" in datastore_config:
                self._datastore_cache_manager = DatastoreCacheManager(
                    datastore_config["cached"], universe=self.dimensions
                )
            else:
                self._datastore_cache_manager = DatastoreCacheManager.create_disabled(
                    universe=self.dimensions
                )
        return self._datastore_cache_manager

    def _caching_context(self) -> AbstractContextManager[None]:
        # Docstring inherited.
        # Not implemented for now, will have to think whether this needs to
        # do something on client side and/or remote side.
        raise NotImplementedError()

    def transaction(self) -> AbstractContextManager[None]:
        """Will always raise NotImplementedError.
        Transactions are not supported by RemoteButler.
        """
        raise NotImplementedError()

    def put(
        self,
        obj: Any,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        run: str | None = None,
        provenance: DatasetProvenance | None = None,
        **kwargs: Any,
    ) -> DatasetRef:
        # Docstring inherited.
        raise NotImplementedError()

    def getDeferred(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        parameters: dict | None = None,
        collections: Any = None,
        storageClass: str | StorageClass | None = None,
        timespan: Timespan | None = None,
        **kwargs: Any,
    ) -> DeferredDatasetHandle:
        response = self._get_file_info(datasetRefOrType, dataId, collections, timespan, kwargs)
        # Check that artifact information is available.
        _to_file_payload(response)
        ref = DatasetRef.from_simple(response.dataset_ref, universe=self.dimensions)
        return DeferredDatasetHandle(butler=self, ref=ref, parameters=parameters, storageClass=storageClass)

    def get(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        parameters: dict[str, Any] | None = None,
        collections: Any = None,
        storageClass: StorageClass | str | None = None,
        timespan: Timespan | None = None,
        **kwargs: Any,
    ) -> Any:
        # Docstring inherited.
        with self._metrics.instrument_get(log=_LOG, msg="Retrieved remote dataset"):
            model = self._get_file_info(datasetRefOrType, dataId, collections, timespan, kwargs)

            ref = DatasetRef.from_simple(model.dataset_ref, universe=self.dimensions)
            # If the caller provided a DatasetRef, they may have overridden the
            # component on it. We need to explicitly handle this because we
            # did not send the DatasetType to the server in this case.
            if isinstance(datasetRefOrType, DatasetRef):
                componentOverride = datasetRefOrType.datasetType.component()
                if componentOverride:
                    ref = ref.makeComponentRef(componentOverride)
            ref = apply_storage_class_override(ref, datasetRefOrType, storageClass)

            return self._get_dataset_as_python_object(ref, model, parameters)

    def _get_dataset_as_python_object(
        self,
        ref: DatasetRef,
        model: GetFileResponseModel,
        parameters: dict[str, Any] | None,
    ) -> Any:
        # This thin wrapper method is here to provide a place to hook in a mock
        # mimicking DatastoreMock functionality for use in unit tests.
        return get_dataset_as_python_object(
            ref,
            _to_file_payload(model),
            auth=self._connection.auth,
            parameters=parameters,
            cache_manager=self._cache_manager,
        )

    def _get_file_info(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        dataId: DataId | None,
        collections: CollectionArgType,
        timespan: Timespan | None,
        kwargs: dict[str, DataIdValue],
    ) -> GetFileResponseModel:
        """Send a request to the server for the file URLs and metadata
        associated with a dataset.
        """
        if isinstance(datasetRefOrType, DatasetRef):
            if dataId is not None:
                raise ValueError("DatasetRef given, cannot use dataId as well")
            return self._get_file_info_for_ref(datasetRefOrType)
        else:
            request = GetFileByDataIdRequestModel(
                dataset_type=normalize_dataset_type_name(datasetRefOrType),
                collections=self._normalize_collections(collections),
                data_id=simplify_dataId(dataId, kwargs),
                default_data_id=self._serialize_default_data_id(),
                timespan=timespan,
            )
            response = self._connection.post("get_file_by_data_id", request)
            return parse_model(response, GetFileResponseModel)

    def _get_file_info_for_ref(self, ref: DatasetRef) -> GetFileResponseModel:
        response = self._connection.get(f"get_file/{_to_uuid_string(ref.id)}")
        return parse_model(response, GetFileResponseModel)

    def getURIs(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        predict: bool = False,
        collections: Any = None,
        run: str | None = None,
        **kwargs: Any,
    ) -> DatasetRefURIs:
        # Docstring inherited.
        if predict or run:
            raise NotImplementedError("Predict mode is not supported by RemoteButler")

        response = self._get_file_info(datasetRefOrType, dataId, collections, None, kwargs)
        file_info = _to_file_payload(response).file_info
        if len(file_info) == 1:
            return DatasetRefURIs(
                primaryURI=convert_http_url_to_resource_path(
                    file_info[0].url, self._connection.auth, file_info[0].auth
                )
            )
        else:
            components = {}
            for f in file_info:
                component = f.datastoreRecords.component
                if component is None:
                    raise ValueError(
                        f"DatasetId {response.dataset_ref.id} has a component file"
                        " with no component name defined"
                    )
                components[component] = convert_http_url_to_resource_path(
                    f.url, self._connection.auth, f.auth
                )
            return DatasetRefURIs(componentURIs=components)

    def get_dataset_type(self, name: str) -> DatasetType:
        with self._cache.access() as cache:
            if (cached_value := cache.dataset_types.get(name)) is not None:
                return cached_value

        response = self._connection.get(f"dataset_type/{quote_path_variable(name)}")
        model = parse_model(response, GetDatasetTypeResponseModel)
        value = DatasetType.from_simple(model.dataset_type, universe=self.dimensions)
        with self._cache.access() as cache:
            return cache.dataset_types.setdefault(name, value)

    def get_dataset(
        self,
        id: DatasetId | str,
        *,
        storage_class: str | StorageClass | None = None,
        dimension_records: bool = False,
        datastore_records: bool = False,
    ) -> DatasetRef | None:
        # datastore_records is intentionally ignored. It is an optimization
        # flag that only applies to DirectButler.
        path = f"dataset/{_to_uuid_string(id)}"
        response = self._connection.get(path, params={"dimension_records": bool(dimension_records)})
        model = parse_model(response, FindDatasetResponseModel)
        if model.dataset_ref is None:
            return None
        ref = DatasetRef.from_simple(model.dataset_ref, universe=self.dimensions)
        if storage_class is not None:
            ref = ref.overrideStorageClass(storage_class)
        return ref

    def get_many_datasets(self, ids: Iterable[DatasetId | str]) -> list[DatasetRef]:
        result = []
        for batch in chunk_iterable(ids, GetManyDatasetsRequestModel.MAX_ITEMS_PER_REQUEST):
            request = GetManyDatasetsRequestModel(dataset_ids=batch)
            response = self._connection.post("datasets", request)
            model = parse_model(response, GetManyDatasetsResponseModel)
            refs = convert_dataset_ref_results(model, self.dimensions)
            result.extend(refs)
        return result

    def find_dataset(
        self,
        dataset_type: DatasetType | str,
        data_id: DataId | None = None,
        *,
        collections: str | Sequence[str] | None = None,
        timespan: Timespan | None = None,
        storage_class: str | StorageClass | None = None,
        dimension_records: bool = False,
        datastore_records: bool = False,
        **kwargs: Any,
    ) -> DatasetRef | None:
        # datastore_records is intentionally ignored. It is an optimization
        # flag that only applies to DirectButler.

        query = FindDatasetRequestModel(
            dataset_type=normalize_dataset_type_name(dataset_type),
            data_id=simplify_dataId(data_id, kwargs),
            default_data_id=self._serialize_default_data_id(),
            collections=self._normalize_collections(collections),
            timespan=timespan,
            dimension_records=dimension_records,
        )

        response = self._connection.post("find_dataset", query)

        model = parse_model(response, FindDatasetResponseModel)
        if model.dataset_ref is None:
            return None

        ref = DatasetRef.from_simple(model.dataset_ref, universe=self.dimensions)
        if isinstance(data_id, DataCoordinate) and data_id.hasRecords():
            ref = ref.expanded(data_id)
        return apply_storage_class_override(ref, dataset_type, storage_class)

    def _retrieve_artifacts(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        transfer: str = "auto",
        preserve_path: bool = True,
        overwrite: bool = False,
        write_index: bool = True,
        add_prefix: bool = False,
    ) -> dict[ResourcePath, ArtifactIndexInfo]:
        destination = ResourcePath(destination).abspath()
        if not destination.isdir():
            raise ValueError(f"Destination location must refer to a directory. Given {destination}.")

        if transfer not in ("auto", "copy"):
            raise ValueError("Only 'copy' and 'auto' transfer modes are supported.")

        requested_ids = {ref.id for ref in refs}
        have_copied: dict[ResourcePath, ResourcePath] = {}
        artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {}
        # Sort to ensure that in many refs to one file situation the same
        # ref is used for any prefix that might be added.
        for ref in sorted(refs):
            prefix = str(ref.id)[:8] + "-" if add_prefix else ""
            file_info = _to_file_payload(self._get_file_info_for_ref(ref)).file_info
            for file in file_info:
                source_uri = ResourcePath(str(file.url))
                # For DECam/zip we only want to copy once.
                # For zip files we need to unpack so that they can be
                # zipped up again if needed.
                is_zip = source_uri.getExtension() == ".zip" and "zip-path" in source_uri.fragment
                cleaned_source_uri = source_uri.replace(fragment="", query="", params="")
                if is_zip:
                    if cleaned_source_uri not in have_copied:
                        zipped_artifacts = unpack_zips(
                            [cleaned_source_uri], requested_ids, destination, preserve_path, overwrite
                        )
                        artifact_map.update(zipped_artifacts)
                        have_copied[cleaned_source_uri] = cleaned_source_uri
                elif cleaned_source_uri not in have_copied:
                    relative_path = ResourcePath(file.datastoreRecords.path, forceAbsolute=False)
                    target_uri = determine_destination_for_retrieved_artifact(
                        destination, relative_path, preserve_path, prefix
                    )
                    # Because signed URLs expire, we want to do the transfer
                    # soon after retrieving the URL.
                    target_uri.transfer_from(source_uri, transfer="copy", overwrite=overwrite)
                    have_copied[cleaned_source_uri] = target_uri
                    artifact_map[target_uri] = ArtifactIndexInfo.from_single(file.datastoreRecords, ref.id)
                else:
                    target_uri = have_copied[cleaned_source_uri]
                    artifact_map[target_uri].append(ref.id)

        if write_index:
            index = ZipIndex.from_artifact_map(refs, artifact_map, destination)
            index.write_index(destination)

        return artifact_map

    def retrieve_artifacts_zip(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        overwrite: bool = True,
    ) -> ResourcePath:
        return retrieve_and_zip(refs, destination, self._retrieve_artifacts, overwrite)

    def retrieveArtifacts(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        transfer: str = "auto",
        preserve_path: bool = True,
        overwrite: bool = False,
    ) -> list[ResourcePath]:
        artifact_map = self._retrieve_artifacts(
            refs,
            destination,
            transfer,
            preserve_path,
            overwrite,
        )
        return list(artifact_map)

    def exists(
        self,
        dataset_ref_or_type: DatasetRef | DatasetType | str,
        /,
        data_id: DataId | None = None,
        *,
        full_check: bool = True,
        collections: Any = None,
        **kwargs: Any,
    ) -> DatasetExistence:
        try:
            response = self._get_file_info(
                dataset_ref_or_type, dataId=data_id, collections=collections, timespan=None, kwargs=kwargs
            )
        except DatasetNotFoundError:
            return DatasetExistence.UNRECOGNIZED

        if response.artifact is None:
            if full_check:
                return DatasetExistence.RECORDED
            else:
                return DatasetExistence.RECORDED | DatasetExistence._ASSUMED

        if full_check:
            for file in response.artifact.file_info:
                if not ResourcePath(str(file.url)).exists():
                    return DatasetExistence.RECORDED | DatasetExistence.DATASTORE
            return DatasetExistence.VERIFIED
        else:
            return DatasetExistence.KNOWN

    def _exists_many(
        self,
        refs: Iterable[DatasetRef],
        /,
        *,
        full_check: bool = True,
    ) -> dict[DatasetRef, DatasetExistence]:
        return {ref: self.exists(ref, full_check=full_check) for ref in refs}

    def removeRuns(
        self,
        names: Iterable[str],
        unstore: bool | type[_DeprecatedDefault] = _DeprecatedDefault,
        *,
        unlink_from_chains: bool = False,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def ingest(
        self,
        *datasets: FileDataset,
        transfer: str | None = "auto",
        record_validation_info: bool = True,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def ingest_zip(
        self,
        zip_file: ResourcePathExpression,
        transfer: str = "auto",
        *,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def export(
        self,
        *,
        directory: str | None = None,
        filename: str | None = None,
        format: str | None = None,
        transfer: str | None = None,
    ) -> AbstractContextManager[RepoExportContext]:
        # Docstring inherited.
        raise NotImplementedError()

    def import_(
        self,
        *,
        directory: ResourcePathExpression | None = None,
        filename: ResourcePathExpression | TextIO | None = None,
        format: str | None = None,
        transfer: str | None = None,
        skip_dimensions: set | None = None,
        record_validation_info: bool = True,
        without_datastore: bool = False,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def transfer_dimension_records_from(
        self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef | DataCoordinate]
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def transfer_from(
        self,
        source_butler: LimitedButler,
        source_refs: Iterable[DatasetRef],
        transfer: str = "auto",
        skip_missing: bool = True,
        register_dataset_types: bool = False,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
    ) -> Collection[DatasetRef]:
        # Docstring inherited.
        raise NotImplementedError()

    def validateConfiguration(
        self,
        logFailures: bool = False,
        datasetTypeNames: Iterable[str] | None = None,
        ignore: Iterable[str] | None = None,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    @property
    def run(self) -> str | None:
        # Docstring inherited.
        return self._registry_defaults.get().run

    @property
    def registry(self) -> Registry:
        return self._registry

    @contextmanager
    def query(self) -> Iterator[Query]:
        driver = RemoteQueryDriver(self, self._connection)
        with driver:
            query = Query(driver)
            yield query
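    # A minimal sketch of the query context manager, assuming the standard
    # ``Query.datasets`` interface; the dataset type and collection below are
    # hypothetical.
    #
    #     with butler.query() as q:
    #         refs = list(q.datasets("calexp", collections="LSSTCam/defaults"))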

    @contextmanager
    def _query_all_datasets_by_page(
        self, args: QueryAllDatasetsParameters
    ) -> Iterator[Iterator[list[DatasetRef]]]:
        universe = self.dimensions

        request = QueryAllDatasetsRequestModel(
            collections=self._normalize_collections(args.collections),
            name=[normalize_dataset_type_name(name) for name in args.name],
            find_first=args.find_first,
            data_id=simplify_dataId(args.data_id, args.kwargs),
            default_data_id=self._serialize_default_data_id(),
            where=args.where,
            bind={k: make_column_literal(v) for k, v in args.bind.items()},
            limit=args.limit,
            with_dimension_records=args.with_dimension_records,
        )
        with self._connection.post_with_stream_response("query/all_datasets", request) as response:
            pages = read_query_results(response)
            yield (convert_dataset_ref_results(page, universe) for page in pages)

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def _normalize_collections(self, collections: CollectionArgType | None) -> CollectionList:
        """Convert the ``collections`` parameter from the format used by Butler
        methods to a standardized format for the REST API.
        """
        if collections is None:
            if not self.collections.defaults:
                raise NoDefaultCollectionError(
                    "No collections provided, and no defaults from butler construction."
                )
            collections = self.collections.defaults
        return convert_collection_arg_to_glob_string_list(collections)

    def clone(
        self,
        *,
        collections: CollectionArgType | None | EllipsisType = ...,
        run: str | None | EllipsisType = ...,
        inferDefaults: bool | EllipsisType = ...,
        dataId: dict[str, str] | EllipsisType = ...,
        metrics: ButlerMetrics | None = None,
    ) -> RemoteButler:
        defaults = self._registry_defaults.get().clone(collections, run, inferDefaults, dataId)
        return RemoteButler(
            connection=self._connection, cache=self._cache, defaults=defaults, metrics=metrics
        )

    def close(self) -> None:
        pass

    def _expand_data_ids(self, data_ids: Iterable[DataCoordinate]) -> list[DataCoordinate]:
        return expand_data_ids(data_ids, self.dimensions, self.query, None)

    @property
    def _file_transfer_source(self) -> RemoteFileTransferSource:
        return RemoteFileTransferSource(self._connection)

    def __str__(self) -> str:
        return f"RemoteButler({self._connection.server_url})"

    def _serialize_default_data_id(self) -> SerializedDataId:
        """Convert the default data ID to a serializable format."""
        # In an ideal world, the default data ID would just get combined with
        # the rest of the data ID on the client side instead of being sent
        # separately to the server. Unfortunately, that requires knowledge of
        # the DatasetType's dimensions which we don't always have available on
        # the client. Data ID values can be specified indirectly by "implied"
        # dimensions, but knowing what things are implied depends on what the
        # required dimensions are.
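        # Hypothetical illustration: in the default dimension universe a value
        # for "physical_filter" also determines the implied "band" dimension,
        # so whether a default data ID entry is relevant depends on the
        # dataset type's dimensions, which the server resolves.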

        return self._registry_defaults.get().dataId.to_simple(minimal=True).dataId


def _to_file_payload(get_file_response: GetFileResponseModel) -> FileInfoPayload:
    if get_file_response.artifact is None:
        ref = get_file_response.dataset_ref
        raise DatasetNotFoundError(f"Dataset is known, but artifact is not available. (datasetId='{ref.id}')")

    return get_file_response.artifact


def _to_uuid_string(id: uuid.UUID | str) -> str:
    """Convert a UUID, or string parseable as a UUID, into a string formatted
    like '1481269e-4c8d-4696-bcca-d1b4c9005d06'
    """
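    # For example, the docstring's UUID given as a bare hex string,
    # "1481269E4C8D4696BCCAD1B4C9005D06", normalizes to
    # "1481269e-4c8d-4696-bcca-d1b4c9005d06".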

    return str(uuid.UUID(str(id)))


class _RemoteButlerCacheData:
    def __init__(self) -> None:
        self.dimensions: DimensionUniverse | None = None
        self.dataset_types: dict[str, DatasetType] = {}


class RemoteButlerCache(LockedObject[_RemoteButlerCacheData]):
    def __init__(self) -> None:
        super().__init__(_RemoteButlerCacheData())