Coverage for python / lsst / dax / apdb / cassandra / connectionContext.py: 40%
91 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:46 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:46 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["ConnectionContext", "DbVersions"]
26import dataclasses
27import logging
28from collections.abc import Mapping
30# If cassandra-driver is not there the module can still be imported
31# but ApdbCassandra cannot be instantiated.
32try:
33 from cassandra.cluster import Session
34except ImportError:
35 pass
37from ..apdbConfigFreezer import ApdbConfigFreezer
38from ..apdbSchema import ApdbTables
39from ..monitor import MonAgent
40from ..schema_model import Table
41from ..versionTuple import VersionTuple
42from .apdbCassandraReplica import ApdbCassandraReplica
43from .apdbCassandraSchema import ApdbCassandraSchema
44from .apdbMetadataCassandra import ApdbMetadataCassandra
45from .cassandra_utils import PreparedStatementCache
46from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange
47from .partitioner import Partitioner
49_LOG = logging.getLogger(__name__)
51_MON = MonAgent(__name__)
@dataclasses.dataclass
class DbVersions:
    """Version numbers recorded in the APDB metadata table."""

    schema_version: VersionTuple
    """Schema version that the database was created from."""

    code_version: VersionTuple
    """ApdbCassandra code version that created the database."""

    replica_version: VersionTuple | None
    """ApdbCassandraReplica code version that created the database, `None`
    when replication was not configured.
    """
class ConnectionContext:
    """Holder for the objects that can only be built once a Cassandra
    connection exists.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session.
    config : `ApdbCassandraConfig`
        Configuration object.
    table_schemas : `~collections.abc.Mapping` [`ApdbTables`, `Table`]
        Schema definitions for regular APDB tables.
    """

    metadataSchemaVersionKey = "version:schema"
    """Metadata key under which the schema version number is stored."""

    metadataCodeVersionKey = "version:ApdbCassandra"
    """Metadata key under which the code version number is stored."""

    metadataReplicaVersionKey = "version:ApdbCassandraReplica"
    """Metadata key under which the replica code version number is stored."""

    metadataConfigKey = "config:apdb-cassandra.json"
    """Metadata key under which the frozen part of the configuration is
    stored.
    """

    frozen_parameters = (
        "enable_replica",
        "ra_dec_columns",
        "replica_skips_diaobjects",
        "replica_sub_chunk_count",
        "partitioning.part_pixelization",
        "partitioning.part_pix_level",
        "partitioning.time_partition_tables",
        "partitioning.time_partition_days",
    )
    """Config parameter names that are frozen in the metadata table."""

    def __init__(
        self, session: Session, config: ApdbCassandraConfig, table_schemas: Mapping[ApdbTables, Table]
    ):
        self.session = session

        self.metadata = ApdbMetadataCassandra(
            self.session,
            ApdbTables.metadata.table_name(config.prefix),
            config.keyspace,
            "read_tuples",
            "write",
        )

        # Values frozen in the metadata table, when present, override the
        # corresponding values of the configuration passed by the caller.
        frozen_json = self.metadata.get(self.metadataConfigKey)
        if frozen_json is None:
            self.config = config
        else:
            freezer = ApdbConfigFreezer[ApdbCassandraConfig](self.frozen_parameters)
            self.config = freezer.update(config, frozen_json)
        # From here on only ``self.config`` may be used; drop the local name
        # so that the un-updated configuration cannot be used by mistake.
        del config

        # Versions recorded in the database.
        self.db_versions = self._readVersions(self.metadata)
        _LOG.debug("Database versions: %s", self.db_versions)

        # Replica-dependent features are off unless replication is enabled;
        # when it is, they are decided by the stored replica version (finer
        # partitioning of replica chunk tables exists since replica version
        # 1.1.0).
        self.has_chunk_sub_partitions = False
        self.has_update_record_chunks_table = False
        if self.config.enable_replica:
            replica_version = self.db_versions.replica_version
            assert replica_version is not None, "Replica version must be defined"
            self.has_chunk_sub_partitions = ApdbCassandraReplica.hasChunkSubPartitions(replica_version)
            self.has_update_record_chunks_table = ApdbCassandraReplica.hasUpdateRecordChunks(replica_version)

        code_version = self.db_versions.code_version

        # Metadata for time partitions exists since code version 0.1.3.
        self.has_time_partition_meta = code_version >= VersionTuple(0, 1, 3)

        # The extra visit/detector table exists since code version 0.1.2.
        self.has_visit_detector_table = code_version >= VersionTuple(0, 1, 2)

        # DiaObjectLastToPartition support was added in code version 0.1.1 in
        # a backward-compatible way (the table is only used if it is there).
        self.has_dia_object_last_to_partition = code_version >= VersionTuple(0, 1, 1)

        # Cache for prepared statements.
        self.preparer = PreparedStatementCache(self.session)

        self.partitioner = Partitioner(self.config)

        self.schema = ApdbCassandraSchema(
            session=self.session,
            keyspace=self.config.keyspace,
            table_schemas=table_schemas,
            prefix=self.config.prefix,
            time_partition_tables=self.config.partitioning.time_partition_tables,
            enable_replica=self.config.enable_replica,
            replica_skips_diaobjects=self.config.replica_skips_diaobjects,
            has_chunk_sub_partitions=self.has_chunk_sub_partitions,
            has_visit_detector_table=self.has_visit_detector_table,
        )

    @property
    def time_partitions_range(self) -> ApdbCassandraTimePartitionRange | None:
        """Range of time partitions, `None` when the instance does not use
        time-partitioned tables (`ApdbCassandraTimePartitionRange` or `None`).
        """
        if not self.config.partitioning.time_partition_tables:
            return None

        if self.has_time_partition_meta:
            return ApdbCassandraTimePartitionRange.from_meta(self.metadata)

        # No partition metadata: look at the existing DiaSource tables and
        # take the partition number from the suffix after the last underscore.
        source_tables = self.schema.existing_tables(ApdbTables.DiaSource)[ApdbTables.DiaSource]
        partitions = {int(name.rpartition("_")[2]) for name in source_tables}
        if not partitions:
            raise LookupError("Failed to find any partitioned DiaSource table.")
        return ApdbCassandraTimePartitionRange(start=min(partitions), end=max(partitions))

    def _readVersions(self, metadata: ApdbMetadataCassandra) -> DbVersions:
        """Read the versions of all objects from the metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata accessor to read the version keys from.

        Returns
        -------
        versions : `DbVersions`
            Schema and code versions; the replica version is `None` unless
            ``self.config.enable_replica`` is set.

        Raises
        ------
        RuntimeError
            Raised if a required version key is missing from the metadata.
        """

        def _fetch(key: str) -> VersionTuple:
            """Return the version stored under ``key``, which must exist."""
            stored = metadata.get(key)
            if stored is None:
                # Should not happen with existing metadata table.
                raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
            return VersionTuple.fromString(stored)

        schema_version = _fetch(self.metadataSchemaVersionKey)
        code_version = _fetch(self.metadataCodeVersionKey)

        # The replica version key is only looked up when replica is enabled.
        replica_version: VersionTuple | None = (
            _fetch(self.metadataReplicaVersionKey) if self.config.enable_replica else None
        )

        return DbVersions(
            schema_version=schema_version, code_version=code_version, replica_version=replica_version
        )