Coverage for python / lsst / pipe / base / quantum_graph_executor.py: 72%
36 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:59 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["QuantumExecutionResult", "QuantumExecutor", "QuantumGraphExecutor"]
32from abc import ABC, abstractmethod
33from typing import TYPE_CHECKING, Self
35from lsst.daf.butler import Quantum
37from .quantum_reports import QuantumReport, Report
39if TYPE_CHECKING:
40 import uuid
42 from lsst.daf.butler.logging import ButlerLogRecords
44 from ._task_metadata import TaskMetadata
45 from .graph import QuantumGraph
46 from .pipeline_graph import TaskNode
47 from .quantum_graph import PredictedQuantumGraph
50class QuantumExecutionResult(tuple[Quantum, QuantumReport | None]):
51 """A result struct that captures information about a single quantum's
52 execution.
54 Parameters
55 ----------
56 quantum : `lsst.daf.butler.Quantum`
57 Quantum that was executed.
58 report : `.quantum_reports.QuantumReport`
59 Report with basic information about the execution.
60 task_metadata : `TaskMetadata`, optional
61 Metadata saved by the task and executor during execution.
62 skipped_existing : `bool`, optional
63 If `True`, this quantum was not executed because it appeared to have
64 already been executed successfully.
65 adjusted_no_work : `bool`, optional
66 If `True`, this quantum was not executed because the
67 `PipelineTaskConnections.adjustQuanta` hook raised `NoWorkFound`.
69 Notes
70 -----
71 For backwards compatibility, this class is a two-element tuple that allows
72 the ``quantum`` and ``report`` attributes to be unpacked. Additional
73 regular attributes may be added by executors (but the tuple must remain
74 only two elements to enable the current unpacking interface).
75 """
77 def __new__(
78 cls,
79 quantum: Quantum,
80 report: QuantumReport | None,
81 *,
82 task_metadata: TaskMetadata | None = None,
83 skipped_existing: bool | None = None,
84 adjusted_no_work: bool | None = None,
85 ) -> Self:
86 return super().__new__(cls, (quantum, report))
88 # We need to define both __init__ and __new__ because tuple inheritance
89 # requires __new__ and numpydoc requires __init__.
91 def __init__(
92 self,
93 quantum: Quantum,
94 report: QuantumReport | None,
95 *,
96 task_metadata: TaskMetadata | None = None,
97 skipped_existing: bool | None = None,
98 adjusted_no_work: bool | None = None,
99 ):
100 self._task_metadata = task_metadata
101 self._skipped_existing = skipped_existing
102 self._adjusted_no_work = adjusted_no_work
104 @property
105 def quantum(self) -> Quantum:
106 """The quantum actually executed."""
107 return self[0]
109 @property
110 def report(self) -> QuantumReport | None:
111 """Structure describing the status of the execution of a quantum.
113 This is `None` if the implementation does not support this feature.
114 """
115 return self[1]
117 @property
118 def task_metadata(self) -> TaskMetadata | None:
119 """Metadata saved by the task and executor during execution."""
120 return self._task_metadata
122 @property
123 def skipped_existing(self) -> bool | None:
124 """If `True`, this quantum was not executed because it appeared to have
125 already been executed successfully.
126 """
127 return self._skipped_existing
129 @property
130 def adjusted_no_work(self) -> bool | None:
131 """If `True`, this quantum was not executed because the
132 `PipelineTaskConnections.adjustQuanta` hook raised `NoWorkFound`.
133 """
134 return self._adjusted_no_work
137class QuantumExecutor(ABC):
138 """Class which abstracts execution of a single Quantum.
140 In general implementation should not depend on execution model and
141 execution should always happen in-process. Main reason for existence
142 of this class is to provide do-nothing implementation that can be used
143 in the unit tests.
144 """
146 @abstractmethod
147 def execute(
148 self,
149 task_node: TaskNode,
150 /,
151 quantum: Quantum,
152 quantum_id: uuid.UUID | None = None,
153 *,
154 log_records: ButlerLogRecords | None = None,
155 ) -> QuantumExecutionResult:
156 """Execute single quantum.
158 Parameters
159 ----------
160 task_node : `~.pipeline_graph.TaskNode`
161 Task definition structure.
162 quantum : `~lsst.daf.butler.Quantum`
163 Quantum for this execution.
164 quantum_id : `uuid.UUID` or `None`, optional
165 The ID of the quantum to be executed.
166 log_records : `lsst.daf.butler.ButlerLogRecords`, optional
167 Container that should be used to store logs in memory before
168 writing them to the butler. This disables streaming log (since
169 we'd have to store them in memory anyway), but it permits the
170 caller to prepend logs to be stored in the butler and allows task
171 logs to be inspected by the caller after execution is complete.
173 Returns
174 -------
175 result : `QuantumExecutionResult`
176 Result struct. May also be unpacked as a 2-tuple (see type
177 documentation).
179 Notes
180 -----
181 Any exception raised by the task or code that wraps task execution is
182 propagated to the caller of this method.
183 """
184 raise NotImplementedError()
187class QuantumGraphExecutor(ABC):
188 """Class which abstracts QuantumGraph execution.
190 Any specific execution model is implemented in sub-class by overriding
191 the `execute` method.
192 """
194 @abstractmethod
195 def execute(
196 self, graph: QuantumGraph | PredictedQuantumGraph, *, provenance_graph_file: str | None = None
197 ) -> None:
198 """Execute whole graph.
200 Implementation of this method depends on particular execution model
201 and it has to be provided by a subclass. Execution model determines
202 what happens here; it can be either actual running of the task or,
203 for example, generation of the scripts for delayed batch execution.
205 Parameters
206 ----------
207 graph : `.QuantumGraph` or `.quantum_graph.PredictedQuantumGraph`
208 Execution graph.
209 provenance_graph_file : `str`, optional
210 A filename to write provenance to.
211 """
212 raise NotImplementedError()
214 def getReport(self) -> Report | None:
215 """Return execution report from last call to `execute`.
217 Returns
218 -------
219 report : `~.quantum_reports.Report`, optional
220 Structure describing the status of the execution of a quantum
221 graph. `None` is returned if implementation does not support
222 this feature.
224 Raises
225 ------
226 RuntimeError
227 Raised if this method is called before `execute`.
228 """
229 return None