Coverage for python / lsst / ctrl / mpexec / singleQuantumExecutor.py: 65%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:00 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("SingleQuantumExecutor",) 

31 

32import uuid 

33from collections.abc import Callable, Mapping 

34from typing import TYPE_CHECKING, Any 

35 

36from deprecated.sphinx import deprecated 

37 

38import lsst.pipe.base.single_quantum_executor 

39 

40if TYPE_CHECKING: 

41 from lsst.daf.butler import Butler, ButlerMetrics, LimitedButler, Quantum 

42 from lsst.pipe.base import ExecutionResources, PipelineTask, QuantumSuccessCaveats, TaskFactory 

43 from lsst.pipe.base.log_capture import _ExecutionLogRecordsExtra 

44 from lsst.pipe.base.pipeline_graph import TaskNode 

45 

46 

47# TODO[DM-51962]: Remove this module. 

@deprecated(
    "The SingleQuantumExecutor class has moved to lsst.pipe.base.single_quantum_executor. "
    "This forwarding shim will be removed after v30.",
    version="v30",
    category=FutureWarning,
)
class SingleQuantumExecutor(lsst.pipe.base.single_quantum_executor.SingleQuantumExecutor):
    """Executor that runs a single Quantum at a time (deprecated shim).

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler` or `None`
        Data butler. Passing `None` means a Quantum-backed butler should be
        used instead.
    taskFactory : `~lsst.pipe.base.TaskFactory`
        Task factory instance.
    skipExistingIn : `~typing.Any`
        Expressions naming the collections to search for existing output
        datasets. See :ref:`daf_butler_ordered_collection_searches` for the
        allowed types. Only the presence of the butler output run in this
        list of collections is checked: when the output run is listed, quanta
        whose complete outputs already exist in the output run are skipped.
        `None` or an empty string/sequence disables skipping.
    clobberOutputs : `bool`, optional
        If `True`, outputs of a quantum that already exist in the output run
        collection are removed before the quantum is executed. If
        ``skipExistingIn`` contains the output run, only partial outputs of a
        quantum are removed. Only used when ``butler`` is not `None`.
    enableLsstDebug : `bool`, optional
        Enable debugging of a task through the ``lsstDebug`` facility.
    limited_butler_factory : `~collections.abc.Callable`, optional
        Callable that creates a `~lsst.daf.butler.LimitedButler` instance for
        a given Quantum. Must be provided when ``butler`` is `None`; ignored
        when ``butler`` is not `None`.
    resources : `~lsst.pipe.base.ExecutionResources`, optional
        Resources available to this quantum while it executes.
    skipExisting : `bool`, optional
        If `True`, skip quanta whose metadata datasets are already stored.
        Unlike ``skipExistingIn``, this works with limited butlers as well as
        full butlers. Always set to `True` if ``skipExistingIn`` matches
        ``butler.run``.
    assumeNoExistingOutputs : `bool`, optional
        If `True`, assume preexisting outputs are impossible (e.g. because
        higher-level code knows this is a new ``RUN`` collection) and do not
        look for them. The ``skipExisting`` and ``clobberOutputs`` options are
        then ignored, but unlike just setting both of those to `False`, all
        dataset existence checks are avoided as well.
    raise_on_partial_outputs : `bool`, optional
        If `True`, raise exceptions chained by
        `lsst.pipe.base.AnnotatedPartialOutputsError` immediately, instead of
        treating the partial result as a success and continuing to run
        downstream tasks.
    job_metadata : `~collections.abc.Mapping`, optional
        Extra metadata to embed within the quantum metadata under the "job"
        key. Intended for information common to all quanta executed in a
        single process, such as the time taken to load the quantum graph in a
        BPS job.

    Notes
    -----
    This is a deprecated backwards-compatibility shim for
    `lsst.pipe.base.single_quantum_executor.SingleQuantumExecutor`, which has
    the same functionality with very minor interface changes.

    The constructor maps the camelCase argument names accepted here onto the
    snake_case keyword names of the base-class constructor, and the camelCase
    methods and properties defined here delegate to underscore-prefixed
    methods and attributes of the base class.
    """

    def __init__(
        self,
        butler: Butler | None,
        taskFactory: TaskFactory,
        skipExistingIn: Any = None,
        clobberOutputs: bool = False,
        enableLsstDebug: bool = False,
        limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None,
        resources: ExecutionResources | None = None,
        skipExisting: bool = False,
        assumeNoExistingOutputs: bool = False,
        raise_on_partial_outputs: bool = True,
        job_metadata: Mapping[str, int | str | float] | None = None,
    ):
        super().__init__(
            # Arguments whose names are identical in the base class.
            butler=butler,
            limited_butler_factory=limited_butler_factory,
            resources=resources,
            raise_on_partial_outputs=raise_on_partial_outputs,
            job_metadata=job_metadata,
            # Legacy camelCase arguments renamed to snake_case keywords.
            task_factory=taskFactory,
            skip_existing_in=skipExistingIn,
            clobber_outputs=clobberOutputs,
            enable_lsst_debug=enableLsstDebug,
            skip_existing=skipExisting,
            assume_no_existing_outputs=assumeNoExistingOutputs,
        )

    def checkExistingOutputs(
        self,
        quantum: Quantum,
        task_node: TaskNode,
        /,
        limited_butler: LimitedButler,
        log_extra: _ExecutionLogRecordsExtra,
    ) -> bool:
        """Delegate to the base class's ``_check_existing_outputs``.

        All arguments are forwarded unchanged and the base-class result is
        returned as-is.
        """
        return super()._check_existing_outputs(
            quantum,
            task_node,
            limited_butler=limited_butler,
            log_extra=log_extra,
        )

    def updatedQuantumInputs(
        self, quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler
    ) -> Quantum:
        """Delegate to the base class's ``_updated_quantum_inputs``.

        All arguments are forwarded unchanged and the base-class result is
        returned as-is.
        """
        return super()._updated_quantum_inputs(
            quantum,
            task_node,
            limited_butler=limited_butler,
        )

    def runQuantum(
        self,
        task: PipelineTask,
        quantum: Quantum,
        task_node: TaskNode,
        /,
        limited_butler: LimitedButler,
        quantum_id: uuid.UUID | None = None,
    ) -> tuple[QuantumSuccessCaveats, list[uuid.UUID], ButlerMetrics]:
        """Call the base class's ``_run_quantum`` while recording metrics.

        The call is made inside ``limited_butler.record_metrics()``, and a
        fresh list is handed to the base class as ``ids_put``.

        Returns
        -------
        caveats : `~lsst.pipe.base.QuantumSuccessCaveats`
            Value returned by the base class's ``_run_quantum``.
        ids_put : `list` [ `uuid.UUID` ]
            The list object that was passed to ``_run_quantum`` as
            ``ids_put``.
        metrics : `~lsst.daf.butler.ButlerMetrics`
            Object yielded by ``limited_butler.record_metrics()``.
        """
        written_ids: list[uuid.UUID] = []
        with limited_butler.record_metrics() as metrics:
            caveats = super()._run_quantum(
                task,
                quantum,
                task_node,
                limited_butler=limited_butler,
                quantum_id=quantum_id,
                ids_put=written_ids,
            )
        return caveats, written_ids, metrics

    def writeMetadata(
        self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler
    ) -> None:
        """Delegate to the base class's ``_write_metadata``.

        All arguments are forwarded unchanged.
        """
        return super()._write_metadata(
            quantum,
            metadata,
            task_node,
            limited_butler=limited_butler,
        )

    def initGlobals(self, quantum: Quantum) -> None:
        """Do nothing; this method is a no-op in this shim."""
        pass

    # Read-only accessors exposing the underscore-prefixed attributes of the
    # base class under the public names used by this shim.

    @property
    def butler(self) -> Butler | None:
        """Value of the ``_butler`` attribute."""
        return self._butler

    @property
    def taskFactory(self) -> TaskFactory:
        """Value of the ``_task_factory`` attribute."""
        return self._task_factory

    @property
    def clobberOutputs(self) -> bool:
        """Value of the ``_clobber_outputs`` attribute."""
        return self._clobber_outputs

    @property
    def enableLsstDebug(self) -> bool:
        """Value of the ``_enable_lsst_debug`` attribute."""
        return self._enable_lsst_debug

    @property
    def limited_butler_factory(self) -> Callable[[Quantum], LimitedButler] | None:
        """Value of the ``_limited_butler_factory`` attribute."""
        return self._limited_butler_factory

    @property
    def resources(self) -> ExecutionResources | None:
        """Value of the ``_resources`` attribute."""
        return self._resources

    @property
    def assumeNoExistingOutputs(self) -> bool:
        """Value of the ``_assume_no_existing_outputs`` attribute."""
        return self._assume_no_existing_outputs

    @property
    def raise_on_partial_outputs(self) -> bool:
        """Value of the ``_raise_on_partial_outputs`` attribute."""
        return self._raise_on_partial_outputs

    @property
    def job_metadata(self) -> Mapping[str, int | str | float] | None:
        """Value of the ``_job_metadata`` attribute."""
        return self._job_metadata

    @property
    def skipExisting(self) -> bool:
        """Value of the ``_skip_existing`` attribute."""
        return self._skip_existing