Coverage for python / lsst / dax / apdb / cassandra / apdbMetadataCassandra.py: 23%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:35 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["ApdbMetadataCassandra"]
26from collections.abc import Generator
27from typing import Any
29from ..apdbMetadata import ApdbMetadata
30from .cassandra_utils import StatementFactory
31from .queries import Column as C # noqa: N817
32from .queries import ColumnExpr, Delete, Insert, Select
35class ApdbMetadataCassandra(ApdbMetadata):
36 """Implementation of `ApdbMetadata` for Cassandra backend.
38 Parameters
39 ----------
40 session : `cassandra.cluster.Session`
41 Cassandra session instance.
42 schema : `ApdbSqlSchema`
43 Object providing access to schema details.
44 """
46 def __init__(self, session: Any, table_name: str, keyspace: str, read_profile: str, write_profile: str):
47 self._session = session
48 self._keyspace = keyspace
49 self._table = table_name
50 self._read_profile = read_profile
51 self._write_profile = write_profile
52 self._part = 0 # Partition for all rows
53 self._stmt_factory = StatementFactory(session)
55 def get(self, key: str, default: str | None = None) -> str | None:
56 # Docstring is inherited.
57 query = Select(self._keyspace, self._table, ["value"])
58 query = query.where((C("meta_part") == self._part) & (C("name") == key))
59 stmt, params = self._stmt_factory.with_params(query)
60 result = self._session.execute(stmt, params, execution_profile=self._read_profile)
61 if (row := result.one()) is not None:
62 return row[0]
63 else:
64 return default
66 def set(self, key: str, value: str, *, force: bool = False) -> None:
67 # Docstring is inherited.
68 if not key or not value:
69 raise ValueError("name and value cannot be empty")
70 query = Insert(self._keyspace, self._table, ("meta_part", "name", "value"))
71 stmt = self._stmt_factory(query)
72 if not force and self.get(key) is not None:
73 raise KeyError(f"Metadata key {key!r} already exists")
74 # Race is still possible between check and insert.
75 self._session.execute(stmt, (self._part, key, value), execution_profile=self._write_profile)
77 def delete(self, key: str) -> bool:
78 # Docstring is inherited.
79 if not key:
80 raise ValueError("name cannot be empty")
81 query = (
82 Delete(self._keyspace, self._table).where(C("meta_part") == self._part).where(C("name") == key)
83 )
84 stmt, params = self._stmt_factory.with_params(query)
85 # Cassandra cannot tell how many rows are deleted, just check if row
86 # exists now.
87 exists = self.get(key) is not None
88 # Race is still possible between check and remove.
89 self._session.execute(stmt, params, execution_profile=self._write_profile)
90 return exists
92 def items(self) -> Generator[tuple[str, str], None, None]:
93 # Docstring is inherited.
94 query = Select(self._keyspace, self._table, ("name", "value"))
95 query = query.where(C("meta_part") == self._part)
96 stmt, params = self._stmt_factory.with_params(query)
97 result = self._session.execute(stmt, params, execution_profile=self._read_profile)
98 for row in result:
99 yield tuple(row)
101 def empty(self) -> bool:
102 # Docstring is inherited.
103 query = Select(self._keyspace, self._table, [ColumnExpr("count(*)")])
104 query = query.where(C("meta_part") == self._part)
105 stmt, params = self._stmt_factory.with_params(query)
106 result = self._session.execute(stmt, params, execution_profile=self._read_profile)
107 row = result.one()
108 return row[0] == 0