Coverage for python / lsst / resources / tests.py: 8%

701 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:32 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11from __future__ import annotations 

12 

13__all__ = ["GenericReadWriteTestCase", "GenericTestCase"] 

14 

15import datetime 

16import logging 

17import os 

18import pathlib 

19import random 

20import string 

21import sys 

22import tempfile 

23import unittest 

24import urllib.parse 

25import uuid 

26from collections.abc import Iterable 

27from typing import TYPE_CHECKING, Any 

28 

29try: 

30 import fsspec 

31except ImportError: 

32 fsspec = None 

33 

34from lsst.resources import ResourcePath 

35from lsst.resources.utils import makeTestTempDir, removeTestTempDir 

36 

37TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

38 

39 

40def _check_open( 

41 test_case: _GenericTestCase | unittest.TestCase, 

42 uri: ResourcePath, 

43 *, 

44 mode_suffixes: Iterable[str] = ("", "t", "b"), 

45 **kwargs: Any, 

46) -> None: 

47 """Test an implementation of ButlerURI.open. 

48 

49 Parameters 

50 ---------- 

51 test_case : `unittest.TestCase` 

52 Test case to use for assertions. 

53 uri : `ResourcePath` 

54 URI to use for tests. Must point to a writeable location that is not 

55 yet occupied by a file. On return, the location may point to a file 

56 only if the test fails. 

57 mode_suffixes : `~collections.abc.Iterable` of `str` 

58 Suffixes to pass as part of the ``mode`` argument to 

59 `ResourcePath.open`, indicating whether to open as binary or as text; 

60 the only permitted elements are ``""``, ``"t"``, and ``"b"`. 

61 **kwargs 

62 Additional keyword arguments to forward to all calls to `open`. 

63 """ 

64 text_content = "abcdefghijklmnopqrstuvwxyz🙂" 

65 bytes_content = uuid.uuid4().bytes 

66 content_by_mode_suffix: dict[str, str | bytes] = { 

67 "": text_content, 

68 "t": text_content, 

69 "b": bytes_content, 

70 } 

71 empty_content_by_mode_suffix: dict[str, str | bytes] = { 

72 "": "", 

73 "t": "", 

74 "b": b"", 

75 } 

76 # To appease mypy 

77 double_content_by_mode_suffix: dict[str, str | bytes] = { 

78 "": text_content + text_content, 

79 "t": text_content + text_content, 

80 "b": bytes_content + bytes_content, 

81 } 

82 for mode_suffix in mode_suffixes: 

83 content = content_by_mode_suffix[mode_suffix] 

84 double_content = double_content_by_mode_suffix[mode_suffix] 

85 # Create file with mode='x', which prohibits overwriting. 

86 with uri.open("x" + mode_suffix, **kwargs) as write_buffer: 

87 write_buffer.write(content) 

88 test_case.assertTrue(uri.exists()) 

89 # Check that opening with 'x' now raises, and does not modify content. 

90 with test_case.assertRaises(FileExistsError): 

91 with uri.open("x" + mode_suffix, **kwargs) as write_buffer: 

92 write_buffer.write("bad") 

93 # Read the file we created and check the contents. 

94 with uri.open("r" + mode_suffix, **kwargs) as read_buffer: 

95 test_case.assertEqual(read_buffer.read(), content) 

96 # The names will not match if a local temporary is being written. 

97 if not kwargs.get("prefer_file_temporary"): 

98 test_case.assertIn(uri.basename(), read_buffer.name) 

99 # Check that we can read bytes in a loop and get EOF 

100 with uri.open("r" + mode_suffix, **kwargs) as read_buffer: 

101 # Seek off the end of the file and should read empty back. 

102 read_buffer.seek(1024) 

103 test_case.assertEqual(read_buffer.tell(), 1024) 

104 content_read = read_buffer.read() # Read as much as we can. 

105 test_case.assertEqual(len(content_read), 0, f"Read: {content_read!r}, expected empty.") 

106 

107 # First read more than the content. 

108 read_buffer.seek(0) 

109 size = len(content) * 3 

110 chunk_read = read_buffer.read(size) 

111 test_case.assertEqual(chunk_read, content) 

112 

113 # Repeated reads should always return empty string. 

114 chunk_read = read_buffer.read(size) 

115 test_case.assertEqual(len(chunk_read), 0) 

116 chunk_read = read_buffer.read(size) 

117 test_case.assertEqual(len(chunk_read), 0) 

118 

119 # Go back to start of file and read in smaller chunks. 

120 read_buffer.seek(0) 

121 size = len(content) // 3 

122 

123 content_read = empty_content_by_mode_suffix[mode_suffix] 

124 n_reads = 0 

125 while chunk_read := read_buffer.read(size): 

126 content_read += chunk_read 

127 n_reads += 1 

128 if n_reads > 10: # In case EOF never hits because of bug. 

129 raise AssertionError( 

130 f"Failed to stop reading from file after {n_reads} loops. " 

131 f"Read {len(content_read)} bytes/characters. Expected {len(content)}." 

132 ) 

133 test_case.assertEqual(content_read, content) 

134 

135 # Go back to start of file and read the entire thing. 

136 read_buffer.seek(0) 

137 content_read = read_buffer.read() 

138 test_case.assertEqual(content_read, content) 

139 

140 # Seek off the end of the file and should read empty back. 

141 # We run this check twice since in some cases the handle will 

142 # cache knowledge of the file size. 

143 read_buffer.seek(1024) 

144 test_case.assertEqual(read_buffer.tell(), 1024) 

145 content_read = read_buffer.read() 

146 test_case.assertEqual(len(content_read), 0, f"Read: {content_read!r}, expected empty.") 

147 

148 # Write multiple chunks with flushing to ensure that any handles that 

149 # cache without flushing work properly. 

150 n = 3 

151 with uri.open("w" + mode_suffix, **kwargs) as write_buffer: 

152 for _ in range(n): 

153 write_buffer.write(content) 

154 write_buffer.flush() 

155 with uri.open("r" + mode_suffix, **kwargs) as read_buffer: 

156 test_case.assertEqual(read_buffer.read(), content * n) 

157 

158 # Write two copies of the content, overwriting the single copy there. 

159 with uri.open("w" + mode_suffix, **kwargs) as write_buffer: 

160 write_buffer.write(double_content) 

161 # Read again, this time use mode='r+', which reads what is there and 

162 # then lets us write more; we'll use that to reset the file to one 

163 # copy of the content. 

164 with uri.open("r+" + mode_suffix, **kwargs) as rw_buffer: 

165 test_case.assertEqual(rw_buffer.read(), double_content) 

