Coverage for python / lsst / dax / apdb / cassandra / connectionContext.py: 42%

95 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-25 08:20 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ConnectionContext", "DbVersions"] 

25 

26import dataclasses 

27import logging 

28from collections.abc import Mapping 

29 

30# If cassandra-driver is not there the module can still be imported 

31# but ApdbCassandra cannot be instantiated. 

32try: 

33 from cassandra.cluster import Session 

34except ImportError: 

35 pass 

36 

37from ..apdbConfigFreezer import ApdbConfigFreezer 

38from ..apdbSchema import ApdbTables 

39from ..monitor import MonAgent 

40from ..schema_model import Table 

41from ..versionTuple import IncompatibleVersionError, VersionTuple 

42from .apdbCassandraReplica import ApdbCassandraReplica 

43from .apdbCassandraSchema import ApdbCassandraSchema 

44from .apdbMetadataCassandra import ApdbMetadataCassandra 

45from .cassandra_utils import PreparedStatementCache, StatementFactory 

46from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange 

47from .partitioner import Partitioner 

48 

# Logger named after this module.
_LOG = logging.getLogger(name=__name__)

# Agent from the ``monitor`` package, also named after this module.
_MON = MonAgent(__name__)

52 

53 

@dataclasses.dataclass
class DbVersions:
    """Versions defined in APDB metadata table.

    Attributes
    ----------
    schema_version : `VersionTuple`
        Version of the schema from which the database was created.
    code_version : `VersionTuple`
        Version of ApdbCassandra with which the database was created.
    replica_version : `VersionTuple` or `None`
        Version of ApdbCassandraReplica with which the database was created,
        `None` if replication was not configured.
    """

    schema_version: VersionTuple
    code_version: VersionTuple
    replica_version: VersionTuple | None

68 

69 

