Coverage for python / lsst / daf / butler / ddl.py: 50%
231 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:17 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27"""Classes for representing SQL data-definition language (DDL) in Python.
This includes "CREATE TABLE" etc.
31This provides an extra layer on top of SQLAlchemy's classes for these concepts,
32because we need a level of indirection between logical tables and the actual
33SQL, and SQLAlchemy's DDL classes always map 1-1 to SQL.
35We've opted for the rather more obscure "ddl" as the name of this module
36instead of "schema" because the latter is too overloaded; in most SQL
37databases, a "schema" is also another term for a namespace.
38"""
40from __future__ import annotations
# Public API of this module (alphabetical, GUID first as an acronym).
__all__ = (
    "GUID",
    "AstropyTimeNsecTai",
    "Base64Bytes",
    "Base64Region",
    "FieldSpec",
    "ForeignKeySpec",
    "IndexSpec",
    "TableSpec",
)
import functools
import logging
import uuid
from base64 import b64decode, b64encode
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from math import ceil
from typing import TYPE_CHECKING, Any
61import astropy.time
62import sqlalchemy
63from sqlalchemy.dialects.postgresql import UUID
65from lsst.sphgeom import Region
66from lsst.utils.iteration import ensure_iterable
68from . import time_utils
69from ._config import Config
70from ._exceptions import ValidationError
71from ._named import NamedValueSet
72from .utils import stripIfNotNone
74if TYPE_CHECKING:
75 from .timespan_database_representation import TimespanDatabaseRepresentation
_LOG = logging.getLogger(__name__)  # module-level logger (not used in the code visible here)
class SchemaValidationError(ValidationError):
    """Exceptions that indicate problems in Registry schema configuration."""

    @classmethod
    def translate(cls, caught: type[Exception], message: str) -> Callable:
        """Return decorator to re-raise exceptions as `SchemaValidationError`.

        Decorated functions must be class or instance methods, with a
        ``config`` parameter as their first argument.  This will be passed
        to ``message.format()`` as a keyword argument, along with ``err``,
        the original exception.

        Parameters
        ----------
        caught : `type` (`Exception` subclass)
            The type of exception to catch.
        message : `str`
            A `str.format` string that may contain named placeholders for
            ``config``, ``err``, or any keyword-only argument accepted by
            the decorated function.
        """

        def decorate(func: Callable) -> Callable:
            # functools.wraps preserves the wrapped function's name and
            # docstring, so the decorated ``fromConfig`` classmethods below
            # remain introspectable (help(), sphinx docs, debuggers).
            @functools.wraps(func)
            def decorated(self: Any, config: Config, *args: Any, **kwargs: Any) -> Any:
                try:
                    return func(self, config, *args, **kwargs)
                except caught as err:
                    # Chain the original exception so the full traceback
                    # remains available to callers.
                    raise cls(message.format(config=str(config), err=err)) from err

            return decorated

        return decorate
class Base64Bytes(sqlalchemy.TypeDecorator):
    """A SQLAlchemy custom type for Python `bytes`.

    Maps Python `bytes` to a base64-encoded `sqlalchemy.Text` field.

    Parameters
    ----------
    nbytes : `int` or `None`, optional
        Number of bytes.
    *args : `typing.Any`
        Parameters passed to base class constructor.
    **kwargs : `typing.Any`
        Keyword parameters passed to base class constructor.
    """

    impl = sqlalchemy.Text

    cache_ok = True

    def __init__(self, nbytes: int | None = None, *args: Any, **kwargs: Any):
        # Base64 encodes each 3-byte group as 4 characters; a column length
        # is only meaningful when the implementation type is a sized String
        # (subclasses may override ``impl``).
        length: int | None = None
        if nbytes is not None and self.impl is sqlalchemy.String:
            length = 4 * ceil(nbytes / 3)
        super().__init__(*args, length=length, **kwargs)
        self.nbytes = nbytes

    def process_bind_param(self, value: bytes | None, dialect: sqlalchemy.engine.Dialect) -> str | None:
        # Encode native `bytes` to base64, then decode to an ASCII `str`,
        # since SQLAlchemy expects `str` for String/Text columns.
        if value is None:
            return None
        if not isinstance(value, bytes):
            raise TypeError(
                f"Base64Bytes fields require 'bytes' values; got '{value}' with type {type(value)}."
            )
        encoded = b64encode(value)
        return encoded.decode("ascii")

    def process_result_value(self, value: str | None, dialect: sqlalchemy.engine.Dialect) -> bytes | None:
        # Reverse of process_bind_param: ASCII `str` -> base64 `bytes` ->
        # native `bytes`.
        if value is None:
            return None
        return b64decode(value.encode("ascii"))

    @property
    def python_type(self) -> type[bytes]:
        return bytes
