Coverage for python / lsst / resources / tests.py: 8%
701 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:32 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
11from __future__ import annotations
13__all__ = ["GenericReadWriteTestCase", "GenericTestCase"]
15import datetime
16import logging
17import os
18import pathlib
19import random
20import string
21import sys
22import tempfile
23import unittest
24import urllib.parse
25import uuid
26from collections.abc import Iterable
27from typing import TYPE_CHECKING, Any
29try:
30 import fsspec
31except ImportError:
32 fsspec = None
34from lsst.resources import ResourcePath
35from lsst.resources.utils import makeTestTempDir, removeTestTempDir
37TESTDIR = os.path.abspath(os.path.dirname(__file__))
def _check_open(
    test_case: _GenericTestCase | unittest.TestCase,
    uri: ResourcePath,
    *,
    mode_suffixes: Iterable[str] = ("", "t", "b"),
    **kwargs: Any,
) -> None:
    """Test an implementation of ButlerURI.open.

    Exercises every supported mode (``x``, ``r``, ``w``, ``r+``, ``a``,
    ``w+``) for each requested mode suffix, including seek/tell behavior,
    chunked reads, EOF handling, truncation, and append semantics. The file
    at ``uri`` is removed at the end of each suffix iteration so the next
    iteration starts from a clean slate.

    Parameters
    ----------
    test_case : `unittest.TestCase`
        Test case to use for assertions.
    uri : `ResourcePath`
        URI to use for tests. Must point to a writeable location that is not
        yet occupied by a file. On return, the location may point to a file
        only if the test fails.
    mode_suffixes : `~collections.abc.Iterable` of `str`
        Suffixes to pass as part of the ``mode`` argument to
        `ResourcePath.open`, indicating whether to open as binary or as text;
        the only permitted elements are ``""``, ``"t"``, and ``"b"``.
    **kwargs
        Additional keyword arguments to forward to all calls to `open`.
    """
    # The trailing emoji ensures the text content is not pure ASCII, so
    # encoding round-trips are genuinely exercised in text mode. The bytes
    # content is random so stale files from a previous run cannot pass.
    text_content = "abcdefghijklmnopqrstuvwxyz🙂"
    bytes_content = uuid.uuid4().bytes
    content_by_mode_suffix: dict[str, str | bytes] = {
        "": text_content,
        "t": text_content,
        "b": bytes_content,
    }
    empty_content_by_mode_suffix: dict[str, str | bytes] = {
        "": "",
        "t": "",
        "b": b"",
    }
    # To appease mypy
    double_content_by_mode_suffix: dict[str, str | bytes] = {
        "": text_content + text_content,
        "t": text_content + text_content,
        "b": bytes_content + bytes_content,
    }
    for mode_suffix in mode_suffixes:
        content = content_by_mode_suffix[mode_suffix]
        double_content = double_content_by_mode_suffix[mode_suffix]
        # Create file with mode='x', which prohibits overwriting.
        with uri.open("x" + mode_suffix, **kwargs) as write_buffer:
            write_buffer.write(content)
        test_case.assertTrue(uri.exists())
        # Check that opening with 'x' now raises, and does not modify content.
        # (The open itself raises, so the inner write is never reached.)
        with test_case.assertRaises(FileExistsError):
            with uri.open("x" + mode_suffix, **kwargs) as write_buffer:
                write_buffer.write("bad")
        # Read the file we created and check the contents.
        with uri.open("r" + mode_suffix, **kwargs) as read_buffer:
            test_case.assertEqual(read_buffer.read(), content)
            # The names will not match if a local temporary is being written.
            if not kwargs.get("prefer_file_temporary"):
                test_case.assertIn(uri.basename(), read_buffer.name)
        # Check that we can read bytes in a loop and get EOF
        with uri.open("r" + mode_suffix, **kwargs) as read_buffer:
            # Seek off the end of the file and should read empty back.
            read_buffer.seek(1024)
            test_case.assertEqual(read_buffer.tell(), 1024)
            content_read = read_buffer.read()  # Read as much as we can.
            test_case.assertEqual(len(content_read), 0, f"Read: {content_read!r}, expected empty.")

            # First read more than the content.
            read_buffer.seek(0)
            size = len(content) * 3
            chunk_read = read_buffer.read(size)
            test_case.assertEqual(chunk_read, content)

            # Repeated reads should always return empty string.
            chunk_read = read_buffer.read(size)
            test_case.assertEqual(len(chunk_read), 0)
            chunk_read = read_buffer.read(size)
            test_case.assertEqual(len(chunk_read), 0)

            # Go back to start of file and read in smaller chunks.
            read_buffer.seek(0)
            size = len(content) // 3

            content_read = empty_content_by_mode_suffix[mode_suffix]
            n_reads = 0
            while chunk_read := read_buffer.read(size):
                content_read += chunk_read
                n_reads += 1
                if n_reads > 10:  # In case EOF never hits because of bug.
                    raise AssertionError(
                        f"Failed to stop reading from file after {n_reads} loops. "
                        f"Read {len(content_read)} bytes/characters. Expected {len(content)}."
                    )
            test_case.assertEqual(content_read, content)

            # Go back to start of file and read the entire thing.
            read_buffer.seek(0)
            content_read = read_buffer.read()
            test_case.assertEqual(content_read, content)

            # Seek off the end of the file and should read empty back.
            # We run this check twice since in some cases the handle will
            # cache knowledge of the file size.
            read_buffer.seek(1024)
            test_case.assertEqual(read_buffer.tell(), 1024)
            content_read = read_buffer.read()
            test_case.assertEqual(len(content_read), 0, f"Read: {content_read!r}, expected empty.")

        # Write multiple chunks with flushing to ensure that any handles that
        # cache without flushing work properly.
        n = 3
        with uri.open("w" + mode_suffix, **kwargs) as write_buffer:
            for _ in range(n):
                write_buffer.write(content)
                write_buffer.flush()
        with uri.open("r" + mode_suffix, **kwargs) as read_buffer:
            test_case.assertEqual(read_buffer.read(), content * n)

        # Write two copies of the content, overwriting the single copy there.
        with uri.open("w" + mode_suffix, **kwargs) as write_buffer:
            write_buffer.write(double_content)
        # Read again, this time use mode='r+', which reads what is there and
        # then lets us write more; we'll use that to reset the file to one
        # copy of the content.
        with uri.open("r+" + mode_suffix, **kwargs) as rw_buffer:
            test_case.assertEqual(rw_buffer.read(), double_content)
            rw_buffer.seek(0)
            rw_buffer.truncate()
            rw_buffer.write(content)
            rw_buffer.seek(0)
            test_case.assertEqual(rw_buffer.read(), content)
        with uri.open("r" + mode_suffix, **kwargs) as read_buffer:
            test_case.assertEqual(read_buffer.read(), content)
        # Append some more content to the file; should now have two copies.
        with uri.open("a" + mode_suffix, **kwargs) as append_buffer:
            append_buffer.write(content)
        with uri.open("r" + mode_suffix, **kwargs) as read_buffer:
            test_case.assertEqual(read_buffer.read(), double_content)
        # Final mode to check is w+, which does read/write but truncates first.
        with uri.open("w+" + mode_suffix, **kwargs) as rw_buffer:
            test_case.assertEqual(rw_buffer.read(), empty_content_by_mode_suffix[mode_suffix])
            rw_buffer.write(content)
            rw_buffer.seek(0)
            test_case.assertEqual(rw_buffer.read(), content)
        # Check that two seeks with reads to end return correctly.
        # Seek is only reliable with "b" mode.
        if mode_suffix == "b":
            with uri.open("r" + mode_suffix, **kwargs) as read_buffer:
                size = len(content)
                seek1 = 2 * size // 3
                read_buffer.seek(seek1)
                content1 = read_buffer.read()
                test_case.assertEqual(len(content1), size - seek1)
                # Seek earlier and then read to end.
                seek2 = size // 2
                read_buffer.seek(seek2)
                content2 = read_buffer.read()
                test_case.assertEqual(len(content2), size - seek2)
            # Check that we can seek from end and read and seek and read.
            # Negative seek only works in binary mode.
            with uri.open("rb", **kwargs) as read_buffer:
                read_buffer.seek(-5, 2)  # Relative to end
                content_read = read_buffer.read()
                test_case.assertEqual(len(content_read), 5)
                read_buffer.seek(-10, 2)  # Relative to end
                content_read = read_buffer.read()
                test_case.assertEqual(len(content_read), 10)
        with uri.open("r" + mode_suffix, **kwargs) as read_buffer:
            test_case.assertEqual(read_buffer.read(), content)
        # Remove file to make room for the next loop of tests with this URI.
        uri.remove()
