Coverage for python / lsst / resources / _resourcePath.py: 22%
578 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:32 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("ResourceInfo", "ResourcePath", "ResourcePathExpression")
16import concurrent.futures
17import contextlib
18import copy
19import dataclasses
20import datetime
21import io
22import locale
23import logging
24import os
25import posixpath
26import re
27import sys
28import urllib.parse
29from collections import defaultdict
30from pathlib import Path, PurePath, PurePosixPath
31from random import Random
32from typing import TypeAlias
34try:
35 import fsspec
36 from fsspec.spec import AbstractFileSystem
37except ImportError:
38 fsspec = None
39 AbstractFileSystem = type
41from collections.abc import Iterable, Iterator
42from typing import TYPE_CHECKING, Any, Literal, NamedTuple, overload
44from ._resourceHandles._baseResourceHandle import ResourceHandleProtocol
45from .utils import _get_num_workers, get_tempdir
47if TYPE_CHECKING:
48 from .utils import TransactionProtocol
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Regex for looking for URI escapes.
# NOTE(review): only upper-case hex digits are matched, so a lower-case
# escape such as ``%2f`` is not recognised -- confirm this is intended.
ESCAPES_RE = re.compile(r"%[A-F0-9]{2}")

# Precomputed escaped hash
ESCAPED_HASH = urllib.parse.quote("#")
60class MBulkResult(NamedTuple):
61 """Report on a bulk operation."""
63 success: bool
64 exception: Exception | None
# Type of the executor *classes* (not instances) that may be selected for
# parallel execution.
_EXECUTOR_TYPE: TypeAlias = type[
    concurrent.futures.ThreadPoolExecutor | concurrent.futures.ProcessPoolExecutor
]

# Cache value for executor class so as not to issue warning multiple
# times but still allow tests to override the value.
_POOL_EXECUTOR_CLASS: _EXECUTOR_TYPE | None = None
def _get_executor_class() -> _EXECUTOR_TYPE:
    """Return the executor class used for parallelized execution.

    Returns
    -------
    cls : `concurrent.futures.Executor`
        The ``Executor`` class. Default is
        `concurrent.futures.ThreadPoolExecutor`. Can be set explicitly by
        setting the ``$LSST_RESOURCES_EXECUTOR`` environment variable to
        "threads" (the alias "thread" is also accepted) or "process".
        Returns the thread pool class, after logging a warning, if the value
        of the variable is not recognized.

    Notes
    -----
    The result is cached in the module-level ``_POOL_EXECUTOR_CLASS`` so the
    environment is consulted, and any warning issued, only once. Resetting
    that variable to `None` forces the environment to be read again.
    """
    global _POOL_EXECUTOR_CLASS

    if _POOL_EXECUTOR_CLASS is not None:
        return _POOL_EXECUTOR_CLASS

    pool_executor_classes = {
        "threads": concurrent.futures.ThreadPoolExecutor,
        # Alias: this spelling was the one advertised in the documentation
        # so it must not trigger the "unrecognized value" warning.
        "thread": concurrent.futures.ThreadPoolExecutor,
        "process": concurrent.futures.ProcessPoolExecutor,
    }
    default_executor = "threads"
    # An unset variable and an empty variable both select the default.
    external = os.getenv("LSST_RESOURCES_EXECUTOR") or default_executor
    if external not in pool_executor_classes:
        log.warning(
            "Unrecognized value of '%s' for LSST_RESOURCES_EXECUTOR env var. Using '%s'",
            external,
            default_executor,
        )
        external = default_executor
    _POOL_EXECUTOR_CLASS = pool_executor_classes[external]
    return _POOL_EXECUTOR_CLASS
@contextlib.contextmanager
def _patch_environ(new_values: dict[str, str]) -> Iterator[None]:
    """Patch os.environ temporarily using the supplied values.

    Parameters
    ----------
    new_values : `dict` [ `str`, `str` ]
        New values to be stored in the environment.

    Notes
    -----
    On exit every key in ``new_values`` is put back to the state it had on
    entry: restored to its previous value, or removed if it was not set.
    Restoration does not fail if the managed block itself removed one of the
    patched variables.
    """
    # Remember only the variables that already existed; every other patched
    # key has to be removed again on exit.
    old_values: dict[str, str] = {k: os.environ[k] for k in new_values if k in os.environ}

    try:
        # Patch inside the ``try`` so that a failure part way through still
        # triggers the restoration below.
        for k, v in new_values.items():
            os.environ[k] = v
        yield
    finally:
        for k in new_values:
            if k in old_values:
                os.environ[k] = old_values[k]
            else:
                # Use pop() rather than del: the key may already be gone and
                # a KeyError here would mask any exception in flight.
                os.environ.pop(k, None)