166 rw_buffer.seek(0) 

167 rw_buffer.truncate() 

168 rw_buffer.write(content) 

169 rw_buffer.seek(0) 

170 test_case.assertEqual(rw_buffer.read(), content) 

171 with uri.open("r" + mode_suffix, **kwargs) as read_buffer: 

172 test_case.assertEqual(read_buffer.read(), content) 

173 # Append some more content to the file; should now have two copies. 

174 with uri.open("a" + mode_suffix, **kwargs) as append_buffer: 

175 append_buffer.write(content) 

176 with uri.open("r" + mode_suffix, **kwargs) as read_buffer: 

177 test_case.assertEqual(read_buffer.read(), double_content) 

178 # Final mode to check is w+, which does read/write but truncates first. 

179 with uri.open("w+" + mode_suffix, **kwargs) as rw_buffer: 

180 test_case.assertEqual(rw_buffer.read(), empty_content_by_mode_suffix[mode_suffix]) 

181 rw_buffer.write(content) 

182 rw_buffer.seek(0) 

183 test_case.assertEqual(rw_buffer.read(), content) 

184 # Check that two seeks with reads to end return correctly. 

185 # Seek is only reliable with "b" mode. 

186 if mode_suffix == "b": 

187 with uri.open("r" + mode_suffix, **kwargs) as read_buffer: 

188 size = len(content) 

189 seek1 = 2 * size // 3 

190 read_buffer.seek(seek1) 

191 content1 = read_buffer.read() 

192 test_case.assertEqual(len(content1), size - seek1) 

193 # Seek earlier and then read to end. 

194 seek2 = size // 2 

195 read_buffer.seek(seek2) 

196 content2 = read_buffer.read() 

197 test_case.assertEqual(len(content2), size - seek2) 

198 # Check that we can seek from end and read and seek and read. 

199 # Negative seek only works in binary mode. 

200 with uri.open("rb", **kwargs) as read_buffer: 

201 read_buffer.seek(-5, 2) # Relative to end 

202 content_read = read_buffer.read() 

203 test_case.assertEqual(len(content_read), 5) 

204 read_buffer.seek(-10, 2) # Relative to end 

205 content_read = read_buffer.read() 

206 test_case.assertEqual(len(content_read), 10) 

207 with uri.open("r" + mode_suffix, **kwargs) as read_buffer: 

208 test_case.assertEqual(read_buffer.read(), content) 

209 # Remove file to make room for the next loop of tests with this URI. 

210 uri.remove() 

211 

212 

213if TYPE_CHECKING: 

214 

215 class TestCaseMixin(unittest.TestCase): 

216 """Base class for mixin test classes that use TestCase methods.""" 

217 

218 pass 

219 

220else: 

221 

222 class TestCaseMixin: 

223 """Do-nothing definition of mixin base class for regular execution.""" 

224 

225 pass 

226 

227 

228class _GenericTestCase(TestCaseMixin): 

229 """Generic base class for test mixin.""" 

230 

231 scheme: str | None = None 

232 netloc: str | None = None 

233 base_path: str | None = None 

234 path1 = "test_dir" 

235 path2 = "file.txt" 

236 

237 def _make_uri(self, path: str, netloc: str | None = None) -> str: 

238 if self.scheme is not None: 

239 if netloc is None: 

240 netloc = self.netloc 

241 if path.startswith("/"): 

242 path = path[1:] 

243 if self.base_path is not None: 

244 path = f"{self.base_path}/{path}".lstrip("/") 

245 

246 return f"{self.scheme}://{netloc}/{path}" 

247 else: 

248 return path 

249 

250 

251class GenericTestCase(_GenericTestCase): 

252 """Test cases for generic manipulation of a `ResourcePath`.""" 

253 

254 def setUp(self) -> None: 

255 if self.scheme is None: 

256 raise unittest.SkipTest("No scheme defined") 

257 self.root = self._make_uri("") 

258 self.root_uri = ResourcePath(self.root, forceDirectory=True, forceAbsolute=False) 

259 

260 def test_creation(self) -> None: 

261 self.assertEqual(self.root_uri.scheme, self.scheme) 

262 self.assertEqual(self.root_uri.netloc, self.netloc) 

263 self.assertFalse(self.root_uri.query) 

264 self.assertFalse(self.root_uri.params) 

265 

266 with self.assertRaises(ValueError): 

267 ResourcePath({}) # type: ignore 

268 

269 with self.assertRaises(RuntimeError): 

270 ResourcePath(self.root_uri, isTemporary=True) 

271 

272 file = self.root_uri.join("file.txt", forceDirectory=False) 

273 with self.assertRaises(RuntimeError): 

274 ResourcePath(file, forceDirectory=True) 

275 

276 file = self.root_uri.join("file.txt") 

277 file_as_dir = ResourcePath(file, forceDirectory=True) 

278 self.assertTrue(file_as_dir.isdir()) 

279 

280 dir = self._make_uri("a/b/c/") 

281 with self.assertRaises(ValueError): 

282 ResourcePath(dir, forceDirectory=False) 

283 

284 with self.assertRaises(NotImplementedError): 

285 ResourcePath("unknown://netloc") 

286 

287 replaced = file.replace(fragment="frag") 

288 self.assertEqual(replaced.fragment, "frag") 

289 

290 with self.assertRaises(ValueError): 

291 file.replace(scheme="new") 

292 

293 self.assertNotEqual(replaced, str(replaced)) 

294 self.assertNotEqual(str(replaced), replaced) 

295 

296 def test_extension(self) -> None: 

297 uri = ResourcePath(self._make_uri("dir/test.txt")) 

298 self.assertEqual(uri.updatedExtension(None), uri) 

299 self.assertEqual(uri.updatedExtension(".txt"), uri) 

300 self.assertEqual(id(uri.updatedExtension(".txt")), id(uri)) 

301 

302 fits = uri.updatedExtension(".fits.gz") 

303 self.assertEqual(fits.basename(), "test.fits.gz") 

304 self.assertEqual(fits.updatedExtension(".jpeg").basename(), "test.jpeg") 

305 

306 extensionless = self.root_uri.join("no_ext") 

307 self.assertEqual(extensionless.getExtension(), "") 

308 extension = extensionless.updatedExtension(".fits") 

309 self.assertEqual(extension.getExtension(), ".fits") 

310 

311 uri = ResourcePath("test.txt", forceAbsolute=False) 

312 self.assertEqual(uri.getExtension(), ".txt") 

313 uri = ResourcePath(self._make_uri("dir.1/dir.2/test.txt"), forceDirectory=False) 

