Coverage for python / lsst / resources / davutils.py: 24%

1098 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:44 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import base64 

15import enum 

16import io 

17import json 

18import logging 

19import os 

20import posixpath 

21import random 

22import re 

23import stat 

24import threading 

25import time 

26import uuid 

27import xml.etree.ElementTree as eTree 

28from datetime import UTC, datetime 

29from http import HTTPStatus 

30from typing import Any, BinaryIO 

31 

32try: 

33 from typing import override # Python 3.12+ 

34except ImportError: 

35 from typing_extensions import override # Python 3.11 

36 

37from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse 

38 

39try: 

40 import fsspec 

41 from fsspec.spec import AbstractFileSystem 

42except ImportError: 

43 fsspec = None 

44 AbstractFileSystem = type 

45 

46import yaml 

47from astropy import units as u 

48from urllib3 import PoolManager, make_headers 

49from urllib3.response import HTTPResponse 

50from urllib3.util import Retry, Timeout, Url, parse_url 

51 

52from lsst.utils.logging import getLogger 

53from lsst.utils.timer import time_this 

54 

# Log under the same logger name as `dav.py` so both modules share output.
log = getLogger(__name__.replace(".davutils", ".dav"))

57 

58 

def normalize_path(path: str | None) -> str:
    """Normalize a path intended to be part of a URL.

    A path of the form "///a/b/c///../d/e/" is normalized to "/a/b/d/e".
    The returned path is always absolute, i.e. it starts with "/", never
    ends with "/" (except when the result is exactly "/") and never
    contains consecutive "/" nor "." or ".." components.

    Parameters
    ----------
    path : `str`, optional
        Path to normalize (e.g., '/path/to/..///normalize/').

    Returns
    -------
    url : `str`
        Normalized URL path (e.g., '/path/normalize').
    """
    # Empty or None paths normalize to the root.
    if not path:
        return "/"

    # normpath collapses "..", "." and repeated slashes; stripping then
    # re-adding the leading "/" guarantees a single absolute prefix.
    return "/" + posixpath.normpath(path).lstrip("/")

78 

79 

def normalize_url(url: str, preserve_scheme: bool = False, preserve_path: bool = True) -> str:
    """Normalize a URL so that its scheme is 'http' or 'https' and its path
    is normalized.

    Parameters
    ----------
    url : `str`
        URL to normalize (e.g., 'davs://example.org:1234///path/to//../dir/').
    preserve_scheme : `bool`
        If True the scheme of `url` is kept as-is. Otherwise 'dav'/'davs'
        schemes are mapped to 'http'/'https'.
    preserve_path : `bool`
        If True the path of `url` is normalized and kept; otherwise the
        returned URL has '/' as its path.

    Returns
    -------
    url : `str`
        Normalized URL (e.g. 'https://example.org:1234/path/to/dir').
    """
    parsed = parse_url(url)

    # Determine the scheme of the result: default to "http" when absent,
    # otherwise optionally map "dav"/"davs" to "http"/"https".
    if parsed.scheme is None:
        scheme = "http"
    elif preserve_scheme:
        scheme = parsed.scheme
    else:
        scheme = parsed.scheme.replace("dav", "http")

    path = normalize_path(parsed.path) if preserve_path else "/"
    return Url(scheme=scheme, host=parsed.host, port=parsed.port, path=path).url

107 

108 

def redact_url(url: str) -> str:
    """Return a copy of `url` with the authorization query value redacted.

    Intended for logging URLs without leaking authorization tokens.

    Parameters
    ----------
    url : `str`
        URL to redact.

    Returns
    -------
    redacted_url : `str`
        For instance, when called with a URL like:

        https://host.example.org:1234/a/b/c/file.data?key1=value1&key2=value2&authz=token#fragment

        the returned value would be:

        https://host.example.org:1234/a/b/c/file.data?key1=value1&key2=value2&authz=....#fragment
    """
    parsed_url = urlparse(url)

    # Replace the value of the "authz" query parameter; keep all others.
    redacted_query = [
        (key, "...." if key == "authz" else value) for key, value in parse_qsl(parsed_url.query)
    ]

    return urlunparse(parsed_url._replace(query=urlencode(redacted_query)))

138 

139 

class DavConfig:
    """Configurable settings a webDAV client must use when interacting with
    a particular storage endpoint.

    Parameters
    ----------
    config : `dict` [ `str`, `str` ], optional
        Dictionary of configurable settings for the webDAV endpoint whose
        base URL is ``config["base_url"]``.

        For instance, if ``config["base_url"]`` is

            "davs://webdav.example.org:1234/"

        any object of class `DavResourcePath` like

            "davs://webdav.example.org:1234/path/to/any/file"

        will use the settings in this configuration to configure its client.
    """

    # Timeout in seconds to establish a network connection with the remote
    # server.
    DEFAULT_TIMEOUT_CONNECT: float = 10.0

    # Timeout in seconds to read the response to a request (headers and
    # body). Must be large enough to allow upload and download of files of
    # the typical size this client supports.
    DEFAULT_TIMEOUT_READ: float = 300.0

    # Maximum number of network connections persisted per "host:port" pair.
    # Additional simultaneous connections are created on demand but not
    # persisted after use.
    DEFAULT_PERSISTENT_CONNECTIONS_PER_HOST: int = 20

    # Buffer size (in mebibytes, i.e. 1024*1024 bytes) used when sending
    # requests and receiving responses.
    DEFAULT_BUFFER_SIZE: int = 5

    # Block size (in mebibytes) used for partial reads. Each partial read
    # requests at least this many bytes, unless the file is smaller.
    DEFAULT_BLOCK_SIZE: int = 1

    # Number of times to retry requests before failing. Retry happens only
    # under certain conditions.
    DEFAULT_RETRIES: int = 4

    # Bounds (in seconds) of the retry backoff factor. A value within this
    # interval is randomly selected every time a request is retried.
    DEFAULT_RETRY_BACKOFF_MIN: float = 1.0
    DEFAULT_RETRY_BACKOFF_MAX: float = 3.0

    # Path to a directory or certificate bundle file with the certificates
    # of the trusted certificate authorities, used to verify the server's
    # host certificate. If None, the system trust store is used.
    DEFAULT_TRUSTED_AUTHORITIES: str | None = None

    # Credentials for HTTP basic authentication. When specified, basic
    # authentication is used on all requests.
    DEFAULT_USER_NAME: str | None = None
    DEFAULT_USER_PASSWORD: str | None = None

    # Paths to the client certificate and associated private key presented
    # to the server for authentication. If None, no client certificate is
    # presented.
    DEFAULT_USER_CERT: str | None = None
    DEFAULT_USER_KEY: str | None = None

    # Token sent to the server for authentication. May be either the token
    # value itself or the path of a file containing it.
    DEFAULT_TOKEN: str | None = None

    # When True, the client attempts to reuse the network connection to the
    # server as long as possible (the server may still close it
    # unilaterally). When False, the connection is closed after each
    # request.
    DEFAULT_REUSE_CONNECTION: bool = True

    # Default checksum algorithm the server is asked to compute on every
    # file upload (see RFC 3230). Not all servers support this.
    DEFAULT_REQUEST_CHECKSUM: str | None = None

    # When True, the client may return objects compliant with the fsspec
    # specification. See: https://filesystem-spec.readthedocs.io
    DEFAULT_ENABLE_FSSPEC: bool = True

    # When True, memory usage is computed and reported when executing in
    # debug mode. Computing memory usage is costly: only set for debugging.
    DEFAULT_COLLECT_MEMORY_USAGE: bool = False

    # Accepted checksum algorithms. Must be lowercase.
    ACCEPTED_CHECKSUMS: list[str] = ["adler32", "md5", "sha-256", "sha-512"]

    def __init__(self, config: dict | None = None) -> None:
        cfg = {} if config is None else config

        # The base URL identifies the endpoint this configuration applies
        # to; "_default_" marks the catch-all configuration.
        base_url = expand_vars(cfg.get("base_url"))
        if base_url is None:
            self._base_url = "_default_"
        else:
            self._base_url = normalize_url(base_url, preserve_path=False)

        mebibyte = 1_048_576
        self._timeout_connect: float = float(cfg.get("timeout_connect", DavConfig.DEFAULT_TIMEOUT_CONNECT))
        self._timeout_read: float = float(cfg.get("timeout_read", DavConfig.DEFAULT_TIMEOUT_READ))
        self._persistent_connections_per_host: int = int(
            cfg.get(
                "persistent_connections_per_host",
                DavConfig.DEFAULT_PERSISTENT_CONNECTIONS_PER_HOST,
            )
        )
        # Sizes are configured in mebibytes but stored in bytes.
        self._buffer_size: int = mebibyte * int(cfg.get("buffer_size", DavConfig.DEFAULT_BUFFER_SIZE))
        self._block_size: int = mebibyte * int(cfg.get("block_size", DavConfig.DEFAULT_BLOCK_SIZE))
        self._retries: int = int(cfg.get("retries", DavConfig.DEFAULT_RETRIES))
        self._retry_backoff_min: float = float(
            cfg.get("retry_backoff_min", DavConfig.DEFAULT_RETRY_BACKOFF_MIN)
        )
        self._retry_backoff_max: float = float(
            cfg.get("retry_backoff_max", DavConfig.DEFAULT_RETRY_BACKOFF_MAX)
        )
        # Values that may contain environment variables are expanded here,
        # once, at configuration time.
        self._trusted_authorities: str | None = expand_vars(
            cfg.get("trusted_authorities", DavConfig.DEFAULT_TRUSTED_AUTHORITIES)
        )
        self._user_name: str | None = expand_vars(cfg.get("user_name", DavConfig.DEFAULT_USER_NAME))
        self._user_password: str | None = expand_vars(
            cfg.get("user_password", DavConfig.DEFAULT_USER_PASSWORD)
        )
        self._user_cert: str | None = expand_vars(cfg.get("user_cert", DavConfig.DEFAULT_USER_CERT))
        self._user_key: str | None = expand_vars(cfg.get("user_key", DavConfig.DEFAULT_USER_KEY))
        self._token: str | None = expand_vars(cfg.get("token", DavConfig.DEFAULT_TOKEN))
        self._reuse_connection: bool = cfg.get("reuse_connection", DavConfig.DEFAULT_REUSE_CONNECTION)
        self._enable_fsspec: bool = cfg.get("enable_fsspec", DavConfig.DEFAULT_ENABLE_FSSPEC)
        # Frontend URLs are validated against the base URL computed above.
        self._frontend_urls: list[str] = self._init_frontend_urls(config=cfg)
        self._collect_memory_usage: bool = cfg.get(
            "collect_memory_usage", DavConfig.DEFAULT_COLLECT_MEMORY_USAGE
        )
        self._request_checksum: str | None = cfg.get(
            "request_checksum", DavConfig.DEFAULT_REQUEST_CHECKSUM
        )
        if self._request_checksum is not None:
            # Accepted algorithm names are lowercase; reject anything else.
            self._request_checksum = self._request_checksum.lower()
            if self._request_checksum not in DavConfig.ACCEPTED_CHECKSUMS:
                raise ValueError(
                    f"Value for checksum algorithm {self._request_checksum} for storage endpoint "
                    f"{self._base_url} is not among the accepted values: {DavConfig.ACCEPTED_CHECKSUMS}"
                )

    def _init_frontend_urls(self, config: dict | None = None) -> list[str]:
        """Collect, normalize and validate the frontend server URLs
        declared in `config`, if any.
        """
        if config is None:
            return []

        # Expand environment variables in each configured frontend URL and
        # normalize it; collecting into a set removes duplicates.
        unique_urls: set[str] = set()
        for raw_url in config.get("frontend_base_urls", []):
            if (expanded_url := expand_vars(raw_url)) is not None:
                unique_urls.add(normalize_url(expanded_url, preserve_path=False))

        frontend_urls = list(unique_urls)

        # Every frontend URL must use the same scheme as this endpoint's
        # base URL.
        base_url_scheme = parse_url(self._base_url).scheme
        for frontend_url in frontend_urls:
            if parse_url(frontend_url).scheme != base_url_scheme:
                raise ValueError(
                    f"inconsistent scheme in frontend URL {frontend_url} for endpoint "
                    f"with base URL {self._base_url}"
                )

        return frontend_urls

    @property
    def base_url(self) -> str:
        return self._base_url

    @property
    def timeout_connect(self) -> float:
        return self._timeout_connect

    @property
    def timeout_read(self) -> float:
        return self._timeout_read

    @property
    def persistent_connections_per_host(self) -> int:
        return self._persistent_connections_per_host

    @property
    def buffer_size(self) -> int:
        return self._buffer_size

    @property
    def block_size(self) -> int:
        return self._block_size

    @property
    def retries(self) -> int:
        return self._retries

    @property
    def retry_backoff_min(self) -> float:
        return self._retry_backoff_min

    @property
    def retry_backoff_max(self) -> float:
        return self._retry_backoff_max

    @property
    def trusted_authorities(self) -> str | None:
        return self._trusted_authorities

    @property
    def token(self) -> str | None:
        return self._token

    @property
    def reuse_connection(self) -> bool:
        return self._reuse_connection

    @property
    def request_checksum(self) -> str | None:
        return self._request_checksum

    @property
    def user_cert(self) -> str | None:
        return self._user_cert

    @property
    def user_key(self) -> str | None:
        # Without a client certificate, a private key alone is useless:
        # ignore it even if it was provided.
        if self._user_cert is None:
            return None

        # With a certificate but no key, assume the key is included in the
        # same file as the certificate. That is typically the case when a
        # X.509 grid proxy is used as client certificate.
        return self._user_key if self._user_key is not None else self._user_cert

    @property
    def user_name(self) -> str | None:
        return self._user_name

    @property
    def user_password(self) -> str | None:
        return self._user_password

    @property
    def enable_fsspec(self) -> bool:
        return self._enable_fsspec

    @property
    def collect_memory_usage(self) -> bool:
        return self._collect_memory_usage

    @property
    def frontend_urls(self) -> list[str]:
        return self._frontend_urls

414 

415 

