Coverage for python / lsst / dax / apdb / apdbUpdateRecord.py: 59%
126 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:19 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = [
25 "ApdbCloseDiaObjectValidityRecord",
26 "ApdbReassignDiaSourceToDiaObjectRecord",
27 "ApdbReassignDiaSourceToSSObjectRecord",
28 "ApdbUpdateNDiaSourcesRecord",
29 "ApdbUpdateRecord",
30 "ApdbWithdrawDiaForcedSourceRecord",
31 "ApdbWithdrawDiaSourceRecord",
32]
34import dataclasses
35import json
36from abc import ABC, abstractmethod
37from collections.abc import Mapping
38from dataclasses import dataclass
39from typing import Any, ClassVar
41from .apdb import ApdbTables
42from .recordIds import DiaForcedSourceId, DiaObjectId, DiaSourceId
45@dataclass(kw_only=True)
46class ApdbUpdateRecord(ABC):
47 """Abstract base class representing all types of update records saved to
48 replica table.
49 """
51 update_time_ns: int
52 """Time in nanoseconds since epoch when update happened."""
54 update_order: int
55 """Record order in the update."""
57 update_type: ClassVar[str]
58 """Class variable defining type of the update, must be defined in all
59 concrete subclasses and be unique.
60 """
62 apdb_table: ClassVar[ApdbTables]
63 """Class variable defining APDB table that this update applies to, must be
64 defined in all concrete subclasses.
65 """
67 _update_types: ClassVar[dict[str, type[ApdbUpdateRecord]]] = {}
68 """Class variable for mapping the type of the update to corresponding
69 record class.
70 """
72 def __init_subclass__(cls, /, update_type: str, **kwargs: Any) -> None:
73 super().__init_subclass__(**kwargs)
74 cls.update_type = update_type
75 cls._update_types[update_type] = cls
77 def __lt__(self, other: Any) -> bool:
78 if isinstance(other, ApdbUpdateRecord):
79 return (self.update_time_ns, self.update_order) < (other.update_time_ns, other.update_order)
80 raise self._type_error(other)
82 def __le__(self, other: Any) -> bool:
83 if isinstance(other, ApdbUpdateRecord):
84 return (self.update_time_ns, self.update_order) <= (other.update_time_ns, other.update_order)
85 raise self._type_error(other)
87 def __gt__(self, other: Any) -> bool:
88 if isinstance(other, ApdbUpdateRecord):
89 return (self.update_time_ns, self.update_order) > (other.update_time_ns, other.update_order)
90 raise self._type_error(other)
92 def __ge__(self, other: Any) -> bool:
93 if isinstance(other, ApdbUpdateRecord):
94 return (self.update_time_ns, self.update_order) >= (other.update_time_ns, other.update_order)
95 raise self._type_error(other)
97 def _type_error(self, other: Any) -> TypeError:
98 return TypeError(
99 "ordering is not supported between instances of "
100 f"'{self.__class__.__name__}' and '{other.__class__.__name__}'"
101 )
103 @classmethod
104 def from_json(cls, update_time_ns: int, update_order: int, json_str: str) -> ApdbUpdateRecord:
105 json_obj = json.loads(json_str)
106 if not isinstance(json_obj, Mapping):
107 raise TypeError("String must contain JSON object.")
108 kw = dict(json_obj)
109 if (update_type := kw.pop("update_type", None)) is None:
110 raise LookupError("`update_type` key is not in JSON object.")
111 if (klass := cls._update_types.get(update_type)) is not None:
112 return klass(update_time_ns=update_time_ns, update_order=update_order, **kw)
113 else:
114 raise ValueError(f"Unknown update type: {update_type}")
116 def to_json(self) -> str:
117 data = dataclasses.asdict(self)
118 # These fields are stored separately.
119 data.pop("update_time_ns")
120 data.pop("update_order")
121 data["update_type"] = self.update_type
122 return json.dumps(data)
124 @abstractmethod
125 def record_id(self) -> tuple[tuple[str, int], ...]:
126 """Return a tuple of (field name, field value) pairs for fields that
127 identify the record to which this update applies.
129 Returns
130 -------
131 record_id_tuple : `tuple` [`tuple` [`str`, `int`], ...]
132 Tuple of (field name, field value) pairs for fields that identify
133 the record to which this update applies.
134 """
135 raise NotImplementedError()
137 @abstractmethod
138 def record_payload(self) -> tuple[tuple[str, Any], ...]:
139 """Return a tuple of (field name, field value) pairs for fields that
140 represent updates being applied by this record.
142 Returns
143 -------
144 payload_tuple : `tuple` [`tuple` [`str`, `Any`], ...]
145 Tuple of (field name, field value) pairs.
147 Notes
148 -----
149 Returned tuple contains the fields that are actually updated. Even if
150 this class represents an update that modifies multiple fields,
151 individual records can update a smaller set of fields.
152 """
153 raise NotImplementedError()
156@dataclass(kw_only=True)
157class ApdbReassignDiaSourceToDiaObjectRecord(
158 ApdbUpdateRecord, DiaSourceId, update_type="reassign_diasource_to_diaobject"
159):
160 """Update record representing re-assignment of DIASource to a different
161 DIAObject.
162 """
164 diaObjectId: int
165 """ID of a new associated DIAObject record."""
167 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaSource
169 def record_id(self) -> tuple[tuple[str, int], ...]:
170 # Docstring inherited from the base class.
171 return (("diaSourceId", self.diaSourceId),)
173 def record_payload(self) -> tuple[tuple[str, Any], ...]:
174 # Docstring inherited from the base class.
175 return (("diaObjectId", self.diaObjectId),)
178@dataclass(kw_only=True)
179class ApdbReassignDiaSourceToSSObjectRecord(
180 ApdbUpdateRecord, DiaSourceId, update_type="reassign_diasource_to_ssobject"
181):
182 """Update record representing re-assignment of DIASource to SSObject."""
184 ssObjectId: int
185 """ID of SSObject to re-associate to."""
187 ssObjectReassocTimeMjdTai: float
188 """Time when DIASource was re-associated from DIAObject to SSObject."""
190 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaSource
192 def record_id(self) -> tuple[tuple[str, int], ...]:
193 # Docstring inherited from the base class.
194 return (("diaSourceId", self.diaSourceId),)
196 def record_payload(self) -> tuple[tuple[str, Any], ...]:
197 # Docstring inherited from the base class.
198 return (
199 ("ssObjectId", self.ssObjectId),
200 ("ssObjectReassocTimeMjdTai", self.ssObjectReassocTimeMjdTai),
201 )
204@dataclass(kw_only=True)
205class ApdbWithdrawDiaSourceRecord(ApdbUpdateRecord, DiaSourceId, update_type="withdraw_diasource"):
206 """Update record representing withdrawal of DIASource."""
208 timeWithdrawnMjdTai: float
209 """Time when this record was marked invalid."""
211 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaSource
213 def record_id(self) -> tuple[tuple[str, int], ...]:
214 # Docstring inherited from the base class.
215 return (("diaSourceId", self.diaSourceId),)
217 def record_payload(self) -> tuple[tuple[str, Any], ...]:
218 # Docstring inherited from the base class.
219 return (("timeWithdrawnMjdTai", self.timeWithdrawnMjdTai),)
222@dataclass(kw_only=True)
223class ApdbWithdrawDiaForcedSourceRecord(
224 ApdbUpdateRecord, DiaForcedSourceId, update_type="withdraw_diaforcedsource"
225):
226 """Update record representing withdrawal of DIAForcedSource."""
228 timeWithdrawnMjdTai: float
229 """Time when this record was marked invalid."""
231 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaForcedSource
233 def record_id(self) -> tuple[tuple[str, int], ...]:
234 # Docstring inherited from the base class.
235 return (
236 ("diaObjectId", self.diaObjectId),
237 ("visit", self.visit),
238 ("detector", self.detector),
239 )
241 def record_payload(self) -> tuple[tuple[str, Any], ...]:
242 # Docstring inherited from the base class.
243 return (("timeWithdrawnMjdTai", self.timeWithdrawnMjdTai),)
246@dataclass(kw_only=True)
247class ApdbCloseDiaObjectValidityRecord(ApdbUpdateRecord, DiaObjectId, update_type="close_diaobject_validity"):
248 """Record representing closing of the validity interval of DIAObject."""
250 validityEndMjdTai: float
251 """Time to set validityEnd to."""
253 nDiaSources: int | None
254 """New value for nDiaSources column for updated record, or None if
255 nDiaSources does not change.
256 """
258 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaObject
260 def record_id(self) -> tuple[tuple[str, int], ...]:
261 # Docstring inherited from the base class.
262 return (("diaObjectId", self.diaObjectId),)
264 def record_payload(self) -> tuple[tuple[str, Any], ...]:
265 # Docstring inherited from the base class.
266 payload: tuple[tuple[str, Any], ...] = (("validityEndMjdTai", self.validityEndMjdTai),)
267 # nDiaSources is updated only when not None.
268 if self.nDiaSources is not None:
269 payload += (("nDiaSources", self.nDiaSources),)
270 return payload
273@dataclass(kw_only=True)
274class ApdbUpdateNDiaSourcesRecord(ApdbUpdateRecord, DiaObjectId, update_type="update_n_dia_sources"):
275 """Record representing change in the number of associated sources of
276 DIAObject.
277 """
279 nDiaSources: int
280 """New value for nDiaSources column for updated record."""
282 apdb_table: ClassVar[ApdbTables] = ApdbTables.DiaObject
284 def record_id(self) -> tuple[tuple[str, int], ...]:
285 # Docstring inherited from the base class.
286 return (("diaObjectId", self.diaObjectId),)
288 def record_payload(self) -> tuple[tuple[str, Any], ...]:
289 # Docstring inherited from the base class.
290 return (("nDiaSources", self.nDiaSources),)