Coverage for python / lsst / daf / butler / registry / interfaces / _bridge.py: 81%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:37 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("DatasetIdRef", "DatastoreRegistryBridge", "DatastoreRegistryBridgeManager", "FakeDatasetRef") 

30 

31from abc import ABC, abstractmethod 

32from collections.abc import Collection, Iterable 

33from contextlib import AbstractContextManager 

34from typing import TYPE_CHECKING, Any 

35 

36from lsst.utils.classes import immutable 

37 

38from ..._dataset_ref import DatasetId, DatasetRef 

39from ._versioning import VersionedExtension, VersionTuple 

40 

41if TYPE_CHECKING: 

42 from ..._dataset_ref import DatasetDatastoreRecords 

43 from ..._dataset_type import DatasetType 

44 from ...datastore import DatastoreTransaction 

45 from ...datastore.stored_file_info import StoredDatastoreItemInfo 

46 from ...dimensions import DimensionUniverse 

47 from ._database import Database, StaticTablesContext 

48 from ._datasets import DatasetRecordStorageManager 

49 from ._opaque import OpaqueTableStorage, OpaqueTableStorageManager 

50 

51 

@immutable
class FakeDatasetRef:
    """A stand-in for `DatasetRef` that carries nothing but the dataset ID.

    Intended for internal butler use when registry cannot be consulted to
    build a complete `DatasetRef` — most notably during dataset deletion,
    when only the ID remains available.

    Parameters
    ----------
    id : `DatasetId`
        The dataset ID.
    """

    __slots__ = ("id",)

    id: DatasetId
    """Unique identifier for this dataset.
    """

    def __init__(self, id: DatasetId):
        self.id = id

    def __str__(self) -> str:
        return f"dataset_id={self.id}"

    def __repr__(self) -> str:
        return f"FakeDatasetRef({self.id})"

    def __eq__(self, other: Any) -> bool:
        # Equality is defined purely by ID; defer to the other operand
        # when it has no ``id`` attribute at all.
        if not hasattr(other, "id"):
            return NotImplemented
        return self.id == other.id

    def __hash__(self) -> int:
        return hash(self.id)

    @property
    def datasetType(self) -> DatasetType:
        # A fake ref deliberately has no dataset type; accessing one is
        # a programming error.
        raise AttributeError("A FakeDatasetRef can not be associated with a valid DatasetType")

    @property
    def _datastore_records(self) -> DatasetDatastoreRecords | None:
        # A fake ref likewise carries no datastore records.
        raise AttributeError("A FakeDatasetRef can not be associated with datastore records")

98 

99 

# Runtime (PEP 604) union of the real and fake ref types, usable wherever
# only the dataset ID is required.
DatasetIdRef = DatasetRef | FakeDatasetRef
"""A type-annotation alias that matches both `DatasetRef` and `FakeDatasetRef`.
"""

103 

104 

