Coverage for python / lsst / resources / gs.py: 19%

219 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:32 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12"""Accessing Google Cloud Storage resources.""" 

13 

14from __future__ import annotations 

15 

16__all__ = ("GSResourcePath",) 

17 

18import contextlib 

19import datetime 

20import logging 

21import re 

22from collections.abc import Iterator 

23from typing import TYPE_CHECKING 

24 

25from ._resourceHandles._baseResourceHandle import ResourceHandleProtocol 

26 

try:
    import google.api_core.retry as retry
    import google.cloud.storage as storage
    from google.cloud.exceptions import (
        BadGateway,
        InternalServerError,
        NotFound,
        ServiceUnavailable,
        TooManyRequests,
    )
except ImportError:
    storage = None
    retry = None

    # Must also fake the exception classes so that this module can still
    # be imported without the google packages installed; actual GCS
    # operations will fail later with an ImportError instead.
    class ClientError(Exception):
        """Generic client error."""

    class NotFound(ClientError):  # type: ignore # noqa: N818
        """Resource not found error."""

    class TooManyRequests(ClientError):  # type: ignore # noqa: N818
        """Too many requests error."""

    class InternalServerError(ClientError):  # type: ignore
        """Internal server error."""

    class BadGateway(ClientError):  # type: ignore # noqa: N818
        """Bad gateway error."""

    class ServiceUnavailable(ClientError):  # type: ignore # noqa: N818
        """Service unavailable error."""

71 

72 

73from lsst.utils.timer import time_this 

74 

75from ._resourcePath import ResourceInfo, ResourcePath 

76 

77if TYPE_CHECKING: 

78 from .utils import TransactionProtocol 

79 

# Module-level logger.
log = logging.getLogger(__name__)


# Exception types that represent transient server or throttling
# conditions and are therefore safe to retry (HTTP status per entry).
_RETRIEVABLE_TYPES = (
    TooManyRequests,  # 429
    InternalServerError,  # 500
    BadGateway,  # 502
    ServiceUnavailable,  # 503
)

89 

90 

def is_retryable(exc: Exception) -> bool:
    """Check whether an exception represents a retryable condition.

    Parameters
    ----------
    exc : `Exception`
        The exception raised by a GCS operation.

    Returns
    -------
    retryable : `bool`
        `True` if the operation that raised ``exc`` can safely be
        attempted again, `False` otherwise.
    """
    return any(isinstance(exc, exc_type) for exc_type in _RETRIEVABLE_TYPES)

106 

107 

# Shared retry policy applying is_retryable() to failed GCS calls;
# `None` when google-api-core is not installed.
_RETRY_POLICY = retry.Retry(predicate=is_retryable) if retry else None


_client = None
"""Cached client connection."""

113 

114 

115def _coerce_gcs_datetime(value: datetime.datetime | str | None) -> datetime.datetime | None: 

116 """Convert GCS timestamp values to timezone-aware UTC datetimes. 

117 

118 Some emulators return RFC3339 timestamps with an explicit UTC offset 

119 instead of a trailing ``Z``, which the google-cloud-storage property 

120 accessors do not always accept. 

121 """ 

122 if value is None: 

123 return None 

124 if isinstance(value, datetime.datetime): 

125 if value.tzinfo is None: 

126 return value.replace(tzinfo=datetime.UTC) 

127 return value.astimezone(datetime.UTC) 

128 if value.endswith("Z"): 

129 value = value[:-1] + "+00:00" 

130 return datetime.datetime.fromisoformat(value).astimezone(datetime.UTC) 

131 

132 

def _get_client() -> storage.Client:
    """Return the module-cached GCS client, creating it on first use.

    Raises
    ------
    ImportError
        Raised if the google-cloud-storage package is not available.
    """
    if storage is None:
        raise ImportError("google-cloud-storage package not installed. Unable to communicate with GCS.")
    global _client
    if _client is None:
        _client = storage.Client()
    return _client

140 

141 

