Coverage for python / lsst / dax / apdb / cassandra / partitioner.py: 17%
79 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:48 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["Partitioner"]
27import astropy.time
29from lsst import sphgeom
31from ..apdbSchema import ApdbTables
32from ..pixelization import Pixelization
33from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange
34from .queries import Column as C # noqa: N817
35from .queries import QExpr
class Partitioner:
    """Logic for temporal and spatial partitioning of APDB tables.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration object.
    """

    partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
    """Start time for partition 0, this should never be changed."""

    def __init__(self, config: ApdbCassandraConfig):
        self._config = config
        self.pixelization = Pixelization(
            config.partitioning.part_pixelization,
            config.partitioning.part_pix_level,
            config.partitioning.part_pix_max_ranges,
        )
        # MJD of partition zero epoch cached as a plain float; it is the
        # reference point for `time_partition` and `partition_period`.
        self._epoch = float(self.partition_zero_epoch.mjd)

    def pixel(self, direction: sphgeom.UnitVector3d) -> int:
        """Compute the index of the pixel for given direction.

        Parameters
        ----------
        direction : `lsst.sphgeom.UnitVector3d`
            Spatial position.

        Returns
        -------
        pixel : `int`
            Pixel index.
        """
        return self.pixelization.pixel(direction)

    def time_partition(self, time: float | astropy.time.Time) -> int:
        """Calculate time partition number for a given time.

        Parameters
        ----------
        time : `float` or `astropy.time.Time`
            Time for which to calculate partition number. Can be float to mean
            MJD or `astropy.time.Time`.

        Returns
        -------
        partition : `int`
            Partition number for a given time. Times before
            `partition_zero_epoch` map to negative partition numbers, so that
            the result is consistent with `partition_period`.
        """
        import math

        if isinstance(time, astropy.time.Time):
            # NOTE(review): ``time.mjd`` is expressed in the scale of ``time``
            # itself, while the epoch is presumably TAI (built from
            # ``unix_tai``). Non-TAI inputs therefore shift partition
            # boundaries by the scale offset. Kept as-is because existing data
            # may have been partitioned this way -- confirm before changing.
            mjd = float(time.mjd)
        else:
            mjd = time
        days_since_epoch = mjd - self._epoch
        # Floor (not int() truncation toward zero) so that times just before
        # the epoch land in partition -1 instead of partition 0. For
        # non-negative values this is identical to the truncating version.
        partition = math.floor(days_since_epoch) // self._config.partitioning.time_partition_days
        return partition

    def partition_period(self, time_partition: int) -> tuple[astropy.time.Time, astropy.time.Time]:
        """Return time period for specified time partition.

        Parameters
        ----------
        time_partition : `int`
            Time partition.

        Returns
        -------
        start : `astropy.time.Time`
            Start of the period, inclusive boundary.
        end : `astropy.time.Time`
            End of the period, exclusive boundary.
        """
        partition_days = self._config.partitioning.time_partition_days
        start_mjd = self._epoch + partition_days * time_partition
        end_mjd = self._epoch + partition_days * (time_partition + 1)
        start = astropy.time.Time(start_mjd, format="mjd", scale="tai")
        end = astropy.time.Time(end_mjd, format="mjd", scale="tai")
        return (start, end)

    def spatial_where(
        self, region: sphgeom.Region | None, *, use_ranges: bool = False
    ) -> tuple[list[QExpr], int]:
        """Generate expressions for spatial part of WHERE clause.

        Parameters
        ----------
        region : `sphgeom.Region` or `None`
            Spatial region for query results.
        use_ranges : `bool`, optional
            If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
            p2") instead of exact list of pixels. Should be set to True for
            large regions covering very many pixels.

        Returns
        -------
        expressions : `list` [ `QExpr` ]
            Empty list is returned if ``region`` is `None`, otherwise a list
            of `QExpr` instances.
        partition_count : `int`
            Number of spatial partitions in the result.
        """
        if region is None:
            return [], 0

        count = 0
        expressions: list[QExpr] = []
        if use_ranges:
            pixel_ranges = self.pixelization.envelope(region)
            for lower, upper in pixel_ranges:
                # Ranges are treated as half-open [lower, upper); convert to
                # an inclusive upper bound for the comparisons below.
                upper -= 1
                if lower == upper:
                    expressions.append(C("apdb_part") == lower)
                    count += 1
                elif lower + 1 == upper:
                    # Two adjacent pixels: two equality terms instead of a
                    # range condition.
                    expressions.append(C("apdb_part") == lower)
                    expressions.append(C("apdb_part") == upper)
                    count += 2
                else:
                    count += upper - lower + 1
                    expressions.append((C("apdb_part") >= lower) & (C("apdb_part") <= upper))
        else:
            pixels = self.pixelization.pixels(region)
            count = len(pixels)
            if self._config.partitioning.query_per_spatial_part:
                expressions.extend((C("apdb_part") == pixel) for pixel in pixels)
            else:
                # If there are many pixels then don't prepare statements.
                can_prepare = len(pixels) <= 3
                expressions.append(C("apdb_part").in_(pixels, can_prepare=can_prepare))

        return expressions, count

    def temporal_where(
        self,
        table: ApdbTables,
        start_time: float | astropy.time.Time,
        end_time: float | astropy.time.Time,
        *,
        query_per_time_part: bool | None = None,
        partitons_range: ApdbCassandraTimePartitionRange | None = None,
    ) -> tuple[list[str], list[QExpr]]:
        """Generate table names and expressions for temporal part of WHERE
        clauses.

        Parameters
        ----------
        table : `ApdbTables`
            Table to select from.
        start_time : `astropy.time.Time` or `float`
            Start of the time range, as `astropy.time.Time` or MJD value.
        end_time : `astropy.time.Time` or `float`
            End of the time range, as `astropy.time.Time` or MJD value. The
            partition containing ``end_time`` is included.
        query_per_time_part : `bool`, optional
            If True, generate one expression per time partition; if False, a
            single ``IN`` expression. If None then use ``query_per_time_part``
            from configuration.
        partitons_range : `ApdbCassandraTimePartitionRange` or `None`
            Partitions range to further restrict time range.

        Returns
        -------
        tables : `list` [ `str` ]
            List of the table names to query: one per time partition when
            ``time_partition_tables`` is enabled in configuration, otherwise
            a single table name. Empty list is returned when time range does
            not overlap ``partitons_range``.
        expressions : `list` [ `QExpr` ]
            A list of zero or more `QExpr` instances.
        """
        tables: list[str]
        temporal_where: list[QExpr] = []
        # First and last partition.
        time_part_start = self.time_partition(start_time)
        time_part_end = self.time_partition(end_time)
        if partitons_range is not None:
            # Check for non-overlapping ranges.
            if time_part_start > partitons_range.end or time_part_end < partitons_range.start:
                return [], []
            # Clip to the allowed partition range.
            time_part_start = max(time_part_start, partitons_range.start)
            time_part_end = min(time_part_end, partitons_range.end)
        # Inclusive range.
        time_parts = list(range(time_part_start, time_part_end + 1))
        if self._config.partitioning.time_partition_tables:
            tables = [table.table_name(self._config.prefix, part) for part in time_parts]
        else:
            tables = [table.table_name(self._config.prefix)]
        if query_per_time_part is None:
            query_per_time_part = self._config.partitioning.query_per_time_part
        if query_per_time_part:
            temporal_where = [QExpr('"apdb_time_part" = {}', (time_part,)) for time_part in time_parts]
        else:
            # If there are many partitions then don't prepare statements.
            can_prepare = len(time_parts) <= 3
            temporal_where = [C("apdb_time_part").in_(time_parts, can_prepare=can_prepare)]

        return tables, temporal_where