class DatastoreRegistryBridge(ABC):
    """An abstract base class that defines the interface that a `Datastore`
    uses to communicate with a `Registry`.

    Parameters
    ----------
    datastoreName : `str`
        Name of the `Datastore` as it should appear in `Registry` tables
        referencing it.
    """

    def __init__(self, datastoreName: str):
        self.datastoreName = datastoreName

    @abstractmethod
    def insert(self, refs: Iterable[DatasetIdRef]) -> None:
        """Record that a datastore holds the given datasets.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` of `DatasetIdRef`
            References to the datasets.
        """
        raise NotImplementedError()

    @abstractmethod
    def ensure(self, refs: Iterable[DatasetIdRef]) -> None:
        """Record that a datastore holds the given datasets, skipping if
        the ref is already registered.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` of `DatasetIdRef`
            References to the datasets.
        """
        raise NotImplementedError()

    @abstractmethod
    def forget(self, refs: Iterable[DatasetIdRef]) -> None:
        """Remove dataset location information without any attempt to put it
        in the trash while waiting for external deletes.

        This should be used only to implement `Datastore.forget`, or in cases
        where deleting the actual datastore artifacts cannot fail.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` of `DatasetIdRef`
            References to the datasets.
        """
        raise NotImplementedError()

    @abstractmethod
    def moveToTrash(self, refs: Iterable[DatasetIdRef], transaction: DatastoreTransaction | None) -> None:
        """Move dataset location information to trash.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` of `DatasetIdRef`
            References to the datasets.
        transaction : `DatastoreTransaction` or `None`
            Transaction object. Can be `None` in some bridges or if no rollback
            is required.
        """
        raise NotImplementedError()

    @abstractmethod
    def check(self, refs: Iterable[DatasetIdRef]) -> Iterable[DatasetIdRef]:
        """Check which refs are listed for this datastore.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` of `DatasetIdRef`
            References to the datasets.

        Returns
        -------
        present : `~collections.abc.Iterable` [ `DatasetIdRef` ]
            Datasets from ``refs`` that are recorded as being in this
            datastore.
        """
        raise NotImplementedError()

    @abstractmethod
    def emptyTrash(
        self,
        records_table: OpaqueTableStorage | None = None,
        record_class: type[StoredDatastoreItemInfo] | None = None,
        record_column: str | None = None,
        selected_ids: Collection[DatasetId] | None = None,
        dry_run: bool = False,
    ) -> AbstractContextManager[
        tuple[Iterable[tuple[DatasetIdRef, StoredDatastoreItemInfo | None]], set[str] | None]
    ]:
        """Retrieve all the dataset ref IDs that are in the trash
        associated for this datastore, and then remove them if the context
        exits without an exception being raised.

        Parameters
        ----------
        records_table : `OpaqueTableStorage`, optional
            Table of records to query with the trash records.
        record_class : `type` of `StoredDatastoreItemInfo`, optional
            Class to use when reading records from ``records_table``.
        record_column : `str`, optional
            Name of the column in ``records_table`` that refers to the
            artifact.
        selected_ids : `collections.abc.Collection` [ `DatasetId` ] \
                or `None`, optional
            If provided, collection of IDs that should be trashed. Only records
            within this selection will be yielded and then removed. This
            can be used to allow a subset of the trash table to be emptied.
            If an empty set is given no artifacts will be trashed. If `None`
            the full list from the trash table will be used.
        dry_run : `bool`, optional
            If `True`, the trash table will be queried and results reported
            but no artifacts will be removed.

        Yields
        ------
        matches : `~collections.abc.Iterable` [`DatasetIdRef`, \
                `StoredDatastoreItemInfo`]
            The IDs of datasets that can be safely removed from this datastore
            and the corresponding information from the records table.
            Can be empty.
        artifacts_to_keep : `set` of `str`, optional
            Any external artifacts that are known to the table but which should
            not be deleted. If `None`, the caller should check themselves.

        Examples
        --------
        Typical usage by a Datastore is something like::

            with self.bridge.emptyTrash() as trashed:
                iter, to_keep = trashed
                for ref, info in iter:
                    # Remove artifacts associated with id,
                    # raise an exception if something goes wrong.

        Notes
        -----
        The object yielded by the context manager may be a single-pass
        iterator. If multiple passes are required, it should be converted to
        a `list` or other container.

        Datastores should never raise (except perhaps in testing) when an
        artifact cannot be removed only because it is already gone - this
        condition is an unavoidable outcome of concurrent delete operations,
        and must not be considered an error for those to be safe.

        If a records table is provided the trashed records will be deleted
        when the context manager completes.
        """
        raise NotImplementedError()

    datastoreName: str
    """The name of the `Datastore` as it should appear in `Registry` tables
    (`str`).
    """

263 

264 

