Coverage for tests / test_s3.py: 27%
235 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:32 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12import datetime
13import os
14import time
15import unittest
16from inspect import signature
17from unittest import mock
18from urllib.parse import parse_qs, urlparse
20from lsst.resources import ResourceInfo, ResourcePath
21from lsst.resources.s3 import S3ResourcePath
22from lsst.resources.s3utils import clean_test_environment_for_s3
23from lsst.resources.tests import GenericReadWriteTestCase, GenericTestCase
try:
    import boto3
    import botocore

    try:
        from moto import mock_aws  # v5
    except ImportError:
        # moto v4 and earlier named the S3 mock decorator ``mock_s3``.
        from moto import mock_s3 as mock_aws
except ImportError:
    # Without boto3 (and therefore moto) the S3 test classes below are
    # skipped, so a sentinel plus a no-op decorator stand-in is sufficient.
    boto3 = None

    def mock_aws(cls):
        """No-op decorator in case moto mock_aws can not be imported."""
        return cls
try:
    import fsspec
except ImportError:
    # fsspec-dependent tests are skipped when the package is missing.
    fsspec = None
class GenericS3TestCase(GenericTestCase, unittest.TestCase):
    """Run the generic URI behaviour checks against ``s3://`` paths."""

    netloc = "my_bucket"
    scheme = "s3"
