Coverage for python / lsst / pipe / base / cli / cmd / commands.py: 73%
110 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28import functools
29import operator
30from collections.abc import Iterable
31from typing import Any
33import click
35from lsst.daf.butler.cli.opt import (
36 dataset_type_option,
37 options_file_option,
38 register_dataset_types_option,
39 repo_argument,
40 transfer_dimensions_option,
41 transfer_option,
42)
43from lsst.daf.butler.cli.utils import ButlerCommand, split_commas, unwrap
45from ... import script
46from ..._status import QuantumAttemptStatus, QuantumSuccessCaveats
47from ...quantum_graph import aggregator
48from ..opt import instrument_argument, update_output_chain_option
@click.command(short_help="Add an instrument definition to the repository", cls=ButlerCommand)
@repo_argument(required=True)
@instrument_argument(required=True, nargs=-1, help="The fully-qualified name of an Instrument subclass.")
# Every other option in this module carries help text; give --update one too.
@click.option(
    "--update",
    is_flag=True,
    help="Update the instrument definition if it is already registered.",
)
def register_instrument(*args: Any, **kwargs: Any) -> None:
    """Add an instrument to the data repository."""
    # All CLI arguments are forwarded unchanged to the script-layer
    # implementation, which does the actual butler work.
    script.register_instrument(*args, **kwargs)
# Thin CLI wrapper: all option values are forwarded as keyword arguments to
# the script-layer implementation.
@click.command(short_help="Transfer datasets from a graph to a butler.", cls=ButlerCommand)
@click.argument("graph", required=True)
@click.argument("dest", required=True)
@register_dataset_types_option()
@transfer_dimensions_option(default=False)
@update_output_chain_option()
@click.option(
    "--dry-run", is_flag=True, default=False, help="Run the transfer but do not update the destination butler"
)
@dataset_type_option(help="Subset of dataset types to transfer from graph.")
@options_file_option()
def transfer_from_graph(**kwargs: Any) -> None:
    """Transfer datasets from a quantum graph to a destination butler.

    GRAPH is a URI to the source quantum graph file.

    DEST is a URI to the Butler repository that will receive copies of the
    datasets.
    """
    # Report the count returned by the script layer so the command is not
    # silent on success.
    number = script.transfer_from_graph(**kwargs)
    print(f"Number of datasets transferred: {number}")
@click.command(short_help="Make Zip archive from output files using graph.", cls=ButlerCommand)
@click.argument("graph", required=True)
@repo_argument(
    required=True,
    help="REPO is a URI to a butler configuration that is used to configure "
    "the datastore of the quantum-backed butler.",
)
@click.argument("dest", required=True)
@dataset_type_option(help="Dataset types to include in Zip archive.")
@options_file_option()
def zip_from_graph(**kwargs: Any) -> None:
    """Transfer datasets from a quantum graph to a Zip archive.

    GRAPH is a URI to the source quantum graph file to use when building the
    Zip archive.

    DEST is a directory to write the Zip archive.
    """
    # Named ``zip_path`` (not ``zip``) so the ``zip`` builtin is not shadowed.
    zip_path = script.zip_from_graph(**kwargs)
    print(f"Zip archive written to {zip_path}")
@click.command(short_help="Retrieve artifacts from subset of graph.", cls=ButlerCommand)
@click.argument("graph", required=True)
@repo_argument(
    required=True,
    help="REPO is a URI to a butler configuration that is used to configure "
    "the datastore of the quantum-backed butler.",
)
@click.argument("dest", required=True)
@transfer_option()
@click.option(
    "--preserve-path/--no-preserve-path",
    is_flag=True,
    default=True,
    help="Preserve the datastore path to the artifact at the destination.",
)
@click.option(
    "--clobber/--no-clobber",
    is_flag=True,
    default=False,
    help="If clobber, overwrite files if they exist locally.",
)
@click.option(
    "--qgraph-node-id",
    callback=split_commas,
    multiple=True,
    help=unwrap(
        """Only load a specified set of nodes when graph is
        loaded from a file, nodes are identified by UUID
        values. One or more comma-separated strings are
        accepted. By default all nodes are loaded. Ignored if
        graph is not loaded from a file."""
    ),
)
@click.option(
    "--include-inputs/--no-include-inputs",
    is_flag=True,
    default=True,
    help="Whether to include input datasets in retrieval.",
)
@click.option(
    "--include-outputs/--no-include-outputs",
    is_flag=True,
    default=True,
    help="Whether to include output datasets in retrieval.",
)
@options_file_option()
def retrieve_artifacts_for_quanta(**kwargs: Any) -> None:
    """Retrieve artifacts from given quanta defined in quantum graph.

    GRAPH is a URI to the source quantum graph file that identifies the
    quanta whose artifacts should be retrieved.

    DEST is a directory to write the retrieved artifacts to.
    """
    # (The previous docstring was copy-pasted from zip_from_graph and wrongly
    # described this command as building a Zip archive.)
    # Report the count and destination so the command is not silent on
    # success.
    artifacts = script.retrieve_artifacts_for_quanta(**kwargs)
    print(f"Written {len(artifacts)} artifacts to {kwargs['dest']}.")