# At type-checking time the mixin derives from unittest.TestCase so that the
# assert* methods used by the mixin classes resolve for mypy; at runtime it is
# an empty base class, so the mixins do not themselves behave as TestCases.
if TYPE_CHECKING:

    class TestCaseMixin(unittest.TestCase):
        """Base class for mixin test classes that use TestCase methods."""

        pass

else:

    class TestCaseMixin:
        """Do-nothing definition of mixin base class for regular execution."""

        pass
class _GenericTestCase(TestCaseMixin):
    """Generic base class for test mixin.

    Subclasses configure ``scheme``, ``netloc`` and ``base_path`` to direct
    the generated URIs at a particular backend; a ``None`` scheme means URIs
    are left as plain paths.
    """

    scheme: str | None = None
    netloc: str | None = None
    base_path: str | None = None
    path1 = "test_dir"
    path2 = "file.txt"

    def _make_uri(self, path: str, netloc: str | None = None) -> str:
        """Construct a URI string for ``path`` using the configured scheme.

        When no scheme is configured the path is returned untouched.
        """
        if self.scheme is None:
            return path
        # Fall back to the class-level netloc unless one was supplied.
        host = self.netloc if netloc is None else netloc
        # A single leading slash is dropped before assembly.
        trimmed = path.removeprefix("/")
        if self.base_path is not None:
            trimmed = f"{self.base_path}/{trimmed}".lstrip("/")
        return f"{self.scheme}://{host}/{trimmed}"
