Coverage for python / lsst / dax / apdb / sql / apdbSqlReplica.py: 28%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:49 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22"""Module defining Apdb class and related methods.""" 

23 

24from __future__ import annotations 

25 

26__all__ = ["ApdbSqlReplica"] 

27 

import datetime
import logging
from collections.abc import Collection, Iterable, Mapping, Sequence
from typing import TYPE_CHECKING, cast

import astropy.time
import felis.datamodel
import sqlalchemy
from sqlalchemy import sql

from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk
from ..apdbSchema import ApdbTables
from ..apdbUpdateRecord import ApdbUpdateRecord
from ..monitor import MonAgent
from ..schema_model import ExtraDataTypes
from ..timer import Timer
from ..versionTuple import VersionTuple
from .apdbSqlSchema import ExtraTables

45 

46if TYPE_CHECKING: 

47 from .apdbSqlSchema import ApdbSqlSchema 

48 

49 

# Module-level logger; it is used as a timing sink only when a replica is
# constructed with ``timer=True``.
_LOG = logging.getLogger(__name__)

# Monitoring agent that always receives timing data from replica operations.
_MON = MonAgent(__name__)

VERSION = VersionTuple(1, 0, 0)
"""Version for the code controlling replication tables. This needs to be
updated following compatibility rules when schema produced by this code
changes.
"""

59 

60 

class ApdbSqlTableData(ApdbTableData):
    """Implementation of ApdbTableData that wraps sqlalchemy Result.

    All rows of the result are fetched eagerly when the instance is built,
    so the wrapped result is fully consumed by the constructor.

    Parameters
    ----------
    result : `sqlalchemy.engine.Result`
        Query result whose rows and column names are captured.
    column_types : `dict` [`str`, `felis.datamodel.DataType`]
        Data type for each column; every column name present in ``result``
        must be a key here, otherwise `KeyError` is raised.
    """

    def __init__(self, result: sqlalchemy.engine.Result, column_types: dict[str, felis.datamodel.DataType]):
        # Pair each result column with its declared type, preserving the
        # column order of the query.
        definitions = []
        for name in result.keys():
            definitions.append((name, column_types[name]))
        self._column_defs = tuple(definitions)

        fetched = list(result.fetchall())
        self._rows: list[tuple] = cast(list[tuple], fetched)

    def column_names(self) -> Sequence[str]:
        return tuple(name for name, _ in self._column_defs)

    def column_defs(self) -> Sequence[tuple[str, felis.datamodel.DataType]]:
        return self._column_defs

    def rows(self) -> Collection[tuple]:
        return self._rows

76 

77 