314 self.assertEqual(uri.getExtension(), ".txt") 

315 uri = ResourcePath(self._make_uri("dir.1/dir.2/"), forceDirectory=True) 

316 self.assertEqual(uri.getExtension(), ".2") 

317 uri = ResourcePath(self._make_uri("dir.1/dir/"), forceDirectory=True) 

318 self.assertEqual(uri.getExtension(), "") 

319 

320 def test_relative(self) -> None: 

321 """Check that we can get subpaths back from two URIs.""" 

322 parent = ResourcePath(self._make_uri(self.path1), forceDirectory=True) 

323 self.assertTrue(parent.isdir()) 

324 child = parent.join("dir1/file.txt") 

325 

326 self.assertEqual(child.relative_to(parent), "dir1/file.txt") 

327 

328 not_child = ResourcePath("/a/b/dir1/file.txt") 

329 self.assertIsNone(not_child.relative_to(parent)) 

330 self.assertFalse(not_child.isdir()) 

331 

332 not_directory = parent.join("dir1/file2.txt") 

333 self.assertIsNone(child.relative_to(not_directory)) 

334 

335 # Relative URIs 

336 parent = ResourcePath("a/b/", forceAbsolute=False) 

337 child = ResourcePath("a/b/c/d.txt", forceAbsolute=False) 

338 self.assertFalse(child.scheme) 

339 self.assertEqual(child.relative_to(parent), "c/d.txt") 

340 

341 # forceAbsolute=True should work even on an existing ResourcePath 

342 self.assertTrue(pathlib.Path(ResourcePath(child, forceAbsolute=True).ospath).is_absolute()) 

343 

344 # Absolute URI and schemeless URI 

345 parent = self.root_uri.join("/a/b/c/") 

346 child = ResourcePath("e/f/g.txt", forceAbsolute=False) 

347 

348 # If the child is relative and the parent is absolute we assume 

349 # that the child is a child of the parent unless it uses ".." 

350 self.assertEqual(child.relative_to(parent), "e/f/g.txt", f"{child}.relative_to({parent})") 

351 

352 child = ResourcePath("../e/f/g.txt", forceAbsolute=False) 

353 self.assertIsNone(child.relative_to(parent)) 

354 

355 child = ResourcePath("../c/e/f/g.txt", forceAbsolute=False) 

356 self.assertEqual(child.relative_to(parent), "e/f/g.txt") 

357 

358 # Test with different netloc 

359 child = ResourcePath(self._make_uri("a/b/c.txt", netloc="my.host")) 

360 parent = ResourcePath(self._make_uri("a", netloc="other"), forceDirectory=True) 

361 self.assertIsNone(child.relative_to(parent), f"{child}.relative_to({parent})") 

362 

363 # This is an absolute path so will *always* return a file URI and 

364 # ignore the root parameter. 

365 parent = ResourcePath("/a/b/c", root=self.root_uri, forceDirectory=True) 

366 self.assertEqual(parent.geturl(), "file:///a/b/c/") 

367 

368 parent = ResourcePath(self._make_uri("/a/b/c"), forceDirectory=True) 

369 child = ResourcePath("d/e.txt", root=parent) 

370 self.assertEqual(child.relative_to(parent), "d/e.txt", f"{child}.relative_to({parent})") 

371 

372 parent = ResourcePath("c/", root=ResourcePath(self._make_uri("/a/b/"))) 

373 self.assertEqual(child.relative_to(parent), "d/e.txt", f"{child}.relative_to({parent})") 

374 

375 # Absolute schemeless child with relative parent will always fail. 

376 child = ResourcePath("d/e.txt", root="/a/b/c") 

377 parent = ResourcePath("d/e.txt", forceAbsolute=False) 

378 self.assertIsNone(child.relative_to(parent), f"{child}.relative_to({parent})") 

379 

380 # Allow .. in response. 

381 child = ResourcePath(self._make_uri("a/b/c/d.txt"), forceAbsolute=False) 

382 parent = ResourcePath(self._make_uri("a/b/d/e/"), forceAbsolute=False) 

383 self.assertIsNone(child.relative_to(parent), f"{child}.relative_to({parent})") 

384 

385 if sys.version_info >= (3, 12, 0): 

386 # Fails on python 3.11. 

387 self.assertEqual( 

388 child.relative_to(parent, walk_up=True), 

389 "../../c/d.txt", 

390 f"{child}.relative_to({parent}, walk_up=True)", 

391 ) 

392 

393 def test_parents(self) -> None: 

394 """Test of splitting and parent walking.""" 

395 parent = ResourcePath(self._make_uri("somedir"), forceDirectory=True) 

396 child_file = parent.join("subdir/file.txt") 

397 self.assertFalse(child_file.isdir()) 

398 child_subdir, file = child_file.split() 

399 self.assertEqual(file, "file.txt") 

400 self.assertTrue(child_subdir.isdir()) 

401 self.assertEqual(child_file.dirname(), child_subdir) 

402 self.assertEqual(child_file.basename(), file) 

403 self.assertEqual(child_file.parent(), child_subdir) 

404 derived_parent = child_subdir.parent() 

405 self.assertEqual(derived_parent, parent) 

406 self.assertTrue(derived_parent.isdir()) 

407 self.assertEqual(child_file.parent().parent(), parent) 

408 self.assertEqual(child_subdir.dirname(), child_subdir) 

409 

410 # Make sure that the parent doesn't retain any fragment from the 

411 # child. 

412 child_fragment = child_subdir.join("a.txt#fragment") 

413 self.assertEqual(child_fragment.fragment, "fragment") 

414 fragment_parent = child_fragment.parent() 

415 self.assertEqual(fragment_parent.fragment, "") 

416 self.assertTrue(str(fragment_parent).endswith("/")) 

417 

418 def test_escapes(self) -> None: 

419 """Special characters in file paths.""" 

420 src = self.root_uri.join("bbb/???/test.txt") 

421 quotes = src.quotePaths 

422 

423 if quotes: 

424 self.assertNotIn("???", src.path) 

425 else: 

426 self.assertIn("???", src.path) 

427 self.assertIn("???", src.unquoted_path) 

428 

429 file = src.updatedFile("tests??.txt") 

430 if quotes: 

431 self.assertNotIn("??.txt", file.path) 

432 else: 

433 self.assertIn("??.txt", file.path) 

434 

435 src = src.updatedFile("tests??.txt") 

436 self.assertIn("??.txt", src.unquoted_path) 

437 

438 # File URI and schemeless URI 

439 parent = ResourcePath(self._make_uri(urllib.parse.quote("/a/b/c/de/??/"))) 