# Create an alias, used below to disambiguate this class from the built-in
# SQLAlchemy type.
167LocalBase64Bytes = Base64Bytes
class Base64Region(Base64Bytes):
    """A SQLAlchemy custom type for Python `lsst.sphgeom.Region`.

    Maps Python `lsst.sphgeom.Region` to a base64-encoded `sqlalchemy.String`.
    """

    cache_ok = True  # must be set explicitly in each subclass

    def process_bind_param(self, value: Region | None, dialect: sqlalchemy.engine.Dialect) -> str | None:
        # Serialize the region to bytes and let the base class base64-encode.
        return None if value is None else super().process_bind_param(value.encode(), dialect)

    def process_result_value(self, value: str | None, dialect: sqlalchemy.engine.Dialect) -> Region | None:
        return None if value is None else Region.decodeBase64(value)

    @property
    def python_type(self) -> type[Region]:
        return Region

    @classmethod
    def union_aggregate(
        cls, column: sqlalchemy.ColumnElement[Base64Region]
    ) -> sqlalchemy.ColumnElement[Base64Region]:
        """Return a SQLAlchemy aggregate expression that computes the union of
        a set of regions.

        Parameters
        ----------
        column : `sqlalchemy.ColumnElement`
            SQLAlchemy column expression representing the regions to be
            combined.

        Returns
        -------
        union_column : `sqlalchemy.ColumnElement`
            SQLAlchemy column expression representing the union.
        """
        # Concatenate the base64 strings with ":" and cast back to this type;
        # presumably Region.decodeBase64 interprets a ":"-separated list as a
        # union — confirm against lsst.sphgeom documentation.
        return sqlalchemy.cast(sqlalchemy.func.aggregate_strings(column, ":"), type_=Base64Region)
class AstropyTimeNsecTai(sqlalchemy.TypeDecorator):
    """A SQLAlchemy custom type for Python `astropy.time.Time`.

    Maps Python `astropy.time.Time` to a number of nanoseconds since Unix
    epoch in TAI scale.
    """

    impl = sqlalchemy.BigInteger

    cache_ok = True

    def process_bind_param(
        self, value: astropy.time.Time | None, dialect: sqlalchemy.engine.Dialect
    ) -> int | None:
        # NULL passes through; anything else must be an astropy Time.
        if value is None:
            return None
        if not isinstance(value, astropy.time.Time):
            raise TypeError(f"Unsupported type: {type(value)}, expected astropy.time.Time")
        return time_utils.TimeConverter().astropy_to_nsec(value)

    def process_result_value(
        self, value: int | None, dialect: sqlalchemy.engine.Dialect
    ) -> astropy.time.Time | None:
        # ``value`` is nanoseconds since epoch (or None for NULL).
        if value is None:
            return None
        return time_utils.TimeConverter().nsec_to_astropy(value)
244# TODO: sqlalchemy 2 has internal support for UUID:
245# https://docs.sqlalchemy.org/en/20/core/type_basics.html#sqlalchemy.types.Uuid
class GUID(sqlalchemy.TypeDecorator):
    """Platform-independent GUID type.

    Uses PostgreSQL's UUID type, otherwise uses CHAR(32), storing as
    stringified hex values.
    """

    impl = sqlalchemy.CHAR

    cache_ok = True

    def load_dialect_impl(self, dialect: sqlalchemy.Dialect) -> sqlalchemy.types.TypeEngine:
        # PostgreSQL gets a native UUID column; everyone else a 32-char hex.
        if dialect.name == "postgresql":
            return dialect.type_descriptor(UUID())
        return dialect.type_descriptor(sqlalchemy.CHAR(32))

    def process_bind_param(self, value: Any, dialect: sqlalchemy.Dialect) -> str | None:
        if value is None:
            return None

        # Coerce input to a UUID.  UUIDs are the only input we really want,
        # but existing code still passes ints (and bytes/hex strings occur).
        if not isinstance(value, uuid.UUID):
            if isinstance(value, int):
                value = uuid.UUID(int=value)
            elif isinstance(value, bytes):
                value = uuid.UUID(bytes=value)
            elif isinstance(value, str):
                # hexstring
                value = uuid.UUID(hex=value)
            else:
                raise TypeError(f"Unexpected type of a bind value: {type(value)}")

        return str(value) if dialect.name == "postgresql" else f"{value.int:032x}"

    def process_result_value(
        self, value: str | uuid.UUID | None, dialect: sqlalchemy.Dialect
    ) -> uuid.UUID | None:
        # sqlalchemy 2 may already hand us a UUID instance; NULL stays None.
        if value is None or isinstance(value, uuid.UUID):
            return value
        return uuid.UUID(hex=value)
