Coverage for python / lsst / daf / butler / remote_butler / _remote_file_transfer_source.py: 0%

43 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:36 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["RemoteFileTransferSource"] 

31 

32from collections.abc import Callable, Iterable, Iterator 

33from contextlib import contextmanager 

34from typing import Any, cast 

35 

36from lsst.daf.butler._dataset_ref import DatasetId, DatasetRef 

37from lsst.resources import ResourcePath 

38from lsst.resources.http import HttpResourcePath 

39from lsst.utils.iteration import chunk_iterable 

40 

41from .._location import Location 

42from ..datastore import FileTransferMap, FileTransferRecord, FileTransferSource 

43from ..datastore.stored_file_info import StoredFileInfo 

44from ._get import convert_http_url_to_resource_path 

45from ._http_connection import RemoteButlerHttpConnection, parse_model 

46from .server_models import ( 

47 FileTransferRecordModel, 

48 GetFileTransferInfoRequestModel, 

49 GetFileTransferInfoResponseModel, 

50) 

51 

52 

class RemoteFileTransferSource(FileTransferSource):
    """`FileTransferSource` that obtains file transfer information from a
    Butler server over HTTP.

    Parameters
    ----------
    connection : `RemoteButlerHttpConnection`
        HTTP connection used to access the Butler server.
    """

    def __init__(self, connection: RemoteButlerHttpConnection) -> None:
        self._connection = connection
        self.name = f"RemoteFileTransferSource{connection.server_url}"

    def get_file_info_for_transfer(self, dataset_ids: Iterable[DatasetId]) -> FileTransferMap:
        # The request model advertises a maximum number of IDs per request,
        # so the IDs are sent to the "file_transfer" endpoint in batches of
        # at most that size and the per-batch answers are merged.
        batch_size = GetFileTransferInfoRequestModel.MAX_ITEMS_PER_REQUEST
        transfer_map: FileTransferMap = {}
        for batch in chunk_iterable(dataset_ids, batch_size):
            payload = GetFileTransferInfoRequestModel(dataset_ids=batch)
            reply = self._connection.post("file_transfer", payload)
            parsed = parse_model(reply, GetFileTransferInfoResponseModel)
            transfer_map.update(
                {
                    dataset_id: [self._deserialize_file_transfer_record(item) for item in record_models]
                    for dataset_id, record_models in parsed.files.items()
                }
            )

        return transfer_map

    def locate_missing_files_for_transfer(
        self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool]
    ) -> FileTransferMap:
        # The server does not provide an alternate way to look up files that
        # could not be found using the file transfer endpoint, so there is
        # never anything additional to report.
        nothing_found: FileTransferMap = {}
        return nothing_found

    def _deserialize_file_transfer_record(self, record: FileTransferRecordModel) -> FileTransferRecord:
        """Convert a record model received from the server into a
        `FileTransferRecord`.

        Parameters
        ----------
        record : `FileTransferRecordModel`
            Record model as parsed from the server response.

        Returns
        -------
        transfer_record : `FileTransferRecord`
            Record whose location wraps the resource path built from the
            record's URL and whose file info is deserialized from the
            record's ``file_info``.
        """
        # Build the resource path from the URL, then pass it through the
        # module-level hook that unit tests may replace to rewrite URLs.
        uri = _tweak_uri_for_unit_test(
            convert_http_url_to_resource_path(record.url, self._connection.auth, record.auth)
        )
        location = Location(None, uri)
        file_info = StoredFileInfo.from_simple(record.file_info)
        return FileTransferRecord(location=location, file_info=file_info)

93 

94 

def _tweak_uri_for_unit_test(path: ResourcePath) -> ResourcePath:
    """Return ``path`` unchanged.

    This is a place for unit tests to hook in and modify URLs, since there
    is no actual HTTP server reachable via a domain name during testing.

    Parameters
    ----------
    path : `lsst.resources.ResourcePath`
        Resource path for a file to be transferred.

    Returns
    -------
    path : `lsst.resources.ResourcePath`
        The same object that was passed in.
    """
    return path

99 

100 

@contextmanager
def mock_file_transfer_uris_for_unit_test(
    callback: Callable[[HttpResourcePath], HttpResourcePath],
) -> Iterator[None]:
    """Hook into the RemoteButler file transfer logic to modify URLs for
    unit testing.

    While the context is active, the given callback is used to transform
    file download URLs before attempting to access them.  The hook that was
    installed before entering the context is put back on exit, including
    when the body raises.

    Parameters
    ----------
    callback : `~collections.abc.Callable`
        A function that takes an `HttpResourcePath` as its only parameter and
        returns a modified `HttpResourcePath`.
    """
    global _tweak_uri_for_unit_test
    # Remember whichever hook is currently installed so it can be restored.
    previous_hook = _tweak_uri_for_unit_test
    # The callback's signature is narrower than the hook's, hence the cast.
    _tweak_uri_for_unit_test = cast(Any, callback)
    try:
        yield
    finally:
        _tweak_uri_for_unit_test = previous_hook