Coverage for tests / test_cassandraPartitioner.py: 15%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:43 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import unittest 

23from typing import Any 

24 

25from astropy.time import Time 

26 

27from lsst.dax.apdb import ApdbTables 

28from lsst.dax.apdb.cassandra import ( 

29 ApdbCassandraConfig, 

30 ApdbCassandraPartitioningConfig, 

31 ApdbCassandraTimePartitionRange, 

32) 

33from lsst.dax.apdb.cassandra.partitioner import Partitioner 

34from lsst.dax.apdb.cassandra.queries import QExpr 

35from lsst.sphgeom import Box, UnitVector3d 

36 

37 

class CassandraPartitionerTestCase(unittest.TestCase):
    """A test case for the Cassandra `Partitioner` class.

    The expected pixel indices, time partition numbers and partition
    boundaries used below are tied to the configuration produced by
    `make_partitioner`; they need to be updated if that configuration (or
    the defaults it relies on) changes.
    """

    def make_partitioner(self, **kwargs: Any) -> Partitioner:
        """Make Partitioner instance.

        Parameters
        ----------
        **kwargs : `Any`
            Keyword arguments passed to `ApdbCassandraPartitioningConfig`,
            they override the defaults defined in this method.

        Returns
        -------
        partitioner : `Partitioner`
            Partitioner constructed from `ApdbCassandraConfig` which uses
            the resulting partitioning configuration.
        """
        params = {
            "part_pixelization": "mq3c",
            "part_pix_level": 10,
            "query_per_spatial_part": False,
        }
        params.update(kwargs)

        partitioning = ApdbCassandraPartitioningConfig(**params)  # type: ignore[arg-type]
        config = ApdbCassandraConfig(partitioning=partitioning)
        return Partitioner(config)

    def test_pixel(self) -> None:
        """Test pixel() method."""
        partitioner = self.make_partitioner()

        # (x, y, z) components of the direction and the expected pixel index.
        expected_pixels = [
            ((1.0, 1.0, 1.0), 0xD00000),
            ((-1.0, 1.0, 1.0), 0xE00000),
            ((1.0, -1.0, 1.0), 0xF55555),
            ((1.0, 1.0, -1.0), 0xAAAAAA),
            ((-1.0, -1.0, 1.0), 0xEFFFFF),
            ((1.0, -1.0, -1.0), 0xFAAAAA),
        ]
        for xyz, pixel in expected_pixels:
            # subTest reports every failing direction, not just the first.
            with self.subTest(xyz=xyz):
                self.assertEqual(partitioner.pixel(UnitVector3d(*xyz)), pixel)

    def test_time_partition(self) -> None:
        """Test time_partition() method."""
        partitioner = self.make_partitioner()

        # Time can be passed as astropy Time.
        astrotime = Time("2025-06-01T00:00:00", format="isot", scale="tai")
        self.assertEqual(partitioner.time_partition(astrotime), 674)
        astrotime = Time("2025-07-01T00:00:00", format="isot", scale="tai")
        self.assertEqual(partitioner.time_partition(astrotime), 675)

        # Or as a floating point number; these values are expected to map to
        # the same partitions as the Time instances above.
        self.assertEqual(partitioner.time_partition(60827.0), 674)
        self.assertEqual(partitioner.time_partition(60857.0), 675)

    def test_partition_period(self) -> None:
        """Test partition_period() method."""
        partitioner = self.make_partitioner()

        self.assertEqual(
            partitioner.partition_period(674),
            (
                Time("2025-05-12T00:00:00", format="isot", scale="tai"),
                Time("2025-06-11T00:00:00", format="isot", scale="tai"),
            ),
        )
        self.assertEqual(
            partitioner.partition_period(800),
            (
                Time("2035-09-17T00:00:00", format="isot", scale="tai"),
                Time("2035-10-17T00:00:00", format="isot", scale="tai"),
            ),
        )

    def test_spatial_where(self) -> None:
        """Test spatial_where() method."""
        region = Box.fromDegrees(-0.05, 0.05, 0.05, 0.15)

        # Without per-partition queries all partitions end up in a single
        # IN expression.
        partitioner = self.make_partitioner()
        result, count = partitioner.spatial_where(region)
        self.assertEqual(count, 4)
        self.assertEqual(
            result,
            [
                QExpr(
                    "apdb_part IN ({},{},{},{})",
                    (12058622, 12058623, 12058624, 12058625),
                    can_prepare=False,
                )
            ],
        )
        result, count = partitioner.spatial_where(region, use_ranges=True)
        self.assertEqual(count, 4)
        self.assertEqual(result, [QExpr("apdb_part >= {} AND apdb_part <= {}", (12058622, 12058625))])

        # With per-partition queries there is one expression per partition,
        # their order is not checked.
        partitioner = self.make_partitioner(query_per_spatial_part=True)
        result, count = partitioner.spatial_where(region)
        self.assertEqual(count, 4)
        self.assertIn(QExpr("apdb_part = {}", (12058622,)), result)
        self.assertIn(QExpr("apdb_part = {}", (12058623,)), result)
        self.assertIn(QExpr("apdb_part = {}", (12058624,)), result)
        self.assertIn(QExpr("apdb_part = {}", (12058625,)), result)

        # Ranges are expected to produce the same single expression as above.
        result, count = partitioner.spatial_where(region, use_ranges=True)
        self.assertEqual(count, 4)
        self.assertEqual(result, [QExpr("apdb_part >= {} AND apdb_part <= {}", (12058622, 12058625))])

    def _check_temporal_where(
        self,
        tables: list[str],
        where: list[QExpr],
        part_start: int,
        part_end: int,
        *,
        time_partition_tables: bool = False,
        query_per_time_part: bool = False,
    ) -> None:
        """Check the result of a temporal_where() call for DiaSource table.

        Parameters
        ----------
        tables : `list` [`str`]
            Table names returned by ``temporal_where()``.
        where : `list` [`QExpr`]
            Query expressions returned by ``temporal_where()``.
        part_start : `int`
            First expected time partition.
        part_end : `int`
            Last expected time partition, inclusive. If it is less than
            ``part_start`` then both ``tables`` and ``where`` are expected
            to be empty.
        time_partition_tables : `bool`, optional
            If `True`, expect one ``DiaSource_<partition>`` table per
            partition and no query expressions. Takes precedence over
            ``query_per_time_part``.
        query_per_time_part : `bool`, optional
            If `True`, expect a single ``DiaSource`` table and one equality
            expression per partition.
        """
        if part_start > part_end:
            self.assertEqual(tables, [])
            self.assertEqual(where, [])
        elif time_partition_tables:
            expect_tables = [f"DiaSource_{part}" for part in range(part_start, part_end + 1)]
            self.assertEqual(tables, expect_tables)
            self.assertEqual(where, [])
        elif query_per_time_part:
            where_str = '"apdb_time_part" = {}'
            expect_where = [QExpr(where_str, (part,)) for part in range(part_start, part_end + 1)]
            self.assertEqual(tables, ["DiaSource"])
            self.assertEqual(where, expect_where)
        else:
            # Single table with all partitions in one IN expression.
            num_part = part_end + 1 - part_start
            placeholders = ",".join(["{}"] * num_part)
            self.assertEqual(tables, ["DiaSource"])
            # Only short partition lists are expected to be flagged as
            # preparable.
            can_prepare = num_part <= 3
            self.assertEqual(
                where,
                [
                    QExpr(
                        f"apdb_time_part IN ({placeholders})",
                        tuple(range(part_start, part_end + 1)),
                        can_prepare=can_prepare,
                    )
                ],
            )

    def test_temporal_where(self) -> None:
        """Test temporal_where() method."""
        start_time = Time("2025-01-01T00:00:00", format="isot", scale="tai")
        end_time = Time("2025-06-01T00:00:00", format="isot", scale="tai")

        partitioner = self.make_partitioner()

        tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time)
        self._check_temporal_where(tables, where, 669, 674)

        # Per-partition queries requested via method argument.
        tables, where = partitioner.temporal_where(
            ApdbTables.DiaSource, start_time, end_time, query_per_time_part=True
        )
        self._check_temporal_where(tables, where, 669, 674, query_per_time_part=True)

        # Per-partition queries requested via configuration.
        partitioner = self.make_partitioner(query_per_time_part=True)

        tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time)
        self._check_temporal_where(tables, where, 669, 674, query_per_time_part=True)

        # Separate table per time partition.
        partitioner = self.make_partitioner(time_partition_tables=True)
        tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time)
        self._check_temporal_where(tables, where, 669, 674, time_partition_tables=True)

        # Check additional partition range constraint. Each item is a pair of
        # (constraint range, expected partition range), an expected range
        # with start > end means that no partitions should be returned.
        ranges = [
            ((0, 1000), (669, 674)),
            ((0, 1), (0, -1)),
            ((600, 670), (669, 670)),
            ((670, 770), (670, 674)),
            ((671, 672), (671, 672)),
        ]

        partitioner = self.make_partitioner()
        for (range_start, range_end), (result_start, result_end) in ranges:
            # subTest identifies the failing range and lets the rest run.
            with self.subTest(part_range=(range_start, range_end), time_partition_tables=False):
                part_range = ApdbCassandraTimePartitionRange(start=range_start, end=range_end)
                tables, where = partitioner.temporal_where(
                    ApdbTables.DiaSource, start_time, end_time, partitons_range=part_range
                )
                self._check_temporal_where(tables, where, result_start, result_end)

        partitioner = self.make_partitioner(time_partition_tables=True)
        for (range_start, range_end), (result_start, result_end) in ranges:
            with self.subTest(part_range=(range_start, range_end), time_partition_tables=True):
                part_range = ApdbCassandraTimePartitionRange(start=range_start, end=range_end)
                tables, where = partitioner.temporal_where(
                    ApdbTables.DiaSource, start_time, end_time, partitons_range=part_range
                )
                self._check_temporal_where(
                    tables, where, result_start, result_end, time_partition_tables=True
                )

225 

226 

if __name__ == "__main__":
    # Run all tests defined in this module when it is executed as a script.
    unittest.main()