Coverage for python / lsst / resources / dav.py: 29%

367 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:32 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("DavResourcePath",) 

15 

16import contextlib 

17import datetime 

18import functools 

19import io 

20import logging 

21import os 

22import re 

23import threading 

24import urllib 

25from collections.abc import Iterator 

26from typing import TYPE_CHECKING, Any, BinaryIO, cast 

27 

28try: 

29 from typing import override # Python 3.12+ 

30except ImportError: 

31 from typing_extensions import override # Python 3.11 

32 

33try: 

34 import fsspec 

35 from fsspec.spec import AbstractFileSystem 

36except ImportError: 

37 fsspec = None 

38 AbstractFileSystem = type 

39 

40from ._resourceHandles import ResourceHandleProtocol 

41from ._resourceHandles._davResourceHandle import DavReadResourceHandle 

42from ._resourcePath import ResourceInfo, ResourcePath, ResourcePathExpression 

43from .davutils import ( 

44 DavClient, 

45 DavClientPool, 

46 DavConfigPool, 

47 DavFileMetadata, 

48 normalize_path, 

49 normalize_url, 

50) 

51from .utils import get_tempdir 

52 

53if TYPE_CHECKING: 

54 from .utils import TransactionProtocol 

55 

56from lsst.utils.logging import getLogger 

57 

58log = getLogger(__name__) 

59 

60 

61@functools.lru_cache 

62def _calc_tmpdir_buffer_size(tmpdir: str) -> int: 

63 """Compute the block size to use for writing files in `tmpdir` as 

64 256 blocks of typical size (i.e. 4096 bytes) or 10 times the file system 

65 block size, whichever is higher. 

66 

67 This is a reasonable compromise between using memory for buffering and 

68 the number of system calls issued to read from or write to temporary 

69 files. 

70 """ 

71 fsstats = os.statvfs(tmpdir) 

72 return max(10 * fsstats.f_bsize, 256 * 4096) 

73 

74 

75class DavResourcePathConfig: 

76 """Configuration class to encapsulate the configurable items used by 

77 all instances of class `DavResourcePath`. 

78 

79 Instantiating this class creates a thread-safe singleton. 

80 """ 

81 

82 _instance = None 

83 _lock = threading.Lock() 

84 

85 def __new__(cls) -> DavResourcePathConfig: 

86 if cls._instance is None: 86 ↛ 91line 86 didn't jump to line 91 because the condition on line 86 was always true

87 with cls._lock: 

88 if cls._instance is None: 88 ↛ 91line 88 didn't jump to line 91

89 cls._instance = super().__new__(cls) 

90 

91 return cls._instance 

92 

93 def __init__(self) -> None: 

94 # Path to the local temporary directory all instances of 

95 # `DavResourcePath` must use and its associated buffer size (in bytes). 

96 self._tmpdir_buffersize: tuple[str, int] | None = None 

97 

98 @property 

99 def tmpdir_buffersize(self) -> tuple[str, int]: 

100 """Return the path to a temporary directory and the preferred buffer 

101 size to use when reading/writing files from/to that directory. 

102 """ 

103 if self._tmpdir_buffersize is not None: 

104 return self._tmpdir_buffersize 

105 

106 # Retrieve and cache the path and the blocksize for the temporary 

107 # directory if no other thread has done that in the meantime. 

108 with DavResourcePathConfig._lock: 

109 if self._tmpdir_buffersize is None: 

110 tmpdir = get_tempdir() 

111 bufsize = _calc_tmpdir_buffer_size(tmpdir) 

112 self._tmpdir_buffersize = (tmpdir, bufsize) 

113 

114 return self._tmpdir_buffersize 

115 

116 def _destroy(self) -> None: 

117 """Destroy this class singleton instance. 

118 

119 Helper method to be used in tests to reset global configuration. 

120 """ 

121 with DavResourcePathConfig._lock: 

122 DavResourcePathConfig._instance = None 

123 

124 

125class DavGlobals: 

126 """Helper container to encapsulate all the gloal objects needed by this 

127 module. 

128 """ 

129 

130 def __init__(self) -> None: 

