Coverage for tests / test_s3.py: 27%

235 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:38 +0000

# This file is part of lsst-resources.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# Use of this source code is governed by a 3-clause BSD-style
# license that can be found in the LICENSE file.

12import datetime 

13import os 

14import time 

15import unittest 

16from inspect import signature 

17from unittest import mock 

18from urllib.parse import parse_qs, urlparse 

19 

20from lsst.resources import ResourceInfo, ResourcePath 

21from lsst.resources.s3 import S3ResourcePath 

22from lsst.resources.s3utils import clean_test_environment_for_s3 

23from lsst.resources.tests import GenericReadWriteTestCase, GenericTestCase 

24 

25try: 

26 import boto3 

27 import botocore 

28 

29 try: 

30 from moto import mock_aws # v5 

31 except ImportError: 

32 from moto import mock_s3 as mock_aws 

33except ImportError: 

34 boto3 = None 

35 

36 def mock_aws(cls): 

37 """No-op decorator in case moto mock_aws can not be imported.""" 

38 return cls 

39 

40 

41try: 

42 import fsspec 

43except ImportError: 

44 fsspec = None 

45 

46 

47class GenericS3TestCase(GenericTestCase, unittest.TestCase): 

48 """Generic tests of S3 URIs.""" 

49 

50 scheme = "s3" 

51 netloc = "my_bucket" 

52 

53 

54class S3ReadWriteTestCaseBase(GenericReadWriteTestCase): 

55 """Tests of reading and writing S3 URIs.""" 

56 

57 scheme = "s3" 

58 s3_endpoint_url: str | None = None 

59 # S3 batches in 1000 files so need more than that. 

60 n_mremove_files: int = 1015 

61 

62 def setUp(self): 

63 self.enterContext(clean_test_environment_for_s3()) 

64 

65 # Enable S3 mocking of tests. 

66 self.enterContext(mock_aws()) 

67 

68 # MOTO needs to know that we expect Bucket bucketname to exist 

69 s3 = boto3.resource("s3", endpoint_url=self.s3_endpoint_url) 

70 s3.create_bucket(Bucket=self.bucket) 

71 

72 super().setUp() 

73 

74 def tearDown(self): 

75 s3 = boto3.resource("s3") 

76 bucket = s3.Bucket(self.bucket) 

77 try: 

78 bucket.objects.all().delete() 

79 except botocore.exceptions.ClientError as e: 

80 if e.response["Error"]["Code"] == "404": 

81 # the key was not reachable - pass 

82 pass 

83 else: 

84 raise 

85 

86 bucket = s3.Bucket(self.bucket) 

87 bucket.delete() 

88 

89 S3ResourcePath.use_threads = None 

90 

91 super().tearDown() 

92 

93 def test_bucket_fail(self): 

94 # Deliberately create URI with unknown bucket. 

95 uri = ResourcePath("s3://badbucket/something/") 

96 

97 with self.assertRaises(ValueError): 

98 uri.mkdir() 

99 

100 with self.assertRaises(FileNotFoundError): 

101 uri.remove() 

102 

103 def test_transfer_progress(self): 

104 """Test progress bar reporting for upload and download.""" 

105 remote = self.root_uri.join("test.dat") 

106 remote.write(b"42") 

107 with ResourcePath.temporary_uri(suffix=".dat") as tmp: 

108 # Download from S3. 

109 with self.assertLogs("lsst.resources", level="DEBUG") as cm: 

110 tmp.transfer_from(remote, transfer="auto") 

111 self.assertRegex("".join(cm.output), r"test\.dat.*100\%") 

112 

113 # Upload to S3. 

114 with self.assertLogs("lsst.resources", level="DEBUG") as cm: 

115 remote.transfer_from(tmp, transfer="auto", overwrite=True) 

116 self.assertRegex("".join(cm.output), rf"{tmp.basename()}.*100\%") 

117 

118 def test_handle(self): 

119 remote = self.root_uri.join("test_handle.dat") 

120 with remote.open("wb") as handle: 

121 self.assertTrue(handle.writable()) 

122 # write 6 megabytes to make sure partial write work 

123 handle.write(6 * 1024 * 1024 * b"a") 

124 self.assertEqual(handle.tell(), 6 * 1024 * 1024) 

