Coverage for python / lsst / daf / butler / registry / interfaces / _database.py: 21%
462 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29from ... import ddl, time_utils
31__all__ = [
32 "Database",
33 "DatabaseConflictError",
34 "DatabaseInsertMode",
35 "DatabaseMetadata",
36 "ReadOnlyDatabaseError",
37 "SchemaAlreadyDefinedError",
38 "StaticTablesContext",
39]
41import enum
42import os
43import sys
44import uuid
45import warnings
46from abc import ABC, abstractmethod
47from collections import defaultdict
48from collections.abc import Callable, Iterable, Iterator, Sequence
49from contextlib import contextmanager
50from threading import Lock
51from typing import Any, cast, final
53import astropy.time
54import sqlalchemy
56from ..._named import NamedValueAbstractSet
57from ...name_shrinker import NameShrinker
58from ...timespan_database_representation import TimespanDatabaseRepresentation
59from .._exceptions import ConflictingDefinitionError
60from ._database_explain import get_query_plan
class DatabaseInsertMode(enum.Enum):
    """Strategies available when inserting records into a database table."""

    INSERT = 1
    """Plain insert; fails if any of the records is already present."""

    REPLACE = 2
    """Insert, overwriting any records that are already present."""

    ENSURE = 3
    """Insert, silently skipping any records that are already present."""
# TODO: method is called with list[ReflectedColumn] in SA 2, and
# ReflectedColumn does not exist in 1.4.
def _checkExistingTableDefinition(name: str, spec: ddl.TableSpec, inspection: list) -> None:
    """Test that the definition of a table in a `ddl.TableSpec` and from
    database introspection are consistent.

    Only the sets of column names are compared.

    Parameters
    ----------
    name : `str`
        Name of the table (only used in error messages).
    spec : `ddl.TableSpec`
        Specification of the table.
    inspection : `list` [ `dict` ]
        Column descriptions as returned by
        `sqlalchemy.engine.reflection.Inspector.get_columns`; each entry
        must provide a ``"name"`` key.

    Raises
    ------
    DatabaseConflictError
        Raised if the column names in the specification and in the database
        differ.
    """
    columnNames = [c["name"] for c in inspection]
    specNames = set(spec.fields.names)
    dbNames = set(columnNames)
    if specNames != dbNames:
        # ``spec.fields.names`` is set-like, so sort it to get a stable,
        # readable message, and spell out exactly which columns disagree.
        raise DatabaseConflictError(
            f"Table '{name}' exists but is defined differently in the database; "
            f"specification has columns {sorted(specNames)}, while the "
            f"table in the database has {columnNames}. "
            f"Missing from the database: {sorted(specNames - dbNames)}; "
            f"not in the specification: {sorted(dbNames - specNames)}."
        )
class ReadOnlyDatabaseError(RuntimeError):
    """Raised when a write operation is attempted through a `Database` that
    does not permit writes.
    """

    pass
class DatabaseConflictError(ConflictingDefinitionError):
    """Raised when database content (row values or schema entities) is
    inconsistent with what this client expects.
    """

    pass
class SchemaAlreadyDefinedError(RuntimeError):
    """Raised when asked to create a database schema in a namespace that
    already contains tables.
    """

    pass
