Coverage for python / lsst / resources / http.py: 21%

799 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:44 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("HttpResourcePath",) 

15 

16import contextlib 

17import datetime 

18import enum 

19import functools 

20import io 

21import json 

22import logging 

23import math 

24import os 

25import os.path 

26import random 

27import re 

28import ssl 

29import stat 

30from collections.abc import Iterator 

31from email.utils import parsedate_to_datetime 

32from typing import TYPE_CHECKING, Any, BinaryIO, cast 

33 

34try: 

35 # Prefer 'defusedxml' (not part of standard library) if available, since 

36 # 'xml' is vulnerable to XML bombs. 

37 import defusedxml.ElementTree as eTree 

38except ImportError: 

39 import xml.etree.ElementTree as eTree 

40 

41try: 

42 import fsspec 

43 from aiohttp import ClientSession, ClientTimeout, TCPConnector 

44 from fsspec.implementations.http import HTTPFileSystem 

45 from fsspec.spec import AbstractFileSystem 

46except ImportError: 

47 fsspec = None 

48 AbstractFileSystem = type 

49 HTTPFileSystem = type 

50 

51from urllib.parse import parse_qs 

52 

53import requests 

54from astropy import units as u 

55from requests.adapters import HTTPAdapter 

56from requests.auth import AuthBase 

57from urllib3.util.retry import Retry 

58 

59from lsst.utils.timer import time_this 

60 

61from ._resourceHandles import ResourceHandleProtocol 

62from ._resourceHandles._httpResourceHandle import HttpReadResourceHandle, parse_content_range_header 

63from ._resourcePath import ResourceInfo, ResourcePath 

64from .utils import _get_num_workers, get_tempdir 

65 

66if TYPE_CHECKING: 

67 from .utils import TransactionProtocol 

68 

69log = logging.getLogger(__name__) 

70 

71 

def _timeout_from_environment(env_var: str, default_value: float) -> float:
    """Return a timeout read from environment variable `env_var`, falling
    back to `default_value` when the variable is not set.

    Parameters
    ----------
    env_var : `str`
        Environment variable to look for.
    default_value : `float`
        Value to return if `env_var` is not defined in the environment.

    Returns
    -------
    _timeout_from_environment : `float`
        Converted value, in seconds.

    Raises
    ------
    ValueError
        Raised if the value of `env_var` is not a valid, non-NaN `float`.
    """
    raw_value = os.environ.get(env_var, default_value)
    try:
        result = float(raw_value)
    except ValueError:
        raise ValueError(
            f"Expecting valid timeout value in environment variable {env_var} but found "
            f"{os.environ.get(env_var)}"
        ) from None

    # NaN converts successfully but is meaningless as a timeout: reject it.
    if math.isnan(result):
        raise ValueError(f"Unexpected timeout value NaN found in environment variable {env_var}")

    return result

101 

102 

@functools.lru_cache
def _calc_tmpdir_buffer_size(tmpdir: str) -> int:
    """Return the preferred I/O buffer size (bytes) for files in `tmpdir`.

    The result is the larger of 256 typical 4096-byte blocks and ten times
    the file system block size of `tmpdir`: a reasonable compromise between
    the memory used for buffering and the number of system calls issued to
    read from or write to temporary files.
    """
    block_size = os.statvfs(tmpdir).f_bsize
    return max(block_size * 10, 4096 * 256)

115 

116 

class HttpResourcePathConfig:
    """Configuration class to encapsulate the configurable items used by class
    HttpResourcePath.

    All values are lazily initialized from environment variables on first
    access and then cached in the instance.
    """

    # Default timeouts for all HTTP requests (seconds).
    DEFAULT_TIMEOUT_CONNECT: float = 60.0
    DEFAULT_TIMEOUT_READ: float = 1_500.0

    # Default lower and upper bounds for the backoff interval (seconds).
    # A value in this interval is randomly selected as the backoff factor when
    # requests need to be retried.
    DEFAULT_BACKOFF_MIN: float = 1.0
    DEFAULT_BACKOFF_MAX: float = 3.0

    # Default number of connections to persist with both the front end and
    # back end servers.
    DEFAULT_FRONTEND_PERSISTENT_CONNECTIONS: int = 2
    DEFAULT_BACKEND_PERSISTENT_CONNECTIONS: int = 1

    # Accepted digest algorithms
    ACCEPTED_DIGESTS: list[str] = ["adler32", "md5", "sha-256", "sha-512"]

    def __init__(self) -> None:
        # None means "not yet initialized". For the attributes whose computed
        # value may itself legitimately be None (e.g. ca_bundle), the empty
        # string plays the "not yet initialized" role instead.
        self._front_end_connections: int | None = None
        self._back_end_connections: int | None = None
        self._digest_algorithm: str | None = None
        self._send_expect_on_put: bool | None = None
        self._fsspec_is_enabled: bool | None = None
        self._timeout: tuple[float, float] | None = None
        self._collect_memory_usage: bool | None = None
        self._backoff_min: float | None = None
        self._backoff_max: float | None = None
        self._ca_bundle: str | None = ""
        self._client_token: str | None = ""
        self._client_cert: str | None = ""
        self._client_key: str | None = ""
        self._tmpdir_buffersize: tuple[str, int] | None = None
        self._ssl_context: ssl.SSLContext | None = None

    @property
    def front_end_connections(self) -> int:
        """Number of persistent connections to the front end server."""
        if self._front_end_connections is not None:
            return self._front_end_connections

        default_pool_size = max(_get_num_workers(), self.DEFAULT_FRONTEND_PERSISTENT_CONNECTIONS)

        try:
            self._front_end_connections = int(
                os.environ.get("LSST_HTTP_FRONTEND_PERSISTENT_CONNECTIONS", default_pool_size)
            )
        except ValueError:
            # Silently fall back to the default when the environment variable
            # holds a non-integer value.
            self._front_end_connections = default_pool_size

        return self._front_end_connections

    @property
    def back_end_connections(self) -> int:
        """Number of persistent connections to the back end servers."""
        if self._back_end_connections is not None:
            return self._back_end_connections

        # Bug fix: use the back end default here. The previous implementation
        # mistakenly reused DEFAULT_FRONTEND_PERSISTENT_CONNECTIONS, leaving
        # DEFAULT_BACKEND_PERSISTENT_CONNECTIONS unused.
        default_pool_size = max(_get_num_workers(), self.DEFAULT_BACKEND_PERSISTENT_CONNECTIONS)

        try:
            self._back_end_connections = int(
                os.environ.get("LSST_HTTP_BACKEND_PERSISTENT_CONNECTIONS", default_pool_size)
            )
        except ValueError:
            # Silently fall back to the default when the environment variable
            # holds a non-integer value.
            self._back_end_connections = default_pool_size

        return self._back_end_connections

    @property
    def digest_algorithm(self) -> str:
        """Algorithm to ask the server to use for computing and recording
        digests of each file contents in PUT requests.

        Returns
        -------
        digest_algorithm: `str`
            The name of a digest algorithm or the empty string if no algorithm
            is configured.
        """
        if self._digest_algorithm is not None:
            return self._digest_algorithm

        # Only names in ACCEPTED_DIGESTS are honored; anything else (including
        # an unset variable) disables digest computation.
        digest = os.environ.get("LSST_HTTP_DIGEST", "").lower()
        if digest not in self.ACCEPTED_DIGESTS:
            digest = ""

        self._digest_algorithm = digest
        return self._digest_algorithm

    @property
    def send_expect_on_put(self) -> bool:
        """Return True if a "Expect: 100-continue" header is to be sent to
        the server on each PUT request.

        Some servers (e.g. dCache) uses this information as an indication that
        the client knows how to handle redirects to the specific server that
        will actually receive the data for PUT requests.
        """
        if self._send_expect_on_put is not None:
            return self._send_expect_on_put

        # Only the presence of the variable matters, not its value.
        self._send_expect_on_put = "LSST_HTTP_PUT_SEND_EXPECT_HEADER" in os.environ
        return self._send_expect_on_put

    @property
    def fsspec_is_enabled(self) -> bool:
        """Return True if `fsspec` is enabled for objects of class
        HttpResourcePath.

        To determine if `fsspec` is enabled, this method inspects the presence
        of the environment variable `LSST_HTTP_ENABLE_FSSPEC` (with any value).
        """
        if self._fsspec_is_enabled is not None:
            return self._fsspec_is_enabled

        self._fsspec_is_enabled = "LSST_HTTP_ENABLE_FSSPEC" in os.environ
        return self._fsspec_is_enabled

    @property
    def timeout(self) -> tuple[float, float]:
        """Return a tuple with the values of timeouts for connecting to the
        server and reading its response, respectively. Both values are in
        seconds.
        """
        if self._timeout is not None:
            return self._timeout

        self._timeout = (
            _timeout_from_environment("LSST_HTTP_TIMEOUT_CONNECT", self.DEFAULT_TIMEOUT_CONNECT),
            _timeout_from_environment("LSST_HTTP_TIMEOUT_READ", self.DEFAULT_TIMEOUT_READ),
        )
        return self._timeout

    @property
    def collect_memory_usage(self) -> bool:
        """Return true if we want to collect memory usage when timing
        operations against the remote server via the `lsst.utils.time_this`
        context manager.
        """
        if self._collect_memory_usage is not None:
            return self._collect_memory_usage

        self._collect_memory_usage = "LSST_HTTP_COLLECT_MEMORY_USAGE" in os.environ
        return self._collect_memory_usage

    @property
    def backoff_min(self) -> float:
        """Lower bound of the interval from which a backoff factor is randomly
        selected when retrying requests (seconds).
        """
        if self._backoff_min is not None:
            return self._backoff_min

        # Keep the default unless the environment provides a valid, non-NaN
        # float; invalid values are silently ignored.
        self._backoff_min = self.DEFAULT_BACKOFF_MIN
        try:
            backoff_min = float(os.environ.get("LSST_HTTP_BACKOFF_MIN", self.DEFAULT_BACKOFF_MIN))
            if not math.isnan(backoff_min):
                self._backoff_min = backoff_min
        except ValueError:
            pass

        return self._backoff_min

    @property
    def backoff_max(self) -> float:
        """Upper bound of the interval from which a backoff factor is randomly
        selected when retrying requests (seconds).
        """
        if self._backoff_max is not None:
            return self._backoff_max

        # Keep the default unless the environment provides a valid, non-NaN
        # float; invalid values are silently ignored.
        self._backoff_max = self.DEFAULT_BACKOFF_MAX
        try:
            backoff_max = float(os.environ.get("LSST_HTTP_BACKOFF_MAX", self.DEFAULT_BACKOFF_MAX))
            if not math.isnan(backoff_max):
                self._backoff_max = backoff_max
        except ValueError:
            pass

        return self._backoff_max

    @property
    def ca_bundle(self) -> str | None:
        """Local path to the certificate bundle file or directory where the
        certificates of the trusted authorities are located.

        Return None if this host's system certificate bundle should be
        used for authenticating remote servers' certificates.
        """
        if self._ca_bundle != "":
            return self._ca_bundle

        # If a bundle was specified via the environment variable
        # 'LSST_HTTP_CACERT_BUNDLE' use it.
        self._ca_bundle = os.getenv("LSST_HTTP_CACERT_BUNDLE")
        return self._ca_bundle

    @property
    def client_token(self) -> str | None:
        """Value of a bearer token or path to a local file which contains
        the bearer token to use for authenticating the client when sending
        requests to the webDAV or HTTP server.

        Return None if no bearer token is configured in the environment.
        """
        if self._client_token != "":
            return self._client_token

        # If environment variable LSST_HTTP_AUTH_BEARER_TOKEN is
        # initialized use its value as the bearer token.
        self._client_token = os.getenv("LSST_HTTP_AUTH_BEARER_TOKEN")
        return self._client_token

    @property
    def client_cert_key(self) -> tuple[str | None, str | None]:
        """Paths to a local file where the client certificate and associated
        private key are located.

        Return a tuple (client certificate, private key) or (None, None) if no
        client certificate is configured via environment variables.

        Raises
        ------
        PermissionError
            Raised if the private key file is readable by users other than
            its owner.
        ValueError
            Raised if only one of the certificate and key environment
            variables is initialized.
        """
        if self._client_cert != "" and self._client_key != "":
            return (self._client_cert, self._client_key)

        # If the environment variables LSST_HTTP_AUTH_CLIENT_CERT
        # and LSST_HTTP_AUTH_CLIENT_KEY are initialized use their values.
        self._client_cert = os.getenv("LSST_HTTP_AUTH_CLIENT_CERT")
        self._client_key = os.getenv("LSST_HTTP_AUTH_CLIENT_KEY")
        if self._client_cert and self._client_key:
            # Refuse a private key readable by anybody other than its owner.
            if not _is_protected(self._client_key):
                raise PermissionError(
                    f"Private key file at {self._client_key} must be protected for access only by its owner"
                )
            return (self._client_cert, self._client_key)

        # If only the certificate was provided raise.
        if self._client_cert:
            raise ValueError(
                "Environment variable LSST_HTTP_AUTH_CLIENT_KEY must be set to client private key file path"
            )

        # If only the private key was provided raise.
        if self._client_key:
            raise ValueError(
                "Environment variable LSST_HTTP_AUTH_CLIENT_CERT must be set to client certificate file path"
            )

        # If a X.509 user proxy is available, use it as client credentials.
        # A proxy file contains both the certificate and the private key.
        self._client_cert = self._client_key = os.getenv("X509_USER_PROXY")
        return (self._client_cert, self._client_key)

    @property
    def tmpdir_buffersize(self) -> tuple[str, int]:
        """Return the path to a temporary directory and the preferred buffer
        size to use when reading or writing files in that directory.
        """
        if self._tmpdir_buffersize is not None:
            return self._tmpdir_buffersize

        tmpdir = get_tempdir()

        # Compute the block size as 256 blocks of typical size
        # (i.e. 4096 bytes) or 10 times the file system block size,
        # whichever is higher. This is a reasonable compromise between
        # using memory for buffering and the number of system calls
        # issued to read from or write to temporary files.
        bufsize = _calc_tmpdir_buffer_size(tmpdir)
        self._tmpdir_buffersize = (tmpdir, bufsize)

        return self._tmpdir_buffersize

    @property
    def ssl_context(self) -> ssl.SSLContext:
        """Return an SSL context equipped with the certificates of the trusted
        authorities.
        """
        if self._ssl_context is None:
            self._ssl_context = ssl.create_default_context()
            # The configured bundle may be either a directory of certificates
            # or a single bundle file; load it accordingly.
            if self.ca_bundle is not None:
                if os.path.isdir(self.ca_bundle):
                    self._ssl_context.load_verify_locations(capath=self.ca_bundle)
                elif os.path.isfile(self.ca_bundle):
                    self._ssl_context.load_verify_locations(cafile=self.ca_bundle)

        return self._ssl_context

