Coverage for python / lsst / ctrl / mpexec / singleQuantumExecutor.py: 65%
54 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:57 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:57 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("SingleQuantumExecutor",)
32import uuid
33from collections.abc import Callable, Mapping
34from typing import TYPE_CHECKING, Any
36from deprecated.sphinx import deprecated
38import lsst.pipe.base.single_quantum_executor
40if TYPE_CHECKING:
41 from lsst.daf.butler import Butler, ButlerMetrics, LimitedButler, Quantum
42 from lsst.pipe.base import ExecutionResources, PipelineTask, QuantumSuccessCaveats, TaskFactory
43 from lsst.pipe.base.log_capture import _ExecutionLogRecordsExtra
44 from lsst.pipe.base.pipeline_graph import TaskNode
47# TODO[DM-51962]: Remove this module.
@deprecated(
    "The SingleQuantumExecutor class has moved to lsst.pipe.base.single_quantum_executor. "
    "This forwarding shim will be removed after v30.",
    version="v30",
    category=FutureWarning,
)
class SingleQuantumExecutor(lsst.pipe.base.single_quantum_executor.SingleQuantumExecutor):
    """Execute quanta one at a time (deprecated forwarding shim).

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler` or `None`
        Data butler; pass `None` to use a Quantum-backed butler instead.
    taskFactory : `~lsst.pipe.base.TaskFactory`
        Task factory instance.
    skipExistingIn : `~typing.Any`
        Expressions naming the collections to search for existing output
        datasets; see :ref:`daf_butler_ordered_collection_searches` for the
        accepted types. Only the presence of the butler output run in this
        list is checked: when it is present, quanta whose complete outputs
        already exist in the output run are skipped. `None` or an empty
        string/sequence disables skipping.
    clobberOutputs : `bool`, optional
        If `True`, outputs of a quantum that already exist in the output run
        collection are removed before the quantum is executed. When
        ``skipExistingIn`` contains the output run, only partial outputs of a
        quantum are removed. Only used when ``butler`` is not `None`.
    enableLsstDebug : `bool`, optional
        Turn on the ``lsstDebug`` debugging facility for a task.
    limited_butler_factory : `~collections.abc.Callable`, optional
        Callable that builds a `~lsst.daf.butler.LimitedButler` for a given
        Quantum. Required when ``butler`` is `None`; ignored otherwise.
    resources : `~lsst.pipe.base.ExecutionResources`, optional
        Resources available to the quantum while it executes.
    skipExisting : `bool`, optional
        If `True`, skip quanta whose metadata datasets are already stored.
        In contrast to ``skipExistingIn``, this also works with limited
        butlers. Always set to `True` if ``skipExistingIn`` matches
        ``butler.run``.
    assumeNoExistingOutputs : `bool`, optional
        If `True`, assume that preexisting outputs cannot occur (e.g. because
        higher-level code knows this is a new ``RUN`` collection) and do not
        look for them. ``skipExisting`` and ``clobberOutputs`` are then
        ignored, and, unlike setting both of those to `False`, all dataset
        existence checks are avoided as well.
    raise_on_partial_outputs : `bool`, optional
        If `True`, exceptions chained by
        `lsst.pipe.base.AnnotatedPartialOutputsError` are raised immediately
        rather than treating the partial result as a success and continuing
        with downstream tasks.
    job_metadata : `~collections.abc.Mapping`
        Extra metadata to embed in the quantum metadata under the "job" key.
        Intended for information shared by every quantum executed in one
        process, such as the time taken to load the quantum graph in a BPS
        job.

    Notes
    -----
    This class is a deprecated backwards-compatibility shim around
    `lsst.pipe.base.single_quantum_executor.SingleQuantumExecutor`, which
    offers the same functionality with very minor interface changes. The shim
    accepts the old camelCase constructor arguments, exposes the old public
    method names as thin delegations to the underscore-prefixed base-class
    methods, and provides read-only properties for the old attribute names.
    """

    def __init__(
        self,
        butler: Butler | None,
        taskFactory: TaskFactory,
        skipExistingIn: Any = None,
        clobberOutputs: bool = False,
        enableLsstDebug: bool = False,
        limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None,
        resources: ExecutionResources | None = None,
        skipExisting: bool = False,
        assumeNoExistingOutputs: bool = False,
        raise_on_partial_outputs: bool = True,
        job_metadata: Mapping[str, int | str | float] | None = None,
    ):
        super().__init__(
            # Arguments whose names are the same in both interfaces.
            butler=butler,
            limited_butler_factory=limited_butler_factory,
            resources=resources,
            raise_on_partial_outputs=raise_on_partial_outputs,
            job_metadata=job_metadata,
            # Legacy camelCase arguments translated to snake_case keywords.
            task_factory=taskFactory,
            skip_existing_in=skipExistingIn,
            skip_existing=skipExisting,
            clobber_outputs=clobberOutputs,
            enable_lsst_debug=enableLsstDebug,
            assume_no_existing_outputs=assumeNoExistingOutputs,
        )

    def checkExistingOutputs(
        self,
        quantum: Quantum,
        task_node: TaskNode,
        /,
        limited_butler: LimitedButler,
        log_extra: _ExecutionLogRecordsExtra,
    ) -> bool:
        """Delegate to the base class ``_check_existing_outputs`` method.

        All arguments are passed through unchanged and the base-class result
        is returned as-is.
        """
        outcome = super()._check_existing_outputs(
            quantum,
            task_node,
            limited_butler=limited_butler,
            log_extra=log_extra,
        )
        return outcome

    def updatedQuantumInputs(
        self, quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler
    ) -> Quantum:
        """Delegate to the base class ``_updated_quantum_inputs`` method.

        All arguments are passed through unchanged and the base-class result
        is returned as-is.
        """
        refreshed = super()._updated_quantum_inputs(quantum, task_node, limited_butler=limited_butler)
        return refreshed

    def runQuantum(
        self,
        task: PipelineTask,
        quantum: Quantum,
        task_node: TaskNode,
        /,
        limited_butler: LimitedButler,
        quantum_id: uuid.UUID | None = None,
    ) -> tuple[QuantumSuccessCaveats, list[uuid.UUID], ButlerMetrics]:
        """Run the base class ``_run_quantum`` while recording butler metrics.

        Returns
        -------
        result : `tuple`
            Three items: the value returned by ``_run_quantum``, the list
            that was handed to it as ``ids_put``, and the object yielded by
            ``limited_butler.record_metrics()``.
        """
        # The base-class call receives this list as ``ids_put``; it is created
        # empty here and returned to the caller afterwards.
        put_ids: list[uuid.UUID] = []
        with limited_butler.record_metrics() as metrics:
            caveats = super()._run_quantum(
                task,
                quantum,
                task_node,
                limited_butler=limited_butler,
                quantum_id=quantum_id,
                ids_put=put_ids,
            )
        return caveats, put_ids, metrics

    def writeMetadata(
        self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler
    ) -> None:
        """Delegate to the base class ``_write_metadata`` method.

        All arguments are passed through unchanged.
        """
        return super()._write_metadata(
            quantum,
            metadata,
            task_node,
            limited_butler=limited_butler,
        )

    def initGlobals(self, quantum: Quantum) -> None:
        """Do nothing; the method exists only so legacy callers can invoke it."""

    @property
    def butler(self) -> Butler | None:
        """Value of the ``_butler`` attribute (read-only)."""
        return self._butler

    @property
    def taskFactory(self) -> TaskFactory:
        """Value of the ``_task_factory`` attribute (read-only)."""
        return self._task_factory

    @property
    def clobberOutputs(self) -> bool:
        """Value of the ``_clobber_outputs`` attribute (read-only)."""
        return self._clobber_outputs

    @property
    def enableLsstDebug(self) -> bool:
        """Value of the ``_enable_lsst_debug`` attribute (read-only)."""
        return self._enable_lsst_debug

    @property
    def limited_butler_factory(self) -> Callable[[Quantum], LimitedButler] | None:
        """Value of the ``_limited_butler_factory`` attribute (read-only)."""
        return self._limited_butler_factory

    @property
    def resources(self) -> ExecutionResources | None:
        """Value of the ``_resources`` attribute (read-only)."""
        return self._resources

    @property
    def assumeNoExistingOutputs(self) -> bool:
        """Value of the ``_assume_no_existing_outputs`` attribute (read-only)."""
        return self._assume_no_existing_outputs

    @property
    def raise_on_partial_outputs(self) -> bool:
        """Value of the ``_raise_on_partial_outputs`` attribute (read-only)."""
        return self._raise_on_partial_outputs

    @property
    def job_metadata(self) -> Mapping[str, int | str | float] | None:
        """Value of the ``_job_metadata`` attribute (read-only)."""
        return self._job_metadata

    @property
    def skipExisting(self) -> bool:
        """Value of the ``_skip_existing`` attribute (read-only)."""
        return self._skip_existing