136@dataclasses.dataclass(frozen=True)
137class ResourceInfo:
138 """Information about this resource."""
140 uri: str
141 """URI in string form of the resource from which this information is
142 derived.
143 """
144 is_file: bool
145 """Indicate whether the resource is a file or a directory."""
146 size: int
147 """Size of the file in bytes. A directory or a URI that has no concept
148 of size returns 0."""
149 last_modified: datetime.datetime | None
150 """Modification date of the resource, if known."""
151 checksums: dict[str, Any]
152 """Checksums for this file. Supported checksum implementations are
153 backend dependent.
154 """
157class ResourcePath: # numpydoc ignore=PR02
158 """Convenience wrapper around URI parsers.
160 Provides access to URI components and can convert file
161 paths into absolute path URIs. Scheme-less URIs are treated as if
162 they are local file system paths and are converted to absolute URIs.
164 A specialist subclass is created for each supported URI scheme.
166 Parameters
167 ----------
168 uri : `str`, `pathlib.Path`, `urllib.parse.ParseResult`, or `ResourcePath`
169 URI in string form. Can be scheme-less if referring to a relative
170 path or an absolute path on the local file system.
171 root : `str` or `ResourcePath`, optional
172 When fixing up a relative path in a ``file`` scheme or if scheme-less,
173 use this as the root. Must be absolute. If `None` the current
174 working directory will be used. Can be any supported URI scheme.
175 Not used if ``forceAbsolute`` is `False`.
176 forceAbsolute : `bool`, optional
177 If `True`, scheme-less relative URI will be converted to an absolute
178 path using a ``file`` scheme. If `False` scheme-less URI will remain
179 scheme-less and will not be updated to ``file`` or absolute path unless
180 it is already an absolute path, in which case it will be updated to
181 a ``file`` scheme.
182 forceDirectory : `bool` or `None`, optional
183 If `True` forces the URI to end with a separator. If `False` the URI
184 is interpreted as a file-like entity. Default, `None`, is that the
185 given URI is interpreted as a directory if there is a trailing ``/`` or
186 for some schemes the system will check to see if it is a file or a
187 directory.
188 isTemporary : `bool`, optional
189 If `True` indicates that this URI points to a temporary resource.
190 The default is `False`, unless ``uri`` is already a `ResourcePath`
191 instance and ``uri.isTemporary is True``.
193 Notes
194 -----
195 A non-standard URI of the form ``file:dir/file.txt`` is always converted
196 to an absolute ``file`` URI.
197 """
199 _pathLib: type[PurePath] = PurePosixPath
200 """Path library to use for this scheme."""
202 _pathModule = posixpath
203 """Path module to use for this scheme."""
205 transferModes: tuple[str, ...] = ("copy", "auto", "move")
206 """Transfer modes supported by this implementation.
208 Move is special in that it is generally a copy followed by an unlink.
209 Whether that unlink works depends critically on whether the source URI
210 implements unlink. If it does not the move will be reported as a failure.
211 """
213 transferDefault: str = "copy"
214 """Default mode to use for transferring if ``auto`` is specified."""
216 quotePaths = True
217 """True if path-like elements modifying a URI should be quoted.
219 All non-schemeless URIs have to internally use quoted paths. Therefore
220 if a new file name is given (e.g. to updatedFile or join) a decision must
221 be made whether to quote it to be consistent.
222 """
224 isLocal = False
225 """If `True` this URI refers to a local file."""
227 # This is not an ABC with abstract methods because the __new__ being
228 # a factory confuses mypy such that it assumes that every constructor
229 # returns a ResourcePath and then determines that all the abstract methods
230 # are still abstract. If they are not marked abstract but just raise
231 # mypy is fine with it.
233 # mypy is confused without these
234 _uri: urllib.parse.ParseResult
235 isTemporary: bool
236 dirLike: bool | None
237 """Whether the resource looks like a directory resource. `None` means that
238 the status is uncertain."""
def __new__(
    cls,
    uri: ResourcePathExpression,
    root: str | ResourcePath | None = None,
    forceAbsolute: bool = True,
    forceDirectory: bool | None = None,
    isTemporary: bool | None = None,
) -> ResourcePath:
    """Create and return new specialist ResourcePath subclass.

    Acts as a factory: the scheme of the parsed URI selects which subclass
    is instantiated. The instance is created with `object.__new__` and its
    ``_uri``, ``dirLike`` and ``isTemporary`` attributes are assigned
    directly before ``_set_proxy`` is called.

    Parameters
    ----------
    uri : `str`, `os.PathLike`, `urllib.parse.ParseResult`, or `ResourcePath`
        The URI to wrap.
    root : `str` or `ResourcePath`, optional
        Root used when resolving a scheme-less relative URI.
    forceAbsolute : `bool`, optional
        Passed on to the subclass path fix-up.
    forceDirectory : `bool` or `None`, optional
        Requested directory/file state; `None` means not specified.
    isTemporary : `bool` or `None`, optional
        Whether the resource is temporary. `None` is stored as `False`
        unless inherited from ``root`` or from ``uri``.

    Returns
    -------
    path : `ResourcePath`
        Instance of the subclass matching the URI scheme. If ``uri`` is
        already a `ResourcePath` consistent with the request it is returned
        unchanged.

    Raises
    ------
    RuntimeError
        Raised if ``uri`` is a `ResourcePath` whose declared directory state
        or temporary state conflicts with the request.
    ValueError
        Raised if ``uri`` is of an unsupported type, or if a non-directory
        root is to be joined with a relative path.
    NotImplementedError
        Raised if the URI scheme is not supported.
    """
    parsed: urllib.parse.ParseResult
    dirLike: bool | None = forceDirectory
    subclass: type[ResourcePath] | None = None

    # Force root to be a ResourcePath -- this simplifies downstream
    # code.
    if root is None:
        root_uri = None
    elif isinstance(root, str):
        root_uri = ResourcePath(root, forceDirectory=True, forceAbsolute=True)
    else:
        root_uri = root

    # Path-like objects are handled through their string form.
    if isinstance(uri, os.PathLike):
        uri = str(uri)

    # Record if we need to post process the URI components
    # or if the instance is already fully configured
    if isinstance(uri, str):
        # Since local file names can have special characters in them
        # we need to quote them for the parser but we can unquote
        # later. Assume that all other URI schemes are quoted.
        # Since sometimes people write file:/a/b and not file:///a/b
        # we should not quote in the explicit case of file:
        if "://" not in uri and not uri.startswith("file:"):
            if ESCAPES_RE.search(uri):
                # Already looks quoted: warn and leave the string as it is.
                log.warning("Possible double encoding of %s", uri)
            else:
                # Fragments are generally not encoded so we must search
                # for the fragment boundary ourselves. This is making
                # an assumption that the filename does not include a "#"
                # and also that there is no "/" in the fragment itself.
                to_encode = uri
                fragment = ""
                if "#" in uri:
                    # Only look for "#" in the final path component.
                    dirpos = uri.rfind("/")
                    trailing = uri[dirpos + 1 :]
                    hashpos = trailing.rfind("#")
                    if hashpos != -1:
                        fragment = trailing[hashpos:]
                        to_encode = uri[: dirpos + hashpos + 1]

                uri = urllib.parse.quote(to_encode) + fragment

        parsed = urllib.parse.urlparse(uri)
    elif isinstance(uri, urllib.parse.ParseResult):
        parsed = copy.copy(uri)
        # If we are being instantiated with a subclass, rather than
        # ResourcePath, ensure that that subclass is used directly.
        # This could lead to inconsistencies if this constructor
        # is used externally outside of the ResourcePath.replace() method.
        #   S3ResourcePath(urllib.parse.urlparse("file://a/b.txt"))
        # will be a problem.
        # This is needed to prevent a schemeless absolute URI become
        # a file URI unexpectedly when calling updatedFile or
        # updatedExtension
        if cls is not ResourcePath:
            parsed, dirLike = cls._fixDirectorySep(parsed, forceDirectory)
            subclass = cls

    elif isinstance(uri, ResourcePath):
        # Since ResourcePath is immutable we can return the argument
        # unchanged if it already agrees with forceDirectory, isTemporary,
        # and forceAbsolute.
        # We invoke __new__ again with str(self) to add a scheme for
        # forceAbsolute, but for the others that seems more likely to paper
        # over logic errors than do something useful, so we just raise.
        if forceDirectory is not None and uri.dirLike is not None and forceDirectory is not uri.dirLike:
            # Can not force a file-like URI to become a dir-like one or
            # vice versa.
            raise RuntimeError(
                f"{uri} can not be forced to change directory vs file state when previously declared."
            )
        if isTemporary is not None and isTemporary is not uri.isTemporary:
            raise RuntimeError(
                f"{uri} is already a {'temporary' if uri.isTemporary else 'permanent'} "
                f"ResourcePath; cannot make it {'temporary' if isTemporary else 'permanent'}."
            )

        if forceAbsolute and not uri.scheme:
            # Create new absolute from relative.
            return ResourcePath(
                str(uri),
                root=root,
                forceAbsolute=forceAbsolute,
                forceDirectory=forceDirectory or uri.dirLike,
                isTemporary=uri.isTemporary,
            )
        elif forceDirectory is not None and uri.dirLike is None:
            # Clone but with a new dirLike status.
            return uri.replace(forceDirectory=forceDirectory)
        return uri
    else:
        raise ValueError(
            f"Supplied URI must be string, Path, ResourcePath, or ParseResult but got '{uri!r}'"
        )

    if subclass is None:
        # Work out the subclass from the URI scheme
        if not parsed.scheme:
            # Root may be specified as a ResourcePath that overrides
            # the schemeless determination.
            if (
                root_uri is not None
                and root_uri.scheme != "file"  # file scheme has different code path
                and not parsed.path.startswith("/")  # Not already absolute path
            ):
                if root_uri.dirLike is False:
                    raise ValueError(
                        f"Root URI ({root}) was not a directory so can not be joined with"
                        f" path {parsed.path!r}"
                    )
                # If root is temporary or this schemeless is temporary we
                # assume this URI is temporary.
                isTemporary = isTemporary or root_uri.isTemporary
                joined = root_uri.join(
                    parsed.path, forceDirectory=forceDirectory, isTemporary=isTemporary
                )

                # Rather than returning this new ResourcePath directly we
                # instead extract the path and the scheme and adjust the
                # URI we were given -- we need to do this to preserve
                # fragments since join() will drop them.
                parsed = parsed._replace(scheme=joined.scheme, path=joined.path, netloc=joined.netloc)
                subclass = type(joined)

                # Clear the root parameter to indicate that it has
                # been applied already.
                root_uri = None
            else:
                from .schemeless import SchemelessResourcePath

                subclass = SchemelessResourcePath
        elif parsed.scheme == "file":
            from .file import FileResourcePath

            subclass = FileResourcePath
        elif parsed.scheme == "s3":
            from .s3 import S3ResourcePath

            subclass = S3ResourcePath
        elif parsed.scheme.startswith("http"):
            from .http import HttpResourcePath

            subclass = HttpResourcePath
        elif parsed.scheme in {"dav", "davs"}:
            from .dav import DavResourcePath

            subclass = DavResourcePath
        elif parsed.scheme == "gs":
            from .gs import GSResourcePath

            subclass = GSResourcePath
        elif parsed.scheme == "resource":
            # Rules for scheme names disallow pkg_resource
            from .packageresource import PackageResourcePath

            subclass = PackageResourcePath
        elif parsed.scheme == "mem":
            # in-memory datastore object
            from .mem import InMemoryResourcePath

            subclass = InMemoryResourcePath
        elif parsed.scheme == "eups":
            # EUPS package root.
            from .eups import EupsResourcePath

            subclass = EupsResourcePath
        else:
            raise NotImplementedError(
                f"No URI support for scheme: '{parsed.scheme}' in {parsed.geturl()}"
            )

        # Let the chosen subclass normalize the parsed components.
        parsed, dirLike = subclass._fixupPathUri(
            parsed, root=root_uri, forceAbsolute=forceAbsolute, forceDirectory=forceDirectory
        )

        # It is possible for the class to change from schemeless
        # to file or eups so handle that
        if parsed.scheme == "file":
            from .file import FileResourcePath

            subclass = FileResourcePath
        elif parsed.scheme == "eups":
            from .eups import EupsResourcePath

            subclass = EupsResourcePath

    # Now create an instance of the correct subclass and set the
    # attributes directly
    self = object.__new__(subclass)
    self._uri = parsed
    self.dirLike = dirLike
    if isTemporary is None:
        isTemporary = False
    self.isTemporary = isTemporary
    self._set_proxy()
    return self
def _set_proxy(self) -> None:
    """Calculate internal proxy for externally visible resource path.

    This implementation does nothing.
    """
    return None
@property
def scheme(self) -> str:
    """Scheme component of the URI.

    Notes
    -----
    The ``://`` separator is not part of the scheme.
    """
    parsed = self._uri
    return parsed.scheme
@property
def netloc(self) -> str:
    """Network location component of the URI."""
    parsed = self._uri
    return parsed.netloc
