Coverage for tests / test_http.py: 16%
673 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:44 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12import hashlib
13import io
14import os.path
15import pickle
16import random
17import shutil
18import socket
19import stat
20import string
21import tempfile
22import time
23import unittest
24import unittest.mock
25import warnings
26from collections.abc import Callable
27from datetime import UTC
28from threading import Thread
29from typing import cast
31try:
32 from cheroot import wsgi
33 from wsgidav.wsgidav_app import WsgiDAVApp
34except ImportError:
35 WsgiDAVApp = None
37import requests
38import responses
39import responses.matchers
41import lsst.resources
42from lsst.resources import ResourceInfo, ResourcePath
43from lsst.resources._resourceHandles._httpResourceHandle import (
44 HttpReadResourceHandle,
45 parse_content_range_header,
46)
47from lsst.resources.http import (
48 BearerTokenAuth,
49 HttpResourcePath,
50 HttpResourcePathConfig,
51 SessionStore,
52 _get_dav_and_server_headers,
53 _is_protected,
54)
55from lsst.resources.tests import GenericReadWriteTestCase, GenericTestCase
56from lsst.resources.utils import _get_num_workers, makeTestTempDir, removeTestTempDir
# Absolute path of the directory that contains this test module.
TESTDIR = os.path.dirname(os.path.abspath(__file__))
class GenericHttpTestCase(GenericTestCase, unittest.TestCase):
    """Run the generic URI tests against ``http`` URIs, plus HTTP-specific checks."""

    scheme = "http"
    netloc = "server.example"

    def test_root_uri(self):
        """Check that ``root_uri()`` drops path, fragment, query and parameters."""
        credentialed_root = "http://user:password@server.com:3000/"
        # Each entry is (input URI, expected root URI).
        cases = (
            ("http://server.com", "http://server.com/"),
            ("http://user:password@server.com:3000/", credentialed_root),
            ("http://user:password@server.com:3000/some/path", credentialed_root),
            ("http://user:password@server.com:3000/some/path#fragment", credentialed_root),
            ("http://user:password@server.com:3000/some/path?param=value", credentialed_root),
            ("http://user:password@server.com:3000/some/path;parameters", credentialed_root),
        )
        for given, expected in cases:
            self.assertEqual(ResourcePath(given).root_uri(), ResourcePath(expected))

    @responses.activate
    def test_extra_headers(self):
        """Check that extra headers are sent with requests and survive copies."""
        target = "http://test.example/something.txt"
        auth_headers = {"Authorization": "Bearer my-token"}
        path = HttpResourcePath.create_http_resource_path(target, extra_headers=dict(auth_headers))

        self.assertEqual(str(path), "http://test.example/something.txt")
        self.assertEqual(path._extra_headers, auth_headers)

        # Reading must send the extra headers: the mocked GET is registered
        # with a header matcher for them.
        header_match = responses.matchers.header_matcher(dict(auth_headers))
        responses.add(responses.GET, target, b"test", match=[header_match])
        self.assertEqual(path.read(), b"test")

        # The headers must also be handed over to fsspec. Building the fsspec
        # file system goes through the "webdav" vs "not-webdav" logic, which
        # issues an OPTIONS request, so that request is mocked as well.
        responses.add(
            responses.OPTIONS,
            "http://test.example/",
            match=[responses.matchers.header_matcher(dict(auth_headers))],
        )
        filesystem, _ = path.to_fsspec()
        self.assertEqual(filesystem.client_kwargs.get("headers"), auth_headers)

        # A pickle round trip must keep the headers, so that `mtransfer` and
        # similar methods keep working in multi-process mode.
        unpickled = pickle.loads(pickle.dumps(path))
        self.assertEqual(unpickled._extra_headers, auth_headers)

        # Modified copies made with replace() or with the ResourcePath
        # constructor must keep the headers too.
        as_directory = path.replace(forceDirectory=True)
        self.assertEqual(as_directory._extra_headers, auth_headers)
        rebuilt = ResourcePath(path, forceDirectory=True)
        self.assertEqual(rebuilt._extra_headers, auth_headers)

    @responses.activate
    def test_get_info(self):
        """Check that ``get_info()`` reports what the HEAD response headers say."""
        # Presumably needed so that the OPTIONS probe is really sent and the
        # request count asserted at the end is deterministic.
        _get_dav_and_server_headers.cache_clear()

        target = "http://test.example/something.txt"
        head_headers = {
            "Content-Length": "123",
            "Last-Modified": "Wed, 12 Mar 2025 10:11:13 GMT",
            "Digest": "md5=rL0Y20zC+Fzt72VPzMSk2A==, sha-256=def456",
        }
        responses.add(responses.OPTIONS, "http://test.example/", status=200)
        responses.add(responses.HEAD, target, status=200, headers=head_headers)

        info = ResourcePath(target).get_info()

        self.assertIsInstance(info, ResourceInfo)
        self.assertTrue(info.is_file)
        self.assertEqual(info.size, 123)
        self.assertEqual(info.last_modified.tzinfo, UTC)
        self.assertEqual(info.last_modified.year, 2025)
        expected_checksums = {"md5": "rL0Y20zC+Fzt72VPzMSk2A==", "sha-256": "def456"}
        self.assertEqual(info.checksums, expected_checksums)
        # Two requests are expected: the two responses mocked above.
        self.assertEqual(len(responses.calls), 2)
