Coverage for python / lsst / daf / butler / registry / databases / postgresql.py: 24%
252 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:30 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:30 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29from sqlalchemy.sql.expression import ColumnElement as ColumnElement
31from ... import ddl, time_utils
33__all__ = ["PostgresqlDatabase"]
35import re
36from collections.abc import Callable, Iterable, Iterator, Mapping
37from contextlib import closing, contextmanager
38from typing import Any
40import psycopg2
41import sqlalchemy
42import sqlalchemy.dialects.postgresql
43from sqlalchemy import sql
45from ..._named import NamedValueAbstractSet
46from ..._timespan import Timespan
47from ...timespan_database_representation import TimespanDatabaseRepresentation
48from ..interfaces import Database, DatabaseMetadata
class PostgresqlDatabase(Database):
    """An implementation of the `Database` interface for PostgreSQL.

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        Engine to use for this connection.
    origin : `int`
        An integer ID that should be used as the default for any datasets,
        quanta, or other entities that use a (autoincrement, origin) compound
        primary key.
    namespace : `str`, optional
        The namespace (schema) this database is associated with.  If `None`,
        the default schema for the connection is used (which may be `None`).
    writeable : `bool`, optional
        If `True`, allow write operations on the database, including
        ``CREATE TABLE``.
    allow_temporary_tables : `bool`, optional
        If `True`, database operations will be allowed to use temporary tables.
        If `False`, other SQL constructs will be used instead of temporary
        tables when possible.

    Notes
    -----
    This currently requires the psycopg2 driver to be used as the backend for
    SQLAlchemy.  Running the tests for this class requires the
    ``testing.postgresql`` be installed, which we assume indicates that a
    PostgreSQL server is installed and can be run locally in userspace.

    Some functionality provided by this class (and used by `Registry`) requires
    the ``btree_gist`` PostgreSQL server extension to be installed and enabled
    on the database being connected to; this is checked at connection time.
    """

    def __init__(
        self,
        *,
        engine: sqlalchemy.engine.Engine,
        origin: int,
        namespace: str | None = None,
        writeable: bool = True,
        allow_temporary_tables: bool = True,
    ):
        with engine.connect() as connection:
            # `typing.Any` to make mypy ignore the line below, can't
            # use type: ignore
            dbapi: Any = connection.connection
            try:
                # get_dsn_parameters is psycopg2-specific; its absence means
                # some other DBAPI driver is in use.
                dsn = dbapi.get_dsn_parameters()
            except (AttributeError, KeyError) as err:
                raise RuntimeError("Only the psycopg2 driver for PostgreSQL is supported.") from err
            if namespace is None:
                # Fall back to the connection's current schema when no
                # namespace was given explicitly.
                query = sql.select(sql.func.current_schema())
                namespace = connection.execute(query).scalar()
            # Fail fast at connection time if the required btree_gist
            # extension is not enabled for this database.
            query_text = "SELECT COUNT(*) FROM pg_extension WHERE extname='btree_gist';"
            if not connection.execute(sqlalchemy.text(query_text)).scalar():
                # NOTE(review): the concatenated message renders with a double
                # space before "initialized." — confirm before changing, as
                # tests may match this text.
                raise RuntimeError(
                    "The Butler PostgreSQL backend requires the btree_gist extension. "
                    "As extensions are enabled per-database, this may require an administrator to run "
                    "`CREATE EXTENSION btree_gist;` in a database before a butler client for it is "
                    " initialized."
                )
            pg_version = get_postgres_server_version(connection)
        self._init(
            engine=engine,
            origin=origin,
            namespace=namespace,
            writeable=writeable,
            dbname=dsn.get("dbname"),
            metadata=None,
            pg_version=pg_version,
            allow_temporary_tables=allow_temporary_tables,
        )

    def _init(
        self,
        *,
        engine: sqlalchemy.engine.Engine,
        origin: int,
        namespace: str | None = None,
        writeable: bool = True,
        dbname: str,
        metadata: DatabaseMetadata | None,
        pg_version: tuple[int, int],
        allow_temporary_tables: bool = True,
    ) -> None:
        # Initialization logic shared between ``__init__`` and ``clone``.
        super().__init__(
            origin=origin,
            engine=engine,
            namespace=namespace,
            metadata=metadata,
            allow_temporary_tables=allow_temporary_tables,
        )
        self._writeable = writeable
        self.dbname = dbname
        self._pg_version = pg_version

    def clone(self) -> PostgresqlDatabase:
        """Return a new `PostgresqlDatabase` instance sharing this one's
        engine (and hence connection pool) and configuration.
        """
        # __new__ bypasses __init__ deliberately: cloning must not re-run the
        # connection-time checks (btree_gist, server version, DSN lookup).
        clone = self.__new__(type(self))
        clone._init(
            origin=self.origin,
            engine=self._engine,
            namespace=self.namespace,
            writeable=self._writeable,
            dbname=self.dbname,
            metadata=self._metadata,
            pg_version=self._pg_version,
            allow_temporary_tables=self._allow_temporary_tables,
        )
        return clone

    @classmethod
    def makeEngine(
        cls, uri: str | sqlalchemy.engine.URL, *, writeable: bool = True
    ) -> sqlalchemy.engine.Engine:
        """Create a SQLAlchemy engine for the given PostgreSQL URI, configured
        for pooled, multi-threaded service use.
        """
        return sqlalchemy.engine.create_engine(
            uri,
            # Prevent stale database connections from throwing exceptions, at
            # the expense of a round trip to the database server each time we
            # check out a session. Many services using the Butler operate in
            # networks where connections are dropped when idle for some time.
            pool_pre_ping=True,
            # This engine and database connection pool can be shared between
            # multiple Butler instances created via Butler.clone() or
            # LabeledButlerFactory, and typically these will be used from
            # multiple threads simultaneously. So we need to configure
            # SQLAlchemy to pool connections for multi-threaded usage.
            #
            # This pool size was chosen to work well for services using
            # FastAPI. FastAPI uses a thread pool of 40 by default, so this
            # gives us a connection for each thread in the pool. Because Butler
            # is currently sync-only, we won't ever be executing more queries
            # than we have threads.
            #
            # Connections are only created as they are needed, so in typical
            # single-threaded Butler use only one connection will ever be
            # created. Services with low peak concurrency may never create this
            # many connections.
            #
            # The main Butler databases at SLAC are run behind pgbouncer, so we
            # can support a larger number of simultaneous connections than if
            # we were connecting directly to Postgres.
            #
            # See
            # https://docs.sqlalchemy.org/en/20/core/pooling.html#sqlalchemy.pool.QueuePool.__init__
            # for more information on the behavior of this parameter.
            pool_size=40,
            # If we are experiencing heavy enough load that we overflow the
            # connection pool, it will be harmful to start creating extra
            # connections that we disconnect immediately after use.
            # Connecting from scratch is fairly expensive, which is why we have
            # a pool in the first place.
            max_overflow=0,
            # If the pool is full, this is the maximum number of seconds we
            # will wait for a connection to become available before giving up.
            pool_timeout=60,
            # In combination with pool_pre_ping, prevent SQLAlchemy from
            # unnecessarily reviving pooled connections that have gone stale.
            # Setting this to true makes it always re-use the most recent
            # known-good connection when possible, instead of cycling to other
            # connections in the pool that we may no longer need.
            pool_use_lifo=True,
        )

    @classmethod
    def fromEngine(
        cls,
        engine: sqlalchemy.engine.Engine,
        *,
        origin: int,
        namespace: str | None = None,
        writeable: bool = True,
    ) -> Database:
        # Docstring inherited.
        return cls(engine=engine, origin=origin, namespace=namespace, writeable=writeable)

    @contextmanager
    def _transaction(
        self,
        *,
        interrupting: bool = False,
        savepoint: bool = False,
        lock: Iterable[sqlalchemy.schema.Table] = (),
        for_temp_tables: bool = False,
    ) -> Iterator[tuple[bool, sqlalchemy.engine.Connection]]:
        """Open a transaction, applying PostgreSQL session settings (read-only
        mode, UTC timestamps, cursor tuning) when a new connection-level
        transaction begins.
        """
        with super()._transaction(interrupting=interrupting, savepoint=savepoint, lock=lock) as (
            is_new,
            connection,
        ):
            if is_new:
                # pgbouncer with transaction-level pooling (which we aim to
                # support) says that SET cannot be used, except for a list of
                # "Startup parameters" that includes "timezone" (see
                # https://www.pgbouncer.org/features.html#fnref:0). But I
                # don't see "timezone" in PostgreSQL's list of parameters
                # passed when creating a new connection
                # (https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS).
                # Given that the pgbouncer docs say, "PgBouncer detects their
                # changes and so it can guarantee they remain consistent for
                # the client", I assume we can use "SET TIME ZONE" and
                # pgbouncer will take care of clients that share connections
                # being set consistently. And if that assumption is wrong, we
                # should still probably be okay, since all clients should be
                # Butler clients, and they'll all be setting the same thing.
                #
                # The "SET TRANSACTION READ ONLY" should also be safe, because
                # it only ever acts on the current transaction; I think it's
                # not included in pgbouncer's declaration that SET is
                # incompatible with transaction-level pooling because
                # PostgreSQL actually considers SET TRANSACTION to be a
                # fundamentally different statement from SET (they have their
                # own distinct doc pages, at least).
                with closing(connection.connection.cursor()) as cursor:
                    # PostgreSQL permits writing to temporary tables inside
                    # read-only transactions, but it doesn't permit creating
                    # them.
                    if not (self.isWriteable() or for_temp_tables):
                        cursor.execute("SET TRANSACTION READ ONLY")
                    # Make timestamps UTC, because we didn't use TIMESTAMPZ
                    # for the column type. When we can tolerate a schema
                    # change, we should change that type and remove this
                    # line.
                    cursor.execute("SET TIME ZONE 0")
                    # Using server-side cursors with complex queries frequently
                    # generates suboptimal query plan, setting
                    # cursor_tuple_fraction=1 helps for those cases.
                    cursor.execute("SET cursor_tuple_fraction = 1")
            yield is_new, connection

    @contextmanager
    def temporary_table(
        self, spec: ddl.TableSpec, name: str | None = None
    ) -> Iterator[sqlalchemy.schema.Table]:
        # Docstring inherited.
        # for_temp_tables=True keeps the transaction writable enough for
        # CREATE TEMPORARY TABLE even on a read-only Database.
        with self.transaction(for_temp_tables=True), super().temporary_table(spec, name) as table:
            yield table

    def _lockTables(
        self, connection: sqlalchemy.engine.Connection, tables: Iterable[sqlalchemy.schema.Table] = ()
    ) -> None:
        # Docstring inherited.
        for table in tables:
            connection.execute(sqlalchemy.text(f"LOCK TABLE {table.key} IN EXCLUSIVE MODE"))

    def isWriteable(self) -> bool:
        # Docstring inherited.
        return self._writeable

    def __str__(self) -> str:
        return f"PostgreSQL@{self.dbname}:{self.namespace}"

    def shrinkDatabaseEntityName(self, original: str) -> str:
        # Delegate to the inherited name shrinker.
        return self.name_shrinker.shrink(original)

    def expandDatabaseEntityName(self, shrunk: str) -> str:
        # Inverse of shrinkDatabaseEntityName.
        return self.name_shrinker.expand(shrunk)

    def _convertExclusionConstraintSpec(
        self,
        table: str,
        spec: tuple[str | type[TimespanDatabaseRepresentation], ...],
        metadata: sqlalchemy.MetaData,
    ) -> sqlalchemy.schema.Constraint:
        # Docstring inherited.
        # Build (column, operator) pairs: plain columns compare with "=",
        # timespan columns overlap-compare with "&&"; the constraint name is
        # derived from the participating column names.
        args: list[tuple[sqlalchemy.schema.Column, str]] = []
        names = ["excl"]
        for item in spec:
            if isinstance(item, str):
                args.append((sqlalchemy.schema.Column(item), "="))
                names.append(item)
            elif issubclass(item, TimespanDatabaseRepresentation):
                assert item is self.getTimespanRepresentation()
                args.append((sqlalchemy.schema.Column(TimespanDatabaseRepresentation.NAME), "&&"))
                names.append(TimespanDatabaseRepresentation.NAME)
        return sqlalchemy.dialects.postgresql.ExcludeConstraint(
            *args,
            name=self.shrinkDatabaseEntityName("_".join(names)),
        )

    def _make_temporary_table(
        self,
        connection: sqlalchemy.engine.Connection,
        spec: ddl.TableSpec,
        name: str | None = None,
        **kwargs: Any,
    ) -> sqlalchemy.schema.Table:
        # Docstring inherited
        # Adding ON COMMIT DROP here is really quite defensive: we already
        # manually drop the table at the end of the temporary_table context
        # manager, and that will usually happen first. But this will guarantee
        # that we drop the table at the end of the transaction even if the
        # connection lasts longer, and that's good citizenship when connections
        # may be multiplexed by e.g. pgbouncer.
        return super()._make_temporary_table(connection, spec, name, postgresql_on_commit="DROP", **kwargs)

    @classmethod
    def getTimespanRepresentation(cls) -> type[TimespanDatabaseRepresentation]:
        # Docstring inherited.
        return _RangeTimespanRepresentation

    def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None:
        """Insert rows, updating existing rows on primary-key conflict via
        PostgreSQL ``INSERT ... ON CONFLICT DO UPDATE``.
        """
        self.assertTableWriteable(table, f"Cannot replace into read-only table {table}.")
        if not rows:
            return
        # This uses special support for UPSERT in PostgreSQL backend:
        # https://docs.sqlalchemy.org/en/13/dialects/postgresql.html#insert-on-conflict-upsert
        query = sqlalchemy.dialects.postgresql.dml.insert(table)
        # In the SET clause assign all columns using special `excluded`
        # pseudo-table. If some column in the table does not appear in the
        # INSERT list this will set it to NULL.
        excluded = query.excluded
        data = {
            column.name: getattr(excluded, column.name)
            for column in table.columns
            if column.name not in table.primary_key
        }
        if not data:
            # Every column is part of the primary key, so there is nothing to
            # update on conflict; fall back to insert-if-missing.
            self.ensure(table, *rows)
            return
        query = query.on_conflict_do_update(constraint=table.primary_key, set_=data)
        with self._transaction() as (_, connection):
            connection.execute(query, rows)

    def ensure(self, table: sqlalchemy.schema.Table, *rows: dict, primary_key_only: bool = False) -> int:
        # Docstring inherited.
        self.assertTableWriteable(table, f"Cannot ensure into read-only table {table}.")
        if not rows:
            return 0
        # Like `replace`, this uses UPSERT.
        base_insert = sqlalchemy.dialects.postgresql.dml.insert(table)
        if primary_key_only:
            # Ignore conflicts on the primary key only.
            query = base_insert.on_conflict_do_nothing(constraint=table.primary_key)
        else:
            # Ignore conflicts on any unique constraint.
            query = base_insert.on_conflict_do_nothing()
        with self._transaction() as (_, connection):
            return connection.execute(query, rows).rowcount

    def constant_rows(
        self,
        fields: NamedValueAbstractSet[ddl.FieldSpec],
        *rows: dict,
        name: str | None = None,
    ) -> sqlalchemy.sql.FromClause:
        # Docstring inherited.
        return super().constant_rows(fields, *rows, name=name)

    @property
    def has_distinct_on(self) -> bool:
        # Docstring inherited.
        return True

    @property
    def has_any_aggregate(self) -> bool:
        # Docstring inherited.
        # any_value() is only available in PostgreSQL 16 and newer.
        return self._pg_version >= (16, 0)

    def apply_any_aggregate(self, column: sqlalchemy.ColumnElement[Any]) -> sqlalchemy.ColumnElement[Any]:
        # Docstring inherited

        # The cast is required to prevent sqlalchemy from forgetting the type
        # of the initial column. Without the cast, for example, Base64Region
        # would become a String column in the output.
        return sqlalchemy.cast(sqlalchemy.func.any_value(column), column.type)

    def glob_expression(
        self, expression: sqlalchemy.ColumnElement[Any], pattern: str
    ) -> sqlalchemy.ColumnElement[bool]:
        # Docstring inherited.
        # With Postgres we can use LIKE as it is case-sensitive, we only need
        # to translate our simple glob language to LIKE patterns, escaping few
        # things. Also we can try to optimize special cases when we can use
        # ``starts_with()`` function or "*" pattern. ``starts_with()`` in
        # Postgres14 does not use btree index, and LIKE with a constant prefix
        # is using index, so we may not need ``starts_with()`` after all.
        if pattern == "*":
            # Simple case: match-everything glob needs no LIKE at all.
            return sqlalchemy.literal(True)

        def _escape(pattern: str) -> str:
            """Transform glob pattern string into LIKE pattern."""
            tokens: list[str] = []
            # Holds a pending backslash while we look at the next character.
            escape = ""
            for char in pattern:
                if not escape:
                    if char == "\\":
                        escape = char
                    elif char in "_%":
                        # Escape special SQL characters.
                        tokens.append(f"\\{char}")
                    elif char == "*":
                        tokens.append("%")
                    elif char == "?":
                        tokens.append("_")
                    else:
                        tokens.append(char)
                else:
                    if char == "\\":
                        # Two backslashes become one.
                        tokens.append(char)
                    elif char in "*?":
                        # Handle escaped wildcards: emit them literally.
                        tokens.append(f"{char}")
                    else:
                        # Copy both backslash and character.
                        tokens.append(escape)
                        tokens.append(char)
                    escape = ""
            # Trailing backslash added to pattern.
            tokens.append(escape)
            return "".join(tokens)

        pattern = _escape(pattern)
        return expression.op("LIKE")(sqlalchemy.literal(pattern))
