Coverage for python / lsst / pipe / base / quantum_graph_executor.py: 72%

36 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:44 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["QuantumExecutionResult", "QuantumExecutor", "QuantumGraphExecutor"] 

31 

32from abc import ABC, abstractmethod 

33from typing import TYPE_CHECKING, Self 

34 

35from lsst.daf.butler import Quantum 

36 

37from .quantum_reports import QuantumReport, Report 

38 

39if TYPE_CHECKING: 

40 import uuid 

41 

42 from lsst.daf.butler.logging import ButlerLogRecords 

43 

44 from ._task_metadata import TaskMetadata 

45 from .graph import QuantumGraph 

46 from .pipeline_graph import TaskNode 

47 from .quantum_graph import PredictedQuantumGraph 

48 

49 

class QuantumExecutionResult(tuple[Quantum, QuantumReport | None]):
    """A result struct that captures information about a single quantum's
    execution.

    Parameters
    ----------
    quantum : `lsst.daf.butler.Quantum`
        Quantum that was executed.
    report : `.quantum_reports.QuantumReport` or `None`
        Report with basic information about the execution, or `None` if no
        report is available.
    task_metadata : `TaskMetadata`, optional
        Metadata saved by the task and executor during execution.
    skipped_existing : `bool`, optional
        If `True`, this quantum was not executed because it appeared to have
        already been executed successfully.
    adjusted_no_work : `bool`, optional
        If `True`, this quantum was not executed because the
        `PipelineTaskConnections.adjustQuanta` hook raised `NoWorkFound`.

    Notes
    -----
    For backwards compatibility, this class is a two-element tuple that allows
    the ``quantum`` and ``report`` attributes to be unpacked.  Additional
    regular attributes may be added by executors (but the tuple must remain
    only two elements to enable the current unpacking interface).

    Only ``quantum`` and ``report`` are tuple elements; the keyword-only
    values are stored as ordinary instance attributes and exposed through
    read-only properties.
    """

    def __new__(
        cls,
        quantum: Quantum,
        report: QuantumReport | None,
        *,
        task_metadata: TaskMetadata | None = None,
        skipped_existing: bool | None = None,
        adjusted_no_work: bool | None = None,
    ) -> Self:
        # The keyword-only arguments are accepted here only so that the
        # signature matches __init__; they never become tuple elements.
        return super().__new__(cls, (quantum, report))

    # We need to define both __init__ and __new__ because tuple inheritance
    # requires __new__ and numpydoc requires __init__.

    def __init__(
        self,
        quantum: Quantum,
        report: QuantumReport | None,
        *,
        task_metadata: TaskMetadata | None = None,
        skipped_existing: bool | None = None,
        adjusted_no_work: bool | None = None,
    ) -> None:
        # ``quantum`` and ``report`` were already stored by __new__ as the
        # tuple contents, so only the extra attributes are set here.
        self._task_metadata = task_metadata
        self._skipped_existing = skipped_existing
        self._adjusted_no_work = adjusted_no_work

    def __getnewargs__(self) -> tuple[Quantum, QuantumReport | None]:
        # The inherited tuple implementation hands __new__ a single tuple
        # argument, which does not match the two-positional signature above
        # and makes copying/pickling fail.  Pass the elements separately; the
        # extra attributes travel through the instance ``__dict__`` state.
        return (self[0], self[1])

    @property
    def quantum(self) -> Quantum:
        """The quantum actually executed."""
        return self[0]

    @property
    def report(self) -> QuantumReport | None:
        """Structure describing the status of the execution of a quantum.

        This is `None` if the implementation does not support this feature.
        """
        return self[1]

    @property
    def task_metadata(self) -> TaskMetadata | None:
        """Metadata saved by the task and executor during execution."""
        return self._task_metadata

    @property
    def skipped_existing(self) -> bool | None:
        """If `True`, this quantum was not executed because it appeared to have
        already been executed successfully.
        """
        return self._skipped_existing

    @property
    def adjusted_no_work(self) -> bool | None:
        """If `True`, this quantum was not executed because the
        `PipelineTaskConnections.adjustQuanta` hook raised `NoWorkFound`.
        """
        return self._adjusted_no_work

135 

136 

class QuantumExecutor(ABC):
    """Class which abstracts execution of a single Quantum.

    In general implementation should not depend on execution model and
    execution should always happen in-process. Main reason for existence
    of this class is to provide do-nothing implementation that can be used
    in the unit tests.

    Notes
    -----
    This is an abstract base class: `execute` is abstract, so only subclasses
    that override it can be instantiated.
    """

    @abstractmethod
    def execute(
        self,
        task_node: TaskNode,
        /,
        quantum: Quantum,
        quantum_id: uuid.UUID | None = None,
        *,
        log_records: ButlerLogRecords | None = None,
    ) -> QuantumExecutionResult:
        """Execute single quantum.

        Parameters
        ----------
        task_node : `~.pipeline_graph.TaskNode`
            Task definition structure.  This argument is positional-only.
        quantum : `~lsst.daf.butler.Quantum`
            Quantum for this execution.
        quantum_id : `uuid.UUID` or `None`, optional
            The ID of the quantum to be executed.
        log_records : `lsst.daf.butler.ButlerLogRecords`, optional
            Container that should be used to store logs in memory before
            writing them to the butler.  This disables streaming log (since
            we'd have to store them in memory anyway), but it permits the
            caller to prepend logs to be stored in the butler and allows task
            logs to be inspected by the caller after execution is complete.

        Returns
        -------
        result : `QuantumExecutionResult`
            Result struct.  May also be unpacked as a 2-tuple (see type
            documentation).

        Notes
        -----
        Any exception raised by the task or code that wraps task execution is
        propagated to the caller of this method.

        The base-class body only raises `NotImplementedError`; subclasses
        must provide the real implementation.
        """
        raise NotImplementedError()

185 

186 

class QuantumGraphExecutor(ABC):
    """Class which abstracts QuantumGraph execution.

    Any specific execution model is implemented in sub-class by overriding
    the `execute` method.
    """

    @abstractmethod
    def execute(
        self, graph: QuantumGraph | PredictedQuantumGraph, *, provenance_graph_file: str | None = None
    ) -> None:
        """Execute whole graph.

        Implementation of this method depends on particular execution model
        and it has to be provided by a subclass. Execution model determines
        what happens here; it can be either actual running of the task or,
        for example, generation of the scripts for delayed batch execution.

        Parameters
        ----------
        graph : `.QuantumGraph` or `.quantum_graph.PredictedQuantumGraph`
            Execution graph.
        provenance_graph_file : `str`, optional
            A filename to write provenance to.

        Notes
        -----
        The base-class body only raises `NotImplementedError`.
        """
        raise NotImplementedError()

    def getReport(self) -> Report | None:
        """Return execution report from last call to `execute`.

        Returns
        -------
        report : `~.quantum_reports.Report`, optional
            Structure describing the status of the execution of a quantum
            graph. `None` is returned if implementation does not support
            this feature.

        Raises
        ------
        RuntimeError
            Raised by implementations that support reports if this method is
            called before `execute`.

        Notes
        -----
        This default implementation never raises and always returns `None`;
        subclasses that produce reports are expected to override it.
        """
        return None