# A default-constructed config supplies the default values for the
# aggregate-graph command-line options below, keeping the CLI defaults in
# sync with ``aggregator.AggregatorConfig``.
_AGGREGATOR_DEFAULTS = aggregator.AggregatorConfig()
# Every option default below is taken from _AGGREGATOR_DEFAULTS so the CLI
# and `aggregator.AggregatorConfig` cannot drift apart.
@click.command(short_help="Scan for the outputs of an active or completed quantum graph.", cls=ButlerCommand)
@click.argument("predicted_graph", required=True)
@repo_argument(required=True, help="Path or alias for the butler repository.")
@click.option(
    "-o",
    "--output",
    "output_path",
    default=_AGGREGATOR_DEFAULTS.output_path,
    help=(
        "Path to the output provenance quantum graph. THIS OPTION IS FOR "
        "DEVELOPMENT AND DEBUGGING ONLY. IT MAY BE REMOVED IN THE FUTURE."
    ),
)
@click.option(
    "--processes",
    "-j",
    "n_processes",
    default=_AGGREGATOR_DEFAULTS.n_processes,
    type=click.IntRange(min=1),
    help="Number of processes to use.",
)
@click.option(
    "--incomplete/--complete",
    "incomplete",
    default=_AGGREGATOR_DEFAULTS.incomplete,
    help="Whether execution has completed (and failures cannot be retried).",
)
@click.option(
    "--dry-run",
    is_flag=True,
    default=_AGGREGATOR_DEFAULTS.dry_run,
    help="Do not actually perform any central database ingests.",
)
@click.option(
    "--interactive-status/--no-interactive-status",
    "interactive_status",
    default=_AGGREGATOR_DEFAULTS.interactive_status,
    help="Use progress bars for status reporting instead of periodic logging.",
)
@click.option(
    "--log-status-interval",
    type=int,
    default=_AGGREGATOR_DEFAULTS.log_status_interval,
    help="Interval (in seconds) between periodic logger status updates.",
)
@click.option(
    "--register-dataset-types/--no-register-dataset-types",
    default=_AGGREGATOR_DEFAULTS.register_dataset_types,
    help="Register output dataset types.",
)
@click.option(
    "--update-output-chain/--no-update-output-chain",
    default=_AGGREGATOR_DEFAULTS.update_output_chain,
    help="Prepend the output RUN collection to the output CHAINED collection.",
)
@click.option(
    "--worker-log-dir",
    type=str,
    default=_AGGREGATOR_DEFAULTS.worker_log_dir,
    help="Path to a directory (POSIX only) for parallel worker logs.",
)
@click.option(
    "--worker-log-level",
    type=str,
    default=_AGGREGATOR_DEFAULTS.worker_log_level,
    help="Log level for worker processes/threads (use DEBUG for per-quantum messages).",
)
@click.option(
    "--zstd-level",
    type=int,
    default=_AGGREGATOR_DEFAULTS.zstd_level,
    help="Compression level for the provenance quantum graph file.",
)
@click.option(
    "--zstd-dict-size",
    type=int,
    default=_AGGREGATOR_DEFAULTS.zstd_dict_size,
    help="Size (in bytes) of the ZStandard compression dictionary.",
)
@click.option(
    "--zstd-dict-n-inputs",
    type=int,
    default=_AGGREGATOR_DEFAULTS.zstd_dict_n_inputs,
    help=("Number of samples of each type to include in ZStandard compression dictionary training."),
)
@click.option(
    "--mock-storage-classes/--no-mock-storage-classes",
    default=_AGGREGATOR_DEFAULTS.mock_storage_classes,
    help="Enable support for storage classes created by the lsst.pipe.base.tests.mocks package.",
)
@click.option(
    "--promise-ingest-graph/--no-promise-ingest-graph",
    default=_AGGREGATOR_DEFAULTS.promise_ingest_graph,
    help=(
        "Promise to run 'butler ingest-graph' later, allowing aggregate-graph "
        "to skip metadata/log/config ingestion for now."
    ),
)
def aggregate_graph(predicted_graph: str, repo: str, **kwargs: Any) -> None:
    """Scan for quantum graph's outputs to gather provenance, ingest datasets
    into the central butler repository, and delete datasets that are no
    longer needed.
    """
    # It'd be nice to allow the user to provide a path to an
    # AggregatorConfig JSON file for options that weren't provided, but Click
    # 8.1 fundamentally cannot handle flag options that default to None rather
    # than True or False (i.e. so they fall back to the config value when not
    # set). It's not clear whether Click 8.2.x has actually fixed this; Click
    # 8.2.0 tried but caused new problems.

    # All remaining keyword arguments map 1:1 onto AggregatorConfig fields.
    config = aggregator.AggregatorConfig(**kwargs)
    try:
        aggregator.aggregate_graph(predicted_graph, repo, config)
    except aggregator.FatalWorkerError as err:
        # When this exception is raised, we'll have already logged the relevant
        # traceback from a separate worker, so suppress exception chaining and
        # present a clean CLI error.
        raise click.ClickException(str(err)) from None
