Coverage for python / lsst / dax / apdb / cassandra / cassandra_utils.py: 19%

132 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:49 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = [ 

25 "ApdbCassandraTableData", 

26 "PreparedStatementCache", 

27 "literal", 

28 "pandas_dataframe_factory", 

29 "quote_id", 

30 "raw_data_factory", 

31 "select_concurrent", 

32] 

33 

34import logging 

35from collections.abc import Collection, Iterable, Iterator, Sequence 

36from datetime import datetime, timedelta 

37from typing import Any 

38from uuid import UUID 

39 

40import felis.datamodel 

41import numpy as np 

42import pandas 

43 

44# If cassandra-driver is not there the module can still be imported 

45# but things will not work. 

46try: 

47 import cassandra.concurrent 

48 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Session 

49 from cassandra.query import PreparedStatement 

50 

51 CASSANDRA_IMPORTED = True 

52except ImportError: 

53 CASSANDRA_IMPORTED = False 

54 EXEC_PROFILE_DEFAULT = object() 

55 

56from ..apdbReplica import ApdbTableData 

57 

58_LOG = logging.getLogger(__name__) 

59 

60 

class ApdbCassandraTableData(ApdbTableData):
    """Implementation of ApdbTableData that wraps Cassandra raw data.

    Parameters
    ----------
    columns : `list` [ `str` ]
        Names of the columns.
    rows : `list` [ `tuple` ]
        Result rows; each tuple matches ``columns`` in length and order.
    """

    def __init__(self, columns: list[str], rows: list[tuple]):
        self._columns = columns
        self._rows = rows
        # Empty until set_column_types() is called after construction.
        self._column_types: dict[str, felis.datamodel.DataType] = {}

    def set_column_types(self, types: dict[str, felis.datamodel.DataType]) -> None:
        """Update column types.

        Parameters
        ----------
        types : `dict`[`str`, `felis.datamodel.DataType`]
            Mapping of column name to its type.

        Notes
        -----
        Due to the way how instances of this class are constructed it is
        impossible to pass types of columns to the constructor, instead we
        need to make a call to this method after construction.
        """
        self._column_types = types

    def column_names(self) -> Sequence[str]:
        # docstring inherited
        return self._columns

    def column_defs(self) -> Sequence[tuple[str, felis.datamodel.DataType]]:
        # docstring inherited
        # Raises KeyError if set_column_types() did not provide a type for
        # every column.
        return tuple((column, self._column_types[column]) for column in self._columns)

    def rows(self) -> Collection[tuple]:
        # docstring inherited
        return self._rows

    def append(self, other: ApdbCassandraTableData) -> None:
        """Extend rows in this table with rows in other table.

        Parameters
        ----------
        other : `ApdbCassandraTableData`
            Table data to append; must have exactly the same columns.

        Raises
        ------
        ValueError
            Raised if the two tables have different columns.
        """
        if self._columns != other._columns:
            raise ValueError(f"Different columns returned by queries: {self._columns} and {other._columns}")
        self._rows.extend(other._rows)

    def project(self, *, drop: Iterable[str] = frozenset()) -> None:
        """Modify data in place by dropping some columns.

        Parameters
        ----------
        drop : `~collections.abc.Iterable` [ `str` ], optional
            Names of columns to drop; names not present in this table are
            silently ignored.
        """
        drop_set = set(drop)
        if not drop_set:
            return

        # Indices of columns to keep, in their original order; building new
        # tuples in one pass avoids repeated per-row deletions.
        keep_idx = [idx for idx, col_name in enumerate(self._columns) if col_name not in drop_set]
        if len(keep_idx) == len(self._columns):
            # Nothing to drop.
            return

        # Use slice assignment to mutate the existing list objects in place,
        # so any external references to them stay consistent.
        self._rows[:] = [tuple(row[idx] for idx in keep_idx) for row in self._rows]
        self._columns[:] = [self._columns[idx] for idx in keep_idx]

    def __iter__(self) -> Iterator[tuple]:
        """Make it look like a row iterator, needed for some odd logic."""
        return iter(self._rows)

127 

128 

class PreparedStatementCache:
    """Cache of prepared Cassandra statements, keyed by their query text."""

    def __init__(self, session: Session) -> None:
        self._session = session
        self._prepared_statements: dict[str, PreparedStatement] = {}

    def prepare(self, query: str) -> PreparedStatement:
        """Return the prepared statement for a query, preparing and caching
        it on first use.
        """
        try:
            return self._prepared_statements[query]
        except KeyError:
            statement = self._session.prepare(query)
            self._prepared_statements[query] = statement
            return statement

143 

144 

