Coverage for python / lsst / ctrl / mpexec / cli / cmd / commands.py: 52%

184 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:20 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28import sys 

29from collections.abc import Iterator, Sequence 

30from contextlib import contextmanager 

31from functools import partial 

32from importlib import import_module 

33from tempfile import NamedTemporaryFile 

34from typing import Any 

35 

36import click 

37 

38from lsst.ctrl.mpexec.showInfo import ShowInfo 

39from lsst.daf.butler.cli.opt import ( 

40 collections_option, 

41 confirm_option, 

42 options_file_option, 

43 processes_option, 

44 repo_argument, 

45 where_option, 

46) 

47from lsst.daf.butler.cli.utils import catch_and_exit, option_section, unwrap 

48from lsst.pipe.base.quantum_reports import Report 

49 

50from .. import opt as ctrlMpExecOpts 

51from .. import script 

52from ..script import confirmable 

53from ..utils import PipetaskCommand, collect_pipeline_actions 

54 

# Help-text epilog attached to the build, qgraph and run commands below via
# ``epilog=epilog``. The text is passed through ``unwrap`` before use.
epilog = unwrap(
    """Notes:

--task, --delete, --config, --config-file, and --instrument action options can
appear multiple times; all values are used, in order left to right.

FILE reads command-line options from the specified file. Data may be
distributed among multiple lines (e.g. one option per line). Data after # is
treated as a comment and ignored. Blank lines and lines starting with # are
ignored.
"""
)

67 

68 

def _unhandledShow(show: ShowInfo, cmd: str) -> None:
    """Report ``--show`` options that a command did not recognize.

    Nothing is printed when ``show.unhandled`` is empty.

    Parameters
    ----------
    show : `ShowInfo`
        Object whose ``unhandled`` attribute holds the names of the leftover
        ``--show`` options.
    cmd : `str`
        Command name, used only to build the message.
    """
    if not show.unhandled:
        return
    leftovers = ", ".join(show.unhandled)
    # Diagnostics go to stderr so they do not mix with regular output.
    print(
        f"The following '--show' options were not known to the {cmd} command: {leftovers}",
        file=sys.stderr,
    )

76 

77 

@click.command(cls=PipetaskCommand, epilog=epilog, short_help="Build pipeline definition.")
@click.pass_context
@ctrlMpExecOpts.show_option()
@ctrlMpExecOpts.pipeline_build_options()
@option_section(sectionText="")
@options_file_option()
@catch_and_exit
def build(ctx: click.Context, **kwargs: Any) -> None:
    """Build and optionally save pipeline definition.

    This does not require input data to be specified.
    """
    kwargs = collect_pipeline_actions(ctx, **kwargs)
    show = ShowInfo(kwargs.pop("show", []))
    # A butler config is only useful if one of the graph displays or the
    # pipeline-dot output was requested; reject it otherwise.
    if kwargs.get("butler_config") is not None:
        graph_requested = not {"pipeline-graph", "task-graph"}.isdisjoint(show.commands)
        if not graph_requested and not kwargs.get("pipeline_dot"):
            raise click.ClickException(
                "--butler-config was provided but nothing uses it "
                "(only --show pipeline-graph, --show task-graph and --pipeline-dot do)."
            )
    script.build(**kwargs, show=show)
    _unhandledShow(show, "build")

101 

102 

@contextmanager
def coverage_context(kwargs: dict[str, Any]) -> Iterator[None]:
    """Enable coverage recording.

    The ``coverage``, ``cov_packages`` and ``cov_report`` entries are always
    removed from ``kwargs`` (the dictionary is modified in place), whether or
    not recording is enabled.

    Parameters
    ----------
    kwargs : `dict` [`str`, `Any`]
        Command keyword arguments. ``coverage`` (default `False`) switches
        recording on, ``cov_packages`` (default empty) is written to the
        configuration as ``source_pkgs``, and ``cov_report`` (default `True`)
        controls whether an HTML report is written to ``./covhtml``.

    Raises
    ------
    click.ClickException
        Raised if coverage was requested but the ``coverage`` package cannot
        be imported.
    """
    packages = kwargs.pop("cov_packages", ())
    report = kwargs.pop("cov_report", True)
    if not kwargs.pop("coverage", False):
        yield
        return
    # Lazily import coverage only when we might need it
    try:
        coverage = import_module("coverage")
    except ModuleNotFoundError as err:
        # Chain explicitly so the original import failure is kept as the cause
        # rather than reported as an error during exception handling.
        raise click.ClickException(
            "coverage was requested but the coverage package is not installed."
        ) from err
    # The temporary file is removed when the ``with`` block exits, so the
    # whole recording happens inside it (presumably so the configuration can
    # still be read by name while recording -- TODO confirm).
    with NamedTemporaryFile("w") as rcfile:
        rcfile.write(
            """
[run]
branch = True
concurrency = multiprocessing
"""
        )
        if packages:
            packages_str = ",".join(packages)
            rcfile.write(f"source_pkgs = {packages_str}\n")
        # Make sure the configuration is on disk before it is read by name.
        rcfile.flush()
        cov = coverage.Coverage(config_file=rcfile.name)
        cov.start()
        try:
            yield
        finally:
            # Always stop and save, even if the wrapped code raised.
            cov.stop()
            cov.save()
            if report:
                outdir = "./covhtml"
                cov.html_report(directory=outdir)
                click.echo(f"Coverage report written to {outdir}.")

