Coverage for python / lsst / ctrl / mpexec / cli / cmd / commands.py: 52%
184 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:33 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:33 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28import sys
29from collections.abc import Iterator, Sequence
30from contextlib import contextmanager
31from functools import partial
32from importlib import import_module
33from tempfile import NamedTemporaryFile
34from typing import Any
36import click
38from lsst.ctrl.mpexec.showInfo import ShowInfo
39from lsst.daf.butler.cli.opt import (
40 collections_option,
41 confirm_option,
42 options_file_option,
43 processes_option,
44 repo_argument,
45 where_option,
46)
47from lsst.daf.butler.cli.utils import catch_and_exit, option_section, unwrap
48from lsst.pipe.base.quantum_reports import Report
50from .. import opt as ctrlMpExecOpts
51from .. import script
52from ..script import confirmable
53from ..utils import PipetaskCommand, collect_pipeline_actions
# Help-text epilog shared by the ``build``, ``qgraph`` and ``run`` commands
# (passed as ``epilog=`` to their ``click.command`` decorators).
epilog = unwrap(
    """Notes:
--task, --delete, --config, --config-file, and --instrument action options can
appear multiple times; all values are used, in order left to right.
FILE reads command-line options from the specified file. Data may be
distributed among multiple lines (e.g. one option per line). Data after # is
treated as a comment and ignored. Blank lines and lines starting with # are
ignored.
"""
)
def _unhandledShow(show: ShowInfo, cmd: str) -> None:
    """Warn on stderr about ``--show`` values a command did not consume.

    Parameters
    ----------
    show : `ShowInfo`
        Tracker of the ``--show`` requests; only its ``unhandled`` attribute
        is read here.
    cmd : `str`
        Name of the sub-command, interpolated into the warning text.
    """
    leftovers = show.unhandled
    if not leftovers:
        # Nothing was left over, so there is nothing to report.
        return
    joined = ", ".join(leftovers)
    message = f"The following '--show' options were not known to the {cmd} command: {joined}"
    print(message, file=sys.stderr)
@click.command(cls=PipetaskCommand, epilog=epilog, short_help="Build pipeline definition.")
@click.pass_context
@ctrlMpExecOpts.show_option()
@ctrlMpExecOpts.pipeline_build_options()
@option_section(sectionText="")
@options_file_option()
@catch_and_exit
def build(ctx: click.Context, **kwargs: Any) -> None:
    """Build and optionally save pipeline definition.

    This does not require input data to be specified.
    """
    kwargs = collect_pipeline_actions(ctx, **kwargs)
    show = ShowInfo(kwargs.pop("show", []))
    if kwargs.get("butler_config") is not None:
        # A butler is only consulted by the graph-style --show outputs and by
        # --pipeline-dot; reject a butler config that nothing would use.
        butler_is_used = not {"pipeline-graph", "task-graph"}.isdisjoint(show.commands) or kwargs.get(
            "pipeline_dot"
        )
        if not butler_is_used:
            raise click.ClickException(
                "--butler-config was provided but nothing uses it "
                "(only --show pipeline-graph, --show task-graph and --pipeline-dot do)."
            )
    script.build(**kwargs, show=show)
    # Report any --show values this command did not handle.
    _unhandledShow(show, "build")