class GenericTestCase(_GenericTestCase):
    """Test cases for generic manipulation of a `ResourcePath`.

    These tests never touch a real resource; they only exercise URI
    construction, joining, relativization, quoting, and comparison.
    """

    def setUp(self) -> None:
        # Subclasses without a scheme are abstract; skip rather than fail.
        if self.scheme is None:
            raise unittest.SkipTest("No scheme defined")
        self.root = self._make_uri("")
        self.root_uri = ResourcePath(self.root, forceDirectory=True, forceAbsolute=False)

    def test_creation(self) -> None:
        """Test construction and the constraints enforced by the
        constructor's ``forceDirectory``/``isTemporary`` flags.
        """
        self.assertEqual(self.root_uri.scheme, self.scheme)
        self.assertEqual(self.root_uri.netloc, self.netloc)
        self.assertFalse(self.root_uri.query)
        self.assertFalse(self.root_uri.params)

        # A dict is not an acceptable source for a ResourcePath.
        with self.assertRaises(ValueError):
            ResourcePath({})  # type: ignore

        # Cannot retroactively mark an existing non-temporary URI temporary.
        with self.assertRaises(RuntimeError):
            ResourcePath(self.root_uri, isTemporary=True)

        # A URI known to be a file cannot be reinterpreted as a directory.
        file = self.root_uri.join("file.txt", forceDirectory=False)
        with self.assertRaises(RuntimeError):
            ResourcePath(file, forceDirectory=True)

        # Without forceDirectory=False the file URI can still be coerced.
        file = self.root_uri.join("file.txt")
        file_as_dir = ResourcePath(file, forceDirectory=True)
        self.assertTrue(file_as_dir.isdir())

        # A trailing slash contradicts forceDirectory=False.
        dir = self._make_uri("a/b/c/")
        with self.assertRaises(ValueError):
            ResourcePath(dir, forceDirectory=False)

        with self.assertRaises(NotImplementedError):
            ResourcePath("unknown://netloc")

        replaced = file.replace(fragment="frag")
        self.assertEqual(replaced.fragment, "frag")

        # Scheme replacement is not allowed via replace().
        with self.assertRaises(ValueError):
            file.replace(scheme="new")

        # A ResourcePath never compares equal to its string form
        # (checked in both directions).
        self.assertNotEqual(replaced, str(replaced))
        self.assertNotEqual(str(replaced), replaced)

    def test_extension(self) -> None:
        """Test getExtension() and updatedExtension() behavior."""
        uri = ResourcePath(self._make_uri("dir/test.txt"))
        self.assertEqual(uri.updatedExtension(None), uri)
        self.assertEqual(uri.updatedExtension(".txt"), uri)
        # Updating to the same extension returns the identical object.
        self.assertEqual(id(uri.updatedExtension(".txt")), id(uri))

        # Multi-component extensions are supported.
        fits = uri.updatedExtension(".fits.gz")
        self.assertEqual(fits.basename(), "test.fits.gz")
        self.assertEqual(fits.updatedExtension(".jpeg").basename(), "test.jpeg")

        extensionless = self.root_uri.join("no_ext")
        self.assertEqual(extensionless.getExtension(), "")
        extension = extensionless.updatedExtension(".fits")
        self.assertEqual(extension.getExtension(), ".fits")

        uri = ResourcePath("test.txt", forceAbsolute=False)
        self.assertEqual(uri.getExtension(), ".txt")
        uri = ResourcePath(self._make_uri("dir.1/dir.2/test.txt"), forceDirectory=False)
        self.assertEqual(uri.getExtension(), ".txt")
        # Directories can have extensions too.
        uri = ResourcePath(self._make_uri("dir.1/dir.2/"), forceDirectory=True)
        self.assertEqual(uri.getExtension(), ".2")
        uri = ResourcePath(self._make_uri("dir.1/dir/"), forceDirectory=True)
        self.assertEqual(uri.getExtension(), "")

    def test_relative(self) -> None:
        """Check that we can get subpaths back from two URIs."""
        parent = ResourcePath(self._make_uri(self.path1), forceDirectory=True)
        self.assertTrue(parent.isdir())
        child = parent.join("dir1/file.txt")

        self.assertEqual(child.relative_to(parent), "dir1/file.txt")

        not_child = ResourcePath("/a/b/dir1/file.txt")
        self.assertIsNone(not_child.relative_to(parent))
        self.assertFalse(not_child.isdir())

        not_directory = parent.join("dir1/file2.txt")
        self.assertIsNone(child.relative_to(not_directory))

        # Relative URIs
        parent = ResourcePath("a/b/", forceAbsolute=False)
        child = ResourcePath("a/b/c/d.txt", forceAbsolute=False)
        self.assertFalse(child.scheme)
        self.assertEqual(child.relative_to(parent), "c/d.txt")

        # forceAbsolute=True should work even on an existing ResourcePath
        self.assertTrue(pathlib.Path(ResourcePath(child, forceAbsolute=True).ospath).is_absolute())

        # Absolute URI and schemeless URI
        parent = self.root_uri.join("/a/b/c/")
        child = ResourcePath("e/f/g.txt", forceAbsolute=False)

        # If the child is relative and the parent is absolute we assume
        # that the child is a child of the parent unless it uses ".."
        self.assertEqual(child.relative_to(parent), "e/f/g.txt", f"{child}.relative_to({parent})")

        child = ResourcePath("../e/f/g.txt", forceAbsolute=False)
        self.assertIsNone(child.relative_to(parent))

        child = ResourcePath("../c/e/f/g.txt", forceAbsolute=False)
        self.assertEqual(child.relative_to(parent), "e/f/g.txt")

        # Test with different netloc
        child = ResourcePath(self._make_uri("a/b/c.txt", netloc="my.host"))
        parent = ResourcePath(self._make_uri("a", netloc="other"), forceDirectory=True)
        self.assertIsNone(child.relative_to(parent), f"{child}.relative_to({parent})")

        # This is an absolute path so will *always* return a file URI and
        # ignore the root parameter.
        parent = ResourcePath("/a/b/c", root=self.root_uri, forceDirectory=True)
        self.assertEqual(parent.geturl(), "file:///a/b/c/")

        parent = ResourcePath(self._make_uri("/a/b/c"), forceDirectory=True)
        child = ResourcePath("d/e.txt", root=parent)
        self.assertEqual(child.relative_to(parent), "d/e.txt", f"{child}.relative_to({parent})")

        parent = ResourcePath("c/", root=ResourcePath(self._make_uri("/a/b/")))
        self.assertEqual(child.relative_to(parent), "d/e.txt", f"{child}.relative_to({parent})")

        # Absolute schemeless child with relative parent will always fail.
        child = ResourcePath("d/e.txt", root="/a/b/c")
        parent = ResourcePath("d/e.txt", forceAbsolute=False)
        self.assertIsNone(child.relative_to(parent), f"{child}.relative_to({parent})")

        # Allow .. in response.
        child = ResourcePath(self._make_uri("a/b/c/d.txt"), forceAbsolute=False)
        parent = ResourcePath(self._make_uri("a/b/d/e/"), forceAbsolute=False)
        self.assertIsNone(child.relative_to(parent), f"{child}.relative_to({parent})")

        if sys.version_info >= (3, 12, 0):
            # Fails on python 3.11.
            self.assertEqual(
                child.relative_to(parent, walk_up=True),
                "../../c/d.txt",
                f"{child}.relative_to({parent}, walk_up=True)",
            )

    def test_parents(self) -> None:
        """Test of splitting and parent walking."""
        parent = ResourcePath(self._make_uri("somedir"), forceDirectory=True)
        child_file = parent.join("subdir/file.txt")
        self.assertFalse(child_file.isdir())
        child_subdir, file = child_file.split()
        self.assertEqual(file, "file.txt")
        self.assertTrue(child_subdir.isdir())
        self.assertEqual(child_file.dirname(), child_subdir)
        self.assertEqual(child_file.basename(), file)
        self.assertEqual(child_file.parent(), child_subdir)
        derived_parent = child_subdir.parent()
        self.assertEqual(derived_parent, parent)
        self.assertTrue(derived_parent.isdir())
        self.assertEqual(child_file.parent().parent(), parent)
        # dirname() of a directory is the directory itself.
        self.assertEqual(child_subdir.dirname(), child_subdir)

        # Make sure that the parent doesn't retain any fragment from the
        # child.
        child_fragment = child_subdir.join("a.txt#fragment")
        self.assertEqual(child_fragment.fragment, "fragment")
        fragment_parent = child_fragment.parent()
        self.assertEqual(fragment_parent.fragment, "")
        self.assertTrue(str(fragment_parent).endswith("/"))

    def test_escapes(self) -> None:
        """Special characters in file paths."""
        src = self.root_uri.join("bbb/???/test.txt")
        # Whether this scheme percent-encodes special characters.
        quotes = src.quotePaths
        if quotes:
            self.assertNotIn("???", src.path)
        else:
            self.assertIn("???", src.path)
        self.assertIn("???", src.unquoted_path)

        file = src.updatedFile("tests??.txt")
        if quotes:
            self.assertNotIn("??.txt", file.path)
        else:
            self.assertIn("??.txt", file.path)

        src = src.updatedFile("tests??.txt")
        self.assertIn("??.txt", src.unquoted_path)

        # File URI and schemeless URI
        parent = ResourcePath(self._make_uri(urllib.parse.quote("/a/b/c/de/??/")))
        child = ResourcePath("e/f/g.txt", forceAbsolute=False)
        self.assertEqual(child.relative_to(parent), "e/f/g.txt")

        child = ResourcePath("e/f??#/g.txt", forceAbsolute=False)
        self.assertEqual(child.relative_to(parent), "e/f??#/g.txt")

        child = ResourcePath(self._make_uri(urllib.parse.quote("/a/b/c/de/??/e/f??#/g.txt")))
        self.assertEqual(child.relative_to(parent), "e/f??#/g.txt")

        self.assertEqual(child.relativeToPathRoot, "a/b/c/de/??/e/f??#/g.txt")

        # dir.join() morphs into a file scheme
        dir = ResourcePath(self._make_uri(urllib.parse.quote("bbb/???/")))
        new = dir.join("test_j.txt")
        self.assertIn("???", new.unquoted_path, f"Checking {new}")

        new2name = "###/test??.txt"
        new2 = dir.join(new2name)
        self.assertIn("???", new2.unquoted_path)
        self.assertTrue(new2.unquoted_path.endswith(new2name))

        fdir = dir.abspath()
        self.assertNotIn("???", fdir.path)
        self.assertIn("???", fdir.unquoted_path)
        self.assertEqual(fdir.scheme, self.scheme)

        fnew2 = fdir.join(new2name)
        self.assertTrue(fnew2.unquoted_path.endswith(new2name))
        if quotes:
            self.assertNotIn("###", fnew2.path)
        else:
            self.assertIn("###", fnew2.path)

        # Test that children relative to schemeless and file schemes
        # still return the same unquoted name
        self.assertEqual(fnew2.relative_to(fdir), new2name, f"{fnew2}.relative_to({fdir})")
        self.assertEqual(fnew2.relative_to(dir), new2name, f"{fnew2}.relative_to({dir})")
        self.assertEqual(new2.relative_to(fdir), new2name, f"{new2}.relative_to({fdir})")
        self.assertEqual(new2.relative_to(dir), new2name, f"{new2}.relative_to({dir})")

        # Check for double quoting
        plus_path = "/a/b/c+d/"
        with self.assertLogs(level="WARNING"):
            uri = ResourcePath(urllib.parse.quote(plus_path), forceDirectory=True)
        self.assertEqual(uri.ospath, plus_path)

        # Check that # is not escaped for schemeless URIs
        hash_path = "/a/b#/c&d#xyz"
        # Only the final # introduces the fragment.
        hpos = hash_path.rfind("#")
        uri = ResourcePath(hash_path)
        self.assertEqual(uri.ospath, hash_path[:hpos])
        self.assertEqual(uri.fragment, hash_path[hpos + 1 :])
        self.assertEqual(uri.unquoted_fragment, uri.fragment)

        # Fragments can be quoted, although this is not enforced anywhere.
        with_frag = ResourcePath(self._make_uri("a/b.txt#" + urllib.parse.quote("zip-path=ingést")))
        self.assertEqual(with_frag.fragment, "zip-path%3Ding%C3%A9st")
        self.assertEqual(with_frag.unquoted_fragment, "zip-path=ingést")

    def test_hash(self) -> None:
        """Test that we can store URIs in sets and as keys."""
        uri1 = self.root_uri
        uri2 = uri1.join("test/")
        s = {uri1, uri2}
        self.assertIn(uri1, s)

        d = {uri1: "1", uri2: "2"}
        self.assertEqual(d[uri2], "2")

    def test_root_uri(self) -> None:
        """Test ResourcePath.root_uri()."""
        uri = ResourcePath(self._make_uri("a/b/c.txt"))
        self.assertEqual(uri.root_uri().geturl(), self.root)

    def test_join(self) -> None:
        """Test .join method."""
        root_str = self.root
        root = self.root_uri

        self.assertEqual(root.join("b/test.txt").geturl(), f"{root_str}b/test.txt")
        add_dir = root.join("b/c/d/")
        self.assertTrue(add_dir.isdir())
        self.assertEqual(add_dir.geturl(), f"{root_str}b/c/d/")

        # ".." components are normalized away during join.
        up_relative = root.join("../b/c.txt")
        self.assertFalse(up_relative.isdir())
        self.assertEqual(up_relative.geturl(), f"{root_str}b/c.txt")

        # Check that fragment is passed through join (simple unquoted case).
        fnew3 = root.join("a/b.txt#fragment")
        self.assertEqual(fnew3.fragment, "fragment")
        self.assertEqual(fnew3.basename(), "b.txt", msg=f"Got: {fnew3._uri}")

        # Check that fragment on the directory is dropped on join.
        frag_dir = add_dir.join("subdir/#dir_fragment")
        self.assertEqual(frag_dir.fragment, "dir_fragment")
        fnew4 = frag_dir.join("a.txt")
        self.assertEqual(fnew4.fragment, "")
        self.assertTrue(str(fnew4).endswith("/a.txt"))

        # Join a resource path.
        subpath = ResourcePath("a/b.txt#fragment2", forceAbsolute=False, forceDirectory=False)
        fnew3 = root.join(subpath)
        self.assertEqual(fnew3.fragment, "fragment2")
        self.assertEqual(fnew3.basename(), "b.txt", msg=f"Got: {fnew3._uri}")

        # Quoted string with fragment.
        quote_example = "hsc/payload/b&c.t@x#t"
        needs_quote = root.join(quote_example)
        self.assertEqual(needs_quote.unquoted_path, "/" + quote_example[:-2])
        self.assertEqual(needs_quote.fragment, "t")

        # Joining an absolute ResourcePath returns it unchanged.
        other = ResourcePath(f"{self.root}test.txt")
        self.assertEqual(root.join(other), other)
        self.assertEqual(other.join("b/new.txt").geturl(), f"{self.root}test.txt/b/new.txt")

        # Cannot join onto something known not to be a directory.
        other = ResourcePath(f"{self.root}text.txt", forceDirectory=False)
        with self.assertRaises(ValueError):
            other.join("b/new.text")

        joined = ResourcePath(f"{self.root}hsc/payload/").join(
            ResourcePath("test.qgraph", forceAbsolute=False)
        )
        self.assertEqual(joined, ResourcePath(f"{self.root}hsc/payload/test.qgraph"))

        qgraph = ResourcePath("test.qgraph")  # Absolute URI
        joined = ResourcePath(f"{self.root}hsc/payload/").join(qgraph)
        self.assertEqual(joined, qgraph)

        # Trailing slash contradicts forceDirectory=False.
        with self.assertRaises(ValueError):
            root.join("dir/", forceDirectory=False)

        # Temporary status cannot be dropped by a child join.
        temp = root.join("dir2/", isTemporary=True)
        with self.assertRaises(RuntimeError):
            temp.join("test.txt", isTemporary=False)

        # Known-file ResourcePath cannot be forced to a directory on join.
        rel = ResourcePath("new.txt", forceAbsolute=False, forceDirectory=False)
        with self.assertRaises(RuntimeError):
            root.join(rel, forceDirectory=True)

    def test_quoting(self) -> None:
        """Check that quoting works."""
        parent = ResourcePath(self._make_uri("rootdir"), forceDirectory=True)
        subpath = "rootdir/dir1+/file?.txt"
        child = ResourcePath(self._make_uri(urllib.parse.quote(subpath)))

        self.assertEqual(child.relative_to(parent), "dir1+/file?.txt")
        self.assertEqual(child.basename(), "file?.txt")
        self.assertEqual(child.relativeToPathRoot, subpath)
        # .path stays percent-encoded; .unquoted_path is decoded.
        self.assertIn("%", child.path)
        self.assertEqual(child.unquoted_path, "/" + subpath)

    def test_ordering(self) -> None:
        """Check that greater/less comparison operators work."""
        a = self._make_uri("a.txt")
        b = self._make_uri("b/")
        self.assertLess(a, b)
        self.assertFalse(a < a)
        self.assertLessEqual(a, b)
        self.assertLessEqual(a, a)
        self.assertGreater(b, a)
        self.assertFalse(b > b)
        self.assertGreaterEqual(b, a)
        self.assertGreaterEqual(b, b)
