Coverage for python/lsst/resources/s3.py: 20%

365 statements  

coverage.py v7.13.5, created at 2026-04-28 08:32 +0000

# This file is part of lsst-resources.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# Use of this source code is governed by a 3-clause BSD-style
# license that can be found in the LICENSE file.

from __future__ import annotations

__all__ = ("S3ResourcePath",)

import concurrent.futures
import contextlib
import datetime
import io
import logging
import os
import re
import sys
import threading
from collections import defaultdict
from collections.abc import Iterable, Iterator
from functools import cache, cached_property
from typing import IO, TYPE_CHECKING, cast

from botocore.exceptions import ClientError

from lsst.utils.iteration import chunk_iterable
from lsst.utils.timer import time_this

from ._resourceHandles._baseResourceHandle import ResourceHandleProtocol
from ._resourceHandles._s3ResourceHandle import S3ResourceHandle
from ._resourcePath import (
    _EXECUTOR_TYPE,
    MBulkResult,
    ResourceInfo,
    ResourcePath,
    _get_executor_class,
    _patch_environ,
)
from .s3utils import (
    _get_s3_connection_parameters,
    _s3_disable_bucket_validation,
    _s3_should_validate_bucket,
    all_retryable_errors,
    backoff,
    bucketExists,
    getS3Client,
    max_retry_time,
    retryable_io_errors,
    s3CheckFileExists,
    translate_client_error,
)
from .utils import _get_num_workers

try:
    from boto3.s3.transfer import TransferConfig  # type: ignore
except ImportError:
    TransferConfig = None

try:
    import s3fs
    from fsspec.spec import AbstractFileSystem
except ImportError:
    s3fs = None
    AbstractFileSystem = type

if TYPE_CHECKING:
    with contextlib.suppress(ImportError):
        import boto3

    from .utils import TransactionProtocol


log = logging.getLogger(__name__)


class ProgressPercentage:
    """Progress bar for S3 file transfers.

    Parameters
    ----------
    file : `ResourcePath`
        Resource that is relevant to the progress percentage. The size of this
        resource will be used to determine progress. The name will be used
        in the log messages unless overridden by ``file_for_msg``.
    file_for_msg : `ResourcePath` or `None`, optional
        Resource name to include in log messages in preference to ``file``.
    msg : `str`, optional
        Message text to be included in every progress log message.
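
    Examples
    --------
    A usage sketch (illustrative; ``client``, ``uri``, ``path``, ``bucket``,
    and ``key`` are placeholders): an instance is a callable suitable for
    boto3's ``Callback`` transfer argument.

    >>> progress = ProgressPercentage(uri, msg="Uploading:")  # doctest: +SKIP
    >>> client.upload_file(path, bucket, key, Callback=progress)  # doctest: +SKIP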

95 """ 

96 

97 log_level = logging.DEBUG 

98 """Default log level to use when issuing a message.""" 

99 

100 def __init__(self, file: ResourcePath, file_for_msg: ResourcePath | None = None, msg: str = ""): 

101 self._filename = file 

102 self._file_for_msg = str(file_for_msg) if file_for_msg is not None else str(file) 

103 self._size = file.size() 

104 self._seen_so_far = 0 

105 self._lock = threading.Lock() 

106 self._msg = msg 

107 

108 def __call__(self, bytes_amount: int) -> None: 

109 # To simplify, assume this is hooked up to a single filename 

110 with self._lock: 

111 self._seen_so_far += bytes_amount 

112 percentage = (100 * self._seen_so_far) // self._size 

113 log.log( 

114 self.log_level, 

115 "%s %s %s / %s (%s%%)", 

116 self._msg, 

117 self._file_for_msg, 

118 self._seen_so_far, 

119 self._size, 

120 percentage, 

121 ) 

122 

123 

124@cache 

125def _parse_string_to_maybe_bool(maybe_bool_str: str) -> bool | None: 

126 """Map a string to either a boolean value or None. 

127 

128 Parameters 

129 ---------- 

130 maybe_bool_str : `str` 

131 The value to parse 

132 

133 Results 

134 ------- 

135 maybe_bool : `bool` or `None` 

136 The parsed value. 
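
    Examples
    --------
    Illustrative mappings (matching is case-insensitive):

    >>> _parse_string_to_maybe_bool("Yes")
    True
    >>> _parse_string_to_maybe_bool("0")
    False
    >>> _parse_string_to_maybe_bool("") is None
    True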

137 """ 

138 if maybe_bool_str.lower() in ["t", "true", "yes", "y", "1"]: 

139 maybe_bool = True 

140 elif maybe_bool_str.lower() in ["f", "false", "no", "n", "0"]: 

141 maybe_bool = False 

142 elif maybe_bool_str.lower() in ["none", ""]: 

143 maybe_bool = None 

144 else: 

145 raise ValueError(f'Value of "{maybe_bool_str}" is not True, False, or None.') 

146 

147 return maybe_bool 

148 

149 

150class S3ResourcePath(ResourcePath): 

151 """S3 URI resource path implementation class. 

152 

153 Notes 

154 ----- 

155 In order to configure the behavior of instances of this class, the 

156 environment variable is inspected: 

157 

158 - LSST_S3_USE_THREADS: May be True, False, or None. Sets whether threading 

159 is used for downloads, with a value of None defaulting to boto's default 

160 value. Users may wish to set it to False when the downloads will be started 

161 within threads other than python's main thread. 
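
    For example (illustrative), a batch job that starts downloads from worker
    threads might set ``LSST_S3_USE_THREADS=False`` in its environment before
    any S3 access.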

162 """ 

163 

164 use_threads: bool | None = None 

165 """Explicitly turn on or off threading in use of boto's download_fileobj. 

166 Setting this to None results in boto's default behavior.""" 

167 

168 @cached_property 

169 def _environ_use_threads(self) -> bool | None: 

170 try: 

171 use_threads_str = os.environ["LSST_S3_USE_THREADS"] 

172 except KeyError: 

173 use_threads_str = "None" 

174 

175 use_threads = _parse_string_to_maybe_bool(use_threads_str) 

176 

177 return use_threads 

178 

179 @contextlib.contextmanager 

180 def _use_threads_temp_override(self, multithreaded: bool) -> Iterator: 

181 """Temporarily override the value of use_threads.""" 

182 original = self.use_threads 

183 self.use_threads = multithreaded 

184 yield 

185 self.use_threads = original 

186 

187 @property 

188 def _transfer_config(self) -> TransferConfig: 

189 if self.use_threads is None: 

190 self.use_threads = self._environ_use_threads 

191 

192 if self.use_threads is None: 

193 transfer_config = TransferConfig() 

194 else: 

195 transfer_config = TransferConfig(use_threads=self.use_threads) 

196 

197 return transfer_config 
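
    # Summary of the property above: an explicit ``use_threads`` set on the
    # instance wins; otherwise LSST_S3_USE_THREADS is consulted; if both are
    # None, boto's TransferConfig default applies.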


    @property
    def client(self) -> boto3.client:
        """Client object to address remote resource."""
        return getS3Client(self._profile)

    @property
    def _profile(self) -> str | None:
        """Profile name to use for looking up S3 credentials and endpoint."""
        return self._uri.username

    @property
    def _bucket(self) -> str:
        """S3 bucket where the files are stored."""
        # Notionally the bucket is stored in the 'hostname' part of the URI.
        # However, Ceph S3 uses a "multi-tenant" syntax for bucket names in
        # the form 'tenant:bucket'. The part after the colon is parsed as the
        # port portion of the URI, and urllib throws an exception if you try
        # to read a non-integer port value. So manually split off this
        # portion of the URI.
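        # For example (illustrative):
        #   s3://bucket/key                  -> bucket 'bucket'
        #   s3://profile@tenant:bucket/key   -> bucket 'tenant:bucket'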

        split = self._uri.netloc.split("@")
        num_components = len(split)
        if num_components == 2:
            # There is a profile@ portion of the URL, so take the second half.
            bucket = split[1]
        elif num_components == 1:
            # There is no profile@, so take the whole netloc.
            bucket = split[0]
        else:
            raise ValueError(f"Unexpected extra '@' in S3 URI: '{str(self)}'")

        if not bucket:
            raise ValueError(f"S3 URI does not include bucket name: '{str(self)}'")

        return bucket

    @classmethod
    def _mexists(
        cls, uris: Iterable[ResourcePath], *, num_workers: int | None = None
    ) -> dict[ResourcePath, bool]:
        # Force client to be created for each profile before creating threads.
        profiles = set[str | None]()
        for path in uris:
            if path.scheme == "s3":
                path = cast(S3ResourcePath, path)
                profiles.add(path._profile)
        for profile in profiles:
            getS3Client(profile)

        return super()._mexists(uris, num_workers=num_workers)

    @classmethod
    def _mremove(cls, uris: Iterable[ResourcePath]) -> dict[ResourcePath, MBulkResult]:
        # Delete multiple objects in one API call.
        # Must group by profile and bucket.
        grouped_uris: dict[tuple[str | None, str], list[S3ResourcePath]] = defaultdict(list)
        for uri in uris:
            uri = cast(S3ResourcePath, uri)
            grouped_uris[uri._profile, uri._bucket].append(uri)

        results: dict[ResourcePath, MBulkResult] = {}
        for related_uris in grouped_uris.values():
            # The API requires no more than 1000 keys per call.
            chunk_num = 0
            chunks: list[tuple[ResourcePath, ...]] = []
            key_to_uri: dict[str, ResourcePath] = {}
            for chunk in chunk_iterable(related_uris, chunk_size=1_000):
                for uri in chunk:
                    key = uri.relativeToPathRoot
                    key_to_uri[key] = uri
                    # Default to assuming everything worked.
                    results[uri] = MBulkResult(True, None)
                chunk_num += 1
                chunks.append(chunk)
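            # For example (illustrative): 2500 URIs in a single bucket and
            # profile group arrive here as three chunks of 1000, 1000, 500.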


            # Bulk remove.
            with time_this(
                log,
                msg="Bulk delete; %d chunk%s; totalling %d dataset%s",
                args=(
                    len(chunks),
                    "s" if len(chunks) != 1 else "",
                    len(related_uris),
                    "s" if len(related_uris) != 1 else "",
                ),
            ):
                errored = cls._mremove_select(chunks)

            # Update with error information.
            results.update(errored)

        return results

    @classmethod
    def _mremove_select(cls, chunks: list[tuple[ResourcePath, ...]]) -> dict[ResourcePath, MBulkResult]:
        if len(chunks) == 1:
            # Do the removal directly without futures.
            return cls._delete_objects_wrapper(chunks[0])
        pool_executor_class = _get_executor_class()
        if issubclass(pool_executor_class, concurrent.futures.ProcessPoolExecutor):
            # Patch the environment to make it think there is only one worker
            # for each subprocess.
            with _patch_environ({"LSST_RESOURCES_NUM_WORKERS": "1"}):
                return cls._mremove_with_pool(pool_executor_class, chunks)
        else:
            return cls._mremove_with_pool(pool_executor_class, chunks)

    @classmethod
    def _mremove_with_pool(
        cls,
        pool_executor_class: _EXECUTOR_TYPE,
        chunks: list[tuple[ResourcePath, ...]],
        *,
        num_workers: int | None = None,
    ) -> dict[ResourcePath, MBulkResult]:
        # Different name because different API to the base class.
        # No need to make more workers than we have chunks.
        max_workers = num_workers if num_workers is not None else min(len(chunks), _get_num_workers())
        results: dict[ResourcePath, MBulkResult] = {}
        with pool_executor_class(max_workers=max_workers) as remove_executor:
            future_remove = {
                remove_executor.submit(cls._delete_objects_wrapper, chunk): i
                for i, chunk in enumerate(chunks)
            }
            for future in concurrent.futures.as_completed(future_remove):
                try:
                    results.update(future.result())
                except Exception as e:
                    # The chunk utterly failed.
                    chunk = chunks[future_remove[future]]
                    for uri in chunk:
                        results[uri] = MBulkResult(False, e)
        return results

    @classmethod
    def _delete_objects_wrapper(cls, uris: tuple[ResourcePath, ...]) -> dict[ResourcePath, MBulkResult]:
        """Convert URIs to keys and call the low-level API."""
        if not uris:
            return {}
        keys: list[dict[str, str]] = []
        key_to_uri: dict[str, ResourcePath] = {}
        for uri in uris:
            key = uri.relativeToPathRoot
            key_to_uri[key] = uri
            keys.append({"Key": key})

        first_uri = cast(S3ResourcePath, uris[0])
        results = cls._delete_related_objects(first_uri.client, first_uri._bucket, keys)

        # Remap error object keys to URIs.
        return {key_to_uri[key]: result for key, result in results.items()}

    @classmethod
    @backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
    def _delete_related_objects(
        cls, client: boto3.client, bucket: str, keys: list[dict[str, str]]
    ) -> dict[str, MBulkResult]:
        # Delete multiple objects from the same bucket, allowing for backoff
        # retry.
        response = client.delete_objects(Bucket=bucket, Delete={"Objects": keys, "Quiet": True})
        # Use Quiet mode so we assume everything worked unless told otherwise.
        # Only errors are returned -- indexed by key name.
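        # A typical entry in "Errors" looks like this (illustrative):
        #   {"Key": "path/to/file.fits", "Code": "AccessDenied",
        #    "Message": "Access Denied"}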

        errors: dict[str, MBulkResult] = {}
        for errored_key in response.get("Errors", []):
            errors[errored_key["Key"]] = MBulkResult(
                False, ClientError({"Error": errored_key}, f"delete_objects: {errored_key['Key']}")
            )
        return errors

    @backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
    def exists(self) -> bool:
        """Check that the S3 resource exists."""
        if self.is_root:
            # Only check for the bucket since the path is irrelevant.
            return bucketExists(self._bucket, self.client)
        exists, _ = s3CheckFileExists(self, bucket=self._bucket, client=self.client)
        return exists

    @backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
    def size(self) -> int:
        """Return the size of the resource in bytes."""
        if self.dirLike:
            return 0
        exists, sz = s3CheckFileExists(self, bucket=self._bucket, client=self.client)
        if not exists:
            raise FileNotFoundError(f"Resource {self} does not exist")
        return sz

    @backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
    def get_info(self) -> ResourceInfo:
        """Return lightweight metadata about this S3 resource."""
        if self.is_root:
            if not bucketExists(self._bucket, self.client):
                raise FileNotFoundError(f"Resource {self} does not exist")
            return ResourceInfo(
                uri=str(self),
                is_file=False,
                size=0,
                last_modified=None,
                checksums={},
            )

        try:
            response = self.client.head_object(
                Bucket=self._bucket,
                Key=self.relativeToPathRoot,
                ChecksumMode="ENABLED",
            )
        except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
            raise FileNotFoundError(f"No such resource: {self}") from err
        except ClientError as err:
            translate_client_error(err, self)
            raise

        checksums = {}
        for response_key, checksum_name in (
            ("ChecksumCRC32", "crc32"),
            ("ChecksumCRC32C", "crc32c"),
            ("ChecksumCRC64NVME", "crc64nvme"),
            ("ChecksumSHA1", "sha1"),
            ("ChecksumSHA256", "sha256"),
        ):
            if value := response.get(response_key):
                checksums[checksum_name] = value

        last_modified = response.get("LastModified")
        if last_modified is not None:
            if getattr(last_modified, "tzinfo", None) is None:
                last_modified = last_modified.replace(tzinfo=datetime.UTC)
            else:
                last_modified = last_modified.astimezone(datetime.UTC)

        # For ResourcePath usage a dirLike object with zero size is a
        # directory, but in the general case anyone can create an object with
        # a trailing `/` and treat it as a file. For self-consistency with
        # ResourcePath call it a file if it has size > 0 even if dirLike.
        size = response["ContentLength"]
        is_file = (self.dirLike is not True) or (size > 0)

        return ResourceInfo(
            uri=str(self),
            is_file=is_file,
            size=size,
            last_modified=last_modified,
            checksums=checksums,
        )

    @backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
    def remove(self) -> None:
        """Remove the resource."""
        # https://github.com/boto/boto3/issues/507 - there is no way of
        # knowing if the file was actually deleted except by checking all
        # the keys again; the response is HTTP 204 every time.
        try:
            self.client.delete_object(Bucket=self._bucket, Key=self.relativeToPathRoot)
        except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
            raise FileNotFoundError(f"No such resource: {self}") from err

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def read(self, size: int = -1) -> bytes:
        args = {}
        if size > 0:
            args["Range"] = f"bytes=0-{size - 1}"

        try:
            response = self.client.get_object(Bucket=self._bucket, Key=self.relativeToPathRoot, **args)
        except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
            raise FileNotFoundError(f"No such resource: {self}") from err
        except ClientError as err:
            translate_client_error(err, self)
            raise
        with time_this(log, msg="Read from %s", args=(self,)):
            body = response["Body"].read()
            response["Body"].close()
        return body

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def write(self, data: bytes, overwrite: bool = True) -> None:
        if not overwrite and self.exists():
            raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")
        with time_this(log, msg="Write to %s", args=(self,)):
            self.client.put_object(Bucket=self._bucket, Key=self.relativeToPathRoot, Body=data)

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def mkdir(self) -> None:
        """Write a directory key to S3."""
        if not bucketExists(self._bucket, self.client):
            raise ValueError(f"Bucket {self._bucket} does not exist for {self}!")

        if not self.dirLike:
            raise NotADirectoryError(f"Can not create a 'directory' for file-like URI {self}")

        # Don't create an S3 key when the root is at the top level of a
        # bucket.
        if self.path != "/":
            self.client.put_object(Bucket=self._bucket, Key=self.relativeToPathRoot)

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def _download_file(
        self, local_file: IO | ResourceHandleProtocol, progress: ProgressPercentage | None
    ) -> None:
        """Download the remote resource to a local file.

        Helper routine for _as_local to allow backoff without regenerating
        the temporary file.
        """
        try:
            self.client.download_fileobj(
                self._bucket,
                self.relativeToPathRoot,
                local_file,
                Callback=progress,
                Config=self._transfer_config,
            )
        except (
            self.client.exceptions.NoSuchKey,
            self.client.exceptions.NoSuchBucket,
        ) as err:
            raise FileNotFoundError(f"No such resource: {self}") from err
        except ClientError as err:
            translate_client_error(err, self)
            raise

    def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
        """Return an abstract file system and path that can be used by fsspec.

        Returns
        -------
        fs : `fsspec.spec.AbstractFileSystem`
            A file system object suitable for use with the returned path.
        path : `str`
            A path that can be opened by the file system object.
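
        Examples
        --------
        A minimal usage sketch (illustrative; assumes ``s3fs`` is installed
        and ``uri`` is an `S3ResourcePath`):

        >>> fs, path = uri.to_fsspec()  # doctest: +SKIP
        >>> with fs.open(path, "rb") as fh:  # doctest: +SKIP
        ...     data = fh.read()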

529 """ 

530 if s3fs is None: 

531 raise ImportError("s3fs is not available") 

532 # Must remove the profile from the URL and form it again. 

533 endpoint_config = _get_s3_connection_parameters(self._profile) 

534 s3 = s3fs.S3FileSystem( 

535 profile=endpoint_config.profile, 

536 endpoint_url=endpoint_config.endpoint_url, 

537 key=endpoint_config.access_key_id, 

538 secret=endpoint_config.secret_access_key, 

539 ) 

540 if not _s3_should_validate_bucket(): 

541 # Accessing the s3 property forces the boto client to be 

542 # constructed and cached and allows the validation to be removed. 

543 _s3_disable_bucket_validation(s3.s3) 

544 

545 return s3, f"{self._bucket}/{self.relativeToPathRoot}" 

546 

547 @contextlib.contextmanager 

548 def _as_local( 

549 self, multithreaded: bool = True, tmpdir: ResourcePath | None = None 

550 ) -> Iterator[ResourcePath]: 

551 """Download object from S3 and place in temporary directory. 

552 

553 Parameters 

554 ---------- 

555 multithreaded : `bool`, optional 

556 If `True` the transfer will be allowed to attempt to improve 

557 throughput by using parallel download streams. This may of no 

558 effect if the URI scheme does not support parallel streams or 

559 if a global override has been applied. If `False` parallel 

560 streams will be disabled. 

561 tmpdir : `ResourcePath` or `None`, optional 

562 Explicit override of the temporary directory to use for remote 

563 downloads. 

564 

565 Returns 

566 ------- 

567 local_uri : `ResourcePath` 

568 A URI to a local POSIX file corresponding to a local temporary 

569 downloaded copy of the resource. 

570 """ 

571 with ( 

572 ResourcePath.temporary_uri(prefix=tmpdir, suffix=self.getExtension(), delete=True) as tmp_uri, 

573 self._use_threads_temp_override(multithreaded), 

574 time_this(log, msg="Downloading %s to local file", args=(self,)), 

575 ): 

576 progress = ( 

577 ProgressPercentage(self, msg="Downloading:") 

578 if log.isEnabledFor(ProgressPercentage.log_level) 

579 else None 

580 ) 

581 with tmp_uri.open("wb") as tmpFile: 

582 self._download_file(tmpFile, progress) 

583 yield tmp_uri 

584 

585 @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time) 

586 def _upload_file(self, local_file: ResourcePath, progress: ProgressPercentage | None) -> None: 

587 """Upload a local file with backoff. 

588 

589 Helper method to wrap file uploading in backoff for transfer_from. 

590 """ 

591 try: 

592 self.client.upload_file( 

593 local_file.ospath, 

594 self._bucket, 

595 self.relativeToPathRoot, 

596 Callback=progress, 

597 Config=self._transfer_config, 

598 ) 

599 except self.client.exceptions.NoSuchBucket as err: 

600 raise NotADirectoryError(f"Target does not exist: {err}") from err 

601 except ClientError as err: 

602 translate_client_error(err, self) 

603 raise 

604 

605 @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time) 

606 def _copy_from(self, src: S3ResourcePath) -> None: 

607 copy_source = { 

608 "Bucket": src._bucket, 

609 "Key": src.relativeToPathRoot, 

610 } 

611 try: 

612 self.client.copy_object(CopySource=copy_source, Bucket=self._bucket, Key=self.relativeToPathRoot) 

613 except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err: 

614 raise FileNotFoundError(f"No such resource to transfer: {src} -> {self}") from err 

615 except ClientError as err: 

616 translate_client_error(err, self) 

617 raise 

618 

619 def transfer_from( 

620 self, 

621 src: ResourcePath, 

622 transfer: str = "copy", 

623 overwrite: bool = False, 

624 transaction: TransactionProtocol | None = None, 

625 multithreaded: bool = True, 

626 ) -> None: 

627 """Transfer the current resource to an S3 bucket. 

628 

629 Parameters 

630 ---------- 

631 src : `ResourcePath` 

632 Source URI. 

633 transfer : `str` 

634 Mode to use for transferring the resource. Supports the following 

635 options: copy. 

636 overwrite : `bool`, optional 

637 Allow an existing file to be overwritten. Defaults to `False`. 

638 transaction : `~lsst.resources.utils.TransactionProtocol`, optional 

639 Currently unused. 

640 multithreaded : `bool`, optional 

641 If `True` the transfer will be allowed to attempt to improve 

642 throughput by using parallel download streams. This may of no 

643 effect if the URI scheme does not support parallel streams or 

644 if a global override has been applied. If `False` parallel 

645 streams will be disabled. 

646 """ 

647 # Fail early to prevent delays if remote resources are requested 

648 if transfer not in self.transferModes: 

649 raise ValueError(f"Transfer mode '{transfer}' not supported by URI scheme {self.scheme}") 

650 

651 # Existence checks cost time so do not call this unless we know 

652 # that debugging is enabled. 

653 if log.isEnabledFor(logging.DEBUG): 

654 log.debug( 

655 "Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)", 

656 src, 

657 src.exists(), 

658 self, 

659 self.exists(), 

660 transfer, 

661 ) 

662 

663 # Short circuit if the URIs are identical immediately. 

664 if self == src: 

665 log.debug( 

666 "Target and destination URIs are identical: %s, returning immediately." 

667 " No further action required.", 

668 self, 

669 ) 

670 return 

671 

672 if not overwrite and self.exists(): 

673 raise FileExistsError(f"Destination path '{self}' already exists.") 

674 

675 if transfer == "auto": 

676 transfer = self.transferDefault 

677 

678 timer_msg = "Transfer from %s to %s" 

679 timer_args = (src, self) 

680 

681 if isinstance(src, type(self)) and self.client == src.client: 

682 # Looks like an S3 remote uri so we can use direct copy. 

683 # This only works if the source and destination are using the same 

684 # S3 endpoint and profile. 

685 with time_this(log, msg=timer_msg, args=timer_args): 

686 self._copy_from(src) 

687 

688 else: 

689 # Use local file and upload it 

690 with src.as_local(multithreaded=multithreaded) as local_uri: 

691 progress = ( 

692 ProgressPercentage(local_uri, file_for_msg=src, msg="Uploading:") 

693 if log.isEnabledFor(ProgressPercentage.log_level) 

694 else None 

695 ) 

696 with ( 

697 time_this(log, msg=timer_msg, args=timer_args), 

698 self._use_threads_temp_override(multithreaded), 

699 ): 

700 self._upload_file(local_uri, progress) 

701 

702 # This was an explicit move requested from a remote resource 

703 # try to remove that resource 

704 if transfer == "move": 

705 # Transactions do not work here 

706 src.remove() 

707 

708 @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time) 

709 def walk( 

710 self, file_filter: str | re.Pattern | None = None 

711 ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]: 

712 """Walk the directory tree returning matching files and directories. 

713 

714 Parameters 

715 ---------- 

716 file_filter : `str` or `re.Pattern`, optional 

717 Regex to filter out files from the list before it is returned. 

718 

719 Yields 

720 ------ 

721 dirpath : `ResourcePath` 

722 Current directory being examined. 

723 dirnames : `list` of `str` 

724 Names of subdirectories within dirpath. 

725 filenames : `list` of `str` 

726 Names of all the files within dirpath. 

727 """ 

728 # We pretend that S3 uses directories and files and not simply keys 

729 if not (self.isdir() or self.is_root): 

730 raise ValueError(f"Can not walk a non-directory URI: {self}") 

731 

732 if isinstance(file_filter, str): 

733 file_filter = re.compile(file_filter) 

734 

735 s3_paginator = self.client.get_paginator("list_objects_v2") 

736 

737 # Limit each query to a single "directory" to match os.walk 

738 # We could download all keys at once with no delimiter and work 

739 # it out locally but this could potentially lead to large memory 

740 # usage for millions of keys. It will also make the initial call 

741 # to this method potentially very slow. If making this method look 

742 # like os.walk was not required, we could query all keys with 

743 # pagination and return them in groups of 1000, but that would 

744 # be a different interface since we can't guarantee we would get 

745 # them all grouped properly across the 1000 limit boundary. 
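        # An abbreviated page from list_objects_v2 looks like (illustrative):
        #   {"CommonPrefixes": [{"Prefix": "<prefix>subdir/"}],
        #    "Contents": [{"Key": "<prefix>file.fits", "Size": 1024}]}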

        prefix = self.relativeToPathRoot if not self.is_root else ""
        prefix_len = len(prefix)
        dirnames = []
        filenames = []
        files_there = False

        for page in s3_paginator.paginate(Bucket=self._bucket, Prefix=prefix, Delimiter="/"):
            # All results are returned as full key names and we must
            # convert them back to the root form. The prefix is fixed
            # and delimited so that is a simple trim.

            # Directories are reported in the CommonPrefixes result
            # which reports the entire key and must be stripped.
            found_dirs = [dir["Prefix"][prefix_len:] for dir in page.get("CommonPrefixes", ())]
            dirnames.extend(found_dirs)

            found_files = [file["Key"][prefix_len:] for file in page.get("Contents", ())]
            if found_files:
                files_there = True
            if file_filter is not None:
                found_files = [f for f in found_files if file_filter.search(f)]

            filenames.extend(found_files)

        # Directories do not exist so we can't test for them. If no files
        # or directories were found, though, this means that it effectively
        # does not exist and we should match os.walk() behavior and return
        # immediately.
        if not dirnames and not files_there:
            return
        else:
            yield self, dirnames, filenames

        for dir in dirnames:
            new_uri = self.join(dir)
            yield from new_uri.walk(file_filter)

    @contextlib.contextmanager
    def _openImpl(
        self,
        mode: str = "r",
        *,
        encoding: str | None = None,
    ) -> Iterator[ResourceHandleProtocol]:
        with S3ResourceHandle(mode, log, self) as handle:
            if "b" in mode:
                yield handle
            else:
                if encoding is None:
                    encoding = sys.getdefaultencoding()
                # Cast because the protocol is compatible, but does not have
                # BytesIO in the inheritance tree.
                with io.TextIOWrapper(cast(io.BytesIO, handle), encoding=encoding, write_through=True) as sub:
                    yield sub

    def generate_presigned_get_url(self, *, expiration_time_seconds: int) -> str:
        # Docstring inherited
        return self._generate_presigned_url("get_object", expiration_time_seconds)

    def generate_presigned_put_url(self, *, expiration_time_seconds: int) -> str:
        # Docstring inherited
        return self._generate_presigned_url("put_object", expiration_time_seconds)

    def _generate_presigned_url(self, method: str, expiration_time_seconds: int) -> str:
        url = self.client.generate_presigned_url(
            method,
            Params={"Bucket": self._bucket, "Key": self.relativeToPathRoot},
            ExpiresIn=expiration_time_seconds,
        )
        if self.fragment:
            resource = ResourcePath(url)
            url = str(resource.replace(fragment=self.fragment))
        return url
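

# Usage sketch (illustrative; ``uri`` is an S3ResourcePath): a presigned GET
# URL is a plain HTTPS URL that any HTTP client can fetch, e.g. with requests:
#
#     url = uri.generate_presigned_get_url(expiration_time_seconds=3600)
#     payload = requests.get(url).content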