Coverage for python / lsst / daf / butler / cli / butler.py: 35%

169 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:55 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ( 

30 "ButlerCLI", 

31 "LoaderCLI", 

32 "cli", 

33 "main", 

34) 

35 

36import abc 

37import dataclasses 

38import functools 

39import logging 

40import os 

41import traceback 

42import types 

43from collections import defaultdict 

44from functools import cache 

45from importlib.metadata import entry_points 

46from typing import Any 

47 

48import click 

49import yaml 

50 

51from lsst.resources import ResourcePath 

52from lsst.utils import doImport 

53from lsst.utils.introspection import get_full_type_name 

54from lsst.utils.timer import time_this 

55 

56from .cliLog import CliLog 

57from .opt import log_file_option, log_label_option, log_level_option, log_tty_option, long_log_option 

58from .progress import ClickProgressHandler 

59 

60log = logging.getLogger(__name__) 

61 

62 

63@functools.lru_cache 

64def _importPlugin(pluginName: str) -> types.ModuleType | type | None | click.Command: 

65 """Import a plugin that contains Click commands. 

66 

67 Parameters 

68 ---------- 

69 pluginName : `str` 

70 An importable module whose __all__ parameter contains the commands 

71 that can be called. 

72 

73 Returns 

74 ------- 

75 An imported module or None 

76 The imported module, or None if the module could not be imported. 

77 

78 Notes 

79 ----- 

80 A cache is used in order to prevent repeated reports of failure 

81 to import a module that can be triggered by ``butler --help``. 

82 """ 

83 try: 

84 return doImport(pluginName) 

85 except Exception as err: 

86 log.warning("Could not import plugin from %s, skipping.", pluginName) 

87 log.debug( 

88 "Plugin import exception: %s\nTraceback:\n%s", 

89 err, 

90 "".join(traceback.format_tb(err.__traceback__)), 

91 ) 

92 return None 

93 

94 

95@dataclasses.dataclass(frozen=True) 

96class PluginCommand: 

97 """A click Command and the plugin it came from.""" 

98 

99 command: click.Command 

100 """The command (`click.Command`).""" 

101 source: str 

102 """Where the command came from (`str`).""" 

103 

104 

105class LoaderCLI(click.Group, abc.ABC): 

106 """Extends `click.MultiCommand`, which dispatches to subcommands, to load 

107 subcommands at runtime. 

108 

109 Parameters 

110 ---------- 

111 *args : `typing.Any` 

112 Arguments passed to parent constructor. 

113 **kwargs : `typing.Any` 

114 Keyword arguments passed to parent constructor. 

115 """ 

116 

117 def __init__(self, *args: Any, **kwargs: Any) -> None: 

118 super().__init__(*args, **kwargs) 

119 

120 @property 

121 @abc.abstractmethod 

122 def localCmdPkg(self) -> str: 

123 """Identifies the location of the commands that are in this 

124 package. 

125 

126 `getLocalCommands` assumes that the commands can be found in 

127 ``localCmdPkg.__all__``, if this is not the case then 

128 `getLocalCommands` should be overridden. 

129 

130 Returns 

131 ------- 

132 package : `str` 

133 The fully qualified location of this package. 

134 """ 

135 raise NotImplementedError() 

136 

137 def getLocalCommands(self) -> defaultdict[str, list[str | PluginCommand]]: 

138 """Get the commands offered by the local package. This assumes that the 

139 commands can be found in ``localCmdPkg.__all__``, if this is not the 

140 case then this function should be overridden. 

141 

142 Returns 

143 ------- 

144 commands : `collections.defaultdict` \ 

145 [`str`, `list` [`str` | `PluginCommand` ]] 

146 The key is the command name. The value is a list of package(s) that 

147 contains the command. 

148 """ 

149 commandsLocation = _importPlugin(self.localCmdPkg) 

150 if commandsLocation is None: 

151 # _importPlugins logs an error, don't need to do it again here. 

152 return defaultdict(list) 

153 assert hasattr(commandsLocation, "__all__"), f"Must define __all__ in {commandsLocation}" 