131 # Client pool used by all DavResourcePath instances. 

132 # Use Any as type annotation to keep mypy happy. 

133 self._client_pool: Any = None 

134 

135 # Configuration used by all DavResourcePath instances. 

136 self._config: Any = None 

137 

138 # (Re)Initialize the objects above. 

139 self._reset() 

140 

141 def _reset(self) -> None: 

142 """Initialize all the globals. 

143 

144 This method is a helper for reinitializing globals in tests. 

145 """ 

146 # Initialize the singleton instance of the webdav endpoint 

147 # configuration pool. 

148 config_pool: DavConfigPool = DavConfigPool("LSST_RESOURCES_WEBDAV_CONFIG") 

149 

150 # Initialize the singleton instance of the webdav client pool. This is 

151 # a thread-safe singleton shared by all instances of DavResourcePath. 

152 if self._client_pool is not None: 152 ↛ 153line 152 didn't jump to line 153 because the condition on line 152 was never true

153 self._client_pool._destroy() 

154 

155 self._client_pool = DavClientPool(config_pool) 

156 

157 # Initialize the singleton instance of the configuration shared 

158 # all DavResourcePath objects. 

159 if self._config is not None: 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true

160 self._config._destroy() 

161 

162 self._config = DavResourcePathConfig() 

163 

164 def client_pool(self) -> DavClientPool: 

165 """Return the pool of reusable webDAV clients.""" 

166 return self._client_pool 

167 

168 def config(self) -> DavResourcePathConfig: 

169 """Return the configuration settings for all `DavResourcePath` 

170 objects. 

171 """ 

172 return self._config 

173 

174 

175# Convenience object to encapsulate all global objects needed by this module. 

176dav_globals: DavGlobals = DavGlobals() 

177 

178 

179class DavResourcePath(ResourcePath): 

180 """WebDAV resource. 

181 

182 Parameters 

183 ---------- 

184 uri : `ResourcePathExpression` 

185 URI to store in object. 

186 root : `str` or `ResourcePath` or `None`, optional 

187 Root for relative URIs. Not used in this constructor. 

188 forceAbsolute : `bool` 

189 Whether to force absolute URI. A WebDAV URI is always absolute. 

190 forceDirectory : `bool` or `None`, optional 

191 Whether this URI represents a directory. 

192 isTemporary : `bool` or `None`, optional 

193 Whether this URI represents a temporary resource. 

194 """ 

195 

196 def __init__( 

197 self, 

198 uri: ResourcePathExpression, 

199 root: str | ResourcePath | None = None, 

200 forceAbsolute: bool = True, 

201 forceDirectory: bool | None = None, 

202 isTemporary: bool | None = None, 

203 ) -> None: 

204 # Build the internal URL we use to talk to the server, which 

205 # uses "http" or "https" as scheme instead of "dav" or "davs". 

206 self._internal_url: str = normalize_url(self.geturl()) 

207 

208 # WebDAV client this path must use to interact with the server. 

209 self._dav_client: DavClient | None = None 

210 

211 # Retrieve the configuration shared by all instances of this class. 

212 self._config: DavResourcePathConfig = dav_globals.config() 

213 

214 log.debug("created instance of DavResourcePath %s [%#x]", self, id(self)) 

215 

216 @classmethod 

217 def _fixupPathUri( 

218 cls, 

219 parsed: urllib.parse.ParseResult, 

220 root: ResourcePath | None = None, 

221 forceAbsolute: bool = False, 

222 forceDirectory: bool | None = None, 

223 ) -> tuple[urllib.parse.ParseResult, bool | None]: 

224 """Correct any issues with the supplied URI. 

225 

226 This function ensures that the path of the URI is normalized. 

227 """ 

228 # Call the superclass' _fixupPathUri. 

229 parsed, dirLike = super()._fixupPathUri(parsed, forceDirectory=forceDirectory) 

230 

231 # Clean the URL's path and ensure dir-like paths end by "/". 

232 path = normalize_path(parsed.path) 

233 if dirLike and path != "/": 

234 path += "/" 

235 

236 return parsed._replace(path=path), dirLike 

237 

238 @property 

