Coverage for python / lsst / resources / file.py: 0%

245 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:44 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("FileResourcePath",) 

15 

16import contextlib 

17import copy 

18import datetime 

19import logging 

20import os 

21import os.path 

22import posixpath 

23import re 

24import shutil 

25import stat 

26import urllib.parse 

27from collections.abc import Iterator 

28from pathlib import Path 

29from typing import IO, TYPE_CHECKING 

30 

31from ._resourceHandles._fileResourceHandle import FileResourceHandle 

32from ._resourcePath import ResourceInfo, ResourcePath 

33from .utils import NoTransaction, ensure_directory_is_writeable, os2posix, posix2os 

34 

35try: 

36 import fsspec 

37 from fsspec.spec import AbstractFileSystem 

38except ImportError: 

39 fsspec = None 

40 AbstractFileSystem = type 

41 

42if TYPE_CHECKING: 

43 from importlib.resources.abc import Traversable 

44 

45 from .utils import TransactionProtocol 

46 

47 

48log = logging.getLogger(__name__) 

49 

50 

def _path_to_info(uri: str, path: str | Path | Traversable) -> ResourceInfo | None:
    """Build a `ResourceInfo` describing a local path.

    Parameters
    ----------
    uri : `str`
        URI string to record in the returned object.
    path : `str`, `pathlib.Path`, or `~importlib.resources.abc.Traversable`
        Local path to inspect.

    Returns
    -------
    info : `ResourceInfo` or `None`
        Metadata derived from the stat result, or `None` if ``path`` is
        neither a `str` nor a `~pathlib.Path` and has no usable ``stat``
        method.

    Raises
    ------
    RuntimeError
        Raised if a duck-typed ``stat`` method returns something other
        than an `os.stat_result`.
    """
    if isinstance(path, Path):
        st = path.stat()
    elif isinstance(path, str):
        st = os.stat(path)
    else:
        # Edge case triggered by importlib.resources: the object is not a
        # Path but may still offer a stat() method.
        stat_method = getattr(path, "stat", None)
        if not stat_method or not callable(stat_method):
            return None
        st = stat_method()
        if not isinstance(st, os.stat_result):
            raise RuntimeError(f"Unexpected stat result from {path}.stat()")

    # Directories are reported as non-files of size zero.
    is_directory = stat.S_ISDIR(st.st_mode)
    modified = datetime.datetime.fromtimestamp(st.st_mtime, tz=datetime.UTC)
    return ResourceInfo(
        uri=uri,
        is_file=not is_directory,
        size=0 if is_directory else st.st_size,
        last_modified=modified,
        checksums={},
    )

72 

73 

class FileResourcePath(ResourcePath):
    """Path for explicit ``file`` URI scheme."""

    # Transfer modes accepted by ``transfer_from``; any other value is
    # rejected there with a `ValueError`.
    transferModes = ("copy", "link", "symlink", "hardlink", "relsymlink", "auto", "move")
    # Mode that ``transfer_from`` substitutes for "auto" when the source is
    # not a temporary file.
    transferDefault: str = "link"

    # By definition refers to a local file
    isLocal = True

82 

83 @property 

84 def ospath(self) -> str: 

85 """Path component of the URI localized to current OS. 

86 

87 Will unquote URI path since a formal URI must include the quoting. 

88 """ 

89 return urllib.parse.unquote(posix2os(self._uri.path)) 

90 

91 def exists(self) -> bool: 

92 """Indicate that the file exists.""" 

93 # Uses os.path.exists so if there is a soft link that points 

94 # to a file that no longer exists this will return False 

95 return os.path.exists(self.ospath) 

96 

97 def size(self) -> int: 

98 """Return the size of the file in bytes.""" 

99 if not os.path.isdir(self.ospath): 

100 stat = os.stat(self.ospath) 

101 sz = stat.st_size 

102 else: 

103 sz = 0 

104 return sz 

105 

106 def get_info(self) -> ResourceInfo: 

107 """Return lightweight metadata about this file.""" 

108 info = _path_to_info(str(self), self.ospath) 