# Mapping from the type-name strings accepted in schema configuration files to
# the SQLAlchemy (or custom, defined above) column types they produce; used by
# FieldSpec.fromConfig to resolve the "type" key.
VALID_CONFIG_COLUMN_TYPES = {
    "string": sqlalchemy.String,
    "int": sqlalchemy.BigInteger,
    "float": sqlalchemy.Float,
    "region": Base64Region,
    "bool": sqlalchemy.Boolean,
    "blob": sqlalchemy.LargeBinary,
    "datetime": AstropyTimeNsecTai,
    "hash": Base64Bytes,
    "uuid": GUID,
}
@dataclass
class FieldSpec:
    """A data class describing a single column of a logical `Registry`
    table.
    """

    name: str
    """Name of the column."""

    dtype: type
    """Type of the column; usually a `type` subclass provided by SQLAlchemy
    that defines both a Python type and a corresponding precise SQL type.
    """

    length: int | None = None
    """Length of the type in the database, for variable-length types."""

    nbytes: int | None = None
    """Natural length used for hash and encoded-region columns, to be converted
    into the post-encoding length.
    """

    primaryKey: bool = False
    """Whether this field is (part of) its table's primary key."""

    autoincrement: bool = False
    """Whether the database should insert automatically incremented values when
    no value is provided in an INSERT.
    """

    nullable: bool = True
    """Whether this field is allowed to be NULL. If ``primaryKey`` is
    `True`, during construction this value will be forced to `False`."""

    default: Any = None
    """A server-side default value for this field.

    This is passed directly as the ``server_default`` argument to
    `sqlalchemy.schema.Column`. It does _not_ go through SQLAlchemy's usual
    type conversion or quoting for Python literals, and should hence be used
    with care. See the SQLAlchemy documentation for more information.
    """

    doc: str | None = None
    """Documentation for this field."""

    def __post_init__(self) -> None:
        # Primary-key columns can never be NULL; enforce that here so
        # callers don't have to pass nullable=False explicitly.
        if self.primaryKey:
            self.nullable = False

    def __eq__(self, other: Any) -> bool:
        # Equality (and hashing, below) is by column name only, so specs can
        # live in name-keyed containers.
        if not isinstance(other, FieldSpec):
            return NotImplemented
        return self.name == other.name

    def __hash__(self) -> int:
        return hash(self.name)

    @classmethod
    @SchemaValidationError.translate(KeyError, "Missing key {err} in column config '{config}'.")
    def fromConfig(cls, config: Config, **kwargs: Any) -> FieldSpec:
        """Create a `FieldSpec` from a subset of a `SchemaConfig`.

        Parameters
        ----------
        config : `Config`
            Configuration describing the column.  Nested configuration keys
            correspond to `FieldSpec` attributes.
        **kwargs
            Additional keyword arguments providing defaults for values not
            present in ``config``.

        Returns
        -------
        spec : `FieldSpec`
            Specification structure for the column.

        Raises
        ------
        SchemaValidationError
            Raised if configuration keys are missing or have invalid values.
        """
        dtype = VALID_CONFIG_COLUMN_TYPES.get(config["type"])
        if dtype is None:
            raise SchemaValidationError(f"Invalid field type string: '{config['type']}'.")
        if not config["name"].islower():
            raise SchemaValidationError(f"Column name '{config['name']}' is not all lowercase.")
        spec = cls(name=config["name"], dtype=dtype, **kwargs)
        spec.length = config.get("length", spec.length)
        spec.nbytes = config.get("nbytes", spec.nbytes)
        # length and nbytes are alternative ways of sizing a column.
        if spec.length is not None and spec.nbytes is not None:
            raise SchemaValidationError(f"Both length and nbytes provided for field '{spec.name}'.")
        spec.primaryKey = config.get("primaryKey", spec.primaryKey)
        spec.autoincrement = config.get("autoincrement", spec.autoincrement)
        spec.nullable = config.get("nullable", False if spec.primaryKey else spec.nullable)
        spec.doc = stripIfNotNone(config.get("doc", None))
        return spec

    @classmethod
    def for_region(cls, name: str = "region", nullable: bool = True, nbytes: int = 2048) -> FieldSpec:
        """Create a `FieldSpec` for a spatial region column.

        Parameters
        ----------
        name : `str`, optional
            Name for the field.
        nullable : `bool`, optional
            Whether NULL values are permitted.
        nbytes : `int`, optional
            Maximum number of bytes for serialized regions.  The actual
            column size will be larger to allow for base-64 encoding.

        Returns
        -------
        spec : `FieldSpec`
            Specification structure for a region column.
        """
        return cls(name, dtype=Base64Region, nullable=nullable, nbytes=nbytes)

    def isStringType(self) -> bool:
        """Indicate that this is a sqlalchemy.String field spec.

        Returns
        -------
        isString : `bool`
            The field refers to a `sqlalchemy.String` and not any other type.
            This can return `False` even if the object was created with a
            string type if it has been decided that it should be implemented
            as a `sqlalchemy.Text` type.
        """
        # Only short (<= 32 char) strings are kept as sized String columns;
        # longer ones are implemented as Text (see getSizedColumnType).
        return bool(self.dtype is sqlalchemy.String and self.length and self.length <= 32)

    def getSizedColumnType(self) -> sqlalchemy.types.TypeEngine | type:
        """Return a sized version of the column type.

        Utilizes either (or neither) of ``self.length`` and ``self.nbytes``.

        Returns
        -------
        dtype : `sqlalchemy.types.TypeEngine`
            A SQLAlchemy column type object.
        """
        if self.length is not None:
            if self.dtype is sqlalchemy.String and not self.isStringType():
                # Long strings become Text, which takes no length.
                return sqlalchemy.Text
            return self.dtype(length=self.length)
        if self.nbytes is not None:
            return self.dtype(nbytes=self.nbytes)
        return self.dtype

    def getPythonType(self) -> type:
        """Return the Python type associated with this field's (SQL) dtype.

        Returns
        -------
        type : `type`
            Python type associated with this field's (SQL) `dtype`.
        """
        if not issubclass(self.dtype, LocalBase64Bytes):
            return self.dtype().python_type  # type: ignore
        # Base64 types need the nbytes keyword to be constructed; mypy can't
        # see that this must have been provided.
        assert self.nbytes is not None
        return self.dtype(nbytes=self.nbytes).python_type
