Coverage for python/lsst/dax/apdb/cassandra/apdbCassandraReplica.py: 14%
195 statements
coverage.py v7.13.5, created at 2026-04-25 08:20 +0000
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["ApdbCassandraReplica"]

import logging
from collections.abc import Iterable, Mapping, Sequence
from typing import TYPE_CHECKING, cast

import astropy.time
import felis.datamodel

from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk
from ..apdbSchema import ApdbTables
from ..apdbUpdateRecord import ApdbUpdateRecord
from ..monitor import MonAgent
from ..schema_model import ExtraDataTypes
from ..timer import Timer
from ..versionTuple import VersionTuple
from .apdbCassandraSchema import ExtraTables
from .cassandra_utils import (
    ApdbCassandraTableData,
    execute_concurrent,
    select_concurrent,
)
from .queries import Column as C  # noqa: N817
from .queries import ColumnExpr, Delete, Select

if TYPE_CHECKING:
    from .apdbCassandra import ApdbCassandra

_LOG = logging.getLogger(__name__)

_MON = MonAgent(__name__)

VERSION = VersionTuple(1, 1, 1)
"""Version for the code controlling replication tables. This needs to be
updated following compatibility rules when schema produced by this code
changes.
"""


class ApdbCassandraReplica(ApdbReplica):
    """Implementation of `ApdbReplica` for the Cassandra backend.

    Parameters
    ----------
    apdb : `ApdbCassandra`
        Instance of ApdbCassandra for the database.
    """

    def __init__(self, apdb: ApdbCassandra):
        # Note that ApdbCassandra instance must stay alive while this object
        # exists, so we keep reference to it.
        self._apdb = apdb

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)

    def schemaVersion(self) -> VersionTuple:
        # Docstring inherited from base class.
        context = self._apdb._context
        return context.db_versions.schema_version

    @classmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        # Docstring inherited from base class.
        return VERSION

    @classmethod
    def hasChunkSubPartitions(cls, version: VersionTuple) -> bool:
        """Return True if replica chunk tables have sub-partitions."""
        return version >= VersionTuple(1, 1, 0)

    @classmethod
    def hasUpdateRecordChunks(cls, version: VersionTuple) -> bool:
        """Return True if the ApdbUpdateRecordChunks table should exist."""
        return version >= VersionTuple(1, 1, 1)

    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            return None

        # everything goes into a single partition
        partition = 0

        table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
        # We want to avoid timezone mess so return timestamps as milliseconds.
        columns = (ColumnExpr("toUnixTimestamp(last_update_time)"), "apdb_replica_chunk", "unique_id")
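        # The first entry above is a server-side CQL expression (Cassandra's
        # toUnixTimestamp() returns milliseconds since epoch) rather than a
        # plain column name.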
        query = Select(config.keyspace, table_name, columns).where(C("partition") == partition)
        statement, params = context.stmt_factory.with_params(query, prepare=False)

        with self._timer("chunks_select_time") as timer:
            result = context.session.execute(
                statement,
                params,
                timeout=config.connection_config.read_timeout,
                execution_profile="read_tuples",
            )
            # order by last_update_time
            rows = sorted(result)
            timer.add_values(row_count=len(rows))
        return [
            ReplicaChunk(
                id=row[1],
                last_update_time=astropy.time.Time(row[0] / 1000, format="unix_tai"),
                unique_id=row[2],
            )
            for row in rows
        ]

    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        # everything goes into a single partition
        partition = 0

        # Iterable can be single-pass, so make everything that we need from
        # it in a single loop.
        repl_table_params = []
        chunk_table_params: list[tuple] = []
        for chunk in chunks:
            repl_table_params.append((partition, chunk))
            if context.has_chunk_sub_partitions:
                for subchunk in range(config.replica_sub_chunk_count):
                    chunk_table_params.append((chunk, subchunk))
            else:
                chunk_table_params.append((chunk,))
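        # repl_table_params drives deletion from the ApdbReplicaChunks index
        # table; chunk_table_params drives deletion from the per-table chunk
        # data, expanded per sub-chunk when sub-partitioning is enabled.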
        # Anything to do at all?
        if not repl_table_params:
            return

        table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
        query = Delete(config.keyspace, table_name)
        query = query.where(C("partition") == -1)
        query = query.where(C("apdb_replica_chunk") == -1)
        statement = context.stmt_factory(query)
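        # The -1 literals above serve only as placeholders; the actual
        # (partition, chunk) values are supplied per statement from
        # repl_table_params below.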

        queries = [(statement, param) for param in repl_table_params]
        with self._timer("chunks_delete_time") as timer:
            execute_concurrent(context.session, queries)
            timer.add_values(row_count=len(queries))

        # Also remove those chunk_ids from Dia*Chunks tables.
        tables = list(ExtraTables.replica_chunk_tables(context.has_chunk_sub_partitions).values())
        if context.has_update_record_chunks_table:
            tables.append(ExtraTables.ApdbUpdateRecordChunks)
        for table in tables:
            table_name = context.schema.tableName(table)
            query = Delete(config.keyspace, table_name)
            query = query.where(C("apdb_replica_chunk") == -1)
            if context.has_chunk_sub_partitions:
                query = query.where(C("apdb_replica_subchunk") == -1)
            statement = context.stmt_factory(query)

            queries = [(statement, param) for param in chunk_table_params]
            with self._timer("table_chunk_delete_time", tags={"table": table_name}) as timer:
                execute_concurrent(context.session, queries)
                timer.add_values(row_count=len(queries))

    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")
        if table not in ExtraTables.replica_chunk_tables(False):
            raise ValueError(f"Table {table} does not support replica chunks.")

        # We need to iterate a few times.
        chunks = list(chunks)

        # If the schema was migrated then a chunk can appear in either the old
        # or the new chunk table (e.g. DiaObjectChunks or DiaObjectChunks2).
        # The ApdbReplicaChunks table has a column (has_subchunks) which is
        # set to true for chunks stored in the new table.
        has_chunk_sub_partitions: dict[int, bool] = {}
        if context.has_chunk_sub_partitions:
            table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
            partition = 0
            query = Select(config.keyspace, table_name, ("apdb_replica_chunk", "has_subchunks"))
            query = query.where(C("partition") == partition)
            query = query.where(C("apdb_replica_chunk").in_(chunks))
            stmt, params = context.stmt_factory.with_params(query, prepare=False)
            result = context.session.execute(
                stmt,
                params,
                timeout=config.connection_config.read_timeout,
                execution_profile="read_tuples",
            )
            has_chunk_sub_partitions = dict(result)
        else:
            has_chunk_sub_partitions = dict.fromkeys(chunks, False)

        # Check what kind of tables we want to query; if the chunk list is
        # empty then use the tables which should exist in the schema.
        if has_chunk_sub_partitions:
            have_subchunks = any(has_chunk_sub_partitions.values())
            have_non_subchunks = not all(has_chunk_sub_partitions.values())
        else:
            have_subchunks = context.has_chunk_sub_partitions
            have_non_subchunks = not have_subchunks

        # NOTE: if an existing database is migrated and has both types of
        # chunk tables (e.g. DiaObjectChunks and DiaObjectChunks2) it is
        # possible that the same chunk can appear in both tables. In reality
        # schema migration should only happen during downtime, so there will
        # be a sufficient gap and a different chunk ID will be used for new
        # chunks.
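        # Query strategy below: chunks flagged as having sub-partitions are
        # read from the sub-chunked replica table, fanning out over every
        # sub-chunk value; the remaining chunks are read from the
        # non-sub-chunked table, and the two result sets are merged at the end.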

        table_data: ApdbCassandraTableData | None = None
        table_data_subchunk: ApdbCassandraTableData | None = None

        table_name = context.schema.tableName(ExtraTables.replica_chunk_tables(False)[table])
        with self._timer("table_chunk_select_time", tags={"table": table_name}) as timer:
            if have_subchunks:
                replica_table = ExtraTables.replica_chunk_tables(True)[table]
                table_name = context.schema.tableName(replica_table)
                query = Select(config.keyspace, table_name, ["*"])
                query = query.where(C("apdb_replica_chunk") == 0)
                query = query.where(C("apdb_replica_subchunk") == 0)
                statement = context.stmt_factory(query, prepare=True)

                queries: list[tuple] = []
                for chunk in chunks:
                    if has_chunk_sub_partitions.get(chunk, False):
                        for subchunk in range(config.replica_sub_chunk_count):
                            queries.append((statement, (chunk, subchunk)))
                if not queries and not have_non_subchunks:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1, -1)))

                if queries:
                    table_data_subchunk = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            context.session,
                            queries,
                            "read_raw_multi",
                            config.connection_config.read_concurrency,
                        ),
                    )

            if have_non_subchunks:
                replica_table = ExtraTables.replica_chunk_tables(False)[table]
                table_name = context.schema.tableName(replica_table)
                query = Select(config.keyspace, table_name, ["*"]).where(C("apdb_replica_chunk") == 0)
                statement = context.stmt_factory(query, prepare=True)

                queries = []
                for chunk in chunks:
                    if not has_chunk_sub_partitions.get(chunk, True):
                        queries.append((statement, (chunk,)))
                if not queries and not table_data_subchunk:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1,)))

                if queries:
                    table_data = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            context.session,
                            queries,
                            "read_raw_multi",
                            config.connection_config.read_concurrency,
                        ),
                    )

            # Merge if both are non-empty.
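            # The sub-chunk column exists only in the sub-partitioned table,
            # so it is dropped to make the column sets match before appending.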
            if table_data and table_data_subchunk:
                table_data_subchunk.project(drop=["apdb_replica_subchunk"])
                table_data.append(table_data_subchunk)
            elif table_data_subchunk:
                table_data = table_data_subchunk
            elif not table_data:
                raise AssertionError("above logic is incorrect")

            timer.add_values(row_count=len(table_data.rows()))

        table_schema = self._apdb._schema.tableSchemas[table]
        # Regular tables should never have columns of ExtraDataTypes; this
        # is just to make mypy happy.
        column_types = {
            column.name: column.datatype
            for column in table_schema.columns
            if not isinstance(column.datatype, ExtraDataTypes)
        }
        column_types["apdb_replica_chunk"] = felis.datamodel.DataType.long
        # The data may also have a subchunk column; we do not always drop it,
        # and clients should not need it, but we need to provide a type for it.
        column_types["apdb_replica_subchunk"] = felis.datamodel.DataType.int
        table_data.set_column_types(column_types)

        return table_data

    def getUpdateRecordChunks(self, chunks: Iterable[int]) -> Sequence[ApdbUpdateRecord]:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        if not context.has_update_record_chunks_table:
            # Table does not exist yet.
            return []

        table_name = context.schema.tableName(ExtraTables.ApdbUpdateRecordChunks)

        records = []
        if context.has_chunk_sub_partitions:
            subchunks = list(range(config.replica_sub_chunk_count))
            query = Select(config.keyspace, table_name, ["*"])
            query = query.where(C("apdb_replica_chunk") == 0)
            query = query.where(C("apdb_replica_subchunk").in_(subchunks))
            statement = context.stmt_factory(query, prepare=False)

            with self._timer("select_update_record_time", tags={"table": table_name}) as timer:
                for chunk in chunks:
                    result = context.session.execute(statement, [chunk] + subchunks)
                    for row in result:
                        records.append(
                            ApdbUpdateRecord.from_json(
                                row.update_time_ns, row.update_order, row.update_payload
                            )
                        )
                timer.add_values(row_count=len(records))

        else:
            query = Select(config.keyspace, table_name, ["*"])
            query = query.where(C("apdb_replica_chunk").in_(chunks))
            statement, params = context.stmt_factory.with_params(query, prepare=False)

            with self._timer("select_update_record_time", tags={"table": table_name}) as timer:
                result = context.session.execute(statement, params)
                for row in result:
                    records.append(
                        ApdbUpdateRecord.from_json(row.update_time_ns, row.update_order, row.update_payload)
                    )
                timer.add_values(row_count=len(records))
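
        # Records collected from different chunks (and sub-chunks) arrive in
        # no particular order; sort to return them in a deterministic order.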
        records.sort()
        return records