Coverage for python / lsst / ctrl / mpexec / cli / script / qgraph.py: 17%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:45 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("qgraph",)
32import uuid
33from collections.abc import Iterable, Mapping, Sequence
34from typing import TYPE_CHECKING
36from astropy.table import Table
38from lsst.pipe.base import BuildId, QuantumGraph
39from lsst.pipe.base.all_dimensions_quantum_graph_builder import (
40 AllDimensionsQuantumGraphBuilder,
41 DatasetQueryConstraintVariant,
42)
43from lsst.pipe.base.dot_tools import graph2dot
44from lsst.pipe.base.mermaid_tools import graph2mermaid
45from lsst.pipe.base.pipeline_graph import TaskImportMode
46from lsst.pipe.base.quantum_graph import PredictedQuantumGraph, PredictedQuantumGraphComponents
47from lsst.resources import ResourcePath, ResourcePathExpression
48from lsst.utils.iteration import ensure_iterable
49from lsst.utils.logging import getLogger
51from ..._pipeline_graph_factory import PipelineGraphFactory
52from ...showInfo import ShowInfo
53from ..butler_factory import ButlerFactory
54from ..utils import summarize_quantum_graph
56if TYPE_CHECKING:
57 from lsst.pipe.base.tests.mocks import ForcedFailure # this monkey patches; only import for annotation!
59_LOG = getLogger(__name__)
def qgraph(
    pipeline_graph_factory: PipelineGraphFactory | None,
    *,
    qgraph: ResourcePathExpression | None,
    qgraph_id: str | None,
    qgraph_node_id: Iterable[uuid.UUID | str] | None,
    qgraph_datastore_records: bool,
    skip_existing_in: Iterable[str] | None,
    skip_existing: bool,
    save_qgraph: ResourcePathExpression | None,
    qgraph_dot: str | None,
    qgraph_mermaid: str | None,
    butler_config: ResourcePathExpression,
    input: Iterable[str] | str,
    output: str | None,
    output_run: str | None,
    extend_run: bool,
    replace_run: bool,
    prune_replaced: str | None,
    data_query: str | None,
    data_id_table: Iterable[ResourcePathExpression],
    show: ShowInfo,
    clobber_outputs: bool,
    dataset_query_constraint: str,
    rebase: bool,
    mock: bool = False,
    unmocked_dataset_types: Sequence[str],
    mock_failure: Mapping[str, ForcedFailure],
    for_execution: bool = False,
    for_init_output_run: bool = False,
    **kwargs: object,
) -> PredictedQuantumGraph | None:
    """Implement the command line interface `pipetask qgraph` subcommand.

    Should only be called by command line tools and unit test code that test
    this function.

    Parameters
    ----------
    pipeline_graph_factory : `..PipelineGraphFactory` or `None`
        A factory that holds the pipeline and can produce a pipeline graph.
        If this is not `None` then ``qgraph`` should be `None`.
    qgraph : convertible to `lsst.resources.ResourcePath`, or `None`
        URI location for a serialized quantum graph definition. If this option
        is not `None` then ``pipeline_graph_factory`` should be `None`.
    qgraph_id : `str` or `None`
        Quantum graph identifier, if specified must match the identifier of the
        graph loaded from a file. Ignored if graph is not loaded from a file.
    qgraph_node_id : `~collections.abc.Iterable` [`str` | `uuid.UUID`] or \
            `None`
        Only load a specified set of nodes if graph is loaded from a file;
        nodes are identified by `uuid.UUID` (strings are converted to UUIDs).
    qgraph_datastore_records : `bool`
        If `True` then include datastore records into generated quanta.
    skip_existing_in : `~collections.abc.Iterable` [ `str` ] or `None`
        Accepts list of collections, if all Quantum outputs already exist in
        the specified list of collections then that Quantum will be excluded
        from the QuantumGraph.
    skip_existing : `bool`
        Appends output RUN collection to the ``skip_existing_in`` list.
    save_qgraph : convertible to `lsst.resources.ResourcePath` or `None`
        URI location for saving the quantum graph.
    qgraph_dot : `str` or `None`
        Path location for storing GraphViz DOT representation of a quantum
        graph.
    qgraph_mermaid : `str` or `None`
        Path location for storing Mermaid representation of a quantum graph.
    butler_config : convertible to `lsst.resources.ResourcePath`
        Path to butler repository configuration.
    input : `~collections.abc.Iterable` [ `str` ] or `None`
        List of names of the input collection(s).
    output : `str` or `None`
        Name of the output CHAINED collection. This may either be an existing
        CHAINED collection to use as both input and output (if `input` is
        `None`), or a new CHAINED collection created to include all inputs
        (if `input` is not `None`). In both cases, the collection's children
        will start with an output RUN collection that directly holds all new
        datasets (see `output_run`).
    output_run : `str` or `None`
        Name of the new output RUN collection. If not provided then `output`
        must be provided and a new RUN collection will be created by appending
        a timestamp to the value passed with `output`. If this collection
        already exists then `extend_run` must be passed.
    extend_run : `bool`
        Instead of creating a new RUN collection, insert datasets into either
        the one given by `output_run` (if provided) or the first child
        collection of `output` (which must be of type RUN).
    replace_run : `bool`
        Before creating a new RUN collection in an existing CHAINED collection,
        remove the first child collection (which must be of type RUN). This can
        be used to repeatedly write to the same (parent) collection during
        development, but it does not delete the datasets associated with the
        replaced run unless `prune-replaced` is also True. Requires `output`,
        and `extend_run` must be `None`.
    prune_replaced : `str` or `None`
        If not `None`, delete the datasets in the collection replaced by
        `replace_run`, either just from the datastore ("unstore") or by
        removing them and the RUN completely ("purge"). Requires
        ``replace_run`` to be `True`.
    data_query : `str` or `None`
        User query selection expression. `None` is treated as an empty query.
    data_id_table : `~collections.abc.Iterable` [convertible to \
            `lsst.resources.ResourcePath`]
        Paths to data ID tables to join in.
    show : `lsst.ctrl.mpexec.showInfo.ShowInfo`
        Descriptions of what to dump to stdout.
    clobber_outputs : `bool`
        Remove outputs from previous execution of the same quantum before new
        execution. If ``skip_existing`` is also passed, then only failed
        quanta will be clobbered.
    dataset_query_constraint : `str`
        Control constraining graph building using pre-existing dataset types.
        Valid values are off, all, or a comma separated list of dataset type
        names.
    rebase : `bool`
        If `True` then reset output collection chain if it is inconsistent with
        the ``inputs``.
    mock : `bool`
        If True, use a mocked version of the pipeline.
    unmocked_dataset_types : `collections.abc.Sequence` [ `str` ], optional
        List of overall-input dataset types that should not be mocked.
    mock_failure : `~collections.abc.Mapping`
        Quanta that should raise exceptions.
    for_execution : `bool`, optional
        If `True`, the script is being used to feed another that will execute
        the given quanta, and hence all information needed for execution must
        be loaded.
    for_init_output_run : `bool`, optional
        If `True`, the script is being used to feed another that will
        initialize the output run, and hence all information needed to do so
        must be loaded.
    **kwargs : `dict` [`str`, `str`]
        Ignored; click commands may accept options for more than one script
        function and pass all the option kwargs to each of the script functions
        which ignore these unused kwargs.

    Returns
    -------
    qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` or `None`
        The quantum graph object that was created or loaded, or `None` if
        the graph summary indicates there is nothing to do.
    """
    # make sure that --extend-run always enables --skip-existing
    if extend_run:
        skip_existing = True

    # Normalize optional CLI inputs into concrete values before use.
    skip_existing_in = tuple(skip_existing_in) if skip_existing_in is not None else ()
    if data_query is None:
        data_query = ""
    inputs = list(ensure_iterable(input)) if input else []
    del input  # avoid shadowing the builtin for the rest of the function

    butler, collections, run = ButlerFactory.make_butler_and_collections(
        butler_config,
        output=output,
        output_run=output_run,
        inputs=inputs,
        extend_run=extend_run,
        rebase=rebase,
        replace_run=replace_run,
        prune_replaced=prune_replaced,
    )
    with butler:
        # --skip-existing also skips quanta whose outputs are already in the
        # output RUN collection itself.
        if skip_existing and run:
            skip_existing_in += (run,)

        qgc: PredictedQuantumGraphComponents
        if qgraph is not None:
            # Load an existing graph from a file rather than building one.
            # click passes empty tuple as default value for qgraph_node_id
            quantum_ids = (
                {uuid.UUID(q) if not isinstance(q, uuid.UUID) else q for q in qgraph_node_id}
                if qgraph_node_id
                else None
            )
            qgraph = ResourcePath(qgraph)
            # Dispatch on file extension: ".qgraph" is the old on-disk format,
            # ".qg" is the new one.
            match qgraph.getExtension():
                case ".qgraph":
                    qgc = PredictedQuantumGraphComponents.from_old_quantum_graph(
                        QuantumGraph.loadUri(
                            qgraph,
                            butler.dimensions,
                            nodes=quantum_ids,
                            graphID=BuildId(qgraph_id) if qgraph_id is not None else None,
                        )
                    )
                case ".qg":
                    if qgraph_id is not None:
                        _LOG.warning("--qgraph-id is ignored when loading new '.qg' files.")
                    # Only pay the cost of importing task classes when a
                    # downstream consumer actually needs the full graph.
                    if for_execution or for_init_output_run or save_qgraph or show.needs_full_qg:
                        import_mode = TaskImportMode.ASSUME_CONSISTENT_EDGES
                    else:
                        import_mode = TaskImportMode.DO_NOT_IMPORT
                    with PredictedQuantumGraph.open(qgraph, import_mode=import_mode) as reader:
                        # Read the minimum subset of the file that satisfies
                        # the requested outputs.
                        if (
                            for_execution
                            or qgraph_dot
                            or qgraph_mermaid
                            or show.needs_full_qg
                            or qgraph_node_id
                        ):
                            # This reads everything for the given quanta.
                            reader.read_execution_quanta(quantum_ids)
                        elif for_init_output_run:
                            reader.read_init_quanta()
                        else:
                            reader.read_thin_graph()
                        qgc = reader.components
                case ext:
                    raise ValueError(f"Unrecognized extension for quantum graph: {ext!r}")

            # pipeline can not be provided in this case
            if pipeline_graph_factory:
                raise ValueError(
                    "Pipeline must not be given when quantum graph is read from "
                    f"file: {bool(pipeline_graph_factory)}"
                )
        else:
            if pipeline_graph_factory is None:
                raise ValueError("Pipeline must be given when quantum graph is not read from file.")
            # We can't resolve the pipeline graph if we're mocking until after
            # we've done the mocking (and the QG build will resolve on its own
            # anyway).
            pipeline_graph = pipeline_graph_factory(resolve=False)
            if mock:
                # Deferred import: the mocks module monkey-patches on import
                # (see the TYPE_CHECKING note at the top of this file).
                from lsst.pipe.base.tests.mocks import mock_pipeline_graph

                pipeline_graph = mock_pipeline_graph(
                    pipeline_graph,
                    unmocked_dataset_types=unmocked_dataset_types,
                    force_failures=mock_failure,
                )
            data_id_tables = []
            for table_file in data_id_table:
                # as_local ensures a local filesystem copy for Table.read.
                with ResourcePath(table_file).as_local() as local_path:
                    table = Table.read(local_path.ospath)
                    # Add the filename to the metadata for more logging
                    # information down in the QG builder.
                    table.meta["filename"] = table_file
                    data_id_tables.append(table)
            # make execution plan (a.k.a. DAG) for pipeline
            graph_builder = AllDimensionsQuantumGraphBuilder(
                pipeline_graph,
                butler,
                where=data_query,
                skip_existing_in=skip_existing_in,
                clobber=clobber_outputs,
                dataset_query_constraint=DatasetQueryConstraintVariant.fromExpression(
                    dataset_query_constraint
                ),
                input_collections=collections,
                output_run=run,
                data_id_tables=data_id_tables,
            )
            # Accumulate metadata (QB builder adds some of its own).
            metadata = {
                "butler_argument": str(butler_config),
                "extend_run": extend_run,
                "skip_existing_in": skip_existing_in,
                "skip_existing": skip_existing,
                "data_query": data_query,
            }
            assert run is not None, "Butler output run collection must be defined"
            qgc = graph_builder.finish(
                output, metadata=metadata, attach_datastore_records=qgraph_datastore_records
            )

        if save_qgraph:
            _LOG.verbose("Writing quantum graph to %r.", save_qgraph)
            qgc.write(save_qgraph)

        qg = qgc.assemble()

        # Summarize to the log; an empty/no-op graph short-circuits the rest.
        if not summarize_quantum_graph(qg):
            return None

        if qgraph_dot:
            _LOG.verbose("Writing quantum graph DOT visualization to %r.", qgraph_dot)
            graph2dot(qg, qgraph_dot)

        if qgraph_mermaid:
            _LOG.verbose("Writing quantum graph Mermaid visualization to %r.", qgraph_mermaid)
            graph2mermaid(qg, qgraph_mermaid)

        # optionally dump some info.
        show.show_graph_info(qg, butler_config)

    return qg