Coverage for python / lsst / ctrl / bps / panda / panda_auth_utils.py: 14%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:50 +0000

1# This file is part of ctrl_bps_panda. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Functions for each panda_auth subcommand.""" 

29 

30__all__ = [ 

31 "panda_auth_clean", 

32 "panda_auth_expiration", 

33 "panda_auth_refresh", 

34 "panda_auth_setup", 

35 "panda_auth_status", 

36 "panda_auth_update", 

37] 

38 

39 

40import base64 

41import json 

42import logging 

43import os 

44from datetime import UTC, datetime, timedelta 

45 

46import idds.common.utils as idds_utils 

47import pandaclient.idds_api 

48from pandaclient.openidc_utils import OpenIdConnect_Utils 

49 

50from lsst.ctrl.bps.panda.panda_exceptions import ( 

51 AuthConfigError, 

52 PandaAuthError, 

53 TokenExpiredError, 

54 TokenNotFoundError, 

55 TokenRefreshError, 

56 TokenTooEarlyError, 

57) 

58 

59_LOG = logging.getLogger(__name__) 

60 

61 

62def panda_auth_clean(): 

63 """Clean up token and token cache files.""" 

64 open_id = panda_auth_setup() 

65 open_id.cleanup() 

66 

67 

68def panda_auth_expiration(): 

69 """Get number of seconds until token expires. 

70 

71 Returns 

72 ------- 

73 expiration : `int` 

74 Number of seconds until token expires. 

75 """ 

76 expiration = 0 

77 ret = panda_auth_status() 

78 if ret: 

79 expiration = ret[-1]["exp"] 

80 return expiration 

81 

82 

83def panda_auth_setup(): 

84 """Initialize auth object used by various auth functions. 

85 

86 Returns 

87 ------- 

88 open_id : `pandaclient.openidc_utils.OpenIdConnect_Utils` 

89 Auth object which can interact with auth token. 

90 """ 

91 for key in [ 

92 "PANDA_AUTH", 

93 "PANDA_VERIFY_HOST", 

94 "PANDA_AUTH_VO", 

95 "PANDA_URL_SSL", 

96 "PANDA_URL", 

97 ]: 

98 if key not in os.environ: 

99 raise OSError(f"Missing environment variable {key}") 

100 

101 # OpenIdConnect_Utils have a verbose flag that filters 

102 # some debugging messages. If user chose debug, just 

103 # turn on all of the messages. 

104 verbose = False 

105 if _LOG.isEnabledFor(logging.DEBUG): 

106 verbose = True 

107 

108 open_id = OpenIdConnect_Utils(None, log_stream=_LOG, verbose=verbose) 

109 return open_id 

110 

111 

112def panda_auth_status(): 

113 """Gather information about a token if it exists. 

114 

115 Returns 

116 ------- 

117 status : `dict` 

118 Status information about a token if it exists. 

119 Includes filename and expiration epoch. 

120 """ 

121 status = None 

122 open_id = panda_auth_setup() 

123 ret = open_id.check_token() 

124 if ret and ret[0]: 

125 # get_token_path will return the path even if a token doesn't 

126 # currently exist. So check for token first via check_token, then 

127 # add path. 

128 status = {"filename": open_id.get_token_path()} 

129 status.update(ret[-1]) 

130 return status 

131 

132 

133def panda_auth_update(idds_server=None, reset=False): 

134 """Get new auth token if needed or reset is True. 

135 

136 Parameters 

137 ---------- 

138 idds_server : `str`, optional 

139 URL for the iDDS server. Defaults to None which means that the 

140 underlying functions use any value in the panda relay service. 

141 reset : `bool`, optional 

142 Whether to first clean up any previous token. Defaults to False. 

143 """ 

144 if reset: 

145 panda_auth_clean() 

146 

147 # Create client manager 

148 # (There is a function in OpenIdConnect_Utils, but it takes several 

149 # parameters. Letting the client manager do it is currently easiest 

150 # way to match what happens when the workflow is actually submitted.) 

151 cm = pandaclient.idds_api.get_api( 

152 idds_utils.json_dumps, idds_host=idds_server, compress=True, manager=True, verbose=False 

153 ) 

154 

155 # Must call some function to actually check auth 

156 # https://panda-wms.readthedocs.io/en/latest/client/notebooks/jupyter_setup.html#Get-an-OIDC-ID-token 

