Coverage for python / lsst / daf / butler / _limited_butler.py: 62%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

from __future__ import annotations

__all__ = ("LimitedButler",)

# Standard-library imports.
import logging
from abc import abstractmethod
from collections.abc import Iterable, Iterator
from contextlib import AbstractContextManager, contextmanager
from typing import Any, ClassVar, Literal, Self

# LSST package import (file-transfer and URI abstraction).
from lsst.resources import ResourcePath

# Intra-package imports: metrics, provenance, dataset references, deferred
# handles, storage classes, datastore types, and the dimension universe.
from ._butler_metrics import ButlerMetrics
from ._dataset_provenance import DatasetProvenance
from ._dataset_ref import DatasetRef
from ._deferredDatasetHandle import DeferredDatasetHandle
from ._storage_class import StorageClass, StorageClassFactory
from .datastore import DatasetRefURIs, Datastore, FileTransferSource
from .dimensions import DimensionUniverse

# Module-level logger shared by all methods in this module.
log = logging.getLogger(__name__)

49 

50 

51class LimitedButler(AbstractContextManager): 

52 """A minimal butler interface that is sufficient to back 

53 `~lsst.pipe.base.PipelineTask` execution. 

54 """ 

55 

56 GENERATION: ClassVar[int] = 3 

57 """This is a Generation 3 Butler. 

58 

59 This attribute may be removed in the future, once the Generation 2 Butler 

60 interface has been fully retired; it should only be used in transitional 

61 code. 

62 """ 

63 

64 @abstractmethod 

65 def isWriteable(self) -> bool: 

66 """Return `True` if this `Butler` supports write operations.""" 

67 raise NotImplementedError() 

68 

69 @abstractmethod 

70 def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef: 

71 """Store a dataset that already has a UUID and ``RUN`` collection. 

72 

73 Parameters 

74 ---------- 

75 obj : `object` 

76 The dataset. 

77 ref : `DatasetRef` 

78 Resolved reference for a not-yet-stored dataset. 

79 provenance : `DatasetProvenance` or `None`, optional 

80 Any provenance that should be attached to the serialized dataset. 

81 Not supported by all serialization mechanisms. 

82 

83 Returns 

84 ------- 

85 ref : `DatasetRef` 

86 The same as the given, for convenience and symmetry with 

87 `Butler.put`. 

88 

89 Raises 

90 ------ 

91 TypeError 

92 Raised if the butler is read-only. 

93 

94 Notes 

95 ----- 

96 Whether this method inserts the given dataset into a ``Registry`` is 

97 implementation defined (some `LimitedButler` subclasses do not have a 

98 `Registry`), but it always adds the dataset to a `Datastore`, and the 

99 given ``ref.id`` and ``ref.run`` are always preserved. 

100 """ 

101 raise NotImplementedError() 

102 

103 def __enter__(self) -> Self: 

104 return self 

105 

106 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]: 

107 try: 

108 self.close() 

109 except Exception: 

110 log.exception("An exception occurred during Butler.close()") 

111 return False 

112 

113 def close(self) -> None: 

114 """Release all resources associated with this Butler instance. The 

115 instance may no longer be used after this is called. 

116 

117 Notes 

118 ----- 

119 Instead of calling ``close()`` directly, you can use the Butler object 

120 as a context manager. For example:: 

121 

122 with Butler(...) as butler: 

123 butler.get(...) 

124 # butler is closed after exiting the block. 

125 """ 

126 pass 

127 

128 def get( 

129 self, 

130 ref: DatasetRef, 

131 /, 

132 *, 

133 parameters: dict[str, Any] | None = None, 

134 storageClass: StorageClass | str | None = None, 

135 ) -> Any: 

