Coverage for python / lsst / dax / apdb / cassandra / apdbCassandraReplica.py: 14%

195 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:19 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ApdbCassandraReplica"] 

25 

26import logging 

27from collections.abc import Iterable, Mapping, Sequence 

28from typing import TYPE_CHECKING, cast 

29 

30import astropy.time 

31import felis.datamodel 

32 

33from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk 

34from ..apdbSchema import ApdbTables 

35from ..apdbUpdateRecord import ApdbUpdateRecord 

36from ..monitor import MonAgent 

37from ..schema_model import ExtraDataTypes 

38from ..timer import Timer 

39from ..versionTuple import VersionTuple 

40from .apdbCassandraSchema import ExtraTables 

41from .cassandra_utils import ( 

42 ApdbCassandraTableData, 

43 execute_concurrent, 

44 select_concurrent, 

45) 

46from .queries import Column as C # noqa: N817 

47from .queries import ColumnExpr, Delete, Select 

48 

49if TYPE_CHECKING: 

50 from .apdbCassandra import ApdbCassandra 

51 

# Module-level logger for this backend implementation.
_LOG = logging.getLogger(__name__)

# Monitoring agent used by `Timer` instances created in `_timer`.
_MON = MonAgent(__name__)

VERSION = VersionTuple(1, 1, 1)
"""Version for the code controlling replication tables. This needs to be
updated following compatibility rules when schema produced by this code
changes.
"""

61 

62 

