Coverage for python / lsst / daf / butler / registry / databases / postgresql.py: 24%

252 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:48 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29from sqlalchemy.sql.expression import ColumnElement as ColumnElement 

30 

31from ... import ddl, time_utils 

32 

33__all__ = ["PostgresqlDatabase"] 

34 

35import re 

36from collections.abc import Callable, Iterable, Iterator, Mapping 

37from contextlib import closing, contextmanager 

38from typing import Any 

39 

40import psycopg2 

41import sqlalchemy 

42import sqlalchemy.dialects.postgresql 

43from sqlalchemy import sql 

44 

45from ..._named import NamedValueAbstractSet 

46from ..._timespan import Timespan 

47from ...timespan_database_representation import TimespanDatabaseRepresentation 

48from ..interfaces import Database, DatabaseMetadata 

49 

50 

51class PostgresqlDatabase(Database): 

52 """An implementation of the `Database` interface for PostgreSQL. 

53 

54 Parameters 

55 ---------- 

56 engine : `sqlalchemy.engine.Engine` 

57 Engine to use for this connection. 

58 origin : `int` 

59 An integer ID that should be used as the default for any datasets, 

60 quanta, or other entities that use a (autoincrement, origin) compound 

61 primary key. 

62 namespace : `str`, optional 

63 The namespace (schema) this database is associated with. If `None`, 

64 the default schema for the connection is used (which may be `None`). 

65 writeable : `bool`, optional 

66 If `True`, allow write operations on the database, including 

67 ``CREATE TABLE``. 

68 allow_temporary_tables : `bool`, optional 

69 If `True`, database operations will be allowed to use temporary tables. 

70 If `False`, other SQL constructs will be used instead of temporary 

71 tables when possible. 

72 

73 Notes 

74 ----- 

75 This currently requires the psycopg2 driver to be used as the backend for 

76 SQLAlchemy. Running the tests for this class requires the 

77 ``testing.postgresql`` be installed, which we assume indicates that a 

78 PostgreSQL server is installed and can be run locally in userspace. 

79 

80 Some functionality provided by this class (and used by `Registry`) requires 

81 the ``btree_gist`` PostgreSQL server extension to be installed an enabled 

82 on the database being connected to; this is checked at connection time. 

83 """ 

84 

85 def __init__( 

86 self, 

87 *, 

88 engine: sqlalchemy.engine.Engine, 

89 origin: int, 

90 namespace: str | None = None, 

91 writeable: bool = True, 

92 allow_temporary_tables: bool = True, 

93 ): 

94 with engine.connect() as connection: 

95 # `typing.Any` to make mypy ignore the line below, can't 

96 # use type: ignore 

97 dbapi: Any = connection.connection 

98 try: 

99 dsn = dbapi.get_dsn_parameters() 

100 except (AttributeError, KeyError) as err: 

101 raise RuntimeError("Only the psycopg2 driver for PostgreSQL is supported.") from err 

102 if namespace is None: 

103 query = sql.select(sql.func.current_schema()) 

104 namespace = connection.execute(query).scalar() 

105 query_text = "SELECT COUNT(*) FROM pg_extension WHERE extname='btree_gist';" 

106 if not connection.execute(sqlalchemy.text(query_text)).scalar(): 

107 raise RuntimeError( 

108 "The Butler PostgreSQL backend requires the btree_gist extension. " 

109 "As extensions are enabled per-database, this may require an administrator to run " 

110 "`CREATE EXTENSION btree_gist;` in a database before a butler client for it is " 

111 " initialized." 

112 ) 

113 

114 pg_version = get_postgres_server_version(connection) 

115 self._init( 

116 engine=engine, 

117 origin=origin, 

118 namespace=namespace, 

119 writeable=writeable, 

120 dbname=dsn.get("dbname"), 

121 metadata=None, 

122 pg_version=pg_version, 

123 allow_temporary_tables=allow_temporary_tables, 

124 ) 

125 

