Coverage for python / lsst / daf / butler / registry / interfaces / _database.py: 21%

462 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:18 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29from ... import ddl, time_utils 

30 

# Names exported by ``from ... import *``; everything else defined in this
# module (e.g. ``_checkExistingTableDefinition``) is not part of the list.
__all__ = [
    "Database",
    "DatabaseConflictError",
    "DatabaseInsertMode",
    "DatabaseMetadata",
    "ReadOnlyDatabaseError",
    "SchemaAlreadyDefinedError",
    "StaticTablesContext",
]

40 

41import enum 

42import os 

43import sys 

44import uuid 

45import warnings 

46from abc import ABC, abstractmethod 

47from collections import defaultdict 

48from collections.abc import Callable, Iterable, Iterator, Sequence 

49from contextlib import contextmanager 

50from threading import Lock 

51from typing import Any, cast, final 

52 

53import astropy.time 

54import sqlalchemy 

55 

56from ..._named import NamedValueAbstractSet 

57from ...name_shrinker import NameShrinker 

58from ...timespan_database_representation import TimespanDatabaseRepresentation 

59from .._exceptions import ConflictingDefinitionError 

60from ._database_explain import get_query_plan 

61 

62 

class DatabaseInsertMode(enum.Enum):
    """Strategies available when writing records that may already be
    present in the database.
    """

    INSERT = enum.auto()
    """Insert the records; fail if any of them is already present."""

    REPLACE = enum.auto()
    """Insert the records, overwriting any that are already present."""

    ENSURE = enum.auto()
    """Insert the records, skipping any that are already present."""

74 

75 

76# TODO: method is called with list[ReflectedColumn] in SA 2, and 

77# ReflectedColumn does not exist in 1.4. 

78def _checkExistingTableDefinition(name: str, spec: ddl.TableSpec, inspection: list) -> None: 

79 """Test that the definition of a table in a `ddl.TableSpec` and from 

80 database introspection are consistent. 

81 

82 Parameters 

83 ---------- 

84 name : `str` 

85 Name of the table (only used in error messages). 

86 spec : `ddl.TableSpec` 

87 Specification of the table. 

88 inspection : `dict` 

89 Dictionary returned by 

90 `sqlalchemy.engine.reflection.Inspector.get_columns`. 

91 

92 Raises 

93 ------ 

94 DatabaseConflictError 

95 Raised if the definitions are inconsistent. 

96 """ 

97 columnNames = [c["name"] for c in inspection] 

98 if spec.fields.names != set(columnNames): 

99 raise DatabaseConflictError( 

100 f"Table '{name}' exists but is defined differently in the database; " 

101 f"specification has columns {list(spec.fields.names)}, while the " 

102 f"table in the database has {columnNames}." 

103 ) 

104 

105 

class ReadOnlyDatabaseError(RuntimeError):
    """Raised when a write operation is requested from a `Database` that
    only permits reads.
    """

110 

111 

class DatabaseConflictError(ConflictingDefinitionError):
    """Raised when what is stored in the database (row values or schema
    entities) disagrees with what this client expects to find.
    """

116 

117 

class SchemaAlreadyDefinedError(RuntimeError):
    """Raised when schema initialization is attempted in a database that
    already contains some tables.
    """

122 

123 

124class StaticTablesContext: 

125 """Helper class used to declare the static schema for a registry layer 

126 in a database. 

127 

128 An instance of this class is returned by `Database.declareStaticTables`, 

129 which should be the only way it should be constructed. 

130 

131 Parameters 

132 ---------- 

133 db : `Database` 

134 The database. 

135 connection : `sqlalchemy.engine.Connection` 

136 The connection object. 

137 """ 

138 

139 def __init__(self, db: Database, connection: sqlalchemy.engine.Connection): 

140 self._db = db 

141 self._inspector = sqlalchemy.inspect(connection) 

142 self._tableNames = frozenset(self._inspector.get_table_names(schema=self._db.namespace)) 

143 self._initializers: list[Callable[[Database], None]] = [] 

144 

145 def addTable(self, name: str, spec: ddl.TableSpec) -> sqlalchemy.schema.Table: 

146 """Add a new table to the schema, returning its sqlalchemy 

147 representation. 

148 

149 Parameters 

150 ---------- 

151 name : `str` 

152 The name of the table. 

153 spec : `ddl.TableSpec` 

154 The specification of the table. 

155 

156 Returns 

157 ------- 

158 table : `sqlalchemy.schema.Table` 

159 The created table. 

160 

161 Notes 

162 ----- 

163 The new table may not actually be created until the end of the 

164 context created by `Database.declareStaticTables`, allowing tables 

165 to be declared in any order even in the presence of foreign key 

166 relationships. 

167 """ 

168 metadata = self._db._metadata 

169 assert metadata is not None, "Guaranteed by context manager that returns this object." 

170 return metadata.add_table(self._db, name, spec) 

171 

172 def addTableTuple(self, specs: tuple[ddl.TableSpec, ...]) -> tuple[sqlalchemy.schema.Table, ...]: 

173 """Add a named tuple of tables to the schema, returning their 

174 SQLAlchemy representations in a named tuple of the same type. 

175 

176 The new tables may not actually be created until the end of the 

177 context created by `Database.declareStaticTables`, allowing tables 

178 to be declared in any order even in the presence of foreign key 

179 relationships. 

180 

181 Parameters 

182 ---------- 

183 specs : `tuple` of `ddl.TableSpec` 

184 Specifications of multiple tables. 

185 

186 Returns 

187 ------- 

188 tables : `tuple` of `sqlalchemy.schema.Table` 

189 All the tables created. 

190 

191 Notes 

192 ----- 

193 ``specs`` *must* be an instance of a type created by 

194 `collections.namedtuple`, not just regular tuple, and the returned 

195 object is guaranteed to be the same. Because `~collections.namedtuple` 

196 is just a factory for `type` objects, not an actual type itself, 

197 we cannot represent this with type annotations. 

198 """ 

199 return specs._make( # type: ignore 

200 self.addTable(name, spec) 

201 for name, spec in zip(specs._fields, specs, strict=True) # type: ignore 

202 ) 

203 

204 def addInitializer(self, initializer: Callable[[Database], None]) -> None: 

205 """Add a method that does one-time initialization of a database. 

206 

207 Initialization can mean anything that changes state of a database 

208 and needs to be done exactly once after database schema was created. 

209 An example for that could be population of schema attributes. 

210 

211 Parameters 

212 ---------- 

213 initializer : `~collections.abc.Callable` 

214 Method of a single argument which is a `Database` instance. 

215 """ 

216 self._initializers.append(initializer) 

217 

218 

219class Database(ABC): 

220 """An abstract interface that represents a particular database engine's 

221 representation of a single schema/namespace/database. 

222 

223 Parameters 

224 ---------- 

225 origin : `int` 

226 An integer ID that should be used as the default for any datasets, 

227 quanta, or other entities that use a (autoincrement, origin) compound 

228 primary key. 

229 engine : `sqlalchemy.engine.Engine` 

230 The SQLAlchemy engine for this `Database`. 

231 namespace : `str`, optional 

232 Name of the schema or namespace this instance is associated with. 

233 This is passed as the ``schema`` argument when constructing a 

234 `sqlalchemy.schema.MetaData` instance. We use ``namespace`` instead to 

235 avoid confusion between "schema means namespace" and "schema means 

236 table definitions". 

237 metadata : `sqlalchemy.schema.MetaData`, optional 

238 Object representing the tables and other schema entities. If not 

239 provided, will be generated during the next call to 

240 ``declareStaticTables``. 

241 allow_temporary_tables : `bool`, optional 

242 If `True`, database operations will be allowed to use temporary tables. 

243 If `False`, other SQL constructs will be used instead of temporary 

244 tables when possible. 

245 

246 Notes 

247 ----- 

248 `Database` requires all write operations to go through its special named 

249 methods. Our write patterns are sufficiently simple that we don't really 

250 need the full flexibility of SQL insert/update/delete syntax, and we need 

251 non-standard (but common) functionality in these operations sufficiently 

252 often that it seems worthwhile to provide our own generic API. 

253 

254 In contrast, `Database.query` allows arbitrary ``SELECT`` queries (via 

255 their SQLAlchemy representation) to be run, as we expect these to require 

256 significantly more sophistication while still being limited to standard 

257 SQL. 

258 

259 `Database` itself has several underscore-prefixed attributes: 

260 

261 - ``_engine``: SQLAlchemy object representing its engine. 

262 - ``_connection``: method returning a context manager for 

263 `sqlalchemy.engine.Connection` object. 

264 - ``_metadata``: the `sqlalchemy.schema.MetaData` object representing 

265 the tables and other schema entities. 

266 

267 These are considered protected (derived classes may access them, but other 

268 code should not), and read-only, aside from executing SQL via 

269 ``_connection``. 

270 """ 

271 

272 def __init__( 

273 self, 

274 *, 

275 origin: int, 

276 engine: sqlalchemy.engine.Engine, 

277 namespace: str | None = None, 

278 metadata: DatabaseMetadata | None = None, 

279 allow_temporary_tables: bool = True, 

280 ): 

281 self.origin = origin 

282 self.name_shrinker = NameShrinker(engine.dialect.max_identifier_length) 

