Coverage for python / lsst / dax / apdb / cassandra / connectionContext.py: 40%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:46 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ConnectionContext", "DbVersions"] 

25 

26import dataclasses 

27import logging 

28from collections.abc import Mapping 

29 

30# If cassandra-driver is not there the module can still be imported 

31# but ApdbCassandra cannot be instantiated. 

32try: 

33 from cassandra.cluster import Session 

34except ImportError: 

35 pass 

36 

37from ..apdbConfigFreezer import ApdbConfigFreezer 

38from ..apdbSchema import ApdbTables 

39from ..monitor import MonAgent 

40from ..schema_model import Table 

41from ..versionTuple import VersionTuple 

42from .apdbCassandraReplica import ApdbCassandraReplica 

43from .apdbCassandraSchema import ApdbCassandraSchema 

44from .apdbMetadataCassandra import ApdbMetadataCassandra 

45from .cassandra_utils import PreparedStatementCache 

46from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange 

47from .partitioner import Partitioner 

48 

# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)

# Monitoring agent (``MonAgent`` from ``..monitor``) named after this module.
# NOTE(review): ``_MON`` is not used by the code in this file -- confirm
# whether it is still needed.
_MON = MonAgent(__name__)

52 

53 

@dataclasses.dataclass()
class DbVersions:
    """Set of version numbers that an APDB database records in its
    metadata table.

    Instances are plain value holders: they are compared field by field
    and carry no behavior of their own.
    """

    schema_version: VersionTuple
    """Schema version that was used when the database was created."""

    code_version: VersionTuple
    """ApdbCassandra code version that was used when the database was
    created.
    """

    replica_version: VersionTuple | None
    """ApdbCassandraReplica code version that was used when the database
    was created; `None` when replication is not configured.
    """

68 

69 

