Coverage for tests / test_apdbCassandra.py: 51%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:46 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22"""Unit test for `ApdbCassandra` class. 

23 

24Notes 

25----- 

26For now this test can only run against an actual Cassandra cluster; to specify 

27cluster location use ``DAX_APDB_TEST_CASSANDRA_CLUSTER`` environment variable, 

28e.g.: 

29 

30 export DAX_APDB_TEST_CASSANDRA_CLUSTER=cassandra.example.com 

31 pytest tests/test_apdbCassandra.py 

32 

33Individual tests create and destroy unique keyspaces in the cluster, there is 

34no need to pre-create a keyspace with predefined name. 

35""" 

36 

37import os 

38import unittest 

39from typing import Any 

40 

41import astropy.time 

42 

43import lsst.utils.tests 

44from lsst.dax.apdb import Apdb, ApdbConfig, ApdbTables, ApdbUpdateRecord, ReplicaChunk 

45from lsst.dax.apdb.cassandra import ApdbCassandra, ApdbCassandraConfig 

46from lsst.dax.apdb.pixelization import Pixelization 

47from lsst.dax.apdb.tests import ApdbSchemaUpdateTest, ApdbTest, cassandra_mixin 

48from lsst.dax.apdb.tests.data_factory import makeObjectCatalog 

49 

# Absolute path of the directory holding this test module; the schema files
# are located relative to it.
_TESTS_DIR = os.path.abspath(os.path.dirname(__file__))

TEST_SCHEMA = os.path.join(_TESTS_DIR, "config/schema-apdb.yaml")
TEST_SCHEMA_SSO = os.path.join(_TESTS_DIR, "config/schema-sso.yaml")
# Schema that uses `datetime` for timestamps and combines APDB and SSP.
TEST_SCHEMA_DT = os.path.join(_TESTS_DIR, "config/schema-datetime.yaml")

54 

55 

class ApdbCassandraMixin(cassandra_mixin.ApdbCassandraMixin):
    """Shared helper methods for the Cassandra-backed unit tests."""

    def pixelization(self, config: ApdbConfig) -> Pixelization:
        """Build a pixelization from the partitioning options of ``config``.

        Parameters
        ----------
        config : `ApdbConfig`
            Configuration object; it has to be an `ApdbCassandraConfig`.

        Returns
        -------
        pixelization : `Pixelization`
            Instance constructed from the ``part_pixelization``,
            ``part_pix_level`` and ``part_pix_max_ranges`` options.
        """
        assert isinstance(config, ApdbCassandraConfig), "Only expect ApdbCassandraConfig here"
        partitioning = config.partitioning
        return Pixelization(
            partitioning.part_pixelization,
            partitioning.part_pix_level,
            partitioning.part_pix_max_ranges,
        )

67 

68 

class ApdbCassandraTestCase(ApdbCassandraMixin, ApdbTest, unittest.TestCase):
    """Run the generic `ApdbTest` suite against `ApdbCassandra`."""

    # Options forwarded to ``init_database`` by `make_instance`; subclasses
    # override them to exercise other configurations.
    time_partition_tables = False
    time_partition_start: str | None = None
    time_partition_end: str | None = None
    # NOTE(review): presumably consumed by the `ApdbTest` base class, it is
    # not used anywhere in this module.
    extra_chunk_columns = 2

    def make_instance(self, **kwargs: Any) -> ApdbConfig:
        """Initialize a test database and return its configuration.

        Parameters
        ----------
        **kwargs : `Any`
            Extra arguments for `ApdbCassandra.init_database`, they take
            precedence over the defaults assembled here.

        Returns
        -------
        config : `ApdbConfig`
            Whatever `ApdbCassandra.init_database` returns.
        """
        options: dict[str, Any] = {
            "hosts": (self.cluster_host,),
            "keyspace": self.keyspace,
            "schema_file": TEST_SCHEMA,
            "ss_schema_file": TEST_SCHEMA_SSO,
            "time_partition_tables": self.time_partition_tables,
            "enable_replica": self.enable_replica,
        }
        # Partition limits are passed along only when a test class sets them.
        partition_limits = {
            "time_partition_start": self.time_partition_start,
            "time_partition_end": self.time_partition_end,
        }
        options.update({name: value for name, value in partition_limits.items() if value})
        options.update(kwargs)
        return ApdbCassandra.init_database(**options)

    def getDiaObjects_table(self) -> ApdbTables:
        """Return the table type that ``getDiaObjects`` reads from."""
        return ApdbTables.DiaObjectLast

    def store_update_records(self, apdb: Apdb, records: list[ApdbUpdateRecord], chunk: ReplicaChunk) -> None:
        # Docstring inherited.
        assert isinstance(apdb, ApdbCassandra), "Expecting ApdbCassandra instance"
        apdb._storeUpdateRecords(records, chunk, store_chunk=True)