126 def _init( 

127 self, 

128 *, 

129 engine: sqlalchemy.engine.Engine, 

130 origin: int, 

131 namespace: str | None = None, 

132 writeable: bool = True, 

133 dbname: str, 

134 metadata: DatabaseMetadata | None, 

135 pg_version: tuple[int, int], 

136 allow_temporary_tables: bool = True, 

137 ) -> None: 

138 # Initialization logic shared between ``__init__`` and ``clone``. 

139 super().__init__( 

140 origin=origin, 

141 engine=engine, 

142 namespace=namespace, 

143 metadata=metadata, 

144 allow_temporary_tables=allow_temporary_tables, 

145 ) 

146 self._writeable = writeable 

147 self.dbname = dbname 

148 self._pg_version = pg_version 

149 

150 def clone(self) -> PostgresqlDatabase: 

151 clone = self.__new__(type(self)) 

152 clone._init( 

153 origin=self.origin, 

154 engine=self._engine, 

155 namespace=self.namespace, 

156 writeable=self._writeable, 

157 dbname=self.dbname, 

158 metadata=self._metadata, 

159 pg_version=self._pg_version, 

160 allow_temporary_tables=self._allow_temporary_tables, 

161 ) 

162 return clone 

163 

164 @classmethod 

165 def makeEngine( 

166 cls, uri: str | sqlalchemy.engine.URL, *, writeable: bool = True 

167 ) -> sqlalchemy.engine.Engine: 

168 return sqlalchemy.engine.create_engine( 

169 uri, 

170 # Prevent stale database connections from throwing exeptions, at 

171 # the expense of a round trip to the database server each time we 

172 # check out a session. Many services using the Butler operate in 

173 # networks where connections are dropped when idle for some time. 

174 pool_pre_ping=True, 

175 # This engine and database connection pool can be shared between 

176 # multiple Butler instances created via Butler.clone() or 

177 # LabeledButlerFactory, and typically these will be used from 

178 # multiple threads simultaneously. So we need to configure 

179 # SQLAlchemy to pool connections for multi-threaded usage. 

180 # 

181 # This pool size was chosen to work well for services using 

182 # FastAPI. FastAPI uses a thread pool of 40 by default, so this 

183 # gives us a connection for each thread in the pool. Because Butler 

184 # is currently sync-only, we won't ever be executing more queries 

185 # than we have threads. 

186 # 

187 # Connections are only created as they are needed, so in typical 

188 # single-threaded Butler use only one connection will ever be 

189 # created. Services with low peak concurrency may never create this 

190 # many connections. 

191 # 

192 # The main Butler databases at SLAC are run behind pgbouncer, so we 

193 # can support a larger number of simultaneous connections than if 

194 # we were connecting directly to Postgres. 

195 # 

196 # See 

197 # https://docs.sqlalchemy.org/en/20/core/pooling.html#sqlalchemy.pool.QueuePool.__init__ 

198 # for more information on the behavior of this parameter. 

199 pool_size=40, 

200 # If we are experiencing heavy enough load that we overflow the 

201 # connection pool, it will be harmful to start creating extra 

202 # connections that we disconnect immediately after use. 

203 # Connecting from scratch is fairly expensive, which is why we have 

204 # a pool in the first place. 

205 max_overflow=0, 

206 # If the pool is full, this is the maximum number of seconds we 

207 # will wait for a connection to become available before giving up. 

208 pool_timeout=60, 

209 # In combination with pool_pre_ping, prevent SQLAlchemy from 

210 # unnecessarily reviving pooled connections that have gone stale. 

211 # Setting this to true makes it always re-use the most recent 

212 # known-good connection when possible, instead of cycling to other 

213 # connections in the pool that we may no longer need. 

214 pool_use_lifo=True, 

215 ) 

216 

217 @classmethod 

218 def fromEngine( 

219 cls, 

220 engine: sqlalchemy.engine.Engine, 

221 *, 

222 origin: int, 

223 namespace: str | None = None, 

224 writeable: bool = True, 

225 ) -> Database: 

226 return cls(engine=engine, origin=origin, namespace=namespace, writeable=writeable) 

