Coverage for python / lsst / dax / apdb / cassandra / legacy_config.py: 0%

43 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:19 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24from lsst.pex.config import ChoiceField, Field, ListField 

25 

26# If cassandra-driver is not there the module can still be imported. 

27try: 

28 import cassandra 

29 

30 CASSANDRA_IMPORTED = True 

31except ImportError: 

32 CASSANDRA_IMPORTED = False 

33 

34from .. import legacy_config 

35from . import config 

36 

37 

class ApdbCassandraConfig(legacy_config.ApdbConfig):
    """Legacy (pex_config-based) configuration of the Cassandra APDB.

    The fields below describe the cluster connection, table partitioning
    and query options. Use `to_model` to obtain the equivalent
    ``config.ApdbCassandraConfig`` instance.
    """

    # --- Cluster location and credentials. ---
    contact_points = ListField[str](
        default=["127.0.0.1"],
        doc="The list of contact points to try connecting for cluster discovery.",
    )
    private_ips = ListField[str](
        default=[],
        doc="List of internal IP addresses for contact_points.",
    )
    port = Field[int](default=9042, doc="Port number to connect to.")
    keyspace = Field[str](default="apdb", doc="Default keyspace for operations.")
    username = Field[str](
        default="",
        doc="Cassandra user name, if empty then db-auth.yaml has to provide it with password.",
    )

    # --- Consistency levels, timeouts and concurrency. ---
    read_consistency = Field[str](
        default="QUORUM",
        doc="Name for consistency level of read operations, default: QUORUM, can be ONE.",
    )
    write_consistency = Field[str](
        default="QUORUM",
        doc="Name for consistency level of write operations, default: QUORUM, can be ONE.",
    )
    read_timeout = Field[float](default=120.0, doc="Timeout in seconds for read operations.")
    write_timeout = Field[float](default=60.0, doc="Timeout in seconds for write operations.")
    remove_timeout = Field[float](default=600.0, doc="Timeout in seconds for remove operations.")
    read_concurrency = Field[int](default=500, doc="Concurrency level for read operations.")
    # Fall back to the plain integer when cassandra-driver could not be
    # imported, so that this class can still be defined.
    protocol_version = Field[int](
        default=4 if not CASSANDRA_IMPORTED else cassandra.ProtocolVersion.V4,
        doc="Cassandra protocol version to use, default is V4",
    )

    # --- Column selection and table naming. ---
    dia_object_columns = ListField[str](
        default=[],
        doc="List of columns to read from DiaObject[Last], by default read all columns",
    )
    prefix = Field[str](default="", doc="Prefix to add to table names")

    # --- Spatial partitioning. ---
    part_pixelization = ChoiceField[str](
        default="mq3c",
        doc="Pixelization used for partitioning index.",
        allowed={"htm": "HTM pixelization", "q3c": "Q3C pixelization", "mq3c": "MQ3C pixelization"},
    )
    part_pix_level = Field[int](default=11, doc="Pixelization level used for partitioning index.")
    part_pix_max_ranges = Field[int](default=128, doc="Max number of ranges in pixelization envelope")
    ra_dec_columns = ListField[str](doc="Names of ra/dec columns in DiaObject table", default=["ra", "dec"])

    # NOTE: ``timer`` is declared here but is not forwarded by `to_model`.
    timer = Field[bool](default=False, doc="If True then print/log timing information")

    # --- Time partitioning. ---
    time_partition_tables = Field[bool](
        default=False,
        doc="Use per-partition tables for sources instead of partitioning by time",
    )
    time_partition_days = Field[int](
        default=30,
        doc=(
            "Time partitioning granularity in days, "
            "this value must not be changed after database is initialized"
        ),
    )
    time_partition_start = Field[str](
        default="2018-12-01T00:00:00",
        doc=(
            "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )
    time_partition_end = Field[str](
        default="2030-01-01T00:00:00",
        doc=(
            "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
    )

    # --- Query splitting options. ---
    query_per_time_part = Field[bool](
        doc=(
            "If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config."
        ),
        default=False,
    )
    query_per_spatial_part = Field[bool](
        doc="If True then build one query per spatial partition, otherwise build single query.",
        default=False,
    )

    # --- Replication. ---
    use_insert_id_skips_diaobjects = Field[bool](
        doc=(
            "If True then do not store DiaObjects when use_insert_id is True "
            "(DiaObjectsChunks has the same data)."
        ),
        default=False,
    )

    # --- Idle connection heartbeat. ---
    idle_heartbeat_interval = Field[int](
        default=0,
        doc=(
            "Interval, in seconds, on which to heartbeat idle connections. "
            "Zero (default) disables heartbeats."
        ),
    )
    idle_heartbeat_timeout = Field[int](
        default=30,
        doc="Timeout, in seconds, on which the heartbeat wait for idle connection responses.",
    )

    def to_model(self) -> config.ApdbCassandraConfig:
        # Docstring inherited from base class.

        # Heartbeat settings travel as free-form extra parameters.
        # control_connection_timeout has no pex_config field; the value used
        # here is the same one that init_database sets as its default.
        extra_parameters = {
            "idle_heartbeat_interval": self.idle_heartbeat_interval,
            "idle_heartbeat_timeout": self.idle_heartbeat_timeout,
            "control_connection_timeout": 100,
        }
        connection = config.ApdbCassandraConnectionConfig(
            port=self.port,
            private_ips=tuple(self.private_ips),
            username=self.username,
            read_consistency=self.read_consistency,
            write_consistency=self.write_consistency,
            read_timeout=self.read_timeout,
            write_timeout=self.write_timeout,
            remove_timeout=self.remove_timeout,
            read_concurrency=self.read_concurrency,
            protocol_version=self.protocol_version,
            extra_parameters=extra_parameters,
        )

        partitioning = config.ApdbCassandraPartitioningConfig(
            part_pixelization=self.part_pixelization,
            part_pix_level=self.part_pix_level,
            part_pix_max_ranges=self.part_pix_max_ranges,
            time_partition_tables=self.time_partition_tables,
            time_partition_days=self.time_partition_days,
            time_partition_start=self.time_partition_start,
            time_partition_end=self.time_partition_end,
            query_per_time_part=self.query_per_time_part,
            query_per_spatial_part=self.query_per_spatial_part,
        )

        # Legacy names map onto the new model as follows:
        # use_insert_id -> enable_replica and
        # use_insert_id_skips_diaobjects -> replica_skips_diaobjects.
        # Only the first two entries of ra_dec_columns are passed on.
        return config.ApdbCassandraConfig(
            schema_file=self.schema_file,
            schema_name=self.schema_name,
            read_sources_months=self.read_sources_months,
            read_forced_sources_months=self.read_forced_sources_months,
            enable_replica=self.use_insert_id,
            replica_chunk_seconds=self.replica_chunk_seconds,
            contact_points=tuple(self.contact_points),
            keyspace=self.keyspace,
            connection_config=connection,
            partitioning=partitioning,
            dia_object_columns=list(self.dia_object_columns),
            prefix=self.prefix,
            ra_dec_columns=(self.ra_dec_columns[0], self.ra_dec_columns[1]),
            replica_skips_diaobjects=self.use_insert_id_skips_diaobjects,
        )