Coverage for python / lsst / resources / proxied.py: 37%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:44 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("ProxiedResourcePath",) 

15 

16import contextlib 

17import dataclasses 

18import logging 

19import re 

20from abc import ABC, abstractmethod 

21from collections.abc import Iterator 

22 

23from ._resourcePath import ResourceHandleProtocol, ResourceInfo, ResourcePath, ResourcePathExpression 

24from .utils import TransactionProtocol 

25 

26try: 

27 import fsspec 

28 from fsspec.spec import AbstractFileSystem 

29except ImportError: 

30 fsspec = None 

31 AbstractFileSystem = type 

32 

33 

34log = logging.getLogger(__name__) 

35 

36 

class ProxiedResourcePath(ABC, ResourcePath):
    """URI that is represented internally by another type of URI for file I/O.

    For example ``abc://xyz/file.txt`` could be the public URI form but
    internally all file access is forwarded to a ``file`` URI.

    Notes
    -----
    Every I/O method defined here looks up the proxy with `_get_proxy` and
    forwards the call to it. Methods that do not handle a missing proxy
    themselves therefore raise `FileNotFoundError` when none is available.
    """

    # Internal ResourcePath that performs the real I/O. `None` means no proxy
    # is available; `_get_proxy` reads this attribute.
    _proxy: ResourcePath | None = None

    @abstractmethod
    def _set_proxy(self) -> None:
        """Calculate the internal `ResourcePath` corresponding to the public
        version.

        Notes
        -----
        Must be implemented by subclasses. Implementations presumably store
        the result in ``_proxy`` since that is what `_get_proxy` reads;
        confirm against the concrete subclasses.
        """
        raise NotImplementedError("Proxy must be configured")

    def _get_proxy(self) -> ResourcePath:
        """Retrieve the proxied ResourcePath.

        Returns
        -------
        proxy : `ResourcePath`
            The value currently stored in ``_proxy``.

        Raises
        ------
        FileNotFoundError
            Raised if ``_proxy`` is `None`.
        """
        proxy = self._proxy
        if proxy is None:
            raise FileNotFoundError(f"Internal error: No proxy ResourcePath available for {self}")
        return proxy

    def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
        """Return the result of calling ``to_fsspec`` on the proxy.

        Returns
        -------
        result : `tuple`
            Whatever the proxy's ``to_fsspec`` returns.

        Raises
        ------
        NotImplementedError
            Raised if no proxy is available.
        """
        try:
            proxy = self._get_proxy()
        except FileNotFoundError:
            # Report the missing proxy as an unsupported operation and hide
            # the internal FileNotFoundError from the traceback.
            raise NotImplementedError(f"No proxy registered for {self}. Resource does not exist.") from None
        return proxy.to_fsspec()

    def isdir(self) -> bool:
        """Report whether this URI refers to a directory.

        Returns
        -------
        isdir : `bool`
            The value of ``dirLike`` if it is already set. Otherwise the
            proxy's answer, which is also stored in ``dirLike``. `False` is
            returned, without being stored, if no proxy is available.
        """
        if self.dirLike is None:
            try:
                proxy = self._get_proxy()
            except FileNotFoundError:
                return False
            self.dirLike = proxy.isdir()
        return self.dirLike

    def exists(self) -> bool:
        """Report whether the proxied resource exists.

        Returns
        -------
        exists : `bool`
            The proxy's ``exists()`` result, or `False` if no proxy is
            available.
        """
        try:
            proxy = self._get_proxy()
        except FileNotFoundError:
            # If there is no proxy registered then the resource can not exist.
            return False
        return proxy.exists()

    def remove(self) -> None:
        """Forward ``remove()`` to the proxy.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        proxy.remove()

    def read(self, size: int = -1) -> bytes:
        """Forward ``read()`` to the proxy.

        Parameters
        ----------
        size : `int`, optional
            Passed unchanged to the proxy's ``read`` as ``size``.

        Returns
        -------
        data : `bytes`
            Whatever the proxy's ``read`` returns.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        return proxy.read(size=size)

    @contextlib.contextmanager
    def as_local(
        self, multithreaded: bool = True, tmpdir: ResourcePathExpression | None = None
    ) -> Iterator[ResourcePath]:
        """Enter the proxy's ``as_local`` context and yield its result.

        Parameters
        ----------
        multithreaded : `bool`, optional
            Passed unchanged to the proxy's ``as_local``.
        tmpdir : `ResourcePathExpression` or `None`, optional
            Passed unchanged to the proxy's ``as_local``.

        Yields
        ------
        local : `ResourcePath`
            The object yielded by the proxy's ``as_local`` context manager.

        Raises
        ------
        FileNotFoundError
            Raised on entry if no proxy is available.
        """
        proxy = self._get_proxy()
        with proxy.as_local(multithreaded=multithreaded, tmpdir=tmpdir) as loc:
            yield loc

    @contextlib.contextmanager
    def open(
        self,
        mode: str = "r",
        *,
        encoding: str | None = None,
        prefer_file_temporary: bool = False,
    ) -> Iterator[ResourceHandleProtocol]:
        """Enter the proxy's ``open`` context and yield its handle.

        Parameters
        ----------
        mode : `str`, optional
            Passed unchanged to the proxy's ``open``.
        encoding : `str` or `None`, optional
            Passed unchanged to the proxy's ``open``.
        prefer_file_temporary : `bool`, optional
            Passed unchanged to the proxy's ``open``.

        Yields
        ------
        handle : `ResourceHandleProtocol`
            The handle yielded by the proxy's ``open`` context manager.

        Raises
        ------
        FileNotFoundError
            Raised on entry if no proxy is available.
        """
        proxy = self._get_proxy()
        with proxy.open(mode, encoding=encoding, prefer_file_temporary=prefer_file_temporary) as fh:
            yield fh

    def walk(
        self, file_filter: str | re.Pattern | None = None
    ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]:
        """Walk the proxy and report each root in the public URI form.

        Parameters
        ----------
        file_filter : `str`, `re.Pattern` or `None`, optional
            Passed unchanged to the proxy's ``walk``.

        Yields
        ------
        root : `ResourcePath`
            Copy of this URI whose path is this URI's path joined with the
            part of the proxied root that lies below the proxy's path.
        dirs : `list` [`str`]
            Directory names as reported by the proxy's ``walk``.
        files : `list` [`str`]
            File names as reported by the proxy's ``walk``.

        Raises
        ------
        ValueError
            Raised if no proxy is available.
        """
        try:
            proxy = self._get_proxy()
        except FileNotFoundError as e:
            raise ValueError(str(e)) from None
        for proxied_root, dirs, files in proxy.walk(file_filter=file_filter):
            # Need to return the directory in the original form and not the
            # proxy form.
            relative_to_self = proxied_root.path.removeprefix(proxy.path)
            # If the proxy path has no trailing "/" the remainder starts with
            # "/". A posixpath-style join treats such a component as absolute
            # and would discard self.path, so force the remainder to be
            # relative before joining.
            relative_to_self = relative_to_self.lstrip("/")
            root = self.replace(path=self._pathModule.join(self.path, relative_to_self))
            yield root, dirs, files

    def size(self) -> int:
        """Return the proxy's ``size()``.

        Returns
        -------
        size : `int`
            Whatever the proxy's ``size`` returns.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        return proxy.size()

    def get_info(self) -> ResourceInfo:
        """Return the proxy's resource information labelled with this URI.

        Returns
        -------
        info : `ResourceInfo`
            Copy of the proxy's ``get_info()`` result with its ``uri`` field
            replaced by ``str(self)``.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        return dataclasses.replace(proxy.get_info(), uri=str(self))

    def write(self, data: bytes, overwrite: bool = True) -> None:
        """Forward ``write()`` to the proxy.

        Parameters
        ----------
        data : `bytes`
            Passed unchanged to the proxy's ``write``.
        overwrite : `bool`, optional
            Passed unchanged to the proxy's ``write``.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        proxy.write(data, overwrite=overwrite)

    def mkdir(self) -> None:
        """Forward ``mkdir()`` to the proxy.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        proxy.mkdir()

    def transfer_from(
        self,
        src: ResourcePath,
        transfer: str = "copy",
        overwrite: bool = False,
        transaction: TransactionProtocol | None = None,
        multithreaded: bool = True,
    ) -> None:
        """Forward ``transfer_from()`` to the proxy.

        Parameters
        ----------
        src : `ResourcePath`
            Passed unchanged to the proxy's ``transfer_from``.
        transfer : `str`, optional
            Passed unchanged to the proxy's ``transfer_from``.
        overwrite : `bool`, optional
            Passed unchanged to the proxy's ``transfer_from``.
        transaction : `TransactionProtocol` or `None`, optional
            Passed unchanged to the proxy's ``transfer_from``.
        multithreaded : `bool`, optional
            Passed unchanged to the proxy's ``transfer_from``.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        proxy.transfer_from(
            src, transfer=transfer, overwrite=overwrite, transaction=transaction, multithreaded=multithreaded
        )