239 def _client(self) -> DavClient: 

240 """Return the webDAV client for this resource.""" 

241 # If we already have a client, use it. 

242 if self._dav_client is not None: 

243 return self._dav_client 

244 

245 # Retrieve the client this resource must use to interact with the 

246 # server from the global client pool. 

247 self._dav_client = dav_globals.client_pool().get_client_for_url(self._internal_url) 

248 return self._dav_client 

249 

250 def _stat(self) -> DavFileMetadata: 

251 """Retrieve metadata about this resource.""" 

252 return self._client.stat(self._internal_url) 

253 

254 @override 

255 def mkdir(self) -> None: 

256 """Create the directory resource if it does not already exist.""" 

257 log.debug("mkdir %s [%#x]", self, id(self)) 

258 

259 if not self.isdir(): 

260 raise NotADirectoryError(f"Can not create a directory for file-like URI {self}") 

261 

262 stat = self._stat() 

263 if stat.is_dir: 

264 return 

265 

266 if stat.is_file: 

267 # A file exists at this path. 

268 raise NotADirectoryError( 

269 f"Can not create a directory for {self} because a file already exists at that URL" 

270 ) 

271 

272 # The underlying webDAV client will use the knowledge it has about 

273 # the specific server to create the requested directory 

274 # hierarchy by issueing the minimum possible number of requests. 

275 self._client.mkcol(self._internal_url) 

276 

277 @override 

278 def exists(self) -> bool: 

279 """Check that this resource exists.""" 

280 log.debug("exists %s [%#x]", self, id(self)) 

281 

282 return self._stat().exists 

283 

284 @override 

285 def size(self) -> int: 

286 """Return the size of the remote resource in bytes.""" 

287 log.debug("size %s [%#x]", self, id(self)) 

288 

289 return 0 if self.isdir() else self._client.size(self._internal_url) 

290 

291 @override 

292 def get_info(self) -> ResourceInfo: 

293 """Return lightweight metadata details about this resource.""" 

294 log.debug("get_info %s [%#x]", self, id(self)) 

295 

296 info = self._client.info(self._internal_url) 

297 if info["type"] is None: 

298 raise FileNotFoundError(f"Resource {self} does not exist") 

299 

300 return ResourceInfo( 

301 uri=str(self), 

302 is_file=info["type"] == "file", 

303 size=info["size"], 

304 last_modified=info["last_modified"], 

305 checksums=info["checksums"], 

306 ) 

307 

308 @override 

309 def read(self, size: int = -1) -> bytes: 

310 """Open the resource and return the contents in bytes. 

311 

312 Parameters 

313 ---------- 

314 size : `int`, optional 

315 The number of bytes to read. Negative or omitted indicates that 

316 all data should be read. 

317 """ 

318 log.debug("read %s [%#x] size=%d", self, id(self), size) 

319 

320 # A GET request on a dCache directory returns the contents of the 

321 # directory in HTML, to be visualized with a browser. This means 

322 # that we need to check first that this resource is not a directory. 

323 # 

324 # Since isdir() only checks that the URL of the resource ends in "/" 

325 # without actually asking the server, this check is not robust. 

326 # However, it is a reasonable compromise since it prevents doing 

327 # an additional roundtrip to the server to retrieve this resource's 

328 # metadata. 

329 if self.isdir(): 

330 raise ValueError(f"method read() is not implemented for directory {self}") 

331 

332 if size < 0: 

333 # Read the entire file content 

334 _, data = self._client.read(self._internal_url) 

335 return data 

336 

337 # This is a partial read. Retrieve the file size. 

338 stat = self._stat() 

339 if not stat.is_file: 

340 raise FileNotFoundError(f"No file found at {self}") 

341 

342 if size == 0 or stat.size == 0: 

343 return b"" 

344 

345 # Read the requested chunk of data and release the backend server. 

346 end_range = min(stat.size, size) - 1 

347 _, data = self._client.read_range(self._internal_url, start=0, end=end_range, release_backend=True) 

348 return data 

349 

350 @override 

351 @contextlib.contextmanager 

352 def _as_local( 

353 self, multithreaded: bool = True, tmpdir: ResourcePath | None = None 

354 ) -> Iterator[ResourcePath]: 