class DatastoreRegistryBridgeManager(VersionedExtension):
    """An abstract base class that defines the interface between `Registry`
    and `Datastore` when a new `Datastore` is constructed.

    Parameters
    ----------
    opaque : `OpaqueTableStorageManager`
        Manager object for opaque table storage in the `Registry`.
    universe : `DimensionUniverse`
        All dimensions known to the `Registry`.
    registry_schema_version : `VersionTuple` or `None`, optional
        Version of registry schema.

    Notes
    -----
    Datastores are passed an instance of `DatastoreRegistryBridgeManager` at
    construction, and should use it to obtain and keep any of the following:

    - a `DatastoreRegistryBridge` instance to record in the `Registry` what is
      present in the datastore (needed by all datastores that are not just
      forwarders);

    - one or more `OpaqueTableStorage` instance if they wish to store internal
      records in the `Registry` database;

    - the `DimensionUniverse`, if they need it to (e.g.) construct or validate
      filename templates.
    """

    def __init__(
        self,
        *,
        opaque: OpaqueTableStorageManager,
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ):
        super().__init__(registry_schema_version=registry_schema_version)
        self.opaque = opaque
        self.universe = universe

    @abstractmethod
    def clone(
        self,
        *,
        db: Database,
        opaque: OpaqueTableStorageManager,
    ) -> DatastoreRegistryBridgeManager:
        """Make an independent copy of this manager instance bound to new
        instances of `Database` and other managers.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating the manager.
        opaque : `OpaqueTableStorageManager`
            New `OpaqueTableStorageManager` object to use when instantiating
            the manager.

        Returns
        -------
        instance : `DatastoreRegistryBridgeManager`
            New manager instance with the same configuration as this instance,
            but bound to a new Database object.
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        opaque: OpaqueTableStorageManager,
        datasets: type[DatasetRecordStorageManager],
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ) -> DatastoreRegistryBridgeManager:
        """Construct an instance of the manager.

        Parameters
        ----------
        db : `Database`
            Interface to the underlying database engine and namespace.
        context : `StaticTablesContext`
            Context object obtained from `Database.declareStaticTables`; used
            to declare any tables that should always be present in a layer
            implemented with this manager.
        opaque : `OpaqueTableStorageManager`
            Registry manager object for opaque (to Registry) tables, provided
            to allow Datastores to store their internal information inside the
            Registry database.
        datasets : subclass of `DatasetRecordStorageManager`
            Concrete class that will be used to manage the core dataset tables
            in this registry; should be used only to create foreign keys to
            those tables.
        universe : `DimensionUniverse`
            All dimensions known to the registry.
        registry_schema_version : `VersionTuple` or `None`
            Schema version of this extension as defined in registry.

        Returns
        -------
        manager : `DatastoreRegistryBridgeManager`
            An instance of a concrete `DatastoreRegistryBridgeManager`
            subclass.
        """
        raise NotImplementedError()

    @abstractmethod
    def refresh(self) -> None:
        """Ensure all other operations on this manager are aware of any
        collections that may have been registered by other clients since it
        was initialized or last refreshed.
        """
        raise NotImplementedError()

    @abstractmethod
    def register(self, name: str, *, ephemeral: bool = True) -> DatastoreRegistryBridge:
        """Register a new `Datastore` associated with this `Registry`.

        This method should be called by all `Datastore` classes aside from
        those that only forward storage to other datastores.

        Parameters
        ----------
        name : `str`
            Name of the datastore, as it should appear in `Registry` tables.
        ephemeral : `bool`, optional
            If `True` (default), return a bridge object that is
            backed by storage that will not last past the end of the current
            process. This should be used whenever the same is true of the
            dataset's artifacts.

        Returns
        -------
        bridge : `DatastoreRegistryBridge`
            Object that provides the interface this `Datastore` should use to
            communicate with the `Registry`.
        """
        raise NotImplementedError()

    @abstractmethod
    def findDatastores(self, ref: DatasetIdRef) -> Iterable[str]:
        """Retrieve datastore locations for a given dataset.

        Parameters
        ----------
        ref : `DatasetIdRef`
            A reference to the dataset for which to retrieve storage
            information.

        Returns
        -------
        datastores : `~collections.abc.Iterable` [ `str` ]
            All the matching datastores holding this dataset. Empty if the
            dataset does not exist anywhere.

        Raises
        ------
        AmbiguousDatasetError
            Raised if ``ref.id`` is `None`.
        """
        raise NotImplementedError()

    opaque: OpaqueTableStorageManager
    """Registry manager object for opaque (to Registry) tables, provided
    to allow Datastores to store their internal information inside the
    Registry database.
    """

    universe: DimensionUniverse
    """All dimensions known to the `Registry`.
    """