class ApdbSqlReplica(ApdbReplica):
    """Implementation of `ApdbReplica` for SQL backend.

    Parameters
    ----------
    schema : `ApdbSqlSchema`
        Instance of `ApdbSqlSchema` class for APDB database.
    engine : `sqlalchemy.engine.Engine`
        Engine for database access.
    db_schema_version : `VersionTuple`
        Version of the database schema.
    timer : `bool`, optional
        If `True` then log timing information.
    """

    def __init__(
        self,
        schema: ApdbSqlSchema,
        engine: sqlalchemy.engine.Engine,
        db_schema_version: VersionTuple,
        timer: bool = False,
    ):
        self._schema = schema
        self._engine = engine
        self._db_schema_version = db_schema_version

        # Timing data always goes to the monitoring agent; it is additionally
        # sent to the module logger when ``timer`` is `True`.
        self._timer_args: list[MonAgent | logging.Logger] = [_MON]
        if timer:
            self._timer_args.append(_LOG)

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name.

        The timer reports to the monitoring agent and, if enabled at
        construction time, to the module logger.
        """
        return Timer(name, *self._timer_args, tags=tags)

    @staticmethod
    def _to_unix_tai_time(value: datetime.datetime) -> astropy.time.Time:
        """Convert a ``last_update_time`` column value to `astropy.time.Time`.

        The numeric result of `datetime.datetime.timestamp` is interpreted
        in the ``unix_tai`` format.

        Parameters
        ----------
        value : `datetime.datetime`
            Value read from the database. Naive values are treated as UTC
            wall-clock time (NOTE(review): confirm this matches how the writer
            stores these timestamps).

        Returns
        -------
        time : `astropy.time.Time`
            Time with ``unix_tai`` format.
        """
        # `datetime.timestamp()` treats naive instances as local time, which
        # would shift the result by the host UTC offset when the process does
        # not run in UTC. Database drivers commonly return naive datetimes
        # for timestamp columns, so pin them to UTC explicitly.
        if value.tzinfo is None:
            value = value.replace(tzinfo=datetime.timezone.utc)
        return astropy.time.Time(value.timestamp(), format="unix_tai")

    def schemaVersion(self) -> VersionTuple:
        # Docstring inherited from base class.
        return self._db_schema_version

    @classmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        # Docstring inherited from base class.
        return VERSION

    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            return None

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
        assert table is not None, "replication_enabled=True means it must be defined"
        # Chunks are returned in order of their last update time.
        query = sql.select(
            table.columns["apdb_replica_chunk"], table.columns["last_update_time"], table.columns["unique_id"]
        ).order_by(table.columns["last_update_time"])
        with self._timer("chunks_select_time") as timer:
            with self._engine.connect() as conn:
                result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
                ids = []
                for row in result:
                    last_update_time = self._to_unix_tai_time(row[1])
                    ids.append(ReplicaChunk(id=row[0], last_update_time=last_update_time, unique_id=row[2]))
                timer.add_values(row_count=len(ids))
        return ids

    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
        # Materialize once: the iterable is used both in the query and for
        # the reported row count.
        chunk_list = list(chunks)
        where_clause = table.columns["apdb_replica_chunk"].in_(chunk_list)
        stmt = table.delete().where(where_clause)
        with self._timer("chunks_delete_time") as timer:
            with self._engine.begin() as conn:
                conn.execute(stmt)
            # Reports the number of requested chunk IDs, not the number of
            # rows actually deleted.
            timer.add_values(row_count=len(chunk_list))

    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        # Find the chunk table paired with the requested regular table.
        for chunk_table, table_enum in ExtraTables.replica_chunk_tables().items():
            if table is table_enum:
                return self._get_chunks(chunks, table, chunk_table)
        raise ValueError(f"Table {table} does not support replica chunks.")

    def _get_chunks(
        self,
        chunks: Iterable[int],
        table_enum: ApdbTables,
        chunk_table_enum: ExtraTables,
    ) -> ApdbTableData:
        """Return records belonging to given replica chunks, common
        implementation for all DIA tables.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Replica chunk identifiers to select.
        table_enum : `ApdbTables`
            Regular table whose records are returned.
        chunk_table_enum : `ExtraTables`
            Table associating records of ``table_enum`` with replica chunks.

        Returns
        -------
        data : `ApdbTableData`
            Selected records; the ``apdb_replica_chunk`` column comes first,
            followed by the APDB columns of ``table_enum``.

        Raises
        ------
        ValueError
            Raised if replication is not enabled for this APDB.
        """
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        table = self._schema.get_table(table_enum)
        chunk_table = self._schema.get_table(chunk_table_enum)

        # Join condition is inferred by SQLAlchemy, presumably from a foreign
        # key between the two tables.
        join = table.join(chunk_table)
        chunk_id_column = chunk_table.columns["apdb_replica_chunk"]
        apdb_columns = self._schema.get_apdb_columns(table_enum)
        where_clause = chunk_id_column.in_(chunks)
        query = sql.select(chunk_id_column, *apdb_columns).select_from(join).where(where_clause)

        table_schema = self._schema.tableSchemas[table_enum]
        # Regular tables should never have columns of ExtraDataTypes, this is
        # just to make mypy happy.
        column_types = {
            column.name: column.datatype
            for column in table_schema.columns
            if not isinstance(column.datatype, ExtraDataTypes)
        }
        # The chunk ID column comes from the chunk table, not the table schema.
        column_types["apdb_replica_chunk"] = felis.datamodel.DataType.long

        # execute select
        with self._timer("table_chunk_select_time", tags={"table": table.name}) as timer:
            with self._engine.begin() as conn:
                result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
                # Rows are fetched inside the connection scope.
                table_data = ApdbSqlTableData(result, column_types)
            timer.add_values(row_count=len(table_data.rows()))
        return table_data

    def getUpdateRecordChunks(self, chunks: Iterable[int]) -> Sequence[ApdbUpdateRecord]:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        try:
            table = self._schema.get_table(ExtraTables.ApdbUpdateRecordChunks)
        except ValueError:
            # Table does not exist yet.
            return []
        query = table.select().where(table.columns["apdb_replica_chunk"].in_(chunks))

        records = []
        with self._timer("select_update_record_time", tags={"table": table.name}) as timer:
            with self._engine.begin() as conn:
                result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
                for row in result:
                    records.append(
                        ApdbUpdateRecord.from_json(row.update_time_ns, row.update_order, row.update_payload)
                    )
            timer.add_values(row_count=len(records))

        # Return records in their natural (ApdbUpdateRecord) ordering.
        records.sort()
        return records