Coverage for python / lsst / pipe / base / separable_pipeline_executor.py: 33%
73 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:59 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
29from __future__ import annotations
31__all__ = [
32 "SeparablePipelineExecutor",
33]
36import datetime
37import getpass
38import logging
39from collections.abc import Iterable
40from typing import Any
42import lsst.resources
43from lsst.daf.butler import Butler, DatasetRef
44from lsst.daf.butler._rubin.temporary_for_ingest import TemporaryForIngest
46from ._quantumContext import ExecutionResources
47from .all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
48from .graph import QuantumGraph
49from .mp_graph_executor import MPGraphExecutor, MPGraphExecutorError
50from .pipeline import Pipeline
51from .quantum_graph import PredictedQuantumGraph
52from .quantum_graph_builder import QuantumGraphBuilder
53from .quantum_graph_executor import QuantumGraphExecutor
54from .single_quantum_executor import SingleQuantumExecutor
55from .taskFactory import TaskFactory
57_LOG = logging.getLogger(__name__)
class SeparablePipelineExecutor:
    """An executor that allows each step of pipeline execution to be
    run independently.

    The executor can run any or all of the following steps:

    * pre-execution initialization
    * pipeline building
    * quantum graph generation
    * quantum graph execution

    Any of these steps can also be handed off to external code without
    compromising the remaining ones.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        A Butler whose ``collections`` and ``run`` attributes contain the input
        and output collections to use for processing.
    clobber_output : `bool`, optional
        If set, the pipeline execution overwrites existing output files.
        Otherwise, any conflict between existing and new outputs is an error.
    skip_existing_in : `~collections.abc.Iterable` [`str`], optional
        If not empty, the pipeline execution searches the listed collections
        for existing outputs, and skips any quanta that have run to completion
        (or have no work to do). Otherwise, all tasks are attempted (subject to
        ``clobber_output``). A single `str` is interpreted as one collection
        name rather than as an iterable of characters.
    task_factory : `.TaskFactory`, optional
        A custom task factory for use in pre-execution and execution. By
        default, a new instance of `.TaskFactory` is used.
    resources : `.ExecutionResources`, optional
        The resources available to each quantum being executed.
    raise_on_partial_outputs : `bool`, optional
        If `True` raise exceptions chained by
        `.AnnotatedPartialOutputsError` immediately, instead of
        considering the partial result a success and continuing to run
        downstream tasks.

    Raises
    ------
    ValueError
        Raised if ``butler`` has no default input collections or no output
        run.
    """

    def __init__(
        self,
        butler: Butler,
        clobber_output: bool = False,
        skip_existing_in: Iterable[str] | None = None,
        task_factory: TaskFactory | None = None,
        resources: ExecutionResources | None = None,
        raise_on_partial_outputs: bool = True,
    ):
        # Build this executor's own Butler from the one provided, carrying
        # over its default input collections and output run.
        self._butler = Butler.from_config(
            butler=butler, collections=butler.collections.defaults, run=butler.run
        )
        if not self._butler.collections.defaults:
            raise ValueError("Butler must specify input collections for pipeline.")
        if not self._butler.run:
            raise ValueError("Butler must specify output run for pipeline.")

        self._clobber_output = clobber_output
        if not skip_existing_in:
            self._skip_existing_in = []
        elif isinstance(skip_existing_in, str):
            # A bare string is itself an iterable of characters; treat it as
            # a single collection name instead of splitting it up.
            self._skip_existing_in = [skip_existing_in]
        else:
            self._skip_existing_in = list(skip_existing_in)

        # Compare against None so that a caller-supplied factory is never
        # replaced just because it happens to evaluate as falsy.
        self._task_factory = task_factory if task_factory is not None else TaskFactory()
        self.resources = resources
        self.raise_on_partial_outputs = raise_on_partial_outputs

    def pre_execute_qgraph(
        self,
        graph: QuantumGraph | PredictedQuantumGraph,
        register_dataset_types: bool = False,
        save_init_outputs: bool = True,
        save_versions: bool = True,
    ) -> None:
        """Run pre-execution initialization.

        This method will be deprecated after DM-38041, to be replaced with a
        method that takes either a `.Pipeline` or a
        resolved `.pipeline_graph.PipelineGraph` instead of a `.QuantumGraph`.

        Parameters
        ----------
        graph : `.QuantumGraph` or `.quantum_graph.PredictedQuantumGraph`
            The quantum graph defining the pipeline and datasets to
            be initialized.
        register_dataset_types : `bool`, optional
            If `True`, register all output dataset types from the pipeline
            represented by ``graph``.
        save_init_outputs : `bool`, optional
            If `True`, create init-output datasets in this object's output
            run, and write the task configs as well.
        save_versions : `bool`, optional
            If `True`, save a package versions dataset.
        """
        if register_dataset_types:
            graph.pipeline_graph.register_dataset_types(self._butler, include_packages=save_versions)
        if save_init_outputs:
            # Existing init-outputs are only tolerated when the output run is
            # itself one of the collections searched for existing outputs.
            graph.write_init_outputs(self._butler, skip_existing=(self._butler.run in self._skip_existing_in))
            graph.write_configs(self._butler)
        if save_versions:
            graph.write_packages(self._butler)

    def make_pipeline(self, pipeline_uri: str | lsst.resources.ResourcePath) -> Pipeline:
        """Build a pipeline from pipeline and configuration information.

        Parameters
        ----------
        pipeline_uri : `str` or `lsst.resources.ResourcePath`
            URI to a file containing a pipeline definition. A URI fragment may
            be used to specify a subset of the pipeline, as described in
            :ref:`pipeline-running-intro`.

        Returns
        -------
        pipeline : `.Pipeline`
            The fully-built pipeline.
        """
        return Pipeline.from_uri(pipeline_uri)

    def make_quantum_graph_builder(
        self,
        pipeline: Pipeline,
        where: str = "",
        *,
        builder_class: type[QuantumGraphBuilder] = AllDimensionsQuantumGraphBuilder,
        **kwargs: Any,
    ) -> QuantumGraphBuilder:
        """Initialize a quantum graph builder from a pipeline and input
        datasets.

        Parameters
        ----------
        pipeline : `.Pipeline`
            The pipeline for which to generate a quantum graph.
        where : `str`, optional
            A data ID query that constrains the quanta generated. Must not be
            provided if a custom ``builder_class`` is given and that class does
            not accept ``where`` as a construction argument.
        builder_class : `type` [ \
                `.quantum_graph_builder.QuantumGraphBuilder` ], optional
            Quantum graph builder implementation.
        **kwargs
            Additional keyword arguments are forwarded to ``builder_class``
            when a quantum graph builder instance is constructed. All
            arguments accepted by the
            `~.quantum_graph_builder.QuantumGraphBuilder` base
            class are provided automatically (from explicit arguments to this
            method and executor attributes) and do not need to be included
            as keyword arguments.

        Returns
        -------
        builder : `.quantum_graph_builder.QuantumGraphBuilder`
            A quantum graph builder.
        """
        if where:
            # Only pass 'where' if it's actually provided, since some
            # QuantumGraphBuilder subclasses may not accept it.
            kwargs["where"] = where
        return builder_class(
            pipeline.to_graph(),
            self._butler,
            skip_existing_in=self._skip_existing_in,
            clobber=self._clobber_output,
            **kwargs,
        )

    def make_quantum_graph(
        self,
        pipeline: Pipeline,
        where: str = "",
        *,
        builder_class: type[QuantumGraphBuilder] = AllDimensionsQuantumGraphBuilder,
        attach_datastore_records: bool = False,
        **kwargs: Any,
    ) -> QuantumGraph:
        """Build a quantum graph from a pipeline and input datasets.

        This returns an instance of the old `.QuantumGraph` class. Use
        `build_quantum_graph` to construct a
        `.quantum_graph.PredictedQuantumGraph`.

        Parameters
        ----------
        pipeline : `.Pipeline`
            The pipeline for which to generate a quantum graph.
        where : `str`, optional
            A data ID query that constrains the quanta generated. Must not be
            provided if a custom ``builder_class`` is given and that class does
            not accept ``where`` as a construction argument.
        builder_class : `type` [ \
                `.quantum_graph_builder.QuantumGraphBuilder` ], optional
            Quantum graph builder implementation.
        attach_datastore_records : `bool`, optional
            Whether to attach datastore records. These are currently used only
            by `lsst.daf.butler.QuantumBackedButler`, which is not used by
            `SeparablePipelineExecutor` for execution.
        **kwargs
            Additional keyword arguments are forwarded to ``builder_class``
            when a quantum graph builder instance is constructed. All
            arguments accepted by the
            `~.quantum_graph_builder.QuantumGraphBuilder` base
            class are provided automatically (from explicit arguments to this
            method and executor attributes) and do not need to be included
            as keyword arguments.

        Returns
        -------
        graph : `.QuantumGraph`
            The quantum graph for ``pipeline`` as run on the datasets
            identified by ``where``.

        Notes
        -----
        This method does no special handling of empty quantum graphs. If
        needed, clients can use `len` to test if the returned graph is empty.
        """
        # Metadata handed to the builder alongside the graph: the inputs,
        # output run, skip settings, query, user and build time.
        metadata = {
            "input": self._butler.collections.defaults,
            "output_run": self._butler.run,
            "skip_existing_in": self._skip_existing_in,
            "skip_existing": bool(self._skip_existing_in),
            "data_query": where,
            "user": getpass.getuser(),
            "time": str(datetime.datetime.now()),
        }
        qg_builder = self.make_quantum_graph_builder(pipeline, where, builder_class=builder_class, **kwargs)
        graph = qg_builder.build(metadata=metadata, attach_datastore_records=attach_datastore_records)
        _LOG.info(
            "QuantumGraph contains %d quanta for %d tasks, graph ID: %r",
            len(graph),
            len(graph.taskGraph),
            graph.graphID,
        )
        return graph

    def build_quantum_graph(
        self,
        pipeline: Pipeline,
        where: str = "",
        *,
        builder_class: type[QuantumGraphBuilder] = AllDimensionsQuantumGraphBuilder,
        attach_datastore_records: bool = False,
        **kwargs: Any,
    ) -> PredictedQuantumGraph:
        """Build a quantum graph from a pipeline and input datasets.

        This returns an instance of the new
        `.quantum_graph.PredictedQuantumGraph` class. Use `make_quantum_graph`
        to construct a `.QuantumGraph`.

        Parameters
        ----------
        pipeline : `.Pipeline`
            The pipeline for which to generate a quantum graph.
        where : `str`, optional
            A data ID query that constrains the quanta generated. Must not be
            provided if a custom ``builder_class`` is given and that class does
            not accept ``where`` as a construction argument.
        builder_class : `type` [ \
                `.quantum_graph_builder.QuantumGraphBuilder` ], optional
            Quantum graph builder implementation.
        attach_datastore_records : `bool`, optional
            Whether to attach datastore records. These are currently used only
            by `lsst.daf.butler.QuantumBackedButler`, which is not used by
            `SeparablePipelineExecutor` for execution.
        **kwargs
            Additional keyword arguments are forwarded to ``builder_class``
            when a quantum graph builder instance is constructed. All
            arguments accepted by the
            `~.quantum_graph_builder.QuantumGraphBuilder` base
            class are provided automatically (from explicit arguments to this
            method and executor attributes) and do not need to be included
            as keyword arguments.

        Returns
        -------
        graph : `.quantum_graph.PredictedQuantumGraph`
            The quantum graph for ``pipeline`` as run on the datasets
            identified by ``where``.

        Notes
        -----
        This method does no special handling of empty quantum graphs. If
        needed, clients can use `len` to test if the returned graph is empty.
        """
        # Unlike make_quantum_graph, only the skip settings and the query are
        # passed as metadata here.
        metadata = {
            "skip_existing_in": self._skip_existing_in,
            "skip_existing": bool(self._skip_existing_in),
            "data_query": where,
        }
        qg_builder = self.make_quantum_graph_builder(pipeline, where, builder_class=builder_class, **kwargs)
        graph = qg_builder.finish(
            metadata=metadata, attach_datastore_records=attach_datastore_records
        ).assemble()
        _LOG.info(
            "PredictedQuantumGraph contains %d quanta for %d tasks.",
            len(graph),
            len(graph.quanta_by_task),
        )
        return graph

    def run_pipeline(
        self,
        graph: QuantumGraph | PredictedQuantumGraph,
        fail_fast: bool = False,
        graph_executor: QuantumGraphExecutor | None = None,
        num_proc: int = 1,
        *,
        provenance_dataset_ref: DatasetRef | None = None,
    ) -> None:
        """Run a pipeline in the form of a prepared quantum graph.

        Pre-execution initialization must have already been run;
        see `pre_execute_qgraph`.

        Parameters
        ----------
        graph : `.QuantumGraph` or `.quantum_graph.PredictedQuantumGraph`
            The pipeline and datasets to execute.
        fail_fast : `bool`, optional
            If `True`, abort all execution if any task fails when
            running with multiple processes. Only used with the default graph
            executor.
        graph_executor : `.quantum_graph_executor.QuantumGraphExecutor`,\
                optional
            A custom graph executor. By default, a new instance of
            `.mp_graph_executor.MPGraphExecutor` is used.
        num_proc : `int`, optional
            The number of processes that can be used to run the pipeline. The
            default value ensures that no subprocess is created. Only used with
            the default graph executor.
        provenance_dataset_ref : `lsst.daf.butler.DatasetRef`, optional
            Dataset that should be used to save provenance. Provenance is only
            supported when running in a single process (at least for the
            default quantum executor), and should not be enabled in contexts
            where a quantum might be executed more than once (i.e. retried)
            within the same `~lsst.daf.butler.CollectionType.RUN` collection.
            The caller is responsible for registering the dataset type and for
            ensuring that the dimensions of this dataset do not lead to
            uniqueness conflicts.
        """
        # Compare against None so that a caller-supplied executor is always
        # honored, even if it happens to evaluate as falsy.
        if graph_executor is None:
            quantum_executor = SingleQuantumExecutor(
                butler=self._butler,
                task_factory=self._task_factory,
                skip_existing_in=self._skip_existing_in,
                clobber_outputs=self._clobber_output,
                resources=self.resources,
                raise_on_partial_outputs=self.raise_on_partial_outputs,
            )
            graph_executor = MPGraphExecutor(
                num_proc=num_proc,
                timeout=2_592_000.0,  # In practice, timeout is never helpful; set to 30 days.
                quantum_executor=quantum_executor,
                fail_fast=fail_fast,
            )
            # Have to reset connection pool to avoid sharing connections with
            # forked processes.
            self._butler.registry.resetConnectionPool()

        if provenance_dataset_ref is not None:
            with TemporaryForIngest(self._butler, provenance_dataset_ref) as temporary:
                try:
                    graph_executor.execute(graph, provenance_graph_file=temporary.ospath)
                except MPGraphExecutorError:
                    # If the graph executor itself raised, it will have
                    # finished the provenance rewrite. In other cases the
                    # temporary file might be incomplete or corrupted and we
                    # can't roll the dice on ingesting it.
                    temporary.ingest()
                    raise
                else:
                    # Execution finished without raising; ingest the
                    # provenance file that was written.
                    temporary.ingest()

        else:
            graph_executor.execute(graph)