Coverage for python / lsst / dax / apdb / apdbUpdateRecord.py: 71%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:49 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = [ 

25 "ApdbCloseDiaObjectValidityRecord", 

26 "ApdbReassignDiaSourceRecord", 

27 "ApdbUpdateNDiaSourcesRecord", 

28 "ApdbUpdateRecord", 

29 "ApdbWithdrawDiaForcedSourceRecord", 

30 "ApdbWithdrawDiaSourceRecord", 

31] 

32 

33import dataclasses 

34import json 

35from abc import ABC 

36from collections.abc import Mapping 

37from dataclasses import dataclass 

38from typing import Any, ClassVar 

39 

40from .apdb import ApdbTables 

41 

42 

43@dataclass(kw_only=True) 

44class ApdbUpdateRecord(ABC): 

45 """Abstract base class representing all types of update records saved to 

46 replica table. 

47 """ 

48 

49 update_time_ns: int 

50 """Time in nanoseconds since epoch when update happened.""" 

51 

52 update_order: int 

53 """Record order in the update.""" 

54 

55 update_type: ClassVar[str] 

56 """Class variable defining type of the update, must be defined in all 

57 concrete subclasses and be unique. 

58 """ 

59 

60 apdb_table: ClassVar[ApdbTables] 

61 """Class variable defining APDB table that this update applies to, must be 

62 defined in all concrete subclasses. 

63 """ 

64 

65 _update_types: ClassVar[dict[str, type[ApdbUpdateRecord]]] = {} 

66 """Class variable for mapping the type of the update to corresponding 

67 record class. 

68 """ 

69 

70 def __init_subclass__(cls, /, update_type: str, **kwargs: Any) -> None: 

71 super().__init_subclass__(**kwargs) 

72 cls.update_type = update_type 

73 cls._update_types[update_type] = cls 

74 

75 def __lt__(self, other: Any) -> bool: 

76 if isinstance(other, ApdbUpdateRecord): 

77 return (self.update_time_ns, self.update_order) < (other.update_time_ns, other.update_order) 

78 raise self._type_error(other) 

79 

80 def __le__(self, other: Any) -> bool: 

81 if isinstance(other, ApdbUpdateRecord): 

82 return (self.update_time_ns, self.update_order) <= (other.update_time_ns, other.update_order) 

83 raise self._type_error(other) 

84 

85 def __gt__(self, other: Any) -> bool: 

86 if isinstance(other, ApdbUpdateRecord): 

87 return (self.update_time_ns, self.update_order) > (other.update_time_ns, other.update_order) 

88 raise self._type_error(other) 

89 

90 def __ge__(self, other: Any) -> bool: 

91 if isinstance(other, ApdbUpdateRecord): 

92 return (self.update_time_ns, self.update_order) >= (other.update_time_ns, other.update_order) 

93 raise self._type_error(other) 

94 

95 def _type_error(self, other: Any) -> TypeError: 

96 return TypeError( 

97 "ordering is not supported between instances of " 

98 f"'{self.__class__.__name__}' and '{other.__class__.__name__}'" 

99 ) 

100 

101 @classmethod 

102 def from_json(cls, update_time_ns: int, update_order: int, json_str: str) -> ApdbUpdateRecord: 

103 json_obj = json.loads(json_str) 

104 if not isinstance(json_obj, Mapping): 

105 raise TypeError("String must contain JSON object.") 

106 kw = dict(json_obj) 

107 if (update_type := kw.pop("update_type", None)) is None: 

108 raise LookupError("`update_type` key is not in JSON object.") 

109 if (klass := cls._update_types.get(update_type)) is not None: 

110 return klass(update_time_ns=update_time_ns, update_order=update_order, **kw) 

111 else: 

112 raise ValueError(f"Unknown update type: {update_type}") 

113 

114 def to_json(self) -> str: 

115 data = dataclasses.asdict(self) 

116 # These fields are stored separately. 

117 data.pop("update_time_ns") 

118 data.pop("update_order") 

119 data["update_type"] = self.update_type 

120 return json.dumps(data) 

121 

122 

123@dataclass(kw_only=True) 

124class ApdbReassignDiaSourceRecord(ApdbUpdateRecord, update_type="reassign_diasource"): 

