Coverage for tests / test_cassandraPartitioner.py: 15%
99 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:58 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import unittest
23from typing import Any
25from astropy.time import Time
27from lsst.dax.apdb import ApdbTables
28from lsst.dax.apdb.cassandra import (
29 ApdbCassandraConfig,
30 ApdbCassandraPartitioningConfig,
31 ApdbCassandraTimePartitionRange,
32)
33from lsst.dax.apdb.cassandra.partitioner import Partitioner
34from lsst.dax.apdb.cassandra.queries import QExpr
35from lsst.sphgeom import Box, UnitVector3d
class CassandraPartitionerTestCase(unittest.TestCase):
    """Unit tests for the Cassandra `Partitioner` class.

    Unless a test overrides them, partitioners are built by
    `make_partitioner` with "mq3c" pixelization at level 10 and
    ``query_per_spatial_part=False``; all hard-coded expected values in the
    tests below correspond to that configuration.
    """

    def make_partitioner(self, **kwargs: Any) -> Partitioner:
        """Make Partitioner instance.

        Parameters
        ----------
        **kwargs : `Any`
            Keyword arguments passed to `ApdbCassandraPartitioningConfig`,
            they override the defaults used by this test case.

        Returns
        -------
        partitioner : `Partitioner`
            Partitioner built from an `ApdbCassandraConfig` that wraps the
            resulting partitioning configuration.
        """
        # Explicitly typed as dict[str, Any] so that the unpacking below
        # type-checks without a "type: ignore".
        params: dict[str, Any] = {
            "part_pixelization": "mq3c",
            "part_pix_level": 10,
            "query_per_spatial_part": False,
            **kwargs,
        }

        partitioning = ApdbCassandraPartitioningConfig(**params)
        config = ApdbCassandraConfig(partitioning=partitioning)
        return Partitioner(config)

    def test_pixel(self) -> None:
        """Test pixel() method."""
        partitioner = self.make_partitioner()

        # (direction vector components, expected pixel index)
        expected_pixels = (
            ((1.0, 1.0, 1.0), 0xD00000),
            ((-1.0, 1.0, 1.0), 0xE00000),
            ((1.0, -1.0, 1.0), 0xF55555),
            ((1.0, 1.0, -1.0), 0xAAAAAA),
            ((-1.0, -1.0, 1.0), 0xEFFFFF),
            ((1.0, -1.0, -1.0), 0xFAAAAA),
        )
        for xyz, pixel in expected_pixels:
            # subTest reports which direction failed and lets the remaining
            # directions run.
            with self.subTest(xyz=xyz):
                self.assertEqual(partitioner.pixel(UnitVector3d(*xyz)), pixel)

    def test_time_partition(self) -> None:
        """Test time_partition() method."""
        partitioner = self.make_partitioner()

        # (time, expected partition). time_partition() is exercised both
        # with astropy Time and with plain float values; the floats are
        # presumably MJD (60827 is the MJD of 2025-06-01, 60857 of
        # 2025-07-01).
        cases = (
            (Time("2025-06-01T00:00:00", format="isot", scale="tai"), 674),
            (Time("2025-07-01T00:00:00", format="isot", scale="tai"), 675),
            (60827.0, 674),
            (60857.0, 675),
        )
        for when, partition in cases:
            with self.subTest(time=str(when)):
                self.assertEqual(partitioner.time_partition(when), partition)

    def test_partition_period(self) -> None:
        """Test partition_period() method."""
        partitioner = self.make_partitioner()

        def tai(isot: str) -> Time:
            """Make TAI time from ISO string."""
            return Time(isot, format="isot", scale="tai")

        self.assertEqual(
            partitioner.partition_period(674),
            (tai("2025-05-12T00:00:00"), tai("2025-06-11T00:00:00")),
        )
        self.assertEqual(
            partitioner.partition_period(800),
            (tai("2035-09-17T00:00:00"), tai("2035-10-17T00:00:00")),
        )

    def test_spatial_where(self) -> None:
        """Test spatial_where() method."""
        region = Box.fromDegrees(-0.05, 0.05, 0.05, 0.15)

        # Pixels that the tests expect for the region above.
        pixels = (12058622, 12058623, 12058624, 12058625)
        range_expr = QExpr("apdb_part >= {} AND apdb_part <= {}", (pixels[0], pixels[-1]))

        # Single query for all spatial partitions.
        partitioner = self.make_partitioner()
        result, count = partitioner.spatial_where(region)
        self.assertEqual(count, 4)
        self.assertEqual(
            result,
            [QExpr("apdb_part IN ({},{},{},{})", pixels, can_prepare=False)],
        )

        result, count = partitioner.spatial_where(region, use_ranges=True)
        self.assertEqual(count, 4)
        self.assertEqual(result, [range_expr])

        # One query per spatial partition, order of expressions is not
        # checked.
        partitioner = self.make_partitioner(query_per_spatial_part=True)
        result, count = partitioner.spatial_where(region)
        self.assertEqual(count, 4)
        for pixel in pixels:
            self.assertIn(QExpr("apdb_part = {}", (pixel,)), result)

        # With ranges the expected result is the same as for the
        # single-query configuration.
        result, count = partitioner.spatial_where(region, use_ranges=True)
        self.assertEqual(count, 4)
        self.assertEqual(result, [range_expr])

    def _check_temporal_where(
        self,
        tables: list[str],
        where: list[QExpr],
        part_start: int,
        part_end: int,
        *,
        time_partition_tables: bool = False,
        query_per_time_part: bool = False,
    ) -> None:
        """Check the values returned from temporal_where() for DiaSource.

        Parameters
        ----------
        tables : `list` [`str`]
            Table names returned by ``temporal_where``.
        where : `list` [`QExpr`]
            Expressions returned by ``temporal_where``.
        part_start : `int`
            First expected time partition.
        part_end : `int`
            Last expected time partition (inclusive). A value smaller than
            ``part_start`` means that an empty result is expected.
        time_partition_tables : `bool`, optional
            If `True` then per-partition tables (``DiaSource_<partition>``)
            and no expressions are expected.
        query_per_time_part : `bool`, optional
            If `True` then one equality expression per partition is
            expected. Only used when ``time_partition_tables`` is `False`.
        """
        if part_start > part_end:
            # Empty partition range, nothing to query.
            self.assertEqual(tables, [])
            self.assertEqual(where, [])
            return

        partitions = range(part_start, part_end + 1)

        if time_partition_tables:
            self.assertEqual(tables, [f"DiaSource_{part}" for part in partitions])
            self.assertEqual(where, [])
            return

        if query_per_time_part:
            expect_where = [QExpr('"apdb_time_part" = {}', (part,)) for part in partitions]
            self.assertEqual(tables, ["DiaSource"])
            self.assertEqual(where, expect_where)
            return

        # Single IN expression covering all partitions.
        num_part = len(partitions)
        placeholders = ",".join(["{}"] * num_part)
        self.assertEqual(tables, ["DiaSource"])
        # NOTE(review): the expression is expected to be preparable only
        # for three or fewer values; this threshold presumably mirrors a
        # limit inside Partitioner - confirm if that limit changes.
        can_prepare = num_part <= 3
        self.assertEqual(
            where,
            [
                QExpr(
                    f"apdb_time_part IN ({placeholders})",
                    tuple(partitions),
                    can_prepare=can_prepare,
                )
            ],
        )

    def test_temporal_where(self) -> None:
        """Test temporal_where() method."""
        start_time = Time("2025-01-01T00:00:00", format="isot", scale="tai")
        end_time = Time("2025-06-01T00:00:00", format="isot", scale="tai")

        # Default configuration, single query for all partitions.
        partitioner = self.make_partitioner()
        tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time)
        self._check_temporal_where(tables, where, 669, 674)

        # Per-partition queries requested via method argument.
        tables, where = partitioner.temporal_where(
            ApdbTables.DiaSource, start_time, end_time, query_per_time_part=True
        )
        self._check_temporal_where(tables, where, 669, 674, query_per_time_part=True)

        # Per-partition queries requested via configuration.
        partitioner = self.make_partitioner(query_per_time_part=True)
        tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time)
        self._check_temporal_where(tables, where, 669, 674, query_per_time_part=True)

        # Separate table per time partition.
        partitioner = self.make_partitioner(time_partition_tables=True)
        tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time)
        self._check_temporal_where(tables, where, 669, 674, time_partition_tables=True)

        # Check additional partition range constraint. Each item is
        # ((range start, range end), (expected first, expected last));
        # expected first > expected last means that empty result is expected.
        ranges = [
            ((0, 1000), (669, 674)),
            ((0, 1), (0, -1)),
            ((600, 670), (669, 670)),
            ((670, 770), (670, 674)),
            ((671, 672), (671, 672)),
        ]

        # Same checks for the default configuration and for per-partition
        # tables.
        config_overrides: tuple[dict[str, bool], ...] = ({}, {"time_partition_tables": True})
        for overrides in config_overrides:
            partitioner = self.make_partitioner(**overrides)
            for (range_start, range_end), (result_start, result_end) in ranges:
                # subTest identifies the failing range/configuration and
                # lets the remaining combinations run.
                with self.subTest(part_range=(range_start, range_end), **overrides):
                    part_range = ApdbCassandraTimePartitionRange(start=range_start, end=range_end)
                    # "partitons_range" (sic) is the keyword spelling used by
                    # the method being called.
                    tables, where = partitioner.temporal_where(
                        ApdbTables.DiaSource, start_time, end_time, partitons_range=part_range
                    )
                    self._check_temporal_where(tables, where, result_start, result_end, **overrides)
# Run all tests in this module when it is executed as a script rather than
# imported by a test runner.
if __name__ == "__main__":
    unittest.main()