@contextmanager
def coverage_context(kwargs: dict[str, Any]) -> Iterator[None]:
    """Enable coverage recording.

    The coverage-related options are always removed from ``kwargs``, even
    when coverage is disabled. This lets the remaining keyword arguments be
    forwarded to functions that do not accept them.

    Parameters
    ----------
    kwargs : `dict` [`str`, `~typing.Any`]
        Command-line keyword arguments, modified in place. The keys
        ``cov_packages``, ``cov_report`` and ``coverage`` are popped.

    Yields
    ------
    None
        Control returns to the caller. Coverage measurement is active if
        ``coverage`` was true.

    Raises
    ------
    click.ClickException
        Raised if coverage was requested but the ``coverage`` package cannot
        be imported.
    """
    packages = kwargs.pop("cov_packages", ())
    report = kwargs.pop("cov_report", True)
    if not kwargs.pop("coverage", False):
        yield
        return
    # Lazily import coverage only when we might need it
    try:
        coverage = import_module("coverage")
    except ModuleNotFoundError as exc:
        # Chain the original error so the missing-module cause stays visible.
        raise click.ClickException(
            "coverage was requested but the coverage package is not installed."
        ) from exc
    # The rc file must exist for as long as measurement runs, so everything
    # below stays inside this ``with``.
    with NamedTemporaryFile("w") as rcfile:
        rcfile.write(
            """
[run]
branch = True
concurrency = multiprocessing
"""
        )
        if packages:
            # Appended under the [run] section written above.
            packages_str = ",".join(packages)
            rcfile.write(f"source_pkgs = {packages_str}\n")
        # Make the contents visible to readers of rcfile.name before coverage
        # loads it.
        rcfile.flush()
        cov = coverage.Coverage(config_file=rcfile.name)
        cov.start()
        try:
            yield
        finally:
            # Stop and save even if the wrapped command raised.
            cov.stop()
            cov.save()
            if report:
                # NOTE(review): with concurrency=multiprocessing, child
                # processes may write separate data files; confirm whether
                # cov.combine() is needed here for them to appear in the report.
                outdir = "./covhtml"
                cov.html_report(directory=outdir)
                click.echo(f"Coverage report written to {outdir}.")
@click.command(cls=PipetaskCommand, epilog=epilog)
@click.pass_context
@ctrlMpExecOpts.show_option()
@ctrlMpExecOpts.pipeline_build_options(skip_butler_config=True)
@ctrlMpExecOpts.qgraph_options()
@ctrlMpExecOpts.butler_options()
@option_section(sectionText="")
@options_file_option()
@catch_and_exit
def qgraph(ctx: click.Context, **kwargs: Any) -> None:
    """Build and optionally save quantum graph."""
    # Flow: build the pipeline graph, then generate the quantum graph, then
    # optionally write a JSON summary. Everything after the coverage options
    # are stripped runs inside ``coverage_context``.
    kwargs = collect_pipeline_actions(ctx, **kwargs)
    # ``summary`` is consumed here rather than forwarded. When truthy it is a
    # path that is opened for writing below.
    summary = kwargs.pop("summary", None)
    with coverage_context(kwargs):
        show = ShowInfo(kwargs.pop("show", []))
        # The only reason 'build' might want a butler is to resolve the
        # pipeline graph for its own 'show' options, which wouldn't run in this
        # context. Take it out of the kwargs so it doesn't instantiate a
        # butler unnecessarily.
        butler_config = kwargs.pop("butler_config", None)
        pipeline_graph_factory = script.build(**kwargs, show=show)
        # Put the butler config back so it is forwarded to script.qgraph.
        kwargs["butler_config"] = butler_config
        # If the build step answered every --show request, stop here.
        if show.handled and not show.unhandled:
            print(
                "No quantum graph generated. The --show option was given and all options were processed.",
                file=sys.stderr,
            )
            return
        if (
            qgraph := script.qgraph(
                pipeline_graph_factory,
                **kwargs,
                show=show,
                # Making a summary report requires that we load the same graph
                # components as execution.
                # NOTE(review): this tests ``is not None`` while the write below
                # tests truthiness; an empty-string summary would differ.
                for_execution=(summary is not None),
            )
        ) is None:
            raise click.ClickException("QuantumGraph was empty; ERROR logs above should provide details.")
        # QuantumGraph-only summary call here since script.qgraph also called
        # by run methods.
        if summary:
            report = Report(qgraphSummary=qgraph._make_summary())
            with open(summary, "w") as out:
                # Do not save fields that are not set.
                out.write(report.model_dump_json(exclude_none=True, indent=2))

        # Warn about any --show values that neither build nor qgraph handled.
        _unhandledShow(show, "qgraph")
