Coverage for python / lsst / ctrl / bps / panda / panda_auth_utils.py: 14%
87 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:50 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:50 +0000
1# This file is part of ctrl_bps_panda.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Functions for each panda_auth subcommand."""
# Public names exported by this module.
__all__ = [
    "panda_auth_clean",
    "panda_auth_expiration",
    "panda_auth_refresh",
    "panda_auth_setup",
    "panda_auth_status",
    "panda_auth_update",
]
40import base64
41import json
42import logging
43import os
44from datetime import UTC, datetime, timedelta
46import idds.common.utils as idds_utils
47import pandaclient.idds_api
48from pandaclient.openidc_utils import OpenIdConnect_Utils
50from lsst.ctrl.bps.panda.panda_exceptions import (
51 AuthConfigError,
52 PandaAuthError,
53 TokenExpiredError,
54 TokenNotFoundError,
55 TokenRefreshError,
56 TokenTooEarlyError,
57)
# Module-level logger; it is also handed to OpenIdConnect_Utils as its
# log stream by the functions below.
_LOG = logging.getLogger(__name__)
def panda_auth_clean():
    """Remove the token and token cache files.

    The auth object is created by `panda_auth_setup` and its ``cleanup``
    method does the actual work.
    """
    panda_auth_setup().cleanup()
def panda_auth_expiration():
    """Get the expiration value of the current token.

    Returns
    -------
    expiration : `int`
        Value of the ``exp`` entry reported by `panda_auth_status` for the
        current token, or 0 if no token status is available.

    Notes
    -----
    `panda_auth_status` returns a `dict`, so the entry must be read by key;
    indexing that `dict` with ``-1`` raises `KeyError`.

    NOTE(review): `panda_auth_status` documents this entry as an expiration
    epoch, not a number of seconds remaining -- confirm which one callers
    of this function expect.
    """
    status = panda_auth_status()
    if not status:
        # No token (or no status information): keep the historical default.
        return 0
    return status["exp"]
def panda_auth_setup():
    """Create the auth object shared by the auth functions.

    Returns
    -------
    open_id : `pandaclient.openidc_utils.OpenIdConnect_Utils`
        Auth object which can interact with auth token.

    Raises
    ------
    OSError
        Raised if any of the required PanDA environment variables is not
        set. The first missing one, in the order checked, is reported.
    """
    required = (
        "PANDA_AUTH",
        "PANDA_VERIFY_HOST",
        "PANDA_AUTH_VO",
        "PANDA_URL_SSL",
        "PANDA_URL",
    )
    missing = next((name for name in required if name not in os.environ), None)
    if missing is not None:
        raise OSError(f"Missing environment variable {missing}")

    # OpenIdConnect_Utils filters some of its debugging messages behind a
    # verbose flag. When the user asked for debug logging, enable them all.
    debug_on = _LOG.isEnabledFor(logging.DEBUG)
    return OpenIdConnect_Utils(None, log_stream=_LOG, verbose=debug_on)
def panda_auth_status():
    """Collect information about the current token, if there is one.

    Returns
    -------
    status : `dict` or `None`
        Status information about a token if it exists, otherwise `None`.
        Contains the token filename plus the entries of the last element
        returned by ``check_token`` (including the expiration epoch).
    """
    open_id = panda_auth_setup()
    check = open_id.check_token()
    if not check or not check[0]:
        return None

    # get_token_path returns a path even when no token currently exists,
    # so the path is only added once check_token has confirmed a token.
    info = {"filename": open_id.get_token_path()}
    info.update(check[-1])
    return info
def panda_auth_update(idds_server=None, reset=False):
    """Obtain a new auth token if needed, or always when reset is True.

    Parameters
    ----------
    idds_server : `str`, optional
        URL for the iDDS server. Defaults to None which means that the
        underlying functions use any value in the panda relay service.
    reset : `bool`, optional
        Whether to first clean up any previous token. Defaults to False.

    Raises
    ------
    RuntimeError
        Raised if the status call reports success but its result does not
        contain the expected keywords.
    """
    if reset:
        panda_auth_clean()

    # Build a client manager. OpenIdConnect_Utils has a function for this
    # too, but it needs several parameters; going through the client
    # manager is currently the easiest way to match what happens when a
    # workflow is actually submitted.
    client = pandaclient.idds_api.get_api(
        idds_utils.json_dumps, idds_host=idds_server, compress=True, manager=True, verbose=False
    )

    # Some call has to be made for the auth check to actually happen.
    # https://panda-wms.readthedocs.io/en/latest/client/notebooks/jupyter_setup.html#Get-an-OIDC-ID-token
    result = client.get_status(request_id=0, with_detail=False)
    _LOG.debug("get_status results: %s", result)

    # Check success
    # https://panda-wms.readthedocs.io/en/latest/client/rest_idds.html
    # NOTE(review): a result that does not report success is not treated
    # as an error here -- confirm that this is intended.
    if result[0] != 0 or not result[1][0]:
        return

    # The success keys from get_status currently do not catch an invalid
    # idds server, so for now the result is also checked for keywords.
    if not ("request_id" in result[1][-1] and "status" in result[1][-1]):
        raise RuntimeError(f"Error contacting PanDA service: {result}")