355 """Download object and place in temporary directory. 

356 

357 Parameters 

358 ---------- 

359 multithreaded : `bool`, optional 

360 If `True` the transfer will be allowed to attempt to improve 

361 throughput by using parallel download streams. This may of no 

362 effect if the URI scheme does not support parallel streams or 

363 if a global override has been applied. If `False` parallel 

364 streams will be disabled. 

365 tmpdir : `ResourcePath` or `None`, optional 

366 Explicit override of the temporary directory to use for remote 

367 downloads. 

368 

369 Returns 

370 ------- 

371 local_uri : `ResourcePath` 

372 A URI to a local POSIX file corresponding to a local temporary 

373 downloaded copy of the resource. 

374 """ 

375 log.debug("_as_local %s [%#x] tmpdir: %s", self, id(self), tmpdir) 

376 

377 # We need to ensure that this resource is actually a file since 

378 # the response to a GET request on a directory may be implemented in 

379 # several ways, according to RFC 4818. 

380 if self.isdir(): 

381 raise FileNotFoundError(f"{self} is a directory") 

382 

383 if tmpdir is None: 

384 local_dir, buffer_size = self._config.tmpdir_buffersize 

385 tmpdir = ResourcePath(local_dir, forceDirectory=True) 

386 else: 

387 buffer_size = _calc_tmpdir_buffer_size(tmpdir.ospath) 

388 

389 with ResourcePath.temporary_uri(suffix=self.getExtension(), prefix=tmpdir, delete=True) as tmp_uri: 

390 log.debug( 

391 "downloading %s [%#x] to local file %s [buffer_size %d]", 

392 self, 

393 id(self), 

394 tmp_uri.ospath, 

395 buffer_size, 

396 ) 

397 self._client.download(self._internal_url, tmp_uri.ospath, buffer_size) 

398 yield tmp_uri 

399 

400 @override 

401 def write(self, data: BinaryIO | bytes, overwrite: bool = True) -> None: 

402 """Write the supplied bytes to the new resource. 

403 

404 Parameters 

405 ---------- 

406 data : `bytes` 

407 The bytes to write to the resource. The entire contents of the 

408 resource will be replaced. 

409 overwrite : `bool`, optional 

410 If `True` the resource will be overwritten if it exists. Otherwise 

411 the write will fail. 

412 """ 

413 log.debug("write %s [%#x] overwrite=%s", self, id(self), overwrite) 

414 

415 if self.isdir(): 

416 raise ValueError(f"Method write() is not implemented for directory {self}") 

417 

418 if not overwrite and self._stat().is_file: 

419 raise FileExistsError(f"File {self} exists and overwrite has been disabled") 

420 

421 self._client.write(self._internal_url, data) 

422 

423 @override 

424 def remove(self) -> None: 

425 """Remove the resource. 

426 

427 If the resource is a directory, it must be empty otherwise this 

428 method raises. Removing a non-existent file or directory is not 

429 considered an error. 

430 """ 

431 log.debug("remove %s [%#x]", self, id(self)) 

432 

433 stat = self._stat() 

434 if not stat.exists: 

435 # There is no resource at this uri. There is nothing to do. 

436 return 

437 

438 if stat.is_dir: 

439 entries = self._client.read_dir(self._internal_url) 

440 if len(entries) > 0: 

441 raise IsADirectoryError(f"Directory {self} is not empty") 

442 

443 # This resource is a either file or an empty directory, we can remove 

444 # it. 

445 self._client.delete(self._internal_url) 

446 

447 def remove_dir(self, recursive: bool = False) -> None: 

448 """Remove a directory if empty. 

449 

450 Parameters 

451 ---------- 

452 recursive : `bool` 

453 If `True` recursively remove all files and directories under this 

454 directory. 

455 

456 Notes 

457 ----- 

458 This method is not present in the superclass. 

459 """ 

460 log.debug("remove_dir %s [%#x] recursive=%s", self, id(self), recursive) 

461 

462 if not self.isdir(): 

463 raise NotADirectoryError(f"{self} is not a directory") 