440 child = ResourcePath("e/f/g.txt", forceAbsolute=False) 

441 self.assertEqual(child.relative_to(parent), "e/f/g.txt") 

442 

443 child = ResourcePath("e/f??#/g.txt", forceAbsolute=False) 

444 self.assertEqual(child.relative_to(parent), "e/f??#/g.txt") 

445 

446 child = ResourcePath(self._make_uri(urllib.parse.quote("/a/b/c/de/??/e/f??#/g.txt"))) 

447 self.assertEqual(child.relative_to(parent), "e/f??#/g.txt") 

448 

449 self.assertEqual(child.relativeToPathRoot, "a/b/c/de/??/e/f??#/g.txt") 

450 

451 # dir.join() morphs into a file scheme 

452 dir = ResourcePath(self._make_uri(urllib.parse.quote("bbb/???/"))) 

453 new = dir.join("test_j.txt") 

454 self.assertIn("???", new.unquoted_path, f"Checking {new}") 

455 

456 new2name = "###/test??.txt" 

457 new2 = dir.join(new2name) 

458 self.assertIn("???", new2.unquoted_path) 

459 self.assertTrue(new2.unquoted_path.endswith(new2name)) 

460 

461 fdir = dir.abspath() 

462 self.assertNotIn("???", fdir.path) 

463 self.assertIn("???", fdir.unquoted_path) 

464 self.assertEqual(fdir.scheme, self.scheme) 

465 

466 fnew2 = fdir.join(new2name) 

467 self.assertTrue(fnew2.unquoted_path.endswith(new2name)) 

468 if quotes: 

469 self.assertNotIn("###", fnew2.path) 

470 else: 

471 self.assertIn("###", fnew2.path) 

472 

473 # Test that children relative to schemeless and file schemes 

474 # still return the same unquoted name 

475 self.assertEqual(fnew2.relative_to(fdir), new2name, f"{fnew2}.relative_to({fdir})") 

476 self.assertEqual(fnew2.relative_to(dir), new2name, f"{fnew2}.relative_to({dir})") 

477 self.assertEqual(new2.relative_to(fdir), new2name, f"{new2}.relative_to({fdir})") 

478 self.assertEqual(new2.relative_to(dir), new2name, f"{new2}.relative_to({dir})") 

479 

480 # Check for double quoting 

481 plus_path = "/a/b/c+d/" 

482 with self.assertLogs(level="WARNING"): 

483 uri = ResourcePath(urllib.parse.quote(plus_path), forceDirectory=True) 

484 self.assertEqual(uri.ospath, plus_path) 

485 

486 # Check that # is not escaped for schemeless URIs 

487 hash_path = "/a/b#/c&d#xyz" 

488 hpos = hash_path.rfind("#") 

489 uri = ResourcePath(hash_path) 

490 self.assertEqual(uri.ospath, hash_path[:hpos]) 

491 self.assertEqual(uri.fragment, hash_path[hpos + 1 :]) 

492 self.assertEqual(uri.unquoted_fragment, uri.fragment) 

493 

494 # Fragments can be quoted, although this is not enforced anywhere. 

495 with_frag = ResourcePath(self._make_uri("a/b.txt#" + urllib.parse.quote("zip-path=ingést"))) 

496 self.assertEqual(with_frag.fragment, "zip-path%3Ding%C3%A9st") 

497 self.assertEqual(with_frag.unquoted_fragment, "zip-path=ingést") 

498 

499 def test_hash(self) -> None: 

500 """Test that we can store URIs in sets and as keys.""" 

501 uri1 = self.root_uri 

502 uri2 = uri1.join("test/") 

503 s = {uri1, uri2} 

504 self.assertIn(uri1, s) 

505 

506 d = {uri1: "1", uri2: "2"} 

507 self.assertEqual(d[uri2], "2") 

508 

509 def test_root_uri(self) -> None: 

510 """Test ResourcePath.root_uri().""" 

511 uri = ResourcePath(self._make_uri("a/b/c.txt")) 

512 self.assertEqual(uri.root_uri().geturl(), self.root) 

513 

514 def test_join(self) -> None: 

515 """Test .join method.""" 

516 root_str = self.root 

517 root = self.root_uri 

518 

519 self.assertEqual(root.join("b/test.txt").geturl(), f"{root_str}b/test.txt") 

520 add_dir = root.join("b/c/d/") 

521 self.assertTrue(add_dir.isdir()) 

522 self.assertEqual(add_dir.geturl(), f"{root_str}b/c/d/") 

523 

524 up_relative = root.join("../b/c.txt") 

525 self.assertFalse(up_relative.isdir()) 

526 self.assertEqual(up_relative.geturl(), f"{root_str}b/c.txt") 

527 

528 # Check that fragment is passed through join (simple unquoted case). 

529 fnew3 = root.join("a/b.txt#fragment") 

530 self.assertEqual(fnew3.fragment, "fragment") 

531 self.assertEqual(fnew3.basename(), "b.txt", msg=f"Got: {fnew3._uri}") 

532 

533 # Check that fragment on the directory is dropped on join. 

534 frag_dir = add_dir.join("subdir/#dir_fragment") 

535 self.assertEqual(frag_dir.fragment, "dir_fragment") 

536 fnew4 = frag_dir.join("a.txt") 

537 self.assertEqual(fnew4.fragment, "") 

538 self.assertTrue(str(fnew4).endswith("/a.txt")) 

539 

540 # Join a resource path. 

541 subpath = ResourcePath("a/b.txt#fragment2", forceAbsolute=False, forceDirectory=False) 

542 fnew3 = root.join(subpath) 

543 self.assertEqual(fnew3.fragment, "fragment2") 

544 self.assertEqual(fnew3.basename(), "b.txt", msg=f"Got: {fnew3._uri}") 

545 

546 # Quoted string with fragment. 

547 quote_example = "hsc/payload/b&c.t@x#t" 

548 needs_quote = root.join(quote_example) 

549 self.assertEqual(needs_quote.unquoted_path, "/" + quote_example[:-2]) 

550 self.assertEqual(needs_quote.fragment, "t") 

551 

552 other = ResourcePath(f"{self.root}test.txt") 

553 self.assertEqual(root.join(other), other) 

554 self.assertEqual(other.join("b/new.txt").geturl(), f"{self.root}test.txt/b/new.txt") 

555 

556 other = ResourcePath(f"{self.root}text.txt", forceDirectory=False) 

557 with self.assertRaises(ValueError): 

558 other.join("b/new.text") 

559 

560 joined = ResourcePath(f"{self.root}hsc/payload/").join( 

561 ResourcePath("test.qgraph", forceAbsolute=False) 

562 ) 

