Coverage for tests / test_gs.py: 25%
133 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:32 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12"""Tests for the ``gs://`` resource backend.
14The emulator-backed tests in this module are enabled in either of these ways:
161. Set ``STORAGE_EMULATOR_HOST`` to an already-running GCS emulator
17 endpoint. This is how GitHub Actions runs these tests.
182. Install the ``fake-gcs-server`` binary locally and make it available on
19 ``PATH``, or set ``FAKE_GCS_SERVER`` to its full path. The test helper will
20 start and stop the emulator automatically.
22The server binary is available from:
23https://github.com/fsouza/fake-gcs-server/releases
25If neither is configured, the emulator-backed tests are skipped.
26"""
28from __future__ import annotations
30import contextlib
31import os
32import re
33import shutil
34import socket
35import subprocess
36import tempfile
37import time
38import unittest
39import uuid
40from collections import deque
41from collections.abc import Iterator
42from threading import Thread
43from unittest import mock
45import lsst.resources.gs as gs_module
46from lsst.resources import ResourceInfo, ResourcePath
47from lsst.resources.gs import GSResourcePath
48from lsst.resources.tests import GenericTestCase
50try:
51 from google.cloud import storage
52except ImportError:
53 storage = None
56def _find_free_port() -> int:
57 with socket.socket() as sock:
58 sock.bind(("127.0.0.1", 0))
59 return sock.getsockname()[1]
@contextlib.contextmanager
def _reset_gs_client() -> Iterator[None]:
    """Temporarily clear the cached GCS client objects.

    On entry the current values of ``GSResourcePath._client`` and the
    module-level ``gs_module._client`` are saved and both are set to
    `None`. On exit, including when the body raises, the saved values
    are restored.

    NOTE(review): presumably the caches are cleared so that a new client
    is created while the emulator environment is patched in -- confirm
    against ``lsst.resources.gs``.
    """
    saved_class_client = GSResourcePath._client
    saved_module_client = gs_module._client
    GSResourcePath._client = None
    gs_module._client = None
    try:
        yield
    finally:
        # Restore exactly what was there before; no intermediate reset to
        # None is needed because both attributes are overwritten here.
        gs_module._client = saved_module_client
        GSResourcePath._client = saved_class_client
@contextlib.contextmanager
def _gs_client_for_emulator(env: dict) -> Iterator["storage.Client"]:
    """Yield a new ``storage.Client`` with ``env`` patched into ``os.environ``.

    The cached GCS clients are reset for the duration of the context via
    ``_reset_gs_client`` and the environment is restored on exit.
    """
    with mock.patch.dict(os.environ, env, clear=False), _reset_gs_client():
        yield storage.Client()


def _wait_for_fake_gcs_server(
    proc: subprocess.Popen, port: int, startup_output: deque, timeout: float = 10.0
) -> None:
    """Block until ``port`` on 127.0.0.1 accepts a TCP connection.

    Raises
    ------
    RuntimeError
        Raised if the process exits before the port accepts connections, or
        if ``timeout`` seconds elapse first. The captured process output is
        included in the message.
    """

    def _captured() -> str:
        return "\n".join(startup_output) or "no process output captured"

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=0.2):
                return
        except OSError:
            if proc.poll() is not None:
                raise RuntimeError(
                    f"fake-gcs-server exited unexpectedly with code {proc.returncode}:\n{_captured()}"
                ) from None
            if time.monotonic() > deadline:
                raise RuntimeError(f"Timed out waiting for fake-gcs-server:\n{_captured()}") from None
            time.sleep(0.1)


def _stop_fake_gcs_server(proc: subprocess.Popen, output_thread: Thread) -> None:
    """Terminate the server process, escalating to kill, and reap its output."""
    proc.terminate()
    with contextlib.suppress(subprocess.TimeoutExpired):
        proc.wait(timeout=5)
    if proc.poll() is None:
        proc.kill()
        proc.wait()
    output_thread.join(timeout=1)
    # Only close the pipe once the reader thread is done with it, so the
    # file object is never closed underneath an active read.
    if proc.stdout is not None and not output_thread.is_alive():
        proc.stdout.close()