125 """Update record representing re-assignment of DIASource to SSObject.""" 

126 

127 diaSourceId: int 

128 """ID of DIASource record.""" 

129 

130 diaObjectId: int 

131 """ID of associated DIAObject record.""" 

132 

133 ssObjectId: int 

134 """ID of SSObject to re-associate to.""" 

135 

136 ssObjectReassocTimeMjdTai: float 

137 """Time when DIASource was re-associated from DIAObject to SSObject.""" 

138 

139 ra: float 

140 """DIASource ra, not required to be exact, but needs to be close to the 

141 database record. 

142 """ 

143 

144 dec: float 

145 """DIASource dec, not required to be exact, but needs to be close to the 

146 database record. 

147 """ 

148 

149 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaSource 

150 

151 

152@dataclass(kw_only=True) 

153class ApdbWithdrawDiaSourceRecord(ApdbUpdateRecord, update_type="withdraw_diasource"): 

154 """Update record representing withdrawal of DIASource.""" 

155 

156 diaSourceId: int 

157 """ID of DIASource record.""" 

158 

159 diaObjectId: int | None 

160 """ID of associated DIAObject record or None if it is associated to 

161 SSObject. 

162 """ 

163 

164 timeWithdrawnMjdTai: float 

165 """Time when this record was marked invalid.""" 

166 

167 ra: float 

168 """DIASource ra, not required to be exact, but needs to be close to the 

169 database record. 

170 """ 

171 

172 dec: float 

173 """DIASource dec, not required to be exact, but needs to be close to the 

174 database record. 

175 """ 

176 

177 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaSource 

178 

179 

180@dataclass(kw_only=True) 

181class ApdbWithdrawDiaForcedSourceRecord(ApdbUpdateRecord, update_type="withdraw_diaforcedsource"): 

182 """Update record representing withdrawal of DIAForcedSource.""" 

183 

184 diaObjectId: int 

185 """ID of DIAObject for withdrawn DIAForcedSource.""" 

186 

187 visit: int 

188 """Visit ID of DIAForcedSource.""" 

189 

190 detector: int 

191 """Detector ID of DIAForcedSource.""" 

192 

193 timeWithdrawnMjdTai: float 

194 """Time when this record was marked invalid.""" 

195 

196 ra: float 

197 """DIAForcedSource ra, not required to be exact, but needs to be close to 

198 the database record. 

199 """ 

200 

201 dec: float 

202 """DIAForcedSource dec, not required to be exact, but needs to be close to 

203 the database record. 

204 """ 

205 

206 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaForcedSource 

207 

208 

209@dataclass(kw_only=True) 

210class ApdbCloseDiaObjectValidityRecord(ApdbUpdateRecord, update_type="close_diaobject_validity"): 

211 """Record representing closing of the validity interval of DIAObject.""" 

212 

213 diaObjectId: int 

214 """ID of DIAObject.""" 

215 

216 validityEndMjdTai: float 

217 """Time to set validityEnd to.""" 

218 

219 nDiaSources: int | None 

220 """New value for nDiaSources column for updated record, or None if 

221 nDiaSources does not change. 

222 """ 

223 

224 ra: float 

225 """DIAObject ra, not required to be exact, but needs to be close to the 

226 database record. 

227 """ 

228 

229 dec: float 

230 """DIAObject dec, not required to be exact, but needs to be close to the 

231 database record. 

232 """ 

233 

234 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaObject 

235 

236 

237@dataclass(kw_only=True) 

238class ApdbUpdateNDiaSourcesRecord(ApdbUpdateRecord, update_type="update_n_dia_sources"): 

239 """Record representing change in the number of associated sources of 

240 DIAObject. 

241 """ 

242 

243 diaObjectId: int 

244 """ID of DIAObject.""" 

245 

246 nDiaSources: int 

247 """New value for nDiaSources column for updated record.""" 

248 

249 ra: float 

250 """DIAObject ra, not required to be exact, but needs to be close to the 

251 database record. 

252 """ 

253 

254 dec: float 

255 """DIAObject dec, not required to be exact, but needs to be close to the 

256 database record. 

257 """ 

258 

259 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaObject