Coverage for python / lsst / dax / apdb / cassandra / apdbCassandraReplica.py: 14%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:49 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ApdbCassandraReplica"] 

25 

26import logging 

27from collections.abc import Iterable, Mapping, Sequence 

28from typing import TYPE_CHECKING, cast 

29 

30import astropy.time 

31import felis.datamodel 

32 

33from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk 

34from ..apdbSchema import ApdbTables 

35from ..apdbUpdateRecord import ApdbUpdateRecord 

36from ..monitor import MonAgent 

37from ..schema_model import ExtraDataTypes 

38from ..timer import Timer 

39from ..versionTuple import VersionTuple 

40from .apdbCassandraSchema import ExtraTables 

41from .cassandra_utils import ( 

42 ApdbCassandraTableData, 

43 execute_concurrent, 

44 select_concurrent, 

45) 

46 

47if TYPE_CHECKING: 

48 from .apdbCassandra import ApdbCassandra 

49 

# Module-level logger for this Cassandra replica backend.
_LOG = logging.getLogger(__name__)

# Monitoring agent used by `Timer` instances created in this module.
_MON = MonAgent(__name__)

VERSION = VersionTuple(1, 1, 1)
"""Version for the code controlling replication tables. This needs to be
updated following compatibility rules when schema produced by this code
changes.
"""

59 

60 

class ApdbCassandraReplica(ApdbReplica):
    """Implementation of `ApdbReplica` for Cassandra backend.

    Parameters
    ----------
    apdb : `ApdbCassandra`
        Instance of ApdbCassandra for database.
    """

    def __init__(self, apdb: ApdbCassandra):
        # Note that ApdbCassandra instance must stay alive while this object
        # exists, so we keep reference to it.
        self._apdb = apdb

74 

75 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer: 

76 """Create `Timer` instance given its name.""" 

77 return Timer(name, _MON, tags=tags) 

78 

79 def schemaVersion(self) -> VersionTuple: 

80 # Docstring inherited from base class. 

81 context = self._apdb._context 

82 return context.db_versions.schema_version 

83 

    @classmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        # Docstring inherited from base class.
        # Module-level constant; bump VERSION when replication schema changes.
        return VERSION

88 

89 @classmethod 

90 def hasChunkSubPartitions(cls, version: VersionTuple) -> bool: 

91 """Return True if replica chunk tables have sub-partitions.""" 

92 return version >= VersionTuple(1, 1, 0) 

93 

94 @classmethod 

95 def hasUpdateRecordChunks(cls, version: VersionTuple) -> bool: 

96 """Return True if ApdbUpdateRecordChunks should exists.""" 

97 return version >= VersionTuple(1, 1, 1) 

98 

99 def getReplicaChunks(self) -> list[ReplicaChunk] | None: 

100 # docstring is inherited from a base class 

101 context = self._apdb._context 

102 config = context.config 

103 

104 if not context.schema.replication_enabled: 

105 return None 

106 

107 # everything goes into a single partition 

108 partition = 0 

109 

110 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks) 

111 # We want to avoid timezone mess so return timestamps as milliseconds. 

112 query = ( 

113 "SELECT toUnixTimestamp(last_update_time), apdb_replica_chunk, unique_id " 

114 f'FROM "{config.keyspace}"."{table_name}" WHERE partition = %s' 

115 ) 

116 

117 with self._timer("chunks_select_time") as timer: 

118 result = context.session.execute( 

119 query, 

120 (partition,), 

121 timeout=config.connection_config.read_timeout, 

122 execution_profile="read_tuples", 

123 ) 

124 # order by last_update_time 

125 rows = sorted(result) 

126 timer.add_values(row_count=len(rows)) 

127 return [ 

128 ReplicaChunk( 

129 id=row[1], 

130 last_update_time=astropy.time.Time(row[0] / 1000, format="unix_tai"), 

131 unique_id=row[2], 

132 ) 

133 for row in rows 

134 ] 

135 

136 def deleteReplicaChunks(self, chunks: Iterable[int]) -> None: 

137 # docstring is inherited from a base class 

138 context = self._apdb._context 

139 config = context.config 

140 

141 if not context.schema.replication_enabled: 

142 raise ValueError("APDB is not configured for replication") 

143 