154 commands = [getattr(commandsLocation, name) for name in commandsLocation.__all__] 

155 return defaultdict( 

156 list, 

157 { 

158 command.name: [PluginCommand(command, get_full_type_name(commandsLocation))] 

159 for command in commands 

160 }, 

161 ) 

162 

163 def list_commands(self, ctx: click.Context) -> list[str]: 

164 """Get all the commands that can be called by the 

165 butler command, it is used to generate the --help output. 

166 

167 Used by Click. 

168 

169 Parameters 

170 ---------- 

171 ctx : `click.Context` 

172 The current Click context. 

173 

174 Returns 

175 ------- 

176 commands : `list` [`str`] 

177 The names of the commands that can be called by the butler command. 

178 """ 

179 self._setupLogging(ctx) 

180 commands = self._getCommands() 

181 self._raiseIfDuplicateCommands(commands) 

182 return sorted(commands) 

183 

184 def get_command(self, ctx: click.Context, name: str) -> click.Command | None: 

185 """Get a single command for execution. 

186 

187 Used by Click. 

188 

189 Parameters 

190 ---------- 

191 ctx : `click.Context` 

192 The current Click context. 

193 name : `str` 

194 The name of the command to return. 

195 

196 Returns 

197 ------- 

198 command : `click.Command` 

199 A Command that wraps a callable command function. 

200 """ 

201 self._setupLogging(ctx) 

202 commands = self._getCommands() 

203 if name not in commands: 

204 return None 

205 self._raiseIfDuplicateCommands(commands) 

206 command = commands[name][0] 

207 if isinstance(command, str): 

208 module_str = command + "." + self._cmdNameToFuncName(name) 

209 # The click.command decorator returns an instance of a class, which 

210 # is something that doImport is not expecting. We add it in as an 

211 # option here to appease mypy. 

212 with time_this(log, msg="Importing command %s (via %s)", args=(name, module_str)): 

213 plugin = _importPlugin(module_str) 

214 if not plugin: 

215 return None 

216 else: 

217 plugin = command.command 

218 if not isinstance(plugin, click.Command): 

219 raise RuntimeError( 

220 f"Command {name!r} loaded from {module_str} is not a click Command, is {type(plugin)}" 

221 ) 

222 return plugin 

223 

224 def _setupLogging(self, ctx: click.Context | None) -> None: 

225 """Init the logging system and config it for the command. 

226 

227 Subcommands may further configure the log settings. 

228 """ 

229 if isinstance(ctx, click.Context): 

230 CliLog.initLog( 

231 longlog=ctx.params.get(long_log_option.name(), False), 

232 log_tty=ctx.params.get(log_tty_option.name(), True), 

233 log_file=ctx.params.get(log_file_option.name(), ()), 

234 log_label=ctx.params.get(log_label_option.name(), ()), 

235 ) 

236 if log_level_option.name() in ctx.params: 

237 CliLog.setLogLevels(ctx.params[log_level_option.name()]) 

238 else: 

239 # This works around a bug in sphinx-click, where it passes in the 

240 # click.MultiCommand instead of the context. 

241 # https://github.com/click-contrib/sphinx-click/issues/70 

242 CliLog.initLog(longlog=False) 

243 logging.debug( 

244 "The passed-in context was not a click.Context, could not determine --long-log or " 

245 "--log-level values." 

246 ) 

247 

248 @classmethod 

249 def getPluginList(cls) -> list[ResourcePath]: 

250 """Get the list of importable yaml files that contain cli data for this 

251 command. 

252 

253 Returns 

254 ------- 

255 `list` [`lsst.resources.ResourcePath`] 

256 The list of files that contain yaml data about a cli plugin. 

257 """ 

258 yaml_files = [] 

259 if hasattr(cls, "pluginEnvVar"): 

260 pluginModules = os.environ.get(cls.pluginEnvVar) 

261 if pluginModules: 

262 yaml_files.extend([ResourcePath(p) for p in pluginModules.split(":") if p != ""]) 

263 

264 return yaml_files 

265 

