Coverage for python / lsst / dax / apdb / cassandra / config.py: 73%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:56 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = [ 

25 "ApdbCassandraConfig", 

26 "ApdbCassandraConnectionConfig", 

27 "ApdbCassandraPartitioningConfig", 

28 "ApdbCassandraTimePartitionRange", 

29] 

30 

31import json 

32from collections.abc import Iterable, Iterator 

33from typing import TYPE_CHECKING, Any, ClassVar, Self 

34 

35from pydantic import BaseModel, Field, field_validator 

36 

37# If cassandra-driver is not there the module can still be imported. 

38try: 

39 import cassandra 

40 

41 CASSANDRA_IMPORTED = True 

42except ImportError: 

43 CASSANDRA_IMPORTED = False 

44 

45 

46from ..config import ApdbConfig 

47 

48if TYPE_CHECKING: 

49 from .apdbMetadataCassandra import ApdbMetadataCassandra 

50 

51 

class ApdbCassandraConnectionConfig(BaseModel):
    """Connection configuration for Cassandra APDB.

    Collects the options used when opening a connection to a Cassandra
    cluster: port, credentials, consistency levels, timeouts, concurrency,
    protocol version, and verbatim extra driver parameters.
    """

    port: int = Field(
        default=9042,
        description="Port number to connect to.",
    )

    private_ips: tuple[str, ...] = Field(
        default=(),
        description="List of internal IP addresses for contact_points.",
    )

    username: str = Field(
        default="",
        description=(
            "Cassandra user name, if empty then db-auth.yaml has to provide it together with a password."
        ),
    )

    read_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of read operations, default: QUORUM, can be ONE.",
    )

    write_consistency: str = Field(
        default="QUORUM",
        description="Name for consistency level of write operations, default: QUORUM, can be ONE.",
    )

    read_timeout: float = Field(
        default=120.0,
        description="Timeout in seconds for read operations.",
    )

    write_timeout: float = Field(
        default=60.0,
        description="Timeout in seconds for write operations.",
    )

    remove_timeout: float = Field(
        default=600.0,
        description="Timeout in seconds for remove operations.",
    )

    read_concurrency: int = Field(
        default=500,
        description="Concurrency level for read operations.",
    )

    protocol_version: int = Field(
        # Fall back to the plain integer 4 when cassandra-driver could not
        # be imported, so that this module remains importable without it.
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 4,
        description="Cassandra protocol version to use, default is V4.",
    )

    extra_parameters: dict[str, Any] = Field(
        # default_factory avoids declaring a shared mutable dict literal as
        # the default; pydantic copies plain defaults too, but the factory
        # form is the documented idiom for mutable defaults.
        default_factory=dict,
        description="Additional keyword parameters passed to connect() method verbatim.",
    )

110 

111 

