Coverage for python / lsst / pipe / base / cli / cmd / commands.py: 73%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:49 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28import functools 

29import operator 

30from collections.abc import Iterable 

31from typing import Any 

32 

33import click 

34 

35from lsst.daf.butler.cli.opt import ( 

36 dataset_type_option, 

37 options_file_option, 

38 register_dataset_types_option, 

39 repo_argument, 

40 transfer_dimensions_option, 

41 transfer_option, 

42) 

43from lsst.daf.butler.cli.utils import ButlerCommand, split_commas, unwrap 

44 

45from ... import script 

46from ..._status import QuantumAttemptStatus, QuantumSuccessCaveats 

47from ...quantum_graph import aggregator 

48from ..opt import instrument_argument, update_output_chain_option 

49 

50 

# CLI command: add one or more instrument definitions to a butler repository.
# NOTE: the docstring below doubles as the command's long help text (click
# renders it), so it is user-facing.
@click.command(short_help="Add an instrument definition to the repository", cls=ButlerCommand)
@repo_argument(required=True)
# nargs=-1: one or more instrument class names may be supplied in one call.
@instrument_argument(required=True, nargs=-1, help="The fully-qualified name of an Instrument subclass.")
@click.option("--update", is_flag=True)
def register_instrument(*args: Any, **kwargs: Any) -> None:
    """Add an instrument to the data repository."""
    # All parsed CLI arguments are forwarded unchanged to the script layer.
    script.register_instrument(*args, **kwargs)

58 

59 

# CLI command: copy datasets referenced by a quantum graph into a butler.
# The docstring below is rendered as the command's help text, so its wording
# is preserved exactly.
@click.command(short_help="Transfer datasets from a graph to a butler.", cls=ButlerCommand)
@click.argument("graph", required=True)
@click.argument("dest", required=True)
@register_dataset_types_option()
@transfer_dimensions_option(default=False)
@update_output_chain_option()
@click.option(
    "--dry-run", is_flag=True, default=False, help="Run the transfer but do not update the destination butler"
)
@dataset_type_option(help="Subset of dataset types to transfer from graph.")
@options_file_option()
def transfer_from_graph(**kwargs: Any) -> None:
    """Transfer datasets from a quantum graph to a destination butler.

    GRAPH is a URI to the source quantum graph file.

    DEST is a URI to the Butler repository that will receive copies of the
    datasets.
    """
    # Delegate the actual transfer to the script layer, then report how many
    # datasets were moved.
    n_transferred = script.transfer_from_graph(**kwargs)
    print(f"Number of datasets transferred: {n_transferred}")

81 

82 

# CLI command: bundle a quantum graph's output files into a Zip archive.
# The docstring below is rendered as the command's help text.
@click.command(short_help="Make Zip archive from output files using graph.", cls=ButlerCommand)
@click.argument("graph", required=True)
@repo_argument(
    required=True,
    help="REPO is a URI to a butler configuration that is used to configure "
    "the datastore of the quantum-backed butler.",
)
@click.argument("dest", required=True)
@dataset_type_option(help="Dataset types to include in Zip archive.")
@options_file_option()
def zip_from_graph(**kwargs: Any) -> None:
    """Transfer datasets from a quantum graph to a Zip archive.

    GRAPH is a URI to the source quantum graph file to use when building the
    Zip archive.

    DEST is a directory to write the Zip archive.
    """
    # Renamed from `zip`, which shadowed the builtin of the same name.
    zip_path = script.zip_from_graph(**kwargs)
    print(f"Zip archive written to {zip_path}")

103 

104 

