Coverage for python / lsst / dax / apdb / sql / config.py: 66%
47 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:58 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:58 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["ApdbSqlConfig", "ApdbSqlConnectionConfig", "ApdbSqlPixelizationConfig"]
26from collections.abc import Iterable
27from typing import Any, ClassVar
29from pydantic import BaseModel, Field, field_validator
31from ..config import ApdbConfig
class ApdbSqlConnectionConfig(BaseModel):
    """Configuration of connection parameters."""

    # Every field below has a default, so the model can be constructed
    # without arguments.

    # Only a fixed set of level names (or None) is accepted, see
    # `check_isolation_level`.
    isolation_level: str | None = Field(
        default=None,
        description=(
            "Transaction isolation level, if unset then backend-default value "
            "is used, except for SQLite backend where we use READ_UNCOMMITTED. "
            "Some backends may not support every allowed value."
        ),
    )

    connection_pool: bool = Field(
        default=True,
        description=(
            "If False then disable SQLAlchemy connection pool. Do not use connection pool when forking."
        ),
    )

    connection_timeout: float | None = Field(
        default=None,
        description=(
            "Maximum time to wait time for database lock to be released before "
            "exiting. Defaults to sqlalchemy defaults if not set."
        ),
    )

    extra_parameters: dict[str, Any] = Field(
        default={}, description="Additional keyword parameters passed to connect() method verbatim."
    )

    @field_validator("isolation_level")
    @classmethod
    def check_isolation_level(cls, v: str | None) -> str | None:
        """Validate the ``isolation_level`` field.

        Parameters
        ----------
        v : `str` or `None`
            Candidate isolation level; `None` means the level is unset.

        Returns
        -------
        v : `str` or `None`
            The input value, unchanged.

        Raises
        ------
        ValueError
            Raised if ``v`` is neither `None` nor one of the recognized
            isolation level names.
        """
        # None is deliberately part of the accepted set: it is the field
        # default and an explicit None must validate as well.
        allowed = {None, "READ_COMMITTED", "READ_UNCOMMITTED", "REPEATABLE_READ", "SERIALIZABLE"}
        if v not in allowed:
            raise ValueError(f"Unexpected value for isolation_level: {v}, allowed values: {allowed}")
        return v
class ApdbSqlPixelizationConfig(BaseModel):
    """Pixelization configuration for SQL-based APDB."""

    # All three settings carry defaults, so the model can be built with no
    # arguments.

    # Level of the HTM index.
    htm_level: int = Field(default=20, description="HTM indexing level")

    # Upper bound on the number of ranges in an HTM envelope.
    htm_max_ranges: int = Field(default=64, description="Max number of ranges in HTM envelope")

    # Column that holds the HTM index in DiaObject and DiaSource tables.
    htm_index_column: str = Field(
        default="pixelId", description="Name of a HTM index column for DiaObject and DiaSource tables"
    )
class ApdbSqlConfig(ApdbConfig):
    """APDB configuration class for SQL implementation (ApdbSql)."""

    # Class-level tag, not a model field.
    _implementation_type: ClassVar[str] = "sql"

    # The only field without a default; it must always be supplied.
    db_url: str = Field(description="SQLAlchemy database connection URI.")

    namespace: str | None = Field(
        default=None,
        description=(
            "Namespace or schema name for all tables in APDB database. "
            "Presently only works for PostgreSQL backend. "
            "If schema with this name does not exist it will be created when APDB tables are created."
        ),
    )

    # Nested sub-configurations, each built from its own defaults when not
    # provided explicitly.
    connection_config: ApdbSqlConnectionConfig = Field(
        default_factory=ApdbSqlConnectionConfig, description="Database connection configuration"
    )

    pixelization: ApdbSqlPixelizationConfig = Field(
        default_factory=ApdbSqlPixelizationConfig, description="Configuration for pixelization."
    )

    # Restricted to a fixed set of mode names by `check_dia_object_index`.
    dia_object_index: str = Field(
        default="baseline",
        description=(
            'Indexing mode for DiaObject table. Allowed value is one of "baseline", '
            '"pix_id_iov", or "last_object_table".'
        ),
    )

    # Length is enforced by `check_ra_dec`.
    ra_dec_columns: tuple[str, str] = Field(
        default=("ra", "dec"), description="Names of ra/dec columns in DiaObject table."
    )

    dia_object_columns: list[str] = Field(
        default=[], description="List of columns to read from DiaObject, by default read all columns"
    )

    dia_object_columns_for_dedup: list[str] = Field(
        default=["ra", "dec", "nDiaSources", "firstDiaSourceMjdTai"],
        description=(
            "Columns to read from DiaObject for deduplication, empty list means all columns. "
            "'diaObjectId' and 'validityStart[MjdTai]' are always added to this list. "
        ),
    )

    prefix: str = Field(default="", description="Prefix to add to table names and index names.")

    @field_validator("ra_dec_columns")
    @classmethod
    def check_ra_dec(cls, v: Iterable[str]) -> tuple[str, str]:
        """Convert ``ra_dec_columns`` to a tuple and check its length.

        Parameters
        ----------
        v : `~collections.abc.Iterable` [`str`]
            Column names to validate.

        Returns
        -------
        columns : `tuple` [`str`, `str`]
            The two column names as a tuple.

        Raises
        ------
        ValueError
            Raised if the number of names is not exactly two.
        """
        # Per the original author's note: this validator exists for models
        # initialized from JSON in strict mode, where a JSON list is
        # rejected by default.
        columns = tuple(v)
        if len(columns) == 2:
            return columns
        raise ValueError("ra_dec_columns must have exactly two column names")

    @field_validator("dia_object_index")
    @classmethod
    def check_dia_object_index(cls, v: str) -> str:
        """Validate the ``dia_object_index`` field.

        Parameters
        ----------
        v : `str`
            Candidate indexing mode.

        Returns
        -------
        v : `str`
            The input value, unchanged.

        Raises
        ------
        ValueError
            Raised if ``v`` is not one of the recognized indexing modes.
        """
        known_modes = {"baseline", "pix_id_iov", "last_object_table"}
        if v in known_modes:
            return v
        raise ValueError(f"Unexpected value for dia_object_index: {v}, allowed values: {known_modes}")