Coverage for python / lsst / daf / butler / remote_butler / _remote_file_transfer_source.py: 0%
43 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:16 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:16 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["RemoteFileTransferSource"]
32from collections.abc import Callable, Iterable, Iterator
33from contextlib import contextmanager
34from typing import Any, cast
36from lsst.daf.butler._dataset_ref import DatasetId, DatasetRef
37from lsst.resources import ResourcePath
38from lsst.resources.http import HttpResourcePath
39from lsst.utils.iteration import chunk_iterable
41from .._location import Location
42from ..datastore import FileTransferMap, FileTransferRecord, FileTransferSource
43from ..datastore.stored_file_info import StoredFileInfo
44from ._get import convert_http_url_to_resource_path
45from ._http_connection import RemoteButlerHttpConnection, parse_model
46from .server_models import (
47 FileTransferRecordModel,
48 GetFileTransferInfoRequestModel,
49 GetFileTransferInfoResponseModel,
50)
class RemoteFileTransferSource(FileTransferSource):
    """Implementation of `FileTransferSource` that retrieves information from
    Butler server.

    Parameters
    ----------
    connection : `RemoteButlerHttpConnection`
        HTTP connection used to access the Butler server.
    """

    def __init__(self, connection: RemoteButlerHttpConnection) -> None:
        self._connection = connection
        # Identifier built from the class name followed by the server URL.
        self.name = f"RemoteFileTransferSource{connection.server_url}"

    def get_file_info_for_transfer(self, dataset_ids: Iterable[DatasetId]) -> FileTransferMap:
        """Fetch file transfer records for the given datasets from the server.

        Parameters
        ----------
        dataset_ids : `~collections.abc.Iterable` [ `DatasetId` ]
            IDs of the datasets whose file information is requested.

        Returns
        -------
        transfer_map : `FileTransferMap`
            Mapping from each dataset ID returned by the server to the list of
            `FileTransferRecord` instances built from the server's response.
        """
        output: FileTransferMap = {}
        # Split the IDs into batches no larger than the request model's
        # per-request limit, issuing one POST per batch.
        for chunk in chunk_iterable(dataset_ids, GetFileTransferInfoRequestModel.MAX_ITEMS_PER_REQUEST):
            request = GetFileTransferInfoRequestModel(dataset_ids=chunk)
            response = self._connection.post("file_transfer", request)
            model = parse_model(response, GetFileTransferInfoResponseModel)
            # ``dataset_id`` rather than ``id`` so the builtin is not shadowed.
            for dataset_id, records in model.files.items():
                output[dataset_id] = [self._deserialize_file_transfer_record(r) for r in records]

        return output

    def locate_missing_files_for_transfer(
        self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool]
    ) -> FileTransferMap:
        """Return an empty mapping; no fallback lookup is available.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets whose files could not be found. Not used.
        artifact_existence : `dict` [ `ResourcePath`, `bool` ]
            Artifact existence cache. Not used and not modified.

        Returns
        -------
        transfer_map : `FileTransferMap`
            Always an empty `dict`.
        """
        # The server does not provide an alternate way to look up files that
        # could not be found using the file transfer endpoint.
        return {}

    def _deserialize_file_transfer_record(self, record: FileTransferRecordModel) -> FileTransferRecord:
        """Convert a server response record into a `FileTransferRecord`.

        Parameters
        ----------
        record : `FileTransferRecordModel`
            Record received from the server, providing ``url``, ``auth`` and
            ``file_info``.

        Returns
        -------
        transfer_record : `FileTransferRecord`
            Record whose location wraps the resource path derived from
            ``record.url`` and whose file info is rebuilt from
            ``record.file_info``.
        """
        resource_path = convert_http_url_to_resource_path(record.url, self._connection.auth, record.auth)
        # Looked up as a module global at call time so that unit tests can
        # swap in a replacement hook.
        resource_path = _tweak_uri_for_unit_test(resource_path)

        return FileTransferRecord(
            location=Location(None, resource_path),
            file_info=StoredFileInfo.from_simple(record.file_info),
        )
def _tweak_uri_for_unit_test(path: ResourcePath) -> ResourcePath:
    """Return ``path`` unchanged.

    This function exists as a hook point: unit tests replace it to modify
    URLs, since there is no actual HTTP server reachable via a domain name
    during testing.

    Parameters
    ----------
    path : `ResourcePath`
        Resource path about to be used for a file transfer.

    Returns
    -------
    path : `ResourcePath`
        The same object that was passed in.
    """
    return path
@contextmanager
def mock_file_transfer_uris_for_unit_test(
    callback: Callable[[HttpResourcePath], HttpResourcePath],
) -> Iterator[None]:
    """Hook into the RemoteButler file transfer logic to modify URLs for unit
    testing.

    While the context is active, the given callback replaces the module-level
    URL hook, so file download URLs are transformed by it before they are
    accessed. The previous hook is put back when the context exits, including
    when the body raises.

    Parameters
    ----------
    callback : `~collections.abc.Callable`
        A function that takes an `HttpResourcePath` as its only parameter and
        returns a modified `HttpResourcePath`.
    """
    global _tweak_uri_for_unit_test
    # Remember whatever hook is currently installed so it can be restored.
    previous_hook = _tweak_uri_for_unit_test
    # The callback is annotated for `HttpResourcePath` while the hook is
    # declared for `ResourcePath`; the cast silences the type checker.
    _tweak_uri_for_unit_test = cast(Any, callback)
    try:
        yield
    finally:
        _tweak_uri_for_unit_test = previous_hook