563 self.assertEqual(joined, ResourcePath(f"{self.root}hsc/payload/test.qgraph")) 

564 

565 qgraph = ResourcePath("test.qgraph") # Absolute URI 

566 joined = ResourcePath(f"{self.root}hsc/payload/").join(qgraph) 

567 self.assertEqual(joined, qgraph) 

568 

569 with self.assertRaises(ValueError): 

570 root.join("dir/", forceDirectory=False) 

571 

572 temp = root.join("dir2/", isTemporary=True) 

573 with self.assertRaises(RuntimeError): 

574 temp.join("test.txt", isTemporary=False) 

575 

576 rel = ResourcePath("new.txt", forceAbsolute=False, forceDirectory=False) 

577 with self.assertRaises(RuntimeError): 

578 root.join(rel, forceDirectory=True) 

579 

580 def test_quoting(self) -> None: 

581 """Check that quoting works.""" 

582 parent = ResourcePath(self._make_uri("rootdir"), forceDirectory=True) 

583 subpath = "rootdir/dir1+/file?.txt" 

584 child = ResourcePath(self._make_uri(urllib.parse.quote(subpath))) 

585 

586 self.assertEqual(child.relative_to(parent), "dir1+/file?.txt") 

587 self.assertEqual(child.basename(), "file?.txt") 

588 self.assertEqual(child.relativeToPathRoot, subpath) 

589 self.assertIn("%", child.path) 

590 self.assertEqual(child.unquoted_path, "/" + subpath) 

591 

592 def test_ordering(self) -> None: 

593 """Check that greater/less comparison operators work.""" 

594 a = self._make_uri("a.txt") 

595 b = self._make_uri("b/") 

596 self.assertLess(a, b) 

597 self.assertFalse(a < a) 

598 self.assertLessEqual(a, b) 

599 self.assertLessEqual(a, a) 

600 self.assertGreater(b, a) 

601 self.assertFalse(b > b) 

602 self.assertGreaterEqual(b, a) 

603 self.assertGreaterEqual(b, b) 

604 

605 

606class GenericReadWriteTestCase(_GenericTestCase): 

607 """Test schemes that can read and write using concrete resources.""" 

608 

609 transfer_modes: tuple[str, ...] = ("copy", "move") 

610 testdir: str | None = None 

611 # Number of files to use for mremove() testing to ensure difference code 

612 # paths are hit. Do not want to generically use many files for schemes 

613 # where it makes no difference. 

614 n_mremove_files: int = 15 

615 

616 def setUp(self) -> None: 

617 if self.scheme is None: 

618 raise unittest.SkipTest("No scheme defined") 

619 self.root = self._make_uri("") 

620 self.root_uri = ResourcePath(self.root, forceDirectory=True, forceAbsolute=False) 

621 

622 if self.scheme == "file": 

623 # Use a local tempdir because on macOS the temp dirs use symlinks 

624 # so relsymlink gets quite confused. 

625 self.tmpdir = ResourcePath(makeTestTempDir(self.testdir), forceDirectory=True) 

626 else: 

627 # Create random tmp directory relative to the test root. 

628 self.tmpdir = self.root_uri.join( 

629 "TESTING-" + "".join(random.choices(string.ascii_lowercase + string.digits, k=8)), 

630 forceDirectory=True, 

631 ) 

632 self.tmpdir.mkdir() 

633 

634 def tearDown(self) -> None: 

635 if self.tmpdir and self.tmpdir.isLocal: 

636 removeTestTempDir(self.tmpdir.ospath) 

637 

638 def test_file(self) -> None: 

639 uri = self.tmpdir.join("test.txt") 

640 self.assertFalse(uri.exists(), f"{uri} should not exist") 

641 self.assertTrue(uri.path.endswith("test.txt")) 

642 

643 content = "abcdefghijklmnopqrstuv\n" 

644 uri.write(content.encode()) 

645 self.assertTrue(uri.exists(), f"{uri} should now exist") 

646 self.assertEqual(uri.read().decode(), content) 

647 self.assertEqual(uri.size(), len(content.encode())) 

648 

649 with self.assertRaises(FileExistsError): 

650 uri.write(b"", overwrite=False) 

651 

652 # Not all backends can tell if a remove fails so we can not 

653 # test that a remove of a non-existent entry is guaranteed to raise. 

654 uri.remove() 

655 self.assertFalse(uri.exists()) 

656 

657 # Ideally the test would remove the file again and raise a 

658 # FileNotFoundError. This is not reliable for remote resources 

659 # and doing an explicit check before trying to remove the resource 

660 # just to raise an exception is deemed an unacceptable overhead. 

661 

662 with self.assertRaises(FileNotFoundError): 

663 uri.read() 

664 

665 with self.assertRaises(FileNotFoundError): 

666 self.tmpdir.join("file/not/there.txt").size() 

667 

668 # Check that creating a URI from a URI returns the same thing 

669 uri2 = ResourcePath(uri) 

670 self.assertEqual(uri, uri2) 

671 self.assertEqual(id(uri), id(uri2)) 

672 

673 def test_get_info_generic(self) -> None: 

674 """Test generic get_info properties.""" 

675 now = datetime.datetime.now(tz=datetime.UTC) 

676 uri = self.tmpdir.join("test.txt") 

677 

678 with self.assertRaises(FileNotFoundError): 

679 uri.get_info() 

680 

681 content = "abcdefghijklmnopqrstuv\n" 

682 uri.write(content.encode()) 

683 

684 info = uri.get_info() 

685 self.assertTrue(info.is_file) 

686 self.assertEqual(info.size, len(content)) 

687 assert info.last_modified is not None 

688 self.assertGreaterEqual(info.last_modified.timestamp(), now.timestamp() - 1.0) 

689 self.assertIsInstance(info.checksums, dict) # Checksums are backend dependent. 

690 

691 for dir_uri in (uri.parent(), uri.root_uri()): 

692 # File URIs can return values for modification dates for 

693 # directories. S3 URIs can return checksums for directories. 

694 dirinfo = dir_uri.get_info() 

695 self.assertEqual(dirinfo.uri, str(dir_uri)) 

696 self.assertFalse(dirinfo.is_file) 

697 self.assertEqual(dirinfo.size, 0) 

698 

699 newdir = self.tmpdir.join("newdir/", forceDirectory=True) 

700 with self.assertRaises(FileNotFoundError): 

701 newdir.get_info() 

702 

703 def test_mkdir(self) -> None: 

704 newdir = self.tmpdir.join("newdir/seconddir", forceDirectory=True) 

