Coverage for python / lsst / resources / dav.py: 29%
367 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:44 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("DavResourcePath",)
16import contextlib
17import datetime
18import functools
19import io
20import logging
21import os
22import re
23import threading
24import urllib
25from collections.abc import Iterator
26from typing import TYPE_CHECKING, Any, BinaryIO, cast
28try:
29 from typing import override # Python 3.12+
30except ImportError:
31 from typing_extensions import override # Python 3.11
33try:
34 import fsspec
35 from fsspec.spec import AbstractFileSystem
36except ImportError:
37 fsspec = None
38 AbstractFileSystem = type
40from ._resourceHandles import ResourceHandleProtocol
41from ._resourceHandles._davResourceHandle import DavReadResourceHandle
42from ._resourcePath import ResourceInfo, ResourcePath, ResourcePathExpression
43from .davutils import (
44 DavClient,
45 DavClientPool,
46 DavConfigPool,
47 DavFileMetadata,
48 normalize_path,
49 normalize_url,
50)
51from .utils import get_tempdir
53if TYPE_CHECKING:
54 from .utils import TransactionProtocol
56from lsst.utils.logging import getLogger
58log = getLogger(__name__)
61@functools.lru_cache
62def _calc_tmpdir_buffer_size(tmpdir: str) -> int:
63 """Compute the block size to use for writing files in `tmpdir` as
64 256 blocks of typical size (i.e. 4096 bytes) or 10 times the file system
65 block size, whichever is higher.
67 This is a reasonable compromise between using memory for buffering and
68 the number of system calls issued to read from or write to temporary
69 files.
70 """
71 fsstats = os.statvfs(tmpdir)
72 return max(10 * fsstats.f_bsize, 256 * 4096)
75class DavResourcePathConfig:
76 """Configuration class to encapsulate the configurable items used by
77 all instances of class `DavResourcePath`.
79 Instantiating this class creates a thread-safe singleton.
80 """
82 _instance = None
83 _lock = threading.Lock()
85 def __new__(cls) -> DavResourcePathConfig:
86 if cls._instance is None: 86 ↛ 91line 86 didn't jump to line 91 because the condition on line 86 was always true
87 with cls._lock:
88 if cls._instance is None: 88 ↛ 91line 88 didn't jump to line 91
89 cls._instance = super().__new__(cls)
91 return cls._instance
93 def __init__(self) -> None:
94 # Path to the local temporary directory all instances of
95 # `DavResourcePath` must use and its associated buffer size (in bytes).
96 self._tmpdir_buffersize: tuple[str, int] | None = None
98 @property
99 def tmpdir_buffersize(self) -> tuple[str, int]:
100 """Return the path to a temporary directory and the preferred buffer
101 size to use when reading/writing files from/to that directory.
102 """
103 if self._tmpdir_buffersize is not None:
104 return self._tmpdir_buffersize
106 # Retrieve and cache the path and the blocksize for the temporary
107 # directory if no other thread has done that in the meantime.
108 with DavResourcePathConfig._lock:
109 if self._tmpdir_buffersize is None:
110 tmpdir = get_tempdir()
111 bufsize = _calc_tmpdir_buffer_size(tmpdir)
112 self._tmpdir_buffersize = (tmpdir, bufsize)
114 return self._tmpdir_buffersize
116 def _destroy(self) -> None:
117 """Destroy this class singleton instance.
119 Helper method to be used in tests to reset global configuration.
120 """
121 with DavResourcePathConfig._lock:
122 DavResourcePathConfig._instance = None
125class DavGlobals:
126 """Helper container to encapsulate all the gloal objects needed by this
127 module.
128 """
130 def __init__(self) -> None:
131 # Client pool used by all DavResourcePath instances.
132 # Use Any as type annotation to keep mypy happy.
133 self._client_pool: Any = None
135 # Configuration used by all DavResourcePath instances.
136 self._config: Any = None
138 # (Re)Initialize the objects above.
139 self._reset()
141 def _reset(self) -> None:
142 """Initialize all the globals.
144 This method is a helper for reinitializing globals in tests.
145 """
146 # Initialize the singleton instance of the webdav endpoint
147 # configuration pool.
148 config_pool: DavConfigPool = DavConfigPool("LSST_RESOURCES_WEBDAV_CONFIG")
150 # Initialize the singleton instance of the webdav client pool. This is
151 # a thread-safe singleton shared by all instances of DavResourcePath.
152 if self._client_pool is not None: 152 ↛ 153line 152 didn't jump to line 153 because the condition on line 152 was never true
153 self._client_pool._destroy()
155 self._client_pool = DavClientPool(config_pool)
157 # Initialize the singleton instance of the configuration shared
158 # all DavResourcePath objects.
159 if self._config is not None: 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true
160 self._config._destroy()
162 self._config = DavResourcePathConfig()
164 def client_pool(self) -> DavClientPool:
165 """Return the pool of reusable webDAV clients."""
166 return self._client_pool
168 def config(self) -> DavResourcePathConfig:
169 """Return the configuration settings for all `DavResourcePath`
170 objects.
171 """
172 return self._config
175# Convenience object to encapsulate all global objects needed by this module.
176dav_globals: DavGlobals = DavGlobals()
179class DavResourcePath(ResourcePath):
180 """WebDAV resource.
182 Parameters
183 ----------
184 uri : `ResourcePathExpression`
185 URI to store in object.
186 root : `str` or `ResourcePath` or `None`, optional
187 Root for relative URIs. Not used in this constructor.
188 forceAbsolute : `bool`
189 Whether to force absolute URI. A WebDAV URI is always absolute.
190 forceDirectory : `bool` or `None`, optional
191 Whether this URI represents a directory.
192 isTemporary : `bool` or `None`, optional
193 Whether this URI represents a temporary resource.
194 """
196 def __init__(
197 self,
198 uri: ResourcePathExpression,
199 root: str | ResourcePath | None = None,
200 forceAbsolute: bool = True,
201 forceDirectory: bool | None = None,
202 isTemporary: bool | None = None,
203 ) -> None:
204 # Build the internal URL we use to talk to the server, which
205 # uses "http" or "https" as scheme instead of "dav" or "davs".
206 self._internal_url: str = normalize_url(self.geturl())
208 # WebDAV client this path must use to interact with the server.
209 self._dav_client: DavClient | None = None
211 # Retrieve the configuration shared by all instances of this class.
212 self._config: DavResourcePathConfig = dav_globals.config()
214 log.debug("created instance of DavResourcePath %s [%#x]", self, id(self))
216 @classmethod
217 def _fixupPathUri(
218 cls,
219 parsed: urllib.parse.ParseResult,
220 root: ResourcePath | None = None,
221 forceAbsolute: bool = False,
222 forceDirectory: bool | None = None,
223 ) -> tuple[urllib.parse.ParseResult, bool | None]:
224 """Correct any issues with the supplied URI.
226 This function ensures that the path of the URI is normalized.
227 """
228 # Call the superclass' _fixupPathUri.
229 parsed, dirLike = super()._fixupPathUri(parsed, forceDirectory=forceDirectory)
231 # Clean the URL's path and ensure dir-like paths end by "/".
232 path = normalize_path(parsed.path)
233 if dirLike and path != "/":
234 path += "/"
236 return parsed._replace(path=path), dirLike
238 @property
239 def _client(self) -> DavClient:
240 """Return the webDAV client for this resource."""
241 # If we already have a client, use it.
242 if self._dav_client is not None:
243 return self._dav_client
245 # Retrieve the client this resource must use to interact with the
246 # server from the global client pool.
247 self._dav_client = dav_globals.client_pool().get_client_for_url(self._internal_url)
248 return self._dav_client
250 def _stat(self) -> DavFileMetadata:
251 """Retrieve metadata about this resource."""
252 return self._client.stat(self._internal_url)
254 @override
255 def mkdir(self) -> None:
256 """Create the directory resource if it does not already exist."""
257 log.debug("mkdir %s [%#x]", self, id(self))
259 if not self.isdir():
260 raise NotADirectoryError(f"Can not create a directory for file-like URI {self}")
262 stat = self._stat()
263 if stat.is_dir:
264 return
266 if stat.is_file:
267 # A file exists at this path.
268 raise NotADirectoryError(
269 f"Can not create a directory for {self} because a file already exists at that URL"
270 )
272 # The underlying webDAV client will use the knowledge it has about
273 # the specific server to create the requested directory
274 # hierarchy by issueing the minimum possible number of requests.
275 self._client.mkcol(self._internal_url)
277 @override
278 def exists(self) -> bool:
279 """Check that this resource exists."""
280 log.debug("exists %s [%#x]", self, id(self))
282 return self._stat().exists
284 @override
285 def size(self) -> int:
286 """Return the size of the remote resource in bytes."""
287 log.debug("size %s [%#x]", self, id(self))
289 return 0 if self.isdir() else self._client.size(self._internal_url)
291 @override
292 def get_info(self) -> ResourceInfo:
293 """Return lightweight metadata details about this resource."""
294 log.debug("get_info %s [%#x]", self, id(self))
296 info = self._client.info(self._internal_url)
297 if info["type"] is None:
298 raise FileNotFoundError(f"Resource {self} does not exist")
300 return ResourceInfo(
301 uri=str(self),
302 is_file=info["type"] == "file",
303 size=info["size"],
304 last_modified=info["last_modified"],
305 checksums=info["checksums"],
306 )
308 @override
309 def read(self, size: int = -1) -> bytes:
310 """Open the resource and return the contents in bytes.
312 Parameters
313 ----------
314 size : `int`, optional
315 The number of bytes to read. Negative or omitted indicates that
316 all data should be read.
317 """
318 log.debug("read %s [%#x] size=%d", self, id(self), size)
320 # A GET request on a dCache directory returns the contents of the
321 # directory in HTML, to be visualized with a browser. This means
322 # that we need to check first that this resource is not a directory.
323 #
324 # Since isdir() only checks that the URL of the resource ends in "/"
325 # without actually asking the server, this check is not robust.
326 # However, it is a reasonable compromise since it prevents doing
327 # an additional roundtrip to the server to retrieve this resource's
328 # metadata.
329 if self.isdir():
330 raise ValueError(f"method read() is not implemented for directory {self}")
332 if size < 0:
333 # Read the entire file content
334 _, data = self._client.read(self._internal_url)
335 return data
337 # This is a partial read. Retrieve the file size.
338 stat = self._stat()
339 if not stat.is_file:
340 raise FileNotFoundError(f"No file found at {self}")
342 if size == 0 or stat.size == 0:
343 return b""
345 # Read the requested chunk of data and release the backend server.
346 end_range = min(stat.size, size) - 1
347 _, data = self._client.read_range(self._internal_url, start=0, end=end_range, release_backend=True)
348 return data
350 @override
351 @contextlib.contextmanager
352 def _as_local(
353 self, multithreaded: bool = True, tmpdir: ResourcePath | None = None
354 ) -> Iterator[ResourcePath]:
355 """Download object and place in temporary directory.
357 Parameters
358 ----------
359 multithreaded : `bool`, optional
360 If `True` the transfer will be allowed to attempt to improve
361 throughput by using parallel download streams. This may of no
362 effect if the URI scheme does not support parallel streams or
363 if a global override has been applied. If `False` parallel
364 streams will be disabled.
365 tmpdir : `ResourcePath` or `None`, optional
366 Explicit override of the temporary directory to use for remote
367 downloads.
369 Returns
370 -------
371 local_uri : `ResourcePath`
372 A URI to a local POSIX file corresponding to a local temporary
373 downloaded copy of the resource.
374 """
375 log.debug("_as_local %s [%#x] tmpdir: %s", self, id(self), tmpdir)
377 # We need to ensure that this resource is actually a file since
378 # the response to a GET request on a directory may be implemented in
379 # several ways, according to RFC 4818.
380 if self.isdir():
381 raise FileNotFoundError(f"{self} is a directory")
383 if tmpdir is None:
384 local_dir, buffer_size = self._config.tmpdir_buffersize
385 tmpdir = ResourcePath(local_dir, forceDirectory=True)
386 else:
387 buffer_size = _calc_tmpdir_buffer_size(tmpdir.ospath)
389 with ResourcePath.temporary_uri(suffix=self.getExtension(), prefix=tmpdir, delete=True) as tmp_uri:
390 log.debug(
391 "downloading %s [%#x] to local file %s [buffer_size %d]",
392 self,
393 id(self),
394 tmp_uri.ospath,
395 buffer_size,
396 )
397 self._client.download(self._internal_url, tmp_uri.ospath, buffer_size)
398 yield tmp_uri
400 @override
401 def write(self, data: BinaryIO | bytes, overwrite: bool = True) -> None:
402 """Write the supplied bytes to the new resource.
404 Parameters
405 ----------
406 data : `bytes`
407 The bytes to write to the resource. The entire contents of the
408 resource will be replaced.
409 overwrite : `bool`, optional
410 If `True` the resource will be overwritten if it exists. Otherwise
411 the write will fail.
412 """
413 log.debug("write %s [%#x] overwrite=%s", self, id(self), overwrite)
415 if self.isdir():
416 raise ValueError(f"Method write() is not implemented for directory {self}")
418 if not overwrite and self._stat().is_file:
419 raise FileExistsError(f"File {self} exists and overwrite has been disabled")
421 self._client.write(self._internal_url, data)
423 @override
424 def remove(self) -> None:
425 """Remove the resource.
427 If the resource is a directory, it must be empty otherwise this
428 method raises. Removing a non-existent file or directory is not
429 considered an error.
430 """
431 log.debug("remove %s [%#x]", self, id(self))
433 stat = self._stat()
434 if not stat.exists:
435 # There is no resource at this uri. There is nothing to do.
436 return
438 if stat.is_dir:
439 entries = self._client.read_dir(self._internal_url)
440 if len(entries) > 0:
441 raise IsADirectoryError(f"Directory {self} is not empty")
443 # This resource is a either file or an empty directory, we can remove
444 # it.
445 self._client.delete(self._internal_url)
447 def remove_dir(self, recursive: bool = False) -> None:
448 """Remove a directory if empty.
450 Parameters
451 ----------
452 recursive : `bool`
453 If `True` recursively remove all files and directories under this
454 directory.
456 Notes
457 -----
458 This method is not present in the superclass.
459 """
460 log.debug("remove_dir %s [%#x] recursive=%s", self, id(self), recursive)
462 if not self.isdir():
463 raise NotADirectoryError(f"{self} is not a directory")
465 for root, subdirs, files in self.walk():
466 if not recursive and (len(subdirs) > 0 or len(files) > 0):
467 raise IsADirectoryError(f"Directory at {self} is not empty and recursive argument is False")
469 for file in files:
470 root.join(file).remove()
472 for subdir in subdirs:
473 DavResourcePath(root.join(subdir, forceDirectory=True)).remove_dir(recursive=recursive)
475 # Remove empty top directory
476 self.remove()
478 @override
479 def transfer_from(
480 self,
481 src: ResourcePath,
482 transfer: str = "copy",
483 overwrite: bool = False,
484 transaction: TransactionProtocol | None = None,
485 multithreaded: bool = True,
486 ) -> None:
487 """Transfer to this URI from another.
489 Parameters
490 ----------
491 src : `ResourcePath`
492 Source URI.
493 transfer : `str`
494 Mode to use for transferring the resource. Generically there are
495 many standard options: copy, link, symlink, hardlink, relsymlink.
496 Not all URIs support all modes.
497 overwrite : `bool`, optional
498 Allow an existing file to be overwritten. Defaults to `False`.
499 transaction : `~lsst.resources.utils.TransactionProtocol`, optional
500 A transaction object that can (depending on implementation)
501 rollback transfers on error. Not guaranteed to be implemented.
502 multithreaded : `bool`, optional
503 If `True` the transfer will be allowed to attempt to improve
504 throughput by using parallel download streams. This may of no
505 effect if the URI scheme does not support parallel streams or
506 if a global override has been applied. If `False` parallel
507 streams will be disabled.
508 """
509 log.debug(
510 "transfer_from %s [%#x] src=%s transfer=%s overwrite=%s",
511 self,
512 id(self),
513 src,
514 transfer,
515 overwrite,
516 )
518 # Fail early to prevent delays if remote resources are requested.
519 if transfer not in self.transferModes:
520 raise ValueError(f"Transfer mode {transfer} not supported by URI scheme {self.scheme}")
522 # Existence checks cost time so do not call this unless we know
523 # that debugging is enabled.
524 destination_exists = None
525 if log.isEnabledFor(logging.DEBUG):
526 destination_exists = self.exists()
527 log.debug(
528 "Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
529 src.geturl(),
530 src.exists(),
531 self,
532 destination_exists,
533 transfer,
534 )
536 # Short circuit immediately if the URIs are identical.
537 if self == src:
538 log.debug(
539 "Target and destination URIs are identical: %s, returning immediately."
540 " No further action required.",
541 self,
542 )
543 return
545 if not overwrite:
546 if destination_exists is None:
547 destination_exists = self.exists()
549 if destination_exists:
550 raise FileExistsError(f"Destination path {self} already exists.")
552 if transfer == "auto":
553 transfer = self.transferDefault
555 # We can use webDAV 'COPY' or 'MOVE' if both the current and source
556 # resources are located in the same server.
557 if isinstance(src, type(self)) and self.root_uri() == src.root_uri():
558 log.debug("Transfer from %s to %s [%#x] directly", src, self, id(self))
559 return (
560 self._move_from(src, overwrite=overwrite)
561 if transfer == "move"
562 else self._copy_from(src, overwrite=overwrite)
563 )
565 # For resources of different classes we can perform the copy or move
566 # operation by downloading to a local file and uploading to the
567 # destination.
568 self._copy_via_local(src)
570 # This was an explicit move, try to remove the source.
571 if transfer == "move":
572 src.remove()
574 def _copy_via_local(self, source: ResourcePath) -> None:
575 """Replace the contents of this resource with the contents of a remote
576 resource by using a local temporary file.
578 Parameters
579 ----------
580 source : `ResourcePath`
581 The source of the contents to copy to `self`.
582 """
583 with source.as_local() as local_uri:
584 log.debug(
585 "Transfer from %s to %s [%#x] via local file %s",
586 source.geturl(),
587 self,
588 id(self),
589 local_uri,
590 )
591 with open(local_uri.ospath, "rb") as f:
592 self.write(data=f)
594 def _copy_from(self, source: DavResourcePath, overwrite: bool = False) -> None:
595 """Copy the contents of `source` to this resource. `source` must
596 be a file.
597 """
598 log.debug("_copy_from %s [%#x] source=%s overwrite=%s", self, id(self), source, overwrite)
600 # Copy is only supported for files, not directories.
601 if self.isdir():
602 raise ValueError(f"Copy is not supported because destination {self} is a directory")
604 if source.isdir():
605 raise ValueError(f"Copy is not supported for directory {source}")
607 if not source.exists():
608 raise FileNotFoundError(f"No file found at {source}")
610 # If the server supports file duplication, use that method.
611 if self._client.supports_duplicate:
612 return self._client.duplicate(source._internal_url, self._internal_url, overwrite)
614 # Make this copy via a local file
615 if not overwrite and self.exists():
616 raise FileExistsError(f"Destination path {self} already exists.")
618 self._copy_via_local(source)
620 def _move_from(self, source: DavResourcePath, overwrite: bool = False) -> None:
621 """Send a MOVE webDAV request to replace the contents of this resource
622 with the contents of another resource located in the same server.
624 Parameters
625 ----------
626 source : `DavResourcePath`
627 The source of the contents to move to `self`.
628 """
629 log.debug("_move_from %s [%#x] source=%s overwrite=%s", self, id(self), source, overwrite)
631 # Move is only supported for files, not directories.
632 if self.isdir():
633 raise ValueError(f"Move is not supported for destination directory {self}")
635 if source.isdir():
636 raise ValueError(f"Move is not supported for directory {source}")
638 if not source.exists():
639 raise FileNotFoundError(f"No file found at {source}")
641 self._client.rename(source._internal_url, self._internal_url, overwrite)
643 @override
644 def walk(
645 self, file_filter: str | re.Pattern | None = None
646 ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]:
647 """Walk the directory tree returning matching files and directories.
649 Parameters
650 ----------
651 file_filter : `str` or `re.Pattern`, optional
652 Regex to filter out files from the list before it is returned.
654 Yields
655 ------
656 dirpath : `ResourcePath`
657 Current directory being examined.
658 dirnames : `list` of `str`
659 Names of subdirectories within dirpath.
660 filenames : `list` of `str`
661 Names of all the files within dirpath.
662 """
663 if not self.isdir():
664 raise ValueError(f"Can not walk non-directory URI {self}")
666 # We must return no entries for non-existent directories.
667 if not self.exists():
668 return
670 # Retrieve the entries in this directory
671 entries = self._client.read_dir(self._internal_url)
672 files = [e.name for e in entries if e.is_file]
673 subdirs = [e.name for e in entries if e.is_dir]
675 # Filter files
676 if isinstance(file_filter, str):
677 file_filter = re.compile(file_filter)
679 if file_filter is not None:
680 files = [f for f in files if file_filter.search(f)]
682 if not subdirs and not files:
683 return
684 else:
685 yield type(self)(self, forceAbsolute=False, forceDirectory=True), subdirs, files
687 for subdir in subdirs:
688 new_uri = self.join(subdir, forceDirectory=True)
689 yield from new_uri.walk(file_filter)
691 @override
692 def generate_presigned_get_url(self, *, expiration_time_seconds: int) -> str:
693 """Return a pre-signed URL that can be used to retrieve this resource
694 using an HTTP GET without supplying any access credentials.
696 Parameters
697 ----------
698 expiration_time_seconds : `int`
699 Number of seconds until the generated URL is no longer valid.
701 Returns
702 -------
703 url : `str`
704 HTTP URL signed for GET.
705 """
706 return self._client.generate_presigned_get_url(self._internal_url, expiration_time_seconds)
708 @override
709 def generate_presigned_put_url(self, *, expiration_time_seconds: int) -> str:
710 """Return a pre-signed URL that can be used to upload a file to this
711 path using an HTTP PUT without supplying any access credentials.
713 Parameters
714 ----------
715 expiration_time_seconds : `int`
716 Number of seconds until the generated URL is no longer valid.
718 Returns
719 -------
720 url : `str`
721 HTTP URL signed for PUT.
722 """
723 return self._client.generate_presigned_put_url(self._internal_url, expiration_time_seconds)
725 @override
726 def to_fsspec(self) -> tuple[DavFileSystem, str]:
727 """Return an abstract file system and path that can be used by fsspec.
729 Returns
730 -------
731 fs : `fsspec.spec.AbstractFileSystem`
732 A file system object suitable for use with the returned path.
733 path : `str`
734 A path that can be opened by the file system object.
735 """
736 if fsspec is None or not self._client._config.enable_fsspec:
737 raise ImportError("fsspec is not available")
739 log.debug("DavResourcePath.to_fsspec: %s", self)
740 fsys = DavFileSystem(self)
741 return fsys, fsys._path
743 @override
744 @contextlib.contextmanager
745 def _openImpl(
746 self,
747 mode: str = "r",
748 *,
749 encoding: str | None = None,
750 ) -> Iterator[ResourceHandleProtocol]:
751 log.debug("DavResourcePath._openImpl: %s mode: %s", self, mode)
753 if mode in ("rb", "r") and self._client.accepts_ranges(self._internal_url):
754 stat = self._stat()
755 if stat.is_dir:
756 raise OSError(f"open is not implemented for directory {self}")
758 if not stat.is_file:
759 raise FileNotFoundError(f"No such file {self}")
761 with DavReadResourceHandle(mode, log.logger, uri=self, file_size=stat.size) as handle:
762 if mode == "r":
763 # cast because the protocol is compatible, but does not
764 # have BytesIO in the inheritance tree
765 yield io.TextIOWrapper(cast(Any, handle), encoding=encoding)
766 else:
767 yield handle
768 else:
769 with super()._openImpl(mode, encoding=encoding) as handle:
770 yield handle
773class DavFileSystem(AbstractFileSystem):
774 """Minimal fsspec-compatible read-only file system which contains a single
775 file.
777 Parameters
778 ----------
779 uri : `DavResourcePath`
780 URI of the single resource contained in the file system.
781 """
783 protocol = ("davs", "dav")
785 def __init__(self, uri: DavResourcePath):
786 super().__init__()
787 self._uri: DavResourcePath = uri
788 self._path: str = self._uri.geturl()
789 self._size: int | None = None
791 @override
792 def info(self, path: str, **kwargs: Any) -> dict[str, Any]:
793 log.debug("DavFileSystem.info %s", path)
794 if path != self._path:
795 raise FileNotFoundError(path)
797 return {
798 "name": path,
799 "size": self.size(self._path),
800 "type": "file",
801 }
803 @override
804 def ls(self, path: str, detail: bool = True, **kwargs: Any) -> list[str] | list[dict[str, str]]:
805 log.debug("DavFileSystem.ls %s", path)
806 if path != self._path:
807 raise FileNotFoundError(path)
809 return list(self.info(path)) if detail else list(path)
811 @override
812 def modified(self, path: str) -> datetime.datetime:
813 log.debug("DavFileSystem.modified %s", path)
814 if path != self._path:
815 raise FileNotFoundError(path)
817 return self._uri._stat().last_modified
819 @override
820 def size(self, path: str) -> int:
821 log.debug("DavFileSystem.size %s", path)
822 if path != self._path:
823 raise FileNotFoundError(path)
825 if self._size is None:
826 self._size = self._uri.size()
828 return self._size
830 @override
831 def isfile(self, path: str) -> bool:
832 log.debug("DavFileSystem.isfile %s", path)
833 return path == self._path
835 @override
836 def isdir(self, path: str) -> bool:
837 log.debug("DavFileSystem.isdir %s", path)
838 return False
840 @override
841 def exists(self, path: str, **kwargs: Any) -> bool:
842 log.debug("DavFileSystem.exists %s", path)
843 return path == self._path
845 @override
846 def open(
847 self,
848 path: str,
849 mode: str = "rb",
850 encoding: str | None = None,
851 block_size: int | None = None,
852 cache_options: dict[Any, Any] | None = None,
853 compression: str | None = None,
854 **kwargs: Any,
855 ) -> DavReadResourceHandle | io.TextIOWrapper:
856 log.debug(
857 "DavFileSystem.open path: %s mode: %s encoding: %s blocksize: %s",
858 path,
859 mode,
860 encoding,
861 block_size,
862 )
863 if path != self._path:
864 raise FileNotFoundError(f"File {path} does not exist")
866 if mode not in ("rb", "r"):
867 raise OSError(f"Opening {path} for writing is not supported")
869 handle = DavReadResourceHandle(mode, log.logger, self._uri, self.size(self._path))
870 if mode == "rb":
871 return handle
872 else:
873 return io.TextIOWrapper(cast(Any, handle), encoding=encoding)
875 @property
876 def fsid(self) -> Any:
877 return "davs"
879 @override
880 def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
881 raise NotImplementedError
883 @override
884 def makedirs(self, path: str, exist_ok: bool = False) -> None:
885 raise NotImplementedError
887 @override
888 def rmdir(self, path: str) -> None:
889 raise NotImplementedError
891 @override
892 def walk(
893 self,
894 path: str,
895 maxdepth: int | None = None,
896 topdown: bool = True,
897 on_error: str = "omit",
898 **kwargs: Any,
899 ) -> None:
900 raise NotImplementedError
902 @override
903 def find(
904 self,
905 path: str,
906 maxdepth: int | None = None,
907 withdirs: bool = False,
908 detail: bool = False,
909 **kwargs: Any,
910 ) -> None:
911 raise NotImplementedError
913 @override
914 def du(
915 self,
916 path: str,
917 total: bool = True,
918 maxdepth: int | None = None,
919 withdirs: bool = False,
920 **kwargs: Any,
921 ) -> None:
922 raise NotImplementedError
924 @override
925 def glob(self, path: str, maxdepth: int | None = None, **kwargs: Any) -> None:
926 raise NotImplementedError
928 @override
929 def rm_file(self, path: str) -> None:
930 raise NotImplementedError
932 @override
933 def rm(self, path: str, recursive: bool = False, maxdepth: int | None = None) -> None:
934 raise NotImplementedError
936 @override
937 def touch(self, path: str, truncate: bool = True, **kwargs: Any) -> None:
938 raise NotImplementedError
940 @override
941 def ukey(self, path: str) -> None:
942 raise NotImplementedError
944 @override
945 def created(self, path: str) -> None:
946 raise NotImplementedError