Coverage for python / lsst / resources / davutils.py: 24%
1098 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:32 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import base64
15import enum
16import io
17import json
18import logging
19import os
20import posixpath
21import random
22import re
23import stat
24import threading
25import time
26import uuid
27import xml.etree.ElementTree as eTree
28from datetime import UTC, datetime
29from http import HTTPStatus
30from typing import Any, BinaryIO
32try:
33 from typing import override # Python 3.12+
34except ImportError:
35 from typing_extensions import override # Python 3.11
37from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
39try:
40 import fsspec
41 from fsspec.spec import AbstractFileSystem
42except ImportError:
43 fsspec = None
44 AbstractFileSystem = type
46import yaml
47from astropy import units as u
48from urllib3 import PoolManager, make_headers
49from urllib3.response import HTTPResponse
50from urllib3.util import Retry, Timeout, Url, parse_url
52from lsst.utils.logging import getLogger
53from lsst.utils.timer import time_this
# Use the same logger as `dav.py`.
56log = getLogger(f"""{__name__.replace(".davutils", ".dav")}""")
59def normalize_path(path: str | None) -> str:
60 """Normalize a path intended to be part of a URL.
62 A path of the form "///a/b/c///../d/e/" would be normalized as "/a/b/d/e".
63 The returned path is always absolute, i.e. starts by "/" and never
64 ends by "/" except when the path is exactly "/" and does not contain
65 "." nor "..". It does not contain consecutive "/" either.
67 Parameters
68 ----------
69 path : `str`, optional
70 Path to normalize (e.g., '/path/to/..///normalize/').
72 Returns
73 -------
74 url : `str`
75 Normalized URL (e.g., '/path/normalize').
76 """
77 return "/" if not path else "/" + posixpath.normpath(path).lstrip("/")
def normalize_url(url: str, preserve_scheme: bool = False, preserve_path: bool = True) -> str:
    """Normalize a URL so that its scheme is 'http' or 'https' and its path
    is normalized.

    Parameters
    ----------
    url : `str`
        URL to normalize (e.g., 'davs://example.org:1234///path/to//../dir/').
    preserve_scheme : `bool`
        If True the scheme of `url` is kept as-is. Otherwise the scheme of
        the returned URL is mapped to 'http' or 'https'.
    preserve_path : `bool`
        If True, the (normalized) path of `url` is kept in the returned
        URL; otherwise the returned URL has '/' as its path.

    Returns
    -------
    url : `str`
        Normalized URL (e.g. 'https://example.org:1234/path/to/dir').
    """
    parsed = parse_url(url)

    # Map 'dav'/'davs' to 'http'/'https' unless asked to keep the scheme;
    # an absent scheme defaults to 'http'.
    if parsed.scheme is None:
        scheme = "http"
    elif preserve_scheme:
        scheme = parsed.scheme
    else:
        scheme = parsed.scheme.replace("dav", "http")

    if preserve_path:
        path = normalize_path(parsed.path)
    else:
        path = "/"

    return Url(scheme=scheme, host=parsed.host, port=parsed.port, path=path).url
def redact_url(url: str) -> str:
    """Return a copy of `url` with the authorization query value redacted.

    Intended for logging URLs without leaking authorization tokens.

    Parameters
    ----------
    url : `str`
        URL to redact.

    Returns
    -------
    redacted_url : `str`
        For instance, when called with an URL like:

        https://host.example.org:1234/a/b/c/file.data?key1=value1&key2=value2&authz=token#fragment

        the returned value would be:

        https://host.example.org:1234/a/b/c/file.data?key1=value1&key2=value2&authz=....#fragment
    """
    parts = urlparse(url)

    # Replace the value of any 'authz' query key with '....'; every other
    # key/value pair is kept untouched.
    sanitized = [(key, "...." if key == "authz" else value) for key, value in parse_qsl(parts.query)]

    return urlunparse(parts._replace(query=urlencode(sanitized)))
class DavConfig:
    """Configurable settings a webDAV client must use when interacting with a
    particular storage endpoint.

    Parameters
    ----------
    config : `dict[str, str]`
        Dictionary of configurable settings for the webdav endpoint which
        base URL is `config["base_url"]`.

        For instance, if `config["base_url"]` is

            "davs://webdav.example.org:1234/"

        any object of class `DavResourcePath` like

            "davs://webdav.example.org:1234/path/to/any/file"

        will use the settings in this configuration to configure its client.
    """

    # Timeout in seconds to establish a network connection with the remote
    # server.
    DEFAULT_TIMEOUT_CONNECT: float = 10.0

    # Timeout in seconds to read the response to a request sent to a server.
    # This is the total time for reading both the headers and the response
    # body. It must be large enough to allow for upload and download of files
    # of typical size the webdav client supports.
    DEFAULT_TIMEOUT_READ: float = 300.0

    # Maximum number of network connections to persist against a single
    # "host:port" pair. If this endpoint client needs to issue more
    # simultaneous requests than this number, additional network connections
    # will be created but won't be persisted after use.
    DEFAULT_PERSISTENT_CONNECTIONS_PER_HOST: int = 20

    # Size of the buffer (in mebibytes, i.e. 1024*1024 bytes) the webdav
    # client of this endpoint will use when sending requests and receiving
    # responses.
    DEFAULT_BUFFER_SIZE: int = 5

    # Size of the block (in mebibytes, i.e. 1024*1024 bytes) the webdav
    # client of this endpoint will use for making partial reads. Each partial
    # read will request at least this number of bytes, unless the total size
    # of the file is lower than this value.
    DEFAULT_BLOCK_SIZE: int = 1

    # Number of times to retry requests before failing. Retry happens only
    # under certain conditions.
    DEFAULT_RETRIES: int = 4

    # Minimal and maximal retry backoff (in seconds) for the client to
    # compute the wait time before retrying a request.
    # A value in this interval is randomly selected as the backoff factor
    # every time a request is retried.
    DEFAULT_RETRY_BACKOFF_MIN: float = 1.0
    DEFAULT_RETRY_BACKOFF_MAX: float = 3.0

    # Path to a directory or certificate bundle file where the certificates
    # of the trusted certificate authorities can be found.
    # Those certificates will be used by the client of the webdav endpoint
    # to verify the server's host certificate.
    # If None, the certificates trusted by the system are used.
    DEFAULT_TRUSTED_AUTHORITIES: str | None = None

    # User name and password for the client to authenticate to the server.
    # If specified, HTTP basic authentication is used on all requests.
    DEFAULT_USER_NAME: str | None = None
    DEFAULT_USER_PASSWORD: str | None = None

    # Path to the client certificate and associated private key the webdav
    # client must present to the server for authentication purposes.
    # If None, no client certificate is presented.
    DEFAULT_USER_CERT: str | None = None
    DEFAULT_USER_KEY: str | None = None

    # Token the webdav client must send to the server for authentication
    # purposes. The token may be the value of the token itself or the path
    # to a file where the token can be found.
    DEFAULT_TOKEN: str | None = None

    # If this option is set to True, the webdav client attempts to reuse
    # the network connection to the server as long as possible. Note that
    # the server can unilaterally decide to close the connection.
    # If disabled, the connection is closed after each request.
    DEFAULT_REUSE_CONNECTION: bool = True

    # Default checksum algorithm to request the server to compute on every
    # file upload. Not all servers support this.
    # See RFC 3230 for details.
    DEFAULT_REQUEST_CHECKSUM: str | None = None

    # If this option is set to True, the webdav client can return objects
    # compliant to the fsspec specification.
    # See: https://filesystem-spec.readthedocs.io
    DEFAULT_ENABLE_FSSPEC: bool = True

    # If this option is set to True, memory usage is computed and reported
    # when executing in debug mode. Computing memory usage is costly, so only
    # set this when debugging.
    DEFAULT_COLLECT_MEMORY_USAGE: bool = False

    # Accepted checksum algorithms. Must be lowercase.
    ACCEPTED_CHECKSUMS: list[str] = ["adler32", "md5", "sha-256", "sha-512"]

    def __init__(self, config: dict | None = None) -> None:
        """Initialize this configuration from `config`, falling back to the
        class-level defaults for every missing setting.

        Raises
        ------
        ValueError
            If the configured checksum algorithm is not among
            `ACCEPTED_CHECKSUMS`.
        """
        if config is None:
            config = {}

        # "_default_" identifies the default configuration, used when a URL
        # matches no configured endpoint (see DavConfigPool).
        if (base_url := expand_vars(config.get("base_url"))) is None:
            self._base_url = "_default_"
        else:
            self._base_url = normalize_url(base_url, preserve_path=False)

        self._timeout_connect: float = float(config.get("timeout_connect", DavConfig.DEFAULT_TIMEOUT_CONNECT))
        self._timeout_read: float = float(config.get("timeout_read", DavConfig.DEFAULT_TIMEOUT_READ))
        self._persistent_connections_per_host: int = int(
            config.get(
                "persistent_connections_per_host",
                DavConfig.DEFAULT_PERSISTENT_CONNECTIONS_PER_HOST,
            )
        )
        # Buffer and block sizes are configured in mebibytes but stored in
        # bytes.
        self._buffer_size: int = 1_048_576 * int(config.get("buffer_size", DavConfig.DEFAULT_BUFFER_SIZE))
        self._block_size: int = 1_048_576 * int(config.get("block_size", DavConfig.DEFAULT_BLOCK_SIZE))
        self._retries: int = int(config.get("retries", DavConfig.DEFAULT_RETRIES))
        self._retry_backoff_min: float = float(
            config.get("retry_backoff_min", DavConfig.DEFAULT_RETRY_BACKOFF_MIN)
        )
        self._retry_backoff_max: float = float(
            config.get("retry_backoff_max", DavConfig.DEFAULT_RETRY_BACKOFF_MAX)
        )
        # Settings that may contain environment variables are expanded via
        # expand_vars (defined elsewhere in this module).
        self._trusted_authorities: str | None = expand_vars(
            config.get("trusted_authorities", DavConfig.DEFAULT_TRUSTED_AUTHORITIES)
        )
        self._user_name: str | None = expand_vars(config.get("user_name", DavConfig.DEFAULT_USER_NAME))
        self._user_password: str | None = expand_vars(
            config.get("user_password", DavConfig.DEFAULT_USER_PASSWORD)
        )
        self._user_cert: str | None = expand_vars(config.get("user_cert", DavConfig.DEFAULT_USER_CERT))
        self._user_key: str | None = expand_vars(config.get("user_key", DavConfig.DEFAULT_USER_KEY))
        self._token: str | None = expand_vars(config.get("token", DavConfig.DEFAULT_TOKEN))
        self._reuse_connection: bool = config.get("reuse_connection", DavConfig.DEFAULT_REUSE_CONNECTION)
        self._enable_fsspec: bool = config.get("enable_fsspec", DavConfig.DEFAULT_ENABLE_FSSPEC)
        self._frontend_urls: list[str] = self._init_frontend_urls(config=config)
        self._collect_memory_usage: bool = config.get(
            "collect_memory_usage", DavConfig.DEFAULT_COLLECT_MEMORY_USAGE
        )
        self._request_checksum: str | None = config.get(
            "request_checksum", DavConfig.DEFAULT_REQUEST_CHECKSUM
        )
        # Normalize the checksum algorithm to lowercase and validate it
        # against the accepted values.
        if self._request_checksum is not None:
            self._request_checksum = self._request_checksum.lower()
            if self._request_checksum not in DavConfig.ACCEPTED_CHECKSUMS:
                raise ValueError(
                    f"""Value for checksum algorithm {self._request_checksum} for storage endpoint """
                    f"""{self._base_url} is not among the accepted values: {DavConfig.ACCEPTED_CHECKSUMS}"""
                )

    def _init_frontend_urls(self, config: dict | None = None) -> list[str]:
        """Return the normalized, deduplicated frontend server URLs declared
        in `config` under "frontend_base_urls", if any.

        Raises
        ------
        ValueError
            If a frontend URL scheme differs from the base URL scheme.
        """
        if config is None:
            return []

        # Initialize the URLs of the frontend servers, if present in
        # the configuration.
        frontend_urls: list[str] = []
        for url in config.get("frontend_base_urls", []):
            # Expand environment variables in this URL
            if (expanded_url := expand_vars(url)) is not None:
                frontend_urls.append(normalize_url(expanded_url, preserve_path=False))

        # Eliminate duplicate URLs.
        frontend_urls = list(set(frontend_urls))

        # Check that the scheme of this client's base URL is identical to
        # the scheme of the frontend server URLs.
        base_url_scheme = parse_url(self._base_url).scheme
        for url in frontend_urls:
            if base_url_scheme != parse_url(url).scheme:
                raise ValueError(
                    f"""inconsistent scheme in frontend URL {url} for endpoint """
                    f"""with base URL {self._base_url}"""
                )

        return frontend_urls

    @property
    def base_url(self) -> str:
        """Normalized base URL of the endpoint, or "_default_"."""
        return self._base_url

    @property
    def timeout_connect(self) -> float:
        """Connection establishment timeout, in seconds."""
        return self._timeout_connect

    @property
    def timeout_read(self) -> float:
        """Response read timeout (headers plus body), in seconds."""
        return self._timeout_read

    @property
    def persistent_connections_per_host(self) -> int:
        """Maximum persisted connections per "host:port" pair."""
        return self._persistent_connections_per_host

    @property
    def buffer_size(self) -> int:
        """Request/response buffer size, in bytes."""
        return self._buffer_size

    @property
    def block_size(self) -> int:
        """Minimum partial-read block size, in bytes."""
        return self._block_size

    @property
    def retries(self) -> int:
        """Number of times to retry requests before failing."""
        return self._retries

    @property
    def retry_backoff_min(self) -> float:
        """Lower bound of the retry backoff factor, in seconds."""
        return self._retry_backoff_min

    @property
    def retry_backoff_max(self) -> float:
        """Upper bound of the retry backoff factor, in seconds."""
        return self._retry_backoff_max

    @property
    def trusted_authorities(self) -> str | None:
        """Path to trusted CA certificates (file or directory), or None."""
        return self._trusted_authorities

    @property
    def token(self) -> str | None:
        """Bearer token value or path to a token file, or None."""
        return self._token

    @property
    def reuse_connection(self) -> bool:
        """Whether network connections are kept open across requests."""
        return self._reuse_connection

    @property
    def request_checksum(self) -> str | None:
        """Lowercase checksum algorithm to request on upload, or None."""
        return self._request_checksum

    @property
    def user_cert(self) -> str | None:
        """Path to the client certificate, or None."""
        return self._user_cert

    @property
    def user_key(self) -> str | None:
        """Path to the client private key, or None."""
        # If no user certificate was specified in the configuration,
        # ignore the private key, even if it was provided.
        if self._user_cert is None:
            return None

        # If we have a user certificate but not a private key, assume the
        # private key is included in the same file as the user certificate.
        # That is typically the case when using a X.509 grid proxy as
        # client certificate.
        return self._user_cert if self._user_key is None else self._user_key

    @property
    def user_name(self) -> str | None:
        """User name for HTTP basic authentication, or None."""
        return self._user_name

    @property
    def user_password(self) -> str | None:
        """Password for HTTP basic authentication, or None."""
        return self._user_password

    @property
    def enable_fsspec(self) -> bool:
        """Whether fsspec-compliant objects may be returned."""
        return self._enable_fsspec

    @property
    def collect_memory_usage(self) -> bool:
        """Whether memory usage is computed and reported in debug mode."""
        return self._collect_memory_usage

    @property
    def frontend_urls(self) -> list[str]:
        """Normalized URLs of the frontend servers, possibly empty."""
        return self._frontend_urls