@property
def path(self) -> str:
    """Path component of the URI, exactly as stored."""
    parsed = self._uri
    return parsed.path
@property
def unquoted_path(self) -> str:
    """Path component of the URI with URI quoting reversed."""
    quoted = self._uri.path
    return urllib.parse.unquote(quoted)
@property
def ospath(self) -> str:
    """Path component of the URI localized to the current OS.

    This implementation always raises `AttributeError`.
    """
    message = f"Non-file URI ({self}) has no local OS path."
    raise AttributeError(message)
@property
def relativeToPathRoot(self) -> str:
    """Path relative to the network location.

    The ``path`` property with every leading POSIX separator removed.
    The result is always unquoted. If nothing is left after stripping,
    ``"./"`` is returned.
    """
    stripped = self.path.lstrip("/")
    return urllib.parse.unquote(stripped) if stripped != "" else "./"
@property
def is_root(self) -> bool:
    """Whether this URI points to the root of the network location.

    True when the path relative to the network location is ``"./"``,
    i.e. the path refers to the top level.
    """
    return self.relativeToPathRoot == "./"
@property
def fragment(self) -> str:
    """Fragment component of the URI. May be quoted."""
    parsed = self._uri
    return parsed.fragment
@property
def unquoted_fragment(self) -> str:
    """Fragment component of the URI with URI quoting reversed."""
    quoted = self.fragment
    return urllib.parse.unquote(quoted)
@property
def params(self) -> str:
    """Parameters included in the URI, if any."""
    parsed = self._uri
    return parsed.params
@property
def query(self) -> str:
    """Query string included in the URI, if any."""
    parsed = self._uri
    return parsed.query
def geturl(self) -> str:
    """Return the string form of the URI.

    Returns
    -------
    url : `str`
        The URI reassembled from its parsed components.
    """
    parsed = self._uri
    return parsed.geturl()
def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
    """Return an fsspec file system and a path usable with it.

    Returns
    -------
    fs : `fsspec.spec.AbstractFileSystem`
        A file system object suitable for use with the returned path.
    path : `str`
        A path that can be opened by the file system object.

    Raises
    ------
    ImportError
        Raised if ``fsspec`` could not be imported.
    """
    if fsspec is not None:
        # No scheme-specific handling here: hand the URL string to fsspec
        # and rely on it to work out the file system.
        return fsspec.url_to_fs(self.geturl())
    raise ImportError("fsspec is not available")
def root_uri(self) -> ResourcePath:
    """Return the base root URI.

    Returns
    -------
    uri : `ResourcePath`
        Copy of this URI with the path, query, fragment and params
        cleared, forced to be a directory.
    """
    cleared = dict.fromkeys(("path", "query", "fragment", "params"), "")
    return self.replace(forceDirectory=True, **cleared)
def split(self) -> tuple[ResourcePath, str]:
    """Split the URI into a head URI and a tail string.

    Returns
    -------
    head : `ResourcePath`
        Everything leading up to tail, expanded and normalized as per
        ResourcePath rules.
    tail : `str`
        Last path component, unquoted and never containing separators.
        Empty if ``isdir()`` reports that this URI is a directory.

    Notes
    -----
    Equivalent to `os.path.split` where head preserves the URI
    components. ``isdir()`` is called first, which in some cases can mean
    a file system check (only if ``forceDirectory`` was `None` during
    construction). For a scheme-less URI the result can therefore depend
    on the current working directory.
    """
    # A directory splits into itself and an empty tail.
    if self.isdir():
        return self, ""

    parent_path, last_component = self._pathModule.split(self.path)
    # The head keeps scheme and netloc but drops everything after the path.
    head_components = self._uri._replace(path=parent_path, fragment="", query="", params="")

    # The file part should never include quoted metacharacters
    unquoted_tail = urllib.parse.unquote(last_component)

    # Schemeless is special in that it can be a relative path and has to
    # stay that way. All other URIs will be absolute already.
    keep_absolute = self.isabs()
    head_uri = ResourcePath(head_components, forceDirectory=True, forceAbsolute=keep_absolute)
    return head_uri, unquoted_tail
def basename(self) -> str:
    """Return the base name, the last element of the path, of the URI.

    Returns
    -------
    tail : `str`
        Last part of the path attribute. Tail will be empty if the path
        ends on a separator.

    Notes
    -----
    This is the second element returned by `split()`, so a URI ending on
    a slash yields an empty string.

    Equivalent of `os.path.basename`.
    """
    _, tail = self.split()
    return tail
def dirname(self) -> ResourcePath:
    """Return the directory component of the path as a new `ResourcePath`.

    Returns
    -------
    head : `ResourcePath`
        Everything except the tail of path attribute, expanded and
        normalized as per ResourcePath rules.

    Notes
    -----
    Equivalent of `os.path.dirname`; this is the first element returned
    by `split()`. A directory URI is returned unchanged. Use `parent` if
    the parent directory is always required.
    """
    head, _ = self.split()
    return head
def parent(self) -> ResourcePath:
    """Return a `ResourcePath` of the parent directory.

    Returns
    -------
    head : `ResourcePath`
        Everything except the tail of path attribute, expanded and
        normalized as per `ResourcePath` rules.

    Notes
    -----
    For a URI known to be a file this is the same as calling `dirname`.
    Otherwise the parent directory is always returned, whereas
    `dirname()` would return a directory URI unchanged. This is
    consistent with `os.path.dirname` compared to the `pathlib.Path`
    property ``parent``.
    """
    if self.dirLike is not False:
        # Dir-like (or unknown): go up one level regardless of the
        # presence of a trailing separator.
        parent_path = self._pathLib(self.path).parent
        return self.replace(
            path=str(parent_path), forceDirectory=True, fragment="", query="", params=""
        )
    # Known file. os.path.split() is slightly faster than calling
    # Path().parent.
    return self.dirname()
def replace(
    self, forceDirectory: bool | None = None, isTemporary: bool = False, **kwargs: Any
) -> ResourcePath:
    """Return new `ResourcePath` with specified components replaced.

    Parameters
    ----------
    forceDirectory : `bool` or `None`, optional
        Passed to the constructor to force the new URI to be dir-like or
        file-like.
    isTemporary : `bool`, optional
        Indicate that the resulting URI is a temporary resource.
    **kwargs
        Components of a `urllib.parse.ParseResult` that should be
        modified for the newly-created `ResourcePath`.

    Returns
    -------
    new : `ResourcePath`
        New object of the same class as this one with updated values.

    Raises
    ------
    ValueError
        Raised if a change of ``scheme`` is requested.

    Notes
    -----
    Does not, for now, allow a change in URI scheme.
    """
    # Disallow a change in scheme
    if "scheme" in kwargs:
        raise ValueError(f"Can not use replace() method to change URI scheme for {self}")

    updated_components = self._uri._replace(**kwargs)
    clone = self.__class__(
        updated_components, forceDirectory=forceDirectory, isTemporary=isTemporary
    )
    # Let the new object pick up any extra state held by this one.
    clone._copy_extra_attributes(self)
    return clone
def updatedFile(self, newfile: str) -> ResourcePath:
    """Return new URI with an updated final component of the path.

    Parameters
    ----------
    newfile : `str`
        File name with no path component.

    Returns
    -------
    updated : `ResourcePath`
        Updated `ResourcePath` with new updated final component.

    Notes
    -----
    The result is always declared to be a file. The new file path will be
    quoted if necessary. If the current URI is known to refer to a
    directory, the new file is joined to it rather than replacing its
    final element; otherwise it is joined to the parent. Relying on the
    directory behavior is discouraged: callers should use `isdir` to
    decide whether to join or replace, since this method may in the
    future always replace the final element of the path.
    """
    base = self if self.dirLike else self.parent()
    return base.join(newfile, forceDirectory=False)
def updatedExtension(self, ext: str | None) -> ResourcePath:
    """Return a new `ResourcePath` with updated file extension.

    The whole extension reported by `getExtension` is replaced.

    Parameters
    ----------
    ext : `str` or `None`
        New extension. An empty string removes any extension. `None`
        means no change. A leading ``.`` is added if missing.

    Returns
    -------
    updated : `ResourcePath`
        URI with the specified extension. This object itself is returned
        if ``ext`` is `None` or already matches the current extension.
    """
    if ext is None:
        return self

    existing = self.getExtension()
    if existing == ext:
        # Nothing to do if the extension already matches
        return self

    # .fits.gz counts as one extension so os.path.splitext is not used;
    # strip exactly what getExtension reported.
    stem = self.path.removesuffix(existing) if existing else self.path

    # Ensure a leading "." on a non-empty extension; the empty string is
    # left alone.
    suffix = f".{ext}" if ext and not ext.startswith(".") else ext

    return self.replace(path=stem + suffix, forceDirectory=False)
