Coverage for python/lsst/resources/http.py: 21% (799 statements)
coverage.py v7.13.5, created at 2026-04-28 08:32 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("HttpResourcePath",)
16import contextlib
17import datetime
18import enum
19import functools
20import io
21import json
22import logging
23import math
24import os
25import os.path
26import random
27import re
28import ssl
29import stat
30from collections.abc import Iterator
31from email.utils import parsedate_to_datetime
32from typing import TYPE_CHECKING, Any, BinaryIO, cast
34try:
35 # Prefer 'defusedxml' (not part of standard library) if available, since
36 # 'xml' is vulnerable to XML bombs.
37 import defusedxml.ElementTree as eTree
38except ImportError:
39 import xml.etree.ElementTree as eTree
41try:
42 import fsspec
43 from aiohttp import ClientSession, ClientTimeout, TCPConnector
44 from fsspec.implementations.http import HTTPFileSystem
45 from fsspec.spec import AbstractFileSystem
46except ImportError:
47 fsspec = None
48 AbstractFileSystem = type
49 HTTPFileSystem = type
51from urllib.parse import parse_qs
53import requests
54from astropy import units as u
55from requests.adapters import HTTPAdapter
56from requests.auth import AuthBase
57from urllib3.util.retry import Retry
59from lsst.utils.timer import time_this
61from ._resourceHandles import ResourceHandleProtocol
62from ._resourceHandles._httpResourceHandle import HttpReadResourceHandle, parse_content_range_header
63from ._resourcePath import ResourceInfo, ResourcePath
64from .utils import _get_num_workers, get_tempdir
66if TYPE_CHECKING:
67 from .utils import TransactionProtocol
69log = logging.getLogger(__name__)
72def _timeout_from_environment(env_var: str, default_value: float) -> float:
73 """Convert and return a timeout from the value of an environment variable
74 or a default value if the environment variable is not initialized. The
75 value of `env_var` must be a valid `float` otherwise this function raises.
77 Parameters
78 ----------
79 env_var : `str`
80 Environment variable to look for.
81 default_value : `float``
82 Value to return if `env_var` is not defined in the environment.
84 Returns
85 -------
86 _timeout_from_environment : `float`
87 Converted value.
88 """
89 try:
90 timeout = float(os.environ.get(env_var, default_value))
91 except ValueError:
92 raise ValueError(
93 f"Expecting valid timeout value in environment variable {env_var} but found "
94 f"{os.environ.get(env_var)}"
95 ) from None
97 if math.isnan(timeout):
98 raise ValueError(f"Unexpected timeout value NaN found in environment variable {env_var}")
100 return timeout
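# Illustrative sketch (hypothetical values): how the helper above resolves a
# timeout from the environment.
#
#     os.environ["LSST_HTTP_TIMEOUT_CONNECT"] = "30"
#     _timeout_from_environment("LSST_HTTP_TIMEOUT_CONNECT", 60.0)   # -> 30.0
#     _timeout_from_environment("LSST_HTTP_TIMEOUT_READ", 1_500.0)   # -> 1500.0 (variable unset)
#     os.environ["LSST_HTTP_TIMEOUT_CONNECT"] = "soon"
#     _timeout_from_environment("LSST_HTTP_TIMEOUT_CONNECT", 60.0)   # raises ValueError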
103@functools.lru_cache
104def _calc_tmpdir_buffer_size(tmpdir: str) -> int:
105 """Compute the block size as 256 blocks of typical size
106 (i.e. 4096 bytes) or 10 times the file system block size,
107 whichever is higher.
109 This is a reasonable compromise between
110 using memory for buffering and the number of system calls
111 issued to read from or write to temporary files.
112 """
113 fsstats = os.statvfs(tmpdir)
114 return max(10 * fsstats.f_bsize, 256 * 4096)
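# Worked example (illustrative): on a file system with a typical 4096-byte
# block, 10 * 4096 = 40_960 is well below 256 * 4096 = 1_048_576, so the
# buffer size defaults to 1 MiB; only file systems with blocks larger than
# roughly 100 KiB push the buffer above that floor.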
117class HttpResourcePathConfig:
118 """Configuration class to encapsulate the configurable items used by class
119 HttpResourcePath.
120 """
122 # Default timeouts for all HTTP requests (seconds).
123 DEFAULT_TIMEOUT_CONNECT: float = 60.0
124 DEFAULT_TIMEOUT_READ: float = 1_500.0
126 # Default lower and upper bounds for the backoff interval (seconds).
127 # A value in this interval is randomly selected as the backoff factor when
128 # requests need to be retried.
129 DEFAULT_BACKOFF_MIN: float = 1.0
130 DEFAULT_BACKOFF_MAX: float = 3.0
132 # Default number of connections to persist with both the front end and
133 # back end servers.
134 DEFAULT_FRONTEND_PERSISTENT_CONNECTIONS: int = 2
135 DEFAULT_BACKEND_PERSISTENT_CONNECTIONS: int = 1
137 # Accepted digest algorithms
138 ACCEPTED_DIGESTS: list[str] = ["adler32", "md5", "sha-256", "sha-512"]
140 def __init__(self) -> None:
141 self._front_end_connections: int | None = None
142 self._back_end_connections: int | None = None
143 self._digest_algorithm: str | None = None
144 self._send_expect_on_put: bool | None = None
145 self._fsspec_is_enabled: bool | None = None
146 self._timeout: tuple[float, float] | None = None
147 self._collect_memory_usage: bool | None = None
148 self._backoff_min: float | None = None
149 self._backoff_max: float | None = None
150 self._ca_bundle: str | None = ""
151 self._client_token: str | None = ""
152 self._client_cert: str | None = ""
153 self._client_key: str | None = ""
154 self._tmpdir_buffersize: tuple[str, int] | None = None
155 self._ssl_context: ssl.SSLContext | None = None
157 @property
158 def front_end_connections(self) -> int:
159 """Number of persistent connections to the front end server."""
160 if self._front_end_connections is not None:  # 160 ↛ 161: line 160 didn't jump to line 161 because the condition on line 160 was never true
161 return self._front_end_connections
163 default_pool_size = max(_get_num_workers(), self.DEFAULT_FRONTEND_PERSISTENT_CONNECTIONS)
165 try:
166 self._front_end_connections = int(
167 os.environ.get("LSST_HTTP_FRONTEND_PERSISTENT_CONNECTIONS", default_pool_size)
168 )
169 except ValueError:
170 self._front_end_connections = default_pool_size
172 return self._front_end_connections
174 @property
175 def back_end_connections(self) -> int:
176 """Number of persistent connections to the back end servers."""
177 if self._back_end_connections is not None:  # 177 ↛ 178: line 177 didn't jump to line 178 because the condition on line 177 was never true
178 return self._back_end_connections
180 default_pool_size = max(_get_num_workers(), self.DEFAULT_BACKEND_PERSISTENT_CONNECTIONS)
182 try:
183 self._back_end_connections = int(
184 os.environ.get("LSST_HTTP_BACKEND_PERSISTENT_CONNECTIONS", default_pool_size)
185 )
186 except ValueError:
187 self._back_end_connections = default_pool_size
189 return self._back_end_connections
191 @property
192 def digest_algorithm(self) -> str:
193 """Algorithm to ask the server to use for computing and recording
194 digests of each file's contents in PUT requests.
196 Returns
197 -------
198 digest_algorithm: `str`
199 The name of a digest algorithm or the empty string if no algorithm
200 is configured.
201 """
202 if self._digest_algorithm is not None:
203 return self._digest_algorithm
205 digest = os.environ.get("LSST_HTTP_DIGEST", "").lower()
206 if digest not in self.ACCEPTED_DIGESTS:
207 digest = ""
209 self._digest_algorithm = digest
210 return self._digest_algorithm
212 @property
213 def send_expect_on_put(self) -> bool:
214 """Return True if a "Expect: 100-continue" header is to be sent to
215 the server on each PUT request.
217 Some servers (e.g. dCache) use this information as an indication that
218 the client knows how to handle redirects to the specific server that
219 will actually receive the data for PUT requests.
220 """
221 if self._send_expect_on_put is not None:
222 return self._send_expect_on_put
224 self._send_expect_on_put = "LSST_HTTP_PUT_SEND_EXPECT_HEADER" in os.environ
225 return self._send_expect_on_put
227 @property
228 def fsspec_is_enabled(self) -> bool:
229 """Return True if `fsspec` is enabled for objects of class
230 HttpResourcePath.
232 To determine if `fsspec` is enabled, this method inspects the presence
233 of the environment variable `LSST_HTTP_ENABLE_FSSPEC` (with any value).
234 """
235 if self._fsspec_is_enabled is not None:
236 return self._fsspec_is_enabled
238 self._fsspec_is_enabled = "LSST_HTTP_ENABLE_FSSPEC" in os.environ
239 return self._fsspec_is_enabled
241 @property
242 def timeout(self) -> tuple[float, float]:
243 """Return a tuple with the values of timeouts for connecting to the
244 server and reading its response, respectively. Both values are in
245 seconds.
246 """
247 if self._timeout is not None:
248 return self._timeout
250 self._timeout = (
251 _timeout_from_environment("LSST_HTTP_TIMEOUT_CONNECT", self.DEFAULT_TIMEOUT_CONNECT),
252 _timeout_from_environment("LSST_HTTP_TIMEOUT_READ", self.DEFAULT_TIMEOUT_READ),
253 )
254 return self._timeout
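    # Illustrative sketch (hypothetical 'session' and 'url' names): the tuple
    # above is passed directly as the 'timeout' argument of 'requests' calls.
    #
    #     config = HttpResourcePathConfig()
    #     session.get(url, timeout=config.timeout)  # (connect, read), e.g. (60.0, 1500.0)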
256 @property
257 def collect_memory_usage(self) -> bool:
258 """Return true if we want to collect memory usage when timing
259 operations against the remote server via the `lsst.utils.time_this`
260 context manager.
261 """
262 if self._collect_memory_usage is not None:
263 return self._collect_memory_usage
265 self._collect_memory_usage = "LSST_HTTP_COLLECT_MEMORY_USAGE" in os.environ
266 return self._collect_memory_usage
268 @property
269 def backoff_min(self) -> float:
270 """Lower bound of the interval from which a backoff factor is randomly
271 selected when retrying requests (seconds).
272 """
273 if self._backoff_min is not None:
274 return self._backoff_min
276 self._backoff_min = self.DEFAULT_BACKOFF_MIN
277 try:
278 backoff_min = float(os.environ.get("LSST_HTTP_BACKOFF_MIN", self.DEFAULT_BACKOFF_MIN))
279 if not math.isnan(backoff_min):  # 279 ↛ 284: line 279 didn't jump to line 284 because the condition on line 279 was always true
280 self._backoff_min = backoff_min
281 except ValueError:
282 pass
284 return self._backoff_min
286 @property
287 def backoff_max(self) -> float:
288 """Upper bound of the interval from which a backoff factor is randomly
289 selected when retrying requests (seconds).
290 """
291 if self._backoff_max is not None:
292 return self._backoff_max
294 self._backoff_max = self.DEFAULT_BACKOFF_MAX
295 try:
296 backoff_max = float(os.environ.get("LSST_HTTP_BACKOFF_MAX", self.DEFAULT_BACKOFF_MAX))
297 if not math.isnan(backoff_max):  # 297 ↛ 302: line 297 didn't jump to line 302 because the condition on line 297 was always true
298 self._backoff_max = backoff_max
299 except ValueError:
300 pass
302 return self._backoff_max
304 @property
305 def ca_bundle(self) -> str | None:
306 """Local path to the certificate bundle file or directory where the
307 certificates of the trusted authorities are located.
309 Return None if this host's system certificate bundle should be
310 used for authenticating remote servers' certificates.
311 """
312 if self._ca_bundle != "":
313 return self._ca_bundle
315 # If a bundle was specified via the environment variable
316 # 'LSST_HTTP_CACERT_BUNDLE' use it.
317 self._ca_bundle = os.getenv("LSST_HTTP_CACERT_BUNDLE")
318 return self._ca_bundle
320 @property
321 def client_token(self) -> str | None:
322 """Value of a bearer token or path to a local file which contains
323 the bearer token to use for authenticating the client when sending
324 requests to the webDAV or HTTP server.
326 Return None if no bearer token is configured in the environment.
327 """
328 if self._client_token != "":
329 return self._client_token
331 # If environment variable LSST_HTTP_AUTH_BEARER_TOKEN is
332 # initialized use its value as the bearer token.
333 self._client_token = os.getenv("LSST_HTTP_AUTH_BEARER_TOKEN")
334 return self._client_token
336 @property
337 def client_cert_key(self) -> tuple[str | None, str | None]:
338 """Paths to a local file where the client certificate and associated
339 private key are located.
341 Return a tuple (client certificate, private key) or (None, None) if no
342 client certificate is configured via environment variables.
343 """
344 if self._client_cert != "" and self._client_key != "":
345 return (self._client_cert, self._client_key)
347 # If the environment variables LSST_HTTP_AUTH_CLIENT_CERT
348 # and LSST_HTTP_AUTH_CLIENT_KEY are initialized use their values.
349 self._client_cert = os.getenv("LSST_HTTP_AUTH_CLIENT_CERT")
350 self._client_key = os.getenv("LSST_HTTP_AUTH_CLIENT_KEY")
351 if self._client_cert and self._client_key:
352 if not _is_protected(self._client_key):
353 raise PermissionError(
354 f"Private key file at {self._client_key} must be protected for access only by its owner"
355 )
356 return (self._client_cert, self._client_key)
358 # If only the certificate was provided raise.
359 if self._client_cert:
360 raise ValueError(
361 "Environment variable LSST_HTTP_AUTH_CLIENT_KEY must be set to client private key file path"
362 )
364 # If only the private key was provided raise.
365 if self._client_key:
366 raise ValueError(
367 "Environment variable LSST_HTTP_AUTH_CLIENT_CERT must be set to client certificate file path"
368 )
370 # If a X.509 user proxy is available, use it as client credentials.
371 self._client_cert = self._client_key = os.getenv("X509_USER_PROXY")
372 return (self._client_cert, self._client_key)
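    # Illustrative sketch (paths are placeholders): environment settings
    # consumed by the property above, in shell syntax.
    #
    #     export LSST_HTTP_AUTH_CLIENT_CERT=/path/to/cert.pem
    #     export LSST_HTTP_AUTH_CLIENT_KEY=/path/to/key.pem   # must be readable by owner only
    #
    # or, alternatively, a single X.509 proxy used for both certificate and key:
    #
    #     export X509_USER_PROXY=/tmp/x509up_u1000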
374 @property
375 def tmpdir_buffersize(self) -> tuple[str, int]:
376 """Return the path to a temporary directory and the preferred buffer
377 size to use when reading or writing files in that directory.
378 """
379 if self._tmpdir_buffersize is not None:
380 return self._tmpdir_buffersize
382 tmpdir = get_tempdir()
384 # Compute the block size as 256 blocks of typical size
385 # (i.e. 4096 bytes) or 10 times the file system block size,
386 # whichever is higher. This is a reasonable compromise between
387 # using memory for buffering and the number of system calls
388 # issued to read from or write to temporary files.
389 bufsize = _calc_tmpdir_buffer_size(tmpdir)
390 self._tmpdir_buffersize = (tmpdir, bufsize)
392 return self._tmpdir_buffersize
394 @property
395 def ssl_context(self) -> ssl.SSLContext:
396 """Return an SSL context equiped with the certificates of the trusted
397 authorities.
398 """
399 if self._ssl_context is None:
400 self._ssl_context = ssl.create_default_context()
401 if self.ca_bundle is not None:
402 if os.path.isdir(self.ca_bundle):
403 self._ssl_context.load_verify_locations(capath=self.ca_bundle)
404 elif os.path.isfile(self.ca_bundle):
405 self._ssl_context.load_verify_locations(cafile=self.ca_bundle)
407 return self._ssl_context
410@functools.lru_cache
411def _get_dav_and_server_headers(path: ResourcePath | str) -> tuple[str | None, str | None]:
412 """Retrieve the "DAV" and "Server" headers sent by the remote server as
413 part of the response to a single "OPTIONS" HTTP request.
415 Parameters
416 ----------
417 path : `ResourcePath` or `str`
418 URL to the resource to be checked.
419 Should preferably refer to the root since the status is shared
420 by all paths in that server.
422 Returns
423 -------
424 _get_dav_and_server_headers : `tuple[str|None, str|None]`
425 Values of the "DAV" and "Server" headers found in the response, or
426 None for each header that was not part of the response.
427 """
428 try:
429 if not isinstance(path, HttpResourcePath):
430 path = HttpResourcePath(path)
432 config = HttpResourcePathConfig()
433 with SessionStore(config=config).get(path) as session:
434 resp = session.options(
435 str(path), stream=False, timeout=config.timeout, headers=path._extra_headers
436 )
438 dav_header = server_header = None
439 if resp.status_code == requests.codes.ok:
440 dav_header = resp.headers.get("DAV") if "DAV" in resp.headers else None
441 server_header = resp.headers.get("Server") if "Server" in resp.headers else None
443 return (dav_header, server_header)
445 except requests.exceptions.SSLError as e:
446 log.warning(
447 "Environment variable LSST_HTTP_CACERT_BUNDLE can be used to "
448 "specify the path to a bundle of certificate authorities you trust "
449 "which are not included in the default set of trusted authorities "
450 "of this system."
451 )
452 raise e
455class BearerTokenAuth(AuthBase):
456 """Attach a bearer token 'Authorization' header to each request.
458 Parameters
459 ----------
460 token : `str`
461 Can be either the path to a local protected file which contains the
462 value of the token or the token itself.
463 """
465 def __init__(self, token: str):
466 self._token = self._path = None
467 self._mtime: float = -1.0
468 if not token:
469 return
471 self._token = token
472 if os.path.isfile(token):
473 self._path = os.path.abspath(token)
474 if not _is_protected(self._path):
475 raise PermissionError(
476 f"Bearer token file at {self._path} must be protected for access only by its owner"
477 )
478 self._refresh()
480 def _refresh(self) -> None:
481 """Read the token file (if any) if its modification time is more recent
482 than the last time we read it.
483 """
484 if not self._path:
485 return
487 if (mtime := os.stat(self._path).st_mtime) > self._mtime:
488 log.debug("Reading bearer token file at %s", self._path)
489 self._mtime = mtime
490 with open(self._path) as f:
491 self._token = f.read().rstrip("\n")
493 def __call__(self, req: requests.PreparedRequest) -> requests.PreparedRequest:
494 # Only add a bearer token to a request when using secure HTTP.
495 if req.url and req.url.lower().startswith("https://") and self._token:
496 self._refresh()
497 req.headers["Authorization"] = f"Bearer {self._token}"
498 return req
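# Illustrative sketch (token path and URL are placeholders): BearerTokenAuth
# plugs into a requests.Session like any AuthBase subclass; the header is only
# attached for https:// URLs.
#
#     session = requests.Session()
#     session.auth = BearerTokenAuth("/path/to/token")
#     session.get("https://webdav.example.org/file")  # sends "Authorization: Bearer ..."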
501class SessionStore:
502 """Cache a reusable HTTP client session per endpoint.
504 Parameters
505 ----------
506 config : `HttpResourcePathConfig`
507 Configuration items shared by all instances of HttpResourcePath.
508 num_pools : `int`, optional
509 Number of connection pools to keep: there is one pool per remote
510 host.
511 max_persistent_connections : `int`, optional
512 Maximum number of connections per remote host to persist in each
513 connection pool.
514 backoff_min : `float`, optional
515 Minimum value of the interval to compute the exponential
516 backoff factor when retrying requests (seconds).
517 backoff_max : `float`, optional
518 Maximum value of the interval to compute the exponential
519 backoff factor when retrying requests (seconds).
520 """
522 def __init__(
523 self,
524 config: HttpResourcePathConfig,
525 num_pools: int = 10,
526 max_persistent_connections: int = 1,
527 backoff_min: float = 1.0,
528 backoff_max: float = 3.0,
529 ) -> None:
530 # Dictionary to store the session associated to a given URI. The key
531 # of the dictionary is a root URI and the value is the session.
532 self._sessions: dict[str, requests.Session] = {}
534 # Configuration for all instances of HttpResourcePath objects.
535 self._config = config
537 # See documentation of urllib3 PoolManager class:
538 # https://urllib3.readthedocs.io
539 self._num_pools: int = num_pools
541 # See urllib3 Advanced Usage documentation:
542 # https://urllib3.readthedocs.io/en/stable/advanced-usage.html
543 self._max_persistent_connections: int = max_persistent_connections
545 # Minimum and maximum values of the interval to compute the exponential
546 # backoff factor when retrying requests (seconds).
547 self._backoff_min: float = backoff_min
548 self._backoff_max: float = backoff_max if backoff_max > backoff_min else backoff_min + 1.0
550 def clear(self) -> None:
551 """Destroy all previously created sessions and attempt to close
552 underlying idle network connections.
553 """
554 # Close all sessions and empty the store. Idle network connections
555 # should be closed as a consequence. We don't have means through
556 # the API exposed by Requests to actually force closing the
557 # underlying open sockets.
558 for session in self._sessions.values():
559 session.close()
561 self._sessions.clear()
563 def get(self, rpath: ResourcePath) -> requests.Session:
564 """Retrieve a session for accessing the remote resource at rpath.
566 Parameters
567 ----------
568 rpath : `ResourcePath`
569 URL to a resource at the remote server for which a session is to
570 be retrieved.
572 Notes
573 -----
574 Once a session is created for a given endpoint it is cached and
575 returned every time a session is requested for any path under that same
576 endpoint. For instance, a single session will be cached and shared
577 for paths "https://www.example.org/path/to/file" and
578 "https://www.example.org/any/other/path".
580 Note that "https://www.example.org" and "https://www.example.org:12345"
581 will have different sessions since the port number is not identical.
582 """
583 root_uri = str(rpath.root_uri())
584 if root_uri not in self._sessions:
585 # We don't yet have a session for this endpoint: create a new one.
586 self._sessions[root_uri] = self._make_session(rpath)
588 return self._sessions[root_uri]
590 def _make_session(self, rpath: ResourcePath) -> requests.Session:
591 """Make a new session configured from values from the environment."""
592 session = requests.Session()
593 root_uri = str(rpath.root_uri())
594 log.debug("Creating new HTTP session for endpoint %s ...", root_uri)
595 retries = Retry(
596 # Total number of retries to allow. Takes precedence over other
597 # counts.
598 total=6,
599 # How many connection-related errors to retry on.
600 connect=3,
601 # How many times to retry on read errors.
602 read=3,
603 # Backoff factor to apply between attempts after the second try
604 # (seconds). Compute a random jitter to prevent all the clients
605 # from overwhelming the server by sending requests at the same time.
606 backoff_factor=self._backoff_min + (self._backoff_max - self._backoff_min) * random.random(),
607 # How many times to retry on bad status codes.
608 status=5,
609 # Set of uppercased HTTP method verbs that we should retry on.
610 # We only automatically retry idempotent requests.
611 allowed_methods=frozenset(
612 [
613 "COPY",
614 "DELETE",
615 "GET",
616 "HEAD",
617 "MKCOL",
618 "OPTIONS",
619 "PROPFIND",
620 "PUT",
621 ]
622 ),
623 # HTTP status codes that we should force a retry on.
624 status_forcelist=frozenset(
625 [
626 requests.codes.too_many_requests, # 429
627 requests.codes.internal_server_error, # 500
628 requests.codes.bad_gateway, # 502
629 requests.codes.service_unavailable, # 503
630 requests.codes.gateway_timeout, # 504
631 ]
632 ),
633 # Whether to respect Retry-After header on status codes defined
634 # above.
635 respect_retry_after_header=True,
636 )
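        # For reference: urllib3 sleeps an exponentially growing multiple of
        # this backoff factor between successive retries, so drawing the factor
        # at random from [backoff_min, backoff_max) staggers the retry
        # schedules of many simultaneous clients.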
638 # Persist the specified number of connections to the front end server.
639 session.mount(
640 root_uri,
641 HTTPAdapter(
642 pool_connections=self._num_pools,
643 pool_maxsize=self._max_persistent_connections,
644 pool_block=False,
645 max_retries=retries,
646 ),
647 )
649 # Do not persist the connections to back end servers which may vary
650 # from request to request. Systematically persisting connections to
651 # those servers may exhaust their capabilities when there are thousands
652 # of simultaneous clients.
653 session.mount(
654 f"{rpath.scheme}://",
655 HTTPAdapter(
656 pool_connections=self._num_pools,
657 pool_maxsize=0,
658 pool_block=False,
659 max_retries=retries,
660 ),
661 )
663 # If the remote endpoint doesn't use secure HTTP we don't include
664 # bearer tokens in the requests nor need to authenticate the remote
665 # server.
666 if rpath.scheme != "https":
667 return session
669 # Set the trusted CA certificates bundle for authenticating remote
670 # servers.
671 session.verify = True if self._config.ca_bundle is None else self._config.ca_bundle
673 # Should we use a bearer token for client authentication?
674 if (token := self._config.client_token) is not None:
675 log.debug("... using bearer token authentication")
676 session.auth = BearerTokenAuth(token)
677 return session
679 # Should we instead use client certificate and private key?
680 client_cert, client_key = self._config.client_cert_key
681 if client_cert and client_key:
682 log.debug("... using client certificate authentication.")
683 session.cert = (client_cert, client_key)
684 return session
686 log.debug(
687 "Neither LSST_HTTP_AUTH_BEARER_TOKEN nor (LSST_HTTP_AUTH_CLIENT_CERT and "
688 "LSST_HTTP_AUTH_CLIENT_KEY) are initialized. Client authentication is disabled."
689 )
690 return session
693class ActivityCaveat(enum.Enum):
694 """Helper class for enumerating accepted activity caveats for requesting
695 macaroons.
696 """
698 DOWNLOAD = 1
699 UPLOAD = 2
702class HttpResourcePath(ResourcePath):
703 """General HTTP(S) resource.
705 Notes
706 -----
707 In order to configure the behavior of instances of this class, the
708 environment variables below are inspected:
710 - LSST_HTTP_CACERT_BUNDLE: path to a .pem file or to a directory which
711 contains the .pem files of the trusted certificate authorities'
712 certificates. If the remote server presents a server certificate
713 issued by one of those trusted authorities, we trust it.
714 If this environment variable is not initialized, the default
715 authorities of the execution host are trusted.
717 - LSST_HTTP_AUTH_BEARER_TOKEN: value of a bearer token or path to a
718 local file containing a bearer token to be used as the client
719 authentication mechanism with all requests.
720 The permissions of the token file must be set so that only its
721 owner can access it.
722 If initialized, takes precedence over LSST_HTTP_AUTH_CLIENT_CERT
723 and LSST_HTTP_AUTH_CLIENT_KEY.
725 - LSST_HTTP_AUTH_CLIENT_CERT: path to a .pem file which contains the
726 client certificate for authenticating to the server.
727 If initialized, the variable LSST_HTTP_AUTH_CLIENT_KEY must also be
728 initialized with the path of the client private key file.
729 The permissions of the client private key must be set so that only
730 its owner can access it, at least for reading.
732 - LSST_HTTP_PUT_SEND_EXPECT_HEADER: if set (with any value), an
733 "Expect: 100-Continue" header will be added to all HTTP PUT requests.
734 This header is required by some servers to detect if the client
735 knows how to handle redirections. In case of redirection, the body
736 of the PUT request is sent to the redirected location and not to
737 the front end server.
739 - LSST_HTTP_TIMEOUT_CONNECT and LSST_HTTP_TIMEOUT_READ: if set to a
740 numeric value, they are interpreted as the number of seconds to wait
741 for establishing a connection with the server and for reading its
742 response, respectively.
744 - LSST_HTTP_FRONTEND_PERSISTENT_CONNECTIONS and
745 LSST_HTTP_BACKEND_PERSISTENT_CONNECTIONS: contain the maximum number
746 of connections to attempt to persist with both the front end servers
747 and the back end servers.
748 Default values: DEFAULT_FRONTEND_PERSISTENT_CONNECTIONS and
749 DEFAULT_BACKEND_PERSISTENT_CONNECTIONS.
751 - LSST_HTTP_DIGEST: case-insensitive name of the digest algorithm to
752 ask the server to compute for every file's content sent to the server
753 via a PUT request. No digest is requested if this variable is not set
754 or is set to an invalid value.
755 Valid values are those in ACCEPTED_DIGESTS.
757 - LSST_HTTP_ENABLE_FSSPEC: the presence of this environment variable
758 activates the use of an `fsspec`-compatible file system to read
759 an HTTP URL. The value of the variable is not inspected.
760 """
762 @staticmethod
763 def create_http_resource_path(
764 path: str, *, extra_headers: dict[str, str] | None = None
765 ) -> HttpResourcePath:
766 """Create an instance of `HttpResourcePath` with additional
767 HTTP-specific configuration.
769 Parameters
770 ----------
771 path : `str`
772 HTTP URL to be wrapped in a `ResourcePath` instance.
773 extra_headers : `dict` [ `str`, `str` ], optional
774 Additional headers that will be sent with every HTTP request made
775 by this `ResourcePath`. These override any headers that may be
776 generated internally by `HttpResourcePath` (e.g. authentication
777 headers).
779 Returns
780 -------
781 instance : `ResourcePath`
782 Newly-created `HttpResourcePath` instance.
784 Notes
785 -----
786 Most users should use the `ResourcePath` constructor, instead.
787 """
788 # Make sure we instantiate ResourcePath using a string to guarantee we
789 # get a new ResourcePath. If we accidentally provided a ResourcePath
790 # instance instead, the ResourcePath constructor sometimes returns
791 # the original object and we would be modifying an object that is
792 # supposed to be immutable.
793 instance = ResourcePath(str(path))
794 assert isinstance(instance, HttpResourcePath)
795 instance._extra_headers = extra_headers
796 return instance
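    # Illustrative sketch (URL and header value are placeholders): the factory
    # above is useful when fixed headers must accompany every request.
    #
    #     rp = HttpResourcePath.create_http_resource_path(
    #         "https://server.example.org/data/file.fits",
    #         extra_headers={"Authorization": "Bearer abc123"},
    #     )
    #     data = rp.read()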
798 # WebDAV servers known to be able to sign URLs. The values are lowercased
799 # server identifiers retrieved from the 'Server' header included in
800 # the response to an HTTP OPTIONS request.
801 SUPPORTED_URL_SIGNERS = ("dcache", "xrootd")
803 # Configuration items for instances of this class.
804 _config: HttpResourcePathConfig = HttpResourcePathConfig()
806 # The session for metadata requests is used for interacting with
807 # the front end servers for requests such as PROPFIND, HEAD, etc. Those
808 # interactions are typically served by the front end servers. We want to
809 # keep the connection to the front end servers open, to reduce the cost
810 # associated with TCP and TLS handshaking for each new request.
811 _metadata_session_store = SessionStore(
812 config=_config,
813 num_pools=5,
814 max_persistent_connections=_config.front_end_connections,
815 backoff_min=_config.backoff_min,
816 backoff_max=_config.backoff_max,
817 )
819 # The data session is used for interaction with the front end servers which
820 # typically redirect to the back end servers for serving our PUT and GET
821 # requests. We attempt to keep a single connection open with the front end
822 # server, if possible. This depends on how the server behaves and the
823 # kind of request. Some servers close the connection when redirecting
824 # the client to a back end server, for instance when serving a PUT
825 # request.
826 _data_session_store = SessionStore(
827 config=_config,
828 num_pools=25,
829 max_persistent_connections=_config.back_end_connections,
830 backoff_min=_config.backoff_min,
831 backoff_max=_config.backoff_max,
832 )
834 # Process ID which created the session stores above. We need to store this
835 # to replace sessions created by a parent process and inherited by a
836 # child process after a fork, to avoid confusing the SSL layer.
837 _pid: int = -1
839 # Connector used by a session pool to establish network connections to
840 # remote servers. This connector is exclusively used by the fsspec file system
841 # and is shared by all instances of this class.
842 _tcp_connector: TCPConnector | None = None
844 # Additional headers added to every request.
845 _extra_headers: dict[str, str] | None = None
847 @property
848 def metadata_session(self) -> _SessionWrapper:
849 """Client session to send requests which do not require upload or
850 download of data, i.e. mostly metadata requests.
851 """
852 session = None
853 if hasattr(self, "_metadata_session"):
854 if HttpResourcePath._pid == os.getpid():
855 session = self._metadata_session
856 else:
857 # The metadata session we have in cache was likely created by
858 # a parent process. Discard all the sessions in that store.
859 self._metadata_session_store.clear()
861 # Retrieve a new metadata session.
862 if session is None:
863 HttpResourcePath._pid = os.getpid()
864 session = self._metadata_session_store.get(self)
865 self._metadata_session: requests.Session = session
866 return _SessionWrapper(session, extra_headers=self._extra_headers)
868 @property
869 def data_session(self) -> _SessionWrapper:
870 """Client session for uploading and downloading data."""
871 session = None
872 if hasattr(self, "_data_session"):
873 if HttpResourcePath._pid == os.getpid():
874 session = self._data_session
875 else:
876 # The data session we have in cache was likely created by
877 # a parent process. Discard all the sessions in that store.
878 self._data_session_store.clear()
880 # Retrieve a new data session.
881 if session is None:
882 HttpResourcePath._pid = os.getpid()
883 session = self._data_session_store.get(self)
884 self._data_session: requests.Session = session
885 return _SessionWrapper(session, extra_headers=self._extra_headers)
887 def _clear_sessions(self) -> None:
888 """Close the socket connections that are still open.
890 Used only in test suites to avoid warnings.
891 """
892 self._metadata_session_store.clear()
893 self._data_session_store.clear()
895 if hasattr(self, "_metadata_session"):
896 delattr(self, "_metadata_session")
898 if hasattr(self, "_data_session"):
899 delattr(self, "_data_session")
901 def _init_server_properties(self) -> None:
902 """Initialize instance variables '_is_webdav' and '_server' by
903 sending a single OPTIONS request to the remote server and
904 saving the results.
905 """
906 # Retrieve the "DAV" and the "Server" headers for the root URL of this
907 # path
908 dav_header, server_header = _get_dav_and_server_headers(self.root_uri())
910 # Check that "1" is part of the value of the "DAV" header. We don't
911 # use locks, so a server complying to class 1 is enough for our
912 # purposes. All webDAV servers must advertise at least compliance
913 # class "1".
914 #
915 # Compliance classes are documented in
916 # http://www.webdav.org/specs/rfc4918.html#dav.compliance.classes
917 #
918 # Examples of values for header DAV are:
919 # DAV: 1, 2
920 # DAV: 1, <http://apache.org/dav/propset/fs/1>
921 self._is_webdav: bool = False
922 if dav_header is not None:
923 self._is_webdav = "1" in dav_header.replace(" ", "").split(",")
925 self._server: str | None = None
926 if server_header is not None:
927 # Server header is expected to be of the form 'dCache/9.2.4'
928 # or 'XrootD/v5.7.1'. Strip version and put in lowercase.
929 self._server = server_header.split("/")[0].lower()
931 @property
932 def is_webdav_endpoint(self) -> bool:
933 """Check if the current endpoint implements WebDAV features.
935 This is stored per URI but cached by root so there is only one check
936 per hostname.
937 """
938 if hasattr(self, "_is_webdav"):
939 return self._is_webdav
941 self._init_server_properties()
942 return self._is_webdav
944 @property
945 def server(self) -> str | None:
946 """Return the lowercased identifier of the remote server, retrieved
947 from the response header 'Server' from an 'OPTIONS' HTTP request.
949 If the remote server does not include that header in its response
950 to an 'OPTIONS' request, server() returns None.
952 Examples of return values are "dcache", "xrootd".
953 """
954 if hasattr(self, "_server"):
955 return self._server
957 self._init_server_properties()
958 return self._server
960 @property
961 def server_signs_urls(self) -> bool:
962 """Return true if the remote server support signing or URLs for
963 download and upload.
964 """
965 return self.server in HttpResourcePath.SUPPORTED_URL_SIGNERS
967 @classmethod
968 def _reload_config(cls) -> None:
969 """Reload the configuration for all instances of this class. That
970 configuration is instantiated from the environment.
972 This is an internal method mainly intended for tests.
973 """
974 HttpResourcePath._config = HttpResourcePathConfig()
976 def exists(self) -> bool:
977 """Check that a remote HTTP resource exists."""
978 log.debug("Checking if resource exists: %s", self.geturl())
979 if not self.is_webdav_endpoint:
980 # The remote is a plain HTTP server. Let's attempt a HEAD
981 # request, even though the behavior of such a request against a
982 # directory is not specified and depends on the server
983 # implementation.
984 resp = self._head_non_webdav_url()
985 return self._is_successful_non_webdav_head_request(resp)
987 # The remote endpoint is a webDAV server: send a PROPFIND request
988 # to determine if it exists.
989 resp = self._propfind()
990 if resp.status_code == requests.codes.multi_status: # 207
991 prop = _parse_propfind_response_body(resp.text)[0]
992 return prop.exists
993 else: # 404 Not Found
994 return False
996 def size(self) -> int:
997 """Return the size of the remote resource in bytes."""
998 if self.dirLike:
999 return 0
1000 info = self.get_info()
1001 # dirLike can be None if we are unsure. Only flag if we are certain
1002 # we have been told this is a file but webDAV reports it as a
1003 # directory.
1004 if not info.is_file and self.dirLike is False:
1005 raise IsADirectoryError(
1006 f"Resource {self} is reported by server as a directory but has a file path"
1007 )
1008 return info.size
1010 def get_info(self) -> ResourceInfo:
1011 """Return lightweight metadata about this HTTP resource."""
1012 if not self.is_webdav_endpoint:
1013 resp = self._head_non_webdav_url()
1014 return self._get_info_from_non_webdav_head(resp)
1016 resp = self._propfind()
1017 if resp.status_code != requests.codes.multi_status:
1018 raise FileNotFoundError(
1019 f"Resource {self} does not exist, status: {resp.status_code} {resp.reason}"
1020 )
1022 prop = _parse_propfind_response_body(resp.text)[0]
1023 if not prop.exists:
1024 raise FileNotFoundError(f"Resource {self} does not exist")
1026 return ResourceInfo(
1027 uri=str(self),
1028 is_file=prop.is_file,
1029 size=prop.size,
1030 last_modified=prop.last_modified,
1031 checksums=dict(prop.checksums),
1032 )
1034 def _get_info_from_non_webdav_head(self, resp: requests.Response) -> ResourceInfo:
1035 """Build `ResourceInfo` from a non-WebDAV HEAD-like response."""
1036 if not self._is_successful_non_webdav_head_request(resp):
1037 if resp.status_code == requests.codes.not_found:
1038 raise FileNotFoundError(
1039 f"Resource {self} does not exist, status: {resp.status_code} {resp.reason}"
1040 )
1041 raise ValueError(
1042 f"Unexpected response for HEAD request for {self}, status: {resp.status_code} {resp.reason}"
1043 )
1045 if self.dirLike:
1046 size = 0
1047 elif resp.status_code == requests.codes.ok: # 200
1048 if "Content-Length" not in resp.headers:
1049 raise ValueError(
1050 f"Response to HEAD request to {self} does not contain 'Content-Length' header"
1051 )
1052 size = int(resp.headers["Content-Length"])
1053 elif resp.status_code == requests.codes.partial_content:
1054 # 206 Partial Content, returned from a GET request with a Range
1055 # header (used to emulate HEAD for presigned S3 URLs).
1056 content_range_header = resp.headers.get("Content-Range")
1057 if content_range_header is None:
1058 raise ValueError(f"Response to GET request to {self} did not contain 'Content-Range' header")
1059 content_range = parse_content_range_header(content_range_header)
1060 size_total = content_range.total
1061 if size_total is None:
1062 raise ValueError(f"Content-Range header for {self} did not include a total file size")
1063 size = size_total
1064 else:
1065 # 416 Range Not Satisfiable can occur on a GET for a 0-byte file.
1066 size = 0
1068 checksums = {}
1069 digest_header = resp.headers.get("Digest")
1070 if digest_header is not None:
1071 for digest in digest_header.split(","):
1072 algorithm, separator, value = digest.strip().partition("=")
1073 if separator:
1074 checksums[algorithm.lower()] = value
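        # Illustrative example: a header such as
        # 'Digest: adler32=03da0195, md5=CY9rzUYh03PK3k6DJie09g==' (values are
        # placeholders) is parsed by the loop above into
        # {"adler32": "03da0195", "md5": "CY9rzUYh03PK3k6DJie09g=="}.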
1076 last_modified = None
1077 if last_modified_header := resp.headers.get("Last-Modified"):
1078 last_modified = parsedate_to_datetime(last_modified_header)
1079 if last_modified.tzinfo is None:
1080 last_modified = last_modified.replace(tzinfo=datetime.UTC)
1081 else:
1082 last_modified = last_modified.astimezone(datetime.UTC)
1084 return ResourceInfo(
1085 uri=str(self),
1086 is_file=not self.dirLike,
1087 size=size,
1088 last_modified=last_modified,
1089 checksums=checksums,
1090 )
1092 def _head_non_webdav_url(self) -> requests.Response:
1093 """Return a response from a HTTP HEAD request for a non-WebDAV HTTP
1094 URL.
1096 Emulates HEAD using a 1-byte GET for presigned S3 URLs.
1097 """
1098 if self._looks_like_presigned_s3_url():
1099 # Presigned S3 URLs are signed for a single method only, so you
1100 # can't call HEAD on a URL signed for GET. However, S3 does
1101 # support Range requests, so you can ask for a 1-byte range with
1102 # GET for a similar effect to HEAD.
1103 #
1104 # Note that some headers differ between a true HEAD request and the
1105 # response returned by this GET, e.g. Content-Length will always be
1106 # 1, and the status code is 206 instead of 200.
1107 return self.metadata_session.get(
1108 self.geturl(),
1109 timeout=self._config.timeout,
1110 allow_redirects=True,
1111 stream=False,
1112 headers={"Range": "bytes=0-0"},
1113 )
1114 else:
1115 return self.metadata_session.head(
1116 self.geturl(), timeout=self._config.timeout, allow_redirects=True, stream=False
1117 )
1119 def _is_successful_non_webdav_head_request(self, resp: requests.Response) -> bool:
1120 """Return `True` if the status code in the response indicates a
1121 successful response to ``_head_non_webdav_url``.
1122 """
1123 return resp.status_code in (
1124 requests.codes.ok, # 200, from a normal HEAD or GET request
1125 requests.codes.partial_content, # 206, returned from a GET request with a Range header.
1126 # 416, returned from a GET request with a 1-byte Range header that
1127 # is longer than the 0-byte file.
1128 requests.codes.range_not_satisfiable,
1129 )
1131 def _looks_like_presigned_s3_url(self) -> bool:
1132 """Return `True` if this ResourcePath's URL is likely to be a presigned
1133 S3 URL.
1134 """
1135 query_params = parse_qs(self._uri.query)
1136 return "Signature" in query_params and "Expires" in query_params
1138 def mkdir(self) -> None:
1139 """Create the directory resource if it does not already exist."""
1140 # Creating directories is only available on WebDAV back ends.
1141 if not self.is_webdav_endpoint:
1142 raise NotImplementedError(
1143 f"Creation of directory {self} is not implemented by plain HTTP servers"
1144 )
1146 if not self.dirLike:
1147 raise NotADirectoryError(f"Can not create a 'directory' for file-like URI {self}")
1149 # Check if the target directory already exists.
1150 resp = self._propfind()
1151 if resp.status_code == requests.codes.multi_status: # 207
1152 prop = _parse_propfind_response_body(resp.text)[0]
1153 if prop.exists:
1154 if prop.is_directory:
1155 return
1156 else:
1157 # A file exists at this path
1158 raise NotADirectoryError(
1159 f"Can not create a directory for {self} because a file already exists at that path"
1160 )
1162 # Target directory does not exist. Create it and its ancestors as
1163 # needed. We need to test if parent URL is different from self URL,
1164 # otherwise we could be stuck in a recursive loop
1165 # where self == parent.
1166 if self.geturl() != self.parent().geturl():
1167 self.parent().mkdir()
1169 log.debug("Creating new directory: %s", self.geturl())
1170 self._mkcol()
1172 def remove(self) -> None:
1173 """Remove the resource."""
1174 self._delete()
1176 def read(self, size: int = -1) -> bytes:
1177 """Open the resource and return the contents in bytes.
1179 Parameters
1180 ----------
1181 size : `int`, optional
1182 The number of bytes to read. Negative or omitted indicates
1183 that all data should be read.
1184 """
1185 # Use the data session as a context manager to ensure that the
1186 # network connections to both the front end and back end servers are
1187 # closed after downloading the data.
1188 log.debug("Reading from remote resource: %s", self.geturl())
1189 stream = size > 0
1190 with self.data_session as session:
1191 with time_this(log, msg="GET %s", args=(self,)):
1192 resp = session.get(self.geturl(), stream=stream, timeout=self._config.timeout)
1194 if resp.status_code != requests.codes.ok: # 200
1195 raise FileNotFoundError(
1196 f"Unable to read resource {self}; status: {resp.status_code} {resp.reason}"
1197 )
1198 if not stream:
1199 return resp.content
1200 else:
1201 return next(resp.iter_content(chunk_size=size))
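    # Illustrative sketch (URL is a placeholder): typical use of read().
    #
    #     rp = ResourcePath("https://server.example.org/data/file.bin")
    #     header = rp.read(size=1024)   # streams and returns the first chunk of up to 1024 bytes
    #     everything = rp.read()        # downloads the full content in one response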
1203 def write(self, data: bytes, overwrite: bool = True) -> None:
1204 """Write the supplied bytes to the new resource.
1206 Parameters
1207 ----------
1208 data : `bytes`
1209 The bytes to write to the resource. The entire contents of the
1210 resource will be replaced.
1211 overwrite : `bool`, optional
1212 If `True` the resource will be overwritten if it exists. Otherwise
1213 the write will fail.
1214 """
1215 log.debug("Writing to remote resource: %s", self.geturl())
1216 if not overwrite and self.exists():
1217 raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")
1219 # Ensure the parent directory exists.
1220 # This is only meaningful and appropriate for WebDAV, not the general
1221 # HTTP case. e.g. for S3 HTTP URLs, the underlying service has no
1222 # concept of 'directories' at all.
1223 if self.is_webdav_endpoint:
1224 self.parent().mkdir()
1226 # Upload the data.
1227 log.debug("Writing data to remote resource: %s", self.geturl())
1228 self._put(data=data)
1230 def transfer_from(
1231 self,
1232 src: ResourcePath,
1233 transfer: str = "copy",
1234 overwrite: bool = False,
1235 transaction: TransactionProtocol | None = None,
1236 multithreaded: bool = True,
1237 ) -> None:
1238 """Transfer the current resource to a Webdav repository.
1240 Parameters
1241 ----------
1242 src : `ResourcePath`
1243 Source URI.
1244 transfer : `str`
1245 Mode to use for transferring the resource. Supports the following
1246 options: copy.
1247 overwrite : `bool`, optional
1248 Whether overwriting the remote resource is allowed or not.
1249 transaction : `~lsst.resources.utils.TransactionProtocol`, optional
1250 Currently unused.
1251 multithreaded : `bool`, optional
1252 If `True` the transfer will be allowed to attempt to improve
1253 throughput by using parallel download streams. This may have no
1254 effect if the URI scheme does not support parallel streams or
1255 if a global override has been applied. If `False` parallel
1256 streams will be disabled.
1257 """
1258 # Fail early to prevent delays if remote resources are requested.
1259 if transfer not in self.transferModes:
1260 raise ValueError(f"Transfer mode {transfer} not supported by URI scheme {self.scheme}")
1262 # Existence checks cost time so do not call this unless we know
1263 # that debugging is enabled.
1264 if log.isEnabledFor(logging.DEBUG):
1265 log.debug(
1266 "Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
1267 src,
1268 src.exists(),
1269 self,
1270 self.exists(),
1271 transfer,
1272 )
1274 # Short circuit immediately if the URIs are identical.
1275 if self == src:
1276 log.debug(
1277 "Target and destination URIs are identical: %s, returning immediately."
1278 " No further action required.",
1279 self,
1280 )
1281 return
1283 if not overwrite and self.exists():
1284 raise FileExistsError(f"Destination path {self} already exists.")
1286 if transfer == "auto":
1287 transfer = self.transferDefault
1289 # We can use webDAV 'COPY' or 'MOVE' if both the current and source
1290 # resources are located in the same server.
1291 if isinstance(src, type(self)) and self.root_uri() == src.root_uri() and self.is_webdav_endpoint:
1292 log.debug("Transfer from %s to %s directly", src, self)
1293 return self._move(src) if transfer == "move" else self._copy(src)
1295 # For resources of different classes or for plain HTTP resources we can
1296 # perform the copy or move operation by downloading to a local file
1297 # and uploading to the destination.
1298 self._copy_via_local(src)
1300 # This was an explicit move, try to remove the source.
1301 if transfer == "move":
1302 src.remove()
1304 def walk(
1305 self, file_filter: str | re.Pattern | None = None
1306 ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]:
1307 """Walk the directory tree returning matching files and directories.
1309 Parameters
1310 ----------
1311 file_filter : `str` or `re.Pattern`, optional
1312 Regex to filter out files from the list before it is returned.
1314 Yields
1315 ------
1316 dirpath : `ResourcePath`
1317 Current directory being examined.
1318 dirnames : `list` of `str`
1319 Names of subdirectories within dirpath.
1320 filenames : `list` of `str`
1321 Names of all the files within dirpath.
1322 """
1323 if not self.dirLike:
1324 raise ValueError("Can not walk a non-directory URI")
1326 # Walking directories is only available on WebDAV back ends.
1327 if not self.is_webdav_endpoint:
1328 raise NotImplementedError(f"Walking directory {self} is not implemented by plain HTTP servers")
1330 if isinstance(file_filter, str):
1331 file_filter = re.compile(file_filter)
1333 resp = self._propfind(depth="1")
1334 if resp.status_code == requests.codes.multi_status: # 207
1335 files: list[str] = []
1336 dirs: list[str] = []
1338 for prop in _parse_propfind_response_body(resp.text):
1339 if prop.is_file:
1340 files.append(prop.name)
1341 elif not prop.href.rstrip("/").endswith(self.path.rstrip("/")):
1342 # Only include the names of sub-directories not the name of
1343 # the directory being walked.
1344 dirs.append(prop.name)
1346 if file_filter is not None:
1347 files = [f for f in files if file_filter.search(f)]
1349 if not dirs and not files:
1350 return
1351 else:
1352 yield type(self)(self, forceAbsolute=False, forceDirectory=True), dirs, files
1354 for dir in dirs:
1355 new_uri = self.join(dir, forceDirectory=True)
1356 yield from new_uri.walk(file_filter)
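    # Illustrative sketch (directory URL is a placeholder): consuming walk()
    # mirrors os.walk.
    #
    #     root = ResourcePath("https://webdav.example.org/repo/", forceDirectory=True)
    #     for dirpath, dirnames, filenames in root.walk(file_filter=r"\.fits$"):
    #         for name in filenames:
    #             print(dirpath.join(name))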
1358 def generate_presigned_get_url(self, *, expiration_time_seconds: int) -> str:
1359 """Return a pre-signed URL that can be used to retrieve this resource
1360 using an HTTP GET without supplying any access credentials.
1362 Parameters
1363 ----------
1364 expiration_time_seconds : `int`
1365 Number of seconds until the generated URL is no longer valid.
1367 Returns
1368 -------
1369 url : `str`
1370 HTTP URL signed for GET.
1371 """
1372 if not self.is_webdav_endpoint:
1373 # This is already an HTTP URL readable without any authentication
1374 # credentials, so return it as-is.
1375 return str(self)
1377 return self._sign_with_macaroon(ActivityCaveat.DOWNLOAD, expiration_time_seconds)
1379 def generate_presigned_put_url(self, *, expiration_time_seconds: int) -> str:
1380 """Return a pre-signed URL that can be used to upload a file to this
1381 path using an HTTP PUT without supplying any access credentials.
1383 Parameters
1384 ----------
1385 expiration_time_seconds : `int`
1386 Number of seconds until the generated URL is no longer valid.
1388 Returns
1389 -------
1390 url : `str`
1391 HTTP URL signed for PUT.
1392 """
1393 if not self.is_webdav_endpoint:
1394 return super().generate_presigned_put_url(expiration_time_seconds=expiration_time_seconds)
1396 return self._sign_with_macaroon(ActivityCaveat.UPLOAD, expiration_time_seconds)
1398 def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
1399 """Return an abstract file system and path that can be used by fsspec.
1401 Returns
1402 -------
1403 fs : `fsspec.spec.AbstractFileSystem`
1404 A file system object suitable for use with the returned path.
1405 path : `str`
1406 A path that can be opened by the file system object.
1407 """
1408 if fsspec is None:
1409 return super().to_fsspec()
1411 if not self.is_webdav_endpoint or self.server not in HttpResourcePath.SUPPORTED_URL_SIGNERS:
1412 return fsspec.url_to_fs(self.geturl(), client_kwargs={"headers": self._extra_headers})
1414 if self.isdir():
1415 raise NotImplementedError(
1416 f"method HttpResourcePath.to_fsspec() not implemented for directory {self}"
1417 )
1419 # If usage of fsspec-compatible file system is disabled in the
1420 # configuration we raise an exception which signals the caller
1421 # that it cannot use fsspec. An example of such a caller is
1422 # `lsst.daf.butler.formatters.ParquetFormatter`.
1423 #
1424 # Note that we don't call super().to_fsspec() since that method
1425 # assumes that fsspec can be used provided fsspec package is
1426 # importable.
1427 #
1428 # The motivation for making this configurable is that for HTTP
1429 # URLs fsspec.HTTPFileSystem uses async I/O and we have found
1430 # unexpected behavior by clients when used against dCache for reading
1431 # parquet files via a ParquetFormatter instance. That behavior cannot
1432 # be reproduced when using other callers.
1433 #
1434 # This needs more investigation to rule out the possibility that async
1435 # I/O, used by fsspec.HTTPFileSystem, is related to this behavior.
1436 if not self._config.fsspec_is_enabled:
1437 raise ImportError("fsspec is disabled for HttpResourcePath objects with webDAV back end")
1439 async def get_client_session(**kwargs: Any) -> ClientSession:
1440 """Return a aiohttp.ClientSession configured to use an
1441 `aiohttp.TCPConnector` shared by all instances of this class.
1443 Parameters
1444 ----------
1445 **kwargs : `Any`
1446 Keyword arguments passed unmodified to the constructor of
1447 `aiohttp.ClientSession`.
1449 Returns
1450 -------
1451 session : `aiohttp.ClientSession`
1452 Client session that the fsspec `HTTPFileSystem` will use to pool
1453 TCP connections to the server.
1454 """
1455 if HttpResourcePath._tcp_connector is None:
1456 HttpResourcePath._tcp_connector = TCPConnector(
1457 # SSL context equipped with client credentials and
1458 # configured to validate server certificates.
1459 ssl=self._config.ssl_context,
1460 # Total number of simultaneous connections this connector
1461 # keeps open with any host.
1462 #
1463 # The default is 100 but we deliberately reduced it to
1464 # avoid keeping a large number of open connections to file
1465 # servers when thousands of quanta execute simultaneously.
1466 #
1467 # In any case, new connections are automatically established
1468 # when needed.
1469 limit=10,
1470 # Number of simultaneous connections to a single host:port.
1471 limit_per_host=1,
1472 # Close network connection after usage
1473 force_close=True,
1474 )
1476 connect_timeout, read_timeout = self._config.timeout
1477 return ClientSession(
1478 connector=HttpResourcePath._tcp_connector,
1479 timeout=ClientTimeout(
1480 connect=connect_timeout,
1481 sock_connect=connect_timeout,
1482 sock_read=read_timeout,
1483 total=2 * read_timeout,
1484 ),
1485 **kwargs,
1486 )
1488 # Retrieve a signed URL for download valid for 2 hours.
1489 url = self.generate_presigned_get_url(expiration_time_seconds=2 * 3_600)
1491 # HTTPFileSystem constructor accepts the argument 'block_size'. The
1492 # default value is 'fsspec.utils.DEFAULT_BLOCK_SIZE' which is 5 MB.
1493 # That seems to be a reasonable block size for downloading files.
1494 return HTTPFileSystem(get_client=get_client_session), url
1496 def _sign_with_macaroon(self, activity: ActivityCaveat, expiration_time_seconds: int) -> str:
1497 # dCache and XRootD webDAV servers support delivery of macaroons.
1498 #
1499 # For details about dCache macaroons see:
1500 # https://www.dcache.org/manuals/UserGuide-9.2/macaroons.shtml
1501 if self.server is None:
1502 raise NotImplementedError(f"server for '{self}' does not support signing URLs")
1503 elif self.server not in HttpResourcePath.SUPPORTED_URL_SIGNERS:
1504 raise NotImplementedError(f"server '{self.server}' does not support signing for {self}")
1506 match activity:
1507 case ActivityCaveat.DOWNLOAD:
1508 activity_caveat = "DOWNLOAD,LIST"
1509 case ActivityCaveat.UPLOAD:
1510 activity_caveat = "UPLOAD,LIST"
1512 # Retrieve a macaroon for the requested activities and duration
1513 headers = {"Content-Type": "application/macaroon-request"}
1514 body = {
1515 "caveats": [
1516 f"activity:{activity_caveat}",
1517 ],
1518 "validity": f"PT{expiration_time_seconds}S",
1519 }
1520 resp = self._post(data=json.dumps(body), headers=headers)
1521 if resp.status_code != requests.codes.ok:
1522 raise ValueError(
1523 f"could not retrieve a macaroon for URL {self}, status: {resp.status_code} {resp.reason}"
1524 )
1526 # We are expecting the body of the response to be formatted in JSON.
1527 # dCache sets the 'Content-Type' of the response to 'application/json'
1528 # but XRootD does not set any 'Content-Type' header 8-[
1529 #
1530 # An example of a response body returned by dCache is shown below:
1531 # {
1532 # "macaroon": "MDA[...]Qo",
1533 # "uri": {
1534 # "targetWithMacaroon": "https://dcache.example.org/?authz=MD...",
1535 # "baseWithMacaroon": "https://dcache.example.org/?authz=MD...",
1536 # "target": "https://dcache.example.org/",
1537 # "base": "https://dcache.example.org/"
1538 # }
1539 # }
1540 #
1541 # An example of a response body returned by XRootD is shown below:
1542 # {
1543 # "macaroon": "MDA[...]Qo",
1544 # "expires_in": 86400
1545 # }
1546 try:
1547 response_body = json.loads(resp.text)
1548 if "macaroon" in response_body:
1549 return str(self.replace(query=f"authz={response_body['macaroon']}"))
1550 else:
1551 raise ValueError(f"could not retrieve macaroon for URL {self}")
1552 except json.JSONDecodeError:
1553 raise ValueError(f"could not deserialize response to POST request for URL {self}")
1555 @contextlib.contextmanager
1556 def _as_local(
1557 self, multithreaded: bool = True, tmpdir: ResourcePath | None = None
1558 ) -> Iterator[ResourcePath]:
1559 """Download object over HTTP and place in temporary directory.
1561 Parameters
1562 ----------
1563 multithreaded : `bool`, optional
1564 If `True` the transfer will be allowed to attempt to improve
1565 throughput by using parallel download streams. This may have no
1566 effect if the URI scheme does not support parallel streams or
1567 if a global override has been applied. If `False` parallel
1568 streams will be disabled.
1569 tmpdir : `ResourcePath` or `None`, optional
1570 Explicit override of the temporary directory to use for remote
1571 downloads.
1573 Returns
1574 -------
1575 local_uri : `ResourcePath`
1576 A URI to a local POSIX file corresponding to a local temporary
1577 downloaded copy of the resource.
1578 """
1579 # Use the session as a context manager to ensure that connections
1580 # to both the front end and back end servers are closed after the
1581 # download operation is finished.
1582 with self.data_session as session:
1583 resp = session.get(self.geturl(), stream=True, timeout=self._config.timeout)
1584 if resp.status_code != requests.codes.ok:
1585 raise FileNotFoundError(
1586 f"Unable to download resource {self}; status: {resp.status_code} {resp.reason}"
1587 )
1589 if tmpdir is None:
1590 temp_dir, buffer_size = self._config.tmpdir_buffersize
1591 tmpdir = ResourcePath(temp_dir, forceDirectory=True)
1592 else:
1593 buffer_size = _calc_tmpdir_buffer_size(tmpdir.ospath)
1595 with ResourcePath.temporary_uri(
1596 suffix=self.getExtension(), prefix=tmpdir, delete=True
1597 ) as tmp_uri:
1598 expected_length = int(resp.headers.get("Content-Length", "-1"))
1599 with time_this(
1600 log,
1601 msg="GET %s [length=%d] to local file %s [chunk_size=%d]",
1602 args=(self, expected_length, tmp_uri, buffer_size),
1603 mem_usage=self._config.collect_memory_usage,
1604 mem_unit=u.mebibyte,
1605 ):
1606 content_length = 0
1607 with open(tmp_uri.ospath, "wb", buffering=buffer_size) as tmpFile:
1608 for chunk in resp.iter_content(chunk_size=buffer_size):
1609 tmpFile.write(chunk)
1610 content_length += len(chunk)
1612 # Check that the expected and actual content lengths match.
1613 # Perform this check only when the contents of the file were not
1614 # encoded by the server.
1615 if (
1616 "Content-Encoding" not in resp.headers
1617 and expected_length >= 0
1618 and expected_length != content_length
1619 ):
1620 raise ValueError(
1621 f"Size of downloaded file does not match value in Content-Length header for {self}: "
1622 f"expecting {expected_length} and got {content_length} bytes"
1623 )
1625 yield tmp_uri
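# Minimal usage sketch for _as_local(); the remote URL is hypothetical:
#
#     remote = ResourcePath("https://dav.example.org/data/file.fits")
#     with remote._as_local() as local_uri:
#         with open(local_uri.ospath, "rb") as f:
#             data = f.read()
#
# The temporary copy is deleted when the context manager exits.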
1627 def _send_webdav_request(
1628 self,
1629 method: str,
1630 url: str | None = None,
1631 headers: dict[str, str] | None = None,
1632 body: str | None = None,
1633 session: _SessionWrapper | None = None,
1634 timeout: tuple[float, float] | None = None,
1635 ) -> requests.Response:
1636 """Send a webDAV request and correctly handle redirects.
1638 Parameters
1639 ----------
1640 method : `str`
1641 The method of the HTTP request to be sent, e.g. PROPFIND, MKCOL.
1642 headers : `dict`, optional
1643 A dictionary of key-value pairs (both strings) to include as
1644 headers in the request.
1645 body : `str`, optional
1646 The body of the request.
1648 Notes
1649 -----
1650 This way of sending webDAV requests is necessary for handling
1651 redirection ourselves, since the 'requests' package changes the method
1652 of the redirected request when the server responds with status 302 and
1653 the method of the original request is not HEAD (which is the case for
1654 webDAV requests).
1656 That means that when the webDAV server we interact with responds with
1657 a redirection to a PROPFIND or MKCOL request, the request gets
1658 converted to a GET request when sent to the redirected location.
1660 See `requests.sessions.SessionRedirectMixin.rebuild_method()` in
1661 https://github.com/psf/requests/blob/main/requests/sessions.py
1663 This behavior of the 'requests' package is meant to be compatible with
1664 what is specified in RFC 9110:
1666 https://www.rfc-editor.org/rfc/rfc9110#name-302-found
1668 For our purposes, we do need to follow the redirection and send a new
1669 request using the same HTTP verb.
1670 """
1671 if url is None:
1672 url = self.geturl()
1674 if headers is None:
1675 headers = {}
1677 if session is None:
1678 session = self.metadata_session
1680 if timeout is None:
1681 timeout = self._config.timeout
1683 with time_this(
1684 log,
1685 msg="%s %s",
1686 args=(
1687 method,
1688 url,
1689 ),
1690 mem_usage=self._config.collect_memory_usage,
1691 mem_unit=u.mebibyte,
1692 ):
1693 for _ in range(max_redirects := 5):
1694 resp = session.request(
1695 method,
1696 url,
1697 data=body,
1698 headers=headers,
1699 stream=False,
1700 timeout=timeout,
1701 allow_redirects=False,
1702 )
1703 if resp.is_redirect:
1704 url = resp.headers["Location"]
1705 else:
1706 return resp
1708 # We reached the maximum allowed number of redirects.
1709 # Stop trying.
1710 raise ValueError(
1711 f"Could not get a response to {method} request for {self} after {max_redirects} redirections"
1712 )
1714 def _propfind(self, body: str | None = None, depth: str = "0") -> requests.Response:
1715 """Send a PROPFIND webDAV request and return the response.
1717 Parameters
1718 ----------
1719 body : `str`, optional
1720 The body of the PROPFIND request to send to the server. If
1721 provided, it is expected to be an XML document.
1722 depth : `str`, optional
1723 The value of the 'Depth' header to include in the request.
1725 Returns
1726 -------
1727 response : `requests.Response`
1728 Response to the PROPFIND request.
1730 Notes
1731 -----
1732 A `ValueError` is raised if the status code of the PROPFIND request
1733 is neither "207 Multi-Status" nor "404 Not Found".
1734 """
1735 if body is None:
1736 # Request only the DAV live properties we are explicitly interested
1737 # in, namely 'resourcetype', 'getcontentlength', 'getlastmodified'
1738 # and 'displayname'.
1739 body = (
1740 """<?xml version="1.0" encoding="utf-8" ?>"""
1741 """<D:propfind xmlns:D="DAV:"><D:prop>"""
1742 """<D:resourcetype/><D:getcontentlength/><D:getlastmodified/><D:displayname/>"""
1743 """</D:prop></D:propfind>"""
1744 )
1745 headers = {
1746 "Depth": depth,
1747 "Content-Type": 'application/xml; charset="utf-8"',
1748 "Content-Length": str(len(body)),
1749 }
1750 resp = self._send_webdav_request("PROPFIND", headers=headers, body=body)
1751 if resp.status_code in (requests.codes.multi_status, requests.codes.not_found):
1752 return resp
1753 else:
1754 raise ValueError(
1755 f"Unexpected response for PROPFIND request for {self}, status: {resp.status_code} "
1756 f"{resp.reason}"
1757 )
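# Illustrative sketch of listing a webDAV collection with _propfind(); it
# assumes 'dir_path' is an HttpResourcePath pointing at a directory:
#
#     resp = dir_path._propfind(depth="1")
#     if resp.status_code == requests.codes.multi_status:
#         for prop in _parse_propfind_response_body(resp.text):
#             print(prop.name, prop.is_directory, prop.size)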
1759 def _options(self) -> requests.Response:
1760 """Send a OPTIONS webDAV request for this resource."""
1761 resp = self._send_webdav_request("OPTIONS")
1762 if resp.status_code in (requests.codes.ok, requests.codes.created):
1763 return resp
1765 raise ValueError(
1766 f"Unexpected response to OPTIONS request for {self}, status: {resp.status_code} {resp.reason}"
1767 )
1769 def _head(self) -> requests.Response:
1770 """Send a HEAD request for this resource."""
1771 if not self.is_webdav_endpoint:
1772 # The remote is a plain HTTP server.
1773 return self._head_non_webdav_url()
1774 return self._send_webdav_request("HEAD")
1776 def _mkcol(self) -> None:
1777 """Send a MKCOL webDAV request to create a collection. The collection
1778 may already exist.
1779 """
1780 resp = self._send_webdav_request("MKCOL")
1781 if resp.status_code == requests.codes.created: # 201
1782 return
1784 if resp.status_code == requests.codes.method_not_allowed: # 405
1785 # The remote directory already exists
1786 log.debug("Can not create directory: %s may already exist: skipping.", self.geturl())
1787 else:
1788 raise ValueError(f"Can not create directory {self}, status: {resp.status_code} {resp.reason}")
1790 def _delete(self) -> None:
1791 """Send a DELETE webDAV request for this resource."""
1792 log.debug("Deleting %s ...", self.geturl())
1794 # If this is a directory, ensure the remote is a webDAV server because
1795 # plain HTTP servers don't support DELETE requests on non-file
1796 # paths.
1797 if self.dirLike and not self.is_webdav_endpoint:
1798 raise NotImplementedError(
1799 f"Deletion of directory {self} is not implemented by plain HTTP servers"
1800 )
1802 # Deleting non-empty directories may take some time, so increase
1803 # the timeout for getting a response from the server.
1804 timeout = self._config.timeout
1805 if self.dirLike:
1806 timeout = (timeout[0], timeout[1] * 100)
1807 resp = self._send_webdav_request("DELETE", timeout=timeout)
1808 if resp.status_code in (
1809 requests.codes.ok,
1810 requests.codes.accepted,
1811 requests.codes.no_content,
1812 requests.codes.not_found,
1813 ):
1814 # We can get a "404 Not Found" error when the file or directory
1815 # does not exist or when the DELETE request was retried several
1816 # times and a previous attempt actually deleted the resource.
1817 # Therefore we consider that a "Not Found" response is not an
1818 # error since we reached the state desired by the user.
1819 return
1820 else:
1821 # TODO: the response to a DELETE request against a webDAV server
1822 # may be multistatus. If so, we need to parse the response body to
1823 # determine more precisely the reason of the failure (e.g. a lock)
1824 # and provide a more helpful error message.
1825 raise ValueError(f"Unable to delete resource {self}; status: {resp.status_code} {resp.reason}")
1827 def _copy_via_local(self, src: ResourcePath) -> None:
1828 """Replace the contents of this resource with the contents of a remote
1829 resource by using a local temporary file.
1831 Parameters
1832 ----------
1833 src : `HttpResourcePath`
1834 The source of the contents to copy to `self`.
1835 """
1836 with src.as_local() as local_uri:
1837 log.debug("Transfer from %s to %s via local file %s", src, self, local_uri)
1838 with open(local_uri.ospath, "rb") as f:
1839 self._put(data=f)
1841 def _copy_or_move(self, method: str, src: HttpResourcePath) -> None:
1842 """Send a COPY or MOVE webDAV request to copy or replace the contents
1843 of this resource with the contents of another resource located in the
1844 same server.
1846 Parameters
1847 ----------
1848 method : `str`
1849 The method to perform. Valid values are "COPY" or "MOVE" (in
1850 uppercase).
1851 src : `HttpResourcePath`
1852 The source of the contents to move to `self`.
1853 """
1854 headers = {"Destination": self.geturl()}
1855 resp = self._send_webdav_request(method, url=src.geturl(), headers=headers, session=self.data_session)
1856 if resp.status_code in (requests.codes.created, requests.codes.no_content):
1857 return
1859 if resp.status_code == requests.codes.multi_status:
1860 tree = eTree.fromstring(resp.content)
1861 status_element = tree.find("./{DAV:}response/{DAV:}status")
1862 status = status_element.text if status_element is not None else "unknown"
1863 error = tree.find("./{DAV:}response/{DAV:}error")
1864 raise ValueError(f"{method} returned multistatus response with status {status} and error {error}")
1865 else:
1866 raise ValueError(
1867 f"{method} operation from {src} to {self} failed, status: {resp.status_code} {resp.reason}"
1868 )
1870 def _copy(self, src: HttpResourcePath) -> None:
1871 """Send a COPY webDAV request to replace the contents of this resource
1872 (if any) with the contents of another resource located in the same
1873 server.
1875 Parameters
1876 ----------
1877 src : `HttpResourcePath`
1878 The source of the contents to copy to `self`.
1879 """
1880 # Neither dCache nor XrootD currently implement the COPY
1881 # webDAV method as documented in
1882 # http://www.webdav.org/specs/rfc4918.html#METHOD_COPY
1883 # (See issues DM-37603 and DM-37651 for details)
1884 #
1885 # For the time being, we use a temporary local file to
1886 # perform the copy client side.
1887 # TODO: when those 2 issues above are solved remove the 3 lines below.
1888 must_use_local = True
1889 if must_use_local:
1890 return self._copy_via_local(src)
1892 return self._copy_or_move("COPY", src)
1894 def _move(self, src: HttpResourcePath) -> None:
1895 """Send a MOVE webDAV request to replace the contents of this resource
1896 with the contents of another resource located in the same server.
1898 Parameters
1899 ----------
1900 src : `HttpResourcePath`
1901 The source of the contents to move to `self`.
1902 """
1903 return self._copy_or_move("MOVE", src)
1905 def _post(self, data: str | None = None, headers: dict[str, str] | None = None) -> requests.Response:
1906 """Perform an HTTP POST request and returns the received response.
1908 Parameters
1909 ----------
1910 body : `bytes`
1911 The contents of the request body.
1912 """
1913 resp = self.metadata_session.request(
1914 "POST",
1915 self.geturl(),
1916 data=data,
1917 headers=headers,
1918 stream=False,
1919 timeout=self._config.timeout,
1920 allow_redirects=True,
1921 )
1922 if resp.status_code == requests.codes.ok:
1923 return resp
1925 raise ValueError(f"POST request for {self} failed, status: {resp.status_code} {resp.reason}")
1927 def _put(self, data: BinaryIO | bytes) -> None:
1928 """Perform an HTTP PUT request and handle redirection.
1930 Parameters
1931 ----------
1932 data : `BinaryIO` or `bytes`
1933 The data to be included in the body of the PUT request.
1934 """
1935 # Retrieve the final URL for this upload by sending a PUT request with
1936 # no content. Follow a single server redirection to retrieve the
1937 # final URL.
1938 headers = {"Content-Length": "0"}
1940 # If we are explicitly configured for or if we know the remote server
1941 # is dCache, send an "Expect" header to signal the server that this
1942 # client knows how to handle redirection in PUT requests.
1943 #
1944 # The goal is that the contents of the file we want to upload are sent
1945 # directly to the dCache pool without transiting through the dCache
1946 # webDAV door. Otherwise, the uploaded data would transit twice over
1947 # the network: first from this client to the dCache webDAV door and
1948 # second from the webDAV door to the target pool (i.e. the dCache file
1949 # server which will ultimately store the data we will upload).
1950 #
1951 # Systematically uploading data via the dCache webDAV door could add
1952 # unnecessary load to the door which we can avoid by instead uploading
1953 # directly to the dCache pool.
1954 #
1955 # For further details see section "Redirection on upload":
1956 #
1957 # https://www.dcache.org/manuals/UserGuide-9.2/webdav.shtml#redirection
1958 if self._config.send_expect_on_put or self.server == "dcache":
1959 headers["Expect"] = "100-continue"
1961 url = self.geturl()
1963 # Use the session as a context manager to ensure the underlying
1964 # connections are closed after finishing uploading the data.
1965 with self.data_session as session:
1966 # Send an empty PUT request to get redirected to the final
1967 # destination.
1968 log.debug("Sending empty PUT request to %s", url)
1969 with time_this(
1970 log,
1971 msg="PUT (no data) %s",
1972 args=(url,),
1973 mem_usage=self._config.collect_memory_usage,
1974 mem_unit=u.mebibyte,
1975 ):
1976 resp = session.request(
1977 "PUT",
1978 url,
1979 data=None,
1980 headers=headers,
1981 stream=False,
1982 timeout=self._config.timeout,
1983 allow_redirects=False,
1984 )
1985 if resp.is_redirect:
1986 url = resp.headers["Location"]
1988 # Upload the data to the final destination.
1989 log.debug("Uploading data to %s", url)
1991 # Ask the server to compute and record a checksum of the uploaded
1992 # file contents, for later integrity checks. Since we don't compute
1993 # the digest ourselves while uploading the data, we cannot verify
1994 # after the request is complete that the data we uploaded is
1995 # identical to the data recorded by the server, but at least the
1996 # server has recorded a digest of the data it stored.
1997 #
1998 # See RFC-3230 for details and
1999 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml
2000 # for the list of supported digest algorithms.
2001 # In addition, note that not all servers implement this RFC so
2002 # the checksum may not be computed by the server.
2003 put_headers: dict[str, str] | None = None
2004 if digest := self._config.digest_algorithm:
2005 put_headers = {"Want-Digest": digest}
2007 with time_this(
2008 log,
2009 msg="PUT %s",
2010 args=(url,),
2011 mem_usage=self._config.collect_memory_usage,
2012 mem_unit=u.mebibyte,
2013 ):
2014 resp = session.request(
2015 "PUT",
2016 url,
2017 data=data,
2018 headers=put_headers,
2019 stream=False,
2020 timeout=self._config.timeout,
2021 allow_redirects=False,
2022 )
2023 if resp.status_code in (
2024 requests.codes.ok,
2025 requests.codes.created,
2026 requests.codes.no_content,
2027 ):
2028 return
2029 else:
2030 raise ValueError(f"Can not write file {self}, status: {resp.status_code} {resp.reason}")
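# Illustrative wire-level view of the redirection-on-upload flow implemented
# above when talking to a dCache door; hostnames, the exact redirect status
# and the digest algorithm are indicative only:
#
#     PUT /data/file.fits HTTP/1.1          ->  307 Temporary Redirect
#     Expect: 100-continue                      Location: https://pool12.example.org/...
#     Content-Length: 0
#
#     PUT https://pool12.example.org/...    ->  201 Created
#     Want-Digest: adler32
#     <file contents>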
2032 @contextlib.contextmanager
2033 def _openImpl(
2034 self,
2035 mode: str = "r",
2036 *,
2037 encoding: str | None = None,
2038 ) -> Iterator[ResourceHandleProtocol]:
2039 resp = self._head()
2040 accepts_range = resp.status_code == requests.codes.ok and resp.headers.get("Accept-Ranges") == "bytes"
2041 handle: ResourceHandleProtocol
2042 if mode in ("rb", "r") and accepts_range:
2043 handle = HttpReadResourceHandle(mode, log, self, timeout=self._config.timeout)
2044 if mode == "r":
2045 # cast because the protocol is compatible, but does not have
2046 # BytesIO in the inheritance tree
2047 yield io.TextIOWrapper(cast(Any, handle), encoding=encoding)
2048 else:
2049 yield handle
2050 else:
2051 with super()._openImpl(mode, encoding=encoding) as http_handle:
2052 yield http_handle
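# Usage sketch: when the server advertises 'Accept-Ranges: bytes', reads
# performed through open() become ranged GET requests instead of a full
# download; the URL is hypothetical:
#
#     path = ResourcePath("https://dav.example.org/data/file.fits")
#     with path.open("rb") as handle:
#         handle.seek(2880)
#         block = handle.read(2880)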
2054 def _copy_extra_attributes(self, original_uri: ResourcePath) -> None:
2055 assert isinstance(original_uri, HttpResourcePath)
2056 self._extra_headers = original_uri._extra_headers
2059def _dump_response(resp: requests.Response) -> None:
2060 """Log the contents of a HTTP or webDAV request and its response.
2062 Parameters
2063 ----------
2064 resp : `requests.Response`
2065 The response to log.
2067 Notes
2068 -----
2069 Intended for development purposes only.
2070 """
2071 log.debug("-----------------------------------------------")
2072 log.debug("Request")
2073 log.debug(" method=%s", resp.request.method)
2074 log.debug(" URL=%s", resp.request.url)
2075 log.debug(" headers=%s", resp.request.headers)
2076 if resp.request.method == "PUT":
2077 log.debug(" body=<data>")
2078 elif resp.request.body is None:
2079 log.debug(" body=<empty>")
2080 else:
2081 log.debug(" body=%r", resp.request.body[:120])
2083 log.debug("Response:")
2084 log.debug(" status_code=%d", resp.status_code)
2085 log.debug(" headers=%s", resp.headers)
2086 if not resp.content:
2087 log.debug(" body=<empty>")
2088 elif "Content-Type" in resp.headers and resp.headers["Content-Type"] == "text/plain":
2089 log.debug(" body=%r", resp.content)
2090 else:
2091 log.debug(" body=%r", resp.content[:80])
2094def _is_protected(filepath: str) -> bool:
2095 """Return true if the permissions of file at filepath only allow for access
2096 by its owner.
2098 Parameters
2099 ----------
2100 filepath : `str`
2101 Path of a local file.
2102 """
2103 if not os.path.isfile(filepath):
2104 return False
2105 mode = stat.S_IMODE(os.stat(filepath).st_mode)
2106 owner_accessible = bool(mode & stat.S_IRWXU)
2107 group_accessible = bool(mode & stat.S_IRWXG)
2108 other_accessible = bool(mode & stat.S_IRWXO)
2109 return owner_accessible and not group_accessible and not other_accessible
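# For example, a credentials file created with mode 0o600 (readable and
# writable by its owner only) is protected, whereas mode 0o640 (also readable
# by the group) is not; the file name is hypothetical:
#
#     os.chmod("token.txt", 0o600); _is_protected("token.txt")  # True
#     os.chmod("token.txt", 0o640); _is_protected("token.txt")  # False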
2112def _parse_propfind_response_body(body: str) -> list[DavProperty]:
2113 """Parse the XML-encoded contents of the response body to a webDAV PROPFIND
2114 request.
2116 Parameters
2117 ----------
2118 body : `str`
2119 XML-encoded response body to a PROPFIND request.
2121 Returns
2122 -------
2123 responses : `List[DavProperty]`
2125 Notes
2126 -----
2127 It is expected that there is at least one response in `body`, otherwise
2128 this function raises.
2129 """
2130 # A response body to a PROPFIND request is of the form (indented for
2131 # readability):
2132 #
2133 # <?xml version="1.0" encoding="UTF-8"?>
2134 # <D:multistatus xmlns:D="DAV:">
2135 # <D:response>
2136 # <D:href>path/to/resource</D:href>
2137 # <D:propstat>
2138 # <D:prop>
2139 # <D:resourcetype>
2140 # <D:collection xmlns:D="DAV:"/>
2141 # </D:resourcetype>
2142 # <D:getlastmodified>
2143 # Fri, 27 Jan 2023 13:59:01 GMT
2144 # </D:getlastmodified>
2145 # <D:getcontentlength>
2146 # 12345
2147 # </D:getcontentlength>
2148 # </D:prop>
2149 # <D:status>
2150 # HTTP/1.1 200 OK
2151 # </D:status>
2152 # </D:propstat>
2153 # </D:response>
2154 # <D:response>
2155 # ...
2156 # </D:response>
2157 # <D:response>
2158 # ...
2159 # </D:response>
2160 # </D:multistatus>
2162 # Scan all the 'response' elements and extract the relevant properties
2163 responses = []
2164 multistatus = eTree.fromstring(body.strip())
2165 for response in multistatus.findall("./{DAV:}response"):
2166 responses.append(DavProperty(response))
2168 if responses:
2169 return responses
2170 else:
2171 # Could not parse the body
2172 raise ValueError(f"Unable to parse response for PROPFIND request: {body}")
2175class DavProperty:
2176 """Helper class to encapsulate select live DAV properties of a single
2177 resource, as retrieved via a PROPFIND request.
2179 Parameters
2180 ----------
2181 response : `eTree.Element` or `None`
2182 The XML response defining the DAV property.
2183 """
2185 # Regular expression to compare against the 'status' element of a
2186 # PROPFIND response's 'propstat' element.
2187 _status_ok_rex = re.compile(r"^HTTP/.* 200 .*$", re.IGNORECASE)
2189 def __init__(self, response: eTree.Element | None):
2190 self._href: str = ""
2191 self._displayname: str = ""
2192 self._collection: bool = False
2193 self._getlastmodified: str = ""
2194 self._getcontentlength: int = -1
2196 if response is not None:
2197 self._parse(response)
2199 def _parse(self, response: eTree.Element) -> None:
2200 # Extract 'href'.
2201 if (element := response.find("./{DAV:}href")) is not None:
2202 # We need to use "str(element.text)" instead of "element.text" to
2203 # keep mypy happy.
2204 self._href = str(element.text).strip()
2205 else:
2206 raise ValueError(
2207 "Property 'href' expected but not found in PROPFIND response: "
2208 f"{eTree.tostring(response, encoding='unicode')}"
2209 )
2211 for propstat in response.findall("./{DAV:}propstat"):
2212 # Only extract properties of interest with status OK.
2213 status = propstat.find("./{DAV:}status")
2214 if status is None or not self._status_ok_rex.match(str(status.text)):
2215 continue
2217 for prop in propstat.findall("./{DAV:}prop"):
2218 # Parse "collection".
2219 if (element := prop.find("./{DAV:}resourcetype/{DAV:}collection")) is not None:
2220 self._collection = True
2222 # Parse "getlastmodified".
2223 if (element := prop.find("./{DAV:}getlastmodified")) is not None:
2224 self._getlastmodified = str(element.text)
2226 # Parse "getcontentlength".
2227 if (element := prop.find("./{DAV:}getcontentlength")) is not None:
2228 self._getcontentlength = int(str(element.text))
2230 # Parse "displayname".
2231 if (element := prop.find("./{DAV:}displayname")) is not None:
2232 self._displayname = str(element.text)
2234 # Some webDAV servers don't include the 'displayname' property in the
2235 # response so try to infer it from the value of the 'href' property.
2236 # Depending on the server the href value may end with '/'.
2237 if not self._displayname:
2238 self._displayname = os.path.basename(self._href.rstrip("/"))
2240 # Force a size of 0 for collections.
2241 if self._collection:
2242 self._getcontentlength = 0
2244 @property
2245 def exists(self) -> bool:
2246 # It is either a directory or a file with length of at least zero
2247 return self._collection or self._getcontentlength >= 0
2249 @property
2250 def is_directory(self) -> bool:
2251 return self._collection
2253 @property
2254 def is_file(self) -> bool:
2255 return not self._collection
2257 @property
2258 def size(self) -> int:
2259 return self._getcontentlength
2261 @property
2262 def last_modified(self) -> datetime.datetime | None:
2263 if not self._getlastmodified:
2264 return None
2266 last_modified = parsedate_to_datetime(self._getlastmodified)
2267 if last_modified.tzinfo is None:
2268 last_modified = last_modified.replace(tzinfo=datetime.UTC)
2269 else:
2270 last_modified = last_modified.astimezone(datetime.UTC)
2271 return last_modified
2273 @property
2274 def checksums(self) -> dict[str, str]:
2275 return {}
2277 @property
2278 def name(self) -> str:
2279 return self._displayname
2281 @property
2282 def href(self) -> str:
2283 return self._href
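# A short sketch tying _parse_propfind_response_body() and DavProperty
# together; the XML is a trimmed version of the example shown above:
#
#     body = (
#         '<?xml version="1.0" encoding="UTF-8"?>'
#         '<D:multistatus xmlns:D="DAV:"><D:response>'
#         '<D:href>/data/file.fits</D:href>'
#         '<D:propstat><D:prop>'
#         '<D:resourcetype/><D:getcontentlength>12345</D:getcontentlength>'
#         '</D:prop><D:status>HTTP/1.1 200 OK</D:status></D:propstat>'
#         '</D:response></D:multistatus>'
#     )
#     (prop,) = _parse_propfind_response_body(body)
#     assert prop.is_file and prop.size == 12345 and prop.name == "file.fits"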
2286class _SessionWrapper(contextlib.AbstractContextManager):
2287 """Wraps a `requests.Session` to allow header values to be injected with
2288 all requests.
2290 Notes
2291 -----
2292 `requests.Session` already has a feature for setting headers globally, but
2293 our session objects are global and authorization headers can vary for each
2294 HttpResourcePath instance.
2295 """
2297 def __init__(self, session: requests.Session, *, extra_headers: dict[str, str] | None) -> None:
2298 self._session = session
2299 self._extra_headers = extra_headers
2301 def __enter__(self) -> _SessionWrapper:
2302 self._session.__enter__()
2303 return self
2305 def __exit__(
2306 self,
2307 exc_type: Any,
2308 exc_value: Any,
2309 traceback: Any,
2310 ) -> None:
2311 return self._session.__exit__(exc_type, exc_value, traceback)
2313 def get(
2314 self,
2315 url: str,
2316 *,
2317 timeout: tuple[float, float],
2318 allow_redirects: bool = True,
2319 stream: bool,
2320 headers: dict[str, str] | None = None,
2321 ) -> requests.Response:
2322 return self._session.get(
2323 url,
2324 timeout=timeout,
2325 allow_redirects=allow_redirects,
2326 stream=stream,
2327 headers=self._augment_headers(headers),
2328 )
2330 def head(
2331 self,
2332 url: str,
2333 *,
2334 timeout: tuple[float, float],
2335 allow_redirects: bool,
2336 stream: bool,
2337 headers: dict[str, str] | None = None,
2338 ) -> requests.Response:
2339 return self._session.head(
2340 url,
2341 timeout=timeout,
2342 allow_redirects=allow_redirects,
2343 stream=stream,
2344 headers=self._augment_headers(headers),
2345 )
2347 def request(
2348 self,
2349 method: str,
2350 url: str,
2351 *,
2352 data: str | bytes | BinaryIO | None,
2353 timeout: tuple[float, float],
2354 allow_redirects: bool,
2355 stream: bool,
2356 headers: dict[str, str] | None = None,
2357 ) -> requests.Response:
2358 return self._session.request(
2359 method,
2360 url,
2361 data=data,
2362 timeout=timeout,
2363 allow_redirects=allow_redirects,
2364 stream=stream,
2365 headers=self._augment_headers(headers),
2366 )
2368 def _augment_headers(self, headers: dict[str, str] | None) -> dict[str, str]:
2369 if headers is None:
2370 headers = {}
2372 if self._extra_headers is not None:
2373 headers = headers | self._extra_headers
2375 return headers
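# Minimal usage sketch for _SessionWrapper; the token value and URL are
# hypothetical:
#
#     wrapped = _SessionWrapper(
#         requests.Session(), extra_headers={"Authorization": "Bearer abc123"}
#     )
#     with wrapped as session:
#         resp = session.get(
#             "https://dav.example.org/data/file.fits",
#             timeout=(20.0, 120.0),
#             stream=False,
#         )
#
# Every request issued through the wrapper carries the extra headers merged
# with any headers supplied for the individual call.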