125 handle.flush() 

126 self.assertGreaterEqual(len(handle._multiPartUpload), 1) 

127 

128 # verify file can't be seeked back 

129 with self.assertRaises(OSError): 

130 handle.seek(0) 

131 

132 # write more bytes 

133 handle.write(1024 * b"c") 

134 

135 # seek back and overwrite 

136 handle.seek(6 * 1024 * 1024) 

137 handle.write(1024 * b"b") 

138 

139 with remote.open("rb") as handle: 

140 self.assertTrue(handle.readable()) 

141 # read the first 6 megabytes 

142 result = handle.read(6 * 1024 * 1024) 

143 self.assertEqual(result, 6 * 1024 * 1024 * b"a") 

144 self.assertEqual(handle.tell(), 6 * 1024 * 1024) 

145 # verify additional read gets the next part 

146 result = handle.read(1024) 

147 self.assertEqual(result, 1024 * b"b") 

148 # see back to the beginning to verify seeking 

149 handle.seek(0) 

150 result = handle.read(1024) 

151 self.assertEqual(result, 1024 * b"a") 

152 

153 remote = self.root_uri.join("missing_file.dat") 

154 with remote.open("rb") as handle: 

155 with self.assertRaises(FileNotFoundError): 

156 handle.read() 

157 

158 def test_url_signing(self): 

159 self._test_url_signing_case("url-signing-test.txt", b"test123") 

160 # A zero byte presigned S3 HTTP URL is a weird edge case, because we 

161 # emulate HEAD requests using a 1-byte GET. 

162 self._test_url_signing_case("url-signing-test-zero-bytes.txt", b"") 

163 # Should be the same as a normal case, but check it for paranoia since 

164 # it's on the boundary of the read size. 

165 self._test_url_signing_case("url-signing-test-one-byte.txt", b"t") 

166 

167 def _test_url_signing_case(self, filename: str, test_data: bytes): 

168 s3_path = self.root_uri.join(filename) 

169 

170 put_url = s3_path.generate_presigned_put_url(expiration_time_seconds=1800) 

171 self._check_presigned_url(put_url, 1800) 

172 get_url = s3_path.generate_presigned_get_url(expiration_time_seconds=3600) 

173 self._check_presigned_url(get_url, 3600) 

174 

175 # Check that fragments are retained. 

176 s3_path = s3_path.replace(fragment="zip-path=X") 

177 put_url = s3_path.generate_presigned_put_url(expiration_time_seconds=1800) 

178 self.assertEqual(ResourcePath(put_url).fragment, "zip-path=X") 

179 self._check_presigned_url(put_url, 1800) 

180 get_url = s3_path.generate_presigned_get_url(expiration_time_seconds=3600) 

181 self.assertEqual(ResourcePath(get_url).fragment, "zip-path=X") 

182 self._check_presigned_url(get_url, 3600) 

183 

184 # Moto monkeypatches the 'requests' library to mock access to presigned 

185 # URLs, so we are able to use HttpResourcePath to access the URLs in 

186 # this test. 

187 ResourcePath(put_url).write(test_data) 

188 get_path = ResourcePath(get_url) 

189 retrieved = get_path.read() 

190 self.assertEqual(retrieved, test_data) 

191 self.assertTrue(get_path.exists()) 

192 self.assertEqual(get_path.size(), len(test_data)) 

193 

194 # Try again with open(). 

195 with get_path.open("rb") as fd: 

196 self.assertEqual(fd.read(), test_data) 

197 

198 def test_nonexistent_presigned_url(self): 

199 s3_path = self.root_uri.join("this-is-a-missing-file.txt") 

200 get_url = s3_path.generate_presigned_get_url(expiration_time_seconds=3600) 

201 get_path = ResourcePath(get_url) 

202 # Check the HttpResourcePath implementation for presigned S3 urls. 

203 # Nothing has been uploaded to this URL, so it shouldn't exist. 

204 self.assertFalse(get_path.exists()) 

205 with self.assertRaises(FileNotFoundError): 

206 get_path.size() 

207 

208 def test_get_info(self): 

209 now = datetime.datetime.now(tz=datetime.UTC) 

210 remote = self.root_uri.join("test-info.dat") 

211 remote.write(b"abc") 

212 

