Coverage for python / lsst / daf / butler / registry / bridge / monolithic.py: 25%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:17 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29from ... import ddl 

30 

31__all__ = ("MonolithicDatastoreRegistryBridge", "MonolithicDatastoreRegistryBridgeManager") 

32 

33from collections import namedtuple 

34from collections.abc import Collection, Iterable, Iterator 

35from contextlib import contextmanager 

36from typing import TYPE_CHECKING, cast 

37 

38import sqlalchemy 

39 

40from lsst.utils.iteration import chunk_iterable 

41 

42from ..._dataset_ref import DatasetId 

43from ...datastore.stored_file_info import StoredDatastoreItemInfo 

44from ..interfaces import ( 

45 DatasetIdRef, 

46 DatastoreRegistryBridge, 

47 DatastoreRegistryBridgeManager, 

48 FakeDatasetRef, 

49 OpaqueTableStorage, 

50 VersionTuple, 

51) 

52from ..opaque import ByNameOpaqueTableStorage 

53from .ephemeral import EphemeralDatastoreRegistryBridge 

54 

55if TYPE_CHECKING: 

56 from ...datastore import DatastoreTransaction 

57 from ...dimensions import DimensionUniverse 

58 from ..interfaces import ( 

59 Database, 

60 DatasetRecordStorageManager, 

61 OpaqueTableStorageManager, 

62 StaticTablesContext, 

63 ) 

64 

65_TablesTuple = namedtuple( 

66 "_TablesTuple", 

67 [ 

68 "dataset_location", 

69 "dataset_location_trash", 

70 ], 

71) 

72 

# Schema version for the tables created by ``_makeTableSpecs``.
# This has to be updated on every schema change
_VERSION = VersionTuple(0, 2, 1)

75 

76 

def _makeTableSpecs(datasets: type[DatasetRecordStorageManager]) -> _TablesTuple:
    """Construct specifications for tables used by the monolithic datastore
    bridge classes.

    Parameters
    ----------
    datasets : subclass of `DatasetRecordStorageManager`
        Manager class for datasets; used only to create foreign key fields.

    Returns
    -------
    specs : `_TablesTuple`
        A named tuple containing `ddl.TableSpec` instances.
    """
    # We want the dataset_location and dataset_location_trash tables
    # to have the same definition, aside from the behavior of their link
    # to the dataset table: the trash table has no foreign key constraint.
    # The order of columns in dataset_location_trash is reversed, it is more
    # optimal for query planner.

    # Shared column spec: both tables are keyed on (datastore_name,
    # dataset_id), so the datastore name is part of the primary key.
    datastore_field = ddl.FieldSpec(
        name="datastore_name",
        dtype=sqlalchemy.String,
        length=256,
        primaryKey=True,
        nullable=False,
        doc="Name of the Datastore this entry corresponds to.",
    )

    dataset_location = ddl.TableSpec(
        doc=(
            "A table that provides information on whether a dataset is stored in "
            "one or more Datastores. The presence or absence of a record in this "
            "table itself indicates whether the dataset is present in that "
            "Datastore. "
        ),
        fields=[datastore_field],
    )
    # Adds the dataset_id column with a foreign key back to the dataset table.
    datasets.addDatasetForeignKey(dataset_location, primaryKey=True)

    dataset_location_trash = ddl.TableSpec(
        # Fixed typo: "iinformation" -> "information".
        doc="A table that keeps information about datasets that are removed from Datastores.",
        fields=[],
    )
    # No foreign key constraint here: trashed entries may refer to datasets
    # that have already been removed from the dataset table.
    datasets.addDatasetForeignKey(dataset_location_trash, primaryKey=True, constraint=False)
    # Appending datastore_name after dataset_id reverses the column order
    # relative to dataset_location (see comment above on query planning).
    dataset_location_trash.fields.add(datastore_field)

    return _TablesTuple(
        dataset_location=dataset_location,
        dataset_location_trash=dataset_location_trash,
    )

128 

129 

