Coverage for python/lsst/daf/butler/registry/bridge/monolithic.py: 25% (123 statements)
coverage.py v7.13.5, created at 2026-04-28 08:36 +0000
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

from ... import ddl

__all__ = ("MonolithicDatastoreRegistryBridge", "MonolithicDatastoreRegistryBridgeManager")

from collections import namedtuple
from collections.abc import Collection, Iterable, Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, cast

import sqlalchemy

from lsst.utils.iteration import chunk_iterable

from ..._dataset_ref import DatasetId
from ...datastore.stored_file_info import StoredDatastoreItemInfo
from ..interfaces import (
    DatasetIdRef,
    DatastoreRegistryBridge,
    DatastoreRegistryBridgeManager,
    FakeDatasetRef,
    OpaqueTableStorage,
    VersionTuple,
)
from ..opaque import ByNameOpaqueTableStorage
from .ephemeral import EphemeralDatastoreRegistryBridge

if TYPE_CHECKING:
    from ...datastore import DatastoreTransaction
    from ...dimensions import DimensionUniverse
    from ..interfaces import (
        Database,
        DatasetRecordStorageManager,
        OpaqueTableStorageManager,
        StaticTablesContext,
    )

_TablesTuple = namedtuple(
    "_TablesTuple",
    [
        "dataset_location",
        "dataset_location_trash",
    ],
)

# This has to be updated on every schema change
_VERSION = VersionTuple(0, 2, 1)


def _makeTableSpecs(datasets: type[DatasetRecordStorageManager]) -> _TablesTuple:
    """Construct specifications for tables used by the monolithic datastore
    bridge classes.

    Parameters
    ----------
    datasets : subclass of `DatasetRecordStorageManager`
        Manager class for datasets; used only to create foreign key fields.

    Returns
    -------
    specs : `_TablesTuple`
        A named tuple containing `ddl.TableSpec` instances.
    """
    # We want the dataset_location and dataset_location_trash tables
    # to have the same definition, aside from the behavior of their link
    # to the dataset table: the trash table has no foreign key constraint.
    # The order of columns in dataset_location_trash is reversed because
    # that is more efficient for the query planner.
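    # For illustration only (this is not the authoritative DDL; the exact
    # type and target of the dataset_id column come from the
    # DatasetRecordStorageManager), the resulting tables look roughly like:
    #
    #     CREATE TABLE dataset_location (
    #         datastore_name VARCHAR(256) NOT NULL,
    #         dataset_id <id type> NOT NULL REFERENCES dataset (id),
    #         PRIMARY KEY (datastore_name, dataset_id)
    #     );
    #     CREATE TABLE dataset_location_trash (
    #         dataset_id <id type> NOT NULL,  -- no foreign key constraint
    #         datastore_name VARCHAR(256) NOT NULL,
    #         PRIMARY KEY (dataset_id, datastore_name)
    #     );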

    datastore_field = ddl.FieldSpec(
        name="datastore_name",
        dtype=sqlalchemy.String,
        length=256,
        primaryKey=True,
        nullable=False,
        doc="Name of the Datastore this entry corresponds to.",
    )

    dataset_location = ddl.TableSpec(
        doc=(
            "A table that provides information on whether a dataset is stored in "
            "one or more Datastores. The presence or absence of a record in this "
            "table itself indicates whether the dataset is present in that "
            "Datastore."
        ),
        fields=[datastore_field],
    )
    datasets.addDatasetForeignKey(dataset_location, primaryKey=True)

    dataset_location_trash = ddl.TableSpec(
        doc="A table that keeps information about datasets that are removed from Datastores.",
        fields=[],
    )
    datasets.addDatasetForeignKey(dataset_location_trash, primaryKey=True, constraint=False)
    dataset_location_trash.fields.add(datastore_field)

    return _TablesTuple(
        dataset_location=dataset_location,
        dataset_location_trash=dataset_location_trash,
    )


class MonolithicDatastoreRegistryBridge(DatastoreRegistryBridge):
    """An implementation of `DatastoreRegistryBridge` that uses the same two
    tables for all non-ephemeral datastores.

    Parameters
    ----------
    datastoreName : `str`
        Name of the `Datastore` as it should appear in `Registry` tables
        referencing it.
    db : `Database`
        Object providing a database connection and generic abstractions.
    tables : `_TablesTuple`
        Named tuple containing `sqlalchemy.schema.Table` instances.
    """

    def __init__(self, datastoreName: str, *, db: Database, tables: _TablesTuple):
        super().__init__(datastoreName)
        self._db = db
        self._tables = tables

    def _refsToRows(self, refs: Iterable[DatasetIdRef]) -> list[dict]:
        """Transform an iterable of `DatasetRef` or `FakeDatasetRef` objects
        to a list of dictionaries that match the schema of the tables used by
        this class.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` or `FakeDatasetRef` ]
            Datasets to transform.

        Returns
        -------
        rows : `list` [ `dict` ]
            List of dictionaries, with "datastore_name" and "dataset_id" keys.
        """
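        # For example, a single ref stored in a datastore registered under a
        # hypothetical name "FileDatastore@<butlerRoot>" would be rendered as
        #     [{"datastore_name": "FileDatastore@<butlerRoot>",
        #       "dataset_id": <the ref's dataset ID>}]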
        return [{"datastore_name": self.datastoreName, "dataset_id": ref.id} for ref in refs]

    def ensure(self, refs: Iterable[DatasetIdRef]) -> None:
        # Docstring inherited from DatastoreRegistryBridge
        self._db.ensure(self._tables.dataset_location, *self._refsToRows(refs))

    def insert(self, refs: Iterable[DatasetIdRef]) -> None:
        # Docstring inherited from DatastoreRegistryBridge
        self._db.insert(self._tables.dataset_location, *self._refsToRows(refs))

    def forget(self, refs: Iterable[DatasetIdRef]) -> None:
        # Docstring inherited from DatastoreRegistryBridge
        with self._db.transaction():
            # The list of IDs can be very large; split it into reasonably
            # sized chunks to avoid hitting database limits.
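            # In SQL terms, each chunk issues approximately:
            #     DELETE FROM dataset_location
            #     WHERE datastore_name = :datastoreName
            #         AND dataset_id IN (:id0, :id1, ...)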
            for refs_chunk in chunk_iterable(refs, 50_000):
                dataset_ids = [ref.id for ref in refs_chunk]
                where = sqlalchemy.sql.and_(
                    self._tables.dataset_location.columns.datastore_name == self.datastoreName,
                    self._tables.dataset_location.columns.dataset_id.in_(dataset_ids),
                )
                self._db.deleteWhere(self._tables.dataset_location, where)

    def moveToTrash(self, refs: Iterable[DatasetIdRef], transaction: DatastoreTransaction | None) -> None:
        # Docstring inherited from DatastoreRegistryBridge
        location = self._tables.dataset_location
        location_trash = self._tables.dataset_location_trash
        with self._db.transaction():
            for refs_chunk in chunk_iterable(refs, 50_000):
                # We only want to move IDs that actually exist in the
                # dataset_location table. Instead of querying for existing
                # IDs, which would need an extra query, we use
                # INSERT ... SELECT and DELETE with a WHERE clause that
                # limits both operations to existing IDs.
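                # In SQL terms, each chunk runs approximately:
                #     INSERT INTO dataset_location_trash (datastore_name, dataset_id)
                #         SELECT datastore_name, dataset_id FROM dataset_location
                #         WHERE datastore_name = :name AND dataset_id IN (...)
                #         FOR UPDATE;
                #     DELETE FROM dataset_location
                #     WHERE datastore_name = :name AND dataset_id IN (...);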
                dataset_ids = [ref.id for ref in refs_chunk]

                where = sqlalchemy.sql.and_(
                    location.columns.datastore_name == self.datastoreName,
                    location.columns.dataset_id.in_(dataset_ids),
                )

                select = (
                    sqlalchemy.sql.select(location.columns.datastore_name, location.columns.dataset_id)
                    .where(where)
                    .with_for_update()
                )
                self._db.insert(location_trash, select=select)

                self._db.deleteWhere(location, where)

    def check(self, refs: Iterable[DatasetIdRef]) -> Iterable[DatasetIdRef]:
        # Docstring inherited from DatastoreRegistryBridge
        byId = {ref.id: ref for ref in refs}
        found: list[DatasetIdRef] = []
        with self._db.session():
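            # Chunk the IDs so that the IN clause stays within database
            # limits on the number of bound parameters.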
            for batch in chunk_iterable(byId.keys(), 50_000):
                sql = (
                    sqlalchemy.sql.select(self._tables.dataset_location.columns.dataset_id)
                    .select_from(self._tables.dataset_location)
                    .where(
                        sqlalchemy.sql.and_(
                            self._tables.dataset_location.columns.datastore_name == self.datastoreName,
                            self._tables.dataset_location.columns.dataset_id.in_(batch),
                        )
                    )
                )
                with self._db.query(sql) as sql_result:
                    sql_ids = sql_result.scalars().all()
                found.extend(byId[id] for id in sql_ids)

        return found

    @contextmanager
    def emptyTrash(
        self,
        records_table: OpaqueTableStorage | None = None,
        record_class: type[StoredDatastoreItemInfo] | None = None,
        record_column: str | None = None,
        selected_ids: Collection[DatasetId] | None = None,
        dry_run: bool = False,
    ) -> Iterator[tuple[Iterable[tuple[DatasetIdRef, StoredDatastoreItemInfo | None]], set[str] | None]]:
        # Docstring inherited from DatastoreRegistryBridge

        if records_table is None:
            raise ValueError("This implementation requires a records table.")

        assert isinstance(records_table, ByNameOpaqueTableStorage), (
            f"Records table must support hidden attributes. Got {type(records_table)}."
        )

        if record_class is None:
            raise ValueError("Record class must be provided if records table is given.")

        # Helper closure to generate the common join+where clause.
        def join_records(
            select: sqlalchemy.sql.Select, location_table: sqlalchemy.schema.Table
        ) -> sqlalchemy.sql.Select:
            # mypy cannot carry the outer narrowing into the closure, so
            # assert again here.
            assert isinstance(records_table, ByNameOpaqueTableStorage)
            return select.select_from(
                records_table._table.join(
                    location_table,
                    onclause=records_table._table.columns.dataset_id == location_table.columns.dataset_id,
                )
            ).where(location_table.columns.datastore_name == self.datastoreName)

        # In SQL terms, the trash query below is:
        #     SELECT records.* FROM records
        #     JOIN dataset_location_trash
        #         ON records.dataset_id = dataset_location_trash.dataset_id
        #     WHERE dataset_location_trash.datastore_name = :datastoreName

        # It's possible that we may end up with a ref listed in the trash
        # table that is not listed in the records table. Such an
        # inconsistency would be missed by this query.
        info_in_trash = join_records(records_table._table.select(), self._tables.dataset_location_trash)
        if selected_ids:
            info_in_trash = info_in_trash.where(
                self._tables.dataset_location_trash.columns["dataset_id"].in_(selected_ids)
            )
        info_in_trash = info_in_trash.with_for_update(skip_locked=True)

        # Run the query and transform the results into a list of dicts that
        # we can use later for deletion.
        with self._db.query(info_in_trash) as sql_result:
            rows = [dict(row, datastore_name=self.datastoreName) for row in sql_result.mappings()]

        # It is possible for trashed refs to be linked to artifacts that
        # are still associated with refs that are not to be trashed. We
        # need to be careful to consider those and indicate to the caller
        # that those artifacts should be retained. We can only do this check
        # if the caller provides a column name that can map to multiple
        # refs.
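        # In rough SQL terms (taking record_column = "path" as an example),
        # the check below computes:
        #     SELECT items_in_trash.path
        #     FROM items_not_in_trash
        #     JOIN items_in_trash ON items_in_trash.path = items_not_in_trash.path
        # where each subquery joins the records table against
        # dataset_location / dataset_location_trash, respectively.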
        preserved: set[str] | None = None
        if record_column is not None:
            # Some helper subqueries
            items_not_in_trash = join_records(
                sqlalchemy.sql.select(records_table._table.columns[record_column]),
                self._tables.dataset_location,
            ).alias("items_not_in_trash")
            items_in_trash = join_records(
                sqlalchemy.sql.select(records_table._table.columns[record_column]),
                self._tables.dataset_location_trash,
            )
            if selected_ids:
                items_in_trash = items_in_trash.where(
                    self._tables.dataset_location_trash.columns["dataset_id"].in_(selected_ids)
                )
            items_in_trash_alias = items_in_trash.alias("items_in_trash")

            # A query for paths that are referenced by datasets in the trash
            # and datasets not in the trash.
            items_to_preserve = sqlalchemy.sql.select(
                items_in_trash_alias.columns[record_column]
            ).select_from(
                items_not_in_trash.join(
                    items_in_trash_alias,
                    onclause=items_in_trash_alias.columns[record_column]
                    == items_not_in_trash.columns[record_column],
                )
            )
            with self._db.query(items_to_preserve) as sql_result:
                preserved = {row[record_column] for row in sql_result.mappings()}

        # Convert results to a tuple of id+info and a record of the artifacts
        # that should not be deleted from datastore. The id+info tuple is
        # solely to allow logging to report the relevant ID.
        id_info = ((FakeDatasetRef(row["dataset_id"]), record_class.from_record(row)) for row in rows)

        # Start contextmanager, return results
        yield id_info, preserved

        # No exception raised in context manager block.
        if not rows or dry_run:
            return

        # Delete the rows from the records table.
        records_table.delete(["dataset_id"], *[{"dataset_id": row["dataset_id"]} for row in rows])

        # Delete those rows from the trash table.
        self._db.delete(
            self._tables.dataset_location_trash,
            ["dataset_id", "datastore_name"],
            *[{"dataset_id": row["dataset_id"], "datastore_name": row["datastore_name"]} for row in rows],
        )


class MonolithicDatastoreRegistryBridgeManager(DatastoreRegistryBridgeManager):
    """An implementation of `DatastoreRegistryBridgeManager` that uses the
    same two tables for all non-ephemeral datastores.

    Parameters
    ----------
    db : `Database`
        Object providing a database connection and generic abstractions.
    tables : `_TablesTuple`
        Named tuple containing `sqlalchemy.schema.Table` instances.
    opaque : `OpaqueTableStorageManager`
        Manager object for opaque table storage in the `Registry`.
    universe : `DimensionUniverse`
        All dimensions known to the `Registry`.
    registry_schema_version : `VersionTuple` or `None`, optional
        The version of the registry schema.
    """

    def __init__(
        self,
        *,
        db: Database,
        tables: _TablesTuple,
        opaque: OpaqueTableStorageManager,
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ):
        super().__init__(
            opaque=opaque,
            universe=universe,
            registry_schema_version=registry_schema_version,
        )
        self._db = db
        self._tables = tables
        self._ephemeral: dict[str, EphemeralDatastoreRegistryBridge] = {}

    def clone(self, *, db: Database, opaque: OpaqueTableStorageManager) -> DatastoreRegistryBridgeManager:
        return MonolithicDatastoreRegistryBridgeManager(
            db=db,
            tables=self._tables,
            opaque=opaque,
            universe=self.universe,
            registry_schema_version=self._registry_schema_version,
        )

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        opaque: OpaqueTableStorageManager,
        datasets: type[DatasetRecordStorageManager],
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ) -> DatastoreRegistryBridgeManager:
        # Docstring inherited from DatastoreRegistryBridge
        tables = context.addTableTuple(_makeTableSpecs(datasets))
        return cls(
            db=db,
            tables=cast(_TablesTuple, tables),
            opaque=opaque,
            universe=universe,
            registry_schema_version=registry_schema_version,
        )

    def refresh(self) -> None:
        # Docstring inherited from DatastoreRegistryBridge
        # This implementation has no in-Python state that depends on which
        # datastores exist, so there's nothing to do.
        pass

    def register(self, name: str, *, ephemeral: bool = False) -> DatastoreRegistryBridge:
        # Docstring inherited from DatastoreRegistryBridge
        if ephemeral:
            return self._ephemeral.setdefault(name, EphemeralDatastoreRegistryBridge(name))
        return MonolithicDatastoreRegistryBridge(name, db=self._db, tables=self._tables)

    def findDatastores(self, ref: DatasetIdRef) -> Iterable[str]:
        # Docstring inherited from DatastoreRegistryBridge
        sql = (
            sqlalchemy.sql.select(self._tables.dataset_location.columns.datastore_name)
            .select_from(self._tables.dataset_location)
            .where(self._tables.dataset_location.columns.dataset_id == ref.id)
        )
        with self._db.query(sql) as sql_result:
            sql_rows = sql_result.mappings().fetchall()
        for row in sql_rows:
            yield row[self._tables.dataset_location.columns.datastore_name]
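        # Ephemeral datastores are tracked only in memory, so also check any
        # ephemeral bridges registered in this process.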
        for name, bridge in self._ephemeral.items():
            if ref in bridge:
                yield name

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        # Docstring inherited from VersionedExtension.
        return [_VERSION]
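

# Illustrative usage sketch (not part of this module; names other than the
# methods themselves are hypothetical). Assuming `bridges` is an initialized
# MonolithicDatastoreRegistryBridgeManager, `refs` is an iterable of resolved
# DatasetRefs, and `storage`/`record_class` describe the datastore's opaque
# records table:
#
#     bridge = bridges.register("FileDatastore@<butlerRoot>")
#     bridge.insert(refs)  # Mark the datasets as present in this datastore.
#     bridge.moveToTrash(refs, transaction=None)
#     with bridge.emptyTrash(
#         records_table=storage, record_class=record_class, record_column="path"
#     ) as (trash, preserved):
#         for ref, info in trash:
#             ...  # Delete artifacts whose paths are not in `preserved`.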