Coverage for python / lsst / resources / _resourceHandles / _s3ResourceHandle.py: 16%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:44 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("S3ResourceHandle",) 

15 

16import logging 

17from collections.abc import Iterable, Mapping 

18from io import SEEK_CUR, SEEK_END, SEEK_SET, BytesIO, UnsupportedOperation 

19from typing import TYPE_CHECKING 

20 

21from botocore.exceptions import ClientError 

22 

23from lsst.utils.timer import time_this 

24 

25from ..s3utils import all_retryable_errors, backoff, max_retry_time, translate_client_error 

26from ._baseResourceHandle import BaseResourceHandle, CloseStatus 

27 

28if TYPE_CHECKING: 

29 from ..s3 import S3ResourcePath 

30 

31 

32class S3ResourceHandle(BaseResourceHandle[bytes]): 

33 """S3 specialization of `.BaseResourceHandle`. 

34 

35 Parameters 

36 ---------- 

37 mode : `str` 

38 Handle modes as described in the python `io` module. 

39 log : `~logging.Logger` 

40 Logger to use when writing messages.

41 uri : `lsst.resources.s3.S3ResourcePath` 

42 The `~lsst.resources.ResourcePath` object corresponding to this handle. 

43 newline : `bytes`

44 When doing multiline operations, break the stream on given character. 

45 Defaults to newline. 

46 

47 Notes 

48 ----- 

49 It is only possible to incrementally flush this object if each chunk that 

50 is flushed is above 5MB in size. The flush command is ignored until the 

51 internal buffer reaches this size, or until close is called, whichever 

52 comes first. 

53 

54 Once an instance in write mode is flushed, it is not possible to seek back 

55 to a position in the byte stream before the flush is executed. 

56 

57 When opening a resource in read write mode (r+ or w+) no flushing is 

58 possible, and all data will be buffered until the resource is closed and 

59 the buffered data will be written. Additionally the entire contents of the 

60 resource will be loaded into memory upon opening. 

61 

62 Documentation on the methods of this class should refer to the

63 corresponding methods in the `io` module.

64 

65 S3 handles only support operations in binary mode. To get other modes of 

66 reading and writing, wrap this handle inside an `io.TextIOWrapper` context 

67 manager. An example of this can be found in `S3ResourcePath`. 

68 """ 

69 

    def __init__(
        self,
        mode: str,
        log: logging.Logger,
        uri: S3ResourcePath,
        newline: bytes = b"\n",
    ) -> None:
        super().__init__(mode, log, uri, newline=newline)
        # Cache the pieces of the URI needed for the boto3 API calls.
        self._client = uri.client
        self._bucket = uri._bucket
        self._key = uri.relativeToPathRoot
        # In-memory staging buffer; holds data written since the last flush.
        self._buffer = BytesIO()
        # Absolute position within the byte stream.
        self._position = 0
        self._writable = False
        # Absolute position of the end of the last flushed part, or None if
        # nothing has been flushed yet.
        self._last_flush_position: int | None = None
        self._warned = False
        self._readable = bool({"r", "+"} & set(self._mode))
        # Upper bound on the readable byte range, discovered lazily when a
        # ranged GET overruns the object (see read()).
        self._max_size: int | None = None
        # Guard used by read() to avoid infinite recursion on HTTP 416.
        self._recursing = False
        self._total_size = -1  # Unknown size.
        if {"w", "a", "x", "+"} & set(self._mode):
            self._writable = True
            # A multipart upload is started eagerly for every writable
            # handle; parts are added as the stream is flushed.
            self._multiPartUpload = self._client.create_multipart_upload(Bucket=self._bucket, Key=self._key)
            self._partNo = 1
            self._parts: list[Mapping] = []
            # Below is a workaround for append mode. It basically must read
            # in everything that exists in the file so that it is in the
            # buffer to append to, and subsequently written back out
            # appropriately with any newly added data.
            if {"a", "+"} & set(self._mode):
                # Cheat a bit to get the existing data from the handle using
                # object interfaces, because we know this is safe.
                # Save the requested mode and readability.
                mode_save = self._mode
                read_save = self._readable
                # Update each of these internal variables to ensure the
                # handle is strictly readable.
                self._readable = True
                self._mode += "r"
                self._mode = self._mode.replace("+", "")
                # As mentioned, this reads the existing contents and writes
                # it out into the internal buffer; no writes actually happen
                # until the handle is flushed.
                self.write(self.read())
                # Restore the requested states.
                self._mode = mode_save
                self._readable = read_save
                # Set the state of the stream if the specified mode is read
                # and write.
                if "+" in self._mode:
                    self.seek(0)
                    # If a file is w+ it is read write, but should be
                    # truncated for future writes.
                    if "w" in self._mode:
                        self.truncate()