408 

409 

@functools.lru_cache
def _get_dav_and_server_headers(path: ResourcePath | str) -> tuple[str | None, str | None]:
    """Retrieve the "DAV" and "Server" headers sent by the remote server as
    part of the response to a single "OPTIONS" HTTP request.

    The result is cached (via `functools.lru_cache`), so at most one OPTIONS
    request is sent per distinct `path` argument.

    Parameters
    ----------
    path : `ResourcePath` or `str`
        URL to the resource to be checked.
        Should preferably refer to the root since the status is shared
        by all paths in that server.

    Returns
    -------
    _get_dav_and_server_headers : `tuple[str|None, str|None]`
        Values of the "DAV" and "Server" headers found in the response or
        None if any of those headers was not part of the response.
    """
    try:
        if not isinstance(path, HttpResourcePath):
            path = HttpResourcePath(path)

        config = HttpResourcePathConfig()
        with SessionStore(config=config).get(path) as session:
            resp = session.options(
                str(path), stream=False, timeout=config.timeout, headers=path._extra_headers
            )

            dav_header = server_header = None
            if resp.status_code == requests.codes.ok:
                # headers.get() already returns None for an absent header, so
                # no separate membership test is needed.
                dav_header = resp.headers.get("DAV")
                server_header = resp.headers.get("Server")

            return (dav_header, server_header)

    except requests.exceptions.SSLError as e:
        # Hint at the most common cause of SSL failures before re-raising.
        log.warning(
            "Environment variable LSST_HTTP_CACERT_BUNDLE can be used to "
            "specify the path to a bundle of certificate authorities you trust "
            "which are not included in the default set of trusted authorities "
            "of this system."
        )
        raise e

453 

454 

class BearerTokenAuth(AuthBase):
    """Attach a bearer token 'Authorization' header to each request.

    Parameters
    ----------
    token : `str`
        Can be either the path to a local protected file which contains the
        value of the token or the token itself.
    """

    def __init__(self, token: str):
        # _token holds the literal token value; _path is set only when the
        # token argument named a local file, in which case _refresh() keeps
        # _token in sync with the file contents.
        self._token = self._path = None
        self._mtime: float = -1.0
        if not token:
            return

        self._token = token
        if not os.path.isfile(token):
            # The argument is the token itself: nothing more to do.
            return

        self._path = os.path.abspath(token)
        # Refuse a token file readable by anybody other than its owner.
        if not _is_protected(self._path):
            raise PermissionError(
                f"Bearer token file at {self._path} must be protected for access only by its owner"
            )
        self._refresh()

    def _refresh(self) -> None:
        """Re-read the token file (if any) when it was modified since the
        last time we read it.
        """
        if not self._path:
            return

        current_mtime = os.stat(self._path).st_mtime
        if current_mtime <= self._mtime:
            return

        log.debug("Reading bearer token file at %s", self._path)
        self._mtime = current_mtime
        with open(self._path) as stream:
            self._token = stream.read().rstrip("\n")

    def __call__(self, req: requests.PreparedRequest) -> requests.PreparedRequest:
        # Only add a bearer token to a request when using secure HTTP.
        url = req.url
        if self._token and url and url.lower().startswith("https://"):
            self._refresh()
            req.headers["Authorization"] = f"Bearer {self._token}"
        return req

499 

500 

class SessionStore:
    """Cache a reusable HTTP client session per endpoint.

    Parameters
    ----------
    config : `HttpResourcePathConfig`
        Configuration items shared by all instances of HttpResourcePath.
    num_pools : `int`, optional
        Number of connection pools to keep: there is one pool per remote
        host.
    max_persistent_connections : `int`, optional
        Maximum number of connections per remote host to persist in each
        connection pool.
    backoff_min : `float`, optional
        Minimum value of the interval to compute the exponential
        backoff factor when retrying requests (seconds).
    backoff_max : `float`, optional
        Maximum value of the interval to compute the exponential
        backoff factor when retrying requests (seconds).
    """

    def __init__(
        self,
        config: HttpResourcePathConfig,
        num_pools: int = 10,
        max_persistent_connections: int = 1,
        backoff_min: float = 1.0,
        backoff_max: float = 3.0,
    ) -> None:
        # Maps a root URI (e.g. "https://host:port") to the session shared
        # by all paths under that endpoint.
        self._sessions: dict[str, requests.Session] = {}

        # Configuration for all instances of HttpResourcePath objects.
        self._config = config

        # See documentation of urllib3 PoolManager class:
        # https://urllib3.readthedocs.io
        self._num_pools: int = num_pools

        # See urllib3 Advanced Usage documentation:
        # https://urllib3.readthedocs.io/en/stable/advanced-usage.html
        self._max_persistent_connections: int = max_persistent_connections

        # Bounds of the interval from which the exponential backoff factor
        # is drawn when retrying requests (seconds). Guarantee a non-empty
        # interval even when the caller passes backoff_max <= backoff_min.
        self._backoff_min: float = backoff_min
        self._backoff_max: float = backoff_max if backoff_max > backoff_min else backoff_min + 1.0

    def clear(self) -> None:
        """Destroy all previously created sessions and attempt to close
        underlying idle network connections.
        """
        # Closing a session should release its idle network connections as a
        # consequence; the Requests API offers no way to force-close the
        # underlying open sockets.
        for cached_session in self._sessions.values():
            cached_session.close()

        self._sessions.clear()

    def get(self, rpath: ResourcePath) -> requests.Session:
        """Retrieve a session for accessing the remote resource at rpath.

        Parameters
        ----------
        rpath : `ResourcePath`
            URL to a resource at the remote server for which a session is to
            be retrieved.

        Notes
        -----
        Once a session is created for a given endpoint it is cached and
        returned every time a session is requested for any path under that
        same endpoint. For instance, a single session will be cached and
        shared for paths "https://www.example.org/path/to/file" and
        "https://www.example.org/any/other/path".

        Note that "https://www.example.org" and "https://www.example.org:12345"
        will have different sessions since the port number is not identical.
        """
        endpoint = str(rpath.root_uri())
        if (session := self._sessions.get(endpoint)) is None:
            # First request for this endpoint: create and cache a session.
            session = self._make_session(rpath)
            self._sessions[endpoint] = session

        return session

    def _make_session(self, rpath: ResourcePath) -> requests.Session:
        """Make a new session configured from values from the environment."""
        session = requests.Session()
        endpoint = str(rpath.root_uri())
        log.debug("Creating new HTTP session for endpoint %s ...", endpoint)

        # A random jitter prevents a herd of clients from retrying in
        # lockstep and overwhelming the server.
        jitter = random.random() * (self._backoff_max - self._backoff_min)
        retry_policy = Retry(
            # Total number of retries to allow. Takes precedence over other
            # counts.
            total=6,
            # How many connection-related errors to retry on.
            connect=3,
            # How many times to retry on read errors.
            read=3,
            # Backoff factor to apply between attempts after the second try
            # (seconds).
            backoff_factor=self._backoff_min + jitter,
            # How many times to retry on bad status codes.
            status=5,
            # Set of uppercased HTTP method verbs that we should retry on.
            # We only automatically retry idempotent requests.
            allowed_methods=frozenset(
                ("COPY", "DELETE", "GET", "HEAD", "MKCOL", "OPTIONS", "PROPFIND", "PUT")
            ),
            # HTTP status codes that we should force a retry on.
            status_forcelist=frozenset(
                (
                    requests.codes.too_many_requests,  # 429
                    requests.codes.internal_server_error,  # 500
                    requests.codes.bad_gateway,  # 502
                    requests.codes.service_unavailable,  # 503
                    requests.codes.gateway_timeout,  # 504
                )
            ),
            # Whether to respect Retry-After header on status codes defined
            # above.
            respect_retry_after_header=True,
        )

        # Persist the specified number of connections to the front end server.
        session.mount(
            endpoint,
            HTTPAdapter(
                pool_connections=self._num_pools,
                pool_maxsize=self._max_persistent_connections,
                pool_block=False,
                max_retries=retry_policy,
            ),
        )

        # Do not persist the connections to back end servers which may vary
        # from request to request. Systematically persisting connections to
        # those servers may exhaust their capabilities when there are
        # thousands of simultaneous clients.
        session.mount(
            f"{rpath.scheme}://",
            HTTPAdapter(
                pool_connections=self._num_pools,
                pool_maxsize=0,
                pool_block=False,
                max_retries=retry_policy,
            ),
        )

        # If the remote endpoint doesn't use secure HTTP we don't include
        # bearer tokens in the requests nor need to authenticate the remote
        # server.
        if rpath.scheme != "https":
            return session

        # Set the trusted CA certificates bundle for authenticating remote
        # servers.
        session.verify = True if self._config.ca_bundle is None else self._config.ca_bundle

        # Prefer bearer token client authentication when configured.
        if (token := self._config.client_token) is not None:
            log.debug("... using bearer token authentication")
            session.auth = BearerTokenAuth(token)
            return session

        # Otherwise fall back to client certificate and private key.
        client_cert, client_key = self._config.client_cert_key
        if client_cert and client_key:
            log.debug("... using client certificate authentication.")
            session.cert = (client_cert, client_key)
            return session

        log.debug(
            "Neither LSST_HTTP_AUTH_BEARER_TOKEN nor (LSST_HTTP_AUTH_CLIENT_CERT and "
            "LSST_HTTP_AUTH_CLIENT_KEY) are initialized. Client authentication is disabled."
        )
        return session

