Coverage for python / lsst / dax / apdb / cassandra / partitioner.py: 15%
79 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:46 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:46 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["Partitioner"]
27import astropy.time
29from lsst import sphgeom
31from ..apdbSchema import ApdbTables
32from ..pixelization import Pixelization
33from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange
class Partitioner:
    """Logic for temporal and spatial partitioning of APDB tables.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration object.
    """

    partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
    """Start time for partition 0, this should never be changed."""

    def __init__(self, config: ApdbCassandraConfig):
        self._config = config
        self.pixelization = Pixelization(
            config.partitioning.part_pixelization,
            config.partitioning.part_pix_level,
            config.partitioning.part_pix_max_ranges,
        )
        # MJD of ``partition_zero_epoch`` cached as a plain float, so that all
        # partition arithmetic below is done on floats.
        self._epoch = float(self.partition_zero_epoch.mjd)

    def pixel(self, direction: sphgeom.UnitVector3d) -> int:
        """Compute the index of the pixel for given direction.

        Parameters
        ----------
        direction : `lsst.sphgeom.UnitVector3d`
            Spatial position.

        Returns
        -------
        pixel : `int`
            Pixel index.
        """
        return self.pixelization.pixel(direction)

    def time_partition(self, time: float | astropy.time.Time) -> int:
        """Calculate time partition number for a given time.

        Parameters
        ----------
        time : `float` or `astropy.time.Time`
            Time for which to calculate partition number. Can be float to mean
            MJD or `astropy.time.Time`.

        Returns
        -------
        partition : `int`
            Partition number for a given time. Times before
            ``partition_zero_epoch`` map to negative partition numbers, so that
            ``time`` always falls inside ``partition_period(partition)``.
        """
        if isinstance(time, astropy.time.Time):
            # NOTE(review): ``Time.mjd`` is expressed in the object's own time
            # scale, while ``self._epoch`` is derived from a ``unix_tai`` epoch
            # and `partition_period` returns TAI times; confirm callers pass
            # TAI times or that the scale offset is acceptable.
            mjd = float(time.mjd)
        else:
            mjd = time
        days_since_epoch = mjd - self._epoch
        # Floor-divide the float offset directly. Converting with ``int()``
        # first would truncate toward zero, which for negative offsets can
        # place a time in the following partition; `partition_period` assumes
        # floor semantics. For non-negative offsets both forms are identical.
        partition = int(days_since_epoch // self._config.partitioning.time_partition_days)
        return partition

    def partition_period(self, time_partition: int) -> tuple[astropy.time.Time, astropy.time.Time]:
        """Return time period for specified time partition.

        Parameters
        ----------
        time_partition : `int`
            Time partition.

        Returns
        -------
        start : `astropy.time.Time`
            Start of the period, inclusive boundary (TAI scale).
        end : `astropy.time.Time`
            End of the period, exclusive boundary (TAI scale).
        """
        partition_days = self._config.partitioning.time_partition_days
        start_mjd = self._epoch + partition_days * time_partition
        end_mjd = self._epoch + partition_days * (time_partition + 1)
        start = astropy.time.Time(start_mjd, format="mjd", scale="tai")
        end = astropy.time.Time(end_mjd, format="mjd", scale="tai")
        return (start, end)

    def spatial_where(
        self, region: sphgeom.Region | None, *, use_ranges: bool = False, for_prepare: bool = False
    ) -> tuple[list[tuple[str, tuple]], int]:
        """Generate expressions for spatial part of WHERE clause.

        Parameters
        ----------
        region : `sphgeom.Region` or `None`
            Spatial region for query results.
        use_ranges : `bool`, optional
            If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
            p2") instead of exact list of pixels. Should be set to True for
            large regions covering very many pixels.
        for_prepare : `bool`, optional
            If True then use placeholders for prepared statement (?), otherwise
            produce regular statement placeholders (%s).

        Returns
        -------
        expressions : `list` [ `tuple` ]
            Empty list is returned if ``region`` is `None`, otherwise a list
            of one or more ``(expression: str, parameters: tuple)`` tuples.
        partition_count : `int`
            Number of spatial partitions in the result.
        """
        if region is None:
            return [], 0

        token = "?" if for_prepare else "%s"

        count = 0
        expressions: list[tuple[str, tuple]] = []
        if use_ranges:
            pixel_ranges = self.pixelization.envelope(region)
            for lower, upper in pixel_ranges:
                # The envelope's upper bound is treated as exclusive; convert
                # it to an inclusive bound for the expressions below.
                upper -= 1
                if lower == upper:
                    # Single pixel.
                    expressions.append((f'"apdb_part" = {token}', (lower,)))
                    count += 1
                elif lower + 1 == upper:
                    # Two adjacent pixels: two equality tests instead of a range.
                    expressions.append((f'"apdb_part" = {token}', (lower,)))
                    expressions.append((f'"apdb_part" = {token}', (upper,)))
                    count += 2
                else:
                    count += upper - lower + 1
                    expressions.append((f'"apdb_part" >= {token} AND "apdb_part" <= {token}', (lower, upper)))
        else:
            pixels = self.pixelization.pixels(region)
            count = len(pixels)
            if self._config.partitioning.query_per_spatial_part:
                # One equality expression per pixel.
                expressions.extend((f'"apdb_part" = {token}', (pixel,)) for pixel in pixels)
            else:
                # Pixel values are formatted into the IN list literally rather
                # than bound as parameters (presumably integer indices).
                # NOTE(review): an empty ``pixels`` list would produce
                # ``IN ()``; confirm the pixelization never returns one here.
                pixels_str = ",".join([str(pix) for pix in pixels])
                expressions.append((f'"apdb_part" IN ({pixels_str})', ()))

        return expressions, count

    def temporal_where(
        self,
        table: ApdbTables,
        start_time: float | astropy.time.Time,
        end_time: float | astropy.time.Time,
        *,
        query_per_time_part: bool | None = None,
        for_prepare: bool = False,
        partitons_range: ApdbCassandraTimePartitionRange | None = None,
    ) -> tuple[list[str], list[tuple[str, tuple]]]:
        """Generate table names and expressions for temporal part of WHERE
        clauses.

        Parameters
        ----------
        table : `ApdbTables`
            Table to select from.
        start_time : `astropy.time.Time` or `float`
            Starting Datetime or MJD value of the time range.
        end_time : `astropy.time.Time` or `float`
            Ending Datetime or MJD value of the time range.
        query_per_time_part : `bool`, optional
            If None then use ``query_per_time_part`` from configuration.
        for_prepare : `bool`, optional
            If True then use placeholders for prepared statement (?), otherwise
            produce regular statement placeholders (%s).
        partitons_range : `ApdbCassandraTimePartitionRange` or `None`
            Partitions range to further restrict time range.

        Returns
        -------
        tables : `list` [ `str` ]
            List of the table names to query. Empty list is returned when time
            range does not overlap ``partitons_range``.
        expressions : `list` [ `tuple` ]
            A list of zero or more ``(expression: str, parameters: tuple)``
            tuples. Always empty when per-partition tables are configured,
            because the table names already select the partitions.
        """
        tables: list[str]
        expressions: list[tuple[str, tuple]] = []
        # First and last partition.
        time_part_start = self.time_partition(start_time)
        time_part_end = self.time_partition(end_time)
        if partitons_range is not None:
            # Check for non-overlapping ranges.
            if time_part_start > partitons_range.end or time_part_end < partitons_range.start:
                return [], []
            # Clip the requested partitions to the allowed range.
            time_part_start = max(time_part_start, partitons_range.start)
            time_part_end = min(time_part_end, partitons_range.end)
        # Inclusive range.
        time_parts = list(range(time_part_start, time_part_end + 1))
        if self._config.partitioning.time_partition_tables:
            # One table per time partition; no temporal WHERE expressions needed.
            tables = [table.table_name(self._config.prefix, part) for part in time_parts]
        else:
            token = "?" if for_prepare else "%s"
            tables = [table.table_name(self._config.prefix)]
            if query_per_time_part is None:
                query_per_time_part = self._config.partitioning.query_per_time_part
            if query_per_time_part:
                expressions = [(f'"apdb_time_part" = {token}', (time_part,)) for time_part in time_parts]
            else:
                # NOTE(review): if ``end_time`` precedes ``start_time`` then
                # ``time_parts`` is empty and this produces ``IN ()``.
                time_part_list = ",".join([str(part) for part in time_parts])
                expressions = [(f'"apdb_time_part" IN ({time_part_list})', ())]

        return tables, expressions