Coverage for python / lsst / dax / apdb / sql / apdbSqlReplica.py: 25%
134 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:43 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Module defining ApdbSqlReplica class and related methods."""
24from __future__ import annotations
26__all__ = ["ApdbSqlReplica"]
28import logging
29from collections.abc import Collection, Iterable, Mapping, Sequence
30from typing import TYPE_CHECKING, cast
32import astropy.time
33import felis.datamodel
34import numpy
35import pandas
36import sqlalchemy
37from sqlalchemy import sql
39from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk
40from ..apdbSchema import ApdbTables
41from ..apdbUpdateRecord import ApdbUpdateRecord
42from ..monitor import MonAgent
43from ..schema_model import Column, ExtraDataTypes
44from ..timer import Timer
45from ..versionTuple import VersionTuple
46from .apdbSqlSchema import ExtraTables
48if TYPE_CHECKING:
49 from .apdbSqlSchema import ApdbSqlSchema
52_LOG = logging.getLogger(__name__)
54_MON = MonAgent(__name__)
56VERSION = VersionTuple(1, 0, 0)
57"""Version for the code controlling replication tables. This needs to be
58updated following compatibility rules when schema produced by this code
59changes.
60"""
class ApdbSqlTableData(ApdbTableData):
    """Implementation of ApdbTableData that wraps sqlalchemy Result.

    Parameters
    ----------
    result : `sqlalchemy.engine.Result`
        Result returned from query.
    column_defs : `list` [`..schema_model.Column`]
        Column definitions, must include all columns appearing in the
        ``result``.

    Raises
    ------
    TypeError
        Raised if any column in ``result`` has an extra (non-felis) data
        type, which cannot be represented by this class.
    """

    def __init__(self, result: sqlalchemy.engine.Result, column_defs: list[Column]):
        column_map = {column_def.name: column_def for column_def in column_defs}
        # Re-order definitions to match the actual column order of the result.
        self._column_defs = tuple(column_map[column_name] for column_name in result.keys())
        column_types = []
        for column_def in self._column_defs:
            if isinstance(column_def.datatype, ExtraDataTypes):
                # Fixed: original message lacked the f-prefix and printed the
                # literal placeholders instead of the actual values.
                raise TypeError(
                    f"Unsupported column type {column_def.datatype} for column {column_def.name}"
                )
            column_types.append((column_def.name, column_def.datatype))
        self._column_types = tuple(column_types)
        # Materialize all rows up front; the Result may be streaming and tied
        # to a connection that is closed soon after construction.
        self._rows: list[tuple] = cast(list[tuple], list(result.fetchall()))

    def column_names(self) -> Sequence[str]:
        # Docstring inherited from base class.
        return tuple(column_def.name for column_def in self._column_defs)

    def column_defs(self) -> Sequence[tuple[str, felis.datamodel.DataType]]:
        # Docstring inherited from base class.
        return self._column_types

    def rows(self) -> Collection[tuple]:
        # Docstring inherited from base class.
        return self._rows

    def to_pandas(self) -> pandas.DataFrame:
        """Convert data to pandas DataFrame.

        Returns
        -------
        dataframe : `pandas.DataFrame`
            Resulting DataFrame.
        """
        if not self._rows:
            # No rows; build empty Series with the configured dtype for each
            # column so the empty frame still carries correct column types.
            column_data = {
                column_def.name: pandas.Series(dtype=column_def.pandas_type)
                for column_def in self._column_defs
            }
            return pandas.DataFrame(column_data)

        # To avoid nested Python loops convert everything to ndarray and
        # transpose, so that ``array[i]`` holds the data of one column.
        array = numpy.array(self._rows, dtype=object).T
        column_data = {
            column_def.name: pandas.Series(array[i], dtype=column_def.pandas_type)
            for i, column_def in enumerate(self._column_defs)
        }
        return pandas.DataFrame(column_data)
class ApdbSqlReplica(ApdbReplica):
    """Implementation of `ApdbReplica` for SQL backend.

    Parameters
    ----------
    schema : `ApdbSqlSchema`
        Instance of `ApdbSqlSchema` class for APDB database.
    engine : `sqlalchemy.engine.Engine`
        Engine for database access.
    db_schema_version : `VersionTuple`
        Version of the database schema.
    timer : `bool`, optional
        If `True` then log timing information.
    """

    def __init__(
        self,
        schema: ApdbSqlSchema,
        engine: sqlalchemy.engine.Engine,
        db_schema_version: VersionTuple,
        timer: bool = False,
    ):
        self._schema = schema
        self._engine = engine
        self._db_schema_version = db_schema_version

        # Timers always report to the monitoring agent, and optionally also
        # to the module logger.
        self._timer_args: list[MonAgent | logging.Logger] = [_MON]
        if timer:
            self._timer_args.append(_LOG)

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Make a `Timer` for the named operation."""
        return Timer(name, *self._timer_args, tags=tags)

    def schemaVersion(self) -> VersionTuple:
        # Docstring inherited from base class.
        return self._db_schema_version

    @classmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        # Docstring inherited from base class.
        return VERSION

    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            return None

        chunks_table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
        assert chunks_table is not None, "replication_enabled=True means it must be defined"
        query = sql.select(
            chunks_table.columns["apdb_replica_chunk"],
            chunks_table.columns["last_update_time"],
            chunks_table.columns["unique_id"],
        ).order_by(chunks_table.columns["last_update_time"])
        with self._timer("chunks_select_time") as timer:
            with self._engine.connect() as connection:
                # Stream rows to bound memory use for large chunk tables.
                result = connection.execution_options(stream_results=True, max_row_buffer=10000).execute(
                    query
                )
                # Stored times are TAI; convert POSIX-style timestamp back to
                # astropy Time with matching scale.
                chunk_list = [
                    ReplicaChunk(
                        id=row[0],
                        last_update_time=astropy.time.Time(row[1].timestamp(), format="unix_tai"),
                        unique_id=row[2],
                    )
                    for row in result
                ]
            timer.add_values(row_count=len(chunk_list))
        return chunk_list

    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        chunks_table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
        chunk_ids = list(chunks)
        stmt = chunks_table.delete().where(chunks_table.columns["apdb_replica_chunk"].in_(chunk_ids))
        with self._timer("chunks_delete_time") as timer:
            with self._engine.begin() as connection:
                connection.execute(stmt)
            timer.add_values(row_count=len(chunk_ids))

    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        # Find the chunk table matching the requested data table.
        for chunk_table_enum, data_table_enum in ExtraTables.replica_chunk_tables().items():
            if table is data_table_enum:
                return self._get_chunks(chunks, table, chunk_table_enum)
        raise ValueError(f"Table {table} does not support replica chunks.")

    def _get_chunks(
        self,
        chunks: Iterable[int],
        table_enum: ApdbTables,
        chunk_table_enum: ExtraTables,
    ) -> ApdbTableData:
        """Return catalog of records for given insert identifiers, common
        implementation for all DIA tables.
        """
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        data_table = self._schema.get_table(table_enum)
        chunk_table = self._schema.get_table(chunk_table_enum)

        # Select all configured APDB columns plus the chunk ID, restricted to
        # the requested chunks.
        chunk_id_column = chunk_table.columns["apdb_replica_chunk"]
        query = (
            sql.select(chunk_id_column, *self._schema.get_apdb_columns(table_enum))
            .select_from(data_table.join(chunk_table))
            .where(chunk_id_column.in_(chunks))
        )

        # The chunk ID column is not part of the table schema, add its
        # definition explicitly.
        chunk_column_def = Column(
            name="apdb_replica_chunk", datatype=felis.datamodel.DataType.long, id="", nullable=False
        )
        column_defs = self._schema.tableSchemas[table_enum].columns + [chunk_column_def]

        with self._timer("table_chunk_select_time", tags={"table": data_table.name}) as timer:
            with self._engine.begin() as connection:
                result = connection.execution_options(stream_results=True, max_row_buffer=10000).execute(
                    query
                )
                table_data = ApdbSqlTableData(result, column_defs)
            timer.add_values(row_count=len(table_data.rows()))
        return table_data

    def getUpdateRecordChunks(self, chunks: Iterable[int]) -> Sequence[ApdbUpdateRecord]:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        try:
            records_table = self._schema.get_table(ExtraTables.ApdbUpdateRecordChunks)
        except ValueError:
            # Table does not exist yet.
            return []
        query = records_table.select().where(records_table.columns["apdb_replica_chunk"].in_(chunks))

        with self._timer("select_update_record_time", tags={"table": records_table.name}) as timer:
            with self._engine.begin() as connection:
                result = connection.execution_options(stream_results=True, max_row_buffer=10000).execute(
                    query
                )
                records = [
                    ApdbUpdateRecord.from_json(row.update_time_ns, row.update_order, row.update_payload)
                    for row in result
                ]
            timer.add_values(row_count=len(records))

        records.sort()
        return records