Coverage for tests / test_gs.py: 25%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:38 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12"""Tests for the ``gs://`` resource backend. 

13 

14The emulator-backed tests in this module are enabled in either of these ways: 

15 

161. Set ``STORAGE_EMULATOR_HOST`` to an already-running GCS emulator 

17 endpoint. This is how GitHub Actions runs these tests. 

182. Install the ``fake-gcs-server`` binary locally and make it available on 

19 ``PATH``, or set ``FAKE_GCS_SERVER`` to its full path. The test helper will 

20 start and stop the emulator automatically. 

21 

22The server binary is available from: 

23https://github.com/fsouza/fake-gcs-server/releases 

24 

25If neither is configured, the emulator-backed tests are skipped. 

26""" 

27 

28from __future__ import annotations 

29 

30import contextlib 

31import os 

32import re 

33import shutil 

34import socket 

35import subprocess 

36import tempfile 

37import time 

38import unittest 

39import uuid 

40from collections import deque 

41from collections.abc import Iterator 

42from threading import Thread 

43from unittest import mock 

44 

45import lsst.resources.gs as gs_module 

46from lsst.resources import ResourceInfo, ResourcePath 

47from lsst.resources.gs import GSResourcePath 

48from lsst.resources.tests import GenericTestCase 

49 

50try: 

51 from google.cloud import storage 

52except ImportError: 

53 storage = None 

54 

55 

56def _find_free_port() -> int: 

57 with socket.socket() as sock: 

58 sock.bind(("127.0.0.1", 0)) 

59 return sock.getsockname()[1] 

60 

61 

@contextlib.contextmanager
def _reset_gs_client() -> Iterator[None]:
    """Temporarily clear the stored GCS client objects.

    On entry, the current values of ``GSResourcePath._client`` and the
    module-level ``lsst.resources.gs._client`` are saved and both are set
    to `None`. On exit the saved values are put back, even if the body
    raised.

    Presumably this forces the next GCS access to build a new client that
    picks up the emulator environment -- confirm against
    ``lsst.resources.gs``.
    """
    old_client = GSResourcePath._client
    old_global_client = gs_module._client
    GSResourcePath._client = None
    gs_module._client = None
    try:
        yield
    finally:
        # Restore both saved clients. No intermediate reset to None is
        # needed because each attribute is overwritten directly.
        gs_module._client = old_global_client
        GSResourcePath._client = old_client

74 

75 

@contextlib.contextmanager
def fake_gcs_server():
    """Start or connect to a fake GCS server.

    If ``STORAGE_EMULATOR_HOST`` is set, no process is started and a client
    is created with that environment (``GOOGLE_CLOUD_PROJECT`` defaults to
    ``test-project`` when unset). Otherwise a ``fake-gcs-server`` binary,
    located via ``FAKE_GCS_SERVER`` or ``PATH``, is launched on a free
    loopback port with a temporary filesystem root, and the environment is
    patched to point at it for the duration of the context.

    Yields
    ------
    client : ``google.cloud.storage.Client``
        Client created while the emulator environment is active.

    Raises
    ------
    unittest.SkipTest
        Raised if ``google-cloud-storage`` is not importable, or if no
        emulator host is configured and no server binary can be found.
    RuntimeError
        Raised if the launched server exits before accepting connections
        or does not accept a connection within 10 seconds.
    """
    if storage is None:
        raise unittest.SkipTest("google-cloud-storage is not installed")

    emulator_host = os.environ.get("STORAGE_EMULATOR_HOST")
    if emulator_host:
        env = {"GOOGLE_CLOUD_PROJECT": os.environ.get("GOOGLE_CLOUD_PROJECT", "test-project")}
        with mock.patch.dict(os.environ, env, clear=False), _reset_gs_client():
            yield storage.Client()
        return

    binary = os.environ.get("FAKE_GCS_SERVER") or shutil.which("fake-gcs-server")
    if binary is None:
        raise unittest.SkipTest("fake-gcs-server is not installed")

    port = _find_free_port()
    filesystem_root = tempfile.mkdtemp(prefix="fake-gcs-server-")
    # Only the most recent lines are kept; they are used for error reports.
    startup_output: deque[str] = deque(maxlen=50)
    try:
        # Launching inside the outer ``try`` ensures the temporary
        # directory is removed even if the binary cannot be executed.
        proc = subprocess.Popen(
            [binary, "-scheme", "http", "-port", str(port), "-filesystem-root", filesystem_root],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )

        def _drain_output() -> None:
            # Keep reading so the child never blocks on a full pipe.
            assert proc.stdout is not None
            for line in proc.stdout:
                startup_output.append(line.rstrip())

        output_thread = Thread(target=_drain_output, daemon=True)
        output_thread.start()
        try:
            # A monotonic clock keeps the timeout immune to wall-clock
            # adjustments.
            deadline = time.monotonic() + 10
            while True:
                try:
                    with socket.create_connection(("127.0.0.1", port), timeout=0.2):
                        break
                except OSError:
                    if proc.poll() is not None:
                        details = "\n".join(startup_output) or "no process output captured"
                        raise RuntimeError(
                            f"fake-gcs-server exited unexpectedly with code {proc.returncode}:\n{details}"
                        ) from None
                    if time.monotonic() > deadline:
                        details = "\n".join(startup_output) or "no process output captured"
                        raise RuntimeError(f"Timed out waiting for fake-gcs-server:\n{details}") from None
                    time.sleep(0.1)

            env = {
                "STORAGE_EMULATOR_HOST": f"http://127.0.0.1:{port}",
                "GOOGLE_CLOUD_PROJECT": "test-project",
            }
            with mock.patch.dict(os.environ, env, clear=False), _reset_gs_client():
                yield storage.Client()
        finally:
            # Ask the server to stop, escalating to kill if it does not
            # exit within the grace period.
            proc.terminate()
            with contextlib.suppress(subprocess.TimeoutExpired):
                proc.wait(timeout=5)
            if proc.poll() is None:
                proc.kill()
                proc.wait()
            output_thread.join(timeout=1)
            # Close the pipe only once the reader has finished; closing
            # it while another thread is still reading could block.
            if proc.stdout is not None and not output_thread.is_alive():
                proc.stdout.close()
    finally:
        shutil.rmtree(filesystem_root, ignore_errors=True)

