Coverage for python / lsst / dax / apdb / cassandra / sessionFactory.py: 30%
95 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:20 +0000
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["SessionContext", "SessionFactory"]
26import logging
27from collections.abc import Mapping
28from contextlib import ExitStack
29from typing import TYPE_CHECKING, Any
31# If cassandra-driver is not there the module can still be imported
32# but ApdbCassandra cannot be instantiated.
33try:
34 import cassandra
35 import cassandra.query
36 from cassandra.auth import AuthProvider, PlainTextAuthProvider
37 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, Session
38 from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy
40 CASSANDRA_IMPORTED = True
41except ImportError:
42 CASSANDRA_IMPORTED = False
44from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
46from ..monitor import MonAgent
47from ..timer import Timer
48from .cassandra_utils import raw_data_factory
50if TYPE_CHECKING:
51 from .config import ApdbCassandraConfig
53_LOG = logging.getLogger(__name__)
55_MON = MonAgent(__name__)
def _dump_query(rf: Any) -> None:
    """Write the text of a Cassandra query to the debug log.

    Registered as a request-init listener on the session when debug
    logging is enabled.
    """
    query = rf.query
    _LOG.debug("Cassandra query: %s", query)
if CASSANDRA_IMPORTED:

    class _AddressTranslator(AddressTranslator):
        """Translate internal IP address to external.

        Only used for docker-based setup, not a viable long-term solution.
        """

        def __init__(self, public_ips: tuple[str, ...], private_ips: tuple[str, ...]):
            # Pair each private address with the public one at the same index.
            self._map = {private: public for private, public in zip(private_ips, public_ips)}

        def translate(self, private_ip: str) -> str:
            # Addresses with no known mapping pass through unchanged.
            return self._map.get(private_ip, private_ip)
