Coverage for python / lsst / daf / butler / ddl.py: 50%

231 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:30 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27"""Classes for representing SQL data-definition language (DDL) in Python. 

28 

This includes "CREATE TABLE" etc.

30 

31This provides an extra layer on top of SQLAlchemy's classes for these concepts, 

32because we need a level of indirection between logical tables and the actual 

33SQL, and SQLAlchemy's DDL classes always map 1-1 to SQL. 

34 

35We've opted for the rather more obscure "ddl" as the name of this module 

36instead of "schema" because the latter is too overloaded; in most SQL 

37databases, a "schema" is also another term for a namespace. 

38""" 

39 

40from __future__ import annotations 

41 

# Names exported when this module is imported with ``from ... import *``.
__all__ = (
    "GUID",
    "AstropyTimeNsecTai",
    "Base64Bytes",
    "Base64Region",
    "FieldSpec",
    "ForeignKeySpec",
    "IndexSpec",
    "TableSpec",
)

52 

53import logging 

54import uuid 

55from base64 import b64decode, b64encode 

56from collections.abc import Callable, Iterable 

57from dataclasses import dataclass 

58from math import ceil 

59from typing import TYPE_CHECKING, Any 

60 

61import astropy.time 

62import sqlalchemy 

63from sqlalchemy.dialects.postgresql import UUID 

64 

65from lsst.sphgeom import Region 

66from lsst.utils.iteration import ensure_iterable 

67 

68from . import time_utils 

69from ._config import Config 

70from ._exceptions import ValidationError 

71from ._named import NamedValueSet 

72from .utils import stripIfNotNone 

73 

74if TYPE_CHECKING: 

75 from .timespan_database_representation import TimespanDatabaseRepresentation 

76 

77 

# Logger for this module (named after the module's import path).
_LOG = logging.getLogger(__name__)

79 

80 

class SchemaValidationError(ValidationError):
    """Exceptions that indicate problems in Registry schema configuration."""

    @classmethod
    def translate(cls, caught: type[Exception], message: str) -> Callable:
        """Return decorator to re-raise exceptions as `SchemaValidationError`.

        Decorated callables must be class or instance methods whose first
        argument (after ``self``/``cls``) is a ``config``; that object is
        passed to ``message.format()`` as the ``config`` keyword, together
        with ``err``, the original exception.

        Parameters
        ----------
        caught : `type` (`Exception` subclass)
            The type of exception to catch.
        message : `str`
            A `str.format` string that may contain named placeholders for
            ``config``, ``err``, or any keyword-only argument accepted by
            the decorated function.
        """

        def _wrap(func: Callable) -> Callable:
            def _guarded(self: Any, config: Config, *args: Any, **kwargs: Any) -> Any:
                try:
                    return func(self, config, *args, **kwargs)
                except caught as err:
                    # Chain the original exception so the traceback keeps it.
                    raise cls(message.format(config=str(config), err=err)) from err

            return _guarded

        return _wrap

113 

114 

class Base64Bytes(sqlalchemy.TypeDecorator):
    """A SQLAlchemy custom type for Python `bytes`.

    Maps Python `bytes` to a base64-encoded `sqlalchemy.Text` field.

    Parameters
    ----------
    nbytes : `int` or `None`, optional
        Number of bytes.
    *args : `typing.Any`
        Parameters passed to base class constructor.
    **kwargs : `typing.Any`
        Keyword parameters passed to base class constructor.
    """

    impl = sqlalchemy.Text

    cache_ok = True

    def __init__(self, nbytes: int | None = None, *args: Any, **kwargs: Any):
        length: int | None = None
        if nbytes is not None and self.impl is sqlalchemy.String:
            # Base64 encodes every 3 raw bytes as 4 output characters, so the
            # sized-column length must be larger than the raw byte count.
            length = 4 * ceil(nbytes / 3)
        super().__init__(*args, length=length, **kwargs)
        self.nbytes = nbytes

    def process_bind_param(self, value: bytes | None, dialect: sqlalchemy.engine.Dialect) -> str | None:
        # Encode native `bytes` to base64 and hand SQLAlchemy the ASCII `str`
        # form it expects for String/Text columns.
        if value is None:
            return None
        if not isinstance(value, bytes):
            raise TypeError(
                f"Base64Bytes fields require 'bytes' values; got '{value}' with type {type(value)}."
            )
        return b64encode(value).decode("ascii")

    def process_result_value(self, value: str | None, dialect: sqlalchemy.engine.Dialect) -> bytes | None:
        # Reverse of process_bind_param: the stored `str` is ASCII base64;
        # decode it back to native `bytes`.
        if value is None:
            return None
        return b64decode(value.encode("ascii"))

    @property
    def python_type(self) -> type[bytes]:
        return bytes