705 newdir.mkdir() 

706 self.assertTrue(newdir.exists()) 

707 self.assertEqual(newdir.size(), 0) 

708 

709 newfile = newdir.join("temp.txt") 

710 newfile.write(b"Data") 

711 self.assertTrue(newfile.exists()) 

712 

713 file = self.tmpdir.join("file.txt") 

714 # Some schemes will realize that the URI is not a file and so 

715 # will raise NotADirectoryError. The file scheme is more permissive 

716 # and lets you write anything but will raise NotADirectoryError 

717 # if a non-directory is already there. We therefore write something 

718 # to the file to ensure that we trigger a portable exception. 

719 file.write(b"") 

720 with self.assertRaises(NotADirectoryError): 

721 file.mkdir() 

722 

723 # The root should exist. 

724 self.root_uri.mkdir() 

725 self.assertTrue(self.root_uri.exists()) 

726 

727 def test_transfer(self) -> None: 

728 src = self.tmpdir.join("test.txt") 

729 content = "Content is some content\nwith something to say\n\n" 

730 src.write(content.encode()) 

731 

732 can_move = "move" in self.transfer_modes 

733 for mode in self.transfer_modes: 

734 if mode == "move": 

735 continue 

736 

737 dest = self.tmpdir.join(f"dest_{mode}.txt") 

738 # Ensure that we get some debugging output. 

739 with self.assertLogs("lsst.resources", level=logging.DEBUG) as cm: 

740 dest.transfer_from(src, transfer=mode) 

741 self.assertIn("Transferring ", "\n".join(cm.output)) 

742 self.assertTrue(dest.exists(), f"Check that {dest} exists (transfer={mode})") 

743 

744 new_content = dest.read().decode() 

745 self.assertEqual(new_content, content) 

746 

747 if mode in ("symlink", "relsymlink"): 

748 self.assertTrue(os.path.islink(dest.ospath), f"Check that {dest} is symlink") 

749 

750 # If the source and destination are hardlinks of each other 

751 # the transfer should work even if overwrite=False. 

752 if mode in ("link", "hardlink"): 

753 dest.transfer_from(src, transfer=mode) 

754 else: 

755 with self.assertRaises( 

756 FileExistsError, msg=f"Overwrite of {dest} should not be allowed ({mode})" 

757 ): 

758 dest.transfer_from(src, transfer=mode) 

759 

760 # Transfer again and overwrite. 

761 dest.transfer_from(src, transfer=mode, overwrite=True) 

762 

763 dest.remove() 

764 

765 b = src.read() 

766 self.assertEqual(b.decode(), new_content) 

767 

768 nbytes = 10 

769 subset = src.read(size=nbytes) 

770 self.assertEqual(len(subset), nbytes) 

771 self.assertEqual(subset.decode(), content[:nbytes]) 

772 

773 # Transferring to self should be okay. 

774 src.transfer_from(src, "auto") 

775 

776 with self.assertRaises(ValueError): 

777 src.transfer_from(src, transfer="unknown") 

778 

779 # A move transfer is special. 

780 if can_move: 

781 dest.transfer_from(src, transfer="move") 

782 self.assertFalse(src.exists()) 

783 self.assertTrue(dest.exists()) 

784 else: 

785 src.remove() 

786 

787 dest.remove() 

788 with self.assertRaises(FileNotFoundError): 

789 dest.transfer_from(src, "auto") 

790 

791 def test_mtransfer(self) -> None: 

792 n_files = 10 

793 sources = [self.tmpdir.join(f"test{n}.txt") for n in range(n_files)] 

794 destinations = [self.tmpdir.join(f"dest_test{n}.txt") for n in range(n_files)] 

795 

796 for i, src in enumerate(sources): 

797 content = f"{i}\nContent is some content\nwith something to say\n\n" 

798 src.write(content.encode()) 

799 

800 results = ResourcePath.mtransfer("copy", zip(sources, destinations, strict=True)) 

801 self.assertTrue(all(res.success for res in results.values())) 

802 self.assertTrue(all(dest.exists() for dest in results)) 

803 

804 for i, dest in enumerate(destinations): 

805 new_content = dest.read().decode() 

806 self.assertTrue(new_content.startswith(f"{i}\n")) 

807 

808 # Overwrite should work. 

809 results = ResourcePath.mtransfer("copy", zip(sources, destinations, strict=True), overwrite=True) 

810 

811 # Overwrite failure. 

812 results = ResourcePath.mtransfer( 

813 "copy", zip(sources, destinations, strict=True), overwrite=False, do_raise=False 

814 ) 

815 self.assertFalse(all(res.success for res in results.values())) 

816 

817 with self.assertRaises(ExceptionGroup): 

818 results = ResourcePath.mtransfer( 

819 "copy", zip(sources, destinations, strict=True), overwrite=False, do_raise=True 

820 ) 

821 

822 def test_local_transfer(self) -> None: 

823 """Test we can transfer to and from local file.""" 

824 remote_src = self.tmpdir.join("src.json") 

825 remote_src.write(b"42") 

826 remote_dest = self.tmpdir.join("dest.json") 

827 

828 with ResourcePath.temporary_uri(suffix=".json") as tmp: 

829 self.assertTrue(tmp.isLocal) 

830 tmp.transfer_from(remote_src, transfer="auto") 

831 self.assertEqual(tmp.read(), remote_src.read()) 

832 

833 remote_dest.transfer_from(tmp, transfer="auto") 

834 self.assertEqual(remote_dest.read(), tmp.read()) 

835 

836 # Temporary (possibly remote) resource. 

837 # Transfers between temporary resources. 

838 with ( 

839 ResourcePath.temporary_uri(prefix=self.tmpdir.join("tmp"), suffix=".json") as remote_tmp, 

840 ResourcePath.temporary_uri(suffix=".json") as local_tmp, 

841 ): 

842 remote_tmp.write(b"42") 

843 if not remote_tmp.isLocal: 

844 for transfer in ("link", "symlink", "hardlink", "relsymlink"): 

845 with self.assertRaises(RuntimeError): 

846 # Trying to symlink a remote resource is not going 

847 # to work. A hardlink could work but would rely 

848 # on the local temp space being on the same 

849 # filesystem as the target. 

850 local_tmp.transfer_from(remote_tmp, transfer) 

851 local_tmp.transfer_from(remote_tmp, "move") 

852 self.assertFalse(remote_tmp.exists()) 

853 remote_tmp.transfer_from(local_tmp, "auto", overwrite=True) 

854 self.assertEqual(local_tmp.read(), remote_tmp.read()) 

855 