124class StaticTablesContext:
125 """Helper class used to declare the static schema for a registry layer
126 in a database.
128 An instance of this class is returned by `Database.declareStaticTables`,
129 which should be the only way it should be constructed.
131 Parameters
132 ----------
133 db : `Database`
134 The database.
135 connection : `sqlalchemy.engine.Connection`
136 The connection object.
137 """
139 def __init__(self, db: Database, connection: sqlalchemy.engine.Connection):
140 self._db = db
141 self._inspector = sqlalchemy.inspect(connection)
142 self._tableNames = frozenset(self._inspector.get_table_names(schema=self._db.namespace))
143 self._initializers: list[Callable[[Database], None]] = []
145 def addTable(self, name: str, spec: ddl.TableSpec) -> sqlalchemy.schema.Table:
146 """Add a new table to the schema, returning its sqlalchemy
147 representation.
149 Parameters
150 ----------
151 name : `str`
152 The name of the table.
153 spec : `ddl.TableSpec`
154 The specification of the table.
156 Returns
157 -------
158 table : `sqlalchemy.schema.Table`
159 The created table.
161 Notes
162 -----
163 The new table may not actually be created until the end of the
164 context created by `Database.declareStaticTables`, allowing tables
165 to be declared in any order even in the presence of foreign key
166 relationships.
167 """
168 metadata = self._db._metadata
169 assert metadata is not None, "Guaranteed by context manager that returns this object."
170 return metadata.add_table(self._db, name, spec)
172 def addTableTuple(self, specs: tuple[ddl.TableSpec, ...]) -> tuple[sqlalchemy.schema.Table, ...]:
173 """Add a named tuple of tables to the schema, returning their
174 SQLAlchemy representations in a named tuple of the same type.
176 The new tables may not actually be created until the end of the
177 context created by `Database.declareStaticTables`, allowing tables
178 to be declared in any order even in the presence of foreign key
179 relationships.
181 Parameters
182 ----------
183 specs : `tuple` of `ddl.TableSpec`
184 Specifications of multiple tables.
186 Returns
187 -------
188 tables : `tuple` of `sqlalchemy.schema.Table`
189 All the tables created.
191 Notes
192 -----
193 ``specs`` *must* be an instance of a type created by
194 `collections.namedtuple`, not just regular tuple, and the returned
195 object is guaranteed to be the same. Because `~collections.namedtuple`
196 is just a factory for `type` objects, not an actual type itself,
197 we cannot represent this with type annotations.
198 """
199 return specs._make( # type: ignore
200 self.addTable(name, spec)
201 for name, spec in zip(specs._fields, specs, strict=True) # type: ignore
202 )
204 def addInitializer(self, initializer: Callable[[Database], None]) -> None:
205 """Add a method that does one-time initialization of a database.
207 Initialization can mean anything that changes state of a database
208 and needs to be done exactly once after database schema was created.
209 An example for that could be population of schema attributes.
211 Parameters
212 ----------
213 initializer : `~collections.abc.Callable`
214 Method of a single argument which is a `Database` instance.
215 """
216 self._initializers.append(initializer)
219class Database(ABC):
220 """An abstract interface that represents a particular database engine's
221 representation of a single schema/namespace/database.
223 Parameters
224 ----------
225 origin : `int`
226 An integer ID that should be used as the default for any datasets,
227 quanta, or other entities that use a (autoincrement, origin) compound
228 primary key.
229 engine : `sqlalchemy.engine.Engine`
230 The SQLAlchemy engine for this `Database`.
231 namespace : `str`, optional
232 Name of the schema or namespace this instance is associated with.
233 This is passed as the ``schema`` argument when constructing a
234 `sqlalchemy.schema.MetaData` instance. We use ``namespace`` instead to
235 avoid confusion between "schema means namespace" and "schema means
236 table definitions".
237 metadata : `sqlalchemy.schema.MetaData`, optional
238 Object representing the tables and other schema entities. If not
239 provided, will be generated during the next call to
240 ``declareStaticTables``.
241 allow_temporary_tables : `bool`, optional
242 If `True`, database operations will be allowed to use temporary tables.
243 If `False`, other SQL constructs will be used instead of temporary
244 tables when possible.
246 Notes
247 -----
248 `Database` requires all write operations to go through its special named
249 methods. Our write patterns are sufficiently simple that we don't really
250 need the full flexibility of SQL insert/update/delete syntax, and we need
251 non-standard (but common) functionality in these operations sufficiently
252 often that it seems worthwhile to provide our own generic API.
254 In contrast, `Database.query` allows arbitrary ``SELECT`` queries (via
255 their SQLAlchemy representation) to be run, as we expect these to require
256 significantly more sophistication while still being limited to standard
257 SQL.
259 `Database` itself has several underscore-prefixed attributes:
261 - ``_engine``: SQLAlchemy object representing its engine.
262 - ``_connection``: method returning a context manager for
263 `sqlalchemy.engine.Connection` object.
264 - ``_metadata``: the `sqlalchemy.schema.MetaData` object representing
265 the tables and other schema entities.
267 These are considered protected (derived classes may access them, but other
268 code should not), and read-only, aside from executing SQL via
269 ``_connection``.
270 """
272 def __init__(
273 self,
274 *,
275 origin: int,
276 engine: sqlalchemy.engine.Engine,
277 namespace: str | None = None,
278 metadata: DatabaseMetadata | None = None,
279 allow_temporary_tables: bool = True,
280 ):
281 self.origin = origin
282 self.name_shrinker = NameShrinker(engine.dialect.max_identifier_length)
283 self.namespace = namespace
284 self._engine = engine
285 self._session_connection: sqlalchemy.engine.Connection | None = None
286 self._temp_tables: set[str] = set()
287 self._metadata = metadata
288 self._allow_temporary_tables = allow_temporary_tables
290 def dispose(self) -> None:
291 """Close all open database connections held by this `Database`
292 instance.
293 """
294 self._engine.dispose()
296 def __repr__(self) -> str:
297 # Rather than try to reproduce all the parameters used to create
298 # the object, instead report the more useful information of the
299 # connection URL.
300 if self._engine.url.password is not None:
301 uri = str(self._engine.url.set(password="***"))
302 else:
303 uri = str(self._engine.url)
304 if self.namespace:
305 uri += f"#{self.namespace}"
306 return f'{type(self).__name__}("{uri}")'
308 @classmethod
309 def makeDefaultUri(cls, root: str) -> str | None:
310 """Create a default connection URI appropriate for the given root
311 directory, or `None` if there can be no such default.
313 Parameters
314 ----------
315 root : `str`
316 Root string to use to build connection URI.
318 Returns
319 -------
320 uri : `str` or `None`
321 The URI string or `None`.
322 """
323 return None
325 @classmethod
326 def fromUri(
327 cls,
328 uri: str | sqlalchemy.engine.URL,
329 *,
330 origin: int,
331 namespace: str | None = None,
332 writeable: bool = True,
333 allow_temporary_tables: bool = True,
334 ) -> Database:
335 """Construct a database from a SQLAlchemy URI.
337 Parameters
338 ----------
339 uri : `str` or `sqlalchemy.engine.URL`
340 A SQLAlchemy URI connection string.
341 origin : `int`
342 An integer ID that should be used as the default for any datasets,
343 quanta, or other entities that use a (autoincrement, origin)
344 compound primary key.
345 namespace : `str`, optional
346 A database namespace (i.e. schema) the new instance should be
347 associated with. If `None` (default), the namespace (if any) is
348 inferred from the URI.
349 writeable : `bool`, optional
350 If `True`, allow write operations on the database, including
351 ``CREATE TABLE``.
352 allow_temporary_tables : `bool`, optional
353 If `True`, database operations will be allowed to use temporary
354 tables.
355 If `False`, other SQL constructs will be used instead of temporary
356 tables when possible.
358 Returns
359 -------
360 db : `Database`
361 A new `Database` instance.
362 """
363 db = cls.fromEngine(
364 cls.makeEngine(uri, writeable=writeable), origin=origin, namespace=namespace, writeable=writeable
365 )
366 db._allow_temporary_tables = allow_temporary_tables
367 return db
369 @abstractmethod
370 def clone(self) -> Database:
371 """Make an independent copy of this `Database` object.
373 Returns
374 -------
375 db : `Database`
376 A new `Database` instance with the same configuration as this
377 instance.
378 """
379 raise NotImplementedError()
381 @classmethod
382 @abstractmethod
383 def makeEngine(
384 cls, uri: str | sqlalchemy.engine.URL, *, writeable: bool = True
385 ) -> sqlalchemy.engine.Engine:
386 """Create a `sqlalchemy.engine.Engine` from a SQLAlchemy URI.
388 Parameters
389 ----------
390 uri : `str` or `sqlalchemy.engine.URL`
391 A SQLAlchemy URI connection string.
392 writeable : `bool`, optional
393 If `True`, allow write operations on the database, including
394 ``CREATE TABLE``.
396 Returns
397 -------
398 engine : `sqlalchemy.engine.Engine`
399 A database engine.
401 Notes
402 -----
403 Subclasses that support other ways to connect to a database are
404 encouraged to add optional arguments to their implementation of this
405 method, as long as they maintain compatibility with the base class
406 call signature.
407 """
408 raise NotImplementedError()
410 @classmethod
411 @abstractmethod
412 def fromEngine(
413 cls,
414 engine: sqlalchemy.engine.Engine,
415 *,
416 origin: int,
417 namespace: str | None = None,
418 writeable: bool = True,
419 ) -> Database:
420 """Create a new `Database` from an existing `sqlalchemy.engine.Engine`.
422 Parameters
423 ----------
424 engine : `sqlalchemy.engine.Engine`
425 The engine for the database. May be shared between `Database`
426 instances.
427 origin : `int`
428 An integer ID that should be used as the default for any datasets,
429 quanta, or other entities that use a (autoincrement, origin)
430 compound primary key.
431 namespace : `str`, optional
432 A different database namespace (i.e. schema) the new instance
433 should be associated with. If `None` (default), the namespace
434 (if any) is inferred from the connection.
435 writeable : `bool`, optional
436 If `True`, allow write operations on the database, including
437 ``CREATE TABLE``.
439 Returns
440 -------
441 db : `Database`
442 A new `Database` instance.
444 Notes
445 -----
446 This method allows different `Database` instances to share the same
447 engine, which is desirable when they represent different namespaces
448 can be queried together.
449 """
450 raise NotImplementedError()
452 @final
453 @contextmanager
454 def session(self) -> Iterator[None]:
455 """Return a context manager that represents a session (persistent
456 connection to a database).
458 Returns
459 -------
460 context : `AbstractContextManager` [ `None` ]
461 A context manager that does not return a value when entered.
463 Notes
464 -----
465 This method should be used when a sequence of read-only SQL operations
466 will be performed in rapid succession *without* a requirement that they
467 yield consistent results in the presence of concurrent writes (or, more
468 rarely, when conflicting concurrent writes are rare/impossible and the
469 session will be open long enough that a transaction is inadvisable).
470 """
471 with self._session():
472 yield
474 @final
475 @contextmanager
476 def transaction(
477 self,
478 *,
479 interrupting: bool = False,
480 savepoint: bool = False,
481 lock: Iterable[sqlalchemy.schema.Table] = (),
482 for_temp_tables: bool = False,
483 ) -> Iterator[None]:
484 """Return a context manager that represents a transaction.
486 Parameters
487 ----------
488 interrupting : `bool`, optional
489 If `True` (`False` is default), this transaction block may not be
490 nested within an outer one, and attempting to do so is a logic
491 (i.e. assertion) error.
492 savepoint : `bool`, optional
493 If `True` (`False` is default), create a ``SAVEPOINT``, allowing
494 exceptions raised by the database (e.g. due to constraint
495 violations) during this transaction's context to be caught outside
496 it without also rolling back all operations in an outer transaction
497 block. If `False`, transactions may still be nested, but a
498 rollback may be generated at any level and affects all levels, and
499 commits are deferred until the outermost block completes. If any
500 outer transaction block was created with ``savepoint=True``, all
501 inner blocks will be as well (regardless of the actual value
502 passed). This has no effect if this is the outermost transaction.
503 lock : `~collections.abc.Iterable` [ `sqlalchemy.schema.Table` ], \
504 optional
505 A list of tables to lock for the duration of this transaction.
506 These locks are guaranteed to prevent concurrent writes and allow
507 this transaction (only) to acquire the same locks (others should
508 block), but only prevent concurrent reads if the database engine
509 requires that in order to block concurrent writes.
510 for_temp_tables : `bool`, optional
511 If `True`, this transaction may involve creating temporary tables.
513 Returns
514 -------
515 context : `AbstractContextManager` [ `None` ]
516 A context manager that commits the transaction when it is exited
517 without error and rolls back the transactoin when it is exited via
518 an exception.
520 Notes
521 -----
522 All transactions on a connection managed by one or more `Database`
523 instances _must_ go through this method, or transaction state will not
524 be correctly managed.
525 """
526 with self._transaction(
527 interrupting=interrupting, savepoint=savepoint, lock=lock, for_temp_tables=for_temp_tables
528 ):
529 yield
531 @contextmanager
532 def temporary_table(
533 self, spec: ddl.TableSpec, name: str | None = None
534 ) -> Iterator[sqlalchemy.schema.Table]:
535 """Return a context manager that creates and then drops a temporary
536 table.
538 Parameters
539 ----------
540 spec : `ddl.TableSpec`
541 Specification for the columns. Unique and foreign key constraints
542 may be ignored.
543 name : `str`, optional
544 If provided, the name of the SQL construct. If not provided, an
545 opaque but unique identifier is generated.
547 Returns
548 -------
549 context : `AbstractContextManager` [ `sqlalchemy.schema.Table` ]
550 A context manager that returns a SQLAlchemy representation of the
551 temporary table when entered.
553 Notes
554 -----
555 Temporary tables may be created, dropped, and written to even in
556 read-only databases - at least according to the Python-level
557 protections in the `Database` classes. Server permissions may say
558 otherwise, but in that case they probably need to be modified to
559 support the full range of expected read-only butler behavior.
560 """
561 assert self._metadata is not None, "Static tables must be created before temporary tables"
562 if not self.supports_temporary_tables:
563 raise ReadOnlyDatabaseError("Creation of temporary tables is not supported by this database.")
564 with self._session() as connection:
565 table = self._make_temporary_table(connection, spec=spec, name=name)
566 self._temp_tables.add(table.key)
567 try:
568 yield table
569 finally:
570 self._temp_tables.remove(table.key)
571 self._metadata.remove_table(table.name)
573 @contextmanager
574 def _session(self) -> Iterator[sqlalchemy.engine.Connection]:
575 """Protected implementation for `session` that actually returns the
576 connection.
578 This method is for internal `Database` calls that need the actual
579 SQLAlchemy connection object. It should be overridden by subclasses
580 instead of `session` itself.
582 Returns
583 -------
584 context : `AbstractContextManager` [ `sqlalchemy.engine.Connection` ]
585 A context manager that returns a SQLALchemy connection when
586 entered.
588 """
589 if self._session_connection is not None:
590 # session already started, just reuse that
591 yield self._session_connection
592 else:
593 try:
594 # open new connection and close it when done
595 self._session_connection = self._engine.connect()
596 yield self._session_connection
597 finally:
598 if self._session_connection is not None:
599 self._session_connection.close()
600 self._session_connection = None
601 # Temporary tables only live within session
602 self._temp_tables = set()
    @contextmanager
    def _transaction(
        self,
        *,
        interrupting: bool = False,
        savepoint: bool = False,
        lock: Iterable[sqlalchemy.schema.Table] = (),
        for_temp_tables: bool = False,
    ) -> Iterator[tuple[bool, sqlalchemy.engine.Connection]]:
        """Protected implementation for `transaction` that actually returns the
        connection and whether this is a new outermost transaction.

        This method is for internal `Database` calls that need the actual
        SQLAlchemy connection object. It should be overridden by subclasses
        instead of `transaction` itself.

        Parameters
        ----------
        interrupting : `bool`, optional
            If `True` (`False` is default), this transaction block may not be
            nested within an outer one, and attempting to do so is a logic
            (i.e. assertion) error.
        savepoint : `bool`, optional
            If `True` (`False` is default), create a ``SAVEPOINT``, allowing
            exceptions raised by the database (e.g. due to constraint
            violations) during this transaction's context to be caught outside
            it without also rolling back all operations in an outer transaction
            block. If `False`, transactions may still be nested, but a
            rollback may be generated at any level and affects all levels, and
            commits are deferred until the outermost block completes. If any
            outer transaction block was created with ``savepoint=True``, all
            inner blocks will be as well (regardless of the actual value
            passed). This has no effect if this is the outermost transaction.
        lock : `~collections.abc.Iterable` [ `sqlalchemy.schema.Table` ], \
                optional
            A list of tables to lock for the duration of this transaction.
            These locks are guaranteed to prevent concurrent writes and allow
            this transaction (only) to acquire the same locks (others should
            block), but only prevent concurrent reads if the database engine
            requires that in order to block concurrent writes.
        for_temp_tables : `bool`, optional
            If `True`, this transaction may involve creating temporary tables.
            The base implementation does not read this flag.

        Returns
        -------
        context : `AbstractContextManager` [ `tuple` [ `bool`,
                `sqlalchemy.engine.Connection` ] ]
            A context manager that commits the transaction when it is exited
            without error and rolls back the transaction when it is exited via
            an exception. When entered, it returns a tuple of:

            - ``is_new`` (`bool`): whether this is a new (outermost)
              transaction;
            - ``connection`` (`sqlalchemy.engine.Connection`): the connection.
        """
        with self._session() as connection:
            already_in_transaction = self.isInTransaction()
            assert not (interrupting and already_in_transaction), (
                "Logic error in transaction nesting: an operation that would "
                "interrupt the active transaction context has been requested."
            )
            # Once any enclosing block is a SAVEPOINT, inner blocks are too.
            savepoint = savepoint or connection.in_nested_transaction()
            trans: sqlalchemy.engine.Transaction | None
            if already_in_transaction:
                if savepoint:
                    trans = connection.begin_nested()
                else:
                    # Nested non-savepoint transactions don't do anything.
                    trans = None
            else:
                # Use a regular (non-savepoint) transaction always for the
                # outermost context.
                trans = connection.begin()
            # Locks are taken only after a transaction is active, so they are
            # held for its duration.
            self._lockTables(connection, lock)
            try:
                # Yields (is_new, connection).
                yield not already_in_transaction, connection
                if trans is not None:
                    trans.commit()
            except BaseException:
                # BaseException so that KeyboardInterrupt and GeneratorExit
                # also trigger a rollback. With trans None (nested,
                # non-savepoint) the exception propagates to the level that
                # owns the transaction.
                if trans is not None:
                    trans.rollback()
                raise