class DavConfigPool:
    """Registry of configurable settings for all known webDAV endpoints.

    Parameters
    ----------
    filename : `str`, optional
        Name of an environment variable whose value is the path of the
        configuration file to load. The path may itself contain environment
        variables (e.g. "$HOME/path/to/config.yaml") or "~", which are
        expanded before the file is opened. If the environment variable is
        not set, no configuration file is loaded and default settings are
        used for every endpoint.

        The configuration file is a YAML file with the structure below:

        - base_url: "davs://webdav1.example.org:1234/"
          persistent_connections_per_host: 10
          timeout_connect: 20.0
          timeout_read: 120.0
          retries: 3
          retry_backoff_min: 1.0
          retry_backoff_max: 3.0
          user_cert: "${X509_USER_PROXY}"
          user_key: "${X509_USER_PROXY}"
          token: "/path/to/bearer/token/file"
          trusted_authorities: "/etc/grid-security/certificates"
          buffer_size: 5
          enable_fsspec: false
          request_checksum: "md5"
          collect_memory_usage: false

        - base_url: "davs://webdav2.example.org:1234/"
          user_name: "user"
          user_password: "password"
          persistent_connections_per_host: 5
          reuse_connection: false
          ...

        All settings are optional. If no settings are found in the
        configuration file for a particular webDAV endpoint, sensible
        defaults will be used.

    Notes
    -----
    There is only a single instance of this class. This thread-safe
    singleton is intended to be initialized when the module is imported
    the first time.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, filename: str | None = None) -> DavConfigPool:
        # Double-checked locking: skip the lock once the singleton exists.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, filename: str | None = None) -> None:
        # NOTE(review): __init__ runs on every instantiation of this
        # singleton and rebinds the attributes below, discarding previously
        # loaded configurations — confirm this is intended (contrast with
        # DavFileSizeCache, which guards against re-initialization).

        # Create a default configuration. This configuration is
        # used when a URL does not match any of the endpoints in the
        # configuration.
        self._default_config: DavConfig = DavConfig()

        # The key of this dictionary is the URL of the webDAV endpoint,
        # e.g. "davs://host.example.org:1234/"
        self._configs: dict[str, DavConfig] = {}

        # Load the configuration from the file we have been provided with,
        # if any.
        if filename is None:
            return

        # `filename` is interpreted strictly as the name of an environment
        # variable holding the configuration file path.
        # NOTE(review): a plain file path passed as `filename` is silently
        # ignored, since os.getenv returns None for it — confirm intended.
        if (filename := os.getenv(filename)) is not None:
            # Expand environment variables and '~' in the file name, if any.
            filename = os.path.expandvars(filename)
            filename = os.path.expanduser(filename)
            with open(filename) as file:
                for config_item in yaml.safe_load(file):
                    config = DavConfig(config_item)
                    if config.base_url not in self._configs:
                        self._configs[config.base_url] = config
                    else:
                        # We already have a configuration for the same
                        # endpoint. That is likely a human error in
                        # the configuration file.
                        # NOTE(review): this f-string interpolates nothing
                        # before "contains"; it presumably was meant to
                        # include the configuration file name — confirm.
                        raise ValueError(
                            f"""configuration file (unknown) contains two configurations for """
                            f"""endpoint {config.base_url}"""
                        )

    def get_config_for_url(self, url: str) -> DavConfig:
        """Return the configuration to use a webDAV client when interacting
        with the server which hosts the resource at `url`.

        Parameters
        ----------
        url : `str`
            URL for which to obtain a configuration.
        """
        # Select the configuration for the endpoint of the provided URL.
        normalized_url: str = normalize_url(url, preserve_path=False)
        if (config := self._configs.get(normalized_url)) is not None:
            return config

        # No config was found for the specified URL. Use the default.
        return self._default_config

    def _destroy(self) -> None:
        """Destroy this class singleton instance.

        Helper method to be used in tests to reset global configuration.
        """
        with DavConfigPool._lock:
            DavConfigPool._instance = None
def make_retry(config: DavConfig) -> Retry:
    """Create a ``urllib3.util.Retry`` object from settings in `config`.

    Parameters
    ----------
    config : `DavConfig`
        Configurable settings for a webDAV storage endpoint.

    Returns
    -------
    retry : `urllib3.util.Retry`
        Retry object to be used when creating a ``urllib3.PoolManager``.
    """
    # Random backoff factor in [retry_backoff_min, retry_backoff_max] so
    # that clients started at the same time (possibly on different hosts)
    # do not retry in lockstep and overwhelm the server.
    jittered_backoff = random.uniform(config.retry_backoff_min, config.retry_backoff_max)

    # Uppercased HTTP verbs we retry on. Only idempotent requests are
    # retried automatically.
    idempotent_methods = frozenset(
        [
            "COPY",
            "DELETE",
            "GET",
            "HEAD",
            "MKCOL",
            "OPTIONS",
            "PROPFIND",
            "PUT",
        ]
    )

    # HTTP status codes that force a retry.
    retryable_statuses = frozenset(
        [
            HTTPStatus.TOO_MANY_REQUESTS,  # 429
            HTTPStatus.INTERNAL_SERVER_ERROR,  # 500
            HTTPStatus.BAD_GATEWAY,  # 502
            HTTPStatus.SERVICE_UNAVAILABLE,  # 503
            HTTPStatus.GATEWAY_TIMEOUT,  # 504
        ]
    )

    return Retry(
        # Total number of retries to allow. Takes precedence over the
        # per-category counts below.
        total=2 * config.retries,
        # How many connection-related errors to retry on.
        connect=config.retries,
        # How many times to retry on read errors.
        read=config.retries,
        # Backoff factor to apply between attempts after the second try
        # (seconds).
        backoff_factor=jittered_backoff,
        # How many times to retry on bad status codes.
        status=config.retries,
        allowed_methods=idempotent_methods,
        status_forcelist=retryable_statuses,
        # Respect "Retry-After" headers for the status codes listed above.
        respect_retry_after_header=True,
    )
class DavClientPool:
    """Container of reusable webDAV clients, each one specifically configured
    to talk to a single storage endpoint.

    Parameters
    ----------
    config_pool : `DavConfigPool`
        Pool of all known webDAV client configurations.

    Notes
    -----
    There is a single instance of this class. This thread-safe singleton is
    intended to be initialized when the module is imported the first time.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, config_pool: DavConfigPool) -> DavClientPool:
        # Double-checked locking: skip the lock once the singleton exists.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, config_pool: DavConfigPool) -> None:
        # NOTE(review): __init__ runs on every instantiation of this
        # singleton; rebinding self._clients discards previously created
        # clients — confirm this is intended.
        self._config_pool: DavConfigPool = config_pool

        # The key of this dictionary is a path-stripped URL of the form
        # "davs://host.example.org:1234/". The value is a reusable
        # DavClient to interact with that endpoint.
        self._clients: dict[str, DavClient] = {}

    def get_client_for_url(self, url: str) -> DavClient:
        """Return a client for interacting with the endpoint where `url`
        is hosted.

        Parameters
        ----------
        url : `str`
            URL for which to obtain a client.

        Notes
        -----
        The returned client is thread-safe. If a client for that endpoint
        already exists it is reused, otherwise a new client is created
        with the appropriate configuration for interacting with the storage
        endpoint.
        """
        # If we already have a client for this endpoint reuse it.
        url = normalize_url(url, preserve_path=False)
        if (client := self._clients.get(url)) is not None:
            return client

        # No client for this endpoint was found. Create a new one and save it
        # for serving subsequent requests.
        with DavClientPool._lock:
            # If another client was created in the meantime by another thread
            # reuse it.
            if (client := self._clients.get(url)) is not None:
                return client

            config: DavConfig = self._config_pool.get_config_for_url(url)
            self._clients[url] = self._make_client(url, config)

        return self._clients[url]

    def _make_client(self, url: str, config: DavConfig) -> DavClient:
        """Make a webDAV client for interacting with the server at `url`."""
        # Check the server implements webDAV protocol and retrieve its
        # identity so that we can build a client for that specific
        # server implementation.
        client = DavClient(url, config)
        server_details = client.get_server_details(url)
        server_id = server_details.get("Server", None)
        # "Accept-Ranges" comes in as a header value; reduce it to a
        # boolean telling whether the server accepts byte-range requests.
        accepts_ranges: bool | str | None = server_details.get("Accept-Ranges", None)
        if accepts_ranges is not None:
            accepts_ranges = accepts_ranges == "bytes"

        if server_id is None:
            # Create a generic webDAV client
            return DavClient(url, config, accepts_ranges)

        if server_id.startswith("dCache/"):
            # Create a client for a dCache webDAV server
            return DavClientDCache(url, config, accepts_ranges)
        elif server_id.startswith("XrootD/"):
            # Create a client for a XrootD webDAV server
            return DavClientXrootD(url, config, accepts_ranges)
        else:
            # Return a generic webDAV client
            return DavClient(url, config, accepts_ranges)

    def _destroy(self) -> None:
        """Destroy this class singleton instance.

        Helper method to be used in tests to reset global configuration.
        """
        with DavClientPool._lock:
            DavClientPool._instance = None
