Coverage for python / lsst / dax / apdb / cassandra / config.py: 72%
77 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:49 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = [
25 "ApdbCassandraConfig",
26 "ApdbCassandraConnectionConfig",
27 "ApdbCassandraPartitioningConfig",
28 "ApdbCassandraTimePartitionRange",
29]
31import json
32from collections.abc import Iterable, Iterator
33from typing import TYPE_CHECKING, Any, ClassVar, Self
35from pydantic import BaseModel, Field, field_validator
37# If cassandra-driver is not there the module can still be imported.
38try:
39 import cassandra
41 CASSANDRA_IMPORTED = True
42except ImportError:
43 CASSANDRA_IMPORTED = False
46from ..config import ApdbConfig
48if TYPE_CHECKING:
49 from .apdbMetadataCassandra import ApdbMetadataCassandra
class ApdbCassandraConnectionConfig(BaseModel):
    """Connection configuration for Cassandra APDB.

    Groups everything needed to establish and tune a Cassandra connection:
    port, credentials, consistency levels, operation timeouts, concurrency,
    protocol version, and any extra keyword parameters forwarded verbatim
    to the driver's ``connect()`` method.
    """

    port: int = Field(
        default=9042,
        description="Port number to connect to.",
    )

    private_ips: tuple[str, ...] = Field(
        default=(),
        description="List of internal IP addresses for contact_points.",
    )

    username: str = Field(
        default="",
        description=(
            "Cassandra user name, if empty then db-auth.yaml has to provide it together with a password."
        ),
    )

    read_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of read operations, default: QUORUM, can be ONE.",
    )

    write_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of write operations, default: QUORUM, can be ONE.",
    )

    read_timeout: float = Field(
        default=120.0,
        description="Timeout in seconds for read operations.",
    )

    write_timeout: float = Field(
        default=60.0,
        description="Timeout in seconds for write operations.",
    )

    remove_timeout: float = Field(
        default=600.0,
        description="Timeout in seconds for remove operations.",
    )

    read_concurrency: int = Field(
        default=500,
        description="Concurrency level for read operations.",
    )

    protocol_version: int = Field(
        # Fall back to the literal value 4 when cassandra-driver is not
        # importable, so this module can be loaded without the driver.
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 4,
        description="Cassandra protocol version to use, default is V4.",
    )

    extra_parameters: dict[str, Any] = Field(
        # default_factory gives each model instance its own dict; a shared
        # mutable default literal is a well-known Python pitfall and this
        # matches the default_factory style used elsewhere in this module.
        default_factory=dict,
        description="Additional keyword parameters passed to connect() method verbatim.",
    )
