Coverage for tests / test_dav.py: 14%

617 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:32 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12import concurrent 

13import datetime 

14import hashlib 

15import io 

16import os.path 

17import random 

18import shutil 

19import socket 

20import stat 

21import string 

22import tempfile 

23import time 

24import unittest 

25import zlib 

26from collections.abc import Callable 

27from threading import Thread 

28from typing import cast 

29from zipfile import ZipFile, ZipInfo 

30 

31try: 

32 from cheroot import wsgi 

33 from wsgidav.wsgidav_app import WsgiDAVApp 

34except ImportError: 

35 WsgiDAVApp = None 

36 

37try: 

38 import fsspec 

39except ImportError: 

40 fsspec = None 

41 AbstractFileSystem = type 

42 

43from lsst.resources import ResourceInfo, ResourcePath 

44from lsst.resources._resourceHandles._davResourceHandle import ( 

45 DavReadResourceHandle, 

46) 

47from lsst.resources.dav import ( 

48 DavResourcePathConfig, 

49 dav_globals, 

50) 

51from lsst.resources.davutils import ( 

52 DavConfig, 

53 DavConfigPool, 

54 TokenAuthorizer, 

55) 

56from lsst.resources.tests import GenericReadWriteTestCase, GenericTestCase 

57from lsst.resources.utils import get_tempdir, makeTestTempDir, removeTestTempDir 

58 

59TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

60 

61 

class GenericDavTestCase(GenericTestCase, unittest.TestCase):
    """Generic tests of dav URIs."""

    scheme = "dav"
    netloc = "host.example.org"

    def test_dav_root_uri(self):
        """Check root_uri() and path normalization for dav URIs."""
        # Each entry maps an input URI to the root URI it must resolve to.
        root_uri_cases = [
            ("dav://host.example.org", "dav://host.example.org/"),
            ("dav://host.example.org/some/path", "dav://host.example.org/"),
            ("dav://host.example.org:12345", "dav://host.example.org:12345/"),
            ("dav://host.example.org:12345/some/path", "dav://host.example.org:12345/"),
            ("dav://user:pwd@host.example.org", "dav://user:pwd@host.example.org/"),
            ("dav://user:pwd@host.example.org/some/path", "dav://user:pwd@host.example.org/"),
            ("dav://user:pwd@host.example.org:12345/some/path", "dav://user:pwd@host.example.org:12345/"),
            ("dav://user:pwd@host.example.org/some/path#fragment", "dav://user:pwd@host.example.org/"),
            ("dav://user:pwd@host.example.org/some/path?param=value", "dav://user:pwd@host.example.org/"),
            ("dav://user:pwd@host.example.org/some/path;parameters", "dav://user:pwd@host.example.org/"),
        ]
        for input_uri, expected_root in root_uri_cases:
            self.assertEqual(ResourcePath(expected_root), ResourcePath(input_uri).root_uri())

        # Each entry maps an input URI to its normalized form: duplicate
        # slashes collapsed, "." and ".." segments resolved.
        normalization_cases = [
            ("dav://host.example.org/", "dav://host.example.org/"),
            ("dav://host.example.org/some/path", "dav://host.example.org/some/path"),
            ("dav://host.example.org////some/path///", "dav://host.example.org/some/path/"),
            ("dav://host.example.org/some/path///", "dav://host.example.org/some/path/"),
            ("dav://host.example.org/some/./././path", "dav://host.example.org/some/path"),
            ("dav://host.example.org/a/b/c/d/../../", "dav://host.example.org/a/b/"),
            ("dav://host.example.org/a/b/c/d/../../../../../../", "dav://host.example.org/"),
        ]
        for input_uri, expected_url in normalization_cases:
            self.assertEqual(ResourcePath(input_uri).geturl(), expected_url)

97 

98 

class DavReadWriteTestCase(GenericReadWriteTestCase, unittest.TestCase):
    """Test with a real webDAV server, as opposed to mocking responses."""

    scheme = "dav"
    # Paths of local temporary files created by the tests; removed in
    # tearDownClass().
    local_files_to_remove: list[str] = []
    # Convenience constant used when sizing test payloads and buffers.
    MEGABYTE: int = 1024 * 1024

105 

106 @classmethod 

107 def setUpClass(cls): 

108 cls.webdav_tmpdir = tempfile.mkdtemp(prefix="webdav-server-test-") 

109 cls.server_thread = None 

110 

111 # Reinitialize globals. 

112 dav_globals._reset() 

113 

114 # Should we test against a running server? 

115 # 

116 # This is convenient for testing against real servers in the 

117 # developer environment by initializing the environment variable 

118 # LSST_RESOURCES_DAV_TEST_SERVER_URL with the URL of the server, e.g. 

119 # dav://host.example.org:1234/path/to/top/dir 

120 if (test_endpoint := os.getenv("LSST_RESOURCES_DAV_TEST_SERVER_URL")) is not None: 

121 # Run this test case against the specified server. 

122 uri = ResourcePath(test_endpoint) 

123 cls.scheme = uri.scheme 

124 cls.netloc = uri.netloc 

125 cls.base_path = uri.path 

126 elif WsgiDAVApp is not None: 

127 # WsgiDAVApp is available, launch a local server in its own 

128 # thread to expose a local temporary directory and run this 

129 # test case against it. 

130 cls.port_number = cls._get_port_number() 

131 cls.stop_webdav_server = False 

132 cls.server_thread = Thread( 

133 target=cls._serve_webdav, 

134 args=(cls, cls.webdav_tmpdir, cls.port_number, lambda: cls.stop_webdav_server), 

135 daemon=True, 

136 ) 

137 cls.server_thread.start() 

138 

139 # Wait for it to start 

140 time.sleep(1) 

141 

142 # Initialize the server endpoint 

143 cls.netloc = f"127.0.0.1:{cls.port_number}" 

