Coverage for python / lsst / resources / file.py: 0%
245 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:38 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("FileResourcePath",)
16import contextlib
17import copy
18import datetime
19import logging
20import os
21import os.path
22import posixpath
23import re
24import shutil
25import stat
26import urllib.parse
27from collections.abc import Iterator
28from pathlib import Path
29from typing import IO, TYPE_CHECKING
31from ._resourceHandles._fileResourceHandle import FileResourceHandle
32from ._resourcePath import ResourceInfo, ResourcePath
33from .utils import NoTransaction, ensure_directory_is_writeable, os2posix, posix2os
35try:
36 import fsspec
37 from fsspec.spec import AbstractFileSystem
38except ImportError:
39 fsspec = None
40 AbstractFileSystem = type
42if TYPE_CHECKING:
43 from importlib.resources.abc import Traversable
45 from .utils import TransactionProtocol
48log = logging.getLogger(__name__)
def _path_to_info(uri: str, path: str | Path | Traversable) -> ResourceInfo | None:
    """Build a `ResourceInfo` describing a local file system entry.

    Parameters
    ----------
    uri : `str`
        URI string recorded verbatim in the returned information.
    path : `str`, `pathlib.Path`, or `importlib.resources.abc.Traversable`
        Location of the local file or directory to examine.

    Returns
    -------
    info : `ResourceInfo` or `None`
        Metadata derived from the stat result. Directories are reported
        with a size of 0. `None` is returned if ``path`` is neither a
        string, a `~pathlib.Path`, nor an object with a callable ``stat``
        attribute.

    Raises
    ------
    RuntimeError
        Raised if a duck-typed ``stat()`` call returns something other than
        an `os.stat_result`.
    """
    if isinstance(path, str):
        details = os.stat(path)
    elif isinstance(path, Path):
        details = path.stat()
    else:
        # Edge case triggered by importlib.resources: the object is not a
        # Path but may still provide a usable stat() method.
        stat_callable = getattr(path, "stat", None)
        if not stat_callable or not callable(stat_callable):
            return None
        details = stat_callable()
        if not isinstance(details, os.stat_result):
            raise RuntimeError(f"Unexpected stat result from {path}.stat()")

    is_directory = stat.S_ISDIR(details.st_mode)
    modified = datetime.datetime.fromtimestamp(details.st_mtime, tz=datetime.UTC)
    return ResourceInfo(
        uri=uri,
        is_file=not is_directory,
        size=0 if is_directory else details.st_size,
        last_modified=modified,
        checksums={},
    )
