Coverage for python / lsst / daf / butler / remote_butler / authentication / cadc.py: 0%
27 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:42 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30import os
31from fnmatch import fnmatchcase
32from urllib.parse import urlparse
34from .interface import RemoteButlerAuthenticationProvider
class CadcAuthenticationProvider(RemoteButlerAuthenticationProvider):
    """Authentication provider for remote Butler services that follow CADC
    connection requirements.

    The provider wraps a bearer token in an ``Authorization`` header. That
    header is handed out for datastore requests only; no headers are supplied
    for server requests.

    Parameters
    ----------
    access_token : `str`
        The bearer token used for authentication with CADC StorageInventory.

    Notes
    -----
    Instances must remain pickleable: they are sometimes serialized and
    transferred to another process to execute file transfers.
    """

    def __init__(self, access_token: str):
        # Access tokens are opaque bearer tokens. See https://sqr-069.lsst.io/
        authorization = f"Bearer {access_token}"
        self._headers = {"Authorization": authorization}

    @staticmethod
    def create_from_environment(server_url: str) -> CadcAuthenticationProvider:
        """Build a provider from credentials found in the environment.

        Parameters
        ----------
        server_url : `str`
            URL of the server that is being connected to.

        Returns
        -------
        provider : `CadcAuthenticationProvider`
            Provider wrapping the token found in the environment.

        Raises
        ------
        RuntimeError
            Raised if the environment lookup yields no token for
            ``server_url``.
        """
        token = _get_authentication_token_from_environment(server_url)
        if token is not None:
            return CadcAuthenticationProvider(token)

        message = (
            "Attempting to connect to Butler server,"
            " but no access credentials were found in the environment."
        )
        raise RuntimeError(message)

    def get_server_headers(self) -> dict[str, str]:
        """Return the headers for server requests.

        Returns
        -------
        headers : `dict` [`str`, `str`]
            Always a new, empty dictionary.
        """
        return {}

    def get_datastore_headers(self) -> dict[str, str]:
        """Return the headers for datastore requests.

        Returns
        -------
        headers : `dict` [`str`, `str`]
            The ``Authorization`` bearer-token header built at construction.
            This is the dictionary held by the instance, not a copy.
        """
        return self._headers
# Shell-style hostname patterns; the environment token is only handed out for
# server URLs whose hostname matches one of these (case-sensitive match
# against the lower-cased hostname).
_SERVER_WHITELIST = [
    "*.cadc-ccda.hia-hia.nrc-cnrc.gc.ca",
    "*.canfar.net",
    "host.docker.internal",
]
# Name of the environment variable from which the access token is read.
_CADC_TOKEN_ENVIRONMENT_KEY = "CADC_TOKEN"
def _get_authentication_token_from_environment(server_url: str) -> str | None:
    """Retrieve an authentication token from the environment.

    The token is returned only when both of the following hold: the hostname
    of ``server_url`` matches at least one pattern in the server whitelist,
    and the token environment variable is set to a non-empty value.

    Parameters
    ----------
    server_url : `str`
        The URL of the server for which an authentication token is being
        retrieved.

    Returns
    -------
    token : `str` or `None`
        The authentication token if it is available and the hostname matches
        the whitelist; otherwise `None`.
    """
    # The URL is always parsed, even when no token is set, so that a
    # malformed URL is reported the same way in every case.
    host = urlparse(server_url.lower()).hostname
    token = os.getenv(_CADC_TOKEN_ENVIRONMENT_KEY)

    # A URL without a hostname, or an unset/empty token, can never qualify.
    if not host or not token:
        return None

    for pattern in _SERVER_WHITELIST:
        if fnmatchcase(host, pattern):
            return token
    return None