class ConnectionContext:
    """Container for all kinds of objects that are instantiated once the
    connection to Cassandra is established.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session.
    config : `ApdbCassandraConfig`
        Configuration object. If the metadata table holds a frozen
        configuration, it is merged into this object (via
        `ApdbConfigFreezer`) for the parameters listed in
        ``frozen_parameters``.
    table_schemas : `~collections.abc.Mapping` [`ApdbTables`, `Table`]
        Schema definitions for regular APDB tables.

    Raises
    ------
    RuntimeError
        Raised if a required version key is missing from the metadata table.
    """

    metadataSchemaVersionKey = "version:schema"
    """Name of the metadata key to store schema version number."""

    metadataCodeVersionKey = "version:ApdbCassandra"
    """Name of the metadata key to store code version number."""

    metadataReplicaVersionKey = "version:ApdbCassandraReplica"
    """Name of the metadata key to store replica code version number."""

    metadataConfigKey = "config:apdb-cassandra.json"
    """Name of the metadata key to store frozen part of the configuration."""

    frozen_parameters = (
        "enable_replica",
        "ra_dec_columns",
        "replica_skips_diaobjects",
        "replica_sub_chunk_count",
        "partitioning.part_pixelization",
        "partitioning.part_pix_level",
        "partitioning.time_partition_tables",
        "partitioning.time_partition_days",
    )
    """Names of the config parameters to be frozen in metadata table."""

    def __init__(
        self, session: Session, config: ApdbCassandraConfig, table_schemas: Mapping[ApdbTables, Table]
    ) -> None:
        self.session = session

        meta_table_name = ApdbTables.metadata.table_name(config.prefix)
        self.metadata = ApdbMetadataCassandra(
            self.session, meta_table_name, config.keyspace, "read_tuples", "write"
        )

        # Read frozen config from metadata.
        config_json = self.metadata.get(self.metadataConfigKey)
        if config_json is not None:
            # Update config from metadata.
            freezer = ApdbConfigFreezer[ApdbCassandraConfig](self.frozen_parameters)
            self.config = freezer.update(config, config_json)
        else:
            self.config = config
        # Drop the argument name so that everything below can only use
        # ``self.config``, which includes any frozen parameters.
        del config

        # Read versions stored in database.
        self.db_versions = self._readVersions(self.metadata)
        _LOG.debug("Database versions: %s", self.db_versions)

        # Replica features depend on the stored replica code version. Since
        # replica version 1.1.0 we use finer partitioning for replica chunk
        # tables. The availability of the update-record chunks table is also
        # version-dependent. Both checks are delegated to
        # ApdbCassandraReplica.
        self.has_chunk_sub_partitions = False
        self.has_update_record_chunks_table = False
        if self.config.enable_replica:
            # _readVersions always reads (or raises on a missing) replica
            # version when replica is enabled; this assert narrows the type.
            assert self.db_versions.replica_version is not None, "Replica version must be defined"
            self.has_chunk_sub_partitions = ApdbCassandraReplica.hasChunkSubPartitions(
                self.db_versions.replica_version
            )
            self.has_update_record_chunks_table = ApdbCassandraReplica.hasUpdateRecordChunks(
                self.db_versions.replica_version
            )

        # Since version 0.1.3 we have metadata for time partitions.
        self.has_time_partition_meta = self.db_versions.code_version >= VersionTuple(0, 1, 3)

        # Since version 0.1.2 we have an extra table for visit/detector.
        self.has_visit_detector_table = self.db_versions.code_version >= VersionTuple(0, 1, 2)

        # Support for DiaObjectLastToPartition was added at code version 0.1.1
        # in a backward-compatible way (we only use the table if it is there).
        self.has_dia_object_last_to_partition = self.db_versions.code_version >= VersionTuple(0, 1, 1)

        # Cache for prepared statements
        self.preparer = PreparedStatementCache(self.session)

        self.partitioner = Partitioner(self.config)

        self.schema = ApdbCassandraSchema(
            session=self.session,
            keyspace=self.config.keyspace,
            table_schemas=table_schemas,
            prefix=self.config.prefix,
            time_partition_tables=self.config.partitioning.time_partition_tables,
            enable_replica=self.config.enable_replica,
            replica_skips_diaobjects=self.config.replica_skips_diaobjects,
            has_chunk_sub_partitions=self.has_chunk_sub_partitions,
            has_visit_detector_table=self.has_visit_detector_table,
        )

    @property
    def time_partitions_range(self) -> ApdbCassandraTimePartitionRange | None:
        """Time partition range or None if instance does not use
        time-partitioned tables (`ApdbCassandraTimePartitionRange` or `None`).

        For databases created with code version 0.1.3 or later the range is
        read from metadata. For older databases it is derived from the names
        of existing DiaSource tables. The value is not cached and is
        recomputed, possibly querying the database, on every access.

        Raises
        ------
        LookupError
            Raised if no partitioned DiaSource table is found when deriving
            the range from table names.
        """
        if not self.config.partitioning.time_partition_tables:
            return None

        if self.has_time_partition_meta:
            return ApdbCassandraTimePartitionRange.from_meta(self.metadata)

        # Older databases have no partition metadata, so scan DiaSource
        # tables to see which partitions are present. The partition number
        # is taken to be the integer after the last underscore in the name.
        partitions: set[int] = set()
        tables = self.schema.existing_tables(ApdbTables.DiaSource)
        for table_name in tables[ApdbTables.DiaSource]:
            _, _, part_str = table_name.rpartition("_")
            partitions.add(int(part_str))
        if not partitions:
            raise LookupError("Failed to find any partitioned DiaSource table.")
        return ApdbCassandraTimePartitionRange(start=min(partitions), end=max(partitions))

    def _readVersions(self, metadata: ApdbMetadataCassandra) -> DbVersions:
        """Read versions of all objects from metadata.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table interface to read version keys from.

        Returns
        -------
        versions : `DbVersions`
            Stored versions. ``replica_version`` is `None` unless replica is
            enabled in ``self.config``.

        Raises
        ------
        RuntimeError
            Raised if a required version key is missing from metadata.
        """

        def _get_version(key: str) -> VersionTuple:
            """Retrieve version number from given metadata key."""
            version_str = metadata.get(key)
            if version_str is None:
                # Should not happen with existing metadata table.
                raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
            return VersionTuple.fromString(version_str)

        db_schema_version = _get_version(self.metadataSchemaVersionKey)
        db_code_version = _get_version(self.metadataCodeVersionKey)

        # Check replica code version only if replica is enabled.
        db_replica_version: VersionTuple | None = None
        if self.config.enable_replica:
            db_replica_version = _get_version(self.metadataReplicaVersionKey)

        return DbVersions(
            schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
        )