class ApdbCassandraPartitioningConfig(BaseModel):
    """Partitioning configuration for Cassandra APDB.

    Covers both spatial partitioning (sky pixelization used for the
    partitioning index) and temporal partitioning of the source tables.
    """

    part_pixelization: str = Field(
        default="mq3c",
        description="Pixelization used for partitioning index.",
    )

    part_pix_level: int = Field(
        default=11,
        description="Pixelization level used for partitioning index.",
    )

    part_pix_max_ranges: int = Field(
        default=128,
        description="Max number of ranges in pixelization envelope",
    )

    time_partition_tables: bool = Field(
        default=False,
        description="Use per-partition tables for sources instead of partitioning by time",
    )

    time_partition_days: int = Field(
        default=30,
        description=(
            "Time partitioning granularity in days, this value must not be changed after database is "
            "initialized"
        ),
    )

    time_partition_start: str = Field(
        default="2018-12-01T00:00:00",
        description=(
            "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    time_partition_end: str = Field(
        default="2030-01-01T00:00:00",
        description=(
            "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    query_per_time_part: bool = Field(
        default=False,
        description=(
            "If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config."
        ),
    )

    query_per_spatial_part: bool = Field(
        default=False,
        description="If True then build one query per spatial partition, otherwise build single query.",
    )

    num_part_dedup: int = Field(
        default=64,
        description="Number of partitions in DiaObjectDedup table.",
    )

    @field_validator("part_pixelization")
    @classmethod
    def check_pixelization(cls, v: str) -> str:
        """Check that the pixelization name is one of the supported schemes.

        Raises
        ------
        ValueError
            Raised if the value is not a known pixelization name.
        """
        allowed = {"htm", "q3c", "mq3c", "healpix"}
        if v not in allowed:
            # sorted() keeps the message deterministic; iteration order of a
            # set of strings varies with hash randomization between runs.
            raise ValueError(
                f"Unexpected value for part_pixelization: {v}, allowed values: {sorted(allowed)}"
            )
        return v

184 

185 

class ApdbCassandraConfig(ApdbConfig):
    """Configuration class for Cassandra-based APDB implementation."""

    # Tag used to select this implementation in the ApdbConfig factory
    # machinery.
    _implementation_type: ClassVar[str] = "cassandra"

    contact_points: tuple[str, ...] = Field(
        default=("127.0.0.1",),
        description="The list of contact points to try connecting for cluster discovery.",
    )

    keyspace: str = Field(
        default="apdb",
        description="Keyspace name for APDB tables.",
    )

    connection_config: ApdbCassandraConnectionConfig = Field(
        default_factory=ApdbCassandraConnectionConfig,
        description="Database connection configuration",
    )

    partitioning: ApdbCassandraPartitioningConfig = Field(
        default_factory=ApdbCassandraPartitioningConfig,
        description="Configuration for partitioning.",
    )

    dia_object_columns: list[str] = Field(
        # default_factory instead of a mutable list literal, matching the
        # idiom used by the other fields of this class.
        default_factory=list,
        description="List of columns to read from DiaObject[Last], by default read all columns.",
    )

    prefix: str = Field(
        default="",
        description="Prefix to add to table names.",
    )

    ra_dec_columns: tuple[str, str] = Field(
        default=("ra", "dec"),
        description="Names of ra/dec columns in DiaObject table",
    )

    replica_skips_diaobjects: bool = Field(
        default=False,
        description=(
            "If True then do not store DiaObjects when enable_replica is True "
            "(DiaObjectsChunks has the same data)."
        ),
    )

    replica_sub_chunk_count: int = Field(
        default=64,
        description="Number of sub-partitions in replica chunk tables.",
    )

    batch_statement_limit: int = Field(
        default=65_535,
        description=(
            "Limit on a number of rows in a BatchStatement. Default is the same as Cassandra limit of 65535."
        ),
    )

    batch_size_limit: int = Field(
        default=1_000_000,
        description=(
            "Limit on a size of BatchStatement in bytes. Batch size is estimated approximately. "
            "Set to 0 or negative to disable this limit. "
            "Server-side batch size warning threshold needs to be set to at least this value."
        ),
    )

    @field_validator("ra_dec_columns")
    @classmethod
    def check_ra_dec(cls, v: Iterable[str]) -> tuple[str, str]:
        """Check that ra_dec_columns has exactly two column names.

        Raises
        ------
        ValueError
            Raised if the value does not contain exactly two items.
        """
        # This validation method is needed in case we initialize model from
        # JSON in strict mode, in that mode JSON list is rejected by default.
        vtup = tuple(v)
        if len(vtup) != 2:
            raise ValueError("ra_dec_columns must have exactly two column names")
        return vtup

264 

265 

class ApdbCassandraTimePartitionRange(BaseModel):
    """Range of time partitions present in the schema.

    This is not user-configurable; it is persisted to and read back from
    the APDB metadata table.
    """

    metadataTimePartitionKey: ClassVar[str] = "config:time-partition-range.json"
    """Name of the metadata key to store time partition range."""

    start: int = Field(
        description="Start partition number for per-time-partition tables that exist in the schema."
    )

    end: int = Field(
        description="End partition number (inclusive) for per-time-partition tables that exist in the schema."
    )

    def range(self) -> Iterator[int]:
        """Generate a sequence of partition numbers."""
        # ``end`` is inclusive, so iterate up to and including it.
        partition = self.start
        while partition <= self.end:
            yield partition
            partition += 1

    @classmethod
    def from_meta(cls, metadata: ApdbMetadataCassandra) -> Self:
        """Read this configuration object from metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.

        Returns
        -------
        range : `ApdbCassandraTimePartitionRange`
            Configuration retrieved from database.

        Raises
        ------
        LookupError
            Raised if the metadata table does not contain the range key.
        """
        stored = metadata.get(cls.metadataTimePartitionKey)
        if stored is None:
            raise LookupError(f"Key '{cls.metadataTimePartitionKey}' is missing from metadata table.")
        parsed = json.loads(stored)
        return cls.model_validate(parsed)

    def save_to_meta(self, metadata: ApdbMetadataCassandra) -> None:
        """Save this configuration to metadata table.

        Parameters
        ----------
        metadata : `ApdbMetadataCassandra`
            Metadata table.
        """
        serialized = json.dumps(self.model_dump())
        metadata.set(self.metadataTimePartitionKey, serialized, force=True)