# CLI command: copy datastore artifacts for (a subset of) a graph's quanta to
# a local/remote destination.  The docstring below is rendered as the
# command's help text; it previously described building a Zip archive (a
# copy-paste from zip_from_graph) and has been corrected.
@click.command(short_help="Retrieve artifacts from subset of graph.", cls=ButlerCommand)
@click.argument("graph", required=True)
@repo_argument(
    required=True,
    help="REPO is a URI to a butler configuration that is used to configure "
    "the datastore of the quantum-backed butler.",
)
@click.argument("dest", required=True)
@transfer_option()
@click.option(
    "--preserve-path/--no-preserve-path",
    is_flag=True,
    default=True,
    help="Preserve the datastore path to the artifact at the destination.",
)
@click.option(
    "--clobber/--no-clobber",
    is_flag=True,
    default=False,
    help="If clobber, overwrite files if they exist locally.",
)
@click.option(
    "--qgraph-node-id",
    callback=split_commas,
    multiple=True,
    help=unwrap(
        """Only load a specified set of nodes when graph is
        loaded from a file, nodes are identified by UUID
        values. One or more comma-separated strings are
        accepted. By default all nodes are loaded. Ignored if
        graph is not loaded from a file."""
    ),
)
@click.option(
    "--include-inputs/--no-include-inputs",
    is_flag=True,
    default=True,
    help="Whether to include input datasets in retrieval.",
)
@click.option(
    "--include-outputs/--no-include-outputs",
    is_flag=True,
    default=True,
    help="Whether to include output datasets in retrieval.",
)
@options_file_option()
def retrieve_artifacts_for_quanta(**kwargs: Any) -> None:
    """Retrieve artifacts from given quanta defined in quantum graph.

    GRAPH is a URI to the source quantum graph file.

    DEST is a directory to write the retrieved artifacts.
    """
    # The script layer returns the collection of artifacts it wrote.
    artifacts = script.retrieve_artifacts_for_quanta(**kwargs)
    print(f"Written {len(artifacts)} artifacts to {kwargs['dest']}.")

161 

162 

# Default-constructed config whose field values supply the defaults for the
# `aggregate-graph` command-line options below, keeping the CLI defaults in
# sync with the AggregatorConfig Python API.
_AGGREGATOR_DEFAULTS = aggregator.AggregatorConfig()

164 

165 