139 

140 

@click.command(cls=PipetaskCommand, epilog=epilog)
@click.pass_context
@ctrlMpExecOpts.show_option()
@ctrlMpExecOpts.pipeline_build_options(skip_butler_config=True)
@ctrlMpExecOpts.qgraph_options()
@ctrlMpExecOpts.butler_options()
@option_section(sectionText="")
@options_file_option()
@catch_and_exit
def qgraph(ctx: click.Context, **kwargs: Any) -> None:
    """Build and optionally save quantum graph."""
    kwargs = collect_pipeline_actions(ctx, **kwargs)
    summary = kwargs.pop("summary", None)
    with coverage_context(kwargs):
        show = ShowInfo(kwargs.pop("show", []))
        # The only reason 'build' might want a butler is to resolve the
        # pipeline graph for its own 'show' options, which wouldn't run in this
        # context.  Take it out of the kwargs so it doesn't instantiate a
        # butler unnecessarily.
        butler_config = kwargs.pop("butler_config", None)
        pipeline_graph_factory = script.build(**kwargs, show=show)
        kwargs["butler_config"] = butler_config
        if show.handled and not show.unhandled:
            print(
                "No quantum graph generated. The --show option was given and all options were processed.",
                file=sys.stderr,
            )
            return
        # A distinct local name avoids shadowing this function's own name.
        graph = script.qgraph(
            pipeline_graph_factory,
            **kwargs,
            show=show,
            # Making a summary report requires that we load the same graph
            # components as execution.
            for_execution=(summary is not None),
        )
        if graph is None:
            raise click.ClickException("QuantumGraph was empty; ERROR logs above should provide details.")
        # QuantumGraph-only summary call here since script.qgraph also called
        # by run methods.
        if summary:
            report = Report(qgraphSummary=graph._make_summary())
            # JSON is written as UTF-8 explicitly so the file does not depend
            # on the locale's default encoding.
            with open(summary, "w", encoding="utf-8") as out:
                # Do not save fields that are not set.
                out.write(report.model_dump_json(exclude_none=True, indent=2))

        _unhandledShow(show, "qgraph")

189 

190 

@click.command(cls=PipetaskCommand, epilog=epilog)
@ctrlMpExecOpts.run_options()
@catch_and_exit
def run(ctx: click.Context, **kwargs: Any) -> None:
    """Build and execute pipeline and quantum graph."""
    kwargs = collect_pipeline_actions(ctx, **kwargs)
    with coverage_context(kwargs):
        show = ShowInfo(kwargs.pop("show", []))
        pipeline_graph_factory = script.build(**kwargs, show=show)
        # Stage 1: every --show option was consumed by the pipeline build.
        if show.handled and not show.unhandled:
            print(
                "No quantum graph generated or pipeline executed. "
                "The --show option was given and all options were processed.",
                file=sys.stderr,
            )
            return
        # Stage 2: build the quantum graph with the components needed for
        # execution.
        graph = script.qgraph(pipeline_graph_factory, for_execution=True, **kwargs, show=show)
        if graph is None:
            raise click.ClickException("QuantumGraph was empty; ERROR logs above should provide details.")
        _unhandledShow(show, "run")
        # Stage 3: any handled --show option suppresses execution.
        if show.handled:
            print(
                "No pipeline executed. The --show option was given and all options were processed.",
                file=sys.stderr,
            )
            return
        script.run(graph, **kwargs)

217 

218 

@click.command(cls=PipetaskCommand)
@ctrlMpExecOpts.butler_config_option()
@ctrlMpExecOpts.collection_argument()
@confirm_option()
@ctrlMpExecOpts.recursive_option(
    help="""If the parent CHAINED collection has child CHAINED collections,
    search the children until nested chains that start with the parent's name
    are removed."""
)
def purge(confirm: bool, **kwargs: Any) -> None:
    """Remove a CHAINED collection and its contained collections.

    COLLECTION is the name of the chained collection to purge. It must not be a
    child of any other CHAINED collections.

    Child collections must be members of exactly one collection.

    The collections that will be removed will be printed, there will be an
    option to continue or abort (unless using --no-confirm).
    """
    # Bind the remaining options to script.purge and hand the resulting
    # callable, together with the confirm flag, to the confirmation helper.
    confirmable.confirm(partial(script.purge, **kwargs), confirm)