def getExtension(self) -> str:
    """Return the extension(s) associated with this URI path.

    Returns
    -------
    ext : `str`
        The file extension (including the ``.``). Empty string if there
        is no file extension. Only the last extension is returned unless
        it is one of the compression modifiers (``.gz``, ``.bz2``,
        ``.xz``, ``.fz``), in which case the combined extension
        (e.g. ``.fits.gz``) is returned.

    Notes
    -----
    Does not distinguish between file and directory URIs when determining
    a suffix: a trailing separator is stripped first. An extension is only
    determined from the final component of the path.
    """
    compression = {".gz", ".bz2", ".xz", ".fz"}

    # pathlib would give the same answer via ``suffixes`` but its
    # constructor is slow, so the final component is taken apart by hand.
    final_component = self.path.rstrip("/").rsplit(self._pathModule.sep, 1)[-1]
    _, *suffixes = final_component.split(".")
    if not suffixes:
        return ""

    last = "." + suffixes[-1]
    # With multiple extensions, include the preceding one only when the
    # last one is a compression modifier.
    if len(suffixes) > 1 and last in compression:
        return f".{suffixes[-2]}{last}"
    return last
def join(
    self, path: str | ResourcePath, isTemporary: bool | None = None, forceDirectory: bool | None = None
) -> ResourcePath:
    """Return new `ResourcePath` with additional path components.

    Parameters
    ----------
    path : `str`, `ResourcePath`
        Additional file components to append to the current URI. Will be
        quoted depending on the associated URI scheme. If the path looks
        like a URI referring to an absolute location, it will be returned
        directly (matching the behavior of `os.path.join`). It can
        also be a `ResourcePath`. Fragments are propagated.
    isTemporary : `bool`, optional
        Indicate that the resulting URI represents a temporary resource.
        Default is ``self.isTemporary``.
    forceDirectory : `bool` or `None`, optional
        If `True` forces the URI to end with a separator. If `False` the
        resultant URI is declared to refer to a file. `None` indicates
        that the file directory status is unknown.

    Returns
    -------
    new : `ResourcePath`
        New URI with the path appended.

    Notes
    -----
    Schemeless URIs assume local path separator but all other URIs assume
    POSIX separator if the supplied path has directory structure. It
    may be this never becomes a problem but datastore templates assume
    POSIX separator is being used.

    If an absolute `ResourcePath` is given for ``path`` it is assumed that
    this should be returned directly. Giving a ``path`` of an absolute
    scheme-less URI is not allowed for safety reasons as it may indicate
    a mistake in the calling code.

    It is an error to attempt to join to something that is known to
    refer to a file. Use `updatedFile` if the file is to be
    replaced.

    If an unquoted ``#`` is included in the path it is assumed to be
    referring to a fragment and not part of the file name.

    Raises
    ------
    ValueError
        Raised if the given path object refers to a directory but the
        ``forceDirectory`` parameter insists the outcome should be a file,
        and vice versa. Also raised if the URI being joined with is known
        to refer to a file.
    RuntimeError
        Raised if this attempts to join a temporary URI to a non-temporary
        URI.
    """
    if self.dirLike is False:
        raise ValueError("Can not join a new path component to a file.")
    if isTemporary is None:
        isTemporary = self.isTemporary
    elif not isTemporary and self.isTemporary:
        raise RuntimeError("Cannot join temporary URI to non-temporary URI.")
    # If we have a full URI in path we will use it directly
    # but without forcing to absolute so that we can trap the
    # expected option of relative path.
    path_uri = ResourcePath(
        path, forceAbsolute=False, forceDirectory=forceDirectory, isTemporary=isTemporary
    )
    if forceDirectory is not None and path_uri.dirLike is not forceDirectory:
        raise ValueError(
            "The supplied path URI to join has inconsistent directory state "
            f"with forceDirectory parameter: {path_uri.dirLike} vs {forceDirectory}"
        )
    # From here on use the directory state determined while parsing ``path``.
    forceDirectory = path_uri.dirLike

    if path_uri.isabs():
        # Absolute URI so return it directly.
        return path_uri

    # We want to propagate fragments to the joined path and we rely on
    # the ResourcePath parser to find these fragments for us even in plain
    # strings. Must assume there are no `#` characters in filenames.
    if not isinstance(path, str) or path_uri.fragment:
        path = path_uri.unquoted_path

    # Might need to quote the path.
    if self.quotePaths:
        path = urllib.parse.quote(path)

    newpath = self._pathModule.normpath(self._pathModule.join(self.path, path))

    # normpath can strip trailing / so we force directory if the supplied
    # path ended with a /
    has_dir_sep = path.endswith(self._pathModule.sep)
    if forceDirectory is None and has_dir_sep:
        forceDirectory = True
    elif forceDirectory is False and has_dir_sep:
        raise ValueError("Path to join has trailing / but is being forced to be a file.")
    # Query, params and fragment are taken from the joined path, not from
    # this URI.
    return self.replace(
        path=newpath,
        forceDirectory=forceDirectory,
        isTemporary=isTemporary,
        fragment=path_uri.fragment,
        query=path_uri.query,
        params=path_uri.params,
    )
def relative_to(self, other: ResourcePath, walk_up: bool = False) -> str | None:
    """Return the relative path from this URI to the other URI.

    Parameters
    ----------
    other : `ResourcePath`
        URI to use to calculate the relative path. Must be a parent
        of this URI.
    walk_up : `bool`, optional
        Control whether "``..``" can be used to resolve a relative path.
        Default is `False`. Can not be `True` on Python version 3.11.

    Returns
    -------
    subpath : `str`
        The sub path of this URI relative to the supplied other URI.
        Returns `None` if there is no parent child relationship.
        Scheme and netloc must match. The sub path is unquoted.

    Raises
    ------
    TypeError
        Raised if ``walk_up`` is `True` on Python 3.11 or older.
    """
    # Scheme-less self is handled elsewhere.
    if self.scheme != other.scheme:
        return None
    if self.netloc != other.netloc:
        # Special case for localhost vs empty string.
        # There can be many variants of localhost.
        local_netlocs = {"", "localhost", "localhost.localdomain", "127.0.0.1"}
        if not {self.netloc, other.netloc}.issubset(local_netlocs):
            return None

    # Rather than trying to guess a failure reason from the TypeError
    # explicitly check for python 3.11. Doing this will simplify the
    # rediscovery of a useless python version check when we set a new
    # minimum version.
    kwargs: dict[str, bool] = {}
    if walk_up:
        if sys.version_info < (3, 12, 0):
            raise TypeError("walk_up parameter can not be true in python 3.11 and older")

        kwargs["walk_up"] = True

    enclosed_path = self._pathLib(self.relativeToPathRoot)
    parent_path = other.relativeToPathRoot
    try:
        # relativeToPathRoot has already reversed the URI quoting. The
        # result must not be unquoted a second time or a literal "%41" in
        # a file name would be turned into "A".
        return str(enclosed_path.relative_to(parent_path, **kwargs))
    except ValueError:
        # Not located below ``other``.
        return None
def exists(self) -> bool:
    """Report whether the resource is present.

    Returns
    -------
    exists : `bool`
        `True` if the resource exists.

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation.
    """
    raise NotImplementedError()
972 @classmethod
973 def _group_uris(cls, uris: Iterable[ResourcePath]) -> dict[type[ResourcePath], list[ResourcePath]]:
974 """Group URIs by class/scheme."""
975 grouped: dict[type, list[ResourcePath]] = defaultdict(list)
976 for uri in uris:
977 grouped[uri.__class__].append(uri)
978 return grouped
980 @classmethod
981 def mexists(
982 cls, uris: Iterable[ResourcePath], *, num_workers: int | None = None
983 ) -> dict[ResourcePath, bool]:
984 """Check for existence of multiple URIs at once.
986 Parameters
987 ----------
988 uris : iterable of `ResourcePath`
989 The URIs to test.
990 num_workers : `int` or `None`, optional
991 The number of parallel workers to use when checking for existence
992 If `None`, the default value will be taken from the environment.
993 If this number is higher than the default and a thread pool is
994 used, there may not be enough cached connections available.
996 Returns
997 -------
998 existence : `dict` of [`ResourcePath`, `bool`]
999 Mapping of original URI to boolean indicating existence.
1000 """
1001 existence: dict[ResourcePath, bool] = {}
1002 for uri_class, group in cls._group_uris(uris).items():
1003 existence.update(uri_class._mexists(group, num_workers=num_workers))
1005 return existence
@classmethod
def _mexists(
    cls, uris: Iterable[ResourcePath], *, num_workers: int | None = None
) -> dict[ResourcePath, bool]:
    """Check for existence of multiple URIs at once.

    Implementation helper method for `mexists`.

    Parameters
    ----------
    uris : iterable of `ResourcePath`
        The URIs to test.
    num_workers : `int` or `None`, optional
        The number of parallel workers to use when checking for existence.
        If `None`, the default value will be taken from the environment.
        The value is honored for both thread and process pools.

    Returns
    -------
    existence : `dict` of [`ResourcePath`, `bool`]
        Mapping of original URI to boolean indicating existence.
    """
    pool_executor_class = _get_executor_class()
    if not issubclass(pool_executor_class, concurrent.futures.ProcessPoolExecutor):
        return cls._mexists_pool(pool_executor_class, uris, num_workers=num_workers)

    # Patch the environment to make it think there is only one worker
    # for each subprocess. The explicit ``num_workers`` still sizes the
    # pool itself, so a caller's request is not silently dropped.
    with _patch_environ({"LSST_RESOURCES_NUM_WORKERS": "1"}):
        return cls._mexists_pool(pool_executor_class, uris, num_workers=num_workers)