@dataclass
class ForeignKeySpec:
    """Definition of a foreign key constraint in a logical `Registry` table."""

    table: str
    """Name of the target table."""

    source: tuple[str, ...]
    """Tuple of source table column names."""

    target: tuple[str, ...]
    """Tuple of target table column names."""

    onDelete: str | None = None
    """SQL clause indicating how to handle deletes to the target table.

    If not `None` (which indicates that a constraint violation exception should
    be raised), should be either "SET NULL" or "CASCADE".
    """

    addIndex: bool = True
    """If `True`, create an index on the columns of this foreign key in the
    source table.
    """

    @classmethod
    @SchemaValidationError.translate(KeyError, "Missing key {err} in foreignKey config '{config}'.")
    def fromConfig(cls, config: Config) -> ForeignKeySpec:
        """Create a `ForeignKeySpec` from a subset of a `SchemaConfig`.

        Parameters
        ----------
        config : `Config`
            Configuration describing the constraint.  Nested configuration
            keys correspond to `ForeignKeySpec` attributes.

        Returns
        -------
        spec : `ForeignKeySpec`
            Specification structure for the constraint.

        Raises
        ------
        SchemaValidationError
            Raised if configuration keys are missing or have invalid values.
        """
        # Column lists may be given as single strings or sequences; normalize
        # both to tuples.
        source_columns = tuple(ensure_iterable(config["source"]))
        target_columns = tuple(ensure_iterable(config["target"]))
        return cls(
            table=config["table"],
            source=source_columns,
            target=target_columns,
            onDelete=config.get("onDelete", None),
        )