class GSResourcePath(ResourcePath):
    """Access Google Cloud Storage resources.

    The network location of the URI is the bucket name and the path is
    the object key within that bucket.
    """

    # Per-instance caches for the remote handles so repeated operations
    # reuse the same bucket/blob objects.
    _bucket: storage.Bucket | None = None
    _blob: storage.Blob | None = None
    _client: storage.Client | None = None

    @property
    def client(self) -> storage.Client:
        """Process-wide GCS client (created lazily and cached)."""
        return _get_client()

    @property
    def bucket(self) -> storage.Bucket:
        """Bucket handle for this URI's network location (cached)."""
        if self._bucket is None:
            self._bucket = self.client.bucket(self.netloc)
        return self._bucket

    @property
    def blob(self) -> storage.Blob:
        """Blob handle for the key referenced by this URI (cached)."""
        if self._blob is None:
            self._blob = self.bucket.blob(self.relativeToPathRoot)
        return self._blob

    def exists(self) -> bool:
        # Docstring inherited.
        if self.is_root:
            return self.bucket.exists(retry=_RETRY_POLICY)
        if self.dirLike:
            # GCS does not have concrete directory objects; treat any
            # directory-like path within an existing bucket as existing.
            return self.bucket.exists(retry=_RETRY_POLICY)
        return self.blob.exists(retry=_RETRY_POLICY)

    def size(self) -> int:
        # Docstring inherited.
        if self.dirLike:
            return 0
        # The first time this is called we need to sync from the remote.
        # Force the blob to be recalculated.
        try:
            self.blob.reload(retry=_RETRY_POLICY)
        except NotFound:
            raise FileNotFoundError(f"Resource {self} does not exist") from None
        size = self.blob.size
        if size is None:
            raise FileNotFoundError(f"Resource {self} does not exist")
        return size

    def get_info(self) -> ResourceInfo:
        """Return lightweight metadata about this GCS resource.

        Returns
        -------
        info : `ResourceInfo`
            Metadata including size, last-modified timestamp, and any
            checksums (``md5``/``crc32c``) the server reports. Roots and
            directory-like URIs are reported as zero-size non-files.

        Raises
        ------
        FileNotFoundError
            Raised if the resource does not exist.
        """
        if self.is_root:
            if not self.bucket.exists(retry=_RETRY_POLICY):
                raise FileNotFoundError(f"Resource {self} does not exist")
            return ResourceInfo(
                uri=str(self),
                is_file=False,
                size=0,
                last_modified=None,
                checksums={},
            )

        if self.dirLike:
            if not self.exists():
                raise FileNotFoundError(f"Resource {self} does not exist")
            return ResourceInfo(
                uri=str(self),
                is_file=False,
                size=0,
                last_modified=None,
                checksums={},
            )

        try:
            self.blob.reload(retry=_RETRY_POLICY)
        except NotFound:
            raise FileNotFoundError(f"Resource {self} does not exist") from None

        size = self.blob.size
        if size is None:
            raise FileNotFoundError(f"Resource {self} does not exist")

        checksums = {}
        if self.blob.md5_hash:
            checksums["md5"] = self.blob.md5_hash
        if self.blob.crc32c:
            checksums["crc32c"] = self.blob.crc32c

        # Some emulators report timestamps the property accessor cannot
        # parse; fall back to coercing the raw property value.
        try:
            updated = _coerce_gcs_datetime(self.blob.updated)
        except ValueError:
            updated = _coerce_gcs_datetime(self.blob._properties.get("updated"))

        return ResourceInfo(
            uri=str(self),
            is_file=True,
            size=size,
            last_modified=updated,
            checksums=checksums,
        )

    def remove(self) -> None:
        # Docstring inherited.
        try:
            self.blob.delete(retry=_RETRY_POLICY)
        except NotFound as e:
            raise FileNotFoundError(f"No such resource: {self}") from e

    def read(self, size: int = -1) -> bytes:
        # Docstring inherited.
        if size == 0:
            # Nothing requested; avoid issuing a remote request with an
            # invalid (inclusive) byte range of 0..-1.
            return b""
        if size < 0:
            start = None
            end = None
        else:
            start = 0
            # GCS byte ranges are inclusive.
            end = size - 1
        try:
            with time_this(log, msg="Read from %s", args=(self,)):
                body = self.blob.download_as_bytes(start=start, end=end, retry=_RETRY_POLICY)
        except NotFound as e:
            raise FileNotFoundError(f"No such resource: {self}") from e
        return body

    def write(self, data: bytes, overwrite: bool = True) -> None:
        # Docstring inherited.
        if not overwrite and self.exists():
            raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")
        with time_this(log, msg="Write to %s", args=(self,)):
            self.blob.upload_from_string(data, retry=_RETRY_POLICY)

    def mkdir(self) -> None:
        # Docstring inherited.
        if not self.bucket.exists(retry=_RETRY_POLICY):
            raise ValueError(f"Bucket {self.netloc} does not exist for {self}!")

        if not self.dirLike:
            raise NotADirectoryError(f"Can not create a 'directory' for a file-like URI {self}")

        # GCS does not have directory objects, so mkdir is a no-op once the
        # bucket exists.
        return

    @contextlib.contextmanager
    def _as_local(
        self, multithreaded: bool = True, tmpdir: ResourcePath | None = None
    ) -> Iterator[ResourcePath]:
        """Download the remote resource to a local temporary file.

        The temporary file is removed when the context manager exits.

        Raises
        ------
        FileNotFoundError
            Raised if the remote resource does not exist.
        """
        with (
            ResourcePath.temporary_uri(prefix=tmpdir, suffix=self.getExtension(), delete=True) as tmp_uri,
            time_this(log, msg="Downloading %s to local file", args=(self,)),
        ):
            try:
                with tmp_uri.open("wb") as tmpFile:
                    self.blob.download_to_file(tmpFile, retry=_RETRY_POLICY)
                yield tmp_uri
            except NotFound as e:
                raise FileNotFoundError(f"No such resource: {self}") from e

    def transfer_from(
        self,
        src: ResourcePath,
        transfer: str = "copy",
        overwrite: bool = False,
        transaction: TransactionProtocol | None = None,
        multithreaded: bool = True,
    ) -> None:
        """Transfer ``src`` to this GCS location.

        Parameters
        ----------
        src : `ResourcePath`
            Source URI to copy or move from.
        transfer : `str`, optional
            Transfer mode; must be one of ``self.transferModes``. "auto"
            is resolved to ``self.transferDefault``; "move" removes the
            source after a successful copy.
        overwrite : `bool`, optional
            Allow an existing destination to be overwritten.
        transaction : `TransactionProtocol`, optional
            Ignored; transactions are not supported here.
        multithreaded : `bool`, optional
            Passed to ``src.as_local`` when a local staging copy is
            required.

        Raises
        ------
        ValueError
            Raised if the transfer mode is not supported.
        FileExistsError
            Raised if the destination exists and ``overwrite`` is `False`.
        FileNotFoundError
            Raised if the source does not exist.
        """
        if transfer not in self.transferModes:
            raise ValueError(f"Transfer mode '{transfer}' not supported by URI scheme {self.scheme}")

        # Existence checks cost time so do not call this unless we know
        # that debugging is enabled.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(
                "Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
                src,
                src.exists(),
                self,
                self.exists(),
                transfer,
            )

        # Short circuit if the URIs are identical immediately.
        if self == src:
            log.debug(
                "Target and destination URIs are identical: %s, returning immediately."
                " No further action required.",
                self,
            )
            return

        if not overwrite and self.exists():
            raise FileExistsError(f"Destination path '{self}' already exists.")

        if transfer == "auto":
            transfer = self.transferDefault

        timer_msg = "Transfer from %s to %s"
        timer_args = (src, self)

        if isinstance(src, type(self)):
            # Looks like a GS remote uri so we can use direct copy.
            # rewrite() may need several calls for large objects; loop
            # until the returned token is exhausted.
            with time_this(log, msg=timer_msg, args=timer_args):
                rewrite_token = None
                while True:
                    try:
                        rewrite_token, bytes_copied, total_bytes = self.blob.rewrite(
                            src.blob, token=rewrite_token, retry=_RETRY_POLICY
                        )
                    except NotFound as e:
                        # rewrite() reads from the source blob, so it is
                        # the source that is missing. (Was previously a
                        # plain string missing the f-prefix.)
                        raise FileNotFoundError(f"No such resource to transfer: {src}") from e
                    log.debug("Copied %d bytes out of %d (%s to %s)", bytes_copied, total_bytes, src, self)
                    if rewrite_token is None:
                        # Copy has completed
                        break
        else:
            # Use local file and upload it
            with (
                src.as_local(multithreaded=multithreaded) as local_uri,
                time_this(log, msg=timer_msg, args=timer_args),
            ):
                self.blob.upload_from_filename(local_uri.ospath, retry=_RETRY_POLICY)

        # This was an explicit move requested from a remote resource
        # try to remove that resource
        if transfer == "move":
            # Transactions do not work here
            src.remove()

    @contextlib.contextmanager
    def open(
        self,
        mode: str = "r",
        *,
        encoding: str | None = None,
        prefer_file_temporary: bool = False,
    ) -> Iterator[ResourceHandleProtocol]:
        # Docstring inherited
        if self.isdir() or self.is_root:
            raise IsADirectoryError(f"Can not 'open' a directory URI: {self}")
        if "x" in mode:
            # Exclusive-create: verify non-existence here, then open for
            # plain write since GCS has no native exclusive mode.
            if self.exists():
                raise FileExistsError(f"File at {self} already exists.")
            mode = mode.replace("x", "w")

        # Clear the blob before calling open if we are in write mode.
        # This ensures that everything is resynced.
        if "w" in mode:
            self._blob = None

        # The GCS API does not support append or read/write modes so for
        # those we use the base class implementation.
        # There seems to be a bug in the Google open() API where it does not
        # properly write a BOM at the start of the file in UTF-16 encoding
        # which leads to python not being able to read the contents back.
        if "+" in mode or "a" in mode or ("w" in mode and encoding == "utf-16"):
            with super().open(mode, encoding=encoding, prefer_file_temporary=prefer_file_temporary) as buffer:
                yield buffer
        else:
            with self.blob.open(mode, encoding=encoding, retry=_RETRY_POLICY) as buffer:
                yield buffer

    def walk(
        self, file_filter: str | re.Pattern | None = None
    ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]:
        """Walk the emulated directory tree rooted at this URI.

        Parameters
        ----------
        file_filter : `str` or `re.Pattern`, optional
            Regex used to filter the files reported in each directory.

        Yields
        ------
        dirpath, dirnames, filenames
            Following `os.walk` semantics; nothing is yielded for an
            empty tree.
        """
        # We pretend that GCS uses directories and files and not simply keys.
        if not (self.isdir() or self.is_root):
            raise ValueError(f"Can not walk a non-directory URI: {self}")

        if isinstance(file_filter, str):
            file_filter = re.compile(file_filter)

        # Limit each query to a single "directory" to match os.walk
        # We could download all keys at once with no delimiter and work
        # it out locally but this could potentially lead to large memory
        # usage for millions of keys. It will also make the initial call
        # to this method potentially very slow. If making this method look
        # like os.walk was not required, we could query all keys with
        # pagination and return them in groups of 1000, but that would
        # be a different interface since we can't guarantee we would get
        # them all grouped properly across the 1000 limit boundary.
        prefix = self.relativeToPathRoot if not self.is_root else ""
        prefix_len = len(prefix)
        dirnames: set[str] = set()
        filenames: list[str] = []
        files_there = False

        blobs = self.client.list_blobs(self.bucket, prefix=prefix, delimiter="/", retry=_RETRY_POLICY)
        for page in blobs.pages:
            # "Sub-directories" turn up as prefixes in each page.
            dirnames.update(subdir[prefix_len:] for subdir in page.prefixes)

            # Files are reported for this "directory" only.
            # The prefix itself may appear as a zero-length placeholder
            # object (some tools write one for directories) so it must
            # be filtered out.
            found_files = [f.name[prefix_len:] for f in page if f.name != prefix]
            if file_filter is not None:
                found_files = [f for f in found_files if file_filter.search(f)]
            if found_files:
                files_there = True

            filenames.extend(found_files)

        if not dirnames and not files_there:
            # Nothing found so match os.walk and return immediately.
            return
        else:
            yield self, sorted(dirnames), filenames

        # Recurse into each "sub-directory" in deterministic order.
        for subdir in sorted(dirnames):
            new_uri = self.join(subdir)
            yield from new_uri.walk(file_filter)