Coverage for python/lsst/dax/apdb/cassandra/apdbCassandraSchema.py: 18%

313 statements  

coverage.py v7.13.5, created at 2026-04-22 08:49 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ApdbCassandraSchema", "CreateTableOptions", "TableOptions"] 

25 

26import enum 

27import logging 

28from collections.abc import Mapping 

29from typing import TYPE_CHECKING, cast 

30 

31import felis.datamodel 

32import pydantic 

33 

34from .. import schema_model 

35from ..apdbSchema import ApdbTables 

36 

37if TYPE_CHECKING: 

38 import cassandra.cluster 

39 

40 from ..schema_model import Table 

41 from .config import ApdbCassandraTimePartitionRange 

42 

43 

44_LOG = logging.getLogger(__name__) 

45 

46 

47class InconsistentSchemaError(RuntimeError): 

48 """Exception raised when schema state is inconsistent.""" 

49 

50 

51class TableOptions(pydantic.BaseModel): 

52 """Set of per-table options for creating Cassandra tables.""" 

53 

54 model_config = pydantic.ConfigDict(extra="forbid") 

55 

56 tables: list[str] 

57 """List of table names for which the options should be applied.""" 

58 

59 options: str 

60 

61 

62class CreateTableOptions(pydantic.BaseModel): 

63 """Set of options for creating Cassandra tables.""" 

64 

65 model_config = pydantic.ConfigDict(extra="forbid") 

66 

67 table_options: list[TableOptions] = pydantic.Field(default_factory=list) 

68 """Collection of per-table options.""" 

69 

70 default_table_options: str = "" 

71 """Default options used for tables that are not in the above list.""" 

72 

73 def get_options(self, table_name: str) -> str: 

74 """Find table options for a given table name.""" 

75 for table_options in self.table_options: 

76 if table_name in table_options.tables: 

77 return table_options.options 

78 return self.default_table_options 
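
# Illustrative usage sketch (not part of this module; table names and option strings are hypothetical):
#
#     opts = CreateTableOptions(
#         table_options=[TableOptions(tables=["DiaSource"], options="gc_grace_seconds=3600")],
#         default_table_options="compaction = {'class': 'SizeTieredCompactionStrategy'}",
#     )
#     opts.get_options("DiaSource")  # -> "gc_grace_seconds=3600"
#     opts.get_options("DiaObject")  # -> the default_table_options string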

79 

80 

81@enum.unique 

82class ExtraTables(enum.Enum): 

83 """Names of the extra tables used by Cassandra implementation. 

84 

85 Chunk tables exist in two versions now to support both old and new schema. 

86 Eventually we will drop support for old tables. 

87 """ 

88 

89 ApdbReplicaChunks = "ApdbReplicaChunks" 

90 """Name of the table for replica chunk records.""" 

91 

92 DiaObjectChunks = "DiaObjectChunks" 

93 """Name of the table for DIAObject chunk data.""" 

94 

95 DiaSourceChunks = "DiaSourceChunks" 

96 """Name of the table for DIASource chunk data.""" 

97 

98 DiaForcedSourceChunks = "DiaForcedSourceChunks" 

99 """Name of the table for DIAForcedSource chunk data.""" 

100 

101 DiaObjectChunks2 = "DiaObjectChunks2" 

102 """Name of the table for DIAObject chunk data.""" 

103 

104 DiaSourceChunks2 = "DiaSourceChunks2" 

105 """Name of the table for DIASource chunk data.""" 

106 

107 DiaForcedSourceChunks2 = "DiaForcedSourceChunks2" 

108 """Name of the table for DIAForcedSource chunk data.""" 

109 

110 ApdbUpdateRecordChunks = "ApdbUpdateRecordChunks" 

111 """Name of the table for ApdbUpdateRecord chunk data.""" 

112 

113 DiaSourceToPartition = "DiaSourceToPartition" 

114 """Maps diaSourceId to its partition values (pixel and time).""" 

115 

116 DiaObjectLastToPartition = "DiaObjectLastToPartition" 

117 """Maps last diaObjectId version to its partition (pixel).""" 

118 

119 ApdbVisitDetector = "ApdbVisitDetector" 

120 """Records attempted processing of visit/detector.""" 

121 

122 def table_name(self, prefix: str = "", time_partition: int | None = None) -> str: 

123 """Return full table name. 

124 

125 Parameters 

126 ---------- 

127 prefix : `str`, optional 

128 Optional prefix for table name. 

129 time_partition : `int`, optional 

130 Optional time partition, only used for tables that support time 

131 partitioning.

132 """ 

133 return f"{prefix}{self.value}" 

134 

135 @classmethod 

136 def replica_chunk_tables(cls, has_subchunks: bool) -> Mapping[ApdbTables, ExtraTables]: 

137 """Return mapping of APDB tables to corresponding replica chunks 

138 tables. 

139 """ 

140 if has_subchunks: 

141 return { 

142 ApdbTables.DiaObject: cls.DiaObjectChunks2, 

143 ApdbTables.DiaSource: cls.DiaSourceChunks2, 

144 ApdbTables.DiaForcedSource: cls.DiaForcedSourceChunks2, 

145 } 

146 else: 

147 return { 

148 ApdbTables.DiaObject: cls.DiaObjectChunks, 

149 ApdbTables.DiaSource: cls.DiaSourceChunks, 

150 ApdbTables.DiaForcedSource: cls.DiaForcedSourceChunks, 

151 } 
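
# Illustrative sketch (not part of this module): selecting replica chunk tables for a schema
# with sub-chunking enabled; the "apdb_" prefix is hypothetical.
#
#     chunk_tables = ExtraTables.replica_chunk_tables(has_subchunks=True)
#     chunk_tables[ApdbTables.DiaSource]                      # -> ExtraTables.DiaSourceChunks2
#     chunk_tables[ApdbTables.DiaSource].table_name("apdb_")  # -> "apdb_DiaSourceChunks2"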

152 

153 

154class ApdbCassandraSchema: 

155 """Class for management of APDB schema. 

156 

157 Parameters 

158 ---------- 

159 session : `cassandra.cluster.Session` 

160 Cassandra session object.

161 keyspace : `str` 

162 Keyspace name for all tables. 

163 table_schemas : `~collections.abc.Mapping` [`ApdbTables`, `Table`]

164 Schema definitions for the standard APDB tables.

165 prefix : `str`, optional

166 Prefix to add to all schema elements.

167 time_partition_tables : `bool`

168 If `True` then schema will have a separate table for each time partition.

169 enable_replica : `bool`, optional

170 If `True` then use additional tables for replica chunks.

171 replica_skips_diaobjects : `bool`, optional

172 If `True` and ``enable_replica`` is `True` then the regular DiaObject table is not created.

173 has_chunk_sub_partitions : `bool`, optional

174 If `True` then replica chunk tables have sub-partition columns. Only used if ``enable_replica`` is `True`.

175 has_visit_detector_table : `bool`, optional

176 If `True` then create a table recording attempted processing of visit/detector pairs.

177 """ 

178 

179 _type_map = { 

180 felis.datamodel.DataType.double: "DOUBLE", 

181 felis.datamodel.DataType.float: "FLOAT", 

182 felis.datamodel.DataType.timestamp: "TIMESTAMP", 

183 felis.datamodel.DataType.long: "BIGINT", 

184 felis.datamodel.DataType.int: "INT", 

185 felis.datamodel.DataType.short: "SMALLINT", 

186 felis.datamodel.DataType.byte: "TINYINT", 

187 felis.datamodel.DataType.binary: "BLOB", 

188 felis.datamodel.DataType.char: "TEXT", 

189 felis.datamodel.DataType.string: "TEXT", 

190 felis.datamodel.DataType.unicode: "TEXT", 

191 felis.datamodel.DataType.text: "TEXT", 

192 felis.datamodel.DataType.boolean: "BOOLEAN", 

193 schema_model.ExtraDataTypes.UUID: "UUID", 

194 } 

195 """Map YAML column types to Cassandra""" 

196 

197 _time_partitioned_tables = [ 

198 ApdbTables.DiaObject, 

199 ApdbTables.DiaSource, 

200 ApdbTables.DiaForcedSource, 

201 ] 

202 _spatially_partitioned_tables = [ApdbTables.DiaObjectLast] 

203 

204 def __init__( 

205 self, 

206 session: cassandra.cluster.Session, 

207 keyspace: str, 

208 table_schemas: Mapping[ApdbTables, Table], 

209 prefix: str = "", 

210 time_partition_tables: bool = False, 

211 enable_replica: bool = False, 

212 replica_skips_diaobjects: bool = False, 

213 has_chunk_sub_partitions: bool = True, 

214 has_visit_detector_table: bool = True, 

215 ): 

216 self._session = session 

217 self._keyspace = keyspace 

218 self._table_schemas = table_schemas 

219 self._prefix = prefix 

220 self._time_partition_tables = time_partition_tables 

221 self._enable_replica = enable_replica 

222 self._replica_skips_diaobjects = replica_skips_diaobjects 

223 self._has_chunk_sub_partitions = has_chunk_sub_partitions 

224 self._has_visit_detector_table = has_visit_detector_table 

225 

226 self._apdb_tables = self._apdb_tables_schema(time_partition_tables) 

227 self._extra_tables = self._extra_tables_schema() 

228 

229 def _apdb_tables_schema(self, time_partition_tables: bool) -> Mapping[ApdbTables, schema_model.Table]: 

230 """Generate schema for regular APDB tables.""" 

231 apdb_tables: dict[ApdbTables, schema_model.Table] = {} 

232 

233 # add columns and index for partitioning. 

234 for table, apdb_table_def in self._table_schemas.items(): 

235 part_columns = [] 

236 add_columns = [] 

237 primary_key = apdb_table_def.primary_key[:] 

238 if table in self._spatially_partitioned_tables: 

239 # DiaObjectLast does not need temporal partitioning 

240 part_columns = ["apdb_part"] 

241 add_columns = part_columns 

242 elif table in self._time_partitioned_tables: 

243 if time_partition_tables: 

244 part_columns = ["apdb_part"] 

245 else: 

246 part_columns = ["apdb_part", "apdb_time_part"] 

247 add_columns = part_columns 

248 elif table is ApdbTables.SSObject: 

249 # For SSObject there is no natural partition key but we have 

250 # to partition it because there are too many of them. I'm 

251 # going to partition on its primary key (and drop separate 

252 # primary key index). 

253 part_columns = ["ssObjectId"] 

254 primary_key = [] 

255 elif table is ApdbTables.metadata: 

256 # Metadata is in one partition because we want to read all of 

257 # it in one query; add an extra column for the partition.

258 part_columns = ["meta_part"] 

259 add_columns = part_columns 

260 else: 

261 # TODO: Do not know what to do with the other tables 

262 continue 

263 

264 column_defs = [] 

265 if add_columns: 

266 column_defs = [ 

267 schema_model.Column( 

268 id=f"#{name}", name=name, datatype=felis.datamodel.DataType.long, nullable=False 

269 ) 

270 for name in add_columns 

271 ] 

272 

273 if table is ApdbTables.DiaObjectLast: 

274 # In the past DiaObjectLast table did not have validityStart. 

275 validity_start_column = "validityStartMjdTai" 

276 try: 

277 if not self.check_column(ApdbTables.DiaObjectLast, validity_start_column): 

278 for column in apdb_table_def.columns: 

279 if column.name == validity_start_column: 

280 apdb_table_def.columns.remove(column) 

281 break 

282 except LookupError: 

283 # Table has not been created yet. 

284 pass 

285 

286 annotations = dict(apdb_table_def.annotations) 

287 annotations["cassandra:apdb_column_names"] = [column.name for column in apdb_table_def.columns] 

288 if part_columns: 

289 annotations["cassandra:partitioning_columns"] = part_columns 

290 

291 apdb_tables[table] = schema_model.Table( 

292 id=apdb_table_def.id, 

293 name=apdb_table_def.name, 

294 columns=column_defs + apdb_table_def.columns, 

295 primary_key=primary_key, 

296 indexes=[], 

297 constraints=[], 

298 annotations=annotations, 

299 ) 

300 

301 return apdb_tables 
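
# Rough illustration of the result (assumes a constructed `schema` instance; not executed here):
# a spatially-partitioned table such as DiaObjectLast gains an "apdb_part" BIGINT column, so
#
#     schema.partitionColumns(ApdbTables.DiaObjectLast)  # -> ["apdb_part"]
#     schema.apdbColumnNames(ApdbTables.DiaObjectLast)   # -> the original APDB column names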

302 

303 def _extra_tables_schema(self) -> Mapping[ExtraTables, schema_model.Table]: 

304 """Generate schema for extra tables.""" 

305 extra_tables: dict[ExtraTables, schema_model.Table] = {} 

306 

307 if self._has_visit_detector_table: 

308 columns = [ 

309 schema_model.Column( 

310 id="#visit", 

311 name="visit", 

312 datatype=felis.datamodel.DataType.long, 

313 nullable=False, 

314 ), 

315 schema_model.Column( 

316 id="#detector", 

317 name="detector", 

318 datatype=felis.datamodel.DataType.short, 

319 nullable=False, 

320 ), 

321 ] 

322 extra_tables[ExtraTables.ApdbVisitDetector] = schema_model.Table( 

323 id="#" + ExtraTables.ApdbVisitDetector.value, 

324 name=ExtraTables.ApdbVisitDetector.table_name(self._prefix), 

325 columns=columns, 

326 primary_key=[], 

327 indexes=[], 

328 constraints=[], 

329 annotations={"cassandra:partitioning_columns": ["visit", "detector"]}, 

330 ) 

331 

332 # This table maps DiaSource ID to its partitions in DiaSource table and 

333 # DiaSourceChunks tables. 

334 extra_tables[ExtraTables.DiaSourceToPartition] = schema_model.Table( 

335 id="#" + ExtraTables.DiaSourceToPartition.value, 

336 name=ExtraTables.DiaSourceToPartition.table_name(self._prefix), 

337 columns=[ 

338 schema_model.Column( 

339 id="#diaSourceId", 

340 name="diaSourceId", 

341 datatype=felis.datamodel.DataType.long, 

342 nullable=False, 

343 ), 

344 schema_model.Column( 

345 id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False 

346 ), 

347 schema_model.Column( 

348 id="#apdb_time_part", 

349 name="apdb_time_part", 

350 datatype=felis.datamodel.DataType.int, 

351 nullable=False, 

352 ), 

353 schema_model.Column( 

354 id="#apdb_replica_chunk", 

355 name="apdb_replica_chunk", 

356 datatype=felis.datamodel.DataType.long, 

357 nullable=True, 

358 ), 

359 schema_model.Column( 

360 id="#apdb_replica_subchunk", 

361 name="apdb_replica_subchunk", 

362 datatype=felis.datamodel.DataType.int, 

363 nullable=True, 

364 ), 

365 ], 

366 primary_key=[], 

367 indexes=[], 

368 constraints=[], 

369 annotations={"cassandra:partitioning_columns": ["diaSourceId"]}, 

370 ) 

371 

372 # This table maps diaObjectId to its partition in DiaObjectLast table. 

373 extra_tables[ExtraTables.DiaObjectLastToPartition] = schema_model.Table( 

374 id="#" + ExtraTables.DiaObjectLastToPartition.value, 

375 name=ExtraTables.DiaObjectLastToPartition.table_name(self._prefix), 

376 columns=[ 

377 schema_model.Column( 

378 id="#diaObjectId", 

379 name="diaObjectId", 

380 datatype=felis.datamodel.DataType.long, 

381 nullable=False, 

382 ), 

383 schema_model.Column( 

384 id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False 

385 ), 

386 ], 

387 primary_key=[], 

388 indexes=[], 

389 constraints=[], 

390 annotations={"cassandra:partitioning_columns": ["diaObjectId"]}, 

391 ) 

392 

393 if not self._enable_replica: 

394 return extra_tables 

395 

396 replica_chunk_column = schema_model.Column( 

397 id="#apdb_replica_chunk", 

398 name="apdb_replica_chunk", 

399 datatype=felis.datamodel.DataType.long, 

400 nullable=False, 

401 ) 

402 

403 replica_chunk_columns = [replica_chunk_column] 

404 if self._has_chunk_sub_partitions: 

405 replica_chunk_columns.append( 

406 schema_model.Column( 

407 id="#apdb_replica_subchunk", 

408 name="apdb_replica_subchunk", 

409 datatype=felis.datamodel.DataType.int, 

410 nullable=False, 

411 ) 

412 ) 

413 

414 # Table containing replica chunks, this one is not partitioned, but 

415 # partition key must be defined. 

416 extra_tables[ExtraTables.ApdbReplicaChunks] = schema_model.Table( 

417 id="#" + ExtraTables.ApdbReplicaChunks.value, 

418 name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix), 

419 columns=[ 

420 schema_model.Column( 

421 id="#partition", name="partition", datatype=felis.datamodel.DataType.int, nullable=False 

422 ), 

423 replica_chunk_column, 

424 schema_model.Column( 

425 id="#last_update_time", 

426 name="last_update_time", 

427 datatype=felis.datamodel.DataType.timestamp, 

428 nullable=False, 

429 ), 

430 schema_model.Column( 

431 id="#unique_id", 

432 name="unique_id", 

433 datatype=schema_model.ExtraDataTypes.UUID, 

434 nullable=False, 

435 ), 

436 schema_model.Column( 

437 id="#has_subchunks", 

438 name="has_subchunks", 

439 datatype=felis.datamodel.DataType.boolean, 

440 nullable=True, 

441 ), 

442 ], 

443 primary_key=[replica_chunk_column], 

444 indexes=[], 

445 constraints=[], 

446 annotations={"cassandra:partitioning_columns": ["partition"]}, 

447 ) 

448 

449 replica_chunk_tables = ExtraTables.replica_chunk_tables(self._has_chunk_sub_partitions) 

450 for apdb_table_enum, chunk_table_enum in replica_chunk_tables.items(): 

451 apdb_table_def = self._table_schemas[apdb_table_enum] 

452 

453 extra_tables[chunk_table_enum] = schema_model.Table( 

454 id="#" + chunk_table_enum.value, 

455 name=chunk_table_enum.table_name(self._prefix), 

456 columns=replica_chunk_columns + apdb_table_def.columns, 

457 primary_key=apdb_table_def.primary_key[:], 

458 indexes=[], 

459 constraints=[], 

460 annotations={ 

461 "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns], 

462 "cassandra:apdb_column_names": [column.name for column in apdb_table_def.columns], 

463 }, 

464 ) 

465 

466 # Table with replica chunk data for ApdbUpdateRecord. 

467 columns = [ 

468 schema_model.Column( 

469 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_time_ns", 

470 name="update_time_ns", 

471 datatype=felis.datamodel.DataType.long, 

472 nullable=False, 

473 ), 

474 schema_model.Column( 

475 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_order", 

476 name="update_order", 

477 datatype=felis.datamodel.DataType.int, 

478 nullable=False, 

479 ), 

480 schema_model.Column( 

481 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_unique_id", 

482 name="update_unique_id", 

483 datatype=schema_model.ExtraDataTypes.UUID, 

484 nullable=False, 

485 ), 

486 schema_model.Column( 

487 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_payload", 

488 name="update_payload", 

489 datatype=felis.datamodel.DataType.string, 

490 nullable=False, 

491 ), 

492 ] 

493 extra_tables[ExtraTables.ApdbUpdateRecordChunks] = schema_model.Table( 

494 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}", 

495 name=ExtraTables.ApdbUpdateRecordChunks.table_name(self._prefix), 

496 columns=replica_chunk_columns + columns, 

497 primary_key=columns[:3], 

498 indexes=[], 

499 constraints=[], 

500 annotations={ 

501 "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns], 

502 }, 

503 ) 

504 

505 return extra_tables 
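
# Illustrative sketch (not part of this module): which extra tables are defined for a
# replica-enabled schema with sub-chunking; `session` and `table_schemas` are assumed objects
# and the keyspace name is hypothetical.
#
#     schema = ApdbCassandraSchema(session, "apdb", table_schemas,
#                                  enable_replica=True, has_chunk_sub_partitions=True)
#     set(schema._extra_tables) >= {
#         ExtraTables.DiaSourceToPartition,
#         ExtraTables.DiaObjectLastToPartition,
#         ExtraTables.ApdbReplicaChunks,
#         ExtraTables.DiaSourceChunks2,
#         ExtraTables.ApdbUpdateRecordChunks,
#     }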

506 

507 @property 

508 def replication_enabled(self) -> bool: 

509 """True when replication is enabled (`bool`).""" 

510 return self._enable_replica 

511 

512 def empty(self) -> bool: 

513 """Return True if database schema is empty. 

514 

515 Returns 

516 ------- 

517 empty : `bool` 

518 `True` if none of the required APDB tables exist in the database, 

519 `False` if all required tables exist. 

520 

521 Raises 

522 ------ 

523 InconsistentSchemaError 

524 Raised when some of the required tables exist but not all. 

525 """ 

526 query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s" 

527 result = self._session.execute(query, (self._keyspace,)) 

528 table_names = {row[0] for row in result.all()} 

529 

530 existing_tables = [] 

531 missing_tables = [] 

532 for table_enum in self._apdb_tables: 

533 table_name = table_enum.table_name(self._prefix) 

534 if self._time_partition_tables and table_enum in self._time_partitioned_tables: 

535 # Check prefix for time-partitioned tables. 

536 exists = any(table.startswith(f"{table_name}_") for table in table_names) 

537 else: 

538 exists = table_name in table_names 

539 if exists: 

540 existing_tables.append(table_name) 

541 else: 

542 missing_tables.append(table_name) 

543 

544 if not missing_tables: 

545 return False 

546 elif not existing_tables: 

547 return True 

548 else: 

549 raise InconsistentSchemaError( 

550 f"Only some required APDB tables exist: {existing_tables}, missing tables: {missing_tables}" 

551 ) 
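
# Illustrative usage sketch (`schema` is an assumed ApdbCassandraSchema instance):
#
#     try:
#         if schema.empty():
#             schema.makeSchema(drop=False)
#     except InconsistentSchemaError:
#         # Only some of the required tables exist; needs manual intervention.
#         raise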

552 

553 def existing_tables(self, *args: ApdbTables) -> dict[ApdbTables, list[str]]: 

554 """Return the list of existing table names for given table. 

555 

556 Parameters 

557 ---------- 

558 *args : `ApdbTables` 

559 Tables for which to return their existing table names. 

560 

561 Returns 

562 ------- 

563 tables : `dict` [`ApdbTables`, `list`[`str`]] 

564 Mapping of the APDB table to the list of the existing table names. 

565 More than one name can be present in the list if configuration 

566 specifies per-partition tables. 

567 """ 

568 if self._time_partition_tables and not set(args).isdisjoint(self._time_partitioned_tables): 

569 # Some of the tables should have per-partition tables. 

570 query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s" 

571 result = self._session.execute(query, (self._keyspace,)) 

572 table_names = {row[0] for row in result.all()} 

573 

574 tables = {} 

575 for table_enum in args: 

576 base_name = table_enum.table_name(self._prefix) 

577 if table_enum in self._time_partitioned_tables: 

578 tables[table_enum] = [table for table in table_names if table.startswith(f"{base_name}_")] 

579 else: 

580 tables[table_enum] = [base_name] 

581 return tables 

582 else: 

583 # Do not check that they exist, we know that they should. 

584 return {table_enum: [table_enum.table_name(self._prefix)] for table_enum in args} 
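
# Example of the returned mapping when per-partition tables are enabled (partition suffixes
# below are hypothetical):
#
#     schema.existing_tables(ApdbTables.DiaSource, ApdbTables.DiaObjectLast)
#     # -> {ApdbTables.DiaSource: ["DiaSource_613", "DiaSource_614"],
#     #     ApdbTables.DiaObjectLast: ["DiaObjectLast"]}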

585 

586 def check_column(self, table_enum: ApdbTables | ExtraTables, column: str) -> bool: 

587 """Check for the existence of the column in a given table. 

588 

589 Parameters 

590 ---------- 

591 table_enum : `ApdbTables` or `ExtraTables` 

592 Table to check for a column. 

593 column : `str` 

594 Name of the column to check. 

595 

596 Returns 

597 ------- 

598 exists : `bool` 

599 True if column exists, False otherwise. 

600 

601 Raises 

602 ------ 

603 LookupError 

604 Raised if table does not exist. 

605 """ 

606 if self._time_partition_tables and table_enum in self._time_partitioned_tables: 

607 query = ( 

608 "SELECT table_name FROM system_schema.columns WHERE keyspace_name = %s AND column_name = %s " 

609 "ALLOW FILTERING" 

610 ) 

611 result = self._session.execute(query, (self._keyspace, column)) 

612 base_name = table_enum.table_name(self._prefix) 

613 for row in result.all(): 

614 table_name = row[0] 

615 if table_name.startswith(f"{base_name}_"): 

616 return True 

617 # Check that there is any table with matching name. 

618 assert isinstance(table_enum, ApdbTables), "Can only be ApdbTables" 

619 tables = self.existing_tables(table_enum) 

620 if not tables[table_enum]: 

621 raise LookupError(f"Table {base_name} does not exist.") 

622 return False 

623 else: 

624 table_name = table_enum.table_name(self._prefix) 

625 query = ( 

626 "SELECT column_name FROM system_schema.columns WHERE keyspace_name = %s AND table_name = %s" 

627 ) 

628 result = self._session.execute(query, (self._keyspace, table_name)) 

629 rows = list(result) 

630 if not rows: 

631 raise LookupError(f"Table {table_name} does not exist.") 

632 for row in rows: 

633 if row.column_name == column: 

634 return True 

635 return False 
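
# Illustrative usage sketch mirroring the pattern used in _apdb_tables_schema above
# (`schema` is an assumed ApdbCassandraSchema instance):
#
#     try:
#         has_column = schema.check_column(ApdbTables.DiaObjectLast, "validityStartMjdTai")
#     except LookupError:
#         has_column = False  # the table itself has not been created yet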

636 

637 def tableName(self, table_name: ApdbTables | ExtraTables, time_partition: int | None = None) -> str: 

638 """Return Cassandra table name for APDB table. 

639 

640 Parameters 

641 ---------- 

642 table_name : `ApdbTables` or `ExtraTables` 

643 Table enum for which to generate table name. 

644 time_partition : `int`, optional 

645 Optional time partition, only used for tables that support time 

646 partitioning.

647 """ 

648 return table_name.table_name(self._prefix, time_partition) 

649 

650 def keyspace(self) -> str: 

651 """Return Cassandra keyspace for APDB tables.""" 

652 return self._keyspace 

653 

654 def getColumnMap(self, table_name: ApdbTables | ExtraTables) -> Mapping[str, schema_model.Column]: 

655 """Return mapping of column names to Column definitions. 

656 

657 Parameters 

658 ---------- 

659 table_name : `ApdbTables` or `ExtraTables`

660 One of the known APDB or extra table enums.

661 

662 Returns 

663 ------- 

664 column_map : `dict` 

665 Mapping of column names to `Column` instances.

666 """ 

667 table_schema = self._table_schema(table_name) 

668 cmap = {column.name: column for column in table_schema.columns} 

669 return cmap 

670 

671 def apdbColumnNames(self, table_name: ApdbTables | ExtraTables) -> list[str]: 

672 """Return a list of columns names for a table as defined in APDB 

673 schema. 

674 

675 Parameters 

676 ---------- 

677 table_name : `ApdbTables` or `ExtraTables` 

678 Enum for a table in APDB schema. 

679 

680 Returns 

681 ------- 

682 columns : `list` of `str` 

683 Names of regular columns in the table. 

684 """ 

685 table_schema = self._table_schema(table_name) 

686 return table_schema.annotations["cassandra:apdb_column_names"] 

687 

688 def partitionColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]: 

689 """Return a list of columns used for table partitioning. 

690 

691 Parameters 

692 ---------- 

693 table_name : `ApdbTables` or `ExtraTables`

694 Table name in APDB schema.

695 

696 Returns 

697 ------- 

698 columns : `list` of `str` 

699 Names of columns used for partitioning. 

700 """ 

701 table_schema = self._table_schema(table_name) 

702 return table_schema.annotations.get("cassandra:partitioning_columns", []) 

703 

704 def clusteringColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]: 

705 """Return a list of columns used for clustering. 

706 

707 Parameters 

708 ---------- 

709 table_name : `ApdbTables` or `ExtraTables`

710 Table name in APDB schema.

711 

712 Returns 

713 ------- 

714 columns : `list` of `str` 

715 Names of columns used for clustering.

716 """ 

717 table_schema = self._table_schema(table_name) 

718 return [column.name for column in table_schema.primary_key] 

719 

720 def makeSchema( 

721 self, 

722 *, 

723 drop: bool = False, 

724 part_range: ApdbCassandraTimePartitionRange | None = None, 

725 replication_factor: int | None = None, 

726 table_options: CreateTableOptions | None = None, 

727 ) -> None: 

728 """Create or re-create all tables. 

729 

730 Parameters 

731 ---------- 

732 drop : `bool` 

733 If True then drop tables before creating new ones. Note that 

734 only tables are dropped and not the whole keyspace. 

735 part_range : `ApdbCassandraTimePartitionRange` or `None` 

736 Start and end partition number for time partitions. Used to create 

737 per-partition DiaObject, DiaSource, and DiaForcedSource tables. If 

738 `None` then per-partition tables are not created. 

739 replication_factor : `int`, optional 

740 Replication factor used when creating a new keyspace; if the keyspace

741 already exists its replication factor is not changed.

742 """ 

743 # Try to create keyspace if it does not exist 

744 if replication_factor is None: 

745 replication_factor = 1 

746 

747 # If keyspace exists check its replication factor. 

748 query = "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = %s" 

749 result = self._session.execute(query, (self._keyspace,)) 

750 if row := result.one(): 

751 # Check the replication factor; this depends on the strategy class.

752 repl_config = cast(Mapping[str, str], row[0]) 

753 _, _, repl_class = repl_config["class"].rpartition(".") 

754 if repl_class == "SimpleStrategy": 

755 current_repl = {int(repl_config["replication_factor"])} 

756 elif repl_class == "NetworkTopologyStrategy": 

757 # There may be multiple datacenters with different replication

758 # factors. 

759 current_repl = { 

760 int(val) for key, val in repl_config.items() if key != "class" and val.isdecimal() 

761 } 

762 else: 

763 raise ValueError(f"Unexpected replication strategy: {repl_class}") 

764 if replication_factor not in current_repl: 

765 raise ValueError( 

766 f"New replication factor {replication_factor} differs from the replication factor " 

767 f"for already existing keyspace: {current_repl}" 

768 ) 

769 else: 

770 # Need a new keyspace. 

771 query = ( 

772 f'CREATE KEYSPACE "{self._keyspace}"' 

773 " WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': " 

774 f"{replication_factor}" 

775 "}" 

776 ) 

777 self._session.execute(query) 

778 

779 table_options = self._update_table_options(table_options) 

780 for table in self._apdb_tables: 

781 if table is ApdbTables.DiaObject and self._enable_replica and self._replica_skips_diaobjects: 

782 continue 

783 if table in (ApdbTables.SSObject, ApdbTables.SSSource): 

784 # SSObject/SSSource do not exist in APDB, but are defined in 

785 # ApdbTables. The reason is that AP wants to have schema of 

786 # these tables for alert-related business. 

787 continue 

788 self._makeTableSchema(table, drop, part_range, table_options) 

789 for extra_table in self._extra_tables: 

790 self._makeTableSchema(extra_table, drop, part_range, table_options) 
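
# Illustrative usage sketch (`schema` is an assumed ApdbCassandraSchema instance; the
# replication factor is hypothetical). A `part_range` can also be passed to create the
# per-partition DiaObject/DiaSource/DiaForcedSource tables.
#
#     schema.makeSchema(drop=False, replication_factor=3)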

791 

792 def _update_table_options(self, options: CreateTableOptions | None) -> CreateTableOptions | None: 

793 """Extend table options with options for internal tables.""" 

794 # We want to add TTL option to ApdbVisitDetector table. 

795 if not self._has_visit_detector_table: 

796 return options 

797 

798 if not options: 

799 options = CreateTableOptions() 

800 

801 # set both TTL and gc_grace_seconds to 24h. 

802 options.table_options.append( 

803 TableOptions( 

804 tables=[ExtraTables.ApdbVisitDetector.table_name(self._prefix)], 

805 options="default_time_to_live=86400 AND gc_grace_seconds=86400", 

806 ) 

807 ) 

808 

809 return options 

810 

811 def _makeTableSchema( 

812 self, 

813 table: ApdbTables | ExtraTables, 

814 drop: bool = False, 

815 part_range: ApdbCassandraTimePartitionRange | None = None, 

816 table_options: CreateTableOptions | None = None, 

817 ) -> None: 

818 _LOG.debug("Making table %s", table) 

819 

820 fullTable = table.table_name(self._prefix) 

821 

822 table_list = [fullTable] 

823 if part_range is not None: 

824 if table in self._time_partitioned_tables: 

825 table_list = [table.table_name(self._prefix, part) for part in part_range.range()] 

826 

827 if drop: 

828 queries = [f'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name in table_list] 

829 futures = [self._session.execute_async(query, timeout=None) for query in queries] 

830 for future in futures: 

831 _LOG.debug("wait for query: %s", future.query) 

832 future.result() 

833 _LOG.debug("query finished: %s", future.query) 

834 

835 queries = [] 

836 options = table_options.get_options(fullTable).strip() if table_options else None 

837 for table_name in table_list: 

838 if_not_exists = "" if drop else "IF NOT EXISTS" 

839 columns = ", ".join(self._tableColumns(table)) 

840 query = f'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})' 

841 if options: 

842 query = f"{query} WITH {options}" 

843 _LOG.debug("query: %s", query) 

844 queries.append(query) 

845 futures = [self._session.execute_async(query, timeout=None) for query in queries] 

846 for future in futures: 

847 _LOG.debug("wait for query: %s", future.query) 

848 future.result() 

849 _LOG.debug("query finished: %s", future.query) 

850 

851 def _tableColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]: 

852 """Return set of columns in a table 

853 

854 Parameters 

855 ---------- 

856 table_name : `ApdbTables` or `ExtraTables`

857 Table to generate column definitions for.

858 

859 Returns 

860 ------- 

861 column_defs : `list` 

862 List of strings in the format "column_name type". 

863 """ 

864 table_schema = self._table_schema(table_name) 

865 

866 # must have partition columns and clustering columns 

867 part_columns = table_schema.annotations.get("cassandra:partitioning_columns", []) 

868 clust_columns = [column.name for column in table_schema.primary_key] 

869 _LOG.debug("part_columns: %s", part_columns) 

870 _LOG.debug("clust_columns: %s", clust_columns) 

871 if not part_columns: 

872 raise ValueError(f"Table {table_name} configuration is missing partition index") 

873 

874 # all columns 

875 column_defs = [] 

876 for column in table_schema.columns: 

877 ctype = self._type_map[column.datatype] 

878 column_defs.append(f'"{column.name}" {ctype}') 

879 

880 # primary key definition 

881 part_columns = [f'"{col}"' for col in part_columns] 

882 clust_columns = [f'"{col}"' for col in clust_columns] 

883 if len(part_columns) > 1: 

884 columns = ", ".join(part_columns) 

885 part_columns = [f"({columns})"] 

886 pkey = ", ".join(part_columns + clust_columns) 

887 _LOG.debug("pkey: %s", pkey) 

888 column_defs.append(f"PRIMARY KEY ({pkey})") 

889 

890 return column_defs 
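
# Worked example of the PRIMARY KEY composition above (assumed column sets): with
# part_columns = ["apdb_part", "apdb_time_part"] and clust_columns = ["diaSourceId"], the
# partition columns are wrapped in parentheses and the final entry becomes:
#
#     PRIMARY KEY (("apdb_part", "apdb_time_part"), "diaSourceId")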

891 

892 def _table_schema(self, table: ApdbTables | ExtraTables) -> schema_model.Table: 

893 """Return schema definition for a table.""" 

894 if isinstance(table, ApdbTables): 

895 table_schema = self._apdb_tables[table] 

896 else: 

897 table_schema = self._extra_tables[table] 

898 return table_schema 

899 

900 def table_row_size(self, table: ApdbTables | ExtraTables) -> int: 

901 """Return an estimate of the row size of a given table. 

902 

903 Parameters 

904 ---------- 

905 table : `ApdbTables` or `ExtraTables` 

906 

907 Returns 

908 ------- 

909 size : `int` 

910 An estimate of a table row size. 

911 

912 Notes 

913 ----- 

914 Returned size is not exact. When a table has variable-size columns (e.g.

915 strings) the estimate may be incorrect. Stored data size or wire-level

916 protocol size can be smaller if some columns are not set or set to NULL.

917 """ 

918 table_schema = self._table_schema(table) 

919 size = sum(column.size() for column in table_schema.columns) 

920 return size 

921 

922 def time_partitioned_tables(self) -> list[ApdbTables]: 

923 """Make the list of time-partitioned tables. 

924 

925 Returns 

926 ------- 

927 tables : `list` [`ApdbTables`] 

928 Tables that are time-partitioned.

929 """ 

930 if not self._time_partition_tables: 

931 return [] 

932 has_dia_object_table = not (self._enable_replica and self._replica_skips_diaobjects) 

933 tables = [ApdbTables.DiaSource, ApdbTables.DiaForcedSource] 

934 if has_dia_object_table: 

935 tables.append(ApdbTables.DiaObject) 

936 return tables