Coverage for python / lsst / resources / _resourceHandles / _httpResourceHandle.py: 23%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:38 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("HttpReadResourceHandle",) 

15 

16import io 

17import logging 

18import re 

19from collections.abc import Callable, Iterable 

20from typing import TYPE_CHECKING, AnyStr, NamedTuple 

21 

22import requests 

23 

24from lsst.utils.timer import time_this 

25 

26from ._baseResourceHandle import BaseResourceHandle, CloseStatus 

27 

28if TYPE_CHECKING: 

29 from ..http import HttpResourcePath 

30 

31 

32# Prevent circular import by copying this code. Can be removed as soon 

33# as separate dav implementation is implemented. 

def _dav_to_http(url: str) -> str:
    """Swap a leading ``dav`` in a URL for ``http``.

    Parameters
    ----------
    url : `str`
        URL to convert.

    Returns
    -------
    url : `str`
        The input with its leading ``dav`` replaced by ``http`` (so
        ``dav://`` becomes ``http://`` and ``davs://`` becomes ``https://``).
        A string that does not start with ``dav`` is returned unchanged.
    """
    dav_prefix = "dav"
    if not url.startswith(dav_prefix):
        return url
    # Keep everything after the prefix so a trailing "s" (davs -> https)
    # survives the substitution.
    return "http" + url[len(dav_prefix) :]

39 

40 