283 self.namespace = namespace 

284 self._engine = engine 

285 self._session_connection: sqlalchemy.engine.Connection | None = None 

286 self._temp_tables: set[str] = set() 

287 self._metadata = metadata 

288 self._allow_temporary_tables = allow_temporary_tables 

289 

290 def dispose(self) -> None: 

291 """Close all open database connections held by this `Database` 

292 instance. 

293 """ 

294 self._engine.dispose() 

295 

296 def __repr__(self) -> str: 

297 # Rather than try to reproduce all the parameters used to create 

298 # the object, instead report the more useful information of the 

299 # connection URL. 

300 if self._engine.url.password is not None: 

301 uri = str(self._engine.url.set(password="***")) 

302 else: 

303 uri = str(self._engine.url) 

304 if self.namespace: 

305 uri += f"#{self.namespace}" 

306 return f'{type(self).__name__}("{uri}")' 

307 

308 @classmethod 

309 def makeDefaultUri(cls, root: str) -> str | None: 

310 """Create a default connection URI appropriate for the given root 

311 directory, or `None` if there can be no such default. 

312 

313 Parameters 

314 ---------- 

315 root : `str` 

316 Root string to use to build connection URI. 

317 

318 Returns 

319 ------- 

320 uri : `str` or `None` 

321 The URI string or `None`. 

322 """ 

323 return None 

324 

325 @classmethod 

326 def fromUri( 

327 cls, 

328 uri: str | sqlalchemy.engine.URL, 

329 *, 

330 origin: int, 

331 namespace: str | None = None, 

332 writeable: bool = True, 

333 allow_temporary_tables: bool = True, 

334 ) -> Database: 

335 """Construct a database from a SQLAlchemy URI. 

336 

337 Parameters 

338 ---------- 

339 uri : `str` or `sqlalchemy.engine.URL` 

340 A SQLAlchemy URI connection string. 

341 origin : `int` 

342 An integer ID that should be used as the default for any datasets, 

343 quanta, or other entities that use a (autoincrement, origin) 

344 compound primary key. 

345 namespace : `str`, optional 

346 A database namespace (i.e. schema) the new instance should be 

347 associated with. If `None` (default), the namespace (if any) is 

348 inferred from the URI. 

349 writeable : `bool`, optional 

350 If `True`, allow write operations on the database, including 

351 ``CREATE TABLE``. 

352 allow_temporary_tables : `bool`, optional 

353 If `True`, database operations will be allowed to use temporary 

354 tables. 

355 If `False`, other SQL constructs will be used instead of temporary 

356 tables when possible. 

357 

358 Returns 

359 ------- 

360 db : `Database` 

361 A new `Database` instance. 

362 """ 

363 db = cls.fromEngine( 

364 cls.makeEngine(uri, writeable=writeable), origin=origin, namespace=namespace, writeable=writeable 

365 ) 

366 db._allow_temporary_tables = allow_temporary_tables 

367 return db 

368 

369 @abstractmethod 

370 def clone(self) -> Database: 

371 """Make an independent copy of this `Database` object. 

372 

373 Returns 

374 ------- 

375 db : `Database` 

376 A new `Database` instance with the same configuration as this 

377 instance. 

378 """ 

379 raise NotImplementedError() 

380 

381 @classmethod 

382 @abstractmethod 

383 def makeEngine( 

384 cls, uri: str | sqlalchemy.engine.URL, *, writeable: bool = True 

385 ) -> sqlalchemy.engine.Engine: 

386 """Create a `sqlalchemy.engine.Engine` from a SQLAlchemy URI. 

387 

388 Parameters 

389 ---------- 

390 uri : `str` or `sqlalchemy.engine.URL` 

391 A SQLAlchemy URI connection string. 

392 writeable : `bool`, optional 

393 If `True`, allow write operations on the database, including 

394 ``CREATE TABLE``. 

395 

396 Returns 

397 ------- 

398 engine : `sqlalchemy.engine.Engine` 

399 A database engine. 

400 

401 Notes 

402 ----- 

403 Subclasses that support other ways to connect to a database are 

404 encouraged to add optional arguments to their implementation of this 

405 method, as long as they maintain compatibility with the base class 

406 call signature. 

407 """ 

408 raise NotImplementedError() 

409 

410 @classmethod 

411 @abstractmethod 

412 def fromEngine( 

413 cls, 

414 engine: sqlalchemy.engine.Engine, 

415 *, 

416 origin: int, 

417 namespace: str | None = None, 

418 writeable: bool = True, 

419 ) -> Database: 

420 """Create a new `Database` from an existing `sqlalchemy.engine.Engine`. 

421 

422 Parameters 

423 ---------- 

424 engine : `sqlalchemy.engine.Engine` 

425 The engine for the database. May be shared between `Database` 

426 instances. 

427 origin : `int` 

428 An integer ID that should be used as the default for any datasets, 

429 quanta, or other entities that use a (autoincrement, origin) 

430 compound primary key. 

431 namespace : `str`, optional 

432 A different database namespace (i.e. schema) the new instance 

433 should be associated with. If `None` (default), the namespace 

434 (if any) is inferred from the connection. 

435 writeable : `bool`, optional 

436 If `True`, allow write operations on the database, including 

437 ``CREATE TABLE``. 

438 

439 Returns 

440 ------- 

441 db : `Database` 

442 A new `Database` instance. 

443 

444 Notes 

445 ----- 

446 This method allows different `Database` instances to share the same 

447 engine, which is desirable when they represent different namespaces 

448 can be queried together. 

449 """ 

450 raise NotImplementedError() 

451 

452 @final 

453 @contextmanager 

454 def session(self) -> Iterator[None]: 

455 """Return a context manager that represents a session (persistent 

456 connection to a database). 

457 

458 Returns 

459 ------- 

460 context : `AbstractContextManager` [ `None` ] 

461 A context manager that does not return a value when entered. 

462 

463 Notes 

464 ----- 

465 This method should be used when a sequence of read-only SQL operations 

466 will be performed in rapid succession *without* a requirement that they 

467 yield consistent results in the presence of concurrent writes (or, more 

468 rarely, when conflicting concurrent writes are rare/impossible and the 

469 session will be open long enough that a transaction is inadvisable). 

470 """ 

471 with self._session(): 

472 yield 

473 

474 @final 

475 @contextmanager 

476 def transaction( 

477 self, 

478 *, 

479 interrupting: bool = False, 

480 savepoint: bool = False, 

481 lock: Iterable[sqlalchemy.schema.Table] = (), 

482 for_temp_tables: bool = False, 

483 ) -> Iterator[None]: 

484 """Return a context manager that represents a transaction. 

485 

486 Parameters 

487 ---------- 

488 interrupting : `bool`, optional 

489 If `True` (`False` is default), this transaction block may not be 

490 nested within an outer one, and attempting to do so is a logic 

491 (i.e. assertion) error. 

492 savepoint : `bool`, optional 

493 If `True` (`False` is default), create a ``SAVEPOINT``, allowing 

494 exceptions raised by the database (e.g. due to constraint 

495 violations) during this transaction's context to be caught outside 

496 it without also rolling back all operations in an outer transaction 

497 block. If `False`, transactions may still be nested, but a 

498 rollback may be generated at any level and affects all levels, and 

499 commits are deferred until the outermost block completes. If any 

500 outer transaction block was created with ``savepoint=True``, all 

501 inner blocks will be as well (regardless of the actual value 

502 passed). This has no effect if this is the outermost transaction. 

503 lock : `~collections.abc.Iterable` [ `sqlalchemy.schema.Table` ], \ 

504 optional 

505 A list of tables to lock for the duration of this transaction. 

506 These locks are guaranteed to prevent concurrent writes and allow 

507 this transaction (only) to acquire the same locks (others should 

508 block), but only prevent concurrent reads if the database engine 

509 requires that in order to block concurrent writes. 

510 for_temp_tables : `bool`, optional 

511 If `True`, this transaction may involve creating temporary tables. 

512 

513 Returns 

514 ------- 

515 context : `AbstractContextManager` [ `None` ] 

516 A context manager that commits the transaction when it is exited 

517 without error and rolls back the transactoin when it is exited via 

518 an exception. 

519 

520 Notes 

521 ----- 

522 All transactions on a connection managed by one or more `Database` 

523 instances _must_ go through this method, or transaction state will not 

524 be correctly managed. 

525 """ 

526 with self._transaction( 

527 interrupting=interrupting, savepoint=savepoint, lock=lock, for_temp_tables=for_temp_tables 

528 ): 

529 yield 

530 

531 @contextmanager 

532 def temporary_table( 

533 self, spec: ddl.TableSpec, name: str | None = None 

534 ) -> Iterator[sqlalchemy.schema.Table]: 

535 """Return a context manager that creates and then drops a temporary 

536 table. 

537 

538 Parameters 

539 ---------- 

540 spec : `ddl.TableSpec` 

541 Specification for the columns. Unique and foreign key constraints 

542 may be ignored. 

543 name : `str`, optional 

544 If provided, the name of the SQL construct. If not provided, an 

545 opaque but unique identifier is generated. 

546 

547 Returns 

548 ------- 

549 context : `AbstractContextManager` [ `sqlalchemy.schema.Table` ] 

550 A context manager that returns a SQLAlchemy representation of the 

551 temporary table when entered. 

552 

553 Notes 

554 ----- 

555 Temporary tables may be created, dropped, and written to even in 

556 read-only databases - at least according to the Python-level 

557 protections in the `Database` classes. Server permissions may say 

558 otherwise, but in that case they probably need to be modified to 

559 support the full range of expected read-only butler behavior. 

560 """ 