class GenericReadWriteTestCase(_GenericTestCase):
    """Test schemes that can read and write using concrete resources."""

    # Transfer modes exercised by the transfer tests for this scheme.
    transfer_modes: tuple[str, ...] = ("copy", "move")
    # Root passed to makeTestTempDir() when the scheme is "file".
    testdir: str | None = None
    # Number of files to use for mremove() testing to ensure different code
    # paths are hit. Do not want to generically use many files for schemes
    # where it makes no difference.
    n_mremove_files: int = 15
    def setUp(self) -> None:
        """Create a per-test temporary directory for this scheme."""
        # Subclasses without a scheme are abstract; skip rather than fail.
        if self.scheme is None:
            raise unittest.SkipTest("No scheme defined")
        self.root = self._make_uri("")
        self.root_uri = ResourcePath(self.root, forceDirectory=True, forceAbsolute=False)

        if self.scheme == "file":
            # Use a local tempdir because on macOS the temp dirs use symlinks
            # so relsymlink gets quite confused.
            self.tmpdir = ResourcePath(makeTestTempDir(self.testdir), forceDirectory=True)
        else:
            # Create random tmp directory relative to the test root.
            self.tmpdir = self.root_uri.join(
                "TESTING-" + "".join(random.choices(string.ascii_lowercase + string.digits, k=8)),
                forceDirectory=True,
            )
            self.tmpdir.mkdir()