266 @classmethod 

267 def _funcNameToCmdName(cls, functionName: str) -> str: 

268 """Convert function name to the butler command name: change 

269 underscores, (used in functions) to dashes (used in commands), and 

270 change local-package command names that conflict with python keywords 

271 to a legal function name. 

272 """ 

273 return functionName.replace("_", "-") 

274 

275 @classmethod 

276 def _cmdNameToFuncName(cls, commandName: str) -> str: 

277 """Convert butler command name to function name: change dashes (used in 

278 commands) to underscores (used in functions), and for local-package 

279 commands names that conflict with python keywords, change the local, 

280 legal, function name to the command name. 

281 """ 

282 return commandName.replace("-", "_") 

283 

284 @staticmethod 

285 def _mergeCommandLists( 

286 a: defaultdict[str, list[str | PluginCommand]], b: defaultdict[str, list[str | PluginCommand]] 

287 ) -> defaultdict[str, list[str | PluginCommand]]: 

288 """Combine two dicts whose keys are strings (command name) and values 

289 are list of string (the package(s) that provide the named command). 

290 

291 Parameters 

292 ---------- 

293 a : `collections.defaultdict` \ 

294 [`str`, `list` [`str` | `PluginCommand` ]] 

295 The key is the command name. The value is a list of package(s) that 

296 contains the command. 

297 b : (same as a) 

298 

299 Returns 

300 ------- 

301 commands : `collections.defaultdict` \ 

302 [`str`: [`str` | `PluginCommand` ]] 

303 For convenience, returns a extended with b. ('a' is modified in 

304 place.) 

305 """ 

306 for key, val in b.items(): 

307 a[key].extend(val) 

308 return a 

309 

310 @classmethod 

311 def _getPluginCommands(cls) -> defaultdict[str, list[str | PluginCommand]]: 

312 """Get the commands offered by plugin packages. 

313 

314 Returns 

315 ------- 

316 commands : `collections.defaultdict` [`str`, `list` [`str`]] 

317 The key is the command name. The value is a list of package(s) that 

318 contains the command. 

319 

320 Notes 

321 ----- 

322 Assumes that if entry points are defined, the plugin environment 

323 variable will not be defined for that same package. 

324 """ 

325 commands: defaultdict[str, list[str | PluginCommand]] = defaultdict(list) 

326 for pluginName in cls.getPluginList(): 

327 try: 

328 resources = defaultdict(list, yaml.safe_load(pluginName.read())) 

329 except Exception as err: 

330 log.warning("Error loading commands from %s, skipping. %s", pluginName, err) 

331 continue 

332 if "cmd" not in resources: 

333 log.warning("No commands found in %s, skipping.", pluginName) 

334 continue 

335 pluginCommands = {cmd: [resources["cmd"]["import"]] for cmd in resources["cmd"]["commands"]} 

336 cls._mergeCommandLists(commands, defaultdict(list, pluginCommands)) 

337 

338 if hasattr(cls, "entryPoint"): 

339 plugins = entry_points(group=cls.entryPoint) 

340 for p in plugins: 

341 try: 

342 func = p.load() 

343 except Exception as err: 

344 log.warning("Could not import plugin from entry point %s, skipping.", p) 

345 log.debug( 

346 "Plugin import exception: %s\nTraceback:\n%s", 

347 err, 

348 "".join(traceback.format_tb(err.__traceback__)), 

349 ) 

350 continue 

351 func_name = get_full_type_name(func) 

352 pluginCommands = {cmd.name: [PluginCommand(cmd, func_name)] for cmd in func()} 

353 cls._mergeCommandLists(commands, defaultdict(list, pluginCommands)) 

354 

355 return commands 

356 

357 @cache 

358 def _getCommands(self) -> defaultdict[str, list[str | PluginCommand]]: 

359 """Get the commands offered by daf_butler and plugin packages. 

360 

361 Returns 

362 ------- 

363 commands : `collections.defaultdict` \ 

364 [`str`, `list` [`str` | `PluginCommand` ]] 

365 The key is the command name. The value is a list of package(s) that 

366 contains the command. 

367 """ 