227 

228 @contextmanager 

229 def _transaction( 

230 self, 

231 *, 

232 interrupting: bool = False, 

233 savepoint: bool = False, 

234 lock: Iterable[sqlalchemy.schema.Table] = (), 

235 for_temp_tables: bool = False, 

236 ) -> Iterator[tuple[bool, sqlalchemy.engine.Connection]]: 

237 with super()._transaction(interrupting=interrupting, savepoint=savepoint, lock=lock) as ( 

238 is_new, 

239 connection, 

240 ): 

241 if is_new: 

242 # pgbouncer with transaction-level pooling (which we aim to 

243 # support) says that SET cannot be used, except for a list of 

244 # "Startup parameters" that includes "timezone" (see 

245 # https://www.pgbouncer.org/features.html#fnref:0). But I 

246 # don't see "timezone" in PostgreSQL's list of parameters 

247 # passed when creating a new connection 

248 # (https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS). 

249 # Given that the pgbouncer docs say, "PgBouncer detects their 

250 # changes and so it can guarantee they remain consistent for 

251 # the client", I assume we can use "SET TIMESPAN" and pgbouncer 

252 # will take care of clients that share connections being set 

253 # consistently. And if that assumption is wrong, we should 

254 # still probably be okay, since all clients should be Butler 

255 # clients, and they'll all be setting the same thing. 

256 # 

257 # The "SET TRANSACTION READ ONLY" should also be safe, because 

258 # it only ever acts on the current transaction; I think it's 

259 # not included in pgbouncer's declaration that SET is 

260 # incompatible with transaction-level pooling because 

261 # PostgreSQL actually considers SET TRANSACTION to be a 

262 # fundamentally different statement from SET (they have their 

263 # own distinct doc pages, at least). 

264 with closing(connection.connection.cursor()) as cursor: 

265 # PostgreSQL permits writing to temporary tables inside 

266 # read-only transactions, but it doesn't permit creating 

267 # them. 

268 if not (self.isWriteable() or for_temp_tables): 

269 cursor.execute("SET TRANSACTION READ ONLY") 

270 # Make timestamps UTC, because we didn't use TIMESTAMPZ 

271 # for the column type. When we can tolerate a schema 

272 # change, we should change that type and remove this 

273 # line. 

274 cursor.execute("SET TIME ZONE 0") 

275 # Using server-side cursors with complex queries frequently 

276 # generates suboptimal query plan, setting 

277 # cursor_tuple_fraction=1 helps for those cases. 

278 cursor.execute("SET cursor_tuple_fraction = 1") 

279 yield is_new, connection 

280 

281 @contextmanager 

282 def temporary_table( 

283 self, spec: ddl.TableSpec, name: str | None = None 

284 ) -> Iterator[sqlalchemy.schema.Table]: 

285 # Docstring inherited. 

286 with self.transaction(for_temp_tables=True), super().temporary_table(spec, name) as table: 

287 yield table 

288 

289 def _lockTables( 

290 self, connection: sqlalchemy.engine.Connection, tables: Iterable[sqlalchemy.schema.Table] = () 

291 ) -> None: 

292 # Docstring inherited. 

293 for table in tables: 

294 connection.execute(sqlalchemy.text(f"LOCK TABLE {table.key} IN EXCLUSIVE MODE")) 

295 

296 def isWriteable(self) -> bool: 

297 return self._writeable 

298 

299 def __str__(self) -> str: 

300 return f"PostgreSQL@{self.dbname}:{self.namespace}" 

301 

302 def shrinkDatabaseEntityName(self, original: str) -> str: 

303 return self.name_shrinker.shrink(original) 

304 

305 def expandDatabaseEntityName(self, shrunk: str) -> str: 

306 return self.name_shrinker.expand(shrunk) 

307 

308 def _convertExclusionConstraintSpec( 

309 self, 

310 table: str, 

311 spec: tuple[str | type[TimespanDatabaseRepresentation], ...], 

312 metadata: sqlalchemy.MetaData, 

313 ) -> sqlalchemy.schema.Constraint: 