144 # everything goes into a single partition 

145 partition = 0 

146 

147 # Iterable can be single pass, make everything that we need from it 

148 # in a single loop. 

149 repl_table_params = [] 

150 chunk_table_params: list[tuple] = [] 

151 for chunk in chunks: 

152 repl_table_params.append((partition, chunk)) 

153 if context.has_chunk_sub_partitions: 

154 for subchunk in range(config.replica_sub_chunk_count): 

155 chunk_table_params.append((chunk, subchunk)) 

156 else: 

157 chunk_table_params.append((chunk,)) 

158 # Anything to do att all? 

159 if not repl_table_params: 

160 return 

161 

162 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks) 

163 query = ( 

164 f'DELETE FROM "{config.keyspace}"."{table_name}" WHERE partition = ? AND apdb_replica_chunk = ?' 

165 ) 

166 statement = context.preparer.prepare(query) 

167 

168 queries = [(statement, param) for param in repl_table_params] 

169 with self._timer("chunks_delete_time") as timer: 

170 execute_concurrent(context.session, queries) 

171 timer.add_values(row_count=len(queries)) 

172 

173 # Also remove those chunk_ids from Dia*Chunks tables. 

174 tables = list(ExtraTables.replica_chunk_tables(context.has_chunk_sub_partitions).values()) 

175 if context.has_update_record_chunks_table: 

176 tables.append(ExtraTables.ApdbUpdateRecordChunks) 

177 for table in tables: 

178 table_name = context.schema.tableName(table) 

179 query = f'DELETE FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?' 

180 if context.has_chunk_sub_partitions: 

181 query += " AND apdb_replica_subchunk = ?" 

182 statement = context.preparer.prepare(query) 

183 

184 queries = [(statement, param) for param in chunk_table_params] 

185 with self._timer("table_chunk_detele_time", tags={"table": table_name}) as timer: 

186 execute_concurrent(context.session, queries) 

187 timer.add_values(row_count=len(queries)) 

