Coverage for tests / test_cassandraPartitioner.py: 14%
96 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:46 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:46 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import unittest
23from typing import Any
25from astropy.time import Time
27from lsst.dax.apdb import ApdbTables
28from lsst.dax.apdb.cassandra import (
29 ApdbCassandraConfig,
30 ApdbCassandraPartitioningConfig,
31 ApdbCassandraTimePartitionRange,
32)
33from lsst.dax.apdb.cassandra.partitioner import Partitioner
34from lsst.sphgeom import Box, UnitVector3d
37class CassandraPartitionerTestCase(unittest.TestCase):
38 """A test case for ApdbCassandra class"""
40 def make_partitioner(self, **kwargs: Any) -> Partitioner:
41 """Make Partitioner instance, keyword arguments are passed to
42 ApdbCassandraPartitioningConfig.
43 """
44 params = {
45 "part_pixelization": "mq3c",
46 "part_pix_level": 10,
47 "query_per_spatial_part": False,
48 }
49 params.update(kwargs)
51 partitioning = ApdbCassandraPartitioningConfig(**params) # type: ignore[arg-type]
52 config = ApdbCassandraConfig(partitioning=partitioning)
53 return Partitioner(config)
55 def test_pixel(self) -> None:
56 """Test pixel() method."""
57 partitioner = self.make_partitioner()
59 self.assertEqual(partitioner.pixel(UnitVector3d(1.0, 1.0, 1.0)), 0xD00000)
60 self.assertEqual(partitioner.pixel(UnitVector3d(-1.0, 1.0, 1.0)), 0xE00000)
61 self.assertEqual(partitioner.pixel(UnitVector3d(1.0, -1.0, 1.0)), 0xF55555)
62 self.assertEqual(partitioner.pixel(UnitVector3d(1.0, 1.0, -1.0)), 0xAAAAAA)
63 self.assertEqual(partitioner.pixel(UnitVector3d(-1.0, -1.0, 1.0)), 0xEFFFFF)
64 self.assertEqual(partitioner.pixel(UnitVector3d(1.0, -1.0, -1.0)), 0xFAAAAA)
66 def test_time_partition(self) -> None:
67 """Test time_partition() method."""
68 partitioner = self.make_partitioner()
70 astrotime = Time("2025-06-01T00:00:00", format="isot", scale="tai")
71 self.assertEqual(partitioner.time_partition(astrotime), 674)
72 astrotime = Time("2025-07-01T00:00:00", format="isot", scale="tai")
73 self.assertEqual(partitioner.time_partition(astrotime), 675)
75 self.assertEqual(partitioner.time_partition(60827.0), 674)
76 self.assertEqual(partitioner.time_partition(60857.0), 675)
78 def test_partition_period(self) -> None:
79 """Test partition_period() method."""
80 partitioner = self.make_partitioner()
82 self.assertEqual(
83 partitioner.partition_period(674),
84 (
85 Time("2025-05-12T00:00:00", format="isot", scale="tai"),
86 Time("2025-06-11T00:00:00", format="isot", scale="tai"),
87 ),
88 )
89 self.assertEqual(
90 partitioner.partition_period(800),
91 (
92 Time("2035-09-17T00:00:00", format="isot", scale="tai"),
93 Time("2035-10-17T00:00:00", format="isot", scale="tai"),
94 ),
95 )
97 def test_spatial_where(self) -> None:
98 """Test spatial_where() method."""
99 region = Box.fromDegrees(-0.05, 0.05, 0.05, 0.15)
101 partitioner = self.make_partitioner()
102 result, count = partitioner.spatial_where(region)
103 self.assertEqual(count, 4)
104 self.assertEqual(result, [('"apdb_part" IN (12058622,12058623,12058624,12058625)', ())])
105 result, count = partitioner.spatial_where(region, use_ranges=True)
106 self.assertEqual(count, 4)
107 self.assertEqual(result, [('"apdb_part" >= %s AND "apdb_part" <= %s', (12058622, 12058625))])
109 partitioner = self.make_partitioner(query_per_spatial_part=True)
110 result, count = partitioner.spatial_where(region)
111 self.assertEqual(count, 4)
112 self.assertEqual(
113 set(result),
114 {
115 ('"apdb_part" = %s', (12058622,)),
116 ('"apdb_part" = %s', (12058623,)),
117 ('"apdb_part" = %s', (12058624,)),
118 ('"apdb_part" = %s', (12058625,)),
119 },
120 )
121 result, count = partitioner.spatial_where(region, for_prepare=True)
122 self.assertEqual(count, 4)
123 self.assertEqual(
124 set(result),
125 {
126 ('"apdb_part" = ?', (12058622,)),
127 ('"apdb_part" = ?', (12058623,)),
128 ('"apdb_part" = ?', (12058624,)),
129 ('"apdb_part" = ?', (12058625,)),
130 },
131 )
132 result, count = partitioner.spatial_where(region, use_ranges=True, for_prepare=True)
133 self.assertEqual(count, 4)
134 self.assertEqual(result, [('"apdb_part" >= ? AND "apdb_part" <= ?', (12058622, 12058625))])
136 def _check_temporal_where(
137 self,
138 tables: list[str],
139 where: list[tuple],
140 part_start: int,
141 part_end: int,
142 *,
143 time_partition_tables: bool = False,
144 query_per_time_part: bool = False,
145 for_prepare: bool = False,
146 ) -> None:
147 if part_start > part_end:
148 self.assertEqual(tables, [])
149 self.assertEqual(where, [])
150 elif time_partition_tables:
151 expect_tables = [f"DiaSource_{part}" for part in range(part_start, part_end + 1)]
152 self.assertEqual(tables, expect_tables)
153 self.assertEqual(where, [])
154 elif query_per_time_part:
155 where_str = '"apdb_time_part" = ?' if for_prepare else '"apdb_time_part" = %s'
156 expect_where = [(where_str, (part,)) for part in range(part_start, part_end + 1)]
157 self.assertEqual(tables, ["DiaSource"])
158 self.assertEqual(where, expect_where)
159 else:
160 parts_str = ",".join(str(part) for part in range(part_start, part_end + 1))
161 self.assertEqual(tables, ["DiaSource"])
162 self.assertEqual(where, [(f'"apdb_time_part" IN ({parts_str})', ())])
164 def test_temporal_where(self) -> None:
165 """Test temporal_where() method."""
166 start_time = Time("2025-01-01T00:00:00", format="isot", scale="tai")
167 end_time = Time("2025-06-01T00:00:00", format="isot", scale="tai")
169 partitioner = self.make_partitioner()
171 tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time)
172 self._check_temporal_where(tables, where, 669, 674)
174 tables, where = partitioner.temporal_where(
175 ApdbTables.DiaSource, start_time, end_time, query_per_time_part=True
176 )
177 self._check_temporal_where(tables, where, 669, 674, query_per_time_part=True)
179 tables, where = partitioner.temporal_where(
180 ApdbTables.DiaSource,
181 start_time,
182 end_time,
183 query_per_time_part=True,
184 for_prepare=True,
185 )
186 self._check_temporal_where(tables, where, 669, 674, query_per_time_part=True, for_prepare=True)
188 partitioner = self.make_partitioner(query_per_time_part=True)
190 tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time)
191 self._check_temporal_where(tables, where, 669, 674, query_per_time_part=True)
193 partitioner = self.make_partitioner(time_partition_tables=True)
194 tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time)
195 self._check_temporal_where(tables, where, 669, 674, time_partition_tables=True)
197 # Check additional partition range constraint.
198 ranges = [
199 ((0, 1000), (669, 674)),
200 ((0, 1), (0, -1)),
201 ((600, 670), (669, 670)),
202 ((670, 770), (670, 674)),
203 ((671, 672), (671, 672)),
204 ]
206 partitioner = self.make_partitioner()
207 for (range_start, range_end), (result_start, result_end) in ranges:
208 part_range = ApdbCassandraTimePartitionRange(start=range_start, end=range_end)
209 tables, where = partitioner.temporal_where(
210 ApdbTables.DiaSource, start_time, end_time, partitons_range=part_range
211 )
212 self._check_temporal_where(tables, where, result_start, result_end)
214 partitioner = self.make_partitioner(time_partition_tables=True)
215 for (range_start, range_end), (result_start, result_end) in ranges:
216 part_range = ApdbCassandraTimePartitionRange(start=range_start, end=range_end)
217 tables, where = partitioner.temporal_where(
218 ApdbTables.DiaSource, start_time, end_time, partitons_range=part_range
219 )
220 self._check_temporal_where(tables, where, result_start, result_end, time_partition_tables=True)
223if __name__ == "__main__":
224 unittest.main()