Coverage for python / lsst / ctrl / mpexec / cli / script / build.py: 11%
56 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:33 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:33 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("build",)
32from lsst.daf.butler import Butler
33from lsst.pipe.base import Pipeline
34from lsst.pipe.base.pipeline_graph import visualization
35from lsst.resources import ResourcePathExpression
37from ..._pipeline_graph_factory import PipelineGraphFactory
38from ...showInfo import ShowInfo
39from ..utils import _PipelineAction
42def build(
43 *,
44 pipeline: ResourcePathExpression | Pipeline,
45 pipeline_actions: list[_PipelineAction] | _PipelineAction,
46 pipeline_dot: str,
47 pipeline_mermaid: str,
48 save_pipeline: str,
49 show: ShowInfo,
50 butler_config: ResourcePathExpression | None = None,
51 select_tasks: str = "",
52 **kwargs: object,
53) -> PipelineGraphFactory:
54 """Implement the command line interface `pipetask build` subcommand.
56 Should only be called by command line tools and unit test code that tests
57 this function.
59 Build and optionally save pipeline definition.
61 Returns the pipeline instance that was built, for testing and for using
62 this function with other script functions.
64 Parameters
65 ----------
66 pipeline : `str` or `lsst.pipe.base.Pipeline`
67 Path location of a pipeline definition file in YAML format.
68 pipeline_actions : `list` [`PipelineAction`] or `PipelineAction`
69 A list of pipeline actions in the order they should be executed.
70 pipeline_dot : `str`
71 Path location for storing GraphViz DOT representation of a pipeline.
72 pipeline_mermaid : `str`
73 Path location for storing Mermaid representation of a pipeline.
74 save_pipeline : `str`
75 Path location for storing resulting pipeline definition in YAML format.
76 show : `lsst.ctrl.mpexec.showInfo.ShowInfo`
77 Descriptions of what to dump to stdout.
78 butler_config : `str`, `dict`, or `lsst.daf.butler.Config`, optional
79 If `str`, `butler_config` is the path location of the gen3
80 butler/registry config file. If `dict`, `butler_config` is key value
81 pairs used to init or update the `lsst.daf.butler.Config` instance. If
82 `Config`, it is the object used to configure a Butler.
83 Only used to resolve pipeline graphs for --show pipeline-graph and
84 --show task-graph.
85 select_tasks : `str`, optional
86 String expression that filters the tasks in the pipeline.
87 **kwargs
88 Ignored; click commands may accept options for more than one script
89 function and pass all the option kwargs to each of the script functions
90 which ignore these unused kwargs.
92 Returns
93 -------
94 pipeline_graph_factory : `..PipelineGraphFactory`
95 A helper object that holds the built pipeline and can turn it into a
96 pipeline graph.
98 Raises
99 ------
100 Exception
101 Raised if there is a failure building the pipeline.
102 """
103 # If pipeline_actions is a single instance, not a list, then put it in
104 # a list. _PipelineAction is a namedtuple, so we can't use
105 # `lsst.utils.iteration.iterable` because a namedtuple *is* iterable,
106 # but we need a list of _PipelineAction.
107 if isinstance(pipeline_actions, _PipelineAction):
108 pipeline_actions = [pipeline_actions]
110 if pipeline:
111 if not isinstance(pipeline, Pipeline):
112 pipeline = Pipeline.from_uri(pipeline)
113 else:
114 pipeline = Pipeline("anonymous")
116 # loop over all pipeline actions and apply them in order
117 for action in pipeline_actions:
118 match action.action:
119 case "add_instrument":
120 pipeline.addInstrument(action.value)
121 case "new_task":
122 pipeline.addTask(action.value, action.label)
123 case "delete_task":
124 pipeline.removeTask(action.label)
125 case "config":
126 # action value string is "field=value", split it at '='
127 field, _, value = action.value.partition("=")
128 pipeline.addConfigOverride(action.label, field, value)
129 case "configfile":
130 pipeline.addConfigFile(action.label, action.value)
131 case _:
132 raise ValueError(f"Unexpected pipeline action: {action.action}")
134 if save_pipeline:
135 pipeline.write_to_uri(save_pipeline)
137 butler: Butler | None = None
138 if butler_config:
139 butler = Butler.from_config(butler_config, writeable=False)
141 try:
142 pipeline_graph_factory = PipelineGraphFactory(pipeline, butler, select_tasks)
143 finally:
144 if butler is not None:
145 butler.close()
147 if pipeline_dot:
148 with open(pipeline_dot, "w") as stream:
149 visualization.show_dot(
150 pipeline_graph_factory(visualization_only=True),
151 stream,
152 dataset_types=True,
153 task_classes="full",
154 )
156 if pipeline_mermaid:
157 # Determine output format based on file extension.
158 if pipeline_mermaid.endswith(".svg"):
159 output_format = "svg"
160 file_mode = "wb"
161 elif pipeline_mermaid.endswith(".png"):
162 output_format = "png"
163 file_mode = "wb"
164 else: # Default to the text-based mmd format.
165 output_format = "mmd"
166 file_mode = "w"
168 with open(pipeline_mermaid, file_mode) as stream:
169 visualization.show_mermaid(
170 pipeline_graph_factory(visualization_only=True),
171 stream,
172 output_format=output_format,
173 width=4500 if output_format != "mmd" else None,
174 dataset_types=True,
175 task_classes="full",
176 )
178 show.show_pipeline_info(pipeline_graph_factory)
180 return pipeline_graph_factory