1038 @classmethod
1039 def _mexists_pool(
1040 cls,
1041 pool_executor_class: _EXECUTOR_TYPE,
1042 uris: Iterable[ResourcePath],
1043 *,
1044 num_workers: int | None = None,
1045 ) -> dict[ResourcePath, bool]:
1046 """Check for existence of multiple URIs at once using specified pool
1047 executor.
1049 Implementation helper method for `_mexists`.
1051 Parameters
1052 ----------
1053 pool_executor_class : `type` [ `concurrent.futures.Executor` ]
1054 Type of executor pool to use.
1055 uris : iterable of `ResourcePath`
1056 The URIs to test.
1057 num_workers : `int` or `None`, optional
1058 The number of parallel workers to use when checking for existence
1059 If `None`, the default value will be taken from the environment.
1061 Returns
1062 -------
1063 existence : `dict` of [`ResourcePath`, `bool`]
1064 Mapping of original URI to boolean indicating existence.
1065 """
1066 max_workers = num_workers if num_workers is not None else _get_num_workers()
1067 with pool_executor_class(max_workers=max_workers) as exists_executor:
1068 future_exists = {exists_executor.submit(uri.exists): uri for uri in uris}
1070 results: dict[ResourcePath, bool] = {}
1071 for future in concurrent.futures.as_completed(future_exists):
1072 uri = future_exists[future]
1073 try:
1074 exists = future.result()
1075 except Exception:
1076 exists = False
1077 results[uri] = exists
1078 return results
@classmethod
def mtransfer(
    cls,
    transfer: str,
    from_to: Iterable[tuple[ResourcePath, ResourcePath]],
    overwrite: bool = False,
    transaction: TransactionProtocol | None = None,
    do_raise: bool = True,
) -> dict[ResourcePath, MBulkResult]:
    """Transfer many files in bulk.

    Parameters
    ----------
    transfer : `str`
        Mode to use for transferring the resource. Generically there are
        many standard options: copy, link, symlink, hardlink, relsymlink.
        Not all URIs support all modes.
    from_to : `list` [ `tuple` [ `ResourcePath`, `ResourcePath` ] ]
        A sequence of the source URIs and the target URIs.
    overwrite : `bool`, optional
        Allow an existing file to be overwritten. Defaults to `False`.
    transaction : `~lsst.resources.utils.TransactionProtocol`, optional
        A transaction object that can (depending on implementation)
        rollback transfers on error. Not guaranteed to be implemented.
        The transaction object must be thread safe.
    do_raise : `bool`, optional
        If `True` an `ExceptionGroup` will be raised containing any
        exceptions raised by the individual transfers. If `False`, or if
        there were no exceptions, a dict reporting the status of each
        `ResourcePath` will be returned.

    Returns
    -------
    copy_status : `dict` [ `ResourcePath`, `MBulkResult` ]
        A dict of all the transfer attempts with a value indicating
        whether the transfer succeeded for the target URI. If ``do_raise``
        is `True`, this will only be returned if there are no errors.
    """
    pool_executor_class = _get_executor_class()
    # Both code paths forward exactly the same options.
    options = {
        "overwrite": overwrite,
        "transaction": transaction,
        "do_raise": do_raise,
    }
    if not issubclass(pool_executor_class, concurrent.futures.ProcessPoolExecutor):
        return cls._mtransfer(pool_executor_class, transfer, from_to, **options)

    # Patch the environment to make it think there is only one worker
    # for each subprocess.
    with _patch_environ({"LSST_RESOURCES_NUM_WORKERS": "1"}):
        return cls._mtransfer(pool_executor_class, transfer, from_to, **options)
@classmethod
def _mtransfer(
    cls,
    pool_executor_class: _EXECUTOR_TYPE,
    transfer: str,
    from_to: Iterable[tuple[ResourcePath, ResourcePath]],
    overwrite: bool = False,
    transaction: TransactionProtocol | None = None,
    do_raise: bool = True,
) -> dict[ResourcePath, MBulkResult]:
    """Transfer many files in bulk using the given executor type.

    Parameters
    ----------
    pool_executor_class : `type` [ `concurrent.futures.Executor` ]
        Type of executor pool to use.
    transfer : `str`
        Mode to use for transferring the resource. Generically there are
        many standard options: copy, link, symlink, hardlink, relsymlink.
        Not all URIs support all modes.
    from_to : `list` [ `tuple` [ `ResourcePath`, `ResourcePath` ] ]
        A sequence of the source URIs and the target URIs.
    overwrite : `bool`, optional
        Allow an existing file to be overwritten. Defaults to `False`.
    transaction : `~lsst.resources.utils.TransactionProtocol`, optional
        A transaction object that can (depending on implementation)
        rollback transfers on error. Not guaranteed to be implemented.
        The transaction object must be thread safe.
    do_raise : `bool`, optional
        If `True` an `ExceptionGroup` will be raised containing any
        exceptions raised by the individual transfers. Else a dict
        reporting the status of each `ResourcePath` will be returned.

    Returns
    -------
    copy_status : `dict` [ `ResourcePath`, `MBulkResult` ]
        A dict of all the transfer attempts with a value indicating
        whether the transfer succeeded for the target URI.
    """
    with pool_executor_class(max_workers=_get_num_workers()) as pool:
        pending = {}
        for source, target in from_to:
            # Each transfer is single-stream; the parallelism comes from
            # running many transfers in the pool.
            job = pool.submit(
                target.transfer_from,
                source,
                transfer=transfer,
                overwrite=overwrite,
                transaction=transaction,
                multithreaded=False,
            )
            pending[job] = target

    results: dict[ResourcePath, MBulkResult] = {}
    any_failed = False
    for job in concurrent.futures.as_completed(pending):
        target = pending[job]
        try:
            job.result()
        except Exception as e:
            any_failed = True
            results[target] = MBulkResult(False, e)
        else:
            results[target] = MBulkResult(True, None)

    if any_failed and do_raise:
        errors = tuple(res.exception for res in results.values() if res.exception is not None)
        raise ExceptionGroup(f"Errors transferring {len(results)} artifacts", errors)

    return results
def remove(self) -> None:
    """Delete the resource.

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation.
    """
    raise NotImplementedError()
1214 @classmethod
1215 def mremove(
1216 cls, uris: Iterable[ResourcePath], *, do_raise: bool = True
1217 ) -> dict[ResourcePath, MBulkResult]:
1218 """Remove multiple URIs at once.
1220 Parameters
1221 ----------
1222 uris : iterable of `ResourcePath`
1223 URIs to remove.
1224 do_raise : `bool`, optional
1225 If `True` an `ExceptionGroup` will be raised containing any
1226 exceptions raised by the individual transfers. If `False`, or if
1227 there were no exceptions, a dict reporting the status of each
1228 `ResourcePath` will be returned.
1230 Returns
1231 -------
1232 results : `dict` [ `ResourcePath`, `MBulkResult` ]
1233 Dictionary mapping each URI to a result object indicating whether
1234 the removal succeeded or resulted in an exception. If ``do_raise``
1235 is `True` this will only be returned if everything succeeded.
1236 """
1237 # Group URIs by scheme since some URI schemes support native bulk
1238 # APIs.
1239 results: dict[ResourcePath, MBulkResult] = {}
1240 for uri_class, group in cls._group_uris(uris).items():
1241 results.update(uri_class._mremove(group))
1242 if do_raise:
1243 failed = any(not r.success for r in results.values())
1244 if failed:
1245 s = "s" if len(results) != 1 else ""
1246 raise ExceptionGroup(
1247 f"Error{s} removing {len(results)} artifact{s}",
1248 tuple(res.exception for res in results.values() if res.exception is not None),
1249 )
1251 return results
@classmethod
def _mremove(cls, uris: Iterable[ResourcePath]) -> dict[ResourcePath, MBulkResult]:
    """Remove multiple URIs using futures.

    Parameters
    ----------
    uris : iterable of `ResourcePath`
        URIs to remove.

    Returns
    -------
    results : `dict` [ `ResourcePath`, `MBulkResult` ]
        Per-URI outcome as returned by `_mremove_pool`.
    """
    pool_executor_class = _get_executor_class()
    if not issubclass(pool_executor_class, concurrent.futures.ProcessPoolExecutor):
        return cls._mremove_pool(pool_executor_class, uris)

    # Patch the environment to make it think there is only one worker
    # for each subprocess.
    with _patch_environ({"LSST_RESOURCES_NUM_WORKERS": "1"}):
        return cls._mremove_pool(pool_executor_class, uris)
