Coverage for python / lsst / dax / apdb / apdb.py: 93%
61 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:58 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:58 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["Apdb", "ApdbConfig"]
26from abc import ABC, abstractmethod
27from collections.abc import Iterable, Mapping
28from typing import TYPE_CHECKING
30import astropy.time
31import pandas
33from lsst.resources import ResourcePathExpression
34from lsst.sphgeom import Region
36from .apdbSchema import ApdbSchema, ApdbTables
37from .config import ApdbConfig
38from .factory import make_apdb
39from .recordIds import DiaObjectId, DiaSourceId
40from .schema_model import Table
42if TYPE_CHECKING:
43 from .apdbAdmin import ApdbAdmin
44 from .apdbMetadata import ApdbMetadata
class Apdb(ABC):
    """Abstract interface for APDB."""

    @classmethod
    def from_config(cls, config: ApdbConfig) -> Apdb:
        """Create Apdb instance from configuration object.

        Parameters
        ----------
        config : `ApdbConfig`
            Configuration object, type of this object determines type of the
            Apdb implementation.

        Returns
        -------
        apdb : `Apdb`
            Instance of `Apdb` class.
        """
        return make_apdb(config)

    @classmethod
    def from_uri(cls, uri: ResourcePathExpression) -> Apdb:
        """Make Apdb instance from a serialized configuration.

        Parameters
        ----------
        uri : `~lsst.resources.ResourcePathExpression`
            URI or local file path pointing to a file with serialized
            configuration, or a string with a "label:" prefix. In the latter
            case, the configuration will be looked up from an APDB index file
            using the label name that follows the prefix. The APDB index file's
            location is determined by the ``DAX_APDB_INDEX_URI`` environment
            variable.

        Returns
        -------
        apdb : `Apdb`
            Instance of `Apdb` class, the type of the returned instance is
            determined by configuration.
        """
        config = ApdbConfig.from_uri(uri)
        return make_apdb(config)

    @abstractmethod
    def getConfig(self) -> ApdbConfig:
        """Return APDB configuration for this instance, including any updates
        that may be read from database.

        Returns
        -------
        config : `ApdbConfig`
            APDB configuration.
        """
        raise NotImplementedError()

    @abstractmethod
    def tableDef(self, table: ApdbTables) -> Table | None:
        """Return table schema definition for a given table.

        Parameters
        ----------
        table : `ApdbTables`
            One of the known APDB tables.

        Returns
        -------
        tableSchema : `.schema_model.Table` or `None`
            Table schema description, `None` is returned if table is not
            defined by this implementation.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        """Return catalog of DiaObject instances from a given region.

        This method returns only the last version of each DiaObject,
        and may return only the subset of the DiaObject columns needed
        for AP association. Some records in a returned catalog may be
        outside the specified region, it is up to a client to ignore those
        records or cleanup the catalog before further use.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Region to search for DIAObjects.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaObject records for a region that may be a
            superset of the specified region.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaSources(
        self,
        region: Region,
        object_ids: Iterable[int] | None,
        visit_time: astropy.time.Time,
        start_time: astropy.time.Time | None = None,
    ) -> pandas.DataFrame | None:
        """Return catalog of DiaSource instances from a given region.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Region to search for DIASources.
        object_ids : iterable [ `int` ], optional
            List of DiaObject IDs to further constrain the set of returned
            sources. If `None` then returned sources are not constrained. If
            list is empty then empty catalog is returned with a correct
            schema.
        visit_time : `astropy.time.Time`
            Time of the current visit. If APDB contains records later than this
            time they may also be returned.
        start_time : `astropy.time.Time`, optional
            Lower bound of time window for the query. If not specified then
            it is calculated using ``visit_time`` and
            ``read_sources_months`` configuration parameter.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``start_time`` is not specified and ``read_sources_months``
            configuration parameter is set to 0.

        Notes
        -----
        This method returns DiaSource catalog for a region with additional
        filtering based on DiaObject IDs. Only a subset of DiaSource history
        is returned limited by ``read_sources_months`` config parameter, w.r.t.
        ``visit_time``. If ``object_ids`` is empty then an empty catalog is
        always returned with the correct schema (columns/types). If
        ``object_ids`` is `None` then no filtering is performed and some of the
        returned records may be outside the specified region.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaForcedSources(
        self,
        region: Region,
        object_ids: Iterable[int] | None,
        visit_time: astropy.time.Time,
        start_time: astropy.time.Time | None = None,
    ) -> pandas.DataFrame | None:
        """Return catalog of DiaForcedSource instances from a given region.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Region to search for DiaForcedSources.
        object_ids : iterable [ `int` ], optional
            List of DiaObject IDs to further constrain the set of returned
            sources. If list is empty then empty catalog is returned with a
            correct schema. If `None` then returned sources are not
            constrained.
        visit_time : `astropy.time.Time`
            Time of the current visit. If APDB contains records later than this
            time they may also be returned.
        start_time : `astropy.time.Time`, optional
            Lower bound of time window for the query. If not specified then
            it is calculated using ``visit_time`` and
            ``read_forced_sources_months`` configuration parameter.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaForcedSource records. `None` is returned if
            ``start_time`` is not specified and ``read_forced_sources_months``
            configuration parameter is set to 0.

        Raises
        ------
        NotImplementedError
            May be raised by some implementations if ``object_ids`` is `None`.

        Notes
        -----
        This method returns DiaForcedSource catalog for a region with
        additional filtering based on DiaObject IDs. Only a subset of
        DiaForcedSource history is returned limited by
        ``read_forced_sources_months`` config parameter, w.r.t.
        ``visit_time``. If ``object_ids`` is empty then an empty catalog is
        always returned with the correct schema (columns/types). If
        ``object_ids`` is `None` then no filtering is performed and some of the
        returned records may be outside the specified region.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaObjectsForDedup(self, since: astropy.time.Time | None = None) -> pandas.DataFrame:
        """Return catalog of DiaObject stored in APDB since specified time.

        This method should be used by deduplication algorithm to retrieve
        DiaObject records added to APDB since previous deduplication (typically
        during previous night). Returned catalog will have only a small subset
        of DiaObject attributes required by deduplication algorithm.

        Parameters
        ----------
        since : `astropy.time.Time`, optional
            Starting search time (time of previous deduplication). If not
            provided the time of the last deduplication stored in metadata
            by `resetDedup` method is used.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaObject records, only a subset of attributes
            will be returned.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaSourcesForDiaObjects(
        self, objects: list[DiaObjectId], start_time: astropy.time.Time, max_dist_arcsec: float = 1.0
    ) -> pandas.DataFrame:
        """Return catalog of DiaSources associated with given DiaObjects.

        Parameters
        ----------
        objects : `list` [`DiaObjectId`]
            DiaObjects associated with returned DiaSources.
        start_time : `astropy.time.Time`
            Lower bound for ``midpointMjdTai`` for returned DiaSources.
        max_dist_arcsec : `float`
            Maximum expected distance in arcsec between DiaSource and
            DiaObject. This parameter is used to optimize spatial queries in
            cases when DiaObject is located near the partition boundary. If the
            distance from DiaObject to the boundary is smaller than
            ``max_dist_arcsec``, then the neighbor partition will be included
            in search too.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records associated to given
            DiaObjects.

        Notes
        -----
        Primary purpose of this method is to support deduplication algorithm.
        Its implementation is likely to be very slow and inefficient, it should
        not be used for regular queries.
        """
        raise NotImplementedError()

    @abstractmethod
    def containsVisitDetector(
        self,
        visit: int,
        detector: int,
        region: Region,
        visit_time: astropy.time.Time,
    ) -> bool:
        """Test whether any sources for a given visit-detector are present in
        the APDB.

        Parameters
        ----------
        visit, detector : `int`
            The ID of the visit-detector to search for.
        region : `lsst.sphgeom.Region`
            Region corresponding to the visit/detector combination.
        visit_time : `astropy.time.Time`
            Visit time (as opposed to visit processing time). This can be any
            timestamp in the visit timespan, e.g. its begin or end time.

        Returns
        -------
        present : `bool`
            `True` if at least one DiaSource or DiaForcedSource record
            may exist for the specified observation, `False` otherwise.
        """
        raise NotImplementedError()

    @abstractmethod
    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        """Store all three types of catalogs in the database.

        Parameters
        ----------
        visit_time : `astropy.time.Time`
            Time of the visit.
        objects : `pandas.DataFrame`
            Catalog with DiaObject records.
        sources : `pandas.DataFrame`, optional
            Catalog with DiaSource records.
        forced_sources : `pandas.DataFrame`, optional
            Catalog with DiaForcedSource records.

        Notes
        -----
        This method takes DataFrame catalogs, their schema must be
        compatible with the schema of APDB tables:

        - column names must correspond to database table columns
        - types and units of the columns must match database definitions,
          no unit conversion is performed presently
        - columns that have default values in database schema can be
          omitted from catalog
        - this method knows how to fill interval-related columns of DiaObject
          (validityStart, validityEnd) they do not need to appear in a
          catalog
        - source catalogs have ``diaObjectId`` column associating sources
          with objects

        This operation need not be atomic, but DiaSources and DiaForcedSources
        will not be stored until all DiaObjects are stored.
        """
        raise NotImplementedError()

    @abstractmethod
    def reassignDiaSourcesToDiaObjects(
        self,
        idMap: Mapping[DiaSourceId, int],
        *,
        increment_nDiaSources: bool = True,
        decrement_nDiaSources: bool = True,
    ) -> None:
        """Re-assign DiaSources from one DiaObject to another, typically
        during deduplication.

        Parameters
        ----------
        idMap : `~collections.abc.Mapping` [`DiaSourceId`, `int`]
            Mapping from DiaSource to their new ``diaObjectId``.
        increment_nDiaSources : `bool`, optional
            If `True` then increment the value of ``nDiaSources`` in DiaObjects
            that DiaSources are reassigned to.
        decrement_nDiaSources : `bool`, optional
            If `True` then decrement the value of ``nDiaSources`` in DiaObjects
            that DiaSources are reassigned from.

        Raises
        ------
        LookupError
            Raised if some of DiaSources or DiaObjects are not found.

        Notes
        -----
        DiaSources initially could be associated with SSObjects. This method
        needs to be called before `setValidityEnd`.
        """
        raise NotImplementedError()

    @abstractmethod
    def setValidityEnd(
        self, objects: list[DiaObjectId], validityEnd: astropy.time.Time, raise_on_missing_id: bool = False
    ) -> int:
        """Close validity interval for specified DiaObjects.

        Parameters
        ----------
        objects : `list` [`DiaObjectId`]
            DiaObjects which will have their validityEnd updated, if their
            current validityEnd is NULL.
        validityEnd : `astropy.time.Time`
            Value for validityEnd.
        raise_on_missing_id : `bool`, optional
            If `True` then `LookupError` will be raised if any object in the
            list is missing from the database.

        Returns
        -------
        count : `int`
            Actual number of records for which validityEnd was updated.

        Raises
        ------
        LookupError
            Raised if ``raise_on_missing_id`` is `True` and some of the
            specified DiaObjects could not be found in the database.

        Notes
        -----
        This method has to be called after `reassignDiaSourcesToDiaObjects`.
        """
        raise NotImplementedError()

    @abstractmethod
    def resetDedup(self, dedup_time: astropy.time.Time | None = None) -> None:
        """Delete deduplication-related data and remember deduplication time.
        Deduplication data generated before ``dedup_time`` will be erased.

        Parameters
        ----------
        dedup_time : `astropy.time.Time`, optional
            Time of the last deduplication, current time is used if not
            provided.
        """
        raise NotImplementedError()

    @abstractmethod
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        """Associate DiaSources with SSObjects, dis-associating them
        from DiaObjects.

        Parameters
        ----------
        idMap : `Mapping`
            Maps DiaSource IDs to their new SSObject IDs.

        Raises
        ------
        ValueError
            Raised if DiaSource ID does not exist in the database.
        """
        raise NotImplementedError()

    @abstractmethod
    def countUnassociatedObjects(self) -> int:
        """Return the number of DiaObjects that have only one DiaSource
        associated with them.

        Used as part of ap_verify metrics.

        Returns
        -------
        count : `int`
            Number of DiaObjects with exactly one associated DiaSource.

        Notes
        -----
        This method can be very inefficient or slow in some implementations.
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def schema(self) -> ApdbSchema:
        """APDB table schema from ``sdm_schemas`` (`ApdbSchema`)."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def metadata(self) -> ApdbMetadata:
        """Object controlling access to APDB metadata (`ApdbMetadata`)."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def admin(self) -> ApdbAdmin:
        """Object providing administrative interface for APDB (`ApdbAdmin`)."""
        raise NotImplementedError()

    def _current_time(self) -> astropy.time.Time:
        """Return current system time.

        Returns
        -------
        current_time : `astropy.time.Time`
            Current time.

        Notes
        -----
        This method exists primarily for testing purposes, it can be
        monkey-patched in unit tests to return something else than current
        system time, if necessary.
        """
        return astropy.time.Time.now()