464 

465 for root, subdirs, files in self.walk(): 

466 if not recursive and (len(subdirs) > 0 or len(files) > 0): 

467 raise IsADirectoryError(f"Directory at {self} is not empty and recursive argument is False") 

468 

469 for file in files: 

470 root.join(file).remove() 

471 

472 for subdir in subdirs: 

473 DavResourcePath(root.join(subdir, forceDirectory=True)).remove_dir(recursive=recursive) 

474 

475 # Remove empty top directory 

476 self.remove() 

477 

478 @override 

479 def transfer_from( 

480 self, 

481 src: ResourcePath, 

482 transfer: str = "copy", 

483 overwrite: bool = False, 

484 transaction: TransactionProtocol | None = None, 

485 multithreaded: bool = True, 

486 ) -> None: 

487 """Transfer to this URI from another. 

488 

489 Parameters 

490 ---------- 

491 src : `ResourcePath` 

492 Source URI. 

493 transfer : `str` 

494 Mode to use for transferring the resource. Generically there are 

495 many standard options: copy, link, symlink, hardlink, relsymlink. 

496 Not all URIs support all modes. 

497 overwrite : `bool`, optional 

498 Allow an existing file to be overwritten. Defaults to `False`. 

499 transaction : `~lsst.resources.utils.TransactionProtocol`, optional 

500 A transaction object that can (depending on implementation) 

501 rollback transfers on error. Not guaranteed to be implemented. 

502 multithreaded : `bool`, optional 

503 If `True` the transfer will be allowed to attempt to improve 

504 throughput by using parallel download streams. This may of no 

505 effect if the URI scheme does not support parallel streams or 

506 if a global override has been applied. If `False` parallel 

507 streams will be disabled. 

508 """ 

509 log.debug( 

510 "transfer_from %s [%#x] src=%s transfer=%s overwrite=%s", 

511 self, 

512 id(self), 

513 src, 

514 transfer, 

515 overwrite, 

516 ) 

517 

518 # Fail early to prevent delays if remote resources are requested. 

519 if transfer not in self.transferModes: 

520 raise ValueError(f"Transfer mode {transfer} not supported by URI scheme {self.scheme}") 

521 

522 # Existence checks cost time so do not call this unless we know 

523 # that debugging is enabled. 

524 destination_exists = None 

525 if log.isEnabledFor(logging.DEBUG): 

526 destination_exists = self.exists() 

527 log.debug( 

528 "Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)", 

529 src.geturl(), 

530 src.exists(), 

531 self, 

532 destination_exists, 

533 transfer, 

534 ) 

535 

536 # Short circuit immediately if the URIs are identical. 

537 if self == src: 

538 log.debug( 

539 "Target and destination URIs are identical: %s, returning immediately." 

540 " No further action required.", 

541 self, 

542 ) 

543 return 

544 

545 if not overwrite: 

546 if destination_exists is None: 

547 destination_exists = self.exists() 

548 

549 if destination_exists: 

550 raise FileExistsError(f"Destination path {self} already exists.") 

551 

552 if transfer == "auto": 

553 transfer = self.transferDefault 

554 

555 # We can use webDAV 'COPY' or 'MOVE' if both the current and source 

556 # resources are located in the same server. 

557 if isinstance(src, type(self)) and self.root_uri() == src.root_uri(): 

558 log.debug("Transfer from %s to %s [%#x] directly", src, self, id(self)) 

559 return ( 

560 self._move_from(src, overwrite=overwrite) 

561 if transfer == "move" 

562 else self._copy_from(src, overwrite=overwrite) 

563 ) 

564 

565 # For resources of different classes we can perform the copy or move 

566 # operation by downloading to a local file and uploading to the 

567 # destination. 

568 self._copy_via_local(src) 

569 

570 # This was an explicit move, try to remove the source. 

571 if transfer == "move": 

572 src.remove() 

573 

574 def _copy_via_local(self, source: ResourcePath) -> None: 

575 """Replace the contents of this resource with the contents of a remote 

576 resource by using a local temporary file. 

577 

578 Parameters 

579 ---------- 

580 source : `ResourcePath` 

581 The source of the contents to copy to `self`. 

582 """ 