def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
    """Build a pandas DataFrame from a Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.

    Notes
    -----
    When this function is used as a Cassandra row factory, the resulting
    DataFrame has to be retrieved in a non-standard way, via the
    `ResultSet._current_rows` attribute.
    """
    frame = pandas.DataFrame.from_records(rows, columns=colnames)
    return frame

167 

168 

def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
    """Wrap unmodified result data (column names and rows) into an
    `ApdbCassandraTableData` instance.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    data : `ApdbCassandraTableData`
        Input data wrapped into ApdbCassandraTableData.

    Notes
    -----
    When this function is used as a Cassandra row factory, the resulting
    object has to be retrieved in a non-standard way, via the
    `ResultSet._current_rows` attribute.
    """
    return ApdbCassandraTableData(colnames, rows)

192 

193 

def execute_concurrent(
    session: Session,
    statements: list[tuple],
    *,
    execution_profile: object = EXEC_PROFILE_DEFAULT,
    concurrency: int = 100,
) -> None:
    """Wrap call to `cassandra.concurrent.execute_concurrent` to avoid
    importing cassandra in other modules.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session used to execute the statements.
    statements : `list` [ `tuple` ]
        Statements and their parameters, passed directly to
        ``cassandra.concurrent.execute_concurrent``.
    execution_profile : `object`, optional
        Execution profile (or its name) to use for the statements.
    concurrency : `int`, optional
        Maximum number of concurrently executing statements.

    Notes
    -----
    Statement results are discarded; any exception raised by
    ``cassandra.concurrent.execute_concurrent`` propagates to the caller.
    """
    cassandra.concurrent.execute_concurrent(
        session,
        statements,
        concurrency=concurrency,
        execution_profile=execution_profile,
    )

210 

211 

def select_concurrent(
    session: Session, statements: list[tuple], execution_profile: str, concurrency: int
) -> pandas.DataFrame | ApdbCassandraTableData | list:
    """Execute bunch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session used to execute the statements.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.
    concurrency : `int`
        Maximum number of concurrently executing statements.

    Returns
    -------
    result
        Combined result of multiple statements, type of the result depends on
        specific row factory defined in execution profile. If row factory is
        `pandas_dataframe_factory` then pandas DataFrame is created from a
        combined result. If row factory is `raw_data_factory` then
        `ApdbCassandraTableData` is built from all records. Otherwise a list of
        rows is returned, type of each row is determined by the row factory.

    Notes
    -----
    This method can raise any exception that is raised by one of the provided
    statements.
    """
    # raise_on_first_error=False so that the failure comes back as a
    # (False, exception) pair and can be logged before being re-raised.
    results = cassandra.concurrent.execute_concurrent(
        session,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
        execution_profile=execution_profile,
    )

    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:
        # Merge per-statement ApdbCassandraTableData into a single instance.
        _LOG.debug("making raw data out of rows/columns")
        table_data: ApdbCassandraTableData | None = None
        for success, result in results:
            if success:
                data = result._current_rows
                assert isinstance(data, ApdbCassandraTableData)
                if table_data is None:
                    table_data = data
                else:
                    table_data.append(data)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        if table_data is None:
            # No statements were executed at all.
            table_data = ApdbCassandraTableData([], [])
        return table_data

    elif ep.row_factory is pandas_dataframe_factory:
        # Merge multiple DataFrames into one.
        _LOG.debug("making pandas data frame out of set of data frames")
        dataframes = []
        for success, result in results:
            if success:
                dataframes.append(result._current_rows)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        if not dataframes:
            # No statements were executed at all; mirror the raw-data branch
            # and return an empty result instead of failing on dataframes[0].
            catalog = pandas.DataFrame()
        else:
            # Concatenate all frames, but skip empty ones.
            non_empty = [df for df in dataframes if not df.empty]
            if not non_empty:
                # If all frames are empty, return the first one.
                catalog = dataframes[0]
            elif len(non_empty) == 1:
                catalog = non_empty[0]
            else:
                catalog = pandas.concat(non_empty)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    else:
        # Just concatenate all rows into a single collection.
        rows = []
        for success, result in results:
            if success:
                rows.extend(result)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        _LOG.debug("number of rows: %s", len(rows))
        return rows

303 

304 

305def literal(v: Any) -> Any: 

306 """Transform object into a value for the query.""" 

307 if v is None or v is pandas.NA: 

308 v = None 

309 elif isinstance(v, datetime): 

310 v = int((v - datetime(1970, 1, 1)) / timedelta(seconds=1) * 1000) 

311 elif isinstance(v, bytes | str | UUID | int): 

312 pass 

313 elif isinstance(v, np.bool_): 

314 v = bool(v) 

315 else: 

316 try: 

317 if not np.isfinite(v): 

318 v = None 

319 except TypeError: 

320 pass 

321 return v 

322 

323 

def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted."""
    return columnName if columnName.islower() else f'"{columnName}"'

328 return columnName