Coverage for tests / test_cassandraPartitioner.py: 14%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:46 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import unittest 

23from typing import Any 

24 

25from astropy.time import Time 

26 

27from lsst.dax.apdb import ApdbTables 

28from lsst.dax.apdb.cassandra import ( 

29 ApdbCassandraConfig, 

30 ApdbCassandraPartitioningConfig, 

31 ApdbCassandraTimePartitionRange, 

32) 

33from lsst.dax.apdb.cassandra.partitioner import Partitioner 

34from lsst.sphgeom import Box, UnitVector3d 

35 

36 

37class CassandraPartitionerTestCase(unittest.TestCase): 

38 """A test case for ApdbCassandra class""" 

39 

40 def make_partitioner(self, **kwargs: Any) -> Partitioner: 

41 """Make Partitioner instance, keyword arguments are passed to 

42 ApdbCassandraPartitioningConfig. 

43 """ 

44 params = { 

45 "part_pixelization": "mq3c", 

46 "part_pix_level": 10, 

47 "query_per_spatial_part": False, 

48 } 

49 params.update(kwargs) 

50 

51 partitioning = ApdbCassandraPartitioningConfig(**params) # type: ignore[arg-type] 

52 config = ApdbCassandraConfig(partitioning=partitioning) 

53 return Partitioner(config) 

54 

55 def test_pixel(self) -> None: 

56 """Test pixel() method.""" 

57 partitioner = self.make_partitioner() 

58 

59 self.assertEqual(partitioner.pixel(UnitVector3d(1.0, 1.0, 1.0)), 0xD00000) 

60 self.assertEqual(partitioner.pixel(UnitVector3d(-1.0, 1.0, 1.0)), 0xE00000) 

61 self.assertEqual(partitioner.pixel(UnitVector3d(1.0, -1.0, 1.0)), 0xF55555) 

62 self.assertEqual(partitioner.pixel(UnitVector3d(1.0, 1.0, -1.0)), 0xAAAAAA) 

63 self.assertEqual(partitioner.pixel(UnitVector3d(-1.0, -1.0, 1.0)), 0xEFFFFF) 

64 self.assertEqual(partitioner.pixel(UnitVector3d(1.0, -1.0, -1.0)), 0xFAAAAA) 

65 

66 def test_time_partition(self) -> None: 

67 """Test time_partition() method.""" 

68 partitioner = self.make_partitioner() 

69 

70 astrotime = Time("2025-06-01T00:00:00", format="isot", scale="tai") 

71 self.assertEqual(partitioner.time_partition(astrotime), 674) 

72 astrotime = Time("2025-07-01T00:00:00", format="isot", scale="tai") 

73 self.assertEqual(partitioner.time_partition(astrotime), 675) 

74 

75 self.assertEqual(partitioner.time_partition(60827.0), 674) 

76 self.assertEqual(partitioner.time_partition(60857.0), 675) 

77 

78 def test_partition_period(self) -> None: 

79 """Test partition_period() method.""" 

80 partitioner = self.make_partitioner() 

81 

82 self.assertEqual( 

83 partitioner.partition_period(674), 

84 ( 

85 Time("2025-05-12T00:00:00", format="isot", scale="tai"), 

86 Time("2025-06-11T00:00:00", format="isot", scale="tai"), 

87 ), 

88 ) 

89 self.assertEqual( 

90 partitioner.partition_period(800), 

91 ( 

92 Time("2035-09-17T00:00:00", format="isot", scale="tai"), 

93 Time("2035-10-17T00:00:00", format="isot", scale="tai"), 

94 ), 

95 ) 

96 

97 def test_spatial_where(self) -> None: 

98 """Test spatial_where() method.""" 

99 region = Box.fromDegrees(-0.05, 0.05, 0.05, 0.15) 

100 

101 partitioner = self.make_partitioner() 

102 result, count = partitioner.spatial_where(region) 

103 self.assertEqual(count, 4) 

104 self.assertEqual(result, [('"apdb_part" IN (12058622,12058623,12058624,12058625)', ())]) 

105 result, count = partitioner.spatial_where(region, use_ranges=True) 

106 self.assertEqual(count, 4) 

107 self.assertEqual(result, [('"apdb_part" >= %s AND "apdb_part" <= %s', (12058622, 12058625))]) 

108 

109 partitioner = self.make_partitioner(query_per_spatial_part=True) 

110 result, count = partitioner.spatial_where(region) 

111 self.assertEqual(count, 4) 

112 self.assertEqual( 

113 set(result), 

114 { 

115 ('"apdb_part" = %s', (12058622,)), 

116 ('"apdb_part" = %s', (12058623,)), 

117 ('"apdb_part" = %s', (12058624,)), 

118 ('"apdb_part" = %s', (12058625,)), 

119 }, 

120 ) 