561 assert self._metadata is not None, "Static tables must be created before temporary tables" 

562 if not self.supports_temporary_tables: 

563 raise ReadOnlyDatabaseError("Creation of temporary tables is not supported by this database.") 

564 with self._session() as connection: 

565 table = self._make_temporary_table(connection, spec=spec, name=name) 

566 self._temp_tables.add(table.key) 

567 try: 

568 yield table 

569 finally: 

570 self._temp_tables.remove(table.key) 

571 self._metadata.remove_table(table.name) 

572 

    @contextmanager
    def _session(self) -> Iterator[sqlalchemy.engine.Connection]:
        """Protected implementation for `session` that actually returns the
        connection.

        This method is for internal `Database` calls that need the actual
        SQLAlchemy connection object. It should be overridden by subclasses
        instead of `session` itself.

        If a session is already open, its connection is reused and left
        open. Otherwise a new connection is opened from the engine and
        closed when the context exits.

        Returns
        -------
        context : `AbstractContextManager` [ `sqlalchemy.engine.Connection` ]
            A context manager that returns a SQLAlchemy connection when
            entered.
        """
        if self._session_connection is not None:
            # session already started, just reuse that
            yield self._session_connection
        else:
            try:
                # open new connection and close it when done
                self._session_connection = self._engine.connect()
                yield self._session_connection
            finally:
                if self._session_connection is not None:
                    self._session_connection.close()
                    self._session_connection = None
                # Temporary tables only live within session
                self._temp_tables = set()

603 

    @contextmanager
    def _transaction(
        self,
        *,
        interrupting: bool = False,
        savepoint: bool = False,
        lock: Iterable[sqlalchemy.schema.Table] = (),
        for_temp_tables: bool = False,
    ) -> Iterator[tuple[bool, sqlalchemy.engine.Connection]]:
        """Protected implementation for `transaction` that actually returns the
        connection and whether this is a new outermost transaction.

        This method is for internal `Database` calls that need the actual
        SQLAlchemy connection object. It should be overridden by subclasses
        instead of `transaction` itself.

        Parameters
        ----------
        interrupting : `bool`, optional
            If `True` (`False` is default), this transaction block may not be
            nested within an outer one, and attempting to do so is a logic
            (i.e. assertion) error.
        savepoint : `bool`, optional
            If `True` (`False` is default), create a ``SAVEPOINT``, allowing
            exceptions raised by the database (e.g. due to constraint
            violations) during this transaction's context to be caught outside
            it without also rolling back all operations in an outer transaction
            block. If `False`, transactions may still be nested, but a
            rollback may be generated at any level and affects all levels, and
            commits are deferred until the outermost block completes. If any
            outer transaction block was created with ``savepoint=True``, all
            inner blocks will be as well (regardless of the actual value
            passed). This has no effect if this is the outermost transaction.
        lock : `~collections.abc.Iterable` [ `sqlalchemy.schema.Table` ], \
                optional
            A list of tables to lock for the duration of this transaction.
            These locks are guaranteed to prevent concurrent writes and allow
            this transaction (only) to acquire the same locks (others should
            block), but only prevent concurrent reads if the database engine
            requires that in order to block concurrent writes.
        for_temp_tables : `bool`, optional
            If `True`, this transaction may involve creating temporary tables.
            (Not used by this base implementation; subclasses may use it.)

        Returns
        -------
        context : `AbstractContextManager` [ `tuple` [ `bool`,
                `sqlalchemy.engine.Connection` ] ]
            A context manager that commits the transaction when it is exited
            without error and rolls back the transaction when it is exited via
            an exception. When entered, it returns a tuple of:

            - ``is_new`` (`bool`): whether this is a new (outermost)
              transaction;
            - ``connection`` (`sqlalchemy.engine.Connection`): the connection.
        """
        with self._session() as connection:
            already_in_transaction = self.isInTransaction()
            assert not (interrupting and already_in_transaction), (
                "Logic error in transaction nesting: an operation that would "
                "interrupt the active transaction context has been requested."
            )
            # Once any enclosing block is a SAVEPOINT, every inner block must
            # be one too.
            savepoint = savepoint or connection.in_nested_transaction()
            trans: sqlalchemy.engine.Transaction | None
            if already_in_transaction:
                if savepoint:
                    trans = connection.begin_nested()
                else:
                    # Nested non-savepoint transactions don't do anything.
                    trans = None
            else:
                # Use a regular (non-savepoint) transaction always for the
                # outermost context.
                trans = connection.begin()
            # Locks are requested only after a transaction is active on the
            # connection, as `_lockTables` requires.
            self._lockTables(connection, lock)
            try:
                yield not already_in_transaction, connection
                if trans is not None:
                    trans.commit()
            except BaseException:
                # Roll back only what this block began; a `None` trans means
                # an outer block owns the transaction.
                if trans is not None:
                    trans.rollback()
                raise

686 

687 @abstractmethod 

688 def _lockTables( 

689 self, connection: sqlalchemy.engine.Connection, tables: Iterable[sqlalchemy.schema.Table] = () 

690 ) -> None: 

691 """Acquire locks on the given tables. 

692 

693 This is an implementation hook for subclasses, called by `transaction`. 

694 It should not be called directly by other code. 

695 

696 Parameters 

697 ---------- 

698 connection : `sqlalchemy.engine.Connection` 

699 Database connection object. It is guaranteed that transaction is 

700 already in a progress for this connection. 

701 tables : `~collections.abc.Iterable` [ `sqlalchemy.schema.Table` ], \ 

702 optional 

703 A list of tables to lock for the duration of this transaction. 

704 These locks are guaranteed to prevent concurrent writes and allow 

705 this transaction (only) to acquire the same locks (others should 

706 block), but only prevent concurrent reads if the database engine 

707 requires that in order to block concurrent writes. 

708 """ 

709 raise NotImplementedError() 

710 

711 def isTableWriteable(self, table: sqlalchemy.schema.Table) -> bool: 

712 """Check whether a table is writeable, either because the database 

713 connection is read-write or the table is a temporary table. 

714 

715 Parameters 

716 ---------- 

717 table : `sqlalchemy.schema.Table` 

718 SQLAlchemy table object to check. 

719 

720 Returns 

721 ------- 

722 writeable : `bool` 

723 Whether this table is writeable. 

724 """ 

725 return self.isWriteable() or table.key in self._temp_tables 

726 

727 def assertTableWriteable(self, table: sqlalchemy.schema.Table, msg: str) -> None: 

728 """Raise if the given table is not writeable, either because the 

729 database connection is read-write or the table is a temporary table. 

730 

731 Parameters 

732 ---------- 

733 table : `sqlalchemy.schema.Table` 

734 SQLAlchemy table object to check. 

735 msg : `str`, optional 

736 If provided, raise `ReadOnlyDatabaseError` instead of returning 

737 `False`, with this message. 

738 """ 

739 if not self.isTableWriteable(table): 

740 raise ReadOnlyDatabaseError(msg) 

741 

    @contextmanager
    def declareStaticTables(self, *, create: bool) -> Iterator[StaticTablesContext]:
        """Return a context manager in which the database's static DDL schema
        can be declared.

        Parameters
        ----------
        create : `bool`
            If `True`, attempt to create all tables at the end of the context,
            then run any initializers registered via
            `StaticTablesContext.addInitializer`.
            If `False`, they will be assumed to already exist.

        Returns
        -------
        schema : `StaticTablesContext`
            A helper object that is used to add new tables.

        Raises
        ------
        ReadOnlyDatabaseError
            Raised if ``create`` is `True` and `Database.isWriteable` is
            `False`.
        SchemaAlreadyDefinedError
            Raised if ``create`` is `True` and the database namespace already
            contains at least one table.

        Examples
        --------
        Given a `Database` instance ``db``::

            with db.declareStaticTables(create=True) as schema:
                schema.addTable("table1", TableSpec(...))
                schema.addTable("table2", TableSpec(...))

        Notes
        -----
        A database's static DDL schema must be declared before any dynamic
        tables are managed via calls to `ensureTableExists` or
        `getExistingTable`. The order in which static schema tables are added
        inside the context block is unimportant; they will automatically be
        sorted and added in an order consistent with their foreign key
        relationships.

        If any exception escapes, the metadata set up here is discarded
        (reset to `None`) and the exception is re-raised.
        """
        if create and not self.isWriteable():
            raise ReadOnlyDatabaseError(f"Cannot create tables in read-only database {self}.")

        # Fresh metadata; `StaticTablesContext.addTable` adds tables to it.
        self._metadata = DatabaseMetadata(self.namespace)
        try:
            with self._transaction() as (_, connection):
                context = StaticTablesContext(self, connection)
                if create and context._tableNames:
                    # Looks like database is already initialized, to avoid
                    # danger of modifying/destroying valid schema we refuse to
                    # do anything in this case
                    raise SchemaAlreadyDefinedError(f"Cannot create tables in non-empty database {self}.")
                yield context
                if create:
                    # Create the namespace (schema) first if it is missing.
                    if (
                        self.namespace is not None
                        and self.namespace not in context._inspector.get_schema_names()
                    ):
                        connection.execute(sqlalchemy.schema.CreateSchema(self.namespace))
                    # In our tables we have columns that make use of sqlalchemy
                    # Sequence objects. There is currently a bug in sqlalchemy
                    # that causes a deprecation warning to be thrown on a
                    # property of the Sequence object when the repr for the
                    # sequence is created. Here a filter is used to catch these
                    # deprecation warnings when tables are created.
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore", category=sqlalchemy.exc.SADeprecationWarning)
                        self._metadata.create_all(connection)
                    # call all initializer methods sequentially
                    for init in context._initializers:
                        init(self)
        except BaseException:
            # Don't leave a partially-declared schema behind.
            self._metadata = None
            raise

