Coverage for python / lsst / daf / butler / registry / databases / sqlite.py: 18%

192 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:18 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ["SqliteDatabase"] 

30 

31import copy 

32import os 

33import sqlite3 

34import urllib.parse 

35from collections.abc import Iterable 

36from contextlib import AbstractContextManager, closing 

37from typing import Any 

38 

39import sqlalchemy 

40import sqlalchemy.dialects.sqlite 

41import sqlalchemy.ext.compiler 

42 

43from ... import ddl 

44from ..._named import NamedValueAbstractSet 

45from ..interfaces import Database, DatabaseMetadata, StaticTablesContext 

46 

47 

def _onSqlite3Connect(
    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
) -> None:
    """Configure a newly opened SQLite DBAPI connection.

    Parameters
    ----------
    dbapiConnection : `sqlite3.Connection`
        The raw DBAPI connection to configure.
    connectionRecord : `sqlalchemy.pool._ConnectionRecord`
        Pool record associated with the connection; not used here.
    """
    assert isinstance(dbapiConnection, sqlite3.Connection)
    # Prevent pysqlite from emitting BEGIN and COMMIT statements.
    dbapiConnection.isolation_level = None
    cursor = dbapiConnection.cursor()
    try:
        for pragma in (
            # Enable foreign keys.
            "PRAGMA foreign_keys=ON;",
            # In ms, so 5min (way longer than should be needed).
            "PRAGMA busy_timeout = 300000;",
        ):
            cursor.execute(pragma)
    finally:
        # Always release the cursor, even if a PRAGMA fails.
        cursor.close()

58 

59 