@click.command(cls=PipetaskCommand, epilog=epilog)
@ctrlMpExecOpts.run_options()
@catch_and_exit
def run(ctx: click.Context, **kwargs: Any) -> None:
    """Build and execute pipeline and quantum graph."""
    kwargs = collect_pipeline_actions(ctx, **kwargs)
    with coverage_context(kwargs):
        show = ShowInfo(kwargs.pop("show", []))
        pipeline_graph_factory = script.build(**kwargs, show=show)
        if show.handled and not show.unhandled:
            # The build step alone satisfied every --show request.
            print(
                "No quantum graph generated or pipeline executed. "
                "The --show option was given and all options were processed.",
                file=sys.stderr,
            )
            return
        quantum_graph = script.qgraph(pipeline_graph_factory, for_execution=True, **kwargs, show=show)
        if quantum_graph is None:
            raise click.ClickException("QuantumGraph was empty; ERROR logs above should provide details.")
        _unhandledShow(show, "run")
        if show.handled:
            # Some --show output was produced; do not go on to execute.
            print(
                "No pipeline executed. The --show option was given and all options were processed.",
                file=sys.stderr,
            )
            return
        script.run(quantum_graph, **kwargs)
@click.command(cls=PipetaskCommand)
@ctrlMpExecOpts.butler_config_option()
@ctrlMpExecOpts.collection_argument()
@confirm_option()
@ctrlMpExecOpts.recursive_option(
    help="""If the parent CHAINED collection has child CHAINED collections,
    search the children until nested chains that start with the parent's name
    are removed."""
)
def purge(confirm: bool, **kwargs: Any) -> None:
    """Remove a CHAINED collection and its contained collections.

    COLLECTION is the name of the chained collection to purge. It must not be
    a child of any other CHAINED collection.

    Child collections must be members of exactly one collection.

    The collections that will be removed are printed, and you are given the
    option to continue or abort (unless using --no-confirm).
    """
    # Defer the purge: ``confirmable.confirm`` receives the bound call together
    # with the ``--confirm`` flag.
    confirmable.confirm(partial(script.purge, **kwargs), confirm)
@click.command(cls=PipetaskCommand)
@ctrlMpExecOpts.butler_config_option()
@ctrlMpExecOpts.collection_argument()
@confirm_option()
def cleanup(confirm: bool, **kwargs: Any) -> None:
    """Remove non-members of CHAINED collections.

    Removes collections that start with the same name as a CHAINED
    collection but are not members of that collection.
    """
    # Bind the CLI arguments now. ``confirmable.confirm`` receives the deferred
    # call together with the ``--confirm`` flag.
    remove_non_members = partial(script.cleanup, **kwargs)
    confirmable.confirm(remove_non_members, confirm)
@click.command(cls=PipetaskCommand)
@repo_argument()
@ctrlMpExecOpts.qgraph_argument()
@ctrlMpExecOpts.config_search_path_option()
@ctrlMpExecOpts.qgraph_id_option()
@ctrlMpExecOpts.coverage_options()
def pre_exec_init_qbb(repo: str, qgraph: str, **kwargs: Any) -> None:
    """Execute pre-exec-init on Quantum-Backed Butler.

    REPO is the location of the butler/registry config file.

    QGRAPH is the path to a serialized Quantum Graph file.
    """
    # coverage_context pops the coverage options out of kwargs on entry, so
    # only the remaining options reach the script function.
    with coverage_context(kwargs):
        script.pre_exec_init_qbb(repo, qgraph, **kwargs)