@classmethod
def _mremove_pool(
    cls,
    pool_executor_class: _EXECUTOR_TYPE,
    uris: Iterable[ResourcePath],
    *,
    num_workers: int | None = None,
) -> dict[ResourcePath, MBulkResult]:
    """Remove URIs using a futures pool.

    Parameters
    ----------
    pool_executor_class : `type` [ `concurrent.futures.Executor` ]
        Type of executor pool to use.
    uris : iterable of `ResourcePath`
        URIs to remove.
    num_workers : `int` or `None`, optional
        The number of parallel workers to use. If `None`, the default
        value will be taken from the environment.

    Returns
    -------
    results : `dict` [ `ResourcePath`, `MBulkResult` ]
        Per-URI outcome; a failed removal records the exception raised.
    """
    if num_workers is None:
        num_workers = _get_num_workers()

    outcomes: dict[ResourcePath, MBulkResult] = {}
    with pool_executor_class(max_workers=num_workers) as pool:
        pending = {}
        for uri in uris:
            pending[pool.submit(uri.remove)] = uri

        for finished in concurrent.futures.as_completed(pending):
            try:
                finished.result()
            except Exception as e:
                outcome = MBulkResult(False, e)
            else:
                outcome = MBulkResult(True, None)
            outcomes[pending[finished]] = outcome
    return outcomes
def isabs(self) -> bool:
    """Report whether the URI is fully specified.

    This base implementation always answers `True`; only schemeless URIs
    can be relative.

    Returns
    -------
    isabs : `bool`
        `True` in all cases except schemeless URI.
    """
    return True
def abspath(self) -> ResourcePath:
    """Return this URI in absolute form.

    Returns
    -------
    abs : `ResourcePath`
        Absolute URI. This base implementation returns the object itself.
        Schemeless URIs are upgraded to file URIs.
    """
    return self
1312 @contextlib.contextmanager
1313 def _as_local(
1314 self, multithreaded: bool = True, tmpdir: ResourcePath | None = None
1315 ) -> Iterator[ResourcePath]:
1316 """Return the location of the (possibly remote) resource as local file.
1318 This is a helper function for `as_local` context manager.
1320 Parameters
1321 ----------
1322 multithreaded : `bool`, optional
1323 If `True` the transfer will be allowed to attempt to improve
1324 throughput by using parallel download streams. This may of no
1325 effect if the URI scheme does not support parallel streams or
1326 if a global override has been applied. If `False` parallel
1327 streams will be disabled.
1328 tmpdir : `ResourcePath` or `None`, optional
1329 Explicit override of the temporary directory to use for remote
1330 downloads.
1332 Returns
1333 -------
1334 local_uri : `ResourcePath`
1335 A URI to a local POSIX file. This can either be the same resource
1336 or a local downloaded copy of the resource.
1337 """
1338 raise NotImplementedError()
@contextlib.contextmanager
def as_local(
    self, multithreaded: bool = True, tmpdir: ResourcePathExpression | None = None
) -> Iterator[ResourcePath]:
    """Provide the (possibly remote) resource as a local file.

    Parameters
    ----------
    multithreaded : `bool`, optional
        If `True` the transfer will be allowed to attempt to improve
        throughput by using parallel download streams. This may have no
        effect if the URI scheme does not support parallel streams or
        if a global override has been applied. If `False` parallel
        streams will be disabled.
    tmpdir : `lsst.resources.ResourcePathExpression` or `None`, optional
        Explicit override of the temporary directory to use for remote
        downloads. This directory must be a local POSIX directory and
        must exist.

    Yields
    ------
    local : `ResourcePath`
        If this is a remote resource, it will be a copy of the resource
        on the local file system, probably in a temporary directory.
        For a local resource this should be the actual path to the
        resource.

    Raises
    ------
    IsADirectoryError
        Raised if this URI is directory-like.
    ValueError
        Raised if ``tmpdir`` does not refer to a local resource.

    Notes
    -----
    The context manager will automatically delete any local temporary
    file.

    Examples
    --------
    Should be used as a context manager:

    .. code-block:: py

       with uri.as_local() as local:
           ospath = local.ospath
    """
    if self.isdir():
        raise IsADirectoryError(f"Directory-like URI {self} cannot be fetched as local.")

    temp_dir = None
    if tmpdir is not None:
        temp_dir = ResourcePath(tmpdir, forceDirectory=True)
        if not temp_dir.isLocal:
            raise ValueError(f"Temporary directory for as_local must be local resource not {temp_dir}")

    # Delegate the real work to the scheme-specific implementation.
    with self._as_local(multithreaded=multithreaded, tmpdir=temp_dir) as local_uri:
        yield local_uri
1389 @classmethod
1390 @contextlib.contextmanager
1391 def temporary_uri(
1392 cls,
1393 prefix: ResourcePath | None = None,
1394 suffix: str | None = None,
1395 delete: bool = True,
1396 ) -> Iterator[ResourcePath]:
1397 """Create a temporary file-like URI.
1399 Parameters
1400 ----------
1401 prefix : `ResourcePath`, optional
1402 Temporary directory to use (can be any scheme). Without this the
1403 path will be formed as a local file URI in a temporary directory
1404 obtained from `lsst.resources.utils.get_tempdir`. Ensuring that the
1405 prefix location exists is the responsibility of the caller.
1406 suffix : `str`, optional
1407 A file suffix to be used. The ``.`` should be included in this
1408 suffix.
1409 delete : `bool`, optional
1410 By default the resource will be deleted when the context manager
1411 is exited. Setting this flag to `False` will leave the resource
1412 alone.
1414 Yields
1415 ------
1416 uri : `ResourcePath`
1417 The temporary URI. Will be removed when the context is completed.
1418 """
1419 if prefix is None:
1420 prefix = ResourcePath(get_tempdir(), forceDirectory=True)
1422 # Need to create a randomized file name. For consistency do not
1423 # use mkstemp for local and something else for remote. Additionally
1424 # this method does not create the file to prevent name clashes.
1425 characters = "abcdefghijklmnopqrstuvwxyz0123456789_"
1426 rng = Random()
1427 tempname = "".join(rng.choice(characters) for _ in range(16))
1428 if suffix:
1429 tempname += suffix
1430 temporary_uri = prefix.join(tempname, isTemporary=True)
1431 if temporary_uri.isdir():
1432 # If we had a safe way to clean up a remote temporary directory, we
1433 # could support this.
1434 raise NotImplementedError("temporary_uri cannot be used to create a temporary directory.")
1435 try:
1436 yield temporary_uri
1437 finally:
1438 if delete:
1439 with contextlib.suppress(FileNotFoundError):
1440 # It's okay if this does not work because the user
1441 # removed the file.
1442 temporary_uri.remove()
def read(self, size: int = -1) -> bytes:
    """Open the resource and return its contents as bytes.

    Parameters
    ----------
    size : `int`, optional
        The number of bytes to read. Negative or omitted indicates
        that all data should be read.

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation.
    """
    raise NotImplementedError()
def write(self, data: bytes, overwrite: bool = True) -> None:
    """Write the supplied bytes to the resource.

    Parameters
    ----------
    data : `bytes`
        The bytes to write to the resource. The entire contents of the
        resource will be replaced.
    overwrite : `bool`, optional
        If `True` the resource will be overwritten if it exists. Otherwise
        the write will fail.

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation.
    """
    raise NotImplementedError()
def mkdir(self) -> None:
    """For a dir-like URI, create the directory resource if needed.

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation.
    """
    raise NotImplementedError()
def isdir(self) -> bool:
    """Return `True` if this URI looks like a directory, else `False`.

    The answer is the truth value of the ``dirLike`` attribute, so an
    undetermined (`None`) state is reported as `False`.
    """
    looks_like_dir = self.dirLike
    return bool(looks_like_dir)
def size(self) -> int:
    """For non-dir-like URI, return the size of the resource.

    Returns
    -------
    sz : `int`
        The size in bytes of the resource associated with this URI.
        Returns 0 if dir-like.

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation.
    """
    raise NotImplementedError()