240 

241 

@click.command(cls=PipetaskCommand)
@ctrlMpExecOpts.butler_config_option()
@ctrlMpExecOpts.collection_argument()
@confirm_option()
def cleanup(confirm: bool, **kwargs: Any) -> None:
    """Remove non-members of CHAINED collections.

    Removes collections that start with the same name as a CHAINED
    collection but are not members of that collection.
    """
    # Bind the remaining options first, then pass the bound callable and the
    # confirm flag to the confirmation helper.
    action = partial(script.cleanup, **kwargs)
    confirmable.confirm(action, confirm)

253 

254 

@click.command(cls=PipetaskCommand)
@repo_argument()
@ctrlMpExecOpts.qgraph_argument()
@ctrlMpExecOpts.config_search_path_option()
@ctrlMpExecOpts.qgraph_id_option()
@ctrlMpExecOpts.coverage_options()
def pre_exec_init_qbb(repo: str, qgraph: str, **kwargs: Any) -> None:
    """Execute pre-exec-init on Quantum-Backed Butler.

    REPO is the location of the butler/registry config file.

    QGRAPH is the path to a serialized Quantum Graph file.
    """
    # Entering the context strips the coverage-related entries from kwargs,
    # so only the remaining options are forwarded to the script.
    recording = coverage_context(kwargs)
    with recording:
        script.pre_exec_init_qbb(repo, qgraph, **kwargs)

270 

271 

@click.command(cls=PipetaskCommand)
@repo_argument()
@ctrlMpExecOpts.qgraph_argument()
@ctrlMpExecOpts.config_search_path_option()
@ctrlMpExecOpts.qgraph_id_option()
@ctrlMpExecOpts.qgraph_node_id_option()
@processes_option()
@ctrlMpExecOpts.pdb_option()
@ctrlMpExecOpts.profile_option()
@ctrlMpExecOpts.coverage_options()
@ctrlMpExecOpts.debug_option()
@ctrlMpExecOpts.start_method_option()
@ctrlMpExecOpts.timeout_option()
@ctrlMpExecOpts.fail_fast_option()
@ctrlMpExecOpts.raise_on_partial_outputs_option()
@ctrlMpExecOpts.summary_option()
@ctrlMpExecOpts.enable_implicit_threading_option()
@ctrlMpExecOpts.cores_per_quantum_option()
@ctrlMpExecOpts.memory_per_quantum_option()
@ctrlMpExecOpts.no_existing_outputs_option()
def run_qbb(repo: str, qgraph: str, **kwargs: Any) -> None:
    """Execute pipeline using Quantum-Backed Butler.

    REPO is the location of the butler/registry config file.

    QGRAPH is the path to a serialized Quantum Graph file.
    """
    # Entering the context strips the coverage-related entries from kwargs;
    # REPO is forwarded to the script under the name ``butler_config``.
    recording = coverage_context(kwargs)
    with recording:
        script.run_qbb(butler_config=repo, qgraph=qgraph, **kwargs)

301 

302 

@click.command(cls=PipetaskCommand)
@ctrlMpExecOpts.qgraph_argument()
@ctrlMpExecOpts.run_argument()
@ctrlMpExecOpts.output_qgraph_argument()
@ctrlMpExecOpts.metadata_run_key_option()
@ctrlMpExecOpts.update_graph_id_option()
def update_graph_run(
    qgraph: str,
    run: str,
    output_qgraph: str,
    metadata_run_key: str,
    update_graph_id: bool,
) -> None:
    """Update existing quantum graph with new output run name and re-generate
    output dataset IDs.

    QGRAPH is the URL to a serialized Quantum Graph file.

    RUN is the new RUN collection name for output graph.

    OUTPUT_QGRAPH is the URL to store the updated Quantum Graph.
    """
    # Arguments are forwarded positionally, in the order they are declared.
    script.update_graph_run(
        qgraph,
        run,
        output_qgraph,
        metadata_run_key,
        update_graph_id,
    )

326 

327 