class S3ReadWriteTestCaseBase(GenericReadWriteTestCase):
    """Tests of reading and writing S3 URIs."""

    scheme = "s3"
    # Endpoint override; subclasses may point at a custom (mocked) endpoint.
    s3_endpoint_url: str | None = None
    # S3 batches in 1000 files so need more than that.
    n_mremove_files: int = 1015

    def setUp(self):
        """Create a freshly mocked S3 bucket for each test."""
        self.enterContext(clean_test_environment_for_s3())

        # Enable S3 mocking of tests.
        self.enterContext(mock_aws())

        # MOTO needs to know that we expect Bucket bucketname to exist
        s3 = boto3.resource("s3", endpoint_url=self.s3_endpoint_url)
        s3.create_bucket(Bucket=self.bucket)

        super().setUp()

    def tearDown(self):
        """Empty and delete the mocked bucket and reset cached state."""
        s3 = boto3.resource("s3")
        bucket = s3.Bucket(self.bucket)
        try:
            bucket.objects.all().delete()
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                # The key was not reachable - nothing to clean up.
                pass
            else:
                raise

        bucket = s3.Bucket(self.bucket)
        bucket.delete()

        # Reset so a cached threading decision does not leak between tests.
        S3ResourcePath.use_threads = None

        super().tearDown()

    def test_bucket_fail(self):
        """Operations on a nonexistent bucket raise appropriate errors."""
        # Deliberately create URI with unknown bucket.
        uri = ResourcePath("s3://badbucket/something/")

        with self.assertRaises(ValueError):
            uri.mkdir()

        with self.assertRaises(FileNotFoundError):
            uri.remove()

    def test_transfer_progress(self):
        """Test progress bar reporting for upload and download."""
        remote = self.root_uri.join("test.dat")
        remote.write(b"42")
        with ResourcePath.temporary_uri(suffix=".dat") as tmp:
            # Download from S3.
            with self.assertLogs("lsst.resources", level="DEBUG") as cm:
                tmp.transfer_from(remote, transfer="auto")
            self.assertRegex("".join(cm.output), r"test\.dat.*100\%")

            # Upload to S3.
            with self.assertLogs("lsst.resources", level="DEBUG") as cm:
                remote.transfer_from(tmp, transfer="auto", overwrite=True)
            self.assertRegex("".join(cm.output), rf"{tmp.basename()}.*100\%")

    def test_handle(self):
        """Exercise the multipart-capable file handle returned by open()."""
        remote = self.root_uri.join("test_handle.dat")
        with remote.open("wb") as handle:
            self.assertTrue(handle.writable())
            # write 6 megabytes to make sure partial write work
            handle.write(6 * 1024 * 1024 * b"a")
            self.assertEqual(handle.tell(), 6 * 1024 * 1024)
            handle.flush()
            self.assertGreaterEqual(len(handle._multiPartUpload), 1)

            # verify file can't be seeked back
            with self.assertRaises(OSError):
                handle.seek(0)

            # write more bytes
            handle.write(1024 * b"c")

            # seek back and overwrite
            handle.seek(6 * 1024 * 1024)
            handle.write(1024 * b"b")

        with remote.open("rb") as handle:
            self.assertTrue(handle.readable())
            # read the first 6 megabytes
            result = handle.read(6 * 1024 * 1024)
            self.assertEqual(result, 6 * 1024 * 1024 * b"a")
            self.assertEqual(handle.tell(), 6 * 1024 * 1024)
            # verify additional read gets the next part
            result = handle.read(1024)
            self.assertEqual(result, 1024 * b"b")
            # seek back to the beginning to verify seeking
            handle.seek(0)
            result = handle.read(1024)
            self.assertEqual(result, 1024 * b"a")

        remote = self.root_uri.join("missing_file.dat")
        with remote.open("rb") as handle:
            with self.assertRaises(FileNotFoundError):
                handle.read()

    def test_url_signing(self):
        """Presigned GET/PUT URLs round-trip payloads of various sizes."""
        self._test_url_signing_case("url-signing-test.txt", b"test123")
        # A zero byte presigned S3 HTTP URL is a weird edge case, because we
        # emulate HEAD requests using a 1-byte GET.
        self._test_url_signing_case("url-signing-test-zero-bytes.txt", b"")
        # Should be the same as a normal case, but check it for paranoia since
        # it's on the boundary of the read size.
        self._test_url_signing_case("url-signing-test-one-byte.txt", b"t")

    def _test_url_signing_case(self, filename: str, test_data: bytes):
        """Upload ``test_data`` via a presigned PUT URL and read it back via
        a presigned GET URL, checking expiry and fragment handling.
        """
        s3_path = self.root_uri.join(filename)

        put_url = s3_path.generate_presigned_put_url(expiration_time_seconds=1800)
        self._check_presigned_url(put_url, 1800)
        get_url = s3_path.generate_presigned_get_url(expiration_time_seconds=3600)
        self._check_presigned_url(get_url, 3600)

        # Check that fragments are retained.
        s3_path = s3_path.replace(fragment="zip-path=X")
        put_url = s3_path.generate_presigned_put_url(expiration_time_seconds=1800)
        self.assertEqual(ResourcePath(put_url).fragment, "zip-path=X")
        self._check_presigned_url(put_url, 1800)
        get_url = s3_path.generate_presigned_get_url(expiration_time_seconds=3600)
        self.assertEqual(ResourcePath(get_url).fragment, "zip-path=X")
        self._check_presigned_url(get_url, 3600)

        # Moto monkeypatches the 'requests' library to mock access to presigned
        # URLs, so we are able to use HttpResourcePath to access the URLs in
        # this test.
        ResourcePath(put_url).write(test_data)
        get_path = ResourcePath(get_url)
        retrieved = get_path.read()
        self.assertEqual(retrieved, test_data)
        self.assertTrue(get_path.exists())
        self.assertEqual(get_path.size(), len(test_data))

        # Try again with open().
        with get_path.open("rb") as fd:
            self.assertEqual(fd.read(), test_data)

    def test_nonexistent_presigned_url(self):
        """A presigned URL for a missing object reports non-existence."""
        s3_path = self.root_uri.join("this-is-a-missing-file.txt")
        get_url = s3_path.generate_presigned_get_url(expiration_time_seconds=3600)
        get_path = ResourcePath(get_url)
        # Check the HttpResourcePath implementation for presigned S3 urls.
        # Nothing has been uploaded to this URL, so it shouldn't exist.
        self.assertFalse(get_path.exists())
        with self.assertRaises(FileNotFoundError):
            get_path.size()

    def test_get_info(self):
        """get_info() reports size, checksums, and an aware UTC timestamp."""
        now = datetime.datetime.now(tz=datetime.UTC)
        remote = self.root_uri.join("test-info.dat")
        remote.write(b"abc")

        info = remote.get_info()
        self.assertIsInstance(info, ResourceInfo)
        self.assertTrue(info.is_file)
        self.assertEqual(info.size, 3)
        self.assertIsInstance(info.checksums, dict)
        self.assertIn("crc32", info.checksums)  # Only appears if ChecksumMode=ENABLED
        self.assertEqual(info.last_modified.tzinfo, datetime.UTC)
        self.assertGreaterEqual(info.last_modified.timestamp(), now.timestamp() - 1.0)

    def _check_presigned_url(self, url: str, expiration_time_seconds: int):
        """Check the scheme and expiration timestamp of a presigned URL."""
        parsed = urlparse(url)
        self.assertEqual(parsed.scheme, "https")

        actual_expiration_timestamp = int(parse_qs(parsed.query)["Expires"][0])
        current_time = int(time.time())
        expected_expiration_timestamp = current_time + expiration_time_seconds
        # Allow some flex in the expiration time in case this test process goes
        # out to lunch for a while on a busy CI machine
        self.assertLessEqual(abs(expected_expiration_timestamp - actual_expiration_timestamp), 120)

    def test_threading_true(self):
        """LSST_S3_USE_THREADS=True enables boto transfer threads."""
        with mock.patch.dict(os.environ, {"LSST_S3_USE_THREADS": "True"}):
            S3ResourcePath.use_threads = None
            test_resource_path = self.root_uri.join("test_file.dat")
            self.assertTrue(test_resource_path._transfer_config.use_threads)

    def test_implicit_default_threading(self):
        """With no env var set, boto's own TransferConfig default applies."""
        S3ResourcePath.use_threads = None
        boto_default = signature(boto3.s3.transfer.TransferConfig).parameters["use_threads"].default
        # Newer versions of boto return None as the default.
        if boto_default is None:
            boto_default = True
        test_resource_path = self.root_uri.join("test_file.dat")
        self.assertEqual(test_resource_path._transfer_config.use_threads, boto_default)

    def test_explicit_default_threading(self):
        """LSST_S3_USE_THREADS=None falls back to the boto default."""
        with mock.patch.dict(os.environ, {"LSST_S3_USE_THREADS": "None"}):
            S3ResourcePath.use_threads = None
            boto_default = signature(boto3.s3.transfer.TransferConfig).parameters["use_threads"].default
            # Newer versions of boto return None as the default.
            if boto_default is None:
                boto_default = True
            test_resource_path = self.root_uri.join("test_file.dat")
            self.assertEqual(test_resource_path._transfer_config.use_threads, boto_default)

    def test_threading_false(self):
        """LSST_S3_USE_THREADS=False disables boto transfer threads."""
        with mock.patch.dict(os.environ, {"LSST_S3_USE_THREADS": "False"}):
            S3ResourcePath.use_threads = None
            test_resource_path = self.root_uri.join("test_file.dat")
            self.assertFalse(test_resource_path._transfer_config.use_threads)

            # Run the full local-transfer suite with threading disabled.
            # NOTE(review): reconstructed as inside the patch context so the
            # env var is still in effect — confirm against original layout.
            self.test_local()

    @unittest.skipIf(fsspec is None, "fsspec is not available")
    def test_fsspec_constructor(self) -> None:
        """Test that we can obtain an s3fs object."""
        uri = self.root_uri.join("test_file.dat")
        fs, path = uri.to_fsspec()
        self.assertEqual(path, f"{uri._bucket}/{uri.relativeToPathRoot}")
        self.assertTrue(hasattr(fs, "open"))

    def test_fsspec(self) -> None:
        """Skip the generic fsspec test: s3fs is incompatible with moto."""
        raise unittest.SkipTest("fsspec s3fs incompatible with moto")

    @unittest.mock.patch("lsst.resources._resourcePath._POOL_EXECUTOR_CLASS", None)
    @unittest.mock.patch.dict(os.environ, {"LSST_RESOURCES_EXECUTOR": "threads"})
    def test_mexists(self) -> None:
        """Test mexists with override executor pool.

        moto does not work with process pool.
        """
        super().test_mexists()

    @unittest.mock.patch("lsst.resources._resourcePath._POOL_EXECUTOR_CLASS", None)
    @unittest.mock.patch.dict(os.environ, {"LSST_RESOURCES_EXECUTOR": "threads"})
    def test_mtransfer(self) -> None:
        """Test mtransfer with override executor pool.

        moto does not work with process pool.
        """
        super().test_mtransfer()

    @unittest.mock.patch("lsst.resources._resourcePath._POOL_EXECUTOR_CLASS", None)
    @unittest.mock.patch.dict(os.environ, {"LSST_RESOURCES_EXECUTOR": "threads"})
    def test_local_mtransfer(self) -> None:
        """Test local mtransfer with override executor pool.

        moto does not work with process pool.
        """
        super().test_local_mtransfer()
@unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!")
class S3ReadWriteTestCase(S3ReadWriteTestCaseBase, unittest.TestCase):
    """Run the S3 read/write tests against plain bucket URIs of the form
    ``s3://bucketname/...`` with no profile or endpoint override.
    """

    bucket = "my_2nd_bucket"
    # Without a profile, the URI netloc is simply the bucket name.
    netloc = "my_2nd_bucket"
@unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!")
class S3WithProfileReadWriteTestCase(S3ReadWriteTestCaseBase, unittest.TestCase):
    """Test S3 URLs with explicit profile specified.

    (``s3://profile@bucketname/...``).
    """

    bucket = "3rd_bucket"
    netloc = f"myprofile@{bucket}"
    # Custom endpoint served by moto via MOTO_S3_CUSTOM_ENDPOINTS (see setUp).
    s3_endpoint_url = "https://endpoint1.test.example"

    def setUp(self):
        """Configure the custom endpoint/profile environment, then set up."""
        # Configure custom S3 endpoints that we can target from tests using
        # non-default profile.
        self.enterContext(
            mock.patch.dict(
                os.environ,
                {
                    "MOTO_S3_CUSTOM_ENDPOINTS": self.s3_endpoint_url,
                    "LSST_RESOURCES_S3_PROFILE_myprofile": "https://access_key:security_key@endpoint1.test.example",
                },
            )
        )

        super().setUp()

    def test_missing_profile(self):
        """Using an unconfigured profile raises ProfileNotFound."""
        with self.assertRaises(botocore.exceptions.ProfileNotFound):
            ResourcePath("s3://otherprofile@bucket").read()

    def test_s3_endpoint_url(self):
        """S3_ENDPOINT_URL redirects profile-less URIs to a custom endpoint."""
        with mock.patch.dict(
            os.environ,
            {"S3_ENDPOINT_URL": self.s3_endpoint_url},
        ):
            path = ResourcePath(f"s3://{self.bucket}/test-s3-endpoint-url.txt")
            data = b"123"
            path.write(data)
            self.assertEqual(path.read(), data)
            # Presigned URLs must also point at the custom endpoint.
            self.assertIn(
                "https://endpoint1.test.example",
                path.generate_presigned_get_url(expiration_time_seconds=3600),
            )

    def test_uri_syntax(self):
        """Parsing splits ``profile@bucket`` into its two components."""
        path1 = ResourcePath("s3://profile@bucket/path")
        self.assertEqual(path1._bucket, "bucket")
        self.assertEqual(path1._profile, "profile")
        path2 = ResourcePath("s3://bucket2/path")
        self.assertEqual(path2._bucket, "bucket2")
        self.assertIsNone(path2._profile)

    def test_ceph_uri_syntax(self):
        """Colons in bucket names (Ceph multi-tenant) are preserved."""
        # The Ceph S3 'multi-tenant' syntax for buckets can include colons.
        path1 = ResourcePath("s3://profile@ceph:bucket/path")
        self.assertEqual(path1._bucket, "ceph:bucket")
        self.assertEqual(path1._profile, "profile")
        path2 = ResourcePath("s3://ceph:bucket2/path")
        self.assertEqual(path2._bucket, "ceph:bucket2")
        self.assertIsNone(path2._profile)

    def test_transfer_from_different_endpoints(self):
        """Transfers work between paths that use different endpoints."""
        # Create a bucket using a different endpoint (the default endpoint.)
        boto3.resource("s3").create_bucket(Bucket="source-bucket")
        source_path = ResourcePath("s3://source-bucket/file.txt")
        source_path.write(b"123")
        target_path = ResourcePath(f"s3://{self.netloc}/target.txt")
        # Transfer from default endpoint to custom endpoint with custom
        # profile.
        target_path.transfer_from(source_path)
        self.assertEqual(target_path.read(), b"123")
if __name__ == "__main__":
    # Run the test suite when this module is executed directly.
    unittest.main()