Coverage for tests / test_authentication.py: 35%

35 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:17 +0000

1import unittest 

2 

3from lsst.daf.butler.tests.server_available import butler_server_import_error, butler_server_is_available 

4from lsst.daf.butler.tests.utils import mock_env 

5 

6if butler_server_is_available: 6 ↛ 7line 6 didn't jump to line 7 because the condition on line 6 was never true

7 from lsst.daf.butler.remote_butler.authentication import cadc 

8 from lsst.daf.butler.remote_butler.authentication.rubin import ( 

9 _EXPLICIT_BUTLER_ACCESS_TOKEN_ENVIRONMENT_KEY, 

10 _RSP_JUPYTER_ACCESS_TOKEN_ENVIRONMENT_KEY, 

11 RubinAuthenticationProvider, 

12 _get_authentication_token_from_environment, 

13 ) 

14 

15 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class TestButlerClientAuthentication(unittest.TestCase):
    """Check access-token lookup and HTTP header generation.

    The whole class is skipped when the Butler server dependencies could
    not be imported.
    """

    def test_explicit_butler_token(self):
        """The explicit Butler token is returned when both variables are set."""
        environment = {
            _EXPLICIT_BUTLER_ACCESS_TOKEN_ENVIRONMENT_KEY: "token1",
            _RSP_JUPYTER_ACCESS_TOKEN_ENVIRONMENT_KEY: "not-this-token",
        }
        with mock_env(environment):
            found = _get_authentication_token_from_environment("https://untrustedserver.com")
            self.assertEqual(found, "token1")

    def test_jupyter_token_with_safe_server(self):
        """The Jupyter token is returned for the ``data.LSST.cloud`` URL."""
        environment = {_RSP_JUPYTER_ACCESS_TOKEN_ENVIRONMENT_KEY: "token2"}
        # The host name is deliberately written in mixed case here.
        server_url = "https://data.LSST.cloud/butler"
        with mock_env(environment):
            found = _get_authentication_token_from_environment(server_url)
            self.assertEqual(found, "token2")

    def test_jupyter_token_with_unsafe_server(self):
        """No token is returned for the ``untrustedserver.com`` URL."""
        environment = {_RSP_JUPYTER_ACCESS_TOKEN_ENVIRONMENT_KEY: "token2"}
        server_url = "https://untrustedserver.com/butler"
        with mock_env(environment):
            found = _get_authentication_token_from_environment(server_url)
            self.assertIsNone(found)

    def test_missing_token(self):
        """No token is returned when the mocked environment is empty."""
        server_url = "https://data.lsst.cloud/butler"
        with mock_env({}):
            found = _get_authentication_token_from_environment(server_url)
            self.assertIsNone(found)

    def test_header_generation(self):
        """The Rubin provider sends the bearer token to the server only."""
        provider = RubinAuthenticationProvider("tokendata")
        expected_server_headers = {"Authorization": "Bearer tokendata"}
        self.assertEqual(provider.get_server_headers(), expected_server_headers)
        # At the Rubin Science Platform, the server sends pre-signed URLs that
        # do not require authentication.
        self.assertEqual(provider.get_datastore_headers(), {})

    def test_cadc_auth(self):
        """The CADC provider sends the bearer token to the datastore only,
        and its token is read from the CADC environment variable.
        """
        provider = cadc.CadcAuthenticationProvider("tokendata")
        expected_datastore_headers = {"Authorization": "Bearer tokendata"}
        self.assertEqual(provider.get_server_headers(), {})
        self.assertEqual(provider.get_datastore_headers(), expected_datastore_headers)

        environment = {cadc._CADC_TOKEN_ENVIRONMENT_KEY: "tokendata"}
        with mock_env(environment):
            found = cadc._get_authentication_token_from_environment("https://www.canfar.net/butler")
            self.assertEqual(found, "tokendata")