157 ret = cm.get_status(request_id=0, with_detail=False) 

158 _LOG.debug("get_status results: %s", ret) 

159 

160 # Check success 

161 # https://panda-wms.readthedocs.io/en/latest/client/rest_idds.html 

162 if ret[0] == 0 and ret[1][0]: 

163 # The success keys from get_status currently do not catch if invalid 

164 # idds server given. So for now, check result string for keywords. 

165 if "request_id" not in ret[1][-1] or "status" not in ret[1][-1]: 

166 raise RuntimeError(f"Error contacting PanDA service: {ret}") 

167 

168 

169def panda_auth_refresh(days=4, verbose=False): 

170 """ 

171 Refresh the current valid IAM OpenID authentication token. 

172 

173 This function checks the expiration time of the existing token stored 

174 in the local token file and attempts to refresh it if it is close to 

175 expiring (within a specified number of days). 

176 

177 Parameters 

178 ---------- 

179 days : `int`, optional 

180 The minimum number of days before token expiration to trigger a 

181 refresh. If the token expires in more than this number of days, 

182 the refresh is skipped. Default is 4. 

183 verbose : `bool`, optional 

184 If True, enables verbose output for debugging or logging. 

185 Default is False. 

186 

187 Returns 

188 ------- 

189 status: `dict` 

190 A dictionary containing the refreshed token status. 

191 """ 

192 panda_url = os.environ.get("PANDA_URL") 

193 panda_auth_vo = os.environ.get("PANDA_AUTH_VO") 

194 

195 if not panda_url or not panda_auth_vo: 

196 raise PandaAuthError("Missing required environment variables: PANDA_URL or PANDA_AUTH_VO") 

197 

198 url_prefix = panda_url.split("/server", 1)[0] 

199 auth_url = f"{url_prefix}/auth/{panda_auth_vo}_auth_config.json" 

200 open_id = OpenIdConnect_Utils(auth_url, log_stream=_LOG, verbose=verbose) 

201 

202 token_file = open_id.get_token_path() 

203 if not os.path.exists(token_file): 

204 raise TokenNotFoundError("Cannot find token file. Use 'panda_auth reset' to obtain a new token.") 

205 

206 with open(token_file) as f: 

207 data = json.load(f) 

208 enc = data["id_token"].split(".")[1] 

209 enc += "=" * (-len(enc) % 4) 

210 dec = json.loads(base64.urlsafe_b64decode(enc.encode())) 

211 exp_time = datetime.fromtimestamp(dec["exp"], tz=UTC) 

212 delta = exp_time - datetime.now(UTC) 

213 minutes = delta.total_seconds() / 60 

214 print(f"Token will expire in {minutes} minutes.") 

215 print(f"Token expiration time : {exp_time.strftime('%Y-%m-%d %H:%M:%S')} UTC") 

216 if delta < timedelta(minutes=0): 

217 raise TokenExpiredError("Token already expired. Cannot refresh.") 

218 elif delta > timedelta(days=days): 

219 raise TokenTooEarlyError( 

220 f"Too early to refresh. More than {days} day(s) until expiration.\n" 

221 f"Use '--days' option to adjust threshold, e.g.:\n" 

222 f" panda_auth refresh --days 10" 

223 ) 

224 

225 refresh_token_string = data["refresh_token"] 

226 

227 s, auth_config = open_id.fetch_page(open_id.auth_config_url) 

228 if not s: 

229 raise AuthConfigError("Failed to get Auth configuration.") 

230 

231 s, endpoint_config = open_id.fetch_page(auth_config["oidc_config_url"]) 

232 if not s: 

233 raise AuthConfigError("Failed to get endpoint configuration.") 

234 

235 s, o = open_id.refresh_token( 

236 endpoint_config["token_endpoint"], 

237 auth_config["client_id"], 

238 auth_config["client_secret"], 

239 refresh_token_string, 

240 ) 

241 

242 if not s: 

243 raise TokenRefreshError("Failed to refresh token.") 

244 

245 status = panda_auth_status() 

246 if status: 

247 exp_time = datetime.fromtimestamp(status["exp"], tz=UTC) 

248 print(f"{'New expiration time:':23} {exp_time.strftime('%Y-%m-%d %H:%M:%S')} UTC") 

249 print("Success to refresh token") 

250 return status