@dataclass(frozen=True)
class IndexSpec:
    """Specification of an index on table columns.

    Parameters
    ----------
    *columns : `str`
        Names of the columns to index.
    **kwargs : `typing.Any`
        Additional keyword arguments passed directly to the
        `sqlalchemy.schema.Index` constructor; useful for backend-specific
        options, e.g. ``postgresql_using="gist"`` creates a ``GIST`` index
        in PostgreSQL.
    """

    columns: tuple[str, ...]
    """Column names to include in the index (`Tuple` [ `str` ])."""

    kwargs: dict[str, Any]
    """Additional keyword arguments passed directly to
    `sqlalchemy.schema.Index` constructor (`dict` [ `str`, `typing.Any` ]).
    """

    def __init__(self, *columns: str, **kwargs: Any):
        # The dataclass is frozen, so attributes must be assigned through
        # object.__setattr__ rather than normal attribute assignment.
        object.__setattr__(self, "columns", tuple(columns))
        object.__setattr__(self, "kwargs", kwargs)

    def __hash__(self) -> int:
        # Hash on the column tuple only; ``kwargs`` is a dict and unhashable.
        return hash(self.columns)
@dataclass
class TableSpec:
    """A data class used to define a table or table-like query interface.

    Parameters
    ----------
    fields : `~collections.abc.Iterable` [ `FieldSpec` ]
        Specifications for the columns in this table.
    unique : `~collections.abc.Iterable` [ `tuple` [ `str` ] ], optional
        Non-primary-key unique constraints for the table.
    indexes : `~collections.abc.Iterable` [ `IndexSpec` ], optional
        Indexes for the table.
    foreignKeys : `~collections.abc.Iterable` [ `ForeignKeySpec` ], optional
        Foreign key constraints for the table.
    exclusion : `~collections.abc.Iterable` [ `tuple` [ `str` or `type` ] ]
        Special constraints that prohibit overlaps between timespans over rows
        where other columns are equal.  These take the same form as unique
        constraints, but each tuple may contain a single
        `TimespanDatabaseRepresentation` subclass representing a timespan
        column.
    recycleIds : `bool`, optional
        If `True`, allow databases that might normally recycle autoincrement
        IDs to do so (usually better for performance) on any autoincrement
        field in this table.
    doc : `str`, optional
        Documentation for the table.
    """

    def __init__(
        self,
        fields: Iterable[FieldSpec],
        *,
        unique: Iterable[tuple[str, ...]] = (),
        indexes: Iterable[IndexSpec] = (),
        foreignKeys: Iterable[ForeignKeySpec] = (),
        exclusion: Iterable[tuple[str | type[TimespanDatabaseRepresentation], ...]] = (),
        recycleIds: bool = True,
        doc: str | None = None,
    ):
        # Normalize the iterables to concrete containers up front.
        self.fields = NamedValueSet(fields)
        self.unique = set(unique)
        self.indexes = set(indexes)
        self.foreignKeys = list(foreignKeys)
        self.exclusion = set(exclusion)
        self.recycleIds = recycleIds
        self.doc = doc

    fields: NamedValueSet[FieldSpec]
    """Specifications for the columns in this table."""

    unique: set[tuple[str, ...]]
    """Non-primary-key unique constraints for the table."""

    indexes: set[IndexSpec]
    """Indexes for the table."""

    foreignKeys: list[ForeignKeySpec]
    """Foreign key constraints for the table."""

    exclusion: set[tuple[str | type[TimespanDatabaseRepresentation], ...]]
    """Exclusion constraints for the table.

    Exclusion constraints behave mostly like unique constraints, but may
    contain a database-native Timespan column that is restricted to not overlap
    across rows (for identical combinations of any non-Timespan columns in the
    constraint).
    """

    recycleIds: bool = True
    """If `True`, allow databases that might normally recycle autoincrement IDs
    to do so (usually better for performance) on any autoincrement field in
    this table.
    """

    doc: str | None = None
    """Documentation for the table."""

    @classmethod
    @SchemaValidationError.translate(KeyError, "Missing key {err} in table config '{config}'.")
    def fromConfig(cls, config: Config) -> TableSpec:
        """Create a `TableSpec` from a subset of a `SchemaConfig`.

        Parameters
        ----------
        config : `Config`
            Configuration describing the table.  Nested configuration keys
            correspond to `TableSpec` attributes.

        Returns
        -------
        spec : `TableSpec`
            Specification structure for the table.

        Raises
        ------
        SchemaValidationError
            Raised if configuration keys are missing or have invalid values.
        """
        columns = NamedValueSet(FieldSpec.fromConfig(c) for c in config["columns"])
        unique_constraints = {tuple(u) for u in config.get("unique", ())}
        fkeys = [ForeignKeySpec.fromConfig(c) for c in config.get("foreignKeys", ())]
        return cls(
            fields=columns,
            unique=unique_constraints,
            foreignKeys=fkeys,
            doc=stripIfNotNone(config.get("doc")),
        )