class DavConfigPool:
    """Registry of configurable settings for all known webDAV endpoints.

    Parameters
    ----------
    filename : `str`, optional
        Name of an environment variable whose value is the path of the
        configuration file to load. The path may itself contain environment
        variables (e.g. "$HOME/path/to/config.yaml") or "~"
        (e.g. "~/path/to/config.yaml").

        NOTE(review): only the environment-variable lookup is implemented;
        passing a plain file path directly is silently ignored — confirm
        whether that is intended.

        The configuration file is a YAML file with the structure below:

        - base_url: "davs://webdav1.example.org:1234/"
          persistent_connections_per_host: 10
          timeout_connect: 20.0
          timeout_read: 120.0
          retries: 3
          retry_backoff_min: 1.0
          retry_backoff_max: 3.0
          user_cert: "${X509_USER_PROXY}"
          user_key: "${X509_USER_PROXY}"
          token: "/path/to/bearer/token/file"
          trusted_authorities: "/etc/grid-security/certificates"
          buffer_size: 5
          enable_fsspec: false
          request_checksum: "md5"
          collect_memory_usage: false

        - base_url: "davs://webdav2.example.org:1234/"
          user_name: "user"
          user_password: "password"
          persistent_connections_per_host: 5
          reuse_connection: false
          ...

        All settings are optional. If no settings are found for a
        particular webDAV endpoint, sensible defaults are used.

    Notes
    -----
    There is only a single instance of this class. This thread-safe
    singleton is intended to be initialized when the module is imported
    the first time.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, filename: str | None = None) -> DavConfigPool:
        # Thread-safe lazy creation of the singleton (double-checked
        # locking).
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, filename: str | None = None) -> None:
        # WARNING: __init__ runs on every DavConfigPool(...) call, even
        # when __new__ returned the existing singleton, so the registry is
        # rebuilt each time it is instantiated.
        #
        # Default configuration, used when a URL does not match any of the
        # endpoints found in the configuration file.
        self._default_config: DavConfig = DavConfig()

        # The key of this dictionary is the URL of the webDAV endpoint,
        # e.g. "davs://host.example.org:1234/".
        self._configs: dict[str, DavConfig] = {}

        # Load the configuration from the file we have been provided with,
        # if any.
        if filename is None:
            return

        # `filename` is the name of an environment variable whose value is
        # the path of the configuration file. That path may include
        # environment variables (e.g. "$HOME/path/to/config.yaml") or "~".
        if (config_path := os.getenv(filename)) is not None:
            # Expand environment variables and '~' in the file name.
            config_path = os.path.expanduser(os.path.expandvars(config_path))
            with open(config_path) as file:
                for config_item in yaml.safe_load(file):
                    config = DavConfig(config_item)
                    if config.base_url in self._configs:
                        # We already have a configuration for the same
                        # endpoint. That is likely a human error in the
                        # configuration file. Include the file path in the
                        # message so the offending file can be located.
                        raise ValueError(
                            f"configuration file {config_path} contains two configurations for "
                            f"endpoint {config.base_url}"
                        )
                    self._configs[config.base_url] = config

    def get_config_for_url(self, url: str) -> DavConfig:
        """Return the configuration a webDAV client must use when
        interacting with the server which hosts the resource at `url`.

        Parameters
        ----------
        url : `str`
            URL for which to obtain a configuration.

        Returns
        -------
        config : `DavConfig`
            The configuration registered for the endpoint of `url`, or the
            default configuration when none is registered.
        """
        # Select the configuration for the endpoint of the provided URL.
        normalized_url: str = normalize_url(url, preserve_path=False)
        if (config := self._configs.get(normalized_url)) is not None:
            return config

        # No config was found for the specified URL. Use the default.
        return self._default_config

    def _destroy(self) -> None:
        """Destroy this class singleton instance.

        Helper method to be used in tests to reset global configuration.
        """
        with DavConfigPool._lock:
            DavConfigPool._instance = None

537 

538 

def make_retry(config: DavConfig) -> Retry:
    """Create a ``urllib3.util.Retry`` object from settings in `config`.

    Parameters
    ----------
    config : `DavConfig`
        Configurable settings for a webDAV storage endpoint.

    Returns
    -------
    retry : `urllib3.util.Retry`
        Retry object to be used when creating a ``urllib3.PoolManager``.
    """
    # Backoff factor applied between attempts after the second try
    # (seconds). A random jitter prevents clients started at the same time
    # (even on different hosts) from overwhelming the server by retrying
    # simultaneously.
    spread = config.retry_backoff_max - config.retry_backoff_min
    backoff_factor = config.retry_backoff_min + spread * random.random()

    # Uppercased HTTP method verbs we retry on: only idempotent requests
    # are retried automatically.
    retryable_methods = frozenset(
        ("COPY", "DELETE", "GET", "HEAD", "MKCOL", "OPTIONS", "PROPFIND", "PUT")
    )

    # HTTP status codes that force a retry.
    retryable_statuses = frozenset(
        (
            HTTPStatus.TOO_MANY_REQUESTS,  # 429
            HTTPStatus.INTERNAL_SERVER_ERROR,  # 500
            HTTPStatus.BAD_GATEWAY,  # 502
            HTTPStatus.SERVICE_UNAVAILABLE,  # 503
            HTTPStatus.GATEWAY_TIMEOUT,  # 504
        )
    )

    return Retry(
        # Total number of retries to allow. Takes precedence over the
        # per-category counts below.
        total=2 * config.retries,
        # Connection-related errors.
        connect=config.retries,
        # Read errors.
        read=config.retries,
        # Bad status codes.
        status=config.retries,
        backoff_factor=backoff_factor,
        allowed_methods=retryable_methods,
        status_forcelist=retryable_statuses,
        # Honor the "Retry-After" header on the status codes listed above.
        respect_retry_after_header=True,
    )

598 

599 

class DavClientPool:
    """Container of reusable webDAV clients, each one specifically
    configured to talk to a single storage endpoint.

    Parameters
    ----------
    config_pool : `DavConfigPool`
        Pool of all known webDAV client configurations.

    Notes
    -----
    There is a single instance of this class. This thread-safe singleton is
    intended to be initialized when the module is imported the first time.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, config_pool: DavConfigPool) -> DavClientPool:
        # Thread-safe lazy creation of the singleton (double-checked
        # locking).
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, config_pool: DavConfigPool) -> None:
        self._config_pool: DavConfigPool = config_pool

        # Keyed by path-stripped URL of the form
        # "davs://host.example.org:1234/"; each value is a reusable
        # DavClient for interacting with that endpoint.
        self._clients: dict[str, DavClient] = {}

    def get_client_for_url(self, url: str) -> DavClient:
        """Return a client for interacting with the endpoint where `url`
        is hosted.

        Parameters
        ----------
        url : `str`
            URL for which to obtain a client.

        Notes
        -----
        The returned client is thread-safe. An existing client for the
        endpoint is reused when possible; otherwise a new client is created
        with the configuration appropriate for that storage endpoint.
        """
        endpoint = normalize_url(url, preserve_path=False)

        # Fast path: reuse an existing client for this endpoint.
        if (existing := self._clients.get(endpoint)) is not None:
            return existing

        with DavClientPool._lock:
            # Another thread may have created a client for this endpoint
            # while we waited for the lock: reuse it in that case.
            if (existing := self._clients.get(endpoint)) is not None:
                return existing

            config = self._config_pool.get_config_for_url(endpoint)
            client = self._make_client(endpoint, config)
            # Save it for serving subsequent requests.
            self._clients[endpoint] = client

        return client

    def _make_client(self, url: str, config: DavConfig) -> DavClient:
        """Make a webDAV client for interacting with the server at `url`."""
        # Probe the server to check it implements the webDAV protocol and
        # retrieve its identity, so we can build a client tailored to that
        # specific server implementation.
        probe = DavClient(url, config)
        server_details = probe.get_server_details(url)
        server_id = server_details.get("Server", None)

        # Convert the "Accept-Ranges" header (when present) into a boolean
        # telling whether the server supports byte-range GET requests.
        accepts_ranges: bool | str | None = server_details.get("Accept-Ranges", None)
        if accepts_ranges is not None:
            accepts_ranges = accepts_ranges == "bytes"

        if server_id is not None and server_id.startswith("dCache/"):
            # Client for a dCache webDAV server.
            return DavClientDCache(url, config, accepts_ranges)

        if server_id is not None and server_id.startswith("XrootD/"):
            # Client for a XrootD webDAV server.
            return DavClientXrootD(url, config, accepts_ranges)

        # Generic webDAV client for unidentified server implementations.
        return DavClient(url, config, accepts_ranges)

    def _destroy(self) -> None:
        """Destroy this class singleton instance.

        Helper method to be used in tests to reset global configuration.
        """
        with DavClientPool._lock:
            DavClientPool._instance = None

701 

702 

class DavFileSizeCache:
    """Helper class to cache file sizes of recently uploaded files.

    Parameters
    ----------
    default_timeout : `float`, optional
        Default validity period, in seconds, of the entries in this cache.
        A per-entry validity period can also be given when the entry is
        added (see the `update_size` method).

    Notes
    -----
    There is a single instance of this class shared by several `DavClient`
    objects. This singleton is thread safe.

    Caching file sizes avoids sending requests to the server for the size
    of recently uploaded files. In particular, this efficiently serves
    `Butler` requests for the size of a file it just wrote to the
    datastore.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls) -> DavFileSizeCache:
        # Thread-safe lazy creation of the singleton (double-checked
        # locking).
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)

        return cls._instance

    def __init__(self, default_timeout: float = 60.0) -> None:
        # Cache keys are URLs of the form
        #
        #     "https://host.example.org:1234/path/to/file"
        #
        # and values are triplets (file_size, last_updated, timeout) where:
        # - 'file_size' is the size of the file in bytes,
        # - 'last_updated' is when this entry was added or last updated, in
        #   seconds since epoch,
        # - 'timeout' is the validity period of the entry, in seconds,
        #   counted from the moment it was created.
        #
        # Since __init__ runs on every instantiation of the singleton, only
        # the first call actually initializes the state.
        with DavFileSizeCache._lock:
            if not hasattr(self, "_cache"):
                self._default_timeout: float = default_timeout
                self._cache: dict[str, tuple[int, float, float]] = {}

    def invalidate(self, url: str) -> None:
        """Invalidate the cache entry for `url`, if any.

        Parameters
        ----------
        url : `str`
            URL of the file whose cache entry must be invalidated.
        """
        with DavFileSizeCache._lock:
            self._cache.pop(url, None)

    def update_size(self, url: str, size: int | None, timeout: float | None = None) -> None:
        """Record that the file at `url` has a size of `size` bytes. The
        entry is considered valid for `timeout` seconds from now.

        Parameters
        ----------
        url : `str`
            URL of the file whose size is to be cached.
        size : `int` or `None`, optional
            Size in bytes of the file at `url`. If `None`, the cache is
            left unmodified.
        timeout : `float` or `None`, optional
            Validity period, in seconds, of this entry. When not specified,
            the default value given at construction time is used.
        """
        if size is None:
            # Nothing to record.
            return

        validity = self._default_timeout if timeout is None else timeout
        with DavFileSizeCache._lock:
            self._cache[url] = (size, time.time(), validity)

    def get_size(self, url: str) -> int | None:
        """Retrieve the cached value of the size of the file at `url`.

        Parameters
        ----------
        url : `str`
            URL of the file to retrieve the size for.

        Returns
        -------
        `size`: `int` or `None`
            The cached size of the file at `url`, or `None` when there is
            no entry or when the entry's validity period has expired. An
            expired entry is removed from the cache.
        """
        with DavFileSizeCache._lock:
            entry = self._cache.get(url, None)
            if entry is None:
                # No entry in the cache for this URL.
                return None

            # Check that the entry's validity period has not yet expired.
            size, last_updated, validity = entry
            if time.time() > last_updated + validity:
                # Expired: remove the stale entry.
                del self._cache[url]
                return None

            return size

819 

820 

def unexpected_status_error(method: str, url: str, resp: HTTPResponse) -> Exception:
    """Build an exception describing an unexpected HTTP response.

    Note that the exception is returned, not raised: callers are expected
    to ``raise unexpected_status_error(...)`` themselves.

    Parameters
    ----------
    method : `str`
        The HTTP method of the request that triggered the error.
    url : `str`
        The URL that caused the error. It is redacted before being included
        in the message so authorization tokens are not leaked.
    resp : `urllib3.response.HTTPResponse`
        The error response.

    Returns
    -------
    error : `Exception`
        A `ValueError` describing the unexpected response, including the
        response body when it is not empty.
    """
    message = f"Unexpected response to HTTP request {method} {redact_url(url)}: {resp.status} {resp.reason}"
    # Decode with errors="replace" so a response body which is not valid
    # UTF-8 cannot itself raise while we are reporting an error.
    body = resp.data.decode(errors="replace")
    if len(body) > 0:
        message += f" [response body: {body}]"

    return ValueError(message)

839 

840 

841class DavClient: 

842 """WebDAV client, configured to talk to a single storage endpoint. 

843 

844 Instances of this class are thread-safe. 

845 

846 Parameters 

847 ---------- 

848 url : `str` 

849 Root URL of the storage endpoint (e.g. 

850 "https://host.example.org:1234/"). 

851 config : `DavConfig` 

852 Configuration to initialize this client. 

853 accepts_ranges : `bool` | `None` 

854 Indicate whether the remote server accepts the ``Range`` header in GET 

855 requests. 

856 """ 

857 

858 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None: 

859 # Lock to protect this client fields from concurrent modification. 

860 self._lock = threading.Lock() 

861 

862 # Base URL of the server this client will interact with. 

863 # It is of the form: "davs://host.example.org:1234/" 

864 self._base_url: str = url 

865 

866 # Configuration settings for the storage endpoint this client 

867 # will interact with. 

868 self._config: DavConfig = config 

869 

870 # Make the authorizer for this client's requests. 

871 self._authorizer: Authorizer | None = self._make_authorizer(config=self._config) 

872 

873 # Make the pool manager for this client to use for sending 

874 # requests to the server. 

875 self._pool_manager: PoolManager = self._make_pool_manager(config=self._config) 

876 

877 # Parser of PROPFIND responses. 

878 self._propfind_parser: DavPropfindParser = DavPropfindParser() 

879 

880 # Does the remote server accept a "Range" header in GET requests? 

881 # This field is lazy initialized. 

882 self._accepts_ranges: bool | None = accepts_ranges 

883 

884 # Can this client use a COPY request to duplicate files within a 

885 # single webDAV server? 

886 # Subclasses can overwrite this setting according to the server 

887 # capabilities and compliance to webDAV RFC. 

888 self._can_duplicate: bool = True 

889 

890 # Cache to store sizes of files this client has recently uploaded 

891 # to the server. 

892 self._file_size_cache = DavFileSizeCache() 

893 

894 def _make_authorizer(self, config: DavConfig) -> Authorizer | None: 

895 # If a token was specified in the configuration settings for this 

896 # endpoint, prefer it as the authentication method, even if other 

897 # authentication settings were also specified. 

898 if config.token is not None: 

899 return TokenAuthorizer(token=config.token) 

900 elif config.user_name is not None and config.user_password is not None: 

901 return BasicAuthorizer(user_name=config.user_name, user_password=config.user_password) 

902 

903 return None 

904 

    def _make_pool_manager(self, config: DavConfig) -> PoolManager:
        """Build the `PoolManager` this client uses to send its requests,
        configured from the endpoint settings in `config`.
        """
        # Prepare the trusted authorities certificates
        ca_certs, ca_cert_dir = None, None
        if config.trusted_authorities is not None:
            if os.path.isdir(config.trusted_authorities):
                ca_cert_dir = config.trusted_authorities
            elif os.path.isfile(config.trusted_authorities):
                ca_certs = config.trusted_authorities
            else:
                raise FileNotFoundError(
                    f"Trusted authorities file or directory {config.trusted_authorities} does not exist"
                )

        # If a token was specified for this endpoint don't use the
        # <user certificate, private key> pair, even if they were also
        # specified.
        user_cert, user_key = None, None
        if config.token is None:
            user_cert = config.user_cert
            user_key = config.user_key

        # Pool manager for sending requests. Connections in this pool manager
        # are generally left open by the client but the front-end server may
        # choose to close them in some specific situations. For instance,
        # when serving a PUT request, the front server may redirect to a
        # backend server and close the network connection making it
        # unusable for subsequent requests.
        #
        # In addition, the client may also choose to explicitly close the
        # network connection after receiving a response.
        return PoolManager(
            # Number of connection pools to cache before discarding the least
            # recently used pool. Each connection pool manages network
            # connections to a single host, so this is basically the number
            # of "host:port" we persist network connections to.
            num_pools=200,
            # Number of connections to the same "host:port" to persist for
            # later reuse. More than 1 is useful in multithreaded situations.
            # If more than this number of network connections are needed at
            # a particular moment, they will be created and discarded after
            # use.
            maxsize=config.persistent_connections_per_host,
            # Retry configuration to use by default with requests sent to
            # host in the front end.
            retries=make_retry(config),
            # Socket timeout in seconds for each individual connection.
            timeout=Timeout(
                connect=config.timeout_connect,
                read=config.timeout_read,
            ),
            # Size in bytes of the buffer for reading/writing data from/to
            # the underlying socket.
            blocksize=config.buffer_size,
            # Client certificate and private key for establishing TLS
            # connections. If None, no client certificate is sent to the
            # server. Only relevant for endpoints using secure HTTP protocol.
            cert_file=user_cert,
            key_file=user_key,
            # We require verification of the server certificate.
            cert_reqs="CERT_REQUIRED",
            # Directory where the certificates of the trusted certificate
            # authorities can be found. The contents of that directory
            # must be as expected by OpenSSL.
            ca_cert_dir=ca_cert_dir,
            # Path to a file of concatenated CA certificates in PEM format.
            ca_certs=ca_certs,
        )

972 

973 def get_server_details(self, url: str) -> dict[str, str]: 

974 """Retrieve the details of the server and check it advertises 

975 compliance to class 1 of webDAV protocol. 

976 

977 Parameters 

978 ---------- 

979 url : `str` 

980 URL to check. 

981 

982 Returns 

983 ------- 

984 details: `dic[str, str]` 

985 The keys of the returned dictionary can be "Server" and 

986 "Accept-Ranges". Any of those keys may not exist in the returned 

987 dictionary if the server did not include it in its response. 

988 

989 The values are the values of the corresponding 

990 headers found in the response to the OPTIONS request. 

991 Examples of values for the "Server" header are 'dCache/9.2.4' or 

992 'XrootD/v5.7.1'. 

993 """ 

994 # Check that the value "1" is part of the value of the "DAV" header in 

995 # the response to an 'OPTIONS' request. 

996 # 

997 # We don't rely on webDAV locks, so a server complying to class 1 is 

998 # enough for our purposes. All webDAV servers must advertise at least 

999 # compliance class "1". 

1000 # 

1001 # Compliance classes are documented in 

1002 # http://www.webdav.org/specs/rfc4918.html#dav.compliance.classes 

1003 # 

1004 # Examples of values for header DAV are: 

1005 # DAV: 1, 2 

1006 # DAV: 1, <http://apache.org/dav/propset/fs/1> 

1007 resp = self.options(url) 

1008 if "DAV" not in resp.headers: 

1009 raise ValueError(f"Server of {resp.geturl()} does not implement webDAV protocol") 

1010 

1011 if "1" not in resp.headers.get("DAV").replace(" ", "").split(","): 

1012 raise ValueError( 

1013 f"Server of {resp.geturl()} does not advertise required compliance to webDAV protocol class 1" 

1014 ) 

1015 

1016 # The value of 'Server' header is expected to be of the form 

1017 # 'dCache/9.2.4' or 'XrootD/v5.7.1'. Not all servers include such a 

1018 # header in their response to an OPTIONS request. 

1019 details: dict[str, str] = {} 

1020 for header in ("Server", "Accept-Ranges"): 

1021 value = resp.headers.get(header, None) 

1022 if value is not None: 

1023 details[header] = value 

1024 

1025 return details 

1026 

1027 def _get_response_url(self, resp: HTTPResponse, default_url: str) -> str: 

1028 """Return the URL that response `resp` was obtained from. 