144 

145 

class GenericGSTestCase(GenericTestCase, unittest.TestCase):
    """Run the shared ``GenericTestCase`` checks with ``gs`` URIs."""

    # Class-level settings read by the inherited generic tests
    # (presumably used to build the URIs under test -- see GenericTestCase).
    netloc = "my_bucket"
    scheme = "gs"

151 

152 

class GSReadWriteTestCase(unittest.TestCase):
    """Exercise the ``gs://`` backend against an emulated GCS server."""

    def setUp(self) -> None:
        """Create a uniquely named bucket and the URIs used by each test."""
        self.server = self.enterContext(fake_gcs_server())
        # Derive the bucket name from the test id: lower-case it, replace
        # every character outside ``a-z0-9-`` with a dash, truncate, and
        # append a random suffix.
        slug = re.sub(r"[^a-z0-9-]", "-", self.id().lower()).strip("-")
        random_tail = uuid.uuid4().hex[:8]
        self.bucket = f"{slug[:54]}-{random_tail}"
        self.server.create_bucket(self.bucket)
        self.root_uri = ResourcePath(
            f"gs://{self.bucket}/",
            forceDirectory=True,
            forceAbsolute=False,
        )
        self.tmpdir = self.root_uri.join("TESTING/", forceDirectory=True)

    def test_file_round_trip(self) -> None:
        """Write a file, then check existence, content and size."""
        payload = b"abcdefghijklmnopqrstuv\n"
        target = self.tmpdir.join("test.txt")

        self.assertFalse(target.exists())
        target.write(payload)
        self.assertTrue(target.exists())
        self.assertEqual(target.read(), payload)
        self.assertEqual(target.size(), len(payload))

    def test_get_info(self) -> None:
        """Check the fields of ``get_info()`` for a small written file."""
        target = self.tmpdir.join("test-info.dat")
        target.write(b"abc")

        details = target.get_info()
        self.assertIsInstance(details, ResourceInfo)
        self.assertTrue(details.is_file)
        self.assertEqual(details.size, 3)
        self.assertIsNotNone(details.last_modified)
        self.assertIsInstance(details.checksums, dict)

    def test_directory_semantics(self) -> None:
        """Check ``mkdir``, directory info, and writing inside a directory."""
        nested = self.tmpdir.join("newdir/seconddir", forceDirectory=True)
        nested.mkdir()
        self.assertTrue(nested.exists())

        details = nested.get_info()
        self.assertFalse(details.is_file)
        self.assertEqual(details.size, 0)
        self.assertEqual(details.checksums, {})

        child = nested.join("temp.txt")
        child.write(b"Data")
        self.assertTrue(child.exists())

    def test_root_missing_bucket(self) -> None:
        """Check that the root of a bucket that was never created is absent."""
        absent = ResourcePath(
            "gs://missing-bucket/",
            forceDirectory=True,
            forceAbsolute=False,
        )
        self.assertFalse(absent.exists())
        with self.assertRaises(FileNotFoundError):
            absent.get_info()

204 missing.get_info()