Coverage for python / lsst / pipe / base / blocking_limited_butler.py: 35%
68 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["BlockingLimitedButler"]
32import logging
33import time
34from collections.abc import Iterable, Mapping
35from typing import Any
37from lsst.daf.butler import (
38 ButlerMetrics,
39 DatasetProvenance,
40 DatasetRef,
41 DeferredDatasetHandle,
42 DimensionUniverse,
43 LimitedButler,
44 StorageClass,
45)
# Module-level logger, named after this module so its output can be filtered
# or re-leveled through the standard logging hierarchy.
_LOG = logging.getLogger(name=__name__)
class BlockingLimitedButler(LimitedButler):
    """A `LimitedButler` that blocks until certain dataset types exist.

    Parameters
    ----------
    wrapped : `LimitedButler`
        The butler to wrap.
    timeouts : `~collections.abc.Mapping` [ `str`, `float` or `None` ]
        Timeouts in seconds to wait for different dataset types, keyed by
        parent dataset type name (component datasets use the timeout of their
        parent).  A timeout of `None` waits indefinitely.  Dataset types that
        are not included are not blocked on (i.e. their timeout is ``0.0``).
    poll_interval : `float`, optional
        Seconds to sleep between successive existence checks while blocking.
        Must be positive.

    Notes
    -----
    When a timeout is exceeded, `get` will raise `FileNotFoundError` (as usual
    for a dataset that does not exist) and `stored_many` will mark the dataset
    as non-existent.  `getDeferred` does not block.

    Elapsed time is measured with `time.monotonic`, so waits are not affected
    by adjustments to the system clock.
    """

    def __init__(
        self,
        wrapped: LimitedButler,
        timeouts: Mapping[str, float | None],
        *,
        poll_interval: float = 0.5,
    ):
        if poll_interval <= 0.0:
            # A non-positive interval would turn the polling loops into busy
            # loops hammering the wrapped butler.
            raise ValueError(f"poll_interval must be positive, got {poll_interval!r}.")
        self._wrapped = wrapped
        self._timeouts = timeouts
        self._poll_interval = poll_interval

    def close(self) -> None:
        # Docstring inherited.
        self._wrapped.close()

    @property
    def _metrics(self) -> ButlerMetrics:
        # Need to always forward from the wrapped metrics object.
        return self._wrapped._metrics

    @_metrics.setter
    def _metrics(self, metrics: ButlerMetrics) -> None:
        # Allow record_metrics() context manager to override the wrapped
        # butler.
        self._wrapped._metrics = metrics

    def _get_timeout(self, ref: DatasetRef) -> float | None:
        """Return the blocking timeout for the parent dataset type of ``ref``.

        Dataset types absent from the configured timeouts get ``0.0``, i.e.
        they are not blocked on.
        """
        return self._timeouts.get(ref.datasetType.nameAndComponent()[0], 0.0)

    @staticmethod
    def _has_time_left(timeout: float | None, elapsed: float) -> bool:
        """Return `True` if a wait with ``timeout`` seconds (`None` meaning
        forever) should continue after ``elapsed`` seconds.
        """
        return timeout is None or elapsed < timeout

    def _pause(self, time_left: float | None) -> None:
        """Sleep for one poll interval, but not past a deadline that is
        ``time_left`` seconds away (`None` meaning no deadline).
        """
        delay = self._poll_interval if time_left is None else min(self._poll_interval, time_left)
        # The deadline may already have passed by the time we get here.
        time.sleep(max(delay, 0.0))

    @staticmethod
    def _log_waiting(ref: DatasetRef, timeout: float | None) -> None:
        """Log (once per wait) that ``ref`` is missing and will be waited on."""
        wait = "indefinitely" if timeout is None else f"up to {timeout}s"
        _LOG.info("Dataset %s not immediately available for %s, waiting %s.", ref.datasetType, ref.id, wait)

    def get(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: StorageClass | str | None = None,
    ) -> Any:
        # Docstring inherited.
        # Retries the wrapped ``get`` while it raises FileNotFoundError, until
        # the dataset type's timeout is exhausted.
        timeout = self._get_timeout(ref)
        start = time.monotonic()
        first_miss = True
        while True:
            try:
                return self._wrapped.get(ref, parameters=parameters, storageClass=storageClass)
            except FileNotFoundError as err:
                elapsed = time.monotonic() - start
                if not self._has_time_left(timeout, elapsed):
                    if timeout is not None and timeout > 0.0:
                        # Only annotate when we actually waited; unblocked
                        # dataset types fail immediately, exactly as usual.
                        err.add_note(f"Timed out after {elapsed:.3f}s.")
                    raise
            if first_miss:
                # Log once rather than on every poll, which could be spammy
                # for long timeouts.
                self._log_waiting(ref, timeout)
                first_miss = False
            self._pause(None if timeout is None else timeout - elapsed)

    def getDeferred(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: str | StorageClass | None = None,
    ) -> DeferredDatasetHandle:
        # Docstring inherited.
        # Note that this does not use the block at all.
        return self._wrapped.getDeferred(ref, parameters=parameters, storageClass=storageClass)

    def stored_many(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
        # Docstring inherited.
        # Re-queries the wrapped butler for missing datasets until each one
        # either appears or its dataset type's timeout is exhausted; datasets
        # that time out are left marked as non-existent.
        start = time.monotonic()
        result = self._wrapped.stored_many(refs)
        timeouts = {ref.id: self._get_timeout(ref) for ref in result}

        def pending(elapsed: float) -> list[DatasetRef]:
            # Missing datasets whose deadline has not passed yet.
            return [
                ref
                for ref, exists in result.items()
                if not exists and self._has_time_left(timeouts[ref.id], elapsed)
            ]

        remaining = pending(time.monotonic() - start)
        # The pending set only ever shrinks, so logging here covers every
        # dataset we will wait on, exactly once.
        for ref in remaining:
            self._log_waiting(ref, timeouts[ref.id])
        while remaining:
            elapsed = time.monotonic() - start
            deadlines = (timeouts[ref.id] for ref in remaining)
            self._pause(min((t - elapsed for t in deadlines if t is not None), default=None))
            result.update(self._wrapped.stored_many(remaining))
            remaining = pending(time.monotonic() - start)
        return result

    def isWriteable(self) -> bool:
        # Docstring inherited.
        return self._wrapped.isWriteable()

    def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
        # Docstring inherited.
        return self._wrapped.put(obj, ref, provenance=provenance)

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        # Docstring inherited.
        self._wrapped.pruneDatasets(refs, disassociate=disassociate, unstore=unstore, tags=tags, purge=purge)

    @property
    def dimensions(self) -> DimensionUniverse:
        # Docstring inherited.
        return self._wrapped.dimensions

    @property
    def _datastore(self) -> Any:
        return self._wrapped._datastore

    @_datastore.setter  # demanded by MyPy since we declare it to be an instance attribute in LimitedButler.
    def _datastore(self, value: Any) -> None:
        self._wrapped._datastore = value