Coverage for tests / test_http.py: 16%

673 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:32 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12import hashlib 

13import io 

14import os.path 

15import pickle 

16import random 

17import shutil 

18import socket 

19import stat 

20import string 

21import tempfile 

22import time 

23import unittest 

24import unittest.mock 

25import warnings 

26from collections.abc import Callable 

27from datetime import UTC 

28from threading import Thread 

29from typing import cast 

30 

31try: 

32 from cheroot import wsgi 

33 from wsgidav.wsgidav_app import WsgiDAVApp 

34except ImportError: 

35 WsgiDAVApp = None 

36 

37import requests 

38import responses 

39import responses.matchers 

40 

41import lsst.resources 

42from lsst.resources import ResourceInfo, ResourcePath 

43from lsst.resources._resourceHandles._httpResourceHandle import ( 

44 HttpReadResourceHandle, 

45 parse_content_range_header, 

46) 

47from lsst.resources.http import ( 

48 BearerTokenAuth, 

49 HttpResourcePath, 

50 HttpResourcePathConfig, 

51 SessionStore, 

52 _get_dav_and_server_headers, 

53 _is_protected, 

54) 

55from lsst.resources.tests import GenericReadWriteTestCase, GenericTestCase 

56from lsst.resources.utils import _get_num_workers, makeTestTempDir, removeTestTempDir 

57 

58TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

59 

60 

class GenericHttpTestCase(GenericTestCase, unittest.TestCase):
    """Run the generic URI tests, plus a few HTTP-only checks, on http URIs."""

    scheme = "http"
    netloc = "server.example"

    def test_root_uri(self):
        # A bare host gains a trailing slash.
        self.assertEqual(ResourcePath("http://server.com").root_uri(), ResourcePath("http://server.com/"))

        # Credentials and port are kept in the root URI; any path, fragment,
        # query or parameters that follow are dropped.
        root = "http://user:password@server.com:3000/"
        tails = (
            "",
            "some/path",
            "some/path#fragment",
            "some/path?param=value",
            "some/path;parameters",
        )
        for tail in tails:
            self.assertEqual(ResourcePath(root + tail).root_uri(), ResourcePath(root))

    @responses.activate
    def test_extra_headers(self):
        target = "http://test.example/something.txt"
        auth = {"Authorization": "Bearer my-token"}
        path = HttpResourcePath.create_http_resource_path(target, extra_headers=dict(auth))

        self.assertEqual(str(path), "http://test.example/something.txt")
        self.assertEqual(path._extra_headers, auth)

        # The mocked GET only matches when the extra headers are present, so
        # a successful read proves they were added to the request.
        responses.add(
            responses.GET,
            target,
            b"test",
            match=[responses.matchers.header_matcher(dict(auth))],
        )
        self.assertEqual(path.read(), b"test")

        # The headers must also reach fsspec. Converting to fsspec triggers
        # the "webdav" vs "not-webdav" logic, which issues an OPTIONS request,
        # so that request is checked for the headers as well.
        responses.add(
            responses.OPTIONS,
            "http://test.example/",
            match=[responses.matchers.header_matcher(dict(auth))],
        )
        filesystem, _ = path.to_fsspec()
        self.assertEqual(filesystem.client_kwargs.get("headers"), auth)

        # A pickle round trip must keep the extra headers, to ensure that
        # `mtransfer` and similar methods work in multi-process mode.
        unpickled = pickle.loads(pickle.dumps(path))
        self.assertEqual(unpickled._extra_headers, auth)

        # Modified copies made with replace() or with the ResourcePath
        # constructor must keep the extra headers too.
        replaced = path.replace(forceDirectory=True)
        self.assertEqual(replaced._extra_headers, auth)
        duplicate = ResourcePath(path, forceDirectory=True)
        self.assertEqual(duplicate._extra_headers, auth)

    @responses.activate
    def test_get_info(self):
        # Start from an empty cache so that the OPTIONS request is really
        # issued and counted below.
        _get_dav_and_server_headers.cache_clear()

        target = "http://test.example/something.txt"
        head_headers = {
            "Content-Length": "123",
            "Last-Modified": "Wed, 12 Mar 2025 10:11:13 GMT",
            "Digest": "md5=rL0Y20zC+Fzt72VPzMSk2A==, sha-256=def456",
        }
        responses.add(responses.OPTIONS, "http://test.example/", status=200)
        responses.add(responses.HEAD, target, status=200, headers=head_headers)

        details = ResourcePath(target).get_info()

        self.assertIsInstance(details, ResourceInfo)
        self.assertTrue(details.is_file)
        self.assertEqual(details.size, 123)
        self.assertEqual(details.last_modified.tzinfo, UTC)
        self.assertEqual(details.last_modified.year, 2025)
        self.assertEqual(details.checksums, {"md5": "rL0Y20zC+Fzt72VPzMSk2A==", "sha-256": "def456"})
        # Exactly one OPTIONS and one HEAD request are expected.
        self.assertEqual(len(responses.calls), 2)

157 

158 