815 

816 @abstractmethod 

817 def isWriteable(self) -> bool: 

818 """Return `True` if this database can be modified by this client.""" 

819 raise NotImplementedError() 

820 

821 def isInTransaction(self) -> bool: 

822 """Return `True` if there is currently a database connection open with 

823 an active transaction; `False` otherwise. 

824 """ 

825 session = self._session_connection 

826 return session is not None and session.in_transaction() 

827 

828 @abstractmethod 

829 def __str__(self) -> str: 

830 """Return a human-readable identifier for this `Database`, including 

831 any namespace or schema that identifies its names within a `Registry`. 

832 """ 

833 raise NotImplementedError() 

834 

@property
def dialect(self) -> sqlalchemy.engine.Dialect:
    """SQLAlchemy dialect of the engine backing this database
    (`sqlalchemy.engine.Dialect`, read-only).
    """
    engine = self._engine
    return engine.dialect

841 

def shrinkDatabaseEntityName(self, original: str) -> str:
    """Map a name onto one that respects this engine's length limits.

    The name may belong to a table, constraint, index, or sequence.
    Subclasses that override this should not rely on plain truncation,
    since many long names share a common prefix and would then collide.

    This base implementation performs no shrinking at all.

    Parameters
    ----------
    original : `str`
        The name to shrink.

    Returns
    -------
    shrunk : `str`
        A name acceptable to the database engine, possibly shorter than
        ``original``; here it is always ``original`` unchanged.
    """
    shrunk = original
    return shrunk

863 

def expandDatabaseEntityName(self, shrunk: str) -> str:
    """Retrieve the original name for a database entity that was too long
    to fit within the database engine's limits.

    This undoes `shrinkDatabaseEntityName`. The default implementation
    simply returns the given name, which matches the default (no-op)
    `shrinkDatabaseEntityName`.

    Parameters
    ----------
    shrunk : `str`
        The possibly-shortened name, as produced by
        `shrinkDatabaseEntityName`.

    Returns
    -------
    original : `str`
        The original, unshortened name.
    """
    return shrunk

879 

def _makeColumnConstraints(self, table: str, spec: ddl.FieldSpec) -> list[sqlalchemy.CheckConstraint]:
    """Produce any extra constraints required for a single column.

    The base implementation adds none; subclasses may override this to
    attach database-specific check constraints.

    Parameters
    ----------
    table : `str`
        Name of the table that owns the column.
    spec : `FieldSpec`
        Description of the column.

    Returns
    -------
    constraints : `list` of `sqlalchemy.CheckConstraint`
        Constraints to add to the table for this column (always a new,
        empty list here).
    """
    return list()

897 

def _convertFieldSpec(
    self, table: str, spec: ddl.FieldSpec, metadata: sqlalchemy.MetaData, **kwargs: Any
) -> sqlalchemy.schema.Column:
    """Build the `sqlalchemy.schema.Column` described by a `FieldSpec`.

    Parameters
    ----------
    table : `str`
        Name of the table that will own the column.
    spec : `FieldSpec`
        Description of the column.
    metadata : `sqlalchemy.MetaData`
        SQLAlchemy schema that the owning table belongs to; any sequence
        created for an autoincrement column is registered with it.
    **kwargs
        Extra keyword arguments passed unchanged to the
        `sqlalchemy.schema.Column` constructor, so that derived classes can
        delegate to ``super()`` with only small adjustments.

    Returns
    -------
    column : `sqlalchemy.schema.Column`
        The SQLAlchemy column object.
    """
    schema_items = []
    if spec.autoincrement:
        # An explicit sequence provides autoincrement on databases lacking
        # native support; SQLAlchemy ignores it on databases that have it.
        seq_name = self.shrinkDatabaseEntityName(f"{table}_seq_{spec.name}")
        schema_items.append(sqlalchemy.Sequence(seq_name, metadata=metadata))
    assert spec.doc is None or isinstance(spec.doc, str), f"Bad doc for {table}.{spec.name}."
    return sqlalchemy.schema.Column(
        spec.name,
        spec.getSizedColumnType(),
        *schema_items,
        nullable=spec.nullable,
        primary_key=spec.primaryKey,
        comment=spec.doc,
        server_default=spec.default,
        **kwargs,
    )

944 

def _convertForeignKeySpec(
    self, table: str, spec: ddl.ForeignKeySpec, metadata: sqlalchemy.MetaData, **kwargs: Any
) -> sqlalchemy.schema.ForeignKeyConstraint:
    """Convert a `ForeignKeySpec` to a
    `sqlalchemy.schema.ForeignKeyConstraint`.

    Parameters
    ----------
    table : `str`
        Name of the table this foreign key is being added to.
    spec : `ForeignKeySpec`
        Specification for the foreign key to be added.
    metadata : `sqlalchemy.MetaData`
        SQLAlchemy representation of the DDL schema this constraint is
        being added to.
    **kwargs
        Additional keyword arguments to forward to the
        `sqlalchemy.schema.ForeignKeyConstraint` constructor. This is
        provided to make it easier for derived classes to delegate to
        ``super()`` while making only minor changes.

    Returns
    -------
    constraint : `sqlalchemy.schema.ForeignKeyConstraint`
        SQLAlchemy representation of the constraint.
    """
    # The constraint name encodes both tables and all participating
    # columns; it is passed through shrinkDatabaseEntityName so engines
    # with name-length limits can shorten it.
    name = self.shrinkDatabaseEntityName(
        "_".join(["fkey", table, spec.table, *spec.target, *spec.source])
    )
    return sqlalchemy.schema.ForeignKeyConstraint(
        spec.source,
        [f"{spec.table}.{col}" for col in spec.target],
        name=name,
        ondelete=spec.onDelete,
        # Previously documented as forwarded but silently dropped.
        **kwargs,
    )

980 

def _convertExclusionConstraintSpec(
    self,
    table: str,
    spec: tuple[str | type[TimespanDatabaseRepresentation], ...],
    metadata: sqlalchemy.MetaData,
) -> sqlalchemy.schema.Constraint:
    """Translate an entry of `ddl.TableSpec.exclusion` into a SQLAlchemy
    constraint.

    The base implementation does not support exclusion constraints and
    always raises; subclasses for databases that do support them should
    override this.

    Parameters
    ----------
    table : `str`
        Name of the table that will own the constraint.
    spec : `tuple` [ `str` or `type` ]
        Column names (`str`) plus exactly one `type` object as returned by
        `getTimespanRepresentation`, in the order the backing index should
        use.
    metadata : `sqlalchemy.MetaData`
        SQLAlchemy schema that the owning table belongs to.

    Returns
    -------
    constraint : `sqlalchemy.schema.Constraint`
        The SQLAlchemy constraint object (never returned here).

    Raises
    ------
    NotImplementedError
        Always raised by this implementation.
    """
    message = f"Database {self} does not support exclusion constraints."
    raise NotImplementedError(message)

1014 