691 

692 

class ActivityCaveat(enum.Enum):
    """Enumeration of the activity caveats accepted when requesting
    macaroons.
    """

    # auto() numbers members starting at 1, so the values are identical to
    # the previous explicit assignments.
    DOWNLOAD = enum.auto()
    UPLOAD = enum.auto()

700 

701 

702class HttpResourcePath(ResourcePath): 

703 """General HTTP(S) resource. 

704 

705 Notes 

706 ----- 

707 In order to configure the behavior of instances of this class, the 

708 environment variables below are inspected: 

709 

710 - LSST_HTTP_CACERT_BUNDLE: path to a .pem file or to a directory which 

711 contains the .pem files of the trusted certificate authorities's 

712 certificates. If the remote server presents a server certificate 

713 issued by one of those trusted authorities, we trust it. 

714 If this environment variable is not initialized, the default 

715 authorities of the the execution host are trusted. 

716 

717 - LSST_HTTP_AUTH_BEARER_TOKEN: value of a bearer token or path to a 

718 local file containing a bearer token to be used as the client 

719 authentication mechanism with all requests. 

720 The permissions of the token file must be set so that only its 

721 owner can access it. 

722 If initialized, takes precedence over LSST_HTTP_AUTH_CLIENT_CERT 

723 and LSST_HTTP_AUTH_CLIENT_KEY. 

724 

725 - LSST_HTTP_AUTH_CLIENT_CERT: path to a .pem file which contains the 

726 client certificate for authenticating to the server. 

727 If initialized, the variable LSST_HTTP_AUTH_CLIENT_KEY must also be 

728 initialized with the path of the client private key file. 

729 The permissions of the client private key must be set so that only 

730 its owner can access it, at least for reading. 

731 

732 - LSST_HTTP_PUT_SEND_EXPECT_HEADER: if set (with any value), a 

733 "Expect: 100-Continue" header will be added to all HTTP PUT requests. 

734 This header is required by some servers to detect if the client 

735 knows how to handle redirections. In case of redirection, the body 

736 of the PUT request is sent to the redirected location and not to 

737 the front end server. 

738 

739 - LSST_HTTP_TIMEOUT_CONNECT and LSST_HTTP_TIMEOUT_READ: if set to a 

740 numeric value, they are interpreted as the number of seconds to wait 

741 for establishing a connection with the server and for reading its 

742 response, respectively. 

743 

744 - LSST_HTTP_FRONTEND_PERSISTENT_CONNECTIONS and 

745 LSST_HTTP_BACKEND_PERSISTENT_CONNECTIONS: contain the maximum number 

746 of connections to attempt to persist with both the front end servers 

747 and the back end servers. 

748 Default values: DEFAULT_FRONTEND_PERSISTENT_CONNECTIONS and 

749 DEFAULT_BACKEND_PERSISTENT_CONNECTIONS. 

750 

751 - LSST_HTTP_DIGEST: case-insensitive name of the digest algorithm to 

752 ask the server to compute for every file's content sent to the server 

753 via a PUT request. No digest is requested if this variable is not set 

754 or is set to an invalid value. 

755 Valid values are those in ACCEPTED_DIGESTS. 

756 

757 - LSST_HTTP_ENABLE_FSSPEC: the presence of this environment variable 

758 activates the usage of `fsspec` compatible file system to read 

759 a HTTP URL. The value of the variable is not inspected. 

760 """ 

761 

762 @staticmethod 

763 def create_http_resource_path( 

764 path: str, *, extra_headers: dict[str, str] | None = None 

765 ) -> HttpResourcePath: 

766 """Create an instance of `HttpResourcePath` with additional 

767 HTTP-specific configuration. 

768 

769 Parameters 

770 ---------- 

771 path : `str` 

772 HTTP URL to be wrapped in a `ResourcePath` instance. 

773 extra_headers : `dict` [ `str`, `str` ], optional 

774 Additional headers that will be sent with every HTTP request made 

775 by this `ResourcePath`. These override any headers that may be 

776 generated internally by `HttpResourcePath` (e.g. authentication 

777 headers). 

778 

779 Returns 

780 ------- 

781 instance : `ResourcePath` 

782 Newly-created `HttpResourcePath` instance. 

783 

784 Notes 

785 ----- 

786 Most users should use the `ResourcePath` constructor, instead. 

787 """ 

788 # Make sure we instantiate ResourcePath using a string to guarantee we 

789 # get a new ResourcePath. If we accidentally provided a ResourcePath 

790 # instance instead, the ResourcePath constructor sometimes returns 

791 # the original object and we would be modifying an object that is 

792 # supposed to be immutable. 

793 instance = ResourcePath(str(path)) 

794 assert isinstance(instance, HttpResourcePath) 

795 instance._extra_headers = extra_headers 

796 return instance 

797 

    # WebDAV servers known to be able to sign URLs. The values are lowercased
    # server identifiers retrieved from the 'Server' header included in
    # the response to a HTTP OPTIONS request.
    SUPPORTED_URL_SIGNERS = ("dcache", "xrootd")

    # Configuration items for this class instances. Shared by all instances;
    # see _reload_config() for how it is refreshed in tests.
    _config: HttpResourcePathConfig = HttpResourcePathConfig()

    # The session for metadata requests is used for interacting with
    # the front end servers for requests such as PROPFIND, HEAD, etc. Those
    # interactions are typically served by the front end servers. We want to
    # keep the connection to the front end servers open, to reduce the cost
    # associated to TCP and TLS handshaking for each new request.
    _metadata_session_store = SessionStore(
        config=_config,
        num_pools=5,
        max_persistent_connections=_config.front_end_connections,
        backoff_min=_config.backoff_min,
        backoff_max=_config.backoff_max,
    )

    # The data session is used for interaction with the front end servers which
    # typically redirect to the back end servers for serving our PUT and GET
    # requests. We attempt to keep a single connection open with the front end
    # server, if possible. This depends on how the server behaves and the
    # kind of request. Some servers close the connection when redirecting
    # the client to a back end server, for instance when serving a PUT
    # request.
    _data_session_store = SessionStore(
        config=_config,
        num_pools=25,
        max_persistent_connections=_config.back_end_connections,
        backoff_min=_config.backoff_min,
        backoff_max=_config.backoff_max,
    )

    # Process ID which created the session stores above. We need to store this
    # to replace sessions created by a parent process and inherited by a
    # child process after a fork, to avoid confusing the SSL layer.
    _pid: int = -1

    # Connector used by a session pool to establish network connections to
    # remote servers. This connector is exclusively used by fsspec file system
    # and is shared by all instances of this class.
    _tcp_connector: TCPConnector | None = None

    # Additional headers added to every request.
    _extra_headers: dict[str, str] | None = None

846 

    @property
    def metadata_session(self) -> _SessionWrapper:
        """Client session to send requests which do not require upload or
        download of data, i.e. mostly metadata requests.
        """
        session = None
        if hasattr(self, "_metadata_session"):
            if HttpResourcePath._pid == os.getpid():
                # Cached session was created by this very process: reuse it.
                session = self._metadata_session
            else:
                # The metadata session we have in cache was likely created by
                # a parent process. Discard all the sessions in that store.
                self._metadata_session_store.clear()

        # Retrieve a new metadata session.
        if session is None:
            # Record the current process as the owner of the new session so
            # a future fork can be detected (see branch above).
            HttpResourcePath._pid = os.getpid()
            session = self._metadata_session_store.get(self)
            self._metadata_session: requests.Session = session
        # Wrap the session so that any per-instance extra headers are added
        # to every request sent through it.
        return _SessionWrapper(session, extra_headers=self._extra_headers)

867 

    @property
    def data_session(self) -> _SessionWrapper:
        """Client session for uploading and downloading data."""
        session = None
        if hasattr(self, "_data_session"):
            if HttpResourcePath._pid == os.getpid():
                # Cached session was created by this very process: reuse it.
                session = self._data_session
            else:
                # The data session we have in cache was likely created by
                # a parent process. Discard all the sessions in that store.
                self._data_session_store.clear()

        # Retrieve a new data session.
        if session is None:
            # Record the current process as the owner of the new session so
            # a future fork can be detected (see branch above).
            HttpResourcePath._pid = os.getpid()
            session = self._data_session_store.get(self)
            self._data_session: requests.Session = session
        # Wrap the session so that any per-instance extra headers are added
        # to every request sent through it.
        return _SessionWrapper(session, extra_headers=self._extra_headers)

886 

887 def _clear_sessions(self) -> None: 

888 """Close the socket connections that are still open. 

889 

890 Used only in test suites to avoid warnings. 

891 """ 

892 self._metadata_session_store.clear() 

893 self._data_session_store.clear() 

894 

895 if hasattr(self, "_metadata_session"): 

896 delattr(self, "_metadata_session") 

897 

898 if hasattr(self, "_data_session"): 

899 delattr(self, "_data_session") 

900 

    def _init_server_properties(self) -> None:
        """Initialize instance variables '_is_webdav' and '_server' by
        sending a single OPTIONS request to the remote server and
        saving the results.
        """
        # Retrieve the "DAV" and the "Server" headers for the root URL of this
        # path
        dav_header, server_header = _get_dav_and_server_headers(self.root_uri())

        # Check that "1" is part of the value of the "DAV" header. We don't
        # use locks, so a server complying to class 1 is enough for our
        # purposes. All webDAV servers must advertise at least compliance
        # class "1".
        #
        # Compliance classes are documented in
        # http://www.webdav.org/specs/rfc4918.html#dav.compliance.classes
        #
        # Examples of values for header DAV are:
        #    DAV: 1, 2
        #    DAV: 1, <http://apache.org/dav/propset/fs/1>
        self._is_webdav: bool = False
        if dav_header is not None:
            # Strip spaces before splitting so both "1,2" and "1, 2" parse.
            self._is_webdav = "1" in dav_header.replace(" ", "").split(",")

        self._server: str | None = None
        if server_header is not None:
            # Server header is expected to be of the form 'dCache/9.2.4'
            # or 'XrootD/v5.7.1'. Strip version and put in lowercase.
            self._server = server_header.split("/")[0].lower()