136 """Retrieve a stored dataset. 

137 

138 Parameters 

139 ---------- 

140 ref : `DatasetRef` 

141 A resolved `DatasetRef` directly associated with a dataset. 

142 parameters : `dict` 

143 Additional StorageClass-defined options to control reading, 

144 typically used to efficiently read only a subset of the dataset. 

145 storageClass : `StorageClass` or `str`, optional 

146 The storage class to be used to override the Python type 

147 returned by this method. By default the returned type matches 

148 the dataset type definition for this dataset. Specifying a 

149 read `StorageClass` can force a different type to be returned. 

150 This type must be compatible with the original type. 

151 

152 Returns 

153 ------- 

154 obj : `object` 

155 The dataset. 

156 

157 Notes 

158 ----- 

159 In a `LimitedButler` the only allowable way to specify a dataset is 

160 to use a resolved `DatasetRef`. Subclasses can support more options. 

161 """ 

162 log.debug("Butler get: %s, parameters=%s, storageClass: %s", ref, parameters, storageClass) 

163 with self._metrics.instrument_get(log, msg="Retrieved dataset"): 

164 return self._datastore.get(ref, parameters=parameters, storageClass=storageClass) 

165 

166 def getDeferred( 

167 self, 

168 ref: DatasetRef, 

169 /, 

170 *, 

171 parameters: dict[str, Any] | None = None, 

172 storageClass: str | StorageClass | None = None, 

173 ) -> DeferredDatasetHandle: 

174 """Create a `DeferredDatasetHandle` which can later retrieve a dataset, 

175 after an immediate registry lookup. 

176 

177 Parameters 

178 ---------- 

179 ref : `DatasetRef` 

180 For the default implementation of a `LimitedButler`, the only 

181 acceptable parameter is a resolved `DatasetRef`. 

182 parameters : `dict` 

183 Additional StorageClass-defined options to control reading, 

184 typically used to efficiently read only a subset of the dataset. 

185 storageClass : `StorageClass` or `str`, optional 

186 The storage class to be used to override the Python type 

187 returned by this method. By default the returned type matches 

188 the dataset type definition for this dataset. Specifying a 

189 read `StorageClass` can force a different type to be returned. 

190 This type must be compatible with the original type. 

191 

192 Returns 

193 ------- 

194 obj : `DeferredDatasetHandle` 

195 A handle which can be used to retrieve a dataset at a later time. 

196 

197 Notes 

198 ----- 

199 In a `LimitedButler` the only allowable way to specify a dataset is 

200 to use a resolved `DatasetRef`. Subclasses can support more options. 

201 """ 

202 return DeferredDatasetHandle(butler=self, ref=ref, parameters=parameters, storageClass=storageClass) 

203 

204 def get_datastore_names(self) -> tuple[str, ...]: 

205 """Return the names of the datastores associated with this butler. 

206 

207 Returns 

208 ------- 

209 names : `tuple` [`str`, ...] 

210 The names of the datastores. 

211 """ 

212 return self._datastore.names 

213 

214 def get_datastore_roots(self) -> dict[str, ResourcePath | None]: 

215 """Return the defined root URIs for all registered datastores. 

216 

217 Returns 

218 ------- 

219 roots : `dict` [`str`, `~lsst.resources.ResourcePath` | `None`] 

220 A mapping from datastore name to datastore root URI. The root 

221 can be `None` if the datastore does not have any concept of a root 

222 URI. 

223 """ 

224 return self._datastore.roots 

225 

226 def getURIs( 

227 self, 

228 ref: DatasetRef, 

229 /, 

230 *, 

231 predict: bool = False, 

232 ) -> DatasetRefURIs: 

233 """Return the URIs associated with the dataset. 

234 

235 Parameters 

236 ---------- 

237 ref : `DatasetRef` 

238 A `DatasetRef` for which URIs are requested. 

239 predict : `bool` 

240 If `True`, allow URIs to be returned of datasets that have not 

241 been written. 

242 

243 Returns 

244 ------- 

245 uris : `DatasetRefURIs` 

246 The URI to the primary artifact associated with this dataset (if 

247 the dataset was disassembled within the datastore this may be 

248 `None`), and the URIs to any components associated with the dataset 

249 artifact (can be empty if there are no components). 

250 """ 

251 return self._datastore.getURIs(ref, predict) 

252 

253 def getURI( 

254 self, 

255 ref: DatasetRef, 

256 /, 

257 *, 

258 predict: bool = False, 

259 ) -> ResourcePath: 

