Coverage for python / lsst / dax / apdb / cassandra / partitioner.py: 17%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:58 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["Partitioner"] 

25 

26 

27import astropy.time 

28 

29from lsst import sphgeom 

30 

31from ..apdbSchema import ApdbTables 

32from ..pixelization import Pixelization 

33from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange 

34from .queries import Column as C # noqa: N817 

35from .queries import QExpr 

36 

37 

class Partitioner:
    """Logic for temporal and spatial partitioning of APDB tables.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration object.
    """

    partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
    """Start time for partition 0, this should never be changed."""

    def __init__(self, config: ApdbCassandraConfig):
        self._config = config
        self.pixelization = Pixelization(
            config.partitioning.part_pixelization,
            config.partitioning.part_pix_level,
            config.partitioning.part_pix_max_ranges,
        )
        # Keep the epoch as an MJD float so that all partition arithmetic
        # below is done in units of days.
        self._epoch = float(self.partition_zero_epoch.mjd)

    def pixel(self, direction: sphgeom.UnitVector3d) -> int:
        """Compute the index of the pixel for given direction.

        Parameters
        ----------
        direction : `lsst.sphgeom.UnitVector3d`
            Spatial position.

        Returns
        -------
        pixel : `int`
            Pixel index.
        """
        return self.pixelization.pixel(direction)

    def time_partition(self, time: float | astropy.time.Time) -> int:
        """Calculate time partition number for a given time.

        Parameters
        ----------
        time : `float` or `astropy.time.Time`
            Time for which to calculate partition number. Can be float to mean
            MJD or `astropy.time.Time`.

        Returns
        -------
        partition : `int`
            Partition number for a given time. Times earlier than
            ``partition_zero_epoch`` produce negative partition numbers.
        """
        mjd = float(time.mjd) if isinstance(time, astropy.time.Time) else time
        days_since_epoch = mjd - self._epoch
        partition_days = self._config.partitioning.time_partition_days
        # Use true floor division. Truncating to int first rounds toward zero,
        # which would assign times before the epoch to a partition whose
        # period (see `partition_period`) does not contain them.
        return int(days_since_epoch // partition_days)

    def partition_period(self, time_partition: int) -> tuple[astropy.time.Time, astropy.time.Time]:
        """Return time period for specified time partition.

        Parameters
        ----------
        time_partition : `int`
            Time partition.

        Returns
        -------
        start : `astropy.time.Time`
            Start of the period, inclusive boundary.
        end : `astropy.time.Time`
            End of the period, exclusive boundary.
        """
        partition_days = self._config.partitioning.time_partition_days
        start_mjd = self._epoch + partition_days * time_partition
        end_mjd = self._epoch + partition_days * (time_partition + 1)
        start = astropy.time.Time(start_mjd, format="mjd", scale="tai")
        end = astropy.time.Time(end_mjd, format="mjd", scale="tai")
        return (start, end)

    def spatial_where(
        self, region: sphgeom.Region | None, *, use_ranges: bool = False
    ) -> tuple[list[QExpr], int]:
        """Generate expressions for spatial part of WHERE clause.

        Parameters
        ----------
        region : `sphgeom.Region` or `None`
            Spatial region for query results. If `None` then no spatial
            constraints are generated.
        use_ranges : `bool`, optional
            If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
            p2") instead of exact list of pixels. Should be set to True for
            large regions covering very many pixels.

        Returns
        -------
        expressions : `list` [ `QExpr` ]
            Empty list is returned if ``region`` is `None`, otherwise a list
            of one or more `QExpr`.
        partition_count : `int`
            Number of spatial partitions in the result.
        """
        if region is None:
            return [], 0

        count = 0
        expressions: list[QExpr] = []
        if use_ranges:
            pixel_ranges = self.pixelization.envelope(region)
            for lower, upper in pixel_ranges:
                # Ranges are handled as half-open here; switch to an inclusive
                # upper bound for the comparisons below.
                upper -= 1
                if lower == upper:
                    # Single-pixel range.
                    expressions.append(C("apdb_part") == lower)
                    count += 1
                elif lower + 1 == upper:
                    # Two-pixel range, use two equality expressions instead of
                    # a range expression.
                    expressions.append(C("apdb_part") == lower)
                    expressions.append(C("apdb_part") == upper)
                    count += 2
                else:
                    count += upper - lower + 1
                    expressions.append((C("apdb_part") >= lower) & (C("apdb_part") <= upper))
        else:
            pixels = self.pixelization.pixels(region)
            count = len(pixels)
            if self._config.partitioning.query_per_spatial_part:
                # One expression per spatial partition.
                expressions.extend((C("apdb_part") == pixel) for pixel in pixels)
            else:
                # If there are many pixels then don't prepare statements.
                can_prepare = len(pixels) <= 3
                expressions.append(C("apdb_part").in_(pixels, can_prepare=can_prepare))

        return expressions, count

    def temporal_where(
        self,
        table: ApdbTables,
        start_time: float | astropy.time.Time,
        end_time: float | astropy.time.Time,
        *,
        query_per_time_part: bool | None = None,
        partitons_range: ApdbCassandraTimePartitionRange | None = None,
    ) -> tuple[list[str], list[QExpr]]:
        """Generate table names and expressions for temporal part of WHERE
        clauses.

        Parameters
        ----------
        table : `ApdbTables`
            Table to select from.
        start_time : `astropy.time.Time` or `float`
            Starting Datetime or MJD value of the time range.
        end_time : `astropy.time.Time` or `float`
            Ending Datetime or MJD value of the time range.
        query_per_time_part : `bool`, optional
            If None then use ``query_per_time_part`` from configuration.
        partitons_range : `ApdbCassandraTimePartitionRange` or `None`
            Partitions range to further restrict time range.

        Returns
        -------
        tables : `list` [ `str` ]
            List of the table names to query. Empty list is returned when time
            range does not overlap ``partitons_range``.
        expressions : `list` [ `QExpr` ]
            A list of zero or more `QExpr` instances.
        """
        tables: list[str]
        temporal_where: list[QExpr] = []
        # First and last partition.
        time_part_start = self.time_partition(start_time)
        time_part_end = self.time_partition(end_time)
        if partitons_range is not None:
            # Check for non-overlapping ranges.
            if time_part_start > partitons_range.end or time_part_end < partitons_range.start:
                return [], []
            # Clamp requested partitions to the allowed range.
            time_part_start = max(time_part_start, partitons_range.start)
            time_part_end = min(time_part_end, partitons_range.end)
        # Inclusive range.
        time_parts = list(range(time_part_start, time_part_end + 1))
        if self._config.partitioning.time_partition_tables:
            # Separate table per time partition.
            tables = [table.table_name(self._config.prefix, part) for part in time_parts]
        else:
            tables = [table.table_name(self._config.prefix)]
            if query_per_time_part is None:
                query_per_time_part = self._config.partitioning.query_per_time_part
            if query_per_time_part:
                temporal_where = [QExpr('"apdb_time_part" = {}', (time_part,)) for time_part in time_parts]
            else:
                # If there are many partitions then don't prepare statements.
                can_prepare = len(time_parts) <= 3
                temporal_where = [C("apdb_time_part").in_(time_parts, can_prepare=can_prepare)]

        return tables, temporal_where