class _RangeTimespanType(sqlalchemy.TypeDecorator):
    """A single-column `Timespan` representation usable only with
    PostgreSQL.

    This type should be able to take advantage of PostgreSQL's built-in
    range operators, and the indexing and EXCLUSION table constraints built
    off of them.
    """

    impl = sqlalchemy.dialects.postgresql.INT8RANGE

    cache_ok = True

    def process_bind_param(
        self, value: Timespan | None, dialect: sqlalchemy.engine.Dialect
    ) -> psycopg2.extras.NumericRange | None:
        # Translate a Timespan into the psycopg2 range object bound to the
        # INT8RANGE column.
        if value is None:
            return None
        if not isinstance(value, Timespan):
            raise TypeError(f"Unsupported type: {type(value)}, expected Timespan.")
        if value.isEmpty():
            return psycopg2.extras.NumericRange(empty=True)
        converter = time_utils.TimeConverter()
        nsec_begin, nsec_end = value.nsec
        assert nsec_begin >= converter.min_nsec, "Guaranteed by Timespan.__init__."
        assert nsec_end <= converter.max_nsec, "Guaranteed by Timespan.__init__."
        # Sentinel min/max bounds become unbounded range endpoints.
        range_lower = None if nsec_begin == converter.min_nsec else nsec_begin
        range_upper = None if nsec_end == converter.max_nsec else nsec_end
        return psycopg2.extras.NumericRange(lower=range_lower, upper=range_upper)

    def process_result_value(
        self, value: psycopg2.extras.NumericRange | None, dialect: sqlalchemy.engine.Dialect
    ) -> Timespan | None:
        # Translate a range fetched from the database back into a Timespan.
        if value is None:
            return None
        if value.isempty:
            return Timespan.makeEmpty()
        converter = time_utils.TimeConverter()
        # Unbounded endpoints map back onto the sentinel min/max values.
        start = value.lower if value.lower is not None else converter.min_nsec
        stop = value.upper if value.upper is not None else converter.max_nsec
        return Timespan(begin=None, end=None, _nsec=(start, stop))
