Coverage for python / lsst / resources / proxied.py: 37%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:44 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("ProxiedResourcePath",)
16import contextlib
17import dataclasses
18import logging
19import re
20from abc import ABC, abstractmethod
21from collections.abc import Iterator
23from ._resourcePath import ResourceHandleProtocol, ResourceInfo, ResourcePath, ResourcePathExpression
24from .utils import TransactionProtocol
26try:
27 import fsspec
28 from fsspec.spec import AbstractFileSystem
29except ImportError:
30 fsspec = None
31 AbstractFileSystem = type
34log = logging.getLogger(__name__)
class ProxiedResourcePath(ABC, ResourcePath):
    """URI that is represented internally by another type of URI for file I/O.

    For example ``abc://xyz/file.txt`` could be the public URI form but
    internally all file access is forwarded to a ``file`` URI.

    Notes
    -----
    Every I/O method defined here looks up the internal URI with
    `_get_proxy` and forwards the call to it. Subclasses must implement
    `_set_proxy` (presumably to assign ``_proxy``; nothing in this class
    calls it).
    """

    # Internal URI that performs the real I/O. While this is `None`,
    # `_get_proxy` raises `FileNotFoundError`.
    _proxy: ResourcePath | None = None

    @abstractmethod
    def _set_proxy(self) -> None:
        """Calculate the internal `ResourcePath` corresponding to the public
        version.

        Raises
        ------
        NotImplementedError
            Raised by this base implementation; subclasses must override.
        """
        raise NotImplementedError("Proxy must be configured")

    def _get_proxy(self) -> ResourcePath:
        """Retrieve the proxied `ResourcePath`.

        Returns
        -------
        proxy : `ResourcePath`
            The internal URI stored in ``_proxy``.

        Raises
        ------
        FileNotFoundError
            Raised if ``_proxy`` is `None`.
        """
        proxy = self._proxy
        if proxy is None:
            raise FileNotFoundError(f"Internal error: No proxy ResourcePath available for {self}")
        return proxy

    def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
        """Return the result of calling ``to_fsspec`` on the proxy.

        Raises
        ------
        NotImplementedError
            Raised if no proxy is available for this URI.
        """
        try:
            proxy = self._get_proxy()
        except FileNotFoundError:
            raise NotImplementedError(f"No proxy registered for {self}. Resource does not exist.") from None
        return proxy.to_fsspec()

    def isdir(self) -> bool:
        """Report whether this URI refers to a directory.

        Returns
        -------
        isdir : `bool`
            The cached ``dirLike`` value if it is already set. Otherwise
            the proxy is asked and its answer is stored in ``dirLike``.
            `False` is returned, without caching, when no proxy is
            available.
        """
        if self.dirLike is None:
            try:
                proxy = self._get_proxy()
            except FileNotFoundError:
                return False
            self.dirLike = proxy.isdir()
        return self.dirLike

    def exists(self) -> bool:
        """Report whether the proxied resource exists.

        Returns
        -------
        exists : `bool`
            Result of ``exists()`` on the proxy, or `False` when no proxy
            is available.
        """
        try:
            proxy = self._get_proxy()
        except FileNotFoundError:
            # If there is no proxy registered then the resource can not exist.
            return False
        return proxy.exists()

    def remove(self) -> None:
        """Forward ``remove()`` to the proxy.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        proxy.remove()

    def read(self, size: int = -1) -> bytes:
        """Forward ``read()`` to the proxy and return its result.

        Parameters
        ----------
        size : `int`, optional
            Passed unchanged to the proxy's ``read``.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        return proxy.read(size=size)

    @contextlib.contextmanager
    def as_local(
        self, multithreaded: bool = True, tmpdir: ResourcePathExpression | None = None
    ) -> Iterator[ResourcePath]:
        """Enter the proxy's ``as_local`` context and yield what it yields.

        Parameters
        ----------
        multithreaded : `bool`, optional
            Passed unchanged to the proxy.
        tmpdir : `ResourcePathExpression` or `None`, optional
            Passed unchanged to the proxy.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        with proxy.as_local(multithreaded=multithreaded, tmpdir=tmpdir) as loc:
            yield loc

    @contextlib.contextmanager
    def open(
        self,
        mode: str = "r",
        *,
        encoding: str | None = None,
        prefer_file_temporary: bool = False,
    ) -> Iterator[ResourceHandleProtocol]:
        """Enter the proxy's ``open`` context and yield its handle.

        Parameters
        ----------
        mode : `str`, optional
            Passed unchanged to the proxy.
        encoding : `str` or `None`, optional
            Passed unchanged to the proxy.
        prefer_file_temporary : `bool`, optional
            Passed unchanged to the proxy.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        with proxy.open(mode, encoding=encoding, prefer_file_temporary=prefer_file_temporary) as fh:
            yield fh

    def walk(
        self, file_filter: str | re.Pattern | None = None
    ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]:
        """Walk the proxy and yield results rooted at the public URI.

        Parameters
        ----------
        file_filter : `str`, `re.Pattern` or `None`, optional
            Passed unchanged to the proxy's ``walk``.

        Yields
        ------
        root : `ResourcePath`
            Copy of this URI whose path is this URI's path joined with the
            walked directory's path relative to the proxy.
        dirs : `list` [`str`]
            Directory names exactly as yielded by the proxy.
        files : `list` [`str`]
            File names exactly as yielded by the proxy.

        Raises
        ------
        ValueError
            Raised on first iteration if no proxy is available.
        """
        try:
            proxy = self._get_proxy()
        except FileNotFoundError as e:
            raise ValueError(str(e)) from None
        for proxied_root, dirs, files in proxy.walk(file_filter=file_filter):
            # Need to return the directory in the original form and not the
            # proxy form.
            relative_to_self = proxied_root.path.removeprefix(proxy.path)
            # If the proxy path has no trailing "/" the remainder starts
            # with one. A join treats such a component as absolute and
            # would throw away self.path, so strip leading separators.
            relative_to_self = relative_to_self.lstrip("/")
            root = self.replace(path=self._pathModule.join(self.path, relative_to_self))
            yield root, dirs, files

    def size(self) -> int:
        """Forward ``size()`` to the proxy and return its result.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        return proxy.size()

    def get_info(self) -> ResourceInfo:
        """Return the proxy's `ResourceInfo` relabelled with the public URI.

        Returns
        -------
        info : `ResourceInfo`
            Copy of the proxy's info with ``uri`` replaced by ``str(self)``.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        return dataclasses.replace(proxy.get_info(), uri=str(self))

    def write(self, data: bytes, overwrite: bool = True) -> None:
        """Forward ``write()`` to the proxy.

        Parameters
        ----------
        data : `bytes`
            Passed unchanged to the proxy.
        overwrite : `bool`, optional
            Passed unchanged to the proxy.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        proxy.write(data, overwrite=overwrite)

    def mkdir(self) -> None:
        """Forward ``mkdir()`` to the proxy.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        proxy.mkdir()

    def transfer_from(
        self,
        src: ResourcePath,
        transfer: str = "copy",
        overwrite: bool = False,
        transaction: TransactionProtocol | None = None,
        multithreaded: bool = True,
    ) -> None:
        """Forward ``transfer_from()`` to the proxy.

        Parameters
        ----------
        src : `ResourcePath`
            Passed unchanged to the proxy.
        transfer : `str`, optional
            Passed unchanged to the proxy.
        overwrite : `bool`, optional
            Passed unchanged to the proxy.
        transaction : `TransactionProtocol` or `None`, optional
            Passed unchanged to the proxy.
        multithreaded : `bool`, optional
            Passed unchanged to the proxy.

        Raises
        ------
        FileNotFoundError
            Raised if no proxy is available.
        """
        proxy = self._get_proxy()
        proxy.transfer_from(
            src, transfer=transfer, overwrite=overwrite, transaction=transaction, multithreaded=multithreaded
        )