1029 

1030 If `resp` contains no redirection history, return `default_url`. 

1031 """ 

1032 if resp.retries is None: 

1033 return default_url 

1034 

1035 if len(resp.retries.history) == 0: 

1036 return default_url 

1037 

1038 return str(resp.retries.history[-1].redirect_location) 

1039 

1040 def _rewrite_url_for_frontend(self, url: str) -> str: 

1041 """Return a URL to reach one of the frontend servers that serves 

1042 requests sent against `url`. 

1043 

1044 Parameters 

1045 ---------- 

1046 url : `str` 

1047 Target URL. 

1048 

1049 Returns 

1050 ------- 

1051 url: `str` 

1052 URL to reach one of this client's frontend servers. If `url` does 

1053 not target this client's frontend servers, the returned value 

1054 is `url` unmodified. 

1055 """ 

1056 # Do nothing if this URL does not match this client's base URL. This 

1057 # happens, for instance, when `url` is a redirection to a backend 

1058 # server, so we don't want to rewrite it. 

1059 # 

1060 # Also, don't rewrite the URL if we don't have frontends configured 

1061 # for this client. 

1062 if not self._config.frontend_urls or not url.startswith(self._base_url): 

1063 return url 

1064 

1065 # Randomly select one of the configured frontends and return a modified 

1066 # URL which uses the selected frontend instead of the original one. 

1067 return random.choice(self._config.frontend_urls) + url.removeprefix(self._base_url) 

1068 

1069 def _request( 

1070 self, 

1071 method: str, 

1072 url: str, 

1073 headers: dict[str, str] | None = None, 

1074 body: BinaryIO | bytes | str | None = None, 

1075 preload_content: bool = True, 

1076 redirect: bool = True, 

1077 pool_manager: PoolManager | None = None, 

1078 ) -> HTTPResponse: 

1079 """Send a generic HTTP request and return the response. 

1080 

1081 Parameters 

1082 ---------- 

1083 method : `str` 

1084 Request method, e.g. 'GET', 'PUT', 'PROPFIND'. 

1085 url : `str` 

1086 Target URL. 

1087 headers : `dict[str, str]`, optional 

1088 Headers to sent with the request. 

1089 body : `bytes` or `str` or `None`, optional 

1090 Request body. 

1091 preload_content : `bool`, optional 

1092 If True, the response body is downloaded and can be retrieved 

1093 via the returned response `.data` property. If False, the 

1094 caller needs to call `.read()` on the returned response object to 

1095 download the body, either entirely in one call or by chunks. 

1096 redirect : `bool`, optional 

1097 If True, automatically handle redirects. If False, the returned 

1098 response may contain a redirection to another location. 

1099 pool_manager : `PoolManager`, optional 

1100 Pool manager to use for sending this request. If not provided, 

1101 this client's pool manager is used. 

1102 

1103 Returns 

1104 ------- 

1105 resp: `HTTPResponse` 

1106 Response to the request as received from the server. 

1107 """ 

1108 # Retrieve the URL we must use to send this request to one of this 

1109 # client's configured frontend servers. 

1110 url = self._rewrite_url_for_frontend(url) 

1111 

1112 # If this client is configured not to reuse the network connection 

1113 # with the server, add a "Connection: close" header to this request. 

1114 # 

1115 # However, if the caller has explicitly specified a "Connection" 

1116 # header, whatever its value, don't modify it. 

1117 headers = {} if headers is None else dict(headers) 

1118 if "Connection" not in headers and not self._config.reuse_connection: 

1119 headers.update({"Connection": "close"}) 

1120 

1121 # If an authorizer (basic or token) is configured for this client, 

1122 # allow it to set the "Authorization" header to this outgoing request. 

1123 if self._authorizer is not None: 

1124 self._authorizer.set_authorization(headers) 

1125 

1126 if log.isEnabledFor(logging.DEBUG): 

1127 annotation = "" 

1128 if method == "GET" and "Range" in headers: 

1129 byte_range = headers.get("Range", "").removeprefix("bytes=") 

1130 annotation = f" (byte range: {byte_range})" 

1131 

1132 log.debug("sending request %s %s%s", method, redact_url(url), annotation) 

1133 

1134 if pool_manager is None: 

1135 pool_manager = self._pool_manager 

1136 

1137 with time_this( 

1138 log, 

1139 msg="%s %s", 

1140 args=(method, url), 

1141 mem_usage=self._config.collect_memory_usage, 

1142 mem_unit=u.mebibyte, 

1143 ): 

1144 return pool_manager.request( 

1145 method, 

1146 url, 

1147 body=body, 

1148 headers=headers, 

1149 preload_content=preload_content, 

1150 redirect=redirect, 

1151 ) 

1152 

1153 def _options( 

1154 self, 

1155 url: str, 

1156 headers: dict[str, str] | None = None, 

1157 pool_manager: PoolManager | None = None, 

1158 ) -> HTTPResponse: 

1159 """Send a HTTP OPTIONS request and return the response unmodified. 

1160 

1161 Parameters 

1162 ---------- 

1163 url : `str` 

1164 Target URL. 

1165 headers : `dict[str, str]`, optional 

1166 Headers to sent with the request. 

1167 pool_manager : `PoolManager`, optional 

1168 Pool manager to use to send this request. 

1169 

1170 Returns 

1171 ------- 

1172 resp: `HTTPResponse` 

1173 Response to the request as received from the server. 

1174 

1175 Notes 

1176 ----- 

1177 This method is intended for subclasses to override when needed. 

1178 """ 

1179 return self._request("OPTIONS", url=url, headers=headers, pool_manager=pool_manager) 

1180 

1181 def _copy( 

1182 self, 

1183 url: str, 

1184 headers: dict[str, str] | None = None, 

1185 preload_content: bool = True, 

1186 pool_manager: PoolManager | None = None, 

1187 ) -> HTTPResponse: 

1188 """Send a webDAV COPY request and return the response unmodified. 

1189 

1190 Parameters 

1191 ---------- 

1192 url : `str` 

1193 Target URL. 

1194 headers : `dict[str, str]`, optional 

1195 Headers to sent with the request. 

1196 pool_manager : `PoolManager`, optional 

1197 Pool manager to use to send this request. 

1198 

1199 Notes 

1200 ----- 

1201 This method is intended for subclasses to override when needed. 

1202 """ 

1203 return self._request( 

1204 "COPY", url=url, headers=headers, preload_content=preload_content, pool_manager=pool_manager 

1205 ) 

1206 

1207 def _delete( 

1208 self, 

1209 url: str, 

1210 headers: dict[str, str] | None = None, 

1211 pool_manager: PoolManager | None = None, 

1212 ) -> HTTPResponse: 

1213 """Send a HTTP DELETE request and return the response unmodified. 

1214 

1215 Parameters 

1216 ---------- 

1217 url : `str` 

1218 Target URL. 

1219 headers : `dict[str, str]`, optional 

1220 Headers to sent with the request. 

1221 pool_manager : `PoolManager`, optional 

1222 Pool manager to use to send this request. 

1223 

1224 Notes 

1225 ----- 

1226 This method is intended for subclasses to override when needed. 

1227 """ 

1228 return self._request("DELETE", url=url, headers=headers, pool_manager=pool_manager) 

1229 

1230 def _get( 

1231 self, 

1232 url: str, 

1233 headers: dict[str, str] | None = None, 

1234 preload_content: bool = True, 

1235 redirect: bool = True, 

1236 pool_manager: PoolManager | None = None, 

1237 ) -> HTTPResponse: 

1238 """Send a HTTP GET request and return the response unmodified. 

1239 

1240 Parameters 

1241 ---------- 

1242 url : `str` 

1243 Target URL. 

1244 headers : `dict[str, str]`, optional 

1245 Headers to sent with the request. 

1246 preload_content : `bool`, optional 

1247 If True, the response body is downloaded and can be retrieved 

1248 via the returned response `.data` property. If False, the 

1249 caller needs to call the `.read()` on the returned response 

1250 object to download the body. 

1251 redirect : `bool`, optional 

1252 If True, follow redirections. 

1253 pool_manager : `PoolManager`, optional 

1254 Pool manager to send the request through. 

1255 

1256 Returns 

1257 ------- 

1258 resp: `HTTPResponse` 

1259 Response to the GET request as received from the server. 

1260 

1261 Notes 

1262 ----- 

1263 This method is intended for subclasses to override when needed. 

1264 """ 

1265 return self._request( 

1266 "GET", 

1267 url=url, 

1268 headers=headers, 

1269 preload_content=preload_content, 

1270 redirect=redirect, 

1271 pool_manager=pool_manager, 

1272 ) 

1273 

1274 def _head( 

1275 self, 

1276 url: str, 

1277 headers: dict[str, str] | None = None, 

1278 pool_manager: PoolManager | None = None, 

1279 ) -> HTTPResponse: 

1280 """Send a HTTP HEAD request and return the response. 

1281 

1282 Parameters 

1283 ---------- 

1284 url : `str` 

1285 Target URL. 

1286 headers : `bool` 

1287 If the target URL is not found, raise an exception. Otherwise 

1288 just return the response. 

1289 pool_manager : `PoolManager`, optional 

1290 Pool manager to use to send this request. 

1291 

1292 Notes 

1293 ----- 

1294 This method is intended for subclasses to override when needed. 

1295 """ 

1296 return self._request("HEAD", url=url, headers=headers, pool_manager=pool_manager) 

1297 

1298 def _mkcol( 

1299 self, 

1300 url: str, 

1301 headers: dict[str, str] | None = None, 

1302 pool_manager: PoolManager | None = None, 

1303 ) -> HTTPResponse: 

1304 """Send a webDAV MKCOL request and return the response unmodified. 

1305 

1306 Parameters 

1307 ---------- 

1308 url : `str` 

1309 Target URL. 

1310 headers : `dict[str, str]`, optional 

1311 Headers to sent with the request. 

1312 pool_manager : `PoolManager`, optional 

1313 Pool manager to use to send this request. 

1314 

1315 Notes 

1316 ----- 

1317 This method is intended for subclasses to override when needed. 

1318 """ 

1319 return self._request("MKCOL", url=url, headers=headers, pool_manager=pool_manager) 

1320 

1321 def _move( 

1322 self, 

1323 url: str, 

1324 headers: dict[str, str] | None = None, 

1325 pool_manager: PoolManager | None = None, 

1326 ) -> HTTPResponse: 

1327 """Send a webDAV MOVE request and return the response unmodified. 

1328 

1329 Parameters 

1330 ---------- 

1331 url : `str` 

1332 Target URL. 

1333 headers : `dict[str, str]`, optional 

1334 Headers to sent with the request. 

1335 pool_manager : `PoolManager`, optional 

1336 Pool manager to use to send this request. 

1337 

1338 Notes 

1339 ----- 

1340 This method is intended for subclasses to override when needed. 

1341 """ 

1342 return self._request("MOVE", url=url, headers=headers, pool_manager=pool_manager) 

1343 

1344 def _propfind( 

1345 self, 

1346 url: str, 

1347 headers: dict[str, str] | None = None, 

1348 body: str = "", 

1349 pool_manager: PoolManager | None = None, 

1350 ) -> HTTPResponse: 

1351 """Send a webDAV PROPFIND request and return the response unmodified. 

1352 

1353 Parameters 

1354 ---------- 

1355 url : `str` 

1356 Target URL. 

1357 headers : `dict[str, str]`, optional 

1358 Headers to sent with the request. 

1359 body : `str`, optional 

1360 Request body. 

1361 pool_manager : `PoolManager`, optional 

1362 Pool manager to use to send this request. 

1363 

1364 Notes 

1365 ----- 

1366 This method is intended for subclasses to override when needed. 

1367 """ 

1368 return self._request("PROPFIND", url=url, headers=headers, body=body, pool_manager=pool_manager) 

1369 

1370 def _put( 

1371 self, 

1372 url: str, 

1373 headers: dict[str, str] | None = None, 

1374 body: BinaryIO | bytes = b"", 

1375 preload_content: bool = True, 

1376 redirect: bool = True, 

1377 pool_manager: PoolManager | None = None, 

1378 ) -> HTTPResponse: 

1379 """Send a HTTP PUT request and return the response unmodified. 

1380 

1381 Parameters 

1382 ---------- 

1383 url : `str` 

1384 Target URL. 

1385 headers : `dict[str, str]`, optional 

1386 Headers to sent with the request. 

1387 body : `BinaryIO` or `bytes`, optional 

1388 Request body. 

1389 preload_content : `bool`, optional 

1390 If True, the response body is downloaded and can be retrieved 

1391 via the returned response `.data` property. If False, the 

1392 caller needs to call the `.read()` on the returned response 

1393 object to download the body. 

1394 redirect : `bool`, optional 

1395 If True, follow redirections. 

1396 pool_manager : `PoolManager`, optional 

1397 Pool manager to send the request through. 

1398 

1399 Returns 

1400 ------- 

1401 resp: `HTTPResponse` 

1402 Response to the PUT request as received from the server. 

1403 

1404 Notes 

1405 ----- 

1406 This method is intended for subclasses to override when needed. 

1407 """ 

1408 return self._request( 

1409 "PUT", 

1410 url=url, 

1411 headers=headers, 

1412 body=body, 

1413 preload_content=preload_content, 

1414 redirect=redirect, 

1415 pool_manager=pool_manager, 

1416 ) 

1417 

1418 def head( 

1419 self, 

1420 url: str, 

1421 headers: dict[str, str] | None = None, 

1422 ) -> HTTPResponse: 

1423 """Send a HTTP HEAD request, process and return the response 

1424 only if successful. 

1425 

1426 Parameters 

1427 ---------- 

1428 url : `str` 

1429 Target URL. 

1430 headers : `bool` 

1431 If the target URL is not found, raise an exception. Otherwise 

1432 just return the response. 

1433 """ 

1434 headers = {} if headers is None else dict(headers) 

1435 resp = self._head(url=url, headers=headers) 

1436 match resp.status: 

1437 case HTTPStatus.OK: 

1438 return resp 

1439 case HTTPStatus.NOT_FOUND: 

1440 raise FileNotFoundError(f"No file found at {resp.geturl()}") 

1441 case _: 

1442 raise unexpected_status_error("HEAD", url, resp) 

1443 

1444 def get( 

1445 self, 

1446 url: str, 

1447 headers: dict[str, str] | None = None, 

1448 preload_content: bool = True, 

1449 redirect: bool = True, 

1450 ) -> tuple[str, HTTPResponse]: 

1451 """Send a HTTP GET request. 

1452 

1453 Parameters 

1454 ---------- 

1455 url : `str` 

1456 Target URL. 

1457 headers : `dict[str, str]`, optional 

1458 Headers to sent with the request. 

1459 preload_content : `bool`, optional 

1460 If True, the response body is downloaded and can be retrieved 

1461 via the returned response `.data` property. If False, the 

1462 caller needs to call the `.read()` on the returned response 

1463 object to download the body. 

1464 redirect : `bool`, optional 

1465 If True, follow redirections. 

1466 

1467 Returns 

1468 ------- 

1469 url: `str` 

1470 The URL we used to obtain this response. It may be different from 

1471 the URL passed as argument in case of redirection. 

1472 resp: `HTTPResponse` 

1473 Response to the GET request as received from the server. 

1474 """ 

1475 # Send the GET request to the frontend servers. 

1476 headers = {} if headers is None else dict(headers) 

1477 resp = self._get( 

1478 url, 

1479 headers=headers, 

1480 preload_content=preload_content, 

1481 redirect=redirect, 

1482 ) 

1483 match resp.status: 

1484 case HTTPStatus.OK | HTTPStatus.PARTIAL_CONTENT: 

1485 return self._get_response_url(resp, default_url=url), resp 

1486 case HTTPStatus.NOT_FOUND: 

1487 raise FileNotFoundError(f"No file found at {resp.geturl()}") 

1488 case status if status in resp.REDIRECT_STATUSES and not redirect: 

1489 # This response is a redirection but we are asked not to 

1490 # follow redirections, so return this response as is. 

1491 return self._get_response_url(resp, default_url=url), resp 

1492 case _: 

1493 raise unexpected_status_error("GET", url, resp) 

1494 

1495 def options( 

1496 self, 

1497 url: str, 

1498 headers: dict[str, str] | None = None, 

1499 ) -> HTTPResponse: 

1500 """Send a HTTP OPTIONS request and return the response on success. 