687 @abstractmethod
688 def _lockTables(
689 self, connection: sqlalchemy.engine.Connection, tables: Iterable[sqlalchemy.schema.Table] = ()
690 ) -> None:
691 """Acquire locks on the given tables.
693 This is an implementation hook for subclasses, called by `transaction`.
694 It should not be called directly by other code.
696 Parameters
697 ----------
698 connection : `sqlalchemy.engine.Connection`
699 Database connection object. It is guaranteed that transaction is
700 already in a progress for this connection.
701 tables : `~collections.abc.Iterable` [ `sqlalchemy.schema.Table` ], \
702 optional
703 A list of tables to lock for the duration of this transaction.
704 These locks are guaranteed to prevent concurrent writes and allow
705 this transaction (only) to acquire the same locks (others should
706 block), but only prevent concurrent reads if the database engine
707 requires that in order to block concurrent writes.
708 """
709 raise NotImplementedError()
711 def isTableWriteable(self, table: sqlalchemy.schema.Table) -> bool:
712 """Check whether a table is writeable, either because the database
713 connection is read-write or the table is a temporary table.
715 Parameters
716 ----------
717 table : `sqlalchemy.schema.Table`
718 SQLAlchemy table object to check.
720 Returns
721 -------
722 writeable : `bool`
723 Whether this table is writeable.
724 """
725 return self.isWriteable() or table.key in self._temp_tables
727 def assertTableWriteable(self, table: sqlalchemy.schema.Table, msg: str) -> None:
728 """Raise if the given table is not writeable, either because the
729 database connection is read-write or the table is a temporary table.
731 Parameters
732 ----------
733 table : `sqlalchemy.schema.Table`
734 SQLAlchemy table object to check.
735 msg : `str`, optional
736 If provided, raise `ReadOnlyDatabaseError` instead of returning
737 `False`, with this message.
738 """
739 if not self.isTableWriteable(table):
740 raise ReadOnlyDatabaseError(msg)
    @contextmanager
    def declareStaticTables(self, *, create: bool) -> Iterator[StaticTablesContext]:
        """Return a context manager in which the database's static DDL schema
        can be declared.

        Parameters
        ----------
        create : `bool`
            If `True`, attempt to create all tables at the end of the context.
            If `False`, they will be assumed to already exist.

        Returns
        -------
        schema : `StaticTablesContext`
            A helper object that is used to add new tables.

        Raises
        ------
        ReadOnlyDatabaseError
            Raised if ``create`` is `True` and `Database.isWriteable` is
            `False`. This check happens before the context is entered,
            regardless of which tables already exist.
        SchemaAlreadyDefinedError
            Raised if ``create`` is `True` and the database namespace already
            contains at least one table.

        Examples
        --------
        Given a `Database` instance ``db``::

            with db.declareStaticTables(create=True) as schema:
                schema.addTable("table1", TableSpec(...))
                schema.addTable("table2", TableSpec(...))

        Notes
        -----
        A database's static DDL schema must be declared before any dynamic
        tables are managed via calls to `ensureTableExists` or
        `getExistingTable`. The order in which static schema tables are added
        inside the context block is unimportant; they will automatically be
        sorted and added in an order consistent with their foreign key
        relationships.

        The whole declaration runs inside a single `_transaction`. When
        ``create`` is `True`, the following happens after the ``with`` body
        finishes, in this order:

        1. The namespace is created if it is set and missing.
        2. All declared tables are created.
        3. Every initializer registered with
           `StaticTablesContext.addInitializer` is called, in registration
           order.

        If any exception escapes, ``self._metadata`` is reset to `None` before
        the exception is re-raised.
        """
        if create and not self.isWriteable():
            raise ReadOnlyDatabaseError(f"Cannot create tables in read-only database {self}.")

        # Fresh metadata for this declaration; discarded again on failure.
        self._metadata = DatabaseMetadata(self.namespace)
        try:
            with self._transaction() as (_, connection):
                context = StaticTablesContext(self, connection)
                if create and context._tableNames:
                    # Looks like database is already initialized, to avoid
                    # danger of modifying/destroying valid schema we refuse to
                    # do anything in this case
                    raise SchemaAlreadyDefinedError(f"Cannot create tables in non-empty database {self}.")
                yield context
                if create:
                    if (
                        self.namespace is not None
                        and self.namespace not in context._inspector.get_schema_names()
                    ):
                        connection.execute(sqlalchemy.schema.CreateSchema(self.namespace))
                    # In our tables we have columns that make use of sqlalchemy
                    # Sequence objects. There is currently a bug in sqlalchemy
                    # that causes a deprecation warning to be thrown on a
                    # property of the Sequence object when the repr for the
                    # sequence is created. Here a filter is used to catch these
                    # deprecation warnings when tables are created.
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore", category=sqlalchemy.exc.SADeprecationWarning)
                        self._metadata.create_all(connection)
                    # call all initializer methods sequentially
                    for init in context._initializers:
                        init(self)
        except BaseException:
            # Discard the partially declared metadata before re-raising.
            self._metadata = None
            raise
