Coverage for tests / test_apdbCassandra.py: 49%
88 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:58 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:58 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22"""Unit test for `ApdbCassandra` class.
24Notes
25-----
26For now this test can only run against actual Cassandra cluster, to specify
27cluster location use ``DAX_APDB_TEST_CASSANDRA_CLUSTER`` environment variable,
28e.g.:
30 export DAX_APDB_TEST_CASSANDRA_CLUSTER=cassandra.example.com
31 pytest tests/test_apdbCassandra.py
33Individual tests create and destroy unique keyspaces in the cluster, there is
34no need to pre-create a keyspace with predefined name.
35"""
37import os
38import unittest
39from typing import Any
41import astropy.time
43import lsst.utils.tests
44from lsst.dax.apdb import (
45 Apdb,
46 ApdbConfig,
47 ApdbTables,
48 ApdbUpdateRecord,
49 IncompatibleVersionError,
50 ReplicaChunk,
51)
52from lsst.dax.apdb.cassandra import ApdbCassandra, ApdbCassandraConfig
53from lsst.dax.apdb.cassandra.connectionContext import ConnectionContext
54from lsst.dax.apdb.pixelization import Pixelization
55from lsst.dax.apdb.tests import ApdbSchemaUpdateTest, ApdbTest, cassandra_mixin
56from lsst.dax.apdb.tests.data_factory import makeObjectCatalog
58TEST_SCHEMA = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-apdb.yaml")
59TEST_SCHEMA_SSO = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-sso.yaml")
60# Schema that uses `datetime` for timestamps and combines APDB and SSP.
61TEST_SCHEMA_DT = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-datetime.yaml")
64class ApdbCassandraMixin(cassandra_mixin.ApdbCassandraMixin):
65 """Mixin class which defines common methods for unit tests."""
67 def pixelization(self, config: ApdbConfig) -> Pixelization:
68 """Return pixelization used by implementation."""
69 assert isinstance(config, ApdbCassandraConfig), "Only expect ApdbCassandraConfig here"
70 return Pixelization(
71 config.partitioning.part_pixelization,
72 config.partitioning.part_pix_level,
73 config.partitioning.part_pix_max_ranges,
74 )
77class ApdbCassandraTestCase(ApdbCassandraMixin, ApdbTest, unittest.TestCase):
78 """A test case for ApdbCassandra class"""
80 time_partition_tables = False
81 time_partition_start: str | None = None
82 time_partition_end: str | None = None
83 extra_chunk_columns = 2
85 def make_instance(self, **kwargs: Any) -> ApdbConfig:
86 """Make config class instance used in all tests."""
87 kw: dict[str, Any] = {
88 "hosts": (self.cluster_host,),
89 "keyspace": self.keyspace,
90 "schema_file": TEST_SCHEMA,
91 "ss_schema_file": TEST_SCHEMA_SSO,
92 "time_partition_tables": self.time_partition_tables,
93 "enable_replica": self.enable_replica,
94 }
95 if self.time_partition_start:
96 kw["time_partition_start"] = self.time_partition_start
97 if self.time_partition_end:
98 kw["time_partition_end"] = self.time_partition_end
99 kw.update(kwargs)
100 return ApdbCassandra.init_database(**kw)
102 def getDiaObjects_table(self) -> ApdbTables:
103 """Return type of table returned from getDiaObjects method."""
104 return ApdbTables.DiaObjectLast
106 def store_update_records(self, apdb: Apdb, records: list[ApdbUpdateRecord], chunk: ReplicaChunk) -> None:
107 # Docstring inherited.
108 assert isinstance(apdb, ApdbCassandra), "Expecting ApdbCassandra instance"
109 apdb._storeUpdateRecords(records, chunk, store_chunk=True)
111 def _count_after_reset_dedup(self, count_before: int) -> int:
112 return 0
115class ApdbCassandraPerMonthTestCase(ApdbCassandraTestCase):
116 """A test case for ApdbCassandra class with per-month tables."""
118 time_partition_tables = True
119 time_partition_start = "2020-06-01T00:00:00"
120 time_partition_end = "2021-06-01T00:00:00"
121 meta_row_count = 4
123 def test_store_partition_range(self) -> None:
124 """Test that writing to non-existing partition raises an error."""
125 config = self.make_instance()
126 apdb = Apdb.from_config(config)
128 region = self.make_region()
130 # Visit time is beyond time_partition_end.
131 visit_time = astropy.time.Time("2022-01-01", format="isot", scale="tai")
132 catalog = makeObjectCatalog(region, 100)
133 with self.assertRaisesRegex(ValueError, "time partitions that do not yet exist"):
134 apdb.store(visit_time, catalog)
136 # Writing to last partition makes a warning.
137 visit_time = astropy.time.Time("2021-06-01", format="isot", scale="tai")
138 catalog = makeObjectCatalog(region, 100)
139 with self.assertWarnsRegex(UserWarning, "Writing into the last temporal partition"):
140 apdb.store(visit_time, catalog)
143class ApdbCassandraTestCaseReplica(ApdbCassandraTestCase):
144 """A test case with enabled replica tables."""
146 enable_replica = True
147 meta_row_count = 4
150class ApdbCassandraTestCaseDatetimeReplica(ApdbCassandraTestCaseReplica):
151 """A test case with datetime timestamps."""
153 use_mjd = False
155 def setUp(self) -> None:
156 super().setUp()
157 # Schema for datetime case is also missing a validityTime column in
158 # DiaObjectLast table.
159 self.table_column_count = dict(self.table_column_count)
160 self.table_column_count[ApdbTables.DiaObjectLast] = 5
162 def make_instance(self, **kwargs: Any) -> ApdbConfig:
163 if "schema_file" in kwargs:
164 return super().make_instance(**kwargs)
165 else:
166 return super().make_instance(schema_file=TEST_SCHEMA_DT, **kwargs)
169class ApdbSchemaUpdateCassandraTestCase(ApdbCassandraMixin, ApdbSchemaUpdateTest, unittest.TestCase):
170 """A test case for schema updates using Cassandra backend."""
172 time_partition_tables = False
174 def make_instance(self, **kwargs: Any) -> ApdbConfig:
175 """Make config class instance used in all tests."""
176 kw = {
177 "hosts": (self.cluster_host,),
178 "keyspace": self.keyspace,
179 "schema_file": TEST_SCHEMA,
180 "ss_schema_file": TEST_SCHEMA_SSO,
181 "time_partition_tables": self.time_partition_tables,
182 }
183 kw.update(kwargs)
184 return ApdbCassandra.init_database(**kw) # type: ignore[arg-type]
187class ApdbCassandraVersionCheck(cassandra_mixin.ApdbCassandraMixin, unittest.TestCase):
188 """A test case to verify that version check happens before reading
189 frozen configuration.
190 """
192 def setUp(self) -> None:
193 super().setUp()
195 self.config = ApdbCassandra.init_database(
196 hosts=(self.cluster_host,),
197 keyspace=self.keyspace,
198 schema_file=TEST_SCHEMA,
199 ss_schema_file=TEST_SCHEMA_SSO,
200 time_partition_tables=False,
201 )
203 def test_version_check(self) -> None:
204 """Test that version check happens before reading config."""
205 apdb = Apdb.from_config(self.config)
206 assert isinstance(apdb, ApdbCassandra)
208 # Store incompatible version.
209 apdb.metadata.set(ConnectionContext.metadataSchemaVersionKey, "99.0.0", force=True)
211 # Overwrite frozen config with something that will break.
212 apdb.metadata.set(ConnectionContext.metadataConfigKey, '{"not_a_config_key": 0}', force=True)
214 # Try again.
215 with self.assertRaises(IncompatibleVersionError):
216 # Need to call some actual method to initiate connection.
217 Apdb.from_config(self.config).metadata.items()
220class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
221 """Run file leak tests."""
224def setup_module(module: Any) -> None:
225 """Configure pytest."""
226 lsst.utils.tests.init()
229if __name__ == "__main__":
230 lsst.utils.tests.init()
231 unittest.main()