930 

931 @property 

932 def is_webdav_endpoint(self) -> bool: 

933 """Check if the current endpoint implements WebDAV features. 

934 

935 This is stored per URI but cached by root so there is only one check 

936 per hostname. 

937 """ 

938 if hasattr(self, "_is_webdav"): 

939 return self._is_webdav 

940 

941 self._init_server_properties() 

942 return self._is_webdav 

943 

944 @property 

945 def server(self) -> str | None: 

946 """Return the lowercased identifier of the remote server, retrieved 

947 from the response header 'Server' from an 'OPTIONS' HTTP request. 

948 

949 If the remote server does not include that header in its response 

950 to an 'OPTIONS' request, server() returns None. 

951 

952 Examples of return values are "dcache", "xrootd". 

953 """ 

954 if hasattr(self, "_server"): 

955 return self._server 

956 

957 self._init_server_properties() 

958 return self._server 

959 

    @property
    def server_signs_urls(self) -> bool:
        """Return true if the remote server supports signing of URLs for
        download and upload.
        """
        # Only servers identified in SUPPORTED_URL_SIGNERS (e.g. dCache,
        # XRootD) can deliver macaroons used to sign URLs.
        return self.server in HttpResourcePath.SUPPORTED_URL_SIGNERS

966 

    @classmethod
    def _reload_config(cls) -> None:
        """Reload the configuration for all instances of this class. That
        configuration is instantiated from the environment.

        This is an internal method mainly intended for tests.
        """
        # Deliberately assign on HttpResourcePath (not 'cls') so that the
        # configuration shared at the base-class level is replaced for
        # every instance, even when called from a subclass.
        HttpResourcePath._config = HttpResourcePathConfig()

975 

    def exists(self) -> bool:
        """Check that a remote HTTP resource exists.

        Returns
        -------
        exists : `bool`
            `True` if the resource exists, `False` otherwise.
        """
        log.debug("Checking if resource exists: %s", self.geturl())
        if not self.is_webdav_endpoint:
            # The remote is a plain HTTP server. Let's attempt a HEAD
            # request, even if the behavior for such a request against a
            # directory is not specified, so it depends on the server
            # implementation.
            resp = self._head_non_webdav_url()
            return self._is_successful_non_webdav_head_request(resp)

        # The remote endpoint is a webDAV server: send a PROPFIND request
        # to determine if it exists.
        resp = self._propfind()
        if resp.status_code == requests.codes.multi_status:  # 207
            # The first parsed property element describes this resource.
            prop = _parse_propfind_response_body(resp.text)[0]
            return prop.exists
        else:  # 404 Not Found
            return False

995 

996 def size(self) -> int: 

997 """Return the size of the remote resource in bytes.""" 

998 if self.dirLike: 

999 return 0 

1000 info = self.get_info() 

1001 # dirLike can be None if we are unsure. Only flag if we are certain 

1002 # we have been told this is a directory but webDAV reports it as a 

1003 # file. 

1004 if not info.is_file and self.dirLike is False: 

1005 raise IsADirectoryError( 

1006 f"Resource {self} is reported by server as a directory but has a file path" 

1007 ) 

1008 return info.size 

1009 

    def get_info(self) -> ResourceInfo:
        """Return lightweight metadata about this HTTP resource.

        Returns
        -------
        info : `ResourceInfo`
            Size, checksums and last-modification time reported by the
            server for this resource.

        Raises
        ------
        FileNotFoundError
            Raised if the resource does not exist on the server.
        """
        if not self.is_webdav_endpoint:
            # Plain HTTP server: extract the metadata from the headers of a
            # HEAD-like response.
            resp = self._head_non_webdav_url()
            return self._get_info_from_non_webdav_head(resp)

        # webDAV server: a PROPFIND request returns the resource properties.
        resp = self._propfind()
        if resp.status_code != requests.codes.multi_status:
            raise FileNotFoundError(
                f"Resource {self} does not exist, status: {resp.status_code} {resp.reason}"
            )

        # The first parsed property element describes this resource.
        prop = _parse_propfind_response_body(resp.text)[0]
        if not prop.exists:
            raise FileNotFoundError(f"Resource {self} does not exist")

        return ResourceInfo(
            uri=str(self),
            is_file=prop.is_file,
            size=prop.size,
            last_modified=prop.last_modified,
            checksums=dict(prop.checksums),
        )

1033 

    def _get_info_from_non_webdav_head(self, resp: requests.Response) -> ResourceInfo:
        """Build `ResourceInfo` from a non-WebDAV HEAD-like response.

        Parameters
        ----------
        resp : `requests.Response`
            Response returned by `_head_non_webdav_url`, i.e. either a true
            HEAD response or the response to a 1-byte ranged GET request.

        Returns
        -------
        info : `ResourceInfo`
            Metadata extracted from the response headers.

        Raises
        ------
        FileNotFoundError
            Raised if the server responded with 404 Not Found.
        ValueError
            Raised for any other unexpected status code or for a missing
            expected header.
        """
        if not self._is_successful_non_webdav_head_request(resp):
            if resp.status_code == requests.codes.not_found:
                raise FileNotFoundError(
                    f"Resource {self} does not exist, status: {resp.status_code} {resp.reason}"
                )
            raise ValueError(
                f"Unexpected response for HEAD request for {self}, status: {resp.status_code} {resp.reason}"
            )

        if self.dirLike:
            # Directory-like paths are reported as zero-sized.
            size = 0
        elif resp.status_code == requests.codes.ok:  # 200
            if "Content-Length" not in resp.headers:
                raise ValueError(
                    f"Response to HEAD request to {self} does not contain 'Content-Length' header"
                )
            size = int(resp.headers["Content-Length"])
        elif resp.status_code == requests.codes.partial_content:
            # 206 Partial Content, returned from a GET request with a Range
            # header (used to emulate HEAD for presigned S3 URLs).
            content_range_header = resp.headers.get("Content-Range")
            if content_range_header is None:
                raise ValueError(f"Response to GET request to {self} did not contain 'Content-Range' header")
            content_range = parse_content_range_header(content_range_header)
            size_total = content_range.total
            if size_total is None:
                raise ValueError(f"Content-Range header for {self} did not include a total file size")
            size = size_total
        else:
            # 416 Range Not Satisfiable can occur on a GET for a 0-byte file.
            size = 0

        # Parse checksums from the 'Digest' header; entries are
        # comma-separated "algorithm=value" pairs. Entries without '='
        # are silently ignored.
        checksums = {}
        digest_header = resp.headers.get("Digest")
        if digest_header is not None:
            for digest in digest_header.split(","):
                algorithm, separator, value = digest.strip().partition("=")
                if separator:
                    checksums[algorithm.lower()] = value

        # Normalize 'Last-Modified' to a timezone-aware datetime in UTC.
        last_modified = None
        if last_modified_header := resp.headers.get("Last-Modified"):
            last_modified = parsedate_to_datetime(last_modified_header)
            if last_modified.tzinfo is None:
                last_modified = last_modified.replace(tzinfo=datetime.UTC)
            else:
                last_modified = last_modified.astimezone(datetime.UTC)

        return ResourceInfo(
            uri=str(self),
            is_file=not self.dirLike,
            size=size,
            last_modified=last_modified,
            checksums=checksums,
        )

1091 

    def _head_non_webdav_url(self) -> requests.Response:
        """Return a response from a HTTP HEAD request for a non-WebDAV HTTP
        URL.

        Emulates HEAD using a 1-byte GET for presigned S3 URLs.

        Returns
        -------
        resp : `requests.Response`
            Either a true HEAD response (status 200) or a 1-byte ranged GET
            response (status 206 or 416).
        """
        if self._looks_like_presigned_s3_url():
            # Presigned S3 URLs are signed for a single method only, so you
            # can't call HEAD on a URL signed for GET. However, S3 does
            # support Range requests, so you can ask for a 1-byte range with
            # GET for a similar effect to HEAD.
            #
            # Note that some headers differ between a true HEAD request and the
            # response returned by this GET, e.g. Content-Length will always be
            # 1, and the status code is 206 instead of 200.
            return self.metadata_session.get(
                self.geturl(),
                timeout=self._config.timeout,
                allow_redirects=True,
                stream=False,
                headers={"Range": "bytes=0-0"},
            )
        else:
            return self.metadata_session.head(
                self.geturl(), timeout=self._config.timeout, allow_redirects=True, stream=False
            )

1118 

1119 def _is_successful_non_webdav_head_request(self, resp: requests.Response) -> bool: 

1120 """Return `True` if the status code in the response indicates a 

1121 successful response to ``_head_non_webdav_url``. 

1122 """ 

1123 return resp.status_code in ( 

1124 requests.codes.ok, # 200, from a normal HEAD or GET request 

1125 requests.codes.partial_content, # 206, returned from a GET request with a Range header. 

1126 # 416, returned from a GET request with a 1-byte Range header that 

1127 # is longer than the 0-byte file. 

1128 requests.codes.range_not_satisfiable, 

1129 ) 

1130 

1131 def _looks_like_presigned_s3_url(self) -> bool: 

1132 """Return `True` if this ResourcePath's URL is likely to be a presigned 

1133 S3 URL. 

1134 """ 

1135 query_params = parse_qs(self._uri.query) 

1136 return "Signature" in query_params and "Expires" in query_params 

1137 

    def mkdir(self) -> None:
        """Create the directory resource if it does not already exist.

        Raises
        ------
        NotImplementedError
            Raised if the remote server is not a webDAV endpoint.
        NotADirectoryError
            Raised if this URI is file-like, or if a file already exists
            at this path.
        """
        # Creating directories is only available on WebDAV back ends.
        if not self.is_webdav_endpoint:
            raise NotImplementedError(
                f"Creation of directory {self} is not implemented by plain HTTP servers"
            )

        if not self.dirLike:
            raise NotADirectoryError(f"Can not create a 'directory' for file-like URI {self}")

        # Check if the target directory already exists.
        resp = self._propfind()
        if resp.status_code == requests.codes.multi_status:  # 207
            prop = _parse_propfind_response_body(resp.text)[0]
            if prop.exists:
                if prop.is_directory:
                    # Nothing to do: the directory is already there.
                    return
                else:
                    # A file exists at this path
                    raise NotADirectoryError(
                        f"Can not create a directory for {self} because a file already exists at that path"
                    )

        # Target directory does not exist. Create it and its ancestors as
        # needed. We need to test if parent URL is different from self URL,
        # otherwise we could be stuck in a recursive loop
        # where self == parent.
        if self.geturl() != self.parent().geturl():
            self.parent().mkdir()

        log.debug("Creating new directory: %s", self.geturl())
        self._mkcol()

1171 

1172 def remove(self) -> None: 

1173 """Remove the resource.""" 

1174 self._delete() 