816 @abstractmethod
817 def isWriteable(self) -> bool:
818 """Return `True` if this database can be modified by this client."""
819 raise NotImplementedError()
821 def isInTransaction(self) -> bool:
822 """Return `True` if there is currently a database connection open with
823 an active transaction; `False` otherwise.
824 """
825 session = self._session_connection
826 return session is not None and session.in_transaction()
828 @abstractmethod
829 def __str__(self) -> str:
830 """Return a human-readable identifier for this `Database`, including
831 any namespace or schema that identifies its names within a `Registry`.
832 """
833 raise NotImplementedError()
835 @property
836 def dialect(self) -> sqlalchemy.engine.Dialect:
837 """The SQLAlchemy dialect for this database engine
838 (`sqlalchemy.engine.Dialect`).
839 """
840 return self._engine.dialect
842 def shrinkDatabaseEntityName(self, original: str) -> str:
843 """Return a version of the given name that fits within this database
844 engine's length limits for table, constraint, indexes, and sequence
845 names.
847 Implementations should not assume that simple truncation is safe,
848 because multiple long names often begin with the same prefix.
850 The default implementation simply returns the given name.
852 Parameters
853 ----------
854 original : `str`
855 The original name.
857 Returns
858 -------
859 shrunk : `str`
860 The new, possibly shortened name.
861 """
862 return original
864 def expandDatabaseEntityName(self, shrunk: str) -> str:
865 """Retrieve the original name for a database entity that was too long
866 to fit within the database engine's limits.
868 Parameters
869 ----------
870 shrunk : `str`
871 The original name.
873 Returns
874 -------
875 shrunk : `str`
876 The new, possibly shortened name.
877 """
878 return shrunk
880 def _makeColumnConstraints(self, table: str, spec: ddl.FieldSpec) -> list[sqlalchemy.CheckConstraint]:
881 """Create constraints based on this spec.
883 Parameters
884 ----------
885 table : `str`
886 Name of the table this column is being added to.
887 spec : `FieldSpec`
888 Specification for the field to be added.
890 Returns
891 -------
892 constraint : `list` of `sqlalchemy.CheckConstraint`
893 Constraint added for this column.
894 """
895 # By default we return no additional constraints
896 return []
def _convertFieldSpec(
    self, table: str, spec: ddl.FieldSpec, metadata: sqlalchemy.MetaData, **kwargs: Any
) -> sqlalchemy.schema.Column:
    """Build the SQLAlchemy column described by a `FieldSpec`.

    Parameters
    ----------
    table : `str`
        Name of the table that will own the column.
    spec : `FieldSpec`
        Description of the column.
    metadata : `sqlalchemy.MetaData`
        SQLAlchemy schema container for the column's table; any sequence
        created for an autoincrement column is attached to it.
    **kwargs
        Extra keyword arguments passed straight through to the
        `sqlalchemy.schema.Column` constructor, so that derived classes can
        delegate to ``super()`` with only small changes.

    Returns
    -------
    column : `sqlalchemy.schema.Column`
        The new SQLAlchemy column.
    """
    positional: list = []
    if spec.autoincrement:
        # An explicit sequence provides autoincrement on databases that lack
        # native support; SQLAlchemy ignores it where support is native.
        sequence_name = self.shrinkDatabaseEntityName(f"{table}_seq_{spec.name}")
        positional.append(sqlalchemy.Sequence(sequence_name, metadata=metadata))
    assert spec.doc is None or isinstance(spec.doc, str), f"Bad doc for {table}.{spec.name}."
    column_type = spec.getSizedColumnType()
    return sqlalchemy.schema.Column(
        spec.name,
        column_type,
        *positional,
        nullable=spec.nullable,
        primary_key=spec.primaryKey,
        comment=spec.doc,
        server_default=spec.default,
        **kwargs,
    )
def _convertForeignKeySpec(
    self, table: str, spec: ddl.ForeignKeySpec, metadata: sqlalchemy.MetaData, **kwargs: Any
) -> sqlalchemy.schema.ForeignKeyConstraint:
    """Convert a `ForeignKeySpec` to a
    `sqlalchemy.schema.ForeignKeyConstraint`.

    Parameters
    ----------
    table : `str`
        Name of the table this foreign key is being added to.
    spec : `ForeignKeySpec`
        Specification for the foreign key to be added.
    metadata : `sqlalchemy.MetaData`
        SQLAlchemy representation of the DDL schema this constraint is
        being added to.  Not used by the default implementation.
    **kwargs
        Additional keyword arguments to forward to the
        `sqlalchemy.schema.ForeignKeyConstraint` constructor. This is
        provided to make it easier for derived classes to delegate to
        ``super()`` while making only minor changes.

    Returns
    -------
    constraint : `sqlalchemy.schema.ForeignKeyConstraint`
        SQLAlchemy representation of the constraint.
    """
    # The name embeds both tables and every column involved, then is
    # shortened if the database limits identifier length.
    name = self.shrinkDatabaseEntityName(
        "_".join(["fkey", table, spec.table] + list(spec.target) + list(spec.source))
    )
    return sqlalchemy.schema.ForeignKeyConstraint(
        spec.source,
        [f"{spec.table}.{col}" for col in spec.target],
        name=name,
        ondelete=spec.onDelete,
        # Previously documented as forwarded but silently dropped.
        **kwargs,
    )
def _convertExclusionConstraintSpec(
    self,
    table: str,
    spec: tuple[str | type[TimespanDatabaseRepresentation], ...],
    metadata: sqlalchemy.MetaData,
) -> sqlalchemy.schema.Constraint:
    """Translate one entry of ``ddl.TableSpec.exclusion`` into a SQLAlchemy
    constraint.

    Parameters
    ----------
    table : `str`
        Name of the table that will own the constraint.
    spec : `tuple` [ `str` or `type` ]
        Column names (`str`) plus, exactly once, the `type` returned by
        `getTimespanRepresentation`; the order gives the column order of
        the index backing the constraint.
    metadata : `sqlalchemy.MetaData`
        SQLAlchemy schema container the constraint's table belongs to.

    Returns
    -------
    constraint : `sqlalchemy.schema.Constraint`
        The SQLAlchemy constraint object.

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation; databases that support
        exclusion constraints must override this method.
    """
    message = f"Database {self} does not support exclusion constraints."
    raise NotImplementedError(message)
def _convertTableSpec(
    self, name: str, spec: ddl.TableSpec, metadata: sqlalchemy.MetaData, **kwargs: Any
) -> sqlalchemy.schema.Table:
    """Convert a `TableSpec` to a `sqlalchemy.schema.Table`.

    Parameters
    ----------
    name : `str`
        The name of the table.
    spec : `TableSpec`
        Specification for the table to be created.
    metadata : `sqlalchemy.MetaData`
        SQLAlchemy representation of the DDL schema this table is being
        added to.
    **kwargs
        Additional keyword arguments to forward to the
        `sqlalchemy.schema.Table` constructor. This is provided to make it
        easier for derived classes to delegate to ``super()`` while making
        only minor changes.

    Returns
    -------
    table : `sqlalchemy.schema.Table`
        SQLAlchemy representation of the table.

    Notes
    -----
    This method does not handle ``spec.foreignKeys`` at all, in order to
    avoid circular dependencies. These are added by higher-level logic in
    `ensureTableExists`, `getExistingTable`, and `declareStaticTables`.
    It does, however, add an index on the source columns of each foreign
    key with ``addIndex`` set, unless an equivalent index already exists.
    """
    # One Column per field; these go first in the Table's positional args.
    args: list[sqlalchemy.schema.SchemaItem] = [
        self._convertFieldSpec(name, fieldSpec, metadata) for fieldSpec in spec.fields
    ]

    # Add any column constraints
    for fieldSpec in spec.fields:
        args.extend(self._makeColumnConstraints(name, fieldSpec))

    # Track indexes added for primary key and unique constraints, to make
    # sure we don't add duplicate explicit or foreign key indexes for
    # those.
    allIndexes = {tuple(fieldSpec.name for fieldSpec in spec.fields if fieldSpec.primaryKey)}
    args.extend(
        sqlalchemy.schema.UniqueConstraint(
            *columns, name=self.shrinkDatabaseEntityName("_".join([name, "unq"] + list(columns)))
        )
        for columns in spec.unique
    )
    allIndexes.update(spec.unique)
    # Explicit indexes, skipping any whose column tuple is already covered by
    # the primary key or a unique constraint.
    # NOTE(review): because ``spec.unique`` entries are already in
    # ``allIndexes`` here, the filter below excludes them, so
    # ``unique=(index.columns in spec.unique)`` always evaluates to False for
    # the indexes that survive - confirm whether that is intended.
    args.extend(
        sqlalchemy.schema.Index(
            self.shrinkDatabaseEntityName("_".join([name, "idx"] + list(index.columns))),
            *index.columns,
            unique=(index.columns in spec.unique),
            **index.kwargs,
        )
        for index in spec.indexes
        if index.columns not in allIndexes
    )
    allIndexes.update(index.columns for index in spec.indexes)
    # Indexes on foreign-key source columns, only where requested and not
    # already covered by an index above.
    # NOTE(review): ``allIndexes`` is not updated inside this generator, so two
    # foreign keys with identical ``source`` would both produce an index with
    # the same name - verify this cannot occur in practice.
    args.extend(
        sqlalchemy.schema.Index(
            self.shrinkDatabaseEntityName("_".join((name, "fkidx") + fk.source)),
            *fk.source,
        )
        for fk in spec.foreignKeys
        if fk.addIndex and fk.source not in allIndexes
    )

    # Exclusion constraints; the base-class converter raises
    # NotImplementedError, so this only succeeds on databases that override it.
    args.extend(self._convertExclusionConstraintSpec(name, excl, metadata) for excl in spec.exclusion)

    assert spec.doc is None or isinstance(spec.doc, str), f"Bad doc for {name}."
    # The original spec is stashed in ``info`` so it travels with the Table.
    return sqlalchemy.schema.Table(name, metadata, *args, comment=spec.doc, info={"spec": spec}, **kwargs)