@click.command(cls=PipetaskCommand)
@repo_argument()
@ctrlMpExecOpts.qgraph_argument()
@ctrlMpExecOpts.config_search_path_option()
@ctrlMpExecOpts.qgraph_id_option()
@ctrlMpExecOpts.qgraph_node_id_option()
@processes_option()
@ctrlMpExecOpts.pdb_option()
@ctrlMpExecOpts.profile_option()
@ctrlMpExecOpts.coverage_options()
@ctrlMpExecOpts.debug_option()
@ctrlMpExecOpts.start_method_option()
@ctrlMpExecOpts.timeout_option()
@ctrlMpExecOpts.fail_fast_option()
@ctrlMpExecOpts.raise_on_partial_outputs_option()
@ctrlMpExecOpts.summary_option()
@ctrlMpExecOpts.enable_implicit_threading_option()
@ctrlMpExecOpts.cores_per_quantum_option()
@ctrlMpExecOpts.memory_per_quantum_option()
@ctrlMpExecOpts.no_existing_outputs_option()
def run_qbb(repo: str, qgraph: str, **kwargs: Any) -> None:
    """Execute pipeline using Quantum-Backed Butler.

    REPO is the location of the butler/registry config file.

    QGRAPH is the path to a serialized Quantum Graph file.
    """
    # The REPO argument is handed to the script as its ``butler_config``.
    # coverage_context strips the coverage options before forwarding.
    with coverage_context(kwargs):
        script.run_qbb(butler_config=repo, qgraph=qgraph, **kwargs)
@click.command(cls=PipetaskCommand)
@ctrlMpExecOpts.qgraph_argument()
@ctrlMpExecOpts.run_argument()
@ctrlMpExecOpts.output_qgraph_argument()
@ctrlMpExecOpts.metadata_run_key_option()
@ctrlMpExecOpts.update_graph_id_option()
def update_graph_run(
    qgraph: str,
    run: str,
    output_qgraph: str,
    metadata_run_key: str,
    update_graph_id: bool,
) -> None:
    """Update existing quantum graph with new output run name and re-generate
    output dataset IDs.

    QGRAPH is the URL to a serialized Quantum Graph file.

    RUN is the new RUN collection name for output graph.

    OUTPUT_QGRAPH is the URL to store the updated Quantum Graph.
    """
    # All CLI values are forwarded positionally, in declaration order.
    script.update_graph_run(
        qgraph,
        run,
        output_qgraph,
        metadata_run_key,
        update_graph_id,
    )
@click.command(cls=PipetaskCommand)
@repo_argument()
@click.argument("qgraphs", nargs=-1)
@collections_option()
@where_option()
@click.option(
    "--full-output-filename",
    default="",
    help="Output report as a file with this name. "
    "For pipetask report on one graph, this should be a yaml file. For multiple graphs "
    "or when using the --force-v2 option, this should be a json file. We will be "
    "deprecating the single-graph-only (QuantumGraphExecutionReport) option soon.",
)
@click.option("--logs/--no-logs", default=True, help="Get butler log datasets for extra information.")
@click.option(
    "--brief",
    default=False,
    is_flag=True,
    help="Only show counts in report (a brief summary). Note that counts are"
    " also printed to the screen when using the --full-output-filename option.",
)
@click.option(
    "--curse-failed-logs",
    is_flag=True,
    default=False,
    help="If log datasets are missing in v2 (QuantumProvenanceGraph), mark them as cursed",
)
@click.option(
    "--force-v2",
    is_flag=True,
    default=False,
    help="Use the QuantumProvenanceGraph instead of the QuantumGraphExecutionReport, "
    "even when there is only one qgraph. Otherwise, the QuantumGraphExecutionReport "
    "will run on one graph by default.",
)
@click.option(
    "--read-caveats",
    type=click.Choice(["exhaustive", "lazy", "none"], case_sensitive=False),
    default="lazy",
)
@click.option(
    "--use-qbb/--no-use-qbb",
    is_flag=True,
    default=True,
    help="Whether to use a quantum-backed butler for metadata and log reads.",
)
@click.option(
    "--view-graph",
    is_flag=True,
    default=False,
    help="Display pipeline processing status as a graph on stdout instead of a plain-text summary.",
)
@processes_option()
def report(
    repo: str,
    qgraphs: Sequence[str],
    collections: Sequence[str] | None,
    where: str,
    full_output_filename: str = "",
    logs: bool = True,
    brief: bool = False,
    curse_failed_logs: bool = False,
    force_v2: bool = False,
    read_caveats: str = "lazy",
    use_qbb: bool = True,
    processes: int = 1,
    view_graph: bool = False,
) -> None:
    """Summarize the state of executed quantum graph(s), with counts of failed,
    successful and expected quanta, as well as counts of output datasets and
    their query (visible/shadowed) states. Analyze one or more attempts at the
    same processing on the same dataquery-identified "group" and resolve
    recoveries and persistent failures. Identify mismatch errors between
    attempts.

    Save the report as a file (``--full-output-filename``) or print it to
    stdout (default). If the terminal is overwhelmed with data_ids from
    failures try the ``--brief`` option.

    Butler ``collections`` and ``where`` options are for use in
    `lsst.daf.butler.Registry.queryDatasets` if paring down the collections
    would be useful. Pass collections in order of most to least recent. By
    default the collections and query will be taken from the graphs.

    REPO is the location of the butler/registry config file.

    QGRAPHS is a sequence of links to serialized Quantum Graphs which have
    been executed and are to be analyzed. Pass the graphs in order of first to
    last executed.
    """
    # Use the v2 (QuantumProvenanceGraph) report when it is forced, when more
    # than one graph is given, or when any option that only report_v2
    # receives is set.
    use_v2 = any([force_v2, len(qgraphs) > 1, collections, where, curse_failed_logs])
    if use_v2:
        script.report_v2(
            repo,
            qgraphs,
            collections,
            where,
            full_output_filename,
            logs,
            brief,
            curse_failed_logs,
            read_caveats=(read_caveats if read_caveats != "none" else None),  # type: ignore[arg-type]
            use_qbb=use_qbb,
            n_cores=processes,
            view_graph=view_graph,
        )
        return
    # Only zero or one graph can reach this point. Validate with a real
    # exception rather than ``assert``: an assert is stripped under
    # ``python -O``, and ``qgraphs[0]`` would then raise an IndexError.
    if not qgraphs:
        raise click.UsageError("Cannot make a report without a quantum graph.")
    script.report(repo, qgraphs[0], full_output_filename, logs, brief)
@click.command(cls=PipetaskCommand)
@click.argument("filenames", nargs=-1)
@click.option(
    "--full-output-filename",
    default="",
    help="Output report as a file with this name (json).",
)
@click.option(
    "--brief",
    default=False,
    is_flag=True,
    help="Only show counts in report (a brief summary). Note that counts are"
    " also printed to the screen when using the --full-output-filename option.",
)
def aggregate_reports(
    filenames: Sequence[str], full_output_filename: str, brief: bool = False
) -> None:
    """Aggregate pipetask report output on disjoint data-id groups into one
    Summary over common tasks and datasets. Intended for use when the same
    pipeline has been run over all groups (i.e., to aggregate all reports
    for a given step). This functionality is only compatible with reports
    from the `~lsst.pipe.base.quantum_provenance_graph.QuantumProvenanceGraph`,
    so the reports must be run over multiple groups or with the ``--force-v2``
    option.

    Save the report as a file (``--full-output-filename``) or print it to
    stdout (default). If the terminal is overwhelmed with data_ids from
    failures try the ``--brief`` option.

    FILENAMES are the space-separated paths to json file output created by
    pipetask report.
    """
    # ``--full-output-filename`` defaults to "" so click always passes a str,
    # never None. Presumably "" selects the stdout output described above;
    # confirm against script.aggregate_reports.
    script.aggregate_reports(filenames, full_output_filename, brief)