Coverage for python / lsst / dax / apdb / sql / config.py: 66%

47 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:48 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ApdbSqlConfig", "ApdbSqlConnectionConfig", "ApdbSqlPixelizationConfig"] 

25 

26from collections.abc import Iterable 

27from typing import Any, ClassVar 

28 

29from pydantic import BaseModel, Field, field_validator 

30 

31from ..config import ApdbConfig 

32 

33 

class ApdbSqlConnectionConfig(BaseModel):
    """Configuration of connection parameters."""

    # Optional isolation level name; the accepted names are enforced by
    # `check_isolation_level` below.
    isolation_level: str | None = Field(
        default=None,
        description=(
            "Transaction isolation level, if unset then backend-default value "
            "is used, except for SQLite backend where we use READ_UNCOMMITTED. "
            "Some backends may not support every allowed value."
        ),
    )

    connection_pool: bool = Field(
        default=True,
        description=(
            "If False then disable SQLAlchemy connection pool. Do not use connection pool when forking."
        ),
    )

    connection_timeout: float | None = Field(
        default=None,
        description=(
            "Maximum time to wait time for database lock to be released before "
            "exiting. Defaults to sqlalchemy defaults if not set."
        ),
    )

    extra_parameters: dict[str, Any] = Field(
        default={}, description="Additional keyword parameters passed to connect() method verbatim."
    )

    @field_validator("isolation_level")
    @classmethod
    def check_isolation_level(cls, v: str | None) -> str | None:
        """Check that ``isolation_level`` is one of the supported values.

        Parameters
        ----------
        v : `str` or `None`
            Candidate value for the ``isolation_level`` field. `None` is
            accepted and is returned unchanged.

        Returns
        -------
        v : `str` or `None`
            The same value that was passed in.

        Raises
        ------
        ValueError
            Raised if ``v`` is not `None` and is not one of the known
            isolation level names.
        """
        # None is a member of the allowed set, so the annotation has to admit
        # it as well as the four isolation level names.
        allowed = {None, "READ_COMMITTED", "READ_UNCOMMITTED", "REPEATABLE_READ", "SERIALIZABLE"}
        if v not in allowed:
            raise ValueError(f"Unexpected value for isolation_level: {v}, allowed values: {allowed}")
        return v

72 

73 

class ApdbSqlPixelizationConfig(BaseModel):
    """Pixelization configuration for SQL-based APDB."""

    # All three options describe HTM-based indexing: the indexing level, the
    # cap on the number of ranges in an envelope, and the name of the column
    # holding the index.
    htm_level: int = Field(default=20, description="HTM indexing level")

    htm_max_ranges: int = Field(default=64, description="Max number of ranges in HTM envelope")

    htm_index_column: str = Field(
        default="pixelId", description="Name of a HTM index column for DiaObject and DiaSource tables"
    )

91 

92 

class ApdbSqlConfig(ApdbConfig):
    """APDB configuration class for SQL implementation (ApdbSql)."""

    # Class-level tag for this configuration type; not a model field.
    _implementation_type: ClassVar[str] = "sql"

    # Required: the only field declared without a default.
    db_url: str = Field(description="SQLAlchemy database connection URI.")

    namespace: str | None = Field(
        description=(
            "Namespace or schema name for all tables in APDB database. "
            "Presently only works for PostgreSQL backend. If schema with this name does "
            "not exist it will be created when APDB tables are created."
        ),
        default=None,
    )

    # Nested sub-configurations, each built from its own defaults when omitted.
    connection_config: ApdbSqlConnectionConfig = Field(
        description="Database connection configuration",
        default_factory=ApdbSqlConnectionConfig,
    )

    pixelization: ApdbSqlPixelizationConfig = Field(
        description="Configuration for pixelization.",
        default_factory=ApdbSqlPixelizationConfig,
    )

    # Accepted values are enforced by `check_dia_object_index` below.
    dia_object_index: str = Field(
        description=(
            'Indexing mode for DiaObject table. Allowed value is one of "baseline", '
            '"pix_id_iov", or "last_object_table".'
        ),
        default="baseline",
    )

    # Length is enforced by `check_ra_dec` below.
    ra_dec_columns: tuple[str, str] = Field(
        description="Names of ra/dec columns in DiaObject table.",
        default=("ra", "dec"),
    )

    dia_object_columns: list[str] = Field(
        description="List of columns to read from DiaObject, by default read all columns",
        default=[],
    )

    dia_object_columns_for_dedup: list[str] = Field(
        description=(
            "Columns to read from DiaObject for deduplication, empty list means all columns. "
            "'diaObjectId' and 'validityStart[MjdTai]' are always added to this list. "
        ),
        default=["ra", "dec", "nDiaSources", "firstDiaSourceMjdTai"],
    )

    prefix: str = Field(description="Prefix to add to table names and index names.", default="")

    @field_validator("ra_dec_columns")
    @classmethod
    def check_ra_dec(cls, v: Iterable[str]) -> tuple[str, str]:
        """Convert ``ra_dec_columns`` to a tuple and check that it has exactly
        two items.

        Parameters
        ----------
        v : `~collections.abc.Iterable` [`str`]
            Candidate column names.

        Returns
        -------
        columns : `tuple` [`str`, `str`]
            The two column names as a tuple.

        Raises
        ------
        ValueError
            Raised if the number of names is not two.
        """
        # Per the original author's note: this validator exists for models
        # initialized from JSON in strict mode, where a JSON list is rejected
        # by default.
        columns = tuple(v)
        if len(columns) == 2:
            return columns
        raise ValueError("ra_dec_columns must have exactly two column names")

    @field_validator("dia_object_index")
    @classmethod
    def check_dia_object_index(cls, v: str) -> str:
        """Check that ``dia_object_index`` names a known indexing mode.

        Parameters
        ----------
        v : `str`
            Candidate indexing mode.

        Returns
        -------
        v : `str`
            The same value that was passed in.

        Raises
        ------
        ValueError
            Raised if ``v`` is not one of the allowed modes.
        """
        choices = {"baseline", "pix_id_iov", "last_object_table"}
        if v in choices:
            return v
        raise ValueError(f"Unexpected value for dia_object_index: {v}, allowed values: {choices}")