def ensureTableExists(self, name: str, spec: ddl.TableSpec) -> sqlalchemy.schema.Table:
    """Ensure that a table with the given name and specification exists,
    creating it if necessary.

    Parameters
    ----------
    name : `str`
        Name of the table (not including namespace qualifiers).
    spec : `TableSpec`
        Specification for the table. This will be used when creating the
        table, and *may* be used when obtaining an existing table to check
        for consistency, but no such check is guaranteed.

    Returns
    -------
    table : `sqlalchemy.schema.Table`
        SQLAlchemy representation of the table.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False`, and the table does not
        already exist.
    DatabaseConflictError
        Raised if the table exists but ``spec`` is inconsistent with its
        definition.

    Notes
    -----
    This method may not be called within transactions. It may be called on
    read-only databases if and only if the table does in fact already
    exist.

    Subclasses may override this method, but usually should not need to.
    """
    # TODO: if _engine is used to make a table then it uses separate
    # connection and should not interfere with current transaction
    assert not self.isInTransaction(), "Table creation interrupts transactions."
    assert self._metadata is not None, "Static tables must be declared before dynamic tables."
    # Fast path: the table is already known locally or already exists in the
    # database (getExistingTable also performs the consistency check).
    table = self.getExistingTable(name, spec)
    if table is not None:
        return table
    if not self.isWriteable():
        raise ReadOnlyDatabaseError(
            f"Table {name} does not exist, and cannot be created because database {self} is read-only."
        )

    # Register the table locally, then issue CREATE TABLE in its own
    # transaction.
    table = self._metadata.add_table(self, name, spec)
    try:
        with self._transaction() as (_, connection):
            table.create(connection)
    except sqlalchemy.exc.DatabaseError:
        # Some other process could have created the table meanwhile, which
        # usually causes OperationalError or ProgrammingError. We cannot
        # use IF NOT EXISTS clause in this case due to PostgreSQL race
        # condition on server side which causes IntegrityError. Instead we
        # catch these exceptions (they all inherit DatabaseError) and
        # re-check whether table is now there.
        # NOTE(review): if ``add_table`` above makes ``get_table(name)`` return
        # the table, this re-check returns the locally registered object
        # without consulting the database, so the error would be swallowed
        # even if creation genuinely failed - confirm against DatabaseMetadata.
        table = self.getExistingTable(name, spec)
        if table is None:
            raise
    return table
def getExistingTable(self, name: str, spec: ddl.TableSpec) -> sqlalchemy.schema.Table | None:
    """Look up an existing table by name and verify it against ``spec``.

    Parameters
    ----------
    name : `str`
        Table name, without any namespace qualification.
    spec : `TableSpec`
        Expected definition.  It is used to build the SQLAlchemy object
        for a table found only in the database, and it is compared against
        the actual definition in either case.

    Returns
    -------
    table : `sqlalchemy.schema.Table` or `None`
        SQLAlchemy table object, or `None` if no such table exists.

    Raises
    ------
    DatabaseConflictError
        Raised if the table exists but its definition disagrees with
        ``spec``.

    Notes
    -----
    This never modifies the database, so it is safe to call inside
    transactions.  Subclasses may override this method, but usually should
    not need to.
    """
    assert self._metadata is not None, "Static tables must be declared before dynamic tables."
    known = self._metadata.get_table(name)
    if known is None:
        # Not registered locally yet: ask the database itself, reusing the
        # session connection when one is active.
        bind = self._engine if self._session_connection is None else self._session_connection
        inspector = sqlalchemy.inspect(bind, raiseerr=True)
        if name not in inspector.get_table_names(schema=self.namespace):
            return None
        _checkExistingTableDefinition(name, spec, inspector.get_columns(name, schema=self.namespace))
        return self._metadata.add_table(self, name, spec)
    # Already registered locally: only the sets of column names are compared.
    expected_columns = spec.fields.names
    actual_columns = set(known.columns.keys())
    if expected_columns != actual_columns:
        raise DatabaseConflictError(
            f"Table '{name}' has already been defined differently; the new "
            f"specification has columns {list(spec.fields.names)}, while "
            f"the previous definition has {list(known.columns.keys())}."
        )
    return known
def _make_temporary_table(
    self,
    connection: sqlalchemy.engine.Connection,
    spec: ddl.TableSpec,
    name: str | None = None,
    **kwargs: Any,
) -> sqlalchemy.schema.Table:
    """Create a temporary table.

    Parameters
    ----------
    connection : `sqlalchemy.engine.Connection`
        Connection to use when creating the table.
    spec : `TableSpec`
        Specification for the table.
    name : `str`, optional
        A unique (within this session/connection) name for the table.
        Subclasses may override to modify the actual name used. If not
        provided, a unique name will be generated.
    **kwargs
        Additional keyword arguments to forward to the
        `sqlalchemy.schema.Table` constructor. This is provided to make it
        easier for derived classes to delegate to ``super()`` while making
        only minor changes.

    Returns
    -------
    table : `sqlalchemy.schema.Table`
        SQLAlchemy representation of the table.

    Raises
    ------
    RuntimeError
        Raised if the static schema has not been declared yet.
    ValueError
        Raised if the table's key differs from ``name`` (i.e. it was
        transformed) and is already present in ``self._temp_tables``.
    """
    if name is None:
        # Random hex suffix to avoid clashes with other temporary tables.
        name = f"tmp_{uuid.uuid4().hex}"
    if self._metadata is None:
        raise RuntimeError("Cannot create temporary table before static schema is defined.")
    # TEMPORARY prefix plus BLANK_SCHEMA: the table is not placed in this
    # Database's namespace.
    table = self._metadata.add_table(
        self, name, spec, prefixes=["TEMPORARY"], schema=sqlalchemy.schema.BLANK_SCHEMA, **kwargs
    )
    # NOTE(review): this only rejects a clash when the key was transformed
    # (table.key != name); a clash on an untransformed name passes this check,
    # and the check runs after add_table has already been called - confirm
    # that this is the intended behavior.
    if table.key in self._temp_tables and table.key != name:
        raise ValueError(
            f"A temporary table with name {name} (transformed to {table.key} by Database) already exists."
        )
    # CREATE is issued on the caller-supplied connection, wrapped in
    # self._transaction().
    with self._transaction():
        table.create(connection)
    return table
@classmethod
def getTimespanRepresentation(cls) -> type[TimespanDatabaseRepresentation]:
    """Return the class that describes how this database stores `Timespan`
    values.

    `Database` itself never applies this representation automatically;
    callers must keep their DDL and queries consistent with it.

    Returns
    -------
    TimespanReprClass : `type` (`TimespanDatabaseRepresentation` subclass)
        Class encapsulating the storage of `Timespan` objects in this
        database.  The base implementation returns
        ``TimespanDatabaseRepresentation.Compound``.

    Notes
    -----
    Although the representation is chosen by the `Database`
    implementation, the timespan-handling logic deliberately lives outside
    it, for two reasons:

    - Few tables and queries involve timespans, and the code that touches
      them already knows it is doing so.  Making generic methods such as
      `Database.insert` detect and translate timespans would add overhead
      to every other call for no benefit.

    - SQLAlchemy's core SELECT expression system cannot wrap several
      columns in one expression object (only the ORM can, and it is not
      used here), so hiding the representation would require wrapping far
      more of the query-building code in custom interfaces.
    """
    representation = TimespanDatabaseRepresentation.Compound
    return representation