213 info = remote.get_info() 

214 self.assertIsInstance(info, ResourceInfo) 

215 self.assertTrue(info.is_file) 

216 self.assertEqual(info.size, 3) 

217 self.assertIsInstance(info.checksums, dict) 

218 self.assertIn("crc32", info.checksums) # Only appears if ChecksumMode=ENABLED 

219 self.assertEqual(info.last_modified.tzinfo, datetime.UTC) 

220 self.assertGreaterEqual(info.last_modified.timestamp(), now.timestamp() - 1.0) 

221 

222 def _check_presigned_url(self, url: str, expiration_time_seconds: int): 

223 parsed = urlparse(url) 

224 self.assertEqual(parsed.scheme, "https") 

225 

226 actual_expiration_timestamp = int(parse_qs(parsed.query)["Expires"][0]) 

227 current_time = int(time.time()) 

228 expected_expiration_timestamp = current_time + expiration_time_seconds 

229 # Allow some flex in the expiration time in case this test process goes 

230 # out to lunch for a while on a busy CI machine 

231 self.assertLessEqual(abs(expected_expiration_timestamp - actual_expiration_timestamp), 120) 

232 

233 def test_threading_true(self): 

234 with mock.patch.dict(os.environ, {"LSST_S3_USE_THREADS": "True"}): 

235 S3ResourcePath.use_threads = None 

236 test_resource_path = self.root_uri.join("test_file.dat") 

237 self.assertTrue(test_resource_path._transfer_config.use_threads) 

238 

239 def test_implicit_default_threading(self): 

240 S3ResourcePath.use_threads = None 

241 boto_default = signature(boto3.s3.transfer.TransferConfig).parameters["use_threads"].default 

242 # Newer versions of boto return None as the default. 

243 if boto_default is None: 

244 boto_default = True 

245 test_resource_path = self.root_uri.join("test_file.dat") 

246 self.assertEqual(test_resource_path._transfer_config.use_threads, boto_default) 

247 

248 def test_explicit_default_threading(self): 

249 with mock.patch.dict(os.environ, {"LSST_S3_USE_THREADS": "None"}): 

250 S3ResourcePath.use_threads = None 

251 boto_default = signature(boto3.s3.transfer.TransferConfig).parameters["use_threads"].default 

252 # Newer versions of boto return None as the default. 

253 if boto_default is None: 

254 boto_default = True 

255 test_resource_path = self.root_uri.join("test_file.dat") 

256 self.assertEqual(test_resource_path._transfer_config.use_threads, boto_default) 

257 

258 def test_threading_false(self): 

259 with mock.patch.dict(os.environ, {"LSST_S3_USE_THREADS": "False"}): 

260 S3ResourcePath.use_threads = None 

261 test_resource_path = self.root_uri.join("test_file.dat") 

262 self.assertFalse(test_resource_path._transfer_config.use_threads) 

263 

264 self.test_local() 

265 

266 @unittest.skipIf(fsspec is None, "fsspec is not available") 

267 def test_fsspec_constructor(self) -> None: 

268 """Test that we can obtain an s3fs object.""" 

269 uri = self.root_uri.join("test_file.dat") 

270 fs, path = uri.to_fsspec() 

271 self.assertEqual(path, f"{uri._bucket}/{uri.relativeToPathRoot}") 

272 self.assertTrue(hasattr(fs, "open")) 

273 

274 def test_fsspec(self) -> None: 

275 raise unittest.SkipTest("fsspec s3fs incompatible with moto") 

276 

277 @unittest.mock.patch("lsst.resources._resourcePath._POOL_EXECUTOR_CLASS", None) 

278 @unittest.mock.patch.dict(os.environ, {"LSST_RESOURCES_EXECUTOR": "threads"}) 

279 def test_mexists(self) -> None: 

280 """Test mexists with override executor pool. 

281 

282 moto does not work with process pool. 

283 """ 

284 super().test_mexists() 

285 

286 @unittest.mock.patch("lsst.resources._resourcePath._POOL_EXECUTOR_CLASS", None) 

287 @unittest.mock.patch.dict(os.environ, {"LSST_RESOURCES_EXECUTOR": "threads"}) 

288 def test_mtransfer(self) -> None: 

