Coverage for python / lsst / dax / apdb / apdbUpdateRecord.py: 59%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:58 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

# Names exported by ``from <module> import *``.
__all__ = [
    "ApdbCloseDiaObjectValidityRecord",
    "ApdbReassignDiaSourceToDiaObjectRecord",
    "ApdbReassignDiaSourceToSSObjectRecord",
    "ApdbUpdateNDiaSourcesRecord",
    "ApdbUpdateRecord",
    "ApdbWithdrawDiaForcedSourceRecord",
    "ApdbWithdrawDiaSourceRecord",
]

33 

34import dataclasses 

35import json 

36from abc import ABC, abstractmethod 

37from collections.abc import Mapping 

38from dataclasses import dataclass 

39from typing import Any, ClassVar 

40 

41from .apdb import ApdbTables 

42from .recordIds import DiaForcedSourceId, DiaObjectId, DiaSourceId 

43 

44 

45@dataclass(kw_only=True) 

46class ApdbUpdateRecord(ABC): 

47 """Abstract base class representing all types of update records saved to 

48 replica table. 

49 """ 

50 

51 update_time_ns: int 

52 """Time in nanoseconds since epoch when update happened.""" 

53 

54 update_order: int 

55 """Record order in the update.""" 

56 

57 update_type: ClassVar[str] 

58 """Class variable defining type of the update, must be defined in all 

59 concrete subclasses and be unique. 

60 """ 

61 

62 apdb_table: ClassVar[ApdbTables] 

63 """Class variable defining APDB table that this update applies to, must be 

64 defined in all concrete subclasses. 

65 """ 

66 

67 _update_types: ClassVar[dict[str, type[ApdbUpdateRecord]]] = {} 

68 """Class variable for mapping the type of the update to corresponding 

69 record class. 

70 """ 

71 

72 def __init_subclass__(cls, /, update_type: str, **kwargs: Any) -> None: 

73 super().__init_subclass__(**kwargs) 

74 cls.update_type = update_type 

75 cls._update_types[update_type] = cls 

76 

77 def __lt__(self, other: Any) -> bool: 

78 if isinstance(other, ApdbUpdateRecord): 

79 return (self.update_time_ns, self.update_order) < (other.update_time_ns, other.update_order) 

80 raise self._type_error(other) 

81 

82 def __le__(self, other: Any) -> bool: 

83 if isinstance(other, ApdbUpdateRecord): 

84 return (self.update_time_ns, self.update_order) <= (other.update_time_ns, other.update_order) 

85 raise self._type_error(other) 

86 

87 def __gt__(self, other: Any) -> bool: 

88 if isinstance(other, ApdbUpdateRecord): 

89 return (self.update_time_ns, self.update_order) > (other.update_time_ns, other.update_order) 

90 raise self._type_error(other) 

91 

92 def __ge__(self, other: Any) -> bool: 

93 if isinstance(other, ApdbUpdateRecord): 

94 return (self.update_time_ns, self.update_order) >= (other.update_time_ns, other.update_order) 

95 raise self._type_error(other) 

96 

97 def _type_error(self, other: Any) -> TypeError: 

98 return TypeError( 

99 "ordering is not supported between instances of " 

100 f"'{self.__class__.__name__}' and '{other.__class__.__name__}'" 

101 ) 

102 

103 @classmethod 

104 def from_json(cls, update_time_ns: int, update_order: int, json_str: str) -> ApdbUpdateRecord: 

105 json_obj = json.loads(json_str) 

106 if not isinstance(json_obj, Mapping): 

107 raise TypeError("String must contain JSON object.") 

108 kw = dict(json_obj) 

109 if (update_type := kw.pop("update_type", None)) is None: 

110 raise LookupError("`update_type` key is not in JSON object.") 

111 if (klass := cls._update_types.get(update_type)) is not None: 

112 return klass(update_time_ns=update_time_ns, update_order=update_order, **kw) 

113 else: 

114 raise ValueError(f"Unknown update type: {update_type}") 

115 

116 def to_json(self) -> str: 

117 data = dataclasses.asdict(self) 

118 # These fields are stored separately. 

119 data.pop("update_time_ns") 

120 data.pop("update_order") 

121 data["update_type"] = self.update_type 

122 return json.dumps(data) 

123 

124 @abstractmethod 

125 def record_id(self) -> tuple[tuple[str, int], ...]: 

126 """Return a tuple of (field name, field value) pairs for fields that 

127 identify the record to which this update applies. 

128 

129 Returns 

130 ------- 

131 record_id_tuple : `tuple` [`tuple` [`str`, `int`], ...] 

132 Tuple of (field name, field value) pairs for fields that identify 

133 the record to which this update applies. 

134 """ 

135 raise NotImplementedError() 

136 

137 @abstractmethod 

138 def record_payload(self) -> tuple[tuple[str, Any], ...]: 

139 """Return a tuple of (field name, field value) pairs for fields that 

140 represent updates being applied by this record. 

141 

142 Returns 

143 ------- 

144 payload_tuple : `tuple` [`tuple` [`str`, `Any`], ...] 

145 Tuple of (field name, field value) pairs. 

146 

147 Notes 

148 ----- 

149 Returned tuple contains the fields that are actually updated. Even if 

150 this class represents an update that modifies multiple fields, 

151 individual records can update a smaller set of fields. 

152 """ 

153 raise NotImplementedError() 

154 

155 

