Coverage for python / lsst / daf / butler / registry / databases / sqlite.py: 18%
192 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:30 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:30 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ["SqliteDatabase"]
31import copy
32import os
33import sqlite3
34import urllib.parse
35from collections.abc import Iterable
36from contextlib import AbstractContextManager, closing
37from typing import Any
39import sqlalchemy
40import sqlalchemy.dialects.sqlite
41import sqlalchemy.ext.compiler
43from ... import ddl
44from ..._named import NamedValueAbstractSet
45from ..interfaces import Database, DatabaseMetadata, StaticTablesContext
def _onSqlite3Connect(
    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
) -> None:
    """Configure a freshly opened SQLite DBAPI connection.

    Registered as a SQLAlchemy ``connect`` event listener.

    Parameters
    ----------
    dbapiConnection : `sqlite3.Connection`
        The raw DBAPI connection that was just created.
    connectionRecord : `sqlalchemy.pool._ConnectionRecord`
        Pool record for the connection; not used here.
    """
    assert isinstance(dbapiConnection, sqlite3.Connection)
    # Setting isolation_level to None stops pysqlite from emitting BEGIN and
    # COMMIT statements on its own.
    dbapiConnection.isolation_level = None
    startup_pragmas = (
        # Enable foreign keys.
        "PRAGMA foreign_keys=ON;",
        # In ms, so 5min (way longer than should be needed).
        "PRAGMA busy_timeout = 300000;",
    )
    with closing(dbapiConnection.cursor()) as cursor:
        for pragma in startup_pragmas:
            cursor.execute(pragma)