1501 

1502 Parameters 

1503 ---------- 

1504 url : `str` 

1505 Target URL. 

1506 headers : `dict` [`str`, `str`], optional 

1507 Headers to sent with the request. 

1508 

1509 Returns 

1510 ------- 

1511 resp: `HTTPResponse` 

1512 Response to the request as received from the server. 

1513 """ 

1514 resp = self._options(url=url, headers=headers) 

1515 match resp.status: 

1516 case HTTPStatus.OK | HTTPStatus.CREATED: 

1517 return resp 

1518 case _: 

1519 raise unexpected_status_error("OPTIONS", url, resp) 

1520 

1521 def propfind( 

1522 self, 

1523 url: str, 

1524 headers: dict[str, str] | None = None, 

1525 body: str = "", 

1526 depth: str = "0", 

1527 ) -> HTTPResponse: 

1528 """Send a HTTP PROPFIND request and return the unmodified response on 

1529 success. 

1530 

1531 Parameters 

1532 ---------- 

1533 url : `str` 

1534 Target URL. 

1535 headers : `dict[str, str]`, optional 

1536 Headers to sent with the request. 

1537 body : `str`, optional 

1538 Request body. 

1539 depth : `str`, optional 

1540 ???. 

1541 """ 

1542 headers = {} if headers is None else dict(headers) 

1543 headers.update( 

1544 { 

1545 "Depth": depth, 

1546 "Content-Type": 'application/xml; charset="utf-8"', 

1547 "Content-Length": str(len(body)), 

1548 } 

1549 ) 

1550 resp = self._propfind(url=url, headers=headers, body=body) 

1551 match resp.status: 

1552 case HTTPStatus.MULTI_STATUS | HTTPStatus.NOT_FOUND: 

1553 return resp 

1554 case _: 

1555 raise unexpected_status_error("PROPFIND", url, resp) 

1556 

1557 def put( 

1558 self, 

1559 url: str, 

1560 headers: dict[str, str] | None = None, 

1561 data: BinaryIO | bytes = b"", 

1562 ) -> int | None: 

1563 """Send a HTTP PUT request. 

1564 

1565 Parameters 

1566 ---------- 

1567 url : `str` 

1568 Target URL. 

1569 headers : `dict[str, str]`, optional 

1570 Headers to sent with the request. 

1571 data : `BinaryIO` or `bytes` 

1572 Request body. 

1573 

1574 Returns 

1575 ------- 

1576 size : `int | None` 

1577 The size in bytes of the file uploaded. Can be `None` if the size 

1578 could not be retrieved. 

1579 """ 

1580 # Send a PUT request with empty body and handle redirection. This 

1581 # is useful if the server redirects us; since we cannot rewind the 

1582 # data we are uploading, we don't start uploading data until we 

1583 # connect to the server that will actually serve our request. 

1584 frontend_headers = {} if headers is None else dict(headers) 

1585 frontend_headers.update({"Content-Length": "0"}) 

1586 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False) 

1587 match resp.status: 

1588 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

1589 redirect_url = url 

1590 case status if status in resp.REDIRECT_STATUSES: 

1591 redirect_url = resp.headers.get("Location") 

1592 case _: 

1593 raise unexpected_status_error("PUT", url, resp) 

1594 

1595 # We may have been redirectred. Upload the file contents to 

1596 # its final destination. 

1597 

1598 # Ask the server to compute and record a checksum of the uploaded 

1599 # file contents, for later integrity checks. Since we don't compute 

1600 # the digest ourselves while uploading the data, we cannot control 

1601 # after the request is complete that the data we uploaded is 

1602 # identical to the data recorded by the server, but at least the 

1603 # server has recorded a digest of the data it stored. 

1604 # 

1605 # See RFC-3230 for details and 

1606 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml 

1607 # for the list of supported digest algorithhms. 

1608 # 

1609 # In addition, note that not all servers implement this RFC so 

1610 # the checksum reqquest may be ignored by the server. 

1611 backend_headers = {} if headers is None else dict(headers) 

1612 if (checksum := self._config.request_checksum) is not None: 

1613 backend_headers.update({"Want-Digest": checksum}) 

1614 

1615 resp = self._put(redirect_url, body=data, headers=backend_headers) 

1616 match resp.status: 

1617 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

1618 # Send a HEAD request to retrieve the size of the file we 

1619 # just uploaded 

1620 resp = self.head(redirect_url) 

1621 size = int(resp.headers.get("Content-Length", -1)) 

1622 return None if size == -1 else size 

1623 case _: 

1624 raise unexpected_status_error("PUT", redirect_url, resp) 

1625 

1626 def _get_temporary_basename(self, basename: str, prefix: str) -> str: 

1627 """Return a basename for a temporary file.""" 

1628 unique_id = str(uuid.uuid4()) 

1629 return f"{prefix}.{unique_id}.{basename}" 

1630 

    def _split_parent_and_basename(self, url: str) -> tuple[str, str]:
        """Return the URL of the parent directory and the basename from
        `url`.

        Parameters
        ----------
        url : `str`
            Target URL.

        Returns
        -------
        parent_url : `str`
            URL identical to `url` but with the last path component
            removed; scheme, host, port, query and fragment are preserved.
        basename : `str`
            Last component of the normalized path of `url`.
        """
        parsed: Url = parse_url(url)
        # Normalize the path (collapse repeated slashes, resolve ".."
        # segments and drop trailing slashes) before splitting it.
        normalized_path = normalize_path(parsed.path)
        parent_path = posixpath.dirname(normalized_path)
        basename = posixpath.basename(normalized_path)
        # Rebuild a full URL for the parent, keeping every other
        # component of the original URL unchanged.
        parent_url = Url(
            scheme=parsed.scheme,
            auth=parsed.auth,
            host=parsed.host,
            port=parsed.port,
            path=parent_path,
            query=parsed.query,
            fragment=parsed.fragment,
        ).url
        return parent_url, basename

1649 

1650 def _parent(self, url: str) -> str: 

1651 """Return the URL of the parent directory to `url`.""" 

1652 parent_url, _ = self._split_parent_and_basename(url) 

1653 return parent_url 

1654 

1655 def _make_temporary_url(self, url: str, prefix: str = ".tmp") -> str: 

1656 """Return the URL of a temporary file based on `url`.""" 

1657 parent_url, basename = self._split_parent_and_basename(url) 

1658 temporary_basename = self._get_temporary_basename(basename=basename, prefix=prefix) 

1659 return f"{parent_url}/{temporary_basename}" 

1660 

1661 def exists(self, url: str) -> bool: 

1662 """Return True if a file or directory exists at `url`. 

1663 

1664 Parameters 

1665 ---------- 

1666 url : `str` 

1667 Target URL. 

1668 

1669 Returns 

1670 ------- 

1671 result: `bool` 

1672 True if there is an object at `url`. 

1673 """ 

1674 return self.stat(url).exists 

1675 

1676 def size(self, url: str) -> int: 

1677 """Return the size in bytes of resource at `url`. 

1678 

1679 If `url` designates a directory, the size is zero. 

1680 

1681 Parameters 

1682 ---------- 

1683 url : `str` 

1684 Target URL. 

1685 

1686 Returns 

1687 ------- 

1688 size: `int` 

1689 The number of bytes of the resource located at `url`. 

1690 """ 

1691 # Check if we have the size of this URL in our cache 

1692 if (size := self._file_size_cache.get_size(url)) is not None: 

1693 return size 

1694 

1695 stat = self.stat(url) 

1696 if not stat.exists: 

1697 raise FileNotFoundError(f"No file or directory found at {url}") 

1698 else: 

1699 return stat.size 

1700 

1701 def is_dir(self, url: str) -> bool: 

1702 """Return True if a directory exists at `url`. 

1703 

1704 Parameters 

1705 ---------- 

1706 url : `str` 

1707 Target URL. 

1708 

1709 Returns 

1710 ------- 

1711 result: `bool` 

1712 True if there is a directory at `url`. 

1713 """ 

1714 return self.stat(url).is_dir 

1715 

1716 def mkcol(self, url: str) -> None: 

1717 """Create a directory at `url`. 

1718 

1719 If a directory already exists at `url` no error is returned nor 

1720 exception is raised. An exception is raised if a file exists at `url`. 

1721 

1722 Parameters 

1723 ---------- 

1724 url : `str` 

1725 Target URL. 

1726 """ 

1727 resp = self._mkcol(url=url) 

1728 match resp.status: 

1729 case HTTPStatus.CREATED | HTTPStatus.METHOD_NOT_ALLOWED: 

1730 return 

1731 case HTTPStatus.CONFLICT: 

1732 # The parent directory does not exist. Create it first except 

1733 # if the parent's path is "/". 

1734 parent = self._parent(url) 

1735 if not parent.endswith("/"): 

1736 self.mkcol(parent) 

1737 resp = self._mkcol(url=url) 

1738 case _: 

1739 raise ValueError( 

1740 f"Can not create directory {resp.geturl()}: status {resp.status} {resp.reason}" 

1741 ) 

1742 

1743 def stat(self, url: str) -> DavFileMetadata: 

1744 """Return some properties of file or directory located at `url`. 

1745 

1746 Parameters 

1747 ---------- 

1748 url : `str` 

1749 Target URL. 

1750 

1751 Returns 

1752 ------- 

1753 result: `DavResourceMetadata` 

1754 Details of the resources at `url`. If no resource was found at 

1755 that URL no exception is raised. Instead the returned details allow 

1756 for detecting that the resource does not exist. 

1757 

1758 The returned value should include fields to determine 

1759 if there is a file or a directory at that `url` and if so, its 

1760 size and kind (file or directory). Other fields may also be 

1761 included depending on the implementation of the webDAV protocol 

1762 by the server. 

1763 """ 

1764 # Request the minimum set of DAV properties. 

1765 body = ( 

1766 """<?xml version="1.0" encoding="utf-8"?>""" 

1767 """<D:propfind xmlns:D="DAV:">""" 

1768 """<D:prop>""" 

1769 """<D:resourcetype/>""" 

1770 """<D:getcontentlength/>""" 

1771 """<D:getlastmodified/>""" 

1772 """</D:prop>""" 

1773 """</D:propfind>""" 

1774 ) 

1775 resp = self.propfind(url, body=body, depth="0") 

1776 match resp.status: 

1777 case HTTPStatus.NOT_FOUND: 

1778 href = url.replace(self._base_url, "", 1) 

1779 return DavFileMetadata(base_url=self._base_url, href=href) 

1780 case HTTPStatus.MULTI_STATUS: 

1781 property = self._propfind_parser.parse(resp.data)[0] 

1782 return DavFileMetadata.from_property(base_url=self._base_url, property=property) 

1783 case _: 

1784 raise unexpected_status_error("PROPFIND", url, resp) 

1785 

1786 def info(self, url: str, name: str | None = None) -> dict[str, Any]: 

1787 """Return the details about the file or directory at `url`. 

1788 

1789 Parameters 

1790 ---------- 

1791 url : `str` 

1792 Target URL. 

1793 name : `str` 

1794 Name of the object to be included in the returned value. If None, 

1795 the `url` is used as name. 

1796 

1797 Returns 

1798 ------- 

1799 result: `dict` 

1800 For an existing file, the returned value has the form: 

1801 

1802 .. code-block:: json 

1803 

1804 { 

1805 "name": name, 

1806 "size": 1234, 

1807 "type": "file", 

1808 "last_modified": 

1809 datetime.datetime(2025, 4, 10, 15, 12, 51, 227854), 

1810 "checksums": { 

1811 "adler32": "0fc5f83f", 

1812 "md5": "1f57339acdec099c6c0a41f8e3d5fcd0", 

1813 } 

1814 } 

1815 

1816 For an existing directory, the returned value has the form: 

1817 

1818 .. code-block:: json 

1819 

1820 { 

1821 "name": name, 

1822 "size": 0, 

1823 "type": "directory", 

1824 "last_modified": 

1825 datetime.datetime(2025, 4, 10, 15, 12, 51, 227854), 

1826 "checksums": {}, 

1827 } 

1828 

1829 For a non-existing file or directory, the returned value has the 

1830 form: 

1831 

1832 .. code-block:: json 

1833 

1834 { 

1835 "name": name, 

1836 "size": None, 

1837 "type": None, 

1838 "last_modified": datetime.datetime(1, 1, 1, 0, 0), 

1839 "checksums": {}, 

1840 } 

1841 

1842 Notes 

1843 ----- 

1844 The format of the returned directory is inspired and compatible with 

1845 `fsspec`. 

1846 

1847 The size of existing directories is always zero. The `checksums` 

1848 dictionary is empty for directories and may be empty for files if the 

1849 server does not compute and store the checksum of the files it stores. 

1850 """ 

1851 result: dict[str, Any] = { 

1852 "name": name if name is not None else url, 

1853 "type": None, 

1854 "size": None, 

1855 "last_modified": datetime.min, 

1856 "checksums": {}, 

1857 } 

1858 metadata = self.stat(url) 

1859 if not metadata.exists: 

1860 return result 

1861 

1862 result.update( 

1863 { 

1864 "type": "directory" if metadata.is_dir else "file", 

1865 "size": metadata.size, 

1866 "last_modified": metadata.last_modified, 

1867 "checksums": metadata.checksums, 

1868 } 

1869 ) 

1870 return result 

1871 

1872 def move(self, source_url: str, destination_url: str, overwrite: bool = False) -> HTTPResponse: 

1873 """Send a webDAV MOVE request and return the response unmodified. 

1874 

1875 Parameters 

1876 ---------- 

1877 source_url : `str` 

1878 Source URL. 

1879 destination_url : `str` 

1880 Destination URL. 

1881 overwrite : `bool`, optional 

1882 Overwrite the destination if it exists. 

1883 

1884 Returns 

1885 ------- 

1886 resp : `HTTPResponse` 

1887 The unmodified response received from the server. 

1888 """ 

1889 headers = { 

1890 "Destination": destination_url, 

1891 "Overwrite": "T" if overwrite else "F", 

1892 } 

1893 return self._move(source_url, headers=headers) 

1894 

1895 def read_dir(self, url: str) -> list[DavFileMetadata]: 

1896 """Return the properties of the files or directories contained in 

1897 directory located at `url`. 

1898 

1899 If `url` designates a file, only the details of itself are returned. 

1900 

1901 Parameters 

1902 ---------- 

1903 url : `str` 

1904 Target URL. 

1905 

1906 Returns 

1907 ------- 

1908 result: `list[DavResourceMetadata]` 

1909 List of details of each file or directory within `url`. 

1910 """ 

1911 body = ( 

1912 """<?xml version="1.0" encoding="utf-8"?>""" 

1913 """<D:propfind xmlns:D="DAV:"><D:prop>""" 

1914 """<D:resourcetype/><D:getcontentlength/><D:getlastmodified/><D:displayname/>""" 

1915 """</D:prop></D:propfind>""" 

1916 ) 

1917 resp = self.propfind(url, body=body, depth="1") 

1918 match resp.status: 

1919 case HTTPStatus.MULTI_STATUS: 

1920 pass 

1921 case HTTPStatus.NOT_FOUND: 

1922 raise FileNotFoundError(f"No directory found at {resp.geturl()}") 

1923 case _: 

1924 raise unexpected_status_error("PROPFIND", url, resp) 

1925 

1926 if (path := parse_url(url).path) is not None: 

1927 this_dir_href = path.rstrip("/") + "/" 

1928 else: 

1929 this_dir_href = "/" 

1930 

1931 result = [] 

1932 for property in self._propfind_parser.parse(resp.data): 

1933 # Don't include in the results the metadata of the directory we 

1934 # traversing. 

1935 # Some webDAV servers do not append a "/" to the href of a 

1936 # directory in their response to PROPFIND, so we must take into 

1937 # account that. 

1938 if property.is_file: 

1939 result.append(DavFileMetadata.from_property(base_url=self._base_url, property=property)) 

1940 elif property.is_dir and property.href != this_dir_href: 

1941 result.append(DavFileMetadata.from_property(base_url=self._base_url, property=property)) 

1942 

1943 return result 

1944 

1945 def read(self, url: str) -> tuple[str, bytes]: 

1946 """Download the contents of file located at `url`. 

1947 

1948 Parameters 

1949 ---------- 

1950 url : `str` 

1951 Target URL. 

1952 

1953 Returns 

1954 ------- 

1955 url: `str` 

1956 Backend URL from which the data was obtained. 

1957 data: `bytes` 

1958 Contents of the file. 

1959 

1960 Notes 

1961 ----- 

1962 The caller must ensure that the resource at `url` is a file, not 

1963 a directory. 