109 if info is None: 

110 raise RuntimeError(f"Unexpected internal failure obtaining file info for {self}") 

111 return info 

112 

113 def remove(self) -> None: 

114 """Remove the resource.""" 

115 os.remove(self.ospath) 

116 

    @contextlib.contextmanager
    def _as_local(
        self, multithreaded: bool = True, tmpdir: ResourcePath | None = None
    ) -> Iterator[ResourcePath]:
        """Return the local path of the file.

        This is an internal helper for ``as_local()``.

        Parameters
        ----------
        multithreaded : `bool`, optional
            Unused.
        tmpdir : `ResourcePath` or `None`, optional
            Unused.

        Yields
        ------
        local_uri : `ResourcePath`
            A local URI. In this case it will be itself.
        """
        # Already local: no download and no temporary copy, so hand back self.
        yield self

138 

139 def read(self, size: int = -1) -> bytes: 

140 with open(self.ospath, "rb") as fh: 

141 return fh.read(size) 

142 

143 def write(self, data: bytes, overwrite: bool = True) -> None: 

144 dir = os.path.dirname(self.ospath) 

145 if dir and not os.path.exists(dir): 

146 _create_directories(dir) 

147 mode = "wb" if overwrite else "xb" 

148 with open(self.ospath, mode) as f: 

149 f.write(data) 

150 

151 def mkdir(self) -> None: 

152 """Make the directory associated with this URI. 

153 

154 An attempt will be made to create the directory even if the URI 

155 looks like a file. 

156 

157 Raises 

158 ------ 

159 NotADirectoryError: 

160 Raised if a non-directory already exists. 

161 """ 

162 try: 

163 _create_directories(self.ospath) 

164 except FileExistsError: 

165 raise NotADirectoryError(f"{self.ospath} exists but is not a directory.") from None 

166 

167 def isdir(self) -> bool: 

168 """Return whether this URI is a directory. 

169 

170 Returns 

171 ------- 

172 isdir : `bool` 

173 `True` if this URI is a directory or looks like a directory, 

174 else `False`. 

175 """ 

176 if self.dirLike is None: 

177 # Cache state for next time. 

178 self.dirLike = os.path.isdir(self.ospath) 

179 return self.dirLike 