121 result, count = partitioner.spatial_where(region, for_prepare=True) 

122 self.assertEqual(count, 4) 

123 self.assertEqual( 

124 set(result), 

125 { 

126 ('"apdb_part" = ?', (12058622,)), 

127 ('"apdb_part" = ?', (12058623,)), 

128 ('"apdb_part" = ?', (12058624,)), 

129 ('"apdb_part" = ?', (12058625,)), 

130 }, 

131 ) 

132 result, count = partitioner.spatial_where(region, use_ranges=True, for_prepare=True) 

133 self.assertEqual(count, 4) 

134 self.assertEqual(result, [('"apdb_part" >= ? AND "apdb_part" <= ?', (12058622, 12058625))]) 

135 

136 def _check_temporal_where( 

137 self, 

138 tables: list[str], 

139 where: list[tuple], 

140 part_start: int, 

141 part_end: int, 

142 *, 

143 time_partition_tables: bool = False, 

144 query_per_time_part: bool = False, 

145 for_prepare: bool = False, 

146 ) -> None: 

147 if part_start > part_end: 

148 self.assertEqual(tables, []) 

149 self.assertEqual(where, []) 

150 elif time_partition_tables: 

151 expect_tables = [f"DiaSource_{part}" for part in range(part_start, part_end + 1)] 

152 self.assertEqual(tables, expect_tables) 

153 self.assertEqual(where, []) 

154 elif query_per_time_part: 

155 where_str = '"apdb_time_part" = ?' if for_prepare else '"apdb_time_part" = %s' 

156 expect_where = [(where_str, (part,)) for part in range(part_start, part_end + 1)] 

157 self.assertEqual(tables, ["DiaSource"]) 

158 self.assertEqual(where, expect_where) 

159 else: 

160 parts_str = ",".join(str(part) for part in range(part_start, part_end + 1)) 

161 self.assertEqual(tables, ["DiaSource"]) 

162 self.assertEqual(where, [(f'"apdb_time_part" IN ({parts_str})', ())]) 

163 

164 def test_temporal_where(self) -> None: 

165 """Test temporal_where() method.""" 

166 start_time = Time("2025-01-01T00:00:00", format="isot", scale="tai") 

167 end_time = Time("2025-06-01T00:00:00", format="isot", scale="tai") 

168 

169 partitioner = self.make_partitioner() 

170 

171 tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time) 

172 self._check_temporal_where(tables, where, 669, 674) 

173 

174 tables, where = partitioner.temporal_where( 

175 ApdbTables.DiaSource, start_time, end_time, query_per_time_part=True 

176 ) 

177 self._check_temporal_where(tables, where, 669, 674, query_per_time_part=True) 

178 

179 tables, where = partitioner.temporal_where( 

180 ApdbTables.DiaSource, 

181 start_time, 

182 end_time, 

183 query_per_time_part=True, 

184 for_prepare=True, 

185 ) 

186 self._check_temporal_where(tables, where, 669, 674, query_per_time_part=True, for_prepare=True) 

187 

188 partitioner = self.make_partitioner(query_per_time_part=True) 

189 

190 tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time) 

191 self._check_temporal_where(tables, where, 669, 674, query_per_time_part=True) 

192 

193 partitioner = self.make_partitioner(time_partition_tables=True) 

194 tables, where = partitioner.temporal_where(ApdbTables.DiaSource, start_time, end_time) 

195 self._check_temporal_where(tables, where, 669, 674, time_partition_tables=True) 

196 

197 # Check additional partition range constraint. 

198 ranges = [ 

199 ((0, 1000), (669, 674)), 

200 ((0, 1), (0, -1)), 

201 ((600, 670), (669, 670)), 

202 ((670, 770), (670, 674)), 

203 ((671, 672), (671, 672)), 

204 ] 

205 

206 partitioner = self.make_partitioner() 

207 for (range_start, range_end), (result_start, result_end) in ranges: 

208 part_range = ApdbCassandraTimePartitionRange(start=range_start, end=range_end) 

209 tables, where = partitioner.temporal_where( 

210 ApdbTables.DiaSource, start_time, end_time, partitons_range=part_range 

211 ) 

212 self._check_temporal_where(tables, where, result_start, result_end) 

213 

214 partitioner = self.make_partitioner(time_partition_tables=True) 

215 for (range_start, range_end), (result_start, result_end) in ranges: 

216 part_range = ApdbCassandraTimePartitionRange(start=range_start, end=range_end) 

217 tables, where = partitioner.temporal_where( 

218 ApdbTables.DiaSource, start_time, end_time, partitons_range=part_range 

219 ) 

220 self._check_temporal_where(tables, where, result_start, result_end, time_partition_tables=True) 

221 

222 

223if __name__ == "__main__": 

224 unittest.main()