856 # Transfer of missing remote. 

857 remote_tmp.remove() 

858 with self.assertRaises(FileNotFoundError): 

859 local_tmp.transfer_from(remote_tmp, "auto", overwrite=True) 

860 

861 def test_local(self) -> None: 

862 """Check that remote resources can be made local.""" 

863 src = self.tmpdir.join("test.txt") 

864 original_content = "Content is some content\nwith something to say\n\n" 

865 src.write(original_content.encode()) 

866 

867 # Run this twice to ensure use of cache in code coverage 

868 # if applicable. 

869 for _ in (1, 2): 

870 with src.as_local() as local_uri: 

871 self.assertTrue(local_uri.isLocal) 

872 content = local_uri.read().decode() 

873 self.assertEqual(content, original_content) 

874 

875 if src.isLocal: 

876 self.assertEqual(src, local_uri) 

877 

878 with self.assertRaises(IsADirectoryError): 

879 with self.root_uri.as_local() as local_uri: 

880 pass 

881 

882 if not src.isLocal: 

883 # as_local tmpdir can not be a remote resource. 

884 with self.assertRaises(ValueError): 

885 with src.as_local(tmpdir=self.root_uri) as local_uri: 

886 pass 

887 

888 # tmpdir is ignored for local file. 

889 with tempfile.TemporaryDirectory() as tmpdir: 

890 temp_dir = ResourcePath(tmpdir, forceDirectory=True) 

891 with src.as_local(tmpdir=temp_dir) as local_uri: 

892 self.assertEqual(local_uri.dirname(), temp_dir) 

893 self.assertTrue(local_uri.exists()) 

894 

895 def test_local_mtransfer(self) -> None: 

896 """Check that bulk transfer to/from local works.""" 

897 # Create remote resources 

898 n_files = 10 

899 sources = [self.tmpdir.join(f"test{n}.txt") for n in range(n_files)] 

900 

901 for i, src in enumerate(sources): 

902 content = f"{i}\nContent is some content\nwith something to say\n\n" 

903 src.write(content.encode()) 

904 

905 # Potentially remote to local. 

906 with tempfile.TemporaryDirectory() as tmpdir: 

907 temp_dir = ResourcePath(tmpdir, forceDirectory=True) 

908 destinations = [temp_dir.join(f"dest_test{n}.txt") for n in range(n_files)] 

909 

910 results = ResourcePath.mtransfer("copy", zip(sources, destinations, strict=True)) 

911 self.assertTrue(all(res.success for res in results.values())) 

912 self.assertTrue(all(dest.exists() for dest in results)) 

913 

914 # Overwrite should work. 

915 results = ResourcePath.mtransfer("copy", zip(sources, destinations, strict=True), overwrite=True) 

916 

917 # Now reverse so local to potentially remote. 

918 for src in sources: 

919 src.remove() 

920 results = ResourcePath.mtransfer("copy", zip(destinations, sources, strict=True), overwrite=False) 

921 self.assertTrue(all(res.success for res in results.values())) 

922 self.assertTrue(all(dest.exists() for dest in results)) 

923 

924 def test_walk(self) -> None: 

925 """Walk a directory hierarchy.""" 

926 root = self.tmpdir.join("walk/") 

927 

928 # Look for a file that is not there 

929 file = root.join("config/basic/butler.yaml") 

930 found_list = list(ResourcePath.findFileResources([file])) 

931 self.assertEqual(found_list[0], file) 

932 

933 # First create the files (content is irrelevant). 

934 expected_files = { 

935 "dir1/a.yaml", 

936 "dir1/b.yaml", 

937 "dir1/c.json", 

938 "dir2/d.json", 

939 "dir2/e.yaml", 

940 } 

941 expected_uris = {root.join(f) for f in expected_files} 

942 for uri in expected_uris: 

943 uri.write(b"") 

944 self.assertTrue(uri.exists()) 

945 

946 # Look for the files. 

947 found = set(ResourcePath.findFileResources([root])) 

948 self.assertEqual(found, expected_uris) 

949 

950 # Now solely the YAML files. 

951 expected_yaml = {u for u in expected_uris if u.getExtension() == ".yaml"} 

952 found = set(ResourcePath.findFileResources([root], file_filter=r".*\.yaml$")) 

953 self.assertEqual(found, expected_yaml) 

954 

955 # Now two explicit directories and a file 

956 expected = set(expected_yaml) 

957 expected.add(file) 

958 

959 found = set( 

960 ResourcePath.findFileResources( 

961 [file, root.join("dir1/"), root.join("dir2/")], 

962 file_filter=r".*\.yaml$", 

963 ) 

964 ) 

965 self.assertEqual(found, expected) 

966 

967 # Group by directory -- find everything and compare it with what 

968 # we expected to be there in total. 

969 found_yaml = set() 

970 counter = 0 

971 for uris in ResourcePath.findFileResources([file, root], file_filter=r".*\.yaml$", grouped=True): 

972 assert not isinstance(uris, ResourcePath) # for mypy. 

973 found_uris = set(uris) 

974 if found_uris: 

975 counter += 1 

976 

977 found_yaml.update(found_uris) 

978 

979 expected_yaml_2 = expected_yaml 

980 expected_yaml_2.add(file) 

981 self.assertEqual(found_yaml, expected_yaml) 

982 self.assertEqual(counter, 3) 

983 

984 # Grouping but check that single files are returned in a single group 

985 # at the end 

986 file2 = root.join("config/templates/templates-bad.yaml") 

987 found_grouped = [ 

988 list(group) 

989 for group in ResourcePath.findFileResources([file, file2, root.join("dir2/")], grouped=True) 

990 if not isinstance(group, ResourcePath) # For mypy. 

991 ] 

992 self.assertEqual(len(found_grouped), 2, f"Found: {list(found_grouped)}") 

993 self.assertEqual(list(found_grouped[1]), [file, file2]) 

994 

995 with self.assertRaises(ValueError): 

996 # The list forces the generator to run. 

997 list(file.walk()) 

998 

999 # A directory that does not exist returns nothing. 

1000 self.assertEqual(list(root.join("dir3/").walk()), []) 

1001 

1002 def test_large_walk(self) -> None: 

1003 # In some systems pagination is used so ensure that we can handle 

1004 # large numbers of files. For example S3 limits us to 1000 responses 

1005 # per listing call. 

1006 created = set() 

1007 counter = 1 

1008 n_dir1 = 1100 

1009 root = self.tmpdir.join("large_walk", forceDirectory=True) 

1010 while counter <= n_dir1: 

1011 new = ResourcePath(root.join(f"file{counter:04d}.txt")) 