class _RangeTimespanRepresentation(TimespanDatabaseRepresentation):
    """An implementation of `TimespanDatabaseRepresentation` that uses
    `_RangeTimespanType` to store a timespan in a single
    PostgreSQL-specific field.

    Parameters
    ----------
    column : `sqlalchemy.sql.ColumnElement`
        SQLAlchemy object representing the column.
    name : `str`
        Logical name of the column.
    """

    def __init__(self, column: sqlalchemy.sql.ColumnElement, name: str):
        self.column = column
        self._name = name

    __slots__ = ("column", "_name")

    @classmethod
    def makeFieldSpecs(
        cls, nullable: bool, name: str | None = None, **kwargs: Any
    ) -> tuple[ddl.FieldSpec, ...]:
        # Docstring inherited.
        if name is None:
            name = cls.NAME
        return (
            ddl.FieldSpec(
                name,
                dtype=_RangeTimespanType,
                # Non-nullable columns default to the unbounded range.
                nullable=nullable,
                default=(None if nullable else sqlalchemy.sql.text("'(,)'::int8range")),
                **kwargs,
            ),
        )

    @classmethod
    def getFieldNames(cls, name: str | None = None) -> tuple[str, ...]:
        # Docstring inherited.
        if name is None:
            name = cls.NAME
        return (name,)

    @classmethod
    def update(
        cls, extent: Timespan | None, name: str | None = None, result: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        # Docstring inherited.
        if name is None:
            name = cls.NAME
        if result is None:
            result = {}
        result[name] = extent
        return result

    @classmethod
    def extract(cls, mapping: Mapping[str, Any], name: str | None = None) -> Timespan | None:
        # Docstring inherited.
        if name is None:
            name = cls.NAME
        return mapping[name]

    @classmethod
    def fromLiteral(cls, timespan: Timespan | None) -> _RangeTimespanRepresentation:
        # Docstring inherited.
        if timespan is None:
            # Cast NULL to an expected type, helps Postgres to figure out
            # column type when doing UNION.  Use sqlalchemy.sql.cast (a real
            # SQL ``CAST(NULL AS ...)`` expression), matching the non-NULL
            # branch below; ``sqlalchemy.func.cast`` would instead render a
            # nonexistent ``cast(NULL)`` function call.
            return cls(
                column=sqlalchemy.sql.cast(sqlalchemy.sql.null(), type_=_RangeTimespanType), name=cls.NAME
            )
        return cls(
            column=sqlalchemy.sql.cast(
                sqlalchemy.sql.literal(timespan, type_=_RangeTimespanType), type_=_RangeTimespanType
            ),
            name=cls.NAME,
        )

    @classmethod
    def from_columns(
        cls, columns: sqlalchemy.sql.ColumnCollection, name: str | None = None
    ) -> _RangeTimespanRepresentation:
        # Docstring inherited.
        if name is None:
            name = cls.NAME
        return cls(columns[name], name)

    @property
    def name(self) -> str:
        # Docstring inherited.
        return self._name

    def isNull(self) -> sqlalchemy.sql.ColumnElement:
        # Docstring inherited.
        return self.column.is_(None)

    def isEmpty(self) -> sqlalchemy.sql.ColumnElement:
        # Docstring inherited
        return sqlalchemy.sql.func.isempty(self.column)

    def __lt__(
        self, other: _RangeTimespanRepresentation | sqlalchemy.sql.ColumnElement
    ) -> sqlalchemy.sql.ColumnElement:
        # Docstring inherited.
        if isinstance(other, sqlalchemy.sql.ColumnElement):
            # Compare against a scalar time: a non-empty, upper-bounded range
            # is strictly before t when its (exclusive) upper bound <= t.
            return sqlalchemy.sql.and_(
                sqlalchemy.sql.not_(sqlalchemy.sql.func.upper_inf(self.column)),
                sqlalchemy.sql.not_(sqlalchemy.sql.func.isempty(self.column)),
                sqlalchemy.sql.func.upper(self.column) <= other,
            )
        else:
            # Range-vs-range: PostgreSQL "strictly left of" operator.
            return self.column << other.column

    def __gt__(
        self, other: _RangeTimespanRepresentation | sqlalchemy.sql.ColumnElement
    ) -> sqlalchemy.sql.ColumnElement:
        # Docstring inherited.
        if isinstance(other, sqlalchemy.sql.ColumnElement):
            # Compare against a scalar time: a non-empty, lower-bounded range
            # is strictly after t when its (inclusive) lower bound > t.
            return sqlalchemy.sql.and_(
                sqlalchemy.sql.not_(sqlalchemy.sql.func.lower_inf(self.column)),
                sqlalchemy.sql.not_(sqlalchemy.sql.func.isempty(self.column)),
                sqlalchemy.sql.func.lower(self.column) > other,
            )
        else:
            # Range-vs-range: PostgreSQL "strictly right of" operator.
            return self.column >> other.column

    def overlaps(
        self, other: _RangeTimespanRepresentation | sqlalchemy.sql.ColumnElement
    ) -> sqlalchemy.sql.ColumnElement:
        # Docstring inherited.
        if not isinstance(other, _RangeTimespanRepresentation):
            # A scalar "overlaps" a range iff the range contains it.
            return self.contains(other)
        return self.column.overlaps(other.column)

    def contains(
        self, other: _RangeTimespanRepresentation | sqlalchemy.sql.ColumnElement
    ) -> sqlalchemy.sql.ColumnElement:
        # Docstring inherited
        if isinstance(other, _RangeTimespanRepresentation):
            return self.column.contains(other.column)
        else:
            return self.column.contains(other)

    def lower(self) -> sqlalchemy.sql.ColumnElement:
        # Docstring inherited.
        # NULL lower bound (unbounded) is reported as 0.
        return sqlalchemy.sql.functions.coalesce(
            sqlalchemy.sql.func.lower(self.column), sqlalchemy.sql.literal(0)
        )

    def upper(self) -> sqlalchemy.sql.ColumnElement:
        # Docstring inherited.
        # NULL upper bound (unbounded) is reported as 0.
        return sqlalchemy.sql.functions.coalesce(
            sqlalchemy.sql.func.upper(self.column), sqlalchemy.sql.literal(0)
        )

    def flatten(self, name: str | None = None) -> tuple[sqlalchemy.sql.ColumnElement]:
        # Docstring inherited.
        if name is None:
            return (self.column,)
        else:
            return (self.column.label(name),)

    def apply_any_aggregate(
        self, func: Callable[[ColumnElement[Any]], ColumnElement[Any]]
    ) -> TimespanDatabaseRepresentation:
        # Docstring inherited.
        return _RangeTimespanRepresentation(func(self.column), self.name)
def get_postgres_server_version(connection: sqlalchemy.Connection) -> tuple[int, int]:
    """Return the postgres DB server version as a tuple
    (major version, minor version).

    Parameters
    ----------
    connection : `sqlalchemy.Connection`
        Open connection used to issue ``SHOW server_version``.

    Returns
    -------
    version : `tuple` [ `int`, `int` ]
        Major and minor version numbers of the server.

    Raises
    ------
    RuntimeError
        Raised if the version string cannot be retrieved or parsed.
    """
    raw_pg_version = connection.execute(sqlalchemy.text("SHOW server_version")).scalar()
    # Search with the pattern inline: the ``re`` module caches compiled
    # patterns, so this is as fast as hoisting a pre-compiled regex while
    # keeping the pattern local to its single use.  The search (not match)
    # tolerates distribution suffixes around the "major.minor" core.
    if raw_pg_version is not None and (m := re.search(r"(?P<major>\d+)\.(?P<minor>\d+)", raw_pg_version)):
        return (int(m.group("major")), int(m.group("minor")))
    raise RuntimeError("Failed to get PostgreSQL server version.")