314 # Docstring inherited. 

315 args: list[tuple[sqlalchemy.schema.Column, str]] = [] 

316 names = ["excl"] 

317 for item in spec: 

318 if isinstance(item, str): 

319 args.append((sqlalchemy.schema.Column(item), "=")) 

320 names.append(item) 

321 elif issubclass(item, TimespanDatabaseRepresentation): 

322 assert item is self.getTimespanRepresentation() 

323 args.append((sqlalchemy.schema.Column(TimespanDatabaseRepresentation.NAME), "&&")) 

324 names.append(TimespanDatabaseRepresentation.NAME) 

325 return sqlalchemy.dialects.postgresql.ExcludeConstraint( 

326 *args, 

327 name=self.shrinkDatabaseEntityName("_".join(names)), 

328 ) 

329 

330 def _make_temporary_table( 

331 self, 

332 connection: sqlalchemy.engine.Connection, 

333 spec: ddl.TableSpec, 

334 name: str | None = None, 

335 **kwargs: Any, 

336 ) -> sqlalchemy.schema.Table: 

337 # Docstring inherited 

338 # Adding ON COMMIT DROP here is really quite defensive: we already 

339 # manually drop the table at the end of the temporary_table context 

340 # manager, and that will usually happen first. But this will guarantee 

341 # that we drop the table at the end of the transaction even if the 

342 # connection lasts longer, and that's good citizenship when connections 

343 # may be multiplexed by e.g. pgbouncer. 

344 return super()._make_temporary_table(connection, spec, name, postgresql_on_commit="DROP", **kwargs) 

345 

346 @classmethod 

347 def getTimespanRepresentation(cls) -> type[TimespanDatabaseRepresentation]: 

348 # Docstring inherited. 

349 return _RangeTimespanRepresentation 

350 

351 def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None: 

352 self.assertTableWriteable(table, f"Cannot replace into read-only table {table}.") 

353 if not rows: 

354 return 

355 # This uses special support for UPSERT in PostgreSQL backend: 

356 # https://docs.sqlalchemy.org/en/13/dialects/postgresql.html#insert-on-conflict-upsert 

357 query = sqlalchemy.dialects.postgresql.dml.insert(table) 

358 # In the SET clause assign all columns using special `excluded` 

359 # pseudo-table. If some column in the table does not appear in the 

360 # INSERT list this will set it to NULL. 

361 excluded = query.excluded 

362 data = { 

363 column.name: getattr(excluded, column.name) 

364 for column in table.columns 

365 if column.name not in table.primary_key 

366 } 

367 if not data: 

368 self.ensure(table, *rows) 

369 return 

370 query = query.on_conflict_do_update(constraint=table.primary_key, set_=data) 

371 with self._transaction() as (_, connection): 

372 connection.execute(query, rows) 

373 

374 def ensure(self, table: sqlalchemy.schema.Table, *rows: dict, primary_key_only: bool = False) -> int: 

375 # Docstring inherited. 

376 self.assertTableWriteable(table, f"Cannot ensure into read-only table {table}.") 

377 if not rows: 

378 return 0 

379 # Like `replace`, this uses UPSERT. 

380 base_insert = sqlalchemy.dialects.postgresql.dml.insert(table) 

381 if primary_key_only: 

382 query = base_insert.on_conflict_do_nothing(constraint=table.primary_key) 

383 else: 

384 query = base_insert.on_conflict_do_nothing() 

385 with self._transaction() as (_, connection): 

386 return connection.execute(query, rows).rowcount 

387 

388 def constant_rows( 

389 self, 

390 fields: NamedValueAbstractSet[ddl.FieldSpec], 

391 *rows: dict, 

392 name: str | None = None, 

393 ) -> sqlalchemy.sql.FromClause: 

394 # Docstring inherited. 

395 return super().constant_rows(fields, *rows, name=name) 

396 

397 @property 

398 def has_distinct_on(self) -> bool: 

399 # Docstring inherited. 

400 return True 

401 

402 @property 

