Coverage for python / lsst / dax / apdb / sql / apdbSqlSchema.py: 18%

195 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:56 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22"""Module responsible for APDB schema operations.""" 

23 

24from __future__ import annotations 

25 

26__all__ = ["ApdbSqlSchema", "ExtraTables"] 

27 

28import enum 

29import itertools 

30import logging 

31from collections.abc import Mapping 

32 

33import felis.datamodel 

34import sqlalchemy 

35 

36from .. import schema_model 

37from ..apdbSchema import ApdbSchema, ApdbTables 

38from .modelToSql import ModelToSql 

39 

40_LOG = logging.getLogger(__name__) 

41 

42 

class InconsistentSchemaError(RuntimeError):
    """Exception raised when schema state is inconsistent.

    Raised e.g. by `ApdbSqlSchema.empty` when only a subset of the required
    APDB tables exists in the database.
    """

45 

46 

@enum.unique
class ExtraTables(enum.Enum):
    """Names of the tables used for tracking insert IDs."""

    # Registry of replica chunks, one row per chunk.
    ApdbReplicaChunks = "ApdbReplicaChunks"

    # Chunked copies of the regular APDB tables.
    DiaObjectChunks = "DiaObjectChunks"
    DiaSourceChunks = "DiaSourceChunks"
    DiaForcedSourceChunks = "DiaForcedSourceChunks"

    # Chunk data for ApdbUpdateRecord entries.
    ApdbUpdateRecordChunks = "ApdbUpdateRecordChunks"

    def table_name(self, prefix: str = "") -> str:
        """Return the full table name, with ``prefix`` prepended."""
        return f"{prefix}{self.value}"

    @classmethod
    def replica_chunk_tables(cls) -> Mapping[ExtraTables, ApdbTables]:
        """Return mapping from each chunk-storage table to the regular APDB
        table whose rows it mirrors.
        """
        return {
            cls.DiaObjectChunks: ApdbTables.DiaObject,
            cls.DiaSourceChunks: ApdbTables.DiaSource,
            cls.DiaForcedSourceChunks: ApdbTables.DiaForcedSource,
        }

80 

81 

class ApdbSqlSchema:
    """Class for management of APDB schema.

    Attributes
    ----------
    objects : `sqlalchemy.Table`
        DiaObject table instance
    objects_last : `sqlalchemy.Table`
        DiaObjectLast table instance, may be None
    sources : `sqlalchemy.Table`
        DiaSource table instance
    forcedSources : `sqlalchemy.Table`
        DiaForcedSource table instance
    replication_enabled : `bool`
        If true then schema has tables for replication chunks.

    Parameters
    ----------
    table_schema : `ApdbSchema`
        Basic description of table schema.
    engine : `sqlalchemy.engine.Engine`
        SQLAlchemy engine instance
    dia_object_index : `str`
        Indexing mode for DiaObject table, see `ApdbSqlConfig.dia_object_index`
        for details.
    htm_index_column : `str`
        Name of a HTM index column for DiaObject and DiaSource tables.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    namespace : `str`, optional
        Namespace (or schema name) to use for all APDB tables.
    enable_replica : `bool`, optional
        If `True` then use additional tables for replica chunks.

    Notes
    -----
    NOTE(review): the ``objects``/``objects_last``/``sources``/
    ``forcedSources`` attributes documented above are never assigned in this
    class body; tables are exposed via `get_table` instead. These attribute
    docs look stale — confirm against callers before relying on them.
    """

    pixel_id_tables = (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource)
    """Tables that need pixelId column for spatial indexing."""

