Coverage for python / lsst / dax / apdb / cassandra / apdbMetadataCassandra.py: 24%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:46 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ApdbMetadataCassandra"] 

25 

26from collections.abc import Generator 

27from typing import Any 

28 

29from ..apdbMetadata import ApdbMetadata 

30from .cassandra_utils import PreparedStatementCache, quote_id 

31 

32 

33class ApdbMetadataCassandra(ApdbMetadata): 

34 """Implementation of `ApdbMetadata` for Cassandra backend. 

35 

36 Parameters 

37 ---------- 

38 session : `cassandra.cluster.Session` 

39 Cassandra session instance. 

40 schema : `ApdbSqlSchema` 

41 Object providing access to schema details. 

42 """ 

43 

44 def __init__(self, session: Any, table_name: str, keyspace: str, read_profile: str, write_profile: str): 

45 self._session = session 

46 self._read_profile = read_profile 

47 self._write_profile = write_profile 

48 self._part = 0 # Partition for all rows 

49 self._preparer = PreparedStatementCache(session) 

50 self._table_clause = f"{quote_id(keyspace)}.{quote_id(table_name)}" 

51 

52 def get(self, key: str, default: str | None = None) -> str | None: 

53 # Docstring is inherited. 

54 query = f"SELECT value FROM {self._table_clause} WHERE meta_part = ? AND name = ?" 

55 result = self._session.execute( 

56 self._preparer.prepare(query), (self._part, key), execution_profile=self._read_profile 

57 ) 

58 if (row := result.one()) is not None: 

59 return row[0] 

60 else: 

61 return default 

62 

63 def set(self, key: str, value: str, *, force: bool = False) -> None: 

64 # Docstring is inherited. 

65 if not key or not value: 

66 raise ValueError("name and value cannot be empty") 

67 query = f"INSERT INTO {self._table_clause} (meta_part, name, value) VALUES (?, ?, ?)" 

68 if not force and self.get(key) is not None: 

69 raise KeyError(f"Metadata key {key!r} already exists") 

70 # Race is still possible between check and insert. 

71 self._session.execute( 

72 self._preparer.prepare(query), (self._part, key, value), execution_profile=self._write_profile 

73 ) 

74 

75 def delete(self, key: str) -> bool: 

76 # Docstring is inherited. 

77 if not key: 

78 raise ValueError("name cannot be empty") 

79 query = f"DELETE FROM {self._table_clause} WHERE meta_part = ? AND name = ?" 

80 # Cassandra cannot tell how many rows are deleted, just check if row 

81 # exists now. 

82 exists = self.get(key) is not None 

83 # Race is still possible between check and remove. 

84 self._session.execute( 

85 self._preparer.prepare(query), (self._part, key), execution_profile=self._write_profile 

86 ) 

87 return exists 

88 

89 def items(self) -> Generator[tuple[str, str], None, None]: 

90 # Docstring is inherited. 

91 query = f"SELECT name, value FROM {self._table_clause} WHERE meta_part = ?" 

92 result = self._session.execute( 

93 self._preparer.prepare(query), (self._part,), execution_profile=self._read_profile 

94 ) 

95 for row in result: 

96 yield tuple(row) 

97 

98 def empty(self) -> bool: 

99 # Docstring is inherited. 

100 query = f"SELECT count(*) FROM {self._table_clause} WHERE meta_part = ?" 

101 result = self._session.execute( 

102 self._preparer.prepare(query), (self._part,), execution_profile=self._read_profile 

103 ) 

104 row = result.one() 

105 return row[0] == 0