583 with source.as_local() as local_uri: 

584 log.debug( 

585 "Transfer from %s to %s [%#x] via local file %s", 

586 source.geturl(), 

587 self, 

588 id(self), 

589 local_uri, 

590 ) 

591 with open(local_uri.ospath, "rb") as f: 

592 self.write(data=f) 

593 

594 def _copy_from(self, source: DavResourcePath, overwrite: bool = False) -> None: 

595 """Copy the contents of `source` to this resource. `source` must 

596 be a file. 

597 """ 

598 log.debug("_copy_from %s [%#x] source=%s overwrite=%s", self, id(self), source, overwrite) 

599 

600 # Copy is only supported for files, not directories. 

601 if self.isdir(): 

602 raise ValueError(f"Copy is not supported because destination {self} is a directory") 

603 

604 if source.isdir(): 

605 raise ValueError(f"Copy is not supported for directory {source}") 

606 

607 if not source.exists(): 

608 raise FileNotFoundError(f"No file found at {source}") 

609 

610 # If the server supports file duplication, use that method. 

611 if self._client.supports_duplicate: 

612 return self._client.duplicate(source._internal_url, self._internal_url, overwrite) 

613 

614 # Make this copy via a local file 

615 if not overwrite and self.exists(): 

616 raise FileExistsError(f"Destination path {self} already exists.") 

617 

618 self._copy_via_local(source) 

619 

620 def _move_from(self, source: DavResourcePath, overwrite: bool = False) -> None: 

621 """Send a MOVE webDAV request to replace the contents of this resource 

622 with the contents of another resource located in the same server. 

623 

624 Parameters 

625 ---------- 

626 source : `DavResourcePath` 

627 The source of the contents to move to `self`. 

628 """ 

629 log.debug("_move_from %s [%#x] source=%s overwrite=%s", self, id(self), source, overwrite) 

630 

631 # Move is only supported for files, not directories. 

632 if self.isdir(): 

633 raise ValueError(f"Move is not supported for destination directory {self}") 

634 

635 if source.isdir(): 

636 raise ValueError(f"Move is not supported for directory {source}") 

637 

638 if not source.exists(): 

639 raise FileNotFoundError(f"No file found at {source}") 

640 

641 self._client.rename(source._internal_url, self._internal_url, overwrite) 

642 

643 @override 

644 def walk( 

645 self, file_filter: str | re.Pattern | None = None 

646 ) -> Iterator[list | tuple[ResourcePath, list[str], list[str]]]: 

647 """Walk the directory tree returning matching files and directories. 

648 

649 Parameters 

650 ---------- 

651 file_filter : `str` or `re.Pattern`, optional 

652 Regex to filter out files from the list before it is returned. 

653 

654 Yields 

655 ------ 

656 dirpath : `ResourcePath` 

657 Current directory being examined. 

658 dirnames : `list` of `str` 

659 Names of subdirectories within dirpath. 

660 filenames : `list` of `str` 

661 Names of all the files within dirpath. 

662 """ 

663 if not self.isdir(): 

664 raise ValueError(f"Can not walk non-directory URI {self}") 

665 

666 # We must return no entries for non-existent directories. 

667 if not self.exists(): 

668 return 

669 

670 # Retrieve the entries in this directory 

671 entries = self._client.read_dir(self._internal_url) 

672 files = [e.name for e in entries if e.is_file] 

673 subdirs = [e.name for e in entries if e.is_dir] 

674 

675 # Filter files 

676 if isinstance(file_filter, str): 

677 file_filter = re.compile(file_filter) 

678 

679 if file_filter is not None: 

680 files = [f for f in files if file_filter.search(f)] 

681 

682 if not subdirs and not files: 

683 return 

684 else: 

685 yield type(self)(self, forceAbsolute=False, forceDirectory=True), subdirs, files 

686 

687 for subdir in subdirs: 

688 new_uri = self.join(subdir, forceDirectory=True) 

689 yield from new_uri.walk(file_filter) 

690 

691 @override 

692 def generate_presigned_get_url(self, *, expiration_time_seconds: int) -> str: 