119 

    def __init__(
        self,
        table_schema: ApdbSchema,
        engine: sqlalchemy.engine.Engine,
        dia_object_index: str,
        htm_index_column: str,
        prefix: str = "",
        namespace: str | None = None,
        enable_replica: bool = False,
    ):
        self._table_schema = table_schema
        self._engine = engine
        self._dia_object_index = dia_object_index
        self._htm_index_column = htm_index_column
        self._prefix = prefix
        self._enable_replica = enable_replica

        self._metadata = sqlalchemy.schema.MetaData(schema=namespace)

        # Add pixelId column and index to tables that need it.  Note that this
        # mutates the shared ``table_schema`` models in place.
        for table in self.pixel_id_tables:
            tableDef = table_schema.tableSchemas.get(table)
            if not tableDef:
                continue
            column = schema_model.Column(
                id=f"#{htm_index_column}",
                name=htm_index_column,
                datatype=felis.datamodel.DataType.long,
                nullable=False,
                value=None,
                description="Pixelization index column.",
                table=tableDef,
            )
            tableDef.columns.append(column)

            # Adjust index if needed
            if table == ApdbTables.DiaObject and self._dia_object_index == "pix_id_iov":
                tableDef.primary_key.insert(0, column)

            if table is ApdbTables.DiaObjectLast:
                # use it as a leading PK column
                tableDef.primary_key.insert(0, column)
            else:
                # make a regular index; note that in "pix_id_iov" mode the
                # DiaObject table gets both the PK insertion above AND this
                # regular index on the pixel column.
                name = f"IDX_{tableDef.name}_{htm_index_column}"
                index = schema_model.Index(id=f"#{name}", name=name, columns=[column])
                tableDef.indexes.append(index)

        # generate schema for all tables, must be called last
        apdb_tables = self._make_apdb_tables()
        extra_tables = self._make_extra_tables(apdb_tables)

        converter = ModelToSql(metadata=self._metadata, prefix=self._prefix)
        id_to_table = converter.make_tables(itertools.chain(apdb_tables.values(), extra_tables.values()))

        # Map enum values to the generated SQLAlchemy Table objects, keyed
        # back through the model ids returned by the converter.
        self._apdb_tables = {
            apdb_enum: id_to_table[table_model.id] for apdb_enum, table_model in apdb_tables.items()
        }
        self._extra_tables = {
            extra_enum: id_to_table[table_model.id] for extra_enum, table_model in extra_tables.items()
        }

        # Lazily-computed caches; reset by makeSchema().
        self._has_replica_chunks: bool | None = None
        self._metadata_check: bool | None = None

184 

    @property
    def tableSchemas(self) -> Mapping[ApdbTables, schema_model.Table]:
        """Table schema definitions from the underlying `ApdbSchema`
        (`Mapping` [`ApdbTables`, `schema_model.Table`]).
        """
        return self._table_schema.tableSchemas

188 

189 def empty(self) -> bool: 

190 """Return True if database schema is empty. 

191 

192 Returns 

193 ------- 

194 empty : `bool` 

195 `True` if none of the required APDB tables exist in the database, 

196 `False` if all required tables exist. 

197 

198 Raises 

199 ------ 

200 InconsistentSchemaError 

201 Raised when some of the required tables exist but not all. 

202 """ 

203 inspector = sqlalchemy.inspect(self._engine) 

204 table_names = set(inspector.get_table_names(self._metadata.schema)) 

205 

206 existing_tables = [] 

207 missing_tables = [] 

208 for table_enum in self._apdb_tables: 

209 table_name = table_enum.table_name(self._prefix) 

210 if table_name in table_names: 

211 existing_tables.append(table_name) 

212 else: 

213 missing_tables.append(table_name) 

214 

215 if not missing_tables: 

216 return False 

217 elif not existing_tables: 

218 return True 

219 else: 

220 raise InconsistentSchemaError( 

221 f"Only some required APDB tables exist: {existing_tables}, missing tables: {missing_tables}" 

222 ) 

223 

224 def makeSchema(self, drop: bool = False) -> None: 

225 """Create or re-create all tables. 

226 

227 Parameters 

228 ---------- 

229 drop : `bool`, optional 

230 If True then drop tables before creating new ones. 

231 """ 

232 # Create namespace if it does not exist yet, for now this only makes 

233 # sense for postgres. 

234 if self._metadata.schema: 

235 dialect = self._engine.dialect 

236 quoted_schema = dialect.preparer(dialect).quote_schema(self._metadata.schema) 

237 create_schema = sqlalchemy.schema.DDL( 

238 "CREATE SCHEMA IF NOT EXISTS %(schema)s", context={"schema": quoted_schema} 

239 ).execute_if(dialect="postgresql") 

240 sqlalchemy.event.listen(self._metadata, "before_create", create_schema) 

241 

242 # create all tables (optionally drop first) 

243 if drop: 

244 _LOG.info("dropping all tables") 

245 self._metadata.drop_all(self._engine) 