class HttpReadWriteWebdavTestCase(GenericReadWriteTestCase, unittest.TestCase):
    """Test with a real webDAV server, as opposed to mocking responses.

    The server is either the one designated by the environment variable
    LSST_RESOURCES_HTTP_TEST_SERVER_URL or, when WsgiDAV is installed, a
    local server started in a background thread by `setUpClass`.
    """

    scheme = "http"
    # Local temporary files registered by `_generate_file`; any that still
    # exist are deleted in `tearDownClass`.
    local_files_to_remove: list[str] = []

    @classmethod
    def setUpClass(cls):
        cls.webdav_tmpdir = tempfile.mkdtemp(prefix="webdav-server-test-")
        cls.server_thread = None

        # Disable warnings about socket connections left open. We purposely
        # keep network connections to the remote server open and have no
        # means through the API exposed by Requests of actually closing the
        # underlying sockets to make tests pass without warning.
        warnings.filterwarnings(action="ignore", message=r"unclosed.*socket", category=ResourceWarning)

        # Should we test against a running server?
        #
        # This is convenient for testing against real servers in the
        # developer environment by initializing the environment variable
        # LSST_RESOURCES_HTTP_TEST_SERVER_URL with the URL of the server, e.g.
        #    https://dav.example.org:1234/path/to/top/dir
        if (test_endpoint := os.getenv("LSST_RESOURCES_HTTP_TEST_SERVER_URL")) is not None:
            # Run this test case against the specified server.
            uri = ResourcePath(test_endpoint)
            cls.scheme = uri.scheme
            cls.netloc = uri.netloc
            cls.base_path = uri.path
        elif WsgiDAVApp is not None:
            # WsgiDAVApp is available, launch a local server in its own
            # thread to expose a local temporary directory and run this
            # test case against it.
            cls.port_number = cls._get_port_number()
            cls.stop_webdav_server = False
            cls.server_thread = Thread(
                target=cls._serve_webdav,
                args=(cls, cls.webdav_tmpdir, cls.port_number, lambda: cls.stop_webdav_server),
                daemon=True,
            )
            cls.server_thread.start()

            # Wait for it to start
            time.sleep(1)

            # Initialize the server endpoint
            cls.netloc = f"127.0.0.1:{cls.port_number}"
        else:
            # unittest does not call tearDownClass when setUpClass raises
            # (skipping included), so remove the temporary directory here
            # rather than leaking it.
            shutil.rmtree(cls.webdav_tmpdir, ignore_errors=True)
            raise unittest.SkipTest(
                "neither WsgiDAVApp is available nor a webDAV test endpoint is configured to test against"
            )

    @classmethod
    def tearDownClass(cls):
        # Stop the WsgiDAVApp server, if any
        if WsgiDAVApp is not None:
            # Shut down the webdav server and wait for the thread to exit
            cls.stop_webdav_server = True
            if cls.server_thread is not None:
                cls.server_thread.join()

        # Remove local temporary files
        for file in cls.local_files_to_remove:
            if os.path.exists(file):
                os.remove(file)

        # Remove temp dir
        if cls.webdav_tmpdir:
            shutil.rmtree(cls.webdav_tmpdir, ignore_errors=True)

        # Reset the warnings filter.
        warnings.resetwarnings()

    def tearDown(self):
        if self.tmpdir:
            self.tmpdir.remove()

        # Clear sessions. Some sockets may be left open, because urllib3
        # does not close in-flight connections.
        # See https://urllib3.readthedocs.io > API Reference >
        #    Pool Manager > clear()
        # I cannot add the full URL here because it is longer than 79
        # characters.
        self.tmpdir._clear_sessions()

        super().tearDown()

    def test_dav_file_handle(self):
        # Upload a new file with known contents.
        contents = "These are some \n bytes to read"
        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data=contents, overwrite=True))

        # Test that the correct handle is returned.
        with remote_file.open("rb") as handle:
            self.assertIsInstance(handle, HttpReadResourceHandle)

        # Test reading byte ranges works
        with remote_file.open("rb") as handle:
            sub_contents = contents[:10]
            handle = cast(HttpReadResourceHandle, handle)
            result = handle.read(len(sub_contents)).decode()
            self.assertEqual(result, sub_contents)
            # Verify there is no internal buffer.
            self.assertIsNone(handle._completeBuffer)
            # Verify the position.
            self.assertEqual(handle.tell(), len(sub_contents))

            # Jump back to the beginning and test if reading the whole file
            # prompts the internal buffer to be read.
            handle.seek(0)
            self.assertEqual(handle.tell(), 0)
            result = handle.read().decode()
            self.assertIsNotNone(handle._completeBuffer)
            self.assertEqual(result, contents)

            # Check that flush works on read-only handle.
            handle.flush()

        # Verify reading as a string handle works as expected.
        with remote_file.open("r") as handle:
            self.assertIsInstance(handle, io.TextIOWrapper)

            handle = cast(io.TextIOWrapper, handle)
            self.assertIsInstance(handle.buffer, HttpReadResourceHandle)

            # Check if string methods work.
            result = handle.read()
            self.assertEqual(result, contents)

            # Check that flush works on read-only handle.
            handle.flush()

        # Verify that write modes invoke the default base method
        with remote_file.open("w") as handle:
            self.assertIsInstance(handle, io.StringIO)

    def test_dav_is_dav_enpoint(self):
        # Ensure the server is a webDAV endpoint
        self.assertTrue(self.tmpdir.is_webdav_endpoint)

    def test_dav_mkdir(self):
        # Check creation and deletion of an empty directory
        subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subdir.mkdir())
        self.assertTrue(subdir.exists())

        # Creating an existing remote directory must succeed
        self.assertIsNone(subdir.mkdir())

        # Deletion of an existing directory must succeed
        self.assertIsNone(subdir.remove())

        # Deletion of a non-existing directory must succeed
        missing_subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(missing_subdir.remove())

        # Creation of a directory at a path where a file exists must raise
        file = self.tmpdir.join(self._get_file_name(), forceDirectory=False)
        file.write(data=None, overwrite=True)
        self.assertTrue(file.exists())

        existing_file = self.tmpdir.join(file.basename(), forceDirectory=True)
        with self.assertRaises(NotADirectoryError):
            existing_file.mkdir()

    def test_dav_upload_download(self):
        # Test upload a randomly-generated file via write() with and without
        # overwrite
        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            data = f.read()

        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data, overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)

        # Write without overwrite must raise since target file exists
        with self.assertRaises(FileExistsError):
            remote_file.write(data, overwrite=False)

        # Download the file we just uploaded. Compute and compare a digest of
        # the uploaded and downloaded data and ensure they match
        downloaded_data = remote_file.read()
        self.assertEqual(len(downloaded_data), file_size)
        upload_digest = self._compute_digest(data)
        download_digest = self._compute_digest(downloaded_data)
        self.assertEqual(upload_digest, download_digest)
        os.remove(local_file)

    def test_dav_as_local(self):
        contents = str.encode("12345")
        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data=contents, overwrite=True))

        with remote_file._as_local() as local_uri:
            self.assertTrue(local_uri.isTemporary)
            self.assertTrue(os.path.exists(local_uri.ospath))
            # assertEqual, not assertTrue: with assertTrue the second argument
            # is only the failure message and the size is never compared.
            self.assertEqual(os.stat(local_uri.ospath).st_size, len(contents))
            self.assertEqual(local_uri.read(), contents)
        self.assertFalse(local_uri.exists())

    def test_dav_size(self):
        # Size of a non-existent file must raise.
        remote_file = self.tmpdir.join(self._get_file_name())
        with self.assertRaises(FileNotFoundError):
            remote_file.size()

        # Retrieving the size of a remote directory using a file-like path must
        # raise
        remote_dir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(remote_dir.mkdir())
        self.assertTrue(remote_dir.exists())

        dir_as_file = ResourcePath(remote_dir.geturl().rstrip("/"), forceDirectory=False)
        with self.assertRaises(IsADirectoryError):
            dir_as_file.size()

    def test_dav_upload_creates_dir(self):
        # Uploading a file to a non existing directory must ensure its
        # parent directories are automatically created and upload succeeds
        non_existing_dir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        non_existing_dir = non_existing_dir.join(self._get_dir_name(), forceDirectory=True)
        non_existing_dir = non_existing_dir.join(self._get_dir_name(), forceDirectory=True)
        remote_file = non_existing_dir.join(self._get_file_name())

        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            data = f.read()
        self.assertIsNone(remote_file.write(data, overwrite=True))

        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)
        self.assertTrue(remote_file.parent().exists())

        downloaded_data = remote_file.read()
        upload_digest = self._compute_digest(data)
        download_digest = self._compute_digest(downloaded_data)
        self.assertEqual(upload_digest, download_digest)
        os.remove(local_file)

    def test_dav_transfer_from(self):
        # Transfer from local file via "copy", with and without overwrite
        remote_file = self.tmpdir.join(self._get_file_name())
        local_file, _ = self._generate_file()
        source_file = ResourcePath(local_file)
        self.assertIsNone(remote_file.transfer_from(source_file, transfer="copy", overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), source_file.size())
        with self.assertRaises(FileExistsError):
            remote_file.transfer_from(ResourcePath(local_file), transfer="copy", overwrite=False)

        # Transfer from remote file via "copy", with and without overwrite
        source_file = remote_file
        target_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(target_file.transfer_from(source_file, transfer="copy", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_file.size())

        # Transfer without overwrite must raise since target resource exists
        with self.assertRaises(FileExistsError):
            target_file.transfer_from(source_file, transfer="copy", overwrite=False)

        # Test transfer from local file via "move", with and without overwrite
        source_file = ResourcePath(local_file)
        source_size = source_file.size()
        target_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(target_file.transfer_from(source_file, transfer="move", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_size)
        self.assertFalse(source_file.exists())

        # Test transfer without overwrite must raise since target resource
        # exists
        local_file, _ = self._generate_file()
        with self.assertRaises(FileExistsError):
            source_file = ResourcePath(local_file)
            target_file.transfer_from(source_file, transfer="move", overwrite=False)

        # Test transfer from remote file via "move" with and without overwrite
        # must succeed
        source_file = target_file
        source_size = source_file.size()
        target_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(target_file.transfer_from(source_file, transfer="move", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_size)
        self.assertFalse(source_file.exists())

        # Transfer without overwrite must raise since target resource exists
        with self.assertRaises(FileExistsError):
            source_file = ResourcePath(local_file)
            target_file.transfer_from(source_file, transfer="move", overwrite=False)

    def test_dav_handle(self):
        # Resource handle must succeed
        target_file = self.tmpdir.join(self._get_file_name())
        data = "abcdefghi"
        self.assertIsNone(target_file.write(data, overwrite=True))
        with target_file.open("rb") as handle:
            handle.seek(1)
            self.assertEqual(handle.read(4).decode("utf-8"), data[1:5])

    def test_dav_delete(self):
        # Deletion of an existing remote file must succeed
        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            data = f.read()

        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data, overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)
        self.assertIsNone(remote_file.remove())
        os.remove(local_file)

        # Deletion of a non-existing remote file must succeed
        non_existing_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(non_existing_file.remove())

        # Deletion of a non-empty remote directory must succeed
        subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subdir.mkdir())
        self.assertTrue(subdir.exists())
        local_file, _ = self._generate_file()
        source_file = ResourcePath(local_file)
        # NOTE(review): the target is joined to self.tmpdir, not to subdir,
        # so subdir looks empty when it is removed below -- confirm whether
        # this was meant to be subdir.join(self._get_file_name()).
        target_file = self.tmpdir.join(self._get_file_name(), forceDirectory=True)
        self.assertIsNone(target_file.transfer_from(source_file, transfer="copy", overwrite=True))
        self.assertIsNone(subdir.remove())
        self.assertFalse(subdir.exists())
        os.remove(local_file)

    def test_dav_to_fsspec(self):
        # Upload a randomly-generated file via write() with overwrite.
        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            data = f.read()

        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data, overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)
        remote_file_url = remote_file.geturl()

        try:
            # to_fsspec() may raise if that feature is not specifically
            # enabled in the environment and remote server is one of the
            # webDAV servers that support signing URLs.
            with unittest.mock.patch.dict(os.environ, {}, clear=True):
                try:
                    # Force reinitialization of the config from the environment
                    HttpResourcePath._reload_config()
                    fsys, url = ResourcePath(remote_file_url).to_fsspec()
                    self.assertEqual(data, fsys.cat(url))
                except ImportError as e:
                    self.assertIn("disable", str(e))

            # Ensure to_fsspec() works if that feature is enabled in the
            # environment.
            with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_ENABLE_FSSPEC": "true"}, clear=True):
                # Force reinitialization of the config from the environment.
                # Done before the try block so that rpath is always bound
                # when the except clause below inspects it.
                HttpResourcePath._reload_config()
                rpath = ResourcePath(remote_file_url)
                try:
                    # Ensure that the contents of the remote file we just
                    # uploaded is identical to the contents of that file when
                    # retrieved via fsspec.open().
                    fsys, url = rpath.to_fsspec()
                    with fsys.open(url) as f:
                        self.assertEqual(data, f.read())

                    # Ensure the contents is identical to the result of
                    # fsspec.cat()
                    self.assertEqual(data, fsys.cat(url))

                    # Ensure that attempting to modify a remote file via
                    # fsspec fails, since the returned URL is signed for
                    # download only. fsspec.rm() raises NotImplementedError
                    # if it cannot remove the remote file.
                    if rpath.server_signs_urls:
                        with self.assertRaises(NotImplementedError):
                            fsys, url = rpath.to_fsspec()
                            fsys.rm(url)
                except NotImplementedError:
                    # to_fsspec() must succeed if remote server knows how to
                    # sign URLs
                    if rpath.server_signs_urls:
                        raise
        finally:
            # Force reinitialization of the config from the (now restored)
            # environment and clean up the local file, even when an assertion
            # above failed, so that later tests do not inherit the patched
            # configuration.
            HttpResourcePath._reload_config()
            os.remove(local_file)

    @responses.activate
    def test_is_webdav_endpoint(self):
        davEndpoint = "http://www.lsstwithwebdav.org"
        responses.add(responses.OPTIONS, davEndpoint, status=200, headers={"DAV": "1,2,3"})
        self.assertTrue(ResourcePath(davEndpoint).is_webdav_endpoint)

        plainHttpEndpoint = "http://www.lsstwithoutwebdav.org"
        responses.add(responses.OPTIONS, plainHttpEndpoint, status=200)
        self.assertFalse(ResourcePath(plainHttpEndpoint).is_webdav_endpoint)

        notWebdavEndpoint = "http://www.notwebdav.org"
        responses.add(responses.OPTIONS, notWebdavEndpoint, status=403)
        self.assertFalse(ResourcePath(notWebdavEndpoint).is_webdav_endpoint)

    @responses.activate
    def test_plain_http_url_signing(self):
        # As in test_is_webdav_endpoint above, configure a URL to appear as a
        # non-webdav HTTP server.
        plainHttpEndpoint = "http://nonwebdav.test"
        responses.add(responses.OPTIONS, plainHttpEndpoint, status=200)

        # Plain HTTP URLs are already readable without authentication, so
        # generating a pre-signed URL is a no-op.
        path = ResourcePath("http://nonwebdav.test/file#frag")
        self.assertEqual(
            path.generate_presigned_get_url(expiration_time_seconds=300), "http://nonwebdav.test/file#frag"
        )

        # Writing to an arbitrary plain HTTP URL is unlikely to work, so we
        # don't generate put URLs.
        with self.assertRaises(NotImplementedError):
            path.generate_presigned_put_url(expiration_time_seconds=300)

    @responses.activate
    def test_server_identity(self):
        server = "MyServer/v1.2.3"
        endpointWithServer = "http://www.lsstwithserverheader.org"
        responses.add(responses.OPTIONS, endpointWithServer, status=200, headers={"Server": server})
        self.assertEqual(ResourcePath(endpointWithServer).server, "myserver")

        endpointWithoutServer = "http://www.lsstwithoutserverheader.org"
        responses.add(responses.OPTIONS, endpointWithoutServer, status=200)
        self.assertIsNone(ResourcePath(endpointWithoutServer).server)

    @classmethod
    def _get_port_number(cls) -> int:
        """Return a port number the webDAV server can use to listen to.

        The port is picked by binding a throw-away socket to port 0 on
        127.0.0.1 and reading back the port that was assigned.
        """
        # The context manager closes the socket even if bind/listen raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", 0))
            s.listen()
            return s.getsockname()[1]

    def _serve_webdav(self, local_path: str, port: int, stop_webdav_server: Callable[[], bool]):
        """Start a local webDAV server, listening on http://localhost:port
        and exposing local_path.

        This server only runs when this test class is instantiated,
        and then shuts down. The server must be started in a separate thread.

        Parameters
        ----------
        local_path : `str`
            Path to an existing local directory for the server to expose.
        port : `int`
            The port number on which the server should listen.
        stop_webdav_server : `Callable[[], bool]`
            Boolean function which returns True when the server should be
            stopped.
        """
        # Bound before the try block so that the finally clause can tell how
        # far the start-up got if it failed part-way.
        server = None
        worker = None
        try:
            # Start the wsgi server in a separate thread
            config = {
                "host": "127.0.0.1",
                "port": port,
                "provider_mapping": {"/": local_path},
                "http_authenticator": {"domain_controller": None},
                "simple_dc": {"user_mapping": {"*": True}},
                "verbose": 0,
                "lock_storage": False,
                "dir_browser": {
                    "enable": False,
                    "ms_sharepoint_support": False,
                    "libre_office_support": False,
                    "response_trailer": False,
                    "davmount_links": False,
                },
            }
            server = wsgi.Server(wsgi_app=WsgiDAVApp(config), bind_addr=(config["host"], config["port"]))
            worker = Thread(target=server.start, daemon=True)
            worker.start()

            # Shut down the server when done: stop_webdav_server() returns
            # True when this test suite is being torn down
            while not stop_webdav_server():
                time.sleep(1)
        except KeyboardInterrupt:
            # Caught Ctrl-C, shut down the server
            pass
        finally:
            if server is not None:
                server.stop()
            if worker is not None:
                worker.join()

    @classmethod
    def _get_name(cls, prefix: str) -> str:
        """Return ``prefix`` followed by a dash and 8 random characters."""
        alphabet = string.ascii_lowercase + string.digits
        return f"{prefix}-" + "".join(random.choices(alphabet, k=8))

    @classmethod
    def _get_dir_name(cls) -> str:
        """Return a randomly selected name for a directory"""
        return cls._get_name(prefix="dir")

    @classmethod
    def _get_file_name(cls) -> str:
        """Return a randomly selected name for a file"""
        return cls._get_name(prefix="file")

    def _generate_file(self, remove_when_done=True) -> tuple[str, int]:
        """Create a local file of random size with random contents.

        Parameters
        ----------
        remove_when_done : `bool`, optional
            If `True`, register the file so that `tearDownClass` deletes it
            if it still exists at that point.

        Returns
        -------
        path : `str`
            Path to local temporary file. The caller is responsible for
            removing the file when appropriate.
        size : `int`
            Size of the generated file, in bytes.
        """
        megabyte = 1024 * 1024
        size = random.randint(2 * megabyte, 5 * megabyte)
        tmpfile, path = tempfile.mkstemp()

        # Register first so the file is cleaned up even if writing fails.
        if remove_when_done:
            HttpReadWriteWebdavTestCase.local_files_to_remove.append(path)

        try:
            self.assertEqual(os.write(tmpfile, os.urandom(size)), size)
        finally:
            # Do not leak the descriptor when the assertion above fails.
            os.close(tmpfile)

        return path, size

    @classmethod
    def _compute_digest(cls, data: bytes) -> str:
        """Compute a SHA256 hash of data."""
        m = hashlib.sha256()
        m.update(data)
        return m.hexdigest()

    @classmethod
    def _is_server_running(cls, port: int) -> bool:
        """Return True if there is a server listening on local address
        127.0.0.1:<port>.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.connect(("127.0.0.1", port))
                return True
            except ConnectionRefusedError:
                return False

714 

715 

716class HttpResourcePathConfigTestCase(unittest.TestCase): 

717 """Test for the HttpResourcePathConfig class.""" 

718 

def setUp(self):
    # Each test gets a scratch directory created under TESTDIR.
    scratch_dir = makeTestTempDir(TESTDIR)
    self.tmpdir = ResourcePath(scratch_dir)

721 

def tearDown(self):
    # Only a local scratch directory is removed here.
    if not self.tmpdir:
        return
    if self.tmpdir.isLocal:
        removeTestTempDir(self.tmpdir.ospath)

725 

def test_send_expect_header(self):
    # Ensure environment variable LSST_HTTP_PUT_SEND_EXPECT_HEADER is
    # inspected to initialize the HttpResourcePathConfig class: the option
    # is off in an empty environment and on when the variable is "true".
    scenarios = (
        ({}, self.assertFalse),
        ({"LSST_HTTP_PUT_SEND_EXPECT_HEADER": "true"}, self.assertTrue),
    )
    for environment, check in scenarios:
        with unittest.mock.patch.dict(os.environ, environment, clear=True):
            settings = HttpResourcePathConfig()
            check(settings.send_expect_on_put)

736 

def test_enable_fsspec(self):
    # Ensure environment variable LSST_HTTP_ENABLE_FSSPEC is inspected to
    # initialize the HttpResourcePathConfig class: fsspec is disabled in an
    # empty environment and enabled when the variable is set.
    scenarios = (
        ({}, self.assertFalse),
        ({"LSST_HTTP_ENABLE_FSSPEC": "any value"}, self.assertTrue),
    )
    for environment, check in scenarios:
        with unittest.mock.patch.dict(os.environ, environment, clear=True):
            settings = HttpResourcePathConfig()
            check(settings.fsspec_is_enabled)

747 

def test_collect_memory_usage(self):
    # Ensure environment variable LSST_HTTP_COLLECT_MEMORY_USAGE is
    # inspected to initialize the HttpResourcePathConfig class: collection
    # is off in an empty environment and on when the variable is "true".
    scenarios = (
        ({}, self.assertFalse),
        ({"LSST_HTTP_COLLECT_MEMORY_USAGE": "true"}, self.assertTrue),
    )
    for environment, check in scenarios:
        with unittest.mock.patch.dict(os.environ, environment, clear=True):
            settings = HttpResourcePathConfig()
            check(settings.collect_memory_usage)

758 

def test_timeout(self):
    # Ensure that when the connect and read timeouts are not specified
    # the default values are stored in the config.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        config = HttpResourcePathConfig()
        self.assertAlmostEqual(config.timeout[0], config.DEFAULT_TIMEOUT_CONNECT)
        self.assertAlmostEqual(config.timeout[1], config.DEFAULT_TIMEOUT_READ)

    # Ensure that when both the connect and read timeouts are specified
    # they are both stored in the config.
    connect_timeout, read_timeout = 100.5, 200.8
    with unittest.mock.patch.dict(
        os.environ,
        {"LSST_HTTP_TIMEOUT_CONNECT": str(connect_timeout), "LSST_HTTP_TIMEOUT_READ": str(read_timeout)},
        clear=True,
    ):
        config = HttpResourcePathConfig()
        self.assertAlmostEqual(config.timeout[0], connect_timeout)
        self.assertAlmostEqual(config.timeout[1], read_timeout)

    # Ensure that invalid float values (including NaN values) raise a
    # ValueError.
    for value in ("invalid", "NaN"):
        with unittest.mock.patch.dict(
            os.environ,
            {"LSST_HTTP_TIMEOUT_CONNECT": value, "LSST_HTTP_TIMEOUT_READ": value},
            clear=True,
        ):
            with self.assertRaises(ValueError):
                config = HttpResourcePathConfig()
                # `timeout` is subscripted above, so it is a value rather
                # than a method: read it instead of calling it. The read
                # stays inside assertRaises in case validation happens on
                # access rather than at construction.
                _ = config.timeout

790 

def test_front_end_connections(self):
    # Ensure that when the number of front end connections is not specified
    # the default comes from the number of workers.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        config = HttpResourcePathConfig()
        self.assertEqual(config.front_end_connections, _get_num_workers())

    # Ensure that when the number of front end connections is specified
    # it is stored in the config.
    connections = 42
    with unittest.mock.patch.dict(
        os.environ, {"LSST_HTTP_FRONTEND_PERSISTENT_CONNECTIONS": str(connections)}, clear=True
    ):
        config = HttpResourcePathConfig()
        # assertEqual, not assertTrue: with assertTrue the second argument
        # is only the failure message, so any non-zero value would pass.
        self.assertEqual(config.front_end_connections, connections)

806 

def test_back_end_connections(self):
    """Check the number of persistent back end connections in the config."""
    # When the number of back end connections is not specified, the
    # default comes from the number of workers.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        config = HttpResourcePathConfig()
        self.assertEqual(config.back_end_connections, _get_num_workers())

    # When the number of back end connections is specified, it must be
    # stored in the config.
    connections = 42
    with unittest.mock.patch.dict(
        os.environ, {"LSST_HTTP_BACKEND_PERSISTENT_CONNECTIONS": str(connections)}, clear=True
    ):
        config = HttpResourcePathConfig()
        # assertEqual rather than assertTrue: assertTrue(value, expected)
        # uses `expected` as the failure message and passes for any truthy
        # value, so it would not detect a wrong connection count.
        self.assertEqual(config.back_end_connections, connections)

822 

def test_digest_algorithm(self):
    """Check how the digest algorithm is taken from LSST_HTTP_DIGEST."""
    # When no digest is specified in the environment, the configured
    # digest algorithm is the empty string.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        config = HttpResourcePathConfig()
        self.assertEqual(config.digest_algorithm, "")

    # An invalid digest algorithm is ignored.
    digest = "invalid"
    with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_DIGEST": digest}, clear=True):
        config = HttpResourcePathConfig()
        self.assertEqual(config.digest_algorithm, "")

    # An accepted digest algorithm is stored. subTest reports which
    # algorithm failed.
    for digest in HttpResourcePathConfig().ACCEPTED_DIGESTS:
        with self.subTest(digest=digest):
            with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_DIGEST": digest}, clear=True):
                config = HttpResourcePathConfig()
                # assertEqual rather than assertTrue: assertTrue(value,
                # expected) only checks that the value is truthy and would
                # accept any non-empty algorithm name.
                self.assertEqual(config.digest_algorithm, digest)

841 

def test_backoff_interval(self):
    """Check how the backoff interval bounds are taken from the environment."""
    # Environments for which the default bounds are expected, in order:
    # nothing defined, values that are not numbers, and NaN values.
    environments_using_defaults = (
        {},
        {"LSST_HTTP_BACKOFF_MIN": "XXX", "LSST_HTTP_BACKOFF_MAX": "YYY"},
        {"LSST_HTTP_BACKOFF_MIN": "NaN", "LSST_HTTP_BACKOFF_MAX": "NaN"},
    )
    for environment in environments_using_defaults:
        with unittest.mock.patch.dict(os.environ, environment, clear=True):
            cfg = HttpResourcePathConfig()
            self.assertAlmostEqual(cfg.backoff_min, cfg.DEFAULT_BACKOFF_MIN)
            self.assertAlmostEqual(cfg.backoff_max, cfg.DEFAULT_BACKOFF_MAX)

    # Valid bounds given through the environment must be used as they are.
    lower, upper = 3.0, 8.0
    valid_environment = {"LSST_HTTP_BACKOFF_MIN": str(lower), "LSST_HTTP_BACKOFF_MAX": str(upper)}
    with unittest.mock.patch.dict(os.environ, valid_environment, clear=True):
        cfg = HttpResourcePathConfig()
        self.assertAlmostEqual(cfg.backoff_min, lower)
        self.assertAlmostEqual(cfg.backoff_max, upper)

877 

def test_ca_bundle(self):
    """Check the certificate authorities bundle exposed by the config."""
    # Without LSST_HTTP_CACERT_BUNDLE the config may report no bundle at
    # all; if it does report one, that path has to exist.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        cfg = HttpResourcePathConfig()
        self.assertTrue(cfg.ca_bundle is None or os.path.exists(cfg.ca_bundle))

    # With LSST_HTTP_CACERT_BUNDLE set, its value is returned unchanged.
    # The path is deliberately not required to exist here.
    expected_bundle = "/path/to/bundle/dir"
    with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_CACERT_BUNDLE": expected_bundle}, clear=True):
        cfg = HttpResourcePathConfig()
        self.assertEqual(cfg.ca_bundle, expected_bundle)

894 

def test_client_token(self):
    """Check the bearer token exposed by the config."""
    # Without LSST_HTTP_AUTH_BEARER_TOKEN in the environment there is no
    # client token.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        cfg = HttpResourcePathConfig()
        self.assertIsNone(cfg.client_token)

    # With LSST_HTTP_AUTH_BEARER_TOKEN set, the client token is identical
    # to the value of that variable.
    expected_token = "ABCDE12345"
    with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_AUTH_BEARER_TOKEN": expected_token}, clear=True):
        cfg = HttpResourcePathConfig()
        self.assertEqual(cfg.client_token, expected_token)

909 

def test_client_cert_key(self):
    """Check how the client certificate and private key are configured
    from environment variables.
    """
    # Neither certificate nor key in the environment: both are None.
    with unittest.mock.patch.dict(os.environ, {}, clear=True):
        cfg = HttpResourcePathConfig()
        cert, key = cfg.client_cert_key
        self.assertIsNone(cert)
        self.assertIsNone(key)

    # Mock certificate and private key files, kept on disk after closing.
    with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as cert_file:
        cert_file.write("CERT")
        cert_path = cert_file.name

    with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as key_file:
        key_file.write("KEY")
        key_path = key_file.name

    # Setting only one of LSST_HTTP_AUTH_CLIENT_CERT and
    # LSST_HTTP_AUTH_CLIENT_KEY raises an exception.
    incomplete_settings = (
        {"LSST_HTTP_AUTH_CLIENT_CERT": cert_path},
        {"LSST_HTTP_AUTH_CLIENT_KEY": key_path},
    )
    for settings in incomplete_settings:
        with unittest.mock.patch.dict(os.environ, settings, clear=True):
            with self.assertRaises(ValueError):
                HttpResourcePathConfig().client_cert_key

    # The private key file must be accessible only by its owner.
    complete_settings = {"LSST_HTTP_AUTH_CLIENT_CERT": cert_path, "LSST_HTTP_AUTH_CLIENT_KEY": key_path}
    with unittest.mock.patch.dict(os.environ, complete_settings, clear=True):
        # Readable by the owner only: certificate and key are returned.
        os.chmod(key_path, stat.S_IRUSR)
        cfg = HttpResourcePathConfig()
        cert, key = cfg.client_cert_key
        self.assertEqual(cert, cert_path)
        self.assertEqual(key, key_path)

        # Any permission bit for group or other raises an exception.
        group_and_other_bits = (
            stat.S_IRGRP,
            stat.S_IWGRP,
            stat.S_IXGRP,
            stat.S_IROTH,
            stat.S_IWOTH,
            stat.S_IXOTH,
        )
        for bit in group_and_other_bits:
            os.chmod(key_path, stat.S_IRUSR | bit)
            with self.assertRaises(PermissionError):
                HttpResourcePathConfig().client_cert_key

    # With X509_USER_PROXY set, its value is used both as the client
    # certificate and as the key.
    with unittest.mock.patch.dict(os.environ, {"X509_USER_PROXY": cert_path}, clear=True):
        cfg = HttpResourcePathConfig()
        cert, key = cfg.client_cert_key
        self.assertEqual(cert, cert_path)
        self.assertEqual(key, cert_path)

971 

972 

class WebdavUtilsTestCase(unittest.TestCase):
    """Tests of the Webdav related utility functions."""

    def setUp(self):
        # Scratch directory under the tests directory for temporary files.
        self.tmpdir = ResourcePath(makeTestTempDir(TESTDIR))

    def tearDown(self):
        # Remove the scratch directory only when it is a local one.
        if self.tmpdir and self.tmpdir.isLocal:
            removeTestTempDir(self.tmpdir.ospath)

    def test_is_protected(self):
        """Check `_is_protected` against several file permission settings."""
        # A path that does not exist is not protected.
        self.assertFalse(_is_protected("/this-file-does-not-exist"))

        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as handle:
            handle.write("XXXX")
            path = handle.name

        # Readable by the owner only: protected.
        os.chmod(path, stat.S_IRUSR)
        self.assertTrue(_is_protected(path))

        # Any permission bit for group or other: not protected.
        group_and_other_bits = (
            stat.S_IRGRP,
            stat.S_IWGRP,
            stat.S_IXGRP,
            stat.S_IROTH,
            stat.S_IWOTH,
            stat.S_IXOTH,
        )
        for bit in group_and_other_bits:
            os.chmod(path, stat.S_IRUSR | bit)
            self.assertFalse(_is_protected(path))

996 

997 

class BearerTokenAuthTestCase(unittest.TestCase):
    """Tests of the BearerTokenAuth class."""

    def setUp(self):
        # Scratch directory for token files and the token value used by
        # the tests.
        self.tmpdir = ResourcePath(makeTestTempDir(TESTDIR))
        self.token = "ABCDE1234"

    def tearDown(self):
        # Remove the scratch directory only when it is a local one.
        if self.tmpdir and self.tmpdir.isLocal:
            removeTestTempDir(self.tmpdir.ospath)

    def test_empty_token(self):
        """Check that without a token the request is returned unmodified."""
        authenticator = BearerTokenAuth(None)
        authenticator._refresh()
        self.assertIsNone(authenticator._token)
        self.assertIsNone(authenticator._path)
        request = requests.Request("GET", "https://example.org")
        self.assertEqual(authenticator(request), request)

    def test_token_value(self):
        """Check that a token value ends up in the 'Authorization' header
        of the request.
        """
        authenticator = BearerTokenAuth(self.token)
        prepared = requests.Request("GET", "https://example.org").prepare()
        result = authenticator(prepared)
        self.assertEqual(result.headers.get("Authorization"), f"Bearer {self.token}")

    def test_token_insecure_http(self):
        """Check that no 'Authorization' header is attached to a request
        sent over insecure HTTP, whatever the case of the scheme.
        """
        authenticator = BearerTokenAuth(self.token)
        insecure_urls = ("http://example.org", "HTTP://example.org", "HttP://example.org")
        for insecure_url in insecure_urls:
            result = authenticator(requests.Request("GET", insecure_url).prepare())
            self.assertIsNone(result.headers.get("Authorization"))

    def test_token_file(self):
        """Check that when the token is given as a file path, the contents
        of the file are used in the 'Authorization' header.
        """
        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as handle:
            handle.write(self.token)
            token_path = handle.name

        # File readable by its owner only: the header carries the token
        # stored in the file.
        os.chmod(token_path, stat.S_IRUSR)
        authenticator = BearerTokenAuth(token_path)
        result = authenticator(requests.Request("GET", "https://example.org").prepare())
        self.assertEqual(result.headers.get("Authorization"), f"Bearer {self.token}")

        # Any permission bit for group or other on the token file raises
        # an exception.
        group_and_other_bits = (
            stat.S_IRGRP,
            stat.S_IWGRP,
            stat.S_IXGRP,
            stat.S_IROTH,
            stat.S_IWOTH,
            stat.S_IXOTH,
        )
        for bit in group_and_other_bits:
            os.chmod(token_path, stat.S_IRUSR | bit)
            with self.assertRaises(PermissionError):
                BearerTokenAuth(token_path)

1058 

1059 

class SessionStoreTestCase(unittest.TestCase):
    """Tests of the SessionStore class."""

    def setUp(self):
        # Scratch directory for mock credential files and the resource
        # path for which sessions are requested.
        self.tmpdir = ResourcePath(makeTestTempDir(TESTDIR))
        self.rpath = ResourcePath("https://example.org")

    def tearDown(self):
        # Remove the scratch directory only when it is a local one.
        if self.tmpdir and self.tmpdir.isLocal:
            removeTestTempDir(self.tmpdir.ospath)

    def test_ca_cert_bundle(self):
        """Check that a certificate authorities bundle given through the
        environment is used by the session to verify the remote server.
        """
        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as handle:
            handle.write("CERT BUNDLE")
            bundle_path = handle.name

        with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_CACERT_BUNDLE": bundle_path}, clear=True):
            cfg = HttpResourcePathConfig()
            session = SessionStore(config=cfg).get(self.rpath)
            self.assertEqual(session.verify, bundle_path)

    def test_user_cert(self):
        """Check that a user certificate and private key given through the
        environment are used by the session to authenticate the client.
        """
        # Mock certificate and private key files, kept on disk after closing.
        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as cert_file:
            cert_file.write("CERT")
            cert_path = cert_file.name

        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as key_file:
            key_file.write("KEY")
            key_path = key_file.name

        # LSST_HTTP_AUTH_CLIENT_CERT and LSST_HTTP_AUTH_CLIENT_KEY must
        # both be set; either one alone raises an exception.
        incomplete_settings = (
            {"LSST_HTTP_AUTH_CLIENT_CERT": cert_path},
            {"LSST_HTTP_AUTH_CLIENT_KEY": key_path},
        )
        for settings in incomplete_settings:
            with unittest.mock.patch.dict(os.environ, settings, clear=True):
                with self.assertRaises(ValueError):
                    cfg = HttpResourcePathConfig()
                    SessionStore(config=cfg).get(self.rpath)

        # The private key file must be accessible only by its owner.
        complete_settings = {"LSST_HTTP_AUTH_CLIENT_CERT": cert_path, "LSST_HTTP_AUTH_CLIENT_KEY": key_path}
        with unittest.mock.patch.dict(os.environ, complete_settings, clear=True):
            # Readable by the owner only: the session carries the
            # certificate and the key.
            os.chmod(key_path, stat.S_IRUSR)
            cfg = HttpResourcePathConfig()
            session = SessionStore(config=cfg).get(self.rpath)
            self.assertEqual(session.cert[0], cert_path)
            self.assertEqual(session.cert[1], key_path)

            # Any permission bit for group or other raises an exception.
            group_and_other_bits = (
                stat.S_IRGRP,
                stat.S_IWGRP,
                stat.S_IXGRP,
                stat.S_IROTH,
                stat.S_IWOTH,
                stat.S_IXOTH,
            )
            for bit in group_and_other_bits:
                os.chmod(key_path, stat.S_IRUSR | bit)
                with self.assertRaises(PermissionError):
                    cfg = HttpResourcePathConfig()
                    SessionStore(config=cfg).get(self.rpath)

    def test_token_env(self):
        """Check that a token given through an environment variable makes
        the sessions use a BearerTokenAuth.
        """
        expected_token = "ABCDE"
        with unittest.mock.patch.dict(
            os.environ, {"LSST_HTTP_AUTH_BEARER_TOKEN": expected_token}, clear=True
        ):
            cfg = HttpResourcePathConfig()
            session = SessionStore(config=cfg).get(self.rpath)
            self.assertEqual(type(session.auth), lsst.resources.http.BearerTokenAuth)
            self.assertEqual(session.auth._token, expected_token)
            self.assertIsNone(session.auth._path)

    def test_sessions(self):
        """Check the session caching mechanism of the store."""
        # The store provides a session for a given URL.
        base_url = "https://example.org"
        cfg = HttpResourcePathConfig()
        store = SessionStore(config=cfg)
        base_session = store.get(ResourcePath(base_url))
        self.assertIsNotNone(base_session)

        # URLs sharing the same root get equal sessions from one store.
        for url in (f"{base_url}", f"{base_url}/path/to/file"):
            self.assertEqual(base_session, store.get(ResourcePath(url)))

        # A different root URL gets a different session.
        other_url = "https://another.example.org"
        self.assertNotEqual(base_session, store.get(ResourcePath(other_url)))

        # The same host with a different port number gets a different
        # session.
        other_url_with_port = f"{other_url}:12345"
        port_session = store.get(ResourcePath(other_url_with_port))
        self.assertNotEqual(port_session, store.get(ResourcePath(other_url)))

        # URLs sharing the same root, port number included, get equal
        # sessions.
        for url in (f"{other_url_with_port}", f"{other_url_with_port}/path/to/file"):
            self.assertEqual(port_session, store.get(ResourcePath(url)))

1171 

1172 

class TestContentRange(unittest.TestCase):
    """Tests of the parsing of the Content-Range header."""

    def _check_parsed(self, header, range_start, range_end, total):
        """Parse ``header`` and compare each parsed field to its expected
        value; an expected value of None means the field must be None.
        """
        parsed = parse_content_range_header(header)
        expectations = (
            (parsed.range_start, range_start),
            (parsed.range_end, range_end),
            (parsed.total, total),
        )
        for actual, expected in expectations:
            if expected is None:
                self.assertIsNone(actual)
            else:
                self.assertEqual(actual, expected)

    def test_full_data(self):
        # Range and total size are both present.
        self._check_parsed("bytes 123-2555/12345", 123, 2555, 12345)
        self._check_parsed(" bytes 0-0/5 ", 0, 0, 5)

    def test_empty_total(self):
        # The total size is given as "*".
        self._check_parsed("bytes 123-2555/*", 123, 2555, None)
        self._check_parsed(" bytes 0-0/* ", 0, 0, None)

    def test_empty_range(self):
        # The range is given as "*".
        self._check_parsed("bytes */12345", None, None, 12345)
        self._check_parsed(" bytes */5 ", None, None, 5)

    def test_invalid_input(self):
        # A unit other than "bytes" is rejected.
        with self.assertRaises(ValueError):
            parse_content_range_header("pages 0-10/12")

1212 

1213 

# Run all the test cases of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()