Coverage for tests / test_apdbCassandra.py: 51%
74 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:46 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:46 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22"""Unit test for `ApdbCassandra` class.
24Notes
25-----
26For now this test can only run against actual Cassandra cluster, to specify
27cluster location use ``DAX_APDB_TEST_CASSANDRA_CLUSTER`` environment variable,
28e.g.:
30 export DAX_APDB_TEST_CASSANDRA_CLUSTER=cassandra.example.com
31 pytest tests/test_apdbCassandra.py
33Individual tests create and destroy unique keyspaces in the cluster, there is
34no need to pre-create a keyspace with predefined name.
35"""
37import os
38import unittest
39from typing import Any
41import astropy.time
43import lsst.utils.tests
44from lsst.dax.apdb import Apdb, ApdbConfig, ApdbTables, ApdbUpdateRecord, ReplicaChunk
45from lsst.dax.apdb.cassandra import ApdbCassandra, ApdbCassandraConfig
46from lsst.dax.apdb.pixelization import Pixelization
47from lsst.dax.apdb.tests import ApdbSchemaUpdateTest, ApdbTest, cassandra_mixin
48from lsst.dax.apdb.tests.data_factory import makeObjectCatalog
50TEST_SCHEMA = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-apdb.yaml")
51TEST_SCHEMA_SSO = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-sso.yaml")
52# Schema that uses `datetime` for timestamps and combines APDB and SSP.
53TEST_SCHEMA_DT = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-datetime.yaml")
56class ApdbCassandraMixin(cassandra_mixin.ApdbCassandraMixin):
57 """Mixin class which defines common methods for unit tests."""
59 def pixelization(self, config: ApdbConfig) -> Pixelization:
60 """Return pixelization used by implementation."""
61 assert isinstance(config, ApdbCassandraConfig), "Only expect ApdbCassandraConfig here"
62 return Pixelization(
63 config.partitioning.part_pixelization,
64 config.partitioning.part_pix_level,
65 config.partitioning.part_pix_max_ranges,
66 )
69class ApdbCassandraTestCase(ApdbCassandraMixin, ApdbTest, unittest.TestCase):
70 """A test case for ApdbCassandra class"""
72 time_partition_tables = False
73 time_partition_start: str | None = None
74 time_partition_end: str | None = None
75 extra_chunk_columns = 2
77 def make_instance(self, **kwargs: Any) -> ApdbConfig:
78 """Make config class instance used in all tests."""
79 kw: dict[str, Any] = {
80 "hosts": (self.cluster_host,),
81 "keyspace": self.keyspace,
82 "schema_file": TEST_SCHEMA,
83 "ss_schema_file": TEST_SCHEMA_SSO,
84 "time_partition_tables": self.time_partition_tables,
85 "enable_replica": self.enable_replica,
86 }
87 if self.time_partition_start:
88 kw["time_partition_start"] = self.time_partition_start
89 if self.time_partition_end:
90 kw["time_partition_end"] = self.time_partition_end
91 kw.update(kwargs)
92 return ApdbCassandra.init_database(**kw)
94 def getDiaObjects_table(self) -> ApdbTables:
95 """Return type of table returned from getDiaObjects method."""
96 return ApdbTables.DiaObjectLast
98 def store_update_records(self, apdb: Apdb, records: list[ApdbUpdateRecord], chunk: ReplicaChunk) -> None:
99 # Docstring inherited.
100 assert isinstance(apdb, ApdbCassandra), "Expecting ApdbCassandra instance"
101 apdb._storeUpdateRecords(records, chunk, store_chunk=True)
104class ApdbCassandraPerMonthTestCase(ApdbCassandraTestCase):
105 """A test case for ApdbCassandra class with per-month tables."""
107 time_partition_tables = True
108 time_partition_start = "2020-06-01T00:00:00"
109 time_partition_end = "2021-06-01T00:00:00"
110 meta_row_count = 4
112 def test_store_partition_range(self) -> None:
113 """Test that writing to non-existing partition raises an error."""
114 config = self.make_instance()
115 apdb = Apdb.from_config(config)
117 region = self.make_region()
119 # Visit time is beyond time_partition_end.
120 visit_time = astropy.time.Time("2022-01-01", format="isot", scale="tai")
121 catalog = makeObjectCatalog(region, 100, visit_time)
122 with self.assertRaisesRegex(ValueError, "time partitions that do not yet exist"):
123 apdb.store(visit_time, catalog)
125 # Writing to last partition makes a warning.
126 visit_time = astropy.time.Time("2021-06-01", format="isot", scale="tai")
127 catalog = makeObjectCatalog(region, 100, visit_time)
128 with self.assertWarnsRegex(UserWarning, "Writing into the last temporal partition"):
129 apdb.store(visit_time, catalog)
132class ApdbCassandraTestCaseReplica(ApdbCassandraTestCase):
133 """A test case with enabled replica tables."""
135 enable_replica = True
136 meta_row_count = 4
139class ApdbCassandraTestCaseDatetimeReplica(ApdbCassandraTestCaseReplica):
140 """A test case with datetime timestamps."""
142 use_mjd = False
144 def setUp(self) -> None:
145 super().setUp()
146 # Schema for datetime case is also missing a validityTime column in
147 # DiaObjectLast table.
148 self.table_column_count = dict(self.table_column_count)
149 self.table_column_count[ApdbTables.DiaObjectLast] = 4
151 def make_instance(self, **kwargs: Any) -> ApdbConfig:
152 if "schema_file" in kwargs:
153 return super().make_instance(**kwargs)
154 else:
155 return super().make_instance(schema_file=TEST_SCHEMA_DT, **kwargs)
158class ApdbSchemaUpdateCassandraTestCase(ApdbCassandraMixin, ApdbSchemaUpdateTest, unittest.TestCase):
159 """A test case for schema updates using Cassandra backend."""
161 time_partition_tables = False
163 def make_instance(self, **kwargs: Any) -> ApdbConfig:
164 """Make config class instance used in all tests."""
165 kw = {
166 "hosts": (self.cluster_host,),
167 "keyspace": self.keyspace,
168 "schema_file": TEST_SCHEMA,
169 "ss_schema_file": TEST_SCHEMA_SSO,
170 "time_partition_tables": self.time_partition_tables,
171 }
172 kw.update(kwargs)
173 return ApdbCassandra.init_database(**kw) # type: ignore[arg-type]
176class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
177 """Run file leak tests."""
180def setup_module(module: Any) -> None:
181 """Configure pytest."""
182 lsst.utils.tests.init()
185if __name__ == "__main__":
186 lsst.utils.tests.init()
187 unittest.main()