144 else: 

145 cls.skipTest( 

146 cls, 

147 "neither WsgiDAVApp is available nor a webDAV test endpoint is configured to test against", 

148 ) 

149 

150 @classmethod 

151 def tearDownClass(cls): 

152 # Stop the WsgiDAVApp server, if any 

153 if WsgiDAVApp is not None: 

154 # Shut down of the webdav server and wait for the thread to exit 

155 cls.stop_webdav_server = True 

156 if cls.server_thread is not None: 

157 cls.server_thread.join() 

158 

159 # Remove local temporary files 

160 for file in cls.local_files_to_remove: 

161 if os.path.exists(file): 

162 os.remove(file) 

163 

164 # Remove temp dir 

165 if cls.webdav_tmpdir: 

166 shutil.rmtree(cls.webdav_tmpdir, ignore_errors=True) 

167 

    def tearDown(self):
        """Remove the remote per-test directory, then run the generic
        teardown from the base class.
        """
        if self.tmpdir:
            self.tmpdir.remove_dir(recursive=True)

        super().tearDown()

173 

    def test_dav_file_handle(self):
        """Check open() on a remote file: binary and text read handles,
        seek/tell/flush behavior, and the fallback used for write modes.
        """
        # Upload a new file with known contents.
        contents = "These are some \n bytes to read"
        remote_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(remote_file.write(data=contents, overwrite=True))

        # Test that the correct handle is returned.
        with remote_file.open("rb") as handle:
            self.assertIsInstance(handle, DavReadResourceHandle)

        # Test reading byte ranges works
        with remote_file.open("rb") as handle:
            sub_contents = contents[:10]
            handle = cast(DavReadResourceHandle, handle)
            result = handle.read(len(sub_contents)).decode()
            self.assertEqual(result, sub_contents)

            # Verify the position.
            self.assertEqual(handle.tell(), len(sub_contents))

            # Jump back to the beginning and test if reading the whole file
            # prompts the internal buffer to be read.
            handle.seek(0)
            self.assertEqual(handle.tell(), 0)
            result = handle.read().decode()
            self.assertEqual(result, contents)

            # Check that flush works on read-only handle.
            handle.flush()

        # Verify reading as a string handle works as expected.
        with remote_file.open("r") as handle:
            self.assertIsInstance(handle, io.TextIOWrapper)

            handle = cast(io.TextIOWrapper, handle)
            self.assertIsInstance(handle.buffer, DavReadResourceHandle)

            # Check if string methods work.
            result = handle.read()
            self.assertEqual(result, contents)

            # Check that flush works on read-only handle.
            handle.flush()

        # Verify that write modes invoke the default base method
        with remote_file.open("w") as handle:
            self.assertIsInstance(handle, io.StringIO)

221 

222 def test_dav_mkdir(self): 

223 # Check creation and deletion of an empty directory 

224 subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True) 

225 self.assertIsNone(subdir.mkdir()) 

226 self.assertTrue(subdir.exists()) 

227 self.assertTrue(subdir.isdir()) 

228 self.assertEqual(subdir.size(), 0) 

229 

230 # Creating an existing remote directory must succeed 

231 self.assertIsNone(subdir.mkdir()) 

232 

233 # Deleting an existing directory must succeed 

234 self.assertIsNone(subdir.remove()) 

235 

236 # Deleting a non-existing directory must succeed 

237 subir_not_exists = self.tmpdir.join(self._get_dir_name(), forceDirectory=True) 

238 self.assertIsNone(subir_not_exists.remove()) 

239 

240 # Attempting to create a directory at a path where a file exists 

241 # must raise 

242 file = self.tmpdir.join(self._get_file_name(), forceDirectory=False) 

243 file.write(data=None, overwrite=True) 

244 self.assertTrue(file.exists()) 

245 

246 existing_file = self.tmpdir.join(file.basename(), forceDirectory=True) 

247 with self.assertRaises(NotADirectoryError): 

248 self.assertIsNone(existing_file.mkdir()) 

249 

250 # mkdir must create all missing ancestors 

251 subsubdir = subdir.join("a/b/c/d/e", forceDirectory=True) 

252 self.assertIsNone(subsubdir.mkdir()) 

253 self.assertTrue(subsubdir.exists()) 

254 

255 def test_dav_upload_download(self): 

256 # Test upload a randomly-generated file via write() with and without 

257 # overwrite 

258 local_file, file_size = self._generate_file() 

259 with open(local_file, "rb") as f: 

260 data = f.read() 

261 

262 remote_file = self.tmpdir.join(self._get_file_name()) 

263 self.assertIsNone(remote_file.write(data, overwrite=True)) 

264 self.assertTrue(remote_file.exists()) 

265 self.assertEqual(remote_file.size(), file_size) 

266 

267 # Write without overwrite must raise since target file exists 

268 with self.assertRaises(FileExistsError): 

269 remote_file.write(data, overwrite=False) 

270 

271 # Download the file we just uploaded. Compute and compare a digest of 

272 # the uploaded and downloaded data and ensure they match 

273 downloaded_data = remote_file.read() 

274 self.assertEqual(len(downloaded_data), file_size) 

275 upload_digest = self._compute_digest(data) 

276 download_digest = self._compute_digest(downloaded_data) 

277 self.assertEqual(upload_digest, download_digest) 

278 os.remove(local_file) 

279 

280 def test_dav_as_local(self): 

281 # Generate a file with random data and upload to a remote file. 

282 original_file, original_file_size = self._generate_file() 

283 original_digest = self._compute_digest_for_file(original_file) 

284 

285 with open(original_file, "rb") as file: 

286 remote_file = self.tmpdir.join(self._get_file_name()) 

287 self.assertIsNone(remote_file.write(data=file, overwrite=True)) 

288 self.assertTrue(remote_file.exists()) 