1488 def __str__(self) -> str:
1489 """Convert the URI to its native string form."""
1490 return self.geturl()
1492 def __repr__(self) -> str:
1493 """Return string representation suitable for evaluation."""
1494 return f'ResourcePath("{self.geturl()}")'
1496 def __eq__(self, other: Any) -> bool:
1497 """Compare supplied object with this `ResourcePath`."""
1498 if not isinstance(other, ResourcePath):
1499 return NotImplemented
1500 return self.geturl() == other.geturl()
1502 def __hash__(self) -> int:
1503 """Return hash of this object."""
1504 return hash(str(self))
1506 def __lt__(self, other: ResourcePath) -> bool:
1507 return self.geturl() < other.geturl()
1509 def __le__(self, other: ResourcePath) -> bool:
1510 return self.geturl() <= other.geturl()
1512 def __gt__(self, other: ResourcePath) -> bool:
1513 return self.geturl() > other.geturl()
1515 def __ge__(self, other: ResourcePath) -> bool:
1516 return self.geturl() >= other.geturl()
1518 def __copy__(self) -> ResourcePath:
1519 """Copy constructor.
1521 Object is immutable so copy can return itself.
1522 """
1523 # Implement here because the __new__ method confuses things
1524 return self
1526 def __deepcopy__(self, memo: Any) -> ResourcePath:
1527 """Deepcopy the object.
1529 Object is immutable so copy can return itself.
1530 """
1531 # Implement here because the __new__ method confuses things
1532 return self
1534 def __getnewargs__(self) -> tuple:
1535 """Support pickling."""
1536 return (str(self),)
1538 @classmethod
1539 def _fixDirectorySep(
1540 cls, parsed: urllib.parse.ParseResult, forceDirectory: bool | None = None
1541 ) -> tuple[urllib.parse.ParseResult, bool | None]:
1542 """Ensure that a path separator is present on directory paths.
1544 Parameters
1545 ----------
1546 parsed : `~urllib.parse.ParseResult`
1547 The result from parsing a URI using `urllib.parse`.
1548 forceDirectory : `bool` or `None`, optional
1549 If `True` forces the URI to end with a separator, otherwise given
1550 URI is interpreted as is. Specifying that the URI is conceptually
1551 equivalent to a directory can break some ambiguities when
1552 interpreting the last element of a path.
1554 Returns
1555 -------
1556 modified : `~urllib.parse.ParseResult`
1557 Update result if a URI is being handled.
1558 dirLike : `bool` or `None`
1559 `True` if given parsed URI has a trailing separator or
1560 ``forceDirectory`` is `True`. Otherwise returns the given value of
1561 ``forceDirectory``.
1562 """
1563 # Assume the forceDirectory flag can give us a clue.
1564 dirLike = forceDirectory
1566 # Directory separator
1567 sep = cls._pathModule.sep
1569 # URI is dir-like if explicitly stated or if it ends on a separator
1570 endsOnSep = parsed.path.endswith(sep)
1572 if forceDirectory is False and endsOnSep:
1573 raise ValueError(
1574 f"URI {parsed.geturl()} ends with {sep} but "
1575 "forceDirectory parameter declares it to be a file."
1576 )
1578 if forceDirectory or endsOnSep:
1579 dirLike = True
1580 # only add the separator if it's not already there
1581 if not endsOnSep:
1582 parsed = parsed._replace(path=parsed.path + sep)
1584 return parsed, dirLike
1586 @classmethod
1587 def _fixupPathUri(
1588 cls,
1589 parsed: urllib.parse.ParseResult,
1590 root: ResourcePath | None = None,
1591 forceAbsolute: bool = False,
1592 forceDirectory: bool | None = None,
1593 ) -> tuple[urllib.parse.ParseResult, bool | None]:
1594 """Correct any issues with the supplied URI.
1596 Parameters
1597 ----------
1598 parsed : `~urllib.parse.ParseResult`
1599 The result from parsing a URI using `urllib.parse`.
1600 root : `ResourcePath`, ignored
1601 Not used by the this implementation since all URIs are
1602 absolute except for those representing the local file system.
1603 forceAbsolute : `bool`, ignored.
1604 Not used by this implementation. URIs are generally always
1605 absolute.
1606 forceDirectory : `bool` or `None`, optional
1607 If `True` forces the URI to end with a separator, otherwise given
1608 URI is interpreted as is. Specifying that the URI is conceptually
1609 equivalent to a directory can break some ambiguities when
1610 interpreting the last element of a path.
1612 Returns
1613 -------
1614 modified : `~urllib.parse.ParseResult`
1615 Update result if a URI is being handled.
1616 dirLike : `bool`
1617 `True` if given parsed URI has a trailing separator or
1618 ``forceDirectory`` is `True`. Otherwise returns the given value
1619 of ``forceDirectory``.
1621 Notes
1622 -----
1623 Relative paths are explicitly not supported by RFC8089 but `urllib`
1624 does accept URIs of the form ``file:relative/path.ext``. They need
1625 to be turned into absolute paths before they can be used. This is
1626 always done regardless of the ``forceAbsolute`` parameter.
1628 AWS S3 differentiates between keys with trailing POSIX separators (i.e
1629 ``/dir`` and ``/dir/``) whereas POSIX does not necessarily.
1631 Scheme-less paths are normalized.
1632 """
1633 return cls._fixDirectorySep(parsed, forceDirectory)
def transfer_from(
    self,
    src: ResourcePath,
    transfer: str,
    overwrite: bool = False,
    transaction: TransactionProtocol | None = None,
    multithreaded: bool = True,
) -> None:
    """Transfer to this URI from another.

    Parameters
    ----------
    src : `ResourcePath`
        Source URI.
    transfer : `str`
        Mode to use for transferring the resource. Generically there are
        many standard options: copy, link, symlink, hardlink, relsymlink.
        Not all URIs support all modes.
    overwrite : `bool`, optional
        Allow an existing file to be overwritten. Defaults to `False`.
    transaction : `~lsst.resources.utils.TransactionProtocol`, optional
        A transaction object that can (depending on implementation)
        rollback transfers on error. Not guaranteed to be implemented.
    multithreaded : `bool`, optional
        If `True` the transfer will be allowed to attempt to improve
        throughput by using parallel download streams. This may have no
        effect if the URI scheme does not support parallel streams or
        if a global override has been applied. If `False` parallel
        streams will be disabled.

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation.

    Notes
    -----
    Conceptually this is hard to scale as the number of URI schemes
    grow. The destination URI is more important than the source URI
    since that is where all the transfer modes are relevant (with the
    complication that "move" deletes the source).

    Local file to local file is the fundamental use case but every
    other scheme has to support "copy" to local file (with implicit
    support for "move") and copy from local file.
    All the "link" options tend to be specific to local file systems.

    "move" is a "copy" where the remote resource is deleted at the end.
    Whether this works depends on the source URI rather than the
    destination URI. Reverting a move on transaction rollback is
    expected to be problematic if a remote resource was involved.
    """
    message = f"No transfer modes supported by URI scheme {self.scheme}"
    raise NotImplementedError(message)
def walk(
    self, file_filter: str | re.Pattern | None = None
) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]:
    """Walk the directory tree returning matching files and directories.

    Parameters
    ----------
    file_filter : `str` or `re.Pattern`, optional
        Regex to filter out files from the list before it is returned.

    Yields
    ------
    dirpath : `ResourcePath`
        Current directory being examined.
    dirnames : `list` of `str`
        Names of subdirectories within dirpath.
    filenames : `list` of `str`
        Names of all the files within dirpath.

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation.
    """
    raise NotImplementedError()
@overload
@classmethod
def findFileResources(
    cls,
    candidates: Iterable[ResourcePathExpression],
    file_filter: str | re.Pattern | None,
    grouped: Literal[True],
) -> Iterator[Iterator[ResourcePath]]: ...

@overload
@classmethod
def findFileResources(
    cls,
    candidates: Iterable[ResourcePathExpression],
    *,
    grouped: Literal[True],
) -> Iterator[Iterator[ResourcePath]]: ...

@overload
@classmethod
def findFileResources(
    cls,
    candidates: Iterable[ResourcePathExpression],
    file_filter: str | re.Pattern | None = None,
    grouped: Literal[False] = False,
) -> Iterator[ResourcePath]: ...

@classmethod
def findFileResources(
    cls,
    candidates: Iterable[ResourcePathExpression],
    file_filter: str | re.Pattern | None = None,
    grouped: bool = False,
) -> Iterator[ResourcePath | Iterator[ResourcePath]]:
    """Get all the files from a list of values.

    Parameters
    ----------
    candidates : iterable [`str` or `ResourcePath`]
        The files to return and directories in which to look for files to
        return.
    file_filter : `str` or `re.Pattern`, optional
        The regex to use when searching for files within directories.
        By default returns all the found files.
    grouped : `bool`, optional
        If `True` the results will be grouped by directory and each
        yielded value will be an iterator over URIs. If `False` each
        URI will be returned separately.

    Yields
    ------
    found_file: `ResourcePath`
        The passed-in URIs and URIs found in passed-in directories.
        If grouping is enabled, each of the yielded values will be an
        iterator yielding members of the group. Files given explicitly
        will be returned as a single group at the end.

    Notes
    -----
    If a value is a file it is yielded immediately without checking that it
    exists. If a value is a directory, all the files in the directory
    (recursively) that match the regex will be yielded in turn.

    Each yielded group is bound to its own directory at the time it is
    produced, so groups may be collected first and iterated later.
    """
    fileRegex = None if file_filter is None else re.compile(file_filter)

    singles = []

    # Find all the files of interest
    for location in candidates:
        uri = ResourcePath(location)
        if not uri.isdir():
            if grouped:
                singles.append(uri)
            else:
                yield uri
            continue

        for found in uri.walk(fileRegex):
            if not found:
                # This means the uri does not exist and by
                # convention we ignore it
                continue
            root, dirs, files = found
            if not files:
                continue
            if grouped:
                # Bind ``root.join`` now. A generator expression would
                # look ``root`` up lazily and see a later directory if
                # the caller advances this iterator before consuming the
                # group.
                yield map(root.join, files)
            else:
                for name in files:
                    yield root.join(name)

    # Finally, return any explicitly given files in one group
    if grouped and singles:
        yield iter(singles)