125 

    def tell(self) -> int:
        """Return the current absolute position in the byte stream."""
        return self._position

128 

129 @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time) 

130 def close(self) -> None: 

131 if self.writable(): 

132 # decide if this is a multipart upload 

133 if self._parts: 

134 # indicate that the object is in closing status 

135 self._closed = CloseStatus.CLOSING 

136 self.flush() 

137 with time_this(self._log, msg="Finalize multipart upload to %s", args=(self,)): 

138 self._client.complete_multipart_upload( 

139 Bucket=self._multiPartUpload["Bucket"], 

140 Key=self._multiPartUpload["Key"], 

141 UploadId=self._multiPartUpload["UploadId"], 

142 MultipartUpload={"Parts": self._parts}, 

143 ) 

144 else: 

145 # Put the complete object at once 

146 with time_this(self._log, msg="Write to %s", args=(self,)): 

147 self._client.put_object(Bucket=self._bucket, Key=self._key, Body=self._buffer.getvalue()) 

148 self._closed = CloseStatus.CLOSED 

149 

    @property
    def closed(self) -> bool:
        """Whether this handle has been fully closed (`bool`)."""
        return self._closed == CloseStatus.CLOSED

153 

    def fileno(self) -> int:
        """Raise `io.UnsupportedOperation`; S3 objects have no OS-level
        file descriptor.
        """
        raise UnsupportedOperation("S3 object does not have a file number")

156 

    @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
    def flush(self) -> None:
        """Upload buffered data as one part of the multipart upload.

        The flush is silently skipped unless at least 5 MiB has accumulated
        since the previous flush, or the handle is in the process of
        closing (S3 rejects multipart parts smaller than 5 MiB except for
        the final part).
        """
        # If the object is closed, not writeable, or rw, flush should be
        # skipped. rw mode skips flush because the whole bytestream must be
        # kept in the buffer for seeking reasons.
        if self.closed or not self.writable() or "+" in self._mode:
            return
        # Disallow writes to seek to a position prior to the previous flush;
        # this allows multipart uploads to upload content as the stream is
        # written to.
        # NOTE: despite the name, this value is in bytes, not bits.
        s3_min_bits = 5 * 1024 * 1024  # S3 minimum part size is 5 MiB.
        if (
            self.tell() - (self._last_flush_position or 0)
        ) < s3_min_bits and self._closed != CloseStatus.CLOSING:
            # Return until the buffer is big enough.
            return
        # Nothing to write; don't create an empty upload part.
        if self.tell() == 0:
            return
        with time_this(
            self._log,
            msg="Upload multipart %d to %s",
            args=(
                self._partNo,
                self,
            ),
        ):
            response = self._client.upload_part(
                Body=self._buffer.getvalue(),
                Bucket=self._bucket,
                Key=self._key,
                UploadId=self._multiPartUpload["UploadId"],
                PartNumber=self._partNo,
            )
        # Record the ETag; complete_multipart_upload needs it for each part.
        self._parts.append({"PartNumber": self._partNo, "ETag": response["ETag"]})
        self._partNo += 1
        # Track the absolute stream position of everything flushed so far so
        # that seek() can reject moves into already-uploaded territory.
        self._last_flush_position = self._buffer.tell() + (self._last_flush_position or 0)
        self._buffer = BytesIO()

195 

    @property
    def isatty(self) -> bool:
        """Always `False`; an S3 resource is never an interactive terminal.

        NOTE(review): the `io` module defines ``isatty()`` as a method, not
        a property — confirm callers expect attribute access here.
        """
        return False

199 

200 def _size(self) -> int: 

201 # To allow SEEK_END to work. 

202 if self._total_size == -1: 

203 self._total_size = self._uri.size() 

204 return self._total_size 

205 

    def readable(self) -> bool:
        """Whether this handle was opened in a readable mode."""
        return self._readable

208 

    def readline(self, size: int = -1) -> bytes:
        """Raise `OSError`; line-oriented reads are not supported, use
        `readlines` or wrap the handle in `io.TextIOWrapper` instead.
        """
        raise OSError("S3 Does not support line by line reads")