@dataclass(kw_only=True)
class ApdbReassignDiaSourceToDiaObjectRecord(
    ApdbUpdateRecord, DiaSourceId, update_type="reassign_diasource_to_diaobject"
):
    """Update record representing re-assignment of DIASource to a different
    DIAObject.

    Notes
    -----
    The updated record is identified by ``diaSourceId``, which this class
    reads from ``self`` but does not declare itself.
    """

    diaObjectId: int
    """ID of a new associated DIAObject record."""

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaSource

    def record_id(self) -> tuple[tuple[str, int], ...]:
        # Docstring inherited from the base class.
        return (("diaSourceId", self.diaSourceId),)

    def record_payload(self) -> tuple[tuple[str, Any], ...]:
        # Docstring inherited from the base class.
        return (("diaObjectId", self.diaObjectId),)

176 

177 

@dataclass(kw_only=True)
class ApdbReassignDiaSourceToSSObjectRecord(
    ApdbUpdateRecord, DiaSourceId, update_type="reassign_diasource_to_ssobject"
):
    """Update record representing re-assignment of DIASource to SSObject.

    Notes
    -----
    The updated record is identified by ``diaSourceId``, which this class
    reads from ``self`` but does not declare itself.
    """

    ssObjectId: int
    """ID of SSObject to re-associate to."""

    ssObjectReassocTimeMjdTai: float
    """Time when DIASource was re-associated from DIAObject to SSObject."""

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaSource

    def record_id(self) -> tuple[tuple[str, int], ...]:
        # Docstring inherited from the base class.
        return (("diaSourceId", self.diaSourceId),)

    def record_payload(self) -> tuple[tuple[str, Any], ...]:
        # Docstring inherited from the base class.
        return (
            ("ssObjectId", self.ssObjectId),
            ("ssObjectReassocTimeMjdTai", self.ssObjectReassocTimeMjdTai),
        )

202 

203 

@dataclass(kw_only=True)
class ApdbWithdrawDiaSourceRecord(ApdbUpdateRecord, DiaSourceId, update_type="withdraw_diasource"):
    """Update record representing withdrawal of DIASource.

    Notes
    -----
    The updated record is identified by ``diaSourceId``, which this class
    reads from ``self`` but does not declare itself.
    """

    timeWithdrawnMjdTai: float
    """Time when this record was marked invalid."""

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaSource

    def record_id(self) -> tuple[tuple[str, int], ...]:
        # Docstring inherited from the base class.
        return (("diaSourceId", self.diaSourceId),)

    def record_payload(self) -> tuple[tuple[str, Any], ...]:
        # Docstring inherited from the base class.
        return (("timeWithdrawnMjdTai", self.timeWithdrawnMjdTai),)

220 

221 

@dataclass(kw_only=True)
class ApdbWithdrawDiaForcedSourceRecord(
    ApdbUpdateRecord, DiaForcedSourceId, update_type="withdraw_diaforcedsource"
):
    """Update record representing withdrawal of DIAForcedSource.

    Notes
    -----
    The updated record is identified by the combination of ``diaObjectId``,
    ``visit`` and ``detector``, which this class reads from ``self`` but
    does not declare itself.
    """

    timeWithdrawnMjdTai: float
    """Time when this record was marked invalid."""

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaForcedSource

    def record_id(self) -> tuple[tuple[str, int], ...]:
        # Docstring inherited from the base class.
        return (
            ("diaObjectId", self.diaObjectId),
            ("visit", self.visit),
            ("detector", self.detector),
        )

    def record_payload(self) -> tuple[tuple[str, Any], ...]:
        # Docstring inherited from the base class.
        return (("timeWithdrawnMjdTai", self.timeWithdrawnMjdTai),)

244 

245 

@dataclass(kw_only=True)
class ApdbCloseDiaObjectValidityRecord(ApdbUpdateRecord, DiaObjectId, update_type="close_diaobject_validity"):
    """Record representing closing of the validity interval of DIAObject.

    Notes
    -----
    The updated record is identified by ``diaObjectId``, which this class
    reads from ``self`` but does not declare itself. The payload always
    contains ``validityEndMjdTai`` and includes ``nDiaSources`` only when
    that field is not `None`.
    """

    validityEndMjdTai: float
    """Time to set validityEnd to."""

    nDiaSources: int | None
    """New value for nDiaSources column for updated record, or None if
    nDiaSources does not change.
    """

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaObject

    def record_id(self) -> tuple[tuple[str, int], ...]:
        # Docstring inherited from the base class.
        return (("diaObjectId", self.diaObjectId),)

    def record_payload(self) -> tuple[tuple[str, Any], ...]:
        # Docstring inherited from the base class.
        payload: tuple[tuple[str, Any], ...] = (("validityEndMjdTai", self.validityEndMjdTai),)
        # nDiaSources is updated only when not None.
        if self.nDiaSources is not None:
            payload += (("nDiaSources", self.nDiaSources),)
        return payload

271 

272 

@dataclass(kw_only=True)
class ApdbUpdateNDiaSourcesRecord(ApdbUpdateRecord, DiaObjectId, update_type="update_n_dia_sources"):
    """Record representing change in the number of associated sources of
    DIAObject.

    Notes
    -----
    The updated record is identified by ``diaObjectId``, which this class
    reads from ``self`` but does not declare itself.
    """

    nDiaSources: int
    """New value for nDiaSources column for updated record."""

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaObject

    def record_id(self) -> tuple[tuple[str, int], ...]:
        # Docstring inherited from the base class.
        return (("diaObjectId", self.diaObjectId),)

    def record_payload(self) -> tuple[tuple[str, Any], ...]:
        # Docstring inherited from the base class.
        return (("nDiaSources", self.nDiaSources),)

289 # Docstring inherited from the base class. 

290 return (("nDiaSources", self.nDiaSources),)