Coverage for python / lsst / dax / apdb / apdb.py: 93%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:58 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["Apdb", "ApdbConfig"] 

25 

26from abc import ABC, abstractmethod 

27from collections.abc import Iterable, Mapping 

28from typing import TYPE_CHECKING 

29 

30import astropy.time 

31import pandas 

32 

33from lsst.resources import ResourcePathExpression 

34from lsst.sphgeom import Region 

35 

36from .apdbSchema import ApdbSchema, ApdbTables 

37from .config import ApdbConfig 

38from .factory import make_apdb 

39from .recordIds import DiaObjectId, DiaSourceId 

40from .schema_model import Table 

41 

42if TYPE_CHECKING: 

43 from .apdbAdmin import ApdbAdmin 

44 from .apdbMetadata import ApdbMetadata 

45 

46 

class Apdb(ABC):
    """Abstract interface for APDB."""

    @classmethod
    def from_config(cls, config: ApdbConfig) -> Apdb:
        """Create Apdb instance from configuration object.

        Parameters
        ----------
        config : `ApdbConfig`
            Configuration object, type of this object determines type of the
            Apdb implementation.

        Returns
        -------
        apdb : `Apdb`
            Instance of `Apdb` class.

        Notes
        -----
        ``cls`` is not used; the concrete implementation is produced by
        `make_apdb` from ``config`` alone.
        """
        return make_apdb(config)

    @classmethod
    def from_uri(cls, uri: ResourcePathExpression) -> Apdb:
        """Make Apdb instance from a serialized configuration.

        Parameters
        ----------
        uri : `~lsst.resources.ResourcePathExpression`
            URI or local file path pointing to a file with serialized
            configuration, or a string with a "label:" prefix. In the latter
            case, the configuration will be looked up from an APDB index file
            using the label name that follows the prefix. The APDB index file's
            location is determined by the ``DAX_APDB_INDEX_URI`` environment
            variable.

        Returns
        -------
        apdb : `Apdb`
            Instance of `Apdb` class, the type of the returned instance is
            determined by configuration.
        """
        # Deserialize the configuration first; the resulting config object
        # then decides which implementation `make_apdb` instantiates.
        config = ApdbConfig.from_uri(uri)
        return make_apdb(config)

    @abstractmethod
    def getConfig(self) -> ApdbConfig:
        """Return APDB configuration for this instance, including any updates
        that may be read from database.

        Returns
        -------
        config : `ApdbConfig`
            APDB configuration.
        """
        raise NotImplementedError()

    @abstractmethod
    def tableDef(self, table: ApdbTables) -> Table | None:
        """Return table schema definition for a given table.

        Parameters
        ----------
        table : `ApdbTables`
            One of the known APDB tables.

        Returns
        -------
        tableSchema : `.schema_model.Table` or `None`
            Table schema description, `None` is returned if table is not
            defined by this implementation.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        """Return catalog of DiaObject instances from a given region.

        This method returns only the last version of each DiaObject, and may
        return only the subset of the DiaObject columns needed for AP
        association. Some records in a returned catalog may be outside the
        specified region; it is up to a client to ignore those records or
        clean up the catalog before further use.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Region to search for DIAObjects.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaObject records for a region that may be a
            superset of the specified region.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaSources(
        self,
        region: Region,
        object_ids: Iterable[int] | None,
        visit_time: astropy.time.Time,
        start_time: astropy.time.Time | None = None,
    ) -> pandas.DataFrame | None:
        """Return catalog of DiaSource instances from a given region.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Region to search for DIASources.
        object_ids : iterable [ `int` ], optional
            List of DiaObject IDs to further constrain the set of returned
            sources. If `None` then returned sources are not constrained. If
            list is empty then empty catalog is returned with a correct
            schema.
        visit_time : `astropy.time.Time`
            Time of the current visit. If APDB contains records later than this
            time they may also be returned.
        start_time : `astropy.time.Time`, optional
            Lower bound of time window for the query. If not specified then
            it is calculated using ``visit_time`` and
            ``read_sources_months`` configuration parameter.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``start_time`` is not specified and ``read_sources_months``
            configuration parameter is set to 0.

        Notes
        -----
        This method returns DiaSource catalog for a region with additional
        filtering based on DiaObject IDs. Only a subset of DiaSource history
        is returned limited by ``read_sources_months`` config parameter, w.r.t.
        ``visit_time``. If ``object_ids`` is empty then an empty catalog is
        always returned with the correct schema (columns/types). If
        ``object_ids`` is `None` then no filtering is performed and some of the
        returned records may be outside the specified region.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaForcedSources(
        self,
        region: Region,
        object_ids: Iterable[int] | None,
        visit_time: astropy.time.Time,
        start_time: astropy.time.Time | None = None,
    ) -> pandas.DataFrame | None:
        """Return catalog of DiaForcedSource instances from a given region.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Region to search for DIAForcedSources.
        object_ids : iterable [ `int` ], optional
            List of DiaObject IDs to further constrain the set of returned
            sources. If list is empty then empty catalog is returned with a
            correct schema. If `None` then returned sources are not
            constrained.
        visit_time : `astropy.time.Time`
            Time of the current visit. If APDB contains records later than this
            time they may also be returned.
        start_time : `astropy.time.Time`, optional
            Lower bound of time window for the query. If not specified then
            it is calculated using ``visit_time`` and
            ``read_forced_sources_months`` configuration parameter.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaForcedSource records. `None` is returned if
            ``start_time`` is not specified and ``read_forced_sources_months``
            configuration parameter is set to 0.

        Raises
        ------
        NotImplementedError
            May be raised by some implementations if ``object_ids`` is `None`.

        Notes
        -----
        This method returns DiaForcedSource catalog for a region with
        additional filtering based on DiaObject IDs. Only a subset of
        DiaForcedSource history is returned limited by
        ``read_forced_sources_months`` config parameter, w.r.t.
        ``visit_time``. If ``object_ids`` is empty then an empty catalog is
        always returned with the correct schema (columns/types). If
        ``object_ids`` is `None` then no filtering is performed and some of the
        returned records may be outside the specified region.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaObjectsForDedup(self, since: astropy.time.Time | None = None) -> pandas.DataFrame:
        """Return catalog of DiaObject stored in APDB since specified time.

        This method should be used by deduplication algorithm to retrieve
        DiaObject records added to APDB since previous deduplication (typically
        during previous night). Returned catalog will have only a small subset
        of DiaObject attributes required by deduplication algorithm.

        Parameters
        ----------
        since : `astropy.time.Time`, optional
            Starting search time (time of previous deduplication). If not
            provided the time of the last deduplication stored in metadata
            by `resetDedup` method is used.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaObject records, only a subset of attributes
            will be returned.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaSourcesForDiaObjects(
        self, objects: list[DiaObjectId], start_time: astropy.time.Time, max_dist_arcsec: float = 1.0
    ) -> pandas.DataFrame:
        """Return catalog of DiaSources associated with given DiaObjects.

        Parameters
        ----------
        objects : `list` [`DiaObjectId`]
            DiaObjects associated with returned DiaSources.
        start_time : `astropy.time.Time`
            Lower bound for ``midpointMjdTai`` for returned DiaSources.
        max_dist_arcsec : `float`
            Maximum expected distance in arcsec between DiaSource and
            DiaObject. This parameter is used to optimize spatial queries in
            cases when DiaObject is located near the partition boundary. If the
            distance from DiaObject to the boundary is smaller than
            ``max_dist_arcsec``, then the neighbor partition will be included
            in search too.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records associated to given
            DiaObjects.

        Notes
        -----
        Primary purpose of this method is to support deduplication algorithm.
        Its implementation is likely to be very slow and inefficient, it should
        not be used for regular queries.
        """
        raise NotImplementedError()

    @abstractmethod
    def containsVisitDetector(
        self,
        visit: int,
        detector: int,
        region: Region,
        visit_time: astropy.time.Time,
    ) -> bool:
        """Test whether any sources for a given visit-detector are present in
        the APDB.

        Parameters
        ----------
        visit, detector : `int`
            The ID of the visit-detector to search for.
        region : `lsst.sphgeom.Region`
            Region corresponding to the visit/detector combination.
        visit_time : `astropy.time.Time`
            Visit time (as opposed to visit processing time). This can be any
            timestamp in the visit timespan, e.g. its begin or end time.

        Returns
        -------
        present : `bool`
            `True` if at least one DiaSource or DiaForcedSource record
            may exist for the specified observation, `False` otherwise.
        """
        raise NotImplementedError()

    @abstractmethod
    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        """Store all three types of catalogs in the database.

        Parameters
        ----------
        visit_time : `astropy.time.Time`
            Time of the visit.
        objects : `pandas.DataFrame`
            Catalog with DiaObject records.
        sources : `pandas.DataFrame`, optional
            Catalog with DiaSource records.
        forced_sources : `pandas.DataFrame`, optional
            Catalog with DiaForcedSource records.

        Notes
        -----
        This method takes DataFrame catalogs, their schema must be
        compatible with the schema of APDB table:

          - column names must correspond to database table columns
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in database schema can be
            omitted from catalog
          - this method knows how to fill interval-related columns of DiaObject
            (validityStart, validityEnd) they do not need to appear in a
            catalog
          - source catalogs have ``diaObjectId`` column associating sources
            with objects

        This operation need not be atomic, but DiaSources and DiaForcedSources
        will not be stored until all DiaObjects are stored.
        """
        raise NotImplementedError()

    @abstractmethod
    def reassignDiaSourcesToDiaObjects(
        self,
        idMap: Mapping[DiaSourceId, int],
        *,
        increment_nDiaSources: bool = True,
        decrement_nDiaSources: bool = True,
    ) -> None:
        """Re-assign DiaSources from one DiaObject to another, typically
        during deduplication.

        Parameters
        ----------
        idMap : `~collections.abc.Mapping` [`DiaSourceId`, `int`]
            Mapping from DiaSource to their new ``diaObjectId``.
        increment_nDiaSources : `bool`, optional
            If `True` then increment the value of ``nDiaSources`` in DiaObjects
            that DiaSources are reassigned to.
        decrement_nDiaSources : `bool`, optional
            If `True` then decrement the value of ``nDiaSources`` in DiaObjects
            that DiaSources are reassigned from.

        Raises
        ------
        LookupError
            Raised if some of DiaSources or DiaObjects are not found.

        Notes
        -----
        DiaSources initially could be associated with SSObjects. This method
        needs to be called before `setValidityEnd`.
        """
        raise NotImplementedError()

    @abstractmethod
    def setValidityEnd(
        self, objects: list[DiaObjectId], validityEnd: astropy.time.Time, raise_on_missing_id: bool = False
    ) -> int:
        """Close validity interval for specified DiaObjects.

        Parameters
        ----------
        objects : `list` [`DiaObjectId`]
            DiaObjects which will have their validityEnd updated, if their
            current validityEnd is NULL.
        validityEnd : `astropy.time.Time`
            Value for validityEnd.
        raise_on_missing_id : `bool`, optional
            If `True` then `LookupError` will be raised if any object in the
            list is missing from the database.

        Returns
        -------
        count : `int`
            Actual number of records for which validityEnd was updated.

        Raises
        ------
        LookupError
            Raised if ``raise_on_missing_id`` is `True` and some of the
            specified DiaObjects could not be found in the database.

        Notes
        -----
        This method has to be called after `reassignDiaSourcesToDiaObjects`.
        """
        raise NotImplementedError()

    @abstractmethod
    def resetDedup(self, dedup_time: astropy.time.Time | None = None) -> None:
        """Delete deduplication-related data and remember deduplication time.

        Deduplication data generated before ``dedup_time`` will be erased.

        Parameters
        ----------
        dedup_time : `astropy.time.Time`, optional
            Time of the last deduplication, current time is used if not
            provided.
        """
        raise NotImplementedError()

    @abstractmethod
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        """Associate DiaSources with SSObjects, dis-associating them
        from DiaObjects.

        Parameters
        ----------
        idMap : `~collections.abc.Mapping` [`int`, `int`]
            Maps DiaSource IDs to their new SSObject IDs.

        Raises
        ------
        ValueError
            Raised if DiaSource ID does not exist in the database.
        """
        raise NotImplementedError()

    @abstractmethod
    def countUnassociatedObjects(self) -> int:
        """Return the number of DiaObjects that have only one DiaSource
        associated with them.

        Used as part of ap_verify metrics.

        Returns
        -------
        count : `int`
            Number of DiaObjects with exactly one associated DiaSource.

        Notes
        -----
        This method can be very inefficient or slow in some implementations.
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def schema(self) -> ApdbSchema:
        """APDB table schema from ``sdm_schemas`` (`ApdbSchema`)."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def metadata(self) -> ApdbMetadata:
        """Object controlling access to APDB metadata (`ApdbMetadata`)."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def admin(self) -> ApdbAdmin:
        """Object providing administrative interface for APDB (`ApdbAdmin`)."""
        raise NotImplementedError()

    def _current_time(self) -> astropy.time.Time:
        """Return current system time.

        Returns
        -------
        current_time : `astropy.time.Time`
            Current time.

        Notes
        -----
        This method exists primarily for testing purposes, it can be
        monkey-patched in unit tests to return something else than current
        system time, if necessary.
        """
        return astropy.time.Time.now()