368 return self._mergeCommandLists(self.getLocalCommands(), self._getPluginCommands()) 

369 

370 @staticmethod 

371 def _raiseIfDuplicateCommands(commands: defaultdict[str, list[str | PluginCommand]]) -> None: 

372 """If any provided command is offered by more than one package raise an 

373 exception. 

374 

375 Parameters 

376 ---------- 

377 commands : `collections.defaultdict` \ 

378 [`str`, `list` [`str` | `PLuginCommand` ]] 

379 The key is the command name. The value is a list of package(s) that 

380 contains the command. 

381 

382 Raises 

383 ------ 

384 click.ClickException 

385 Raised if a command is offered by more than one package, with an 

386 error message to be displayed to the user. 

387 """ 

388 msg = "" 

389 for command, packages in commands.items(): 

390 if len(packages) > 1: 

391 pkg_names: list[str] = [] 

392 for p in packages: 

393 if not isinstance(p, str): 

394 p = p.source 

395 pkg_names.append(p) 

396 msg += f"Command '{command}' exists in packages {', '.join(pkg_names)}. " 

397 if msg: 

398 raise click.ClickException(message=msg + "Duplicate commands are not supported, aborting.") 

399 

400 

401class ButlerCLI(LoaderCLI): 

402 """Specialized command loader implementing the ``butler`` command.""" 

403 

404 localCmdPkg = "lsst.daf.butler.cli.cmd" 

405 

406 pluginEnvVar = "DAF_BUTLER_PLUGINS" 

407 entryPoint = "butler.cli" 

408 

409 @classmethod 

410 def _funcNameToCmdName(cls, functionName: str) -> str: 

411 # Docstring inherited from base class. 

412 

413 # The "import" command name and "butler_import" function name are 

414 # defined in cli/cmd/commands.py, and if those names are changed they 

415 # must be changed here as well. 

416 # It is expected that there will be very few butler command names that 

417 # need to be changed because of e.g. conflicts with python keywords (as 

418 # is done here and in _cmdNameToFuncName for the 'import' command). If 

419 # this becomes a common need then some way of doing this should be 

420 # invented that is better than hard coding the function names into 

421 # these conversion functions. An extension of the 'cli/resources.yaml' 

422 # file (as is currently used in obs_base) might be a good way to do it. 

423 if functionName == "butler_import": 

424 return "import" 

425 return super()._funcNameToCmdName(functionName) 

426 

427 @classmethod 

428 def _cmdNameToFuncName(cls, commandName: str) -> str: 

429 # Docstring inherited from base class. 

430 if commandName == "import": 

431 return "butler_import" 

432 return super()._cmdNameToFuncName(commandName) 

433 

434 

435class UncachedButlerCLI(ButlerCLI): 

436 """ButlerCLI that can be used where caching of the commands is disabled.""" 

437 

438 def _getCommands(self) -> defaultdict[str, list[str | PluginCommand]]: # type: ignore[override] 

439 """Get the commands offered by daf_butler and plugin packages. 

440 

441 Returns 

442 ------- 

443 commands : `collections.defaultdict` \ 

444 [`str`, `list` [`str` | `PluginCommand` ]] 

445 The key is the command name. The value is a list of package(s) that 

446 contains the command. 

447 """ 

448 return self._mergeCommandLists(self.getLocalCommands(), self._getPluginCommands()) 

449 

450 

451@click.command(cls=ButlerCLI, context_settings=dict(help_option_names=["-h", "--help"])) 

452@log_level_option() 

453@long_log_option() 

454@log_file_option() 

455@log_tty_option() 

456@log_label_option() 

457@ClickProgressHandler.option 

458def cli(log_level: str, long_log: bool, log_file: str, log_tty: bool, log_label: str, progress: bool) -> None: 

459 """Butler command-line tools. 

460 

461 Log options apply to all subcommands. 

462 """ 

463 pass 

464 

465 

466def main() -> click.Command: 

467 """Return main entry point for command-line.""" 

468 return cli()