289 remote_file_size = remote_file.size() 

290 self.assertEqual(remote_file_size, original_file_size) 

291 

292 # Download the remote file to a temporary local file, check that 

293 # the sizes and contents of the original file and the downloaded files 

294 # match. 

295 with remote_file._as_local() as local_uri: 

296 self.assertTrue(local_uri.isTemporary) 

297 self.assertTrue(os.path.exists(local_uri.ospath)) 

298 self.assertTrue(os.stat(local_uri.ospath).st_size, remote_file_size) 

299 self.assertEqual(original_digest, self._compute_digest(local_uri.read())) 

300 

301 self.assertFalse(local_uri.exists()) 

302 

303 def test_dav_size(self): 

304 # Retrieving the size of a non-existent file must raise. 

305 remote_file = self.tmpdir.join(self._get_file_name()) 

306 with self.assertRaises(FileNotFoundError): 

307 remote_file.size() 

308 

309 # The size of a directory using a file-like path must be zero 

310 remote_dir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True) 

311 self.assertIsNone(remote_dir.mkdir()) 

312 self.assertTrue(remote_dir.exists()) 

313 self.assertEqual(remote_dir.size(), 0) 

314 

315 dir_as_file = ResourcePath(remote_dir.geturl().rstrip("/"), forceDirectory=False) 

316 self.assertEqual(dir_as_file.size(), 0) 

317 

318 def test_dav_upload_creates_dir(self): 

319 # Uploading a file to a non existing directory must ensure its 

320 # parent directories are automatically created and upload succeeds 

321 non_existing_dir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True) 

322 non_existing_dir = non_existing_dir.join(self._get_dir_name(), forceDirectory=True) 

323 non_existing_dir = non_existing_dir.join(self._get_dir_name(), forceDirectory=True) 

324 remote_file = non_existing_dir.join(self._get_file_name()) 

325 

326 local_file, file_size = self._generate_file() 

327 with open(local_file, "rb") as f: 

328 data = f.read() 

329 self.assertIsNone(remote_file.write(data, overwrite=True)) 

330 

331 self.assertTrue(remote_file.exists()) 

332 self.assertEqual(remote_file.size(), file_size) 

333 self.assertTrue(remote_file.parent().exists()) 

334 

335 downloaded_data = remote_file.read() 

336 upload_digest = self._compute_digest(data) 

337 download_digest = self._compute_digest(downloaded_data) 

338 self.assertEqual(upload_digest, download_digest) 