class HttpReadWriteWebdavTestCase(GenericReadWriteTestCase, unittest.TestCase):
    """Test with a real webDAV server, as opposed to mocking responses.

    The server is either an external one, selected through the environment
    variable ``LSST_RESOURCES_HTTP_TEST_SERVER_URL``, or a local WsgiDAV
    server started in a background thread when that package is installed.
    """

    scheme = "http"

    # Paths of local files created by `_generate_file`, removed in
    # `tearDownClass`.
    local_files_to_remove: list[str] = []

    # Maximum time, in seconds, to wait for the local webDAV server to start
    # accepting connections.
    _SERVER_STARTUP_TIMEOUT = 10.0

    @classmethod
    def setUpClass(cls):
        """Select or launch the webDAV server this test case runs against.

        Raises
        ------
        unittest.SkipTest
            Raised if no test endpoint is configured and WsgiDAV is not
            installed.
        """
        cls.webdav_tmpdir = tempfile.mkdtemp(prefix="webdav-server-test-")
        cls.server_thread = None

        # Disable warnings about socket connections left open. We purposely
        # keep network connections to the remote server open and have no
        # means through the API exposed by Requests of actually closing the
        # underlying sockets to make tests pass without warning.
        warnings.filterwarnings(action="ignore", message=r"unclosed.*socket", category=ResourceWarning)

        # Should we test against a running server?
        #
        # This is convenient for testing against real servers in the
        # developer environment by initializing the environment variable
        # LSST_RESOURCES_HTTP_TEST_SERVER_URL with the URL of the server, e.g.
        #    https://dav.example.org:1234/path/to/top/dir
        if (test_endpoint := os.getenv("LSST_RESOURCES_HTTP_TEST_SERVER_URL")) is not None:
            # Run this test case against the specified server.
            uri = ResourcePath(test_endpoint)
            cls.scheme = uri.scheme
            cls.netloc = uri.netloc
            cls.base_path = uri.path
        elif WsgiDAVApp is not None:
            # WsgiDAVApp is available, launch a local server in its own
            # thread to expose a local temporary directory and run this
            # test case against it.
            cls.port_number = cls._get_port_number()
            cls.stop_webdav_server = False
            cls.server_thread = Thread(
                target=cls._serve_webdav,
                args=(cls, cls.webdav_tmpdir, cls.port_number, lambda: cls.stop_webdav_server),
                daemon=True,
            )
            cls.server_thread.start()

            # Wait until the server accepts connections rather than sleeping
            # for a fixed time. If the deadline passes we carry on anyway and
            # let the individual tests report the connection failure.
            deadline = time.monotonic() + cls._SERVER_STARTUP_TIMEOUT
            while not cls._is_server_running(cls.port_number) and time.monotonic() < deadline:
                time.sleep(0.05)

            # Initialize the server endpoint
            cls.netloc = f"127.0.0.1:{cls.port_number}"
        else:
            # tearDownClass() does not run when setUpClass() raises, so remove
            # the temporary directory here before skipping.
            shutil.rmtree(cls.webdav_tmpdir, ignore_errors=True)
            raise unittest.SkipTest(
                "neither WsgiDAVApp is available nor a webDAV test endpoint is configured to test against"
            )

    @classmethod
    def tearDownClass(cls):
        """Stop the local server, if any, and remove temporary files."""
        # Ask the local webDAV server to shut down and wait for its thread.
        # The thread only exists when this class started the server itself.
        if cls.server_thread is not None:
            cls.stop_webdav_server = True
            cls.server_thread.join()

        # Remove local temporary files. Tests may already have removed them.
        for file in cls.local_files_to_remove:
            try:
                os.remove(file)
            except FileNotFoundError:
                pass
        cls.local_files_to_remove.clear()

        # Remove temp dir
        if cls.webdav_tmpdir:
            shutil.rmtree(cls.webdav_tmpdir, ignore_errors=True)

        # Reset the warnings filter.
        warnings.resetwarnings()

    def tearDown(self):
        """Remove the remote test directory and clear the HTTP sessions."""
        if self.tmpdir:
            self.tmpdir.remove()

            # Clear sessions. Some sockets may be left open, because urllib3
            # does not close in-flight connections.
            # See https://urllib3.readthedocs.io > API Reference >
            #    Pool Manager > clear()
            # I cannot add the full URL here because it is longer than 79
            # characters.
            self.tmpdir._clear_sessions()

        super().tearDown()

    def test_dav_file_handle(self):
        """Check the handles returned by ``open()`` in its different modes."""
        # Upload a new file with known contents.
        contents = "These are some \n bytes to read"
        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data=contents, overwrite=True))

        # Test that the correct handle is returned.
        with remote_file.open("rb") as handle:
            self.assertIsInstance(handle, HttpReadResourceHandle)

        # Test reading byte ranges works
        with remote_file.open("rb") as handle:
            sub_contents = contents[:10]
            handle = cast(HttpReadResourceHandle, handle)
            result = handle.read(len(sub_contents)).decode()
            self.assertEqual(result, sub_contents)
            # Verify there is no internal buffer.
            self.assertIsNone(handle._completeBuffer)
            # Verify the position.
            self.assertEqual(handle.tell(), len(sub_contents))

            # Jump back to the beginning and test if reading the whole file
            # prompts the internal buffer to be read.
            handle.seek(0)
            self.assertEqual(handle.tell(), 0)
            result = handle.read().decode()
            self.assertIsNotNone(handle._completeBuffer)
            self.assertEqual(result, contents)

            # Check that flush works on read-only handle.
            handle.flush()

        # Verify reading as a string handle works as expected.
        with remote_file.open("r") as handle:
            self.assertIsInstance(handle, io.TextIOWrapper)

            handle = cast(io.TextIOWrapper, handle)
            self.assertIsInstance(handle.buffer, HttpReadResourceHandle)

            # Check if string methods work.
            result = handle.read()
            self.assertEqual(result, contents)

            # Check that flush works on read-only handle.
            handle.flush()

        # Verify that write modes invoke the default base method
        with remote_file.open("w") as handle:
            self.assertIsInstance(handle, io.StringIO)

    def test_dav_is_dav_enpoint(self):
        """Check that the server under test is seen as a webDAV endpoint."""
        self.assertTrue(self.tmpdir.is_webdav_endpoint)

    def test_dav_mkdir(self):
        """Check creation and deletion of remote directories."""
        # Check creation and deletion of an empty directory
        subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subdir.mkdir())
        self.assertTrue(subdir.exists())

        # Creating an existing remote directory must succeed
        self.assertIsNone(subdir.mkdir())

        # Deletion of an existing directory must succeed
        self.assertIsNone(subdir.remove())

        # Deletion of a non-existing directory must succeed
        subdir_not_exists = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subdir_not_exists.remove())

        # Creation of a directory at a path where a file exists must raise
        file = self.tmpdir.join(self._get_file_name(), forceDirectory=False)
        file.write(data=None, overwrite=True)
        self.assertTrue(file.exists())

        existing_file = self.tmpdir.join(file.basename(), forceDirectory=True)
        with self.assertRaises(NotADirectoryError):
            existing_file.mkdir()

    def test_dav_upload_download(self):
        """Check that an uploaded file can be downloaded unchanged."""
        # Test upload a randomly-generated file via write() with and without
        # overwrite
        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            data = f.read()

        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data, overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)

        # Write without overwrite must raise since target file exists
        with self.assertRaises(FileExistsError):
            remote_file.write(data, overwrite=False)

        # Download the file we just uploaded. Compute and compare a digest of
        # the uploaded and downloaded data and ensure they match
        downloaded_data = remote_file.read()
        self.assertEqual(len(downloaded_data), file_size)
        upload_digest = self._compute_digest(data)
        download_digest = self._compute_digest(downloaded_data)
        self.assertEqual(upload_digest, download_digest)
        os.remove(local_file)

    def test_dav_as_local(self):
        """Check that ``_as_local()`` yields a temporary local copy."""
        contents = str.encode("12345")
        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data=contents, overwrite=True))

        with remote_file._as_local() as local_uri:
            self.assertTrue(local_uri.isTemporary)
            self.assertTrue(os.path.exists(local_uri.ospath))
            # assertEqual, not assertTrue: the second argument of assertTrue
            # is only the failure message, so it would compare nothing.
            self.assertEqual(os.stat(local_uri.ospath).st_size, len(contents))
            self.assertEqual(local_uri.read(), contents)
        # The temporary copy must be gone once the context exits.
        self.assertFalse(local_uri.exists())

    def test_dav_size(self):
        """Check the errors raised by ``size()``."""
        # Size of a non-existent file must raise.
        remote_file = self.tmpdir.join(self._get_file_name())
        with self.assertRaises(FileNotFoundError):
            remote_file.size()

        # Retrieving the size of a remote directory using a file-like path must
        # raise
        remote_dir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(remote_dir.mkdir())
        self.assertTrue(remote_dir.exists())

        dir_as_file = ResourcePath(remote_dir.geturl().rstrip("/"), forceDirectory=False)
        with self.assertRaises(IsADirectoryError):
            dir_as_file.size()

    def test_dav_upload_creates_dir(self):
        """Check that uploading a file creates missing parent directories."""
        # Uploading a file to a non existing directory must ensure its
        # parent directories are automatically created and upload succeeds
        non_existing_dir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        non_existing_dir = non_existing_dir.join(self._get_dir_name(), forceDirectory=True)
        non_existing_dir = non_existing_dir.join(self._get_dir_name(), forceDirectory=True)
        remote_file = non_existing_dir.join(self._get_file_name())

        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            data = f.read()
        self.assertIsNone(remote_file.write(data, overwrite=True))

        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)
        self.assertTrue(remote_file.parent().exists())

        downloaded_data = remote_file.read()
        upload_digest = self._compute_digest(data)
        download_digest = self._compute_digest(downloaded_data)
        self.assertEqual(upload_digest, download_digest)
        os.remove(local_file)

    def test_dav_transfer_from(self):
        """Check "copy" and "move" transfers from local and remote sources."""
        # Transfer from local file via "copy", with and without overwrite
        remote_file = self.tmpdir.join(self._get_file_name())
        local_file, _ = self._generate_file()
        source_file = ResourcePath(local_file)
        self.assertIsNone(remote_file.transfer_from(source_file, transfer="copy", overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), source_file.size())
        with self.assertRaises(FileExistsError):
            remote_file.transfer_from(ResourcePath(local_file), transfer="copy", overwrite=False)

        # Transfer from remote file via "copy", with and without overwrite
        source_file = remote_file
        target_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(target_file.transfer_from(source_file, transfer="copy", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_file.size())

        # Transfer without overwrite must raise since target resource exists
        with self.assertRaises(FileExistsError):
            target_file.transfer_from(source_file, transfer="copy", overwrite=False)

        # Test transfer from local file via "move", with and without overwrite
        source_file = ResourcePath(local_file)
        source_size = source_file.size()
        target_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(target_file.transfer_from(source_file, transfer="move", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_size)
        self.assertFalse(source_file.exists())

        # Test transfer without overwrite must raise since target resource
        # exists. The previous local file was consumed by the "move" above, so
        # a new one is generated.
        local_file, _ = self._generate_file()
        source_file = ResourcePath(local_file)
        with self.assertRaises(FileExistsError):
            target_file.transfer_from(source_file, transfer="move", overwrite=False)

        # Test transfer from remote file via "move" with and without overwrite
        # must succeed
        source_file = target_file
        source_size = source_file.size()
        target_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(target_file.transfer_from(source_file, transfer="move", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_size)
        self.assertFalse(source_file.exists())

        # Transfer without overwrite must raise since target resource exists
        source_file = ResourcePath(local_file)
        with self.assertRaises(FileExistsError):
            target_file.transfer_from(source_file, transfer="move", overwrite=False)

    def test_dav_handle(self):
        """Check seeking and partial reads through a resource handle."""
        # Resource handle must succeed
        target_file = self.tmpdir.join(self._get_file_name())
        data = "abcdefghi"
        self.assertIsNone(target_file.write(data, overwrite=True))
        with target_file.open("rb") as handle:
            handle.seek(1)
            self.assertEqual(handle.read(4).decode("utf-8"), data[1:5])

    def test_dav_delete(self):
        """Check deletion of files and directories, existing or not."""
        # Deletion of an existing remote file must succeed
        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            data = f.read()

        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data, overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)
        self.assertIsNone(remote_file.remove())
        os.remove(local_file)

        # Deletion of a non-existing remote file must succeed
        non_existing_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(non_existing_file.remove())

        # Deletion of a non-empty remote directory must succeed
        subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subdir.mkdir())
        self.assertTrue(subdir.exists())
        local_file, _ = self._generate_file()
        source_file = ResourcePath(local_file)
        # The file is uploaded inside `subdir`, so that the directory really
        # is non-empty when it is removed.
        target_file = subdir.join(self._get_file_name())
        self.assertIsNone(target_file.transfer_from(source_file, transfer="copy", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertIsNone(subdir.remove())
        self.assertFalse(subdir.exists())
        os.remove(local_file)

    def test_dav_to_fsspec(self):
        """Check ``to_fsspec()`` with fsspec support both unset and enabled."""
        # Upload a randomly-generated file via write() with overwrite.
        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            data = f.read()

        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data, overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)
        remote_file_url = remote_file.geturl()

        try:
            # to_fsspec() may raise if that feature is not specifically
            # enabled in the environment and remote server is one of the
            # webDAV servers that support signing URLs.
            with unittest.mock.patch.dict(os.environ, {}, clear=True):
                try:
                    # Force reinitialization of the config from the environment
                    HttpResourcePath._reload_config()
                    fsys, url = ResourcePath(remote_file_url).to_fsspec()
                    self.assertEqual(data, fsys.cat(url))
                except ImportError as e:
                    self.assertIn("disable", str(e))

            # Ensure to_fsspec() works if that feature is enabled in the
            # environment.
            with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_ENABLE_FSSPEC": "true"}, clear=True):
                try:
                    # Force reinitialization of the config from the
                    # environment.
                    HttpResourcePath._reload_config()
                    rpath = ResourcePath(remote_file_url)

                    # Ensure that the contents of the remote file we just
                    # uploaded is identical to the contents of that file when
                    # retrieved via fsspec.open().
                    fsys, url = rpath.to_fsspec()
                    with fsys.open(url) as f:
                        self.assertEqual(data, f.read())

                    # Ensure the contents is identical to the result of
                    # fsspec.cat()
                    self.assertEqual(data, fsys.cat(url))

                    # Ensure that attempting to modify a remote file via
                    # fsspec fails, since the returned URL is signed for
                    # download only. fsspec.rm() raises NotImplementedError if
                    # it cannot remove the remote file.
                    if rpath.server_signs_urls:
                        with self.assertRaises(NotImplementedError):
                            fsys, url = rpath.to_fsspec()
                            fsys.rm(url)
                except NotImplementedError:
                    # to_fsspec() must succeed if remote server knows how to
                    # sign URLs
                    if rpath.server_signs_urls:
                        raise
        finally:
            # Force reinitialization of the config from the (restored)
            # environment and clean up the local file, even when an assertion
            # above failed, so that later tests do not inherit this config.
            HttpResourcePath._reload_config()
            os.remove(local_file)

    @responses.activate
    def test_is_webdav_endpoint(self):
        """Check webDAV detection against mocked OPTIONS responses."""
        davEndpoint = "http://www.lsstwithwebdav.org"
        responses.add(responses.OPTIONS, davEndpoint, status=200, headers={"DAV": "1,2,3"})
        self.assertTrue(ResourcePath(davEndpoint).is_webdav_endpoint)

        plainHttpEndpoint = "http://www.lsstwithoutwebdav.org"
        responses.add(responses.OPTIONS, plainHttpEndpoint, status=200)
        self.assertFalse(ResourcePath(plainHttpEndpoint).is_webdav_endpoint)

        notWebdavEndpoint = "http://www.notwebdav.org"
        responses.add(responses.OPTIONS, notWebdavEndpoint, status=403)
        self.assertFalse(ResourcePath(notWebdavEndpoint).is_webdav_endpoint)

    @responses.activate
    def test_plain_http_url_signing(self):
        """Check pre-signed URL generation against a non-webDAV server."""
        # As in test_is_webdav_endpoint above, configure a URL to appear as a
        # non-webdav HTTP server.
        plainHttpEndpoint = "http://nonwebdav.test"
        responses.add(responses.OPTIONS, plainHttpEndpoint, status=200)

        # Plain HTTP URLs are already readable without authentication, so
        # generating a pre-signed URL is a no-op.
        path = ResourcePath("http://nonwebdav.test/file#frag")
        self.assertEqual(
            path.generate_presigned_get_url(expiration_time_seconds=300), "http://nonwebdav.test/file#frag"
        )

        # Writing to an arbitrary plain HTTP URL is unlikely to work, so we
        # don't generate put URLs.
        with self.assertRaises(NotImplementedError):
            path.generate_presigned_put_url(expiration_time_seconds=300)

    @responses.activate
    def test_server_identity(self):
        """Check that the server name comes from the "Server" header."""
        server = "MyServer/v1.2.3"
        endpointWithServer = "http://www.lsstwithserverheader.org"
        responses.add(responses.OPTIONS, endpointWithServer, status=200, headers={"Server": server})
        self.assertEqual(ResourcePath(endpointWithServer).server, "myserver")

        endpointWithoutServer = "http://www.lsstwithoutserverheader.org"
        responses.add(responses.OPTIONS, endpointWithoutServer, status=200)
        self.assertIsNone(ResourcePath(endpointWithoutServer).server)

    @classmethod
    def _get_port_number(cls) -> int:
        """Return a port number the webDAV server can use to listen to.

        The port is picked by binding a socket to port 0 on 127.0.0.1 and
        reading back the assigned port. The socket is closed before
        returning, so nothing reserves the port afterwards.
        """
        # The context manager closes the socket even if bind() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", 0))
            s.listen()
            return s.getsockname()[1]

    def _serve_webdav(self, local_path: str, port: int, stop_webdav_server: Callable[[], bool]):
        """Start a local webDAV server, listening on http://localhost:port
        and exposing local_path.

        This server only runs when this test class is instantiated,
        and then shuts down. The server must be started in a separate thread.

        Parameters
        ----------
        local_path : `str`
            Path to an existing local directory for the server to expose.
        port : `int`
            The port number on which the server should listen.
        stop_webdav_server : `Callable[[], bool]`
            Boolean function which returns True when the server should be
            stopped.
        """
        config = {
            "host": "127.0.0.1",
            "port": port,
            "provider_mapping": {"/": local_path},
            "http_authenticator": {"domain_controller": None},
            "simple_dc": {"user_mapping": {"*": True}},
            "verbose": 0,
            "lock_storage": False,
            "dir_browser": {
                "enable": False,
                "ms_sharepoint_support": False,
                "libre_office_support": False,
                "response_trailer": False,
                "davmount_links": False,
            },
        }

        # Build and start the server before entering the try block: the
        # finally clause below uses `server` and `t`, so if their construction
        # failed inside the block, the cleanup would raise a NameError that
        # hides the real error.
        server = wsgi.Server(wsgi_app=WsgiDAVApp(config), bind_addr=(config["host"], config["port"]))
        t = Thread(target=server.start, daemon=True)
        t.start()

        try:
            # Shut down the server when done: stop_webdav_server() returns
            # True when this test suite is being torn down
            while not stop_webdav_server():
                time.sleep(1)
        except KeyboardInterrupt:
            # Caught Ctrl-C, shut down the server
            pass
        finally:
            server.stop()
            t.join()

    @classmethod
    def _get_name(cls, prefix: str) -> str:
        """Return ``prefix`` followed by a dash and 8 random characters."""
        alphabet = string.ascii_lowercase + string.digits
        return f"{prefix}-" + "".join(random.choices(alphabet, k=8))

    @classmethod
    def _get_dir_name(cls) -> str:
        """Return a randomly selected name for a directory."""
        return cls._get_name(prefix="dir")

    @classmethod
    def _get_file_name(cls) -> str:
        """Return a randomly selected name for a file."""
        return cls._get_name(prefix="file")

    def _generate_file(self, remove_when_done=True) -> tuple[str, int]:
        """Create a local file of random size with random contents.

        Parameters
        ----------
        remove_when_done : `bool`, optional
            If `True`, register the file so that it is removed when the test
            case is torn down, in case the caller did not remove it.

        Returns
        -------
        path : `str`
            Path to local temporary file. The caller may remove it earlier;
            otherwise it is removed at tear down if ``remove_when_done`` is
            set.
        size : `int`
            Size of the generated file, in bytes.
        """
        megabyte = 1024 * 1024
        size = random.randint(2 * megabyte, 5 * megabyte)
        tmpfile, path = tempfile.mkstemp()

        # Register the file before writing, so it is cleaned up even if the
        # write below fails.
        if remove_when_done:
            HttpReadWriteWebdavTestCase.local_files_to_remove.append(path)

        # os.fdopen() takes ownership of the descriptor and closes it when
        # the block exits, including on failure.
        with os.fdopen(tmpfile, "wb") as stream:
            self.assertEqual(stream.write(os.urandom(size)), size)

        return path, size

    @classmethod
    def _compute_digest(cls, data: bytes) -> str:
        """Compute a SHA256 hash of data, as a hexadecimal string."""
        return hashlib.sha256(data).hexdigest()

    @classmethod
    def _is_server_running(cls, port: int) -> bool:
        """Return True if there is a server listening on local address
        127.0.0.1:<port>.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.connect(("127.0.0.1", port))
            except OSError:
                # ConnectionRefusedError is the expected failure. Any other
                # connection error also means the server cannot be reached.
                return False
            return True
716class HttpResourcePathConfigTestCase(unittest.TestCase):
717 """Test for the HttpResourcePathConfig class."""
def setUp(self):
    """Create the temporary directory made available to each test."""
    scratch_dir = makeTestTempDir(TESTDIR)
    self.tmpdir = ResourcePath(scratch_dir)
def tearDown(self):
    """Remove the temporary directory, provided it is a local one."""
    tmpdir = self.tmpdir
    if not tmpdir or not tmpdir.isLocal:
        return
    removeTestTempDir(tmpdir.ospath)
def test_send_expect_header(self):
    """Check that LSST_HTTP_PUT_SEND_EXPECT_HEADER drives `send_expect_on_put`."""
    # With the variable absent, the option must be off.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        self.assertFalse(HttpResourcePathConfig().send_expect_on_put)

    # With the variable set, the option must be on.
    enabled_env = {"LSST_HTTP_PUT_SEND_EXPECT_HEADER": "true"}
    with unittest.mock.patch.dict(os.environ, enabled_env, clear=True):
        self.assertTrue(HttpResourcePathConfig().send_expect_on_put)
def test_enable_fsspec(self):
    """Check that LSST_HTTP_ENABLE_FSSPEC drives `fsspec_is_enabled`."""
    # With the variable absent, fsspec support must be off.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        self.assertFalse(HttpResourcePathConfig().fsspec_is_enabled)

    # Setting the variable, here to an arbitrary value, must turn it on.
    enabled_env = {"LSST_HTTP_ENABLE_FSSPEC": "any value"}
    with unittest.mock.patch.dict(os.environ, enabled_env, clear=True):
        self.assertTrue(HttpResourcePathConfig().fsspec_is_enabled)
def test_collect_memory_usage(self):
    """Check that LSST_HTTP_COLLECT_MEMORY_USAGE drives `collect_memory_usage`."""
    # With the variable absent, memory usage collection must be off.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        self.assertFalse(HttpResourcePathConfig().collect_memory_usage)

    # With the variable set, memory usage collection must be on.
    enabled_env = {"LSST_HTTP_COLLECT_MEMORY_USAGE": "true"}
    with unittest.mock.patch.dict(os.environ, enabled_env, clear=True):
        self.assertTrue(HttpResourcePathConfig().collect_memory_usage)
def test_timeout(self):
    """Check how the connect and read timeouts are read from the environment."""
    # Ensure that when the connect and read timeouts are not specified
    # the default values are stored in the config.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        config = HttpResourcePathConfig()
        self.assertAlmostEqual(config.timeout[0], config.DEFAULT_TIMEOUT_CONNECT)
        self.assertAlmostEqual(config.timeout[1], config.DEFAULT_TIMEOUT_READ)

    # Ensure that when both the connect and read timeouts are specified
    # they are both stored in the config.
    connect_timeout, read_timeout = 100.5, 200.8
    with unittest.mock.patch.dict(
        os.environ,
        {"LSST_HTTP_TIMEOUT_CONNECT": str(connect_timeout), "LSST_HTTP_TIMEOUT_READ": str(read_timeout)},
        clear=True,
    ):
        config = HttpResourcePathConfig()
        self.assertAlmostEqual(config.timeout[0], connect_timeout)
        self.assertAlmostEqual(config.timeout[1], read_timeout)

    # Ensure that invalid float values (including NaN values) raise a
    # ValueError.
    for value in ("invalid", "NaN"):
        with self.subTest(value=value):
            with unittest.mock.patch.dict(
                os.environ,
                {"LSST_HTTP_TIMEOUT_CONNECT": value, "LSST_HTTP_TIMEOUT_READ": value},
                clear=True,
            ):
                with self.assertRaises(ValueError):
                    config = HttpResourcePathConfig()
                    # `timeout` is subscripted above, so it is read as an
                    # attribute here rather than called. The error may come
                    # from the constructor or from this access, so both stay
                    # inside the assertRaises block.
                    _ = config.timeout
def test_front_end_connections(self):
    """Check how the number of front end connections is configured."""
    # Ensure that when the number of front end connections is not specified
    # the default comes from the number of workers.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        config = HttpResourcePathConfig()
        self.assertEqual(config.front_end_connections, _get_num_workers())

    # Ensure that when the number of front end connections is specified
    # it is stored in the config.
    connections = 42
    with unittest.mock.patch.dict(
        os.environ, {"LSST_HTTP_FRONTEND_PERSISTENT_CONNECTIONS": str(connections)}, clear=True
    ):
        config = HttpResourcePathConfig()
        # assertEqual, not assertTrue: the second argument of assertTrue is
        # only the failure message, so any non-zero value would pass.
        self.assertEqual(config.front_end_connections, connections)
def test_back_end_connections(self):
    """Check how the number of back end connections is configured."""
    # Ensure that when the number of back end connections is not specified
    # the default comes from the number of workers.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        config = HttpResourcePathConfig()
        self.assertEqual(config.back_end_connections, _get_num_workers())

    # Ensure that when the number of back end connections is specified
    # it is stored in the config.
    connections = 42
    with unittest.mock.patch.dict(
        os.environ, {"LSST_HTTP_BACKEND_PERSISTENT_CONNECTIONS": str(connections)}, clear=True
    ):
        config = HttpResourcePathConfig()
        # assertEqual, not assertTrue: the second argument of assertTrue is
        # only the failure message, so any non-zero value would pass.
        self.assertEqual(config.back_end_connections, connections)
def test_digest_algorithm(self):
    """Check how the digest algorithm is read from LSST_HTTP_DIGEST."""
    # Ensure that when no digest is specified in the environment, the
    # configured digest algorithm is the empty string.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        config = HttpResourcePathConfig()
        self.assertEqual(config.digest_algorithm, "")

    # Ensure that an invalid digest algorithm is ignored.
    digest = "invalid"
    with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_DIGEST": digest}, clear=True):
        config = HttpResourcePathConfig()
        self.assertEqual(config.digest_algorithm, "")

    # Ensure that an accepted digest algorithm is stored.
    for digest in HttpResourcePathConfig().ACCEPTED_DIGESTS:
        with self.subTest(digest=digest):
            with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_DIGEST": digest}, clear=True):
                config = HttpResourcePathConfig()
                # assertEqual, not assertTrue: the second argument of
                # assertTrue is only the failure message, so any non-empty
                # algorithm would pass.
                self.assertEqual(config.digest_algorithm, digest)
def test_backoff_interval(self):
    """Check how the limits of the backoff interval are read from the
    environment.
    """
    min_var, max_var = "LSST_HTTP_BACKOFF_MIN", "LSST_HTTP_BACKOFF_MAX"

    # The default limits must be used in each of these situations, in
    # order: no backoff interval defined, values that are not numbers,
    # and NaN values.
    fallback_environments = (
        {},
        {min_var: "XXX", max_var: "YYY"},
        {min_var: "NaN", max_var: "NaN"},
    )
    for environment in fallback_environments:
        with unittest.mock.patch.dict(os.environ, environment, clear=True):
            cfg = HttpResourcePathConfig()
            self.assertAlmostEqual(cfg.backoff_min, cfg.DEFAULT_BACKOFF_MIN)
            self.assertAlmostEqual(cfg.backoff_max, cfg.DEFAULT_BACKOFF_MAX)

    # Valid limits given in the environment must be the ones used.
    lower, upper = 3.0, 8.0
    valid_environment = {min_var: str(lower), max_var: str(upper)}
    with unittest.mock.patch.dict(os.environ, valid_environment, clear=True):
        cfg = HttpResourcePathConfig()
        self.assertAlmostEqual(cfg.backoff_min, lower)
        self.assertAlmostEqual(cfg.backoff_max, upper)
def test_ca_bundle(self):
    """Check how the certificate authorities bundle is read from the
    environment.
    """
    # Without LSST_HTTP_CACERT_BUNDLE the config reports either no bundle
    # at all or a path that exists.
    empty_environment = {}
    with unittest.mock.patch.dict(os.environ, empty_environment, clear=True):
        cfg = HttpResourcePathConfig()
        if cfg.ca_bundle is not None:
            self.assertTrue(os.path.exists(cfg.ca_bundle))

    # With LSST_HTTP_CACERT_BUNDLE set, the config must report exactly the
    # value of that variable; the path is deliberately not required to
    # exist here.
    ca_bundle = "/path/to/bundle/dir"
    bundle_environment = {"LSST_HTTP_CACERT_BUNDLE": ca_bundle}
    with unittest.mock.patch.dict(os.environ, bundle_environment, clear=True):
        cfg = HttpResourcePathConfig()
        self.assertEqual(cfg.ca_bundle, ca_bundle)
def test_client_token(self):
    """Check how the client bearer token is read from the environment."""
    # Without LSST_HTTP_AUTH_BEARER_TOKEN the config reports no token.
    empty_environment = {}
    with unittest.mock.patch.dict(os.environ, empty_environment, clear=True):
        cfg = HttpResourcePathConfig()
        self.assertIsNone(cfg.client_token)

    # With LSST_HTTP_AUTH_BEARER_TOKEN set, the config must report exactly
    # the value of that variable.
    token = "ABCDE12345"
    token_environment = {"LSST_HTTP_AUTH_BEARER_TOKEN": token}
    with unittest.mock.patch.dict(os.environ, token_environment, clear=True):
        cfg = HttpResourcePathConfig()
        self.assertEqual(cfg.client_token, token)
def test_client_cert_key(self):
    """Check that a user certificate and private key given via
    environment variables end up in the configuration.
    """
    cert_var, key_var = "LSST_HTTP_AUTH_CLIENT_CERT", "LSST_HTTP_AUTH_CLIENT_KEY"

    # With neither variable set, both certificate and key are None.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        cfg = HttpResourcePathConfig()
        cert, key = cfg.client_cert_key
        self.assertIsNone(cert)
        self.assertIsNone(key)

    # Write mock certificate and private key files in the test directory.
    with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as cert_file:
        cert_file.write("CERT")
        client_cert = cert_file.name

    with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as key_file:
        key_file.write("KEY")
        client_key = key_file.name

    # Setting only the certificate variable, then only the key variable,
    # must raise in both cases.
    for partial_environment in ({cert_var: client_cert}, {key_var: client_key}):
        with unittest.mock.patch.dict(os.environ, partial_environment, clear=True), self.assertRaises(
            ValueError
        ):
            HttpResourcePathConfig().client_cert_key

    # The private key file must be accessible only by its owner.
    group_and_other_bits = (
        stat.S_IRGRP,
        stat.S_IWGRP,
        stat.S_IXGRP,
        stat.S_IROTH,
        stat.S_IWOTH,
        stat.S_IXOTH,
    )
    full_environment = {cert_var: client_cert, key_var: client_key}
    with unittest.mock.patch.dict(os.environ, full_environment, clear=True):
        # Owner-read-only key file: certificate and key are both reported.
        os.chmod(client_key, stat.S_IRUSR)
        cfg = HttpResourcePathConfig()
        cert, key = cfg.client_cert_key
        self.assertEqual(cert, client_cert)
        self.assertEqual(key, client_key)

        # Any permission bit for group or other on the key file must
        # raise.
        for extra_bit in group_and_other_bits:
            os.chmod(client_key, stat.S_IRUSR | extra_bit)
            with self.assertRaises(PermissionError):
                HttpResourcePathConfig().client_cert_key

    # When X509_USER_PROXY is set, its value is used as both the client's
    # certificate and the client's key.
    with unittest.mock.patch.dict(os.environ, {"X509_USER_PROXY": client_cert}, clear=True):
        cfg = HttpResourcePathConfig()
        cert, key = cfg.client_cert_key
        self.assertEqual(cert, client_cert)
        self.assertEqual(key, client_cert)
class WebdavUtilsTestCase(unittest.TestCase):
    """Test for the Webdav related utilities."""

    def setUp(self):
        # Each test gets a temporary directory created under TESTDIR.
        self.tmpdir = ResourcePath(makeTestTempDir(TESTDIR))

    def tearDown(self):
        # Only a local temporary directory is removed.
        if not (self.tmpdir and self.tmpdir.isLocal):
            return
        removeTestTempDir(self.tmpdir.ospath)

    def test_is_protected(self):
        """Check `_is_protected` against a missing file and against
        several permission modes of an existing file.
        """
        # A file that does not exist is reported as not protected.
        self.assertFalse(_is_protected("/this-file-does-not-exist"))

        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as handle:
            handle.write("XXXX")
            file_path = handle.name

        # Readable by the owner only: protected.
        os.chmod(file_path, stat.S_IRUSR)
        self.assertTrue(_is_protected(file_path))

        # Any additional bit for group or other: not protected.
        group_and_other_bits = (
            stat.S_IRGRP,
            stat.S_IWGRP,
            stat.S_IXGRP,
            stat.S_IROTH,
            stat.S_IWOTH,
            stat.S_IXOTH,
        )
        for extra_bit in group_and_other_bits:
            os.chmod(file_path, stat.S_IRUSR | extra_bit)
            self.assertFalse(_is_protected(file_path))
class BearerTokenAuthTestCase(unittest.TestCase):
    """Test for the BearerTokenAuth class."""

    def setUp(self):
        # Each test gets a temporary directory created under TESTDIR and a
        # fixed token value.
        self.tmpdir = ResourcePath(makeTestTempDir(TESTDIR))
        self.token = "ABCDE1234"

    def tearDown(self):
        # Only a local temporary directory is removed.
        if not (self.tmpdir and self.tmpdir.isLocal):
            return
        removeTestTempDir(self.tmpdir.ospath)

    def test_empty_token(self):
        """Check that the request is left untouched when no token is
        provided.
        """
        authenticator = BearerTokenAuth(None)
        authenticator._refresh()
        self.assertIsNone(authenticator._token)
        self.assertIsNone(authenticator._path)

        request = requests.Request("GET", "https://example.org")
        self.assertEqual(authenticator(request), request)

    def test_token_value(self):
        """Check that a token value results in an 'Authorization' header
        being added to the request.
        """
        authenticator = BearerTokenAuth(self.token)
        prepared = requests.Request("GET", "https://example.org").prepare()
        authenticated = authenticator(prepared)
        self.assertEqual(authenticated.headers.get("Authorization"), f"Bearer {self.token}")

    def test_token_insecure_http(self):
        """Check that no 'Authorization' header is attached to a request
        sent over insecure HTTP, whatever the case of the scheme.
        """
        authenticator = BearerTokenAuth(self.token)
        insecure_urls = ("http://example.org", "HTTP://example.org", "HttP://example.org")
        for url in insecure_urls:
            prepared = requests.Request("GET", url).prepare()
            authenticated = authenticator(prepared)
            self.assertIsNone(authenticated.headers.get("Authorization"))

    def test_token_file(self):
        """Check that when the provided token is a file path, the contents
        of that file are used in the 'Authorization' header.
        """
        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as handle:
            handle.write(self.token)
            token_file_path = handle.name

        # With a token file readable by its owner only, the header carries
        # the token stored in the file.
        os.chmod(token_file_path, stat.S_IRUSR)
        authenticator = BearerTokenAuth(token_file_path)
        prepared = requests.Request("GET", "https://example.org").prepare()
        authenticated = authenticator(prepared)
        self.assertEqual(authenticated.headers.get("Authorization"), f"Bearer {self.token}")

        # Any permission bit for group or other on the token file must
        # raise.
        group_and_other_bits = (
            stat.S_IRGRP,
            stat.S_IWGRP,
            stat.S_IXGRP,
            stat.S_IROTH,
            stat.S_IWOTH,
            stat.S_IXOTH,
        )
        for extra_bit in group_and_other_bits:
            os.chmod(token_file_path, stat.S_IRUSR | extra_bit)
            with self.assertRaises(PermissionError):
                BearerTokenAuth(token_file_path)
class SessionStoreTestCase(unittest.TestCase):
    """Test for the SessionStore class."""

    def setUp(self):
        # Each test gets a temporary directory created under TESTDIR and a
        # resource path to request sessions for.
        self.tmpdir = ResourcePath(makeTestTempDir(TESTDIR))
        self.rpath = ResourcePath("https://example.org")

    def tearDown(self):
        # Only a local temporary directory is removed.
        if not (self.tmpdir and self.tmpdir.isLocal):
            return
        removeTestTempDir(self.tmpdir.ospath)

    def test_ca_cert_bundle(self):
        """Check that a certificate authorities bundle given in the
        environment is the one the session uses for verification.
        """
        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as handle:
            handle.write("CERT BUNDLE")
            cert_bundle = handle.name

        bundle_environment = {"LSST_HTTP_CACERT_BUNDLE": cert_bundle}
        with unittest.mock.patch.dict(os.environ, bundle_environment, clear=True):
            store = SessionStore(config=HttpResourcePathConfig())
            session = store.get(self.rpath)
            self.assertEqual(session.verify, cert_bundle)

    def test_user_cert(self):
        """Check that a user certificate and private key given in the
        environment are the ones set on the session.
        """
        cert_var, key_var = "LSST_HTTP_AUTH_CLIENT_CERT", "LSST_HTTP_AUTH_CLIENT_KEY"

        # Write mock certificate and private key files in the test
        # directory.
        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as cert_file:
            cert_file.write("CERT")
            client_cert = cert_file.name

        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as key_file:
            key_file.write("KEY")
            client_key = key_file.name

        # Setting only the certificate variable, then only the key
        # variable, must raise in both cases.
        for partial_environment in ({cert_var: client_cert}, {key_var: client_key}):
            with unittest.mock.patch.dict(os.environ, partial_environment, clear=True), self.assertRaises(
                ValueError
            ):
                cfg = HttpResourcePathConfig()
                SessionStore(config=cfg).get(self.rpath)

        # The private key file must be accessible only by its owner.
        group_and_other_bits = (
            stat.S_IRGRP,
            stat.S_IWGRP,
            stat.S_IXGRP,
            stat.S_IROTH,
            stat.S_IWOTH,
            stat.S_IXOTH,
        )
        full_environment = {cert_var: client_cert, key_var: client_key}
        with unittest.mock.patch.dict(os.environ, full_environment, clear=True):
            # Owner-read-only key file: the session carries the
            # certificate and key paths.
            os.chmod(client_key, stat.S_IRUSR)
            cfg = HttpResourcePathConfig()
            session = SessionStore(config=cfg).get(self.rpath)
            self.assertEqual(session.cert[0], client_cert)
            self.assertEqual(session.cert[1], client_key)

            # Any permission bit for group or other on the key file must
            # raise.
            for extra_bit in group_and_other_bits:
                os.chmod(client_key, stat.S_IRUSR | extra_bit)
                with self.assertRaises(PermissionError):
                    cfg = HttpResourcePathConfig()
                    SessionStore(config=cfg).get(self.rpath)

    def test_token_env(self):
        """Check that a token given via an environment variable results in
        sessions equipped with a BearerTokenAuth.
        """
        token = "ABCDE"
        token_environment = {"LSST_HTTP_AUTH_BEARER_TOKEN": token}
        with unittest.mock.patch.dict(os.environ, token_environment, clear=True):
            store = SessionStore(config=HttpResourcePathConfig())
            session = store.get(self.rpath)
            self.assertEqual(type(session.auth), lsst.resources.http.BearerTokenAuth)
            self.assertEqual(session.auth._token, token)
            self.assertIsNone(session.auth._path)

    def test_sessions(self):
        """Check the session caching mechanism."""
        # The store provides a session for a given URL.
        root_url = "https://example.org"
        store = SessionStore(config=HttpResourcePathConfig())
        root_session = store.get(ResourcePath(root_url))
        self.assertIsNotNone(root_session)

        # URLs sharing the same root URI get equal sessions from a single
        # store.
        same_root_urls = (f"{root_url}", f"{root_url}/path/to/file")
        for url in same_root_urls:
            self.assertEqual(root_session, store.get(ResourcePath(url)))

        # A different root URI gets a different session.
        another_url = "https://another.example.org"
        self.assertNotEqual(root_session, store.get(ResourcePath(another_url)))

        # The same host with a different port number gets a different
        # session.
        root_url_with_port = f"{another_url}:12345"
        port_session = store.get(ResourcePath(root_url_with_port))
        self.assertNotEqual(port_session, store.get(ResourcePath(another_url)))

        # URLs sharing the same root URI, port number included, get equal
        # sessions.
        same_port_urls = (f"{root_url_with_port}", f"{root_url_with_port}/path/to/file")
        for url in same_port_urls:
            self.assertEqual(port_session, store.get(ResourcePath(url)))
class TestContentRange(unittest.TestCase):
    """Test parsing of Content-Range header."""

    def test_full_data(self):
        """Check headers carrying both a byte range and a total size."""
        # Each case is (header, expected start, expected end, expected
        # total); the second header has surrounding whitespace.
        cases = (
            ("bytes 123-2555/12345", 123, 2555, 12345),
            (" bytes 0-0/5 ", 0, 0, 5),
        )
        for header, start, end, total in cases:
            parsed = parse_content_range_header(header)
            self.assertEqual(parsed.range_start, start)
            self.assertEqual(parsed.range_end, end)
            self.assertEqual(parsed.total, total)

    def test_empty_total(self):
        """Check headers whose total size is given as ``*``."""
        # Each case is (header, expected start, expected end).
        cases = (
            ("bytes 123-2555/*", 123, 2555),
            (" bytes 0-0/* ", 0, 0),
        )
        for header, start, end in cases:
            parsed = parse_content_range_header(header)
            self.assertEqual(parsed.range_start, start)
            self.assertEqual(parsed.range_end, end)
            self.assertIsNone(parsed.total)

    def test_empty_range(self):
        """Check headers whose byte range is given as ``*``."""
        # Each case is (header, expected total).
        cases = (
            ("bytes */12345", 12345),
            (" bytes */5 ", 5),
        )
        for header, total in cases:
            parsed = parse_content_range_header(header)
            self.assertIsNone(parsed.range_start)
            self.assertIsNone(parsed.range_end)
            self.assertEqual(parsed.total, total)

    def test_invalid_input(self):
        """Check that a header using a unit other than bytes is rejected."""
        with self.assertRaises(ValueError):
            parse_content_range_header("pages 0-10/12")
# Run the tests of this module when the file is executed directly as a
# script.
if __name__ == "__main__":
    unittest.main()