Coverage for python / lsst / dax / apdb / apdbUpdateRecord.py: 71%
127 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:49 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = [
25 "ApdbCloseDiaObjectValidityRecord",
26 "ApdbReassignDiaSourceRecord",
27 "ApdbUpdateNDiaSourcesRecord",
28 "ApdbUpdateRecord",
29 "ApdbWithdrawDiaForcedSourceRecord",
30 "ApdbWithdrawDiaSourceRecord",
31]
33import dataclasses
34import json
35from abc import ABC
36from collections.abc import Mapping
37from dataclasses import dataclass
38from typing import Any, ClassVar
40from .apdb import ApdbTables
43@dataclass(kw_only=True)
44class ApdbUpdateRecord(ABC):
45 """Abstract base class representing all types of update records saved to
46 replica table.
47 """
49 update_time_ns: int
50 """Time in nanoseconds since epoch when update happened."""
52 update_order: int
53 """Record order in the update."""
55 update_type: ClassVar[str]
56 """Class variable defining type of the update, must be defined in all
57 concrete subclasses and be unique.
58 """
60 apdb_table: ClassVar[ApdbTables]
61 """Class variable defining APDB table that this update applies to, must be
62 defined in all concrete subclasses.
63 """
65 _update_types: ClassVar[dict[str, type[ApdbUpdateRecord]]] = {}
66 """Class variable for mapping the type of the update to corresponding
67 record class.
68 """
70 def __init_subclass__(cls, /, update_type: str, **kwargs: Any) -> None:
71 super().__init_subclass__(**kwargs)
72 cls.update_type = update_type
73 cls._update_types[update_type] = cls
75 def __lt__(self, other: Any) -> bool:
76 if isinstance(other, ApdbUpdateRecord):
77 return (self.update_time_ns, self.update_order) < (other.update_time_ns, other.update_order)
78 raise self._type_error(other)
80 def __le__(self, other: Any) -> bool:
81 if isinstance(other, ApdbUpdateRecord):
82 return (self.update_time_ns, self.update_order) <= (other.update_time_ns, other.update_order)
83 raise self._type_error(other)
85 def __gt__(self, other: Any) -> bool:
86 if isinstance(other, ApdbUpdateRecord):
87 return (self.update_time_ns, self.update_order) > (other.update_time_ns, other.update_order)
88 raise self._type_error(other)
90 def __ge__(self, other: Any) -> bool:
91 if isinstance(other, ApdbUpdateRecord):
92 return (self.update_time_ns, self.update_order) >= (other.update_time_ns, other.update_order)
93 raise self._type_error(other)
95 def _type_error(self, other: Any) -> TypeError:
96 return TypeError(
97 "ordering is not supported between instances of "
98 f"'{self.__class__.__name__}' and '{other.__class__.__name__}'"
99 )
101 @classmethod
102 def from_json(cls, update_time_ns: int, update_order: int, json_str: str) -> ApdbUpdateRecord:
103 json_obj = json.loads(json_str)
104 if not isinstance(json_obj, Mapping):
105 raise TypeError("String must contain JSON object.")
106 kw = dict(json_obj)
107 if (update_type := kw.pop("update_type", None)) is None:
108 raise LookupError("`update_type` key is not in JSON object.")
109 if (klass := cls._update_types.get(update_type)) is not None:
110 return klass(update_time_ns=update_time_ns, update_order=update_order, **kw)
111 else:
112 raise ValueError(f"Unknown update type: {update_type}")
114 def to_json(self) -> str:
115 data = dataclasses.asdict(self)
116 # These fields are stored separately.
117 data.pop("update_time_ns")
118 data.pop("update_order")
119 data["update_type"] = self.update_type
120 return json.dumps(data)
@dataclass(kw_only=True)
class ApdbReassignDiaSourceRecord(ApdbUpdateRecord, update_type="reassign_diasource"):
    """Update record for a DIASource that is being re-assigned to an
    SSObject.
    """

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaSource

    diaSourceId: int
    """Identifier of the DIASource record."""

    diaObjectId: int
    """Identifier of the DIAObject record associated with the source."""

    ssObjectId: int
    """Identifier of the SSObject that the source is re-associated to."""

    ssObjectReassocTimeMjdTai: float
    """Time of the re-association of DIASource from DIAObject to SSObject."""

    ra: float
    """DIASource ``ra`` value, it may be approximate but has to be close to
    what is stored in the database record.
    """

    dec: float
    """DIASource ``dec`` value, it may be approximate but has to be close to
    what is stored in the database record.
    """
@dataclass(kw_only=True)
class ApdbWithdrawDiaSourceRecord(ApdbUpdateRecord, update_type="withdraw_diasource"):
    """Update record for a DIASource that is being withdrawn."""

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaSource

    diaSourceId: int
    """Identifier of the DIASource record."""

    diaObjectId: int | None
    """Identifier of the DIAObject record associated with the source, None
    when the source is associated to SSObject.
    """

    timeWithdrawnMjdTai: float
    """Time at which the record was marked invalid."""

    ra: float
    """DIASource ``ra`` value, it may be approximate but has to be close to
    what is stored in the database record.
    """

    dec: float
    """DIASource ``dec`` value, it may be approximate but has to be close to
    what is stored in the database record.
    """
@dataclass(kw_only=True)
class ApdbWithdrawDiaForcedSourceRecord(ApdbUpdateRecord, update_type="withdraw_diaforcedsource"):
    """Update record for a DIAForcedSource that is being withdrawn."""

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaForcedSource

    diaObjectId: int
    """Identifier of the DIAObject of the withdrawn DIAForcedSource."""

    visit: int
    """Visit identifier of the DIAForcedSource."""

    detector: int
    """Detector identifier of the DIAForcedSource."""

    timeWithdrawnMjdTai: float
    """Time at which the record was marked invalid."""

    ra: float
    """DIAForcedSource ``ra`` value, it may be approximate but has to be
    close to what is stored in the database record.
    """

    dec: float
    """DIAForcedSource ``dec`` value, it may be approximate but has to be
    close to what is stored in the database record.
    """
@dataclass(kw_only=True)
class ApdbCloseDiaObjectValidityRecord(ApdbUpdateRecord, update_type="close_diaobject_validity"):
    """Update record that closes the validity interval of a DIAObject."""

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaObject

    diaObjectId: int
    """Identifier of the DIAObject."""

    validityEndMjdTai: float
    """Time that validityEnd is set to."""

    nDiaSources: int | None
    """Value to store in nDiaSources column of the updated record, None when
    nDiaSources stays unchanged.
    """

    ra: float
    """DIAObject ``ra`` value, it may be approximate but has to be close to
    what is stored in the database record.
    """

    dec: float
    """DIAObject ``dec`` value, it may be approximate but has to be close to
    what is stored in the database record.
    """
@dataclass(kw_only=True)
class ApdbUpdateNDiaSourcesRecord(ApdbUpdateRecord, update_type="update_n_dia_sources"):
    """Update record for a change in the number of sources associated with a
    DIAObject.
    """

    apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaObject

    diaObjectId: int
    """Identifier of the DIAObject."""

    nDiaSources: int
    """Value to store in nDiaSources column of the updated record."""

    ra: float
    """DIAObject ``ra`` value, it may be approximate but has to be close to
    what is stored in the database record.
    """

    dec: float
    """DIAObject ``dec`` value, it may be approximate but has to be close to
    what is stored in the database record.
    """