693 """Return a pre-signed URL that can be used to retrieve this resource 

694 using an HTTP GET without supplying any access credentials. 

695 

696 Parameters 

697 ---------- 

698 expiration_time_seconds : `int` 

699 Number of seconds until the generated URL is no longer valid. 

700 

701 Returns 

702 ------- 

703 url : `str` 

704 HTTP URL signed for GET. 

705 """ 

706 return self._client.generate_presigned_get_url(self._internal_url, expiration_time_seconds) 

707 

708 @override 

709 def generate_presigned_put_url(self, *, expiration_time_seconds: int) -> str: 

710 """Return a pre-signed URL that can be used to upload a file to this 

711 path using an HTTP PUT without supplying any access credentials. 

712 

713 Parameters 

714 ---------- 

715 expiration_time_seconds : `int` 

716 Number of seconds until the generated URL is no longer valid. 

717 

718 Returns 

719 ------- 

720 url : `str` 

721 HTTP URL signed for PUT. 

722 """ 

723 return self._client.generate_presigned_put_url(self._internal_url, expiration_time_seconds) 

724 

725 @override 

726 def to_fsspec(self) -> tuple[DavFileSystem, str]: 

727 """Return an abstract file system and path that can be used by fsspec. 

728 

729 Returns 

730 ------- 

731 fs : `fsspec.spec.AbstractFileSystem` 

732 A file system object suitable for use with the returned path. 

733 path : `str` 

734 A path that can be opened by the file system object. 

735 """ 

736 if fsspec is None or not self._client._config.enable_fsspec: 

737 raise ImportError("fsspec is not available") 

738 

739 log.debug("DavResourcePath.to_fsspec: %s", self) 

740 fsys = DavFileSystem(self) 

741 return fsys, fsys._path 

742 

743 @override 

744 @contextlib.contextmanager 

745 def _openImpl( 

746 self, 

747 mode: str = "r", 

748 *, 

749 encoding: str | None = None, 

750 ) -> Iterator[ResourceHandleProtocol]: 

751 log.debug("DavResourcePath._openImpl: %s mode: %s", self, mode) 

752 

753 if mode in ("rb", "r") and self._client.accepts_ranges(self._internal_url): 

754 stat = self._stat() 

755 if stat.is_dir: 

756 raise OSError(f"open is not implemented for directory {self}") 

757 

758 if not stat.is_file: 

759 raise FileNotFoundError(f"No such file {self}") 

760 

761 with DavReadResourceHandle(mode, log.logger, uri=self, file_size=stat.size) as handle: 

762 if mode == "r": 

763 # cast because the protocol is compatible, but does not 

764 # have BytesIO in the inheritance tree 

765 yield io.TextIOWrapper(cast(Any, handle), encoding=encoding) 

766 else: 

767 yield handle 

768 else: 

769 with super()._openImpl(mode, encoding=encoding) as handle: 

770 yield handle 

771 

772 

773class DavFileSystem(AbstractFileSystem): 

774 """Minimal fsspec-compatible read-only file system which contains a single 

775 file. 

776 

777 Parameters 

778 ---------- 

779 uri : `DavResourcePath` 

780 URI of the single resource contained in the file system. 

781 """ 

782 

783 protocol = ("davs", "dav") 

784 

785 def __init__(self, uri: DavResourcePath): 

786 super().__init__() 

787 self._uri: DavResourcePath = uri 

788 self._path: str = self._uri.geturl() 

789 self._size: int | None = None 

790 

791 @override 

792 def info(self, path: str, **kwargs: Any) -> dict[str, Any]: 

793 log.debug("DavFileSystem.info %s", path) 

794 if path != self._path: 

795 raise FileNotFoundError(path) 

796 

797 return { 

798 "name": path, 

799 "size": self.size(self._path), 

800 "type": "file", 

801 } 

802 

803 @override 

804 def ls(self, path: str, detail: bool = True, **kwargs: Any) -> list[str] | list[dict[str, str]]: 

805 log.debug("DavFileSystem.ls %s", path) 

806 if path != self._path: 

807 raise FileNotFoundError(path) 

808 

