Coverage for python / lsst / dax / apdb / scripts / list_cassandra.py: 9%
39 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:46 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:46 +0000
1# This file is part of dax_apdb
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["list_cassandra"]
26from astropy.table import Table
28from ..cassandra.apdbCassandraAdmin import ApdbCassandraAdmin
def list_cassandra(host: str, verbose: bool) -> None:
    """Print a table of APDB instances found in a Cassandra cluster.

    Parameters
    ----------
    host : `str`
        Name of one of the hosts in Cassandra cluster.
    verbose : `bool`
        If `True`, print one row per keyspace and role with the full list
        of permissions. Otherwise print one row per keyspace with a short
        access summary for every role.
    """
    databases = ApdbCassandraAdmin.list_databases(host=host)
    if not databases:
        # Nothing was found, no table is printed at all.
        return

    rows = []
    if verbose:
        column_names = ["Keyspace", "Role", "Permissions"]
        sort_keys = ["Keyspace", "Role"]
        for db in databases:
            grants = db.permissions
            if grants is None:
                # Can't access auth info.
                rows.append([db.name, "N/A", ""])
            elif not grants:
                # Anonymous access should have all permissions, do not
                # list any specific permissions for that. Anon access
                # should not be enabled in production.
                rows.append([db.name, "anonymous", ""])
            else:
                rows.extend(
                    [db.name, role, ", ".join(sorted(perm_list))]
                    for role, perm_list in grants.items()
                )
    else:
        column_names = ["Keyspace", "Roles[access]"]
        sort_keys = "Keyspace"
        for db in databases:
            grants = db.permissions
            if grants is None:
                # Can't access auth info.
                rows.append([db.name, "N/A"])
            elif not grants:
                # Empty permissions are reported as anonymous access.
                rows.append([db.name, "anonymous"])
            else:
                summaries = sorted(
                    f"{role}[{_access(perm_list)}]" for role, perm_list in grants.items()
                )
                rows.append([db.name, ", ".join(summaries)])

    # All columns are strings; rows are added in the order they were built
    # and the table is sorted afterwards.
    table = Table(names=column_names, dtype=[str] * len(column_names))
    for row in rows:
        table.add_row(row)
    table.sort(sort_keys)

    table.pprint_all(align="<")
78def _access(permissions: set[str]) -> str:
79 """Convert list of Cassandra permissions into access mode string"""
80 if {"CREATE", "DROP"} & permissions:
81 return "manage"
82 elif "MODIFY" in permissions:
83 return "update"
84 elif "SELECT" in permissions:
85 return "read"
86 else:
87 return "none"