class ApdbCassandraPartitioningConfig(BaseModel):
    """Partitioning configuration for Cassandra APDB.

    Controls both the spatial partitioning (pixelization scheme and level)
    and the temporal partitioning (per-time-partition tables and their
    granularity) of the APDB source tables, plus how queries are split
    across partitions.
    """

    part_pixelization: str = Field(
        default="mq3c",
        description="Pixelization used for partitioning index.",
    )

    part_pix_level: int = Field(
        default=11,
        description="Pixelization level used for partitioning index.",
    )

    part_pix_max_ranges: int = Field(
        default=128,
        description="Max number of ranges in pixelization envelope",
    )

    time_partition_tables: bool = Field(
        default=False,
        description="Use per-partition tables for sources instead of partitioning by time",
    )

    time_partition_days: int = Field(
        default=30,
        description=(
            "Time partitioning granularity in days, this value must not be changed after database is "
            "initialized"
        ),
    )

    time_partition_start: str = Field(
        default="2018-12-01T00:00:00",
        description=(
            "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    time_partition_end: str = Field(
        default="2030-01-01T00:00:00",
        description=(
            "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    query_per_time_part: bool = Field(
        default=False,
        description=(
            "If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config."
        ),
    )

    query_per_spatial_part: bool = Field(
        default=False,
        description="If True then build one query per spatial partition, otherwise build single query.",
    )

    @field_validator("part_pixelization")
    @classmethod
    def check_pixelization(cls, v: str) -> str:
        """Reject pixelization names outside the supported set."""
        # Plain set literal on purpose: its repr appears in the error text.
        allowed = {"htm", "q3c", "mq3c", "healpix"}
        if v in allowed:
            return v
        raise ValueError(f"Unexpected value for part_pixelization: {v}, allowed values: {allowed}")
class ApdbCassandraConfig(ApdbConfig):
    """Configuration class for Cassandra-based APDB implementation.

    Extends the generic `ApdbConfig` with Cassandra-specific settings:
    cluster contact points, keyspace, nested connection and partitioning
    configuration, and batching limits for write statements.
    """

    # Identifies this configuration as the Cassandra implementation.
    _implementation_type: ClassVar[str] = "cassandra"

    contact_points: tuple[str, ...] = Field(
        default=("127.0.0.1",),
        description="The list of contact points to try connecting for cluster discovery.",
    )

    keyspace: str = Field(
        default="apdb",
        description="Keyspace name for APDB tables.",
    )

    connection_config: ApdbCassandraConnectionConfig = Field(
        default_factory=ApdbCassandraConnectionConfig,
        description="Database connection configuration",
    )

    partitioning: ApdbCassandraPartitioningConfig = Field(
        default_factory=ApdbCassandraPartitioningConfig,
        description="Configuration for partitioning.",
    )

    dia_object_columns: list[str] = Field(
        # default_factory avoids a shared mutable list default and is
        # consistent with the other default_factory fields in this class.
        default_factory=list,
        description="List of columns to read from DiaObject[Last], by default read all columns.",
    )

    prefix: str = Field(
        default="",
        description="Prefix to add to table names.",
    )

    ra_dec_columns: tuple[str, str] = Field(
        default=("ra", "dec"),
        description="Names of ra/dec columns in DiaObject table",
    )

    replica_skips_diaobjects: bool = Field(
        default=False,
        description=(
            "If True then do not store DiaObjects when enable_replica is True "
            "(DiaObjectsChunks has the same data)."
        ),
    )

    replica_sub_chunk_count: int = Field(
        default=64,
        description="Number of sub-partitions in replica chunk tables.",
    )

    batch_statement_limit: int = Field(
        default=65_535,
        description=(
            "Limit on a number of rows in a BatchStatement. Default is the same as Cassandra limit of 65535."
        ),
    )

    batch_size_limit: int = Field(
        default=1_000_000,
        description=(
            "Limit on a size of BatchStatement in bytes. Batch size is estimated approximately. "
            "Set to 0 or negative to disable this limit. "
            "Server-side batch size warning threshold needs to be set to at least this value."
        ),
    )

    @field_validator("ra_dec_columns")
    @classmethod
    def check_ra_dec(cls, v: Iterable[str]) -> tuple[str, str]:
        """Coerce the value to a 2-tuple and verify its length.

        This validation method is needed in case we initialize model from
        JSON in strict mode, in that mode JSON list is rejected by default.
        """
        vtup = tuple(v)
        if len(vtup) != 2:
            raise ValueError("ra_dec_columns must have exactly two column names")
        return vtup
class ApdbCassandraTimePartitionRange(BaseModel):
    """Configuration of the time partitions, this is not user-configurable,
    but it is reflected in metadata.

    Holds the inclusive ``[start, end]`` range of time-partition numbers
    for which per-partition tables exist in the schema, and knows how to
    round-trip itself through the APDB metadata table as JSON.
    """

    metadataTimePartitionKey: ClassVar[str] = "config:time-partition-range.json"
    """Name of the metadata key to store time partition range."""

    start: int = Field(
        description="Start partition number for per-time-partition tables that exist in the schema."
    )

    end: int = Field(
        description="End partition number (inclusive) for per-time-partition tables that exist in the schema."
    )

    def range(self) -> Iterator[int]:
        """Generate a sequence of partition numbers."""
        # Inclusive of both endpoints.
        partition = self.start
        while partition <= self.end:
            yield partition
            partition += 1

    @classmethod
    def from_meta(cls, metadata: ApdbMetadataCassandra) -> Self:
        """Read this configuration object from metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.

        Returns
        -------
        range : `ApdbCassandraTimePartitionRange`
            Configuration retrieved from database.

        Raises
        ------
        LookupError
            Raised if the expected key is absent from the metadata table.
        """
        stored = metadata.get(cls.metadataTimePartitionKey)
        if stored is None:
            raise LookupError(f"Key '{cls.metadataTimePartitionKey}' is missing from metadata table.")
        return cls.model_validate(json.loads(stored))

    def save_to_meta(self, metadata: ApdbMetadataCassandra) -> None:
        """Save this configuration to metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.
        """
        serialized = json.dumps(self.model_dump())
        # force=True overwrites any previously stored range.
        metadata.set(self.metadataTimePartitionKey, serialized, force=True)