180 

    def transfer_from(
        self,
        src: ResourcePath,
        transfer: str,
        overwrite: bool = False,
        transaction: TransactionProtocol | None = None,
        multithreaded: bool = True,
    ) -> None:
        """Transfer the resource at ``src`` to this local file URI.

        Parameters
        ----------
        src : `ResourcePath`
            Source URI.
        transfer : `str`
            Mode to use for transferring the resource. Supports the following
            options: copy, link, symlink, hardlink, relsymlink, auto, move.
            ``auto`` resolves to ``transferDefault`` unless the local view of
            the source is a temporary file, in which case it becomes ``copy``.
        overwrite : `bool`, optional
            Allow an existing file to be overwritten. Defaults to `False`.
        transaction : `~lsst.resources.utils.TransactionProtocol`, optional
            If a transaction is provided, undo actions will be registered.
        multithreaded : `bool`, optional
            Whether threads are allowed to be used or not. Forwarded to
            ``src.as_local()``.

        Raises
        ------
        ValueError
            Raised if ``transfer`` is not one of ``transferModes``.
        FileExistsError
            Raised if the destination exists and ``overwrite`` is `False`,
            or if a non-overwriting copy finds that another writer holds
            the ``.transfer-tmp`` file or created the destination first.
        FileNotFoundError
            Raised if the local view of the source does not exist.
        RuntimeError
            Raised if a link-style transfer is requested for a source that
            had to be downloaded to a temporary file.
        NotImplementedError
            Raised if the resolved transfer mode has no implementation.
        """
        # Fail early to prevent delays if remote resources are requested
        if transfer not in self.transferModes:
            raise ValueError(f"Transfer mode '{transfer}' not supported by URI scheme {self.scheme}")

        # Existence checks can take time so only try if the log message
        # will be issued.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(
                "Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
                src,
                src.exists(),
                self,
                self.exists(),
                transfer,
            )

        # The output location should not exist unless overwrite=True.
        # Rather than use `exists()`, use os.stat since we might need
        # the full answer later.
        dest_stat: os.stat_result | None
        try:
            # Do not read through links of the file itself.
            dest_stat = os.lstat(self.ospath)
        except FileNotFoundError:
            dest_stat = None

        # It is possible that the source URI and target URI refer
        # to the same file. This can happen for a number of reasons
        # (such as soft links in the path, or they really are the same).
        # In that case log a message and return as if the transfer
        # completed (it technically did). A temporary file download
        # can't be the same so the test can be skipped.
        if dest_stat and src.isLocal and not src.isTemporary:
            # Be consistent and use lstat here (even though realpath
            # has been called). It does not harm.
            local_src_stat = os.lstat(src.ospath)
            # Same inode on the same device means the same file.
            if dest_stat.st_ino == local_src_stat.st_ino and dest_stat.st_dev == local_src_stat.st_dev:
                log.debug(
                    "Destination URI %s is the same file as source URI %s, returning immediately."
                    " No further action required.",
                    self,
                    src,
                )
                return

        if not overwrite and dest_stat:
            raise FileExistsError(
                f"Destination path '{self}' already exists. Transfer from {src} cannot be completed."
            )

        # Make the destination path absolute (but don't follow links since
        # that would possibly cause us to end up in the wrong place if the
        # file existed already as a soft link)
        newFullPath = os.path.abspath(self.ospath)
        outputDir = os.path.dirname(newFullPath)

        # We do not have to special case FileResourcePath here because
        # as_local handles that. If remote download, download it to the
        # destination directory to allow an atomic rename but only if that
        # directory exists because we do not want to create a directory
        # but then end up with the download failing.
        tmpdir = outputDir if os.path.exists(outputDir) else None
        with src.as_local(multithreaded=multithreaded, tmpdir=tmpdir) as local_uri:
            is_temporary = local_uri.isTemporary
            local_src = local_uri.ospath

            # Short circuit if the URIs are identical immediately.
            if self == local_uri:
                log.debug(
                    "Target and destination URIs are identical: %s, returning immediately."
                    " No further action required.",
                    self,
                )
                return

            # Default transfer mode depends on whether we have a temporary
            # file or not.
            if transfer == "auto":
                transfer = self.transferDefault if not is_temporary else "copy"

            if not os.path.exists(local_src):
                # Pick the message that best describes where the missing
                # file was supposed to come from.
                if is_temporary:
                    if src == local_uri:
                        msg = f"Local temporary file {src} has gone missing."
                    else:
                        # This will not happen in normal scenarios.
                        msg = f"Local file {local_uri} downloaded from {src} has gone missing"
                else:
                    msg = f"Source URI {src} does not exist"
                raise FileNotFoundError(msg)

            # Follow soft links
            local_src = os.path.realpath(os.path.normpath(local_src))

            # Creating a symlink to a local copy of a remote resource
            # should never work. Creating a hardlink will work but should
            # not be allowed since it is highly unlikely that this is ever
            # an intended option and depends on the local target being
            # on the same file system as was used for the temporary file
            # download.
            # If a symlink is being requested for a local temporary file
            # that is likely undesirable but should not be refused.
            if is_temporary and src != local_uri and "link" in transfer:
                raise RuntimeError(
                    f"Can not use local file system transfer mode {transfer} for remote resource ({src})"
                )
            elif is_temporary and src == local_uri and "symlink" in transfer:
                log.debug(
                    "Using a symlink for a temporary resource may lead to unexpected downstream failures."
                )

            # For temporary files we can own them if we created it.
            # Remember what the caller asked for; it decides below whether
            # the original source must be removed after the transfer.
            requested_transfer = transfer
            if src != local_uri and is_temporary and transfer == "copy":
                transfer = "move"

            if not os.path.isdir(outputDir):
                # Must create the directory -- this can not be rolled back
                # since another transfer running concurrently may
                # be relying on this existing.
                _create_directories(outputDir)

            if transaction is None:
                # Use a no-op transaction to reduce code duplication
                transaction = NoTransaction()

            # For links the OS doesn't let us overwrite so if something does
            # exist we have to remove it before we do the actual "transfer"
            # below
            if "link" in transfer and overwrite and dest_stat:
                with contextlib.suppress(Exception):
                    # If this fails we ignore it since it's a problem
                    # that will manifest immediately below with a more relevant
                    # error message
                    self.remove()

            if transfer == "move":
                # If a rename works we try that since that is guaranteed to
                # be atomic. If that fails we copy and rename. We do this
                # in case other processes are trying to move to the same
                # file and we want the "winner" to not be corrupted.
                try:
                    with transaction.undoWith(f"move from {local_src}", os.rename, newFullPath, local_src):
                        os.rename(local_src, newFullPath)
                except OSError:
                    with self.temporary_uri(prefix=self.parent(), suffix=self.getExtension()) as temp_copy:
                        shutil.copy(local_src, temp_copy.ospath)
                        with transaction.undoWith(
                            f"move from {local_src}",
                            shutil.move,
                            newFullPath,
                            local_src,
                            copy_function=shutil.copy,
                        ):
                            os.rename(temp_copy.ospath, newFullPath)
                    os.remove(local_src)
            elif transfer == "copy":
                # We want atomic copies so first copy to a temp location in
                # the same output directory. This at least guarantees that
                # if multiple processes are writing to the same file
                # simultaneously the file we end up with will not be corrupt.
                if overwrite:
                    with self.temporary_uri(prefix=self.parent(), suffix=self.getExtension()) as temp_copy:
                        shutil.copy(local_src, temp_copy.ospath)
                        with transaction.undoWith(f"copy from {local_src}", os.remove, newFullPath):
                            os.rename(temp_copy.ospath, newFullPath)
                else:
                    # Create the file exclusively to ensure that no others are
                    # trying to write.
                    temp_path = newFullPath + ".transfer-tmp"
                    try:
                        with open(temp_path, "x"):
                            pass
                    except FileExistsError:
                        raise FileExistsError(
                            f"Another process is writing to '{self}'."
                            f" Transfer from {src} cannot be completed."
                        )
                    # NOTE(review): the undo action registered here removes
                    # temp_path, which the ``finally`` below also removes, and
                    # nothing registers removal of newFullPath -- confirm that
                    # a rollback is meant to leave the destination in place.
                    with transaction.undoWith(f"copy from {local_src}", os.remove, temp_path):
                        # Make sure file is writable, no matter the umask.
                        st = os.stat(temp_path)
                        os.chmod(temp_path, st.st_mode | stat.S_IWUSR)
                        shutil.copy(local_src, temp_path)
                    # Use link/remove to atomically and exclusively move the
                    # file into place (only one concurrent linker can win).
                    try:
                        os.link(temp_path, newFullPath)
                    except FileExistsError:
                        raise FileExistsError(
                            f"Another process wrote to '{self}'. Transfer from {src} cannot be completed."
                        )
                    finally:
                        os.remove(temp_path)
            elif transfer == "link":
                # Try hard link and if that fails use a symlink
                with transaction.undoWith(f"link to {local_src}", os.remove, newFullPath):
                    try:
                        os.link(local_src, newFullPath)
                    except OSError:
                        # Read through existing symlinks
                        os.symlink(local_src, newFullPath)
            elif transfer == "hardlink":
                with transaction.undoWith(f"hardlink to {local_src}", os.remove, newFullPath):
                    os.link(local_src, newFullPath)
            elif transfer == "symlink":
                # Read through existing symlinks
                with transaction.undoWith(f"symlink to {local_src}", os.remove, newFullPath):
                    os.symlink(local_src, newFullPath)
            elif transfer == "relsymlink":
                # This is a standard symlink but using a relative path
                # Need the directory name to give to relative root
                # A full file path confuses it into an extra ../
                newFullPathRoot = os.path.dirname(newFullPath)
                relPath = os.path.relpath(local_src, newFullPathRoot)
                with transaction.undoWith(f"relsymlink to {local_src}", os.remove, newFullPath):
                    os.symlink(relPath, newFullPath)
            else:
                raise NotImplementedError(f"Transfer type '{transfer}' not supported.")

        # This was an explicit move requested from a remote resource
        # try to remove that remote resource. We check is_temporary because
        # the local file would have been moved by shutil.move already.
        if requested_transfer == "move" and is_temporary and src != local_uri:
            # Transactions do not work here
            src.remove()

430 

431 def walk( 

432 self, file_filter: str | re.Pattern | None = None 

433 ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]: 

434 """Walk the directory tree returning matching files and directories. 

435 

436 Parameters 

437 ---------- 

438 file_filter : `str` or `re.Pattern`, optional 

439 Regex to filter out files from the list before it is returned. 

440 

441 Yields 

442 ------ 

443 dirpath : `ResourcePath` 

444 Current directory being examined. 

445 dirnames : `list` of `str` 

446 Names of subdirectories within dirpath. 

447 filenames : `list` of `str` 

448 Names of all the files within dirpath. 

449 """ 

450 if not self.isdir(): 

451 raise ValueError("Can not walk a non-directory URI") 

452 

453 if isinstance(file_filter, str): 

454 file_filter = re.compile(file_filter) 

455 

456 for root, dirs, files in os.walk(self.ospath, followlinks=True): 

457 # Filter by the regex 

458 if file_filter is not None: 

459 files = [f for f in files if file_filter.search(f)] 

460 yield type(self)(root, forceAbsolute=False, forceDirectory=True), dirs, files 

461 

462 @classmethod 

463 def _fixupPathUri( 

464 cls, 

465 parsed: urllib.parse.ParseResult, 

466 root: ResourcePath | None = None, 

467 forceAbsolute: bool = False, 

468 forceDirectory: bool | None = None, 

469 ) -> tuple[urllib.parse.ParseResult, bool | None]: 