1964 """ 

1965 backend_url, resp = self.get(url) 

1966 return backend_url, resp.data 

1967 

    def read_range(
        self,
        url: str,
        start: int,
        end: int | None,
        headers: dict[str, str] | None = None,
        release_backend: bool = True,
    ) -> tuple[str, bytes]:
        """Download partial content of file located at `url`.

        Parameters
        ----------
        url : `str`
            Target URL.
        start : `int`
            Starting byte offset of the range to download.
        end : `int` or `None`
            Ending byte offset of the range to download. If `None`, download
            from `start` to the end of the file.
        headers : `dict[str,str]`, optional
            Specific headers to send with the GET request.
        release_backend : `bool`, optional
            Whether or not to close the connection to the backend.

        Returns
        -------
        backend_url: `str`
            URL used to retrieve this data. If the server redirected us
            this is the URL we were redirected to.
        data: `bytes`
            Partial contents of the file.

        Notes
        -----
        The caller must ensure that the resource at `url` is a file, not
        a directory. This is important because some webDAV servers respond
        with an HTML document when asked for reading a directory.
        """
        # Ask for the raw bytes: a content-coding (e.g. gzip) would make
        # the byte offsets in the "Range" header meaningless.
        range_headers = {"Accept-Encoding": "identity"}
        if end is None:
            range_headers.update({"Range": f"bytes={start}-"})
        else:
            range_headers.update({"Range": f"bytes={start}-{end}"})

        frontend_headers = {} if headers is None else dict(headers)
        frontend_headers.update(range_headers)

        # Send the request to the frontend server and don't follow
        # redirections automatically. We need to be able to add a
        # "Connection: close" request header when sending the request to the
        # backend server (if any) if we are requested to. We don't send that
        # header to the frontend.
        final_url, resp = self.get(url, headers=frontend_headers, redirect=False)
        match resp.status:
            case HTTPStatus.PARTIAL_CONTENT:
                # The frontend served the byte range itself: we are done.
                return final_url, resp.data
            case status if status not in resp.REDIRECT_STATUSES:
                raise unexpected_status_error("GET (with 'Range' header)", url, resp)
            case _:
                # A redirect status: fall through and follow it manually.
                pass

        # We were redirected to a backend server. Follow the redirection and
        # if requested add a "Connection: close" header to explicitly release
        # the backend server.
        # NOTE(review): "Location" may be absent in a malformed redirect,
        # leaving `redirect_url` as None — confirm self.get() surfaces a
        # sensible error in that case.
        redirect_url = resp.headers.get("Location")
        log.debug("GET request to %s got redirected to %s", url, redirect_url)

        backend_headers = {} if headers is None else dict(headers)
        backend_headers.update(range_headers)
        backend_headers.update({"Connection": "close" if release_backend else "keep-alive"})

        final_url, resp = self.get(redirect_url, headers=backend_headers)
        match resp.status:
            case HTTPStatus.PARTIAL_CONTENT:
                return final_url, resp.data
            case _:
                raise unexpected_status_error("GET (with 'Range' header)", redirect_url, resp)

2044 

    def _write_response_body_to_file(self, resp: HTTPResponse, filename: str, chunk_size: int) -> int:
        """Write the response body to a local file.

        Parameters
        ----------
        resp : `HTTPResponse`
            The HTTP Response to read the body from.
        filename : `str`
            Local file to write the content to. If the file already exists,
            it will be rewritten.
        chunk_size : `int`
            Size of the chunks to write to `filename`.

        Returns
        -------
        count: `int`
            Number of bytes written to `filename`.

        Raises
        ------
        ValueError
            If the number of bytes downloaded does not match the response's
            "Content-Length" header.
        """
        try:
            # Read the response body into a pre-allocated memory buffer and
            # write the buffer content to the destination file avoiding
            # copies if possible.
            content_length = 0
            # buffering=0 disables Python-level buffering so each write goes
            # straight from our memoryview to the operating system.
            with open(filename, "wb", buffering=0) as file:
                view = memoryview(bytearray(chunk_size))
                while True:
                    # readinto() fills `view` in place and returns the number
                    # of bytes read; a non-positive value means end of body.
                    if (count := resp.readinto(view)) > 0:  # type: ignore
                        content_length += count
                        file.write(view[:count])
                    else:
                        break

            # Check that the expected and actual content lengths match.
            # Perform this check only when the body of the response was not
            # encoded by the server.
            expected_length: int = int(resp.headers.get("Content-Length", -1))
            if (
                "Content-Encoding" not in resp.headers
                and expected_length != -1
                and expected_length != content_length
            ):
                raise ValueError(
                    f"Size of downloaded file does not match value in Content-Length header for "
                    f"{resp.geturl()}: expecting {expected_length} and got {content_length} bytes"
                )

            return content_length
        finally:
            # Release the connection back to its pool, even on error.
            resp.drain_conn()
            resp.release_conn()

2096 

2097 def download(self, url: str, filename: str, chunk_size: int) -> int: 

2098 """Download the content of a file and write it to local file. 

2099 

2100 Parameters 

2101 ---------- 

2102 url : `str` 

2103 Target URL. 

2104 filename : `str` 

2105 Local file to write the content to. If the file already exists, 

2106 it will be rewritten. 

2107 chunk_size : `int` 

2108 Size of the chunks to write to `filename`. 

2109 

2110 Returns 

2111 ------- 

2112 count: `int` 

2113 Number of bytes written to `filename`. 

2114 

2115 Notes 

2116 ----- 

2117 The caller must ensure that the resource at `url` is a file, not 

2118 a directory. 

2119 """ 

2120 _, resp = self.get(url, preload_content=False) 

2121 return self._write_response_body_to_file(resp, filename, chunk_size) 

2122 

2123 def write(self, url: str, data: BinaryIO | bytes) -> int | None: 

2124 """Create or rewrite a remote file at `url` with `data` as its 

2125 contents. 

2126 

2127 Parameters 

2128 ---------- 

2129 url : `str` 

2130 Target URL. 

2131 data : `bytes` 

2132 Sequence of bytes to upload. 

2133 

2134 Returns 

2135 ------- 

2136 size : `int | None` 

2137 The size in bytes of the file uploaded. Can be `None` if the size 

2138 could not be retrieved. 

2139 

2140 Notes 

2141 ----- 

2142 If a file already exists at `url` it will be rewritten. 

2143 """ 

2144 # According to RFC 4918, the parent directory of the file must 

2145 # exist before we can write to it. So create it first and then 

2146 # upload. 

2147 self.mkcol(self._parent(url)) 

2148 

2149 try: 

2150 # Upload to a temporary file and rename to the final name. 

2151 temporary_url = self._make_temporary_url(url) 

2152 size = self.put(temporary_url, data=data) 

2153 self.rename(temporary_url, url, overwrite=True, create_parent=False) 

2154 

2155 # Update the file size cache with this size 

2156 self._file_size_cache.update_size(url, size) 

2157 return size 

2158 except Exception: 

2159 # Upload failed. Attempt to remove the temporary file. 

2160 self.delete(temporary_url) 

2161 raise 

2162 

2163 def checksums(self, url: str) -> dict[str, str]: 

2164 """Return the checksums of the contents of file located at `url`. 

2165 

2166 The checksums are retrieved from the storage endpoint. There may be 

2167 none if the storage endpoint does not automatically expose the 

2168 checksums it computes. 

2169 

2170 Parameters 

2171 ---------- 

2172 url : `str` 

2173 Target URL. 

2174 

2175 Returns 

2176 ------- 

2177 checksums: `dict[str, str]` 

2178 A file exists at `url`. 

2179 The key of the dictionary is the lowercased name of the checksum 

2180 algorithm (e.g. "md5", "adler32"). The value is the lowercased 

2181 checksum itself (e.g. "78441cec2479ec8b545c4d6699f542da"). 

2182 """ 

2183 stat = self.stat(url) 

2184 if not stat.exists: 

2185 raise FileNotFoundError(f"No file found at {url}") 

2186 

2187 return stat.checksums if stat.is_file else {} 

2188 

2189 def delete(self, url: str) -> None: 

2190 """Delete the file or directory at `url`. 

2191 

2192 If there is no file or directory at `url` is not considered an error. 

2193 

2194 Parameters 

2195 ---------- 

2196 url : `str` 

2197 Target URL. 

2198 

2199 Notes 

2200 ----- 

2201 If `url` designates a directory, some webDAV servers recursively 

2202 remove the directory and its contents. Others, only remove the 

2203 directory if it is empty. 

2204 

2205 For a consisten behavior, the caller must check what kind of object 

2206 the target URL is and walk the hierarchy removing all objects. 

2207 """ 

2208 resp = self._delete(url) 

2209 match resp.status: 

2210 case HTTPStatus.OK | HTTPStatus.ACCEPTED | HTTPStatus.NO_CONTENT | HTTPStatus.NOT_FOUND: 

2211 # Invalidate the entry for this file in our cache, if any 

2212 self._file_size_cache.invalidate(url) 

2213 case _: 

2214 raise ValueError( 

2215 f"Unable to delete resource {resp.geturl()}: status {resp.status} {resp.reason}" 

2216 ) 

2217 

    def accepts_ranges(self, url: str) -> bool:
        """Return `True` if the server supports a 'Range' header in
        GET requests against `url`.

        Parameters
        ----------
        url : `str`
            Target URL.

        Returns
        -------
        result : `bool`
            `True` if the server advertises byte-range support via an
            "Accept-Ranges: bytes" response header to a HEAD request.
        """
        # If we have already determined that the server accepts "Range" for
        # another URL, we assume that it implements that feature for any
        # file it serves, so reuse that information.
        if self._accepts_ranges is not None:
            return self._accepts_ranges

        # Double-checked locking: re-test under the lock so that only one
        # thread sends the HEAD probe to the server.
        with self._lock:
            if self._accepts_ranges is None:
                self._accepts_ranges = self.head(url).headers.get("Accept-Ranges", "") == "bytes"

            return self._accepts_ranges

2238 

    @property
    def supports_duplicate(self) -> bool:
        """Return True if the server this client interacts with implements
        webDAV COPY method.
        """
        # `_can_duplicate` is set per server flavor; for instance the dCache
        # client only enables duplication when talking over HTTPS.
        return self._can_duplicate

2245 

2246 def copy(self, source_url: str, destination_url: str, overwrite: bool = False) -> None: 

2247 """Copy the file at `source_url` to `destination_url` in the same 

2248 storage endpoint. 

2249 

2250 Parameters 

2251 ---------- 

2252 source_url : `str` 

2253 URL of the source file. 

2254 destination_url : `str` 

2255 URL of the destination file. Its parent directory must exist. 

2256 overwrite : `bool` 

2257 If True and a file exists at `destination_url` it will be 

2258 overwritten. Otherwise an exception is raised. 

2259 """ 

2260 headers = { 

2261 "Destination": destination_url, 

2262 "Overwrite": "T" if overwrite else "F", 

2263 } 

2264 resp = self._copy(source_url, headers=headers) 

2265 match resp.status: 

2266 case HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

2267 self._file_size_cache.invalidate(destination_url) 

2268 case _: 

2269 raise ValueError( 

2270 f"Could not copy {resp.geturl()} to {destination_url}: status {resp.status} {resp.reason}" 

2271 ) 

2272 

2273 def duplicate(self, source_url: str, destination_url: str, overwrite: bool = False) -> None: 

2274 """Copy the file at `source_url` to `destination_url` in the same 

2275 storage endpoint. 

2276 

2277 Parameters 

2278 ---------- 

2279 source_url : `str` 

2280 URL of the source file. 

2281 destination_url : `str` 

2282 URL of the destination file. Its parent directory is created if 

2283 necessary. 

2284 overwrite : `bool` 

2285 If True and a file exists at `destination_url` it will be 

2286 overwritten. Otherwise an exception is raised. 

2287 """ 

2288 # Check the source is a file 

2289 if self.is_dir(source_url): 

2290 raise NotImplementedError(f"copy is not implemented for directory {source_url}") 

2291 

2292 # Create the destination's parent directory first because COPY may 

2293 # fail if it does not exist, depending on the server implementation 

2294 # of RFC 4918. 

2295 destination_parent = self._parent(destination_url) 

2296 self.mkcol(destination_parent) 

2297 self.copy(source_url=source_url, destination_url=destination_url, overwrite=overwrite) 

2298 

2299 def rename( 

2300 self, 

2301 source_url: str, 

2302 destination_url: str, 

2303 overwrite: bool = False, 

2304 create_parent: bool = True, 

2305 ) -> None: 

2306 """Rename (move) the file at `source_url` to `destination_url` in the 

2307 same storage endpoint. 

2308 

2309 Parameters 

2310 ---------- 

2311 source_url : `str` 

2312 URL of the source file. 

2313 destination_url : `str` 

2314 URL of the destination file. Its parent directory must exist. 

2315 overwrite : `bool`, optional 

2316 If True and a file exists at `destination_url` it will be 

2317 overwritten. Otherwise an exception is raised. 

2318 create_parent : `bool`, optional 

2319 Whether to create the parent. 

2320 """ 

2321 # Create the destination's parent directory first because MOVE may 

2322 # fail if it does not exist, depending on the server implementation 

2323 # of RFC 4918. 

2324 if create_parent: 

2325 destination_parent = self._parent(destination_url) 

2326 self.mkcol(destination_parent) 

2327 

2328 resp = self.move(source_url=source_url, destination_url=destination_url, overwrite=overwrite) 

2329 match resp.status: 

2330 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

2331 self._file_size_cache.invalidate(destination_url) 

2332 case _: 

2333 raise ValueError( 

2334 f"""Could not move file {resp.geturl()} to {destination_url}: status {resp.status} """ 

2335 f"""{resp.reason}""" 

2336 ) 

2337 

2338 def generate_presigned_get_url(self, url: str, expiration_time_seconds: int) -> str: 

2339 """Return a pre-signed URL that can be used to retrieve this resource 

2340 using an HTTP GET without supplying any access credentials. 

2341 

2342 Parameters 

2343 ---------- 

2344 url : `str` 

2345 Target URL. 

2346 expiration_time_seconds : `int` 

2347 Number of seconds until the generated URL is no longer valid. 

2348 

2349 Returns 

2350 ------- 

2351 url : `str` 

2352 HTTP URL signed for GET. 

2353 """ 

2354 raise NotImplementedError(f"URL signing is not supported by server for {self}") 

2355 

2356 def generate_presigned_put_url(self, url: str, expiration_time_seconds: int) -> str: 

2357 """Return a pre-signed URL that can be used to upload a file to this 

2358 path using an HTTP PUT without supplying any access credentials. 

2359 

2360 Parameters 

2361 ---------- 

2362 url : `str` 

2363 Target URL. 

2364 expiration_time_seconds : `int` 

2365 Number of seconds until the generated URL is no longer valid. 

2366 

2367 Returns 

2368 ------- 

2369 url : `str` 

2370 HTTP URL signed for PUT. 

2371 """ 

2372 raise NotImplementedError(f"URL signing is not supported by server for {self}") 

2373 

2374 

class ActivityCaveat(enum.Enum):
    """Activity caveats accepted when requesting macaroons from dCache or
    XRootD webDAV servers.
    """

    # Read-only access to a file.
    DOWNLOAD = 1
    # Write access to a file.
    UPLOAD = 2

2382 

2383 

2384class DavClientURLSigner(DavClient): 

2385 """WebDAV client which supports signing of URL for upload and download. 

2386 

2387 Instances of this class are thread-safe. 

2388 

2389 Parameters 

2390 ---------- 

2391 url : `str` 

2392 Root URL of the storage endpoint 

2393 (e.g. "https://host.example.org:1234/"). 

2394 config : `DavConfig` 

2395 Configuration to initialize this client. 

2396 accepts_ranges : `bool` | `None` 

2397 Indicate whether the remote server accepts the ``Range`` header in GET 

2398 requests. 

2399 """ 

2400 

2401 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None: 

2402 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges) 

2403 

2404 def generate_presigned_get_url(self, url: str, expiration_time_seconds: int) -> str: 

2405 """Return a pre-signed URL that can be used to retrieve the resource 

2406 at `url` using an HTTP GET without supplying any access credentials. 

2407 

2408 Parameters 

2409 ---------- 

2410 url : `str` 

2411 URL of an existing file. 

2412 expiration_time_seconds : `int` 

2413 Number of seconds until the generated URL is no longer valid. 

2414 

2415 Returns 

2416 ------- 

2417 url : `str` 

2418 HTTP URL signed for GET. 

2419 

2420 Notes 

2421 ----- 

2422 Although the returned URL allows for downloading the file at `url` 

2423 without supplying credentials, the HTTP client must be configured 

2424 to accept the certificate the server will present if the client wants 

2425 validate it. The server's certificate may be issued by a certificate 

2426 authority unknown to the client. 

2427 """ 

2428 macaroon: str = self._get_macaroon(url, ActivityCaveat.DOWNLOAD, expiration_time_seconds) 

2429 return f"{url}?authz={macaroon}" 

2430 

2431 def generate_presigned_put_url(self, url: str, expiration_time_seconds: int) -> str: 

2432 """Return a pre-signed URL that can be used to upload a file to `url` 