102 

103 

class ApdbCassandraPerMonthTestCase(ApdbCassandraTestCase):
    """Variant of the `ApdbCassandra` test case using per-month tables."""

    time_partition_tables = True
    time_partition_start = "2020-06-01T00:00:00"
    time_partition_end = "2021-06-01T00:00:00"
    meta_row_count = 4

    def test_store_partition_range(self) -> None:
        """Check the error and the warning produced when storing at or past
        the end of the configured time partition range.
        """
        apdb = Apdb.from_config(self.make_instance())
        region = self.make_region()

        # A visit later than time_partition_end has to be rejected.
        late_visit = astropy.time.Time("2022-01-01", format="isot", scale="tai")
        late_catalog = makeObjectCatalog(region, 100, late_visit)
        with self.assertRaisesRegex(ValueError, "time partitions that do not yet exist"):
            apdb.store(late_visit, late_catalog)

        # A visit landing in the last partition is stored, with a warning.
        edge_visit = astropy.time.Time("2021-06-01", format="isot", scale="tai")
        edge_catalog = makeObjectCatalog(region, 100, edge_visit)
        with self.assertWarnsRegex(UserWarning, "Writing into the last temporal partition"):
            apdb.store(edge_visit, edge_catalog)

130 

131 

class ApdbCassandraTestCaseReplica(ApdbCassandraTestCase):
    """Variant of the `ApdbCassandra` test case with replica tables enabled."""

    # Passed to ``init_database`` as ``enable_replica`` by ``make_instance``.
    enable_replica = True
    meta_row_count = 4

137 

138 

class ApdbCassandraTestCaseDatetimeReplica(ApdbCassandraTestCaseReplica):
    """Replica-enabled test case using the schema with datetime timestamps."""

    use_mjd = False

    def setUp(self) -> None:
        """Adjust the expected column counts for the datetime schema."""
        super().setUp()
        # Work on a copy so that the mapping provided by the parent class is
        # not modified. The datetime schema is also missing a validityTime
        # column in the DiaObjectLast table.
        column_counts = dict(self.table_column_count)
        column_counts[ApdbTables.DiaObjectLast] = 4
        self.table_column_count = column_counts

    def make_instance(self, **kwargs: Any) -> ApdbConfig:
        """Make config instance, using the datetime schema unless the caller
        passes its own ``schema_file``.
        """
        kwargs.setdefault("schema_file", TEST_SCHEMA_DT)
        return super().make_instance(**kwargs)

156 

157 

class ApdbSchemaUpdateCassandraTestCase(ApdbCassandraMixin, ApdbSchemaUpdateTest, unittest.TestCase):
    """A test case for schema updates using Cassandra backend."""

    time_partition_tables = False

    def make_instance(self, **kwargs: Any) -> ApdbConfig:
        """Initialize a test database and return its configuration.

        Parameters
        ----------
        **kwargs : `Any`
            Extra arguments for `ApdbCassandra.init_database`, they take
            precedence over the defaults assembled here.

        Returns
        -------
        config : `ApdbConfig`
            Whatever `ApdbCassandra.init_database` returns.
        """
        # The explicit ``dict[str, Any]`` annotation stops the type checker
        # from inferring a too narrow value type for this heterogeneous
        # dictionary, so that unpacking it below needs no ``type: ignore``.
        kw: dict[str, Any] = {
            "hosts": (self.cluster_host,),
            "keyspace": self.keyspace,
            "schema_file": TEST_SCHEMA,
            "ss_schema_file": TEST_SCHEMA_SSO,
            "time_partition_tables": self.time_partition_tables,
        }
        kw.update(kwargs)
        return ApdbCassandra.init_database(**kw)

174 

175 

class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
    """Pull in the file leak tests of `lsst.utils.tests.MemoryTestCase`."""

178 

179 

def setup_module(module: Any) -> None:
    """Module-level pytest setup hook, calls `lsst.utils.tests.init`.

    Parameters
    ----------
    module : `Any`
        Module object passed in by pytest, not used.
    """
    lsst.utils.tests.init()

183 

184 

if __name__ == "__main__":
    # Direct execution does not go through pytest and its ``setup_module``
    # hook, so the initialization is done here before running the tests.
    lsst.utils.tests.init()
    unittest.main()