def _convertTableSpec(
    self, name: str, spec: ddl.TableSpec, metadata: sqlalchemy.MetaData, **kwargs: Any
) -> sqlalchemy.schema.Table:
    """Convert a `TableSpec` to a `sqlalchemy.schema.Table`.

    Parameters
    ----------
    name : `str`
        The name of the table.
    spec : `TableSpec`
        Specification for the table to be created.
    metadata : `sqlalchemy.MetaData`
        SQLAlchemy representation of the DDL schema this table is being
        added to.
    **kwargs
        Additional keyword arguments to forward to the
        `sqlalchemy.schema.Table` constructor. This is provided to make it
        easier for derived classes to delegate to ``super()`` while making
        only minor changes.

    Returns
    -------
    table : `sqlalchemy.schema.Table`
        SQLAlchemy representation of the table.

    Notes
    -----
    This method does not turn ``spec.foreignKeys`` into constraints, in
    order to avoid circular dependencies. These are added by higher-level
    logic in `ensureTableExists`, `getExistingTable`, and
    `declareStaticTables`. Foreign keys with ``addIndex`` set do get a
    backing index here, unless the primary key, a unique constraint, or an
    explicit index already covers exactly the same columns.
    """
    args: list[sqlalchemy.schema.SchemaItem] = [
        self._convertFieldSpec(name, fieldSpec, metadata) for fieldSpec in spec.fields
    ]

    # Add any column constraints
    for fieldSpec in spec.fields:
        args.extend(self._makeColumnConstraints(name, fieldSpec))

    # Track indexes added for primary key and unique constraints, to make
    # sure we don't add duplicate explicit or foreign key indexes for
    # those.
    allIndexes = {tuple(fieldSpec.name for fieldSpec in spec.fields if fieldSpec.primaryKey)}
    args.extend(
        sqlalchemy.schema.UniqueConstraint(
            *columns, name=self.shrinkDatabaseEntityName("_".join([name, "unq"] + list(columns)))
        )
        for columns in spec.unique
    )
    allIndexes.update(spec.unique)
    # NOTE(review): every tuple in ``spec.unique`` is already in
    # ``allIndexes`` at this point, so the filter below skips those
    # indexes and ``unique=`` evaluates to False for every index that is
    # actually created.
    args.extend(
        sqlalchemy.schema.Index(
            self.shrinkDatabaseEntityName("_".join([name, "idx"] + list(index.columns))),
            *index.columns,
            unique=(index.columns in spec.unique),
            **index.kwargs,
        )
        for index in spec.indexes
        if index.columns not in allIndexes
    )
    allIndexes.update(index.columns for index in spec.indexes)
    # Foreign-key indexes are only added when no equivalent index exists.
    args.extend(
        sqlalchemy.schema.Index(
            self.shrinkDatabaseEntityName("_".join((name, "fkidx") + fk.source)),
            *fk.source,
        )
        for fk in spec.foreignKeys
        if fk.addIndex and fk.source not in allIndexes
    )

    args.extend(self._convertExclusionConstraintSpec(name, excl, metadata) for excl in spec.exclusion)

    assert spec.doc is None or isinstance(spec.doc, str), f"Bad doc for {name}."
    return sqlalchemy.schema.Table(name, metadata, *args, comment=spec.doc, info={"spec": spec}, **kwargs)

1089 

def ensureTableExists(self, name: str, spec: ddl.TableSpec) -> sqlalchemy.schema.Table:
    """Return the table ``name``, creating it from ``spec`` if it is absent.

    Parameters
    ----------
    name : `str`
        Unqualified table name (no namespace prefix).
    spec : `TableSpec`
        Table specification. It is used to create the table when needed,
        and *may* be used to check an existing table for consistency,
        though no such check is guaranteed.

    Returns
    -------
    table : `sqlalchemy.schema.Table`
        SQLAlchemy object for the (possibly newly created) table.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if the table is missing and `isWriteable` returns `False`.
    DatabaseConflictError
        Raised if the table exists but does not agree with ``spec``.

    Notes
    -----
    Must not be called inside a transaction. On a read-only database it is
    only valid when the table already exists.

    Subclasses may override this method, but usually should not need to.
    """
    # TODO: if _engine is used to make a table then it uses separate
    # connection and should not interfere with current transaction
    assert not self.isInTransaction(), "Table creation interrupts transactions."
    assert self._metadata is not None, "Static tables must be declared before dynamic tables."
    existing = self.getExistingTable(name, spec)
    if existing is not None:
        return existing
    if not self.isWriteable():
        raise ReadOnlyDatabaseError(
            f"Table {name} does not exist, and cannot be created because database {self} is read-only."
        )
    new_table = self._metadata.add_table(self, name, spec)
    try:
        with self._transaction() as (_, connection):
            new_table.create(connection)
    except sqlalchemy.exc.DatabaseError:
        # A concurrent process may have created the table first; that
        # surfaces as some DatabaseError subclass (IF NOT EXISTS is not
        # used because of a PostgreSQL server-side race). If the table is
        # now visible, use it; otherwise the error was genuine.
        raced = self.getExistingTable(name, spec)
        if raced is None:
            raise
        return raced
    return new_table

1152 

def getExistingTable(self, name: str, spec: ddl.TableSpec) -> sqlalchemy.schema.Table | None:
    """Look up an existing table, checking it against ``spec``.

    Parameters
    ----------
    name : `str`
        Unqualified table name (no namespace prefix).
    spec : `TableSpec`
        Table specification, used to build the SQLAlchemy table object and
        to check that the table actually in the database is consistent.

    Returns
    -------
    table : `sqlalchemy.schema.Table` or `None`
        SQLAlchemy object for the table, or `None` if no such table exists.

    Raises
    ------
    DatabaseConflictError
        Raised if the table exists but disagrees with ``spec``.

    Notes
    -----
    Safe to call inside transactions; it never modifies the database.

    Subclasses may override this method, but usually should not need to.
    """
    assert self._metadata is not None, "Static tables must be declared before dynamic tables."
    known = self._metadata.get_table(name)
    if known is None:
        # Not registered in our metadata yet; ask the database itself,
        # reusing the session connection when one is open.
        bind = self._engine if self._session_connection is None else self._session_connection
        inspector = sqlalchemy.inspect(bind, raiseerr=True)
        if name not in inspector.get_table_names(schema=self.namespace):
            return None
        _checkExistingTableDefinition(name, spec, inspector.get_columns(name, schema=self.namespace))
        return self._metadata.add_table(self, name, spec)
    if spec.fields.names != set(known.columns.keys()):
        raise DatabaseConflictError(
            f"Table '{name}' has already been defined differently; the new "
            f"specification has columns {list(spec.fields.names)}, while "
            f"the previous definition has {list(known.columns.keys())}."
        )
    return known

1201 

def _make_temporary_table(
    self,
    connection: sqlalchemy.engine.Connection,
    spec: ddl.TableSpec,
    name: str | None = None,
    **kwargs: Any,
) -> sqlalchemy.schema.Table:
    """Create a ``TEMPORARY`` table described by a `TableSpec`.

    Parameters
    ----------
    connection : `sqlalchemy.engine.Connection`
        Connection on which the table is created.
    spec : `TableSpec`
        Specification for the table.
    name : `str`, optional
        Name for the table, unique within this session/connection.
        Subclasses may transform it into the name actually used. When
        omitted, a random ``tmp_<hex>`` name is generated.
    **kwargs
        Extra keyword arguments forwarded to ``self._metadata.add_table``
        (intended for the `sqlalchemy.schema.Table` constructor), so that
        derived classes can delegate to ``super()`` with small changes.

    Returns
    -------
    table : `sqlalchemy.schema.Table`
        SQLAlchemy object for the new table.

    Raises
    ------
    RuntimeError
        Raised if the static schema has not been declared yet.
    ValueError
        Raised if the table's key differs from ``name`` and that key is
        already registered as a temporary table.
    """
    if name is None:
        name = f"tmp_{uuid.uuid4().hex}"
    metadata = self._metadata
    if metadata is None:
        raise RuntimeError("Cannot create temporary table before static schema is defined.")
    table = metadata.add_table(
        self,
        name,
        spec,
        prefixes=["TEMPORARY"],
        schema=sqlalchemy.schema.BLANK_SCHEMA,
        **kwargs,
    )
    was_renamed = table.key != name
    if was_renamed and table.key in self._temp_tables:
        raise ValueError(
            f"A temporary table with name {name} (transformed to {table.key} by Database) already exists."
        )
    with self._transaction():
        table.create(connection)
    return table

1246 

@classmethod
def getTimespanRepresentation(cls) -> type[TimespanDatabaseRepresentation]:
    """Return the `type` describing how `Timespan` objects are stored in
    this database.

    `Database` itself never consults this return value; calling code is
    responsible for keeping DDL and queries consistent with it.

    Returns
    -------
    TimespanReprClass : `type` (`TimespanDatabaseRepresentation` subclass)
        Type encapsulating the storage of `Timespan` objects in this
        database. The base implementation uses the compound
        representation.

    Notes
    -----
    Timespan-mangling logic is deliberately kept outside the `Database`
    implementations, even though the choice of representation belongs to
    them, for two reasons:

    - Timespans appear in relatively few tables and queries in typical
      usage, and the code that handles them already knows it is working
      with timespans. A timespan-aware `Database.insert`, by contrast,
      would need extra logic to detect when mangling was needed, which
      would usually be useless overhead.

    - SQLAlchemy's SELECT expression system cannot wrap multiple columns in
      a single expression object (the ORM can, but we do not use it), so
      far more of that code would need our own wrappers to hide timespan
      representations.
    """
    representation = TimespanDatabaseRepresentation.Compound
    return representation

1283 