1012 new.write(f"{counter}".encode()) 

1013 created.add(new) 

1014 counter += 1 

1015 counter = 1 

1016 # Put some in a subdirectory to make sure we are looking in a 

1017 # hierarchy. 

1018 n_dir2 = 100 

1019 subdir = root.join("subdir", forceDirectory=True) 

1020 while counter <= n_dir2: 

1021 new = ResourcePath(subdir.join(f"file{counter:04d}.txt")) 

1022 new.write(f"{counter}".encode()) 

1023 created.add(new) 

1024 counter += 1 

1025 

1026 found = set(ResourcePath.findFileResources([root])) 

1027 self.assertEqual(len(found), n_dir1 + n_dir2) 

1028 self.assertEqual(found, created) 

1029 

1030 # Again with grouping. 

1031 # (mypy gets upset not knowing which of the two options is being 

1032 # returned so add useless instance check). 

1033 found_list = [ 

1034 list(group) 

1035 for group in ResourcePath.findFileResources([root], grouped=True) 

1036 if not isinstance(group, ResourcePath) # For mypy. 

1037 ] 

1038 self.assertEqual(len(found_list), 2) 

1039 self.assertEqual(len(found_list[0]), n_dir1) 

1040 self.assertEqual(len(found_list[1]), n_dir2) 

1041 

1042 def test_temporary(self) -> None: 

1043 prefix = self.tmpdir.join("tmp", forceDirectory=True) 

1044 with ResourcePath.temporary_uri(prefix=prefix, suffix=".json") as tmp: 

1045 self.assertEqual(tmp.getExtension(), ".json", f"uri: {tmp}") 

1046 self.assertTrue(tmp.isabs(), f"uri: {tmp}") 

1047 self.assertFalse(tmp.exists(), f"uri: {tmp}") 

1048 tmp.write(b"abcd") 

1049 self.assertTrue(tmp.exists(), f"uri: {tmp}") 

1050 self.assertTrue(tmp.isTemporary) 

1051 self.assertFalse(tmp.exists(), f"uri: {tmp}") 

1052 

1053 tmpdir = ResourcePath(self.tmpdir, forceDirectory=True) 

1054 with ResourcePath.temporary_uri(prefix=tmpdir) as tmp: 

1055 # Use a specified tmpdir and check it is okay for the file 

1056 # to not be created. 

1057 self.assertFalse(tmp.getExtension()) 

1058 self.assertFalse(tmp.exists(), f"uri: {tmp}") 

1059 self.assertEqual(tmp.scheme, self.scheme) 

1060 self.assertTrue(tmp.isTemporary) 

1061 self.assertTrue(tmpdir.exists(), f"uri: {tmpdir} still exists") 

1062 

1063 # Fake a directory suffix. 

1064 with self.assertRaises(NotImplementedError): 

1065 with ResourcePath.temporary_uri(prefix=self.root_uri, suffix="xxx/") as tmp: 

1066 pass 

1067 

1068 @unittest.skipIf(fsspec is None, "fsspec is not available.") 

1069 def test_fsspec(self) -> None: 

1070 """Simple read of a file.""" 

1071 uri = self.tmpdir.join("test.txt") 

1072 self.assertFalse(uri.exists(), f"{uri} should not exist") 

1073 self.assertTrue(uri.path.endswith("test.txt")) 

1074 

1075 content = "abcdefghijklmnopqrstuv\n" 

1076 uri.write(content.encode()) 

1077 

1078 try: 

1079 fs, path = uri.to_fsspec() 

1080 except NotImplementedError as e: 

1081 raise unittest.SkipTest(str(e)) from e 

1082 except ImportError as e: 

1083 # HttpResourcePath.to_fsspec() raises if support 

1084 # of fsspec for webDAV back ends is disabled. 

1085 raise unittest.SkipTest(str(e)) from e 

1086 with fs.open(path, "r") as fd: 

1087 as_read = fd.read() 

1088 self.assertEqual(as_read, content) 

1089 

1090 def test_open(self) -> None: 

1091 tmpdir = ResourcePath(self.tmpdir, forceDirectory=True) 

1092 with ResourcePath.temporary_uri(prefix=tmpdir, suffix=".txt") as tmp: 

1093 _check_open(self, tmp, mode_suffixes=("", "t")) 

1094 _check_open(self, tmp, mode_suffixes=("t",), encoding="utf-16") 

1095 _check_open(self, tmp, mode_suffixes=("t",), prefer_file_temporary=True) 

1096 _check_open(self, tmp, mode_suffixes=("t",), encoding="utf-16", prefer_file_temporary=True) 

1097 with ResourcePath.temporary_uri(prefix=tmpdir, suffix=".dat") as tmp: 

1098 _check_open(self, tmp, mode_suffixes=("b",)) 

1099 _check_open(self, tmp, mode_suffixes=("b",), prefer_file_temporary=True) 

1100 

1101 with self.assertRaises(IsADirectoryError): 

1102 with self.root_uri.open(): 

1103 pass 

1104 

1105 def test_mexists(self) -> None: 

1106 root = self.tmpdir.join("mexists/") 

1107 

1108 # A file that is not there. 

1109 file = root.join("config/basic/butler.yaml") 

1110 

1111 # Create some files. Most schemes the code paths do not change for 10 

1112 # vs 1000 files but in some schemes it does. 

1113 expected_files = [f"dir1/f{n}.yaml" for n in range(self.n_mremove_files)] 

1114 expected_uris = [root.join(f) for f in expected_files] 

1115 for uri in expected_uris: 

1116 uri.write(b"") 

1117 self.assertTrue(uri.exists()) 

1118 expected_uris.append(file) 

1119 

1120 # Force to run with fewer workers than there are files. 

1121 multi = ResourcePath.mexists(expected_uris, num_workers=3) 

1122 

1123 for uri, is_there in multi.items(): 

1124 if uri == file: 

1125 self.assertFalse(is_there) 

1126 else: 

1127 self.assertTrue(is_there) 

1128 

1129 # Clean up. Unfortunately POSIX raises a FileNotFoundError but 

1130 # S3 boto does not complain if there is no key. 

1131 ResourcePath.mremove(expected_uris, do_raise=False) 

1132 

1133 # Check they were really removed. 

1134 multi = ResourcePath.mexists(expected_uris, num_workers=3) 

1135 for uri, is_there in multi.items(): 

1136 self.assertFalse(is_there) 

1137 

1138 # Clean up a subset of files that are already gone, but this can 

1139 # trigger a different code path. 

1140 ResourcePath.mremove(expected_uris[:5], do_raise=False)