Coverage for python / lsst / dax / apdb / cassandra / apdbMetadataCassandra.py: 23%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:19 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ApdbMetadataCassandra"] 

25 

26from collections.abc import Generator 

27from typing import Any 

28 

29from ..apdbMetadata import ApdbMetadata 

30from .cassandra_utils import StatementFactory 

31from .queries import Column as C # noqa: N817 

32from .queries import ColumnExpr, Delete, Insert, Select 

33 

34 

35class ApdbMetadataCassandra(ApdbMetadata): 

36 """Implementation of `ApdbMetadata` for Cassandra backend. 

37 

38 Parameters 

39 ---------- 

40 session : `cassandra.cluster.Session` 

41 Cassandra session instance. 

42 schema : `ApdbSqlSchema` 

43 Object providing access to schema details. 

44 """ 

45 

46 def __init__(self, session: Any, table_name: str, keyspace: str, read_profile: str, write_profile: str): 

47 self._session = session 

48 self._keyspace = keyspace 

49 self._table = table_name 

50 self._read_profile = read_profile 

51 self._write_profile = write_profile 

52 self._part = 0 # Partition for all rows 

53 self._stmt_factory = StatementFactory(session) 

54 

55 def get(self, key: str, default: str | None = None) -> str | None: 

56 # Docstring is inherited. 

57 query = Select(self._keyspace, self._table, ["value"]) 

58 query = query.where((C("meta_part") == self._part) & (C("name") == key)) 

59 stmt, params = self._stmt_factory.with_params(query) 

60 result = self._session.execute(stmt, params, execution_profile=self._read_profile) 

61 if (row := result.one()) is not None: 

62 return row[0] 

63 else: 

64 return default 

65 

66 def set(self, key: str, value: str, *, force: bool = False) -> None: 

67 # Docstring is inherited. 

68 if not key or not value: 

69 raise ValueError("name and value cannot be empty") 

70 query = Insert(self._keyspace, self._table, ("meta_part", "name", "value")) 

71 stmt = self._stmt_factory(query) 

72 if not force and self.get(key) is not None: 

73 raise KeyError(f"Metadata key {key!r} already exists") 

74 # Race is still possible between check and insert. 

75 self._session.execute(stmt, (self._part, key, value), execution_profile=self._write_profile) 

76 

77 def delete(self, key: str) -> bool: 

78 # Docstring is inherited. 

79 if not key: 

80 raise ValueError("name cannot be empty") 

81 query = ( 

82 Delete(self._keyspace, self._table).where(C("meta_part") == self._part).where(C("name") == key) 

83 ) 

84 stmt, params = self._stmt_factory.with_params(query) 

85 # Cassandra cannot tell how many rows are deleted, just check if row 

86 # exists now. 

87 exists = self.get(key) is not None 

88 # Race is still possible between check and remove. 

89 self._session.execute(stmt, params, execution_profile=self._write_profile) 

90 return exists 

91 

92 def items(self) -> Generator[tuple[str, str], None, None]: 

93 # Docstring is inherited. 

94 query = Select(self._keyspace, self._table, ("name", "value")) 

95 query = query.where(C("meta_part") == self._part) 

96 stmt, params = self._stmt_factory.with_params(query) 

97 result = self._session.execute(stmt, params, execution_profile=self._read_profile) 

98 for row in result: 

99 yield tuple(row) 

100 

101 def empty(self) -> bool: 

102 # Docstring is inherited. 

103 query = Select(self._keyspace, self._table, [ColumnExpr("count(*)")]) 

104 query = query.where(C("meta_part") == self._part) 

105 stmt, params = self._stmt_factory.with_params(query) 

106 result = self._session.execute(stmt, params, execution_profile=self._read_profile) 

107 row = result.one() 

108 return row[0] == 0