class ConnectionContext:
    """Container for all kinds of objects that are instantiated once the
    connection to Cassandra is established.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session.
    config : `ApdbCassandraConfig`
        Configuration object. If the metadata table holds a frozen
        configuration, this object is updated from it with
        `ApdbConfigFreezer` (see `frozen_parameters`), and the result is
        stored as ``self.config``.
    table_schemas : `~collections.abc.Mapping` [`ApdbTables`, `Table`]
        Schema definitions for regular APDB tables.
    current_versions : `DbVersions`
        Versions implemented by the current code. They are checked for
        compatibility with the versions stored in the database.

    Raises
    ------
    RuntimeError
        Raised if the schema or code version key is missing from the
        metadata table.
    IncompatibleVersionError
        Raised if versions stored in the database are not compatible with
        ``current_versions``.
    """

    metadataSchemaVersionKey = "version:schema"
    """Name of the metadata key to store schema version number."""

    metadataCodeVersionKey = "version:ApdbCassandra"
    """Name of the metadata key to store code version number."""

    metadataReplicaVersionKey = "version:ApdbCassandraReplica"
    """Name of the metadata key to store replica code version number."""

    metadataConfigKey = "config:apdb-cassandra.json"
    """Name of the metadata key to store frozen part of the configuration."""

    metadataDedupKey = "status:deduplication.json"
    """Name of the metadata key to store deduplication status."""

    frozen_parameters = (
        "enable_replica",
        "ra_dec_columns",
        "replica_skips_diaobjects",
        "replica_sub_chunk_count",
        "partitioning.part_pixelization",
        "partitioning.part_pix_level",
        "partitioning.time_partition_tables",
        "partitioning.time_partition_days",
        "partitioning.num_part_dedup",
    )
    """Names of the config parameters to be frozen in metadata table."""

    def __init__(
        self,
        session: Session,
        config: ApdbCassandraConfig,
        table_schemas: Mapping[ApdbTables, Table],
        current_versions: DbVersions,
    ):
        self.session = session

        meta_table_name = ApdbTables.metadata.table_name(config.prefix)
        self.metadata = ApdbMetadataCassandra(
            self.session, meta_table_name, config.keyspace, "read_tuples", "write"
        )

        # Read versions stored in the database and verify them before
        # anything else (e.g. frozen config) is read from metadata.
        self.db_versions = self._readVersions(self.metadata)
        _LOG.debug("Database versions: %s", self.db_versions)

        self._versionCheck(current_versions, self.db_versions)

        # Read frozen config from metadata; if it exists, use it to update
        # the configuration passed by the caller.
        config_json = self.metadata.get(self.metadataConfigKey)
        if config_json is not None:
            freezer = ApdbConfigFreezer[ApdbCassandraConfig](self.frozen_parameters)
            self.config = freezer.update(config, config_json)
        else:
            self.config = config
        # Drop the argument so that only ``self.config`` can be used below.
        del config

        # Since replica version 1.1.0 we use finer partitioning for replica
        # chunk tables. Both feature flags are derived from the replica
        # version stored in the database.
        self.has_chunk_sub_partitions = False
        self.has_update_record_chunks_table = False
        if self.config.enable_replica:
            assert self.db_versions.replica_version is not None, "Replica version must be defined"
            self.has_chunk_sub_partitions = ApdbCassandraReplica.hasChunkSubPartitions(
                self.db_versions.replica_version
            )
            self.has_update_record_chunks_table = ApdbCassandraReplica.hasUpdateRecordChunks(
                self.db_versions.replica_version
            )

        # Databases created with ApdbCassandra code version 1.3.0 or later
        # are treated as having the deduplication table.
        self.has_dedup_table = self.db_versions.code_version >= VersionTuple(1, 3, 0)

        # Cache for prepared statements.
        self.preparer = PreparedStatementCache(self.session)

        # Statement factory.
        self.stmt_factory = StatementFactory(self.session, self.preparer)

        self.partitioner = Partitioner(self.config)

        self.schema = ApdbCassandraSchema(
            session=self.session,
            keyspace=self.config.keyspace,
            table_schemas=table_schemas,
            prefix=self.config.prefix,
            time_partition_tables=self.config.partitioning.time_partition_tables,
            enable_replica=self.config.enable_replica,
            replica_skips_diaobjects=self.config.replica_skips_diaobjects,
            has_chunk_sub_partitions=self.has_chunk_sub_partitions,
        )

    @property
    def time_partitions_range(self) -> ApdbCassandraTimePartitionRange | None:
        """Time partition range or None if instance does not use
        time-partitioned tables (`ApdbCassandraTimePartitionRange` or `None`).

        The value is computed from ``self.metadata`` on every access and is
        not cached.
        """
        if not self.config.partitioning.time_partition_tables:
            return None

        return ApdbCassandraTimePartitionRange.from_meta(self.metadata)

    @classmethod
    def _readVersions(cls, metadata: ApdbMetadataCassandra) -> DbVersions:
        """Read versions of all objects from metadata.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Interface to the metadata table.

        Returns
        -------
        versions : `DbVersions`
            Versions stored in the database. ``replica_version`` is `None`
            when the replica version key is missing or empty.

        Raises
        ------
        RuntimeError
            Raised if the schema or code version key is missing.
        """

        def _get_version(key: str) -> VersionTuple:
            """Retrieve version number from given metadata key."""
            version_str = metadata.get(key)
            if version_str is None:
                # Should not happen with existing metadata table.
                raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
            return VersionTuple.fromString(version_str)

        db_schema_version = _get_version(cls.metadataSchemaVersionKey)
        db_code_version = _get_version(cls.metadataCodeVersionKey)

        # Replica version is optional; it is absent when replication was not
        # configured for this database.
        db_replica_version: VersionTuple | None = None
        if version_str := metadata.get(cls.metadataReplicaVersionKey):
            db_replica_version = VersionTuple.fromString(version_str)

        return DbVersions(
            schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
        )

    @classmethod
    def _versionCheck(cls, current_versions: DbVersions, db_versions: DbVersions) -> None:
        """Check that versions stored in the database are compatible with
        versions of the current code.

        Parameters
        ----------
        current_versions : `DbVersions`
            Versions implemented by the current code.
        db_versions : `DbVersions`
            Versions read from the database.

        Raises
        ------
        IncompatibleVersionError
            Raised if the schema, code, or replica version is incompatible.
        """
        if not current_versions.schema_version.checkCompatibility(db_versions.schema_version):
            raise IncompatibleVersionError(
                f"Configured schema version {current_versions.schema_version} "
                f"is not compatible with database version {db_versions.schema_version}"
            )
        if not current_versions.code_version.checkCompatibility(db_versions.code_version):
            raise IncompatibleVersionError(
                f"Current code version {current_versions.code_version} "
                f"is not compatible with database version {db_versions.code_version}"
            )

        # Check replica code version only if replica is enabled. Sort of
        # chicken and egg problem - `enable_replica` is a part of frozen
        # configuration, but we cannot read frozen configuration until we
        # validate versions. Assume that if the replica version is present
        # then replication is enabled.
        if db_versions.replica_version is not None:
            # Callers are expected to always supply the current replica
            # version; this is an internal invariant, not input validation.
            assert current_versions.replica_version is not None, "Do not expect None"
            if not current_versions.replica_version.checkCompatibility(db_versions.replica_version):
                raise IncompatibleVersionError(
                    f"Current replication code version {current_versions.replica_version} "
                    f"is not compatible with database version {db_versions.replica_version}"
                )