Coverage for python / lsst / dax / apdb / cassandra / config.py: 73%
78 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:35 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = [
25 "ApdbCassandraConfig",
26 "ApdbCassandraConnectionConfig",
27 "ApdbCassandraPartitioningConfig",
28 "ApdbCassandraTimePartitionRange",
29]
31import json
32from collections.abc import Iterable, Iterator
33from typing import TYPE_CHECKING, Any, ClassVar, Self
35from pydantic import BaseModel, Field, field_validator
37# If cassandra-driver is not there the module can still be imported.
38try:
39 import cassandra
41 CASSANDRA_IMPORTED = True
42except ImportError:
43 CASSANDRA_IMPORTED = False
46from ..config import ApdbConfig
48if TYPE_CHECKING:
49 from .apdbMetadataCassandra import ApdbMetadataCassandra
class ApdbCassandraConnectionConfig(BaseModel):
    """Connection configuration for Cassandra APDB.

    Groups the driver-level options used when establishing a connection to
    the Cassandra cluster: port, credentials, consistency levels, timeouts,
    concurrency, protocol version, and arbitrary extra driver parameters.
    """

    # TCP port of the Cassandra native protocol endpoint.
    port: int = Field(
        default=9042,
        description="Port number to connect to.",
    )

    # Optional internal addresses substituted for the public contact points.
    private_ips: tuple[str, ...] = Field(
        default=(),
        description="List of internal IP addresses for contact_points.",
    )

    username: str = Field(
        default="",
        description=(
            "Cassandra user name, if empty then db-auth.yaml has to provide it together with a password."
        ),
    )

    read_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of read operations, default: QUORUM, can be ONE.",
    )

    write_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of write operations, default: QUORUM, can be ONE.",
    )

    read_timeout: float = Field(
        default=120.0,
        description="Timeout in seconds for read operations.",
    )

    write_timeout: float = Field(
        default=60.0,
        description="Timeout in seconds for write operations.",
    )

    remove_timeout: float = Field(
        default=600.0,
        description="Timeout in seconds for remove operations.",
    )

    read_concurrency: int = Field(
        default=500,
        description="Concurrency level for read operations.",
    )

    # Falls back to the plain integer 4 when cassandra-driver is not
    # importable so that this module can still be loaded without it.
    protocol_version: int = Field(
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 4,
        description="Cassandra protocol version to use, default is V4.",
    )

    # default_factory avoids declaring a mutable literal as the default;
    # this is the canonical pydantic idiom for dict/list defaults.
    extra_parameters: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional keyword parameters passed to connect() method verbatim.",
    )