403 def has_any_aggregate(self) -> bool: 

404 # Docstring inherited. 

405 return self._pg_version >= (16, 0) 

406 

407 def apply_any_aggregate(self, column: sqlalchemy.ColumnElement[Any]) -> sqlalchemy.ColumnElement[Any]: 

408 # Docstring inherited 

409 

410 # The cast is required to prevent sqlalchemy from forgetting the type 

411 # of the initial column. Without the cast, for example, Base64Region 

412 # would become a String column in the output. 

413 return sqlalchemy.cast(sqlalchemy.func.any_value(column), column.type) 

414 

415 def glob_expression( 

416 self, expression: sqlalchemy.ColumnElement[Any], pattern: str 

417 ) -> sqlalchemy.ColumnElement[bool]: 

418 # Docstring inherited. 

419 # With Postgres we can use LIKE as it is case-sensitive, we only need 

420 # to translate our simple glob language to LIKE patterns, escaping few 

421 # things. Also we can try to optimize special cases when we can use 

422 # ``starts_with()`` function or "*" pattern. ``starts_with()`` in 

423 # Postgres14 does not use btree index, and LIKE with a constant prefix 

424 # is using index, so we may not need ``starts_with()`` after all. 

425 if pattern == "*": 

426 # Simple case. 

427 return sqlalchemy.literal(True) 

428 

429 def _escape(pattern: str) -> str: 

430 """Transform glob pattern string into LIKE pattern.""" 

431 tokens = [] 

432 escape = "" 

433 for char in pattern: 

434 if not escape: 

435 if char == "\\": 

436 escape = char 

437 elif char in "_%": 

438 # Escape special SQL characters. 

439 tokens.append(f"\\{char}") 

440 elif char == "*": 

441 tokens.append("%") 

442 elif char == "?": 

443 tokens.append("_") 

444 else: 

445 tokens.append(char) 

446 else: 

447 if char == "\\": 

448 # Two backslashes become one. 

449 tokens.append(char) 

450 elif char in "*?": 

451 # Handle escaped wildcards. 

452 tokens.append(f"{char}") 

453 else: 

454 # Copy both backslash and character. 

455 tokens.append(escape) 

456 tokens.append(char) 

457 escape = "" 

458 # Trailing backslash added to pattern. 

459 tokens.append(escape) 

460 return "".join(tokens) 

461 

462 pattern = _escape(pattern) 

463 return expression.op("LIKE")(sqlalchemy.literal(pattern)) 

464 

465 

466class _RangeTimespanType(sqlalchemy.TypeDecorator): 

467 """A single-column `Timespan` representation usable only with 

468 PostgreSQL. 

469 

470 This type should be able to take advantage of PostgreSQL's built-in 

471 range operators, and the indexing and EXCLUSION table constraints built 

472 off of them. 

473 """ 

474 

475 impl = sqlalchemy.dialects.postgresql.INT8RANGE 

476 

477 cache_ok = True 

478 

479 def process_bind_param( 

480 self, value: Timespan | None, dialect: sqlalchemy.engine.Dialect 

481 ) -> psycopg2.extras.NumericRange | None: 

482 if value is None: 

483 return None 

484 if not isinstance(value, Timespan): 

485 raise TypeError(f"Unsupported type: {type(value)}, expected Timespan.") 

486 if value.isEmpty(): 

487 return psycopg2.extras.NumericRange(empty=True) 

488 else: 

489 converter = time_utils.TimeConverter() 

490 assert value.nsec[0] >= converter.min_nsec, "Guaranteed by Timespan.__init__." 

491 assert value.nsec[1] <= converter.max_nsec, "Guaranteed by Timespan.__init__." 

492 lower = None if value.nsec[0] == converter.min_nsec else value.nsec[0] 

493 upper = None if value.nsec[1] == converter.max_nsec else value.nsec[1] 

494 return psycopg2.extras.NumericRange(lower=lower, upper=upper) 

495 

496 def process_result_value( 

497 self, value: psycopg2.extras.NumericRange | None, dialect: sqlalchemy.engine.Dialect 

498 ) -> Timespan | None: 