def panda_auth_refresh(days=4, verbose=False):
    """Refresh the current valid IAM OpenID authentication token.

    The expiration time is read from the ``id_token`` stored in the local
    token file. The token is refreshed only if it has not expired yet and
    expires within the given number of days.

    Parameters
    ----------
    days : `int`, optional
        Refresh threshold in days. If the token expires in more than this
        number of days, no refresh is attempted and `TokenTooEarlyError`
        is raised. Default is 4.
    verbose : `bool`, optional
        If True, enables verbose output for debugging or logging.
        Default is False.

    Returns
    -------
    status : `dict` or `None`
        Token status as returned by `panda_auth_status` after the refresh.

    Raises
    ------
    PandaAuthError
        Raised if PANDA_URL or PANDA_AUTH_VO is not set (or is empty).
    TokenNotFoundError
        Raised if the token file does not exist.
    TokenExpiredError
        Raised if the token has already expired.
    TokenTooEarlyError
        Raised if the token expires later than ``days`` days from now.
    AuthConfigError
        Raised if the auth or endpoint configuration cannot be fetched.
    TokenRefreshError
        Raised if the refresh request fails.
    """
    base_url = os.environ.get("PANDA_URL")
    vo_name = os.environ.get("PANDA_AUTH_VO")
    if not (base_url and vo_name):
        raise PandaAuthError("Missing required environment variables: PANDA_URL or PANDA_AUTH_VO")

    # The auth configuration is published next to the server endpoint.
    prefix = base_url.split("/server", 1)[0]
    open_id = OpenIdConnect_Utils(
        f"{prefix}/auth/{vo_name}_auth_config.json", log_stream=_LOG, verbose=verbose
    )

    token_path = open_id.get_token_path()
    if not os.path.exists(token_path):
        raise TokenNotFoundError("Cannot find token file. Use 'panda_auth reset' to obtain a new token.")

    with open(token_path) as stream:
        token_data = json.load(stream)

    # The second dot-separated segment of the id_token is the base64url
    # encoded JSON payload; restore the padding before decoding it.
    segment = token_data["id_token"].split(".")[1]
    padded = segment + "=" * (-len(segment) % 4)
    payload = json.loads(base64.urlsafe_b64decode(padded.encode()))

    expires_at = datetime.fromtimestamp(payload["exp"], tz=UTC)
    remaining = expires_at - datetime.now(UTC)
    minutes = remaining.total_seconds() / 60
    print(f"Token will expire in {minutes} minutes.")
    print(f"Token expiration time : {expires_at.strftime('%Y-%m-%d %H:%M:%S')} UTC")

    if remaining < timedelta(0):
        raise TokenExpiredError("Token already expired. Cannot refresh.")
    if remaining > timedelta(days=days):
        raise TokenTooEarlyError(
            f"Too early to refresh. More than {days} day(s) until expiration.\n"
            "Use '--days' option to adjust threshold, e.g.:\n"
            " panda_auth refresh --days 10"
        )

    refresh_token_string = token_data["refresh_token"]

    ok, auth_config = open_id.fetch_page(open_id.auth_config_url)
    if not ok:
        raise AuthConfigError("Failed to get Auth configuration.")

    ok, endpoint_config = open_id.fetch_page(auth_config["oidc_config_url"])
    if not ok:
        raise AuthConfigError("Failed to get endpoint configuration.")

    ok, _ = open_id.refresh_token(
        endpoint_config["token_endpoint"],
        auth_config["client_id"],
        auth_config["client_secret"],
        refresh_token_string,
    )
    if not ok:
        raise TokenRefreshError("Failed to refresh token.")

    # Report the new expiration time as seen by the regular status check.
    status = panda_auth_status()
    if status:
        new_expiry = datetime.fromtimestamp(status["exp"], tz=UTC)
        print(f"{'New expiration time:':23} {new_expiry.strftime('%Y-%m-%d %H:%M:%S')} UTC")
        print("Success to refresh token")
    return status