class MonolithicDatastoreRegistryBridge(DatastoreRegistryBridge):
    """An implementation of `DatastoreRegistryBridge` that uses the same two
    tables for all non-ephemeral datastores.

    Parameters
    ----------
    datastoreName : `str`
        Name of the `Datastore` as it should appear in `Registry` tables
        referencing it.
    db : `Database`
        Object providing a database connection and generic abstractions.
    tables : `_TablesTuple`
        Named tuple containing `sqlalchemy.schema.Table` instances.
    """

    def __init__(self, datastoreName: str, *, db: Database, tables: _TablesTuple):
        super().__init__(datastoreName)
        self._db = db
        self._tables = tables

    def _refsToRows(self, refs: Iterable[DatasetIdRef]) -> list[dict]:
        """Transform an iterable of `DatasetRef` or `FakeDatasetRef` objects to
        a list of dictionaries that match the schema of the tables used by this
        class.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` or `FakeDatasetRef` ]
            Datasets to transform.

        Returns
        -------
        rows : `list` [ `dict` ]
            List of dictionaries, with "datastore_name" and "dataset_id" keys.
        """
        return [{"datastore_name": self.datastoreName, "dataset_id": ref.id} for ref in refs]

    def ensure(self, refs: Iterable[DatasetIdRef]) -> None:
        # Docstring inherited from DatastoreRegistryBridge
        self._db.ensure(self._tables.dataset_location, *self._refsToRows(refs))

    def insert(self, refs: Iterable[DatasetIdRef]) -> None:
        # Docstring inherited from DatastoreRegistryBridge
        self._db.insert(self._tables.dataset_location, *self._refsToRows(refs))

    def forget(self, refs: Iterable[DatasetIdRef]) -> None:
        # Docstring inherited from DatastoreRegistryBridge
        with self._db.transaction():
            # The list of IDs can be very large, split it into reasonable size
            # chunks to avoid hitting limits.
            for refs_chunk in chunk_iterable(refs, 50_000):
                dataset_ids = [ref.id for ref in refs_chunk]
                # Only rows belonging to this datastore are removed; other
                # datastores' records for the same datasets are untouched.
                where = sqlalchemy.sql.and_(
                    self._tables.dataset_location.columns.datastore_name == self.datastoreName,
                    self._tables.dataset_location.columns.dataset_id.in_(dataset_ids),
                )
                self._db.deleteWhere(self._tables.dataset_location, where)

    def moveToTrash(self, refs: Iterable[DatasetIdRef], transaction: DatastoreTransaction | None) -> None:
        # Docstring inherited from DatastoreRegistryBridge
        location = self._tables.dataset_location
        location_trash = self._tables.dataset_location_trash
        with self._db.transaction():
            for refs_chunk in chunk_iterable(refs, 50_000):
                # We only want to move IDs that actually exist in the
                # dataset_location table. Instead of querying for existing IDs,
                # which would need an extra query, we use INSERT ... SELECT
                # and DELETE using WHERE clause that limits operations to
                # existing IDs.
                dataset_ids = [ref.id for ref in refs_chunk]

                # The same WHERE clause drives both the copy into the trash
                # table and the subsequent delete, keeping the two consistent.
                where = sqlalchemy.sql.and_(
                    location.columns.datastore_name == self.datastoreName,
                    location.columns.dataset_id.in_(dataset_ids),
                )

                # FOR UPDATE locks the selected rows for the duration of the
                # transaction so the DELETE below removes exactly the rows
                # that were copied.
                select = (
                    sqlalchemy.sql.select(location.columns.datastore_name, location.columns.dataset_id)
                    .where(where)
                    .with_for_update()
                )
                self._db.insert(location_trash, select=select)

                self._db.deleteWhere(location, where)

    def check(self, refs: Iterable[DatasetIdRef]) -> Iterable[DatasetIdRef]:
        # Docstring inherited from DatastoreRegistryBridge
        # Map dataset ID back to the original ref so we can return the
        # caller's objects, not just IDs.
        byId = {ref.id: ref for ref in refs}
        found: list[DatasetIdRef] = []
        with self._db.session():
            # Chunk the ID list to avoid exceeding query-size limits.
            for batch in chunk_iterable(byId.keys(), 50000):
                sql = (
                    sqlalchemy.sql.select(self._tables.dataset_location.columns.dataset_id)
                    .select_from(self._tables.dataset_location)
                    .where(
                        sqlalchemy.sql.and_(
                            self._tables.dataset_location.columns.datastore_name == self.datastoreName,
                            self._tables.dataset_location.columns.dataset_id.in_(batch),
                        )
                    )
                )
                with self._db.query(sql) as sql_result:
                    sql_ids = sql_result.scalars().all()
                found.extend(byId[id] for id in sql_ids)

        return found

    @contextmanager
    def emptyTrash(
        self,
        records_table: OpaqueTableStorage | None = None,
        record_class: type[StoredDatastoreItemInfo] | None = None,
        record_column: str | None = None,
        selected_ids: Collection[DatasetId] | None = None,
        dry_run: bool = False,
    ) -> Iterator[tuple[Iterable[tuple[DatasetIdRef, StoredDatastoreItemInfo | None]], set[str] | None]]:
        # Docstring inherited from DatastoreRegistryBridge

        if records_table is None:
            raise ValueError("This implementation requires a records table.")

        assert isinstance(records_table, ByNameOpaqueTableStorage), (
            f"Records table must support hidden attributes. Got {type(records_table)}."
        )

        if record_class is None:
            raise ValueError("Record class must be provided if records table is given.")

        # Helper closure to generate the common join+where clause.
        def join_records(
            select: sqlalchemy.sql.Select, location_table: sqlalchemy.schema.Table
        ) -> sqlalchemy.sql.Select:
            # mypy needs to be sure
            assert isinstance(records_table, ByNameOpaqueTableStorage)
            return select.select_from(
                records_table._table.join(
                    location_table,
                    onclause=records_table._table.columns.dataset_id == location_table.columns.dataset_id,
                )
            ).where(location_table.columns.datastore_name == self.datastoreName)

        # SELECT records.dataset_id, records.path FROM records
        # JOIN records on dataset_location.dataset_id == records.dataset_id
        # WHERE dataset_location.datastore_name = datastoreName

        # It's possible that we may end up with a ref listed in the trash
        # table that is not listed in the records table. Such an
        # inconsistency would be missed by this query.
        info_in_trash = join_records(records_table._table.select(), self._tables.dataset_location_trash)
        if selected_ids:
            info_in_trash = info_in_trash.where(
                self._tables.dataset_location_trash.columns["dataset_id"].in_(selected_ids)
            )
        # SKIP LOCKED lets a concurrent emptyTrash transaction proceed on the
        # rows this one has not locked, rather than blocking.
        info_in_trash = info_in_trash.with_for_update(skip_locked=True)

        # Run query, transform results into a list of dicts that we can later
        # use to delete.
        with self._db.query(info_in_trash) as sql_result:
            rows = [dict(row, datastore_name=self.datastoreName) for row in sql_result.mappings()]

        # It is possible for trashed refs to be linked to artifacts that
        # are still associated with refs that are not to be trashed. We
        # need to be careful to consider those and indicate to the caller
        # that those artifacts should be retained. Can only do this check
        # if the caller provides a column name that can map to multiple
        # refs.
        preserved: set[str] | None = None
        if record_column is not None:
            # Some helper subqueries
            items_not_in_trash = join_records(
                sqlalchemy.sql.select(records_table._table.columns[record_column]),
                self._tables.dataset_location,
            ).alias("items_not_in_trash")
            items_in_trash = join_records(
                sqlalchemy.sql.select(records_table._table.columns[record_column]),
                self._tables.dataset_location_trash,
            )
            if selected_ids:
                items_in_trash = items_in_trash.where(
                    self._tables.dataset_location_trash.columns["dataset_id"].in_(selected_ids)
                )
            items_in_trash_alias = items_in_trash.alias("items_in_trash")

            # A query for paths that are referenced by datasets in the trash
            # and datasets not in the trash.
            items_to_preserve = sqlalchemy.sql.select(
                items_in_trash_alias.columns[record_column]
            ).select_from(
                items_not_in_trash.join(
                    items_in_trash_alias,
                    onclause=items_in_trash_alias.columns[record_column]
                    == items_not_in_trash.columns[record_column],
                )
            )
            with self._db.query(items_to_preserve) as sql_result:
                preserved = {row[record_column] for row in sql_result.mappings()}

        # Convert results to a tuple of id+info and a record of the artifacts
        # that should not be deleted from datastore. The id+info tuple is
        # solely to allow logging to report the relevant ID.
        id_info = ((FakeDatasetRef(row["dataset_id"]), record_class.from_record(row)) for row in rows)

        # Start contextmanager, return results
        yield ((id_info, preserved))

        # No exception raised in context manager block.
        if not rows or dry_run:
            return

        # Delete the rows from the records table
        records_table.delete(["dataset_id"], *[{"dataset_id": row["dataset_id"]} for row in rows])

        # Delete those rows from the trash table.
        self._db.delete(
            self._tables.dataset_location_trash,
            ["dataset_id", "datastore_name"],
            *[{"dataset_id": row["dataset_id"], "datastore_name": row["datastore_name"]} for row in rows],
        )