188 

    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")
        if table not in ExtraTables.replica_chunk_tables(False):
            raise ValueError(f"Table {table} does not support replica chunks.")

        # We need to iterate over chunks a few times, so materialize the
        # possibly single-pass iterable.
        chunks = list(chunks)

        # If schema was migrated then a chunk can appear in either old or new
        # chunk table (e.g. DiaObjectChunks or DiaObjectChunks2). Chunk table
        # has a column which will be set to true for new table.
        has_chunk_sub_partitions: dict[int, bool] = {}
        if context.has_chunk_sub_partitions:
            # Map each requested chunk ID to its has_subchunks flag.
            table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
            chunks_str = ",".join(str(chunk_id) for chunk_id in chunks)
            query = (
                f'SELECT apdb_replica_chunk, has_subchunks FROM "{config.keyspace}"."{table_name}" '
                f"WHERE partition = %s and apdb_replica_chunk IN ({chunks_str})"
            )
            partition = 0
            result = context.session.execute(
                query,
                (partition,),
                timeout=config.connection_config.read_timeout,
                execution_profile="read_tuples",
            )
            has_chunk_sub_partitions = dict(result)
        else:
            has_chunk_sub_partitions = dict.fromkeys(chunks, False)

        # Check what kind of tables we want to query, if chunk list is empty
        # then use tables which should exist in the schema.
        if has_chunk_sub_partitions:
            have_subchunks = any(has_chunk_sub_partitions.values())
            have_non_subchunks = not all(has_chunk_sub_partitions.values())
        else:
            have_subchunks = context.has_chunk_sub_partitions
            have_non_subchunks = not have_subchunks

        # NOTE: if an existing database is migrated and has both types of chunk
        # tables (e.g. DiaObjectChunks and DiaObjectChunks2) it is possible
        # that the same chunk can appear in both tables. In reality schema
        # migration should only happen during the downtime, so there will be
        # sufficient gap and a different chunk ID will be used for new chunks.

        table_data: ApdbCassandraTableData | None = None
        table_data_subchunk: ApdbCassandraTableData | None = None

        table_name = context.schema.tableName(ExtraTables.replica_chunk_tables(False)[table])
        with self._timer("table_chunk_select_time", tags={"table": table_name}) as timer:
            if have_subchunks:
                # Query the sub-partitioned ("new") chunk table.
                replica_table = ExtraTables.replica_chunk_tables(True)[table]
                table_name = context.schema.tableName(replica_table)
                query = (
                    f'SELECT * FROM "{config.keyspace}"."{table_name}" '
                    "WHERE apdb_replica_chunk = ? AND apdb_replica_subchunk = ?"
                )
                statement = context.preparer.prepare(query)

                queries: list[tuple] = []
                for chunk in chunks:
                    if has_chunk_sub_partitions.get(chunk, False):
                        for subchunk in range(config.replica_sub_chunk_count):
                            queries.append((statement, (chunk, subchunk)))
                if not queries and not have_non_subchunks:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1, -1)))

                if queries:
                    table_data_subchunk = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            context.session,
                            queries,
                            "read_raw_multi",
                            config.connection_config.read_concurrency,
                        ),
                    )

            if have_non_subchunks:
                # Query the non-sub-partitioned ("old") chunk table.
                replica_table = ExtraTables.replica_chunk_tables(False)[table]
                table_name = context.schema.tableName(replica_table)
                query = f'SELECT * FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
                statement = context.preparer.prepare(query)

                queries = []
                for chunk in chunks:
                    if not has_chunk_sub_partitions.get(chunk, True):
                        queries.append((statement, (chunk,)))
                if not queries and not table_data_subchunk:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1,)))

                if queries:
                    table_data = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            context.session,
                            queries,
                            "read_raw_multi",
                            config.connection_config.read_concurrency,
                        ),
                    )

            # Merge if both are non-empty.
            if table_data and table_data_subchunk:
                table_data_subchunk.project(drop=["apdb_replica_subchunk"])
                table_data.append(table_data_subchunk)
            elif table_data_subchunk:
                table_data = table_data_subchunk
            elif not table_data:
                # The branches above guarantee at least one query executed.
                raise AssertionError("above logic is incorrect")

            timer.add_values(row_count=len(table_data.rows()))

        table_schema = self._apdb._schema.tableSchemas[table]
        # Regular tables should never have columns of ExtraDataTypes, this
        # is just to make mypy happy.
        column_types = {
            column.name: column.datatype
            for column in table_schema.columns
            if not isinstance(column.datatype, ExtraDataTypes)
        }
        column_types["apdb_replica_chunk"] = felis.datamodel.DataType.long
        # It may also have subchunk column, we do not always drop it, and
        # clients should not need it, but we need to provide type for it.
        column_types["apdb_replica_subchunk"] = felis.datamodel.DataType.int
        table_data.set_column_types(column_types)

        return table_data

324 

    def getUpdateRecordChunks(self, chunks: Iterable[int]) -> Sequence[ApdbUpdateRecord]:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        if not context.has_update_record_chunks_table:
            # Table does not exist yet.
            return []

        table_name = context.schema.tableName(ExtraTables.ApdbUpdateRecordChunks)

        records = []
        if context.has_chunk_sub_partitions:
            # One query per chunk, fanning out over all possible subchunks
            # with an IN list.
            subchunks = ",".join(str(val) for val in range(config.replica_sub_chunk_count))
            query = (
                f'SELECT * FROM "{config.keyspace}"."{table_name}" '
                f"WHERE apdb_replica_chunk = %s AND apdb_replica_subchunk IN ({subchunks})"
            )

            with self._timer("select_update_record_time", tags={"table": table_name}) as timer:
                for chunk in chunks:
                    result = context.session.execute(query, [chunk])
                    for row in result:
                        records.append(
                            ApdbUpdateRecord.from_json(
                                row.update_time_ns, row.update_order, row.update_payload
                            )
                        )
                timer.add_values(row_count=len(records))

        else:
            # Single query covering all requested chunks via an IN list.
            chunks_str = ",".join(str(val) for val in chunks)
            query = (
                f'SELECT * FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk IN ({chunks_str})'
            )

            with self._timer("select_update_record_time", tags={"table": table_name}) as timer:
                result = context.session.execute(query)
                for row in result:
                    records.append(
                        ApdbUpdateRecord.from_json(row.update_time_ns, row.update_order, row.update_payload)
                    )
                timer.add_values(row_count=len(records))

        # Records sort by their natural order (update time/order).
        records.sort()
        return records