260 """Return the URI to the Dataset. 

261 

262 Parameters 

263 ---------- 

264 ref : `DatasetRef` 

265 A `DatasetRef` for which a single URI is requested. 

266 predict : `bool` 

267 If `True`, allow URIs to be returned of datasets that have not 

268 been written. 

269 

270 Returns 

271 ------- 

272 uri : `lsst.resources.ResourcePath` 

273 URI pointing to the Dataset within the datastore. If the 

274 Dataset does not exist in the datastore, and if ``predict`` is 

275 `True`, the URI will be a prediction and will include a URI 

276 fragment "#predicted". 

277 If the datastore does not have entities that relate well 

278 to the concept of a URI the returned URI string will be 

279 descriptive. The returned URI is not guaranteed to be obtainable. 

280 

281 Raises 

282 ------ 

283 RuntimeError 

284 Raised if a URI is requested for a dataset that consists of 

285 multiple artifacts. 

286 """ 

287 primary, components = self.getURIs(ref, predict=predict) 

288 

289 if primary is None or components: 

290 raise RuntimeError( 

291 f"Dataset ({ref}) includes distinct URIs for components. Use LimitedButler.getURIs() instead." 

292 ) 

293 return primary 

294 

295 def get_many_uris( 

296 self, 

297 refs: Iterable[DatasetRef], 

298 predict: bool = False, 

299 allow_missing: bool = False, 

300 ) -> dict[DatasetRef, DatasetRefURIs]: 

301 """Return URIs associated with many datasets. 

302 

303 Parameters 

304 ---------- 

305 refs : `~collections.abc.Iterable` of `DatasetIdRef` 

306 References to the required datasets. 

307 predict : `bool`, optional 

308 If `True`, allow URIs to be returned of datasets that have not 

309 been written. 

310 allow_missing : `bool` 

311 If `False`, and ``predict`` is `False`, will raise if a 

312 `DatasetRef` does not exist. 

313 

314 Returns 

315 ------- 

316 URIs : `dict` of [`DatasetRef`, `DatasetRefURIs`] 

317 A dict of primary and component URIs, indexed by the passed-in 

318 refs. 

319 

320 Raises 

321 ------ 

322 FileNotFoundError 

323 A URI has been requested for a dataset that does not exist and 

324 guessing is not allowed. 

325 

326 Notes 

327 ----- 

328 In file-based datastores, get_many_uris does not check that the file is 

329 present. It assumes that if datastore is aware of the file then it 

330 actually exists. 

331 """ 

332 return self._datastore.getManyURIs(refs, predict=predict, allow_missing=allow_missing) 

333 

334 def stored(self, ref: DatasetRef) -> bool: 

335 """Indicate whether the dataset's artifacts are present in the 

336 Datastore. 

337 

338 Parameters 

339 ---------- 

340 ref : `DatasetRef` 

341 Resolved reference to a dataset. 

342 

343 Returns 

344 ------- 

345 stored : `bool` 

346 Whether the dataset artifact exists in the datastore and can be 

347 retrieved. 

348 """ 

349 return self.stored_many([ref])[ref] 

350 

351 def stored_many( 

352 self, 

353 refs: Iterable[DatasetRef], 

354 ) -> dict[DatasetRef, bool]: 

355 """Check the datastore for artifact existence of multiple datasets 

356 at once. 

357 

358 Parameters 

359 ---------- 

360 refs : `~collections.abc.Iterable` of `DatasetRef` 

361 The datasets to be checked. 

362 

363 Returns 

364 ------- 

365 existence : `dict` of [`DatasetRef`, `bool`] 

366 Mapping from given dataset refs to boolean indicating artifact 

367 existence. 

368 """ 

369 return self._datastore.mexists(refs) 

370 

371 def markInputUnused(self, ref: DatasetRef) -> None: 