class DavFileSizeCache:
    """Thread-safe singleton cache of sizes of recently uploaded files.

    Parameters
    ----------
    default_timeout : `float`, optional
        Default validity period, in seconds, of the entries in this cache.
        A per-entry validity period can also be given when an entry is
        stored (see `update_size` method).

    Notes
    -----
    There is a single instance of this class shared by several `DavClient`
    objects. This singleton is thread safe.

    Caching file sizes avoids a round trip to the server to retrieve the
    size of a recently uploaded file. This is in particular intended to
    efficiently serve `Butler` requests for the size of a file it just
    wrote to the datastore.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls) -> DavFileSizeCache:
        # Double-checked locking: skip the lock once the singleton exists.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, default_timeout: float = 60.0) -> None:
        # The cache maps a URL of the form
        #
        #   "https://host.example.org:1234/path/to/file"
        #
        # to a triplet (file_size, last_updated, timeout) where:
        # - 'file_size' is the size of the file in bytes,
        # - 'last_updated' is the time this entry was stored or refreshed,
        #   in seconds since epoch,
        # - 'timeout' is the validity period of the entry, in seconds,
        #   counted from 'last_updated'.
        with DavFileSizeCache._lock:
            # __init__ runs on every instantiation of this singleton, so
            # only initialize state the first time around.
            if not hasattr(self, "_cache"):
                self._default_timeout: float = default_timeout
                self._cache: dict[str, tuple[int, float, float]] = {}

    def invalidate(self, url: str) -> None:
        """Remove the cache entry for `url`, if any.

        Parameters
        ----------
        url : `str`
            URL of the file whose cache entry must be removed.
        """
        with DavFileSizeCache._lock:
            self._cache.pop(url, None)

    def update_size(self, url: str, size: int | None, timeout: float | None = None) -> None:
        """Store the size of the file at `url` in the cache.

        Parameters
        ----------
        url : `str`
            URL of the file whose size is to be cached.
        size : `int` or `None`, optional
            Size in bytes of the file at `url`. If `None`, the cache is
            left untouched.
        timeout : `float` or `None`, optional
            Validity period, in seconds, of this entry, counted from now.
            When omitted, the default validity period given at construction
            time is used.
        """
        if size is None:
            return

        validity = timeout if timeout is not None else self._default_timeout
        with DavFileSizeCache._lock:
            self._cache[url] = (size, time.time(), validity)

    def get_size(self, url: str) -> int | None:
        """Retrieve the cached size of the file at `url`.

        Parameters
        ----------
        url : `str`
            URL of the file to retrieve the size for.

        Returns
        -------
        `size`: `int` or `None`
            The cached size of the file at `url`, or `None` when no valid
            entry exists. A stale entry (i.e. one whose validity period has
            expired) is removed from the cache and `None` is returned.
        """
        with DavFileSizeCache._lock:
            entry = self._cache.get(url, None)
            if entry is None:
                # Nothing cached for this URL.
                return None

            size, last_updated, timeout = entry
            if time.time() > last_updated + timeout:
                # Stale entry: drop it and report a cache miss.
                self._cache.pop(url)
                return None

            return size
def unexpected_status_error(method: str, url: str, resp: HTTPResponse) -> Exception:
    """Build an exception describing an unexpected HTTP response.

    Note the exception is returned, not raised; raising is up to the
    caller.

    Parameters
    ----------
    method : `str`
        HTTP method of the request that triggered the error.
    url : `str`
        URL of the request that triggered the error. Redacted before being
        embedded in the message, so no authorization token is leaked.
    resp : `HTTPResponse`
        The error response.

    Returns
    -------
    error : `Exception`
        Exception whose message includes the status, reason and, when not
        empty, the response body.
    """
    message = f"Unexpected response to HTTP request {method} {redact_url(url)}: {resp.status} {resp.reason}"

    # Append the response body only when the server sent one.
    body = resp.data.decode()
    if body:
        message += f" [response body: {body}]"

    return ValueError(message)
841class DavClient:
842 """WebDAV client, configured to talk to a single storage endpoint.
844 Instances of this class are thread-safe.
846 Parameters
847 ----------
848 url : `str`
849 Root URL of the storage endpoint (e.g.
850 "https://host.example.org:1234/").
851 config : `DavConfig`
852 Configuration to initialize this client.
853 accepts_ranges : `bool` | `None`
854 Indicate whether the remote server accepts the ``Range`` header in GET
855 requests.
856 """
858 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None:
859 # Lock to protect this client fields from concurrent modification.
860 self._lock = threading.Lock()
862 # Base URL of the server this client will interact with.
863 # It is of the form: "davs://host.example.org:1234/"
864 self._base_url: str = url
866 # Configuration settings for the storage endpoint this client
867 # will interact with.
868 self._config: DavConfig = config
870 # Make the authorizer for this client's requests.
871 self._authorizer: Authorizer | None = self._make_authorizer(config=self._config)
873 # Make the pool manager for this client to use for sending
874 # requests to the server.
875 self._pool_manager: PoolManager = self._make_pool_manager(config=self._config)
877 # Parser of PROPFIND responses.
878 self._propfind_parser: DavPropfindParser = DavPropfindParser()
880 # Does the remote server accept a "Range" header in GET requests?
881 # This field is lazy initialized.
882 self._accepts_ranges: bool | None = accepts_ranges
884 # Can this client use a COPY request to duplicate files within a
885 # single webDAV server?
886 # Subclasses can overwrite this setting according to the server
887 # capabilities and compliance to webDAV RFC.
888 self._can_duplicate: bool = True
890 # Cache to store sizes of files this client has recently uploaded
891 # to the server.
892 self._file_size_cache = DavFileSizeCache()
894 def _make_authorizer(self, config: DavConfig) -> Authorizer | None:
895 # If a token was specified in the configuration settings for this
896 # endpoint, prefer it as the authentication method, even if other
897 # authentication settings were also specified.
898 if config.token is not None:
899 return TokenAuthorizer(token=config.token)
900 elif config.user_name is not None and config.user_password is not None:
901 return BasicAuthorizer(user_name=config.user_name, user_password=config.user_password)
903 return None
905 def _make_pool_manager(self, config: DavConfig) -> PoolManager:
906 # Prepare the trusted authorities certificates
907 ca_certs, ca_cert_dir = None, None
908 if config.trusted_authorities is not None:
909 if os.path.isdir(config.trusted_authorities):
910 ca_cert_dir = config.trusted_authorities
911 elif os.path.isfile(config.trusted_authorities):
912 ca_certs = config.trusted_authorities
913 else:
914 raise FileNotFoundError(
915 f"Trusted authorities file or directory {config.trusted_authorities} does not exist"
916 )
918 # If a token was specified for this endpoint don't use the
919 # <user certificate, private key> pair, even if they were also
920 # specified.
921 user_cert, user_key = None, None
922 if config.token is None:
923 user_cert = config.user_cert
924 user_key = config.user_key
926 # Pool manager for sending requests. Connections in this pool manager
927 # are generally left open by the client but the front-end server may
928 # choose to close them in some specific situations. For instance,
929 # whe serving a PUT request, the front server may redirect to a
930 # backend server and close the network connection making it
931 # unsuable for subsequent requests.
932 #
933 # In addition, the client may also choose to explicitly close the
934 # network connection after receiving a response.
935 return PoolManager(
936 # Number of connection pools to cache before discarding the least
937 # recently used pool. Each connection pool manages network
938 # connections to a single host, so this is basically the number
939 # of "host:port" we persist network connections to.
940 num_pools=200,
941 # Number of connections to the same "host:port" to persist for
942 # later reuse. More than 1 is useful in multithreaded situations.
943 # If more than this number of network connections are needed at
944 # a particular moment, they will be created and discarded after
945 # use.
946 maxsize=config.persistent_connections_per_host,
947 # Retry configuration to use by default with requests sent to
948 # host in the front end.
949 retries=make_retry(config),
950 # Socket timeout in seconds for each individual connection.
951 timeout=Timeout(
952 connect=config.timeout_connect,
953 read=config.timeout_read,
954 ),
955 # Size in bytes of the buffer for reading/writing data from/to
956 # the underlying socket.
957 blocksize=config.buffer_size,
958 # Client certificate and private key for esablishing TLS
959 # connections. If None, no client certificate is sent to the
960 # server. Only relevant for endpoints using secure HTTP protocol.
961 cert_file=user_cert,
962 key_file=user_key,
963 # We require verification of the server certificate.
964 cert_reqs="CERT_REQUIRED",
965 # Directory where the certificates of the trusted certificate
966 # authorities can be found. The contents of that directory
967 # must be as expected by OpenSSL.
968 ca_cert_dir=ca_cert_dir,
969 # Path to a file of concatenated CA certificates in PEM format.
970 ca_certs=ca_certs,
971 )
973 def get_server_details(self, url: str) -> dict[str, str]:
974 """Retrieve the details of the server and check it advertises
975 compliance to class 1 of webDAV protocol.
977 Parameters
978 ----------
979 url : `str`
980 URL to check.
982 Returns
983 -------
984 details: `dic[str, str]`
985 The keys of the returned dictionary can be "Server" and
986 "Accept-Ranges". Any of those keys may not exist in the returned
987 dictionary if the server did not include it in its response.
989 The values are the values of the corresponding
990 headers found in the response to the OPTIONS request.
991 Examples of values for the "Server" header are 'dCache/9.2.4' or
992 'XrootD/v5.7.1'.
993 """
994 # Check that the value "1" is part of the value of the "DAV" header in
995 # the response to an 'OPTIONS' request.
996 #
997 # We don't rely on webDAV locks, so a server complying to class 1 is
998 # enough for our purposes. All webDAV servers must advertise at least
999 # compliance class "1".
1000 #
1001 # Compliance classes are documented in
1002 # http://www.webdav.org/specs/rfc4918.html#dav.compliance.classes
1003 #
1004 # Examples of values for header DAV are:
1005 # DAV: 1, 2
1006 # DAV: 1, <http://apache.org/dav/propset/fs/1>
1007 resp = self.options(url)
1008 if "DAV" not in resp.headers:
1009 raise ValueError(f"Server of {resp.geturl()} does not implement webDAV protocol")
1011 if "1" not in resp.headers.get("DAV").replace(" ", "").split(","):
1012 raise ValueError(
1013 f"Server of {resp.geturl()} does not advertise required compliance to webDAV protocol class 1"
1014 )
1016 # The value of 'Server' header is expected to be of the form
1017 # 'dCache/9.2.4' or 'XrootD/v5.7.1'. Not all servers include such a
1018 # header in their response to an OPTIONS request.
1019 details: dict[str, str] = {}
1020 for header in ("Server", "Accept-Ranges"):
1021 value = resp.headers.get(header, None)
1022 if value is not None:
1023 details[header] = value
1025 return details
1027 def _get_response_url(self, resp: HTTPResponse, default_url: str) -> str:
1028 """Return the URL that response `resp` was obtained from.
1030 If `resp` contains no redirection history, return `default_url`.
1031 """
1032 if resp.retries is None:
1033 return default_url
1035 if len(resp.retries.history) == 0:
1036 return default_url
1038 return str(resp.retries.history[-1].redirect_location)
1040 def _rewrite_url_for_frontend(self, url: str) -> str:
1041 """Return a URL to reach one of the frontend servers that serves
1042 requests sent against `url`.
1044 Parameters
1045 ----------
1046 url : `str`
1047 Target URL.
1049 Returns
1050 -------
1051 url: `str`
1052 URL to reach one of this client's frontend servers. If `url` does
1053 not target this client's frontend servers, the returned value
1054 is `url` unmodified.
1055 """
1056 # Do nothing if this URL does not match this client's base URL. This
1057 # happens, for instance, when `url` is a redirection to a backend
1058 # server, so we don't want to rewrite it.
1059 #
1060 # Also, don't rewrite the URL if we don't have frontends configured
1061 # for this client.
1062 if not self._config.frontend_urls or not url.startswith(self._base_url):
1063 return url
1065 # Randomly select one of the configured frontends and return a modified
1066 # URL which uses the selected frontend instead of the original one.
1067 return random.choice(self._config.frontend_urls) + url.removeprefix(self._base_url)
1069 def _request(
1070 self,
1071 method: str,
1072 url: str,
1073 headers: dict[str, str] | None = None,
1074 body: BinaryIO | bytes | str | None = None,
1075 preload_content: bool = True,
1076 redirect: bool = True,
1077 pool_manager: PoolManager | None = None,
1078 ) -> HTTPResponse:
1079 """Send a generic HTTP request and return the response.
1081 Parameters
1082 ----------
1083 method : `str`
1084 Request method, e.g. 'GET', 'PUT', 'PROPFIND'.
1085 url : `str`
1086 Target URL.
1087 headers : `dict[str, str]`, optional
1088 Headers to sent with the request.
1089 body : `bytes` or `str` or `None`, optional
1090 Request body.
1091 preload_content : `bool`, optional
1092 If True, the response body is downloaded and can be retrieved
1093 via the returned response `.data` property. If False, the
1094 caller needs to call `.read()` on the returned response object to
1095 download the body, either entirely in one call or by chunks.
1096 redirect : `bool`, optional
1097 If True, automatically handle redirects. If False, the returned
1098 response may contain a redirection to another location.
1099 pool_manager : `PoolManager`, optional
1100 Pool manager to use for sending this request. If not provided,
1101 this client's pool manager is used.
1103 Returns
1104 -------
1105 resp: `HTTPResponse`
1106 Response to the request as received from the server.
1107 """
1108 # Retrieve the URL we must use to send this request to one of this
1109 # client's configured frontend servers.
1110 url = self._rewrite_url_for_frontend(url)
1112 # If this client is configured not to reuse the network connection
1113 # with the server, add a "Connection: close" header to this request.
1114 #
1115 # However, if the caller has explicitly specified a "Connection"
1116 # header, whatever its value, don't modify it.
1117 headers = {} if headers is None else dict(headers)
1118 if "Connection" not in headers and not self._config.reuse_connection:
1119 headers.update({"Connection": "close"})
1121 # If an authorizer (basic or token) is configured for this client,
1122 # allow it to set the "Authorization" header to this outgoing request.
1123 if self._authorizer is not None:
1124 self._authorizer.set_authorization(headers)
1126 if log.isEnabledFor(logging.DEBUG):
1127 annotation = ""
1128 if method == "GET" and "Range" in headers:
1129 byte_range = headers.get("Range", "").removeprefix("bytes=")
1130 annotation = f" (byte range: {byte_range})"
1132 log.debug("sending request %s %s%s", method, redact_url(url), annotation)
1134 if pool_manager is None:
1135 pool_manager = self._pool_manager
1137 with time_this(
1138 log,
1139 msg="%s %s",
1140 args=(method, url),
1141 mem_usage=self._config.collect_memory_usage,
1142 mem_unit=u.mebibyte,
1143 ):
1144 return pool_manager.request(
1145 method,
1146 url,
1147 body=body,
1148 headers=headers,
1149 preload_content=preload_content,
1150 redirect=redirect,
1151 )
1153 def _options(
1154 self,
1155 url: str,
1156 headers: dict[str, str] | None = None,
1157 pool_manager: PoolManager | None = None,
1158 ) -> HTTPResponse:
1159 """Send a HTTP OPTIONS request and return the response unmodified.
1161 Parameters
1162 ----------
1163 url : `str`
1164 Target URL.
1165 headers : `dict[str, str]`, optional
1166 Headers to sent with the request.
1167 pool_manager : `PoolManager`, optional
1168 Pool manager to use to send this request.
1170 Returns
1171 -------
1172 resp: `HTTPResponse`
1173 Response to the request as received from the server.
1175 Notes
1176 -----
1177 This method is intended for subclasses to override when needed.
1178 """
1179 return self._request("OPTIONS", url=url, headers=headers, pool_manager=pool_manager)
1181 def _copy(
1182 self,
1183 url: str,
1184 headers: dict[str, str] | None = None,
1185 preload_content: bool = True,
1186 pool_manager: PoolManager | None = None,
1187 ) -> HTTPResponse:
1188 """Send a webDAV COPY request and return the response unmodified.
1190 Parameters
1191 ----------
1192 url : `str`
1193 Target URL.
1194 headers : `dict[str, str]`, optional
1195 Headers to sent with the request.
1196 pool_manager : `PoolManager`, optional
1197 Pool manager to use to send this request.
1199 Notes
1200 -----
1201 This method is intended for subclasses to override when needed.
1202 """
1203 return self._request(
1204 "COPY", url=url, headers=headers, preload_content=preload_content, pool_manager=pool_manager
1205 )
1207 def _delete(
1208 self,
1209 url: str,
1210 headers: dict[str, str] | None = None,
1211 pool_manager: PoolManager | None = None,
1212 ) -> HTTPResponse:
1213 """Send a HTTP DELETE request and return the response unmodified.
1215 Parameters
1216 ----------
1217 url : `str`
1218 Target URL.
1219 headers : `dict[str, str]`, optional
1220 Headers to sent with the request.
1221 pool_manager : `PoolManager`, optional
1222 Pool manager to use to send this request.
1224 Notes
1225 -----
1226 This method is intended for subclasses to override when needed.
1227 """
1228 return self._request("DELETE", url=url, headers=headers, pool_manager=pool_manager)
1230 def _get(
1231 self,
1232 url: str,
1233 headers: dict[str, str] | None = None,
1234 preload_content: bool = True,
1235 redirect: bool = True,
1236 pool_manager: PoolManager | None = None,
1237 ) -> HTTPResponse:
1238 """Send a HTTP GET request and return the response unmodified.
1240 Parameters
1241 ----------
1242 url : `str`
1243 Target URL.
1244 headers : `dict[str, str]`, optional
1245 Headers to sent with the request.
1246 preload_content : `bool`, optional
1247 If True, the response body is downloaded and can be retrieved
1248 via the returned response `.data` property. If False, the
1249 caller needs to call the `.read()` on the returned response
1250 object to download the body.
1251 redirect : `bool`, optional
1252 If True, follow redirections.
1253 pool_manager : `PoolManager`, optional
1254 Pool manager to send the request through.
1256 Returns
1257 -------
1258 resp: `HTTPResponse`
1259 Response to the GET request as received from the server.
1261 Notes
1262 -----
1263 This method is intended for subclasses to override when needed.
1264 """
1265 return self._request(
1266 "GET",
1267 url=url,
1268 headers=headers,
1269 preload_content=preload_content,
1270 redirect=redirect,
1271 pool_manager=pool_manager,
1272 )
1274 def _head(
1275 self,
1276 url: str,
1277 headers: dict[str, str] | None = None,
1278 pool_manager: PoolManager | None = None,
1279 ) -> HTTPResponse:
1280 """Send a HTTP HEAD request and return the response.
1282 Parameters
1283 ----------
1284 url : `str`
1285 Target URL.
1286 headers : `bool`
1287 If the target URL is not found, raise an exception. Otherwise
1288 just return the response.
1289 pool_manager : `PoolManager`, optional
1290 Pool manager to use to send this request.
1292 Notes
1293 -----
1294 This method is intended for subclasses to override when needed.
1295 """
1296 return self._request("HEAD", url=url, headers=headers, pool_manager=pool_manager)
1298 def _mkcol(
1299 self,
1300 url: str,
1301 headers: dict[str, str] | None = None,
1302 pool_manager: PoolManager | None = None,
1303 ) -> HTTPResponse:
1304 """Send a webDAV MKCOL request and return the response unmodified.
1306 Parameters
1307 ----------
1308 url : `str`
1309 Target URL.
1310 headers : `dict[str, str]`, optional
1311 Headers to sent with the request.
1312 pool_manager : `PoolManager`, optional
1313 Pool manager to use to send this request.
1315 Notes
1316 -----
1317 This method is intended for subclasses to override when needed.
1318 """
1319 return self._request("MKCOL", url=url, headers=headers, pool_manager=pool_manager)
1321 def _move(
1322 self,
1323 url: str,
1324 headers: dict[str, str] | None = None,
1325 pool_manager: PoolManager | None = None,
1326 ) -> HTTPResponse:
1327 """Send a webDAV MOVE request and return the response unmodified.
1329 Parameters
1330 ----------
1331 url : `str`
1332 Target URL.
1333 headers : `dict[str, str]`, optional
1334 Headers to sent with the request.
1335 pool_manager : `PoolManager`, optional
1336 Pool manager to use to send this request.
1338 Notes
1339 -----
1340 This method is intended for subclasses to override when needed.
1341 """
1342 return self._request("MOVE", url=url, headers=headers, pool_manager=pool_manager)
1344 def _propfind(
1345 self,
1346 url: str,
1347 headers: dict[str, str] | None = None,
1348 body: str = "",
1349 pool_manager: PoolManager | None = None,
1350 ) -> HTTPResponse:
1351 """Send a webDAV PROPFIND request and return the response unmodified.
1353 Parameters
1354 ----------
1355 url : `str`
1356 Target URL.
1357 headers : `dict[str, str]`, optional
1358 Headers to sent with the request.
1359 body : `str`, optional
1360 Request body.
1361 pool_manager : `PoolManager`, optional
1362 Pool manager to use to send this request.
1364 Notes
1365 -----
1366 This method is intended for subclasses to override when needed.
1367 """
1368 return self._request("PROPFIND", url=url, headers=headers, body=body, pool_manager=pool_manager)
1370 def _put(
1371 self,
1372 url: str,
1373 headers: dict[str, str] | None = None,
1374 body: BinaryIO | bytes = b"",
1375 preload_content: bool = True,
1376 redirect: bool = True,
1377 pool_manager: PoolManager | None = None,
1378 ) -> HTTPResponse:
1379 """Send a HTTP PUT request and return the response unmodified.
1381 Parameters
1382 ----------
1383 url : `str`
1384 Target URL.
1385 headers : `dict[str, str]`, optional
1386 Headers to sent with the request.
1387 body : `BinaryIO` or `bytes`, optional
1388 Request body.
1389 preload_content : `bool`, optional
1390 If True, the response body is downloaded and can be retrieved
1391 via the returned response `.data` property. If False, the
1392 caller needs to call the `.read()` on the returned response
1393 object to download the body.
1394 redirect : `bool`, optional
1395 If True, follow redirections.
1396 pool_manager : `PoolManager`, optional
1397 Pool manager to send the request through.
1399 Returns
1400 -------
1401 resp: `HTTPResponse`
1402 Response to the PUT request as received from the server.
1404 Notes
1405 -----
1406 This method is intended for subclasses to override when needed.
1407 """
1408 return self._request(
1409 "PUT",
1410 url=url,
1411 headers=headers,
1412 body=body,
1413 preload_content=preload_content,
1414 redirect=redirect,
1415 pool_manager=pool_manager,
1416 )
1418 def head(
1419 self,
1420 url: str,
1421 headers: dict[str, str] | None = None,
1422 ) -> HTTPResponse:
1423 """Send a HTTP HEAD request, process and return the response
1424 only if successful.
1426 Parameters
1427 ----------
1428 url : `str`
1429 Target URL.
1430 headers : `bool`
1431 If the target URL is not found, raise an exception. Otherwise
1432 just return the response.
1433 """
1434 headers = {} if headers is None else dict(headers)
1435 resp = self._head(url=url, headers=headers)
1436 match resp.status:
1437 case HTTPStatus.OK:
1438 return resp
1439 case HTTPStatus.NOT_FOUND:
1440 raise FileNotFoundError(f"No file found at {resp.geturl()}")
1441 case _:
1442 raise unexpected_status_error("HEAD", url, resp)
1444 def get(
1445 self,
1446 url: str,
1447 headers: dict[str, str] | None = None,
1448 preload_content: bool = True,
1449 redirect: bool = True,
1450 ) -> tuple[str, HTTPResponse]:
1451 """Send a HTTP GET request.
1453 Parameters
1454 ----------
1455 url : `str`
1456 Target URL.
1457 headers : `dict[str, str]`, optional
1458 Headers to sent with the request.
1459 preload_content : `bool`, optional
1460 If True, the response body is downloaded and can be retrieved
1461 via the returned response `.data` property. If False, the
1462 caller needs to call the `.read()` on the returned response
1463 object to download the body.
1464 redirect : `bool`, optional
1465 If True, follow redirections.
1467 Returns
1468 -------
1469 url: `str`
1470 The URL we used to obtain this response. It may be different from
1471 the URL passed as argument in case of redirection.
1472 resp: `HTTPResponse`
1473 Response to the GET request as received from the server.
1474 """
1475 # Send the GET request to the frontend servers.
1476 headers = {} if headers is None else dict(headers)
1477 resp = self._get(
1478 url,
1479 headers=headers,
1480 preload_content=preload_content,
1481 redirect=redirect,
1482 )
1483 match resp.status:
1484 case HTTPStatus.OK | HTTPStatus.PARTIAL_CONTENT:
1485 return self._get_response_url(resp, default_url=url), resp
1486 case HTTPStatus.NOT_FOUND:
1487 raise FileNotFoundError(f"No file found at {resp.geturl()}")
1488 case status if status in resp.REDIRECT_STATUSES and not redirect:
1489 # This response is a redirection but we are asked not to
1490 # follow redirections, so return this response as is.
1491 return self._get_response_url(resp, default_url=url), resp
1492 case _:
1493 raise unexpected_status_error("GET", url, resp)
1495 def options(
1496 self,
1497 url: str,
1498 headers: dict[str, str] | None = None,
1499 ) -> HTTPResponse:
1500 """Send a HTTP OPTIONS request and return the response on success.
1502 Parameters
1503 ----------
1504 url : `str`
1505 Target URL.
1506 headers : `dict` [`str`, `str`], optional
1507 Headers to sent with the request.
1509 Returns
1510 -------
1511 resp: `HTTPResponse`
1512 Response to the request as received from the server.
1513 """
1514 resp = self._options(url=url, headers=headers)
1515 match resp.status:
1516 case HTTPStatus.OK | HTTPStatus.CREATED:
1517 return resp
1518 case _:
1519 raise unexpected_status_error("OPTIONS", url, resp)
1521 def propfind(
1522 self,
1523 url: str,
1524 headers: dict[str, str] | None = None,
1525 body: str = "",
1526 depth: str = "0",
1527 ) -> HTTPResponse:
1528 """Send a HTTP PROPFIND request and return the unmodified response on
1529 success.
1531 Parameters
1532 ----------
1533 url : `str`
1534 Target URL.
1535 headers : `dict[str, str]`, optional
1536 Headers to sent with the request.
1537 body : `str`, optional
1538 Request body.
1539 depth : `str`, optional
1540 ???.
1541 """
1542 headers = {} if headers is None else dict(headers)
1543 headers.update(
1544 {
1545 "Depth": depth,
1546 "Content-Type": 'application/xml; charset="utf-8"',
1547 "Content-Length": str(len(body)),
1548 }
1549 )
1550 resp = self._propfind(url=url, headers=headers, body=body)
1551 match resp.status:
1552 case HTTPStatus.MULTI_STATUS | HTTPStatus.NOT_FOUND:
1553 return resp
1554 case _:
1555 raise unexpected_status_error("PROPFIND", url, resp)
1557 def put(
1558 self,
1559 url: str,
1560 headers: dict[str, str] | None = None,
1561 data: BinaryIO | bytes = b"",
1562 ) -> int | None:
1563 """Send a HTTP PUT request.
1565 Parameters
1566 ----------
1567 url : `str`
1568 Target URL.
1569 headers : `dict[str, str]`, optional
1570 Headers to sent with the request.
1571 data : `BinaryIO` or `bytes`
1572 Request body.
1574 Returns
1575 -------
1576 size : `int | None`
1577 The size in bytes of the file uploaded. Can be `None` if the size
1578 could not be retrieved.
1579 """
1580 # Send a PUT request with empty body and handle redirection. This
1581 # is useful if the server redirects us; since we cannot rewind the
1582 # data we are uploading, we don't start uploading data until we
1583 # connect to the server that will actually serve our request.
1584 frontend_headers = {} if headers is None else dict(headers)
1585 frontend_headers.update({"Content-Length": "0"})
1586 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False)
1587 match resp.status:
1588 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT:
1589 redirect_url = url
1590 case status if status in resp.REDIRECT_STATUSES:
1591 redirect_url = resp.headers.get("Location")
1592 case _:
1593 raise unexpected_status_error("PUT", url, resp)
1595 # We may have been redirectred. Upload the file contents to
1596 # its final destination.
1598 # Ask the server to compute and record a checksum of the uploaded
1599 # file contents, for later integrity checks. Since we don't compute
1600 # the digest ourselves while uploading the data, we cannot control
1601 # after the request is complete that the data we uploaded is
1602 # identical to the data recorded by the server, but at least the
1603 # server has recorded a digest of the data it stored.
1604 #
1605 # See RFC-3230 for details and
1606 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml
1607 # for the list of supported digest algorithhms.
1608 #
1609 # In addition, note that not all servers implement this RFC so
1610 # the checksum reqquest may be ignored by the server.
1611 backend_headers = {} if headers is None else dict(headers)
1612 if (checksum := self._config.request_checksum) is not None:
1613 backend_headers.update({"Want-Digest": checksum})
1615 resp = self._put(redirect_url, body=data, headers=backend_headers)
1616 match resp.status:
1617 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT:
1618 # Send a HEAD request to retrieve the size of the file we
1619 # just uploaded
1620 resp = self.head(redirect_url)
1621 size = int(resp.headers.get("Content-Length", -1))
1622 return None if size == -1 else size
1623 case _:
1624 raise unexpected_status_error("PUT", redirect_url, resp)
1626 def _get_temporary_basename(self, basename: str, prefix: str) -> str:
1627 """Return a basename for a temporary file."""
1628 unique_id = str(uuid.uuid4())
1629 return f"{prefix}.{unique_id}.{basename}"
1631 def _split_parent_and_basename(self, url: str) -> tuple[str, str]:
1632 """Return the URL of the parent directory and the basename from
1633 `url`.
1634 """
1635 parsed: Url = parse_url(url)
1636 normalized_path = normalize_path(parsed.path)
1637 parent_path = posixpath.dirname(normalized_path)
1638 basename = posixpath.basename(normalized_path)
1639 parent_url = Url(
1640 scheme=parsed.scheme,
1641 auth=parsed.auth,
1642 host=parsed.host,
1643 port=parsed.port,
1644 path=parent_path,
1645 query=parsed.query,
1646 fragment=parsed.fragment,
1647 ).url
1648 return parent_url, basename
1650 def _parent(self, url: str) -> str:
1651 """Return the URL of the parent directory to `url`."""
1652 parent_url, _ = self._split_parent_and_basename(url)
1653 return parent_url
1655 def _make_temporary_url(self, url: str, prefix: str = ".tmp") -> str:
1656 """Return the URL of a temporary file based on `url`."""
1657 parent_url, basename = self._split_parent_and_basename(url)
1658 temporary_basename = self._get_temporary_basename(basename=basename, prefix=prefix)
1659 return f"{parent_url}/{temporary_basename}"
1661 def exists(self, url: str) -> bool:
1662 """Return True if a file or directory exists at `url`.
1664 Parameters
1665 ----------
1666 url : `str`
1667 Target URL.
1669 Returns
1670 -------
1671 result: `bool`
1672 True if there is an object at `url`.
1673 """
1674 return self.stat(url).exists
1676 def size(self, url: str) -> int:
1677 """Return the size in bytes of resource at `url`.
1679 If `url` designates a directory, the size is zero.
1681 Parameters
1682 ----------
1683 url : `str`
1684 Target URL.
1686 Returns
1687 -------
1688 size: `int`
1689 The number of bytes of the resource located at `url`.
1690 """
1691 # Check if we have the size of this URL in our cache
1692 if (size := self._file_size_cache.get_size(url)) is not None:
1693 return size
1695 stat = self.stat(url)
1696 if not stat.exists:
1697 raise FileNotFoundError(f"No file or directory found at {url}")
1698 else:
1699 return stat.size
1701 def is_dir(self, url: str) -> bool:
1702 """Return True if a directory exists at `url`.
1704 Parameters
1705 ----------
1706 url : `str`
1707 Target URL.
1709 Returns
1710 -------
1711 result: `bool`
1712 True if there is a directory at `url`.
1713 """
1714 return self.stat(url).is_dir
1716 def mkcol(self, url: str) -> None:
1717 """Create a directory at `url`.
1719 If a directory already exists at `url` no error is returned nor
1720 exception is raised. An exception is raised if a file exists at `url`.
1722 Parameters
1723 ----------
1724 url : `str`
1725 Target URL.
1726 """
1727 resp = self._mkcol(url=url)
1728 match resp.status:
1729 case HTTPStatus.CREATED | HTTPStatus.METHOD_NOT_ALLOWED:
1730 return
1731 case HTTPStatus.CONFLICT:
1732 # The parent directory does not exist. Create it first except
1733 # if the parent's path is "/".
1734 parent = self._parent(url)
1735 if not parent.endswith("/"):
1736 self.mkcol(parent)
1737 resp = self._mkcol(url=url)
1738 case _:
1739 raise ValueError(
1740 f"Can not create directory {resp.geturl()}: status {resp.status} {resp.reason}"
1741 )
1743 def stat(self, url: str) -> DavFileMetadata:
1744 """Return some properties of file or directory located at `url`.
1746 Parameters
1747 ----------
1748 url : `str`
1749 Target URL.
1751 Returns
1752 -------
1753 result: `DavResourceMetadata`
1754 Details of the resources at `url`. If no resource was found at
1755 that URL no exception is raised. Instead the returned details allow
1756 for detecting that the resource does not exist.
1758 The returned value should include fields to determine
1759 if there is a file or a directory at that `url` and if so, its
1760 size and kind (file or directory). Other fields may also be
1761 included depending on the implementation of the webDAV protocol
1762 by the server.
1763 """
1764 # Request the minimum set of DAV properties.
1765 body = (
1766 """<?xml version="1.0" encoding="utf-8"?>"""
1767 """<D:propfind xmlns:D="DAV:">"""
1768 """<D:prop>"""
1769 """<D:resourcetype/>"""
1770 """<D:getcontentlength/>"""
1771 """<D:getlastmodified/>"""
1772 """</D:prop>"""
1773 """</D:propfind>"""
1774 )
1775 resp = self.propfind(url, body=body, depth="0")
1776 match resp.status:
1777 case HTTPStatus.NOT_FOUND:
1778 href = url.replace(self._base_url, "", 1)
1779 return DavFileMetadata(base_url=self._base_url, href=href)
1780 case HTTPStatus.MULTI_STATUS:
1781 property = self._propfind_parser.parse(resp.data)[0]
1782 return DavFileMetadata.from_property(base_url=self._base_url, property=property)
1783 case _:
1784 raise unexpected_status_error("PROPFIND", url, resp)
1786 def info(self, url: str, name: str | None = None) -> dict[str, Any]:
1787 """Return the details about the file or directory at `url`.
1789 Parameters
1790 ----------
1791 url : `str`
1792 Target URL.
1793 name : `str`
1794 Name of the object to be included in the returned value. If None,
1795 the `url` is used as name.
1797 Returns
1798 -------
1799 result: `dict`
1800 For an existing file, the returned value has the form:
1802 .. code-block:: json
1804 {
1805 "name": name,
1806 "size": 1234,
1807 "type": "file",
1808 "last_modified":
1809 datetime.datetime(2025, 4, 10, 15, 12, 51, 227854),
1810 "checksums": {
1811 "adler32": "0fc5f83f",
1812 "md5": "1f57339acdec099c6c0a41f8e3d5fcd0",
1813 }
1814 }
1816 For an existing directory, the returned value has the form:
1818 .. code-block:: json
1820 {
1821 "name": name,
1822 "size": 0,
1823 "type": "directory",
1824 "last_modified":
1825 datetime.datetime(2025, 4, 10, 15, 12, 51, 227854),
1826 "checksums": {},
1827 }
1829 For a non-existing file or directory, the returned value has the
1830 form:
1832 .. code-block:: json
1834 {
1835 "name": name,
1836 "size": None,
1837 "type": None,
1838 "last_modified": datetime.datetime(1, 1, 1, 0, 0),
1839 "checksums": {},
1840 }
1842 Notes
1843 -----
1844 The format of the returned directory is inspired and compatible with
1845 `fsspec`.
1847 The size of existing directories is always zero. The `checksums`
1848 dictionary is empty for directories and may be empty for files if the
1849 server does not compute and store the checksum of the files it stores.
1850 """
1851 result: dict[str, Any] = {
1852 "name": name if name is not None else url,
1853 "type": None,
1854 "size": None,
1855 "last_modified": datetime.min,
1856 "checksums": {},
1857 }
1858 metadata = self.stat(url)
1859 if not metadata.exists:
1860 return result
1862 result.update(
1863 {
1864 "type": "directory" if metadata.is_dir else "file",
1865 "size": metadata.size,
1866 "last_modified": metadata.last_modified,
1867 "checksums": metadata.checksums,
1868 }
1869 )
1870 return result
1872 def move(self, source_url: str, destination_url: str, overwrite: bool = False) -> HTTPResponse:
1873 """Send a webDAV MOVE request and return the response unmodified.
1875 Parameters
1876 ----------
1877 source_url : `str`
1878 Source URL.
1879 destination_url : `str`
1880 Destination URL.
1881 overwrite : `bool`, optional
1882 Overwrite the destination if it exists.
1884 Returns
1885 -------
1886 resp : `HTTPResponse`
1887 The unmodified response received from the server.
1888 """
1889 headers = {
1890 "Destination": destination_url,
1891 "Overwrite": "T" if overwrite else "F",
1892 }
1893 return self._move(source_url, headers=headers)
1895 def read_dir(self, url: str) -> list[DavFileMetadata]:
1896 """Return the properties of the files or directories contained in
1897 directory located at `url`.
1899 If `url` designates a file, only the details of itself are returned.
1901 Parameters
1902 ----------
1903 url : `str`
1904 Target URL.
1906 Returns
1907 -------
1908 result: `list[DavResourceMetadata]`
1909 List of details of each file or directory within `url`.
1910 """
1911 body = (
1912 """<?xml version="1.0" encoding="utf-8"?>"""
1913 """<D:propfind xmlns:D="DAV:"><D:prop>"""
1914 """<D:resourcetype/><D:getcontentlength/><D:getlastmodified/><D:displayname/>"""
1915 """</D:prop></D:propfind>"""
1916 )
1917 resp = self.propfind(url, body=body, depth="1")
1918 match resp.status:
1919 case HTTPStatus.MULTI_STATUS:
1920 pass
1921 case HTTPStatus.NOT_FOUND:
1922 raise FileNotFoundError(f"No directory found at {resp.geturl()}")
1923 case _:
1924 raise unexpected_status_error("PROPFIND", url, resp)
1926 if (path := parse_url(url).path) is not None:
1927 this_dir_href = path.rstrip("/") + "/"
1928 else:
1929 this_dir_href = "/"
1931 result = []
1932 for property in self._propfind_parser.parse(resp.data):
1933 # Don't include in the results the metadata of the directory we
1934 # traversing.
1935 # Some webDAV servers do not append a "/" to the href of a
1936 # directory in their response to PROPFIND, so we must take into
1937 # account that.
1938 if property.is_file:
1939 result.append(DavFileMetadata.from_property(base_url=self._base_url, property=property))
1940 elif property.is_dir and property.href != this_dir_href:
1941 result.append(DavFileMetadata.from_property(base_url=self._base_url, property=property))
1943 return result
1945 def read(self, url: str) -> tuple[str, bytes]:
1946 """Download the contents of file located at `url`.
1948 Parameters
1949 ----------
1950 url : `str`
1951 Target URL.
1953 Returns
1954 -------
1955 url: `str`
1956 Backend URL from which the data was obtained.
1957 data: `bytes`
1958 Contents of the file.
1960 Notes
1961 -----
1962 The caller must ensure that the resource at `url` is a file, not
1963 a directory.
1964 """
1965 backend_url, resp = self.get(url)
1966 return backend_url, resp.data
1968 def read_range(
1969 self,
1970 url: str,
1971 start: int,
1972 end: int | None,
1973 headers: dict[str, str] | None = None,
1974 release_backend: bool = True,
1975 ) -> tuple[str, bytes]:
1976 """Download partial content of file located at `url`.
1978 Parameters
1979 ----------
1980 url : `str`
1981 Target URL.
1982 start : `int`
1983 Starting byte offset of the range to download.
1984 end : `int`
1985 Ending byte offset of the range to download.
1986 headers : `dict[str,str]`, optional
1987 Specific headers to sent with the GET request.
1988 release_backend : `bool`, optional
1989 Whether or not to close the connection to the backend.
1991 Returns
1992 -------
1993 backend_url: `str`
1994 URL used to retrieve this data. If the server redirected us
1995 this is the URL we were redirected to.
1996 data: `bytes`
1997 Partial contents of the file.
1999 Notes
2000 -----
2001 The caller must ensure that the resource at `url` is a file, not
2002 a directory. This is important because some webDAV servers respond
2003 with an HTML document when asked for reading a directory.
2004 """
2005 range_headers = {"Accept-Encoding": "identity"}
2006 if end is None:
2007 range_headers.update({"Range": f"bytes={start}-"})
2008 else:
2009 range_headers.update({"Range": f"bytes={start}-{end}"})
2011 frontend_headers = {} if headers is None else dict(headers)
2012 frontend_headers.update(range_headers)
2014 # Send the request to the frontend server and don't follow
2015 # redirections automatically. We need to be able to add a
2016 # "Connection: close" request header when sending the request to the
2017 # backend server (if any) if are requested to. We don't send that
2018 # header to the frontend.
2019 final_url, resp = self.get(url, headers=frontend_headers, redirect=False)
2020 match resp.status:
2021 case HTTPStatus.PARTIAL_CONTENT:
2022 return final_url, resp.data
2023 case status if status not in resp.REDIRECT_STATUSES:
2024 raise unexpected_status_error("GET (with 'Range' header)", url, resp)
2025 case _:
2026 pass
2028 # We were redirected to a backend server. Follow the redirection and
2029 # if requested add a "Connection: close" header to explicitly release
2030 # the backend server.
2031 redirect_url = resp.headers.get("Location")
2032 log.debug("GET request to %s got redirected to %s", url, redirect_url)
2034 backend_headers = {} if headers is None else dict(headers)
2035 backend_headers.update(range_headers)
2036 backend_headers.update({"Connection": "close" if release_backend else "keep-alive"})
2038 final_url, resp = self.get(redirect_url, headers=backend_headers)
2039 match resp.status:
2040 case HTTPStatus.PARTIAL_CONTENT:
2041 return final_url, resp.data
2042 case _:
2043 raise unexpected_status_error("GET (with 'Range' header)", redirect_url, resp)
2045 def _write_response_body_to_file(self, resp: HTTPResponse, filename: str, chunk_size: int) -> int:
2046 """Write the the response body to a local file.
2048 Parameters
2049 ----------
2050 resp : `HTTPResponse`
2051 The HTTP Response to read the body from.
2052 filename : `str`
2053 Local file to write the content to. If the file already exists,
2054 it will be rewritten.
2055 chunk_size : `int`
2056 Size of the chunks to write to `filename`.
2058 Returns
2059 -------
2060 count: `int`
2061 Number of bytes written to `filename`.
2062 """
2063 try:
2064 # Read the response body into a pre-allocated memory buffer and
2065 # write the buffer content to the destination file avoiding
2066 # copies if possible.
2067 content_length = 0
2068 with open(filename, "wb", buffering=0) as file:
2069 view = memoryview(bytearray(chunk_size))
2070 while True:
2071 if (count := resp.readinto(view)) > 0: # type: ignore
2072 content_length += count
2073 file.write(view[:count])
2074 else:
2075 break
2077 # Check that the expected and actual content lengths match.
2078 # Perform this check only when the body of the response was not
2079 # encoded by the server.
2080 expected_length: int = int(resp.headers.get("Content-Length", -1))
2081 if (
2082 "Content-Encoding" not in resp.headers
2083 and expected_length != -1
2084 and expected_length != content_length
2085 ):
2086 raise ValueError(
2087 f"Size of downloaded file does not match value in Content-Length header for "
2088 f"{resp.geturl()}: expecting {expected_length} and got {content_length} bytes"
2089 )
2091 return content_length
2092 finally:
2093 # Release the connection
2094 resp.drain_conn()
2095 resp.release_conn()
2097 def download(self, url: str, filename: str, chunk_size: int) -> int:
2098 """Download the content of a file and write it to local file.
2100 Parameters
2101 ----------
2102 url : `str`
2103 Target URL.
2104 filename : `str`
2105 Local file to write the content to. If the file already exists,
2106 it will be rewritten.
2107 chunk_size : `int`
2108 Size of the chunks to write to `filename`.
2110 Returns
2111 -------
2112 count: `int`
2113 Number of bytes written to `filename`.
2115 Notes
2116 -----
2117 The caller must ensure that the resource at `url` is a file, not
2118 a directory.
2119 """
2120 _, resp = self.get(url, preload_content=False)
2121 return self._write_response_body_to_file(resp, filename, chunk_size)
2123 def write(self, url: str, data: BinaryIO | bytes) -> int | None:
2124 """Create or rewrite a remote file at `url` with `data` as its
2125 contents.
2127 Parameters
2128 ----------
2129 url : `str`
2130 Target URL.
2131 data : `bytes`
2132 Sequence of bytes to upload.
2134 Returns
2135 -------
2136 size : `int | None`
2137 The size in bytes of the file uploaded. Can be `None` if the size
2138 could not be retrieved.
2140 Notes
2141 -----
2142 If a file already exists at `url` it will be rewritten.
2143 """
2144 # According to RFC 4918, the parent directory of the file must
2145 # exist before we can write to it. So create it first and then
2146 # upload.
2147 self.mkcol(self._parent(url))
2149 try:
2150 # Upload to a temporary file and rename to the final name.
2151 temporary_url = self._make_temporary_url(url)
2152 size = self.put(temporary_url, data=data)
2153 self.rename(temporary_url, url, overwrite=True, create_parent=False)
2155 # Update the file size cache with this size
2156 self._file_size_cache.update_size(url, size)
2157 return size
2158 except Exception:
2159 # Upload failed. Attempt to remove the temporary file.
2160 self.delete(temporary_url)
2161 raise
2163 def checksums(self, url: str) -> dict[str, str]:
2164 """Return the checksums of the contents of file located at `url`.
2166 The checksums are retrieved from the storage endpoint. There may be
2167 none if the storage endpoint does not automatically expose the
2168 checksums it computes.
2170 Parameters
2171 ----------
2172 url : `str`
2173 Target URL.
2175 Returns
2176 -------
2177 checksums: `dict[str, str]`
2178 A file exists at `url`.
2179 The key of the dictionary is the lowercased name of the checksum
2180 algorithm (e.g. "md5", "adler32"). The value is the lowercased
2181 checksum itself (e.g. "78441cec2479ec8b545c4d6699f542da").
2182 """
2183 stat = self.stat(url)
2184 if not stat.exists:
2185 raise FileNotFoundError(f"No file found at {url}")
2187 return stat.checksums if stat.is_file else {}
2189 def delete(self, url: str) -> None:
2190 """Delete the file or directory at `url`.
2192 If there is no file or directory at `url` is not considered an error.
2194 Parameters
2195 ----------
2196 url : `str`
2197 Target URL.
2199 Notes
2200 -----
2201 If `url` designates a directory, some webDAV servers recursively
2202 remove the directory and its contents. Others, only remove the
2203 directory if it is empty.
2205 For a consisten behavior, the caller must check what kind of object
2206 the target URL is and walk the hierarchy removing all objects.
2207 """
2208 resp = self._delete(url)
2209 match resp.status:
2210 case HTTPStatus.OK | HTTPStatus.ACCEPTED | HTTPStatus.NO_CONTENT | HTTPStatus.NOT_FOUND:
2211 # Invalidate the entry for this file in our cache, if any
2212 self._file_size_cache.invalidate(url)
2213 case _:
2214 raise ValueError(
2215 f"Unable to delete resource {resp.geturl()}: status {resp.status} {resp.reason}"
2216 )
2218 def accepts_ranges(self, url: str) -> bool:
2219 """Return `True` if the server supports a 'Range' header in
2220 GET requests against `url`.
2222 Parameters
2223 ----------
2224 url : `str`
2225 Target URL.
2226 """
2227 # If we have already determined that the server accepts "Range" for
2228 # another URL, we assume that it implements that feature for any
2229 # file it serves, so reuse that information.
2230 if self._accepts_ranges is not None:
2231 return self._accepts_ranges
2233 with self._lock:
2234 if self._accepts_ranges is None:
2235 self._accepts_ranges = self.head(url).headers.get("Accept-Ranges", "") == "bytes"
2237 return self._accepts_ranges
2239 @property
2240 def supports_duplicate(self) -> bool:
2241 """Return True if the server this client interacts with implements
2242 webDAV COPY method.
2243 """
2244 return self._can_duplicate
2246 def copy(self, source_url: str, destination_url: str, overwrite: bool = False) -> None:
2247 """Copy the file at `source_url` to `destination_url` in the same
2248 storage endpoint.
2250 Parameters
2251 ----------
2252 source_url : `str`
2253 URL of the source file.
2254 destination_url : `str`
2255 URL of the destination file. Its parent directory must exist.
2256 overwrite : `bool`
2257 If True and a file exists at `destination_url` it will be
2258 overwritten. Otherwise an exception is raised.
2259 """
2260 headers = {
2261 "Destination": destination_url,
2262 "Overwrite": "T" if overwrite else "F",
2263 }
2264 resp = self._copy(source_url, headers=headers)
2265 match resp.status:
2266 case HTTPStatus.CREATED | HTTPStatus.NO_CONTENT:
2267 self._file_size_cache.invalidate(destination_url)
2268 case _:
2269 raise ValueError(
2270 f"Could not copy {resp.geturl()} to {destination_url}: status {resp.status} {resp.reason}"
2271 )
2273 def duplicate(self, source_url: str, destination_url: str, overwrite: bool = False) -> None:
2274 """Copy the file at `source_url` to `destination_url` in the same
2275 storage endpoint.
2277 Parameters
2278 ----------
2279 source_url : `str`
2280 URL of the source file.
2281 destination_url : `str`
2282 URL of the destination file. Its parent directory is created if
2283 necessary.
2284 overwrite : `bool`
2285 If True and a file exists at `destination_url` it will be
2286 overwritten. Otherwise an exception is raised.
2287 """
2288 # Check the source is a file
2289 if self.is_dir(source_url):
2290 raise NotImplementedError(f"copy is not implemented for directory {source_url}")
2292 # Create the destination's parent directory first because COPY may
2293 # fail if it does not exist, depending on the server implementation
2294 # of RFC 4918.
2295 destination_parent = self._parent(destination_url)
2296 self.mkcol(destination_parent)
2297 self.copy(source_url=source_url, destination_url=destination_url, overwrite=overwrite)
2299 def rename(
2300 self,
2301 source_url: str,
2302 destination_url: str,
2303 overwrite: bool = False,
2304 create_parent: bool = True,
2305 ) -> None:
2306 """Rename (move) the file at `source_url` to `destination_url` in the
2307 same storage endpoint.
2309 Parameters
2310 ----------
2311 source_url : `str`
2312 URL of the source file.
2313 destination_url : `str`
2314 URL of the destination file. Its parent directory must exist.
2315 overwrite : `bool`, optional
2316 If True and a file exists at `destination_url` it will be
2317 overwritten. Otherwise an exception is raised.
2318 create_parent : `bool`, optional
2319 Whether to create the parent.
2320 """
2321 # Create the destination's parent directory first because MOVE may
2322 # fail if it does not exist, depending on the server implementation
2323 # of RFC 4918.
2324 if create_parent:
2325 destination_parent = self._parent(destination_url)
2326 self.mkcol(destination_parent)
2328 resp = self.move(source_url=source_url, destination_url=destination_url, overwrite=overwrite)
2329 match resp.status:
2330 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT:
2331 self._file_size_cache.invalidate(destination_url)
2332 case _:
2333 raise ValueError(
2334 f"""Could not move file {resp.geturl()} to {destination_url}: status {resp.status} """
2335 f"""{resp.reason}"""
2336 )
2338 def generate_presigned_get_url(self, url: str, expiration_time_seconds: int) -> str:
2339 """Return a pre-signed URL that can be used to retrieve this resource
2340 using an HTTP GET without supplying any access credentials.
2342 Parameters
2343 ----------
2344 url : `str`
2345 Target URL.
2346 expiration_time_seconds : `int`
2347 Number of seconds until the generated URL is no longer valid.
2349 Returns
2350 -------
2351 url : `str`
2352 HTTP URL signed for GET.
2353 """
2354 raise NotImplementedError(f"URL signing is not supported by server for {self}")
2356 def generate_presigned_put_url(self, url: str, expiration_time_seconds: int) -> str:
2357 """Return a pre-signed URL that can be used to upload a file to this
2358 path using an HTTP PUT without supplying any access credentials.
2360 Parameters
2361 ----------
2362 url : `str`
2363 Target URL.
2364 expiration_time_seconds : `int`
2365 Number of seconds until the generated URL is no longer valid.
2367 Returns
2368 -------
2369 url : `str`
2370 HTTP URL signed for PUT.
2371 """
2372 raise NotImplementedError(f"URL signing is not supported by server for {self}")
class ActivityCaveat(enum.Enum):
    """Activity caveats accepted when requesting macaroons from dCache or
    XRootD webDAV servers.
    """

    # Read-only access to a resource.
    DOWNLOAD = 1
    # Write access to a resource.
    UPLOAD = 2
2384class DavClientURLSigner(DavClient):
2385 """WebDAV client which supports signing of URL for upload and download.
2387 Instances of this class are thread-safe.
2389 Parameters
2390 ----------
2391 url : `str`
2392 Root URL of the storage endpoint
2393 (e.g. "https://host.example.org:1234/").
2394 config : `DavConfig`
2395 Configuration to initialize this client.
2396 accepts_ranges : `bool` | `None`
2397 Indicate whether the remote server accepts the ``Range`` header in GET
2398 requests.
2399 """
2401 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None:
2402 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges)
2404 def generate_presigned_get_url(self, url: str, expiration_time_seconds: int) -> str:
2405 """Return a pre-signed URL that can be used to retrieve the resource
2406 at `url` using an HTTP GET without supplying any access credentials.
2408 Parameters
2409 ----------
2410 url : `str`
2411 URL of an existing file.
2412 expiration_time_seconds : `int`
2413 Number of seconds until the generated URL is no longer valid.
2415 Returns
2416 -------
2417 url : `str`
2418 HTTP URL signed for GET.
2420 Notes
2421 -----
2422 Although the returned URL allows for downloading the file at `url`
2423 without supplying credentials, the HTTP client must be configured
2424 to accept the certificate the server will present if the client wants
2425 validate it. The server's certificate may be issued by a certificate
2426 authority unknown to the client.
2427 """
2428 macaroon: str = self._get_macaroon(url, ActivityCaveat.DOWNLOAD, expiration_time_seconds)
2429 return f"{url}?authz={macaroon}"
2431 def generate_presigned_put_url(self, url: str, expiration_time_seconds: int) -> str:
2432 """Return a pre-signed URL that can be used to upload a file to `url`
2433 using an HTTP PUT without supplying any access credentials.
2435 Parameters
2436 ----------
2437 url : `str`
2438 URL of an existing file.
2439 expiration_time_seconds : `int`
2440 Number of seconds until the generated URL is no longer valid.
2442 Returns
2443 -------
2444 url : `str`
2445 HTTP URL signed for PUT.
2447 Notes
2448 -----
2449 Although the returned URL allows for uploading a file to `url`
2450 without supplying credentials, the HTTP client must be configured
2451 to accept the certificate the server will present if the client wants
2452 validate it. The server's certificate may be issued by a certificate
2453 authority unknown to the client.
2454 """
2455 macaroon: str = self._get_macaroon(url, ActivityCaveat.UPLOAD, expiration_time_seconds)
2456 return f"{url}?authz={macaroon}"
2458 def _get_macaroon(self, url: str, activity: ActivityCaveat, expiration_time_seconds: int) -> str:
2459 """Return a macaroon for uploading or downloading the file at `url`.
2461 Parameters
2462 ----------
2463 url : `str`
2464 URL of an existing file.
2465 activity : `ActivityCaveat`
2466 the activity the macaroon is requested for.
2467 expiration_time_seconds : `int`
2468 Requested duration of the macaroon, in seconds.
2470 Returns
2471 -------
2472 macaroon : `str`
2473 Macaroon to be used with `url` in a GET or PUT request.
2474 """
2475 # dCache and XRootD webDAV servers support delivery of macaroons.
2476 #
2477 # For details about dCache macaroons see:
2478 # https://www.dcache.org/manuals/UserGuide-9.2/macaroons.shtml
2479 match activity:
2480 case ActivityCaveat.DOWNLOAD:
2481 activity_caveat = "DOWNLOAD,LIST"
2482 case ActivityCaveat.UPLOAD:
2483 activity_caveat = "UPLOAD,LIST,DELETE,MANAGE"
2485 # Retrieve a macaroon for the requested activities and duration
2486 headers = {"Content-Type": "application/macaroon-request"}
2487 body = {
2488 "caveats": [
2489 f"activity:{activity_caveat}",
2490 ],
2491 "validity": f"PT{expiration_time_seconds}S",
2492 }
2493 resp = self._request("POST", url, headers=headers, body=json.dumps(body))
2494 if resp.status != HTTPStatus.OK:
2495 raise ValueError(
2496 f"Could not retrieve a macaroon for URL {resp.geturl()}, status: {resp.status} {resp.reason}"
2497 )
2499 # We are expecting the body of the response to be formatted in JSON.
2500 # dCache sets the 'Content-Type' of the response to 'application/json'
2501 # but XRootD does not set any 'Content-Type' header 8-[
2502 #
2503 # An example of a response body returned by dCache is shown below:
2504 # {
2505 # "macaroon": "MDA[...]Qo",
2506 # "uri": {
2507 # "targetWithMacaroon": "https://dcache.example.org/?authz=MD...",
2508 # "baseWithMacaroon": "https://dcache.example.org/?authz=MD...",
2509 # "target": "https://dcache.example.org/",
2510 # "base": "https://dcache.example.org/"
2511 # }
2512 # }
2513 #
2514 # An example of a response body returned by XRootD is shown below:
2515 # {
2516 # "macaroon": "MDA[...]Qo",
2517 # "expires_in": 86400
2518 # }
2519 try:
2520 response_body = json.loads(resp.data.decode())
2521 except json.JSONDecodeError:
2522 raise ValueError(f"Could not deserialize response to POST request for URL {resp.geturl()}")
2524 if "macaroon" in response_body:
2525 return response_body["macaroon"]
2527 raise ValueError(f"Could not retrieve macaroon for URL {resp.geturl()}")
2529 @override
2530 def duplicate(self, source_url: str, destination_url: str, overwrite: bool = False) -> None:
2531 """Copy the file at `source_url` to `destination_url` in the same
2532 storage endpoint.
2534 Parameters
2535 ----------
2536 source_url : `str`
2537 URL of the source file.
2538 destination_url : `str`
2539 URL of the destination file. Its parent directory must exist.
2540 overwrite : `bool`
2541 If True and a file exists at `destination_url` it will be
2542 overwritten. Otherwise an exception is raised.
2543 """
2544 # Check the source is a file
2545 if self.is_dir(source_url):
2546 raise NotImplementedError(f"copy is not implemented for directory {source_url}")
2548 # Neither dCache nor XrootD currently implement the COPY
2549 # webDAV method as documented in
2550 #
2551 # http://www.webdav.org/specs/rfc4918.html#METHOD_COPY
2552 #
2553 # (See issues DM-37603 and DM-37651 for details)
2554 # With those servers use third-party copy instead.
2555 return self._copy_via_third_party(source_url, destination_url, overwrite)
2557 def _copy_via_third_party(self, source_url: str, destination_url: str, overwrite: bool = False) -> None:
2558 """Copy the file at `source_url` to `destination_url` in the same
2559 storage endpoint using the third-party copy functionality
2560 implemented by dCache and XRootD servers.
2562 Parameters
2563 ----------
2564 source_url : `str`
2565 URL of the source file.
2566 destination_url : `str`
2567 URL of the destination file. Its parent directory must exist.
2568 overwrite : `bool`
2569 If True and a file exists at `destination_url` it will be
2570 overwritten. Otherwise an exception is raised.
2571 """
2572 # To implement COPY we use dCache's third-party copy mechanism
2573 # documented at:
2574 #
2575 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#third-party-transfers
2576 #
2577 # The reason is that dCache does not correctly implement webDAV's COPY
2578 # method. See https://github.com/dCache/dcache/issues/6950
2580 # Create the destination's parent directory first because COPY may
2581 # fail if it does not exist, depending on the server implementation
2582 # of RFC 4918.
2583 destination_parent = self._parent(destination_url)
2584 self.mkcol(destination_parent)
2586 # Retrieve a macaroon for downloading the source
2587 download_macaroon = self._get_macaroon(source_url, ActivityCaveat.DOWNLOAD, 300)
2589 # Prepare and send the COPY request
2590 try:
2591 headers = {
2592 "Source": source_url,
2593 "TransferHeaderAuthorization": f"Bearer {download_macaroon}",
2594 "Credential": "none",
2595 "Depth": "0",
2596 "Overwrite": "T" if overwrite else "F",
2597 "RequireChecksumVerification": "false",
2598 }
2599 resp = self._copy(destination_url, headers=headers, preload_content=False)
2600 match resp.status:
2601 case HTTPStatus.CREATED:
2602 return
2603 case HTTPStatus.ACCEPTED:
2604 pass
2605 case _:
2606 raise ValueError(
2607 f"Unable to copy resource {resp.geturl()}; status: {resp.status} {resp.reason}"
2608 )
2610 # Analyse the response to the COPY request that the server has
2611 # not completed yet.
2612 content_type = resp.headers.get("Content-Type")
2613 if content_type != "text/perf-marker-stream":
2614 raise ValueError(
2615 f"""Unexpected Content-Type {content_type} in response to COPY request from """
2616 f"""{source_url} to {destination_url}"""
2617 )
2619 # Read the performance markers in the response body until we get
2620 # a "success" or "failure" notification.
2621 #
2622 # Documentation:
2623 # https://dcache.org/manuals/UserGuide-10.2/webdav.shtml#third-party-transfers
2624 for marker in io.TextIOWrapper(resp): # type: ignore
2625 marker = marker.rstrip("\n")
2626 if marker == "": # EOF
2627 raise ValueError(
2628 f"""Copying file from {source_url} to {destination_url} failed: """
2629 """could not get response from server"""
2630 )
2631 elif marker.startswith("failure:"):
2632 raise ValueError(
2633 f"""Copying file from {source_url} to {destination_url} failed with error: """
2634 f"""{marker}"""
2635 )
2636 elif marker.startswith("success:"):
2637 return
2638 finally:
2639 resp.drain_conn()
class DavClientDCache(DavClientURLSigner):
    """Client for interacting with a dCache webDAV server.

    Instances of this class are thread-safe.

    Parameters
    ----------
    url : `str`
        Root URL of the storage endpoint
        (e.g. "https://host.example.org:1234/").
    config : `DavConfig`
        Configuration to initialize this client.
    accepts_ranges : `bool` | `None`
        Indicate whether the remote server accepts the ``Range`` header in GET
        requests.
    """

    # Regular expression to parse dCache's response body of a successful
    # PUT request. Such a response body is of the form:
    #
    # "104857600 bytes uploaded\r\n\r\n"
    #
    # The first (and only) captured group is the number of bytes uploaded,
    # used by `put()` to report the uploaded size without an extra HEAD
    # request.
    rex: re.Pattern = re.compile(r"^(\d*) bytes uploaded", re.IGNORECASE | re.ASCII)
2666 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None:
2667 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges)
2669 # Create a specialized pool manager for sending requests to dCache
2670 # webdav door, in particular for retrieving metadata.
2671 #
2672 # As of dCache v10.2.14, the webDAV door leaves the network connection
2673 # unusable for us for sending subsequent requests after serving
2674 # GET, PUT, DELETE, etc., but leaves the connection intact after
2675 # serving MKCOL, MOVE and PROPFIND requests.
2676 # We take advantage of that by using a dedicated pool manager for
2677 # those requests, so that the network connections managed by that pool
2678 # be reused. This avoids establishing the TCP+TLS connection for each
2679 # request.
2680 pool_manager = self._make_pool_manager(self._config)
2681 self._propfind_pool_manager = pool_manager
2682 self._move_pool_manager = pool_manager
2683 self._mkcol_pool_manager = pool_manager
2685 # dCache does not deliver macaroons when we are not using a secure
2686 # channel to interact with the door. In that case, we can not use
2687 # third party copy and dCache does not correctly support the COPY
2688 # method as stated in RFC-4918.
2689 self._can_duplicate = self._base_url.startswith("https://")
2691 @override
2692 def _mkcol(
2693 self,
2694 url: str,
2695 headers: dict[str, str] | None = None,
2696 pool_manager: PoolManager | None = None,
2697 ) -> HTTPResponse:
2698 """Inherits doc string."""
2699 return self._request("MKCOL", url=url, headers=headers, pool_manager=self._mkcol_pool_manager)
2701 @override
2702 def _move(
2703 self,
2704 url: str,
2705 headers: dict[str, str] | None = None,
2706 pool_manager: PoolManager | None = None,
2707 ) -> HTTPResponse:
2708 """Inherits doc string."""
2709 return self._request("MOVE", url=url, headers=headers, pool_manager=self._move_pool_manager)
2711 @override
2712 def _propfind(
2713 self,
2714 url: str,
2715 headers: dict[str, str] | None = None,
2716 body: str = "",
2717 pool_manager: PoolManager | None = None,
2718 ) -> HTTPResponse:
2719 """Inherits doc string."""
2720 return self._request(
2721 "PROPFIND", url=url, headers=headers, body=body, pool_manager=self._propfind_pool_manager
2722 )
2724 @override
2725 def put(
2726 self,
2727 url: str,
2728 headers: dict[str, str] | None = None,
2729 data: BinaryIO | bytes = b"",
2730 ) -> int | None:
2731 # Docstring inherited.
2732 # Send a PUT request with empty body to the dCache frontend server to
2733 # get redirected to the backend.
2734 #
2735 # Details:
2736 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#redirection
2737 frontend_headers = {} if headers is None else dict(headers)
2738 frontend_headers.update({"Content-Length": "0", "Expect": "100-continue"})
2739 if is_zero_length := isinstance(data, bytes) and len(data) == 0:
2740 # We are uploading an empty file. Don't send the "Expect" header
2741 # so that the dCache door handles this PUT request itself without
2742 # redirecting us to a pool.
2743 frontend_headers.pop("Expect")
2745 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False)
2746 match resp.status:
2747 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT:
2748 redirect_url = url
2749 case status if status in resp.REDIRECT_STATUSES:
2750 redirect_url = resp.headers.get("Location")
2751 case _:
2752 raise unexpected_status_error("PUT", url, resp)
2754 # If we are uploading an empty file, there is nothing more to do.
2755 if is_zero_length:
2756 return 0
2758 # We may have beend redirected to a backend server. Upload the file
2759 # contents to its final destination. Explicitly ask the server to close
2760 # this network connection after serving this PUT request to release
2761 # the associated dCache mover.
2762 backend_headers = {} if headers is None else dict(headers)
2763 backend_headers.update({"Connection": "close"})
2765 # Ask dCache to compute and record a checksum of the uploaded
2766 # file contents, for later integrity checks. Since we don't compute
2767 # the digest ourselves while uploading the data, we cannot control
2768 # after the request is complete that the data we uploaded is
2769 # identical to the data recorded by the server, but at least the
2770 # server has recorded a digest of the data it stored.
2771 #
2772 # See RFC-3230 for details and
2773 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml
2774 # for the list of supported digest algorithhms.
2775 if (checksum := self._config.request_checksum) is not None:
2776 backend_headers.update({"Want-Digest": checksum})
2778 resp = self._put(redirect_url, body=data, headers=backend_headers)
2779 match resp.status:
2780 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT:
2781 # Parse the response body and extract the number of bytes
2782 # uploaded. This allows us to avoid sending a HEAD request
2783 # to retrieve the file size.
2784 response_body = resp.data.decode()
2785 if match := DavClientDCache.rex.match(response_body):
2786 return int(match.group(1))
2787 else:
2788 return None
2789 case _:
2790 raise unexpected_status_error("PUT", redirect_url, resp)
    @override
    def download(self, url: str, filename: str, chunk_size: int) -> int:
        """Download the content of a file and write it to local file.

        Parameters
        ----------
        url : `str`
            Target URL.
        filename : `str`
            Local file to write the content to. If the file already exists,
            it will be rewritten.
        chunk_size : `int`
            Size of the chunks to write to `filename`.

        Returns
        -------
        count: `int`
            Number of bytes written to `filename`.

        Notes
        -----
        The caller must ensure that the resource at `url` is a file, not
        a directory.
        """
        # Send a GET request without following redirection to get redirected
        # to the backend server.
        _, resp = self.get(url, preload_content=False, redirect=False)
        match resp.status:
            case HTTPStatus.OK:
                # We were not redirected. Consume this response.
                return self._write_response_body_to_file(resp, filename, chunk_size)
            case status if status not in resp.REDIRECT_STATUSES:
                raise unexpected_status_error("GET", url, resp)
            case _:
                # We were redirected. Follow this redirection.
                pass

        # Drain and release the response we received from the frontend server
        # so that the connection can be reused. The order matters: the body
        # must be fully consumed before the connection is returned to the
        # pool.
        resp.drain_conn()
        resp.release_conn()

        # We were redirected to a backend server. Send a GET request to the
        # backend server and ask it to close the HTTP connection to force
        # closing the network connection.
        #
        # NOTE(review): if the server ever omitted the "Location" header,
        # redirect_url would be None — presumably redirect responses always
        # carry it; confirm against the server implementations in use.
        redirect_url = resp.headers.get("Location")
        _, resp = self.get(redirect_url, headers={"Connection": "close"}, preload_content=False)
        match resp.status:
            case HTTPStatus.OK:
                return self._write_response_body_to_file(resp, filename, chunk_size)
            case _:
                raise unexpected_status_error("GET", redirect_url, resp)
    @override
    def read(self, url: str) -> tuple[str, bytes]:
        """Download the contents of file located at `url`.

        Parameters
        ----------
        url : `str`
            Target URL.

        Returns
        -------
        url: `str`
            Backend URL from which the data was obtained.
        data: `bytes`
            Contents of the file.

        Notes
        -----
        The caller must ensure that the resource at `url` is a file, not
        a directory.
        """
        # Send a GET request without following redirection to get redirected
        # to the backend server.
        backend_url, resp = self.get(url, redirect=False)
        match resp.status:
            case HTTPStatus.OK:
                # The frontend served the data directly, no redirection.
                return backend_url, resp.data
            case status if status in resp.REDIRECT_STATUSES:
                # NOTE(review): a redirect response without a "Location"
                # header would make redirect_url None — presumably servers
                # always send it; confirm.
                redirect_url = resp.headers.get("Location")
            case _:
                raise unexpected_status_error("GET", url, resp)

        # We were redirected. Send a GET request to the backend server
        # and ask it to close the HTTP connection to force closing the
        # network connection.
        final_url, resp = self.get(redirect_url, headers={"Connection": "close"})
        match resp.status:
            case HTTPStatus.OK:
                return final_url, resp.data
            case _:
                raise unexpected_status_error("GET", redirect_url, resp)
2887 @override
2888 def write(self, url: str, data: BinaryIO | bytes) -> int | None:
2889 """Create or rewrite a remote file at `url` with `data` as its
2890 contents.
2892 Parameters
2893 ----------
2894 url : `str`
2895 Target URL.
2896 data : `bytes`
2897 Sequence of bytes to upload.
2899 Returns
2900 -------
2901 size : `int | None`
2902 The size in bytes of the file uploaded. Can be `None` if the size
2903 could not be retrieved.
2905 Notes
2906 -----
2907 If a file already exists at `url` it will be rewritten.
2908 """
2909 # dCache will automatically create all the parent directories so we
2910 # don't need to explicitly create them. Although this is not compliant
2911 # to RFC 4918, this is advantageous because it avoids several
2912 # round-trips to the server for creating all the directories
2913 # before actually uploading the data.
2914 try:
2915 # Upload to a temporary file and rename to the final name.
2916 temporary_url = self._make_temporary_url(url)
2917 size = self.put(temporary_url, data=data)
2918 self.rename(temporary_url, url, overwrite=True, create_parent=False)
2920 # Update the file size cache with this size
2921 self._file_size_cache.update_size(url, size)
2922 return size
2923 except Exception:
2924 # Upload failed. Attempt to remove the temporary file.
2925 self.delete(temporary_url)
2926 raise
2928 @override
2929 def mkcol(self, url: str) -> None:
2930 """Create a directory at `url`.
2932 If a directory already exists at `url` no error is returned nor
2933 exception is raised. An exception is raised if a file exists at `url`.
2935 Parameters
2936 ----------
2937 url : `str`
2938 Target URL.
2939 """
2940 # A "MKCOL" request to dCache does not automatically create all
2941 # the intermediate directories if they do not exist. However, a
2942 # "PUT" request of a file does create the directory hierarchy.
2943 #
2944 # We exploit that to create directory hierarchies: we first create an
2945 # empty file with a random name and then we remove it. As a side
2946 # effect, the target directory will be created.
2947 #
2948 # Creating a directory this way implies two requests to the server
2949 # ("PUT" and "DELETE"), while using "MKCOL" would on average imply
2950 # one request per inexisting directory in the hierarchy. When
2951 # directory hierarchies are relatively deep, requiring two
2952 # requests per hierarchy is better than sending a "MKCOL" request
2953 # per directory in the hierarchy.
2954 try:
2955 temporary_url = self._make_temporary_url(url=f"{url}/mkcol")
2956 self.put(temporary_url, data=b"")
2957 finally:
2958 self.delete(temporary_url)
2960 @override
2961 def info(self, url: str, name: str | None = None) -> dict[str, Any]:
2962 # Docstring inherited.
2963 result: dict[str, Any] = {
2964 "name": name if name is not None else url,
2965 "type": None,
2966 "size": None,
2967 "last_modified": datetime.min,
2968 "checksums": {},
2969 }
2971 # Request live DAV properties as well as the checksums that dCache
2972 # recorded about this file.
2973 body = (
2974 """<?xml version="1.0" encoding="utf-8"?>"""
2975 """<D:propfind xmlns:D="DAV:" xmlns:dcache="http://www.dcache.org/2013/webdav">"""
2976 """<D:prop>"""
2977 """<D:resourcetype/>"""
2978 """<D:getcontentlength/>"""
2979 """<D:getlastmodified/>"""
2980 """<D:displayname/>"""
2981 """<dcache:Checksums/>"""
2982 """</D:prop>"""
2983 """</D:propfind>"""
2984 )
2985 resp = self.propfind(url, body=body, depth="0")
2986 match resp.status:
2987 case HTTPStatus.NOT_FOUND:
2988 return result
2989 case HTTPStatus.MULTI_STATUS:
2990 property = self._propfind_parser.parse(resp.data)[0]
2991 metadata = DavFileMetadata.from_property(base_url=self._base_url, property=property)
2992 result.update(
2993 {
2994 "type": "directory" if metadata.is_dir else "file",
2995 "size": metadata.size,
2996 "last_modified": metadata.last_modified,
2997 "checksums": metadata.checksums,
2998 }
2999 )
3000 return result
3001 case _:
3002 raise unexpected_status_error("PROPFIND", url, resp)
3005class DavClientXrootD(DavClientURLSigner):
3006 """Client for interacting with a XrootD webDAV server.
3008 Instances of this class are thread-safe.
3010 Parameters
3011 ----------
3012 url : `str`
3013 Root URL of the storage endpoint
3014 (e.g. "https://host.example.org:1234/").
3015 config : `DavConfig`
3016 Configuration to initialize this client.
3017 accepts_ranges : `bool` | `None`
3018 Indicate whether the remote server accepts the ``Range`` header in GET
3019 requests.
3020 """
3022 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None:
3023 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges)
3025 @override
3026 def put(
3027 self,
3028 url: str,
3029 headers: dict[str, str] | None = None,
3030 data: BinaryIO | bytes = b"",
3031 ) -> int | None:
3032 # Docstring inherited.
3033 # Send a PUT request with empty body to the XRootD frontend server to
3034 # get redirected to the backend.
3035 frontend_headers = {} if headers is None else dict(headers)
3036 frontend_headers.update({"Content-Length": "0", "Expect": "100-continue"})
3037 for attempt in range(max_attempts := 3):
3038 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False)
3039 if resp.status in (
3040 HTTPStatus.OK,
3041 HTTPStatus.CREATED,
3042 HTTPStatus.NO_CONTENT,
3043 ):
3044 redirect_url = url
3045 break
3046 elif resp.status in resp.REDIRECT_STATUSES:
3047 redirect_url = resp.headers.get("Location")
3048 break
3049 elif resp.status == HTTPStatus.LOCKED:
3050 # Sometimes XRootD servers respond with status code LOCKED and
3051 # response body of the form:
3052 #
3053 # "Output file /path/to/file is already opened by 1 writer;
3054 # open denied."
3055 #
3056 # If we get such a response, try again, unless we reached
3057 # the maximum number of attempts.
3058 if attempt == max_attempts - 1:
3059 raise ValueError(
3060 f"""Unexpected response to HTTP request PUT {resp.geturl()}: status {resp.status} """
3061 f"""{resp.reason} [{resp.data.decode()}] after {max_attempts} attempts"""
3062 )
3064 # Wait a bit and try again
3065 log.warning(
3066 f"""got unexpected response status {HTTPStatus.LOCKED} Locked for PUT {resp.geturl()} """
3067 f"""(attempt {attempt}/{max_attempts}), retrying..."""
3068 )
3069 time.sleep((attempt + 1) * 0.100)
3070 continue
3071 else:
3072 raise unexpected_status_error("PUT", url, resp)
3074 # We were redirected to a backend server. Upload the file contents to
3075 # its final destination.
3077 # XRootD backend servers typically use a single port number for
3078 # accepting connections from clients. It is therefore beneficial
3079 # to keep those connections open, if the server allows.
3081 # Ask the server to compute and record a checksum of the uploaded
3082 # file contents, for later integrity checks. Since we don't compute
3083 # the digest ourselves while uploading the data, we cannot control
3084 # after the request is complete that the data we uploaded is
3085 # identical to the data recorded by the server, but at least the
3086 # server has recorded a digest of the data it stored.
3087 #
3088 # See RFC-3230 for details and
3089 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml
3090 # for the list of supported digest algorithhms.
3091 #
3092 # In addition, note that not all servers implement this RFC so
3093 # the checksum reqquest may be ignored by the server.
3094 backend_headers = {} if headers is None else dict(headers)
3095 if (checksum := self._config.request_checksum) is not None:
3096 backend_headers.update({"Want-Digest": checksum})
3098 resp = self._put(redirect_url, body=data, headers=backend_headers)
3099 match resp.status:
3100 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT:
3101 # Send a HEAD request to retrieve the size of the file we
3102 # just uploaded.
3103 resp = self.head(redirect_url)
3104 size = int(resp.headers.get("Content-Length", -1))
3105 return None if size == -1 else size
3106 case _:
3107 raise unexpected_status_error("PUT", redirect_url, resp)
3109 @override
3110 def info(self, url: str, name: str | None = None) -> dict[str, Any]:
3111 # XRootD does not include checksums in the response to PROPFIND
3112 # request. We need to send a specific HEAD request to retrieve
3113 # the ADLER32 checksum.
3114 #
3115 # If found, the checksum is included in the response header "Digest",
3116 # which is of the form:
3117 #
3118 # Digest: adler32=0e4709f2
3119 result = super().info(url, name)
3120 if result["type"] == "file":
3121 headers: dict[str, str] = {"Want-Digest": "adler32"}
3122 resp = self.head(url=url, headers=headers)
3123 if (digest := resp.headers.get("Digest")) is not None:
3124 value = digest.split("=")[1]
3125 result["checksums"].update({"adler32": value})
3127 return result
3129 @override
3130 def write(self, url: str, data: BinaryIO | bytes) -> int | None:
3131 """Create or rewrite a remote file at `url` with `data` as its
3132 contents.
3134 Parameters
3135 ----------
3136 url : `str`
3137 Target URL.
3138 data : `bytes`
3139 Sequence of bytes to upload.
3141 Returns
3142 -------
3143 size : `int | None`
3144 The size in bytes of the file uploaded. Can be `None` if the size
3145 could not be retrieved.
3147 Notes
3148 -----
3149 If a file already exists at `url` it will be rewritten.
3150 """
3151 # XRootD will automatically create all the parent directories so we
3152 # don't need to explicitly create them. Although this is not compliant
3153 # to RFC 4918, this is advantageous because it avoids several
3154 # round-trips to the server for creating all the directories
3155 # before actually uploading the data.
3156 try:
3157 # Upload to a temporary file and rename to the final name.
3158 temporary_url = self._make_temporary_url(url)
3159 size = self.put(temporary_url, data=data)
3160 self.rename(temporary_url, url, overwrite=True, create_parent=False)
3162 # Update the file size cache with this size
3163 self._file_size_cache.update_size(url, size)
3164 return size
3165 except Exception:
3166 # Upload failed. Attempt to remove the temporary file.
3167 self.delete(temporary_url)
3168 raise
3170 @override
3171 def mkcol(self, url: str) -> None:
3172 """Create a directory at `url`.
3174 If a directory already exists at `url` no error is returned nor
3175 exception is raised. An exception is raised if a file exists at `url`.
3177 Parameters
3178 ----------
3179 url : `str`
3180 Target URL.
3181 """
3182 # XRootD automatically creates all the intermediate directories.
3183 resp = self._mkcol(url)
3184 match resp.status:
3185 case HTTPStatus.CREATED:
3186 return
3187 case HTTPStatus.METHOD_NOT_ALLOWED:
3188 # XRootD returns "405 Method Not Allowed" when either a file
3189 # or a directory already exists at `url`
3190 stat = self.stat(url)
3191 if stat.is_dir:
3192 # A directory exists at `url`. Nothing more to do.
3193 return
3194 elif stat.is_file:
3195 raise NotADirectoryError(
3196 f"Can not create a directory because a file already exists at {resp.geturl()}"
3197 )
3198 case _:
3199 raise ValueError(
3200 f"Can not create directory {resp.geturl()}: status {resp.status} {resp.reason}"
3201 )
3203 @override
3204 def stat(self, url: str) -> DavFileMetadata:
3205 # Docstring inherited.
3206 # XRootD v5.9.1 responds "200 OK" to a HEAD request against an
3207 # existing file. When the target URL is a directory, it also responds
3208 # "200 OK". In both cases the response header "Content-Length"
3209 # is present but has different meaning. If the target URL is a file,
3210 # the header value is the size in bytes of the file. If the target
3211 # URL is a directory, the header value is the number of items in
3212 # the directory.
3213 #
3214 # So there is not an easy way to determine if the target URL is a
3215 # file or a directory from the response to a HEAD request.
3216 #
3217 # When the target URL is a directory and we ask for a digest, the
3218 # server responds "409 Conflict". We use this behavior to
3219 # discriminate between a file and a directory.
3220 #
3221 # Note that XRootD does not include the "Last-Modified" header in the
3222 # response to a HEAD request so we cannot include the last modified
3223 # time in the value returned by this method.
3224 resp = self._head(url, headers={"Want-Digest": "adler32"})
3225 match resp.status:
3226 case HTTPStatus.OK:
3227 # There is a file at target URL
3228 if "Content-Length" in resp.headers:
3229 href = url.replace(self._base_url, "", 1)
3230 size = int(resp.headers.get("Content-Length"))
3231 return DavFileMetadata(self._base_url, href=href, exists=True, is_dir=False, size=size)
3232 else:
3233 raise ValueError(
3234 f"""Expecting Content-Length header to be present in """
3235 f"""response to HTTP HEAD {resp.geturl()}: status {resp.status} """
3236 f"""{resp.reason} [{resp.data.decode()}] but could not find it"""
3237 )
3238 case HTTPStatus.CONFLICT:
3239 # There is a directory at target URL
3240 href = url.replace(self._base_url, "", 1)
3241 return DavFileMetadata(self._base_url, href=href, exists=True, is_dir=True)
3242 case HTTPStatus.NOT_FOUND:
3243 # There is neither a file nor a directory at target URL
3244 return DavFileMetadata(base_url=url, exists=False)
3245 case _:
3246 raise unexpected_status_error("HEAD", url, resp)
class DavFileMetadata:
    """Container for attributes of interest of a webDAV file or directory.

    Parameters
    ----------
    base_url : `str`
        Base URL.
    href : `str`, optional
        Path component that can be added to the base URL.
    name : `str`, optional
        Name.
    exists : `bool`, optional
        Whether file or directory exist.
    size : `int`, optional
        Size of file.
    is_dir : `bool`, optional
        Whether the URL points to a directory or file.
    last_modified : `datetime`, optional
        Last modified date.
    checksums : `dict` [ `str`, `str` ] | `None`, optional
        Checksums.
    """

    def __init__(
        self,
        base_url: str,
        href: str = "",
        name: str = "",
        exists: bool = False,
        size: int = -1,
        is_dir: bool = False,
        last_modified: datetime = datetime.min,
        checksums: dict[str, str] | None = None,
    ):
        # Full URL of the resource: the base URL when no href is given,
        # otherwise the base URL (without trailing slash) plus the href.
        self._url: str = base_url if not href else base_url.rstrip("/") + href
        self._href: str = href
        self._name: str = name
        self._exists: bool = exists
        self._size: int = size
        self._is_dir: bool = is_dir
        self._last_modified: datetime = last_modified
        # Copy to protect against later mutation of the caller's dict.
        self._checksums: dict[str, str] = {} if checksums is None else dict(checksums)

    @staticmethod
    def from_property(base_url: str, property: DavProperty) -> DavFileMetadata:
        """Create an instance from the values in `property`.

        Parameters
        ----------
        base_url : `str`
            Base URL.
        property : `DavProperty`
            Properties to associate with URL.
        """
        return DavFileMetadata(
            base_url=base_url,
            href=property.href,
            name=property.name,
            exists=property.exists,
            size=property.size,
            is_dir=property.is_dir,
            last_modified=property.last_modified,
            checksums=dict(property.checksums),
        )

    def __str__(self) -> str:
        return (
            f"""{self._url} {self._href} {self._name} {self._exists} {self._size} {self._is_dir} """
            f"""{self._checksums}"""
        )

    @property
    def url(self) -> str:
        """Full URL of the resource."""
        return self._url

    @property
    def href(self) -> str:
        """Path component relative to the base URL."""
        return self._href

    @property
    def name(self) -> str:
        """Display name of the resource."""
        return self._name

    @property
    def exists(self) -> bool:
        """Whether the resource exists on the server."""
        return self._exists

    @property
    def size(self) -> int:
        """Size in bytes: -1 for non-existent resources, 0 for
        directories, otherwise the recorded file size.
        """
        if not self._exists:
            return -1

        return 0 if self._is_dir else self._size

    @property
    def is_dir(self) -> bool:
        """`True` when the resource exists and is a directory."""
        return self._exists and self._is_dir

    @property
    def is_file(self) -> bool:
        """`True` when the resource exists and is a file."""
        return self._exists and not self._is_dir

    @property
    def last_modified(self) -> datetime:
        """Last modification time, `datetime.min` when unknown."""
        return self._last_modified

    @property
    def checksums(self) -> dict[str, str]:
        """Checksums recorded by the server, keyed by algorithm name."""
        return self._checksums
3360class DavProperty:
3361 """Helper class to encapsulate select live DAV properties of a single
3362 resource, as retrieved via a PROPFIND request.
3364 Parameters
3365 ----------
3366 response : `eTree.Element` or `None`
3367 The XML response defining the DAV property.
3368 """
3370 # Regular expression to compare against the 'status' element of a
3371 # PROPFIND response's 'propstat' element.
3372 _status_ok_rex = re.compile(r"^HTTP/.* 200 .*$", re.IGNORECASE)
3374 def __init__(self, response: eTree.Element | None):
3375 self._href: str = ""
3376 self._displayname: str = ""
3377 self._collection: bool = False
3378 self._getlastmodified: str = ""
3379 self._getcontentlength: int = -1
3380 self._checksums: dict[str, str] = {}
3382 if response is not None:
3383 self._parse(response)
3385 def _parse(self, response: eTree.Element) -> None:
3386 # Extract 'href'.
3387 if (element := response.find("./{DAV:}href")) is not None:
3388 # We need to use "str(element.text)"" instead of "element.text" to
3389 # keep mypy happy.
3390 self._href = str(element.text).strip()
3391 else:
3392 raise ValueError(
3393 "Property 'href' expected but not found in PROPFIND response: "
3394 f"{eTree.tostring(response, encoding='unicode')}"
3395 )
3397 for propstat in response.findall("./{DAV:}propstat"):
3398 # Only extract properties of interest with status OK.
3399 status = propstat.find("./{DAV:}status")
3400 if status is None or not self._status_ok_rex.match(str(status.text)):
3401 continue
3403 for prop in propstat.findall("./{DAV:}prop"):
3404 # Parse "collection".
3405 if (element := prop.find("./{DAV:}resourcetype/{DAV:}collection")) is not None:
3406 self._collection = True
3408 # Parse "getlastmodified".
3409 if (element := prop.find("./{DAV:}getlastmodified")) is not None:
3410 self._getlastmodified = str(element.text)
3412 # Parse "getcontentlength".
3413 if (element := prop.find("./{DAV:}getcontentlength")) is not None:
3414 self._getcontentlength = int(str(element.text))
3416 # Parse "displayname".
3417 if (element := prop.find("./{DAV:}displayname")) is not None:
3418 self._displayname = str(element.text)
3420 # Parse "Checksums"
3421 if (element := prop.find("./{http://www.dcache.org/2013/webdav}Checksums")) is not None:
3422 self._checksums = self._parse_checksums(element.text)
3424 # Some webDAV servers don't include the 'displayname' property in the
3425 # response so try to infer it from the value of the 'href' property.
3426 # Depending on the server the href value may end with '/'.
3427 if not self._displayname:
3428 self._displayname = os.path.basename(self._href.rstrip("/"))
3430 # Some webDAV servers do not append a "/" to the href of directories.
3431 # Ensure we include a single final "/" in our response.
3432 if self._collection:
3433 self._href = self._href.rstrip("/") + "/"
3435 # Force a size of 0 for collections.
3436 if self._collection:
3437 self._getcontentlength = 0
3439 def _parse_checksums(self, checksums: str | None) -> dict[str, str]:
3440 # checksums argument is of the form
3441 # md5=MyS/wljSzI9WYiyrsuyoxw==,adler32=23b104f2
3442 result: dict[str, str] = {}
3443 if checksums is not None:
3444 for checksum in checksums.split(","):
3445 if (pos := checksum.find("=")) != -1:
3446 algorithm, value = (checksum[:pos].lower(), checksum[pos + 1 :])
3447 if algorithm == "md5":
3448 # dCache documentation about how it encodes the
3449 # MD5 checksum:
3450 #
3451 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#checksums
3452 result[algorithm] = bytes.hex(base64.standard_b64decode(value))
3453 else:
3454 result[algorithm] = value
3456 return result
3458 @property
3459 def exists(self) -> bool:
3460 # It is either a directory or a file with length of at least zero
3461 return self._collection or self._getcontentlength >= 0
3463 @property
3464 def is_dir(self) -> bool:
3465 return self._collection
3467 @property
3468 def is_file(self) -> bool:
3469 return not self._collection
3471 @property
3472 def last_modified(self) -> datetime:
3473 if not self._getlastmodified:
3474 return datetime.min
3476 # Last modified timestamp is of the form:
3477 # 'Wed, 12 Mar 2025 10:11:13 GMT'
3478 return datetime.strptime(self._getlastmodified, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=UTC)
3480 @property
3481 def size(self) -> int:
3482 return self._getcontentlength
3484 @property
3485 def name(self) -> str:
3486 return self._displayname
3488 @property
3489 def href(self) -> str:
3490 return self._href
3492 @property
3493 def checksums(self) -> dict[str, str]:
3494 return self._checksums
class DavPropfindParser:
    """Helper class to parse the response body of a PROPFIND request."""

    def parse(self, body: bytes) -> list[DavProperty]:
        """Parse the XML-encoded contents of the response body to a webDAV
        PROPFIND request.

        Parameters
        ----------
        body : `bytes`
            XML-encoded response body to a PROPFIND request.

        Returns
        -------
        responses : `list` [ `DavProperty` ]
            Parsed content of the response.

        Raises
        ------
        ValueError
            Raised if `body` does not contain at least one 'response'
            element.

        Notes
        -----
        It is expected that there is at least one response in `body`,
        otherwise this function raises.
        """
        # A response body to a PROPFIND request is of the form (indented for
        # readability):
        #
        #   <?xml version="1.0" encoding="UTF-8"?>
        #   <D:multistatus xmlns:D="DAV:">
        #     <D:response>
        #       <D:href>path/to/resource</D:href>
        #       <D:propstat>
        #         <D:prop>
        #           <D:resourcetype>
        #             <D:collection xmlns:D="DAV:"/>
        #           </D:resourcetype>
        #           <D:getlastmodified>Fri, 27 Jan 2023 13:59:01 GMT</D:getlastmodified>
        #           <D:getcontentlength>12345</D:getcontentlength>
        #         </D:prop>
        #         <D:status>HTTP/1.1 200 OK</D:status>
        #       </D:propstat>
        #     </D:response>
        #     <D:response>
        #       ...
        #     </D:response>
        #   </D:multistatus>

        # Scan all the 'response' elements and extract the relevant
        # properties.
        decoded_body: str = body.decode().strip()
        multistatus = eTree.fromstring(decoded_body)
        responses = [DavProperty(response) for response in multistatus.findall("./{DAV:}response")]
        if not responses:
            # Could not parse the body.
            raise ValueError(f"Unable to parse response for PROPFIND request: {decoded_body}")
        return responses
class Authorizer:
    """Base class for attaching an 'Authorization' header to a HTTP request."""

    def set_authorization(self, headers: dict[str, str]) -> None:
        """Add the 'Authorization' header to `headers`.

        Parameters
        ----------
        headers : `dict` [ `str`, `str` ]
            Dict to augment with authorization information.

        Notes
        -----
        This method must be implemented by concrete subclasses.
        """
        raise NotImplementedError

    def _is_file_protected(self, filepath: str) -> bool:
        """Return true if the permissions of file at `filepath` only allow
        for access by its owner.

        Parameters
        ----------
        filepath : `str`
            Path of a local file.
        """
        if not os.path.isfile(filepath):
            return False

        permissions = stat.S_IMODE(os.stat(filepath).st_mode)
        # Reject any file readable, writable or executable by group/other.
        if permissions & (stat.S_IRWXG | stat.S_IRWXO):
            return False
        # The owner must retain at least one access bit.
        return bool(permissions & stat.S_IRWXU)

    def _read_if_modified_since(
        self, filename: str | None, timestamp: float
    ) -> tuple[str, float] | tuple[None, None]:
        """Read local file `filename` if its modification time is more
        recent than `timestamp`.

        Parameters
        ----------
        filename : `str`, optional
            Path of a local file.
        timestamp : `float`
            Timestamp to compare against the last modification time of
            `filename`. The contents of file at `filename` is only read if
            its modification time is more recent than `timestamp`.

        Returns
        -------
        result : `tuple` [`str`, `float`] or `tuple` [`None`, `None`]
            Tuple of (contents of file `filename` with trailing newlines
            stripped, timestamp of the read operation). `(None, None)` when
            `filename` is `None` or the file has not been modified since
            `timestamp`.
        """
        if filename is None or os.stat(filename).st_mtime < timestamp:
            return (None, None)

        with open(filename) as stream:
            # Record the read time before reading so a concurrent write is
            # picked up on the next call.
            read_time = time.time()
            return (stream.read().rstrip("\n"), read_time)
class TokenAuthorizer(Authorizer):
    """Attach a bearer token 'Authorization' header to each request.

    Parameters
    ----------
    token : `str`
        Can be either the path to a local file which contains the
        value of the token or the token itself. If `token` is a file
        it must be protected so that only the owner can read and write it.
    """

    def __init__(self, token: str | None = None) -> None:
        self._token = self._token_path = None
        self._time_of_last_read: float = -1.0
        if token is None:
            return

        self._token = token
        if not os.path.isfile(token):
            # The token value itself was provided inline: nothing else to do.
            return

        # The token lives in a local file: remember its absolute path,
        # ensure the file is adequately protected and read its contents.
        self._token_path = os.path.abspath(token)
        if not self._is_file_protected(self._token_path):
            raise PermissionError(
                f"""Authorization token file at {self._token_path} must be protected for access only """
                """by its owner"""
            )
        self._update_token()

    def _update_token(self) -> None:
        """Read the token file (if any) if its modification time is more
        recent than the last time we read it.
        """
        if self._token_path is None:
            return None

        token, read_time = self._read_if_modified_since(self._token_path, self._time_of_last_read)
        if token is None or read_time is None:
            return

        # Remember the new token value and when we read it.
        self._token = token
        self._time_of_last_read = read_time

    @override
    def set_authorization(self, headers: dict[str, str]) -> None:
        """Add the 'Authorization' header to `headers`.

        Parameters
        ----------
        headers : `dict` [ `str`, `str` ]
            Dict to augment with authorization information.
        """
        if self._token is None:
            return

        self._update_token()
        headers["Authorization"] = f"Bearer {self._token}"
class BasicAuthorizer(Authorizer):
    """Attach a 'Authorization' header to each request using Basic
    authentication.

    Parameters
    ----------
    user_name : `str`
        Can be either the path to a local file which contains the
        user name or the user name itself. If `user_name` is a file
        it must be protected so that only the owner can read and write it.
    user_password : `str`
        Can be either the path to a local file which contains the
        value of the password or the password itself. If `user_password` is a
        file it must be protected so that only the owner can read and write it.
    """

    def __init__(self, user_name: str | None = None, user_password: str | None = None) -> None:
        # Initialize all attributes unconditionally before the early return
        # below: the previous implementation returned before defining them
        # when credentials were missing, which made set_authorization()
        # raise AttributeError instead of being a no-op.
        self._user_name: str | None = user_name
        self._user_password: str | None = user_password
        self._user_password_path: str | None = None
        self._time_of_last_read: float = -1.0
        self._header_value: str = ""

        if user_name is None or user_password is None:
            return

        if os.path.isfile(self._user_password):
            # The value in `user_password` is the path to a file. Check
            # the file is protected and read its contents.
            self._user_password_path = os.path.abspath(self._user_password)
            if not self._is_file_protected(self._user_password_path):
                raise PermissionError(
                    f"""Password file at {self._user_password_path} must be protected for access only """
                    """by its owner"""
                )
            self._update_password()
        else:
            # The password was provided inline: compute the header once.
            self._update_header_value()

    def _update_header_value(self) -> None:
        """Compute the value of the 'Authorization' header using HTTP basic
        authorization.
        """
        basic_auth_header = make_headers(basic_auth=f"{self._user_name}:{self._user_password}")
        self._header_value = basic_auth_header["authorization"]

    def _update_password(self) -> None:
        """Update the password of this authorizer if the file it is stored in
        has been modified since the last time we read it.
        """
        if self._user_password_path is None:
            return None

        password, time_of_last_read = self._read_if_modified_since(
            self._user_password_path, self._time_of_last_read
        )
        if password is None or time_of_last_read is None:
            return

        # Update the password, the last time we read it and re-compute the
        # value of the "Authorization" header.
        self._user_password = password
        self._time_of_last_read = time_of_last_read
        self._update_header_value()

    @override
    def set_authorization(self, headers: dict[str, str]) -> None:
        """Add the 'Authorization' header to `headers`.

        Parameters
        ----------
        headers : `dict` [ `str`, `str` ]
            Dict to augment with authorization information.
        """
        if self._user_name is None or self._user_password is None:
            return

        self._update_password()
        headers["Authorization"] = self._header_value
def expand_vars(path: str | None) -> str | None:
    """Expand the environment variables in `path` and return the path with
    the value of the variables expanded.

    Parameters
    ----------
    path : `str` or `None`
        Absolute or relative path which may include an environment variable
        (e.g. '$HOME/path/to/my/file').

    Returns
    -------
    path : `str` or `None`
        The path with the values of the environment variables expanded, or
        `None` when `path` is `None`.
    """
    if path is None:
        return None
    return os.path.expandvars(path)
def dump_response(method: str, resp: HTTPResponse, dump_body: bool = False) -> None:
    """Dump response for debugging purposes.

    Parameters
    ----------
    method : `str`
        Method name to include in log output.
    resp : `HTTPResponse`
        Response to dump.
    dump_body : `bool`, optional
        Whether to also log the length of the decoded response body. Note
        that enabling this consumes and decodes ``resp.data``.
    """
    log.debug("%s %s", method, resp.geturl())
    log.debug("   %s %s", resp.status, resp.reason)

    for header, value in resp.headers.items():
        log.debug("   %s: %s", header, value)

    if dump_body:
        # Logs the number of decoded characters, not raw bytes.
        log.debug("   response body length: %d", len(resp.data.decode()))