Coverage for python / lsst / dax / apdb / cassandra / config.py: 72%

77 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:46 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = [ 

25 "ApdbCassandraConfig", 

26 "ApdbCassandraConnectionConfig", 

27 "ApdbCassandraPartitioningConfig", 

28 "ApdbCassandraTimePartitionRange", 

29] 

30 

31import json 

32from collections.abc import Iterable, Iterator 

33from typing import TYPE_CHECKING, Any, ClassVar, Self 

34 

35from pydantic import BaseModel, Field, field_validator 

36 

37# If cassandra-driver is not there the module can still be imported. 

38try: 

39 import cassandra 

40 

41 CASSANDRA_IMPORTED = True 

42except ImportError: 

43 CASSANDRA_IMPORTED = False 

44 

45 

46from ..config import ApdbConfig 

47 

48if TYPE_CHECKING: 

49 from .apdbMetadataCassandra import ApdbMetadataCassandra 

50 

51 

class ApdbCassandraConnectionConfig(BaseModel):
    """Connection configuration for Cassandra APDB.

    Groups the parameters controlling how the driver connects to the
    Cassandra cluster: port, credentials, consistency levels, timeouts,
    concurrency, and protocol version.
    """

    port: int = Field(
        default=9042,
        description="Port number to connect to.",
    )

    private_ips: tuple[str, ...] = Field(
        default=(),
        description="List of internal IP addresses for contact_points.",
    )

    username: str = Field(
        default="",
        description=(
            "Cassandra user name, if empty then db-auth.yaml has to provide it together with a password."
        ),
    )

    read_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of read operations, default: QUORUM, can be ONE.",
    )

    write_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of write operations, default: QUORUM, can be ONE.",
    )

    read_timeout: float = Field(
        default=120.0,
        description="Timeout in seconds for read operations.",
    )

    write_timeout: float = Field(
        default=60.0,
        description="Timeout in seconds for write operations.",
    )

    remove_timeout: float = Field(
        default=600.0,
        description="Timeout in seconds for remove operations.",
    )

    read_concurrency: int = Field(
        default=500,
        description="Concurrency level for read operations.",
    )

    # Fall back to the plain integer 4 (same protocol version) when
    # cassandra-driver is not importable, so this module still imports.
    protocol_version: int = Field(
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 4,
        description="Cassandra protocol version to use, default is V4.",
    )

    # default_factory gives each model instance its own dict; preferred
    # over the mutable literal `default={}` even though pydantic copies
    # defaults internally.
    extra_parameters: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional keyword parameters passed to connect() method verbatim.",
    )

110 

111 

