Coverage for python / lsst / ctrl / mpexec / _pipeline_graph_factory.py: 18%
39 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:20 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:20 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("PipelineGraphFactory",)
32from lsst.daf.butler import Butler
33from lsst.pipe.base import Pipeline, PipelineGraph
36class PipelineGraphFactory:
37 """A factory for building and caching a PipelineGraph.
39 Parameters
40 ----------
41 pipeline : `lsst.pipe.base.Pipeline`
42 Pipeline definition to start from.
43 butler : `lsst.daf.butler.Butler` or `None`, optional
44 Butler that can be used to resolve dataset type definitions and get
45 dimension schema.
46 select_tasks : `str`, optional
47 String expression that filters the tasks in the pipeline graph.
48 pipeline_graph : `lsst.pipe.base.pipeline_graph.PipelineGraph`, optional
49 Already-constructed pipeline graph.
50 """
52 def __init__(
53 self,
54 pipeline: Pipeline | None = None,
55 butler: Butler | None = None,
56 select_tasks: str = "",
57 *,
58 pipeline_graph: PipelineGraph | None = None,
59 ):
60 if pipeline is None and pipeline_graph is None:
61 raise TypeError("At least one of 'pipeline' and 'pipeline_graph' must not be `None`.")
62 self._pipeline = pipeline
63 self._registry = butler.registry if butler is not None else None
64 self._select_tasks = select_tasks
65 self._pipeline_graph: PipelineGraph | None = pipeline_graph
66 self._resolved: bool = False
67 self._for_visualization_only: bool = False
69 def __call__(self, *, resolve: bool = True, visualization_only: bool = False) -> PipelineGraph:
70 if self._pipeline_graph is None:
71 assert self._pipeline is not None, "Guaranteed at construction."
72 self._pipeline_graph = self._pipeline.to_graph()
73 if self._select_tasks:
74 self._pipeline_graph = self._pipeline_graph.select(self._select_tasks)
75 if resolve and not self._resolved:
76 self._pipeline_graph.resolve(self._registry, visualization_only=visualization_only)
77 self._resolved = True
78 self._for_visualization_only = self._registry is None
79 elif resolve and not visualization_only and self._for_visualization_only:
80 raise RuntimeError("Cannot resolve pipeline graph without butler.")
81 return self._pipeline_graph
83 @property
84 def pipeline(self) -> Pipeline:
85 """The original pipeline definition."""
86 if self._pipeline is None:
87 raise RuntimeError("Cannot obtain pipeline from pipeline graph.")
88 if self._select_tasks:
89 raise RuntimeError(
90 "The --select-tasks option cannot be used with operations that return or display a "
91 "pipeline as YAML, since it only operates on the pipeline graph."
92 )
93 return self._pipeline
95 def __bool__(self) -> bool:
96 if self._pipeline is not None:
97 return bool(self._pipeline)
98 else:
99 assert self._pipeline_graph is not None, "Guaranteed at construction."
100 return bool(self._pipeline_graph.tasks)