class ApdbCassandraReplica(ApdbReplica):
    """Implementation of `ApdbReplica` for Cassandra backend.

    Parameters
    ----------
    apdb : `ApdbCassandra`
        Instance of ApdbCassandra for database.
    """

    def __init__(self, apdb: ApdbCassandra):
        # Note that ApdbCassandra instance must stay alive while this object
        # exists, so we keep reference to it.
        self._apdb = apdb

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)

    def schemaVersion(self) -> VersionTuple:
        # Docstring inherited from base class.
        context = self._apdb._context
        return context.db_versions.schema_version

    @classmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        # Docstring inherited from base class.
        return VERSION

    @classmethod
    def hasChunkSubPartitions(cls, version: VersionTuple) -> bool:
        """Return True if replica chunk tables have sub-partitions."""
        return version >= VersionTuple(1, 1, 0)

    @classmethod
    def hasUpdateRecordChunks(cls, version: VersionTuple) -> bool:
        """Return True if ApdbUpdateRecordChunks should exist."""
        return version >= VersionTuple(1, 1, 1)

    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            return None

        # everything goes into a single partition
        partition = 0

        table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
        # We want to avoid timezone mess so return timestamps as milliseconds.
        columns = (ColumnExpr("toUnixTimestamp(last_update_time)"), "apdb_replica_chunk", "unique_id")
        query = Select(config.keyspace, table_name, columns).where(C("partition") == partition)
        statement, params = context.stmt_factory.with_params(query, prepare=False)

        with self._timer("chunks_select_time") as timer:
            result = context.session.execute(
                statement,
                params,
                timeout=config.connection_config.read_timeout,
                execution_profile="read_tuples",
            )
            # order by last_update_time
            rows = sorted(result)
            timer.add_values(row_count=len(rows))
        # Row layout follows `columns` above: (unix_ms, chunk_id, unique_id).
        return [
            ReplicaChunk(
                id=row[1],
                last_update_time=astropy.time.Time(row[0] / 1000, format="unix_tai"),
                unique_id=row[2],
            )
            for row in rows
        ]

    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        # everything goes into a single partition
        partition = 0

        # Iterable can be single pass, make everything that we need from it
        # in a single loop.
        repl_table_params = []
        chunk_table_params: list[tuple] = []
        for chunk in chunks:
            repl_table_params.append((partition, chunk))
            if context.has_chunk_sub_partitions:
                for subchunk in range(config.replica_sub_chunk_count):
                    chunk_table_params.append((chunk, subchunk))
            else:
                chunk_table_params.append((chunk,))
        # Anything to do at all?
        if not repl_table_params:
            return

        table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
        query = Delete(config.keyspace, table_name)
        # The literal -1 values only shape the WHERE clause into bind
        # placeholders; actual values come from per-query parameters below.
        query = query.where(C("partition") == -1)
        query = query.where(C("apdb_replica_chunk") == -1)
        statement = context.stmt_factory(query)

        queries = [(statement, param) for param in repl_table_params]
        with self._timer("chunks_delete_time") as timer:
            execute_concurrent(context.session, queries)
            timer.add_values(row_count=len(queries))

        # Also remove those chunk_ids from Dia*Chunks tables.
        tables = list(ExtraTables.replica_chunk_tables(context.has_chunk_sub_partitions).values())
        if context.has_update_record_chunks_table:
            tables.append(ExtraTables.ApdbUpdateRecordChunks)
        for table in tables:
            table_name = context.schema.tableName(table)
            query = Delete(config.keyspace, table_name)
            query = query.where(C("apdb_replica_chunk") == -1)
            if context.has_chunk_sub_partitions:
                query = query.where(C("apdb_replica_subchunk") == -1)
            statement = context.stmt_factory(query)

            queries = [(statement, param) for param in chunk_table_params]
            # NOTE(review): metric name below misspells "delete"; kept as-is
            # because renaming would break existing monitoring consumers.
            with self._timer("table_chunk_detele_time", tags={"table": table_name}) as timer:
                execute_concurrent(context.session, queries)
                timer.add_values(row_count=len(queries))

    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")
        if table not in ExtraTables.replica_chunk_tables(False):
            raise ValueError(f"Table {table} does not support replica chunks.")

        # We need to iterate few times.
        chunks = list(chunks)

        # If schema was migrated then a chunk can appear in either old or new
        # chunk table (e.g. DiaObjectChunks or DiaObjectChunks2). Chunk table
        # has a column which will be set to true for new table.
        has_chunk_sub_partitions: dict[int, bool] = {}
        if context.has_chunk_sub_partitions:
            table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
            partition = 0
            query = Select(config.keyspace, table_name, ("apdb_replica_chunk", "has_subchunks"))
            query = query.where(C("partition") == partition)
            query = query.where(C("apdb_replica_chunk").in_(chunks))
            stmt, params = context.stmt_factory.with_params(query, prepare=False)
            result = context.session.execute(
                stmt,
                params,
                timeout=config.connection_config.read_timeout,
                execution_profile="read_tuples",
            )
            has_chunk_sub_partitions = dict(result)
        else:
            has_chunk_sub_partitions = dict.fromkeys(chunks, False)

        # Check what kind of tables we want to query, if chunk list is empty
        # then use tables which should exist in the schema.
        if has_chunk_sub_partitions:
            have_subchunks = any(has_chunk_sub_partitions.values())
            have_non_subchunks = not all(has_chunk_sub_partitions.values())
        else:
            have_subchunks = context.has_chunk_sub_partitions
            have_non_subchunks = not have_subchunks

        # NOTE: if an existing database is migrated and has both types of chunk
        # tables (e.g. DiaObjectChunks and DiaObjectChunks2) it is possible
        # that the same chunk can appear in both tables. In reality schema
        # migration should only happen during the downtime, so there will be
        # sufficient gap and a different chunk ID will be used for new chunks.

        table_data: ApdbCassandraTableData | None = None
        table_data_subchunk: ApdbCassandraTableData | None = None

        table_name = context.schema.tableName(ExtraTables.replica_chunk_tables(False)[table])
        with self._timer("table_chunk_select_time", tags={"table": table_name}) as timer:
            if have_subchunks:
                replica_table = ExtraTables.replica_chunk_tables(True)[table]
                table_name = context.schema.tableName(replica_table)
                query = Select(config.keyspace, table_name, ["*"])
                # The literal 0 values only generate bind placeholders; real
                # (chunk, subchunk) pairs are supplied per query below.
                query = query.where(C("apdb_replica_chunk") == 0)
                query = query.where(C("apdb_replica_subchunk") == 0)
                statement = context.stmt_factory(query, prepare=True)

                queries: list[tuple] = []
                for chunk in chunks:
                    if has_chunk_sub_partitions.get(chunk, False):
                        for subchunk in range(config.replica_sub_chunk_count):
                            queries.append((statement, (chunk, subchunk)))
                if not queries and not have_non_subchunks:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1, -1)))

                if queries:
                    table_data_subchunk = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            context.session,
                            queries,
                            "read_raw_multi",
                            config.connection_config.read_concurrency,
                        ),
                    )

            if have_non_subchunks:
                replica_table = ExtraTables.replica_chunk_tables(False)[table]
                table_name = context.schema.tableName(replica_table)
                query = Select(config.keyspace, table_name, ["*"]).where(C("apdb_replica_chunk") == 0)
                statement = context.stmt_factory(query, prepare=True)

                queries = []
                for chunk in chunks:
                    if not has_chunk_sub_partitions.get(chunk, True):
                        queries.append((statement, (chunk,)))
                if not queries and not table_data_subchunk:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1,)))

                if queries:
                    table_data = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            context.session,
                            queries,
                            "read_raw_multi",
                            config.connection_config.read_concurrency,
                        ),
                    )

            # Merge if both are non-empty.
            if table_data and table_data_subchunk:
                table_data_subchunk.project(drop=["apdb_replica_subchunk"])
                table_data.append(table_data_subchunk)
            elif table_data_subchunk:
                table_data = table_data_subchunk
            elif not table_data:
                raise AssertionError("above logic is incorrect")

            timer.add_values(row_count=len(table_data.rows()))

        table_schema = self._apdb._schema.tableSchemas[table]
        # Regular tables should never have columns of ExtraDataTypes, this
        # is just to make mypy happy.
        column_types = {
            column.name: column.datatype
            for column in table_schema.columns
            if not isinstance(column.datatype, ExtraDataTypes)
        }
        column_types["apdb_replica_chunk"] = felis.datamodel.DataType.long
        # It may also have subchunk column, we do not always drop it, and
        # clients should not need it, but we need to provide type for it.
        column_types["apdb_replica_subchunk"] = felis.datamodel.DataType.int
        table_data.set_column_types(column_types)

        return table_data

    def getUpdateRecordChunks(self, chunks: Iterable[int]) -> Sequence[ApdbUpdateRecord]:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        if not context.has_update_record_chunks_table:
            # Table does not exist yet.
            return []

        table_name = context.schema.tableName(ExtraTables.ApdbUpdateRecordChunks)

        records = []
        if context.has_chunk_sub_partitions:
            subchunks = list(range(config.replica_sub_chunk_count))
            query = Select(config.keyspace, table_name, ["*"])
            query = query.where(C("apdb_replica_chunk") == 0)
            query = query.where(C("apdb_replica_subchunk").in_(subchunks))
            statement = context.stmt_factory(query, prepare=False)

            with self._timer("select_update_record_time", tags={"table": table_name}) as timer:
                for chunk in chunks:
                    # Bind values: chunk for the == placeholder, then one value
                    # per element of the IN list.
                    result = context.session.execute(statement, [chunk] + subchunks)
                    for row in result:
                        records.append(
                            ApdbUpdateRecord.from_json(
                                row.update_time_ns, row.update_order, row.update_payload
                            )
                        )
                timer.add_values(row_count=len(records))

        else:
            query = Select(config.keyspace, table_name, ["*"])
            query = query.where(C("apdb_replica_chunk").in_(chunks))
            # Bug fix: stmt_factory(query) returns a single statement; the
            # (statement, params) pair must come from with_params(), as done
            # in getReplicaChunks() and getTableDataChunks().
            statement, params = context.stmt_factory.with_params(query, prepare=False)

            with self._timer("select_update_record_time", tags={"table": table_name}) as timer:
                result = context.session.execute(statement, params)
                for row in result:
                    records.append(
                        ApdbUpdateRecord.from_json(row.update_time_ns, row.update_order, row.update_payload)
                    )
                timer.add_values(row_count=len(records))

        # Records sort by their (time, order) ordering defined on the type.
        records.sort()
        return records