163 

164 

# Alias used below to refer unambiguously to this module's Base64Bytes,
# disambiguating it from the built-in SQLAlchemy types.
LocalBase64Bytes = Base64Bytes

168 

169 

class Base64Region(Base64Bytes):
    """A SQLAlchemy custom type for Python `lsst.sphgeom.Region`.

    Maps Python `lsst.sphgeom.Region` to a base64-encoded `sqlalchemy.String`.
    """

    cache_ok = True  # must be set explicitly in each TypeDecorator subclass

    def process_bind_param(self, value: Region | None, dialect: sqlalchemy.engine.Dialect) -> str | None:
        # Serialize the region to bytes, then let the base class base64-encode.
        return None if value is None else super().process_bind_param(value.encode(), dialect)

    def process_result_value(self, value: str | None, dialect: sqlalchemy.engine.Dialect) -> Region | None:
        # Region knows how to decode directly from the stored base64 string.
        return None if value is None else Region.decodeBase64(value)

    @property
    def python_type(self) -> type[Region]:
        return Region

    @classmethod
    def union_aggregate(
        cls, column: sqlalchemy.ColumnElement[Base64Region]
    ) -> sqlalchemy.ColumnElement[Base64Region]:
        """Return a SQLAlchemy aggregate expression that computes the union of
        a set of regions.

        Parameters
        ----------
        column : `sqlalchemy.ColumnElement`
            SQLAlchemy column expression representing the regions to be
            combined.

        Returns
        -------
        union_column : `sqlalchemy.ColumnElement`
            SQLAlchemy column expression representing the union.
        """
        return sqlalchemy.cast(sqlalchemy.func.aggregate_strings(column, ":"), type_=Base64Region)

211 

212 

class AstropyTimeNsecTai(sqlalchemy.TypeDecorator):
    """A SQLAlchemy custom type for Python `astropy.time.Time`.

    Maps Python `astropy.time.Time` to a number of nanoseconds since Unix
    epoch in TAI scale.
    """

    impl = sqlalchemy.BigInteger

    cache_ok = True

    def process_bind_param(
        self, value: astropy.time.Time | None, dialect: sqlalchemy.engine.Dialect
    ) -> int | None:
        if value is None:
            return None
        if not isinstance(value, astropy.time.Time):
            raise TypeError(f"Unsupported type: {type(value)}, expected astropy.time.Time")
        # Store as integer nanoseconds since the Unix epoch in TAI scale.
        return time_utils.TimeConverter().astropy_to_nsec(value)

    def process_result_value(
        self, value: int | None, dialect: sqlalchemy.engine.Dialect
    ) -> astropy.time.Time | None:
        # `value` is nanoseconds since epoch, or None.
        if value is None:
            return None
        return time_utils.TimeConverter().nsec_to_astropy(value)

242 

243 

# TODO: sqlalchemy 2 has internal support for UUID:
# https://docs.sqlalchemy.org/en/20/core/type_basics.html#sqlalchemy.types.Uuid
class GUID(sqlalchemy.TypeDecorator):
    """Platform-independent GUID type.

    Uses PostgreSQL's UUID type, otherwise uses CHAR(32), storing as
    stringified hex values.
    """

    impl = sqlalchemy.CHAR

    cache_ok = True

    def load_dialect_impl(self, dialect: sqlalchemy.Dialect) -> sqlalchemy.types.TypeEngine:
        # Native UUID column on PostgreSQL; 32-character hex string elsewhere.
        if dialect.name == "postgresql":
            return dialect.type_descriptor(UUID())
        return dialect.type_descriptor(sqlalchemy.CHAR(32))

    def process_bind_param(self, value: Any, dialect: sqlalchemy.Dialect) -> str | None:
        if value is None:
            return None

        # Coerce input to UUID type. In general having UUID on input is the
        # only thing that we want, but there is code right now that uses ints
        # (and bytes / hex strings).
        if not isinstance(value, uuid.UUID):
            if isinstance(value, int):
                value = uuid.UUID(int=value)
            elif isinstance(value, bytes):
                value = uuid.UUID(bytes=value)
            elif isinstance(value, str):
                # hexstring
                value = uuid.UUID(hex=value)
            else:
                raise TypeError(f"Unexpected type of a bind value: {type(value)}")

        return str(value) if dialect.name == "postgresql" else f"{value.int:032x}"

    def process_result_value(
        self, value: str | uuid.UUID | None, dialect: sqlalchemy.Dialect
    ) -> uuid.UUID | None:
        # sqlalchemy 2 converts to UUID internally, so the value may already
        # be a UUID instance.
        if value is None or isinstance(value, uuid.UUID):
            return value
        return uuid.UUID(hex=value)