class SqliteDatabase(Database):
    """An implementation of the `Database` interface for SQLite3.

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        Engine to use for this connection.
    origin : `int`
        An integer ID that should be used as the default for any datasets,
        quanta, or other entities that use a (autoincrement, origin) compound
        primary key.
    namespace : `str`, optional
        The namespace (schema) this database is associated with.  If `None`,
        the default schema for the connection is used (which may be `None`).
    writeable : `bool`, optional
        If `True`, allow write operations on the database, including
        ``CREATE TABLE``.
    allow_temporary_tables : `bool`, optional
        If `True`, database operations will be allowed to use temporary
        tables.
        If `False`, other SQL constructs will be used instead of temporary
        tables when possible.

    Notes
    -----
    The case where ``namespace is not None`` is not yet tested, and may be
    broken; we need an API for attaching to different databases in order to
    write those tests, but haven't yet worked out what is common/different
    across databases well enough to define it.
    """

    def __init__(
        self,
        *,
        engine: sqlalchemy.engine.Engine,
        origin: int,
        namespace: str | None = None,
        writeable: bool = True,
        allow_temporary_tables: bool = True,
    ):
        # The backing file name is looked up from the live connection; a
        # brand-new instance starts without any cached metadata.
        self._init(
            engine=engine,
            origin=origin,
            namespace=namespace,
            writeable=writeable,
            filename=_find_database_filename(engine, namespace),
            metadata=None,
            allow_temporary_tables=allow_temporary_tables,
        )

    def _init(
        self,
        *,
        engine: sqlalchemy.engine.Engine,
        origin: int,
        namespace: str | None = None,
        writeable: bool = True,
        filename: str | None,
        metadata: DatabaseMetadata | None,
        allow_temporary_tables: bool,
    ) -> None:
        # Initialization logic shared between ``__init__`` and ``clone``.
        super().__init__(
            origin=origin,
            engine=engine,
            namespace=namespace,
            metadata=metadata,
            allow_temporary_tables=allow_temporary_tables,
        )
        self.filename = filename
        self._writeable = writeable

    def clone(self) -> SqliteDatabase:
        # Bypass ``__init__`` so the filename is not looked up again; the
        # copy reuses this instance's engine, metadata and settings.
        duplicate = type(self).__new__(type(self))
        duplicate._init(
            engine=self._engine,
            origin=self.origin,
            namespace=self.namespace,
            writeable=self._writeable,
            filename=self.filename,
            metadata=self._metadata,
            allow_temporary_tables=self._allow_temporary_tables,
        )
        return duplicate

    @classmethod
    def makeDefaultUri(cls, root: str) -> str | None:
        # The default database is a file named "gen3.sqlite3" under ``root``.
        database_path = os.path.join(root, "gen3.sqlite3")
        return "sqlite:///" + database_path

    @classmethod
    def makeEngine(
        cls,
        uri: str | sqlalchemy.engine.URL | None = None,
        *,
        filename: str | None = None,
        writeable: bool = True,
    ) -> sqlalchemy.engine.Engine:
        """Create a `sqlalchemy.engine.Engine` from a SQLAlchemy URI or
        filename.

        Parameters
        ----------
        uri : `str` or `sqlalchemy.engine.URL`, optional
            A SQLAlchemy URI connection string.
        filename : `str`, optional
            Name of the SQLite database file, or `None` to use an in-memory
            database.  Ignored if ``uri is not None``.
        writeable : `bool`, optional
            If `True`, allow write operations on the database, including
            ``CREATE TABLE``.

        Returns
        -------
        engine : `sqlalchemy.engine.Engine`
            A database engine.

        Raises
        ------
        NotImplementedError
            Raised if ``uri`` carries a ``uri=true`` query parameter, or if a
            read-only in-memory database is requested.
        """
        # In order to be able to tell SQLite that we want a read-only or
        # read-write connection, we need to make the SQLite DBAPI connection
        # with a "URI"-based connection string.  SQLAlchemy claims it can do
        # this
        # (https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#uri-connections),
        # but it doesn't seem to work as advertised.  To work around this, we
        # use the 'creator' argument to sqlalchemy.engine.create_engine, which
        # lets us pass a callable that creates the DBAPI connection.
        if uri is not None:
            if isinstance(uri, sqlalchemy.engine.URL):
                # We have to parse strings anyway, so convert it to string.
                uri = uri.render_as_string(hide_password=False)
            components = urllib.parse.urlparse(uri)
            if "uri=true" in components.query.split("&"):
                # This is a SQLAlchemy URI that is already trying to make a
                # SQLite connection via a SQLite URI, and hence there may
                # be URI components for both SQLite and SQLAlchemy.  We
                # don't need to support that, and it'd be a
                # reimplementation of all of the (broken) logic in
                # SQLAlchemy for doing this, so we just don't.
                raise NotImplementedError("SQLite connection strings with 'uri=true' are not supported.")
            # This is just a SQLAlchemy URI with a non-URI SQLite connection
            # string inside it.  Pull the file name out so we can use it in
            # the creator call; a URI without a path means in-memory.
            filename = components.path[1:] if components.path.startswith("/") else None
        elif filename is None:
            uri = "sqlite://"
        else:
            uri = f"sqlite:///{filename}"

        if filename is None:
            if not writeable:
                raise NotImplementedError("Read-only :memory: databases are not supported.")
            target = ":memory:"
        else:
            # NOTE(review): the file name is interpolated into the SQLite URI
            # without percent-encoding; presumably names containing "?", "#"
            # or "%" would be misread by SQLite's URI parser - verify.
            access_mode = "rwc" if writeable else "ro"
            target = f"file:{filename}?mode={access_mode}&uri=true"

        def open_dbapi_connection() -> sqlite3.Connection:
            return sqlite3.connect(target, check_same_thread=False, uri=True)

        engine = sqlalchemy.engine.create_engine(uri, creator=open_dbapi_connection)
        sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)

        # Replace pysqlite's buggy transaction handling that never BEGINs
        # with our own that does, and tell SQLite to try to acquire a lock
        # as soon as we start a transaction that might involve writes (this
        # should lead to more blocking and fewer deadlocks).
        begin_statement = "BEGIN IMMEDIATE" if writeable else "BEGIN"

        def emit_begin(connection: sqlalchemy.engine.Connection) -> sqlalchemy.engine.Connection:
            assert connection.dialect.name == "sqlite"
            connection.execute(sqlalchemy.text(begin_statement))
            return connection

        sqlalchemy.event.listen(engine, "begin", emit_begin)
        return engine

    @classmethod
    def fromEngine(
        cls,
        engine: sqlalchemy.engine.Engine,
        *,
        origin: int,
        namespace: str | None = None,
        writeable: bool = True,
    ) -> Database:
        # Thin alternate constructor; temporary-table support keeps the
        # constructor's default.
        return cls(
            engine=engine,
            origin=origin,
            writeable=writeable,
            namespace=namespace,
        )

    def isWriteable(self) -> bool:
        return self._writeable

    def __str__(self) -> str:
        # An empty or missing file name is reported as in-memory.
        return f"SQLite3@{self.filename}" if self.filename else "SQLite3@:memory:"

    def _lockTables(
        self, connection: sqlalchemy.engine.Connection, tables: Iterable[sqlalchemy.schema.Table] = ()
    ) -> None:
        # Docstring inherited.
        # Our SQLite database always acquires full-database locks at the
        # beginning of a transaction, so there's no need to acquire table-level
        # locks - which is good, because SQLite doesn't have table-level
        # locking.
        return None

    # MyPy claims that the return type here isn't covariant with the return
    # type of the base class method, which is formally correct but irrelevant
    # - the base class return type is _GeneratorContextManager, but only
    # because it's generated by the contextmanager decorator.
    def declareStaticTables(  # type: ignore
        self, *, create: bool
    ) -> AbstractContextManager[StaticTablesContext]:
        # If the user asked for an in-memory, writeable database, then we may
        # need to re-create schema even if create=False because schema can be
        # lost on re-connect.  This is only really relevant for tests, and it's
        # convenient there.
        if self.filename is None and self.isWriteable():
            existing_tables = sqlalchemy.inspect(self._engine).get_table_names(schema=self.namespace)
            if not existing_tables:
                create = True
        return super().declareStaticTables(create=create)

    def _convertFieldSpec(
        self, table: str, spec: ddl.FieldSpec, metadata: sqlalchemy.MetaData, **kwargs: Any
    ) -> sqlalchemy.schema.Column:
        if spec.autoincrement:
            if not spec.primaryKey:
                raise RuntimeError(
                    f"Autoincrement field {table}.{spec.name} that is not a primary key is not supported."
                )
            dtype_is_plain_integer = not (spec.dtype != sqlalchemy.Integer)
            if not dtype_is_plain_integer:
                # SQLite's autoincrement is really limited; it only works if
                # the column type is exactly "INTEGER".  But it also doesn't
                # care about the distinctions between different integer types,
                # so it's safe to change it.  Work on a copy so the caller's
                # spec is left untouched.
                spec = copy.copy(spec)
                spec.dtype = sqlalchemy.Integer
        return super()._convertFieldSpec(table, spec, metadata, **kwargs)

    def _makeColumnConstraints(self, table: str, spec: ddl.FieldSpec) -> list[sqlalchemy.CheckConstraint]:
        # For sqlite we force constraints on all string columns since sqlite
        # ignores everything otherwise and this leads to problems with
        # other databases.
        length_checks: list[sqlalchemy.CheckConstraint] = []
        if spec.isStringType():
            constraint_name = self.shrinkDatabaseEntityName("_".join([table, "len", spec.name]))
            # The lower bound is there because Oracle converts empty strings
            # to NULL.
            condition = f'length("{spec.name}")<={spec.length} AND length("{spec.name}")>=1'
            length_checks.append(sqlalchemy.CheckConstraint(condition, name=constraint_name))
        # The length check comes first, followed by the base class's
        # constraints.
        return [*length_checks, *super()._makeColumnConstraints(table, spec)]

    def _convertTableSpec(
        self, name: str, spec: ddl.TableSpec, metadata: sqlalchemy.MetaData, **kwargs: Any
    ) -> sqlalchemy.schema.Table:
        key_names = {field.name for field in spec.fields if field.primaryKey}
        autoincrement_names = {field.name for field in spec.fields if field.autoincrement}
        if len(autoincrement_names) > 1:
            raise RuntimeError("At most one autoincrement field per table is allowed.")
        if autoincrement_names and len(key_names) > 1:
            # SQLite's default rowid-based autoincrement doesn't work if the
            # field is just one field in a compound primary key.  As a
            # workaround, we create an extra table with just one column that
            # we'll insert into to generate those IDs.  That's only safe if
            # that single-column table's records are already unique with just
            # the autoincrement field, not the rest of the primary key.  In
            # practice, that means the single-column table's records are those
            # for which origin == self.origin.
            #
            # We need the only other field in the key to be 'origin'.
            if key_names - autoincrement_names != {"origin"}:
                raise NotImplementedError(
                    "Compound primary keys with an autoincrement are only supported in SQLite "
                    "if the only non-autoincrement primary key field is 'origin'."
                )
        if not spec.recycleIds:
            # Build a new dict rather than mutating the caller's kwargs.
            kwargs = {**kwargs, "sqlite_autoincrement": True}
        return super()._convertTableSpec(name, spec, metadata, **kwargs)

    def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None:
        self.assertTableWriteable(table, f"Cannot replace into read-only table {table}.")
        if not rows:
            return
        upsert = sqlalchemy.dialects.sqlite.insert(table)
        incoming = upsert.excluded
        # Every non-primary-key column is overwritten with the incoming value
        # when a row with the same primary key already exists.
        overwritten = {}
        for column in table.columns:
            if column.name not in table.primary_key:
                overwritten[column.name] = getattr(incoming, column.name)
        if not overwritten:
            # Nothing but key columns: there is nothing to update, so this
            # reduces to an insert that skips conflicting rows.
            self.ensure(table, *rows)
            return
        upsert = upsert.on_conflict_do_update(index_elements=table.primary_key, set_=overwritten)
        with self._transaction() as (_, connection):
            connection.execute(upsert, rows)

    def ensure(self, table: sqlalchemy.schema.Table, *rows: dict, primary_key_only: bool = False) -> int:
        self.assertTableWriteable(table, f"Cannot ensure into read-only table {table}.")
        if not rows:
            return 0
        # With ``primary_key_only`` the conflict target is restricted to the
        # primary key; otherwise no explicit conflict target is given.
        conflict_target = {"index_elements": table.primary_key} if primary_key_only else {}
        statement = sqlalchemy.dialects.sqlite.insert(table).on_conflict_do_nothing(**conflict_target)
        with self._transaction() as (_, connection):
            return connection.execute(statement, rows).rowcount

    def constant_rows(
        self,
        fields: NamedValueAbstractSet[ddl.FieldSpec],
        *rows: dict,
        name: str | None = None,
    ) -> sqlalchemy.sql.FromClause:
        # Docstring inherited.
        # While SQLite supports VALUES, it doesn't support assigning a name
        # to that construct or the names of its columns, and hence there's no
        # way to actually join it into a SELECT query.  It seems the only
        # alternative is something like:
        #
        # SELECT ? AS a, ? AS b
        # UNION ALL
        # SELECT ? AS a, ? AS b
        #
        per_row_selects = []
        for row in rows:
            labelled_literals = [
                sqlalchemy.sql.literal(row[field.name], field.dtype).label(field.name) for field in fields
            ]
            per_row_selects.append(sqlalchemy.sql.select(*labelled_literals))
        return sqlalchemy.sql.union_all(*per_row_selects).alias(name)

    def get_constant_rows_max(self) -> int:
        # Docstring inherited.
        # This is the default SQLITE_MAX_COMPOUND_SELECT (see
        # https://www.sqlite.org/limits.html):
        return 500

    @property
    def has_distinct_on(self) -> bool:
        # Docstring inherited.
        return False

    @property
    def has_any_aggregate(self) -> bool:
        # Docstring inherited.
        return True

    def apply_any_aggregate(self, column: sqlalchemy.ColumnElement[Any]) -> sqlalchemy.ColumnElement[Any]:
        # Docstring inherited.
        # In SQLite, columns are permitted in the SELECT clause without an
        # aggregate function even if they're not in the GROUP BY, with an
        # arbitrary value picked if there is more than one.
        return column

    def glob_expression(
        self, expression: sqlalchemy.ColumnElement[Any], pattern: str
    ) -> sqlalchemy.ColumnElement[bool]:
        # Docstring inherited.
        # SQLite GLOB operator is preferable in our case because it is
        # case-sensitive.  Our glob syntax supports only * and ?, while GLOB
        # also supports bracket expressions.  While we do not expect brackets
        # in patterns, we want to convert them to regular characters.  Note
        # that escaping does not work in GLOB, so ``\*`` is rewritten as the
        # one-character bracket expression ``[*]`` and similarly for ``\?``.
        if pattern == "*":
            # Simple case.
            return sqlalchemy.literal(True)

        def _escape(pattern: str) -> str:
            """Transform our glob pattern into SQLite pattern."""
            backslash = "\\"
            pieces = []
            # True while the previous character was a backslash that has not
            # been consumed yet.
            pending_backslash = False
            for char in pattern:
                if pending_backslash:
                    if char == backslash:
                        # Two backslashes become one.
                        pieces.append(backslash)
                    elif char in "[]*?":
                        # Handle escaped brackets and wildcards.
                        pieces.append(f"[{char}]")
                    else:
                        # Copy both backslash and character.
                        pieces.append(backslash + char)
                    pending_backslash = False
                elif char == backslash:
                    pending_backslash = True
                elif char in "[]":
                    # Handle brackets.
                    pieces.append(f"[{char}]")
                else:
                    pieces.append(char)
            if pending_backslash:
                # Trailing backslash added to pattern.
                pieces.append(backslash)
            return "".join(pieces)

        sqlite_pattern = _escape(pattern)
        return expression.op("GLOB")(sqlalchemy.literal(sqlite_pattern))

    filename: str | None
    """Name of the file this database is connected to (`str` or `None`).

    Set to `None` for in-memory databases.
    """

490 

491 

def _find_database_filename(
    engine: sqlalchemy.engine.Engine,
    namespace: str | None = None,
) -> str | None:
    """Look up the file backing one database of a SQLite connection.

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        Engine used to open the connection that is inspected.
    namespace : `str`, optional
        Name of the database to look for in the output of
        ``PRAGMA database_list``.  `None` means ``"main"``.

    Returns
    -------
    filename : `str` or `None`
        File name reported for the database, or `None` if the reported name
        is empty.

    Raises
    ------
    RuntimeError
        Raised if the connection lists no databases at all, or none with the
        requested name.
    """
    # Get the filename from a call to 'PRAGMA database_list'.
    with engine.connect() as connection, closing(connection.connection.cursor()) as cursor:
        attached = cursor.execute("PRAGMA database_list").fetchall()
    if not attached:
        raise RuntimeError("No database in connection.")
    wanted = "main" if namespace is None else namespace
    # Look for the filename associated with this namespace; the first
    # matching row wins.
    for _, dbname, path in attached:
        if dbname == wanted:
            # An empty file name is reported as `None`.
            return path or None
    raise RuntimeError(f"No '{wanted}' database in connection.")