class FileResourcePath(ResourcePath):
    """Path for explicit ``file`` URI scheme."""

    transferModes = ("copy", "link", "symlink", "hardlink", "relsymlink", "auto", "move")
    transferDefault: str = "link"

    # By definition refers to a local file
    isLocal = True

    @property
    def ospath(self) -> str:
        """Path component of the URI localized to current OS.

        Will unquote URI path since a formal URI must include the quoting.
        """
        return urllib.parse.unquote(posix2os(self._uri.path))

    def exists(self) -> bool:
        """Indicate that the file exists."""
        # Uses os.path.exists so if there is a soft link that points
        # to a file that no longer exists this will return False
        return os.path.exists(self.ospath)

    def size(self) -> int:
        """Return the size of the file in bytes.

        Returns
        -------
        sz : `int`
            Size in bytes. Directories always report a size of 0.
        """
        if os.path.isdir(self.ospath):
            return 0
        # Do not bind the result to a local called ``stat``: that would
        # shadow the ``stat`` module imported by this file.
        return os.stat(self.ospath).st_size

    def get_info(self) -> ResourceInfo:
        """Return lightweight metadata about this file."""
        info = _path_to_info(str(self), self.ospath)
        if info is None:
            raise RuntimeError(f"Unexpected internal failure obtaining file info for {self}")
        return info

    def remove(self) -> None:
        """Remove the resource."""
        os.remove(self.ospath)

    @contextlib.contextmanager
    def _as_local(
        self, multithreaded: bool = True, tmpdir: ResourcePath | None = None
    ) -> Iterator[ResourcePath]:
        """Return the local path of the file.

        This is an internal helper for ``as_local()``.

        Parameters
        ----------
        multithreaded : `bool`, optional
            Unused.
        tmpdir : `ResourcePath` or `None`, optional
            Unused.

        Returns
        -------
        local_uri : `ResourcePath`
            A local URI. In this case it will be itself.
        """
        yield self

    def read(self, size: int = -1) -> bytes:
        """Read bytes from the start of the file.

        Parameters
        ----------
        size : `int`, optional
            Maximum number of bytes to read. The default of -1 reads the
            entire file.

        Returns
        -------
        data : `bytes`
            The bytes read.
        """
        with open(self.ospath, "rb") as fh:
            return fh.read(size)

    def write(self, data: bytes, overwrite: bool = True) -> None:
        """Write the supplied bytes to the file.

        Parameters
        ----------
        data : `bytes`
            Bytes to write.
        overwrite : `bool`, optional
            If `True` any existing file is replaced. If `False` the file
            is opened in exclusive-creation mode, so `FileExistsError` is
            raised when the file already exists.

        Notes
        -----
        The parent directory is created if it does not already exist.
        """
        parent_dir = os.path.dirname(self.ospath)
        if parent_dir and not os.path.exists(parent_dir):
            _create_directories(parent_dir)
        mode = "wb" if overwrite else "xb"
        with open(self.ospath, mode) as f:
            f.write(data)

    def mkdir(self) -> None:
        """Make the directory associated with this URI.

        An attempt will be made to create the directory even if the URI
        looks like a file.

        Raises
        ------
        NotADirectoryError
            Raised if a non-directory already exists.
        """
        try:
            _create_directories(self.ospath)
        except FileExistsError:
            raise NotADirectoryError(f"{self.ospath} exists but is not a directory.") from None

    def isdir(self) -> bool:
        """Return whether this URI is a directory.

        Returns
        -------
        isdir : `bool`
            `True` if this URI is a directory or looks like a directory,
            else `False`.
        """
        if self.dirLike is None:
            # Cache state for next time.
            self.dirLike = os.path.isdir(self.ospath)
        return self.dirLike

    def transfer_from(
        self,
        src: ResourcePath,
        transfer: str,
        overwrite: bool = False,
        transaction: TransactionProtocol | None = None,
        multithreaded: bool = True,
    ) -> None:
        """Transfer the current resource to a local file.

        Parameters
        ----------
        src : `ResourcePath`
            Source URI.
        transfer : `str`
            Mode to use for transferring the resource. Supports the following
            options: copy, move, link, symlink, hardlink, relsymlink, auto.
            ``auto`` resolves to ``transferDefault`` for a non-temporary
            local source and to ``copy`` for a temporary one.
        overwrite : `bool`, optional
            Allow an existing file to be overwritten. Defaults to `False`.
        transaction : `~lsst.resources.utils.TransactionProtocol`, optional
            If a transaction is provided, undo actions will be registered.
        multithreaded : `bool`, optional
            Whether threads are allowed to be used or not.

        Raises
        ------
        ValueError
            Raised if ``transfer`` is not one of ``transferModes``.
        FileExistsError
            Raised if the destination exists and ``overwrite`` is `False`,
            or if another writer is detected during a non-overwriting copy.
        FileNotFoundError
            Raised if the local form of the source does not exist.
        RuntimeError
            Raised if a link-style transfer is requested for a source that
            had to be downloaded to a temporary local file.
        NotImplementedError
            Raised if the resolved transfer mode has no implementation.
        """
        # Fail early to prevent delays if remote resources are requested
        if transfer not in self.transferModes:
            raise ValueError(f"Transfer mode '{transfer}' not supported by URI scheme {self.scheme}")

        # Existence checks can take time so only try if the log message
        # will be issued.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(
                "Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
                src,
                src.exists(),
                self,
                self.exists(),
                transfer,
            )

        # The output location should not exist unless overwrite=True.
        # Rather than use `exists()`, use os.stat since we might need
        # the full answer later.
        dest_stat: os.stat_result | None
        try:
            # Do not read through links of the file itself.
            dest_stat = os.lstat(self.ospath)
        except FileNotFoundError:
            dest_stat = None

        # It is possible that the source URI and target URI refer
        # to the same file. This can happen for a number of reasons
        # (such as soft links in the path, or they really are the same).
        # In that case log a message and return as if the transfer
        # completed (it technically did). A temporary file download
        # can't be the same so the test can be skipped.
        if dest_stat and src.isLocal and not src.isTemporary:
            # Be consistent and use lstat here (even though realpath
            # has been called). It does not harm.
            local_src_stat = os.lstat(src.ospath)
            if dest_stat.st_ino == local_src_stat.st_ino and dest_stat.st_dev == local_src_stat.st_dev:
                log.debug(
                    "Destination URI %s is the same file as source URI %s, returning immediately."
                    " No further action required.",
                    self,
                    src,
                )
                return

        if not overwrite and dest_stat:
            raise FileExistsError(
                f"Destination path '{self}' already exists. Transfer from {src} cannot be completed."
            )

        # Make the destination path absolute (but don't follow links since
        # that would possibly cause us to end up in the wrong place if the
        # file existed already as a soft link)
        newFullPath = os.path.abspath(self.ospath)
        outputDir = os.path.dirname(newFullPath)

        # We do not have to special case FileResourcePath here because
        # as_local handles that. If remote download, download it to the
        # destination directory to allow an atomic rename but only if that
        # directory exists because we do not want to create a directory
        # but then end up with the download failing.
        tmpdir = outputDir if os.path.exists(outputDir) else None
        with src.as_local(multithreaded=multithreaded, tmpdir=tmpdir) as local_uri:
            is_temporary = local_uri.isTemporary
            local_src = local_uri.ospath

            # Short circuit if the URIs are identical immediately.
            if self == local_uri:
                log.debug(
                    "Target and destination URIs are identical: %s, returning immediately."
                    " No further action required.",
                    self,
                )
                return

            # Default transfer mode depends on whether we have a temporary
            # file or not.
            if transfer == "auto":
                transfer = self.transferDefault if not is_temporary else "copy"

            if not os.path.exists(local_src):
                if is_temporary:
                    if src == local_uri:
                        msg = f"Local temporary file {src} has gone missing."
                    else:
                        # This will not happen in normal scenarios.
                        msg = f"Local file {local_uri} downloaded from {src} has gone missing"
                else:
                    msg = f"Source URI {src} does not exist"
                raise FileNotFoundError(msg)

            # Follow soft links
            local_src = os.path.realpath(os.path.normpath(local_src))

            # Creating a symlink to a local copy of a remote resource
            # should never work. Creating a hardlink will work but should
            # not be allowed since it is highly unlikely that this is ever
            # an intended option and depends on the local target being
            # on the same file system as was used for the temporary file
            # download.
            # If a symlink is being requested for a local temporary file
            # that is likely undesirable but should not be refused.
            if is_temporary and src != local_uri and "link" in transfer:
                raise RuntimeError(
                    f"Can not use local file system transfer mode {transfer} for remote resource ({src})"
                )
            elif is_temporary and src == local_uri and "symlink" in transfer:
                log.debug(
                    "Using a symlink for a temporary resource may lead to unexpected downstream failures."
                )

            # For temporary files we can own them if we created it.
            requested_transfer = transfer
            if src != local_uri and is_temporary and transfer == "copy":
                transfer = "move"

            if not os.path.isdir(outputDir):
                # Must create the directory -- this can not be rolled back
                # since another transfer running concurrently may
                # be relying on this existing.
                _create_directories(outputDir)

            if transaction is None:
                # Use a no-op transaction to reduce code duplication
                transaction = NoTransaction()

            # For links the OS doesn't let us overwrite so if something does
            # exist we have to remove it before we do the actual "transfer"
            # below
            if "link" in transfer and overwrite and dest_stat:
                with contextlib.suppress(Exception):
                    # If this fails we ignore it since it's a problem
                    # that will manifest immediately below with a more relevant
                    # error message
                    self.remove()

            if transfer == "move":
                # If a rename works we try that since that is guaranteed to
                # be atomic. If that fails we copy and rename. We do this
                # in case other processes are trying to move to the same
                # file and we want the "winner" to not be corrupted.
                try:
                    with transaction.undoWith(f"move from {local_src}", os.rename, newFullPath, local_src):
                        os.rename(local_src, newFullPath)
                except OSError:
                    with self.temporary_uri(prefix=self.parent(), suffix=self.getExtension()) as temp_copy:
                        shutil.copy(local_src, temp_copy.ospath)
                        with transaction.undoWith(
                            f"move from {local_src}",
                            shutil.move,
                            newFullPath,
                            local_src,
                            copy_function=shutil.copy,
                        ):
                            os.rename(temp_copy.ospath, newFullPath)
                            os.remove(local_src)
            elif transfer == "copy":
                # We want atomic copies so first copy to a temp location in
                # the same output directory. This at least guarantees that
                # if multiple processes are writing to the same file
                # simultaneously the file we end up with will not be corrupt.
                if overwrite:
                    with self.temporary_uri(prefix=self.parent(), suffix=self.getExtension()) as temp_copy:
                        shutil.copy(local_src, temp_copy.ospath)
                        with transaction.undoWith(f"copy from {local_src}", os.remove, newFullPath):
                            os.rename(temp_copy.ospath, newFullPath)
                else:
                    # Create the file exclusively to ensure that no others are
                    # trying to write.
                    temp_path = newFullPath + ".transfer-tmp"
                    try:
                        with open(temp_path, "x"):
                            pass
                    except FileExistsError:
                        raise FileExistsError(
                            f"Another process is writing to '{self}'."
                            f" Transfer from {src} cannot be completed."
                        ) from None
                    # From here on we own temp_path, so it must be removed
                    # however this block exits. Otherwise a failed chmod or
                    # copy would leave the sentinel behind and every later
                    # transfer would report a phantom concurrent writer.
                    try:
                        # The undo action removes the destination (as every
                        # other transfer mode does); temp_path no longer
                        # exists by the time an undo could run.
                        # NOTE(review): assumes undoWith only registers the
                        # undo when the body succeeds -- confirm against the
                        # TransactionProtocol implementation.
                        with transaction.undoWith(f"copy from {local_src}", os.remove, newFullPath):
                            # Make sure file is writable, no matter the umask.
                            st = os.stat(temp_path)
                            os.chmod(temp_path, st.st_mode | stat.S_IWUSR)
                            shutil.copy(local_src, temp_path)
                            # Use link/remove to atomically and exclusively
                            # move the file into place (only one concurrent
                            # linker can win).
                            try:
                                os.link(temp_path, newFullPath)
                            except FileExistsError:
                                raise FileExistsError(
                                    f"Another process wrote to '{self}'."
                                    f" Transfer from {src} cannot be completed."
                                ) from None
                    finally:
                        os.remove(temp_path)
            elif transfer == "link":
                # Try hard link and if that fails use a symlink
                with transaction.undoWith(f"link to {local_src}", os.remove, newFullPath):
                    try:
                        os.link(local_src, newFullPath)
                    except OSError:
                        # Read through existing symlinks
                        os.symlink(local_src, newFullPath)
            elif transfer == "hardlink":
                with transaction.undoWith(f"hardlink to {local_src}", os.remove, newFullPath):
                    os.link(local_src, newFullPath)
            elif transfer == "symlink":
                # Read through existing symlinks
                with transaction.undoWith(f"symlink to {local_src}", os.remove, newFullPath):
                    os.symlink(local_src, newFullPath)
            elif transfer == "relsymlink":
                # This is a standard symlink but using a relative path
                # Need the directory name to give to relative root
                # A full file path confuses it into an extra ../
                newFullPathRoot = os.path.dirname(newFullPath)
                relPath = os.path.relpath(local_src, newFullPathRoot)
                with transaction.undoWith(f"relsymlink to {local_src}", os.remove, newFullPath):
                    os.symlink(relPath, newFullPath)
            else:
                raise NotImplementedError(f"Transfer type '{transfer}' not supported.")

        # This was an explicit move requested from a remote resource
        # try to remove that remote resource. We check is_temporary because
        # the local file would have been moved by shutil.move already.
        if requested_transfer == "move" and is_temporary and src != local_uri:
            # Transactions do not work here
            src.remove()

    def walk(
        self, file_filter: str | re.Pattern | None = None
    ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]:
        """Walk the directory tree returning matching files and directories.

        Parameters
        ----------
        file_filter : `str` or `re.Pattern`, optional
            Regex to filter out files from the list before it is returned.

        Yields
        ------
        dirpath : `ResourcePath`
            Current directory being examined.
        dirnames : `list` of `str`
            Names of subdirectories within dirpath.
        filenames : `list` of `str`
            Names of all the files within dirpath.

        Raises
        ------
        ValueError
            Raised if this URI is not a directory.
        """
        if not self.isdir():
            raise ValueError("Can not walk a non-directory URI")

        if isinstance(file_filter, str):
            file_filter = re.compile(file_filter)

        for root, dirs, files in os.walk(self.ospath, followlinks=True):
            # Filter by the regex
            if file_filter is not None:
                files = [f for f in files if file_filter.search(f)]
            yield type(self)(root, forceAbsolute=False, forceDirectory=True), dirs, files

    @classmethod
    def _fixupPathUri(
        cls,
        parsed: urllib.parse.ParseResult,
        root: ResourcePath | None = None,
        forceAbsolute: bool = False,
        forceDirectory: bool | None = None,
    ) -> tuple[urllib.parse.ParseResult, bool | None]:
        """Fix up relative paths in URI instances.

        Parameters
        ----------
        parsed : `~urllib.parse.ParseResult`
            The result from parsing a URI using `urllib.parse`.
        root : `ResourcePath`, optional
            Path to use as root when converting relative to absolute.
            If `None`, it will be the current working directory. It is only
            used if a file-scheme is used incorrectly with a relative path.
        forceAbsolute : `bool`, ignored
            Has no effect for this subclass. ``file`` URIs are always
            absolute.
        forceDirectory : `bool`, optional
            If `True` forces the URI to end with a separator, otherwise given
            URI is interpreted as is.

        Returns
        -------
        modified : `~urllib.parse.ParseResult`
            Update result if a URI is being handled.
        dirLike : `bool` or `None`
            `True` if given parsed URI has a trailing separator or
            ``forceDirectory`` is `True`. Otherwise can return the given
            value of ``forceDirectory``.

        Raises
        ------
        ValueError
            Raised if the path ends with a separator but ``forceDirectory``
            is `False`.
        RuntimeError
            Raised if ``root`` has a scheme other than ``file``.

        Notes
        -----
        Relative paths are explicitly not supported by RFC8089 but `urllib`
        does accept URIs of the form ``file:relative/path.ext``. They need
        to be turned into absolute paths before they can be used. This is
        always done regardless of the ``forceAbsolute`` parameter.
        """
        # assume we are not dealing with a directory like URI
        dirLike = forceDirectory

        # file URI implies POSIX path separators so split as POSIX,
        # then join as os, and convert to abspath. Do not handle
        # home directories since "file" scheme is explicitly documented
        # to not do tilde expansion.
        sep = posixpath.sep

        # Consistency check.
        if forceDirectory is False and parsed.path.endswith(sep):
            raise ValueError(
                f"URI {parsed.geturl()} ends with {sep} but "
                "forceDirectory parameter declares it to be a file."
            )

        # For an absolute path all we need to do is check if we need
        # to force the directory separator
        if posixpath.isabs(parsed.path):
            if forceDirectory:
                if not parsed.path.endswith(sep):
                    parsed = parsed._replace(path=parsed.path + sep)
                dirLike = True
            return copy.copy(parsed), dirLike

        # Relative path so must fix it to be compliant with the standard

        # Replacement values for the URI
        replacements = {}

        if root is None:
            root_str = os.path.abspath(os.path.curdir)
        else:
            if root.scheme and root.scheme != "file":
                raise RuntimeError(f"The override root must be a file URI not {root.scheme}")
            root_str = os.path.abspath(root.ospath)

        replacements["path"] = posixpath.normpath(posixpath.join(os2posix(root_str), parsed.path))

        # normpath strips trailing "/" so put it back if necessary
        # Acknowledge that trailing separator exists.
        if forceDirectory or (parsed.path.endswith(sep) and not replacements["path"].endswith(sep)):
            replacements["path"] += sep
            dirLike = True

        # ParseResult is a NamedTuple so _replace is standard API
        parsed = parsed._replace(**replacements)

        if parsed.params or parsed.query:
            log.warning("Additional items unexpectedly encountered in file URI: %s", parsed.geturl())

        return parsed, dirLike

    @contextlib.contextmanager
    def _openImpl(
        self,
        mode: str = "r",
        *,
        encoding: str | None = None,
    ) -> Iterator[IO]:
        """Open this file through a `FileResourceHandle`.

        Parameters
        ----------
        mode : `str`, optional
            Open mode forwarded to the handle.
        encoding : `str` or `None`, optional
            Text encoding forwarded to the handle.

        Yields
        ------
        buffer : `IO`
            The handle, which is closed when the context exits.
        """
        with FileResourceHandle(mode=mode, log=log, uri=self, encoding=encoding) as buffer:
            yield buffer  # type: ignore

    def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
        """Return an abstract file system and path that can be used by fsspec.

        Returns
        -------
        fs : `fsspec.spec.AbstractFileSystem`
            A file system object suitable for use with the returned path.
        path : `str`
            A path that can be opened by the file system object.

        Raises
        ------
        ImportError
            Raised if ``fsspec`` could not be imported.
        """
        if fsspec is None:
            raise ImportError("fsspec is not available")
        # fsspec does not like URL encodings in file URIs so pass it the os
        # path instead.
        return fsspec.url_to_fs(self.ospath)