def sync(
    self,
    table: sqlalchemy.schema.Table,
    *,
    keys: dict[str, Any],
    compared: dict[str, Any] | None = None,
    extra: dict[str, Any] | None = None,
    returning: Sequence[str] | None = None,
    update: bool = False,
) -> tuple[dict[str, Any] | None, bool | dict[str, Any]]:
    """Insert into a table as necessary to ensure database contains
    values equivalent to the given ones.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Table to be queried and possibly inserted into.
    keys : `dict`
        Column name-value pairs used to search for an existing row; must
        be a combination that can be used to select a single row if one
        exists. If such a row does not exist, these values are used in
        the insert.  Must not be empty.
    compared : `dict`, optional
        Column name-value pairs that are compared to those in any existing
        row. If such a row does not exist, these values are used in the
        insert.
    extra : `dict`, optional
        Column name-value pairs that are ignored if a matching row exists,
        but used in an insert if one is necessary.
    returning : `~collections.abc.Sequence` of `str`, optional
        The names of columns whose values should be returned.
    update : `bool`, optional
        If `True` (`False` is default), update the existing row with the
        values in ``compared`` instead of raising `DatabaseConflictError`.

    Returns
    -------
    row : `dict`, optional
        The value of the fields indicated by ``returning``, or `None` if
        ``returning`` is `None`.
    inserted_or_updated : `bool` or `dict`
        If `True`, a new row was inserted; if `False`, a matching row
        already existed. If a `dict` (only possible if ``update=True``),
        then an existing row was updated, and the dict maps the names of
        the updated columns to their *old* values (new values can be
        obtained from ``compared``).

    Raises
    ------
    DatabaseConflictError
        Raised if the values in ``compared`` do not match the values in the
        database and ``update`` is `False`.
    ReadOnlyDatabaseError
        Raised if `isTableWriteable` returns `False` for ``table`` and
        either no matching record exists or an update would be needed.
    ConflictingDefinitionError
        Raised (writeable tables only) if no row matching ``keys`` is found
        after the insert attempt.
    RuntimeError
        Raised if ``keys`` matches more than one row, or if a conflict is
        detected immediately after a successful insert.

    Notes
    -----
    May be used inside transaction contexts, so implementations may not
    perform operations that interrupt transactions.

    It may be called on read-only databases if and only if the matching row
    does in fact already exist.
    """

    def check() -> tuple[int, dict[str, Any] | None, list | None]:
        """Query for a row that matches the ``key`` argument, and compare
        to what was given by the caller.

        Returns
        -------
        n : `int`
            Number of matching rows. ``n != 1`` is always an error, but
            it's a different kind of error depending on where `check` is
            being called.
        bad : `dict` or `None`
            The subset of the keys of ``compared`` for which the existing
            values did not match the given one, mapped to the existing
            values in the database. Once again, a non-empty ``bad`` is
            always an error, but a different kind depending on context.
            `None` if ``n != 1``.
        result : `list` or `None`
            Results in the database that correspond to the columns given
            in ``returning``, or `None` if ``returning is None``.
        """
        toSelect: set[str] = set()
        if compared is not None:
            toSelect.update(compared.keys())
        if returning is not None:
            toSelect.update(returning)
        if not toSelect:
            # Need to select some column, even if we just want to see
            # how many rows we get back.  (Raises StopIteration if
            # ``keys`` is empty.)
            toSelect.add(next(iter(keys.keys())))
        selectSql = (
            sqlalchemy.sql.select(*[table.columns[k].label(k) for k in toSelect])
            .select_from(table)
            .where(sqlalchemy.sql.and_(*[table.columns[k] == v for k, v in keys.items()]))
        )
        with self._transaction() as (_, connection):
            fetched = list(connection.execute(selectSql).mappings())
        if len(fetched) != 1:
            return len(fetched), None, None
        existing = fetched[0]
        if compared is not None:

            def safeNotEqual(a: Any, b: Any) -> bool:
                # astropy Time values are compared via TimeConverter rather
                # than ``!=`` (presumably to tolerate representation
                # differences - confirm).
                if isinstance(a, astropy.time.Time):
                    return not time_utils.TimeConverter().times_equal(a, b)
                return a != b

            inconsistencies = {
                k: existing[k] for k, v in compared.items() if safeNotEqual(existing[k], v)
            }
        else:
            inconsistencies = {}
        if returning is not None:
            toReturn: list | None = [existing[k] for k in returning]
        else:
            toReturn = None
        return 1, inconsistencies, toReturn

    def _format_bad(inconsistencies: dict[str, Any]) -> str:
        """Format the 'bad' dictionary of existing values returned by
        ``check`` into a string suitable for an error message.
        """
        assert compared is not None, "Should not be able to get inconsistencies without comparing."
        return ", ".join(f"{k}: {v!r} != {compared[k]!r}" for k, v in inconsistencies.items())

    if self.isTableWriteable(table):
        # Try an insert first, but allow it to fail (in only specific
        # ways).
        row = keys.copy()
        if compared is not None:
            row.update(compared)
        if extra is not None:
            row.update(extra)
        with self.transaction():
            # ensure() skips conflicting rows and returns the number
            # actually inserted (0 or 1 here).
            inserted = bool(self.ensure(table, row))
            inserted_or_updated: bool | dict[str, Any]
            # Need to perform check() for this branch inside the
            # transaction, so we roll back an insert that didn't do
            # what we expected. That limits the extent to which we
            # can reduce duplication between this block and the other
            # ones that perform similar logic.
            n, bad, result = check()
            if n < 1:
                raise ConflictingDefinitionError(
                    f"Attempted to ensure {row} exists by inserting it with ON CONFLICT IGNORE, "
                    f"but a post-insert query on {keys} returned no results. "
                    f"Insert was {'' if inserted else 'not '}reported as successful. "
                    "This can occur if the insert violated a database constraint other than the "
                    "unique constraint or primary key used to identify the row in this call."
                )
            elif n > 1:
                raise RuntimeError(
                    f"Keys passed to sync {keys.keys()} do not comprise a "
                    f"unique constraint for table {table.name}."
                )
            elif bad:
                assert compared is not None, (
                    "Should not be able to get inconsistencies without comparing."
                )
                if inserted:
                    raise RuntimeError(
                        f"Conflict ({bad}) in sync after successful insert; this is "
                        "possible if the same table is being updated by a concurrent "
                        "process that isn't using sync, but it may also be a bug in "
                        "daf_butler."
                    )
                elif update:
                    # Overwrite only the mismatched columns with the caller's
                    # values; ``bad`` (old values) is returned to the caller.
                    with self._transaction() as (_, connection):
                        connection.execute(
                            table.update()
                            .where(sqlalchemy.sql.and_(*[table.columns[k] == v for k, v in keys.items()]))
                            .values(**{k: compared[k] for k in bad})
                        )
                    inserted_or_updated = bad
                else:
                    raise DatabaseConflictError(
                        f"Conflict in sync for table {table.name} on column(s) {_format_bad(bad)}."
                    )
            else:
                inserted_or_updated = inserted
    else:
        # Database is not writeable; just see if the row exists.
        n, bad, result = check()
        if n < 1:
            raise ReadOnlyDatabaseError("sync needs to insert, but database is read-only.")
        elif n > 1:
            raise RuntimeError("Keys passed to sync do not comprise a unique constraint.")
        elif bad:
            if update:
                raise ReadOnlyDatabaseError("sync needs to update, but database is read-only.")
            else:
                raise DatabaseConflictError(
                    f"Conflict in sync for table {table.name} on column(s) {_format_bad(bad)}."
                )
        inserted_or_updated = False
    if returning is None:
        return None, inserted_or_updated
    else:
        assert result is not None
        return dict(zip(returning, result, strict=True)), inserted_or_updated
def insert(
    self,
    table: sqlalchemy.schema.Table,
    *rows: dict,
    returnIds: bool = False,
    select: sqlalchemy.sql.expression.SelectBase | None = None,
    names: Iterable[str] | None = None,
) -> list[int] | None:
    """Insert one or more rows into a table, optionally returning
    autoincrement primary key values.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Table rows should be inserted into.
    *rows : `dict`
        Positional arguments are the rows to be inserted, as dictionaries
        mapping column name to value. The keys in all dictionaries must
        be the same.
    returnIds : `bool`, optional
        If `True` (`False` is default), return the values of the table's
        autoincrement primary key field (which must exist).
    select : `sqlalchemy.sql.SelectBase`, optional
        A SELECT query expression to insert rows from. Cannot be provided
        with either ``rows`` or ``returnIds=True``.
    names : `~collections.abc.Iterable` [ `str` ], optional
        Names of columns in ``table`` to be populated, ordered to match the
        columns returned by ``select``. Ignored if ``select`` is `None`.
        If not provided, the columns returned by ``select`` must be named
        to match the desired columns of ``table``.

    Returns
    -------
    ids : `None`, or `list` of `int`
        If ``returnIds`` is `True`, a `list` containing the inserted
        values for the table's autoincrement primary key.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called.
    TypeError
        Raised if ``select`` is combined with ``rows`` or
        ``returnIds=True``.

    Notes
    -----
    The default implementation uses bulk insert syntax when ``returnIds``
    is `False`, and a loop over single-row insert operations when it is
    `True`.

    Derived classes should reimplement when they can provide a more
    efficient implementation (especially for the latter case).

    May be used inside transaction contexts, so implementations may not
    perform operations that interrupt transactions.
    """
    self.assertTableWriteable(table, f"Cannot insert into read-only table {table}.")
    if select is not None and (rows or returnIds):
        raise TypeError("'select' is incompatible with passing value rows or returnIds=True.")
    if not rows and select is None:
        # Nothing to insert; still honor the documented return type.
        return [] if returnIds else None
    with self._transaction() as (_, connection):
        if returnIds:
            # One statement per row so each new autoincrement key can be
            # read back from the result.
            sql = table.insert()
            ids = []
            for row in rows:
                key = connection.execute(sql, row).inserted_primary_key
                assert key is not None
                ids.append(key[0])
            return ids
        if select is not None:
            if names is None:
                # The SELECT's own column names identify the target columns.
                # (This file already requires SQLAlchemy >= 1.4, where every
                # SelectBase provides ``selected_columns``.)
                names = select.selected_columns.keys()
            connection.execute(table.insert().from_select(list(names), select))
        else:
            # A sequence of parameter dicts runs as a single bulk insert.
            connection.execute(table.insert(), rows)
        return None