1175 

1176 def read(self, size: int = -1) -> bytes: 

1177 """Open the resource and return the contents in bytes. 

1178 

1179 Parameters 

1180 ---------- 

1181 size : `int`, optional 

1182 The number of bytes to read. Negative or omitted indicates 

1183 that all data should be read. 

1184 """ 

1185 # Use the data session as a context manager to ensure that the 

1186 # network connections to both the front end and back end servers are 

1187 # closed after downloading the data. 

1188 log.debug("Reading from remote resource: %s", self.geturl()) 

1189 stream = size > 0 

1190 with self.data_session as session: 

1191 with time_this(log, msg="GET %s", args=(self,)): 

1192 resp = session.get(self.geturl(), stream=stream, timeout=self._config.timeout) 

1193 

1194 if resp.status_code != requests.codes.ok: # 200 

1195 raise FileNotFoundError( 

1196 f"Unable to read resource {self}; status: {resp.status_code} {resp.reason}" 

1197 ) 

1198 if not stream: 

1199 return resp.content 

1200 else: 

1201 return next(resp.iter_content(chunk_size=size)) 

1202 

    def write(self, data: bytes, overwrite: bool = True) -> None:
        """Write the supplied bytes to the new resource.

        Parameters
        ----------
        data : `bytes`
            The bytes to write to the resource. The entire contents of the
            resource will be replaced.
        overwrite : `bool`, optional
            If `True` the resource will be overwritten if it exists. Otherwise
            the write will fail.

        Raises
        ------
        FileExistsError
            Raised if ``overwrite`` is `False` and the resource already
            exists.
        """
        log.debug("Writing to remote resource: %s", self.geturl())
        if not overwrite and self.exists():
            raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")

        # Ensure the parent directory exists.
        # This is only meaningful and appropriate for WebDAV, not the general
        # HTTP case. e.g. for S3 HTTP URLs, the underlying service has no
        # concept of 'directories' at all.
        if self.is_webdav_endpoint:
            self.parent().mkdir()

        # Upload the data.
        log.debug("Writing data to remote resource: %s", self.geturl())
        self._put(data=data)

1229 

    def transfer_from(
        self,
        src: ResourcePath,
        transfer: str = "copy",
        overwrite: bool = False,
        transaction: TransactionProtocol | None = None,
        multithreaded: bool = True,
    ) -> None:
        """Transfer the current resource to a Webdav repository.

        Parameters
        ----------
        src : `ResourcePath`
            Source URI.
        transfer : `str`
            Mode to use for transferring the resource. Supports the following
            options: copy.
        overwrite : `bool`, optional
            Whether overwriting the remote resource is allowed or not.
        transaction : `~lsst.resources.utils.TransactionProtocol`, optional
            Currently unused.
        multithreaded : `bool`, optional
            If `True` the transfer will be allowed to attempt to improve
            throughput by using parallel download streams. This may of no
            effect if the URI scheme does not support parallel streams or
            if a global override has been applied. If `False` parallel
            streams will be disabled.

        Raises
        ------
        ValueError
            Raised if the requested transfer mode is not supported.
        FileExistsError
            Raised if the destination exists and ``overwrite`` is `False`.
        """
        # Fail early to prevent delays if remote resources are requested.
        if transfer not in self.transferModes:
            raise ValueError(f"Transfer mode {transfer} not supported by URI scheme {self.scheme}")

        # Existence checks cost time so do not call this unless we know
        # that debugging is enabled.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(
                "Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
                src,
                src.exists(),
                self,
                self.exists(),
                transfer,
            )

        # Short circuit immediately if the URIs are identical.
        if self == src:
            log.debug(
                "Target and destination URIs are identical: %s, returning immediately."
                " No further action required.",
                self,
            )
            return

        if not overwrite and self.exists():
            raise FileExistsError(f"Destination path {self} already exists.")

        if transfer == "auto":
            transfer = self.transferDefault

        # We can use webDAV 'COPY' or 'MOVE' if both the current and source
        # resources are located in the same server.
        if isinstance(src, type(self)) and self.root_uri() == src.root_uri() and self.is_webdav_endpoint:
            log.debug("Transfer from %s to %s directly", src, self)
            return self._move(src) if transfer == "move" else self._copy(src)

        # For resources of different classes or for plain HTTP resources we can
        # perform the copy or move operation by downloading to a local file
        # and uploading to the destination.
        self._copy_via_local(src)

        # This was an explicit move, try to remove the source.
        if transfer == "move":
            src.remove()

1303 

    def walk(
        self, file_filter: str | re.Pattern | None = None
    ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]:
        """Walk the directory tree returning matching files and directories.

        Parameters
        ----------
        file_filter : `str` or `re.Pattern`, optional
            Regex to filter out files from the list before it is returned.

        Yields
        ------
        dirpath : `ResourcePath`
            Current directory being examined.
        dirnames : `list` of `str`
            Names of subdirectories within dirpath.
        filenames : `list` of `str`
            Names of all the files within dirpath.

        Raises
        ------
        ValueError
            Raised if this URI is not directory-like.
        NotImplementedError
            Raised if the remote server is not a webDAV endpoint.
        """
        if not self.dirLike:
            raise ValueError("Can not walk a non-directory URI")

        # Walking directories is only available on WebDAV back ends.
        if not self.is_webdav_endpoint:
            raise NotImplementedError(f"Walking directory {self} is not implemented by plain HTTP servers")

        if isinstance(file_filter, str):
            file_filter = re.compile(file_filter)

        # Depth "1" lists this collection and its immediate members only.
        resp = self._propfind(depth="1")
        if resp.status_code == requests.codes.multi_status:  # 207
            files: list[str] = []
            dirs: list[str] = []

            for prop in _parse_propfind_response_body(resp.text):
                if prop.is_file:
                    files.append(prop.name)
                elif not prop.href.rstrip("/").endswith(self.path.rstrip("/")):
                    # Only include the names of sub-directories not the name of
                    # the directory being walked.
                    dirs.append(prop.name)

            if file_filter is not None:
                files = [f for f in files if file_filter.search(f)]

            if not dirs and not files:
                return
            else:
                yield type(self)(self, forceAbsolute=False, forceDirectory=True), dirs, files

            # Recurse into each sub-directory, in the order the server
            # reported them.
            for dir in dirs:
                new_uri = self.join(dir, forceDirectory=True)
                yield from new_uri.walk(file_filter)

1357 

1358 def generate_presigned_get_url(self, *, expiration_time_seconds: int) -> str: 

1359 """Return a pre-signed URL that can be used to retrieve this resource 

1360 using an HTTP GET without supplying any access credentials. 

1361 

1362 Parameters 

1363 ---------- 

1364 expiration_time_seconds : `int` 

1365 Number of seconds until the generated URL is no longer valid. 

1366 

1367 Returns 

1368 ------- 

1369 url : `str` 

1370 HTTP URL signed for GET. 

1371 """ 

1372 if not self.is_webdav_endpoint: 

1373 # This is already an HTTP URL readable without any authentication 

1374 # credentials, so return it as-is. 

1375 return str(self) 

1376 

1377 return self._sign_with_macaroon(ActivityCaveat.DOWNLOAD, expiration_time_seconds) 

1378 

1379 def generate_presigned_put_url(self, *, expiration_time_seconds: int) -> str: 

1380 """Return a pre-signed URL that can be used to upload a file to this 

1381 path using an HTTP PUT without supplying any access credentials. 

1382 

1383 Parameters 

1384 ---------- 

1385 expiration_time_seconds : `int` 

1386 Number of seconds until the generated URL is no longer valid. 

1387 

1388 Returns 

1389 ------- 

1390 url : `str` 

1391 HTTP URL signed for PUT. 

1392 """ 

1393 if not self.is_webdav_endpoint: 

1394 return super().generate_presigned_put_url(expiration_time_seconds=expiration_time_seconds) 

1395 

1396 return self._sign_with_macaroon(ActivityCaveat.UPLOAD, expiration_time_seconds) 

1397 

    def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
        """Return an abstract file system and path that can be used by fsspec.

        Returns
        -------
        fs : `fsspec.spec.AbstractFileSystem`
            A file system object suitable for use with the returned path.
        path : `str`
            A path that can be opened by the file system object.

        Raises
        ------
        NotImplementedError
            Raised if this path is a directory served by a URL-signing
            webDAV server.
        ImportError
            Raised if use of fsspec against a webDAV back end is disabled
            by configuration.
        """
        if fsspec is None:
            # fsspec could not be imported: defer to the base class behavior.
            return super().to_fsspec()

        if not self.is_webdav_endpoint or self.server not in HttpResourcePath.SUPPORTED_URL_SIGNERS:
            # Plain HTTP (or non-signing webDAV) server: hand the URL to
            # fsspec directly, propagating any extra headers.
            return fsspec.url_to_fs(self.geturl(), client_kwargs={"headers": self._extra_headers})

        if self.isdir():
            raise NotImplementedError(
                f"method HttpResourcePath.to_fsspec() not implemented for directory {self}"
            )

        # If usage of fsspec-compatible file system is disabled in the
        # configuration we raise an exception which signals the caller
        # that it cannot use fsspec. An example of such a caller is
        # `lsst.daf.butler.formatters.ParquetFormatter`.
        #
        # Note that we don't call super().to_fsspec() since that method
        # assumes that fsspec can be used provided fsspec package is
        # importable.
        #
        # The motivation for making this configurable is that for HTTP
        # URLs fsspec.HTTPFileSystem uses async I/O and we have found
        # unexpected behavior by clients when used against dCache for reading
        # parquet files via a ParquetFormatter instance. That behavior cannot
        # be reproduced when using other callers.
        #
        # This needs more investigation to discard the possibility that async
        # I/O, used by fsspec.HTTPFileSystem, is related to this behavior.
        if not self._config.fsspec_is_enabled:
            raise ImportError("fsspec is disabled for HttpResourcePath objects with webDAV back end")

        async def get_client_session(**kwargs: Any) -> ClientSession:
            """Return a aiohttp.ClientSession configured to use an
            `aiohttp.TCPConnector` shared by all instances of this class.

            Parameters
            ----------
            **kwargs : `Any`
                Keyword arguments passed unmodified to the contructor of
                `aiohttp.ClientSession`.

            Returns
            -------
            session : `aiohttp.ClientSession`
                Client session that `aiohttp.HTTPFileSystem` will use to pool
                TCP connections to the server.
            """
            # Lazily create the connector shared at the class level.
            if HttpResourcePath._tcp_connector is None:
                HttpResourcePath._tcp_connector = TCPConnector(
                    # SSL context equipped with client credentials and
                    # configured to validate server certificates.
                    ssl=self._config.ssl_context,
                    # Total number of simultaneous connections this connector
                    # keeps open with any host.
                    #
                    # The default is 100 but we deliberately reduced it to
                    # avoid keeping a large number of open connexions to file
                    # servers when thousands of quanta execute simultaneously.
                    #
                    # In any case, new connexions are automatically established
                    # when needed.
                    limit=10,
                    # Number of simultaneous connections to a single host:port.
                    limit_per_host=1,
                    # Close network connection after usage
                    force_close=True,
                )

            connect_timeout, read_timeout = self._config.timeout
            return ClientSession(
                connector=HttpResourcePath._tcp_connector,
                timeout=ClientTimeout(
                    connect=connect_timeout,
                    sock_connect=connect_timeout,
                    sock_read=read_timeout,
                    total=2 * read_timeout,
                ),
                **kwargs,
            )

        # Retrieve a signed URL for download valid for 2 hours.
        url = self.generate_presigned_get_url(expiration_time_seconds=2 * 3_600)

        # HTTPFileSystem constructor accepts the argument 'block_size'. The
        # default value is 'fsspec.utils.DEFAULT_BLOCK_SIZE' which is 5 MB.
        # That seems to be a reasonable block size for downloading files.
        return HTTPFileSystem(get_client=get_client_session), url