class SqliteDatabase(Database):
    """A `Database` implementation backed by SQLite3.

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        Engine to use for this connection.
    origin : `int`
        Integer ID used as the default for any datasets, quanta, or other
        entities whose primary key is an (autoincrement, origin) compound.
    namespace : `str`, optional
        The namespace (schema) this database is associated with.  If `None`,
        the connection's default schema is used (which may be `None`).
    writeable : `bool`, optional
        If `True`, allow write operations on the database, including
        ``CREATE TABLE``.
    allow_temporary_tables : `bool`, optional
        If `True`, database operations are allowed to use temporary tables.
        If `False`, other SQL constructs are used instead of temporary
        tables when possible.

    Notes
    -----
    The ``namespace is not None`` case is not yet tested and may be broken.
    Testing it needs an API for attaching to different databases, and what
    is common/different across databases has not been worked out well enough
    to define one.
    """

    def __init__(
        self,
        *,
        engine: sqlalchemy.engine.Engine,
        origin: int,
        namespace: str | None = None,
        writeable: bool = True,
        allow_temporary_tables: bool = True,
    ):
        # Discover the backing file from the connection itself, then defer to
        # the initializer that is shared with ``clone``.
        self._init(
            engine=engine,
            origin=origin,
            namespace=namespace,
            writeable=writeable,
            filename=_find_database_filename(engine, namespace),
            metadata=None,
            allow_temporary_tables=allow_temporary_tables,
        )

    def _init(
        self,
        *,
        engine: sqlalchemy.engine.Engine,
        origin: int,
        namespace: str | None = None,
        writeable: bool = True,
        filename: str | None,
        metadata: DatabaseMetadata | None,
        allow_temporary_tables: bool,
    ) -> None:
        # Common setup used by both ``__init__`` and ``clone``: initialize the
        # base class first, then record the SQLite-specific state.
        super().__init__(
            engine=engine,
            origin=origin,
            namespace=namespace,
            metadata=metadata,
            allow_temporary_tables=allow_temporary_tables,
        )
        self.filename = filename
        self._writeable = writeable

    def clone(self) -> SqliteDatabase:
        # Build the copy without running ``__init__`` so the already-known
        # filename and metadata are reused instead of being looked up again.
        duplicate = self.__new__(type(self))
        duplicate._init(
            engine=self._engine,
            origin=self.origin,
            namespace=self.namespace,
            writeable=self._writeable,
            filename=self.filename,
            metadata=self._metadata,
            allow_temporary_tables=self._allow_temporary_tables,
        )
        return duplicate

    @classmethod
    def makeDefaultUri(cls, root: str) -> str | None:
        # The default database is a file named ``gen3.sqlite3`` under ``root``.
        default_path = os.path.join(root, "gen3.sqlite3")
        return "sqlite:///" + default_path

    @classmethod
    def makeEngine(
        cls,
        uri: str | sqlalchemy.engine.URL | None = None,
        *,
        filename: str | None = None,
        writeable: bool = True,
    ) -> sqlalchemy.engine.Engine:
        """Build a `sqlalchemy.engine.Engine` from either a SQLAlchemy URI or
        a filename.

        Parameters
        ----------
        uri : `str` or `sqlalchemy.engine.URL`, optional
            A SQLAlchemy URI connection string.
        filename : `str`
            Name of the SQLite database file, or `None` to use an in-memory
            database.  Ignored if ``uri is not None``.
        writeable : `bool`, optional
            If `True`, allow write operations on the database, including
            ``CREATE TABLE``.

        Returns
        -------
        engine : `sqlalchemy.engine.Engine`
            A database engine.

        Raises
        ------
        NotImplementedError
            Raised if the URI's query string contains ``uri=true``, or if a
            read-only in-memory database is requested.
        """
        # Telling SQLite whether we want a read-only or read-write connection
        # requires opening the DBAPI connection with a "URI"-style connection
        # string.  SQLAlchemy claims to support this
        # (https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#uri-connections),
        # but it doesn't seem to work as advertised.  The workaround is the
        # 'creator' argument to sqlalchemy.engine.create_engine, which accepts
        # a callable that makes the DBAPI connection for us.
        #
        # Step one: settle on both the SQLAlchemy URI and the file name.
        if uri is None:
            uri = "sqlite://" if filename is None else f"sqlite:///{filename}"
        else:
            if isinstance(uri, sqlalchemy.engine.URL):
                # Strings have to be parsed anyway, so normalize to a string.
                uri = uri.render_as_string(hide_password=False)
            parsed = urllib.parse.urlparse(uri)
            if "uri=true" in parsed.query.split("&"):
                # A SQLAlchemy URI that is itself asking for a SQLite URI
                # connection may carry URI components for both SQLite and
                # SQLAlchemy.  Supporting that is unnecessary and would mean
                # reimplementing all of the (broken) logic SQLAlchemy has for
                # it, so it is simply rejected.
                raise NotImplementedError("SQLite connection strings with 'uri=true' are not supported.")
            # Otherwise this is a SQLAlchemy URI wrapping a non-URI SQLite
            # connection string; extract it for use in the creator call.
            filename = parsed.path[1:] if parsed.path.startswith("/") else None

        # Step two: turn the file name into the string handed to sqlite3.
        if filename is None:
            if not writeable:
                raise NotImplementedError("Read-only :memory: databases are not supported.")
            target = ":memory:"
        else:
            access_mode = "rwc" if writeable else "ro"
            target = f"file:{filename}?mode={access_mode}&uri=true"

        def creator() -> sqlite3.Connection:
            return sqlite3.connect(target, check_same_thread=False, uri=True)

        engine = sqlalchemy.engine.create_engine(uri, creator=creator)

        sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)

        def _onSqlite3Begin(connection: sqlalchemy.engine.Connection) -> sqlalchemy.engine.Connection:
            assert connection.dialect.name == "sqlite"
            # pysqlite's buggy transaction handling never BEGINs, so emit the
            # BEGIN ourselves.  For transactions that might involve writes,
            # ask SQLite to acquire a lock right away (this should lead to
            # more blocking and fewer deadlocks).
            begin_statement = "BEGIN IMMEDIATE" if writeable else "BEGIN"
            connection.execute(sqlalchemy.text(begin_statement))
            return connection

        sqlalchemy.event.listen(engine, "begin", _onSqlite3Begin)

        return engine

    @classmethod
    def fromEngine(
        cls,
        engine: sqlalchemy.engine.Engine,
        *,
        origin: int,
        namespace: str | None = None,
        writeable: bool = True,
    ) -> Database:
        # Plain pass-through to the constructor.
        return cls(
            engine=engine,
            origin=origin,
            writeable=writeable,
            namespace=namespace,
        )

    def isWriteable(self) -> bool:
        # Reports the flag recorded at construction time.
        return self._writeable

    def __str__(self) -> str:
        # A missing (or empty) filename denotes an in-memory database.
        return f"SQLite3@{self.filename or ':memory:'}"

    def _lockTables(
        self, connection: sqlalchemy.engine.Connection, tables: Iterable[sqlalchemy.schema.Table] = ()
    ) -> None:
        # Docstring inherited.
        # Nothing to do: our SQLite database always takes a full-database lock
        # when a transaction begins, and SQLite has no table-level locking
        # anyway.
        return

    # MyPy claims that the return type here isn't covariant with the return
    # type of the base class method, which is formally correct but irrelevant
    # - the base class return type is _GeneratorContextManager, but only
    # because it's generated by the contextmanager decorator.
    def declareStaticTables(  # type: ignore
        self, *, create: bool
    ) -> AbstractContextManager[StaticTablesContext]:
        # For an in-memory, writeable database the schema can be lost on
        # re-connect, so it may have to be re-created even when the caller
        # passed create=False.  This is only really relevant for tests, where
        # it is convenient.
        in_memory_and_writeable = self.filename is None and self.isWriteable()
        if in_memory_and_writeable:
            existing_tables = sqlalchemy.inspect(self._engine).get_table_names(schema=self.namespace)
            if not existing_tables:
                create = True
        return super().declareStaticTables(create=create)

    def _convertFieldSpec(
        self, table: str, spec: ddl.FieldSpec, metadata: sqlalchemy.MetaData, **kwargs: Any
    ) -> sqlalchemy.schema.Column:
        if spec.autoincrement and not spec.primaryKey:
            raise RuntimeError(
                f"Autoincrement field {table}.{spec.name} that is not a primary key is not supported."
            )
        if spec.autoincrement and spec.dtype != sqlalchemy.Integer:
            # SQLite's autoincrement is really limited: it only works when the
            # column type is exactly "INTEGER".  SQLite also doesn't care
            # about the distinctions between integer types, so swapping the
            # type on a copy of the spec is safe.
            spec = copy.copy(spec)
            spec.dtype = sqlalchemy.Integer
        return super()._convertFieldSpec(table, spec, metadata, **kwargs)

    def _makeColumnConstraints(self, table: str, spec: ddl.FieldSpec) -> list[sqlalchemy.CheckConstraint]:
        # SQLite otherwise ignores everything about string columns, which
        # leads to problems with other databases, so length constraints are
        # forced on all string columns here.
        result = []
        if spec.isStringType():
            constraint_name = self.shrinkDatabaseEntityName("_".join([table, "len", spec.name]))
            # The lower bound is there because Oracle converts empty strings
            # to NULL.
            upper_bound = f'length("{spec.name}")<={spec.length}'
            lower_bound = f' AND length("{spec.name}")>=1'
            result.append(sqlalchemy.CheckConstraint(upper_bound + lower_bound, name=constraint_name))

        result.extend(super()._makeColumnConstraints(table, spec))
        return result

    def _convertTableSpec(
        self, name: str, spec: ddl.TableSpec, metadata: sqlalchemy.MetaData, **kwargs: Any
    ) -> sqlalchemy.schema.Table:
        key_names = {field.name for field in spec.fields if field.primaryKey}
        autoincrement_names = {field.name for field in spec.fields if field.autoincrement}
        if len(autoincrement_names) > 1:
            raise RuntimeError("At most one autoincrement field per table is allowed.")
        if autoincrement_names and len(key_names) > 1:
            # SQLite's default rowid-based autoincrement doesn't work when the
            # field is only one part of a compound primary key.  The
            # workaround is an extra single-column table that is inserted
            # into to generate those IDs.  That is only safe if that table's
            # records are already unique with just the autoincrement field,
            # not the rest of the primary key; in practice that means its
            # records are those for which origin == self.origin.  Hence the
            # only other key field allowed is 'origin'.
            remaining_key_names = key_names - autoincrement_names
            if remaining_key_names != {"origin"}:
                raise NotImplementedError(
                    "Compound primary keys with an autoincrement are only supported in SQLite "
                    "if the only non-autoincrement primary key field is 'origin'."
                )
        if not spec.recycleIds:
            kwargs = dict(kwargs, sqlite_autoincrement=True)
        return super()._convertTableSpec(name, spec, metadata, **kwargs)

    def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None:
        self.assertTableWriteable(table, f"Cannot replace into read-only table {table}.")
        if not rows:
            return
        statement = sqlalchemy.dialects.sqlite.insert(table)
        incoming = statement.excluded
        # Every column that is not part of the primary key gets overwritten
        # with the incoming value when a conflict occurs.
        updates = {
            column.name: getattr(incoming, column.name)
            for column in table.columns
            if column.name not in table.primary_key
        }
        if not updates:
            # Nothing besides the key to update, so this is just an ``ensure``.
            self.ensure(table, *rows)
            return
        statement = statement.on_conflict_do_update(index_elements=table.primary_key, set_=updates)
        with self._transaction() as (_, connection):
            connection.execute(statement, rows)

    def ensure(self, table: sqlalchemy.schema.Table, *rows: dict, primary_key_only: bool = False) -> int:
        self.assertTableWriteable(table, f"Cannot ensure into read-only table {table}.")
        if not rows:
            return 0
        # Restrict the conflict target to the primary key only on request.
        conflict_target = {"index_elements": table.primary_key} if primary_key_only else {}
        statement = sqlalchemy.dialects.sqlite.insert(table).on_conflict_do_nothing(**conflict_target)
        with self._transaction() as (_, connection):
            return connection.execute(statement, rows).rowcount

    def constant_rows(
        self,
        fields: NamedValueAbstractSet[ddl.FieldSpec],
        *rows: dict,
        name: str | None = None,
    ) -> sqlalchemy.sql.FromClause:
        # Docstring inherited.
        # SQLite does support VALUES, but offers no way to name that construct
        # or its columns, so it cannot actually be joined into a SELECT query.
        # The only alternative seems to be something like:
        #
        #    SELECT ? AS a, ? AS b
        #    UNION ALL
        #    SELECT ? AS a, ? AS b
        #
        per_row_selects = []
        for row in rows:
            labeled_columns = [
                sqlalchemy.sql.literal(row[field.name], field.dtype).label(field.name) for field in fields
            ]
            per_row_selects.append(sqlalchemy.sql.select(*labeled_columns))
        return sqlalchemy.sql.union_all(*per_row_selects).alias(name)

    def get_constant_rows_max(self) -> int:
        # Docstring inherited.
        # The default SQLITE_MAX_COMPOUND_SELECT (see
        # https://www.sqlite.org/limits.html).
        return 500

    @property
    def has_distinct_on(self) -> bool:
        # Docstring inherited.
        return False

    @property
    def has_any_aggregate(self) -> bool:
        # Docstring inherited.
        return True

    def apply_any_aggregate(self, column: sqlalchemy.ColumnElement[Any]) -> sqlalchemy.ColumnElement[Any]:
        # Docstring inherited.
        # SQLite permits columns in the SELECT clause without an aggregate
        # function even when they are not in the GROUP BY; an arbitrary value
        # is picked if there is more than one.  The column is therefore
        # returned unchanged.
        return column

    def glob_expression(
        self, expression: sqlalchemy.ColumnElement[Any], pattern: str
    ) -> sqlalchemy.ColumnElement[bool]:
        # Docstring inherited.
        # The SQLite GLOB operator is preferable in our case because it is
        # case-sensitive.  Our glob syntax supports only * and ?, while GLOB
        # also supports bracket expressions.  Brackets are not expected in
        # patterns, but if present they are converted to regular characters.
        # Escaping does not work in GLOB, so backslash-escaped wildcards and
        # brackets are rewritten as single-character bracket expressions.
        if pattern == "*":
            # Simple case.
            return sqlalchemy.literal(True)

        def _escape(pattern: str) -> str:
            """Rewrite our glob syntax as a SQLite GLOB pattern."""
            pieces = []
            # Holds a backslash that is still waiting for the next character;
            # empty when no escape is in progress.
            pending = ""
            for ch in pattern:
                if pending:
                    if ch == "\\":
                        # Two backslashes become one.
                        pieces.append(ch)
                    elif ch in "[]*?":
                        # Escaped brackets and wildcards.
                        pieces.append(f"[{ch}]")
                    else:
                        # Keep both the backslash and the character.
                        pieces.append(pending)
                        pieces.append(ch)
                    pending = ""
                elif ch == "\\":
                    pending = ch
                elif ch in "[]":
                    # Unescaped brackets.
                    pieces.append(f"[{ch}]")
                else:
                    pieces.append(ch)
            # A trailing backslash, if any, is appended to the pattern.
            pieces.append(pending)
            return "".join(pieces)

        glob_pattern = _escape(pattern)
        return expression.op("GLOB")(sqlalchemy.literal(glob_pattern))

    filename: str | None
    """Name of the file this database is connected to (`str` or `None`).

    Set to `None` for in-memory databases.
    """