634 def tearDown(self) -> None:
635 if self.tmpdir and self.tmpdir.isLocal:
636 removeTestTempDir(self.tmpdir.ospath)
    def test_file(self) -> None:
        """Test basic write, read, size and remove of a single file."""
        uri = self.tmpdir.join("test.txt")
        self.assertFalse(uri.exists(), f"{uri} should not exist")
        self.assertTrue(uri.path.endswith("test.txt"))

        content = "abcdefghijklmnopqrstuv\n"
        uri.write(content.encode())
        self.assertTrue(uri.exists(), f"{uri} should now exist")
        self.assertEqual(uri.read().decode(), content)
        self.assertEqual(uri.size(), len(content.encode()))

        # Existing file must not be clobbered when overwrite is disabled.
        with self.assertRaises(FileExistsError):
            uri.write(b"", overwrite=False)

        # Not all backends can tell if a remove fails so we can not
        # test that a remove of a non-existent entry is guaranteed to raise.
        uri.remove()
        self.assertFalse(uri.exists())

        # Ideally the test would remove the file again and raise a
        # FileNotFoundError. This is not reliable for remote resources
        # and doing an explicit check before trying to remove the resource
        # just to raise an exception is deemed an unacceptable overhead.

        with self.assertRaises(FileNotFoundError):
            uri.read()

        with self.assertRaises(FileNotFoundError):
            self.tmpdir.join("file/not/there.txt").size()

        # Check that creating a URI from a URI returns the same thing
        uri2 = ResourcePath(uri)
        self.assertEqual(uri, uri2)
        self.assertEqual(id(uri), id(uri2))
    def test_get_info_generic(self) -> None:
        """Test generic get_info properties."""
        # Timestamp taken before the write; last_modified must not precede it
        # (minus a one-second tolerance for clock granularity).
        now = datetime.datetime.now(tz=datetime.UTC)
        uri = self.tmpdir.join("test.txt")

        with self.assertRaises(FileNotFoundError):
            uri.get_info()

        # Content is pure ASCII so character count equals byte count below.
        content = "abcdefghijklmnopqrstuv\n"
        uri.write(content.encode())

        info = uri.get_info()
        self.assertTrue(info.is_file)
        self.assertEqual(info.size, len(content))
        assert info.last_modified is not None
        self.assertGreaterEqual(info.last_modified.timestamp(), now.timestamp() - 1.0)
        self.assertIsInstance(info.checksums, dict)  # Checksums are backend dependent.

        for dir_uri in (uri.parent(), uri.root_uri()):
            # File URIs can return values for modification dates for
            # directories. S3 URIs can return checksums for directories.
            dirinfo = dir_uri.get_info()
            self.assertEqual(dirinfo.uri, str(dir_uri))
            self.assertFalse(dirinfo.is_file)
            self.assertEqual(dirinfo.size, 0)

        # A directory that was never created has no info.
        newdir = self.tmpdir.join("newdir/", forceDirectory=True)
        with self.assertRaises(FileNotFoundError):
            newdir.get_info()
    def test_mkdir(self) -> None:
        """Test directory creation, including intermediate directories and
        the error raised when a file occupies the target path.
        """
        newdir = self.tmpdir.join("newdir/seconddir", forceDirectory=True)
        newdir.mkdir()
        self.assertTrue(newdir.exists())
        self.assertEqual(newdir.size(), 0)

        newfile = newdir.join("temp.txt")
        newfile.write(b"Data")
        self.assertTrue(newfile.exists())

        file = self.tmpdir.join("file.txt")
        # Some schemes will realize that the URI is not a file and so
        # will raise NotADirectoryError. The file scheme is more permissive
        # and lets you write anything but will raise NotADirectoryError
        # if a non-directory is already there. We therefore write something
        # to the file to ensure that we trigger a portable exception.
        file.write(b"")
        with self.assertRaises(NotADirectoryError):
            file.mkdir()

        # The root should exist.
        self.root_uri.mkdir()
        self.assertTrue(self.root_uri.exists())
    def test_transfer(self) -> None:
        """Test transfer_from() for every configured transfer mode,
        including overwrite behavior and the destructive "move" mode.
        """
        src = self.tmpdir.join("test.txt")
        content = "Content is some content\nwith something to say\n\n"
        src.write(content.encode())

        can_move = "move" in self.transfer_modes
        for mode in self.transfer_modes:
            # "move" destroys the source, so it is tested last, outside
            # the loop.
            if mode == "move":
                continue

            dest = self.tmpdir.join(f"dest_{mode}.txt")
            # Ensure that we get some debugging output.
            with self.assertLogs("lsst.resources", level=logging.DEBUG) as cm:
                dest.transfer_from(src, transfer=mode)
            self.assertIn("Transferring ", "\n".join(cm.output))
            self.assertTrue(dest.exists(), f"Check that {dest} exists (transfer={mode})")

            new_content = dest.read().decode()
            self.assertEqual(new_content, content)

            if mode in ("symlink", "relsymlink"):
                self.assertTrue(os.path.islink(dest.ospath), f"Check that {dest} is symlink")

            # If the source and destination are hardlinks of each other
            # the transfer should work even if overwrite=False.
            if mode in ("link", "hardlink"):
                dest.transfer_from(src, transfer=mode)
            else:
                with self.assertRaises(
                    FileExistsError, msg=f"Overwrite of {dest} should not be allowed ({mode})"
                ):
                    dest.transfer_from(src, transfer=mode)

            # Transfer again and overwrite.
            dest.transfer_from(src, transfer=mode, overwrite=True)

            dest.remove()

        # Source must be untouched by the non-move transfers above.
        b = src.read()
        self.assertEqual(b.decode(), new_content)

        # Partial read of only the first nbytes of the resource.
        nbytes = 10
        subset = src.read(size=nbytes)
        self.assertEqual(len(subset), nbytes)
        self.assertEqual(subset.decode(), content[:nbytes])

        # Transferring to self should be okay.
        src.transfer_from(src, "auto")

        with self.assertRaises(ValueError):
            src.transfer_from(src, transfer="unknown")

        # A move transfer is special.
        if can_move:
            dest.transfer_from(src, transfer="move")
            self.assertFalse(src.exists())
            self.assertTrue(dest.exists())
        else:
            src.remove()

        # With both source and destination gone a transfer must fail.
        dest.remove()
        with self.assertRaises(FileNotFoundError):
            dest.transfer_from(src, "auto")
    def test_mtransfer(self) -> None:
        """Test bulk transfers via ResourcePath.mtransfer(), including
        overwrite success, overwrite failure, and the do_raise option.
        """
        n_files = 10
        sources = [self.tmpdir.join(f"test{n}.txt") for n in range(n_files)]
        destinations = [self.tmpdir.join(f"dest_test{n}.txt") for n in range(n_files)]

        # Each source file starts with its own index so destinations can be
        # matched back to sources after the transfer.
        for i, src in enumerate(sources):
            content = f"{i}\nContent is some content\nwith something to say\n\n"
            src.write(content.encode())

        results = ResourcePath.mtransfer("copy", zip(sources, destinations, strict=True))
        self.assertTrue(all(res.success for res in results.values()))
        # The results mapping is keyed by destination URI.
        self.assertTrue(all(dest.exists() for dest in results))

        for i, dest in enumerate(destinations):
            new_content = dest.read().decode()
            self.assertTrue(new_content.startswith(f"{i}\n"))

        # Overwrite should work.
        results = ResourcePath.mtransfer("copy", zip(sources, destinations, strict=True), overwrite=True)

        # Overwrite failure.
        results = ResourcePath.mtransfer(
            "copy", zip(sources, destinations, strict=True), overwrite=False, do_raise=False
        )
        self.assertFalse(all(res.success for res in results.values()))

        # With do_raise=True the collected failures surface as an
        # ExceptionGroup.
        with self.assertRaises(ExceptionGroup):
            results = ResourcePath.mtransfer(
                "copy", zip(sources, destinations, strict=True), overwrite=False, do_raise=True
            )
    def test_local_transfer(self) -> None:
        """Test we can transfer to and from local file."""
        remote_src = self.tmpdir.join("src.json")
        remote_src.write(b"42")
        remote_dest = self.tmpdir.join("dest.json")

        # temporary_uri() with no prefix yields a local temporary file.
        with ResourcePath.temporary_uri(suffix=".json") as tmp:
            self.assertTrue(tmp.isLocal)
            tmp.transfer_from(remote_src, transfer="auto")
            self.assertEqual(tmp.read(), remote_src.read())

            remote_dest.transfer_from(tmp, transfer="auto")
            self.assertEqual(remote_dest.read(), tmp.read())

        # Temporary (possibly remote) resource.
        # Transfers between temporary resources.
        with (
            ResourcePath.temporary_uri(prefix=self.tmpdir.join("tmp"), suffix=".json") as remote_tmp,
            ResourcePath.temporary_uri(suffix=".json") as local_tmp,
        ):
            remote_tmp.write(b"42")
            if not remote_tmp.isLocal:
                for transfer in ("link", "symlink", "hardlink", "relsymlink"):
                    with self.assertRaises(RuntimeError):
                        # Trying to symlink a remote resource is not going
                        # to work. A hardlink could work but would rely
                        # on the local temp space being on the same
                        # filesystem as the target.
                        local_tmp.transfer_from(remote_tmp, transfer)
            # A move removes the (possibly remote) temporary source.
            local_tmp.transfer_from(remote_tmp, "move")
            self.assertFalse(remote_tmp.exists())
            remote_tmp.transfer_from(local_tmp, "auto", overwrite=True)
            self.assertEqual(local_tmp.read(), remote_tmp.read())

            # Transfer of missing remote.
            remote_tmp.remove()
            with self.assertRaises(FileNotFoundError):
                local_tmp.transfer_from(remote_tmp, "auto", overwrite=True)