348 

349 

class MonolithicDatastoreRegistryBridgeManager(DatastoreRegistryBridgeManager):
    """An implementation of `DatastoreRegistryBridgeManager` that uses the same
    two tables for all non-ephemeral datastores.

    Parameters
    ----------
    db : `Database`
        Object providing a database connection and generic abstractions.
    tables : `_TablesTuple`
        Named tuple containing `sqlalchemy.schema.Table` instances.
    opaque : `OpaqueTableStorageManager`
        Manager object for opaque table storage in the `Registry`.
    universe : `DimensionUniverse`
        All dimensions known to the `Registry`.
    registry_schema_version : `VersionTuple` or `None`, optional
        The version of the registry schema.
    """

    def __init__(
        self,
        *,
        db: Database,
        tables: _TablesTuple,
        opaque: OpaqueTableStorageManager,
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ):
        super().__init__(
            opaque=opaque,
            universe=universe,
            registry_schema_version=registry_schema_version,
        )
        self._db = db
        self._tables = tables
        # Ephemeral bridges live only in this process; keyed by datastore
        # name so register() can hand back the same instance each time.
        self._ephemeral: dict[str, EphemeralDatastoreRegistryBridge] = {}

    def clone(self, *, db: Database, opaque: OpaqueTableStorageManager) -> DatastoreRegistryBridgeManager:
        # Docstring inherited from DatastoreRegistryBridgeManager.
        # NOTE(review): the _ephemeral dict is not carried over to the clone,
        # so in-memory ephemeral bridges start empty — confirm intended.
        return MonolithicDatastoreRegistryBridgeManager(
            db=db,
            tables=self._tables,
            opaque=opaque,
            universe=self.universe,
            registry_schema_version=self._registry_schema_version,
        )

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        opaque: OpaqueTableStorageManager,
        datasets: type[DatasetRecordStorageManager],
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ) -> DatastoreRegistryBridgeManager:
        # Docstring inherited from DatastoreRegistryBridge
        # Declare the bridge tables with the schema context; the returned
        # object is tuple-like, so cast it to our named tuple type for mypy.
        tables = context.addTableTuple(_makeTableSpecs(datasets))
        return cls(
            db=db,
            tables=cast(_TablesTuple, tables),
            opaque=opaque,
            universe=universe,
            registry_schema_version=registry_schema_version,
        )

    def refresh(self) -> None:
        # Docstring inherited from DatastoreRegistryBridge
        # This implementation has no in-Python state that depends on which
        # datastores exist, so there's nothing to do.
        pass

    def register(self, name: str, *, ephemeral: bool = False) -> DatastoreRegistryBridge:
        # Docstring inherited from DatastoreRegistryBridge
        if ephemeral:
            # Reuse an existing in-memory bridge for this name if present.
            return self._ephemeral.setdefault(name, EphemeralDatastoreRegistryBridge(name))
        return MonolithicDatastoreRegistryBridge(name, db=self._db, tables=self._tables)

    def findDatastores(self, ref: DatasetIdRef) -> Iterable[str]:
        # Docstring inherited from DatastoreRegistryBridge
        sql = (
            sqlalchemy.sql.select(self._tables.dataset_location.columns.datastore_name)
            .select_from(self._tables.dataset_location)
            .where(self._tables.dataset_location.columns.dataset_id == ref.id)
        )
        # Materialize the result set before yielding so the query context
        # is closed even if the caller abandons the generator early.
        with self._db.query(sql) as sql_result:
            sql_rows = sql_result.mappings().fetchall()
        for row in sql_rows:
            yield row[self._tables.dataset_location.columns.datastore_name]
        # Ephemeral datastores are tracked in memory, not in the database.
        for name, bridge in self._ephemeral.items():
            if ref in bridge:
                yield name

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        # Docstring inherited from VersionedExtension.
        return [_VERSION]