Coverage for tests / test_dav.py: 14%
617 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:32 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12import concurrent
13import datetime
14import hashlib
15import io
16import os.path
17import random
18import shutil
19import socket
20import stat
21import string
22import tempfile
23import time
24import unittest
25import zlib
26from collections.abc import Callable
27from threading import Thread
28from typing import cast
29from zipfile import ZipFile, ZipInfo
31try:
32 from cheroot import wsgi
33 from wsgidav.wsgidav_app import WsgiDAVApp
34except ImportError:
35 WsgiDAVApp = None
37try:
38 import fsspec
39except ImportError:
40 fsspec = None
41 AbstractFileSystem = type
43from lsst.resources import ResourceInfo, ResourcePath
44from lsst.resources._resourceHandles._davResourceHandle import (
45 DavReadResourceHandle,
46)
47from lsst.resources.dav import (
48 DavResourcePathConfig,
49 dav_globals,
50)
51from lsst.resources.davutils import (
52 DavConfig,
53 DavConfigPool,
54 TokenAuthorizer,
55)
56from lsst.resources.tests import GenericReadWriteTestCase, GenericTestCase
57from lsst.resources.utils import get_tempdir, makeTestTempDir, removeTestTempDir
59TESTDIR = os.path.abspath(os.path.dirname(__file__))
class GenericDavTestCase(GenericTestCase, unittest.TestCase):
    """Generic tests of dav URIs."""

    scheme = "dav"
    netloc = "host.example.org"

    def test_dav_root_uri(self):
        """Check root_uri() extraction and path normalization for dav URIs."""
        # Each pair maps an input URI to the root URI we expect back:
        # user info and port are preserved, while path, fragment, query
        # parameters and path parameters are dropped.
        root_uri_cases = (
            ("dav://host.example.org", "dav://host.example.org/"),
            ("dav://host.example.org/some/path", "dav://host.example.org/"),
            ("dav://host.example.org:12345", "dav://host.example.org:12345/"),
            ("dav://host.example.org:12345/some/path", "dav://host.example.org:12345/"),
            ("dav://user:pwd@host.example.org", "dav://user:pwd@host.example.org/"),
            ("dav://user:pwd@host.example.org/some/path", "dav://user:pwd@host.example.org/"),
            ("dav://user:pwd@host.example.org:12345/some/path", "dav://user:pwd@host.example.org:12345/"),
            ("dav://user:pwd@host.example.org/some/path#fragment", "dav://user:pwd@host.example.org/"),
            ("dav://user:pwd@host.example.org/some/path?param=value", "dav://user:pwd@host.example.org/"),
            ("dav://user:pwd@host.example.org/some/path;parameters", "dav://user:pwd@host.example.org/"),
        )
        for input_uri, expected_root in root_uri_cases:
            self.assertEqual(ResourcePath(expected_root), ResourcePath(input_uri).root_uri())

        # Each pair maps an input URI to its normalized form: repeated
        # slashes collapse, "." components vanish and ".." components
        # cannot climb above the root.
        clean_path_cases = (
            ("dav://host.example.org/", "dav://host.example.org/"),
            ("dav://host.example.org/some/path", "dav://host.example.org/some/path"),
            ("dav://host.example.org////some/path///", "dav://host.example.org/some/path/"),
            ("dav://host.example.org/some/path///", "dav://host.example.org/some/path/"),
            ("dav://host.example.org/some/./././path", "dav://host.example.org/some/path"),
            ("dav://host.example.org/a/b/c/d/../../", "dav://host.example.org/a/b/"),
            ("dav://host.example.org/a/b/c/d/../../../../../../", "dav://host.example.org/"),
        )
        for input_uri, expected_url in clean_path_cases:
            self.assertEqual(ResourcePath(input_uri).geturl(), expected_url)
