Coverage for python / lsst / dax / apdb / apdbReplica.py: 83%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:49 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ApdbReplica", "ApdbTableData", "ReplicaChunk"] 

25 

26import uuid 

27from abc import ABC, abstractmethod 

28from collections.abc import Collection, Iterable, Sequence 

29from dataclasses import dataclass 

30from typing import TYPE_CHECKING 

31 

32import astropy.time 

33import felis.datamodel 

34 

35from lsst.resources import ResourcePathExpression 

36 

37from .apdb import ApdbConfig, ApdbTables 

38from .factory import make_apdb_replica 

39 

40if TYPE_CHECKING: 

41 from .apdbUpdateRecord import ApdbUpdateRecord 

42 from .versionTuple import VersionTuple 

43 

44 

class ApdbTableData(ABC):
    """Abstract base for tabular data made of typed columns and row tuples."""

    @abstractmethod
    def column_names(self) -> Sequence[str]:
        """Return the names of all columns, in column order.

        Returns
        -------
        names : `~collections.abc.Sequence` [`str`]
            Ordered column names.
        """
        raise NotImplementedError

    @abstractmethod
    def column_defs(self) -> Sequence[tuple[str, felis.datamodel.DataType]]:
        """Return the name and data type of every column, in column order.

        Returns
        -------
        columns : `~collections.abc.Sequence` [`tuple` [`str`, `felis.datamodel.DataType`]]
            One ``(name, type)`` pair per column.
        """
        raise NotImplementedError

    @abstractmethod
    def rows(self) -> Collection[tuple]:
        """Return the table contents, one tuple of values per row.

        Returns
        -------
        rows : `~collections.abc.Collection` [`tuple`]
            Collection of row tuples.
        """
        raise NotImplementedError

82 

83 

