Coverage for python/lsst/resources/gs.py: 19% (219 statements)

# This file is part of lsst-resources.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# Use of this source code is governed by a 3-clause BSD-style
# license that can be found in the LICENSE file.
12"""Accessing Google Cloud Storage resources."""
14from __future__ import annotations
16__all__ = ("GSResourcePath",)
18import contextlib
19import datetime
20import logging
21import re
22from collections.abc import Iterator
23from typing import TYPE_CHECKING
25from ._resourceHandles._baseResourceHandle import ResourceHandleProtocol

try:
    import google.api_core.retry as retry
    import google.cloud.storage as storage
    from google.cloud.exceptions import (
        BadGateway,
        InternalServerError,
        NotFound,
        ServiceUnavailable,
        TooManyRequests,
    )
except ImportError:
    storage = None
    retry = None

    # Must also fake the exception classes.
    class ClientError(Exception):
        """Generic client error."""

        pass

    class NotFound(ClientError):  # type: ignore # noqa: N818
        """Resource not found error."""

        pass

    class TooManyRequests(ClientError):  # type: ignore # noqa: N818
        """Too many requests error."""

        pass

    class InternalServerError(ClientError):  # type: ignore
        """Internal server error."""

        pass

    class BadGateway(ClientError):  # type: ignore # noqa: N818
        """Bad gateway error."""

        pass

    class ServiceUnavailable(ClientError):  # type: ignore # noqa: N818
        """Service unavailable error."""

        pass


from lsst.utils.timer import time_this

from ._resourcePath import ResourceInfo, ResourcePath

if TYPE_CHECKING:
    from .utils import TransactionProtocol

log = logging.getLogger(__name__)


_RETRYABLE_TYPES = (
    TooManyRequests,  # 429
    InternalServerError,  # 500
    BadGateway,  # 502
    ServiceUnavailable,  # 503
)


def is_retryable(exc: Exception) -> bool:
    """Report whether the given exception is a condition that can be retried.

    Parameters
    ----------
    exc : `Exception`
        Exception to check.

    Returns
    -------
    `bool`
        `True` if the given exception is a condition that can be retried.
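
    Examples
    --------
    An illustrative doctest using the exception classes imported above:

    >>> is_retryable(TooManyRequests("slow down"))
    True
    >>> is_retryable(NotFound("no such object"))
    False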
104 """
105 return isinstance(exc, _RETRIEVABLE_TYPES)


_RETRY_POLICY = retry.Retry(predicate=is_retryable) if retry else None
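# google-api-core's Retry applies its default exponential backoff whenever a
# call made with ``retry=_RETRY_POLICY`` raises one of the errors listed above.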


_client = None
"""Cached client connection."""


def _coerce_gcs_datetime(value: datetime.datetime | str | None) -> datetime.datetime | None:
    """Convert GCS timestamp values to timezone-aware UTC datetimes.

    Some emulators return RFC3339 timestamps with an explicit UTC offset
    instead of a trailing ``Z``, which the google-cloud-storage property
    accessors do not always accept.
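
    Examples
    --------
    Both string forms normalize to UTC (the timestamps here are made up):

    >>> _coerce_gcs_datetime("2026-04-30T08:38:00Z")
    datetime.datetime(2026, 4, 30, 8, 38, tzinfo=datetime.timezone.utc)
    >>> _coerce_gcs_datetime("2026-04-30T08:38:00+00:00")
    datetime.datetime(2026, 4, 30, 8, 38, tzinfo=datetime.timezone.utc)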
121 """
122 if value is None:
123 return None
124 if isinstance(value, datetime.datetime):
125 if value.tzinfo is None:
126 return value.replace(tzinfo=datetime.UTC)
127 return value.astimezone(datetime.UTC)
128 if value.endswith("Z"):
129 value = value[:-1] + "+00:00"
130 return datetime.datetime.fromisoformat(value).astimezone(datetime.UTC)


def _get_client() -> storage.Client:
    """Return the cached GCS client, creating it on first use."""
    global _client
    if storage is None:
        raise ImportError("google-cloud-storage package not installed. Unable to communicate with GCS.")
    if _client is None:
        _client = storage.Client()
    return _client


class GSResourcePath(ResourcePath):
    """Access Google Cloud Storage resources."""

    _bucket: storage.Bucket | None = None
    _blob: storage.Blob | None = None
    _client: storage.Client | None = None

    @property
    def client(self) -> storage.Client:
        return _get_client()

    @property
    def bucket(self) -> storage.Bucket:
        if self._bucket is None:
            self._bucket = self.client.bucket(self.netloc)
        return self._bucket

    @property
    def blob(self) -> storage.Blob:
        if self._blob is None:
            self._blob = self.bucket.blob(self.relativeToPathRoot)
        return self._blob

    def exists(self) -> bool:
        if self.is_root:
            return self.bucket.exists(retry=_RETRY_POLICY)
        if self.dirLike:
            # GCS does not have concrete directory objects; treat any
            # directory-like path within an existing bucket as existing.
            return self.bucket.exists(retry=_RETRY_POLICY)
        return self.blob.exists(retry=_RETRY_POLICY)

    def size(self) -> int:
        if self.dirLike:
            return 0
        # The first time this is called we need to sync from the remote.
        # Force the blob to be recalculated.
        try:
            self.blob.reload(retry=_RETRY_POLICY)
        except NotFound:
            raise FileNotFoundError(f"Resource {self} does not exist") from None
        size = self.blob.size
        if size is None:
            raise FileNotFoundError(f"Resource {self} does not exist")
        return size

    def get_info(self) -> ResourceInfo:
        """Return lightweight metadata about this GCS resource."""
        if self.is_root:
            if not self.bucket.exists(retry=_RETRY_POLICY):
                raise FileNotFoundError(f"Resource {self} does not exist")
            return ResourceInfo(
                uri=str(self),
                is_file=False,
                size=0,
                last_modified=None,
                checksums={},
            )

        if self.dirLike:
            if not self.exists():
                raise FileNotFoundError(f"Resource {self} does not exist")
            return ResourceInfo(
                uri=str(self),
                is_file=False,
                size=0,
                last_modified=None,
                checksums={},
            )

        try:
            self.blob.reload(retry=_RETRY_POLICY)
        except NotFound:
            raise FileNotFoundError(f"Resource {self} does not exist") from None

        size = self.blob.size
        if size is None:
            raise FileNotFoundError(f"Resource {self} does not exist")

        checksums = {}
        if self.blob.md5_hash:
            checksums["md5"] = self.blob.md5_hash
        if self.blob.crc32c:
            checksums["crc32c"] = self.blob.crc32c

        try:
            updated = _coerce_gcs_datetime(self.blob.updated)
        except ValueError:
            updated = _coerce_gcs_datetime(self.blob._properties.get("updated"))

        return ResourceInfo(
            uri=str(self),
            is_file=True,
            size=size,
            last_modified=updated,
            checksums=checksums,
        )

    def remove(self) -> None:
        try:
            self.blob.delete(retry=_RETRY_POLICY)
        except NotFound as e:
            raise FileNotFoundError(f"No such resource: {self}") from e

    def read(self, size: int = -1) -> bytes:
        if size < 0:
            start = None
            end = None
        else:
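            # google-cloud-storage byte ranges are inclusive at both ends,
            # so reading ``size`` bytes means an end offset of size - 1.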
            start = 0
            end = size - 1
        try:
            with time_this(log, msg="Read from %s", args=(self,)):
                body = self.blob.download_as_bytes(start=start, end=end, retry=_RETRY_POLICY)
        except NotFound as e:
            raise FileNotFoundError(f"No such resource: {self}") from e
        return body

    def write(self, data: bytes, overwrite: bool = True) -> None:
        if not overwrite and self.exists():
            raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")
        with time_this(log, msg="Write to %s", args=(self,)):
            self.blob.upload_from_string(data, retry=_RETRY_POLICY)

    def mkdir(self) -> None:
        if not self.bucket.exists(retry=_RETRY_POLICY):
            raise ValueError(f"Bucket {self.netloc} does not exist for {self}!")

        if not self.dirLike:
            raise NotADirectoryError(f"Can not create a 'directory' for a file-like URI {self}")

        # GCS does not have directory objects, so mkdir is a no-op once the
        # bucket exists.
        return

    @contextlib.contextmanager
    def _as_local(
        self, multithreaded: bool = True, tmpdir: ResourcePath | None = None
    ) -> Iterator[ResourcePath]:
        with (
            ResourcePath.temporary_uri(prefix=tmpdir, suffix=self.getExtension(), delete=True) as tmp_uri,
            time_this(log, msg="Downloading %s to local file", args=(self,)),
        ):
            try:
                with tmp_uri.open("wb") as tmpFile:
                    self.blob.download_to_file(tmpFile, retry=_RETRY_POLICY)
                yield tmp_uri
            except NotFound as e:
                raise FileNotFoundError(f"No such resource: {self}") from e

    def transfer_from(
        self,
        src: ResourcePath,
        transfer: str = "copy",
        overwrite: bool = False,
        transaction: TransactionProtocol | None = None,
        multithreaded: bool = True,
    ) -> None:
        if transfer not in self.transferModes:
            raise ValueError(f"Transfer mode '{transfer}' not supported by URI scheme {self.scheme}")

        # Existence checks cost time so do not call this unless we know
        # that debugging is enabled.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(
                "Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
                src,
                src.exists(),
                self,
                self.exists(),
                transfer,
            )

        # Short-circuit immediately if the URIs are identical.
        if self == src:
            log.debug(
                "Target and destination URIs are identical: %s, returning immediately."
                " No further action required.",
                self,
            )
            return

        if not overwrite and self.exists():
            raise FileExistsError(f"Destination path '{self}' already exists.")

        if transfer == "auto":
            transfer = self.transferDefault

        timer_msg = "Transfer from %s to %s"
        timer_args = (src, self)

        if isinstance(src, type(self)):
            # Looks like a GS remote URI, so we can use a direct copy.
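            # Blob.rewrite() copies server side; for large objects GCS may
            # return after copying only some of the bytes, handing back a
            # token. Loop, passing the token back in, until it comes back
            # as None to signal completion.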
            with time_this(log, msg=timer_msg, args=timer_args):
                rewrite_token = None
                while True:
                    try:
                        rewrite_token, bytes_copied, total_bytes = self.blob.rewrite(
                            src.blob, token=rewrite_token, retry=_RETRY_POLICY
                        )
                    except NotFound as e:
                        raise FileNotFoundError(f"No such resource to transfer: {self}") from e
                    log.debug("Copied %d bytes out of %d (%s to %s)", bytes_copied, total_bytes, src, self)
                    if rewrite_token is None:
                        # Copy has completed.
                        break
        else:
            # Use a local file and upload it.
            with (
                src.as_local(multithreaded=multithreaded) as local_uri,
                time_this(log, msg=timer_msg, args=timer_args),
            ):
                self.blob.upload_from_filename(local_uri.ospath, retry=_RETRY_POLICY)

        # This was an explicit move requested from a remote resource;
        # try to remove that resource.
        if transfer == "move":
            # Transactions do not work here.
            src.remove()

    @contextlib.contextmanager
    def open(
        self,
        mode: str = "r",
        *,
        encoding: str | None = None,
        prefer_file_temporary: bool = False,
    ) -> Iterator[ResourceHandleProtocol]:
        # Docstring inherited
        if self.isdir() or self.is_root:
            raise IsADirectoryError(f"Can not 'open' a directory URI: {self}")
        if "x" in mode:
            if self.exists():
                raise FileExistsError(f"File at {self} already exists.")
            mode = mode.replace("x", "w")

        # Clear the blob before calling open if we are in write mode.
        # This ensures that everything is resynced.
        if "w" in mode:
            self._blob = None

        # The GCS API does not support append or read/write modes, so for
        # those we use the base class implementation.
        # There seems to be a bug in the Google open() API where it does not
        # properly write a BOM at the start of the file in UTF-16 encoding,
        # which leads to Python not being able to read the contents back.
        if "+" in mode or "a" in mode or ("w" in mode and encoding == "utf-16"):
            with super().open(mode, encoding=encoding, prefer_file_temporary=prefer_file_temporary) as buffer:
                yield buffer
        else:
            with self.blob.open(mode, encoding=encoding, retry=_RETRY_POLICY) as buffer:
                yield buffer

    def walk(
        self, file_filter: str | re.Pattern | None = None
    ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]:
        # We pretend that GCS uses directories and files and not simply keys.
        if not (self.isdir() or self.is_root):
            raise ValueError(f"Can not walk a non-directory URI: {self}")

        if isinstance(file_filter, str):
            file_filter = re.compile(file_filter)

        # Limit each query to a single "directory" to match os.walk.
        # We could download all keys at once with no delimiter and work
        # it out locally, but this could potentially lead to large memory
        # usage for millions of keys. It would also make the initial call
        # to this method potentially very slow. If making this method look
        # like os.walk were not required, we could query all keys with
        # pagination and return them in groups of 1000, but that would
        # be a different interface since we can't guarantee we would get
        # them all grouped properly across the 1000 limit boundary.
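        # For example, with prefix "a/" and delimiter "/", a bucket holding
        # keys "a/x.txt" and "a/b/y.txt" reports the blob "a/x.txt" in the
        # page itself and "a/b/" in page.prefixes.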
        prefix = self.relativeToPathRoot if not self.is_root else ""
        prefix_len = len(prefix)
        dirnames: set[str] = set()
        filenames = []
        files_there = False

        blobs = self.client.list_blobs(self.bucket, prefix=prefix, delimiter="/", retry=_RETRY_POLICY)
        for page in blobs.pages:
            # "Sub-directories" turn up as prefixes in each page.
            dirnames.update(dir[prefix_len:] for dir in page.prefixes)

            # Files are reported for this "directory" only.
            # The prefix itself can be included as a file if a zero-length
            # "directory" marker object exists; it must be filtered out.
            found_files = [f.name[prefix_len:] for f in page if f.name != prefix]
            if file_filter is not None:
                found_files = [f for f in found_files if file_filter.search(f)]
            if found_files:
                files_there = True

            filenames.extend(found_files)

        if not dirnames and not files_there:
            # Nothing found, so match os.walk and return immediately.
            return
        else:
            yield self, sorted(dirnames), filenames

        for dir in sorted(dirnames):
            new_uri = self.join(dir)
            yield from new_uri.walk(file_filter)