1495 

    def _sign_with_macaroon(self, activity: ActivityCaveat, expiration_time_seconds: int) -> str:
        """Return a URL for this path embedding a macaroon retrieved from
        the remote server, authorizing ``activity``.

        Parameters
        ----------
        activity : `ActivityCaveat`
            Activity the macaroon must authorize (download or upload).
        expiration_time_seconds : `int`
            Validity period of the macaroon, in seconds.

        Returns
        -------
        url : `str`
            This URL with the macaroon embedded as an 'authz' query
            parameter.

        Raises
        ------
        NotImplementedError
            Raised if the remote server does not support macaroons.
        ValueError
            Raised if the macaroon could not be retrieved or the server
            response could not be parsed.
        """
        # dCache and XRootD webDAV servers support delivery of macaroons.
        #
        # For details about dCache macaroons see:
        #    https://www.dcache.org/manuals/UserGuide-9.2/macaroons.shtml
        if self.server is None:
            raise NotImplementedError(f"server for '{self}' does not support signing URLs")
        elif self.server not in HttpResourcePath.SUPPORTED_URL_SIGNERS:
            raise NotImplementedError(f"server '{self.server}' does not support signing for {self}")

        match activity:
            case ActivityCaveat.DOWNLOAD:
                activity_caveat = "DOWNLOAD,LIST"
            case ActivityCaveat.UPLOAD:
                activity_caveat = "UPLOAD,LIST"

        # Retrieve a macaroon for the requested activities and duration
        headers = {"Content-Type": "application/macaroon-request"}
        body = {
            "caveats": [
                f"activity:{activity_caveat}",
            ],
            # ISO 8601 duration, e.g. "PT3600S".
            "validity": f"PT{expiration_time_seconds}S",
        }
        resp = self._post(data=json.dumps(body), headers=headers)
        if resp.status_code != requests.codes.ok:
            raise ValueError(
                f"could not retrieve a macaroon for URL {self}, status: {resp.status_code} {resp.reason}"
            )

        # We are expecting the body of the response to be formatted in JSON.
        # dCache sets the 'Content-Type' of the response to 'application/json'
        # but XRootD does not set any 'Content-Type' header 8-[
        #
        # An example of a response body returned by dCache is shown below:
        #     {
        #         "macaroon": "MDA[...]Qo",
        #         "uri": {
        #             "targetWithMacaroon": "https://dcache.example.org/?authz=MD...",
        #             "baseWithMacaroon": "https://dcache.example.org/?authz=MD...",
        #             "target": "https://dcache.example.org/",
        #             "base": "https://dcache.example.org/"
        #         }
        #     }
        #
        # An example of a response body returned by XRootD is shown below:
        #     {
        #         "macaroon": "MDA[...]Qo",
        #         "expires_in": 86400
        #     }
        try:
            response_body = json.loads(resp.text)
            if "macaroon" in response_body:
                return str(self.replace(query=f"authz={response_body['macaroon']}"))
            else:
                raise ValueError(f"could not retrieve macaroon for URL {self}")
        except json.JSONDecodeError:
            raise ValueError(f"could not deserialize response to POST request for URL {self}")

1554 

    @contextlib.contextmanager
    def _as_local(
        self, multithreaded: bool = True, tmpdir: ResourcePath | None = None
    ) -> Iterator[ResourcePath]:
        """Download object over HTTP and place in temporary directory.

        Parameters
        ----------
        multithreaded : `bool`, optional
            If `True` the transfer will be allowed to attempt to improve
            throughput by using parallel download streams. This may of no
            effect if the URI scheme does not support parallel streams or
            if a global override has been applied. If `False` parallel
            streams will be disabled.
        tmpdir : `ResourcePath` or `None`, optional
            Explicit override of the temporary directory to use for remote
            downloads.

        Yields
        ------
        local_uri : `ResourcePath`
            A URI to a local POSIX file corresponding to a local temporary
            downloaded copy of the resource. The file is deleted when the
            context exits.

        Raises
        ------
        FileNotFoundError
            Raised if the server does not respond with status 200.
        ValueError
            Raised if the downloaded size does not match the advertised
            'Content-Length'.
        """
        # Use the session as a context manager to ensure that connections
        # to both the front end and back end servers are closed after the
        # download operation is finished.
        with self.data_session as session:
            resp = session.get(self.geturl(), stream=True, timeout=self._config.timeout)
            if resp.status_code != requests.codes.ok:
                raise FileNotFoundError(
                    f"Unable to download resource {self}; status: {resp.status_code} {resp.reason}"
                )

            if tmpdir is None:
                temp_dir, buffer_size = self._config.tmpdir_buffersize
                tmpdir = ResourcePath(temp_dir, forceDirectory=True)
            else:
                # Pick a buffer size appropriate for the caller-supplied
                # temporary directory's file system.
                buffer_size = _calc_tmpdir_buffer_size(tmpdir.ospath)

            with ResourcePath.temporary_uri(
                suffix=self.getExtension(), prefix=tmpdir, delete=True
            ) as tmp_uri:
                # -1 means the server did not advertise a length.
                expected_length = int(resp.headers.get("Content-Length", "-1"))
                with time_this(
                    log,
                    msg="GET %s [length=%d] to local file %s [chunk_size=%d]",
                    args=(self, expected_length, tmp_uri, buffer_size),
                    mem_usage=self._config.collect_memory_usage,
                    mem_unit=u.mebibyte,
                ):
                    content_length = 0
                    with open(tmp_uri.ospath, "wb", buffering=buffer_size) as tmpFile:
                        for chunk in resp.iter_content(chunk_size=buffer_size):
                            tmpFile.write(chunk)
                            content_length += len(chunk)

                # Check that the expected and actual content lengths match.
                # Perform this check only when the contents of the file was not
                # encoded by the server.
                if (
                    "Content-Encoding" not in resp.headers
                    and expected_length >= 0
                    and expected_length != content_length
                ):
                    raise ValueError(
                        f"Size of downloaded file does not match value in Content-Length header for {self}: "
                        f"expecting {expected_length} and got {content_length} bytes"
                    )

                yield tmp_uri

1626 

    def _send_webdav_request(
        self,
        method: str,
        url: str | None = None,
        headers: dict[str, str] | None = None,
        body: str | None = None,
        session: _SessionWrapper | None = None,
        timeout: tuple[float, float] | None = None,
    ) -> requests.Response:
        """Send a webDAV request and correctly handle redirects.

        Parameters
        ----------
        method : `str`
            The method of the HTTP request to be sent, e.g. PROPFIND, MKCOL.
        url : `str`, optional
            Target URL of the request. Defaults to the URL of this resource.
        headers : `dict`, optional
            A dictionary of key-value pairs (both strings) to include as
            headers in the request.
        body : `str`, optional
            The body of the request.
        session : `_SessionWrapper`, optional
            Session used to send the request. Defaults to this resource's
            metadata session.
        timeout : `tuple`, optional
            Connect and read timeouts for the request. Defaults to the
            configured timeouts.

        Returns
        -------
        response : `requests.Response`
            The first non-redirect response received.

        Raises
        ------
        ValueError
            Raised if no final response was obtained after following the
            maximum allowed number of redirections.

        Notes
        -----
        This way of sending webDAV requests is necessary for handling
        redirection ourselves, since the 'requests' package changes the method
        of the redirected request when the server responds with status 302 and
        the method of the original request is not HEAD (which is the case for
        webDAV requests).

        That means that when the webDAV server we interact with responds with
        a redirection to a PROPFIND or MKCOL request, the request gets
        converted to a GET request when sent to the redirected location.

        See `requests.sessions.SessionRedirectMixin.rebuild_method()` in
        https://github.com/psf/requests/blob/main/requests/sessions.py

        This behavior of the 'requests' package is meant to be compatible with
        what is specified in RFC 9110:

        https://www.rfc-editor.org/rfc/rfc9110#name-302-found

        For our purposes, we do need to follow the redirection and send a new
        request using the same HTTP verb.
        """
        if url is None:
            url = self.geturl()

        if headers is None:
            headers = {}

        if session is None:
            session = self.metadata_session

        if timeout is None:
            timeout = self._config.timeout

        with time_this(
            log,
            msg="%s %s",
            args=(
                method,
                url,
            ),
            mem_usage=self._config.collect_memory_usage,
            mem_unit=u.mebibyte,
        ):
            # Follow redirections manually (see Notes above) so the HTTP
            # verb is preserved, with a bound on the number of hops.
            for _ in range(max_redirects := 5):
                resp = session.request(
                    method,
                    url,
                    data=body,
                    headers=headers,
                    stream=False,
                    timeout=timeout,
                    allow_redirects=False,
                )
                if resp.is_redirect:
                    url = resp.headers["Location"]
                else:
                    return resp

            # We reached the maximum allowed number of redirects.
            # Stop trying.
            raise ValueError(
                f"Could not get a response to {method} request for {self} after {max_redirects} redirections"
            )

1713 

1714 def _propfind(self, body: str | None = None, depth: str = "0") -> requests.Response: 

1715 """Send a PROPFIND webDAV request and return the response. 

1716 

1717 Parameters 

1718 ---------- 

1719 body : `str`, optional 

1720 The body of the PROPFIND request to send to the server. If 

1721 provided, it is expected to be a XML document. 

1722 depth : `str`, optional 

1723 The value of the 'Depth' header to include in the request. 

1724 

1725 Returns 

1726 ------- 

1727 response : `requests.Response` 

1728 Response to the PROPFIND request. 

1729 

1730 Notes 

1731 ----- 

1732 It raises `ValueError` if the status code of the PROPFIND request 

1733 is different from "207 Multistatus" or "404 Not Found". 

1734 """ 

1735 if body is None: 

1736 # Request only the DAV live properties we are explicitly interested 

1737 # in namely 'resourcetype', 'getcontentlength', 'getlastmodified' 

1738 # and 'displayname'. 

1739 body = ( 

1740 """<?xml version="1.0" encoding="utf-8" ?>""" 

1741 """<D:propfind xmlns:D="DAV:"><D:prop>""" 

1742 """<D:resourcetype/><D:getcontentlength/><D:getlastmodified/><D:displayname/>""" 

1743 """</D:prop></D:propfind>""" 

1744 ) 

1745 headers = { 

1746 "Depth": depth, 

1747 "Content-Type": 'application/xml; charset="utf-8"', 

1748 "Content-Length": str(len(body)), 

1749 } 

1750 resp = self._send_webdav_request("PROPFIND", headers=headers, body=body) 

1751 if resp.status_code in (requests.codes.multi_status, requests.codes.not_found): 

1752 return resp 

1753 else: 