499 if value is None: 

500 return None 

501 if value.isempty: 

502 return Timespan.makeEmpty() 

503 converter = time_utils.TimeConverter() 

504 begin_nsec = converter.min_nsec if value.lower is None else value.lower 

505 end_nsec = converter.max_nsec if value.upper is None else value.upper 

506 return Timespan(begin=None, end=None, _nsec=(begin_nsec, end_nsec)) 

507 

508 

509class _RangeTimespanRepresentation(TimespanDatabaseRepresentation): 

510 """An implementation of `TimespanDatabaseRepresentation` that uses 

511 `_RangeTimespanType` to store a timespan in a single 

512 PostgreSQL-specific field. 

513 

514 Parameters 

515 ---------- 

516 column : `sqlalchemy.sql.ColumnElement` 

517 SQLAlchemy object representing the column. 

518 """ 

519 

520 def __init__(self, column: sqlalchemy.sql.ColumnElement, name: str): 

521 self.column = column 

522 self._name = name 

523 

524 __slots__ = ("column", "_name") 

525 

526 @classmethod 

527 def makeFieldSpecs( 

528 cls, nullable: bool, name: str | None = None, **kwargs: Any 

529 ) -> tuple[ddl.FieldSpec, ...]: 

530 # Docstring inherited. 

531 if name is None: 

532 name = cls.NAME 

533 return ( 

534 ddl.FieldSpec( 

535 name, 

536 dtype=_RangeTimespanType, 

537 nullable=nullable, 

538 default=(None if nullable else sqlalchemy.sql.text("'(,)'::int8range")), 

539 **kwargs, 

540 ), 

541 ) 

542 

543 @classmethod 

544 def getFieldNames(cls, name: str | None = None) -> tuple[str, ...]: 

545 # Docstring inherited. 

546 if name is None: 

547 name = cls.NAME 

548 return (name,) 

549 

550 @classmethod 

551 def update( 

552 cls, extent: Timespan | None, name: str | None = None, result: dict[str, Any] | None = None 

553 ) -> dict[str, Any]: 

554 # Docstring inherited. 

555 if name is None: 

556 name = cls.NAME 

557 if result is None: 

558 result = {} 

559 result[name] = extent 

560 return result 

561 

562 @classmethod 

563 def extract(cls, mapping: Mapping[str, Any], name: str | None = None) -> Timespan | None: 

564 # Docstring inherited. 

565 if name is None: 

566 name = cls.NAME 

567 return mapping[name] 

568 

569 @classmethod 

570 def fromLiteral(cls, timespan: Timespan | None) -> _RangeTimespanRepresentation: 

571 # Docstring inherited. 

572 if timespan is None: 

573 # Cast NULL to an expected type, helps Postgres to figure out 

574 # column type when doing UNION. 

575 return cls( 

576 column=sqlalchemy.func.cast(sqlalchemy.sql.null(), type_=_RangeTimespanType), name=cls.NAME 

577 ) 

578 return cls( 

579 column=sqlalchemy.sql.cast( 

580 sqlalchemy.sql.literal(timespan, type_=_RangeTimespanType), type_=_RangeTimespanType 

581 ), 

582 name=cls.NAME, 

583 ) 

584 

585 @classmethod 

586 def from_columns( 

587 cls, columns: sqlalchemy.sql.ColumnCollection, name: str | None = None 

588 ) -> _RangeTimespanRepresentation: 

589 # Docstring inherited. 

590 if name is None: 

591 name = cls.NAME 

592 return cls(columns[name], name) 

593 

594 @property 

595 def name(self) -> str: 

596 # Docstring inherited. 

597 return self._name 

598 

599 def isNull(self) -> sqlalchemy.sql.ColumnElement: 

600 # Docstring inherited. 

601 return self.column.is_(None) 

602 

603 def isEmpty(self) -> sqlalchemy.sql.ColumnElement: 

604 # Docstring inherited 

605 return sqlalchemy.sql.func.isempty(self.column) 