def sync(
    self,
    table: sqlalchemy.schema.Table,
    *,
    keys: dict[str, Any],
    compared: dict[str, Any] | None = None,
    extra: dict[str, Any] | None = None,
    returning: Sequence[str] | None = None,
    update: bool = False,
) -> tuple[dict[str, Any] | None, bool | dict[str, Any]]:
    """Insert into a table as necessary to ensure database contains
    values equivalent to the given ones.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Table to be queried and possibly inserted into.
    keys : `dict`
        Column name-value pairs used to search for an existing row; must
        be a combination that can be used to select a single row if one
        exists. If such a row does not exist, these values are used in
        the insert.
    compared : `dict`, optional
        Column name-value pairs that are compared to those in any existing
        row. If such a row does not exist, these values are used in the
        insert.
    extra : `dict`, optional
        Column name-value pairs that are ignored if a matching row exists,
        but used in an insert if one is necessary.
    returning : `~collections.abc.Sequence` of `str`, optional
        The names of columns whose values should be returned.
    update : `bool`, optional
        If `True` (`False` is default), update the existing row with the
        values in ``compared`` instead of raising `DatabaseConflictError`.

    Returns
    -------
    row : `dict`, optional
        The value of the fields indicated by ``returning``, or `None` if
        ``returning`` is `None`. If an existing row was updated, updated
        columns report their new values.
    inserted_or_updated : `bool` or `dict`
        If `True`, a new row was inserted; if `False`, a matching row
        already existed. If a `dict` (only possible if ``update=True``),
        then an existing row was updated, and the dict maps the names of
        the updated columns to their *old* values (new values can be
        obtained from ``compared``).

    Raises
    ------
    DatabaseConflictError
        Raised if the values in ``compared`` do not match the values in the
        database and ``update`` is `False`.
    ReadOnlyDatabaseError
        Raised if `isTableWriteable` returns `False` for ``table`` and a
        matching record does not already exist, or would need updating.
    ConflictingDefinitionError
        Raised if no row matching ``keys`` is found after the insert
        attempt (e.g. because another constraint rejected the row).
    RuntimeError
        Raised if ``keys`` matches more than one row, or if the compared
        values disagree immediately after a successful insert.

    Notes
    -----
    May be used inside transaction contexts, so implementations may not
    perform operations that interrupt transactions.

    It may be called on read-only databases if and only if the matching row
    does in fact already exist.
    """

    def check() -> tuple[int, dict[str, Any] | None, list | None]:
        """Query for a row that matches the ``keys`` argument, and compare
        it to what was given by the caller.

        Returns
        -------
        n : `int`
            Number of matching rows. ``n != 1`` is always an error, but
            it's a different kind of error depending on where `check` is
            being called.
        bad : `dict` or `None`
            The subset of the keys of ``compared`` for which the existing
            values did not match the given ones, mapped to the existing
            values in the database. A non-empty ``bad`` is always either an
            error or a reason to update, depending on context. `None` if
            ``n != 1``.
        result : `list` or `None`
            Results in the database that correspond to the columns given
            in ``returning``, or `None` if ``returning is None``.
        """
        toSelect: set[str] = set()
        if compared is not None:
            toSelect.update(compared.keys())
        if returning is not None:
            toSelect.update(returning)
        if not toSelect:
            # Need to select some column, even if we just want to see
            # how many rows we get back.
            toSelect.add(next(iter(keys.keys())))
        selectSql = (
            sqlalchemy.sql.select(*[table.columns[k].label(k) for k in toSelect])
            .select_from(table)
            .where(sqlalchemy.sql.and_(*[table.columns[k] == v for k, v in keys.items()]))
        )
        with self._transaction() as (_, connection):
            fetched = list(connection.execute(selectSql).mappings())
        if len(fetched) != 1:
            return len(fetched), None, None
        existing = fetched[0]
        if compared is not None:

            def safeNotEqual(a: Any, b: Any) -> bool:
                # astropy Time values are compared with
                # TimeConverter.times_equal rather than plain ``!=``.
                if isinstance(a, astropy.time.Time):
                    return not time_utils.TimeConverter().times_equal(a, b)
                return a != b

            inconsistencies = {
                k: existing[k] for k, v in compared.items() if safeNotEqual(existing[k], v)
            }
        else:
            inconsistencies = {}
        if returning is not None:
            toReturn: list | None = [existing[k] for k in returning]
        else:
            toReturn = None
        return 1, inconsistencies, toReturn

    def _format_bad(inconsistencies: dict[str, Any]) -> str:
        """Format the 'bad' dictionary of existing values returned by
        ``check`` into a string suitable for an error message.
        """
        assert compared is not None, "Should not be able to get inconsistencies without comparing."
        return ", ".join(f"{k}: {v!r} != {compared[k]!r}" for k, v in inconsistencies.items())

    if self.isTableWriteable(table):
        # Try an insert first, but allow it to fail (in only specific
        # ways).
        row = keys.copy()
        if compared is not None:
            row.update(compared)
        if extra is not None:
            row.update(extra)
        with self.transaction():
            inserted = bool(self.ensure(table, row))
            inserted_or_updated: bool | dict[str, Any]
            # Need to perform check() for this branch inside the
            # transaction, so we roll back an insert that didn't do
            # what we expected. That limits the extent to which we
            # can reduce duplication between this block and the other
            # ones that perform similar logic.
            n, bad, result = check()
            if n < 1:
                raise ConflictingDefinitionError(
                    f"Attempted to ensure {row} exists by inserting it with ON CONFLICT IGNORE, "
                    f"but a post-insert query on {keys} returned no results. "
                    f"Insert was {'' if inserted else 'not '}reported as successful. "
                    "This can occur if the insert violated a database constraint other than the "
                    "unique constraint or primary key used to identify the row in this call."
                )
            elif n > 1:
                raise RuntimeError(
                    f"Keys passed to sync {keys.keys()} do not comprise a "
                    f"unique constraint for table {table.name}."
                )
            elif bad:
                assert compared is not None, (
                    "Should not be able to get inconsistencies without comparing."
                )
                if inserted:
                    raise RuntimeError(
                        f"Conflict ({bad}) in sync after successful insert; this is "
                        "possible if the same table is being updated by a concurrent "
                        "process that isn't using sync, but it may also be a bug in "
                        "daf_butler."
                    )
                elif update:
                    with self._transaction() as (_, connection):
                        connection.execute(
                            table.update()
                            .where(sqlalchemy.sql.and_(*[table.columns[k] == v for k, v in keys.items()]))
                            .values(**{k: compared[k] for k in bad})
                        )
                    inserted_or_updated = bad
                    if returning is not None and result is not None:
                        # ``result`` was read before the UPDATE; report the
                        # values the row now holds for updated columns.
                        result = [
                            compared[column] if column in bad else value
                            for column, value in zip(returning, result, strict=True)
                        ]
                else:
                    raise DatabaseConflictError(
                        f"Conflict in sync for table {table.name} on column(s) {_format_bad(bad)}."
                    )
            else:
                inserted_or_updated = inserted
    else:
        # Database is not writeable; just see if the row exists.
        n, bad, result = check()
        if n < 1:
            raise ReadOnlyDatabaseError(
                f"sync needs to insert into table {table.name}, but database is read-only."
            )
        elif n > 1:
            raise RuntimeError(
                f"Keys passed to sync {keys.keys()} do not comprise a "
                f"unique constraint for table {table.name}."
            )
        elif bad:
            if update:
                raise ReadOnlyDatabaseError(
                    f"sync needs to update table {table.name}, but database is read-only."
                )
            else:
                raise DatabaseConflictError(
                    f"Conflict in sync for table {table.name} on column(s) {_format_bad(bad)}."
                )
        inserted_or_updated = False
    if returning is None:
        return None, inserted_or_updated
    else:
        assert result is not None
        return dict(zip(returning, result, strict=True)), inserted_or_updated

1487 

def insert(
    self,
    table: sqlalchemy.schema.Table,
    *rows: dict,
    returnIds: bool = False,
    select: sqlalchemy.sql.expression.SelectBase | None = None,
    names: Iterable[str] | None = None,
) -> list[int] | None:
    """Insert one or more rows into a table, optionally returning
    autoincrement primary key values.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Table rows should be inserted into.
    *rows : `dict`
        Positional arguments are the rows to be inserted, as dictionaries
        mapping column name to value. The keys in all dictionaries must
        be the same.
    returnIds : `bool`, optional
        If `True` (`False` is default), return the values of the table's
        autoincrement primary key field (which must exist).
    select : `sqlalchemy.sql.SelectBase`, optional
        A SELECT query expression to insert rows from. Cannot be provided
        with either ``rows`` or ``returnIds=True``.
    names : `~collections.abc.Iterable` [ `str` ], optional
        Names of columns in ``table`` to be populated, ordered to match the
        columns returned by ``select``. Ignored if ``select`` is `None`.
        If not provided, the columns returned by ``select`` must be named
        to match the desired columns of ``table``.

    Returns
    -------
    ids : `None`, or `list` of `int`
        If ``returnIds`` is `True`, a `list` containing the inserted
        values for the table's autoincrement primary key (empty if there
        was nothing to insert); otherwise `None`.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called.
    TypeError
        Raised if ``select`` is combined with ``rows`` or
        ``returnIds=True``.

    Notes
    -----
    The default implementation uses bulk insert syntax when ``returnIds``
    is `False`, and a loop over single-row insert operations when it is
    `True`.

    Derived classes should reimplement when they can provide a more
    efficient implementation (especially for the latter case).

    May be used inside transaction contexts, so implementations may not
    perform operations that interrupt transactions.
    """
    self.assertTableWriteable(table, f"Cannot insert into read-only table {table}.")
    if select is not None and (rows or returnIds):
        raise TypeError("'select' is incompatible with passing value rows or returnIds=True.")
    if not rows and select is None:
        # Nothing to insert; keep the return type consistent with
        # ``returnIds``.
        return [] if returnIds else None
    with self._transaction() as (_, connection):
        if not returnIds:
            if select is not None:
                if names is None:
                    # Prefer selected_columns (SQLAlchemy 1.4+); fall back
                    # to the older ``columns`` accessor for objects that
                    # do not provide it.
                    if hasattr(select, "selected_columns"):
                        names = select.selected_columns.keys()
                    else:
                        names = select.columns.keys()
                connection.execute(table.insert().from_select(list(names), select))
            else:
                # Bulk (executemany-style) insert of all rows at once.
                connection.execute(table.insert(), rows)
            return None
        # One statement per row so each row's generated primary key can be
        # read back from the result.
        sql = table.insert()
        ids = []
        for row in rows:
            key = connection.execute(sql, row).inserted_primary_key
            assert key is not None
            ids.append(key[0])
        return ids