246 _LOG.info("creating all tables") 

247 self._metadata.create_all(self._engine) 

248 

249 # Reset possibly cached value. 

250 self._has_replica_chunks = None 

251 self._metadata_check = None 

252 

    def get_table(self, table_enum: ApdbTables | ExtraTables) -> sqlalchemy.schema.Table:
        """Return SQLAlchemy table instance for a specified table type/enum.

        Parameters
        ----------
        table_enum : `ApdbTables` or `ExtraTables`
            Type of table to return.

        Returns
        -------
        table : `sqlalchemy.schema.Table`
            Table instance.

        Raises
        ------
        ValueError
            Raised if ``table_enum`` is not valid for this database.
        """
        try:
            if isinstance(table_enum, ApdbTables):
                if table_enum is ApdbTables.metadata:
                    # There may be cases when schema is configured with the
                    # metadata table but database is still missing it. Check
                    # that table actually exists in the database. Note that
                    # this may interact with `makeSchema`.
                    if self._metadata_check is None:
                        # Cache the inspection result; makeSchema() resets
                        # this cache after (re-)creating tables.
                        inspector = sqlalchemy.inspect(self._engine)
                        table_name = table_enum.table_name(self._prefix)
                        self._metadata_check = inspector.has_table(table_name, schema=self._metadata.schema)
                    if not self._metadata_check:
                        # this will be caught below
                        raise LookupError("metadata table is missing")
                return self._apdb_tables[table_enum]
            else:
                return self._extra_tables[table_enum]
        except LookupError:
            # Covers both dict KeyError and the explicit raise above.
            raise ValueError(f"Table type {table_enum} does not exist in the schema") from None

290 

291 def check_column(self, table_enum: ApdbTables | ExtraTables, column: str) -> bool: 

292 """Check for the existence of the column in a given table, checking is 

293 done against database, not APDB schema. 

294 

295 Parameters 

296 ---------- 

297 table_enum : `ApdbTables` or `ExtraTables` 

298 Table to check for a column. 

299 column : `str` 

300 Name of the column to check. 

301 

302 Returns 

303 ------- 

304 exists : `bool` 

305 True if column exists, False otherwise. 

306 """ 

307 inspector = sqlalchemy.inspect(self._engine) 

308 table_name = table_enum.table_name(self._prefix) 

309 columns = inspector.get_columns(table_name, schema=self._metadata.schema) 

310 for col in columns: 

311 if col["name"] == column: 

312 return True 

313 return False 

314 

315 def get_apdb_columns(self, table_enum: ApdbTables | ExtraTables) -> list[sqlalchemy.schema.Column]: 

316 """Return list of columns defined for a table in APDB schema. 

317 

318 Returned list excludes columns that are implementation-specific, e.g. 

319 ``pixelId`` column is not include in the returned list. 

320 

321 Parameters 

322 ---------- 

323 table_enum : `ApdbTables` or `ExtraTables` 

324 Type of table. 

325 

326 Returns 

327 ------- 

328 table : `list` [`sqlalchemy.schema.Column`] 

329 Table instance. 

330 

331 Raises 

332 ------ 

333 ValueError 

334 Raised if ``table_enum`` is not valid for this database. 

335 """ 

336 table = self.get_table(table_enum) 

337 exclude_columns = set() 

338 if table_enum in self.pixel_id_tables: 

339 exclude_columns.add(self._htm_index_column) 

340 return [column for column in table.columns if column.name not in exclude_columns] 

341 

    @property
    def replication_enabled(self) -> bool:
        """True if replication is enabled (`bool`)."""
        # Mirrors the ``enable_replica`` constructor argument.
        return self._enable_replica