372 """Indicate that a predicted input was not actually used when 

373 processing a `Quantum`. 

374 

375 Parameters 

376 ---------- 

377 ref : `DatasetRef` 

378 Reference to the unused dataset. 

379 

380 Notes 

381 ----- 

382 By default, a dataset is considered "actually used" if it is accessed 

383 via `get` or a handle to it is obtained via `getDeferred` 

384 (even if the handle is not used). This method must be called after one 

385 of those in order to remove the dataset from the actual input list. 

386 

387 This method does nothing for butlers that do not store provenance 

388 information (which is the default implementation provided by the base 

389 class). 

390 """ 

391 pass 

392 

393 @abstractmethod 

394 def pruneDatasets( 

395 self, 

396 refs: Iterable[DatasetRef], 

397 *, 

398 disassociate: bool = True, 

399 unstore: bool = False, 

400 tags: Iterable[str] = (), 

401 purge: bool = False, 

402 ) -> None: 

403 """Remove one or more datasets from a collection and/or storage. 

404 

405 Parameters 

406 ---------- 

407 refs : `~collections.abc.Iterable` of `DatasetRef` 

408 Datasets to prune. These must be "resolved" references (not just 

409 a `DatasetType` and data ID). 

410 disassociate : `bool`, optional 

411 Disassociate pruned datasets from ``tags``, or from all collections 

412 if ``purge=True``. 

413 unstore : `bool`, optional 

414 If `True` (`False` is default) remove these datasets from all 

415 datastores known to this butler. Note that this will make it 

416 impossible to retrieve these datasets even via other collections. 

417 Datasets that are already not stored are ignored by this option. 

418 tags : `~collections.abc.Iterable` [ `str` ], optional 

419 `~CollectionType.TAGGED` collections to disassociate the datasets 

420 from. Ignored if ``disassociate`` is `False` or ``purge`` is 

421 `True`. 

422 purge : `bool`, optional 

423 If `True` (`False` is default), completely remove the dataset from 

424 the `Registry`. To prevent accidental deletions, ``purge`` may 

425 only be `True` if all of the following conditions are met: 

426 

427 - ``disassociate`` is `True`; 

428 - ``unstore`` is `True`. 

429 

430 This mode may remove provenance information from datasets other 

431 than those provided, and should be used with extreme care. 

432 

433 Raises 

434 ------ 

435 TypeError 

436 Raised if the butler is read-only, if no collection was provided, 

437 or the conditions for ``purge=True`` were not met. 

438 """ 

439 raise NotImplementedError() 

440 

441 @contextmanager 

442 def record_metrics(self, metrics: ButlerMetrics | None = None) -> Iterator[ButlerMetrics]: 

443 """Enable new metrics recording context. 

444 

445 Parameters 

446 ---------- 

447 metrics : `lsst.daf.butler.ButlerMetrics` 

448 Optional override metrics object. If given, this will be the 

449 same object returned by the context manager. 

450 

451 Yields 

452 ------ 

453 metrics : `lsst.daf.butler.ButlerMetrics` 

454 Metrics recorded within this context. This temporarily replaces 

455 any existing metrics object associated with this butler. 

456 """ 

457 old_metrics = self._metrics 

458 new_metrics = metrics if metrics is not None else ButlerMetrics() 

459 try: 

460 self._metrics = new_metrics 

461 yield new_metrics 

462 finally: 

463 self._metrics = old_metrics 

464 

465 @property 

466 @abstractmethod 

467 def dimensions(self) -> DimensionUniverse: 

468 """Structure managing all dimensions recognized by this data 

469 repository (`DimensionUniverse`). 

470 """ 

471 raise NotImplementedError() 

472 

473 @property 

474 def _file_transfer_source(self) -> FileTransferSource: 

475 """Object that manages the transfer of files between Butler 

476 repositories. 

477 """ 

478 return self._datastore 

479 

480 _datastore: Datastore 

481 """The object that manages actual dataset storage (`Datastore`).""" 

482 

483 storageClasses: StorageClassFactory 

484 """An object that maps known storage class names to objects that fully 

485 describe them (`StorageClassFactory`). 

486 """ 

487 

488 _metrics: ButlerMetrics 

489 """An object for recording metrics associated with this butler. 

490 (`ButlerMetrics`) 

491 """