Coverage for python / lsst / dax / apdb / cassandra / apdbCassandraSchema.py: 18%
328 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:48 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["ApdbCassandraSchema", "CreateTableOptions", "TableOptions"]
26import dataclasses
27import enum
28import logging
29from collections.abc import Mapping
30from typing import TYPE_CHECKING, cast
32import felis.datamodel
33import pydantic
35from .. import schema_model
36from ..apdbSchema import ApdbTables
38if TYPE_CHECKING:
39 import cassandra.cluster
41 from ..schema_model import Table
42 from .config import ApdbCassandraTimePartitionRange
# Module-level logger; used for debug tracing of schema/table creation queries.
_LOG = logging.getLogger(__name__)
class InconsistentSchemaError(RuntimeError):
    """Raised when the database schema is found in an inconsistent state,
    e.g. only a subset of the required APDB tables exists.
    """
class TableOptions(pydantic.BaseModel):
    """Creation options applied to a specific set of Cassandra tables."""

    model_config = pydantic.ConfigDict(extra="forbid")

    tables: list[str]
    """Names of the tables to which ``options`` should be applied."""

    options: str
class CreateTableOptions(pydantic.BaseModel):
    """Complete set of options used when creating Cassandra tables."""

    model_config = pydantic.ConfigDict(extra="forbid")

    table_options: list[TableOptions] = pydantic.Field(default_factory=list)
    """Collection of per-table options."""

    default_table_options: str = ""
    """Default options used for tables that are not in the above list."""

    def get_options(self, table_name: str) -> str:
        """Return the options string for the named table.

        The first entry in ``table_options`` that lists ``table_name`` wins;
        ``default_table_options`` is returned when no entry matches.
        """
        matches = (entry.options for entry in self.table_options if table_name in entry.tables)
        return next(matches, self.default_table_options)
@enum.unique
class ExtraTables(enum.Enum):
    """Names of the extra tables used by Cassandra implementation.

    Chunk tables exist in two versions now to support both old and new schema.
    Eventually we will drop support for old tables.
    """

    ApdbReplicaChunks = "ApdbReplicaChunks"
    """Name of the table for replica chunk records."""

    DiaObjectChunks = "DiaObjectChunks"
    """Name of the table for DIAObject chunk data."""

    DiaSourceChunks = "DiaSourceChunks"
    """Name of the table for DIASource chunk data."""

    DiaForcedSourceChunks = "DiaForcedSourceChunks"
    """Name of the table for DIAForcedSource chunk data."""

    DiaObjectChunks2 = "DiaObjectChunks2"
    """Name of the table for DIAObject chunk data."""

    DiaSourceChunks2 = "DiaSourceChunks2"
    """Name of the table for DIASource chunk data."""

    DiaForcedSourceChunks2 = "DiaForcedSourceChunks2"
    """Name of the table for DIAForcedSource chunk data."""

    ApdbUpdateRecordChunks = "ApdbUpdateRecordChunks"
    """Name of the table for ApdbUpdateRecord chunk data."""

    DiaSourceToPartition = "DiaSourceToPartition"
    """Maps diaSourceId to its partition values (pixel and time)."""

    DiaObjectLastToPartition = "DiaObjectLastToPartition"
    """Maps last diaObjectId version to its partition (pixel)."""

    ApdbVisitDetector = "ApdbVisitDetector"
    """Records attempted processing of visit/detector."""

    DiaObjectDedup = "DiaObjectDedup"
    """DiaObject records used for deduplication."""

    def table_name(self, prefix: str = "", time_partition: int | None = None) -> str:
        """Return full table name.

        Parameters
        ----------
        prefix : `str`, optional
            Optional prefix for table name.
        time_partition : `int`, optional
            Accepted for interface compatibility with APDB table enums; it
            does not contribute to the name of any extra table.
        """
        return prefix + self.value

    @classmethod
    def replica_chunk_tables(cls, has_subchunks: bool) -> Mapping[ApdbTables, ExtraTables]:
        """Return mapping of APDB tables to corresponding replica chunks
        tables.
        """
        # New-style (sub-chunked) tables share the old names plus a "2"
        # suffix, so select members by name.
        suffix = "2" if has_subchunks else ""
        return {
            ApdbTables.DiaObject: cls[f"DiaObjectChunks{suffix}"],
            ApdbTables.DiaSource: cls[f"DiaSourceChunks{suffix}"],
            ApdbTables.DiaForcedSource: cls[f"DiaForcedSourceChunks{suffix}"],
        }