class ApdbCassandraPartitioningConfig(BaseModel):
    """Partitioning configuration for Cassandra APDB.

    Controls both spatial partitioning (sky pixelization) and temporal
    partitioning (time-based partitions or per-partition tables) of the
    APDB source tables.
    """

    # Name of the sky pixelization scheme; validated by check_pixelization.
    part_pixelization: str = Field(
        default="mq3c",
        description="Pixelization used for partitioning index.",
    )

    part_pix_level: int = Field(
        default=11,
        description="Pixelization level used for partitioning index.",
    )

    part_pix_max_ranges: int = Field(
        default=128,
        description="Max number of ranges in pixelization envelope",
    )

    time_partition_tables: bool = Field(
        default=False,
        description="Use per-partition tables for sources instead of partitioning by time",
    )

    # Changing this after initialization would scatter existing rows
    # across wrong partitions, hence the warning in the description.
    time_partition_days: int = Field(
        default=30,
        description=(
            "Time partitioning granularity in days, this value must not be changed after database is "
            "initialized"
        ),
    )

    time_partition_start: str = Field(
        default="2018-12-01T00:00:00",
        description=(
            "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    time_partition_end: str = Field(
        default="2030-01-01T00:00:00",
        description=(
            "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    query_per_time_part: bool = Field(
        default=False,
        description=(
            "If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config."
        ),
    )

    query_per_spatial_part: bool = Field(
        default=False,
        description="If True then build one query per spatial partition, otherwise build single query.",
    )

    @field_validator("part_pixelization")
    @classmethod
    def check_pixelization(cls, v: str) -> str:
        """Reject pixelization names that are not one of the supported schemes."""
        allowed = {"htm", "q3c", "mq3c", "healpix"}
        if v not in allowed:
            raise ValueError(f"Unexpected value for part_pixelization: {v}, allowed values: {allowed}")
        return v

179 

180 

class ApdbCassandraConfig(ApdbConfig):
    """Configuration class for Cassandra-based APDB implementation.

    Extends the generic `ApdbConfig` with Cassandra-specific settings:
    cluster contact points, keyspace, nested connection/partitioning
    configuration, and replica/batching tunables.
    """

    # Implementation name under which this config type is registered.
    _implementation_type: ClassVar[str] = "cassandra"

    contact_points: tuple[str, ...] = Field(
        default=("127.0.0.1",),
        description="The list of contact points to try connecting for cluster discovery.",
    )

    keyspace: str = Field(
        default="apdb",
        description="Keyspace name for APDB tables.",
    )

    connection_config: ApdbCassandraConnectionConfig = Field(
        default_factory=ApdbCassandraConnectionConfig,
        description="Database connection configuration",
    )

    partitioning: ApdbCassandraPartitioningConfig = Field(
        default_factory=ApdbCassandraPartitioningConfig,
        description="Configuration for partitioning.",
    )

    # default_factory gives each model instance its own list; preferred
    # over the mutable literal `default=[]` even though pydantic copies
    # defaults internally.
    dia_object_columns: list[str] = Field(
        default_factory=list,
        description="List of columns to read from DiaObject[Last], by default read all columns.",
    )

    prefix: str = Field(
        default="",
        description="Prefix to add to table names.",
    )

    ra_dec_columns: tuple[str, str] = Field(
        default=("ra", "dec"),
        description="Names of ra/dec columns in DiaObject table",
    )

    replica_skips_diaobjects: bool = Field(
        default=False,
        description=(
            "If True then do not store DiaObjects when enable_replica is True "
            "(DiaObjectsChunks has the same data)."
        ),
    )

    replica_sub_chunk_count: int = Field(
        default=64,
        description="Number of sub-partitions in replica chunk tables.",
    )

    batch_statement_limit: int = Field(
        default=65_535,
        description=(
            "Limit on a number of rows in a BatchStatement. Default is the same as Cassandra limit of 65535."
        ),
    )

    batch_size_limit: int = Field(
        default=1_000_000,
        description=(
            "Limit on a size of BatchStatement in bytes. Batch size is estimated approximately. "
            "Set to 0 or negative to disable this limit. "
            "Server-side batch size warning threshold needs to be set to at least this value."
        ),
    )

    @field_validator("ra_dec_columns")
    @classmethod
    def check_ra_dec(cls, v: Iterable[str]) -> tuple[str, str]:
        """Coerce the value to a tuple and check it has exactly two names.

        This validation method is needed in case we initialize model from
        JSON in strict mode, in that mode JSON list is rejected by default.
        """
        vtup = tuple(v)
        if len(vtup) != 2:
            raise ValueError("ra_dec_columns must have exactly two column names")
        return vtup

259 

260 

class ApdbCassandraTimePartitionRange(BaseModel):
    """Configuration of the time partitions, this is not user-configurable,
    but it is reflected in metadata.
    """

    metadataTimePartitionKey: ClassVar[str] = "config:time-partition-range.json"
    """Name of the metadata key to store time partition range."""

    start: int = Field(
        description="Start partition number for per-time-partition tables that exist in the schema."
    )

    end: int = Field(
        description="End partition number (inclusive) for per-time-partition tables that exist in the schema."
    )

    def range(self) -> Iterator[int]:
        """Generate a sequence of partition numbers."""
        partition = self.start
        # ``end`` is inclusive, hence <= rather than <.
        while partition <= self.end:
            yield partition
            partition += 1

    @classmethod
    def from_meta(cls, metadata: ApdbMetadataCassandra) -> Self:
        """Read this configuration object from metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.

        Returns
        -------
        range : `ApdbCassandraTimePartitionRange`
            Configuration retrieved from database.

        Raises
        ------
        LookupError
            Raised if the range has never been stored in the metadata table.
        """
        if (stored := metadata.get(cls.metadataTimePartitionKey)) is None:
            raise LookupError(f"Key '{cls.metadataTimePartitionKey}' is missing from metadata table.")
        return cls.model_validate(json.loads(stored))

    def save_to_meta(self, metadata: ApdbMetadataCassandra) -> None:
        """Save this configuration to metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.
        """
        serialized = json.dumps(self.model_dump())
        # force=True overwrites any previously stored range.
        metadata.set(self.metadataTimePartitionKey, serialized, force=True)