2433 using an HTTP PUT without supplying any access credentials. 

2434 

2435 Parameters 

2436 ---------- 

2437 url : `str` 

2438 URL of an existing file. 

2439 expiration_time_seconds : `int` 

2440 Number of seconds until the generated URL is no longer valid. 

2441 

2442 Returns 

2443 ------- 

2444 url : `str` 

2445 HTTP URL signed for PUT. 

2446 

2447 Notes 

2448 ----- 

2449 Although the returned URL allows for uploading a file to `url` 

2450 without supplying credentials, the HTTP client must be configured 

2451 to accept the certificate the server will present if the client wants 

2452 validate it. The server's certificate may be issued by a certificate 

2453 authority unknown to the client. 

2454 """ 

2455 macaroon: str = self._get_macaroon(url, ActivityCaveat.UPLOAD, expiration_time_seconds) 

2456 return f"{url}?authz={macaroon}" 

2457 

2458 def _get_macaroon(self, url: str, activity: ActivityCaveat, expiration_time_seconds: int) -> str: 

2459 """Return a macaroon for uploading or downloading the file at `url`. 

2460 

2461 Parameters 

2462 ---------- 

2463 url : `str` 

2464 URL of an existing file. 

2465 activity : `ActivityCaveat` 

2466 the activity the macaroon is requested for. 

2467 expiration_time_seconds : `int` 

2468 Requested duration of the macaroon, in seconds. 

2469 

2470 Returns 

2471 ------- 

2472 macaroon : `str` 

2473 Macaroon to be used with `url` in a GET or PUT request. 

2474 """ 

2475 # dCache and XRootD webDAV servers support delivery of macaroons. 

2476 # 

2477 # For details about dCache macaroons see: 

2478 # https://www.dcache.org/manuals/UserGuide-9.2/macaroons.shtml 

2479 match activity: 

2480 case ActivityCaveat.DOWNLOAD: 

2481 activity_caveat = "DOWNLOAD,LIST" 

2482 case ActivityCaveat.UPLOAD: 

2483 activity_caveat = "UPLOAD,LIST,DELETE,MANAGE" 

2484 

2485 # Retrieve a macaroon for the requested activities and duration 

2486 headers = {"Content-Type": "application/macaroon-request"} 

2487 body = { 

2488 "caveats": [ 

2489 f"activity:{activity_caveat}", 

2490 ], 

2491 "validity": f"PT{expiration_time_seconds}S", 

2492 } 

2493 resp = self._request("POST", url, headers=headers, body=json.dumps(body)) 

2494 if resp.status != HTTPStatus.OK: 

2495 raise ValueError( 

2496 f"Could not retrieve a macaroon for URL {resp.geturl()}, status: {resp.status} {resp.reason}" 

2497 ) 

2498 

2499 # We are expecting the body of the response to be formatted in JSON. 

2500 # dCache sets the 'Content-Type' of the response to 'application/json' 

2501 # but XRootD does not set any 'Content-Type' header 8-[ 

2502 # 

2503 # An example of a response body returned by dCache is shown below: 

2504 # { 

2505 # "macaroon": "MDA[...]Qo", 

2506 # "uri": { 

2507 # "targetWithMacaroon": "https://dcache.example.org/?authz=MD...", 

2508 # "baseWithMacaroon": "https://dcache.example.org/?authz=MD...", 

2509 # "target": "https://dcache.example.org/", 

2510 # "base": "https://dcache.example.org/" 

2511 # } 

2512 # } 

2513 # 

2514 # An example of a response body returned by XRootD is shown below: 

2515 # { 

2516 # "macaroon": "MDA[...]Qo", 

2517 # "expires_in": 86400 

2518 # } 

2519 try: 

2520 response_body = json.loads(resp.data.decode()) 

2521 except json.JSONDecodeError: 

2522 raise ValueError(f"Could not deserialize response to POST request for URL {resp.geturl()}") 

2523 

2524 if "macaroon" in response_body: 

2525 return response_body["macaroon"] 

2526 

2527 raise ValueError(f"Could not retrieve macaroon for URL {resp.geturl()}") 

2528 

2529 @override 

2530 def duplicate(self, source_url: str, destination_url: str, overwrite: bool = False) -> None: 

2531 """Copy the file at `source_url` to `destination_url` in the same 

2532 storage endpoint. 

2533 

2534 Parameters 

2535 ---------- 

2536 source_url : `str` 

2537 URL of the source file. 

2538 destination_url : `str` 

2539 URL of the destination file. Its parent directory must exist. 

2540 overwrite : `bool` 

2541 If True and a file exists at `destination_url` it will be 

2542 overwritten. Otherwise an exception is raised. 

2543 """ 

2544 # Check the source is a file 

2545 if self.is_dir(source_url): 

2546 raise NotImplementedError(f"copy is not implemented for directory {source_url}") 

2547 

2548 # Neither dCache nor XrootD currently implement the COPY 

2549 # webDAV method as documented in 

2550 # 

2551 # http://www.webdav.org/specs/rfc4918.html#METHOD_COPY 

2552 # 

2553 # (See issues DM-37603 and DM-37651 for details) 

2554 # With those servers use third-party copy instead. 

2555 return self._copy_via_third_party(source_url, destination_url, overwrite) 

2556 

2557 def _copy_via_third_party(self, source_url: str, destination_url: str, overwrite: bool = False) -> None: 

2558 """Copy the file at `source_url` to `destination_url` in the same 

2559 storage endpoint using the third-party copy functionality 

2560 implemented by dCache and XRootD servers. 

2561 

2562 Parameters 

2563 ---------- 

2564 source_url : `str` 

2565 URL of the source file. 

2566 destination_url : `str` 

2567 URL of the destination file. Its parent directory must exist. 

2568 overwrite : `bool` 

2569 If True and a file exists at `destination_url` it will be 

2570 overwritten. Otherwise an exception is raised. 

2571 """ 

2572 # To implement COPY we use dCache's third-party copy mechanism 

2573 # documented at: 

2574 # 

2575 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#third-party-transfers 

2576 # 

2577 # The reason is that dCache does not correctly implement webDAV's COPY 

2578 # method. See https://github.com/dCache/dcache/issues/6950 

2579 

2580 # Create the destination's parent directory first because COPY may 

2581 # fail if it does not exist, depending on the server implementation 

2582 # of RFC 4918. 

2583 destination_parent = self._parent(destination_url) 

2584 self.mkcol(destination_parent) 

2585 

2586 # Retrieve a macaroon for downloading the source 

2587 download_macaroon = self._get_macaroon(source_url, ActivityCaveat.DOWNLOAD, 300) 

2588 

2589 # Prepare and send the COPY request 

2590 try: 

2591 headers = { 

2592 "Source": source_url, 

2593 "TransferHeaderAuthorization": f"Bearer {download_macaroon}", 

2594 "Credential": "none", 

2595 "Depth": "0", 

2596 "Overwrite": "T" if overwrite else "F", 

2597 "RequireChecksumVerification": "false", 

2598 } 

2599 resp = self._copy(destination_url, headers=headers, preload_content=False) 

2600 match resp.status: 

2601 case HTTPStatus.CREATED: 

2602 return 

2603 case HTTPStatus.ACCEPTED: 

2604 pass 

2605 case _: 

2606 raise ValueError( 

2607 f"Unable to copy resource {resp.geturl()}; status: {resp.status} {resp.reason}" 

2608 ) 

2609 

2610 # Analyse the response to the COPY request that the server has 

2611 # not completed yet. 

2612 content_type = resp.headers.get("Content-Type") 

2613 if content_type != "text/perf-marker-stream": 

2614 raise ValueError( 

2615 f"""Unexpected Content-Type {content_type} in response to COPY request from """ 

2616 f"""{source_url} to {destination_url}""" 

2617 ) 

2618 

2619 # Read the performance markers in the response body until we get 

2620 # a "success" or "failure" notification. 

2621 # 

2622 # Documentation: 

2623 # https://dcache.org/manuals/UserGuide-10.2/webdav.shtml#third-party-transfers 

2624 for marker in io.TextIOWrapper(resp): # type: ignore 

2625 marker = marker.rstrip("\n") 

2626 if marker == "": # EOF 

2627 raise ValueError( 

2628 f"""Copying file from {source_url} to {destination_url} failed: """ 

2629 """could not get response from server""" 

2630 ) 

2631 elif marker.startswith("failure:"): 

2632 raise ValueError( 

2633 f"""Copying file from {source_url} to {destination_url} failed with error: """ 

2634 f"""{marker}""" 

2635 ) 

2636 elif marker.startswith("success:"): 

2637 return 

2638 finally: 

2639 resp.drain_conn() 

2640 

2641 

2642class DavClientDCache(DavClientURLSigner): 

2643 """Client for interacting with a dCache webDAV server. 

2644 

2645 Instances of this class are thread-safe. 

2646 

2647 Parameters 

2648 ---------- 

2649 url : `str` 

2650 Root URL of the storage endpoint 

2651 (e.g. "https://host.example.org:1234/"). 

2652 config : `DavConfig` 

2653 Configuration to initialize this client. 

2654 accepts_ranges : `bool` | `None` 

2655 Indicate whether the remote server accepts the ``Range`` header in GET 

2656 requests. 

2657 """ 

2658 

2659 # Regular expression to parse dCache's response body of a successful 

2660 # PUT request. Such a response body is of the form: 

2661 # 

2662 # "104857600 bytes uploaded\r\n\r\n" 

2663 # 

2664 rex: re.Pattern = re.compile(r"^(\d*) bytes uploaded", re.IGNORECASE | re.ASCII) 

2665 

2666 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None: 

2667 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges) 

2668 

2669 # Create a specialized pool manager for sending requests to dCache 

2670 # webdav door, in particular for retrieving metadata. 

2671 # 

2672 # As of dCache v10.2.14, the webDAV door leaves the network connection 

2673 # unusable for us for sending subsequent requests after serving 

2674 # GET, PUT, DELETE, etc., but leaves the connection intact after 

2675 # serving MKCOL, MOVE and PROPFIND requests. 

2676 # We take advantage of that by using a dedicated pool manager for 

2677 # those requests, so that the network connections managed by that pool 

2678 # be reused. This avoids establishing the TCP+TLS connection for each 

2679 # request. 

2680 pool_manager = self._make_pool_manager(self._config) 

2681 self._propfind_pool_manager = pool_manager 

2682 self._move_pool_manager = pool_manager 

2683 self._mkcol_pool_manager = pool_manager 

2684 

2685 # dCache does not deliver macaroons when we are not using a secure 

2686 # channel to interact with the door. In that case, we can not use 

2687 # third party copy and dCache does not correctly support the COPY 

2688 # method as stated in RFC-4918. 

2689 self._can_duplicate = self._base_url.startswith("https://") 

2690 

2691 @override 

2692 def _mkcol( 

2693 self, 

2694 url: str, 

2695 headers: dict[str, str] | None = None, 

2696 pool_manager: PoolManager | None = None, 

2697 ) -> HTTPResponse: 

2698 """Inherits doc string.""" 

2699 return self._request("MKCOL", url=url, headers=headers, pool_manager=self._mkcol_pool_manager) 

2700 

2701 @override 

2702 def _move( 

2703 self, 

2704 url: str, 

2705 headers: dict[str, str] | None = None, 

2706 pool_manager: PoolManager | None = None, 

2707 ) -> HTTPResponse: 

2708 """Inherits doc string.""" 

2709 return self._request("MOVE", url=url, headers=headers, pool_manager=self._move_pool_manager) 

2710 

2711 @override 

2712 def _propfind( 

2713 self, 

2714 url: str, 

2715 headers: dict[str, str] | None = None, 

2716 body: str = "", 

2717 pool_manager: PoolManager | None = None, 

2718 ) -> HTTPResponse: 

2719 """Inherits doc string.""" 

2720 return self._request( 

2721 "PROPFIND", url=url, headers=headers, body=body, pool_manager=self._propfind_pool_manager 

2722 ) 

2723 

2724 @override 

2725 def put( 

2726 self, 

2727 url: str, 

2728 headers: dict[str, str] | None = None, 

2729 data: BinaryIO | bytes = b"", 

2730 ) -> int | None: 

2731 # Docstring inherited. 

2732 # Send a PUT request with empty body to the dCache frontend server to 

2733 # get redirected to the backend. 

2734 # 

2735 # Details: 

2736 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#redirection 

2737 frontend_headers = {} if headers is None else dict(headers) 

2738 frontend_headers.update({"Content-Length": "0", "Expect": "100-continue"}) 

2739 if is_zero_length := isinstance(data, bytes) and len(data) == 0: 

2740 # We are uploading an empty file. Don't send the "Expect" header 

2741 # so that the dCache door handles this PUT request itself without 

2742 # redirecting us to a pool. 

2743 frontend_headers.pop("Expect") 

2744 

2745 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False) 

2746 match resp.status: 

2747 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

2748 redirect_url = url 

2749 case status if status in resp.REDIRECT_STATUSES: 

2750 redirect_url = resp.headers.get("Location") 

2751 case _: 

2752 raise unexpected_status_error("PUT", url, resp) 

2753 

2754 # If we are uploading an empty file, there is nothing more to do. 

2755 if is_zero_length: 

2756 return 0 

2757 

2758 # We may have beend redirected to a backend server. Upload the file 

2759 # contents to its final destination. Explicitly ask the server to close 

2760 # this network connection after serving this PUT request to release 

2761 # the associated dCache mover. 

2762 backend_headers = {} if headers is None else dict(headers) 

2763 backend_headers.update({"Connection": "close"}) 

2764 

2765 # Ask dCache to compute and record a checksum of the uploaded 

2766 # file contents, for later integrity checks. Since we don't compute 

2767 # the digest ourselves while uploading the data, we cannot control 

2768 # after the request is complete that the data we uploaded is 

2769 # identical to the data recorded by the server, but at least the 

2770 # server has recorded a digest of the data it stored. 

2771 # 

2772 # See RFC-3230 for details and 

2773 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml 

2774 # for the list of supported digest algorithhms. 

2775 if (checksum := self._config.request_checksum) is not None: 

2776 backend_headers.update({"Want-Digest": checksum}) 

2777 

2778 resp = self._put(redirect_url, body=data, headers=backend_headers) 

2779 match resp.status: 

2780 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

2781 # Parse the response body and extract the number of bytes 

2782 # uploaded. This allows us to avoid sending a HEAD request 

2783 # to retrieve the file size. 

2784 response_body = resp.data.decode() 

2785 if match := DavClientDCache.rex.match(response_body): 

2786 return int(match.group(1)) 

2787 else: 

2788 return None 

2789 case _: 

2790 raise unexpected_status_error("PUT", redirect_url, resp) 

2791 

2792 @override 

2793 def download(self, url: str, filename: str, chunk_size: int) -> int: 

2794 """Download the content of a file and write it to local file. 

2795 

2796 Parameters 

2797 ---------- 

2798 url : `str` 

2799 Target URL. 

2800 filename : `str` 

2801 Local file to write the content to. If the file already exists, 

2802 it will be rewritten. 

2803 chunk_size : `int` 

2804 Size of the chunks to write to `filename`. 

2805 

2806 Returns 

2807 ------- 

2808 count: `int` 

2809 Number of bytes written to `filename`. 

2810 

2811 Notes 

2812 ----- 

2813 The caller must ensure that the resource at `url` is a file, not 

2814 a directory. 

2815 """ 

2816 # Send a GET request without following redirection to get redirected 

2817 # to the backend server. 

2818 _, resp = self.get(url, preload_content=False, redirect=False) 

2819 match resp.status: 

2820 case HTTPStatus.OK: 

2821 # We were not redirected. Consume this response. 

2822 return self._write_response_body_to_file(resp, filename, chunk_size) 

2823 case status if status not in resp.REDIRECT_STATUSES: 

2824 raise unexpected_status_error("GET", url, resp) 

2825 case _: 

2826 # We were redirected. Follow this redirection. 

2827 pass 

2828 

2829 # Drain and release the response we received from the frontend server 

2830 # so that the connection can be reused. 

2831 resp.drain_conn() 

2832 resp.release_conn() 

2833 

2834 # We were redirected to a backend server. Send a GET request to the 

2835 # backend server and ask it to close the HTTP connection to force 

2836 # closing the network connection. 

2837 redirect_url = resp.headers.get("Location") 

2838 _, resp = self.get(redirect_url, headers={"Connection": "close"}, preload_content=False) 

2839 match resp.status: 

2840 case HTTPStatus.OK: 

2841 return self._write_response_body_to_file(resp, filename, chunk_size) 

2842 case _: 

2843 raise unexpected_status_error("GET", redirect_url, resp) 

2844 

2845 @override 

2846 def read(self, url: str) -> tuple[str, bytes]: 

2847 """Download the contents of file located at `url`. 

