Coverage for python / lsst / dax / apdb / cassandra / connectionContext.py: 42%
95 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:19 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["ConnectionContext", "DbVersions"]
26import dataclasses
27import logging
28from collections.abc import Mapping
30# If cassandra-driver is not there the module can still be imported
31# but ApdbCassandra cannot be instantiated.
32try:
33 from cassandra.cluster import Session
34except ImportError:
35 pass
37from ..apdbConfigFreezer import ApdbConfigFreezer
38from ..apdbSchema import ApdbTables
39from ..monitor import MonAgent
40from ..schema_model import Table
41from ..versionTuple import IncompatibleVersionError, VersionTuple
42from .apdbCassandraReplica import ApdbCassandraReplica
43from .apdbCassandraSchema import ApdbCassandraSchema
44from .apdbMetadataCassandra import ApdbMetadataCassandra
45from .cassandra_utils import PreparedStatementCache, StatementFactory
46from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange
47from .partitioner import Partitioner
# Logger for messages emitted by this module.
_LOG: logging.Logger = logging.getLogger(name=__name__)

# Monitoring agent for this module, named after the module.
_MON = MonAgent(__name__)
54@dataclasses.dataclass
55class DbVersions:
56 """Versions defined in APDB metadata table."""
58 schema_version: VersionTuple
59 """Version of the schema from which database was created."""
61 code_version: VersionTuple
62 """Version of ApdbCassandra with which database was created."""
64 replica_version: VersionTuple | None
65 """Version of ApdbCassandraReplica with which database was created, None
66 if replication was not configured.
67 """
70class ConnectionContext:
71 """Container for all kinds ob objects that are instantiated once the
72 connection to Cassandra is established.
74 Parameters
75 ----------
76 session : `cassandra.cluster.Sesion`
77 Cassandra session.
78 config : `ApdbCassandraConfig`
79 Configuration object.
80 table_schemas : `~collection.abc.Mapping` [`ApdbTables`, `Table`]
81 Schema definitions for regular APDB tables.
82 """
84 metadataSchemaVersionKey = "version:schema"
85 """Name of the metadata key to store schema version number."""
87 metadataCodeVersionKey = "version:ApdbCassandra"
88 """Name of the metadata key to store code version number."""
90 metadataReplicaVersionKey = "version:ApdbCassandraReplica"
91 """Name of the metadata key to store replica code version number."""
93 metadataConfigKey = "config:apdb-cassandra.json"
94 """Name of the metadata key to store frozen part of the configuration."""
96 metadataDedupKey = "status:deduplication.json"
97 """Name of the metadata key to store code version number."""
99 frozen_parameters = (
100 "enable_replica",
101 "ra_dec_columns",
102 "replica_skips_diaobjects",
103 "replica_sub_chunk_count",
104 "partitioning.part_pixelization",
105 "partitioning.part_pix_level",
106 "partitioning.time_partition_tables",
107 "partitioning.time_partition_days",
108 "partitioning.num_part_dedup",
109 )
110 """Names of the config parameters to be frozen in metadata table."""
112 def __init__(
113 self,
114 session: Session,
115 config: ApdbCassandraConfig,
116 table_schemas: Mapping[ApdbTables, Table],
117 current_versions: DbVersions,
118 ):
119 self.session = session
121 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
122 self.metadata = ApdbMetadataCassandra(
123 self.session, meta_table_name, config.keyspace, "read_tuples", "write"
124 )
126 # Read versions stored in database.
127 self.db_versions = self._readVersions(self.metadata)
128 _LOG.debug("Database versions: %s", self.db_versions)
130 self._versionCheck(current_versions, self.db_versions)
132 # Read frozen config from metadata.
133 config_json = self.metadata.get(self.metadataConfigKey)
134 if config_json is not None:
135 # Update config from metadata.
136 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self.frozen_parameters)
137 self.config = freezer.update(config, config_json)
138 else:
139 self.config = config
140 del config
142 # Since replica version 1.1.0 we use finer partitioning for replica
143 # chunk tables.
144 self.has_chunk_sub_partitions = False
145 self.has_update_record_chunks_table = False
146 if self.config.enable_replica:
147 assert self.db_versions.replica_version is not None, "Replica version must be defined"
148 self.has_chunk_sub_partitions = ApdbCassandraReplica.hasChunkSubPartitions(
149 self.db_versions.replica_version
150 )
151 self.has_update_record_chunks_table = ApdbCassandraReplica.hasUpdateRecordChunks(
152 self.db_versions.replica_version
153 )
155 # Since version 1.3.0 we have metadata for time partitions.
156 self.has_dedup_table = self.db_versions.code_version >= VersionTuple(1, 3, 0)
158 # Cache for prepared statements
159 self.preparer = PreparedStatementCache(self.session)
161 # Statement factory,
162 self.stmt_factory = StatementFactory(self.session, self.preparer)
164 self.partitioner = Partitioner(self.config)
166 self.schema = ApdbCassandraSchema(
167 session=self.session,
168 keyspace=self.config.keyspace,
169 table_schemas=table_schemas,
170 prefix=self.config.prefix,
171 time_partition_tables=self.config.partitioning.time_partition_tables,
172 enable_replica=self.config.enable_replica,
173 replica_skips_diaobjects=self.config.replica_skips_diaobjects,
174 has_chunk_sub_partitions=self.has_chunk_sub_partitions,
175 )
177 @property
178 def time_partitions_range(self) -> ApdbCassandraTimePartitionRange | None:
179 """Time partition range or None if instance does not use
180 time-partitioned tables (`ApdbCassandraTimePartitionRange` or `None`).
181 """
182 if not self.config.partitioning.time_partition_tables:
183 return None
185 return ApdbCassandraTimePartitionRange.from_meta(self.metadata)
187 @classmethod
188 def _readVersions(cls, metadata: ApdbMetadataCassandra) -> DbVersions:
189 """Read versions of all objects from metadata."""
191 def _get_version(key: str) -> VersionTuple:
192 """Retrieve version number from given metadata key."""
193 version_str = metadata.get(key)
194 if version_str is None:
195 # Should not happen with existing metadata table.
196 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
197 return VersionTuple.fromString(version_str)
199 db_schema_version = _get_version(cls.metadataSchemaVersionKey)
200 db_code_version = _get_version(cls.metadataCodeVersionKey)
202 db_replica_version: VersionTuple | None = None
203 if version_str := metadata.get(cls.metadataReplicaVersionKey):
204 db_replica_version = VersionTuple.fromString(version_str)
206 return DbVersions(
207 schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
208 )
210 @classmethod
211 def _versionCheck(cls, current_versions: DbVersions, db_versions: DbVersions) -> None:
212 """Check schema version compatibility."""
213 if not current_versions.schema_version.checkCompatibility(db_versions.schema_version):
214 raise IncompatibleVersionError(
215 f"Configured schema version {current_versions.schema_version} "
216 f"is not compatible with database version {db_versions.schema_version}"
217 )
218 if not current_versions.code_version.checkCompatibility(db_versions.code_version):
219 raise IncompatibleVersionError(
220 f"Current code version {current_versions.code_version} "
221 f"is not compatible with database version {db_versions.code_version}"
222 )
224 # Check replica code version only if replica is enabled. Sort of
225 # chicken and egg problem - `enable_replica` is a part of frozen
226 # configuration, but we cannot read frozen configuration until we
227 # validate versions. Assume that if the replica version is present
228 # then replication is enabled.
229 if db_versions.replica_version is not None:
230 assert current_versions.replica_version is not None, "Do not expect None"
231 if not current_versions.replica_version.checkCompatibility(db_versions.replica_version):
232 raise IncompatibleVersionError(
233 f"Current replication code version {current_versions.replica_version} "
234 f"is not compatible with database version {db_versions.replica_version}"
235 )