class HttpReadResourceHandle(BaseResourceHandle[bytes]):
    """HTTP-based specialization of `.BaseResourceHandle`.

    Parameters
    ----------
    mode : `str`
        Handle modes as described in the python `io` module.
    log : `~logging.Logger`
        Logger to use when writing messages.
    uri : `lsst.resources.http.HttpResourcePath`
        URI of remote resource.
    timeout : `tuple` [`float`, `float`]
        Timeout to use for connections: connection timeout and read timeout
        in a tuple. Although the signature defaults this to `None`, a value
        must be supplied; `ValueError` is raised otherwise.
    newline : `str` or `None`, optional
        When doing multiline operations, break the stream on given character.
        Defaults to newline. If a file is opened in binary mode, this argument
        is not used, as binary files will only split lines on the binary
        newline representation.

    Notes
    -----
    The handle is read-only. A ``read()`` of the whole resource from
    position zero downloads it once and serves later reads from an
    in-memory buffer; any other read issues an HTTP ``Range`` request for
    just the bytes required.
    """

    def __init__(
        self,
        mode: str,
        log: logging.Logger,
        uri: HttpResourcePath,
        *,
        timeout: tuple[float, float] | None = None,
        newline: AnyStr | None = None,
    ) -> None:
        super().__init__(mode, log, uri, newline=newline)
        self._url = uri.geturl()
        self._session = uri.data_session

        if timeout is None:
            raise ValueError("timeout must be specified when constructing this object")
        self._timeout = timeout

        # Holds the entire resource once it has been downloaded in one go.
        self._completeBuffer: io.BytesIO | None = None

        self._closed = CloseStatus.OPEN
        self._current_position = 0
        self._eof = False
        self._total_size = -1  # Unknown

    def close(self) -> None:
        """Mark the handle closed and release any cached content."""
        self._closed = CloseStatus.CLOSED
        self._completeBuffer = None
        self._eof = True

    @property
    def closed(self) -> bool:
        """Whether `close` has been called (`bool`)."""
        return self._closed == CloseStatus.CLOSED

    def fileno(self) -> int:
        """Always raise; there is no underlying file descriptor.

        Raises
        ------
        io.UnsupportedOperation
            Always raised.
        """
        raise io.UnsupportedOperation("HttpReadResourceHandle does not have a file number")

    def flush(self) -> None:
        """Do nothing for a read handle.

        Raises
        ------
        io.UnsupportedOperation
            Raised if the mode string requests any kind of write access.
        """
        modes = set(self._mode)
        if {"w", "x", "a", "+"} & modes:
            raise io.UnsupportedOperation("HttpReadResourceHandles are read only")

    @property
    def isatty(self) -> bool | Callable[[], bool]:
        """Always `False`; exposed as a property rather than a method."""
        return False

    def readable(self) -> bool:
        """Return `True`; this handle supports reading."""
        return True

    def readline(self, size: int = -1) -> bytes:
        """Always raise; line-oriented reading is not supported.

        Raises
        ------
        io.UnsupportedOperation
            Always raised.
        """
        raise io.UnsupportedOperation("HttpReadResourceHandles Do not support line by line reading")

    def readlines(self, size: int = -1) -> Iterable[bytes]:
        """Always raise; line-oriented reading is not supported.

        Raises
        ------
        io.UnsupportedOperation
            Always raised.
        """
        raise io.UnsupportedOperation("HttpReadResourceHandles Do not support line by line reading")

    def _size(self) -> int:
        """Return the total size of the resource in bytes.

        The value is obtained from the URI on first use and cached; it may
        also have been filled in from a ``Content-Range`` response header
        by an earlier `read`.
        """
        if self._total_size == -1:
            self._total_size = self._uri.size()
        return self._total_size

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """Move the read position.

        Parameters
        ----------
        offset : `int`
            Offset in bytes, interpreted relative to ``whence``.
        whence : `int`, optional
            One of `io.SEEK_SET`, `io.SEEK_CUR` or `io.SEEK_END`.

        Returns
        -------
        position : `int`
            New absolute position.

        Raises
        ------
        io.UnsupportedOperation
            Raised if ``whence`` is not recognized or the resulting position
            would be negative. The handle state is left untouched in that
            case.
        """
        if whence == io.SEEK_SET:
            new_position = offset
        elif whence == io.SEEK_CUR:
            new_position = self._current_position + offset
        elif whence == io.SEEK_END:
            new_position = self._size() + offset
        else:
            raise io.UnsupportedOperation("Seek value is incorrect, or whence mode is unsupported")

        # A negative position is never valid. Left unchecked it would end up
        # in a "bytes=-N-" Range header, which a server reads as a request
        # for the last N bytes.
        if new_position < 0:
            raise io.UnsupportedOperation("Seek value is incorrect, or whence mode is unsupported")

        self._eof = False
        self._current_position = new_position

        # Keep the cached copy (if the complete file has been read already)
        # in step. The position is already absolute, so it must not be
        # combined with the caller's whence a second time.
        if self._completeBuffer is not None:
            self._completeBuffer.seek(new_position)
        return new_position

    def seekable(self) -> bool:
        """Return `True`; this handle supports `seek`."""
        return True

    def tell(self) -> int:
        """Return the current absolute read position."""
        return self._current_position

    def truncate(self, size: int | None = None) -> int:
        """Always raise; truncation is not supported.

        Raises
        ------
        io.UnsupportedOperation
            Always raised.
        """
        raise io.UnsupportedOperation("HttpReadResourceHandles Do not support truncation")

    def writable(self) -> bool:
        """Return `False`; this handle is read-only."""
        return False

    def write(self, b: bytes, /) -> int:
        """Always raise; this handle is read-only.

        Raises
        ------
        io.UnsupportedOperation
            Always raised.
        """
        raise io.UnsupportedOperation("HttpReadResourceHandles are read only")

    def writelines(self, b: Iterable[bytes], /) -> None:
        """Always raise; this handle is read-only.

        Raises
        ------
        io.UnsupportedOperation
            Always raised.
        """
        raise io.UnsupportedOperation("HttpReadResourceHandles are read only")

    def read(self, size: int = -1) -> bytes:
        """Read bytes starting at the current position.

        Parameters
        ----------
        size : `int`, optional
            Maximum number of bytes to read. A negative value reads to the
            end of the resource.

        Returns
        -------
        data : `bytes`
            The bytes read. Empty if the handle is at EOF or ``size`` is
            zero.

        Raises
        ------
        FileNotFoundError
            Raised if the server responds with a status other than 200, 206
            or (for ranged reads) 416.
        ValueError
            Raised if the server sends a ``Content-Range`` header that can
            not be parsed.
        """
        if self._eof or size == 0:
            # At EOF so always return an empty byte string. A zero-length
            # read is also answered locally: it can not be expressed as a
            # valid byte range (the end would precede the start).
            return b""

        # branch for if the complete file has been read before
        if self._completeBuffer is not None:
            result = self._completeBuffer.read(size)
            self._current_position += len(result)
            return result

        if size == -1 and self._current_position == 0:
            # The whole file has been requested, read it into a buffer and
            # return the result
            with time_this(self._log, msg="Read from remote resource %s", args=(self._url,)):
                with self._session as session:
                    resp = session.get(_dav_to_http(self._url), stream=False, timeout=self._timeout)

            if (code := resp.status_code) not in (requests.codes.ok, requests.codes.partial):
                raise FileNotFoundError(f"Unable to read resource {self._url}; status code: {code}")

            # Only install the cache once the download is known to have
            # succeeded; otherwise a failed request would leave an empty
            # buffer behind and later reads would silently return nothing.
            content = resp.content
            buffer = io.BytesIO(content)
            buffer.seek(0, io.SEEK_END)
            self._completeBuffer = buffer
            self._current_position = buffer.tell()

            return content

        # A partial read is required, either because a size has been specified,
        # or a read has previously been done. Any time we specify a byte range
        # we must disable the gzip compression on the server since we want
        # to address ranges in the uncompressed file. If we send ranges that
        # are interpreted by the server as offsets into the compressed file
        # then that is at least confusing and also there is no guarantee that
        # the bytes can be uncompressed.

        # An open-ended range ("bytes=N-") is used when no size was given.
        end_pos = self._current_position + (size - 1) if size >= 0 else ""
        headers = {"Range": f"bytes={self._current_position}-{end_pos}", "Accept-Encoding": "identity"}

        with time_this(
            self._log, msg="Read from remote resource %s using headers %s", args=(self._url, headers)
        ):
            with self._session as session:
                resp = session.get(
                    _dav_to_http(self._url), stream=False, timeout=self._timeout, headers=headers
                )

        if resp.status_code == requests.codes.range_not_satisfiable:
            # Must have run off the end of the file. A standard file handle
            # will treat this as EOF so be consistent with that. Do not change
            # the current position.
            self._eof = True
            return b""

        if (code := resp.status_code) not in (requests.codes.ok, requests.codes.partial):
            raise FileNotFoundError(
                f"Unable to read resource {self._url}, or bytes are out of range; status code: {code}"
            )

        # The response header should tell us the total number of bytes
        # in the file and also the current position we have got to in the
        # server.
        if "Content-Range" in resp.headers:
            content_range = parse_content_range_header(resp.headers["Content-Range"])
            if content_range.total is not None:
                # Store in case we need this later.
                self._total_size = content_range.total
            if (
                content_range.total is not None
                and content_range.range_end is not None
                and content_range.range_end >= content_range.total - 1
            ):
                self._eof = True

        # Try to guess that we overran the end. This will not help if we
        # read exactly the number of bytes to get us to the end and so we
        # will need to do one more read and get a 416.
        len_content = len(resp.content)
        if len_content < size:
            self._eof = True

        self._current_position += len_content
        return resp.content

