Coverage for tests / test_apdbCassandra.py: 49%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:58 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22"""Unit test for `ApdbCassandra` class. 

23 

24Notes 

25----- 

26For now this test can only run against actual Cassandra cluster, to specify 

27cluster location use ``DAX_APDB_TEST_CASSANDRA_CLUSTER`` environment variable, 

28e.g.: 

29 

30 export DAX_APDB_TEST_CASSANDRA_CLUSTER=cassandra.example.com 

31 pytest tests/test_apdbCassandra.py 

32 

33Individual tests create and destroy unique keyspaces in the cluster, there is 

34no need to pre-create a keyspace with predefined name. 

35""" 

36 

37import os 

38import unittest 

39from typing import Any 

40 

41import astropy.time 

42 

43import lsst.utils.tests 

44from lsst.dax.apdb import ( 

45 Apdb, 

46 ApdbConfig, 

47 ApdbTables, 

48 ApdbUpdateRecord, 

49 IncompatibleVersionError, 

50 ReplicaChunk, 

51) 

52from lsst.dax.apdb.cassandra import ApdbCassandra, ApdbCassandraConfig 

53from lsst.dax.apdb.cassandra.connectionContext import ConnectionContext 

54from lsst.dax.apdb.pixelization import Pixelization 

55from lsst.dax.apdb.tests import ApdbSchemaUpdateTest, ApdbTest, cassandra_mixin 

56from lsst.dax.apdb.tests.data_factory import makeObjectCatalog 

57 

58TEST_SCHEMA = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-apdb.yaml") 

59TEST_SCHEMA_SSO = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-sso.yaml") 

60# Schema that uses `datetime` for timestamps and combines APDB and SSP. 

61TEST_SCHEMA_DT = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-datetime.yaml") 

62 

63 

64class ApdbCassandraMixin(cassandra_mixin.ApdbCassandraMixin): 

65 """Mixin class which defines common methods for unit tests.""" 

66 

67 def pixelization(self, config: ApdbConfig) -> Pixelization: 

68 """Return pixelization used by implementation.""" 

69 assert isinstance(config, ApdbCassandraConfig), "Only expect ApdbCassandraConfig here" 

70 return Pixelization( 

71 config.partitioning.part_pixelization, 

72 config.partitioning.part_pix_level, 

73 config.partitioning.part_pix_max_ranges, 

74 ) 

75 

76 

77class ApdbCassandraTestCase(ApdbCassandraMixin, ApdbTest, unittest.TestCase): 

78 """A test case for ApdbCassandra class""" 

79 

80 time_partition_tables = False 

81 time_partition_start: str | None = None 

82 time_partition_end: str | None = None 

83 extra_chunk_columns = 2 

84 

85 def make_instance(self, **kwargs: Any) -> ApdbConfig: 

86 """Make config class instance used in all tests.""" 

87 kw: dict[str, Any] = { 

88 "hosts": (self.cluster_host,), 

89 "keyspace": self.keyspace, 

90 "schema_file": TEST_SCHEMA, 

91 "ss_schema_file": TEST_SCHEMA_SSO, 

92 "time_partition_tables": self.time_partition_tables, 

93 "enable_replica": self.enable_replica, 

94 } 

95 if self.time_partition_start: 

96 kw["time_partition_start"] = self.time_partition_start 

97 if self.time_partition_end: 

98 kw["time_partition_end"] = self.time_partition_end 

99 kw.update(kwargs) 

100 return ApdbCassandra.init_database(**kw) 

101 

102 def getDiaObjects_table(self) -> ApdbTables: 

103 """Return type of table returned from getDiaObjects method.""" 

104 return ApdbTables.DiaObjectLast 

105 

106 def store_update_records(self, apdb: Apdb, records: list[ApdbUpdateRecord], chunk: ReplicaChunk) -> None: 

107 # Docstring inherited. 

108 assert isinstance(apdb, ApdbCassandra), "Expecting ApdbCassandra instance" 

109 apdb._storeUpdateRecords(records, chunk, store_chunk=True) 

110 

111 def _count_after_reset_dedup(self, count_before: int) -> int: 

112 return 0 

113 

114 

115class ApdbCassandraPerMonthTestCase(ApdbCassandraTestCase): 

116 """A test case for ApdbCassandra class with per-month tables.""" 

117 

118 time_partition_tables = True 

119 time_partition_start = "2020-06-01T00:00:00" 

120 time_partition_end = "2021-06-01T00:00:00" 