346 

    def _make_apdb_tables(self, mysql_engine: str = "InnoDB") -> Mapping[ApdbTables, schema_model.Table]:
        """Generate schema for regular tables.

        Parameters
        ----------
        mysql_engine : `str`, optional
            MySQL engine type to use for new tables.
            NOTE(review): this parameter is not referenced anywhere in the
            body - presumably vestigial; confirm with callers before removal.

        Returns
        -------
        tables : `Mapping` [`ApdbTables`, `schema_model.Table`]
            Schema definitions for the regular APDB tables.
        """
        tables: dict[ApdbTables, schema_model.Table] = {}
        for table_enum in ApdbTables:
            if table_enum is ApdbTables.DiaObjectLast and self._dia_object_index != "last_object_table":
                # DiaObjectLast is only used in "last_object_table" mode.
                continue
            if table_enum is ApdbTables.metadata and table_enum not in self.tableSchemas:
                # Schema does not define metadata.
                continue
            if table_enum in (ApdbTables.SSObject, ApdbTables.SSSource):
                # SSObject/SSSource do not exist in APDB, but are defined in
                # ApdbTables. The reason is that AP wants to have schema of
                # these tables for alert-related business.
                continue
            table = self.tableSchemas[table_enum]

            if table_enum is ApdbTables.DiaObjectLast:
                # In the past DiaObjectLast table did not have validityStart.
                # If an existing database table lacks the column, drop it from
                # the model so generated DDL/queries match the database.
                validity_start_column = "validityStartMjdTai"
                try:
                    if not self.check_column(ApdbTables.DiaObjectLast, validity_start_column):
                        for column in table.columns:
                            if column.name == validity_start_column:
                                # Mutates the shared schema model in place.
                                table.columns.remove(column)
                                break
                except sqlalchemy.exc.NoSuchTableError:
                    # Table does not exist yet, will be created later.
                    pass

            tables[table_enum] = table

        return tables

385 

    def _make_extra_tables(
        self, apdb_tables: Mapping[ApdbTables, schema_model.Table]
    ) -> Mapping[ExtraTables, schema_model.Table]:
        """Generate schema for insert ID tables.

        Parameters
        ----------
        apdb_tables : `Mapping` [`ApdbTables`, `schema_model.Table`]
            Schema definitions of the regular tables; their primary keys are
            copied into the per-table chunk tables.

        Returns
        -------
        tables : `Mapping` [`ExtraTables`, `schema_model.Table`]
            Schema definitions for replica chunk tables; empty when replica
            support is disabled.
        """
        if not self._enable_replica:
            # No replication - no extra tables.
            return {}

        tables = {}
        # Parent registry table, one row per replica chunk.
        column_defs: list[schema_model.Column] = [
            schema_model.Column(
                name="apdb_replica_chunk",
                id="#ApdbReplicaChunks.apdb_replica_chunk",
                datatype=felis.datamodel.DataType.long,
            ),
            schema_model.Column(
                name="last_update_time",
                id="#ApdbReplicaChunks.last_update_time",
                datatype=felis.datamodel.DataType.timestamp,
                nullable=False,
            ),
            schema_model.Column(
                name="unique_id",
                id="#ApdbReplicaChunks.unique_id",
                datatype=schema_model.ExtraDataTypes.UUID,
                nullable=False,
            ),
        ]
        parent_table = schema_model.Table(
            name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix),
            id="#ApdbReplicaChunks",
            columns=column_defs,
            primary_key=[column_defs[0]],
            constraints=[],
            indexes=[],
        )
        tables[ExtraTables.ApdbReplicaChunks] = parent_table

        # One chunk table per replicated APDB table; each mirrors the PK of
        # its original table and references the parent registry table.
        for table_enum, apdb_enum in ExtraTables.replica_chunk_tables().items():
            apdb_table = apdb_tables[apdb_enum]
            table_name = table_enum.table_name(self._prefix)

            columns = self._replicaChunkColumns(table_enum, apdb_enum)
            column_map = {column.name: column for column in columns}
            # PK is the same as for original table
            pk_columns = [column_map[column.name] for column in apdb_table.primary_key]

            indices = self._replicaChunkIndices(table_enum, column_map)
            constraints = self._replicaChunkConstraints(table_enum, apdb_table, parent_table, column_map)
            table = schema_model.Table(
                name=table_name,
                id=f"#{table_name}",
                columns=columns,
                primary_key=pk_columns,
                indexes=indices,
                constraints=constraints,
            )
            tables[table_enum] = table

        # ApdbUpdateRecordChunks table.
        # NOTE(review): the first column id embeds the prefixed table name
        # while the remaining ids use the bare enum value - inconsistent when
        # a prefix is configured; confirm which id scheme is intended.
        table_name = ExtraTables.ApdbUpdateRecordChunks.table_name(self._prefix)
        columns = [
            schema_model.Column(
                name="apdb_replica_chunk",
                id=f"#{table_name}.apdb_replica_chunk",
                datatype=felis.datamodel.DataType.long,
                nullable=False,
            ),
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_time_ns",
                name="update_time_ns",
                datatype=felis.datamodel.DataType.long,
                nullable=False,
            ),
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_order",
                name="update_order",
                datatype=felis.datamodel.DataType.int,
                nullable=False,
            ),
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_unique_id",
                name="update_unique_id",
                datatype=schema_model.ExtraDataTypes.UUID,
                nullable=False,
            ),
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_payload",
                name="update_payload",
                datatype=felis.datamodel.DataType.string,
                nullable=False,
            ),
        ]
        tables[ExtraTables.ApdbUpdateRecordChunks] = schema_model.Table(
            name=table_name,
            id=f"#{table_name}",
            columns=columns,
            # PK: chunk id + time + order + unique id (first four columns).
            primary_key=columns[:4],
            indexes=[],
            constraints=[],
        )

        return tables