@click.command(cls=PipetaskCommand)
@repo_argument()
@click.argument("qgraphs", nargs=-1)
@collections_option()
@where_option()
@click.option(
    "--full-output-filename",
    default="",
    help="Output report as a file with this name. "
    "For pipetask report on one graph, this should be a yaml file. For multiple graphs "
    "or when using the --force-v2 option, this should be a json file. We will be "
    "deprecating the single-graph-only (QuantumGraphExecutionReport) option soon.",
)
@click.option("--logs/--no-logs", default=True, help="Get butler log datasets for extra information.")
@click.option(
    "--brief",
    default=False,
    is_flag=True,
    help="Only show counts in report (a brief summary). Note that counts are"
    " also printed to the screen when using the --full-output-filename option.",
)
@click.option(
    "--curse-failed-logs",
    is_flag=True,
    default=False,
    help="If log datasets are missing in v2 (QuantumProvenanceGraph), mark them as cursed",
)
@click.option(
    "--force-v2",
    is_flag=True,
    default=False,
    help="Use the QuantumProvenanceGraph instead of the QuantumGraphExecutionReport, "
    "even when there is only one qgraph. Otherwise, the QuantumGraphExecutionReport "
    "will run on one graph by default.",
)
@click.option(
    "--read-caveats",
    type=click.Choice(["exhaustive", "lazy", "none"], case_sensitive=False),
    default="lazy",
)
@click.option(
    "--use-qbb/--no-use-qbb",
    is_flag=True,
    default=True,
    help="Whether to use a quantum-backed butler for metadata and log reads.",
)
@click.option(
    "--view-graph",
    is_flag=True,
    default=False,
    help="Display pipeline processing status as a graph on stdout instead of a plain-text summary.",
)
@processes_option()
def report(
    repo: str,
    qgraphs: Sequence[str],
    collections: Sequence[str] | None,
    where: str,
    full_output_filename: str = "",
    logs: bool = True,
    brief: bool = False,
    curse_failed_logs: bool = False,
    force_v2: bool = False,
    read_caveats: str = "lazy",
    use_qbb: bool = True,
    processes: int = 1,
    view_graph: bool = False,
) -> None:
    """Summarize the state of executed quantum graph(s), with counts of failed,
    successful and expected quanta, as well as counts of output datasets and
    their query (visible/shadowed) states. Analyze one or more attempts at the
    same processing on the same dataquery-identified "group" and resolve
    recoveries and persistent failures. Identify mismatch errors between
    attempts.

    Save the report as a file (``--full-output-filename``) or print it to
    stdout (default). If the terminal is overwhelmed with data_ids from
    failures try the ``--brief`` option.

    Butler ``collections`` and ``where`` options are for use in
    `lsst.daf.butler.Registry.queryDatasets` if paring down the collections
    would be useful. Pass collections in order of most to least recent. By
    default the collections and query will be taken from the graphs.

    REPO is the location of the butler/registry config file.

    QGRAPHS is a sequence of links to serialized Quantum Graphs which have
    been executed and are to be analyzed. Pass the graphs in order of first to
    last executed.
    """
    # The v2 report is used when explicitly forced, when several graphs are
    # given, or when any option only the v2 call accepts is set.
    if any([force_v2, len(qgraphs) > 1, collections, where, curse_failed_logs]):
        script.report_v2(
            repo,
            qgraphs,
            collections,
            where,
            full_output_filename,
            logs,
            brief,
            curse_failed_logs,
            # The "none" choice is passed on as a real None.
            read_caveats=(read_caveats if read_caveats != "none" else None),  # type: ignore[arg-type]
            use_qbb=use_qbb,
            n_cores=processes,
            view_graph=view_graph,
        )
    else:
        # Multiple graphs always take the v2 branch, so this can only fail
        # when no graph was given. Raise a click error rather than assert:
        # asserts are stripped under ``python -O`` and this is user input.
        if len(qgraphs) != 1:
            raise click.ClickException("Cannot make a report without a quantum graph.")
        script.report(repo, qgraphs[0], full_output_filename, logs, brief)

436 

437 

@click.command(cls=PipetaskCommand)
@click.argument("filenames", nargs=-1)
@click.option(
    "--full-output-filename",
    default="",
    help="Output report as a file with this name (json).",
)
@click.option(
    "--brief",
    default=False,
    is_flag=True,
    help="Only show counts in report (a brief summary). Note that counts are"
    " also printed to the screen when using the --full-output-filename option.",
)
def aggregate_reports(
    filenames: Sequence[str],
    full_output_filename: str | None,
    brief: bool = False,
) -> None:
    """Aggregate pipetask report output on disjoint data-id groups into one
    Summary over common tasks and datasets. Intended for use when the same
    pipeline has been run over all groups (i.e., to aggregate all reports
    for a given step). This functionality is only compatible with reports
    from the `~lsst.pipe.base.quantum_provenance_graph.QuantumProvenanceGraph`,
    so the reports must be run over multiple groups or with the ``--force-v2``
    option.

    Save the report as a file (``--full-output-filename``) or print it to
    stdout (default). If the terminal is overwhelmed with data_ids from
    failures try the ``--brief`` option.

    FILENAMES are the space-separated paths to json file output created by
    pipetask report.
    """
    # All work is delegated to the script layer; arguments are forwarded
    # positionally in declaration order.
    script.aggregate_reports(
        filenames,
        full_output_filename,
        brief,
    )