class SessionFactory:
    """Implementation of SessionFactory that uses parameters from Apdb
    configuration.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration object.
    """

    def __init__(self, config: ApdbCassandraConfig):
        self._config = config
        # Cluster and session are created lazily on first call to session().
        self._cluster: Cluster | None = None
        self._session: Session | None = None

    def __del__(self) -> None:
        # Need to call Cluster.shutdown() to avoid warnings. hasattr() guards
        # against instances where __init__ did not complete.
        if hasattr(self, "_cluster"):
            if self._cluster:
                self._cluster.shutdown()

    def session(self) -> Session:
        """Return Cassandra Session, making new connection if necessary.

        Returns
        -------
        session : `cassandra.cluster.Session`
            Cassandra session object.
        """
        if self._session is None:
            self._cluster, self._session = self._make_session()
        return self._session

    def _make_session(self) -> tuple[Cluster, Session]:
        """Make Cassandra session.

        Returns
        -------
        cluster : `cassandra.cluster.Cluster`
            Cassandra Cluster object.
        session : `cassandra.cluster.Session`
            Cassandra session object.
        """
        addressTranslator: AddressTranslator | None = None
        if self._config.connection_config.private_ips:
            # Private IPs appear in docker-based setups; map them to the
            # externally visible contact points.
            addressTranslator = _AddressTranslator(
                self._config.contact_points, self._config.connection_config.private_ips
            )

        # Defaults below can be overridden by configuration.
        extra_parameters = {
            "idle_heartbeat_interval": 0,
            "idle_heartbeat_timeout": 30,
            "control_connection_timeout": 100,
            "executor_threads": 10,
        }
        extra_parameters.update(self._config.connection_config.extra_parameters)
        with Timer("cluster_connect", _MON):
            cluster = Cluster(
                execution_profiles=self._make_profiles(),
                contact_points=self._config.contact_points,
                port=self._config.connection_config.port,
                address_translator=addressTranslator,
                protocol_version=self._config.connection_config.protocol_version,
                auth_provider=self._make_auth_provider(),
                **extra_parameters,
            )
            session = cluster.connect()

        # Dump queries if debug level is enabled.
        if _LOG.isEnabledFor(logging.DEBUG):
            session.add_request_init_listener(_dump_query)

        # Disable result paging.
        session.default_fetch_size = None

        return cluster, session

    def _make_auth_provider(self) -> AuthProvider | None:
        """Make Cassandra authentication provider instance.

        Returns
        -------
        provider : `cassandra.auth.AuthProvider` or `None`
            Authentication provider, `None` means anonymous login.
        """
        try:
            dbauth = DbAuth()
        except DbAuthNotFoundError:
            # Credentials file doesn't exist, use anonymous login.
            return None

        # Set to True only when a matching entry had a password but an empty
        # user name; starting from True would produce a spurious warning when
        # no entry matches at all.
        empty_username = False
        # Try every contact point in turn.
        for hostname in self._config.contact_points:
            try:
                username, password = dbauth.getAuth(
                    "cassandra",
                    self._config.connection_config.username,
                    hostname,
                    self._config.connection_config.port,
                    self._config.keyspace,
                )
                if not username:
                    # Password without user name, try next hostname, but give
                    # warning later if no better match is found.
                    empty_username = True
                else:
                    return PlainTextAuthProvider(username=username, password=password)
            except DbAuthNotFoundError:
                pass

        if empty_username:
            _LOG.warning(
                "Credentials file (%s) provided password but not "
                "user name, anonymous Cassandra logon will be attempted.",
                dbauth.db_auth_path,
            )

        return None

    def _make_profiles(self) -> Mapping[Any, ExecutionProfile]:
        """Make all execution profiles used in the code.

        Returns
        -------
        profiles : `~collections.abc.Mapping`
            Mapping of profile key to `cassandra.cluster.ExecutionProfile`.
        """
        config = self._config
        if config.connection_config.private_ips:
            loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
        else:
            loadBalancePolicy = RoundRobinPolicy()

        # These values are identical for every read profile; look them up once.
        read_consistency = getattr(
            cassandra.ConsistencyLevel, config.connection_config.read_consistency
        )
        read_timeout = config.connection_config.read_timeout

        read_tuples_profile = ExecutionProfile(
            consistency_level=read_consistency,
            request_timeout=read_timeout,
            row_factory=cassandra.query.tuple_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_named_tuples_profile = ExecutionProfile(
            consistency_level=read_consistency,
            request_timeout=read_timeout,
            row_factory=cassandra.query.named_tuple_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_raw_profile = ExecutionProfile(
            consistency_level=read_consistency,
            request_timeout=read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        # Profile to use with select_concurrent to return raw data (columns
        # and rows).
        read_raw_multi_profile = ExecutionProfile(
            consistency_level=read_consistency,
            request_timeout=read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        # Profile to use with select_concurrent to return raw data, this also
        # has a very long timeout, to be used for querying DiaObjectDedup
        # table that can return a lot of data.
        read_raw_multi_dedup_profile = ExecutionProfile(
            consistency_level=read_consistency,
            request_timeout=3600.0,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        write_profile = ExecutionProfile(
            consistency_level=getattr(
                cassandra.ConsistencyLevel, config.connection_config.write_consistency
            ),
            request_timeout=config.connection_config.write_timeout,
            load_balancing_policy=loadBalancePolicy,
        )
        # To replace default DCAwareRoundRobinPolicy.
        default_profile = ExecutionProfile(
            load_balancing_policy=loadBalancePolicy,
        )
        return {
            "read_tuples": read_tuples_profile,
            "read_named_tuples": read_named_tuples_profile,
            "read_raw": read_raw_profile,
            "read_raw_multi": read_raw_multi_profile,
            "read_raw_multi_dedup": read_raw_multi_dedup_profile,
            "write": write_profile,
            EXEC_PROFILE_DEFAULT: default_profile,
        }
class SessionContext(ExitStack):
    """Context manager for creating short-lived Cassandra sessions.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration object.
    """

    def __init__(self, config: ApdbCassandraConfig):
        super().__init__()
        self._session_factory = SessionFactory(config)

    def __enter__(self) -> Session:
        super().__enter__()
        new_cluster, new_session = self._session_factory._make_session()
        # Register cluster first so that on exit the session is shut down
        # before its cluster.
        for resource in (new_cluster, new_session):
            self.enter_context(resource)
        return new_session