@dataclass(frozen=True)
class ReplicaChunk:
    """Class used for identification of replication chunks.

    Instances of this class are used to identify the units of transfer from
    APDB to PPDB. Usually a single `ReplicaChunk` corresponds to multiple
    consecutive calls to the `Apdb.store` method.

    Every ``store`` with the same ``id`` value will update ``unique_id`` with
    some unique value so that it can be verified on PPDB side.
    """

    id: int
    """A number identifying replication chunk (`int`)."""

    last_update_time: astropy.time.Time
    """Time of last insert for this chunk, usually corresponds to visit time
    (`astropy.time.Time`).
    """

    unique_id: uuid.UUID
    """Unique value updated on each new store (`uuid.UUID`)."""

    @classmethod
    def make_replica_chunk(
        cls, last_update_time: astropy.time.Time, chunk_window_seconds: int
    ) -> ReplicaChunk:
        """Make a new chunk for the time window containing a given time.

        Parameters
        ----------
        last_update_time : `astropy.time.Time`
            Time of the insert. It is also stored as ``last_update_time`` of
            the new chunk.
        chunk_window_seconds : `int`
            Width of the chunk window in seconds. It must be positive.

        Returns
        -------
        chunk : `ReplicaChunk`
            New instance of ``cls``. Its ``id`` is ``last_update_time`` as
            whole TAI Unix seconds (``int`` truncation), floored to a multiple
            of ``chunk_window_seconds``. Its ``unique_id`` is a fresh random
            UUID.

        Raises
        ------
        ValueError
            Raised if ``chunk_window_seconds`` is not positive.
        """
        if chunk_window_seconds <= 0:
            # Zero would fail with ZeroDivisionError, and a negative window
            # would round ids up instead of down, giving meaningless chunks.
            raise ValueError(f"chunk_window_seconds must be positive, got {chunk_window_seconds}")
        seconds = int(last_update_time.unix_tai)
        # Align to the start of the window so that all stores falling into
        # the same window share one chunk id.
        chunk_id = (seconds // chunk_window_seconds) * chunk_window_seconds
        # Use ``cls`` so that subclasses get instances of their own type.
        return cls(id=chunk_id, last_update_time=last_update_time, unique_id=uuid.uuid4())

    def __str__(self) -> str:
        # ``id`` is right-aligned in a 10-character field; the time is shown
        # as an ISOT string on the TAI scale.
        class_name = self.__class__.__name__
        time_str = str(self.last_update_time.tai.isot)
        return f"{class_name}(id={self.id:10d}, last_update_time={time_str}/tai, unique_id={self.unique_id})"

121 

122 

class ApdbReplica(ABC):
    """Abstract interface for APDB replication methods."""

    @classmethod
    def from_config(cls, config: ApdbConfig) -> ApdbReplica:
        """Create ApdbReplica instance from configuration object.

        Parameters
        ----------
        config : `ApdbConfig`
            Configuration object, type of this object determines type of the
            ApdbReplica implementation.

        Returns
        -------
        replica : `ApdbReplica`
            Instance of `ApdbReplica` class, created by
            ``make_apdb_replica``.
        """
        return make_apdb_replica(config)

    @classmethod
    def from_uri(cls, uri: ResourcePathExpression) -> ApdbReplica:
        """Make ApdbReplica instance from a serialized configuration.

        Parameters
        ----------
        uri : `~lsst.resources.ResourcePathExpression`
            URI or local file path pointing to a file with serialized
            configuration, or a string with a "label:" prefix. In the latter
            case, the configuration will be looked up from an APDB index file
            using the label name that follows the prefix. The APDB index file's
            location is determined by the ``DAX_APDB_INDEX_URI`` environment
            variable.

        Returns
        -------
        replica : `ApdbReplica`
            Instance of `ApdbReplica` class, the type of the returned instance
            is determined by configuration.
        """
        # Configuration is loaded with ``ApdbConfig.from_uri`` and then passed
        # to the same factory that ``from_config`` uses.
        config = ApdbConfig.from_uri(uri)
        return make_apdb_replica(config)

    @classmethod
    @abstractmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        """Return version number for current ApdbReplica implementation.

        Returns
        -------
        version : `VersionTuple`
            Version of the code defined in implementation class.
        """
        raise NotImplementedError()

    @abstractmethod
    def schemaVersion(self) -> VersionTuple:
        """Return version number of the database schema.

        Returns
        -------
        version : `VersionTuple`
            Version of the database schema.
        """
        raise NotImplementedError()

    @abstractmethod
    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        """Return collection of replication chunks known to the database.

        Returns
        -------
        chunks : `list` [`ReplicaChunk`] or `None`
            List of chunks, they may be time-ordered if database supports
            ordering. `None` is returned if database is not configured for
            replication.
        """
        raise NotImplementedError()

    @abstractmethod
    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        """Remove replication chunks from the database.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to remove.

        Notes
        -----
        This method causes Apdb to forget about specified chunks. If there
        are any auxiliary data associated with the identifiers, they are also
        removed from the database (but data in regular tables is not
        removed). This method should be called after successful transfer of
        data from APDB to PPDB to free space used by replicas.
        """
        raise NotImplementedError()

    @abstractmethod
    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        """Return catalog of new records for a table from given replica chunks.

        Parameters
        ----------
        table : `ApdbTables`
            Table for which to return the data. Acceptable tables are
            `ApdbTables.DiaObject`, `ApdbTables.DiaSource`, and
            `ApdbTables.DiaForcedSource`.
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to return.

        Returns
        -------
        data : `ApdbTableData`
            Catalog containing table records. In addition to all regular
            columns it will contain ``apdb_replica_chunk`` column.

        Notes
        -----
        This method returns new records that have been added to the table by
        `Apdb.store()` method. Updates to the records that happen at later time
        are available from `getUpdateRecordChunks` method.

        This part of API may not be very stable and can change before the
        implementation finalizes.
        """
        raise NotImplementedError()

    @abstractmethod
    def getUpdateRecordChunks(self, chunks: Iterable[int]) -> Sequence[ApdbUpdateRecord]:
        """Return the list of record updates from given replica chunks.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to return.

        Returns
        -------
        records : `~collections.abc.Sequence` [`ApdbUpdateRecord`]
            Collection of update records. Records will be sorted according to
            their update time and update order.
        """
        raise NotImplementedError()