@contextlib.contextmanager
def open(
    self,
    mode: str = "r",
    *,
    encoding: str | None = None,
    prefer_file_temporary: bool = False,
) -> Iterator[ResourceHandleProtocol]:
    """Return a context manager that wraps an object that behaves like an
    open file at the location of the URI.

    Parameters
    ----------
    mode : `str`
        String indicating the mode in which to open the file. Values are
        the same as those accepted by `open`, though intrinsically
        read-only URI types may only support read modes, and
        `io.IOBase.seekable` is not guaranteed to be `True` on the returned
        object.
    encoding : `str`, optional
        Unicode encoding for text IO; ignored for binary IO. Defaults to
        ``locale.getpreferredencoding(False)``, just as `open`
        does.
    prefer_file_temporary : `bool`, optional
        If `True`, for implementations that require transfers from a remote
        system to temporary local storage and/or back, use a temporary file
        instead of an in-memory buffer; this is generally slower, but it
        may be necessary to avoid excessive memory usage by large files.
        Ignored by implementations that do not require a temporary.

    Yields
    ------
    cm : `~contextlib.AbstractContextManager`
        A context manager that wraps a `ResourceHandleProtocol` file-like
        object.

    Raises
    ------
    IsADirectoryError
        Raised if this URI is directory-like.
    FileExistsError
        Raised if the mode contains ``x`` and the resource already exists.

    Notes
    -----
    The default implementation of this method uses a local temporary buffer
    (in-memory or file, depending on ``prefer_file_temporary``) with calls
    to `read`, `write`, `as_local`, and `transfer_from` as necessary to
    read and write from/to remote systems. Remote writes thus occur only
    when the context manager is exited. `ResourcePath` implementations
    that can return a more efficient native buffer should do so whenever
    possible (as is guaranteed for local files). `ResourcePath`
    implementations for which `as_local` does not return a temporary are
    required to reimplement `open`, though they may delegate to `super`
    when ``prefer_file_temporary`` is `False`.
    """
    if self.isdir():
        raise IsADirectoryError(f"Directory-like URI {self} cannot be opened.")
    if "x" in mode and self.exists():
        raise FileExistsError(f"File at {self} already exists.")

    if not prefer_file_temporary:
        # In-memory buffer path.
        with self._openImpl(mode, encoding=encoding) as handle:
            yield handle
        return

    # Temporary-file path: reading or appending needs the current
    # contents downloaded first; a pure write only needs a scratch name.
    needs_existing = "r" in mode or "a" in mode
    if needs_existing:
        local_cm = self.as_local()
    else:
        local_cm = self.temporary_uri(suffix=self.getExtension())

    with local_cm as local_uri:
        assert local_uri.isTemporary, (
            "ResourcePath implementations for which as_local is not "
            "a temporary must reimplement `open`."
        )
        with open(local_uri.ospath, mode=mode, encoding=encoding) as file_buffer:
            if "a" in mode:
                file_buffer.seek(0, io.SEEK_END)
            yield file_buffer
        # The local file is closed; push it back for any writing mode.
        writes_back = "r" not in mode or "+" in mode
        if writes_back:
            self.transfer_from(local_uri, transfer="copy", overwrite=("x" not in mode))
@contextlib.contextmanager
def _openImpl(self, mode: str = "r", *, encoding: str | None = None) -> Iterator[ResourceHandleProtocol]:
    """Open this resource through an in-memory buffer.

    This private method may be overridden by specific `ResourcePath`
    implementations to provide a customized handle-like interface.

    Parameters
    ----------
    mode : `str`
        The mode the handle should be opened with.
    encoding : `str`, optional
        Text encoding used to decode the existing content and to encode
        the buffer on write-back. Only used when ``mode`` does not contain
        ``"b"``. Defaults to ``locale.getpreferredencoding(False)``.

    Yields
    ------
    handle : `~._resourceHandles.BaseResourceHandle`
        A handle that conforms to the
        `~._resourceHandles.BaseResourceHandle` interface. In this base
        implementation it is an `io.BytesIO` (binary modes) or an
        `io.StringIO` (text modes) whose ``name`` is ``str(self)``.

    Notes
    -----
    The base implementation reads the file's entire contents into a
    buffer (for ``"r"`` and ``"a"`` modes), lets the caller manipulate
    that buffer, and writes the whole buffer back with `write` when the
    context exits, unless the mode is read-only. If the body of the
    ``with`` block raises, nothing is written back. Subclasses may offer
    more fine-grained control.
    """
    appending = "a" in mode
    # Anything except a plain read-only mode must be flushed back on exit.
    flush_on_exit = "+" in mode or "r" not in mode

    # Existing content is only needed when reading or appending; "w" and "x"
    # start from an empty buffer.
    if "r" in mode or appending:
        initial = self.read()
    else:
        initial = b""

    if "b" in mode:
        binary_io = io.BytesIO(initial)
        binary_io.name = str(self)
        if appending:
            # Mimic append semantics by starting at the end of the content.
            binary_io.seek(0, io.SEEK_END)
        yield binary_io
        payload = binary_io.getvalue()
    else:
        if encoding is None:
            encoding = locale.getpreferredencoding(False)
        text_io = io.StringIO(initial.decode(encoding))
        text_io.name = str(self)
        if appending:
            # Mimic append semantics by starting at the end of the content.
            text_io.seek(0, io.SEEK_END)
        yield text_io
        payload = text_io.getvalue().encode(encoding)

    if flush_on_exit:
        # Exclusive-creation mode must never clobber an existing resource.
        self.write(payload, overwrite=("x" not in mode))
def generate_presigned_get_url(self, *, expiration_time_seconds: int) -> str:
    """Return a pre-signed URL for downloading this resource.

    The URL can be used to retrieve the resource with an HTTP GET without
    supplying any access credentials.

    Parameters
    ----------
    expiration_time_seconds : `int`
        Number of seconds until the generated URL is no longer valid.

    Returns
    -------
    url : `str`
        HTTP URL signed for GET.

    Raises
    ------
    NotImplementedError
        Always raised by this implementation; the message names the
        scheme of this resource.
    """
    message = f"URL signing is not supported for '{self.scheme}'"
    raise NotImplementedError(message)
def generate_presigned_put_url(self, *, expiration_time_seconds: int) -> str:
    """Return a pre-signed URL for uploading to this path.

    The URL can be used to upload a file to this path with an HTTP PUT
    without supplying any access credentials.

    Parameters
    ----------
    expiration_time_seconds : `int`
        Number of seconds until the generated URL is no longer valid.

    Returns
    -------
    url : `str`
        HTTP URL signed for PUT.

    Raises
    ------
    NotImplementedError
        Always raised by this implementation; the message names the
        scheme of this resource.
    """
    message = f"URL signing is not supported for '{self.scheme}'"
    raise NotImplementedError(message)
def _copy_extra_attributes(self, original_uri: ResourcePath) -> None:
    """Transfer additional attributes from a cloned `ResourcePath`.

    Hook that may be overridden by subclasses to transfer attributes when
    a `ResourcePath` is constructed using the "clone" version of the
    constructor, i.e. by passing in an existing `ResourcePath` object.
    This implementation does nothing.

    Parameters
    ----------
    original_uri : `ResourcePath`
        The object that was passed to the constructor.
    """
    return None
def get_info(self) -> ResourceInfo:
    """Return lightweight metadata about this resource.

    Returns
    -------
    info : `ResourceInfo`
        The information about this resource that can be obtained from
        the backend. Will not read the file contents.

    Raises
    ------
    NotImplementedError
        Always raised by this implementation; the message names the
        scheme of this resource.
    """
    # Name the scheme so the failure is diagnosable from a traceback or log,
    # matching the messages raised by the pre-signed URL methods.
    raise NotImplementedError(f"get_info is not supported for '{self.scheme}'")
# Union of the input types accepted where a `ResourcePath` is expected:
# a `str`, an already-parsed `urllib.parse.ParseResult`, an existing
# `ResourcePath`, or a `pathlib.Path`. The union is a runtime expression,
# so it has to be evaluated after `ResourcePath` itself is defined.
ResourcePathExpression = str | urllib.parse.ParseResult | ResourcePath | Path
"""Type-annotation alias for objects that can be coerced to ResourcePath.
"""