# CLI command: scan a (possibly still-executing) quantum graph's outputs,
# build a provenance graph, and ingest results into the central butler.
# Every option default below is read from _AGGREGATOR_DEFAULTS so the CLI
# stays in sync with AggregatorConfig; option names match the config fields.
# The docstring on the function is rendered as the command's help text.
@click.command(short_help="Scan for the outputs of an active or completed quantum graph.", cls=ButlerCommand)
@click.argument("predicted_graph", required=True)
@repo_argument(required=True, help="Path or alias for the butler repository.")
@click.option(
    "-o",
    "--output",
    "output_path",
    default=_AGGREGATOR_DEFAULTS.output_path,
    help=(
        "Path to the output provenance quantum graph. THIS OPTION IS FOR "
        "DEVELOPMENT AND DEBUGGING ONLY. IT MAY BE REMOVED IN THE FUTURE."
    ),
)
@click.option(
    "--processes",
    "-j",
    "n_processes",
    default=_AGGREGATOR_DEFAULTS.n_processes,
    type=click.IntRange(min=1),
    help="Number of processes to use.",
)
@click.option(
    "--incomplete/--complete",
    "incomplete",
    default=_AGGREGATOR_DEFAULTS.incomplete,
    help="Whether execution has completed (and failures cannot be retried).",
)
@click.option(
    "--dry-run",
    is_flag=True,
    default=_AGGREGATOR_DEFAULTS.dry_run,
    help="Do not actually perform any central database ingests.",
)
@click.option(
    "--interactive-status/--no-interactive-status",
    "interactive_status",
    default=_AGGREGATOR_DEFAULTS.interactive_status,
    help="Use progress bars for status reporting instead of periodic logging.",
)
@click.option(
    "--log-status-interval",
    type=int,
    default=_AGGREGATOR_DEFAULTS.log_status_interval,
    help="Interval (in seconds) between periodic logger status updates.",
)
@click.option(
    "--register-dataset-types/--no-register-dataset-types",
    default=_AGGREGATOR_DEFAULTS.register_dataset_types,
    help="Register output dataset types.",
)
@click.option(
    "--update-output-chain/--no-update-output-chain",
    default=_AGGREGATOR_DEFAULTS.update_output_chain,
    help="Prepend the output RUN collection to the output CHAINED collection.",
)
@click.option(
    "--worker-log-dir",
    type=str,
    default=_AGGREGATOR_DEFAULTS.worker_log_dir,
    help="Path to a directory (POSIX only) for parallel worker logs.",
)
@click.option(
    "--worker-log-level",
    type=str,
    default=_AGGREGATOR_DEFAULTS.worker_log_level,
    help="Log level for worker processes/threads (use DEBUG for per-quantum messages).",
)
@click.option(
    "--zstd-level",
    type=int,
    default=_AGGREGATOR_DEFAULTS.zstd_level,
    help="Compression level for the provenance quantum graph file.",
)
@click.option(
    "--zstd-dict-size",
    type=int,
    default=_AGGREGATOR_DEFAULTS.zstd_dict_size,
    help="Size (in bytes) of the ZStandard compression dictionary.",
)
@click.option(
    "--zstd-dict-n-inputs",
    type=int,
    default=_AGGREGATOR_DEFAULTS.zstd_dict_n_inputs,
    help=("Number of samples of each type to include in ZStandard compression dictionary training."),
)
@click.option(
    "--mock-storage-classes/--no-mock-storage-classes",
    default=_AGGREGATOR_DEFAULTS.mock_storage_classes,
    help="Enable support for storage classes created by the lsst.pipe.base.tests.mocks package.",
)
@click.option(
    "--promise-ingest-graph/--no-promise-ingest-graph",
    default=_AGGREGATOR_DEFAULTS.promise_ingest_graph,
    help=(
        "Promise to run 'butler ingest-graph' later, allowing aggregate-graph "
        "to skip metadata/log/config ingestion for now."
    ),
)
def aggregate_graph(predicted_graph: str, repo: str, **kwargs: Any) -> None:
    """Scan for quantum graph's outputs to gather provenance, ingest datasets
    into the central butler repository, and delete datasets that are no
    longer needed.
    """
    # It'd be nice to allow to the user to provide a path to an
    # AggregatorConfig JSON file for options that weren't provided, but Click
    # 8.1 fundamentally cannot handle flag options that default to None rather
    # than True or False (i.e. so they fall back to the config value when not
    # set). It's not clear whether Click 8.2.x has actually fixed this; Click
    # 8.2.0 tried but caused new problems.

    # kwargs holds exactly the option values declared above, which map
    # one-to-one onto AggregatorConfig fields.
    config = aggregator.AggregatorConfig(**kwargs)
    try:
        aggregator.aggregate_graph(predicted_graph, repo, config)
    except aggregator.FatalWorkerError as err:
        # When this exception is raised, we'll have already logged the relevant
        # traceback from a separate worker.
        raise click.ClickException(str(err)) from None

283 

284 

# CLI command: ingest an already-built provenance quantum graph into a butler
# repository.  The docstring on the function is the command's help text.
@click.command(
    short_help="Ingest a provenance quantum graph into a butler.",
    cls=ButlerCommand,
)
@repo_argument(required=True, help="Path or alias for the butler repository.")
@click.argument("provenance_graph", required=False)
@transfer_option(default="move")
@click.option("--batch-size", default=10000, help="How many datasets to process in each transaction.")
@click.option(
    "--output-run",
    default=None,
    help=(
        "Name of the output RUN collection. Must be provided if the provenance graph is not"
        " provided (so the graph can be found in the butler)."
    ),
)
def ingest_graph(
    *,
    repo: str,
    provenance_graph: str | None,
    transfer: str | None,
    batch_size: int,
    output_run: str | None,
) -> None:
    """Ingest a provenance graph into a butler repository."""
    # Local import: the implementation module is loaded only when the command
    # actually runs; the alias avoids shadowing this command function's name.
    from ...quantum_graph.ingest_graph import ingest_graph as _ingest_graph_impl

    _ingest_graph_impl(
        repo,
        provenance_graph,
        transfer=transfer,
        batch_size=batch_size,
        output_run=output_run,
    )

313 

314 