@abstractmethod
def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None:
    """Insert rows, overwriting any existing rows whose primary key they
    would duplicate.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Destination table.
    *rows
        Rows to write, each a dictionary mapping column name to value; all
        dictionaries must have the same keys.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called.

    Notes
    -----
    Safe to call inside transaction contexts, so implementations must not
    do anything that would interrupt a transaction.

    Implementations should raise `sqlalchemy.exc.IntegrityError` when any
    constraint other than the primary key would be violated.

    Support for tables with autoincrement keys is optional.
    """
    raise NotImplementedError
@abstractmethod
def ensure(self, table: sqlalchemy.schema.Table, *rows: dict, primary_key_only: bool = False) -> int:
    """Insert rows, silently skipping those that would violate a unique
    constraint.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Destination table.
    *rows
        Rows to write, each a dictionary mapping column name to value; all
        dictionaries must have the same keys.
    primary_key_only : `bool`, optional
        When `True` (default `False`), only primary-key violations are
        skipped; any other constraint violation raises (rolling back the
        transaction).

    Returns
    -------
    count : `int`
        How many rows were actually inserted.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called,
        even if the call would have been a no-op on a writeable database.

    Notes
    -----
    Safe to call inside transaction contexts, so implementations must not
    do anything that would interrupt a transaction.

    Support for tables with autoincrement keys is optional.
    """
    raise NotImplementedError
def delete(self, table: sqlalchemy.schema.Table, columns: Iterable[str], *rows: dict) -> int:
    """Delete one or more rows from a table.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Table that rows should be deleted from.
    columns : `~collections.abc.Iterable` of `str`
        The names of columns that will be used to constrain the rows to
        be deleted; these will be combined via ``AND`` to form the
        ``WHERE`` clause of the delete query.  If empty, every row in the
        table is deleted.
    *rows
        Positional arguments are the keys of rows to be deleted, as
        dictionaries mapping column name to value. The keys in all
        dictionaries must be exactly the names in ``columns``.

    Returns
    -------
    count : `int`
        Number of rows deleted.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called.

    Notes
    -----
    May be used inside transaction contexts, so implementations may not
    perform operations that interrupt transactions.

    The default implementation should be sufficient for most derived
    classes.
    """
    self.assertTableWriteable(table, f"Cannot delete from read-only table {table}.")
    # Materialize before testing emptiness: an iterator is always truthy, so
    # an empty iterator of columns would otherwise be treated as non-empty.
    columns = list(columns)
    if columns and not rows:
        # If there are no columns, this operation is supposed to delete
        # everything (so we proceed as usual). But if there are columns,
        # but no rows, it was a constrained bulk operation where the
        # constraint is that no rows match, and we should short-circuit
        # while reporting that no rows were affected.
        return 0
    sql = table.delete()

    # More efficient to use IN operator if there is only one
    # variable changing across all rows.
    content: dict[str, set] = defaultdict(set)
    if len(columns) == 1:
        # Nothing to calculate since we can always use IN
        column = columns[0]
        changing_columns = [column]
        content[column] = {row[column] for row in rows}
    else:
        for row in rows:
            for k, v in row.items():
                content[k].add(v)
        changing_columns = [col for col, values in content.items() if len(values) > 1]

    if len(changing_columns) != 1:
        # Zero or several columns vary across rows: use bound parameters and
        # execute once per row (executemany-style).
        whereTerms = [table.columns[name] == sqlalchemy.sql.bindparam(name) for name in columns]
        if whereTerms:
            sql = sql.where(sqlalchemy.sql.and_(*whereTerms))
        with self._transaction() as (_, connection):
            return connection.execute(sql, rows).rowcount
    else:
        # One of the columns has changing values but any others are
        # fixed. In this case we can use an IN operator and be more
        # efficient.
        name = changing_columns.pop()

        # Simple where clause for the unchanging columns
        clauses = []
        for k, v in content.items():
            if k == name:
                continue
            # The set only has one element
            clauses.append(table.columns[k] == v.pop())

        # The IN operator will not work for "infinite" numbers of
        # rows so must batch it up into distinct calls.
        in_content = list(content[name])
        n_elements = len(in_content)

        rowcount = 0
        n_per_loop = 1_000  # Controls how many items to put in IN clause
        with self._transaction() as (_, connection):
            for iposn in range(0, n_elements, n_per_loop):
                endpos = iposn + n_per_loop
                in_clause = table.columns[name].in_(in_content[iposn:endpos])

                newsql = sql.where(sqlalchemy.sql.and_(*clauses, in_clause))
                rowcount += connection.execute(newsql).rowcount
        return rowcount
def deleteWhere(self, table: sqlalchemy.schema.Table, where: sqlalchemy.sql.ColumnElement) -> int:
    """Delete rows from a table with pre-constructed WHERE clause.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Table that rows should be deleted from.
    where : `sqlalchemy.sql.ColumnElement`
        Boolean SQL expression used verbatim as the ``WHERE`` clause of
        the delete query; only rows for which it is true are deleted.

    Returns
    -------
    count : `int`
        Number of rows deleted.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called.

    Notes
    -----
    May be used inside transaction contexts, so implementations may not
    perform operations that interrupt transactions.

    The default implementation should be sufficient for most derived
    classes.
    """
    self.assertTableWriteable(table, f"Cannot delete from read-only table {table}.")

    sql = table.delete().where(where)
    with self._transaction() as (_, connection):
        return connection.execute(sql).rowcount
def update(self, table: sqlalchemy.schema.Table, where: dict[str, str], *rows: dict) -> int:
    """Update existing rows of a table.

    Parameters
    ----------
    table : `sqlalchemy.schema.Table`
        Table whose rows are modified.
    where : `dict` [`str`, `str`]
        Maps each column used to locate existing rows to the key that
        carries its value inside the ``rows`` dictionaries.  The two names
        may differ because of SQLAlchemy limitations.
    *rows
        Rows to apply.  Every dictionary must have the same keys, each of
        which is either a value of ``where`` or the name of a column to
        update.

    Returns
    -------
    count : `int`
        Number of rows matched, whether or not their values actually
        changed.

    Raises
    ------
    ReadOnlyDatabaseError
        Raised if `isWriteable` returns `False` when this method is called.

    Notes
    -----
    Safe to use inside transaction contexts; it does nothing that would
    interrupt a transaction.

    The default implementation should be sufficient for most derived
    classes.
    """
    self.assertTableWriteable(table, f"Cannot update read-only table {table}.")
    if rows:
        # Each lookup column is compared to a bound parameter whose name is
        # the corresponding key in the row dictionaries.
        conditions = [
            table.columns[column] == sqlalchemy.sql.bindparam(param) for column, param in where.items()
        ]
        statement = table.update().where(sqlalchemy.sql.and_(*conditions))
        with self._transaction() as (_, connection):
            return connection.execute(statement, rows).rowcount
    return 0
@contextmanager
def query(
    self,
    sql: sqlalchemy.sql.expression.Executable | sqlalchemy.sql.expression.SelectBase,
    *args: Any,
    **kwargs: Any,
) -> Iterator[sqlalchemy.engine.CursorResult]:
    """Run a SELECT query against the database.

    Parameters
    ----------
    sql : `sqlalchemy.sql.expression.SelectBase`
        A SQLAlchemy representation of a ``SELECT`` query.
    *args
        Additional positional arguments are forwarded to
        `sqlalchemy.engine.Connection.execute`.
    **kwargs
        Additional keyword arguments are forwarded to
        `sqlalchemy.engine.Connection.execute`.

    Returns
    -------
    result_context : `sqlalchemy.engine.CursorResult`
        Context manager that returns the query result object when entered.
        These results are invalidated when the context is exited.

    Notes
    -----
    If ``self._session_connection`` is set when this method is entered,
    that connection is reused and left open. Otherwise a new connection is
    obtained from ``self._engine`` and closed when the context exits.
    """
    # Record which connection we use and whether we opened it *now*, at
    # acquisition time. Re-checking ``self._session_connection`` at exit
    # would close the session's connection if the session ended (or was
    # replaced) while the caller still held the results.
    session_connection = self._session_connection
    if session_connection is None:
        connection = self._engine.connect()
        owns_connection = True
    else:
        connection = session_connection
        owns_connection = False
    try:
        self._log_query_and_plan(connection, sql)
        # TODO: SelectBase is not good for execute(), but it used
        # everywhere, e.g. in daf_relation. We should switch to Executable
        # at some point.
        with connection.execute(
            cast(sqlalchemy.sql.expression.Executable, sql), *args, **kwargs
        ) as result:
            yield result
    finally:
        if owns_connection:
            connection.close()