2848 

2849 Parameters 

2850 ---------- 

2851 url : `str` 

2852 Target URL. 

2853 

2854 Returns 

2855 ------- 

2856 url: `str` 

2857 Backend URL from which the data was obtained. 

2858 data: `bytes` 

2859 Contents of the file. 

2860 

2861 Notes 

2862 ----- 

2863 The caller must ensure that the resource at `url` is a file, not 

2864 a directory. 

2865 """ 

2866 # Send a GET request without following redirection to get redirected 

2867 # to the backend server. 

2868 backend_url, resp = self.get(url, redirect=False) 

2869 match resp.status: 

2870 case HTTPStatus.OK: 

2871 return backend_url, resp.data 

2872 case status if status in resp.REDIRECT_STATUSES: 

2873 redirect_url = resp.headers.get("Location") 

2874 case _: 

2875 raise unexpected_status_error("GET", url, resp) 

2876 

2877 # We were redirected. Send a GET request to the backend server 

2878 # and ask it to close the HTTP connection to force closing the 

2879 # network connection. 

2880 final_url, resp = self.get(redirect_url, headers={"Connection": "close"}) 

2881 match resp.status: 

2882 case HTTPStatus.OK: 

2883 return final_url, resp.data 

2884 case _: 

2885 raise unexpected_status_error("GET", redirect_url, resp) 

2886 

2887 @override 

2888 def write(self, url: str, data: BinaryIO | bytes) -> int | None: 

2889 """Create or rewrite a remote file at `url` with `data` as its 

2890 contents. 

2891 

2892 Parameters 

2893 ---------- 

2894 url : `str` 

2895 Target URL. 

2896 data : `bytes` 

2897 Sequence of bytes to upload. 

2898 

2899 Returns 

2900 ------- 

2901 size : `int | None` 

2902 The size in bytes of the file uploaded. Can be `None` if the size 

2903 could not be retrieved. 

2904 

2905 Notes 

2906 ----- 

2907 If a file already exists at `url` it will be rewritten. 

2908 """ 

2909 # dCache will automatically create all the parent directories so we 

2910 # don't need to explicitly create them. Although this is not compliant 

2911 # to RFC 4918, this is advantageous because it avoids several 

2912 # round-trips to the server for creating all the directories 

2913 # before actually uploading the data. 

2914 try: 

2915 # Upload to a temporary file and rename to the final name. 

2916 temporary_url = self._make_temporary_url(url) 

2917 size = self.put(temporary_url, data=data) 

2918 self.rename(temporary_url, url, overwrite=True, create_parent=False) 

2919 

2920 # Update the file size cache with this size 

2921 self._file_size_cache.update_size(url, size) 

2922 return size 

2923 except Exception: 

2924 # Upload failed. Attempt to remove the temporary file. 

2925 self.delete(temporary_url) 

2926 raise 

2927 

2928 @override 

2929 def mkcol(self, url: str) -> None: 

2930 """Create a directory at `url`. 

2931 

2932 If a directory already exists at `url` no error is returned nor 

2933 exception is raised. An exception is raised if a file exists at `url`. 

2934 

2935 Parameters 

2936 ---------- 

2937 url : `str` 

2938 Target URL. 

2939 """ 

2940 # A "MKCOL" request to dCache does not automatically create all 

2941 # the intermediate directories if they do not exist. However, a 

2942 # "PUT" request of a file does create the directory hierarchy. 

2943 # 

2944 # We exploit that to create directory hierarchies: we first create an 

2945 # empty file with a random name and then we remove it. As a side 

2946 # effect, the target directory will be created. 

2947 # 

2948 # Creating a directory this way implies two requests to the server 

2949 # ("PUT" and "DELETE"), while using "MKCOL" would on average imply 

2950 # one request per inexisting directory in the hierarchy. When 

2951 # directory hierarchies are relatively deep, requiring two 

2952 # requests per hierarchy is better than sending a "MKCOL" request 

2953 # per directory in the hierarchy. 

2954 try: 

2955 temporary_url = self._make_temporary_url(url=f"{url}/mkcol") 

2956 self.put(temporary_url, data=b"") 

2957 finally: 

2958 self.delete(temporary_url) 

2959 

2960 @override 

2961 def info(self, url: str, name: str | None = None) -> dict[str, Any]: 

2962 # Docstring inherited. 

2963 result: dict[str, Any] = { 

2964 "name": name if name is not None else url, 

2965 "type": None, 

2966 "size": None, 

2967 "last_modified": datetime.min, 

2968 "checksums": {}, 

2969 } 

2970 

2971 # Request live DAV properties as well as the checksums that dCache 

2972 # recorded about this file. 

2973 body = ( 

2974 """<?xml version="1.0" encoding="utf-8"?>""" 

2975 """<D:propfind xmlns:D="DAV:" xmlns:dcache="http://www.dcache.org/2013/webdav">""" 

2976 """<D:prop>""" 

2977 """<D:resourcetype/>""" 

2978 """<D:getcontentlength/>""" 

2979 """<D:getlastmodified/>""" 

2980 """<D:displayname/>""" 

2981 """<dcache:Checksums/>""" 

2982 """</D:prop>""" 

2983 """</D:propfind>""" 

2984 ) 

2985 resp = self.propfind(url, body=body, depth="0") 

2986 match resp.status: 

2987 case HTTPStatus.NOT_FOUND: 

2988 return result 

2989 case HTTPStatus.MULTI_STATUS: 

2990 property = self._propfind_parser.parse(resp.data)[0] 

2991 metadata = DavFileMetadata.from_property(base_url=self._base_url, property=property) 

2992 result.update( 

2993 { 

2994 "type": "directory" if metadata.is_dir else "file", 

2995 "size": metadata.size, 

2996 "last_modified": metadata.last_modified, 

2997 "checksums": metadata.checksums, 

2998 } 

2999 ) 

3000 return result 

3001 case _: 

3002 raise unexpected_status_error("PROPFIND", url, resp) 

3003 

3004 

3005class DavClientXrootD(DavClientURLSigner): 

3006 """Client for interacting with a XrootD webDAV server. 

3007 

3008 Instances of this class are thread-safe. 

3009 

3010 Parameters 

3011 ---------- 

3012 url : `str` 

3013 Root URL of the storage endpoint 

3014 (e.g. "https://host.example.org:1234/"). 

3015 config : `DavConfig` 

3016 Configuration to initialize this client. 

3017 accepts_ranges : `bool` | `None` 

3018 Indicate whether the remote server accepts the ``Range`` header in GET 

3019 requests. 

3020 """ 

3021 

3022 def __init__(self, url: str, config: DavConfig, accepts_ranges: bool | None = None) -> None: 

3023 super().__init__(url=url, config=config, accepts_ranges=accepts_ranges) 

3024 

3025 @override 

3026 def put( 

3027 self, 

3028 url: str, 

3029 headers: dict[str, str] | None = None, 

3030 data: BinaryIO | bytes = b"", 

3031 ) -> int | None: 

3032 # Docstring inherited. 

3033 # Send a PUT request with empty body to the XRootD frontend server to 

3034 # get redirected to the backend. 

3035 frontend_headers = {} if headers is None else dict(headers) 

3036 frontend_headers.update({"Content-Length": "0", "Expect": "100-continue"}) 

3037 for attempt in range(max_attempts := 3): 

3038 resp = self._put(url, headers=frontend_headers, body=b"", redirect=False) 

3039 if resp.status in ( 

3040 HTTPStatus.OK, 

3041 HTTPStatus.CREATED, 

3042 HTTPStatus.NO_CONTENT, 

3043 ): 

3044 redirect_url = url 

3045 break 

3046 elif resp.status in resp.REDIRECT_STATUSES: 

3047 redirect_url = resp.headers.get("Location") 

3048 break 

3049 elif resp.status == HTTPStatus.LOCKED: 

3050 # Sometimes XRootD servers respond with status code LOCKED and 

3051 # response body of the form: 

3052 # 

3053 # "Output file /path/to/file is already opened by 1 writer; 

3054 # open denied." 

3055 # 

3056 # If we get such a response, try again, unless we reached 

3057 # the maximum number of attempts. 

3058 if attempt == max_attempts - 1: 

3059 raise ValueError( 

3060 f"""Unexpected response to HTTP request PUT {resp.geturl()}: status {resp.status} """ 

3061 f"""{resp.reason} [{resp.data.decode()}] after {max_attempts} attempts""" 

3062 ) 

3063 

3064 # Wait a bit and try again 

3065 log.warning( 

3066 f"""got unexpected response status {HTTPStatus.LOCKED} Locked for PUT {resp.geturl()} """ 

3067 f"""(attempt {attempt}/{max_attempts}), retrying...""" 

3068 ) 

3069 time.sleep((attempt + 1) * 0.100) 

3070 continue 

3071 else: 

3072 raise unexpected_status_error("PUT", url, resp) 

3073 

3074 # We were redirected to a backend server. Upload the file contents to 

3075 # its final destination. 

3076 

3077 # XRootD backend servers typically use a single port number for 

3078 # accepting connections from clients. It is therefore beneficial 

3079 # to keep those connections open, if the server allows. 

3080 

3081 # Ask the server to compute and record a checksum of the uploaded 

3082 # file contents, for later integrity checks. Since we don't compute 

3083 # the digest ourselves while uploading the data, we cannot control 

3084 # after the request is complete that the data we uploaded is 

3085 # identical to the data recorded by the server, but at least the 

3086 # server has recorded a digest of the data it stored. 

3087 # 

3088 # See RFC-3230 for details and 

3089 # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml 

3090 # for the list of supported digest algorithhms. 

3091 # 

3092 # In addition, note that not all servers implement this RFC so 

3093 # the checksum reqquest may be ignored by the server. 

3094 backend_headers = {} if headers is None else dict(headers) 

3095 if (checksum := self._config.request_checksum) is not None: 

3096 backend_headers.update({"Want-Digest": checksum}) 

3097 

3098 resp = self._put(redirect_url, body=data, headers=backend_headers) 

3099 match resp.status: 

3100 case HTTPStatus.OK | HTTPStatus.CREATED | HTTPStatus.NO_CONTENT: 

3101 # Send a HEAD request to retrieve the size of the file we 

3102 # just uploaded. 

3103 resp = self.head(redirect_url) 

3104 size = int(resp.headers.get("Content-Length", -1)) 

3105 return None if size == -1 else size 

3106 case _: 

3107 raise unexpected_status_error("PUT", redirect_url, resp) 

3108 

3109 @override 

3110 def info(self, url: str, name: str | None = None) -> dict[str, Any]: 

3111 # XRootD does not include checksums in the response to PROPFIND 

3112 # request. We need to send a specific HEAD request to retrieve 

3113 # the ADLER32 checksum. 

3114 # 

3115 # If found, the checksum is included in the response header "Digest", 

3116 # which is of the form: 

3117 # 

3118 # Digest: adler32=0e4709f2 

3119 result = super().info(url, name) 

3120 if result["type"] == "file": 

3121 headers: dict[str, str] = {"Want-Digest": "adler32"} 

3122 resp = self.head(url=url, headers=headers) 

3123 if (digest := resp.headers.get("Digest")) is not None: 

3124 value = digest.split("=")[1] 

3125 result["checksums"].update({"adler32": value}) 

3126 

3127 return result 

3128 

3129 @override 

3130 def write(self, url: str, data: BinaryIO | bytes) -> int | None: 

3131 """Create or rewrite a remote file at `url` with `data` as its 

3132 contents. 

3133 

3134 Parameters 

3135 ---------- 

3136 url : `str` 

3137 Target URL. 

3138 data : `bytes` 

3139 Sequence of bytes to upload. 

3140 

3141 Returns 

3142 ------- 

3143 size : `int | None` 

3144 The size in bytes of the file uploaded. Can be `None` if the size 

3145 could not be retrieved. 

3146 

3147 Notes 

3148 ----- 

3149 If a file already exists at `url` it will be rewritten. 

3150 """ 

3151 # XRootD will automatically create all the parent directories so we 

3152 # don't need to explicitly create them. Although this is not compliant 

3153 # to RFC 4918, this is advantageous because it avoids several 

3154 # round-trips to the server for creating all the directories 

3155 # before actually uploading the data. 

3156 try: 

3157 # Upload to a temporary file and rename to the final name. 

3158 temporary_url = self._make_temporary_url(url) 

3159 size = self.put(temporary_url, data=data) 

3160 self.rename(temporary_url, url, overwrite=True, create_parent=False) 

3161 

3162 # Update the file size cache with this size 

3163 self._file_size_cache.update_size(url, size) 

3164 return size 

3165 except Exception: 

3166 # Upload failed. Attempt to remove the temporary file. 

3167 self.delete(temporary_url) 

3168 raise 

3169 

3170 @override 

3171 def mkcol(self, url: str) -> None: 

3172 """Create a directory at `url`. 

3173 

3174 If a directory already exists at `url` no error is returned nor 

3175 exception is raised. An exception is raised if a file exists at `url`. 

3176 

3177 Parameters 

3178 ---------- 

3179 url : `str` 

3180 Target URL. 

3181 """ 

3182 # XRootD automatically creates all the intermediate directories. 

3183 resp = self._mkcol(url) 

3184 match resp.status: 

3185 case HTTPStatus.CREATED: 

3186 return 

3187 case HTTPStatus.METHOD_NOT_ALLOWED: 

3188 # XRootD returns "405 Method Not Allowed" when either a file 

3189 # or a directory already exists at `url` 

3190 stat = self.stat(url) 

3191 if stat.is_dir: 

3192 # A directory exists at `url`. Nothing more to do. 

3193 return 

3194 elif stat.is_file: 

3195 raise NotADirectoryError( 

3196 f"Can not create a directory because a file already exists at {resp.geturl()}" 

3197 ) 

3198 case _: 

3199 raise ValueError( 

3200 f"Can not create directory {resp.geturl()}: status {resp.status} {resp.reason}" 

3201 ) 

3202 

3203 @override 

3204 def stat(self, url: str) -> DavFileMetadata: 

3205 # Docstring inherited. 

3206 # XRootD v5.9.1 responds "200 OK" to a HEAD request against an 

3207 # existing file. When the target URL is a directory, it also responds 

3208 # "200 OK". In both cases the response header "Content-Length" 

3209 # is present but has different meaning. If the target URL is a file, 

3210 # the header value is the size in bytes of the file. If the target 

3211 # URL is a directory, the header value is the number of items in 

3212 # the directory. 

3213 # 

3214 # So there is not an easy way to determine if the target URL is a 

3215 # file or a directory from the response to a HEAD request. 

3216 # 

3217 # When the target URL is a directory and we ask for a digest, the 

3218 # server responds "409 Conflict". We use this behavior to 

3219 # discriminate between a file and a directory. 

3220 # 

3221 # Note that XRootD does not include the "Last-Modified" header in the 

3222 # response to a HEAD request so we cannot include the last modified 

3223 # time in the value returned by this method. 

3224 resp = self._head(url, headers={"Want-Digest": "adler32"}) 

3225 match resp.status: 

3226 case HTTPStatus.OK: 

3227 # There is a file at target URL 

3228 if "Content-Length" in resp.headers: 

3229 href = url.replace(self._base_url, "", 1) 

3230 size = int(resp.headers.get("Content-Length")) 

3231 return DavFileMetadata(self._base_url, href=href, exists=True, is_dir=False, size=size) 

3232 else: 

3233 raise ValueError( 

3234 f"""Expecting Content-Length header to be present in """ 

3235 f"""response to HTTP HEAD {resp.geturl()}: status {resp.status} """ 

3236 f"""{resp.reason} [{resp.data.decode()}] but could not find it""" 

3237 ) 

3238 case HTTPStatus.CONFLICT: 

3239 # There is a directory at target URL 

3240 href = url.replace(self._base_url, "", 1) 

3241 return DavFileMetadata(self._base_url, href=href, exists=True, is_dir=True) 

3242 case HTTPStatus.NOT_FOUND: 

3243 # There is neither a file nor a directory at target URL 

3244 return DavFileMetadata(base_url=url, exists=False) 

3245 case _: 

3246 raise unexpected_status_error("HEAD", url, resp) 

3247 

3248 

class DavFileMetadata:
    """Immutable-style record of the webDAV attributes of a file or
    directory.

    Parameters
    ----------
    base_url : `str`
        Base URL.
    href : `str`, optional
        Path component that can be added to the base URL.
    name : `str`, optional
        Name.
    exists : `bool`, optional
        Whether file or directory exist.
    size : `int`, optional
        Size of file.
    is_dir : `bool`, optional
        Whether the URL points to a directory or file.
    last_modified : `bool`, optional
        Last modified date.
    checksums : `dict` [ `str`, `str` ] | `None`, optional
        Checksums.
    """

    def __init__(
        self,
        base_url: str,
        href: str = "",
        name: str = "",
        exists: bool = False,
        size: int = -1,
        is_dir: bool = False,
        last_modified: datetime = datetime.min,
        checksums: dict[str, str] | None = None,
    ):
        # The full URL is the base URL, optionally extended with the href
        # path component.
        self._url: str = base_url.rstrip("/") + href if href else base_url
        self._href: str = href
        self._name: str = name
        self._exists: bool = exists
        self._size: int = size
        self._is_dir: bool = is_dir
        self._last_modified: datetime = last_modified
        # Copy the caller's dict so later mutation does not leak in.
        self._checksums: dict[str, str] = dict(checksums) if checksums is not None else {}

    @staticmethod
    def from_property(base_url: str, property: DavProperty) -> DavFileMetadata:
        """Build an instance from a parsed PROPFIND property.

        Parameters
        ----------
        base_url : `str`
            Base URL.
        property : `DavProperty`
            Properties to associate with URL.
        """
        return DavFileMetadata(
            base_url=base_url,
            href=property.href,
            name=property.name,
            exists=property.exists,
            size=property.size,
            is_dir=property.is_dir,
            last_modified=property.last_modified,
            checksums=dict(property.checksums),
        )

    def __str__(self) -> str:
        # Space-separated dump of the raw attributes, mainly for debugging.
        fields = (
            self._url,
            self._href,
            self._name,
            self._exists,
            self._size,
            self._is_dir,
            self._checksums,
        )
        return " ".join(str(field) for field in fields)

    @property
    def url(self) -> str:
        return self._url

    @property
    def href(self) -> str:
        return self._href

    @property
    def name(self) -> str:
        return self._name

    @property
    def exists(self) -> bool:
        return self._exists

    @property
    def size(self) -> int:
        # Non-existent resources report -1 and directories report 0;
        # only existing files report their actual size.
        if not self._exists:
            return -1
        if self._is_dir:
            return 0
        return self._size

    @property
    def is_dir(self) -> bool:
        if not self._exists:
            return False
        return self._is_dir

    @property
    def is_file(self) -> bool:
        if not self._exists:
            return False
        return not self._is_dir

    @property
    def last_modified(self) -> datetime:
        return self._last_modified

    @property
    def checksums(self) -> dict[str, str]:
        return self._checksums

3358 

3359 

3360class DavProperty: 

3361 """Helper class to encapsulate select live DAV properties of a single 