def _find_database_filename(
    engine: sqlalchemy.engine.Engine,
    namespace: str | None = None,
) -> str | None:
    """Return the file backing one database of a SQLite connection.

    The lookup is done with ``PRAGMA database_list`` on a raw DBAPI cursor
    obtained from a connection of ``engine``.

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        Engine whose connection is inspected.
    namespace : `str`, optional
        Name of the database to look for in the list.  If `None`, ``"main"``
        is used.

    Returns
    -------
    filename : `str` or `None`
        The filename reported for the requested database, or `None` if the
        reported filename is empty.

    Raises
    ------
    RuntimeError
        Raised if the connection reports no databases at all, or none with
        the requested name.
    """
    with engine.connect() as connection, closing(connection.connection.cursor()) as cursor:
        # fetchall() already returns a list; no extra copy is needed.
        databases = cursor.execute("PRAGMA database_list").fetchall()
    if not databases:
        raise RuntimeError("No database in connection.")
    wanted = "main" if namespace is None else namespace
    # Each row is unpacked as (ignored, database name, filename).  Returning
    # from inside the loop avoids relying on the loop variable afterwards.
    for _, dbname, filename in databases:
        if dbname == wanted:
            # Normalize an empty filename to None.
            return filename or None
    raise RuntimeError(f"No '{wanted}' database in connection.")