Coverage for python / lsst / daf / butler / remote_butler / server / _gafaelfawr.py: 0%

47 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:36 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30from collections import defaultdict 

31 

32import httpx 

33import pydantic 

34 

35from ..authentication.rubin import RubinAuthenticationProvider 

36 

37 

class GafaelfawrClient:
    """Asynchronous REST client used to look up a user's authentication
    information in Gafaelfawr.

    Parameters
    ----------
    base_url : `str`
        The top-level HTTP path where Gafaelfawr can be found (e.g.
        ``"https://data-int.lsst.cloud/auth"``).
    transport : ``httpx.AsyncBaseTransport``, optional
        Override the HTTP client's transport. (For unit tests).  When
        omitted, an ``httpx.AsyncHTTPTransport`` created with ``retries=3``
        is used.
    """

    def __init__(self, base_url: str, *, transport: httpx.AsyncBaseTransport | None = None) -> None:
        # Only build the default transport when the caller did not supply one.
        effective_transport = httpx.AsyncHTTPTransport(retries=3) if transport is None else transport
        self._client = httpx.AsyncClient(base_url=base_url, transport=effective_transport, timeout=20.0)

    async def get_groups(self, user_token: str) -> list[str]:
        """Return the names of the groups reported for a user.

        Parameters
        ----------
        user_token : `str`
            Token passed to `RubinAuthenticationProvider` to build the
            request headers for the ``/api/v1/user-info`` endpoint.

        Returns
        -------
        groups : `list` [ `str` ]
            Name of each group in the response.  Empty if the response
            contains no ``groups`` value.

        Notes
        -----
        ``raise_for_status()`` is called on the response, so an HTTP error
        status is propagated to the caller as an exception.
        """
        auth_headers = RubinAuthenticationProvider(user_token).get_server_headers()
        response = await self._client.get("/api/v1/user-info", headers=auth_headers)
        response.raise_for_status()
        user_info = _GafaelfawrUserInfo.model_validate_json(response.content)
        # ``groups`` is optional in the model; treat a missing value as empty.
        return [group.name for group in user_info.groups or []]

65 

66 

class _GafaelfawrUserInfo(pydantic.BaseModel):
    """Subset of the Gafaelfawr ``/api/v1/user-info`` response body that is
    read by `GafaelfawrClient.get_groups`.
    """

    # Name of the user the response describes.
    username: str
    # Groups listed for the user; `None` when the response omits the field.
    groups: list[_GafaelfawrGroup] | None = pydantic.Field(default=None)

70 

71 

class _GafaelfawrGroup(pydantic.BaseModel):
    """One entry of the ``groups`` list in `_GafaelfawrUserInfo`."""

    # Group name; this is the value compared against the configured
    # per-repository group lists.
    name: str

74 

75 

class GafaelfawrGroupAuthorizer:
    """Authorizes access to Butler repositories on the basis of Gafaelfawr
    groups.

    Parameters
    ----------
    client : `GafaelfawrClient`
        `GafaelfawrClient` instance that will be used to access group
        information.
    repository_groups : `dict` [ `str`, `list` [ `str` ]]
        Mapping from repository name to list of Gafaelfawr groups authorized to
        access that repository.  If a user is a member of any one of the groups
        in the list, access will be granted.  A list containing ``"*"`` grants
        access to every user without consulting Gafaelfawr.
    """

    def __init__(self, client: GafaelfawrClient, repository_groups: dict[str, list[str]]) -> None:
        self._client = client
        self._repository_groups = repository_groups
        # Repository name -> user names already found to be authorized.  Only
        # positive results are stored, and this class never removes entries.
        self._cache: dict[str, set[str]] = defaultdict(set)

    async def is_user_authorized_for_repository(
        self, *, repository: str, user_name: str, user_token: str
    ) -> bool:
        """Check whether a user may access a repository.

        Parameters
        ----------
        repository : `str`
            Name of the repository; must be a key of ``repository_groups``.
        user_name : `str`
            User name, used as the key for caching positive results.
        user_token : `str`
            Token forwarded to `GafaelfawrClient.get_groups` to look up the
            user's groups.

        Returns
        -------
        authorized : `bool`
            `True` if the repository allows ``"*"``, if the user was
            previously found to be authorized, or if the user belongs to at
            least one allowed group.  `False` otherwise.

        Raises
        ------
        ValueError
            Raised if ``repository`` has no entry in ``repository_groups``.
        """
        allowed_groups = self._repository_groups.get(repository)
        if allowed_groups is None:
            raise ValueError(f"Unknown repository '{repository}'")

        # Wildcard: no group lookup is needed.
        if "*" in allowed_groups:
            return True

        # Skip the remote call for users already known to be authorized.
        if user_name in self._cache[repository]:
            return True

        user_groups = await self._client.get_groups(user_token)
        if any(group in allowed_groups for group in user_groups):
            self._cache[repository].add(user_name)
            return True

        # Negative results are not cached, so a later call queries again.
        return False

115 

116 

class MockGafaelfawrGroupAuthorizer:
    """Stand-in for ``GafaelfawrGroupAuthorizer`` in unit tests.

    Every authorization check returns a fixed answer, which starts out as
    `True` and can be changed with `set_response`.
    """

    def __init__(self) -> None:
        # Answer handed back by every authorization check.
        self._response = True

    def set_response(self, value: bool) -> None:
        """Set the answer returned by subsequent authorization checks.

        Parameters
        ----------
        value : `bool`
            Value that `is_user_authorized_for_repository` will return.
        """
        self._response = value

    async def is_user_authorized_for_repository(self, **kwargs: str) -> bool:
        """Return the configured answer, ignoring all keyword arguments.

        Parameters
        ----------
        **kwargs : `str`
            Accepted so that the call signature is compatible with
            ``GafaelfawrGroupAuthorizer``; not inspected.

        Returns
        -------
        authorized : `bool`
            The value most recently passed to `set_response`, or `True` if
            it was never called.
        """
        return self._response