289 """Test mtransfer with override executor pool. 

290 

291 moto does not work with process pool. 

292 """ 

293 super().test_mtransfer() 

294 

295 @unittest.mock.patch("lsst.resources._resourcePath._POOL_EXECUTOR_CLASS", None) 

296 @unittest.mock.patch.dict(os.environ, {"LSST_RESOURCES_EXECUTOR": "threads"}) 

297 def test_local_mtransfer(self) -> None: 

298 """Test local mtransfer with override executor pool. 

299 

300 moto does not work with process pool. 

301 """ 

302 super().test_local_mtransfer() 

303 

304 

305@unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!") 

306class S3ReadWriteTestCase(S3ReadWriteTestCaseBase, unittest.TestCase): 

307 """Test S3 with no explicit profile/endpoint specified. 

308 (``s3://bucketname/...``). 

309 """ 

310 

311 bucket = "my_2nd_bucket" 

312 netloc = bucket 

313 

314 

315@unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!") 

316class S3WithProfileReadWriteTestCase(S3ReadWriteTestCaseBase, unittest.TestCase): 

317 """Test S3 URLs with explicit profile specified. 

318 (``s3://profile@bucketname/...``). 

319 """ 

320 

321 bucket = "3rd_bucket" 

322 netloc = f"myprofile@{bucket}" 

323 s3_endpoint_url = "https://endpoint1.test.example" 

324 

325 def setUp(self): 

326 # Configure custom S3 endpoints that we can target from tests using 

327 # non-default profile. 

328 self.enterContext( 

329 mock.patch.dict( 

330 os.environ, 

331 { 

332 "MOTO_S3_CUSTOM_ENDPOINTS": self.s3_endpoint_url, 

333 "LSST_RESOURCES_S3_PROFILE_myprofile": "https://access_key:security_key@endpoint1.test.example", 

334 }, 

335 ) 

336 ) 

337 

338 super().setUp() 

339 

340 def test_missing_profile(self): 

341 with self.assertRaises(botocore.exceptions.ProfileNotFound): 

342 ResourcePath("s3://otherprofile@bucket").read() 

343 

344 def test_s3_endpoint_url(self): 

345 with mock.patch.dict( 

346 os.environ, 

347 {"S3_ENDPOINT_URL": self.s3_endpoint_url}, 

348 ): 

349 path = ResourcePath(f"s3://{self.bucket}/test-s3-endpoint-url.txt") 

350 data = b"123" 

351 path.write(data) 

352 self.assertEqual(path.read(), data) 

353 self.assertIn( 

354 "https://endpoint1.test.example", 

355 path.generate_presigned_get_url(expiration_time_seconds=3600), 

356 ) 

357 

358 def test_uri_syntax(self): 

359 path1 = ResourcePath("s3://profile@bucket/path") 

360 self.assertEqual(path1._bucket, "bucket") 

361 self.assertEqual(path1._profile, "profile") 

362 path2 = ResourcePath("s3://bucket2/path") 

363 self.assertEqual(path2._bucket, "bucket2") 

364 self.assertIsNone(path2._profile) 

365 

366 def test_ceph_uri_syntax(self): 

367 # The Ceph S3 'multi-tenant' syntax for buckets can include colons. 

368 path1 = ResourcePath("s3://profile@ceph:bucket/path") 

369 self.assertEqual(path1._bucket, "ceph:bucket") 

370 self.assertEqual(path1._profile, "profile") 

371 path2 = ResourcePath("s3://ceph:bucket2/path") 

372 self.assertEqual(path2._bucket, "ceph:bucket2") 

373 self.assertIsNone(path2._profile) 

374 

375 def test_transfer_from_different_endpoints(self): 

376 # Create a bucket using a different endpoint (the default endpoint.) 

377 boto3.resource("s3").create_bucket(Bucket="source-bucket") 

378 source_path = ResourcePath("s3://source-bucket/file.txt") 

379 source_path.write(b"123") 

380 target_path = ResourcePath(f"s3://{self.netloc}/target.txt") 

381 # Transfer from default endpoint to custom endpoint with custom 

382 # profile. 

383 target_path.transfer_from(source_path) 

384 self.assertEqual(target_path.read(), b"123") 

385 

386 

387if __name__ == "__main__": 

388 unittest.main()