class DavReadWriteTestCase(GenericReadWriteTestCase, unittest.TestCase):
    """Test with a real webDAV server, as opposed to mocking responses."""

    scheme = "dav"
    # Paths of local temporary files created during tests; removed in
    # tearDownClass().
    local_files_to_remove: list[str] = []
    # Convenience constant used to size multi-megabyte test payloads.
    MEGABYTE: int = 1024 * 1024
    @classmethod
    def setUpClass(cls):
        """Select the webDAV endpoint to test against: an externally
        configured server if the environment requests one, otherwise a
        local WsgiDAVApp instance started in a background thread.
        """
        cls.webdav_tmpdir = tempfile.mkdtemp(prefix="webdav-server-test-")
        cls.server_thread = None

        # Reinitialize globals.
        dav_globals._reset()

        # Should we test against a running server?
        #
        # This is convenient for testing against real servers in the
        # developer environment by initializing the environment variable
        # LSST_RESOURCES_DAV_TEST_SERVER_URL with the URL of the server, e.g.
        # dav://host.example.org:1234/path/to/top/dir
        if (test_endpoint := os.getenv("LSST_RESOURCES_DAV_TEST_SERVER_URL")) is not None:
            # Run this test case against the specified server.
            uri = ResourcePath(test_endpoint)
            cls.scheme = uri.scheme
            cls.netloc = uri.netloc
            cls.base_path = uri.path
        elif WsgiDAVApp is not None:
            # WsgiDAVApp is available, launch a local server in its own
            # thread to expose a local temporary directory and run this
            # test case against it.
            cls.port_number = cls._get_port_number()
            cls.stop_webdav_server = False
            # Note: _serve_webdav is an instance method, so `cls` is passed
            # explicitly as its first argument.
            cls.server_thread = Thread(
                target=cls._serve_webdav,
                args=(cls, cls.webdav_tmpdir, cls.port_number, lambda: cls.stop_webdav_server),
                daemon=True,
            )
            cls.server_thread.start()

            # Wait for it to start
            time.sleep(1)

            # Initialize the server endpoint
            cls.netloc = f"127.0.0.1:{cls.port_number}"
        else:
            # skipTest is an instance method; `cls` is passed explicitly
            # since there is no instance at class set-up time.
            cls.skipTest(
                cls,
                "neither WsgiDAVApp is available nor a webDAV test endpoint is configured to test against",
            )
    @classmethod
    def tearDownClass(cls):
        """Stop the local webDAV server (if one was started) and remove
        temporary files and directories created by the tests.
        """
        # Stop the WsgiDAVApp server, if any
        if WsgiDAVApp is not None:
            # Shut down of the webdav server and wait for the thread to exit.
            # Note: cls.server_thread is None when testing against an
            # external server, in which case there is nothing to join.
            cls.stop_webdav_server = True
            if cls.server_thread is not None:
                cls.server_thread.join()

        # Remove local temporary files
        for file in cls.local_files_to_remove:
            if os.path.exists(file):
                os.remove(file)

        # Remove temp dir
        if cls.webdav_tmpdir:
            shutil.rmtree(cls.webdav_tmpdir, ignore_errors=True)
    def tearDown(self):
        """Remove the per-test remote temporary directory, then delegate
        to the base class tear-down.
        """
        if self.tmpdir:
            self.tmpdir.remove_dir(recursive=True)

        super().tearDown()
    def test_dav_file_handle(self):
        """Check the resource handles returned when opening a dav URI in
        binary, text and write modes.
        """
        # Upload a new file with known contents.
        contents = "These are some \n bytes to read"
        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data=contents, overwrite=True))

        # Test that the correct handle is returned.
        with remote_file.open("rb") as handle:
            self.assertIsInstance(handle, DavReadResourceHandle)

        # Test reading byte ranges works
        with remote_file.open("rb") as handle:
            sub_contents = contents[:10]
            handle = cast(DavReadResourceHandle, handle)
            result = handle.read(len(sub_contents)).decode()
            self.assertEqual(result, sub_contents)

            # Verify the position.
            self.assertEqual(handle.tell(), len(sub_contents))

            # Jump back to the beginning and test if reading the whole file
            # prompts the internal buffer to be read.
            handle.seek(0)
            self.assertEqual(handle.tell(), 0)
            result = handle.read().decode()
            self.assertEqual(result, contents)

            # Check that flush works on read-only handle.
            handle.flush()

        # Verify reading as a string handle works as expected.
        with remote_file.open("r") as handle:
            self.assertIsInstance(handle, io.TextIOWrapper)

            handle = cast(io.TextIOWrapper, handle)
            self.assertIsInstance(handle.buffer, DavReadResourceHandle)

            # Check if string methods work.
            result = handle.read()
            self.assertEqual(result, contents)

            # Check that flush works on read-only handle.
            handle.flush()

        # Verify that write modes invoke the default base method
        with remote_file.open("w") as handle:
            self.assertIsInstance(handle, io.StringIO)
    def test_dav_mkdir(self):
        """Check creation and removal of remote directories, including
        idempotency and conflicts with existing files.
        """
        # Check creation and deletion of an empty directory
        subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subdir.mkdir())
        self.assertTrue(subdir.exists())
        self.assertTrue(subdir.isdir())
        self.assertEqual(subdir.size(), 0)

        # Creating an existing remote directory must succeed
        self.assertIsNone(subdir.mkdir())

        # Deleting an existing directory must succeed
        self.assertIsNone(subdir.remove())

        # Deleting a non-existing directory must succeed
        subir_not_exists = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subir_not_exists.remove())

        # Attempting to create a directory at a path where a file exists
        # must raise
        file = self.tmpdir.join(self._get_file_name(), forceDirectory=False)
        file.write(data=None, overwrite=True)
        self.assertTrue(file.exists())

        existing_file = self.tmpdir.join(file.basename(), forceDirectory=True)
        with self.assertRaises(NotADirectoryError):
            self.assertIsNone(existing_file.mkdir())

        # mkdir must create all missing ancestors
        subsubdir = subdir.join("a/b/c/d/e", forceDirectory=True)
        self.assertIsNone(subsubdir.mkdir())
        self.assertTrue(subsubdir.exists())
    def test_dav_upload_download(self):
        """Check round-tripping a randomly-generated file through write()
        and read(), comparing digests of the uploaded and downloaded data.
        """
        # Test upload a randomly-generated file via write() with and without
        # overwrite
        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            data = f.read()

        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data, overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)

        # Write without overwrite must raise since target file exists
        with self.assertRaises(FileExistsError):
            remote_file.write(data, overwrite=False)

        # Download the file we just uploaded. Compute and compare a digest of
        # the uploaded and downloaded data and ensure they match
        downloaded_data = remote_file.read()
        self.assertEqual(len(downloaded_data), file_size)
        upload_digest = self._compute_digest(data)
        download_digest = self._compute_digest(downloaded_data)
        self.assertEqual(upload_digest, download_digest)
        os.remove(local_file)