470 """Fix up relative paths in URI instances. 

471 

472 Parameters 

473 ---------- 

474 parsed : `~urllib.parse.ParseResult` 

475 The result from parsing a URI using `urllib.parse`. 

476 root : `ResourcePath`, optional 

477 Path to use as root when converting relative to absolute. 

478 If `None`, it will be the current working directory. It is only 

479 used if a file-scheme is used incorrectly with a relative path. 

480 forceAbsolute : `bool`, ignored 

481 Has no effect for this subclass. ``file`` URIs are always 

482 absolute. 

483 forceDirectory : `bool`, optional 

484 If `True` forces the URI to end with a separator, otherwise given 

485 URI is interpreted as is. 

486 

487 Returns 

488 ------- 

489 modified : `~urllib.parse.ParseResult` 

490 Update result if a URI is being handled. 

491 dirLike : `bool` or `None` 

492 `True` if given parsed URI has a trailing separator or 

493 ``forceDirectory`` is `True`. Otherwise can return the given 

494 value of ``forceDirectory``. 

495 

496 Notes 

497 ----- 

498 Relative paths are explicitly not supported by RFC8089 but `urllib` 

499 does accept URIs of the form ``file:relative/path.ext``. They need 

500 to be turned into absolute paths before they can be used. This is 

501 always done regardless of the ``forceAbsolute`` parameter. 

502 """ 

503 # assume we are not dealing with a directory like URI 

504 dirLike = forceDirectory 

505 

506 # file URI implies POSIX path separators so split as POSIX, 

507 # then join as os, and convert to abspath. Do not handle 

508 # home directories since "file" scheme is explicitly documented 

509 # to not do tilde expansion. 

510 sep = posixpath.sep 

511 

512 # Consistency check. 

513 if forceDirectory is False and parsed.path.endswith(sep): 

514 raise ValueError( 

515 f"URI {parsed.geturl()} ends with {sep} but " 

516 "forceDirectory parameter declares it to be a file." 

517 ) 

518 

519 # For an absolute path all we need to do is check if we need 

520 # to force the directory separator 