class ApdbCassandraSchema:
    """Class for management of APDB schema.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session object
    keyspace : `str`
        Keyspace name for all tables.
    table_schemas : `Mapping` [`ApdbTables`, `Table`]
        Schema definitions for the standard APDB tables.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    time_partition_tables : `bool`
        If `True` then schema will have a separate table for each time
        partition.
    enable_replica : `bool`, optional
        If `True` then use additional tables for replica chunks.
    replica_skips_diaobjects : `bool`, optional
        If `True` together with ``enable_replica`` then the DiaObject table
        is not created by `makeSchema` and is excluded from the list of
        time-partitioned tables.
    has_chunk_sub_partitions : `bool`, optional
        If `True` then replica chunk tables have sub-partition columns. Only
        used if ``enable_replica`` is `True`.
    """

    # Mapping from felis column types to Cassandra CQL type names.
    _type_map = {
        felis.datamodel.DataType.double: "DOUBLE",
        felis.datamodel.DataType.float: "FLOAT",
        felis.datamodel.DataType.timestamp: "TIMESTAMP",
        felis.datamodel.DataType.long: "BIGINT",
        felis.datamodel.DataType.int: "INT",
        felis.datamodel.DataType.short: "SMALLINT",
        felis.datamodel.DataType.byte: "TINYINT",
        felis.datamodel.DataType.binary: "BLOB",
        felis.datamodel.DataType.char: "TEXT",
        felis.datamodel.DataType.string: "TEXT",
        felis.datamodel.DataType.unicode: "TEXT",
        felis.datamodel.DataType.text: "TEXT",
        felis.datamodel.DataType.boolean: "BOOLEAN",
        schema_model.ExtraDataTypes.UUID: "UUID",
    }
    """Map YAML column types to Cassandra"""

    # APDB tables partitioned on time (in addition to spatial partitioning).
    _time_partitioned_tables = [
        ApdbTables.DiaObject,
        ApdbTables.DiaSource,
        ApdbTables.DiaForcedSource,
    ]
    # APDB tables partitioned on spatial pixel only.
    _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]

    def __init__(
        self,
        session: cassandra.cluster.Session,
        keyspace: str,
        table_schemas: Mapping[ApdbTables, Table],
        prefix: str = "",
        time_partition_tables: bool = False,
        enable_replica: bool = False,
        replica_skips_diaobjects: bool = False,
        has_chunk_sub_partitions: bool = True,
    ):
        self._session = session
        self._keyspace = keyspace
        self._table_schemas = table_schemas
        self._prefix = prefix
        self._time_partition_tables = time_partition_tables
        self._enable_replica = enable_replica
        self._replica_skips_diaobjects = replica_skips_diaobjects
        self._has_chunk_sub_partitions = has_chunk_sub_partitions

        # Pre-compute in-memory schema definitions for regular APDB tables
        # and the Cassandra-specific extra tables.
        self._apdb_tables = self._apdb_tables_schema(time_partition_tables)
        self._extra_tables = self._extra_tables_schema(self._apdb_tables)

    def _apdb_tables_schema(self, time_partition_tables: bool) -> Mapping[ApdbTables, schema_model.Table]:
        """Generate schema for regular APDB tables.

        Extends each input table definition with partitioning columns and
        annotations describing partitioning/regular columns.
        """
        apdb_tables: dict[ApdbTables, schema_model.Table] = {}

        # add columns and index for partitioning.
        for table, apdb_table_def in self._table_schemas.items():
            part_columns = []
            add_columns = []
            primary_key = apdb_table_def.primary_key[:]
            if table in self._spatially_partitioned_tables:
                # DiaObjectLast does not need temporal partitioning
                part_columns = ["apdb_part"]
                add_columns = part_columns
            elif table in self._time_partitioned_tables:
                # When per-partition tables are used the time partition is
                # encoded in the table name instead of a column.
                if time_partition_tables:
                    part_columns = ["apdb_part"]
                else:
                    part_columns = ["apdb_part", "apdb_time_part"]
                add_columns = part_columns
            elif table is ApdbTables.SSObject:
                # For SSObject there is no natural partition key but we have
                # to partition it because there are too many of them. I'm
                # going to partition on its primary key (and drop separate
                # primary key index).
                part_columns = ["ssObjectId"]
                primary_key = []
            elif table is ApdbTables.metadata:
                # Metadata is in one partition because we want to read all of
                # it in one query, add an extra column for partition.
                part_columns = ["meta_part"]
                add_columns = part_columns
            else:
                # TODO: Do not know what to do with the other tables
                continue

            # Partitioning columns are added as BIGINT columns in front of
            # the regular table columns.
            column_defs = []
            if add_columns:
                column_defs = [
                    schema_model.Column(
                        id=f"#{name}", name=name, datatype=felis.datamodel.DataType.long, nullable=False
                    )
                    for name in add_columns
                ]

            if table is ApdbTables.DiaObjectLast:
                # In the past DiaObjectLast table did not have validityStart.
                # If an existing table lacks that column, drop it from the
                # definition (note: mutates the input table definition).
                validity_start_column = "validityStartMjdTai"
                try:
                    if not self.check_column(ApdbTables.DiaObjectLast, validity_start_column):
                        for column in apdb_table_def.columns:
                            if column.name == validity_start_column:
                                apdb_table_def.columns.remove(column)
                                break
                except LookupError:
                    # Table has not been created yet.
                    pass

            annotations = dict(apdb_table_def.annotations)
            annotations["cassandra:apdb_column_names"] = [column.name for column in apdb_table_def.columns]
            if part_columns:
                annotations["cassandra:partitioning_columns"] = part_columns

            apdb_tables[table] = schema_model.Table(
                id=apdb_table_def.id,
                name=apdb_table_def.name,
                columns=column_defs + apdb_table_def.columns,
                primary_key=primary_key,
                indexes=[],
                constraints=[],
                annotations=annotations,
            )

        return apdb_tables

    def _extra_tables_schema(
        self, apdb_tables: Mapping[ApdbTables, schema_model.Table]
    ) -> Mapping[ExtraTables, schema_model.Table]:
        """Generate schema for extra tables.

        Replica-chunk tables are only included when replication is enabled.
        """
        extra_tables: dict[ExtraTables, schema_model.Table] = {}

        columns = [
            schema_model.Column(
                id="#visit",
                name="visit",
                datatype=felis.datamodel.DataType.long,
                nullable=False,
            ),
            schema_model.Column(
                id="#detector",
                name="detector",
                datatype=felis.datamodel.DataType.short,
                nullable=False,
            ),
        ]
        extra_tables[ExtraTables.ApdbVisitDetector] = schema_model.Table(
            id="#" + ExtraTables.ApdbVisitDetector.value,
            name=ExtraTables.ApdbVisitDetector.table_name(self._prefix),
            columns=columns,
            primary_key=[],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["visit", "detector"]},
        )

        # DiaObjectDedup table contains a subset of columns of DiaObject, the
        # table is used for deduplication, it is partitioned on some random
        # key. This column list defines non-PK columns in this table, PK
        # columns from DiaObject are the same as in DiaObject.
        dedup_column_names = {"ra", "dec", "nDiaSources", "firstDiaSourceMjdTai"}
        columns = [
            schema_model.Column(
                id="#dedup_part",
                name="dedup_part",
                datatype=felis.datamodel.DataType.short,
                nullable=False,
            )
        ]
        primary_keys = []
        found_column_names = []
        # Copy column definitions from DiaObject.
        dia_object_table = apdb_tables[ApdbTables.DiaObject]
        for column in dia_object_table.columns:
            if column in dia_object_table.primary_key or column.name in dedup_column_names:
                cloned_column = dataclasses.replace(column, table=None)
                columns.append(cloned_column)
                found_column_names.append(column.name)
                if column in dia_object_table.primary_key:
                    primary_keys.append(cloned_column)

        # Check that we found all expected columns.
        missing_columns = dedup_column_names - set(found_column_names)
        if missing_columns:
            raise LookupError(f"Expected columns not found in DiaObject table: {missing_columns}")

        extra_tables[ExtraTables.DiaObjectDedup] = schema_model.Table(
            id="#" + ExtraTables.DiaObjectDedup.value,
            name=ExtraTables.DiaObjectDedup.table_name(self._prefix),
            columns=columns,
            primary_key=primary_keys,
            indexes=[],
            constraints=[],
            annotations={
                "cassandra:partitioning_columns": ["dedup_part"],
                "cassandra:apdb_column_names": found_column_names,
            },
        )

        # This table maps DiaSource ID to its partitions in DiaSource table and
        # DiaSourceChunks tables.
        extra_tables[ExtraTables.DiaSourceToPartition] = schema_model.Table(
            id="#" + ExtraTables.DiaSourceToPartition.value,
            name=ExtraTables.DiaSourceToPartition.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#diaSourceId",
                    name="diaSourceId",
                    datatype=felis.datamodel.DataType.long,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
                ),
                schema_model.Column(
                    id="#apdb_time_part",
                    name="apdb_time_part",
                    datatype=felis.datamodel.DataType.int,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#apdb_replica_chunk",
                    name="apdb_replica_chunk",
                    datatype=felis.datamodel.DataType.long,
                    nullable=True,
                ),
                schema_model.Column(
                    id="#apdb_replica_subchunk",
                    name="apdb_replica_subchunk",
                    datatype=felis.datamodel.DataType.int,
                    nullable=True,
                ),
            ],
            primary_key=[],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["diaSourceId"]},
        )

        # This table maps diaObjectId to its partition in DiaObjectLast table.
        extra_tables[ExtraTables.DiaObjectLastToPartition] = schema_model.Table(
            id="#" + ExtraTables.DiaObjectLastToPartition.value,
            name=ExtraTables.DiaObjectLastToPartition.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#diaObjectId",
                    name="diaObjectId",
                    datatype=felis.datamodel.DataType.long,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
                ),
            ],
            primary_key=[],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["diaObjectId"]},
        )

        # Remaining tables only exist when replication is enabled.
        if not self._enable_replica:
            return extra_tables

        replica_chunk_column = schema_model.Column(
            id="#apdb_replica_chunk",
            name="apdb_replica_chunk",
            datatype=felis.datamodel.DataType.long,
            nullable=False,
        )

        replica_chunk_columns = [replica_chunk_column]
        if self._has_chunk_sub_partitions:
            replica_chunk_columns.append(
                schema_model.Column(
                    id="#apdb_replica_subchunk",
                    name="apdb_replica_subchunk",
                    datatype=felis.datamodel.DataType.int,
                    nullable=False,
                )
            )

        # Table containing replica chunks, this one is not partitioned, but
        # partition key must be defined.
        extra_tables[ExtraTables.ApdbReplicaChunks] = schema_model.Table(
            id="#" + ExtraTables.ApdbReplicaChunks.value,
            name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#partition", name="partition", datatype=felis.datamodel.DataType.int, nullable=False
                ),
                replica_chunk_column,
                schema_model.Column(
                    id="#last_update_time",
                    name="last_update_time",
                    datatype=felis.datamodel.DataType.timestamp,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#unique_id",
                    name="unique_id",
                    datatype=schema_model.ExtraDataTypes.UUID,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#has_subchunks",
                    name="has_subchunks",
                    datatype=felis.datamodel.DataType.boolean,
                    nullable=True,
                ),
            ],
            primary_key=[replica_chunk_column],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["partition"]},
        )

        # Per-table chunk tables share columns with their APDB tables, with
        # chunk (and optionally sub-chunk) columns prepended as partitioning.
        replica_chunk_tables = ExtraTables.replica_chunk_tables(self._has_chunk_sub_partitions)
        for apdb_table_enum, chunk_table_enum in replica_chunk_tables.items():
            apdb_table_def = self._table_schemas[apdb_table_enum]

            extra_tables[chunk_table_enum] = schema_model.Table(
                id="#" + chunk_table_enum.value,
                name=chunk_table_enum.table_name(self._prefix),
                columns=replica_chunk_columns + apdb_table_def.columns,
                primary_key=apdb_table_def.primary_key[:],
                indexes=[],
                constraints=[],
                annotations={
                    "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns],
                    "cassandra:apdb_column_names": [column.name for column in apdb_table_def.columns],
                },
            )

        # Table with replica chunk data for ApdbUpdateRecord.
        columns = [
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_time_ns",
                name="update_time_ns",
                datatype=felis.datamodel.DataType.long,
                nullable=False,
            ),
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_order",
                name="update_order",
                datatype=felis.datamodel.DataType.int,
                nullable=False,
            ),
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_unique_id",
                name="update_unique_id",
                datatype=schema_model.ExtraDataTypes.UUID,
                nullable=False,
            ),
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_payload",
                name="update_payload",
                datatype=felis.datamodel.DataType.string,
                nullable=False,
            ),
        ]
        extra_tables[ExtraTables.ApdbUpdateRecordChunks] = schema_model.Table(
            id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}",
            name=ExtraTables.ApdbUpdateRecordChunks.table_name(self._prefix),
            columns=replica_chunk_columns + columns,
            # Clustering key is (update_time_ns, update_order, update_unique_id).
            primary_key=columns[:3],
            indexes=[],
            constraints=[],
            annotations={
                "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns],
            },
        )

        return extra_tables

    @property
    def replication_enabled(self) -> bool:
        """True when replication is enabled (`bool`)."""
        return self._enable_replica

    def empty(self) -> bool:
        """Return True if database schema is empty.

        Returns
        -------
        empty : `bool`
            `True` if none of the required APDB tables exist in the database,
            `False` if all required tables exist.

        Raises
        ------
        InconsistentSchemaError
            Raised when some of the required tables exist but not all.
        """
        query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
        result = self._session.execute(query, (self._keyspace,))
        table_names = {row[0] for row in result.all()}

        existing_tables = []
        missing_tables = []
        for table_enum in self._apdb_tables:
            table_name = table_enum.table_name(self._prefix)
            if self._time_partition_tables and table_enum in self._time_partitioned_tables:
                # Check prefix for time-partitioned tables.
                exists = any(table.startswith(f"{table_name}_") for table in table_names)
            else:
                exists = table_name in table_names
            if exists:
                existing_tables.append(table_name)
            else:
                missing_tables.append(table_name)

        if not missing_tables:
            return False
        elif not existing_tables:
            return True
        else:
            raise InconsistentSchemaError(
                f"Only some required APDB tables exist: {existing_tables}, missing tables: {missing_tables}"
            )

    def existing_tables(self, *args: ApdbTables) -> dict[ApdbTables, list[str]]:
        """Return the list of existing table names for given table.

        Parameters
        ----------
        *args : `ApdbTables`
            Tables for which to return their existing table names.

        Returns
        -------
        tables : `dict` [`ApdbTables`, `list`[`str`]]
            Mapping of the APDB table to the list of the existing table names.
            More than one name can be present in the list if configuration
            specifies per-partition tables.
        """
        if self._time_partition_tables and not set(args).isdisjoint(self._time_partitioned_tables):
            # Some of the tables should have per-partition tables.
            query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
            result = self._session.execute(query, (self._keyspace,))
            table_names = {row[0] for row in result.all()}

            tables = {}
            for table_enum in args:
                base_name = table_enum.table_name(self._prefix)
                if table_enum in self._time_partitioned_tables:
                    tables[table_enum] = [table for table in table_names if table.startswith(f"{base_name}_")]
                else:
                    tables[table_enum] = [base_name]
            return tables
        else:
            # Do not check that they exist, we know that they should.
            return {table_enum: [table_enum.table_name(self._prefix)] for table_enum in args}

    def check_column(self, table_enum: ApdbTables | ExtraTables, column: str) -> bool:
        """Check for the existence of the column in a given table.

        Parameters
        ----------
        table_enum : `ApdbTables` or `ExtraTables`
            Table to check for a column.
        column : `str`
            Name of the column to check.

        Returns
        -------
        exists : `bool`
            True if column exists, False otherwise.

        Raises
        ------
        LookupError
            Raised if table does not exist.
        """
        if self._time_partition_tables and table_enum in self._time_partitioned_tables:
            # Column may exist in any of the per-partition tables; query by
            # column name and match the table-name prefix.
            query = (
                "SELECT table_name FROM system_schema.columns WHERE keyspace_name = %s AND column_name = %s "
                "ALLOW FILTERING"
            )
            result = self._session.execute(query, (self._keyspace, column))
            base_name = table_enum.table_name(self._prefix)
            for row in result.all():
                table_name = row[0]
                if table_name.startswith(f"{base_name}_"):
                    return True
            # Check that there is any table with matching name.
            assert isinstance(table_enum, ApdbTables), "Can only be ApdbTables"
            tables = self.existing_tables(table_enum)
            if not tables[table_enum]:
                raise LookupError(f"Table {base_name} does not exist.")
            return False
        else:
            table_name = table_enum.table_name(self._prefix)
            query = (
                "SELECT column_name FROM system_schema.columns WHERE keyspace_name = %s AND table_name = %s"
            )
            result = self._session.execute(query, (self._keyspace, table_name))
            rows = list(result)
            if not rows:
                # No columns at all means the table itself is missing.
                raise LookupError(f"Table {table_name} does not exist.")
            for row in rows:
                if row.column_name == column:
                    return True
            return False

    def tableName(self, table_name: ApdbTables | ExtraTables, time_partition: int | None = None) -> str:
        """Return Cassandra table name for APDB table.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Table enum for which to generate table name.
        time_partition : `int`, optional
            Optional time partition, only used for tables that support time
            partitioning.
        """
        return table_name.table_name(self._prefix, time_partition)

    def keyspace(self) -> str:
        """Return Cassandra keyspace for APDB tables."""
        return self._keyspace

    def getColumnMap(self, table_name: ApdbTables | ExtraTables) -> Mapping[str, schema_model.Column]:
        """Return mapping of column names to Column definitions.

        Parameters
        ----------
        table_name : `ApdbTables`
            One of known APDB table names.

        Returns
        -------
        column_map : `dict`
            Mapping of column names to `ColumnDef` instances.
        """
        table_schema = self._table_schema(table_name)
        cmap = {column.name: column for column in table_schema.columns}
        return cmap

    def apdbColumnNames(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return a list of columns names for a table as defined in APDB
        schema.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Enum for a table in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of regular columns in the table.
        """
        table_schema = self._table_schema(table_name)
        return table_schema.annotations["cassandra:apdb_column_names"]

    def partitionColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return a list of columns used for table partitioning.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table name in APDB schema

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for partitioning.
        """
        table_schema = self._table_schema(table_name)
        return table_schema.annotations.get("cassandra:partitioning_columns", [])

    def clusteringColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return a list of columns used for clustering.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table name in APDB schema

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for clustering.
        """
        table_schema = self._table_schema(table_name)
        return [column.name for column in table_schema.primary_key]

    def makeSchema(
        self,
        *,
        drop: bool = False,
        part_range: ApdbCassandraTimePartitionRange | None = None,
        replication_factor: int | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones. Note that
            only tables are dropped and not the whole keyspace.
        part_range : `ApdbCassandraTimePartitionRange` or `None`
            Start and end partition number for time partitions. Used to create
            per-partition DiaObject, DiaSource, and DiaForcedSource tables. If
            `None` then per-partition tables are not created.
        replication_factor : `int`, optional
            Replication factor used when creating new keyspace, if keyspace
            already exists its replication factor is not changed.
        table_options : `CreateTableOptions`, optional
            Extra per-table options (WITH clause) used when creating tables.
        """
        # Try to create keyspace if it does not exist
        if replication_factor is None:
            replication_factor = 1

        # If keyspace exists check its replication factor.
        query = "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = %s"
        result = self._session.execute(query, (self._keyspace,))
        if row := result.one():
            # Check replication factor, this depends on strategy class.
            repl_config = cast(Mapping[str, str], row[0])
            _, _, repl_class = repl_config["class"].rpartition(".")
            if repl_class == "SimpleStrategy":
                current_repl = {int(repl_config["replication_factor"])}
            elif repl_class == "NetworkTopologyStrategy":
                # There may be multiple datacenters with different replication
                # factors.
                current_repl = {
                    int(val) for key, val in repl_config.items() if key != "class" and val.isdecimal()
                }
            else:
                raise ValueError(f"Unexpected replication strategy: {repl_class}")
            if replication_factor not in current_repl:
                raise ValueError(
                    f"New replication factor {replication_factor} differs from the replication factor "
                    f"for already existing keyspace: {current_repl}"
                )
        else:
            # Need a new keyspace.
            query = (
                f'CREATE KEYSPACE "{self._keyspace}"'
                " WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': "
                f"{replication_factor}"
                "}"
            )
            self._session.execute(query)

        table_options = self._update_table_options(table_options)
        for table in self._apdb_tables:
            if table is ApdbTables.DiaObject and self._enable_replica and self._replica_skips_diaobjects:
                continue
            if table in (ApdbTables.SSObject, ApdbTables.SSSource):
                # SSObject/SSSource do not exist in APDB, but are defined in
                # ApdbTables. The reason is that AP wants to have schema of
                # these tables for alert-related business.
                continue
            self._makeTableSchema(table, drop, part_range, table_options)
        for extra_table in self._extra_tables:
            self._makeTableSchema(extra_table, drop, part_range, table_options)

    def _update_table_options(self, options: CreateTableOptions | None) -> CreateTableOptions | None:
        """Extend table options with options for internal tables."""
        # We want to add TTL option to ApdbVisitDetector table.
        if not options:
            options = CreateTableOptions()

        # set both TTL and gc_grace_seconds to 24h.
        options.table_options.append(
            TableOptions(
                tables=[ExtraTables.ApdbVisitDetector.table_name(self._prefix)],
                options="default_time_to_live=86400 AND gc_grace_seconds=86400",
            )
        )

        return options

    def _makeTableSchema(
        self,
        table: ApdbTables | ExtraTables,
        drop: bool = False,
        part_range: ApdbCassandraTimePartitionRange | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        """Create (optionally dropping first) the Cassandra table(s) for one
        schema table, expanding to per-partition tables when requested.
        """
        _LOG.debug("Making table %s", table)

        fullTable = table.table_name(self._prefix)

        table_list = [fullTable]
        if part_range is not None:
            if table in self._time_partitioned_tables:
                # One table per time partition in the range.
                table_list = [table.table_name(self._prefix, part) for part in part_range.range()]

        if drop:
            queries = [f'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name in table_list]
            futures = [self._session.execute_async(query, timeout=None) for query in queries]
            for future in futures:
                _LOG.debug("wait for query: %s", future.query)
                future.result()
                _LOG.debug("query finished: %s", future.query)

        queries = []
        # Per-table options are looked up by the non-partitioned table name.
        options = table_options.get_options(fullTable).strip() if table_options else None
        for table_name in table_list:
            if_not_exists = "" if drop else "IF NOT EXISTS"
            columns = ", ".join(self._tableColumns(table))
            query = f'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
            if options:
                query = f"{query} WITH {options}"
            _LOG.debug("query: %s", query)
            queries.append(query)
        futures = [self._session.execute_async(query, timeout=None) for query in queries]
        for future in futures:
            _LOG.debug("wait for query: %s", future.query)
            future.result()
            _LOG.debug("query finished: %s", future.query)

    def _tableColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return set of columns in a table

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.

        Returns
        -------
        column_defs : `list`
            List of strings in the format "column_name type", with the
            PRIMARY KEY clause appended as the last element.

        Raises
        ------
        ValueError
            Raised if the table has no partitioning columns annotation.
        """
        table_schema = self._table_schema(table_name)

        # must have partition columns and clustering columns
        part_columns = table_schema.annotations.get("cassandra:partitioning_columns", [])
        clust_columns = [column.name for column in table_schema.primary_key]
        _LOG.debug("part_columns: %s", part_columns)
        _LOG.debug("clust_columns: %s", clust_columns)
        if not part_columns:
            raise ValueError(f"Table {table_name} configuration is missing partition index")

        # all columns
        column_defs = []
        for column in table_schema.columns:
            ctype = self._type_map[column.datatype]
            column_defs.append(f'"{column.name}" {ctype}')

        # primary key definition
        part_columns = [f'"{col}"' for col in part_columns]
        clust_columns = [f'"{col}"' for col in clust_columns]
        if len(part_columns) > 1:
            # Composite partition key needs extra parentheses.
            columns = ", ".join(part_columns)
            part_columns = [f"({columns})"]
        pkey = ", ".join(part_columns + clust_columns)
        _LOG.debug("pkey: %s", pkey)
        column_defs.append(f"PRIMARY KEY ({pkey})")

        return column_defs

    def _table_schema(self, table: ApdbTables | ExtraTables) -> schema_model.Table:
        """Return schema definition for a table."""
        if isinstance(table, ApdbTables):
            table_schema = self._apdb_tables[table]
        else:
            table_schema = self._extra_tables[table]
        return table_schema

    def table_row_size(self, table: ApdbTables | ExtraTables) -> int:
        """Return an estimate of the row size of a given table.

        Parameters
        ----------
        table : `ApdbTables` or `ExtraTables`
            Table for which to estimate the row size.

        Returns
        -------
        size : `int`
            An estimate of a table row size.

        Notes
        -----
        Returned size is not exact. When table has variable-size columns (e.g.
        strings) may be incorrect. Stored data size or wire-level protocol size
        can be smaller if some columns are not set or set to NULL.
        """
        table_schema = self._table_schema(table)
        size = sum(column.size() for column in table_schema.columns)
        return size

    def time_partitioned_tables(self) -> list[ApdbTables]:
        """Make the list of time-partitioned tables.

        Returns
        -------
        tables : `list` [`ApdbTables`]
            Tables that are time-partitioned.
        """
        if not self._time_partition_tables:
            return []
        # DiaObject is excluded when replication is configured to skip it.
        has_dia_object_table = not (self._enable_replica and self._replica_skips_diaobjects)
        tables = [ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
        if has_dia_object_table:
            tables.append(ApdbTables.DiaObject)
        return tables