1754 raise ValueError( 

1755 f"Unexpected response for PROPFIND request for {self}, status: {resp.status_code} " 

1756 f"{resp.reason}" 

1757 ) 

1758 

1759 def _options(self) -> requests.Response: 

1760 """Send a OPTIONS webDAV request for this resource.""" 

1761 resp = self._send_webdav_request("OPTIONS") 

1762 if resp.status_code in (requests.codes.ok, requests.codes.created): 

1763 return resp 

1764 

1765 raise ValueError( 

1766 f"Unexpected response to OPTIONS request for {self}, status: {resp.status_code} {resp.reason}" 

1767 ) 

1768 

1769 def _head(self) -> requests.Response: 

1770 """Send a HEAD request for this resource.""" 

1771 if not self.is_webdav_endpoint: 

1772 # The remote is a plain HTTP server. 

1773 return self._head_non_webdav_url() 

1774 return self._send_webdav_request("HEAD") 

1775 

1776 def _mkcol(self) -> None: 

1777 """Send a MKCOL webDAV request to create a collection. The collection 

1778 may already exist. 

1779 """ 

1780 resp = self._send_webdav_request("MKCOL") 

1781 if resp.status_code == requests.codes.created: # 201 

1782 return 

1783 

1784 if resp.status_code == requests.codes.method_not_allowed: # 405 

1785 # The remote directory already exists 

1786 log.debug("Can not create directory: %s may already exist: skipping.", self.geturl()) 

1787 else: 

1788 raise ValueError(f"Can not create directory {self}, status: {resp.status_code} {resp.reason}") 

1789 

    def _delete(self) -> None:
        """Send a DELETE webDAV request for this resource.

        A "404 Not Found" response is treated as success (see inline
        comments). Raises `ValueError` for any other unsuccessful status and
        `NotImplementedError` when asked to delete a directory served by a
        plain (non-webDAV) HTTP server.
        """
        log.debug("Deleting %s ...", self.geturl())

        # If this is a directory, ensure the remote is a webDAV server because
        # plain HTTP servers don't support DELETE requests on non-file
        # paths.
        if self.dirLike and not self.is_webdav_endpoint:
            raise NotImplementedError(
                f"Deletion of directory {self} is not implemented by plain HTTP servers"
            )

        # Deleting non-empty directories may take some time, so increase
        # the timeout for getting a response from the server.
        timeout = self._config.timeout
        if self.dirLike:
            timeout = (timeout[0], timeout[1] * 100)
        resp = self._send_webdav_request("DELETE", timeout=timeout)
        if resp.status_code in (
            requests.codes.ok,
            requests.codes.accepted,
            requests.codes.no_content,
            requests.codes.not_found,
        ):
            # We can get a "404 Not Found" error when the file or directory
            # does not exist or when the DELETE request was retried several
            # times and a previous attempt actually deleted the resource.
            # Therefore we consider that a "Not Found" response is not an
            # error since we reached the state desired by the user.
            return
        else:
            # TODO: the response to a DELETE request against a webDAV server
            # may be multistatus. If so, we need to parse the response body to
            # determine more precisely the reason of the failure (e.g. a lock)
            # and provide a more helpful error message.
            raise ValueError(f"Unable to delete resource {self}; status: {resp.status_code} {resp.reason}")

1826 

1827 def _copy_via_local(self, src: ResourcePath) -> None: 

1828 """Replace the contents of this resource with the contents of a remote 

1829 resource by using a local temporary file. 

1830 

1831 Parameters 

1832 ---------- 

1833 src : `HttpResourcePath` 

1834 The source of the contents to copy to `self`. 

1835 """ 

1836 with src.as_local() as local_uri: 

1837 log.debug("Transfer from %s to %s via local file %s", src, self, local_uri) 

1838 with open(local_uri.ospath, "rb") as f: 

1839 self._put(data=f) 

1840 

1841 def _copy_or_move(self, method: str, src: HttpResourcePath) -> None: 

1842 """Send a COPY or MOVE webDAV request to copy or replace the contents 

1843 of this resource with the contents of another resource located in the 

1844 same server. 

1845 

1846 Parameters 

1847 ---------- 

1848 method : `str` 

1849 The method to perform. Valid values are "COPY" or "MOVE" (in 

1850 uppercase). 

1851 src : `HttpResourcePath` 

1852 The source of the contents to move to `self`. 

1853 """ 

1854 headers = {"Destination": self.geturl()} 

1855 resp = self._send_webdav_request(method, url=src.geturl(), headers=headers, session=self.data_session) 

1856 if resp.status_code in (requests.codes.created, requests.codes.no_content): 

1857 return 

1858 

1859 if resp.status_code == requests.codes.multi_status: 

1860 tree = eTree.fromstring(resp.content) 

1861 status_element = tree.find("./{DAV:}response/{DAV:}status") 

1862 status = status_element.text if status_element is not None else "unknown" 

1863 error = tree.find("./{DAV:}response/{DAV:}error") 

1864 raise ValueError(f"{method} returned multistatus reponse with status {status} and error {error}") 

1865 else: 

1866 raise ValueError( 

1867 f"{method} operation from {src} to {self} failed, status: {resp.status_code} {resp.reason}" 

1868 ) 

1869 

1870 def _copy(self, src: HttpResourcePath) -> None: 

1871 """Send a COPY webDAV request to replace the contents of this resource 

1872 (if any) with the contents of another resource located in the same 

1873 server. 

1874 

1875 Parameters 

1876 ---------- 

1877 src : `HttpResourcePath` 

1878 The source of the contents to copy to `self`. 

1879 """ 

1880 # Neither dCache nor XrootD currently implement the COPY 

1881 # webDAV method as documented in 

1882 # http://www.webdav.org/specs/rfc4918.html#METHOD_COPY 

1883 # (See issues DM-37603 and DM-37651 for details) 

1884 # 

1885 # For the time being, we use a temporary local file to 

1886 # perform the copy client side. 

1887 # TODO: when those 2 issues above are solved remove the 3 lines below. 

1888 must_use_local = True 

1889 if must_use_local: 

1890 return self._copy_via_local(src) 

1891 

1892 return self._copy_or_move("COPY", src) 

1893 

    def _move(self, src: HttpResourcePath) -> None:
        """Send a MOVE webDAV request to replace the contents of this resource
        with the contents of another resource located in the same server.

        Parameters
        ----------
        src : `HttpResourcePath`
            The source of the contents to move to `self`.

        Raises
        ------
        ValueError
            Raised if the server response does not indicate success.
        """
        return self._copy_or_move("MOVE", src)

1904 

    def _post(self, data: str | None = None, headers: dict[str, str] | None = None) -> requests.Response:
        """Perform an HTTP POST request and return the received response.

        Parameters
        ----------
        data : `str`, optional
            The contents of the request body.
        headers : `dict`, optional
            Key-value pairs to include as headers in the request.

        Returns
        -------
        response : `requests.Response`
            The response, when its status is "200 OK".

        Raises
        ------
        ValueError
            Raised if the response status is not "200 OK".
        """
        # Unlike the webDAV requests above, redirections are delegated to
        # 'requests' here (allow_redirects=True).
        resp = self.metadata_session.request(
            "POST",
            self.geturl(),
            data=data,
            headers=headers,
            stream=False,
            timeout=self._config.timeout,
            allow_redirects=True,
        )
        if resp.status_code == requests.codes.ok:
            return resp

        raise ValueError(f"POST request for {self} failed, status: {resp.status_code} {resp.reason}")

1926 

    def _put(self, data: BinaryIO | bytes) -> None:
        """Perform an HTTP PUT request and handle redirection.

        Parameters
        ----------
        data : `Union[BinaryIO, bytes]`
            The data to be included in the body of the PUT request.

        Raises
        ------
        ValueError
            Raised if the upload request does not succeed.
        """
        # Retrieve the final URL for this upload by sending a PUT request with
        # no content. Follow a single server redirection to retrieve the
        # final URL.
        headers = {"Content-Length": "0"}

        # If we are explicitly configured for or if we know the remote server
        # is dCache, send an "Expect" header to signal the server that this
        # client knows how to handle redirection in PUT requests.
        #
        # The goal is that the contents of the file we want to upload is sent
        # directly to the dCache pool without transiting through the dCache
        # webDAV door. Otherwise, the uploaded data would transit twice over
        # the network: first from this client to the dCache webDAV door and
        # second from webDAV door to the target pool (i.e. the dCache file
        # server which will ultimately store the data we will upload).
        #
        # Systematically uploading data via dCache webDAV door could add
        # unnecessary load to the door which we can avoid by instead uploading
        # directly to the dCache pool.
        #
        # For further details see section "Redirection on upload":
        #
        # https://www.dcache.org/manuals/UserGuide-9.2/webdav.shtml#redirection
        if self._config.send_expect_on_put or self.server == "dcache":
            headers["Expect"] = "100-continue"

        url = self.geturl()

        # Use the session as a context manager to ensure the underlying
        # connections are closed after finishing uploading the data.
        with self.data_session as session:
            # Send an empty PUT request to get redirected to the final
            # destination.
            log.debug("Sending empty PUT request to %s", url)
            with time_this(
                log,
                msg="PUT (no data) %s",
                args=(url,),
                mem_usage=self._config.collect_memory_usage,
                mem_unit=u.mebibyte,
            ):
                resp = session.request(
                    "PUT",
                    url,
                    data=None,
                    headers=headers,
                    stream=False,
                    timeout=self._config.timeout,
                    allow_redirects=False,
                )
                # A single redirection hop is followed; otherwise keep the
                # original URL for the actual upload below.
                if resp.is_redirect:
                    url = resp.headers["Location"]

            # Upload the data to the final destination.
            log.debug("Uploading data to %s", url)

            # Ask the server to compute and record a checksum of the uploaded
            # file contents, for later integrity checks. Since we don't compute
            # the digest ourselves while uploading the data, we cannot control
            # after the request is complete that the data we uploaded is
            # identical to the data recorded by the server, but at least the
            # server has recorded a digest of the data it stored.
            #
            # See RFC-3230 for details and
            # https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml
            # for the list of supported digest algorithms.
            # In addition, note that not all servers implement this RFC so
            # the checksum may not be computed by the server.
            put_headers: dict[str, str] | None = None
            if digest := self._config.digest_algorithm:
                put_headers = {"Want-Digest": digest}

            with time_this(
                log,
                msg="PUT %s",
                args=(url,),
                mem_usage=self._config.collect_memory_usage,
                mem_unit=u.mebibyte,
            ):
                resp = session.request(
                    "PUT",
                    url,
                    data=data,
                    headers=put_headers,
                    stream=False,
                    timeout=self._config.timeout,
                    allow_redirects=False,
                )
                if resp.status_code in (
                    requests.codes.ok,
                    requests.codes.created,
                    requests.codes.no_content,
                ):
                    return
                else:
                    raise ValueError(f"Can not write file {self}, status: {resp.status_code} {resp.reason}")