521 if posixpath.isabs(parsed.path): 

522 if forceDirectory: 

523 if not parsed.path.endswith(sep): 

524 parsed = parsed._replace(path=parsed.path + sep) 

525 dirLike = True 

526 return copy.copy(parsed), dirLike 

527 

528 # Relative path so must fix it to be compliant with the standard 

529 

530 # Replacement values for the URI 

531 replacements = {} 

532 

533 if root is None: 

534 root_str = os.path.abspath(os.path.curdir) 

535 else: 

536 if root.scheme and root.scheme != "file": 

537 raise RuntimeError(f"The override root must be a file URI not {root.scheme}") 

538 root_str = os.path.abspath(root.ospath) 

539 

540 replacements["path"] = posixpath.normpath(posixpath.join(os2posix(root_str), parsed.path)) 

541 

542 # normpath strips trailing "/" so put it back if necessary 

543 # Acknowledge that trailing separator exists. 

544 if forceDirectory or (parsed.path.endswith(sep) and not replacements["path"].endswith(sep)): 

545 replacements["path"] += sep 

546 dirLike = True 

547 

548 # ParseResult is a NamedTuple so _replace is standard API 

549 parsed = parsed._replace(**replacements) 

550 

551 if parsed.params or parsed.query: 

552 log.warning("Additional items unexpectedly encountered in file URI: %s", parsed.geturl()) 

553 

554 return parsed, dirLike 

555 

556 @contextlib.contextmanager 

557 def _openImpl( 

558 self, 

559 mode: str = "r", 

560 *, 

561 encoding: str | None = None, 

562 ) -> Iterator[IO]: 

563 with FileResourceHandle(mode=mode, log=log, uri=self, encoding=encoding) as buffer: 

564 yield buffer # type: ignore 

565 

566 def to_fsspec(self) -> tuple[AbstractFileSystem, str]: 

567 """Return an abstract file system and path that can be used by fsspec. 

568 

569 Returns 

570 ------- 

571 fs : `fsspec.spec.AbstractFileSystem` 

572 A file system object suitable for use with the returned path. 

573 path : `str` 

574 A path that can be opened by the file system object. 

575 """ 

576 if fsspec is None: 

577 raise ImportError("fsspec is not available") 

578 # fsspec does not like URL encodings in file URIs so pass it the os 

579 # path instead. 

580 return fsspec.url_to_fs(self.ospath) 

581 

582 

def _create_directories(name: str | bytes) -> None:
    """Create a directory together with any missing parent directories.

    Parameters
    ----------
    name : `str` or `bytes`
        Path to the directory to be created

    Raises
    ------
    OSError
        Re-raised from `os.mkdir` when the directory could not be created
        and ``name`` is not an existing directory afterwards.

    Notes
    -----
    The code in this function is duplicated from the Python standard library
    function os.makedirs with one change: if the user has set a process umask
    that prevents us from creating/accessing files in the newly created
    directories, the permissions of the directories are altered to allow
    owner-write and owner-traverse so that they can be used.

    The ``mode`` and ``exist_ok`` parameters of the original are fixed here
    at ``0o777`` and `True` respectively.
    """
    parent, leaf = os.path.split(name)
    if not leaf:
        # Trailing separator: split again to find the real final component.
        parent, leaf = os.path.split(parent)

    if parent and leaf and not os.path.exists(parent):
        # Defeats race condition when another thread created the path
        with contextlib.suppress(FileExistsError):
            _create_directories(parent)
        current_dir: str | bytes = bytes(os.curdir, "ASCII") if isinstance(leaf, bytes) else os.curdir
        if leaf == current_dir:  # xxx/newdir/. exists if xxx/newdir exists
            return

    try:
        os.mkdir(name, 0o777)
        # This is the portion that is modified relative to the standard library
        # version of the function.
        ensure_directory_is_writeable(name)
        # end modified portion
    except OSError:
        # Cannot rely on checking for EEXIST, since the operating system
        # could give priority to other errors like EACCES or EROFS
        if not os.path.isdir(name):
            raise