121 meta_row_count = 4 

122 

123 def test_store_partition_range(self) -> None: 

124 """Test that writing to non-existing partition raises an error.""" 

125 config = self.make_instance() 

126 apdb = Apdb.from_config(config) 

127 

128 region = self.make_region() 

129 

130 # Visit time is beyond time_partition_end. 

131 visit_time = astropy.time.Time("2022-01-01", format="isot", scale="tai") 

132 catalog = makeObjectCatalog(region, 100) 

133 with self.assertRaisesRegex(ValueError, "time partitions that do not yet exist"): 

134 apdb.store(visit_time, catalog) 

135 

136 # Writing to last partition makes a warning. 

137 visit_time = astropy.time.Time("2021-06-01", format="isot", scale="tai") 

138 catalog = makeObjectCatalog(region, 100) 

139 with self.assertWarnsRegex(UserWarning, "Writing into the last temporal partition"): 

140 apdb.store(visit_time, catalog) 

141 

142 

143class ApdbCassandraTestCaseReplica(ApdbCassandraTestCase): 

144 """A test case with enabled replica tables.""" 

145 

146 enable_replica = True 

147 meta_row_count = 4 

148 

149 

150class ApdbCassandraTestCaseDatetimeReplica(ApdbCassandraTestCaseReplica): 

151 """A test case with datetime timestamps.""" 

152 

153 use_mjd = False 

154 

155 def setUp(self) -> None: 

156 super().setUp() 

157 # Schema for datetime case is also missing a validityTime column in 

158 # DiaObjectLast table. 

159 self.table_column_count = dict(self.table_column_count) 

160 self.table_column_count[ApdbTables.DiaObjectLast] = 5 

161 

162 def make_instance(self, **kwargs: Any) -> ApdbConfig: 

163 if "schema_file" in kwargs: 

164 return super().make_instance(**kwargs) 

165 else: 

166 return super().make_instance(schema_file=TEST_SCHEMA_DT, **kwargs) 

167 

168 

169class ApdbSchemaUpdateCassandraTestCase(ApdbCassandraMixin, ApdbSchemaUpdateTest, unittest.TestCase): 

170 """A test case for schema updates using Cassandra backend.""" 

171 

172 time_partition_tables = False 

173 

174 def make_instance(self, **kwargs: Any) -> ApdbConfig: 

175 """Make config class instance used in all tests.""" 

176 kw = { 

177 "hosts": (self.cluster_host,), 

178 "keyspace": self.keyspace, 

179 "schema_file": TEST_SCHEMA, 

180 "ss_schema_file": TEST_SCHEMA_SSO, 

181 "time_partition_tables": self.time_partition_tables, 

182 } 

183 kw.update(kwargs) 

184 return ApdbCassandra.init_database(**kw) # type: ignore[arg-type] 

185 

186 

187class ApdbCassandraVersionCheck(cassandra_mixin.ApdbCassandraMixin, unittest.TestCase): 

188 """A test case to verify that version check happens before reading 

189 frozen configuration. 

190 """ 

191 

192 def setUp(self) -> None: 

193 super().setUp() 

194 

195 self.config = ApdbCassandra.init_database( 

196 hosts=(self.cluster_host,), 

197 keyspace=self.keyspace, 

198 schema_file=TEST_SCHEMA, 

199 ss_schema_file=TEST_SCHEMA_SSO, 

200 time_partition_tables=False, 

201 ) 

202 

203 def test_version_check(self) -> None: 

204 """Test that version check happens before reading config.""" 

205 apdb = Apdb.from_config(self.config) 

206 assert isinstance(apdb, ApdbCassandra) 

207 

208 # Store incompatible version. 

209 apdb.metadata.set(ConnectionContext.metadataSchemaVersionKey, "99.0.0", force=True) 

210 

211 # Overwrite frozen config with something that will break. 

212 apdb.metadata.set(ConnectionContext.metadataConfigKey, '{"not_a_config_key": 0}', force=True) 

213 

214 # Try again. 

215 with self.assertRaises(IncompatibleVersionError): 

216 # Need to call some actual method to initiate connection. 

217 Apdb.from_config(self.config).metadata.items() 

218 

219 

220class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): 

221 """Run file leak tests.""" 

222 

223 

224def setup_module(module: Any) -> None: 

225 """Configure pytest.""" 

226 lsst.utils.tests.init() 

227 

228 

229if __name__ == "__main__": 

230 lsst.utils.tests.init() 

231 unittest.main()