292 

293 

# Mapping from the type-name strings accepted in schema configuration files
# to the SQLAlchemy (or custom, defined above) column types they denote.
VALID_CONFIG_COLUMN_TYPES = {
    "string": sqlalchemy.String,
    "int": sqlalchemy.BigInteger,
    "float": sqlalchemy.Float,
    "region": Base64Region,
    "bool": sqlalchemy.Boolean,
    "blob": sqlalchemy.LargeBinary,
    "datetime": AstropyTimeNsecTai,
    "hash": Base64Bytes,
    "uuid": GUID,
}

305 

306 

@dataclass
class FieldSpec:
    """A data class for defining a column in a logical `Registry` table."""

    name: str
    """Name of the column."""

    dtype: type
    """Type of the column; usually a `type` subclass provided by SQLAlchemy
    that defines both a Python type and a corresponding precise SQL type.
    """

    length: int | None = None
    """Length of the type in the database, for variable-length types."""

    nbytes: int | None = None
    """Natural length used for hash and encoded-region columns, to be converted
    into the post-encoding length.
    """

    primaryKey: bool = False
    """Whether this field is (part of) its table's primary key."""

    autoincrement: bool = False
    """Whether the database should insert automatically incremented values when
    no value is provided in an INSERT.
    """

    nullable: bool = True
    """Whether this field is allowed to be NULL. If ``primaryKey`` is
    `True`, during construction this value will be forced to `False`."""

    default: Any = None
    """A server-side default value for this field.

    This is passed directly as the ``server_default`` argument to
    `sqlalchemy.schema.Column`. It does _not_ go through SQLAlchemy's usual
    type conversion or quoting for Python literals, and should hence be used
    with care. See the SQLAlchemy documentation for more information.
    """

    doc: str | None = None
    """Documentation for this field."""

    def __post_init__(self) -> None:
        # A primary-key column can never be NULL; force the default to match.
        if self.primaryKey:
            self.nullable = False

    def __eq__(self, other: Any) -> bool:
        # Equality (and hashing, below) is by name only, so specs can live
        # in name-keyed containers.
        if not isinstance(other, FieldSpec):
            return NotImplemented
        return self.name == other.name

    def __hash__(self) -> int:
        return hash(self.name)

    @classmethod
    @SchemaValidationError.translate(KeyError, "Missing key {err} in column config '{config}'.")
    def fromConfig(cls, config: Config, **kwargs: Any) -> FieldSpec:
        """Create a `FieldSpec` from a subset of a `SchemaConfig`.

        Parameters
        ----------
        config : `Config`
            Configuration describing the column. Nested configuration keys
            correspond to `FieldSpec` attributes.
        **kwargs
            Additional keyword arguments that provide defaults for values
            not present in config.

        Returns
        -------
        spec: `FieldSpec`
            Specification structure for the column.

        Raises
        ------
        SchemaValidationError
            Raised if configuration keys are missing or have invalid values.
        """
        dtype = VALID_CONFIG_COLUMN_TYPES.get(config["type"])
        if dtype is None:
            raise SchemaValidationError(f"Invalid field type string: '{config['type']}'.")
        if not config["name"].islower():
            raise SchemaValidationError(f"Column name '{config['name']}' is not all lowercase.")
        spec = cls(name=config["name"], dtype=dtype, **kwargs)
        spec.length = config.get("length", spec.length)
        spec.nbytes = config.get("nbytes", spec.nbytes)
        if spec.length is not None and spec.nbytes is not None:
            raise SchemaValidationError(f"Both length and nbytes provided for field '{spec.name}'.")
        spec.primaryKey = config.get("primaryKey", spec.primaryKey)
        spec.autoincrement = config.get("autoincrement", spec.autoincrement)
        spec.nullable = config.get("nullable", False if spec.primaryKey else spec.nullable)
        spec.doc = stripIfNotNone(config.get("doc", None))
        return spec

    @classmethod
    def for_region(cls, name: str = "region", nullable: bool = True, nbytes: int = 2048) -> FieldSpec:
        """Create a `FieldSpec` for a spatial region column.

        Parameters
        ----------
        name : `str`, optional
            Name for the field.
        nullable : `bool`, optional
            Whether NULL values are permitted.
        nbytes : `int`, optional
            Maximum number of bytes for serialized regions. The actual column
            size will be larger to allow for base-64 encoding.

        Returns
        -------
        spec : `FieldSpec`
            Specification structure for a region column.
        """
        return cls(name, dtype=Base64Region, nbytes=nbytes, nullable=nullable)

    def isStringType(self) -> bool:
        """Indicate that this is a sqlalchemy.String field spec.

        Returns
        -------
        isString : `bool`
            The field refers to a `sqlalchemy.String` and not any other type.
            This can return `False` even if the object was created with a
            string type if it has been decided that it should be implemented
            as a `sqlalchemy.Text` type.
        """
        # Only short strings (length <= 32) are retained as String.
        return bool(self.dtype is sqlalchemy.String and self.length and self.length <= 32)

    def getSizedColumnType(self) -> sqlalchemy.types.TypeEngine | type:
        """Return a sized version of the column type.

        Utilizes either (or neither) of ``self.length`` and ``self.nbytes``.

        Returns
        -------
        dtype : `sqlalchemy.types.TypeEngine`
            A SQLAlchemy column type object.
        """
        if self.length is None and self.nbytes is None:
            return self.dtype
        if self.length is not None:
            # Long String specs are implemented as (unsized) Text instead.
            if self.dtype is sqlalchemy.String and not self.isStringType():
                return sqlalchemy.Text
            return self.dtype(length=self.length)
        return self.dtype(nbytes=self.nbytes)

    def getPythonType(self) -> type:
        """Return the Python type associated with this field's (SQL) dtype.

        Returns
        -------
        type : `type`
            Python type associated with this field's (SQL) `dtype`.
        """
        if issubclass(self.dtype, LocalBase64Bytes):
            # These types require the nbytes keyword to construct.
            assert self.nbytes is not None  # satisfy mypy for something that must be true
            return self.dtype(nbytes=self.nbytes).python_type
        return self.dtype().python_type  # type: ignore