def _create_directories(name: str | bytes) -> None:
    """Create a directory and all of its parent directories that don't yet
    exist.

    Parameters
    ----------
    name : `str` or `bytes`
        Path to the directory to be created.

    Raises
    ------
    OSError
        Raised if the directory could not be created and ``name`` is not an
        existing directory afterwards (for example because a regular file
        already occupies that path).

    Notes
    -----
    This mirrors the standard library function `os.makedirs`, called with
    ``mode=0o777`` and ``exist_ok=True``, with one change: after a directory
    is created it is passed to ``ensure_directory_is_writeable`` so that a
    restrictive process umask does not leave it unusable by its owner.
    """
    parent, leaf = os.path.split(name)
    if not leaf:
        # Trailing separator: split again to find the real final component.
        parent, leaf = os.path.split(parent)

    if parent and leaf and not os.path.exists(parent):
        # Losing a race with another thread that creates the parent is fine.
        with contextlib.suppress(FileExistsError):
            _create_directories(parent)

    current_dir: str | bytes = bytes(os.curdir, "ASCII") if isinstance(leaf, bytes) else os.curdir
    if leaf == current_dir:
        # xxx/newdir/. exists if xxx/newdir exists
        return

    try:
        os.mkdir(name, 0o777)
        # This call is the one modification relative to the standard library
        # implementation.
        ensure_directory_is_writeable(name)
    except OSError:
        # Cannot rely on checking for EEXIST, since the operating system
        # could give priority to other errors like EACCES or EROFS
        if not os.path.isdir(name):
            raise