Coverage for python/lsst/resources/s3.py: 20% (365 statements)
coverage.py v7.13.5, created at 2026-04-17 08:44 +0000

# This file is part of lsst-resources.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# Use of this source code is governed by a 3-clause BSD-style
# license that can be found in the LICENSE file.

from __future__ import annotations

__all__ = ("S3ResourcePath",)

import concurrent.futures
import contextlib
import datetime
import io
import logging
import os
import re
import sys
import threading
from collections import defaultdict
from collections.abc import Iterable, Iterator
from functools import cache, cached_property
from typing import IO, TYPE_CHECKING, cast

from botocore.exceptions import ClientError

from lsst.utils.iteration import chunk_iterable
from lsst.utils.timer import time_this

from ._resourceHandles._baseResourceHandle import ResourceHandleProtocol
from ._resourceHandles._s3ResourceHandle import S3ResourceHandle
from ._resourcePath import (
    _EXECUTOR_TYPE,
    MBulkResult,
    ResourceInfo,
    ResourcePath,
    _get_executor_class,
    _patch_environ,
)
from .s3utils import (
    _get_s3_connection_parameters,
    _s3_disable_bucket_validation,
    _s3_should_validate_bucket,
    all_retryable_errors,
    backoff,
    bucketExists,
    getS3Client,
    max_retry_time,
    retryable_io_errors,
    s3CheckFileExists,
    translate_client_error,
)
from .utils import _get_num_workers

try:
    from boto3.s3.transfer import TransferConfig  # type: ignore
except ImportError:
    TransferConfig = None

try:
    import s3fs
    from fsspec.spec import AbstractFileSystem
except ImportError:
    s3fs = None
    AbstractFileSystem = type

if TYPE_CHECKING:
    with contextlib.suppress(ImportError):
        import boto3

    from .utils import TransactionProtocol

log = logging.getLogger(__name__)


class ProgressPercentage:
    """Progress bar for S3 file uploads.

    Parameters
    ----------
    file : `ResourcePath`
        Resource that is relevant to the progress percentage. The size of this
        resource will be used to determine progress. The name will be used
        in the log messages unless overridden by ``file_for_msg``.
    file_for_msg : `ResourcePath` or `None`, optional
        Resource name to include in log messages in preference to ``file``.
    msg : `str`, optional
        Message text to be included in every progress log message.
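
    Examples
    --------
    A minimal usage sketch; the bucket and key names here are hypothetical
    and ``client`` is assumed to be a boto3 S3 client:

    >>> src = ResourcePath("file:///tmp/data.bin")  # doctest: +SKIP
    >>> progress = ProgressPercentage(src, msg="Uploading:")
    >>> client.upload_file(src.ospath, "some-bucket", "some/key", Callback=progress)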
95 """
    log_level = logging.DEBUG
    """Default log level to use when issuing a message."""

    def __init__(self, file: ResourcePath, file_for_msg: ResourcePath | None = None, msg: str = ""):
        self._filename = file
        self._file_for_msg = str(file_for_msg) if file_for_msg is not None else str(file)
        self._size = file.size()
        self._seen_so_far = 0
        self._lock = threading.Lock()
        self._msg = msg

    def __call__(self, bytes_amount: int) -> None:
        # To simplify, assume this is hooked up to a single filename.
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (100 * self._seen_so_far) // self._size
            log.log(
                self.log_level,
                "%s %s %s / %s (%s%%)",
                self._msg,
                self._file_for_msg,
                self._seen_so_far,
                self._size,
                percentage,
            )


@cache
def _parse_string_to_maybe_bool(maybe_bool_str: str) -> bool | None:
    """Map a string to either a boolean value or None.

    Parameters
    ----------
    maybe_bool_str : `str`
        The value to parse.

    Returns
    -------
    maybe_bool : `bool` or `None`
        The parsed value.
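
    Examples
    --------
    Doctest-style cases following directly from the mappings below:

    >>> _parse_string_to_maybe_bool("Yes")
    True
    >>> _parse_string_to_maybe_bool("0")
    False
    >>> _parse_string_to_maybe_bool("") is None
    True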
137 """
138 if maybe_bool_str.lower() in ["t", "true", "yes", "y", "1"]:
139 maybe_bool = True
140 elif maybe_bool_str.lower() in ["f", "false", "no", "n", "0"]:
141 maybe_bool = False
142 elif maybe_bool_str.lower() in ["none", ""]:
143 maybe_bool = None
144 else:
145 raise ValueError(f'Value of "{maybe_bool_str}" is not True, False, or None.')
147 return maybe_bool
class S3ResourcePath(ResourcePath):
    """S3 URI resource path implementation class.

    Notes
    -----
    In order to configure the behavior of instances of this class, the
    following environment variable is inspected:

    - LSST_S3_USE_THREADS: May be True, False, or None. Sets whether threading
      is used for downloads, with a value of None defaulting to boto's default
      behavior. Users may wish to set it to False when downloads will be
      started in threads other than Python's main thread.
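
    For example, to disable boto's threaded downloads (the parsed value is
    read once and cached, so this should be set before the first transfer):

    >>> import os
    >>> os.environ["LSST_S3_USE_THREADS"] = "False"  # doctest: +SKIP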
162 """
    use_threads: bool | None = None
    """Explicitly turn on or off threading in use of boto's download_fileobj.
    Setting this to None results in boto's default behavior."""

    @cached_property
    def _environ_use_threads(self) -> bool | None:
        try:
            use_threads_str = os.environ["LSST_S3_USE_THREADS"]
        except KeyError:
            use_threads_str = "None"

        use_threads = _parse_string_to_maybe_bool(use_threads_str)

        return use_threads

    @contextlib.contextmanager
    def _use_threads_temp_override(self, multithreaded: bool) -> Iterator:
        """Temporarily override the value of use_threads."""
        original = self.use_threads
        self.use_threads = multithreaded
        try:
            yield
        finally:
            # Restore the original value even if the body raises.
            self.use_threads = original

    @property
    def _transfer_config(self) -> TransferConfig:
        if self.use_threads is None:
            self.use_threads = self._environ_use_threads

        if self.use_threads is None:
            transfer_config = TransferConfig()
        else:
            transfer_config = TransferConfig(use_threads=self.use_threads)

        return transfer_config

    @property
    def client(self) -> boto3.client:
        """Client object to address remote resource."""
        return getS3Client(self._profile)

    @property
    def _profile(self) -> str | None:
        """Profile name to use for looking up S3 credentials and endpoint."""
        return self._uri.username

    @property
    def _bucket(self) -> str:
        """S3 bucket where the files are stored."""
        # Notionally the bucket is stored in the 'hostname' part of the URI.
        # However, Ceph S3 uses a "multi-tenant" syntax for bucket names in
        # the form 'tenant:bucket'. The part after the colon is parsed as the
        # port portion of the URI, and urllib throws an exception if you try
        # to read a non-integer port value. So manually split off this
        # portion of the URI.
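        # For example, a hypothetical URI "s3://profile@tenant:bucket/key"
        # has netloc "profile@tenant:bucket", from which we want the bucket
        # name "tenant:bucket".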
        split = self._uri.netloc.split("@")
        num_components = len(split)
        if num_components == 2:
            # There is a profile@ portion of the URL, so take the second half.
            bucket = split[1]
        elif num_components == 1:
            # There is no profile@, so take the whole netloc.
            bucket = split[0]
        else:
            raise ValueError(f"Unexpected extra '@' in S3 URI: '{str(self)}'")

        if not bucket:
            raise ValueError(f"S3 URI does not include bucket name: '{str(self)}'")

        return bucket

    @classmethod
    def _mexists(
        cls, uris: Iterable[ResourcePath], *, num_workers: int | None = None
    ) -> dict[ResourcePath, bool]:
        # Force client to be created for each profile before creating threads.
        profiles = set[str | None]()
        for path in uris:
            if path.scheme == "s3":
                path = cast(S3ResourcePath, path)
                profiles.add(path._profile)
        for profile in profiles:
            getS3Client(profile)

        return super()._mexists(uris, num_workers=num_workers)

    @classmethod
    def _mremove(cls, uris: Iterable[ResourcePath]) -> dict[ResourcePath, MBulkResult]:
        # Delete multiple objects in one API call.
        # Must group by profile and bucket.
        grouped_uris: dict[tuple[str | None, str], list[S3ResourcePath]] = defaultdict(list)
        for uri in uris:
            uri = cast(S3ResourcePath, uri)
            grouped_uris[uri._profile, uri._bucket].append(uri)

        results: dict[ResourcePath, MBulkResult] = {}
        for related_uris in grouped_uris.values():
            # The API requires no more than 1,000 keys per call.
            chunks: list[tuple[ResourcePath, ...]] = []
            for chunk in chunk_iterable(related_uris, chunk_size=1_000):
                for uri in chunk:
                    # Default to assuming everything worked.
                    results[uri] = MBulkResult(True, None)
                chunks.append(chunk)

            # Bulk remove.
            with time_this(
                log,
                msg="Bulk delete; %d chunk%s; totalling %d dataset%s",
                args=(
                    len(chunks),
                    "s" if len(chunks) != 1 else "",
                    len(related_uris),
                    "s" if len(related_uris) != 1 else "",
                ),
            ):
                errored = cls._mremove_select(chunks)

            # Update with error information.
            results.update(errored)

        return results

    @classmethod
    def _mremove_select(cls, chunks: list[tuple[ResourcePath, ...]]) -> dict[ResourcePath, MBulkResult]:
        if len(chunks) == 1:
            # Do the removal directly without futures.
            return cls._delete_objects_wrapper(chunks[0])
        pool_executor_class = _get_executor_class()
        if issubclass(pool_executor_class, concurrent.futures.ProcessPoolExecutor):
            # Patch the environment to make it think there is only one worker
            # for each subprocess.
            with _patch_environ({"LSST_RESOURCES_NUM_WORKERS": "1"}):
                return cls._mremove_with_pool(pool_executor_class, chunks)
        else:
            return cls._mremove_with_pool(pool_executor_class, chunks)

    @classmethod
    def _mremove_with_pool(
        cls,
        pool_executor_class: _EXECUTOR_TYPE,
        chunks: list[tuple[ResourcePath, ...]],
        *,
        num_workers: int | None = None,
    ) -> dict[ResourcePath, MBulkResult]:
        # Different name because different API to base class.
        # No need to make more workers than we have chunks.
        max_workers = num_workers if num_workers is not None else min(len(chunks), _get_num_workers())
        results: dict[ResourcePath, MBulkResult] = {}
        with pool_executor_class(max_workers=max_workers) as remove_executor:
            future_remove = {
                remove_executor.submit(cls._delete_objects_wrapper, chunk): i
                for i, chunk in enumerate(chunks)
            }
            for future in concurrent.futures.as_completed(future_remove):
                try:
                    results.update(future.result())
                except Exception as e:
                    # The chunk utterly failed.
                    chunk = chunks[future_remove[future]]
                    for uri in chunk:
                        results[uri] = MBulkResult(False, e)
        return results

    @classmethod
    def _delete_objects_wrapper(cls, uris: tuple[ResourcePath, ...]) -> dict[ResourcePath, MBulkResult]:
        """Convert URIs to keys and call low-level API."""
        if not uris:
            return {}
        keys: list[dict[str, str]] = []
        key_to_uri: dict[str, ResourcePath] = {}
        for uri in uris:
            key = uri.relativeToPathRoot
            key_to_uri[key] = uri
            keys.append({"Key": key})

        first_uri = cast(S3ResourcePath, uris[0])
        results = cls._delete_related_objects(first_uri.client, first_uri._bucket, keys)

        # Remap error object keys to uris.
        return {key_to_uri[key]: result for key, result in results.items()}

    @classmethod
    @backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
    def _delete_related_objects(
        cls, client: boto3.client, bucket: str, keys: list[dict[str, str]]
    ) -> dict[str, MBulkResult]:
        # Delete multiple objects from the same bucket, allowing for backoff
        # retry.
        response = client.delete_objects(Bucket=bucket, Delete={"Objects": keys, "Quiet": True})
        # Use Quiet mode so we assume everything worked unless told
        # otherwise. Only errors are returned, indexed by key name.
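        # A hypothetical error entry in the response looks like:
        #   {"Key": "some/key", "Code": "AccessDenied", "Message": "Access Denied"}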
        errors: dict[str, MBulkResult] = {}
        for errored_key in response.get("Errors", []):
            errors[errored_key["Key"]] = MBulkResult(
                False, ClientError({"Error": errored_key}, f"delete_objects: {errored_key['Key']}")
            )
        return errors

    @backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
    def exists(self) -> bool:
        """Check that the S3 resource exists."""
        if self.is_root:
            # Only check for the bucket since the path is irrelevant.
            return bucketExists(self._bucket, self.client)
        exists, _ = s3CheckFileExists(self, bucket=self._bucket, client=self.client)
        return exists

    @backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
    def size(self) -> int:
        """Return the size of the resource in bytes."""
        if self.dirLike:
            return 0
        exists, sz = s3CheckFileExists(self, bucket=self._bucket, client=self.client)
        if not exists:
            raise FileNotFoundError(f"Resource {self} does not exist")
        return sz

    @backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
    def get_info(self) -> ResourceInfo:
        """Return lightweight metadata about this S3 resource."""
        if self.is_root:
            if not bucketExists(self._bucket, self.client):
                raise FileNotFoundError(f"Resource {self} does not exist")
            return ResourceInfo(
                uri=str(self),
                is_file=False,
                size=0,
                last_modified=None,
                checksums={},
            )

        try:
            response = self.client.head_object(
                Bucket=self._bucket,
                Key=self.relativeToPathRoot,
                ChecksumMode="ENABLED",
            )
        except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
            raise FileNotFoundError(f"No such resource: {self}") from err
        except ClientError as err:
            translate_client_error(err, self)
            raise

        checksums = {}
        for response_key, checksum_name in (
            ("ChecksumCRC32", "crc32"),
            ("ChecksumCRC32C", "crc32c"),
            ("ChecksumCRC64NVME", "crc64nvme"),
            ("ChecksumSHA1", "sha1"),
            ("ChecksumSHA256", "sha256"),
        ):
            if value := response.get(response_key):
                checksums[checksum_name] = value

        last_modified = response.get("LastModified")
        if last_modified is not None:
            if getattr(last_modified, "tzinfo", None) is None:
                last_modified = last_modified.replace(tzinfo=datetime.UTC)
            else:
                last_modified = last_modified.astimezone(datetime.UTC)

        # For ResourcePath usage a dirLike object with zero size is a
        # directory, but in the general case anyone can create an object with
        # a trailing `/` and treat it as a file. For self-consistency with
        # ResourcePath, call it a file if it has size > 0 even if dirLike.
        size = response["ContentLength"]
        is_file = (self.dirLike is not True) or (size > 0)

        return ResourceInfo(
            uri=str(self),
            is_file=is_file,
            size=size,
            last_modified=last_modified,
            checksums=checksums,
        )

    @backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
    def remove(self) -> None:
        """Remove the resource."""
        # https://github.com/boto/boto3/issues/507 - there is no way of
        # knowing if the file was actually deleted except for checking all
        # the keys again; the response is HTTP 204 every time.
        try:
            self.client.delete_object(Bucket=self._bucket, Key=self.relativeToPathRoot)
        except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
            raise FileNotFoundError(f"No such resource: {self}") from err

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def read(self, size: int = -1) -> bytes:
        args = {}
        if size > 0:
            args["Range"] = f"bytes=0-{size - 1}"
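            # E.g. size=10 requests the header "Range: bytes=0-9"
            # (an inclusive byte range).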
        try:
            response = self.client.get_object(Bucket=self._bucket, Key=self.relativeToPathRoot, **args)
        except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
            raise FileNotFoundError(f"No such resource: {self}") from err
        except ClientError as err:
            translate_client_error(err, self)
            raise
        with time_this(log, msg="Read from %s", args=(self,)):
            body = response["Body"].read()
        response["Body"].close()
        return body

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def write(self, data: bytes, overwrite: bool = True) -> None:
        if not overwrite and self.exists():
            raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")
        with time_this(log, msg="Write to %s", args=(self,)):
            self.client.put_object(Bucket=self._bucket, Key=self.relativeToPathRoot, Body=data)

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def mkdir(self) -> None:
        """Write a directory key to S3."""
        if not bucketExists(self._bucket, self.client):
            raise ValueError(f"Bucket {self._bucket} does not exist for {self}!")

        if not self.dirLike:
            raise NotADirectoryError(f"Can not create a 'directory' for file-like URI {self}")

        # Don't create an S3 key when the root is at the top level of a
        # bucket.
        if self.path != "/":
            self.client.put_object(Bucket=self._bucket, Key=self.relativeToPathRoot)

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def _download_file(
        self, local_file: IO | ResourceHandleProtocol, progress: ProgressPercentage | None
    ) -> None:
        """Download the remote resource to a local file.

        Helper routine for _as_local to allow backoff without regenerating
        the temporary file.
        """
        try:
            self.client.download_fileobj(
                self._bucket,
                self.relativeToPathRoot,
                local_file,
                Callback=progress,
                Config=self._transfer_config,
            )
        except (
            self.client.exceptions.NoSuchKey,
            self.client.exceptions.NoSuchBucket,
        ) as err:
            raise FileNotFoundError(f"No such resource: {self}") from err
        except ClientError as err:
            translate_client_error(err, self)
            raise

    def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
        """Return an abstract file system and path that can be used by fsspec.

        Returns
        -------
        fs : `fsspec.spec.AbstractFileSystem`
            A file system object suitable for use with the returned path.
        path : `str`
            A path that can be opened by the file system object.
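
        Examples
        --------
        A hypothetical usage sketch (the URI is a placeholder):

        >>> rp = ResourcePath("s3://some-bucket/some/key")  # doctest: +SKIP
        >>> fs, path = rp.to_fsspec()
        >>> with fs.open(path, "rb") as fh:
        ...     data = fh.read()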
529 """
530 if s3fs is None:
531 raise ImportError("s3fs is not available")
532 # Must remove the profile from the URL and form it again.
533 endpoint_config = _get_s3_connection_parameters(self._profile)
534 s3 = s3fs.S3FileSystem(
535 profile=endpoint_config.profile,
536 endpoint_url=endpoint_config.endpoint_url,
537 key=endpoint_config.access_key_id,
538 secret=endpoint_config.secret_access_key,
539 )
540 if not _s3_should_validate_bucket():
541 # Accessing the s3 property forces the boto client to be
542 # constructed and cached and allows the validation to be removed.
543 _s3_disable_bucket_validation(s3.s3)
545 return s3, f"{self._bucket}/{self.relativeToPathRoot}"
    @contextlib.contextmanager
    def _as_local(
        self, multithreaded: bool = True, tmpdir: ResourcePath | None = None
    ) -> Iterator[ResourcePath]:
        """Download object from S3 and place in temporary directory.

        Parameters
        ----------
        multithreaded : `bool`, optional
            If `True` the transfer will be allowed to attempt to improve
            throughput by using parallel download streams. This may have no
            effect if the URI scheme does not support parallel streams or
            if a global override has been applied. If `False` parallel
            streams will be disabled.
        tmpdir : `ResourcePath` or `None`, optional
            Explicit override of the temporary directory to use for remote
            downloads.

        Returns
        -------
        local_uri : `ResourcePath`
            A URI to a local POSIX file corresponding to a local temporary
            downloaded copy of the resource.
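
        Examples
        --------
        A hypothetical sketch via the public ``as_local`` wrapper (the URI
        is a placeholder):

        >>> remote = ResourcePath("s3://some-bucket/data.bin")  # doctest: +SKIP
        >>> with remote.as_local() as local_uri:
        ...     print(local_uri.ospath)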
570 """
571 with (
572 ResourcePath.temporary_uri(prefix=tmpdir, suffix=self.getExtension(), delete=True) as tmp_uri,
573 self._use_threads_temp_override(multithreaded),
574 time_this(log, msg="Downloading %s to local file", args=(self,)),
575 ):
576 progress = (
577 ProgressPercentage(self, msg="Downloading:")
578 if log.isEnabledFor(ProgressPercentage.log_level)
579 else None
580 )
581 with tmp_uri.open("wb") as tmpFile:
582 self._download_file(tmpFile, progress)
583 yield tmp_uri
    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def _upload_file(self, local_file: ResourcePath, progress: ProgressPercentage | None) -> None:
        """Upload a local file with backoff.

        Helper method to wrap file uploading in backoff for transfer_from.
        """
        try:
            self.client.upload_file(
                local_file.ospath,
                self._bucket,
                self.relativeToPathRoot,
                Callback=progress,
                Config=self._transfer_config,
            )
        except self.client.exceptions.NoSuchBucket as err:
            raise NotADirectoryError(f"Target does not exist: {err}") from err
        except ClientError as err:
            translate_client_error(err, self)
            raise

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def _copy_from(self, src: S3ResourcePath) -> None:
        copy_source = {
            "Bucket": src._bucket,
            "Key": src.relativeToPathRoot,
        }
        try:
            self.client.copy_object(CopySource=copy_source, Bucket=self._bucket, Key=self.relativeToPathRoot)
        except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
            raise FileNotFoundError(f"No such resource to transfer: {src} -> {self}") from err
        except ClientError as err:
            translate_client_error(err, self)
            raise

    def transfer_from(
        self,
        src: ResourcePath,
        transfer: str = "copy",
        overwrite: bool = False,
        transaction: TransactionProtocol | None = None,
        multithreaded: bool = True,
    ) -> None:
        """Transfer the current resource to an S3 bucket.

        Parameters
        ----------
        src : `ResourcePath`
            Source URI.
        transfer : `str`
            Mode to use for transferring the resource. Supports the following
            options: copy, move, auto.
        overwrite : `bool`, optional
            Allow an existing file to be overwritten. Defaults to `False`.
        transaction : `~lsst.resources.utils.TransactionProtocol`, optional
            Currently unused.
        multithreaded : `bool`, optional
            If `True` the transfer will be allowed to attempt to improve
            throughput by using parallel download streams. This may have no
            effect if the URI scheme does not support parallel streams or
            if a global override has been applied. If `False` parallel
            streams will be disabled.
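
        Examples
        --------
        A hypothetical sketch of uploading a local file (the URIs are
        placeholders):

        >>> dest = ResourcePath("s3://some-bucket/data.bin")  # doctest: +SKIP
        >>> src = ResourcePath("file:///tmp/data.bin")
        >>> dest.transfer_from(src, transfer="copy")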
646 """
647 # Fail early to prevent delays if remote resources are requested
648 if transfer not in self.transferModes:
649 raise ValueError(f"Transfer mode '{transfer}' not supported by URI scheme {self.scheme}")
651 # Existence checks cost time so do not call this unless we know
652 # that debugging is enabled.
653 if log.isEnabledFor(logging.DEBUG):
654 log.debug(
655 "Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
656 src,
657 src.exists(),
658 self,
659 self.exists(),
660 transfer,
661 )
663 # Short circuit if the URIs are identical immediately.
664 if self == src:
665 log.debug(
666 "Target and destination URIs are identical: %s, returning immediately."
667 " No further action required.",
668 self,
669 )
670 return
672 if not overwrite and self.exists():
673 raise FileExistsError(f"Destination path '{self}' already exists.")
675 if transfer == "auto":
676 transfer = self.transferDefault
678 timer_msg = "Transfer from %s to %s"
679 timer_args = (src, self)
        if isinstance(src, type(self)) and self.client == src.client:
            # Looks like an S3 remote uri so we can use direct copy.
            # This only works if the source and destination are using the
            # same S3 endpoint and profile.
            with time_this(log, msg=timer_msg, args=timer_args):
                self._copy_from(src)

        else:
            # Use a local file and upload it.
            with src.as_local(multithreaded=multithreaded) as local_uri:
                progress = (
                    ProgressPercentage(local_uri, file_for_msg=src, msg="Uploading:")
                    if log.isEnabledFor(ProgressPercentage.log_level)
                    else None
                )
                with (
                    time_this(log, msg=timer_msg, args=timer_args),
                    self._use_threads_temp_override(multithreaded),
                ):
                    self._upload_file(local_uri, progress)

        # This was an explicit move requested from a remote resource;
        # try to remove that resource.
        if transfer == "move":
            # Transactions do not work here.
            src.remove()

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def walk(
        self, file_filter: str | re.Pattern | None = None
    ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]:
        """Walk the directory tree returning matching files and directories.

        Parameters
        ----------
        file_filter : `str` or `re.Pattern`, optional
            Regex to filter out files from the list before it is returned.

        Yields
        ------
        dirpath : `ResourcePath`
            Current directory being examined.
        dirnames : `list` of `str`
            Names of subdirectories within dirpath.
        filenames : `list` of `str`
            Names of all the files within dirpath.
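
        Examples
        --------
        A hypothetical sketch (the URI is a placeholder):

        >>> root = ResourcePath("s3://some-bucket/data/")  # doctest: +SKIP
        >>> for dirpath, dirnames, filenames in root.walk():
        ...     print(dirpath, dirnames, filenames)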
727 """
728 # We pretend that S3 uses directories and files and not simply keys
729 if not (self.isdir() or self.is_root):
730 raise ValueError(f"Can not walk a non-directory URI: {self}")
732 if isinstance(file_filter, str):
733 file_filter = re.compile(file_filter)
735 s3_paginator = self.client.get_paginator("list_objects_v2")
737 # Limit each query to a single "directory" to match os.walk
738 # We could download all keys at once with no delimiter and work
739 # it out locally but this could potentially lead to large memory
740 # usage for millions of keys. It will also make the initial call
741 # to this method potentially very slow. If making this method look
742 # like os.walk was not required, we could query all keys with
743 # pagination and return them in groups of 1000, but that would
744 # be a different interface since we can't guarantee we would get
745 # them all grouped properly across the 1000 limit boundary.
746 prefix = self.relativeToPathRoot if not self.is_root else ""
747 prefix_len = len(prefix)
748 dirnames = []
749 filenames = []
750 files_there = False
        for page in s3_paginator.paginate(Bucket=self._bucket, Prefix=prefix, Delimiter="/"):
            # All results are returned as full key names and we must
            # convert them back to the root form. The prefix is fixed
            # and delimited so that is a simple trim.

            # Directories are reported in the CommonPrefixes result
            # which reports the entire key and must be stripped.
            found_dirs = [dir["Prefix"][prefix_len:] for dir in page.get("CommonPrefixes", ())]
            dirnames.extend(found_dirs)

            found_files = [file["Key"][prefix_len:] for file in page.get("Contents", ())]
            if found_files:
                files_there = True
            if file_filter is not None:
                found_files = [f for f in found_files if file_filter.search(f)]

            filenames.extend(found_files)

        # Directories do not exist so we can't test for them. If no files
        # or directories were found though, this means that it effectively
        # does not exist and we should match os.walk() behavior and return
        # immediately.
        if not dirnames and not files_there:
            return
        else:
            yield self, dirnames, filenames

            for dir in dirnames:
                new_uri = self.join(dir)
                yield from new_uri.walk(file_filter)

    @contextlib.contextmanager
    def _openImpl(
        self,
        mode: str = "r",
        *,
        encoding: str | None = None,
    ) -> Iterator[ResourceHandleProtocol]:
        with S3ResourceHandle(mode, log, self) as handle:
            if "b" in mode:
                yield handle
            else:
                if encoding is None:
                    encoding = sys.getdefaultencoding()
                # Cast because the protocol is compatible, but does not have
                # BytesIO in the inheritance tree.
                with io.TextIOWrapper(cast(io.BytesIO, handle), encoding=encoding, write_through=True) as sub:
                    yield sub

    def generate_presigned_get_url(self, *, expiration_time_seconds: int) -> str:
        # Docstring inherited
        return self._generate_presigned_url("get_object", expiration_time_seconds)

    def generate_presigned_put_url(self, *, expiration_time_seconds: int) -> str:
        # Docstring inherited
        return self._generate_presigned_url("put_object", expiration_time_seconds)

    def _generate_presigned_url(self, method: str, expiration_time_seconds: int) -> str:
        url = self.client.generate_presigned_url(
            method,
            Params={"Bucket": self._bucket, "Key": self.relativeToPathRoot},
            ExpiresIn=expiration_time_seconds,
        )
        if self.fragment:
            resource = ResourcePath(url)
            url = str(resource.replace(fragment=self.fragment))
        return url