@click.command(
    short_help="Ingest a provenance quantum graph into a butler.",
    cls=ButlerCommand,
)
@repo_argument(required=True, help="Path or alias for the butler repository.")
@click.argument("provenance_graph", required=False)
@transfer_option(default="move")
@click.option("--batch-size", default=10000, help="How many datasets to process in each transaction.")
@click.option(
    "--output-run",
    default=None,
    help=(
        "Name of the output RUN collection. Must be provided if the provenance graph is not"
        " provided (so the graph can be found in the butler)."
    ),
)
def ingest_graph(
    *,
    repo: str,
    provenance_graph: str | None,
    transfer: str | None,
    batch_size: int,
    output_run: str | None,
) -> None:
    """Ingest a provenance graph into a butler repository."""
    # Imported locally rather than at module scope; presumably to keep this
    # CLI module cheap to import — TODO confirm.
    from ...quantum_graph.ingest_graph import ingest_graph as ingest_graph_py

    ingest_graph_py(repo, provenance_graph, transfer=transfer, batch_size=batch_size, output_run=output_run)
@click.command(
    short_help="Print and write provenance reports.",
    cls=ButlerCommand,
)
@click.argument("repo_or_qg")
@click.argument("collection", required=False, default=None)
@click.option(
    "--state",
    multiple=True,
    type=click.Choice(QuantumAttemptStatus),
    help=(
        "Additional quantum state to include in the status report and data ID tables "
        "(FAILED, ABORTED, and ABORTED_SUCCESS are included by default)."
    ),
)
@click.option(
    "--no-state",
    multiple=True,
    type=str,
    metavar="STATE",
    # Fixed grammar: was "to drop from in status report".
    help="Quantum state to drop from the status report and data ID tables (same options as --state).",
)
@click.option(
    "--status-report",
    default=None,
    metavar="URI",
    help="File or URI (.json) for a detailed report (with data IDs) on quanta with certain states.",
)
@click.option(
    "--quantum-table/--no-quantum-table",
    default=True,
    help="Whether to print summary of quantum status counts to STDOUT.",
)
@click.option(
    "--exception-table/--no-exception-table",
    default=True,
    # Fixed typo: was "counts STDOUT".
    help="Whether to print summary of exception type counts to STDOUT.",
)
@click.option(
    "--caveat",
    multiple=True,
    type=click.Choice(QuantumSuccessCaveats),
    help=(
        "Include successful quanta in the status report if they have this caveat. "
        "May be passed multiple times; any matching caveat is included. "
        "Passing this option implicitly adds '--state SUCCESSFUL'."
    ),
)
@click.option(
    "--data-id-table-dir",
    default=None,
    metavar="URI",
    help=(
        "Directory (may be a URI) for a tree of data ID tables for each "
        "task label, status, and exception type combination in the status report."
    ),
)
# NOTE(review): quantum_table/exception_table default to False here while the
# click options default to True. Click always supplies these values, so the
# Python defaults only matter for direct (non-CLI) calls — confirm which
# default is intended before relying on them.
def provenance_report(
    *,
    repo_or_qg: str,
    collection: str | None,
    state: Iterable[QuantumAttemptStatus],
    no_state: Iterable[str],
    status_report: str | None,
    quantum_table: bool = False,
    exception_table: bool = False,
    caveat: Iterable[QuantumSuccessCaveats],
    data_id_table_dir: str | None,
) -> None:
    """Read a provenance quantum graph from a butler or file and use it to
    generate reports.

    REPO_OR_QG is a path or alias for the butler repository (if reading an
    ingested graph, as indicated by passing COLLECTION), or the path to a
    provenance quantum graph file.
    """
    from ...quantum_graph import ProvenanceQuantumGraph

    # Failures and aborts are always of interest, so include them even when
    # not requested explicitly; --no-state can remove them again below.
    states = set(state)
    states.add(QuantumAttemptStatus.FAILED)
    states.add(QuantumAttemptStatus.ABORTED)
    states.add(QuantumAttemptStatus.ABORTED_SUCCESS)
    for state_name in no_state:
        states.discard(QuantumAttemptStatus.__members__[state_name])
    with_caveats: QuantumSuccessCaveats | None = None
    if caveat:
        # Caveats only apply to successful quanta, so asking for any caveat
        # implicitly adds the SUCCESSFUL state; OR all requested caveat flags
        # together into a single mask.
        states.add(QuantumAttemptStatus.SUCCESSFUL)
        with_caveats = functools.reduce(
            operator.or_,  # public alias of operator.__or__
            caveat,
            QuantumSuccessCaveats.NO_CAVEATS,
        )
    with ProvenanceQuantumGraph.from_args(repo_or_qg, collection=collection, datasets=()) as (graph, _):
        graph.make_many_reports(
            status_report_file=status_report,
            states=states,
            print_quantum_table=quantum_table,
            print_exception_table=exception_table,
            with_caveats=with_caveats,
            data_id_table_dir=data_id_table_dir,
        )