476 

477 

@dataclass
class ForeignKeySpec:
    """Definition of a foreign key constraint in a logical `Registry` table."""

    table: str
    """Name of the target table."""

    source: tuple[str, ...]
    """Tuple of source table column names."""

    target: tuple[str, ...]
    """Tuple of target table column names."""

    onDelete: str | None = None
    """SQL clause indicating how to handle deletes to the target table.

    If not `None` (which indicates that a constraint violation exception should
    be raised), should be either "SET NULL" or "CASCADE".
    """

    addIndex: bool = True
    """If `True`, create an index on the columns of this foreign key in the
    source table.
    """

    @classmethod
    @SchemaValidationError.translate(KeyError, "Missing key {err} in foreignKey config '{config}'.")
    def fromConfig(cls, config: Config) -> ForeignKeySpec:
        """Create a `ForeignKeySpec` from a subset of a `SchemaConfig`.

        Parameters
        ----------
        config : `Config`
            Configuration describing the constraint. Nested configuration keys
            correspond to `ForeignKeySpec` attributes.

        Returns
        -------
        spec: `ForeignKeySpec`
            Specification structure for the constraint.

        Raises
        ------
        SchemaValidationError
            Raised if configuration keys are missing or have invalid values.
        """
        # Access keys in a fixed order so a missing required key raises a
        # deterministic KeyError (translated by the decorator above).
        table = config["table"]
        source = tuple(ensure_iterable(config["source"]))
        target = tuple(ensure_iterable(config["target"]))
        on_delete = config.get("onDelete", None)
        return cls(table=table, source=source, target=target, onDelete=on_delete)

530 

531 

@dataclass(frozen=True)
class IndexSpec:
    """Specification of an index on table columns.

    Parameters
    ----------
    *columns : `str`
        Names of the columns to index.
    **kwargs : `typing.Any`
        Additional keyword arguments to pass directly to
        `sqlalchemy.schema.Index` constructor. This could be used to provide
        backend-specific options, e.g. to create a ``GIST`` index in PostgreSQL
        one can pass ``postgresql_using="gist"``.
    """

    def __init__(self, *columns: str, **kwargs: Any):
        # Frozen dataclass: attributes must be installed via object.__setattr__
        # to bypass the generated (raising) __setattr__.
        for attr, val in (("columns", tuple(columns)), ("kwargs", kwargs)):
            object.__setattr__(self, attr, val)

    def __hash__(self) -> int:
        # Hash on columns only; the kwargs dict is not hashable.
        return hash(self.columns)

    columns: tuple[str, ...]
    """Column names to include in the index (`Tuple` [ `str` ])."""

    kwargs: dict[str, Any]
    """Additional keyword arguments passed directly to
    `sqlalchemy.schema.Index` constructor (`dict` [ `str`, `typing.Any` ]).
    """