class ApdbCassandraPartitioningConfig(BaseModel):
    """Partitioning configuration for Cassandra APDB.

    Describes both the spatial partitioning (pixelization scheme and level)
    and the temporal partitioning (per-time-partition tables and their time
    range) of APDB tables, plus query-splitting options.
    """

    part_pixelization: str = Field(
        default="mq3c",
        description="Pixelization used for partitioning index.",
    )

    part_pix_level: int = Field(
        default=11,
        description="Pixelization level used for partitioning index.",
    )

    part_pix_max_ranges: int = Field(
        default=128,
        description="Max number of ranges in pixelization envelope",
    )

    time_partition_tables: bool = Field(
        default=False,
        description="Use per-partition tables for sources instead of partitioning by time",
    )

    time_partition_days: int = Field(
        default=30,
        description=(
            "Time partitioning granularity in days, this value must not be changed after database is "
            "initialized"
        ),
    )

    time_partition_start: str = Field(
        default="2018-12-01T00:00:00",
        description=(
            "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    time_partition_end: str = Field(
        default="2030-01-01T00:00:00",
        description=(
            "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    query_per_time_part: bool = Field(
        default=False,
        description=(
            "If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config."
        ),
    )

    query_per_spatial_part: bool = Field(
        default=False,
        description="If True then build one query per spatial partition, otherwise build single query.",
    )

    num_part_dedup: int = Field(
        default=64,
        description="Number of partitions in DiaObjectDedup table.",
    )

    @field_validator("part_pixelization")
    @classmethod
    def check_pixelization(cls, v: str) -> str:
        # Accept only the pixelization schemes supported by the sphgeom
        # partitioner; reject everything else with an explicit message.
        allowed = {"htm", "q3c", "mq3c", "healpix"}
        if v in allowed:
            return v
        raise ValueError(f"Unexpected value for part_pixelization: {v}, allowed values: {allowed}")
class ApdbCassandraConfig(ApdbConfig):
    """Configuration class for Cassandra-based APDB implementation.

    Extends the generic `ApdbConfig` with Cassandra-specific settings:
    cluster contact points, keyspace, nested connection and partitioning
    configuration, and batching/replica tuning knobs.
    """

    # Discriminator used when selecting the APDB implementation from config.
    _implementation_type: ClassVar[str] = "cassandra"

    contact_points: tuple[str, ...] = Field(
        default=("127.0.0.1",),
        description="The list of contact points to try connecting for cluster discovery.",
    )

    keyspace: str = Field(
        default="apdb",
        description="Keyspace name for APDB tables.",
    )

    connection_config: ApdbCassandraConnectionConfig = Field(
        default_factory=ApdbCassandraConnectionConfig,
        description="Database connection configuration",
    )

    partitioning: ApdbCassandraPartitioningConfig = Field(
        default_factory=ApdbCassandraPartitioningConfig,
        description="Configuration for partitioning.",
    )

    # default_factory avoids a mutable literal default; this is the
    # canonical pydantic idiom for list/dict defaults.
    dia_object_columns: list[str] = Field(
        default_factory=list,
        description="List of columns to read from DiaObject[Last], by default read all columns.",
    )

    prefix: str = Field(
        default="",
        description="Prefix to add to table names.",
    )

    ra_dec_columns: tuple[str, str] = Field(
        default=("ra", "dec"),
        description="Names of ra/dec columns in DiaObject table",
    )

    replica_skips_diaobjects: bool = Field(
        default=False,
        description=(
            "If True then do not store DiaObjects when enable_replica is True "
            "(DiaObjectsChunks has the same data)."
        ),
    )

    replica_sub_chunk_count: int = Field(
        default=64,
        description="Number of sub-partitions in replica chunk tables.",
    )

    batch_statement_limit: int = Field(
        default=65_535,
        description=(
            "Limit on a number of rows in a BatchStatement. Default is the same as Cassandra limit of 65535."
        ),
    )

    batch_size_limit: int = Field(
        default=1_000_000,
        description=(
            "Limit on a size of BatchStatement in bytes. Batch size is estimated approximately. "
            "Set to 0 or negative to disable this limit. "
            "Server-side batch size warning threshold needs to be set to at least this value."
        ),
    )

    @field_validator("ra_dec_columns")
    @classmethod
    def check_ra_dec(cls, v: Iterable[str]) -> tuple[str, str]:
        """Coerce ``ra_dec_columns`` to a two-element tuple.

        This validation method is needed in case we initialize model from
        JSON in strict mode, in that mode JSON list is rejected by default.
        """
        vtup = tuple(v)
        if len(vtup) != 2:
            raise ValueError("ra_dec_columns must have exactly two column names")
        return vtup
class ApdbCassandraTimePartitionRange(BaseModel):
    """Configuration of the time partitions, this is not user-configurable,
    but it is reflected in metadata.

    Holds the inclusive range of time-partition numbers for which
    per-partition tables exist, and knows how to round-trip itself through
    the metadata table as JSON.
    """

    metadataTimePartitionKey: ClassVar[str] = "config:time-partition-range.json"
    """Name of the metadata key to store time partition range."""

    start: int = Field(
        description="Start partition number for per-time-partition tables that exist in the schema."
    )

    end: int = Field(
        description="End partition number (inclusive) for per-time-partition tables that exist in the schema."
    )

    def range(self) -> Iterator[int]:
        """Iterate over all partition numbers, from ``start`` to ``end``
        inclusive.
        """
        partition = self.start
        while partition <= self.end:
            yield partition
            partition += 1

    @classmethod
    def from_meta(cls, metadata: ApdbMetadataCassandra) -> Self:
        """Construct this configuration object from the metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.

        Returns
        -------
        range : `ApdbCassandraTimePartitionRange`
            Configuration retrieved from database.

        Raises
        ------
        LookupError
            Raised if the expected key is absent from the metadata table.
        """
        stored = metadata.get(cls.metadataTimePartitionKey)
        if stored is None:
            raise LookupError(f"Key '{cls.metadataTimePartitionKey}' is missing from metadata table.")
        return cls.model_validate(json.loads(stored))

    def save_to_meta(self, metadata: ApdbMetadataCassandra) -> None:
        """Persist this configuration to the metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.
        """
        serialized = json.dumps(self.model_dump())
        metadata.set(self.metadataTimePartitionKey, serialized, force=True)