Coverage for python / lsst / dax / apdb / scripts / list_cassandra.py: 9%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:43 +0000

1# This file is part of dax_apdb 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["list_cassandra"] 

25 

26from astropy.table import Table 

27 

28from ..cassandra.apdbCassandraAdmin import ApdbCassandraAdmin 

29 

30 

31def list_cassandra(host: str, verbose: bool) -> None: 

32 """List APDB instances in Cassandra cluster. 

33 

34 Parameters 

35 ---------- 

36 host : `str` 

37 Name of one of the hosts in Cassandra cluster. 

38 verbose : `bool` 

39 If `True` provide detailed output. 

40 """ 

41 databases = ApdbCassandraAdmin.list_databases(host=host) 

42 if databases: 

43 if verbose: 

44 table = Table(names=["Keyspace", "Role", "Permissions"], dtype=[str, str, str]) 

45 for database in databases: 

46 if database.permissions is None: 

47 # Can't access auth info. 

48 table.add_row([database.name, "N/A", ""]) 

49 elif database.permissions: 

50 for role, perm_list in database.permissions.items(): 

51 permissions = ", ".join(sorted(perm_list)) 

52 table.add_row([database.name, role, permissions]) 

53 else: 

54 # Anonymous access should have all permissions, do not 

55 # list any specific permissions for that. Anon access 

56 # should not be enabled in production. 

57 table.add_row([database.name, "anonymous", ""]) 

58 table.sort(["Keyspace", "Role"]) 

59 else: 

60 table = Table(names=["Keyspace", "Roles[access]"], dtype=[str, str]) 

61 for database in databases: 

62 if database.permissions is None: 

63 # Can't access auth info. 

64 table.add_row([database.name, "N/A"]) 

65 elif database.permissions: 

66 roles = [] 

67 for role, perm_list in database.permissions.items(): 

68 access = _access(perm_list) 

69 roles.append(f"{role}[{access}]") 

70 table.add_row([database.name, ", ".join(sorted(roles))]) 

71 else: 

72 table.add_row([database.name, "anonymous"]) 

73 table.sort("Keyspace") 

74 

75 table.pprint_all(align="<") 

76 

77 

78def _access(permissions: set[str]) -> str: 

79 """Convert list of Cassandra permissions into access mode string""" 

80 if {"CREATE", "DROP"} & permissions: 

81 return "manage" 

82 elif "MODIFY" in permissions: 

83 return "update" 

84 elif "SELECT" in permissions: 

85 return "read" 

86 else: 

87 return "none"