488 

489 def _replicaChunkColumns( 

490 self, table_enum: ExtraTables, apdb_enum: ApdbTables 

491 ) -> list[schema_model.Column]: 

492 """Return list of columns for replica chunks tables.""" 

493 table_name = table_enum.table_name() 

494 column_defs: list[schema_model.Column] = [ 

495 schema_model.Column( 

496 name="apdb_replica_chunk", 

497 id=f"#{table_name}.apdb_replica_chunk", 

498 datatype=felis.datamodel.DataType.long, 

499 nullable=False, 

500 ) 

501 ] 

502 if table_enum in ExtraTables.replica_chunk_tables(): 

503 table_model = self.tableSchemas[apdb_enum] 

504 column_defs += [column.clone() for column in table_model.primary_key] 

505 else: 

506 assert False, "Above branches have to cover all enum values" 

507 return column_defs 

508 

509 def _replicaChunkIndices( 

510 self, 

511 table_enum: ExtraTables, 

512 column_map: Mapping[str, schema_model.Column], 

513 ) -> list[schema_model.Index]: 

514 """Return set of indices for replica chunk table.""" 

515 index_defs: list[schema_model.Index] = [] 

516 if table_enum in ExtraTables.replica_chunk_tables(): 

517 # Non-unique index on replica chunk column. 

518 name = self._prefix + table_enum.name + "_apdb_replica_chunk_idx" 

519 column = column_map["apdb_replica_chunk"] 

520 index_defs.append(schema_model.Index(name=name, id=f"#{name}", columns=[column])) 

521 return index_defs 

522 

523 def _replicaChunkConstraints( 

524 self, 

525 table_enum: ExtraTables, 

526 apdb_table: schema_model.Table, 

527 parent_table: schema_model.Table, 

528 column_map: Mapping[str, schema_model.Column], 

529 ) -> list[schema_model.Constraint]: 

530 """Return set of constraints for replica chunk table.""" 

531 constraints: list[schema_model.Constraint] = [] 

532 replica_chunk_tables = ExtraTables.replica_chunk_tables() 

533 if table_enum in replica_chunk_tables: 

534 # Foreign key to original table 

535 name = f"{table_enum.table_name()}_fk_{apdb_table.name}" 

536 other_columns = apdb_table.primary_key 

537 this_columns = [column_map[column.name] for column in apdb_table.primary_key] 

538 constraints.append( 

539 schema_model.ForeignKeyConstraint( 

540 name=name, 

541 id=f"#{name}", 

542 columns=this_columns, 

543 referenced_columns=other_columns, 

544 onupdate="CASCADE", 

545 ondelete="CASCADE", 

546 ) 

547 ) 

548 

549 # Foreign key to parent chunk ID table 

550 name = f"{table_enum.table_name()}_fk_{parent_table.name}" 

551 other_columns = parent_table.primary_key 

552 this_columns = [column_map[column.name] for column in parent_table.primary_key] 

553 constraints.append( 

554 schema_model.ForeignKeyConstraint( 

555 name=name, 

556 id=f"#{name}", 

557 columns=this_columns, 

558 referenced_columns=other_columns, 

559 onupdate="CASCADE", 

560 ondelete="CASCADE", 

561 ) 

562 ) 

563 return constraints