Coverage for python/lsst/resources/_resourceHandles/_s3ResourceHandle.py: 16% of 160 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-17 08:44 +0000
# This file is part of lsst-resources.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# Use of this source code is governed by a 3-clause BSD-style
# license that can be found in the LICENSE file.
from __future__ import annotations

__all__ = ("S3ResourceHandle",)

import logging
from collections.abc import Iterable, Mapping
from io import SEEK_CUR, SEEK_END, SEEK_SET, BytesIO, UnsupportedOperation
from typing import TYPE_CHECKING

from botocore.exceptions import ClientError

from lsst.utils.timer import time_this

from ..s3utils import all_retryable_errors, backoff, max_retry_time, translate_client_error
from ._baseResourceHandle import BaseResourceHandle, CloseStatus

if TYPE_CHECKING:
    # Imported only for type annotations; a runtime import would create a
    # circular dependency with ..s3.
    from ..s3 import S3ResourcePath
class S3ResourceHandle(BaseResourceHandle[bytes]):
    """S3 specialization of `.BaseResourceHandle`.

    Parameters
    ----------
    mode : `str`
        Handle modes as described in the python `io` module.
    log : `~logging.Logger`
        Logger to use when writing messages.
    uri : `lsst.resources.s3.S3ResourcePath`
        The `~lsst.resources.ResourcePath` object corresponding to this handle.
    newline : `str`
        When doing multiline operations, break the stream on given character.
        Defaults to newline.

    Notes
    -----
    It is only possible to incrementally flush this object if each chunk that
    is flushed is above 5MB in size. The flush command is ignored until the
    internal buffer reaches this size, or until close is called, whichever
    comes first.

    Once an instance in write mode is flushed, it is not possible to seek back
    to a position in the byte stream before the flush is executed.

    When opening a resource in read write mode (r+ or w+) no flushing is
    possible, and all data will be buffered until the resource is closed and
    the buffered data will be written. Additionally the entire contents of the
    resource will be loaded into memory upon opening.

    Documentation on the methods of this class should refer to the
    corresponding methods in the `io` module.

    S3 handles only support operations in binary mode. To get other modes of
    reading and writing, wrap this handle inside an `io.TextIOWrapper` context
    manager. An example of this can be found in `S3ResourcePath`.
    """

    def __init__(
        self,
        mode: str,
        log: logging.Logger,
        uri: S3ResourcePath,
        newline: bytes = b"\n",
    ):
        super().__init__(mode, log, uri, newline=newline)
        self._client = uri.client
        self._bucket = uri._bucket
        self._key = uri.relativeToPathRoot
        # In-memory staging area: holds pending writes (and, in r+/w+ mode,
        # the entire byte stream).
        self._buffer = BytesIO()
        self._position = 0
        self._writable = False
        # Cumulative number of bytes already uploaded via multipart flushes;
        # None until the first flush happens.
        self._last_flush_position: int | None = None
        self._warned = False
        self._readable = bool({"r", "+"} & set(self._mode))
        # Upper bound on readable bytes, discovered lazily when a ranged GET
        # overruns the end of the object (HTTP 416) -- see read().
        self._max_size: int | None = None
        self._recursing = False
        self._total_size = -1  # Unknown size.
        if {"w", "a", "x", "+"} & set(self._mode):
            self._writable = True
            # NOTE(review): a multipart upload is created eagerly for every
            # writable handle, but close() only completes it when at least
            # one part was uploaded; on the put_object fallback path it is
            # never aborted -- confirm whether orphaned multipart uploads are
            # cleaned up elsewhere (e.g. a bucket lifecycle rule).
            self._multiPartUpload = self._client.create_multipart_upload(Bucket=self._bucket, Key=self._key)
            self._partNo = 1
            self._parts: list[Mapping] = []
            # Below is a workaround for append mode. It basically must read in
            # everything that exists in the file so that it is in the buffer to
            # append to, and subsequently written back out appropriately with
            # any newly added data.
            if {"a", "+"} & set(self._mode):
                # Cheat a bit to get the existing data from the handle using
                # object interfaces, because we know this is safe.
                # Save the requested mode and readability.
                mode_save = self._mode
                read_save = self._readable
                # Update each of these internal variables to ensure the handle
                # is strictly readable.
                self._readable = True
                self._mode += "r"
                self._mode = self._mode.replace("+", "")
                # As mentioned, this reads the existing contents and writes it
                # out into the internal buffer, no writes actually happen until
                # the handle is flushed.
                self.write(self.read())
                # Restore the requested states.
                self._mode = mode_save
                self._readable = read_save
                # Set the state of the stream if the specified mode is read
                # and write.
                if "+" in self._mode:
                    self.seek(0)
                # If a file is w+ it is read write, but should be truncated
                # for future writes.
                if "w" in self._mode:
                    self.truncate()

    def tell(self) -> int:
        # NOTE(review): before any flush this is an absolute stream offset;
        # after the first multipart flush in write mode, writes reset it from
        # the *fresh* buffer's tell() (see flush()), so it becomes relative to
        # the current buffer -- confirm callers account for this.
        return self._position

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def close(self) -> None:
        """Flush remaining buffered data and finalize the object in S3."""
        if self.writable():
            # decide if this is a multipart upload
            if self._parts:
                # indicate that the object is in closing status, which forces
                # flush() to upload the final (possibly < 5 MiB) part
                self._closed = CloseStatus.CLOSING
                self.flush()
                with time_this(self._log, msg="Finalize multipart upload to %s", args=(self,)):
                    self._client.complete_multipart_upload(
                        Bucket=self._multiPartUpload["Bucket"],
                        Key=self._multiPartUpload["Key"],
                        UploadId=self._multiPartUpload["UploadId"],
                        MultipartUpload={"Parts": self._parts},
                    )
            else:
                # Put the complete object at once; nothing was ever flushed
                # so the whole stream is still in the buffer.
                # NOTE(review): the multipart upload created in __init__ is
                # not aborted on this path -- confirm intended.
                with time_this(self._log, msg="Write to %s", args=(self,)):
                    self._client.put_object(Bucket=self._bucket, Key=self._key, Body=self._buffer.getvalue())
        self._closed = CloseStatus.CLOSED

    @property
    def closed(self) -> bool:
        return self._closed == CloseStatus.CLOSED

    def fileno(self) -> int:
        # There is no OS-level file descriptor behind an S3 object.
        raise UnsupportedOperation("S3 object does not have a file number")

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def flush(self) -> None:
        """Upload the buffered bytes as one part of a multipart upload.

        Ignored until the buffered data reaches the S3 minimum part size,
        unless the handle is in the process of closing.
        """
        # If the object is closed, not writeable, or rw flush should be skipped
        # rw mode skips flush because the whole bytestream must be kept in
        # the buffer for seeking reasons.
        if self.closed or not self.writable() or "+" in self._mode:
            return
        # Disallow writes to seek to a position prior to the previous flush
        # this allows multipart uploads to upload content as the stream is
        # written to.
        s3_min_bits = 5 * 1024 * 1024  # S3 minimum part size: 5 MiB (a byte count, despite the name).
        # NOTE(review): after the first flush, tell() is relative to the
        # current buffer while _last_flush_position is cumulative, so this
        # difference shrinks with each flushed part -- confirm that this
        # growing threshold is the intended behavior.
        if (
            self.tell() - (self._last_flush_position or 0)
        ) < s3_min_bits and self._closed != CloseStatus.CLOSING:
            # Return until the buffer is big enough.
            return
        # nothing to write, don't create an empty upload
        if self.tell() == 0:
            return
        with time_this(
            self._log,
            msg="Upload multipart %d to %s",
            args=(
                self._partNo,
                self,
            ),
        ):
            response = self._client.upload_part(
                Body=self._buffer.getvalue(),
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._multiPartUpload["UploadId"],
                PartNumber=self._partNo,
            )
        # Record the part's ETag; S3 requires it to complete the upload.
        self._parts.append({"PartNumber": self._partNo, "ETag": response["ETag"]})
        self._partNo += 1
        # Accumulate the total number of bytes flushed so far, then start a
        # fresh buffer for subsequent writes.
        self._last_flush_position = self._buffer.tell() + (self._last_flush_position or 0)
        self._buffer = BytesIO()

    @property
    def isatty(self) -> bool:
        # An S3 stream is never attached to a terminal.
        return False

    def _size(self) -> int:
        # To allow SEEK_END to work; fetched lazily and cached.
        if self._total_size == -1:
            self._total_size = self._uri.size()
        return self._total_size

    def readable(self) -> bool:
        return self._readable

    def readline(self, size: int = -1) -> bytes:
        raise OSError("S3 Does not support line by line reads")

    def readlines(self, hint: int = -1) -> Iterable[bytes]:
        # Rewind and split the full contents on the newline byte.
        # NOTE(review): unlike io.IOBase.readlines, the newline separators
        # are stripped from the returned chunks -- confirm callers expect
        # this.
        self.seek(0)
        return self.read().split(self._newline)

    def seek(self, offset: int, whence: int = SEEK_SET) -> int:
        if self.writable():
            # Once data has been flushed it is already on the server, so
            # seeking back before the flush point cannot be honored.
            if self._last_flush_position is not None:
                if whence == SEEK_SET:
                    # Translate the absolute offset into the current
                    # (post-flush) buffer's coordinates.
                    offset -= self._last_flush_position
                    if offset < 0:
                        raise OSError("S3 ResourceHandle can not seek prior to already flushed positions")
                if whence == SEEK_CUR and (self.tell() - self._last_flush_position) < 0:
                    raise OSError("S3 ResourceHandle can not seek prior to already flushed positions")
                if whence == SEEK_END:
                    # The final size is unknown while writing.
                    raise OSError("S3 ResourceHandle can not seek referencing the end of the resource")
            self._buffer.seek(offset, whence)
            self._position = self._buffer.tell()
        else:
            # Read-only handles track position as a plain integer; the bytes
            # are fetched on demand in read().
            if whence == SEEK_SET:
                self._position = offset
            elif whence == SEEK_CUR:
                self._position += offset
            elif whence == SEEK_END:
                self._position = self._size() + offset
        return self._position

    def seekable(self) -> bool:
        return True

    def truncate(self, size: int | None = None) -> int:
        if self.writable():
            self._buffer.truncate(size)
            return self._position
        else:
            raise OSError("S3 ResourceHandle is not writable")

    def writable(self) -> bool:
        return self._writable

    def writelines(self, lines: Iterable[bytes]) -> None:
        if self.writable():
            self._buffer.writelines(lines)
            self._position = self._buffer.tell()
        else:
            raise OSError("S3 ResourceHandle is not writable")

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes (all remaining bytes if negative)."""
        if not self.readable():
            raise OSError("S3 ResourceHandle is not readable")
        # If the object is rw, then read from the internal io buffer
        if "+" in self._mode:
            self._buffer.seek(self._position)
            # NOTE(review): self._position is not advanced by this read, so
            # tell() is unchanged and a subsequent read() returns the same
            # bytes again -- confirm whether that is intended.
            return self._buffer.read(size)
        # otherwise fetch the appropriate bytes from the remote resource
        if self._max_size is not None and self._position >= self._max_size:
            # We already learned (via a 416) where the object ends.
            return b""
        # Build an HTTP Range header; an open-ended range when size < 0.
        stop = f"{self._position + size - 1}" if size > 0 else ""
        args = {"Range": f"bytes={self._position}-{stop}"}
        try:
            response = self._client.get_object(Bucket=self._bucket, Key=self._key, **args)
            contents = response["Body"].read()
            response["Body"].close()
            self._position += len(contents)
            return contents
        except (self._client.exceptions.NoSuchKey, self._client.exceptions.NoSuchBucket) as err:
            raise FileNotFoundError(f"No such resource: {self}") from err
        except ClientError as exc:
            # 416 = Requested Range Not Satisfiable: we asked past the end.
            if exc.response["ResponseMetadata"]["HTTPStatusCode"] == 416:
                if self._recursing:
                    # This means the function has attempted to read the whole
                    # byte range and failed again, meaning the previous byte
                    # was the last byte
                    return b""
                # Retry once with an open-ended range to get whatever remains,
                # and remember the discovered end of the object.
                self._recursing = True
                result = self.read()
                self._max_size = self._position
                self._recursing = False
                return result
            else:
                translate_client_error(exc, self._uri)
            raise

    def write(self, b: bytes) -> int:
        if self.writable():
            result = self._buffer.write(b)
            self._position = self._buffer.tell()
            return result
        else:
            raise OSError("S3 ResourceHandle is not writable")