809 return list(self.info(path)) if detail else list(path) 

810 

811 @override 

812 def modified(self, path: str) -> datetime.datetime: 

813 log.debug("DavFileSystem.modified %s", path) 

814 if path != self._path: 

815 raise FileNotFoundError(path) 

816 

817 return self._uri._stat().last_modified 

818 

819 @override 

820 def size(self, path: str) -> int: 

821 log.debug("DavFileSystem.size %s", path) 

822 if path != self._path: 

823 raise FileNotFoundError(path) 

824 

825 if self._size is None: 

826 self._size = self._uri.size() 

827 

828 return self._size 

829 

830 @override 

831 def isfile(self, path: str) -> bool: 

832 log.debug("DavFileSystem.isfile %s", path) 

833 return path == self._path 

834 

835 @override 

836 def isdir(self, path: str) -> bool: 

837 log.debug("DavFileSystem.isdir %s", path) 

838 return False 

839 

840 @override 

841 def exists(self, path: str, **kwargs: Any) -> bool: 

842 log.debug("DavFileSystem.exists %s", path) 

843 return path == self._path 

844 

845 @override 

846 def open( 

847 self, 

848 path: str, 

849 mode: str = "rb", 

850 encoding: str | None = None, 

851 block_size: int | None = None, 

852 cache_options: dict[Any, Any] | None = None, 

853 compression: str | None = None, 

854 **kwargs: Any, 

855 ) -> DavReadResourceHandle | io.TextIOWrapper: 

856 log.debug( 

857 "DavFileSystem.open path: %s mode: %s encoding: %s blocksize: %s", 

858 path, 

859 mode, 

860 encoding, 

861 block_size, 

862 ) 

863 if path != self._path: 

864 raise FileNotFoundError(f"File {path} does not exist") 

865 

866 if mode not in ("rb", "r"): 

867 raise OSError(f"Opening {path} for writing is not supported") 

868 

869 handle = DavReadResourceHandle(mode, log.logger, self._uri, self.size(self._path)) 

870 if mode == "rb": 

871 return handle 

872 else: 

873 return io.TextIOWrapper(cast(Any, handle), encoding=encoding) 

874 

875 @property 

876 def fsid(self) -> Any: 

877 return "davs" 

878 

879 @override 

880 def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None: 

881 raise NotImplementedError 

882 

883 @override 

884 def makedirs(self, path: str, exist_ok: bool = False) -> None: 

885 raise NotImplementedError 

886 

887 @override 

888 def rmdir(self, path: str) -> None: 

889 raise NotImplementedError 

890 

891 @override 

892 def walk( 

893 self, 

894 path: str, 

895 maxdepth: int | None = None, 

896 topdown: bool = True, 

897 on_error: str = "omit", 

898 **kwargs: Any, 

899 ) -> None: 

900 raise NotImplementedError 

901 

902 @override 

903 def find( 

904 self, 

905 path: str, 

906 maxdepth: int | None = None, 

907 withdirs: bool = False, 

908 detail: bool = False, 

909 **kwargs: Any, 

910 ) -> None: 

911 raise NotImplementedError 

912 

913 @override 

914 def du( 

915 self, 

916 path: str, 

917 total: bool = True, 

918 maxdepth: int | None = None, 

919 withdirs: bool = False, 

920 **kwargs: Any, 

921 ) -> None: 

922 raise NotImplementedError 

923 

924 @override 

925 def glob(self, path: str, maxdepth: int | None = None, **kwargs: Any) -> None: 

926 raise NotImplementedError 

927 

928 @override 

929 def rm_file(self, path: str) -> None: 

930 raise NotImplementedError 

931 

932 @override 

933 def rm(self, path: str, recursive: bool = False, maxdepth: int | None = None) -> None: 

934 raise NotImplementedError 

935 

936 @override 

937 def touch(self, path: str, truncate: bool = True, **kwargs: Any) -> None: 

938 raise NotImplementedError 

939 

940 @override 

941 def ukey(self, path: str) -> None: 

942 raise NotImplementedError 

943 

944 @override 

945 def created(self, path: str) -> None: 

946 raise NotImplementedError