606 

607 def __lt__( 

608 self, other: _RangeTimespanRepresentation | sqlalchemy.sql.ColumnElement 

609 ) -> sqlalchemy.sql.ColumnElement: 

610 # Docstring inherited. 

611 if isinstance(other, sqlalchemy.sql.ColumnElement): 

612 return sqlalchemy.sql.and_( 

613 sqlalchemy.sql.not_(sqlalchemy.sql.func.upper_inf(self.column)), 

614 sqlalchemy.sql.not_(sqlalchemy.sql.func.isempty(self.column)), 

615 sqlalchemy.sql.func.upper(self.column) <= other, 

616 ) 

617 else: 

618 return self.column << other.column 

619 

620 def __gt__( 

621 self, other: _RangeTimespanRepresentation | sqlalchemy.sql.ColumnElement 

622 ) -> sqlalchemy.sql.ColumnElement: 

623 # Docstring inherited. 

624 if isinstance(other, sqlalchemy.sql.ColumnElement): 

625 return sqlalchemy.sql.and_( 

626 sqlalchemy.sql.not_(sqlalchemy.sql.func.lower_inf(self.column)), 

627 sqlalchemy.sql.not_(sqlalchemy.sql.func.isempty(self.column)), 

628 sqlalchemy.sql.func.lower(self.column) > other, 

629 ) 

630 else: 

631 return self.column >> other.column 

632 

633 def overlaps( 

634 self, other: _RangeTimespanRepresentation | sqlalchemy.sql.ColumnElement 

635 ) -> sqlalchemy.sql.ColumnElement: 

636 # Docstring inherited. 

637 if not isinstance(other, _RangeTimespanRepresentation): 

638 return self.contains(other) 

639 return self.column.overlaps(other.column) 

640 

641 def contains( 

642 self, other: _RangeTimespanRepresentation | sqlalchemy.sql.ColumnElement 

643 ) -> sqlalchemy.sql.ColumnElement: 

644 # Docstring inherited 

645 if isinstance(other, _RangeTimespanRepresentation): 

646 return self.column.contains(other.column) 

647 else: 

648 return self.column.contains(other) 

649 

650 def lower(self) -> sqlalchemy.sql.ColumnElement: 

651 # Docstring inherited. 

652 return sqlalchemy.sql.functions.coalesce( 

653 sqlalchemy.sql.func.lower(self.column), sqlalchemy.sql.literal(0) 

654 ) 

655 

656 def upper(self) -> sqlalchemy.sql.ColumnElement: 

657 # Docstring inherited. 

658 return sqlalchemy.sql.functions.coalesce( 

659 sqlalchemy.sql.func.upper(self.column), sqlalchemy.sql.literal(0) 

660 ) 

661 

662 def flatten(self, name: str | None = None) -> tuple[sqlalchemy.sql.ColumnElement]: 

663 # Docstring inherited. 

664 if name is None: 

665 return (self.column,) 

666 else: 

667 return (self.column.label(name),) 

668 

669 def apply_any_aggregate( 

670 self, func: Callable[[ColumnElement[Any]], ColumnElement[Any]] 

671 ) -> TimespanDatabaseRepresentation: 

672 # Docstring inherited. 

673 return _RangeTimespanRepresentation(func(self.column), self.name) 

674 

675 

676def get_postgres_server_version(connection: sqlalchemy.Connection) -> tuple[int, int]: # numpydoc ignore=PR01 

677 """Return the postgres DB server version as a tuple 

678 (major version, minor version). 

679 """ 

680 raw_pg_version = connection.execute(sqlalchemy.text("SHOW server_version")).scalar() 

681 _SERVER_VERSION_REGEX = re.compile(r"(?P<major>\d+)\.(?P<minor>\d+)") 

682 if raw_pg_version is not None and (m := _SERVER_VERSION_REGEX.search(raw_pg_version)): 

683 return (int(m.group("major")), int(m.group("minor"))) 

684 else: 

685 raise RuntimeError("Failed to get PostgreSQL server version.")