1572 

@abstractmethod
def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None:
    """Insert rows, overwriting existing rows that share a primary key.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Destination table.
    *rows
        Rows to insert, each a dictionary mapping column name to value.
        Every dictionary must have the same keys.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called.

    Notes
    -----
    Usable inside transaction contexts, so implementations must not do
    anything that interrupts a transaction.

    Implementations should raise `sqlalchemy.exc.IntegrityError` when a
    constraint other than the primary key would be violated.

    Implementations need not support `replace` on tables with
    autoincrement keys.
    """
    raise NotImplementedError

1606 

@abstractmethod
def ensure(self, table: sqlalchemy.schema.Table, *rows: dict, primary_key_only: bool = False) -> int:
    """Insert rows, silently skipping those that would break a unique
    constraint.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Destination table.
    *rows
        Rows to insert, each a dictionary mapping column name to value.
        Every dictionary must have the same keys.
    primary_key_only : `bool`, optional
        When `True` (default `False`), skip only rows that collide on the
        primary key; other constraint violations raise (and roll back the
        transaction).

    Returns
    -------
    count : `int`
        How many rows were actually inserted.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called,
        even when the call would be a no-op on a writeable database.

    Notes
    -----
    Usable inside transaction contexts, so implementations must not do
    anything that interrupts a transaction.

    Implementations need not support `ensure` on tables with autoincrement
    keys.
    """
    raise NotImplementedError

1646 

def delete(self, table: sqlalchemy.schema.Table, columns: Iterable[str], *rows: dict) -> int:
    """Delete one or more rows from a table.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Table that rows should be deleted from.
    columns : `~collections.abc.Iterable` of `str`
        The names of columns that will be used to constrain the rows to
        be deleted; these will be combined via ``AND`` to form the
        ``WHERE`` clause of the delete query. If empty, every row of the
        table is deleted.
    *rows
        Positional arguments are the keys of rows to be deleted, as
        dictionaries mapping column name to value. The keys in all
        dictionaries must be exactly the names in ``columns``.

    Returns
    -------
    count : `int`
        Number of rows deleted.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called.

    Notes
    -----
    May be used inside transaction contexts, so implementations may not
    perform operations that interrupt transactions.

    When there is a single constraint column, or exactly one of several
    columns takes more than one value across ``rows``, the rows are
    deleted with ``IN`` queries in batches of at most 1000 values.
    Otherwise the statement is executed with one set of bound parameters
    per row.

    The default implementation should be sufficient for most derived
    classes.
    """
    self.assertTableWriteable(table, f"Cannot delete from read-only table {table}.")
    # Materialize first. An iterator or generator is truthy even when it
    # yields nothing, so testing ``columns`` before conversion would treat
    # an empty iterator as "constrained by some columns".
    columns = list(columns)
    if columns and not rows:
        # If there are no columns, this operation is supposed to delete
        # everything (so we proceed as usual). But if there are columns,
        # but no rows, it was a constrained bulk operation where the
        # constraint is that no rows match, and we should short-circuit
        # while reporting that no rows were affected.
        return 0
    sql = table.delete()

    # Distinct values taken by each constraint column across all rows.
    # Only the names in ``columns`` are used, so every path below
    # constrains on exactly the same set of columns.
    content: dict[str, set] = {column: {row[column] for row in rows} for column in columns}
    if len(columns) == 1:
        # A single constraint column can always be matched with IN.
        changing_columns = list(columns)
    else:
        changing_columns = [column for column, values in content.items() if len(values) > 1]

    if len(changing_columns) != 1:
        # Zero or several columns vary between rows, so bind every column
        # explicitly and execute the statement once per row.
        whereTerms = [table.columns[name] == sqlalchemy.sql.bindparam(name) for name in columns]
        if whereTerms:
            sql = sql.where(sqlalchemy.sql.and_(*whereTerms))
        with self._transaction() as (_, connection):
            return connection.execute(sql, rows).rowcount

    # Exactly one column varies and every other column holds a single
    # value. The fixed columns become equality terms and the varying column
    # is matched with IN, which is more efficient.
    (name,) = changing_columns
    clauses = [
        table.columns[column] == next(iter(values))
        for column, values in content.items()
        if column != name
    ]

    # The IN operator will not work for "infinite" numbers of values, so
    # split them into batches and issue one query per batch.
    in_content = list(content[name])
    n_per_loop = 1_000  # Controls how many items to put in an IN clause.
    rowcount = 0
    with self._transaction() as (_, connection):
        for start in range(0, len(in_content), n_per_loop):
            in_clause = table.columns[name].in_(in_content[start : start + n_per_loop])
            newsql = sql.where(sqlalchemy.sql.and_(*clauses, in_clause))
            rowcount += connection.execute(newsql).rowcount
    return rowcount

1744 

def deleteWhere(self, table: sqlalchemy.schema.Table, where: sqlalchemy.sql.ColumnElement) -> int:
    """Delete rows from a table with pre-constructed WHERE clause.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Table that rows should be deleted from.
    where : `sqlalchemy.sql.ColumnElement`
        Boolean SQL expression used verbatim as the ``WHERE`` clause of
        the delete query; only rows for which it is true are deleted.

    Returns
    -------
    count : `int`
        Number of rows deleted.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called.

    Notes
    -----
    May be used inside transaction contexts, so implementations may not
    perform operations that interrupt transactions.

    The default implementation should be sufficient for most derived
    classes.
    """
    self.assertTableWriteable(table, f"Cannot delete from read-only table {table}.")

    sql = table.delete().where(where)
    with self._transaction() as (_, connection):
        return connection.execute(sql).rowcount

1780 

def update(self, table: sqlalchemy.schema.Table, where: dict[str, str], *rows: dict) -> int:
    """Update one or more rows in a table.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Table containing the rows to be updated.
    where : `dict` [`str`, `str`]
        A mapping from the names of columns that will be used to search for
        existing rows to the keys that will hold these values in the
        ``rows`` dictionaries. Note that these may not be the same due to
        SQLAlchemy limitations. If empty, no ``WHERE`` clause is used, so
        every row of the table is updated once per entry in ``rows``.
    *rows
        Positional arguments are the rows to be updated. The keys in all
        dictionaries must be the same, and may correspond to either a
        value in the ``where`` dictionary or the name of a column to be
        updated.

    Returns
    -------
    count : `int`
        Number of rows matched (regardless of whether the update actually
        modified them).

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called.

    Notes
    -----
    May be used inside transaction contexts, so implementations may not
    perform operations that interrupt transactions.

    The default implementation should be sufficient for most derived
    classes.
    """
    self.assertTableWriteable(table, f"Cannot update read-only table {table}.")
    if not rows:
        return 0
    sql = table.update()
    whereTerms = [table.columns[k] == sqlalchemy.sql.bindparam(v) for k, v in where.items()]
    if whereTerms:
        # Calling ``and_()`` with no arguments is deprecated in SQLAlchemy,
        # so only attach a WHERE clause when there is something to match.
        sql = sql.where(sqlalchemy.sql.and_(*whereTerms))
    with self._transaction() as (_, connection):
        return connection.execute(sql, rows).rowcount

1826 

@contextmanager
def query(
    self,
    sql: sqlalchemy.sql.expression.Executable | sqlalchemy.sql.expression.SelectBase,
    *args: Any,
    **kwargs: Any,
) -> Iterator[sqlalchemy.engine.CursorResult]:
    """Run a SELECT query against the database.

    Parameters
    ----------
    sql : `sqlalchemy.sql.expression.SelectBase`
        A SQLAlchemy representation of a ``SELECT`` query.
    *args
        Additional positional arguments are forwarded to
        `sqlalchemy.engine.Connection.execute`.
    **kwargs
        Additional keyword arguments are forwarded to
        `sqlalchemy.engine.Connection.execute`.

    Returns
    -------
    result_context : `sqlalchemy.engine.CursorResult`
        Context manager that returns the query result object when entered.
        These results are invalidated when the context is exited.

    Notes
    -----
    If a session connection is active it is reused and left open.
    Otherwise a new connection is opened from the engine and closed when
    the context exits.
    """
    # Decide ownership once, up front. The connection is closed only if it
    # was opened here, regardless of any later change to the session state.
    owns_connection = self._session_connection is None
    connection = self._engine.connect() if owns_connection else self._session_connection
    try:
        self._log_query_and_plan(connection, sql)
        # TODO: SelectBase is not good for execute(), but it is used
        # everywhere, e.g. in daf_relation. We should switch to Executable
        # at some point. The string form of the cast is purely a typing aid
        # and needs no runtime attribute lookup.
        with connection.execute(
            cast("sqlalchemy.sql.expression.Executable", sql), *args, **kwargs
        ) as result:
            yield result
    finally:
        if owns_connection:
            connection.close()

1869 