339 

    def test_dav_transfer_from(self):
        """Check transfer_from() for local and remote sources, using the
        "copy" and "move" transfer modes, with and without overwrite.
        """
        # Transfer from local file via "copy", with and without overwrite
        depth = random.randint(1, 5)
        remote_file = self.tmpdir.join(self._get_file_name(depth))
        local_file, _ = self._generate_file()
        source_file = ResourcePath(local_file)
        self.assertIsNone(remote_file.transfer_from(source_file, transfer="copy", overwrite=True))
        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), source_file.size())
        with self.assertRaises(FileExistsError):
            remote_file.transfer_from(ResourcePath(local_file), transfer="copy", overwrite=False)

        # Transfer from remote file via "copy", with and without overwrite
        source_file = remote_file
        target_file = self.tmpdir.join(self._get_file_name(depth=random.randint(1, 5)))
        self.assertIsNone(target_file.transfer_from(source_file, transfer="copy", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_file.size())

        # Transfer without overwrite must raise since target resource exists
        with self.assertRaises(FileExistsError):
            target_file.transfer_from(source_file, transfer="copy", overwrite=False)

        # Test transfer from local file via "move", with and without overwrite
        source_file = ResourcePath(local_file)
        source_size = source_file.size()
        target_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(target_file.transfer_from(source_file, transfer="move", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_size)
        # "move" must delete the local source file.
        self.assertFalse(source_file.exists())

        # Test transfer without overwrite must raise since target resource
        # exists
        local_file, file_size = self._generate_file()
        with self.assertRaises(FileExistsError):
            source_file = ResourcePath(local_file)
            target_file.transfer_from(source_file, transfer="move", overwrite=False)

        # Test transfer from remote file via "move" with and without overwrite
        # must succeed
        source_file = target_file
        source_size = source_file.size()
        target_file = self.tmpdir.join(self._get_file_name(depth=random.randint(1, 5)))
        self.assertIsNone(target_file.transfer_from(source_file, transfer="move", overwrite=True))
        self.assertTrue(target_file.exists())
        self.assertEqual(target_file.size(), source_size)
        # "move" must delete the remote source file.
        self.assertFalse(source_file.exists())

        # Transfer without overwrite must raise since target resource exists
        with self.assertRaises(FileExistsError):
            source_file = ResourcePath(local_file)
            target_file.transfer_from(source_file, transfer="move", overwrite=False)

393 

    def test_dav_handle(self):
        """Check reads through a resource handle: seek, partial reads on a
        multi-megabyte file, and readinto().
        """
        # Resource handle must succeed
        remote_file = self.tmpdir.join(self._get_file_name())
        data = "abcdefghi"
        self.assertIsNone(remote_file.write(data, overwrite=True))
        with remote_file.open("rb") as handle:
            handle.seek(1)
            self.assertEqual(handle.read(4).decode("utf-8"), data[1:5])

        # Ensure that reading the whole file content through a handle works
        with remote_file.open("rb") as handle:
            content = handle.read(-1)
            self.assertEqual(remote_file.size(), len(content))

            handle.seek(1)
            self.assertEqual(handle.read(4).decode("utf-8"), data[1:5])

        # Upload a multi-megabyte file and ensure a partial read succeeds
        data = io.BytesIO(b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09" * self.MEGABYTE)
        self.assertIsNone(remote_file.write(data, overwrite=True))
        file_size: int = remote_file.size()
        file_offset: int = random.randint(self.MEGABYTE, file_size)
        bytes_to_read: int = random.randint(self.MEGABYTE // 2, self.MEGABYTE)
        with remote_file.open("rb") as handle:
            # Seek both the in-memory copy and the remote handle to the
            # same offset and compare the bytes read from each.
            data.seek(file_offset)
            handle.seek(file_offset)
            self.assertEqual(handle.tell(), data.tell())
            self.assertEqual(handle.read(bytes_to_read), data.read(bytes_to_read))

        # Test readinto()
        with remote_file.open("rb") as handle:
            buffer = bytearray(random.randint(self.MEGABYTE, 2 * self.MEGABYTE))
            offset = random.randint(file_size // 3, file_size // 2)

            # Check the returned read count is as expected
            handle.seek(offset)
            count = handle.readinto(buffer)
            self.assertEqual(count, len(buffer))

            # Check the contents of the returned buffer is as expected
            handle.seek(offset)
            self.assertTrue(handle.read(count) == buffer)

436 

437 def test_dav_repeated_write(self): 

438 data = io.BytesIO(b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09" * self.MEGABYTE) 

439 remote_file = self.tmpdir.join(self._get_file_name()) 

440 

441 # Consecutive writes to the same file must succeed. It was noticed 

442 # that XRootD server sometimes fail this operation with an error 

443 # like: 

444 # 

445 # status 423 [Output file /path/to/file is already opened by 

446 # 1 writer; open denied.] 

447 self.assertIsNone(remote_file.write(data, overwrite=True)) 

448 self.assertIsNone(remote_file.write(data, overwrite=True)) 

449 self.assertIsNone(remote_file.write(data, overwrite=True)) 

450 

    def test_dav_remove(self):
        """Check removal of remote files and directories, including
        recursive removal of a directory hierarchy.
        """
        # Deletion of an existing remote file must succeed
        local_file, file_size = self._generate_file()
        with open(local_file, "rb") as f:
            remote_file = self.tmpdir.join(self._get_file_name())
            self.assertIsNone(remote_file.write(f, overwrite=True))

        self.assertTrue(remote_file.exists())
        self.assertEqual(remote_file.size(), file_size)
        self.assertIsNone(remote_file.remove())

        # Deletion of a non-existing remote file must succeed
        non_existing_file = self.tmpdir.join(self._get_file_name())
        self.assertIsNone(non_existing_file.remove())

        # Deletion of a non-empty remote directory must raise
        subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subdir.mkdir())
        self.assertTrue(subdir.exists())
        with open(local_file, "rb") as f:
            remote_file = subdir.join("file_to_remove")
            remote_file.write(f, overwrite=True)
            self.assertTrue(remote_file.exists())
            self.assertEqual(remote_file.size(), file_size)

        with self.assertRaises(IsADirectoryError):
            subdir.remove()
        self.assertTrue(subdir.exists())

        # Recursively removing a deep hierarchy of directories must succeed
        top = subdir
        for level in ["one", "two", "three", "four"]:
            subdir = subdir.join(level, forceDirectory=True)
            subdir.mkdir()
            self.assertTrue(subdir.exists())
            self.assertTrue(subdir.isdir())

        top.remove_dir(recursive=True)
        self.assertFalse(top.exists())

490 

491 def test_dav_to_fsspec(self): 

492 if fsspec is None: 

493 self.skipTest("fsspec not available") 

494 

495 # Upload a randomly-generated file via write() with overwrite. 

496 local_file, file_size = self._generate_file() 

497 with open(local_file, "rb") as f: 

498 data = f.read() 

499 

500 remote_file = self.tmpdir.join(self._get_file_name()) 

501 self.assertIsNone(remote_file.write(data, overwrite=True)) 

502 self.assertTrue(remote_file.exists()) 

503 self.assertEqual(remote_file.size(), file_size) 

504 

505 # Ensure that the contents of the remote file we just 

506 # uploaded is identical to the contents of that file when 

507 # retrieved via fsspec.open(), with and without a context manager. 

508 fsys, path = remote_file.to_fsspec() 

509 file = fsys.open(path) 

510 self.assertEqual(data, file.read()) 

511 file.close() 

512 

513 with fsys.open(path) as file: 

514 self.assertEqual(data, file.read()) 

515 

516 # Ensure the file system inherits from `fsspec.AbstractFileSystem` 

517 # as parquet expects that. 

518 self.assertTrue(isinstance(fsys, fsspec.AbstractFileSystem)) 

519 

520 # Ensure properties of the remote file are consistent with those 

521 # same properties retrieved via the file system. 

522 self.assertTrue(path, remote_file.geturl()) 

523 self.assertTrue(fsys.exists(path)) 

524 self.assertTrue(fsys.isfile(path)) 

525 self.assertFalse(fsys.isdir(path)) 

526 self.assertEqual(fsys.size(path), remote_file.size()) 

527 self.assertEqual(remote_file._stat().last_modified, fsys.modified(path)) 

528 

529 info = fsys.info(path) 

530 self.assertTrue(info["name"], path) 

531 self.assertTrue(info["type"], "file") 

532 self.assertTrue(info["size"], remote_file.size()) 

533 

534 # Ensure that the file system raises with methods attempting to 

535 # modifying the file system. 

536 with self.assertRaises(NotImplementedError): 

537 fsys.rm(path) 

538 

539 # Ensure that the file system raises with methods not implemented. 

540 with self.assertRaises(NotImplementedError): 

541 fsys.mkdir("a/b/c") 

542 

543 not_implemented_methods = [ 

544 fsys.mkdir, 

545 fsys.makedirs, 

546 fsys.rmdir, 

547 fsys.walk, 

548 fsys.find, 

549 fsys.walk, 

550 fsys.find, 

551 fsys.du, 

552 fsys.glob, 

553 fsys.rm_file, 

554 fsys.rm, 

555 fsys.touch, 

556 fsys.ukey, 

557 fsys.created, 

558 ] 

559 for method in not_implemented_methods: 

560 with self.assertRaises(NotImplementedError): 

561 method(path="xxx") 

562 

563 # Ensure that the file system raises with methods with any path 

564 # different from the only file returned by to_fsspec(). 

565 file_not_found_methods = [fsys.info, fsys.ls, fsys.modified, fsys.size] 

566 for method in file_not_found_methods: 

567 with self.assertRaises(FileNotFoundError): 

568 method(path="xxx") 

569 

    def test_dav_parquet_read(self):
        """Check we can read a parquet file via to_fsspec().

        Skipped when fsspec, numpy or pyarrow are not available.
        """
        if fsspec is None:
            self.skipTest("fsspec not available")

        try:
            import numpy as np
            import pyarrow as pa  # type: ignore
            import pyarrow.parquet as pq  # type: ignore

            # Create a local parquet file and upload it. Ensure it is bigger
            # than the default buffer size so that it is not entirely
            # cached when read by parquet library.
            num_rows = 1_000_000
            data = {
                "one": np.arange(num_rows, dtype=np.int32),
                "two": np.arange(num_rows, dtype=np.int64),
                "three": np.arange(num_rows, dtype=np.float64),
            }
            table = pa.Table.from_pydict(data)
            local_file = self._make_local_temp_file()
            pq.write_table(table, local_file)
            local_file_size = os.stat(local_file).st_size

            remote_file = self.tmpdir.join("file.parquet")
            with open(local_file, "rb") as file:
                remote_file.write(file, overwrite=True)

            self.assertTrue(remote_file.exists())
            self.assertEqual(remote_file.size(), local_file_size)

            # Read the remote file we just uploaded via parquet, using
            # similar function as used by
            # `lsst.daf.butler.formatters.ParquetFormatter`.
            fsys, path = remote_file.to_fsspec()
            schema = pq.read_schema(path, filesystem=fsys)
            for column in data.keys():
                self.assertTrue(column in schema.names)

            table = pq.read_table(path, filesystem=fsys, use_threads=True, use_pandas_metadata=False)
            for column in data.keys():
                self.assertTrue(column in table.column_names)
            self.assertEqual(table.num_rows, num_rows)
            self.assertEqual(table.num_columns, len(data.keys()))

            # Convert the parquet table to a Python dictionary and compare
            # its contents with the data originally used to create the
            # parquet file.
            data_from_parquet = table.to_pydict()
            for column in data.keys():
                self.assertTrue(np.array_equal(data[column], data_from_parquet[column]))

        except ImportError:
            self.skipTest("numpy or pyarrow are not available")

625 

626 def test_dav_zip(self): 

627 # Check we can read back a zip file 

628 

629 # Create a local zip file composed of a random number of identical 

630 # files. 

631 local_file, local_file_size = self._generate_file() 

632 local_file_digest = self._compute_digest_for_file(local_file) 

633 

634 num_members = random.randint(10, 20) 

635 basename = os.path.basename(local_file) 

636 member_names = [f"{basename}-{i}" for i in range(num_members)] 

637 zip_file_name = self._make_local_temp_file() 

638 with ZipFile(zip_file_name, mode="w") as zf: 

639 for name in member_names: 

640 zf.write(local_file, name) 

641 

642 # Upload the zip file to the server 

643 with open(zip_file_name, mode="rb") as file: 

644 remote_zip_file = self.tmpdir.join("example.zip") 

645 self.assertIsNone(remote_zip_file.write(file, overwrite=True)) 

646 self.assertEqual(os.stat(zip_file_name).st_size, remote_zip_file.size()) 

647 

648 # Read the zip file back and check its contents. 

649 with remote_zip_file.open("rb") as fd: 

650 # Check the names of member files match 

651 zf = ZipFile(fd) 

652 zip_members = [info.filename for info in zf.infolist()] 

653 for name in member_names: 

654 self.assertTrue(name in zip_members) 

655 

656 # Check the sizes of member files match 

657 for file_size in [info.file_size for info in zf.infolist()]: 

658 self.assertTrue(file_size, local_file_size) 

659 

660 # Check that the contents of a randomly-selected member file 

661 # is identical to the original file. 

662 random_member = random.choice(zf.infolist()) 

663 with zf.open(random_member) as member: 

664 self.assertEqual(local_file_digest, self._compute_digest(member.read())) 

665 

666 # Concurrently read all the members of the remote Zip file 

667 def download_zip_member(uri: ResourcePath, zinfo: ZipInfo) -> tuple[int, str]: 

668 # Download the member of Zip file at `uri` designated by `zinfo` 

669 # and return its size in bytes and a checksum of its contents. 

670 with uri.open("rb") as fd: 

671 with ZipFile(fd).open(zinfo.filename) as member: 

672 data = member.read() 

673 return len(data), self._compute_digest(data) 

674 

675 with remote_zip_file.open("rb") as fd: 

676 executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) 

677 zfile_components = ZipFile(fd).infolist() 

678 futures = [ 

679 executor.submit(download_zip_member, remote_zip_file, zinfo) for zinfo in zfile_components 

680 ] 

681 

682 # Gather results and check they match the expected values 

683 for future in concurrent.futures.as_completed(futures): 

684 member_size, member_digest = future.result() 

685 self.assertEqual(member_size, local_file_size) 

686 self.assertEqual(member_digest, local_file_digest) 

687 

    def test_dav_get_info(self):
        """Check the ResourceInfo metadata returned by get_info() for both
        directories and files.
        """
        # Missing resources now raise instead of returning a partial dict.
        subdir = self.tmpdir.join("inexistent", forceDirectory=True)
        with self.assertRaises(FileNotFoundError):
            subdir.get_info()

        # Retrieve and check metadata details about an existing directory
        subdir = self.tmpdir.join(self._get_dir_name(), forceDirectory=True)
        self.assertIsNone(subdir.mkdir())
        self.assertTrue(subdir.exists())
        metadata = subdir.get_info()
        self.assertIsInstance(metadata, ResourceInfo)

        self.assertFalse(metadata.is_file)
        self.assertEqual(metadata.size, 0)
        self.assertEqual(len(metadata.checksums), 0)
        # Timestamps must be timezone-aware, in UTC.
        self.assertEqual(metadata.last_modified.tzinfo, datetime.UTC)
        self.assertEqual(metadata.last_modified, subdir._stat().last_modified)

        # Retrieve and check metadata details about existing file
        local_file, local_file_size = self._generate_file()
        with open(local_file, "rb") as file:
            content = file.read()
            md5_checksum = self._compute_digest(content, algorithm="md5")
            adler32_checksum = self._compute_digest(content, algorithm="adler32")

        remote_file = self.tmpdir.join("example.data")
        with open(local_file, mode="rb") as file:
            self.assertIsNone(remote_file.write(file, overwrite=True))
            self.assertEqual(os.stat(local_file).st_size, remote_file.size())

        metadata = remote_file.get_info()
        self.assertIsInstance(metadata, ResourceInfo)
        self.assertTrue(metadata.is_file)
        self.assertEqual(metadata.size, local_file_size)
        self.assertEqual(metadata.last_modified.tzinfo, datetime.UTC)
        self.assertEqual(metadata.last_modified, remote_file._stat().last_modified)

        # Not every server reports checksums, so only verify the ones
        # actually present in the response.
        checksums = metadata.checksums
        if "md5" in checksums:
            self.assertEqual(checksums["md5"], md5_checksum)
        if "adler32" in checksums:
            self.assertEqual(checksums["adler32"], adler32_checksum)

731 

732 @classmethod 

733 def _get_port_number(cls) -> int: 

734 """Return a port number the webDAV server can use to listen to.""" 

735 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

736 s.bind(("127.0.0.1", 0)) 

737 s.listen() 

738 port = s.getsockname()[1] 

739 s.close() 

740 return port 

741 

742 def _serve_webdav(self, local_path: str, port: int, stop_webdav_server: Callable[[], bool]): 

743 """Start a local webDAV server, listening on http://localhost:port 

744 and exposing local_path. 

745 

746 This server only runs when this test class is instantiated, 

747 and then shuts down. The server must be started is a separate thread. 

748 

749 Parameters 

750 ---------- 

751 port : `int` 

752 The port number on which the server should listen 

753 local_path : `str` 

754 Path to an existing local directory for the server to expose. 

755 stop_webdav_server : `Callable[[], bool]` 

756 Boolean function which returns True when the server should be 

757 stopped. 

758 """ 

759 try: 

760 # Start the wsgi server in a separate thread 

761 config = { 

762 "host": "127.0.0.1", 

763 "port": port, 

764 "provider_mapping": {"/": local_path}, 

765 "http_authenticator": {"domain_controller": None}, 

766 "simple_dc": {"user_mapping": {"*": True}}, 

767 "verbose": 0, 

768 "lock_storage": False, 

769 "dir_browser": { 

770 "enable": False, 

771 "ms_sharepoint_support": False, 

772 "libre_office_support": False, 

773 "response_trailer": False, 

774 "davmount_links": False, 

775 }, 

776 } 

777 server = wsgi.Server(wsgi_app=WsgiDAVApp(config), bind_addr=(config["host"], config["port"])) 

778 t = Thread(target=server.start, daemon=True) 

779 t.start() 

780 

781 # Shut down the server when done: stop_webdav_server() returns 

782 # True when this test suite is being teared down 

783 while not stop_webdav_server(): 

784 time.sleep(1) 

785 except KeyboardInterrupt: 

786 # Caught Ctrl-C, shut down the server 

787 pass 

788 finally: 

789 server.stop() 

790 t.join() 

791 

792 @classmethod 

793 def _get_name(cls, prefix: str) -> str: 

794 alphabet = string.ascii_lowercase + string.digits 

795 return f"{prefix}-" + "".join(random.choices(alphabet, k=8)) 

796 

797 @classmethod 

798 def _get_dir_name(cls) -> str: 

799 """Return a randomly selected name for a file""" 

800 return cls._get_name(prefix="dir") 

801 

802 @classmethod 

803 def _get_file_name(cls, depth: int = 0) -> str: 

804 """Return a randomly selected name for a file. 

805 

806 Parameters 

807 ---------- 

808 depth : `int` 

809 Number of intermediate directory names to prefix the filename 

810 with. 

811 """ 

812 file_name = cls._get_name(prefix="file") 

813 if depth == 0: 

814 return file_name 

815 

816 # Generate names for intermediate directories 

817 components = [cls._get_dir_name() for _ in range(depth)] 

818 components.append(file_name) 

819 return "/".join(components) 

820 

821 def _generate_file(self, remove_when_done=True) -> tuple[str, int]: 

822 """Create a local file of random size with random contents. 

823 

824 Returns 

825 ------- 

826 path : `str` 

827 Path to local temporary file. The caller is responsible for 

828 removing the file when appropriate. 

829 size : `int` 

830 Size of the generated file, in bytes. 

831 """ 

832 megabyte = 1024 * 1024 

833 size = random.randint(2 * megabyte, 5 * megabyte) 

834 tmpfile, path = tempfile.mkstemp() 

835 self.assertEqual(os.write(tmpfile, os.urandom(size)), size) 

836 os.close(tmpfile) 

837 

838 if remove_when_done: 

839 DavReadWriteTestCase.local_files_to_remove.append(path) 

840 

841 return path, size 

842 

843 def _make_local_temp_file(self, remove_when_done=True) -> str: 

844 """Create an empty local temporary file. 

845 

846 Returns 

847 ------- 

848 path : `str` 

849 Path to local temporary file. The caller is responsible for 

850 removing the file when appropriate. 

851 """ 

852 tmpfile, path = tempfile.mkstemp() 

853 os.close(tmpfile) 

854 

855 if remove_when_done: 

856 DavReadWriteTestCase.local_files_to_remove.append(path) 

857 

858 return path 

859 

860 @classmethod 

861 def _compute_digest(cls, data: bytes, algorithm: str = "sha256") -> str: 

862 """Compute a hash of data.""" 

863 match algorithm: 

864 case "sha256" | "md5": 

865 m = hashlib.new(algorithm) 

866 m.update(data) 

867 return m.hexdigest().lower() 

868 case "adler32": 

869 return f"{zlib.adler32(data):08x}" 

870 case _: 

871 raise ValueError(f"unsupported checksum algorithm {algorithm}") 

872 

873 @classmethod 

874 def _compute_digest_for_file(cls, filename: str, algorithm: str = "sha256") -> str: 

875 """Compute a hash of the contents of file `filename`.""" 

876 with open(filename, "rb") as file: 

877 return cls._compute_digest(file.read(), algorithm) 

878 

879 @classmethod 

880 def _is_server_running(cls, port: int) -> bool: 

881 """Return True if there is a server listening on local address 

882 127.0.0.1:<port>. 

883 """ 

884 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 

885 try: 

886 s.connect(("127.0.0.1", port)) 

887 return True 

888 except ConnectionRefusedError: 

889 return False 

890 

891 

class DavResourcePathConfigTestCase(unittest.TestCase):
    """Tests for the DavResourcePathConfig class."""

    def setUp(self):
        # Temporary working directory for this test.
        self.tmpdir = ResourcePath(makeTestTempDir(TESTDIR))
        # Start each test from a clean global state.
        dav_globals._reset()

    def tearDown(self):
        # Remove the temporary directory, if it is a local one.
        if self.tmpdir and self.tmpdir.isLocal:
            removeTestTempDir(self.tmpdir.ospath)
        # Leave globals clean for the next test.
        dav_globals._reset()

    def test_dav_tmpdir_buffersize_default(self):
        # The configuration must be initialized with the temporary
        # directory extracted from the environment.
        config: DavResourcePathConfig = dav_globals.config()
        directory, _ = config.tmpdir_buffersize
        self.assertEqual(directory, get_tempdir())

916 

917 

class DavConfigPoolTestCase(unittest.TestCase):
    """Tests for the DavConfigPool class and the DavConfig entries it
    serves.
    """

    @classmethod
    def setUpClass(cls):
        # Class-level scratch directory, removed in tearDownClass.
        cls.tmpdir = tempfile.mkdtemp(prefix="webdav-config-test-")

    @classmethod
    def tearDownClass(cls):
        if cls.tmpdir:
            shutil.rmtree(cls.tmpdir, ignore_errors=True)

    def setUp(self):
        # Per-test scratch directory. Note: this instance attribute shadows
        # the class attribute created in setUpClass.
        self.tmpdir = makeTestTempDir(TESTDIR)
        # Reinitialize globals.
        dav_globals._reset()

    def tearDown(self):
        if self.tmpdir:
            shutil.rmtree(self.tmpdir)

    def test_dav_default_config(self):
        """Ensure default configuration is used by default."""
        # Ensure the variable LSST_RESOURCES_WEBDAV_CONFIG is not initialized
        # so that we use the default configuration.
        with unittest.mock.patch.dict(os.environ, {}, clear=True):
            config_pool: DavConfigPool = DavConfigPool()
            config = config_pool.get_config_for_url("davs://example.org")
            self.assertEqual(config.retries, DavConfig.DEFAULT_RETRIES)
            self.assertEqual(config.timeout_connect, DavConfig.DEFAULT_TIMEOUT_CONNECT)
            self.assertEqual(config.timeout_read, DavConfig.DEFAULT_TIMEOUT_READ)
            self.assertEqual(config.token, DavConfig.DEFAULT_TOKEN)
            self.assertEqual(
                config.persistent_connections_per_host, DavConfig.DEFAULT_PERSISTENT_CONNECTIONS_PER_HOST
            )

    def test_dav_configuration_file_does_not_exist(self):
        """Ensure an exception is raised if the configuration file pointed
        to by the environment variable does not exist.
        """
        with unittest.mock.patch.dict(os.environ, {"MY_VAR": "/does/not/exist"}, clear=True):
            with self.assertRaises(FileNotFoundError):
                DavConfigPool("MY_VAR")

    def test_dav_configuration_file(self):
        """Ensure the specified configuration file is used."""
        config_contents: str = r"""
- base_url: "davs://host1.example.org:1234/"
  persistent_connections_per_host: 10
  timeout_connect: 20.0
  timeout_read: 120.0
  retries: 3
  retry_backoff_min: 1.0
  retry_backoff_max: 3.0
  user_cert: "${X509_USER_PROXY}"
  user_key: "${X509_USER_PROXY}"
  trusted_authorities: "/etc/grid-security/certificates"
  buffer_size: 5
  block_size: 18
  enable_fsspec: false
  collect_memory_usage: false
  request_checksum: "md5"
- base_url: "davs://host2.example.org:4321/"
  persistent_connections_per_host: 1
- base_url: "davs://host3.example.org:4321/"
  token: "ABCDEF"
- base_url: "dav://host4.example.org:5555/"
  user_name: "user"
  user_password: "password"
  reuse_connection: false
  frontend_base_urls:
    - "dav://frontend1.example.org:5555/"
    - "dav://frontend2.example.org:5555/"
    - "dav://frontend3.example.org:5555/"
"""

        config_file = self._create_config(config_contents)
        with unittest.mock.patch.dict(os.environ, {"LSST_RESOURCES_WEBDAV_CONFIG": config_file}, clear=True):
            config_pool: DavConfigPool = DavConfigPool("LSST_RESOURCES_WEBDAV_CONFIG")

            # Tests for base URL 'davs://host1.example.org:1234'
            config = config_pool.get_config_for_url("davs://host1.example.org:1234/any/path")
            self.assertEqual(config.base_url, "https://host1.example.org:1234/")
            self.assertEqual(config.retries, 3)
            self.assertEqual(config.token, DavConfig.DEFAULT_TOKEN)
            self.assertEqual(config.user_cert, "${X509_USER_PROXY}")
            self.assertEqual(config.trusted_authorities, "/etc/grid-security/certificates")
            self.assertFalse(config.enable_fsspec)
            self.assertFalse(config.collect_memory_usage)
            self.assertEqual(config.request_checksum, "md5")
            self.assertEqual(config.persistent_connections_per_host, 10)
            # block_size is configured in MiB and exposed in bytes.
            self.assertEqual(config.block_size, 18 * 1024 * 1024)

            # Tests for base URL 'davs://host2.example.org:4321/'
            config = config_pool.get_config_for_url("davs://host2.example.org:4321")
            self.assertEqual(config.base_url, "https://host2.example.org:4321/")
            self.assertEqual(config.persistent_connections_per_host, 1)

            # Tests for base URL 'davs://host3.example.org:4321/'
            config = config_pool.get_config_for_url("davs://host3.example.org:4321")
            self.assertEqual(config.base_url, "https://host3.example.org:4321/")
            self.assertEqual(config.token, "ABCDEF")
            self.assertEqual(config.retries, DavConfig.DEFAULT_RETRIES)

            # Tests for base URL 'dav://host4.example.org:5555/'
            config = config_pool.get_config_for_url("dav://host4.example.org:5555/")
            self.assertEqual(config.base_url, "http://host4.example.org:5555/")
            self.assertEqual(config.user_name, "user")
            self.assertEqual(config.user_password, "password")
            self.assertFalse(config.reuse_connection)
            # Check all three frontend URLs declared in the configuration
            # above. (The previous loop used range(1, 3) and silently
            # skipped frontend3.)
            for i in range(1, 4):
                url = f"http://frontend{i}.example.org:5555/"
                self.assertIn(url, config.frontend_urls)

        # Test that the schemes of the base URL and the frontend URLs of a
        # given endpoint are checked and must be identical.
        config_contents = r"""
- base_url: "dav://host5.example.org:5555/"
  frontend_base_urls:
    - "davs://frontend1.example.org:5555/"
"""
        config_file = self._create_config(config_contents)
        with unittest.mock.patch.dict(os.environ, {"LSST_RESOURCES_WEBDAV_CONFIG": config_file}, clear=True):
            with self.assertRaises(ValueError):
                DavConfigPool("LSST_RESOURCES_WEBDAV_CONFIG")

    def test_dav_repeated_configurations(self):
        """Ensure duplicated endpoint errors are detected in configuration
        file.
        """
        config_contents: str = r"""
- base_url: "davs://host1.example.org:1234/"
- base_url: "davs://host1.example.org:1234/"
"""
        config_file = self._create_config(config_contents)
        with unittest.mock.patch.dict(os.environ, {"MY_VAR": config_file}, clear=True):
            with self.assertRaises(ValueError):
                DavConfigPool("MY_VAR")

    def _create_config(self, config: str) -> str:
        """Write `config` to a new file in the test's temporary directory
        and return its path.
        """
        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir, delete=False) as f:
            f.write(config)
        return f.name

1060 

1061 

class DavTokenAuthorizerTestCase(unittest.TestCase):
    """Tests for the TokenAuthorizer class."""

    def setUp(self):
        # Temporary directory for token files and a fixed token value.
        self.tmpdir = ResourcePath(makeTestTempDir(TESTDIR))
        self.token = "ABCDE1234"

    def tearDown(self):
        if self.tmpdir and self.tmpdir.isLocal:
            removeTestTempDir(self.tmpdir.ospath)

    def test_dav_empty_token(self):
        """Ensure that when no token is provided the headers are not
        modified.
        """
        headers: dict[str, str] = {}
        TokenAuthorizer().set_authorization(headers)
        self.assertIsNone(headers.get("Authorization"))

    def test_dav_token_value(self):
        """Ensure that when a token value is provided, the 'Authorization'
        header is added to the requests.
        """
        headers: dict[str, str] = {}
        TokenAuthorizer(self.token).set_authorization(headers)
        self.assertEqual(headers.get("Authorization"), f"Bearer {self.token}")

    def test_dav_token_file(self):
        """Ensure that when the provided token is a file path, its contents
        are correctly used as the value of the 'Authorization' header.
        """
        with tempfile.NamedTemporaryFile(mode="wt", dir=self.tmpdir.ospath, delete=False) as f:
            f.write(self.token)
            token_file_path = f.name

        # With owner-only read permission, the request's "Authorization"
        # header must carry the token read from the file.
        os.chmod(token_file_path, stat.S_IRUSR)
        headers: dict[str, str] = {}
        TokenAuthorizer(token_file_path).set_authorization(headers)
        self.assertEqual(headers.get("Authorization"), f"Bearer {self.token}")

        # A token file accessible by group or other in any way must be
        # rejected with PermissionError.
        extra_modes = (stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH)
        for extra in extra_modes:
            os.chmod(token_file_path, stat.S_IRUSR | extra)
            with self.assertRaises(PermissionError):
                TokenAuthorizer(token_file_path)

1114 

if __name__ == "__main__":
    # Run the full test suite when this module is executed as a script.
    unittest.main()