@contextlib.contextmanager
def fake_gcs_server() -> Iterator["storage.Client"]:
    """Start or connect to a fake GCS server.

    If ``STORAGE_EMULATOR_HOST`` is set in the environment, no process is
    started and a client is created with that environment in place.
    Otherwise the binary named by ``FAKE_GCS_SERVER`` (or ``fake-gcs-server``
    found on ``PATH``) is launched on a free loopback port with a temporary
    filesystem root, and is stopped and cleaned up on exit.

    Yields
    ------
    client : ``google.cloud.storage.Client``
        Client created while the emulator environment variables are set.

    Raises
    ------
    unittest.SkipTest
        Raised if ``google-cloud-storage`` could not be imported, or if no
        emulator endpoint is configured and no server binary is found.
    RuntimeError
        Raised if the launched server exits early or does not accept
        connections within the startup timeout.
    """
    if storage is None:
        raise unittest.SkipTest("google-cloud-storage is not installed")

    emulator_host = os.environ.get("STORAGE_EMULATOR_HOST")
    if emulator_host:
        env = {"GOOGLE_CLOUD_PROJECT": os.environ.get("GOOGLE_CLOUD_PROJECT", "test-project")}
        with _gs_client_for_emulator(env) as client:
            yield client
        return

    binary = os.environ.get("FAKE_GCS_SERVER") or shutil.which("fake-gcs-server")
    if binary is None:
        raise unittest.SkipTest("fake-gcs-server is not installed")

    port = _find_free_port()
    filesystem_root = tempfile.mkdtemp(prefix="fake-gcs-server-")
    try:
        # Launch inside the try block so the temporary directory is removed
        # even if the process cannot be started.
        startup_output: deque[str] = deque(maxlen=50)
        proc = subprocess.Popen(
            [binary, "-scheme", "http", "-port", str(port), "-filesystem-root", filesystem_root],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )

        def _drain_output() -> None:
            # Keep reading so the child never blocks on a full pipe; only
            # the most recent lines are retained for error reporting.
            assert proc.stdout is not None
            for line in proc.stdout:
                startup_output.append(line.rstrip())

        output_thread = Thread(target=_drain_output, daemon=True)
        output_thread.start()
        try:
            _wait_for_fake_gcs_server(proc, port, startup_output)
            env = {
                "STORAGE_EMULATOR_HOST": f"http://127.0.0.1:{port}",
                "GOOGLE_CLOUD_PROJECT": "test-project",
            }
            with _gs_client_for_emulator(env) as client:
                yield client
        finally:
            _stop_fake_gcs_server(proc, output_thread)
    finally:
        shutil.rmtree(filesystem_root, ignore_errors=True)
class GenericGSTestCase(GenericTestCase, unittest.TestCase):
    """Exercise the generic URI property tests using the ``gs`` scheme."""

    # Values read by the inherited GenericTestCase tests.
    netloc = "my_bucket"
    scheme = "gs"
class GSReadWriteTestCase(unittest.TestCase):
    """Read/write tests for the GCS backend, run against an emulated server."""

    def setUp(self) -> None:
        """Create a uniquely named bucket and a working directory URI in it."""
        self.server = self.enterContext(fake_gcs_server())

        # Bucket name is derived from the test id plus a random suffix:
        # at most 54 + 1 + 8 = 63 characters in total.
        unique = uuid.uuid4().hex[:8]
        slug = re.sub(r"[^a-z0-9-]", "-", self.id().lower()).strip("-")
        self.bucket = f"{slug[:54]}-{unique}"
        self.server.create_bucket(self.bucket)

        self.root_uri = ResourcePath(f"gs://{self.bucket}/", forceDirectory=True, forceAbsolute=False)
        self.tmpdir = self.root_uri.join("TESTING/", forceDirectory=True)

    def test_file_round_trip(self) -> None:
        """Write bytes to a new object and read the same bytes back."""
        payload = b"abcdefghijklmnopqrstuv\n"
        target = self.tmpdir.join("test.txt")

        self.assertFalse(target.exists())
        target.write(payload)
        self.assertTrue(target.exists())
        self.assertEqual(target.read(), payload)
        self.assertEqual(target.size(), len(payload))

    def test_get_info(self) -> None:
        """Check the metadata reported for a freshly written file."""
        target = self.tmpdir.join("test-info.dat")
        target.write(b"abc")

        metadata = target.get_info()
        self.assertIsInstance(metadata, ResourceInfo)
        self.assertTrue(metadata.is_file)
        self.assertEqual(metadata.size, 3)
        self.assertIsNotNone(metadata.last_modified)
        self.assertIsInstance(metadata.checksums, dict)

    def test_directory_semantics(self) -> None:
        """Check mkdir, directory metadata, and writing a file inside it."""
        nested = self.tmpdir.join("newdir/seconddir", forceDirectory=True)
        nested.mkdir()
        self.assertTrue(nested.exists())

        metadata = nested.get_info()
        self.assertFalse(metadata.is_file)
        self.assertEqual(metadata.size, 0)
        self.assertEqual(metadata.checksums, {})

        child = nested.join("temp.txt")
        child.write(b"Data")
        self.assertTrue(child.exists())

    def test_root_missing_bucket(self) -> None:
        """A bucket that was never created reports as missing."""
        absent = ResourcePath("gs://missing-bucket/", forceDirectory=True, forceAbsolute=False)
        self.assertFalse(absent.exists())
        with self.assertRaises(FileNotFoundError):
            absent.get_info()
204 missing.get_info()