1870 def _log_query_and_plan( 

1871 self, 

1872 connection: sqlalchemy.Connection, 

1873 sql: sqlalchemy.sql.expression.Executable | sqlalchemy.sql.expression.SelectBase, 

1874 ) -> None: 

1875 """Log the given SQL statement and the DB's plan for executing it if 

1876 the environment variable DAF_BUTLER_DEBUG_QUERIES is set to a truthy 

1877 value. 

1878 """ 

1879 if os.environ.get("DAF_BUTLER_DEBUG_QUERIES", False): 

1880 assert isinstance(sql, sqlalchemy.SelectBase) 

1881 compiled = sql.compile(connection) 

1882 print( 

1883 f"Executing SQL statement:\n{compiled}\nBind parameters: {compiled.params}", file=sys.stderr 

1884 ) 

1885 

1886 query_plan = get_query_plan(connection, sql) 

1887 print(f"Query plan:\n{query_plan}", file=sys.stderr) 

1888 

@abstractmethod
def constant_rows(
    self,
    fields: NamedValueAbstractSet[ddl.FieldSpec],
    *rows: dict,
    name: str | None = None,
) -> sqlalchemy.sql.FromClause:
    """Return a SQLAlchemy object that represents a small number of
    constant-valued rows.

    Parameters
    ----------
    fields : `NamedValueAbstractSet` [ `ddl.FieldSpec` ]
        The columns of the rows. Unique and foreign key constraints are
        ignored.
    *rows : `dict`
        Values for the rows, keyed by field name.
    name : `str`, optional
        If provided, the name of the SQL construct. If not provided, an
        opaque but unique identifier of the form ``tmp_<hex uuid>`` is
        generated.

    Returns
    -------
    from_clause : `sqlalchemy.sql.FromClause`
        SQLAlchemy object representing the given rows. This is guaranteed
        to be something that can be directly joined into a ``SELECT``
        query's ``FROM`` clause, and will not involve a temporary table
        that needs to be cleaned up later.

    Notes
    -----
    The default implementation uses the SQL-standard ``VALUES`` construct,
    but support for that construct is varied enough across popular RDBMSs
    that the method is still marked abstract to force explicit opt-in via
    delegation to `super`.
    """
    construct_name = f"tmp_{uuid.uuid4().hex}" if name is None else name
    sql_columns = [sqlalchemy.Column(field.name, field.getSizedColumnType()) for field in fields]
    values_clause = sqlalchemy.sql.values(*sql_columns, name=construct_name)
    # Each row becomes a tuple ordered by field name; a distinct loop
    # variable avoids shadowing the ``name`` parameter.
    data = [tuple(row[field_name] for field_name in fields.names) for row in rows]
    return values_clause.data(data)

1931 

def get_constant_rows_max(self) -> int:
    """Return the largest number of rows that should be handed to a single
    `constant_rows` call on this backend.

    Returns
    -------
    max : `int`
        Maximum number of rows (1000 in this default implementation).

    Notes
    -----
    The limit reflects typical performance profiles (or a guess at them),
    not just hard database engine limits.
    """
    default_max_rows = 1_000
    return default_max_rows

1947 

@abstractmethod
def glob_expression(
    self, expression: sqlalchemy.ColumnElement[Any], pattern: str
) -> sqlalchemy.ColumnElement[bool]:
    """Build a boolean SQL expression that tests ``expression`` against a
    glob-style pattern.

    Parameters
    ----------
    expression : `sqlalchemy.ColumnElement`
        Column expression that evaluates to a string.
    pattern : `str`
        Glob pattern string.

    Returns
    -------
    glob : `sqlalchemy.ColumnElement`
        Boolean expression that matches ``expression`` against ``pattern``.

    Notes
    -----
    Abstract; concrete database classes must override this.
    """
    raise NotImplementedError

1968 

@property
@abstractmethod
def has_distinct_on(self) -> bool:
    """`bool`: Whether this database supports the ``DISTINCT ON`` SQL
    construct. Abstract; concrete database classes must override it.
    """
    raise NotImplementedError

1974 

@property
@abstractmethod
def has_any_aggregate(self) -> bool:
    """`bool`: Whether this database supports the ``ANY_VALUE`` aggregate
    function or an equivalent. Abstract; concrete database classes must
    override it.
    """
    raise NotImplementedError

1982 

@property
def supports_temporary_tables(self) -> bool:
    """`bool`: Whether temporary tables are allowed for this database
    instance; reports the ``_allow_temporary_tables`` flag unchanged.
    """
    allowed = self._allow_temporary_tables
    return allowed

1986 

@abstractmethod
def apply_any_aggregate(self, column: sqlalchemy.ColumnElement[Any]) -> sqlalchemy.ColumnElement[Any]:
    """Wrap a SQLAlchemy column in the ``ANY_VALUE`` aggregate function or
    its equivalent on this database.

    Parameters
    ----------
    column : `sqlalchemy.ColumnElement`
        Original column to wrap.

    Returns
    -------
    wrapped : `sqlalchemy.ColumnElement`
        A column element of the same SQL type that may appear in the
        ``SELECT`` clause even when this column is absent from the
        ``GROUP BY`` clause.

    Notes
    -----
    Behavior is unspecified when `has_any_aggregate` is `False`; callers
    must check that property first. Abstract; concrete database classes
    must override this.
    """
    raise NotImplementedError

2010 

# Attribute declarations only (no values are assigned here); presumably
# they are set by subclass constructors -- confirm against implementations.
origin: int
"""An integer ID that should be used as the default for any datasets,
quanta, or other entities that use an (autoincrement, origin) compound
primary key (`int`).
"""

namespace: str | None
"""The schema or namespace this database instance is associated with
(`str` or `None`).
"""

name_shrinker: NameShrinker
"""An object that can be used to shrink field names to fit within the
identifier limit of the database engine (`NameShrinker`).
"""

2026 

2027 

class DatabaseMetadata:
    """Wrapper around a SQLAlchemy `~sqlalchemy.MetaData` object that
    serializes modifications so it can be shared between threads.

    Parameters
    ----------
    namespace : `str` or `None`
        Name of the schema or namespace this instance is associated with.

    Notes
    -----
    `sqlalchemy.MetaData` is documented to be threadsafe for reads, but not
    with concurrent modifications. We add tables dynamically at runtime,
    and the MetaData object is shared by all Database instances sharing
    the same connection pool. Every method here holds a single lock while
    it touches the wrapped object.
    """

    def __init__(self, namespace: str | None) -> None:
        # Guards both ``_metadata`` and ``_tables``.
        self._lock = Lock()
        self._metadata = sqlalchemy.MetaData(schema=namespace)
        # Tables registered through `add_table`, keyed by table name.
        self._tables: dict[str, sqlalchemy.Table] = {}

    def add_table(
        self, db: Database, name: str, spec: ddl.TableSpec, **kwargs: Any
    ) -> sqlalchemy.schema.Table:
        """Add a new table to the MetaData object, returning its SQLAlchemy
        representation. This does not physically create the table in the
        database -- it only sets up its definition.

        Parameters
        ----------
        db : `Database`
            Database connection associated with the table definition.
        name : `str`
            The name of the table.
        spec : `ddl.TableSpec`
            The specification of the table.
        **kwargs
            Additional keyword arguments to forward to the
            `sqlalchemy.schema.Table` constructor.

        Returns
        -------
        table : `sqlalchemy.schema.Table`
            The created table. If a table with this name was already added,
            that existing table is returned and ``spec`` and ``kwargs`` are
            ignored.
        """
        with self._lock:
            if (table := self._tables.get(name)) is not None:
                return table

            table = db._convertTableSpec(name, spec, self._metadata, **kwargs)
            try:
                for foreignKeySpec in spec.foreignKeys:
                    table.append_constraint(db._convertForeignKeySpec(name, foreignKeySpec, self._metadata))
            except BaseException:
                # NOTE(review): assumes _convertTableSpec registers the new
                # table in ``self._metadata`` (as constructing a
                # sqlalchemy.Table with a MetaData does). Undo that so a
                # failed, half-built definition does not linger and make a
                # later retry collide with it.
                self._metadata.remove(table)
                raise

            self._tables[name] = table
            return table

    def get_table(self, name: str) -> sqlalchemy.schema.Table | None:
        """Return the definition of a table that was previously added to this
        MetaData object.

        Parameters
        ----------
        name : `str`
            Name of the table.

        Returns
        -------
        table : `sqlalchemy.schema.Table` or `None`
            The table definition, or `None` if the table is not known to this
            MetaData instance.
        """
        with self._lock:
            return self._tables.get(name)

    def remove_table(self, name: str) -> None:
        """Remove a table that was previously added to this MetaData object.

        Unknown names are ignored.

        Parameters
        ----------
        name : `str`
            Name of the table.
        """
        with self._lock:
            table = self._tables.pop(name, None)
            if table is not None:
                self._metadata.remove(table)

    def create_all(self, connection: sqlalchemy.engine.Connection) -> None:
        """Create all tables known to this MetaData object in the database.
        Same as `sqlalchemy.MetaData.create_all`.

        Parameters
        ----------
        connection : `sqlalchemy.engine.Connection`
            Database connection that will be used to create tables.
        """
        with self._lock:
            self._metadata.create_all(connection)