Coverage for python / lsst / resources / _resourceHandles / _davResourceHandle.py: 24%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:38 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("DavReadResourceHandle",) 

15 

16import io 

17import logging 

18from collections.abc import Callable, Iterable 

19from typing import TYPE_CHECKING, AnyStr 

20 

21from ..davutils import DavClient, redact_url 

22from ._baseResourceHandle import BaseResourceHandle, CloseStatus 

23 

24if TYPE_CHECKING: 

25 from ..dav import DavResourcePath 

26 

27 

28class DavReadResourceHandle(BaseResourceHandle[bytes]): 

29 """WebDAV-based specialization of `.BaseResourceHandle`. 

30 

31 Parameters 

32 ---------- 

33 mode : `str` 

34 Handle modes as described in the python `io` module. 

35 log : `~logging.Logger` 

36 Logger to used when writing messages. 

37 uri : `lsst.resources.dav.DavResourcePath` 

38 URI of remote resource. 

39 file_size : `int` 

40 Size of the remote file, in bytes. 

41 newline : `str` or `None`, optional 

42 When doing multiline operations, break the stream on given character. 

43 Defaults to newline. If a file is opened in binary mode, this argument 

44 is not used, as binary files will only split lines on the binary 

45 newline representation. 

46 """ 

47 

48 def __init__( 

49 self, 

50 mode: str, 

51 log: logging.Logger, 

52 uri: DavResourcePath, 

53 file_size: int, 

54 *, 

55 newline: AnyStr | None = None, 

56 ) -> None: 

57 super().__init__(mode, log, uri, newline=newline) 

58 self._uri: DavResourcePath = uri 

59 self._client: DavClient = self._uri._client 

60 self._filesize: int = file_size 

61 self._closed = CloseStatus.OPEN 

62 self._current_position = 0 

63 self._cache: DavReadAheadCache = DavReadAheadCache( 

64 client=self._client, 

65 url=self._uri._internal_url, 

66 filesize=self._filesize, 

67 blocksize=self._uri._client._config.block_size, 

68 log=log, 

69 ) 

70 self._log.debug("initializing read handle for %s [%#x]", self._uri, id(self)) 

71 

72 def close(self) -> None: 

73 if self._closed != CloseStatus.CLOSED: 

74 self._log.debug("closing read handle for %s [%#x]", self._uri, id(self)) 

75 self._cache.close() 

76 self._closed = CloseStatus.CLOSED 

77 

78 @property 

79 def closed(self) -> bool: 

80 return self._closed == CloseStatus.CLOSED 

81 

82 def fileno(self) -> int: 

83 raise io.UnsupportedOperation("DavReadResourceHandle does not have a file number") 

84 

85 def flush(self) -> None: 

86 modes = set(self._mode) 

87 if {"w", "x", "a", "+"} & modes: 

88 raise io.UnsupportedOperation("DavReadResourceHandles are read only") 

89 

90 @property 

91 def isatty(self) -> bool | Callable[[], bool]: 

92 return False 

93 

94 def readable(self) -> bool: 

95 return True 

96 

97 def readline(self, size: int = -1) -> bytes: 

98 raise io.UnsupportedOperation("DavReadResourceHandles Do not support line by line reading") 

99 

100 def readlines(self, hint: int = -1) -> Iterable[bytes]: 

101 raise io.UnsupportedOperation("DavReadResourceHandles Do not support line by line reading") 

102 

103 def seek(self, offset: int, whence: int = io.SEEK_SET) -> int: 

104 self._log.debug( 

105 "handle seek for %s: offset=%d, whence=%d, current_position=%d", 

106 self._uri, 

107 offset, 

108 whence, 

109 self._current_position, 

110 ) 

111 

112 match whence: 

113 case io.SEEK_SET: 

114 if offset < 0: 

115 raise ValueError(f"negative seek value {offset}") 

116 self._current_position = offset 

117 case io.SEEK_CUR: 

118 self._current_position += offset 

119 case io.SEEK_END: 

120 self._current_position = self._filesize + offset 

121 case _: 

122 raise ValueError(f"unexpected value {whence} for whence in seek()") 

123 

124 if self._current_position < 0: 

125 self._current_position = 0 

126 

127 return self._current_position 

128 

129 def seekable(self) -> bool: 

130 return True 

131 

132 def tell(self) -> int: 

133 return self._current_position 

134 

135 def truncate(self, size: int | None = None) -> int: 

136 raise io.UnsupportedOperation("DavReadResourceHandles Do not support truncation") 

137 

138 def writable(self) -> bool: 

139 return False 

140 

141 def write(self, b: bytes, /) -> int: 

142 raise io.UnsupportedOperation("DavReadResourceHandles are read only") 

143 

144 def writelines(self, b: Iterable[bytes], /) -> None: 

145 raise io.UnsupportedOperation("DavReadResourceHandles are read only") 

146 

147 @property 

148 def _eof(self) -> bool: 

149 return self._current_position >= self._filesize 

150 

151 def read(self, size: int = -1) -> bytes: 

152 self._log.debug( 

153 "handle read for %s: filesize=%d, current_position=%d, size=%d", 

154 self._uri, 

155 self._filesize, 

156 self._current_position, 

157 size, 

158 ) 

159 

160 if self.closed: 

161 raise ValueError("I/O operation on closed file") 

162 

163 if size == 0 or self._eof: 

164 return b"" 

165 

166 if size < 0: 

167 # Read up to the end of the file 

168 size = self._filesize - self._current_position 

169 

170 output = self._cache.fetch(start=self._current_position, end=self._current_position + size) 