561 

562 

@dataclass
class TableSpec:
    """A data class used to define a table or table-like query interface.

    Parameters
    ----------
    fields : `~collections.abc.Iterable` [ `FieldSpec` ]
        Specifications for the columns in this table.
    unique : `~collections.abc.Iterable` [ `tuple` [ `str` ] ], optional
        Non-primary-key unique constraints for the table.
    indexes : `~collections.abc.Iterable` [ `IndexSpec` ], optional
        Indexes for the table.
    foreignKeys : `~collections.abc.Iterable` [ `ForeignKeySpec` ], optional
        Foreign key constraints for the table.
    exclusion : `~collections.abc.Iterable` [ `tuple` [ `str` or `type` ] ]
        Special constraints that prohibit overlaps between timespans over rows
        where other columns are equal. These take the same form as unique
        constraints, but each tuple may contain a single
        `TimespanDatabaseRepresentation` subclass representing a timespan
        column.
    recycleIds : `bool`, optional
        If `True`, allow databases that might normally recycle autoincrement
        IDs to do so (usually better for performance) on any autoincrement
        field in this table.
    doc : `str`, optional
        Documentation for the table.
    """

    # Explicit __init__ (rather than the dataclass-generated one) so each
    # argument is normalized to the concrete container type declared in the
    # class-level annotations below.
    def __init__(
        self,
        fields: Iterable[FieldSpec],
        *,
        unique: Iterable[tuple[str, ...]] = (),
        indexes: Iterable[IndexSpec] = (),
        foreignKeys: Iterable[ForeignKeySpec] = (),
        exclusion: Iterable[tuple[str | type[TimespanDatabaseRepresentation], ...]] = (),
        recycleIds: bool = True,
        doc: str | None = None,
    ):
        self.fields = NamedValueSet(fields)
        self.unique = set(unique)
        self.indexes = set(indexes)
        self.foreignKeys = list(foreignKeys)
        self.exclusion = set(exclusion)
        self.recycleIds = recycleIds
        self.doc = doc

    fields: NamedValueSet[FieldSpec]
    """Specifications for the columns in this table."""

    unique: set[tuple[str, ...]]
    """Non-primary-key unique constraints for the table."""

    indexes: set[IndexSpec]
    """Indexes for the table."""

    foreignKeys: list[ForeignKeySpec]
    """Foreign key constraints for the table."""

    exclusion: set[tuple[str | type[TimespanDatabaseRepresentation], ...]]
    """Exclusion constraints for the table.

    Exclusion constraints behave mostly like unique constraints, but may
    contain a database-native Timespan column that is restricted to not overlap
    across rows (for identical combinations of any non-Timespan columns in the
    constraint).
    """

    recycleIds: bool = True
    """If `True`, allow databases that might normally recycle autoincrement IDs
    to do so (usually better for performance) on any autoincrement field in
    this table.
    """

    doc: str | None = None
    """Documentation for the table."""

    @classmethod
    @SchemaValidationError.translate(KeyError, "Missing key {err} in table config '{config}'.")
    def fromConfig(cls, config: Config) -> TableSpec:
        """Create a `TableSpec` from a subset of a `SchemaConfig`.

        Parameters
        ----------
        config : `Config`
            Configuration describing the table. Nested configuration keys
            correspond to `TableSpec` attributes.

        Returns
        -------
        spec: `TableSpec`
            Specification structure for the table.

        Raises
        ------
        SchemaValidationError
            Raised if configuration keys are missing or have invalid values.

        Notes
        -----
        Only the ``columns``, ``unique``, ``foreignKeys``, and ``doc`` keys
        are read from ``config``; ``indexes``, ``exclusion``, and
        ``recycleIds`` keep their constructor defaults.
        """
        return cls(
            fields=NamedValueSet(FieldSpec.fromConfig(c) for c in config["columns"]),
            unique={tuple(u) for u in config.get("unique", ())},
            foreignKeys=[ForeignKeySpec.fromConfig(c) for c in config.get("foreignKeys", ())],
            doc=stripIfNotNone(config.get("doc")),
        )

666 )