861 def test_local(self) -> None:
862 """Check that remote resources can be made local."""
863 src = self.tmpdir.join("test.txt")
864 original_content = "Content is some content\nwith something to say\n\n"
865 src.write(original_content.encode())
867 # Run this twice to ensure use of cache in code coverage
868 # if applicable.
869 for _ in (1, 2):
870 with src.as_local() as local_uri:
871 self.assertTrue(local_uri.isLocal)
872 content = local_uri.read().decode()
873 self.assertEqual(content, original_content)
875 if src.isLocal:
876 self.assertEqual(src, local_uri)
878 with self.assertRaises(IsADirectoryError):
879 with self.root_uri.as_local() as local_uri:
880 pass
882 if not src.isLocal:
883 # as_local tmpdir can not be a remote resource.
884 with self.assertRaises(ValueError):
885 with src.as_local(tmpdir=self.root_uri) as local_uri:
886 pass
888 # tmpdir is ignored for local file.
889 with tempfile.TemporaryDirectory() as tmpdir:
890 temp_dir = ResourcePath(tmpdir, forceDirectory=True)
891 with src.as_local(tmpdir=temp_dir) as local_uri:
892 self.assertEqual(local_uri.dirname(), temp_dir)
893 self.assertTrue(local_uri.exists())
895 def test_local_mtransfer(self) -> None:
896 """Check that bulk transfer to/from local works."""
897 # Create remote resources
898 n_files = 10
899 sources = [self.tmpdir.join(f"test{n}.txt") for n in range(n_files)]
901 for i, src in enumerate(sources):
902 content = f"{i}\nContent is some content\nwith something to say\n\n"
903 src.write(content.encode())
905 # Potentially remote to local.
906 with tempfile.TemporaryDirectory() as tmpdir:
907 temp_dir = ResourcePath(tmpdir, forceDirectory=True)
908 destinations = [temp_dir.join(f"dest_test{n}.txt") for n in range(n_files)]
910 results = ResourcePath.mtransfer("copy", zip(sources, destinations, strict=True))
911 self.assertTrue(all(res.success for res in results.values()))
912 self.assertTrue(all(dest.exists() for dest in results))
914 # Overwrite should work.
915 results = ResourcePath.mtransfer("copy", zip(sources, destinations, strict=True), overwrite=True)
917 # Now reverse so local to potentially remote.
918 for src in sources:
919 src.remove()
920 results = ResourcePath.mtransfer("copy", zip(destinations, sources, strict=True), overwrite=False)
921 self.assertTrue(all(res.success for res in results.values()))
922 self.assertTrue(all(dest.exists() for dest in results))
924 def test_walk(self) -> None:
925 """Walk a directory hierarchy."""
926 root = self.tmpdir.join("walk/")
928 # Look for a file that is not there
929 file = root.join("config/basic/butler.yaml")
930 found_list = list(ResourcePath.findFileResources([file]))
931 self.assertEqual(found_list[0], file)
933 # First create the files (content is irrelevant).
934 expected_files = {
935 "dir1/a.yaml",
936 "dir1/b.yaml",
937 "dir1/c.json",
938 "dir2/d.json",
939 "dir2/e.yaml",
940 }
941 expected_uris = {root.join(f) for f in expected_files}
942 for uri in expected_uris:
943 uri.write(b"")
944 self.assertTrue(uri.exists())
946 # Look for the files.
947 found = set(ResourcePath.findFileResources([root]))
948 self.assertEqual(found, expected_uris)
950 # Now solely the YAML files.
951 expected_yaml = {u for u in expected_uris if u.getExtension() == ".yaml"}
952 found = set(ResourcePath.findFileResources([root], file_filter=r".*\.yaml$"))
953 self.assertEqual(found, expected_yaml)
955 # Now two explicit directories and a file
956 expected = set(expected_yaml)
957 expected.add(file)
959 found = set(
960 ResourcePath.findFileResources(
961 [file, root.join("dir1/"), root.join("dir2/")],
962 file_filter=r".*\.yaml$",
963 )
964 )
965 self.assertEqual(found, expected)
967 # Group by directory -- find everything and compare it with what
968 # we expected to be there in total.
969 found_yaml = set()
970 counter = 0
971 for uris in ResourcePath.findFileResources([file, root], file_filter=r".*\.yaml$", grouped=True):
972 assert not isinstance(uris, ResourcePath) # for mypy.
973 found_uris = set(uris)
974 if found_uris:
975 counter += 1
977 found_yaml.update(found_uris)
979 expected_yaml_2 = expected_yaml
980 expected_yaml_2.add(file)
981 self.assertEqual(found_yaml, expected_yaml)
982 self.assertEqual(counter, 3)
984 # Grouping but check that single files are returned in a single group
985 # at the end
986 file2 = root.join("config/templates/templates-bad.yaml")
987 found_grouped = [
988 list(group)
989 for group in ResourcePath.findFileResources([file, file2, root.join("dir2/")], grouped=True)
990 if not isinstance(group, ResourcePath) # For mypy.
991 ]
992 self.assertEqual(len(found_grouped), 2, f"Found: {list(found_grouped)}")
993 self.assertEqual(list(found_grouped[1]), [file, file2])
995 with self.assertRaises(ValueError):
996 # The list forces the generator to run.
997 list(file.walk())
999 # A directory that does not exist returns nothing.
1000 self.assertEqual(list(root.join("dir3/").walk()), [])
1002 def test_large_walk(self) -> None:
1003 # In some systems pagination is used so ensure that we can handle
1004 # large numbers of files. For example S3 limits us to 1000 responses
1005 # per listing call.
1006 created = set()
1007 counter = 1
1008 n_dir1 = 1100
1009 root = self.tmpdir.join("large_walk", forceDirectory=True)
1010 while counter <= n_dir1:
1011 new = ResourcePath(root.join(f"file{counter:04d}.txt"))
1012 new.write(f"{counter}".encode())
1013 created.add(new)
1014 counter += 1
1015 counter = 1
1016 # Put some in a subdirectory to make sure we are looking in a
1017 # hierarchy.
1018 n_dir2 = 100
1019 subdir = root.join("subdir", forceDirectory=True)
1020 while counter <= n_dir2:
1021 new = ResourcePath(subdir.join(f"file{counter:04d}.txt"))
1022 new.write(f"{counter}".encode())
1023 created.add(new)
1024 counter += 1
1026 found = set(ResourcePath.findFileResources([root]))
1027 self.assertEqual(len(found), n_dir1 + n_dir2)
1028 self.assertEqual(found, created)
1030 # Again with grouping.
1031 # (mypy gets upset not knowing which of the two options is being
1032 # returned so add useless instance check).
1033 found_list = [
1034 list(group)
1035 for group in ResourcePath.findFileResources([root], grouped=True)
1036 if not isinstance(group, ResourcePath) # For mypy.
1037 ]
1038 self.assertEqual(len(found_list), 2)
1039 self.assertEqual(len(found_list[0]), n_dir1)
1040 self.assertEqual(len(found_list[1]), n_dir2)
1042 def test_temporary(self) -> None:
1043 prefix = self.tmpdir.join("tmp", forceDirectory=True)
1044 with ResourcePath.temporary_uri(prefix=prefix, suffix=".json") as tmp:
1045 self.assertEqual(tmp.getExtension(), ".json", f"uri: {tmp}")
1046 self.assertTrue(tmp.isabs(), f"uri: {tmp}")
1047 self.assertFalse(tmp.exists(), f"uri: {tmp}")
1048 tmp.write(b"abcd")
1049 self.assertTrue(tmp.exists(), f"uri: {tmp}")
1050 self.assertTrue(tmp.isTemporary)
1051 self.assertFalse(tmp.exists(), f"uri: {tmp}")
1053 tmpdir = ResourcePath(self.tmpdir, forceDirectory=True)
1054 with ResourcePath.temporary_uri(prefix=tmpdir) as tmp:
1055 # Use a specified tmpdir and check it is okay for the file
1056 # to not be created.
1057 self.assertFalse(tmp.getExtension())
1058 self.assertFalse(tmp.exists(), f"uri: {tmp}")
1059 self.assertEqual(tmp.scheme, self.scheme)
1060 self.assertTrue(tmp.isTemporary)
1061 self.assertTrue(tmpdir.exists(), f"uri: {tmpdir} still exists")
1063 # Fake a directory suffix.
1064 with self.assertRaises(NotImplementedError):
1065 with ResourcePath.temporary_uri(prefix=self.root_uri, suffix="xxx/") as tmp:
1066 pass
1068 @unittest.skipIf(fsspec is None, "fsspec is not available.")
1069 def test_fsspec(self) -> None:
1070 """Simple read of a file."""
1071 uri = self.tmpdir.join("test.txt")
1072 self.assertFalse(uri.exists(), f"{uri} should not exist")
1073 self.assertTrue(uri.path.endswith("test.txt"))
1075 content = "abcdefghijklmnopqrstuv\n"
1076 uri.write(content.encode())
1078 try:
1079 fs, path = uri.to_fsspec()
1080 except NotImplementedError as e:
1081 raise unittest.SkipTest(str(e)) from e
1082 except ImportError as e:
1083 # HttpResourcePath.to_fsspec() raises if support
1084 # of fsspec for webDAV back ends is disabled.
1085 raise unittest.SkipTest(str(e)) from e
1086 with fs.open(path, "r") as fd:
1087 as_read = fd.read()
1088 self.assertEqual(as_read, content)
1090 def test_open(self) -> None:
1091 tmpdir = ResourcePath(self.tmpdir, forceDirectory=True)
1092 with ResourcePath.temporary_uri(prefix=tmpdir, suffix=".txt") as tmp:
1093 _check_open(self, tmp, mode_suffixes=("", "t"))
1094 _check_open(self, tmp, mode_suffixes=("t",), encoding="utf-16")
1095 _check_open(self, tmp, mode_suffixes=("t",), prefer_file_temporary=True)
1096 _check_open(self, tmp, mode_suffixes=("t",), encoding="utf-16", prefer_file_temporary=True)
1097 with ResourcePath.temporary_uri(prefix=tmpdir, suffix=".dat") as tmp:
1098 _check_open(self, tmp, mode_suffixes=("b",))
1099 _check_open(self, tmp, mode_suffixes=("b",), prefer_file_temporary=True)
1101 with self.assertRaises(IsADirectoryError):
1102 with self.root_uri.open():
1103 pass
1105 def test_mexists(self) -> None:
1106 root = self.tmpdir.join("mexists/")
1108 # A file that is not there.
1109 file = root.join("config/basic/butler.yaml")
1111 # Create some files. Most schemes the code paths do not change for 10
1112 # vs 1000 files but in some schemes it does.
1113 expected_files = [f"dir1/f{n}.yaml" for n in range(self.n_mremove_files)]
1114 expected_uris = [root.join(f) for f in expected_files]
1115 for uri in expected_uris:
1116 uri.write(b"")
1117 self.assertTrue(uri.exists())
1118 expected_uris.append(file)
1120 # Force to run with fewer workers than there are files.
1121 multi = ResourcePath.mexists(expected_uris, num_workers=3)
1123 for uri, is_there in multi.items():
1124 if uri == file:
1125 self.assertFalse(is_there)
1126 else:
1127 self.assertTrue(is_there)
1129 # Clean up. Unfortunately POSIX raises a FileNotFoundError but
1130 # S3 boto does not complain if there is no key.
1131 ResourcePath.mremove(expected_uris, do_raise=False)
1133 # Check they were really removed.
1134 multi = ResourcePath.mexists(expected_uris, num_workers=3)
1135 for uri, is_there in multi.items():
1136 self.assertFalse(is_there)
1138 # Clean up a subset of files that are already gone, but this can
1139 # trigger a different code path.
1140 ResourcePath.mremove(expected_uris[:5], do_raise=False)