236 

237 

class ContentRange(NamedTuple):
    """Byte range and total size carried by an HTTP Content-Range header."""

    range_start: int | None
    """Zero-based, inclusive index of the first byte returned in the
    response, or `None` when the header did not include a range.
    """
    range_end: int | None
    """Zero-based, inclusive index of the last byte returned in the
    response, or `None` when the header did not include a range.
    """
    total: int | None
    """Size of the complete file in bytes, or `None` when the header did
    not report it.
    """

253 

254 

def parse_content_range_header(header: str) -> ContentRange:
    """Parse an HTTP 'Content-Range' header.

    Parameters
    ----------
    header : `str`
        Value of an HTTP Content-Range header to be parsed.

    Returns
    -------
    content_range : `ContentRange`
        The byte range included in the response and the total file size.

    Raises
    ------
    ValueError
        If the header was not in the expected format.

    Notes
    -----
    Three forms are accepted, each optionally surrounded by whitespace:

    - ``bytes <range-start>-<range-end>/<size>``
    - ``bytes <range-start>-<range-end>/*``
    - ``bytes */<size>``

    The unit name is matched case-insensitively. The entire value has to
    match, so anything trailing a well-formed prefix (for example
    ``bytes 0-9/100xyz``) is rejected rather than silently ignored.
    """
    # One pattern covers all three forms: either an explicit range or "*"
    # before the slash, and either a size or "*" after it.
    pattern = r"\s*bytes\s+(?:(?P<start>\d+)-(?P<end>\d+)|\*)/(?:(?P<total>\d+)|\*)\s*"
    found = re.fullmatch(pattern, header, flags=re.IGNORECASE)
    if found is None:
        raise ValueError(f"Content-Range header in unexpected format: '{header}'")

    start, end, total = found.group("start", "end", "total")
    if start is None and total is None:
        # "*/*" satisfies the pattern but carries no information and is not
        # one of the supported forms.
        raise ValueError(f"Content-Range header in unexpected format: '{header}'")

    # The start and end groups belong to the same alternative, so they are
    # either both present or both absent.
    return ContentRange(
        range_start=None if start is None else int(start),
        range_end=None if end is None else int(end),
        total=None if total is None else int(total),
    )