280 def test_dav_as_local(self):
281 # Generate a file with random data and upload to a remote file.
282 original_file, original_file_size = self._generate_file()
283 original_digest = self._compute_digest_for_file(original_file)
285 with open(original_file, "rb") as file:
286 remote_file = self.tmpdir.join(self._get_file_name())
287 self.assertIsNone(remote_file.write(data=file, overwrite=True))
288 self.assertTrue(remote_file.exists())
289 remote_file_size = remote_file.size()
290 self.assertEqual(remote_file_size, original_file_size)
292 # Download the remote file to a temporary local file, check that
293 # the sizes and contents of the original file and the downloaded files
294 # match.
295 with remote_file._as_local() as local_uri:
296 self.assertTrue(local_uri.isTemporary)
297 self.assertTrue(os.path.exists(local_uri.ospath))
298 self.assertTrue(os.stat(local_uri.ospath).st_size, remote_file_size)
299 self.assertEqual(original_digest, self._compute_digest(local_uri.read()))
301 self.assertFalse(local_uri.exists())
    def test_dav_size(self):
        """Check size() for missing files and for directories addressed
        both as directories and as file-like paths.
        """
        # Retrieving the size of a non-existent file must raise.
        remote_file = self.tmpdir.join(self._get_file_name())
        with self.assertRaises(FileNotFoundError):
            remote_file.size()

        # The size of a directory using a file-like path must be zero
        remote_dir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(remote_dir.mkdir())
        self.assertTrue(remote_dir.exists())
        self.assertEqual(remote_dir.size(), 0)

        dir_as_file = ResourcePath(remote_dir.geturl().rstrip("/"), forceDirectory=False)
        self.assertEqual(dir_as_file.size(), 0)
    def test_dav_upload_creates_dir(self):
        """Check that uploading to a path with missing ancestors creates
        the intermediate directories automatically.
        """
        # Uploading a file to a non existing directory must ensure its
        # parent directories are automatically created and upload succeeds
        non_existing_dir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        non_existing_dir = non_existing_dir.join(self._get_dir_name(), forceDirectory=True)
        non_existing_dir = non_existing_dir.join(self._get_dir_name(), forceDirectory=True)
        remote_file = non_existing_dir.join(self._get_file_name())

        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            data = f.read()
            self.assertIsNone(remote_file.write(data, overwrite=True))

        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)
        self.assertTrue(remote_file.parent().exists())

        downloaded_data = remote_file.read()
        upload_digest = self._compute_digest(data)
        download_digest = self._compute_digest(downloaded_data)
        self.assertEqual(upload_digest, download_digest)
    def test_dav_transfer_from(self):
        """Check transfer_from() with "copy" and "move" modes, between
        local and remote sources, with and without overwrite.
        """
        # Transfer from local file via "copy", with and without overwrite
        depth = random.randint(1, 5)
        remote_file = self.tmpdir.join(self._get_file_name(depth))
        local_file, _ = self._generate_file()
        source_file = ResourcePath(local_file)
        self.assertIsNone(remote_file.transfer_from(source_file, transfer="copy", overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), source_file.size())
        with self.assertRaises(FileExistsError):
            remote_file.transfer_from(ResourcePath(local_file), transfer="copy", overwrite=False)

        # Transfer from remote file via "copy", with and without overwrite
        source_file = remote_file
        target_file = self.tmpdir.join(self._get_file_name(depth=random.randint(1, 5)))
        self.assertIsNone(target_file.transfer_from(source_file, transfer="copy", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_file.size())

        # Transfer without overwrite must raise since target resource exists
        with self.assertRaises(FileExistsError):
            target_file.transfer_from(source_file, transfer="copy", overwrite=False)

        # Test transfer from local file via "move", with and without overwrite
        source_file = ResourcePath(local_file)
        source_size = source_file.size()
        target_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(target_file.transfer_from(source_file, transfer="move", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_size)
        # "move" must remove the source after a successful transfer.
        self.assertFalse(source_file.exists())

        # Test transfer without overwrite must raise since target resource
        # exists
        local_file, file_size = self._generate_file()
        with self.assertRaises(FileExistsError):
            source_file = ResourcePath(local_file)
            target_file.transfer_from(source_file, transfer="move", overwrite=False)

        # Test transfer from remote file via "move" with and without overwrite
        # must succeed
        source_file = target_file
        source_size = source_file.size()
        target_file = self.tmpdir.join(self._get_file_name(depth=random.randint(1, 5)))
        self.assertIsNone(target_file.transfer_from(source_file, transfer="move", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_size)
        self.assertFalse(source_file.exists())

        # Transfer without overwrite must raise since target resource exists
        with self.assertRaises(FileExistsError):
            source_file = ResourcePath(local_file)
            target_file.transfer_from(source_file, transfer="move", overwrite=False)
    def test_dav_handle(self):
        """Check seek(), read(), tell() and readinto() on resource handles
        for both small and multi-megabyte files.
        """
        # Resource handle must succeed
        remote_file = self.tmpdir.join(self._get_file_name())
        data = "abcdefghi"
        self.assertIsNone(remote_file.write(data, overwrite=True))
        with remote_file.open("rb") as handle:
            handle.seek(1)
            self.assertEqual(handle.read(4).decode("utf-8"), data[1:5])

        # Ensure that reading the whole file content through a handle works
        with remote_file.open("rb") as handle:
            content = handle.read(-1)
            self.assertEqual(remote_file.size(), len(content))

            handle.seek(1)
            self.assertEqual(handle.read(4).decode("utf-8"), data[1:5])

        # Upload a multi-megabyte file and ensure a partial read succeeds
        data = io.BytesIO(b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09" * self.MEGABYTE)
        self.assertIsNone(remote_file.write(data, overwrite=True))
        file_size: int = remote_file.size()
        file_offset: int = random.randint(self.MEGABYTE, file_size)
        bytes_to_read: int = random.randint(self.MEGABYTE // 2, self.MEGABYTE)
        with remote_file.open("rb") as handle:
            # Position the in-memory reference and the remote handle at the
            # same offset so the reads below can be compared directly.
            data.seek(file_offset)
            handle.seek(file_offset)
            self.assertEqual(handle.tell(), data.tell())
            self.assertEqual(handle.read(bytes_to_read), data.read(bytes_to_read))

        # Test readinto()
        with remote_file.open("rb") as handle:
            buffer = bytearray(random.randint(self.MEGABYTE, 2 * self.MEGABYTE))
            offset = random.randint(file_size // 3, file_size // 2)

            # Check the returned read count is as expected
            handle.seek(offset)
            count = handle.readinto(buffer)
            self.assertEqual(count, len(buffer))

            # Check the contents of the returned buffer is as expected
            handle.seek(offset)
            self.assertTrue(handle.read(count) == buffer)
    def test_dav_repeated_write(self):
        """Check that writing the same remote file several times in a row
        succeeds.
        """
        data = io.BytesIO(b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09" * self.MEGABYTE)
        remote_file = self.tmpdir.join(self._get_file_name())

        # Consecutive writes to the same file must succeed. It was noticed
        # that XRootD server sometimes fail this operation with an error
        # like:
        #
        # status 423 [Output file /path/to/file is already opened by
        # 1 writer; open denied.]
        self.assertIsNone(remote_file.write(data, overwrite=True))
        self.assertIsNone(remote_file.write(data, overwrite=True))
        self.assertIsNone(remote_file.write(data, overwrite=True))
    def test_dav_remove(self):
        """Check removal of remote files and directories, including
        non-empty directories and recursive removal.
        """
        # Deletion of an existing remote file must succeed
        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            remote_file = self.tmpdir.join(self._get_file_name())
            self.assertIsNone(remote_file.write(f, overwrite=True))

        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)
        self.assertIsNone(remote_file.remove())

        # Deletion of a non-existing remote file must succeed
        non_existing_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(non_existing_file.remove())

        # Deletion of a non-empty remote directory must raise
        subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subdir.mkdir())
        self.assertTrue(subdir.exists())
        with open(local_file, "rb") as f:
            remote_file = subdir.join("file_to_remove")
            remote_file.write(f, overwrite=True)
            self.assertTrue(remote_file.exists())
            self.assertEqual(remote_file.size(), file_size)

        with self.assertRaises(IsADirectoryError):
            subdir.remove()
        self.assertTrue(subdir.exists())

        # Recursively removing a deep hierarchy of directories must succeed
        top = subdir
        for level in ["one", "two", "three", "four"]:
            subdir = subdir.join(level, forceDirectory=True)
            subdir.mkdir()
            self.assertTrue(subdir.exists())
            self.assertTrue(subdir.isdir())

        top.remove_dir(recursive=True)
        self.assertFalse(top.exists())
491 def test_dav_to_fsspec(self):
492 if fsspec is None:
493 self.skipTest("fsspec not available")
495 # Upload a randomly-generated file via write() with overwrite.
496 local_file, file_size = self._generate_file()
497 with open(local_file, "rb") as f:
498 data = f.read()
500 remote_file = self.tmpdir.join(self._get_file_name())
501 self.assertIsNone(remote_file.write(data, overwrite=True))
502 self.assertTrue(remote_file.exists())
503 self.assertEqual(remote_file.size(), file_size)
505 # Ensure that the contents of the remote file we just
506 # uploaded is identical to the contents of that file when
507 # retrieved via fsspec.open(), with and without a context manager.
508 fsys, path = remote_file.to_fsspec()
509 file = fsys.open(path)
510 self.assertEqual(data, file.read())
511 file.close()
513 with fsys.open(path) as file:
514 self.assertEqual(data, file.read())
516 # Ensure the file system inherits from `fsspec.AbstractFileSystem`
517 # as parquet expects that.
518 self.assertTrue(isinstance(fsys, fsspec.AbstractFileSystem))
520 # Ensure properties of the remote file are consistent with those
521 # same properties retrieved via the file system.
522 self.assertTrue(path, remote_file.geturl())
523 self.assertTrue(fsys.exists(path))
524 self.assertTrue(fsys.isfile(path))
525 self.assertFalse(fsys.isdir(path))
526 self.assertEqual(fsys.size(path), remote_file.size())
527 self.assertEqual(remote_file._stat().last_modified, fsys.modified(path))
529 info = fsys.info(path)
530 self.assertTrue(info["name"], path)
531 self.assertTrue(info["type"], "file")
532 self.assertTrue(info["size"], remote_file.size())
534 # Ensure that the file system raises with methods attempting to
535 # modifying the file system.
536 with self.assertRaises(NotImplementedError):
537 fsys.rm(path)
539 # Ensure that the file system raises with methods not implemented.
540 with self.assertRaises(NotImplementedError):
541 fsys.mkdir("a/b/c")
543 not_implemented_methods = [
544 fsys.mkdir,
545 fsys.makedirs,
546 fsys.rmdir,
547 fsys.walk,
548 fsys.find,
549 fsys.walk,
550 fsys.find,
551 fsys.du,
552 fsys.glob,
553 fsys.rm_file,
554 fsys.rm,
555 fsys.touch,
556 fsys.ukey,
557 fsys.created,
558 ]
559 for method in not_implemented_methods:
560 with self.assertRaises(NotImplementedError):
561 method(path="xxx")
563 # Ensure that the file system raises with methods with any path
564 # different from the only file returned by to_fsspec().
565 file_not_found_methods = [fsys.info, fsys.ls, fsys.modified, fsys.size]
566 for method in file_not_found_methods:
567 with self.assertRaises(FileNotFoundError):
568 method(path="xxx")
570 def test_dav_parquet_read(self):
571 # Check we can read a parquet file via to_fsspec()
573 if fsspec is None:
574 self.skipTest("fsspec not available")
576 try:
577 import numpy as np
578 import pyarrow as pa # type: ignore
579 import pyarrow.parquet as pq # type: ignore
581 # Create a local parquet file and upload it. Ensure it is bigger
582 # than the default buffer size so that it is not entirely
583 # cached when read by parquet library.
584 num_rows = 1_000_000
585 data = {
586 "one": np.arange(num_rows, dtype=np.int32),
587 "two": np.arange(num_rows, dtype=np.int64),
588 "three": np.arange(num_rows, dtype=np.float64),
589 }
590 table = pa.Table.from_pydict(data)
591 local_file = self._make_local_temp_file()
592 pq.write_table(table, local_file)
593 local_file_size = os.stat(local_file).st_size
595 remote_file = self.tmpdir.join("file.parquet")
596 with open(local_file, "rb") as file:
597 remote_file.write(file, overwrite=True)
599 self.assertTrue(remote_file.exists())
600 self.assertEqual(remote_file.size(), local_file_size)
602 # Read the remote file we just uploaded via parquet, using
603 # similar function as used by
604 # `lsst.daf.butler.formatters.ParquetFormatter`.
605 fsys, path = remote_file.to_fsspec()
606 schema = pq.read_schema(path, filesystem=fsys)
607 for column in data.keys():
608 self.assertTrue(column in schema.names)
610 table = pq.read_table(path, filesystem=fsys, use_threads=True, use_pandas_metadata=False)
611 for column in data.keys():
612 self.assertTrue(column in table.column_names)
613 self.assertEqual(table.num_rows, num_rows)
614 self.assertEqual(table.num_columns, len(data.keys()))
616 # Convert the parquet table to a Python dictionnary and compare
617 # its contents with the data originally used to create the
618 # parquet file.
619 data_from_parquet = table.to_pydict()
620 for column in data.keys():
621 self.assertTrue(np.array_equal(data[column], data_from_parquet[column]))
623 except ImportError:
624 self.skipTest("numpy or pyarrow are not available")
626 def test_dav_zip(self):
627 # Check we can read back a zip file
629 # Create a local zip file composed of a random number of identical
630 # files.
631 local_file, local_file_size = self._generate_file()
632 local_file_digest = self._compute_digest_for_file(local_file)
634 num_members = random.randint(10, 20)
635 basename = os.path.basename(local_file)
636 member_names = [f"{basename}-{i}" for i in range(num_members)]
637 zip_file_name = self._make_local_temp_file()
638 with ZipFile(zip_file_name, mode="w") as zf:
639 for name in member_names:
640 zf.write(local_file, name)
642 # Upload the zip file to the server
643 with open(zip_file_name, mode="rb") as file:
644 remote_zip_file = self.tmpdir.join("example.zip")
645 self.assertIsNone(remote_zip_file.write(file, overwrite=True))
646 self.assertEqual(os.stat(zip_file_name).st_size, remote_zip_file.size())
648 # Read the zip file back and check its contents.
649 with remote_zip_file.open("rb") as fd:
650 # Check the names of member files match
651 zf = ZipFile(fd)
652 zip_members = [info.filename for info in zf.infolist()]
653 for name in member_names:
654 self.assertTrue(name in zip_members)
656 # Check the sizes of member files match
657 for file_size in [info.file_size for info in zf.infolist()]:
658 self.assertTrue(file_size, local_file_size)
660 # Check that the contents of a randomly-selected member file
661 # is identical to the original file.
662 random_member = random.choice(zf.infolist())
663 with zf.open(random_member) as member:
664 self.assertEqual(local_file_digest, self._compute_digest(member.read()))
666 # Concurrently read all the members of the remote Zip file
667 def download_zip_member(uri: ResourcePath, zinfo: ZipInfo) -> tuple[int, str]:
668 # Download the member of Zip file at `uri` designated by `zinfo`
669 # and return its size in bytes and a checksum of its contents.
670 with uri.open("rb") as fd:
671 with ZipFile(fd).open(zinfo.filename) as member:
672 data = member.read()
673 return len(data), self._compute_digest(data)
675 with remote_zip_file.open("rb") as fd:
676 executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
677 zfile_components = ZipFile(fd).infolist()
678 futures = [
679 executor.submit(download_zip_member, remote_zip_file, zinfo) for zinfo in zfile_components
680 ]
682 # Gather results and check they match the expected values
683 for future in concurrent.futures.as_completed(futures):
684 member_size, member_digest = future.result()
685 self.assertEqual(member_size, local_file_size)
686 self.assertEqual(member_digest, local_file_digest)
    def test_dav_get_info(self):
        """Check get_info() metadata for missing resources, directories
        and files.
        """
        # Missing resources now raise instead of returning a partial dict.
        subdir = self.tmpdir.join("inexistent", forceDirectory=True)
        with self.assertRaises(FileNotFoundError):
            subdir.get_info()

        # Retrieve and check metadata details about an existing directory
        subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subdir.mkdir())
        self.assertTrue(subdir.exists())
        metadata = subdir.get_info()
        self.assertIsInstance(metadata, ResourceInfo)

        self.assertFalse(metadata.is_file)
        self.assertEqual(metadata.size, 0)
        self.assertEqual(len(metadata.checksums), 0)
        # Timestamps must be timezone-aware, in UTC.
        self.assertEqual(metadata.last_modified.tzinfo, datetime.UTC)
        self.assertEqual(metadata.last_modified, subdir._stat().last_modified)

        # Retrieve and check metadata details about existing file
        local_file, local_file_size = self._generate_file()
        with open(local_file, "rb") as file:
            content = file.read()
            md5_checksum = self._compute_digest(content, algorithm="md5")
            adler32_checksum = self._compute_digest(content, algorithm="adler32")

        remote_file = self.tmpdir.join("example.data")
        with open(local_file, mode="rb") as file:
            self.assertIsNone(remote_file.write(file, overwrite=True))
            self.assertEqual(os.stat(local_file).st_size, remote_file.size())

        metadata = remote_file.get_info()
        self.assertIsInstance(metadata, ResourceInfo)
        self.assertTrue(metadata.is_file)
        self.assertEqual(metadata.size, local_file_size)
        self.assertEqual(metadata.last_modified.tzinfo, datetime.UTC)
        self.assertEqual(metadata.last_modified, remote_file._stat().last_modified)

        # Not all servers expose checksums, so only compare the ones
        # actually reported.
        checksums = metadata.checksums
        if "md5" in checksums:
            self.assertEqual(checksums["md5"], md5_checksum)
        if "adler32" in checksums:
            self.assertEqual(checksums["adler32"], adler32_checksum)
732 @classmethod
733 def _get_port_number(cls) -> int:
734 """Return a port number the webDAV server can use to listen to."""
735 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
736 s.bind(("127.0.0.1", 0))
737 s.listen()
738 port = s.getsockname()[1]
739 s.close()
740 return port
742 def _serve_webdav(self, local_path: str, port: int, stop_webdav_server: Callable[[], bool]):
743 """Start a local webDAV server, listening on http://localhost:port
744 and exposing local_path.
746 This server only runs when this test class is instantiated,
747 and then shuts down. The server must be started is a separate thread.
749 Parameters
750 ----------
751 port : `int`
752 The port number on which the server should listen
753 local_path : `str`
754 Path to an existing local directory for the server to expose.
755 stop_webdav_server : `Callable[[], bool]`
756 Boolean function which returns True when the server should be
757 stopped.
758 """
759 try:
760 # Start the wsgi server in a separate thread
761 config = {
762 "host": "127.0.0.1",
763 "port": port,
764 "provider_mapping": {"/": local_path},
765 "http_authenticator": {"domain_controller": None},
766 "simple_dc": {"user_mapping": {"*": True}},
767 "verbose": 0,
768 "lock_storage": False,
769 "dir_browser": {
770 "enable": False,
771 "ms_sharepoint_support": False,
772 "libre_office_support": False,
773 "response_trailer": False,
774 "davmount_links": False,
775 },
776 }
777 server = wsgi.Server(wsgi_app=WsgiDAVApp(config), bind_addr=(config["host"], config["port"]))
778 t = Thread(target=server.start, daemon=True)
779 t.start()
781 # Shut down the server when done: stop_webdav_server() returns
782 # True when this test suite is being teared down
783 while not stop_webdav_server():
784 time.sleep(1)
785 except KeyboardInterrupt:
786 # Caught Ctrl-C, shut down the server
787 pass
788 finally:
789 server.stop()
790 t.join()
792 @classmethod
793 def _get_name(cls, prefix: str) -> str:
794 alphabet = string.ascii_lowercase + string.digits
795 return f"{prefix}-" + "".join(random.choices(alphabet, k=8))
    @classmethod
    def _get_dir_name(cls) -> str:
        """Return a randomly selected name for a directory."""
        return cls._get_name(prefix="dir")
802 @classmethod
803 def _get_file_name(cls, depth: int = 0) -> str:
804 """Return a randomly selected name for a file.
806 Parameters
807 ----------
808 depth : `int`
809 Number of intermediate directory names to prefix the filename
810 with.
811 """
812 file_name = cls._get_name(prefix="file")
813 if depth == 0:
814 return file_name
816 # Generate names for intermediate directories
817 components = [cls._get_dir_name() for _ in range(depth)]
818 components.append(file_name)
819 return "/".join(components)
821 def _generate_file(self, remove_when_done=True) -> tuple[str, int]:
822 """Create a local file of random size with random contents.
824 Returns
825 -------
826 path : `str`
827 Path to local temporary file. The caller is responsible for
828 removing the file when appropriate.
829 size : `int`
830 Size of the generated file, in bytes.
831 """
832 megabyte = 1024 * 1024
833 size = random.randint(2 * megabyte, 5 * megabyte)
834 tmpfile, path = tempfile.mkstemp()
835 self.assertEqual(os.write(tmpfile, os.urandom(size)), size)
836 os.close(tmpfile)
838 if remove_when_done:
839 DavReadWriteTestCase.local_files_to_remove.append(path)
841 return path, size
843 def _make_local_temp_file(self, remove_when_done=True) -> str:
844 """Create an empty local temporary file.
846 Returns
847 -------
848 path : `str`
849 Path to local temporary file. The caller is responsible for
850 removing the file when appropriate.
851 """
852 tmpfile, path = tempfile.mkstemp()
853 os.close(tmpfile)
855 if remove_when_done:
856 DavReadWriteTestCase.local_files_to_remove.append(path)
858 return path
860 @classmethod
861 def _compute_digest(cls, data: bytes, algorithm: str = "sha256") -> str:
862 """Compute a hash of data."""
863 match algorithm:
864 case "sha256" | "md5":
865 m = hashlib.new(algorithm)
866 m.update(data)
867 return m.hexdigest().lower()
868 case "adler32":
869 return f"{zlib.adler32(data):08x}"
870 case _:
871 raise ValueError(f"unsupported checksum algorithm {algorithm}")
873 @classmethod
874 def _compute_digest_for_file(cls, filename: str, algorithm: str = "sha256") -> str:
875 """Compute a hash of the contents of file `filename`."""
876 with open(filename, "rb") as file:
877 return cls._compute_digest(file.read(), algorithm)
879 @classmethod
880 def _is_server_running(cls, port: int) -> bool:
881 """Return True if there is a server listening on local address
882 127.0.0.1:<port>.
883 """
884 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
885 try:
886 s.connect(("127.0.0.1", port))
887 return True
888 except ConnectionRefusedError:
889 return False
class DavResourcePathConfigTestCase(unittest.TestCase):
    """Test for the DavResourcePathConfig class."""

    def setUp(self):
        # Work in a dedicated temporary directory and start each test from
        # freshly reinitialized module globals.
        self.tmpdir = ResourcePath(makeTestTempDir(TESTDIR))
        dav_globals._reset()

    def tearDown(self):
        # Remove the temporary directory, then reset the module globals so
        # later tests are not affected by this one.
        if self.tmpdir and self.tmpdir.isLocal:
            removeTestTempDir(self.tmpdir.ospath)
        dav_globals._reset()

    def test_dav_tmpdir_buffersize_default(self):
        # With no explicit setting, the temporary directory recorded in the
        # configuration must be the one derived from the environment.
        config: DavResourcePathConfig = dav_globals.config()
        tmpdir, _ = config.tmpdir_buffersize
        self.assertEqual(tmpdir, get_tempdir())
class DavConfigPoolTestCase(unittest.TestCase):
    """Tests for the DavConfigPool and DavConfig classes."""

    @classmethod
    def setUpClass(cls):
        # NOTE(review): this class-level directory is shadowed by the
        # instance attribute assigned in setUp() and is not used by
        # _create_config(); it is kept for backward compatibility and is
        # cleaned up in tearDownClass().
        cls.tmpdir = tempfile.mkdtemp(prefix="webdav-config-test-")

    @classmethod
    def tearDownClass(cls):
        if cls.tmpdir:
            shutil.rmtree(cls.tmpdir, ignore_errors=True)

    def setUp(self):
        # Each test writes its configuration files to its own temporary
        # directory and starts from freshly reinitialized globals.
        self.tmpdir = makeTestTempDir(TESTDIR)
        dav_globals._reset()

    def tearDown(self):
        if self.tmpdir:
            shutil.rmtree(self.tmpdir)

    def test_dav_default_config(self):
        """Ensure default configuration is used by default."""
        # Ensure the variable LSST_RESOURCES_WEBDAV_CONFIG is not initialized
        # so that we use the default configuration.
        # NOTE(review): unittest.mock is used here but only `import unittest`
        # is visible at the top of this file — confirm unittest.mock is
        # imported directly or transitively elsewhere.
        with unittest.mock.patch.dict(os.environ, {}, clear=True):
            config_pool: DavConfigPool = DavConfigPool()
            config = config_pool.get_config_for_url("davs://example.org")
            self.assertEqual(config.retries, DavConfig.DEFAULT_RETRIES)
            self.assertEqual(config.timeout_connect, DavConfig.DEFAULT_TIMEOUT_CONNECT)
            self.assertEqual(config.timeout_read, DavConfig.DEFAULT_TIMEOUT_READ)
            self.assertEqual(config.token, DavConfig.DEFAULT_TOKEN)
            self.assertEqual(
                config.persistent_connections_per_host, DavConfig.DEFAULT_PERSISTENT_CONNECTIONS_PER_HOST
            )

    def test_dav_configuration_file_does_not_exist(self):
        # Ensure an exception is raised if the configuration file pointed to
        # by the environment variable does not exist.
        with unittest.mock.patch.dict(os.environ, {"MY_VAR": "/does/not/exist"}, clear=True):
            with self.assertRaises(FileNotFoundError):
                DavConfigPool("MY_VAR")

    def test_dav_configuration_file(self):
        """Ensure the specified configuration file is used."""
        config_contents: str = r"""
- base_url: "davs://host1.example.org:1234/"
  persistent_connections_per_host: 10
  timeout_connect: 20.0
  timeout_read: 120.0
  retries: 3
  retry_backoff_min: 1.0
  retry_backoff_max: 3.0
  user_cert: "${X509_USER_PROXY}"
  user_key: "${X509_USER_PROXY}"
  trusted_authorities: "/etc/grid-security/certificates"
  buffer_size: 5
  block_size: 18
  enable_fsspec: false
  collect_memory_usage: false
  request_checksum: "md5"
- base_url: "davs://host2.example.org:4321/"
  persistent_connections_per_host: 1
- base_url: "davs://host3.example.org:4321/"
  token: "ABCDEF"
- base_url: "dav://host4.example.org:5555/"
  user_name: "user"
  user_password: "password"
  reuse_connection: false
  frontend_base_urls:
    - "dav://frontend1.example.org:5555/"
    - "dav://frontend2.example.org:5555/"
    - "dav://frontend3.example.org:5555/"
"""
        config_file = self._create_config(config_contents)
        with unittest.mock.patch.dict(os.environ, {"LSST_RESOURCES_WEBDAV_CONFIG": config_file}, clear=True):
            config_pool: DavConfigPool = DavConfigPool("LSST_RESOURCES_WEBDAV_CONFIG")

            # Tests for base URL 'davs://host1.example.org:1234'
            config = config_pool.get_config_for_url("davs://host1.example.org:1234/any/path")
            self.assertEqual(config.base_url, "https://host1.example.org:1234/")
            self.assertEqual(config.retries, 3)
            self.assertEqual(config.token, DavConfig.DEFAULT_TOKEN)
            self.assertEqual(config.user_cert, "${X509_USER_PROXY}")
            self.assertEqual(config.trusted_authorities, "/etc/grid-security/certificates")
            self.assertFalse(config.enable_fsspec)
            self.assertFalse(config.collect_memory_usage)
            self.assertEqual(config.request_checksum, "md5")
            self.assertEqual(config.persistent_connections_per_host, 10)
            self.assertEqual(config.block_size, 18 * 1024 * 1024)

            # Tests for base URL 'davs://host2.example.org:4321/'
            config = config_pool.get_config_for_url("davs://host2.example.org:4321")
            self.assertEqual(config.base_url, "https://host2.example.org:4321/")
            self.assertEqual(config.persistent_connections_per_host, 1)

            # Tests for base URL 'davs://host3.example.org:4321/'
            config = config_pool.get_config_for_url("davs://host3.example.org:4321")
            self.assertEqual(config.base_url, "https://host3.example.org:4321/")
            self.assertEqual(config.token, "ABCDEF")
            self.assertEqual(config.retries, DavConfig.DEFAULT_RETRIES)

            # Tests for base URL 'dav://host4.example.org:5555/'
            config = config_pool.get_config_for_url("dav://host4.example.org:5555/")
            self.assertEqual(config.base_url, "http://host4.example.org:5555/")
            self.assertEqual(config.user_name, "user")
            self.assertEqual(config.user_password, "password")
            self.assertFalse(config.reuse_connection)
            # Fixed off-by-one: the configuration above declares frontends 1
            # through 3 and range()'s upper bound is exclusive, so iterate
            # range(1, 4) to check all three frontend URLs.
            for i in range(1, 4):
                url = f"http://frontend{i}.example.org:5555/"
                self.assertIn(url, config.frontend_urls)

        # Test that the schemes of the base URL and the frontend URLs of a
        # given endpoint are checked and must be identical.
        config_contents: str = r"""
- base_url: "dav://host5.example.org:5555/"
  frontend_base_urls:
    - "davs://frontend1.example.org:5555/"
"""
        config_file = self._create_config(config_contents)
        with unittest.mock.patch.dict(os.environ, {"LSST_RESOURCES_WEBDAV_CONFIG": config_file}, clear=True):
            with self.assertRaises(ValueError):
                DavConfigPool("LSST_RESOURCES_WEBDAV_CONFIG")

    def test_dav_repeated_configurations(self):
        """Ensure duplicated endpoint errors are detected in configuration
        file.
        """
        config_contents: str = r"""
- base_url: "davs://host1.example.org:1234/"
- base_url: "davs://host1.example.org:1234/"
"""
        config_file = self._create_config(config_contents)
        with unittest.mock.patch.dict(os.environ, {"MY_VAR": config_file}, clear=True):
            with self.assertRaises(ValueError):
                DavConfigPool("MY_VAR")

    def _create_config(self, config: str) -> str:
        """Write `config` to a new file under the per-test temporary
        directory and return its path.
        """
        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir, delete=False) as f:
            f.write(config)
        return f.name
class DavTokenAuthorizerTestCase(unittest.TestCase):
    """Test for the TokenAuthorizer class."""

    def setUp(self):
        self.tmpdir = ResourcePath(makeTestTempDir(TESTDIR))
        self.token = "ABCDE1234"

    def tearDown(self):
        if self.tmpdir and self.tmpdir.isLocal:
            removeTestTempDir(self.tmpdir.ospath)

    def test_dav_empty_token(self):
        """Ensure that when no token is provided the headers are not
        modified.
        """
        headers = {}
        TokenAuthorizer().set_authorization(headers)
        self.assertIsNone(headers.get("Authorization"))

    def test_dav_token_value(self):
        """Ensure that when a token value is provided, the 'Authorization'
        header is added to the requests.
        """
        headers = {}
        TokenAuthorizer(self.token).set_authorization(headers)
        self.assertEqual(headers.get("Authorization"), f"Bearer {self.token}")

    def test_dav_token_file(self):
        """Ensure that when the provided token is a file path, its contents
        are correctly added as the value of the 'Authorization' header.
        """
        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as f:
            f.write(self.token)
            token_file_path = f.name

        # With owner-only read permission the token file must be accepted
        # and its contents sent as a bearer token.
        os.chmod(token_file_path, stat.S_IRUSR)
        headers = {}
        TokenAuthorizer(token_file_path).set_authorization(headers)
        self.assertEqual(headers.get("Authorization"), f"Bearer {self.token}")

        # A token file readable, writable or executable by group or other
        # must be rejected.
        group_other_modes = (
            stat.S_IRGRP,
            stat.S_IWGRP,
            stat.S_IXGRP,
            stat.S_IROTH,
            stat.S_IWOTH,
            stat.S_IXOTH,
        )
        for mode in group_other_modes:
            os.chmod(token_file_path, stat.S_IRUSR | mode)
            with self.assertRaises(PermissionError):
                TokenAuthorizer(token_file_path)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()