Coverage for python / lsst / resources / _resourceHandles / _davResourceHandle.py: 24%
122 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:32 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("DavReadResourceHandle",)
16import io
17import logging
18from collections.abc import Callable, Iterable
19from typing import TYPE_CHECKING, AnyStr
21from ..davutils import DavClient, redact_url
22from ._baseResourceHandle import BaseResourceHandle, CloseStatus
24if TYPE_CHECKING:
25 from ..dav import DavResourcePath
28class DavReadResourceHandle(BaseResourceHandle[bytes]):
29 """WebDAV-based specialization of `.BaseResourceHandle`.
31 Parameters
32 ----------
33 mode : `str`
34 Handle modes as described in the python `io` module.
35 log : `~logging.Logger`
36 Logger to used when writing messages.
37 uri : `lsst.resources.dav.DavResourcePath`
38 URI of remote resource.
39 file_size : `int`
40 Size of the remote file, in bytes.
41 newline : `str` or `None`, optional
42 When doing multiline operations, break the stream on given character.
43 Defaults to newline. If a file is opened in binary mode, this argument
44 is not used, as binary files will only split lines on the binary
45 newline representation.
46 """
48 def __init__(
49 self,
50 mode: str,
51 log: logging.Logger,
52 uri: DavResourcePath,
53 file_size: int,
54 *,
55 newline: AnyStr | None = None,
56 ) -> None:
57 super().__init__(mode, log, uri, newline=newline)
58 self._uri: DavResourcePath = uri
59 self._client: DavClient = self._uri._client
60 self._filesize: int = file_size
61 self._closed = CloseStatus.OPEN
62 self._current_position = 0
63 self._cache: DavReadAheadCache = DavReadAheadCache(
64 client=self._client,
65 url=self._uri._internal_url,
66 filesize=self._filesize,
67 blocksize=self._uri._client._config.block_size,
68 log=log,
69 )
70 self._log.debug("initializing read handle for %s [%#x]", self._uri, id(self))
72 def close(self) -> None:
73 if self._closed != CloseStatus.CLOSED:
74 self._log.debug("closing read handle for %s [%#x]", self._uri, id(self))
75 self._cache.close()
76 self._closed = CloseStatus.CLOSED
78 @property
79 def closed(self) -> bool:
80 return self._closed == CloseStatus.CLOSED
82 def fileno(self) -> int:
83 raise io.UnsupportedOperation("DavReadResourceHandle does not have a file number")
85 def flush(self) -> None:
86 modes = set(self._mode)
87 if {"w", "x", "a", "+"} & modes:
88 raise io.UnsupportedOperation("DavReadResourceHandles are read only")
90 @property
91 def isatty(self) -> bool | Callable[[], bool]:
92 return False
94 def readable(self) -> bool:
95 return True
97 def readline(self, size: int = -1) -> bytes:
98 raise io.UnsupportedOperation("DavReadResourceHandles Do not support line by line reading")
100 def readlines(self, hint: int = -1) -> Iterable[bytes]:
101 raise io.UnsupportedOperation("DavReadResourceHandles Do not support line by line reading")
103 def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
104 self._log.debug(
105 "handle seek for %s: offset=%d, whence=%d, current_position=%d",
106 self._uri,
107 offset,
108 whence,
109 self._current_position,
110 )
112 match whence:
113 case io.SEEK_SET:
114 if offset < 0:
115 raise ValueError(f"negative seek value {offset}")
116 self._current_position = offset
117 case io.SEEK_CUR:
118 self._current_position += offset
119 case io.SEEK_END:
120 self._current_position = self._filesize + offset
121 case _:
122 raise ValueError(f"unexpected value {whence} for whence in seek()")
124 if self._current_position < 0:
125 self._current_position = 0
127 return self._current_position
129 def seekable(self) -> bool:
130 return True
132 def tell(self) -> int:
133 return self._current_position
135 def truncate(self, size: int | None = None) -> int:
136 raise io.UnsupportedOperation("DavReadResourceHandles Do not support truncation")
138 def writable(self) -> bool:
139 return False
141 def write(self, b: bytes, /) -> int:
142 raise io.UnsupportedOperation("DavReadResourceHandles are read only")
144 def writelines(self, b: Iterable[bytes], /) -> None:
145 raise io.UnsupportedOperation("DavReadResourceHandles are read only")
147 @property
148 def _eof(self) -> bool:
149 return self._current_position >= self._filesize
151 def read(self, size: int = -1) -> bytes:
152 self._log.debug(
153 "handle read for %s: filesize=%d, current_position=%d, size=%d",
154 self._uri,
155 self._filesize,
156 self._current_position,
157 size,
158 )
160 if self.closed:
161 raise ValueError("I/O operation on closed file")
163 if size == 0 or self._eof:
164 return b""
166 if size < 0:
167 # Read up to the end of the file
168 size = self._filesize - self._current_position
170 output = self._cache.fetch(start=self._current_position, end=self._current_position + size)
171 self._current_position += len(output)
173 self._log.debug("returning %d bytes from handle read for %s", len(output), self._uri)
175 return output
177 def readinto(self, output: bytearray) -> int:
178 """Read up to `len(output)` bytes into `output` and return the number
179 of bytes read.
181 Parameters
182 ----------
183 output : `bytearray`
184 Byte array to write output into.
185 """
186 if self._eof or len(output) == 0:
187 return 0
189 data = self.read(len(output))
190 output[:] = data
191 return len(data)
class DavReadAheadCache:
    """In-memory read-ahead buffer over a single remote webDAV file.

    Parameters
    ----------
    client : `lsst.resources.davutils.DavClient`
        The webDAV client used to download data from the server.
    url : `str`
        URL of the resource to download data from.
    filesize : `int`
        Size in bytes of the remote file.
    blocksize : `int`
        Size in bytes of one block of this resource. Downloads are issued
        in multiples of this size.
    log : `logging.Logger`
        Logger object to emit log records.

    Notes
    -----
    Behavior of this cache is inspired from fsspec's ReadAheadCache class.
    https://github.com/fsspec/filesystem_spec/blob/master/fsspec/caching.py

    Only one contiguous window of the file is held at a time; a request
    that is not fully inside the window replaces it.
    """

    def __init__(
        self, client: DavClient, url: str, filesize: int, blocksize: int, log: logging.Logger
    ) -> None:
        self._log: logging.Logger = log
        self._client: DavClient = client
        self._filesize: int = filesize
        self._blocksize: int = blocksize
        self._url: str = url
        # URL range requests are sent to. Starts as the resource URL and is
        # replaced by whatever URL the client hands back after each download.
        self._backend_url: str = url
        # The window currently held covers the file offsets
        # [self._start, self._end); it is empty until the first download.
        self._start: int = 0
        self._end: int = 0
        self._cache = b""

    def geturl(self) -> str:
        """Return the URL of the resource, redacted for use in log records."""
        return redact_url(self._url)

    def fetch(self, start: int, end: int) -> bytes:
        """Return the bytes of the file in the half-open range [start, end).

        Parameters
        ----------
        start : `int`
            Offset of the first byte wanted. Negative values are treated as
            zero.
        end : `int`
            Offset one past the last byte wanted. Values beyond the size of
            the file are treated as the size of the file.

        Returns
        -------
        output : `bytes`
            Up to ``end - start`` bytes. If the requested range lies entirely
            within the window held in memory it is served from there without
            contacting the server. Otherwise the window is replaced by a
            fresh download of one or more blocks and the range is served
            from the new window. An empty range yields ``b""``.
        """
        self._log.debug(
            "DavReadAheadCache.fetch: %s start=%d end=%d [total: %d]",
            self.geturl(),
            start,
            end,
            end - start,
        )

        # Clip the request to the bounds of the file.
        first = start if start > 0 else 0
        stop = end if end < self._filesize else self._filesize
        if first >= self._filesize or first >= stop:
            return b""

        is_cached = self._start <= first and stop <= self._end
        if not is_cached:
            self._repopulate(first, stop)

        offset = first - self._start
        return self._cache[offset : stop - self._start]

    def _repopulate(self, first: int, stop: int) -> None:
        """Replace the window with whole blocks covering [first, stop)."""
        # Always download one block more than the number of whole blocks in
        # the request, so that the window is large enough to satisfy it.
        block_count = (stop - first) // self._blocksize + 1
        window_size = block_count * self._blocksize
        upper = min(self._filesize, first + window_size)
        # Near the end of the file the window is extended backwards.
        lower = max(0, upper - window_size)

        self._log.debug(
            "populating handle cache for %s with %d blocks [%d - %d, total bytes: %d]",
            self.geturl(),
            block_count,
            lower,
            upper,
            upper - lower,
        )

        # Download the window. The last offset passed is `upper - 1`, so the
        # client presumably takes an inclusive end -- confirm against
        # DavClient.read_range.
        backend_url, payload = self._client.read_range(
            self._backend_url, start=lower, end=upper - 1, release_backend=False
        )
        self._backend_url = backend_url
        self._cache = payload
        self._start = lower
        # Trust the number of bytes received rather than the number asked for.
        self._end = lower + len(payload)

    def close(self) -> None:
        """Tell the backend server this cache no longer needs its connection."""
        # A HEAD request asking the backend server to close the connection
        # signals that no more partial reads will be issued for this handle.
        # The response carries nothing we need, so it is discarded.
        closing_headers = {"Connection": "close"}
        self._client._request("HEAD", self._backend_url, headers=closing_headers, redirect=False)