171 self._current_position += len(output) 

172 

173 self._log.debug("returning %d bytes from handle read for %s", len(output), self._uri) 

174 

175 return output 

176 

177 def readinto(self, output: bytearray) -> int: 

178 """Read up to `len(output)` bytes into `output` and return the number 

179 of bytes read. 

180 

181 Parameters 

182 ---------- 

183 output : `bytearray` 

184 Byte array to write output into. 

185 """ 

186 if self._eof or len(output) == 0: 

187 return 0 

188 

189 data = self.read(len(output)) 

190 output[:] = data 

191 return len(data) 

192 

193 

class DavReadAheadCache:
    """Helper read-ahead cache for fetching chunks of a DavResourceHandle.

    Parameters
    ----------
    client : `lsst.resources.davutils.DavClient`
        The webDAV client to interact with the server to download data.
    url : `str`
        URL of the resource to download data from.
    filesize : `int`
        Size in bytes of the remote file.
    blocksize : `int`
        Size in bytes of the block for this resource. This is the size we use
        to retrieve data from this resource.
    log : `logging.Logger`
        Logger object to emit log records.

    Notes
    -----
    The cache keeps a single contiguous in-memory buffer holding the bytes
    ``[_start, _end)`` of the remote file. A request that falls entirely
    inside that range is served from memory. Any other request replaces the
    buffer with a freshly downloaded range that covers it. That range is a
    whole number of blocks long when the file is large enough.

    Behavior of this cache is inspired from fsspec's ReadAheadCache class.
    https://github.com/fsspec/filesystem_spec/blob/master/fsspec/caching.py
    """

    def __init__(
        self, client: DavClient, url: str, filesize: int, blocksize: int, log: logging.Logger
    ) -> None:
        self._client: DavClient = client
        self._url: str = url
        # URL used for range reads. It starts as the resource URL and is
        # replaced by the URL returned by each read_range() call; close()
        # sends its request to this URL.
        self._backend_url: str = url
        self._filesize: int = filesize
        self._blocksize: int = blocksize
        # Cached bytes, corresponding to offsets [_start, _end) of the file.
        self._cache: bytes = b""
        self._start: int = 0
        self._end: int = 0
        self._log: logging.Logger = log

    def geturl(self) -> str:
        """Return the resource URL passed through `redact_url`, for logging."""
        return redact_url(self._url)

    def fetch(self, start: int, end: int) -> bytes:
        """Fetch a chunk of the file and store it in memory.

        Parameters
        ----------
        start : `int`
            Offset of the first byte of the chunk (inclusive). Negative values
            are treated as 0.
        end : `int`
            Offset one past the last byte of the chunk (exclusive). Values
            beyond the end of the file are clamped to the file size.

        Returns
        -------
        output: `bytes`
            A chunk of up to end-start bytes. The returned chunk is
            served directly from the in-memory buffer without fetching new
            data from the remote file if it is already cached. Otherwise,
            a new chunk is retrieved from the origin file server and cached
            in memory. The size of the chunk can be the configured block
            size for this particular kind of resource path or the remaining
            bytes in the file. An empty result is returned if the requested
            range is empty or starts at or beyond the end of the file.
        """
        self._log.debug(
            "DavReadAheadCache.fetch: %s start=%d end=%d [total: %d]",
            self.geturl(),
            start,
            end,
            end - start,
        )
        start = max(0, start)
        end = min(end, self._filesize)
        if start >= self._filesize or start >= end:
            return b""

        if start >= self._start and end <= self._end:
            # The requested chunk is entirely cached
            return self._cache[start - self._start : end - self._start]

        # The requested chunk is not fully in cache. Repopulate the cache
        # with a number of blocks large enough to satisfy the requested chunk.
        blocks_to_fetch = 1 + ((end - start) // self._blocksize)
        bytes_to_fetch = self._blocksize * blocks_to_fetch
        end_range = min(self._filesize, start + bytes_to_fetch)
        # Near the end of the file, shift the window backwards so that a
        # full-size range is still downloaded (but never before offset 0).
        # The window always covers [start, end) because
        # bytes_to_fetch > end - start.
        start_range = max(0, end_range - bytes_to_fetch)

        self._log.debug(
            "populating handle cache for %s with %d blocks [%d - %d, total bytes: %d]",
            self.geturl(),
            blocks_to_fetch,
            start_range,
            end_range,
            end_range - start_range,
        )

        # Read the requested range. read_range() is given an inclusive end
        # offset (hence "- 1"). NOTE(review): release_backend=False presumably
        # keeps the backend connection for later partial reads; close() asks
        # the server to drop it. Confirm against DavClient.
        self._backend_url, self._cache = self._client.read_range(
            self._backend_url, start=start_range, end=end_range - 1, release_backend=False
        )
        self._start = start_range
        # Use the number of bytes actually received: if the server returned
        # fewer than requested, the slice below simply yields a shorter chunk.
        self._end = self._start + len(self._cache)
        return self._cache[start - self._start : end - self._start]

    def close(self) -> None:
        """Drop the cached data and ask the server to close the connection."""
        # Release the in-memory buffer first. After a large read it may hold a
        # copy of a large part of the remote file, and the owning handle object
        # can outlive the call to close().
        self._cache = b""
        self._start = 0
        self._end = 0

        # Send a HEAD request to the backend server and ask it to close this
        # connection. This signals the server that we no longer need it for
        # serving partial reads for this handle. We can ignore its response.
        self._client._request("HEAD", self._backend_url, headers={"Connection": "close"}, redirect=False)