def _log_query_and_plan(
    self,
    connection: sqlalchemy.Connection,
    sql: sqlalchemy.sql.expression.Executable | sqlalchemy.sql.expression.SelectBase,
) -> None:
    """Log the given SQL statement and the DB's plan for executing it if
    the environment variable DAF_BUTLER_DEBUG_QUERIES is set to a
    non-empty value.

    Output goes to ``sys.stderr``. A query plan is only requested for
    `sqlalchemy.SelectBase` statements; other statements are logged
    without a plan instead of failing the caller's query.
    """
    if not os.environ.get("DAF_BUTLER_DEBUG_QUERIES", False):
        return

    if not isinstance(sql, sqlalchemy.SelectBase):
        # Previously an ``assert`` here made debug logging raise for any
        # non-SELECT statement (and was silently skipped under ``-O``).
        # Debug output must never break execution, so log and move on.
        print(f"Executing SQL statement:\n{sql}", file=sys.stderr)
        return

    compiled = sql.compile(connection)
    print(f"Executing SQL statement:\n{compiled}\nBind parameters: {compiled.params}", file=sys.stderr)

    query_plan = get_query_plan(connection, sql)
    print(f"Query plan:\n{query_plan}", file=sys.stderr)
@abstractmethod
def constant_rows(
    self,
    fields: NamedValueAbstractSet[ddl.FieldSpec],
    *rows: dict,
    name: str | None = None,
) -> sqlalchemy.sql.FromClause:
    """Build a SQLAlchemy ``FROM``-clause object holding a handful of
    literal rows.

    Parameters
    ----------
    fields : `NamedValueAbstractSet` [ `ddl.FieldSpec` ]
        Column definitions for the rows; any unique or foreign key
        constraints they carry are not used.
    *rows : `dict`
        One mapping per row, keyed by field name.
    name : `str`, optional
        Name to give the SQL construct. When omitted, a unique opaque name
        of the form ``tmp_<hex>`` is generated.

    Returns
    -------
    from_clause : `sqlalchemy.sql.FromClause`
        SQLAlchemy object representing the given rows. It can be joined
        directly into the ``FROM`` clause of a ``SELECT`` query and does
        not rely on a temporary table that would need cleaning up.

    Notes
    -----
    This base version emits the SQL-standard ``VALUES`` construct. Because
    RDBMS support for ``VALUES`` varies, the method stays abstract so that
    each backend must opt in explicitly by delegating to `super`.
    """
    clause_name = name if name is not None else f"tmp_{uuid.uuid4().hex}"
    value_columns = [sqlalchemy.Column(field.name, field.getSizedColumnType()) for field in fields]
    values_clause = sqlalchemy.sql.values(*value_columns, name=clause_name)
    # Order each row's values to match the field-name order.
    row_tuples = [tuple(row[field_name] for field_name in fields.names) for row in rows]
    return values_clause.data(row_tuples)
def get_constant_rows_max(self) -> int:
    """Report the largest number of rows this backend wants passed to
    `constant_rows` in a single call.

    Returns
    -------
    max : `int`
        Maximum number of rows; this default implementation returns 1000.

    Notes
    -----
    The value is meant to reflect typical performance profiles (or a
    guess at them) rather than a hard limit of the database engine.
    """
    default_max_rows = 1_000
    return default_max_rows
@abstractmethod
def glob_expression(
    self, expression: sqlalchemy.ColumnElement[Any], pattern: str
) -> sqlalchemy.ColumnElement[bool]:
    """Build a boolean SQL expression that tests ``expression`` against a
    glob-like ``pattern``.

    Parameters
    ----------
    expression : `sqlalchemy.ColumnElement`
        Column expression that evaluates to a string.
    pattern : `str`
        Glob pattern string.

    Returns
    -------
    glob : `sqlalchemy.ColumnElement`
        Boolean expression that matches ``expression`` against ``pattern``.

    Notes
    -----
    Abstract: subclasses must supply the backend-specific translation;
    this base version only raises `NotImplementedError`.
    """
    raise NotImplementedError
@property
@abstractmethod
def has_distinct_on(self) -> bool:
    """`True` if the ``DISTINCT ON`` SQL construct is available on this
    database; subclasses must override.
    """
    raise NotImplementedError
@property
@abstractmethod
def has_any_aggregate(self) -> bool:
    """`True` if this database offers the ``ANY_VALUE`` aggregate function
    or an equivalent; subclasses must override.
    """
    raise NotImplementedError
@property
def supports_temporary_tables(self) -> bool:
    """Whether temporary tables may be used with this database instance.

    Returns the ``_allow_temporary_tables`` flag stored on the instance.
    """
    allowed = self._allow_temporary_tables
    return allowed
@abstractmethod
def apply_any_aggregate(self, column: sqlalchemy.ColumnElement[Any]) -> sqlalchemy.ColumnElement[Any]:
    """Wrap a column in ``ANY_VALUE`` (or the backend's equivalent
    aggregate).

    Parameters
    ----------
    column : `sqlalchemy.ColumnElement`
        The column to wrap.

    Returns
    -------
    wrapped : `sqlalchemy.ColumnElement`
        Column element of the same SQL type that may appear in the
        ``SELECT`` clause even though the column is absent from the
        ``GROUP BY`` clause.

    Notes
    -----
    Behavior is unspecified when `has_any_aggregate` is `False`; callers
    must check that property before calling this method.
    """
    raise NotImplementedError
# Instance attributes expected on every Database; presumably assigned by
# subclass/constructor code outside this view.
origin: int
"""An integer ID that should be used as the default for any datasets,
quanta, or other entities that use an (autoincrement, origin) compound
primary key (`int`).
"""

namespace: str | None
"""The schema or namespace this database instance is associated with
(`str` or `None`).
"""

name_shrinker: NameShrinker
"""An object that can be used to shrink field names to fit within the
identifier limit of the database engine (`NameShrinker`).
"""
class DatabaseMetadata:
    """Wrapper around a SQLAlchemy MetaData object to ensure thread safety.

    Parameters
    ----------
    namespace : `str` or `None`
        Name of the schema or namespace this instance is associated with.

    Notes
    -----
    `sqlalchemy.MetaData` is documented to be threadsafe for reads, but not
    with concurrent modifications. We add tables dynamically at runtime,
    and the MetaData object is shared by all Database instances sharing
    the same connection pool.
    """

    def __init__(self, namespace: str | None) -> None:
        # Serializes every read and modification of the two containers below.
        self._lock = Lock()
        self._metadata = sqlalchemy.MetaData(schema=namespace)
        # Tables added via add_table, keyed by the name passed to it.
        self._tables: dict[str, sqlalchemy.Table] = {}

    def add_table(
        self, db: Database, name: str, spec: ddl.TableSpec, **kwargs: Any
    ) -> sqlalchemy.schema.Table:
        """Add a new table to the MetaData object, returning its sqlalchemy
        representation. This does not physically create the table in the
        database -- it only sets up its definition.

        If a table with the same ``name`` was already added, that existing
        definition is returned and ``spec`` and ``kwargs`` are ignored.

        Parameters
        ----------
        db : `Database`
            Database connection associated with the table definition.
        name : `str`
            The name of the table.
        spec : `ddl.TableSpec`
            The specification of the table.
        **kwargs
            Additional keyword arguments to forward to the
            `sqlalchemy.schema.Table` constructor.

        Returns
        -------
        table : `sqlalchemy.schema.Table`
            The created (or previously added) table.
        """
        with self._lock:
            if (table := self._tables.get(name)) is not None:
                return table

            table = db._convertTableSpec(name, spec, self._metadata, **kwargs)
            try:
                for foreignKeySpec in spec.foreignKeys:
                    table.append_constraint(db._convertForeignKeySpec(name, foreignKeySpec, self._metadata))
            except BaseException:
                # _convertTableSpec was given self._metadata and so presumably
                # registered the table there. Undo that so a failed add does
                # not leave a half-defined table in the MetaData that is
                # unknown to self._tables (a retry would otherwise try to
                # redefine it). MetaData.remove is a no-op for unknown tables.
                self._metadata.remove(table)
                raise

            self._tables[name] = table
            return table

    def get_table(self, name: str) -> sqlalchemy.schema.Table | None:
        """Return the definition of a table that was previously added to this
        MetaData object.

        Parameters
        ----------
        name : `str`
            Name of the table.

        Returns
        -------
        table : `sqlalchemy.schema.Table` or `None`
            The table definition, or `None` if the table is not known to this
            MetaData instance.
        """
        with self._lock:
            return self._tables.get(name)

    def remove_table(self, name: str) -> None:
        """Remove a table that was previously added to this MetaData object.

        Removing a name that is not known is silently ignored.

        Parameters
        ----------
        name : `str`
            Name of the table.
        """
        with self._lock:
            table = self._tables.pop(name, None)
            if table is not None:
                self._metadata.remove(table)

    def create_all(self, connection: sqlalchemy.engine.Connection) -> None:
        """Create all tables known to this MetaData object in the database.
        Same as `sqlalchemy.MetaData.create_all`.

        Parameters
        ----------
        connection : `sqlalchemy.engine.Connection`
            Database connection that will be used to create tables.
        """
        with self._lock:
            self._metadata.create_all(connection)