# CLI command: read a provenance quantum graph (from a butler collection or a
# file) and emit status/exception tables and optional data-ID reports.
# The docstring on the function is the command's help text.
@click.command(
    short_help="Print and write provenance reports.",
    cls=ButlerCommand,
)
@click.argument("repo_or_qg")
@click.argument("collection", required=False, default=None)
@click.option(
    "--state",
    multiple=True,
    type=click.Choice(QuantumAttemptStatus),
    help=(
        "Additional quantum state to include in the status report and data ID tables "
        "(FAILED, ABORTED, and ABORTED_SUCCESS are included by default)."
    ),
)
@click.option(
    "--no-state",
    multiple=True,
    type=str,
    metavar="STATE",
    # Help-text fix: was "drop from in status report".
    help="Quantum state to drop from the status report and data ID tables (same options as --state).",
)
@click.option(
    "--status-report",
    default=None,
    metavar="URI",
    help="File or URI (.json) for a detailed report (with data IDs) on quanta with certain states.",
)
@click.option(
    "--quantum-table/--no-quantum-table",
    default=True,
    help="Whether to print summary of quantum status counts to STDOUT.",
)
@click.option(
    "--exception-table/--no-exception-table",
    default=True,
    # Help-text fix: was missing "to" before STDOUT.
    help="Whether to print summary of exception type counts to STDOUT.",
)
@click.option(
    "--caveat",
    multiple=True,
    type=click.Choice(QuantumSuccessCaveats),
    help=(
        "Include successful quanta in the status report if they have this caveat. "
        "May be passed multiple times; any matching caveat is included. "
        "Passing this option implicitly adds '--state SUCCESSFUL'."
    ),
)
@click.option(
    "--data-id-table-dir",
    default=None,
    metavar="URI",
    help=(
        "Directory (may be a URI) for a tree of data ID tables for each "
        "task label, status, and exception type combination in the status report."
    ),
)
def provenance_report(
    *,
    repo_or_qg: str,
    collection: str | None,
    state: Iterable[QuantumAttemptStatus],
    no_state: Iterable[str],
    status_report: str | None,
    # These defaults now match the click option defaults above; they
    # previously disagreed (False vs. default=True), which was misleading for
    # direct Python callers.  Click always passes explicit values, so CLI
    # behavior is unchanged.
    quantum_table: bool = True,
    exception_table: bool = True,
    caveat: Iterable[QuantumSuccessCaveats] = (),
    data_id_table_dir: str | None = None,
) -> None:
    """Read a provenance quantum graph from a butler or file and use it to
    generate reports.

    REPO_OR_QG is a path or alias for the butler repository (if reading an
    ingested graph, as indicated by passing COLLECTION), or the path to a
    provenance quantum graph file.
    """
    from ...quantum_graph import ProvenanceQuantumGraph

    # FAILED, ABORTED, and ABORTED_SUCCESS are always reported unless
    # explicitly removed with --no-state.
    states = set(state)
    states.add(QuantumAttemptStatus.FAILED)
    states.add(QuantumAttemptStatus.ABORTED)
    states.add(QuantumAttemptStatus.ABORTED_SUCCESS)
    for state_name in no_state:
        try:
            states.discard(QuantumAttemptStatus.__members__[state_name])
        except KeyError:
            # Turn an unknown enum name into a clean CLI error rather than an
            # uncaught KeyError traceback.
            raise click.UsageError(f"Unknown quantum state {state_name!r} for --no-state.") from None
    with_caveats: QuantumSuccessCaveats | None = None
    if caveat:
        # Requesting caveats implies reporting on successful quanta.
        states.add(QuantumAttemptStatus.SUCCESSFUL)
        # Combine all requested caveat flags into a single bitmask.
        with_caveats = functools.reduce(
            operator.or_,
            caveat,
            QuantumSuccessCaveats.NO_CAVEATS,
        )
    with ProvenanceQuantumGraph.from_args(repo_or_qg, collection=collection, datasets=()) as (graph, _):
        graph.make_many_reports(
            status_report_file=status_report,
            states=states,
            print_quantum_table=quantum_table,
            print_exception_table=exception_table,
            with_caveats=with_caveats,
            data_id_table_dir=data_id_table_dir,
        )