Coverage for python / lsst / pipe / base / blocking_limited_butler.py: 35%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:47 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["BlockingLimitedButler"] 

31 

32import logging 

33import time 

34from collections.abc import Iterable, Mapping 

35from typing import Any 

36 

37from lsst.daf.butler import ( 

38 ButlerMetrics, 

39 DatasetProvenance, 

40 DatasetRef, 

41 DeferredDatasetHandle, 

42 DimensionUniverse, 

43 LimitedButler, 

44 StorageClass, 

45) 

46 

47_LOG = logging.getLogger(__name__) 

48 

49 

class BlockingLimitedButler(LimitedButler):
    """A `LimitedButler` that blocks until certain dataset types exist.

    Parameters
    ----------
    wrapped : `LimitedButler`
        The butler to wrap.
    timeouts : `~collections.abc.Mapping` [ `str`, `float` or `None` ]
        Timeouts in seconds to wait for different dataset types.  Dataset
        types not included are not blocked on (i.e. their timeout is
        ``0.0``).  A timeout of `None` means wait indefinitely.

    Notes
    -----
    When a timeout is exceeded, `get` will raise `FileNotFoundError` (as usual
    for a dataset that does not exist) and `stored_many` will mark the dataset
    as non-existent.  `getDeferred` does not block.

    Timeouts are looked up by parent dataset type name, so a component
    dataset shares the timeout of its parent.  While waiting, the wrapped
    butler is polled at a fixed interval.
    """

    # Seconds to sleep between successive polls of the wrapped butler.
    _POLL_INTERVAL = 0.5

    def __init__(
        self,
        wrapped: LimitedButler,
        timeouts: Mapping[str, float | None],
    ):
        self._wrapped = wrapped
        self._timeouts = timeouts

    def close(self) -> None:
        """Close the wrapped butler."""
        self._wrapped.close()

    @property
    def _metrics(self) -> ButlerMetrics:
        # Need to always forward from the wrapped metrics object.
        return self._wrapped._metrics

    @_metrics.setter
    def _metrics(self, metrics: ButlerMetrics) -> None:
        # Allow record_metrics() context manager to override the wrapped
        # butler.
        self._wrapped._metrics = metrics

    def get(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: StorageClass | str | None = None,
    ) -> Any:
        """Retrieve a dataset, retrying until it exists or its timeout
        expires.

        Parameters
        ----------
        ref : `DatasetRef`
            Reference to the dataset to retrieve.
        parameters : `dict` [ `str`, `~typing.Any` ] or `None`, optional
            Forwarded unchanged to the wrapped butler's ``get``.
        storageClass : `StorageClass` or `str` or `None`, optional
            Forwarded unchanged to the wrapped butler's ``get``.

        Returns
        -------
        obj : `~typing.Any`
            Whatever the wrapped butler's ``get`` returns.

        Raises
        ------
        FileNotFoundError
            Raised (with a note recording the elapsed time) if the wrapped
            butler still raises this once the timeout for the parent dataset
            type has been reached.
        """
        parent_dataset_type_name = ref.datasetType.nameAndComponent()[0]
        timeout = self._timeouts.get(parent_dataset_type_name, 0.0)
        # Use a monotonic clock so that system clock adjustments cannot
        # shorten or lengthen the wait.
        start = time.monotonic()
        while True:
            try:
                return self._wrapped.get(ref, parameters=parameters, storageClass=storageClass)
            except FileNotFoundError as err:
                # A timeout of None means "wait forever", so never give up.
                if timeout is not None:
                    elapsed = time.monotonic() - start
                    # ">=" guarantees that a zero timeout never retries, even
                    # if the clock has not visibly advanced.
                    if elapsed >= timeout:
                        err.add_note(f"Timed out after {elapsed:.3f}s.")
                        raise
                _LOG.info(
                    f"Dataset {ref.datasetType} not immediately available for {ref.id}, "
                    f"waiting {timeout}s"
                )
                time.sleep(self._POLL_INTERVAL)

    def getDeferred(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: str | StorageClass | None = None,
    ) -> DeferredDatasetHandle:
        """Forward to the wrapped butler's ``getDeferred`` without waiting."""
        # note that this does not use the block at all
        return self._wrapped.getDeferred(ref, parameters=parameters, storageClass=storageClass)

    def stored_many(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
        """Check which datasets exist, waiting for missing ones to appear.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets to check.

        Returns
        -------
        result : `dict` [ `DatasetRef`, `bool` ]
            Existence flag for each dataset, as reported by the wrapped
            butler.  A dataset is reported as `False` only if it was still
            missing once its timeout had been reached.
        """
        start = time.monotonic()
        result = self._wrapped.stored_many(refs)
        timeouts = {
            ref.id: self._timeouts.get(ref.datasetType.nameAndComponent()[0], 0.0) for ref in result
        }
        while True:
            elapsed = time.monotonic() - start
            # Datasets that are still missing and still within their timeout.
            remaining: list[DatasetRef] = []
            for ref, exists in result.items():
                if exists:
                    continue
                timeout = timeouts[ref.id]
                # Strict "<" guarantees that a zero timeout never waits.
                if timeout is None or elapsed < timeout:
                    _LOG.info(
                        f"Dataset {ref.datasetType} not immediately available for {ref.id}, "
                        f"waiting {timeout}s"
                    )
                    remaining.append(ref)
            if not remaining:
                return result
            # Sleep *before* re-querying: the previous query only just
            # happened, and this way we return as soon as a query succeeds
            # rather than after one more sleep.
            time.sleep(self._POLL_INTERVAL)
            result.update(self._wrapped.stored_many(remaining))

    def isWriteable(self) -> bool:
        """Return whether the wrapped butler is writeable."""
        return self._wrapped.isWriteable()

    def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
        """Forward to the wrapped butler's ``put``."""
        return self._wrapped.put(obj, ref, provenance=provenance)

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        """Forward to the wrapped butler's ``pruneDatasets``."""
        return self._wrapped.pruneDatasets(
            refs, disassociate=disassociate, unstore=unstore, tags=tags, purge=purge
        )

    @property
    def dimensions(self) -> DimensionUniverse:
        """The ``dimensions`` attribute of the wrapped butler."""
        return self._wrapped.dimensions

    @property
    def _datastore(self) -> Any:
        # Always forward to the wrapped butler's datastore.
        return self._wrapped._datastore

    @_datastore.setter  # demanded by MyPy since we declare it to be an instance attribute in LimitedButler.
    def _datastore(self, value: Any) -> None:
        self._wrapped._datastore = value