3362 resource, as retrieved via a PROPFIND request. 

3363 

3364 Parameters 

3365 ---------- 

3366 response : `eTree.Element` or `None` 

3367 The XML response defining the DAV property. 

3368 """ 

3369 

3370 # Regular expression to compare against the 'status' element of a 

3371 # PROPFIND response's 'propstat' element. 

3372 _status_ok_rex = re.compile(r"^HTTP/.* 200 .*$", re.IGNORECASE) 

3373 

3374 def __init__(self, response: eTree.Element | None): 

3375 self._href: str = "" 

3376 self._displayname: str = "" 

3377 self._collection: bool = False 

3378 self._getlastmodified: str = "" 

3379 self._getcontentlength: int = -1 

3380 self._checksums: dict[str, str] = {} 

3381 

3382 if response is not None: 

3383 self._parse(response) 

3384 

3385 def _parse(self, response: eTree.Element) -> None: 

3386 # Extract 'href'. 

3387 if (element := response.find("./{DAV:}href")) is not None: 

3388 # We need to use "str(element.text)"" instead of "element.text" to 

3389 # keep mypy happy. 

3390 self._href = str(element.text).strip() 

3391 else: 

3392 raise ValueError( 

3393 "Property 'href' expected but not found in PROPFIND response: " 

3394 f"{eTree.tostring(response, encoding='unicode')}" 

3395 ) 

3396 

3397 for propstat in response.findall("./{DAV:}propstat"): 

3398 # Only extract properties of interest with status OK. 

3399 status = propstat.find("./{DAV:}status") 

3400 if status is None or not self._status_ok_rex.match(str(status.text)): 

3401 continue 

3402 

3403 for prop in propstat.findall("./{DAV:}prop"): 

3404 # Parse "collection". 

3405 if (element := prop.find("./{DAV:}resourcetype/{DAV:}collection")) is not None: 

3406 self._collection = True 

3407 

3408 # Parse "getlastmodified". 

3409 if (element := prop.find("./{DAV:}getlastmodified")) is not None: 

3410 self._getlastmodified = str(element.text) 

3411 

3412 # Parse "getcontentlength". 

3413 if (element := prop.find("./{DAV:}getcontentlength")) is not None: 

3414 self._getcontentlength = int(str(element.text)) 

3415 

3416 # Parse "displayname". 

3417 if (element := prop.find("./{DAV:}displayname")) is not None: 

3418 self._displayname = str(element.text) 

3419 

3420 # Parse "Checksums" 

3421 if (element := prop.find("./{http://www.dcache.org/2013/webdav}Checksums")) is not None: 

3422 self._checksums = self._parse_checksums(element.text) 

3423 

3424 # Some webDAV servers don't include the 'displayname' property in the 

3425 # response so try to infer it from the value of the 'href' property. 

3426 # Depending on the server the href value may end with '/'. 

3427 if not self._displayname: 

3428 self._displayname = os.path.basename(self._href.rstrip("/")) 

3429 

3430 # Some webDAV servers do not append a "/" to the href of directories. 

3431 # Ensure we include a single final "/" in our response. 

3432 if self._collection: 

3433 self._href = self._href.rstrip("/") + "/" 

3434 

3435 # Force a size of 0 for collections. 

3436 if self._collection: 

3437 self._getcontentlength = 0 

3438 

3439 def _parse_checksums(self, checksums: str | None) -> dict[str, str]: 

3440 # checksums argument is of the form 

3441 # md5=MyS/wljSzI9WYiyrsuyoxw==,adler32=23b104f2 

3442 result: dict[str, str] = {} 

3443 if checksums is not None: 

3444 for checksum in checksums.split(","): 

3445 if (pos := checksum.find("=")) != -1: 

3446 algorithm, value = (checksum[:pos].lower(), checksum[pos + 1 :]) 

3447 if algorithm == "md5": 

3448 # dCache documentation about how it encodes the 

3449 # MD5 checksum: 

3450 # 

3451 # https://www.dcache.org/manuals/UserGuide-10.2/webdav.shtml#checksums 

3452 result[algorithm] = bytes.hex(base64.standard_b64decode(value)) 

3453 else: 

3454 result[algorithm] = value 

3455 

3456 return result 

3457 

3458 @property 

3459 def exists(self) -> bool: 

3460 # It is either a directory or a file with length of at least zero 

3461 return self._collection or self._getcontentlength >= 0 

3462 

3463 @property 

3464 def is_dir(self) -> bool: 

3465 return self._collection 

3466 

3467 @property 

3468 def is_file(self) -> bool: 

3469 return not self._collection 

3470 

3471 @property 

3472 def last_modified(self) -> datetime: 

3473 if not self._getlastmodified: 

3474 return datetime.min 

3475 

3476 # Last modified timestamp is of the form: 

3477 # 'Wed, 12 Mar 2025 10:11:13 GMT' 

3478 return datetime.strptime(self._getlastmodified, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=UTC) 

3479 

3480 @property 

3481 def size(self) -> int: 

3482 return self._getcontentlength 

3483 

3484 @property 

3485 def name(self) -> str: 

3486 return self._displayname 

3487 

3488 @property 

3489 def href(self) -> str: 

3490 return self._href 

3491 

3492 @property 

3493 def checksums(self) -> dict[str, str]: 

3494 return self._checksums 

3495 

3496 

class DavPropfindParser:
    """Helper class to parse the response body of a PROPFIND request."""

    def parse(self, body: bytes) -> list[DavProperty]:
        """Parse the XML-encoded contents of the response body to a webDAV
        PROPFIND request.

        Parameters
        ----------
        body : `bytes`
            XML-encoded response body to a PROPFIND request.

        Returns
        -------
        responses : `list` [ `DavProperty` ]
            Parsed content of the response.

        Notes
        -----
        At least one response element is expected in `body`; if none is
        found this method raises `ValueError`.
        """
        # The body is a '{DAV:}multistatus' element containing one
        # '{DAV:}response' child per resource, each with 'href' and
        # 'propstat' children (see RFC 4918). Extract the properties of
        # interest from every 'response' element.
        decoded_body: str = body.decode().strip()
        multistatus = eTree.fromstring(decoded_body)
        responses = [DavProperty(element) for element in multistatus.findall("./{DAV:}response")]

        if not responses:
            # Could not parse the body
            raise ValueError(f"Unable to parse response for PROPFIND request: {decoded_body}")

        return responses

3566 

3567 

3568class Authorizer: 

3569 """Base class for attaching an 'Authorization' header to a HTTP request.""" 

3570 

3571 def set_authorization(self, headers: dict[str, str]) -> None: 

3572 """Add the 'Authorization' header to `headers`. 

3573 

3574 Parameters 

3575 ---------- 

3576 headers : `dict` [ `str`, `str` ] 

3577 Dict to augment with authorization information. 

3578 

3579 Notes 

3580 ----- 

3581 This method must be implemented by concrete subclasses. 

3582 """ 

3583 raise NotImplementedError 

3584 

3585 def _is_file_protected(self, filepath: str) -> bool: 

3586 """Return true if the permissions of file at `filepath` only allow for 

3587 access by its owner. 

3588 

3589 Parameters 

3590 ---------- 

3591 filepath : `str` 

3592 Path of a local file. 

3593 """ 

3594 if not os.path.isfile(filepath): 

3595 return False 

3596 

3597 mode = stat.S_IMODE(os.stat(filepath).st_mode) 

3598 owner_accessible = bool(mode & stat.S_IRWXU) 

3599 group_accessible = bool(mode & stat.S_IRWXG) 

3600 other_accessible = bool(mode & stat.S_IRWXO) 

3601 return owner_accessible and not group_accessible and not other_accessible 

3602 

3603 def _read_if_modified_since( 

3604 self, filename: str | None, timestamp: float 

3605 ) -> tuple[str, float] | tuple[None, None]: 

3606 """Read local file `filename` if its modification time is more 

3607 recent than `timestamp`. 

3608 

3609 Parameters 

3610 ---------- 

3611 filename : `str`, optional 

3612 Path of a local file. 

3613 

3614 timestamp: `float`, optional 

3615 Timestamp to compare against the last modification time of 

3616 `filename`. The contents of file at `filename` is only read if its 

3617 modification time is more recent than `timestamp`. 

3618 

3619 Returns 

3620 ------- 

3621 result: `tuple[str, float]` 

3622 tuple of (contents of file `filename`, timestamp of the read 

3623 operation). 

3624 

3625 If `filename` is `None`, the returned value is `tuple[None, None]`. 

3626 """ 

3627 if filename is None: 

3628 return (None, None) 

3629 

3630 if os.stat(filename).st_mtime < timestamp: 

3631 return (None, None) 

3632 

3633 with open(filename) as file: 

3634 time_of_last_read = time.time() 

3635 return (file.read().rstrip("\n"), time_of_last_read) 

3636 

3637 

class TokenAuthorizer(Authorizer):
    """Attach a bearer token 'Authorization' header to each request.

    Parameters
    ----------
    token : `str`
        Can be either the path to a local file which contains the
        value of the token or the token itself. If `token` is a file
        it must be protected so that only the owner can read and write it.
    """

    def __init__(self, token: str | None = None) -> None:
        # Initialize all attributes up front so the other methods can
        # safely inspect them even when no token was provided.
        self._token = None
        self._token_path = None
        self._time_of_last_read: float = -1.0

        if token is None:
            return

        self._token = token
        if os.path.isfile(token):
            # The token lives in a local file: remember its absolute path,
            # require owner-only permissions and load its current contents.
            self._token_path = os.path.abspath(token)
            if not self._is_file_protected(self._token_path):
                raise PermissionError(
                    f"Authorization token file at {self._token_path} must be protected "
                    "for access only by its owner"
                )
            self._update_token()

    def _update_token(self) -> None:
        """Read the token file (if any) if its modification time is more
        recent than the last time we read it.
        """
        if self._token_path is None:
            return

        contents, read_time = self._read_if_modified_since(
            self._token_path, self._time_of_last_read
        )
        if contents is None or read_time is None:
            return

        # Update the token value and the last time we read it.
        self._token = contents
        self._time_of_last_read = read_time

    @override
    def set_authorization(self, headers: dict[str, str]) -> None:
        """Add the 'Authorization' header to `headers`.

        Parameters
        ----------
        headers : `dict` [ `str`, `str` ]
            Dict to augment with authorization information.
        """
        if self._token is None:
            return

        # Pick up a refreshed token before every request.
        self._update_token()
        headers["Authorization"] = f"Bearer {self._token}"

3694 

3695 

class BasicAuthorizer(Authorizer):
    """Attach a 'Authorization' header to each request using Basic
    authentication.

    Parameters
    ----------
    user_name : `str`
        Can be either the path to a local file which contains the
        user name or the user name itself. If `user_name` is a file
        it must be protected so that only the owner can read and write it.
    user_password : `str`
        Can be either the path to a local file which contains the
        value of the password or the password itself. If `user_password` is a
        file it must be protected so that only the owner can read and write it.
    """

    def __init__(self, user_name: str | None = None, user_password: str | None = None) -> None:
        # Always initialize the attributes that set_authorization() and
        # _update_password() read. The previous implementation returned
        # early when either credential was None WITHOUT setting them, so a
        # later call to set_authorization() raised AttributeError instead of
        # silently doing nothing (compare TokenAuthorizer.__init__, which
        # initializes its attributes before its early return).
        self._user_name: str | None = user_name
        self._user_password: str | None = user_password
        self._user_password_path: str | None = None
        self._time_of_last_read: float = -1.0
        self._header_value: str = ""

        if user_name is None or user_password is None:
            return

        if os.path.isfile(self._user_password):
            # The value in `user_password` is the path to a file. Check
            # the file is protected and read its contents.
            self._user_password_path = os.path.abspath(self._user_password)
            if not self._is_file_protected(self._user_password_path):
                raise PermissionError(
                    f"Password file at {self._user_password_path} must be protected "
                    "for access only by its owner"
                )
            self._update_password()
        else:
            self._update_header_value()

    def _update_header_value(self) -> None:
        """Compute the value of the 'Authorization' header using HTTP basic
        authorization.
        """
        basic_auth_header = make_headers(basic_auth=f"{self._user_name}:{self._user_password}")
        self._header_value = basic_auth_header["authorization"]

    def _update_password(self) -> None:
        """Update the password of this authorizer if the file it is stored in
        has been modified since the last time we read it.
        """
        if self._user_password_path is None:
            return None

        password, time_of_last_read = self._read_if_modified_since(
            self._user_password_path, self._time_of_last_read
        )
        if password is None or time_of_last_read is None:
            return

        # Update the password, the last time we read it and re-compute the
        # value of the "Authorization" header.
        self._user_password = password
        self._time_of_last_read = time_of_last_read
        self._update_header_value()

    @override
    def set_authorization(self, headers: dict[str, str]) -> None:
        """Add the 'Authorization' header to `headers`.

        Parameters
        ----------
        headers : `dict` [ `str`, `str` ]
            Dict to augment with authorization information.
        """
        # No-op when this authorizer was built without credentials.
        if self._user_name is None or self._user_password is None:
            return

        # Pick up a refreshed password before every request.
        self._update_password()
        headers["Authorization"] = self._header_value

3775 

3776 

3777def expand_vars(path: str | None) -> str | None: 

3778 """Expand the environment variables in `path` and return the path with 

3779 the value of the variable expanded. 

3780 

3781 Parameters 

3782 ---------- 

3783 path : `str` or `None` 

3784 Abolute or relative path which may include an environment variable 

3785 (e.g. '$HOME/path/to/my/file'). 

3786 

3787 Returns 

3788 ------- 

3789 path: `str` 

3790 The path with the values of the environment variables expanded. 

3791 """ 

3792 return None if path is None else os.path.expandvars(path) 

3793 

3794 

def dump_response(method: str, resp: HTTPResponse, dump_body: bool = False) -> None:
    """Dump response for debugging purposes.

    Logs, at debug level, the request method and URL, the response status
    line and all response headers; optionally also the length of the
    decoded response body.

    Parameters
    ----------
    method : `str`
        Method name to include in log output.
    resp : `HTTPResponse`
        Response to dump.
    dump_body : `bool`, optional
        If `True`, also log the length of the decoded response body.
    """
    log.debug("%s %s", method, resp.geturl())
    log.debug(" %s %s", resp.status, resp.reason)

    for header, value in resp.headers.items():
        log.debug(" %s: %s", header, value)

    if dump_body:
        # NOTE(review): `resp.data` reads the whole body into memory and
        # `.decode()` assumes it is valid text — presumably callers only
        # set `dump_body` for textual responses; confirm.
        log.debug(" response body length: %d", len(resp.data.decode()))