2031 

    @contextlib.contextmanager
    def _openImpl(
        self,
        mode: str = "r",
        *,
        encoding: str | None = None,
    ) -> Iterator[ResourceHandleProtocol]:
        """Yield a handle for accessing this resource.

        When the resource is opened read-only and the server advertises
        support for byte-range requests ('Accept-Ranges: bytes' on a "200 OK"
        HEAD response), a `HttpReadResourceHandle` is used so the resource
        can be read remotely in ranges; otherwise this defers to the base
        class implementation.
        """
        resp = self._head()
        accepts_range = resp.status_code == requests.codes.ok and resp.headers.get("Accept-Ranges") == "bytes"
        handle: ResourceHandleProtocol
        if mode in ("rb", "r") and accepts_range:
            handle = HttpReadResourceHandle(mode, log, self, timeout=self._config.timeout)
            if mode == "r":
                # cast because the protocol is compatible, but does not have
                # BytesIO in the inheritance tree
                yield io.TextIOWrapper(cast(Any, handle), encoding=encoding)
            else:
                yield handle
        else:
            with super()._openImpl(mode, encoding=encoding) as http_handle:
                yield http_handle

2053 

    def _copy_extra_attributes(self, original_uri: ResourcePath) -> None:
        """Propagate the per-instance extra HTTP headers from `original_uri`
        when deriving this path from another one.
        """
        assert isinstance(original_uri, HttpResourcePath)
        self._extra_headers = original_uri._extra_headers

2057 

2058 

def _dump_response(resp: requests.Response) -> None:
    """Log the contents of a HTTP or webDAV request and its response.

    Parameters
    ----------
    resp : `requests.Response`
        The response to log.

    Notes
    -----
    Intended for development purposes only. Request bodies are truncated to
    120 bytes and non-text response bodies to 80 bytes.
    """
    req = resp.request
    log.debug("-----------------------------------------------")
    log.debug("Request")
    log.debug("  method=%s", req.method)
    log.debug("  URL=%s", req.url)
    log.debug("  headers=%s", req.headers)
    if req.method == "PUT":
        log.debug("  body=<data>")
    elif req.body is None:
        log.debug("  body=<empty>")
    else:
        log.debug("  body=%r", req.body[:120])

    log.debug("Response:")
    log.debug("  status_code=%d", resp.status_code)
    log.debug("  headers=%s", resp.headers)
    if not resp.content:
        log.debug("  body=<empty>")
    elif resp.headers.get("Content-Type") == "text/plain":
        log.debug("  body=%r", resp.content)
    else:
        log.debug("  body=%r", resp.content[:80])

2092 

2093 

2094def _is_protected(filepath: str) -> bool: 

2095 """Return true if the permissions of file at filepath only allow for access 

2096 by its owner. 

2097 

2098 Parameters 

2099 ---------- 

2100 filepath : `str` 

2101 Path of a local file. 

2102 """ 

2103 if not os.path.isfile(filepath): 

2104 return False 

2105 mode = stat.S_IMODE(os.stat(filepath).st_mode) 

2106 owner_accessible = bool(mode & stat.S_IRWXU) 

2107 group_accessible = bool(mode & stat.S_IRWXG) 

2108 other_accessible = bool(mode & stat.S_IRWXO) 

2109 return owner_accessible and not group_accessible and not other_accessible 

2110 

2111 

def _parse_propfind_response_body(body: str) -> list[DavProperty]:
    """Parse the XML-encoded contents of the response body to a webDAV PROPFIND
    request.

    Parameters
    ----------
    body : `str`
        XML-encoded response body to a PROPFIND request.

    Returns
    -------
    responses : `List[DavProperty]`
        One entry per 'DAV:response' element found in `body`.

    Raises
    ------
    ValueError
        Raised if no 'DAV:response' element could be extracted from `body`.

    Notes
    -----
    The body is a 'DAV:multistatus' document containing one 'DAV:response'
    element per resource, of the form (indented for readability):

        <?xml version="1.0" encoding="UTF-8"?>
        <D:multistatus xmlns:D="DAV:">
          <D:response>
            <D:href>path/to/resource</D:href>
            <D:propstat>
              <D:prop>
                <D:resourcetype>
                  <D:collection xmlns:D="DAV:"/>
                </D:resourcetype>
                <D:getlastmodified>Fri, 27 Jan 2023 13:59:01 GMT</D:getlastmodified>
                <D:getcontentlength>12345</D:getcontentlength>
              </D:prop>
              <D:status>HTTP/1.1 200 OK</D:status>
            </D:propstat>
          </D:response>
          <D:response>
            ...
          </D:response>
        </D:multistatus>
    """
    multistatus = eTree.fromstring(body.strip())
    responses = [DavProperty(element) for element in multistatus.findall("./{DAV:}response")]
    if not responses:
        # Could not parse the body.
        raise ValueError(f"Unable to parse response for PROPFIND request: {body}")
    return responses

2173 

2174 

2175class DavProperty: 

2176 """Helper class to encapsulate select live DAV properties of a single 

2177 resource, as retrieved via a PROPFIND request. 

2178 

2179 Parameters 

2180 ---------- 

2181 response : `eTree.Element` or `None` 

2182 The XML response defining the DAV property. 

2183 """ 

2184 

2185 # Regular expression to compare against the 'status' element of a 

2186 # PROPFIND response's 'propstat' element. 

2187 _status_ok_rex = re.compile(r"^HTTP/.* 200 .*$", re.IGNORECASE) 

2188 

2189 def __init__(self, response: eTree.Element | None): 

2190 self._href: str = "" 

2191 self._displayname: str = "" 

2192 self._collection: bool = False 

2193 self._getlastmodified: str = "" 

2194 self._getcontentlength: int = -1 

2195 

2196 if response is not None: 

2197 self._parse(response) 

2198 

2199 def _parse(self, response: eTree.Element) -> None: 

2200 # Extract 'href'. 

2201 if (element := response.find("./{DAV:}href")) is not None: 

2202 # We need to use "str(element.text)"" instead of "element.text" to 

2203 # keep mypy happy. 

2204 self._href = str(element.text).strip() 

2205 else: 

2206 raise ValueError( 

2207 "Property 'href' expected but not found in PROPFIND response: " 

2208 f"{eTree.tostring(response, encoding='unicode')}" 

2209 ) 

2210 

2211 for propstat in response.findall("./{DAV:}propstat"): 

2212 # Only extract properties of interest with status OK. 

2213 status = propstat.find("./{DAV:}status") 

2214 if status is None or not self._status_ok_rex.match(str(status.text)): 

2215 continue 

2216 

2217 for prop in propstat.findall("./{DAV:}prop"): 

2218 # Parse "collection". 

2219 if (element := prop.find("./{DAV:}resourcetype/{DAV:}collection")) is not None: 

2220 self._collection = True 

2221 

2222 # Parse "getlastmodified". 

2223 if (element := prop.find("./{DAV:}getlastmodified")) is not None: 

2224 self._getlastmodified = str(element.text) 

2225 

2226 # Parse "getcontentlength". 

2227 if (element := prop.find("./{DAV:}getcontentlength")) is not None: 

2228 self._getcontentlength = int(str(element.text)) 

2229 

2230 # Parse "displayname". 

2231 if (element := prop.find("./{DAV:}displayname")) is not None: 

2232 self._displayname = str(element.text) 

2233 

2234 # Some webDAV servers don't include the 'displayname' property in the 

2235 # response so try to infer it from the value of the 'href' property. 

2236 # Depending on the server the href value may end with '/'. 

2237 if not self._displayname: 

2238 self._displayname = os.path.basename(self._href.rstrip("/")) 

2239 

2240 # Force a size of 0 for collections. 

2241 if self._collection: 

2242 self._getcontentlength = 0 

2243 

2244 @property 

2245 def exists(self) -> bool: 

2246 # It is either a directory or a file with length of at least zero 

2247 return self._collection or self._getcontentlength >= 0 

2248 

2249 @property 

2250 def is_directory(self) -> bool: 

2251 return self._collection 

2252 

2253 @property 

2254 def is_file(self) -> bool: 

2255 return not self._collection 

2256 

2257 @property 

2258 def size(self) -> int: 

2259 return self._getcontentlength 

2260 

2261 @property 

2262 def last_modified(self) -> datetime.datetime | None: 

2263 if not self._getlastmodified: 

2264 return None 

2265 

2266 last_modified = parsedate_to_datetime(self._getlastmodified) 

2267 if last_modified.tzinfo is None: 

2268 last_modified = last_modified.replace(tzinfo=datetime.UTC) 

2269 else: 

2270 last_modified = last_modified.astimezone(datetime.UTC) 

2271 return last_modified 

2272 

2273 @property 

2274 def checksums(self) -> dict[str, str]: 

2275 return {} 

2276 

2277 @property 

2278 def name(self) -> str: 

2279 return self._displayname 

2280 

2281 @property 

2282 def href(self) -> str: 

2283 return self._href 

2284 

2285 

2286class _SessionWrapper(contextlib.AbstractContextManager): 

2287 """Wraps a `requests.Session` to allow header values to be injected with 

2288 all requests. 

2289 

2290 Notes 

2291 ----- 

2292 `requests.Session` already has a feature for setting headers globally, but 

2293 our session objects are global and authorization headers can vary for each 

2294 HttpResourcePath instance. 

2295 """ 

2296 

2297 def __init__(self, session: requests.Session, *, extra_headers: dict[str, str] | None) -> None: 

2298 self._session = session 

2299 self._extra_headers = extra_headers 

2300 

2301 def __enter__(self) -> _SessionWrapper: 

2302 self._session.__enter__() 

2303 return self 

2304 

2305 def __exit__( 

2306 self, 

2307 exc_type: Any, 

2308 exc_value: Any, 

2309 traceback: Any, 

2310 ) -> None: 

2311 return self._session.__exit__(exc_type, exc_value, traceback) 

2312 

2313 def get( 

2314 self, 

2315 url: str, 

2316 *, 

2317 timeout: tuple[float, float], 

2318 allow_redirects: bool = True, 

2319 stream: bool, 

2320 headers: dict[str, str] | None = None, 

2321 ) -> requests.Response: 

2322 return self._session.get( 

2323 url, 

2324 timeout=timeout, 

2325 allow_redirects=allow_redirects, 

2326 stream=stream, 

2327 headers=self._augment_headers(headers), 

2328 ) 

2329 

2330 def head( 

2331 self, 

2332 url: str, 

2333 *, 

2334 timeout: tuple[float, float], 

2335 allow_redirects: bool, 

2336 stream: bool, 

2337 headers: dict[str, str] | None = None, 

2338 ) -> requests.Response: 

2339 return self._session.head( 

2340 url, 

2341 timeout=timeout, 

2342 allow_redirects=allow_redirects, 

2343 stream=stream, 

2344 headers=self._augment_headers(headers), 

2345 ) 

2346 

2347 def request( 

2348 self, 

2349 method: str, 

2350 url: str, 

2351 *, 

2352 data: str | bytes | BinaryIO | None, 

2353 timeout: tuple[float, float], 

2354 allow_redirects: bool, 

2355 stream: bool, 

2356 headers: dict[str, str] | None = None, 

2357 ) -> requests.Response: 

2358 return self._session.request( 

2359 method, 

2360 url, 

2361 data=data, 

2362 timeout=timeout, 

2363 allow_redirects=allow_redirects, 

2364 stream=stream, 

2365 headers=self._augment_headers(headers), 

2366 ) 

2367 

2368 def _augment_headers(self, headers: dict[str, str] | None) -> dict[str, str]: 

2369 if headers is None: 

2370 headers = {} 

2371 

2372 if self._extra_headers is not None: 

2373 headers = headers | self._extra_headers 

2374 

2375 return headers