211 

212 def readlines(self, hint: int = -1) -> Iterable[bytes]: 

213 self.seek(0) 

214 return self.read().split(self._newline) 

215 

    def seek(self, offset: int, whence: int = SEEK_SET) -> int:
        """Move the stream position and return the new absolute position.

        Parameters
        ----------
        offset : `int`
            Target offset, interpreted relative to ``whence``.
        whence : `int`
            One of `io.SEEK_SET`, `io.SEEK_CUR`, or `io.SEEK_END`.

        Notes
        -----
        In write mode, once data has been flushed it is not possible to
        seek back before the flushed region, and ``SEEK_END`` is rejected
        because the final object size is not yet known.
        """
        if self.writable():
            if self._last_flush_position is not None:
                if whence == SEEK_SET:
                    # The internal buffer only holds bytes written since the
                    # last flush, so translate the absolute offset into a
                    # buffer-relative one.
                    offset -= self._last_flush_position
                    if offset < 0:
                        raise OSError("S3 ResourceHandle can not seek prior to already flushed positions")
                # NOTE(review): this checks the position *before* applying
                # the relative offset, not the target position — confirm
                # that is the intended guard.
                if whence == SEEK_CUR and (self.tell() - self._last_flush_position) < 0:
                    raise OSError("S3 ResourceHandle can not seek prior to already flushed positions")
                if whence == SEEK_END:
                    raise OSError("S3 ResourceHandle can not seek referencing the end of the resource")
            self._buffer.seek(offset, whence)
            self._position = self._buffer.tell()
        else:
            # Read-only handles just track a position; the actual byte
            # fetch happens lazily in read() via a ranged GET.
            if whence == SEEK_SET:
                self._position = offset
            elif whence == SEEK_CUR:
                self._position += offset
            elif whence == SEEK_END:
                self._position = self._size() + offset
        return self._position

237 

    def seekable(self) -> bool:
        """Always `True`; S3 handles support (restricted) seeking."""
        return True

240 

241 def truncate(self, size: int | None = None) -> int: 

242 if self.writable(): 

243 self._buffer.truncate(size) 

244 return self._position 

245 else: 

246 raise OSError("S3 ResourceHandle is not writable") 

247 

    def writable(self) -> bool:
        """Whether this handle was opened in a writable mode."""
        return self._writable

250 

251 def writelines(self, lines: Iterable[bytes]) -> None: 

252 if self.writable(): 

253 self._buffer.writelines(lines) 

254 self._position = self._buffer.tell() 

255 else: 

256 raise OSError("S3 ResourceHandle is not writable") 

257 

258 @backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time) 

259 def read(self, size: int = -1) -> bytes: 

260 if not self.readable(): 

261 raise OSError("S3 ResourceHandle is not readable") 

262 # If the object is rw, then read from the internal io buffer 

263 if "+" in self._mode: 

264 self._buffer.seek(self._position) 

265 return self._buffer.read(size) 

266 # otherwise fetch the appropriate bytes from the remote resource 

267 if self._max_size is not None and self._position >= self._max_size: 

268 return b"" 

269 stop = f"{self._position + size - 1}" if size > 0 else "" 

270 args = {"Range": f"bytes={self._position}-{stop}"} 

271 try: 

272 response = self._client.get_object(Bucket=self._bucket, Key=self._key, **args) 

273 contents = response["Body"].read() 

274 response["Body"].close() 

275 self._position += len(contents) 

276 return contents 

277 except (self._client.exceptions.NoSuchKey, self._client.exceptions.NoSuchBucket) as err: 

278 raise FileNotFoundError(f"No such resource: {self}") from err 

279 except ClientError as exc: 

280 if exc.response["ResponseMetadata"]["HTTPStatusCode"] == 416: 

281 if self._recursing: 

282 # This means the function has attempted to read the whole 

283 # byte range and failed again, meaning the previous byte 

284 # was the last byte 

285 return b"" 

286 self._recursing = True 

287 result = self.read() 

288 self._max_size = self._position 

289 self._recursing = False 

290 return result 

291 else: 

292 translate_client_error(exc, self._uri) 

293 raise 

294 

295 def write(self, b: bytes) -> int: 

296 if self.writable(): 

297 result = self._buffer.write(b) 

298 self._position = self._buffer.tell() 

299 return result 

300 else: 

301 raise OSError("S3 ResourceHandle is not writable")