Coverage for python / lsst / daf / butler / cli / butler.py: 35%
169 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = (
30 "ButlerCLI",
31 "LoaderCLI",
32 "cli",
33 "main",
34)
36import abc
37import dataclasses
38import functools
39import logging
40import os
41import traceback
42import types
43from collections import defaultdict
44from functools import cache
45from importlib.metadata import entry_points
46from typing import Any
48import click
49import yaml
51from lsst.resources import ResourcePath
52from lsst.utils import doImport
53from lsst.utils.introspection import get_full_type_name
54from lsst.utils.timer import time_this
56from .cliLog import CliLog
57from .opt import log_file_option, log_label_option, log_level_option, log_tty_option, long_log_option
58from .progress import ClickProgressHandler
60log = logging.getLogger(__name__)
63@functools.lru_cache
64def _importPlugin(pluginName: str) -> types.ModuleType | type | None | click.Command:
65 """Import a plugin that contains Click commands.
67 Parameters
68 ----------
69 pluginName : `str`
70 An importable module whose __all__ parameter contains the commands
71 that can be called.
73 Returns
74 -------
75 An imported module or None
76 The imported module, or None if the module could not be imported.
78 Notes
79 -----
80 A cache is used in order to prevent repeated reports of failure
81 to import a module that can be triggered by ``butler --help``.
82 """
83 try:
84 return doImport(pluginName)
85 except Exception as err:
86 log.warning("Could not import plugin from %s, skipping.", pluginName)
87 log.debug(
88 "Plugin import exception: %s\nTraceback:\n%s",
89 err,
90 "".join(traceback.format_tb(err.__traceback__)),
91 )
92 return None
95@dataclasses.dataclass(frozen=True)
96class PluginCommand:
97 """A click Command and the plugin it came from."""
99 command: click.Command
100 """The command (`click.Command`)."""
101 source: str
102 """Where the command came from (`str`)."""
105class LoaderCLI(click.Group, abc.ABC):
106 """Extends `click.MultiCommand`, which dispatches to subcommands, to load
107 subcommands at runtime.
109 Parameters
110 ----------
111 *args : `typing.Any`
112 Arguments passed to parent constructor.
113 **kwargs : `typing.Any`
114 Keyword arguments passed to parent constructor.
115 """
117 def __init__(self, *args: Any, **kwargs: Any) -> None:
118 super().__init__(*args, **kwargs)
120 @property
121 @abc.abstractmethod
122 def localCmdPkg(self) -> str:
123 """Identifies the location of the commands that are in this
124 package.
126 `getLocalCommands` assumes that the commands can be found in
127 ``localCmdPkg.__all__``, if this is not the case then
128 `getLocalCommands` should be overridden.
130 Returns
131 -------
132 package : `str`
133 The fully qualified location of this package.
134 """
135 raise NotImplementedError()
137 def getLocalCommands(self) -> defaultdict[str, list[str | PluginCommand]]:
138 """Get the commands offered by the local package. This assumes that the
139 commands can be found in ``localCmdPkg.__all__``, if this is not the
140 case then this function should be overridden.
142 Returns
143 -------
144 commands : `collections.defaultdict` \
145 [`str`, `list` [`str` | `PluginCommand` ]]
146 The key is the command name. The value is a list of package(s) that
147 contains the command.
148 """
149 commandsLocation = _importPlugin(self.localCmdPkg)
150 if commandsLocation is None:
151 # _importPlugins logs an error, don't need to do it again here.
152 return defaultdict(list)
153 assert hasattr(commandsLocation, "__all__"), f"Must define __all__ in {commandsLocation}"
154 commands = [getattr(commandsLocation, name) for name in commandsLocation.__all__]
155 return defaultdict(
156 list,
157 {
158 command.name: [PluginCommand(command, get_full_type_name(commandsLocation))]
159 for command in commands
160 },
161 )
163 def list_commands(self, ctx: click.Context) -> list[str]:
164 """Get all the commands that can be called by the
165 butler command, it is used to generate the --help output.
167 Used by Click.
169 Parameters
170 ----------
171 ctx : `click.Context`
172 The current Click context.
174 Returns
175 -------
176 commands : `list` [`str`]
177 The names of the commands that can be called by the butler command.
178 """
179 self._setupLogging(ctx)
180 commands = self._getCommands()
181 self._raiseIfDuplicateCommands(commands)
182 return sorted(commands)
184 def get_command(self, ctx: click.Context, name: str) -> click.Command | None:
185 """Get a single command for execution.
187 Used by Click.
189 Parameters
190 ----------
191 ctx : `click.Context`
192 The current Click context.
193 name : `str`
194 The name of the command to return.
196 Returns
197 -------
198 command : `click.Command`
199 A Command that wraps a callable command function.
200 """
201 self._setupLogging(ctx)
202 commands = self._getCommands()
203 if name not in commands:
204 return None
205 self._raiseIfDuplicateCommands(commands)
206 command = commands[name][0]
207 if isinstance(command, str):
208 module_str = command + "." + self._cmdNameToFuncName(name)
209 # The click.command decorator returns an instance of a class, which
210 # is something that doImport is not expecting. We add it in as an
211 # option here to appease mypy.
212 with time_this(log, msg="Importing command %s (via %s)", args=(name, module_str)):
213 plugin = _importPlugin(module_str)
214 if not plugin:
215 return None
216 else:
217 plugin = command.command
218 if not isinstance(plugin, click.Command):
219 raise RuntimeError(
220 f"Command {name!r} loaded from {module_str} is not a click Command, is {type(plugin)}"
221 )
222 return plugin
224 def _setupLogging(self, ctx: click.Context | None) -> None:
225 """Init the logging system and config it for the command.
227 Subcommands may further configure the log settings.
228 """
229 if isinstance(ctx, click.Context):
230 CliLog.initLog(
231 longlog=ctx.params.get(long_log_option.name(), False),
232 log_tty=ctx.params.get(log_tty_option.name(), True),
233 log_file=ctx.params.get(log_file_option.name(), ()),
234 log_label=ctx.params.get(log_label_option.name(), ()),
235 )
236 if log_level_option.name() in ctx.params:
237 CliLog.setLogLevels(ctx.params[log_level_option.name()])
238 else:
239 # This works around a bug in sphinx-click, where it passes in the
240 # click.MultiCommand instead of the context.
241 # https://github.com/click-contrib/sphinx-click/issues/70
242 CliLog.initLog(longlog=False)
243 logging.debug(
244 "The passed-in context was not a click.Context, could not determine --long-log or "
245 "--log-level values."
246 )
248 @classmethod
249 def getPluginList(cls) -> list[ResourcePath]:
250 """Get the list of importable yaml files that contain cli data for this
251 command.
253 Returns
254 -------
255 `list` [`lsst.resources.ResourcePath`]
256 The list of files that contain yaml data about a cli plugin.
257 """
258 yaml_files = []
259 if hasattr(cls, "pluginEnvVar"):
260 pluginModules = os.environ.get(cls.pluginEnvVar)
261 if pluginModules:
262 yaml_files.extend([ResourcePath(p) for p in pluginModules.split(":") if p != ""])
264 return yaml_files
266 @classmethod
267 def _funcNameToCmdName(cls, functionName: str) -> str:
268 """Convert function name to the butler command name: change
269 underscores, (used in functions) to dashes (used in commands), and
270 change local-package command names that conflict with python keywords
271 to a legal function name.
272 """
273 return functionName.replace("_", "-")
275 @classmethod
276 def _cmdNameToFuncName(cls, commandName: str) -> str:
277 """Convert butler command name to function name: change dashes (used in
278 commands) to underscores (used in functions), and for local-package
279 commands names that conflict with python keywords, change the local,
280 legal, function name to the command name.
281 """
282 return commandName.replace("-", "_")
284 @staticmethod
285 def _mergeCommandLists(
286 a: defaultdict[str, list[str | PluginCommand]], b: defaultdict[str, list[str | PluginCommand]]
287 ) -> defaultdict[str, list[str | PluginCommand]]:
288 """Combine two dicts whose keys are strings (command name) and values
289 are list of string (the package(s) that provide the named command).
291 Parameters
292 ----------
293 a : `collections.defaultdict` \
294 [`str`, `list` [`str` | `PluginCommand` ]]
295 The key is the command name. The value is a list of package(s) that
296 contains the command.
297 b : (same as a)
299 Returns
300 -------
301 commands : `collections.defaultdict` \
302 [`str`: [`str` | `PluginCommand` ]]
303 For convenience, returns a extended with b. ('a' is modified in
304 place.)
305 """
306 for key, val in b.items():
307 a[key].extend(val)
308 return a
310 @classmethod
311 def _getPluginCommands(cls) -> defaultdict[str, list[str | PluginCommand]]:
312 """Get the commands offered by plugin packages.
314 Returns
315 -------
316 commands : `collections.defaultdict` [`str`, `list` [`str`]]
317 The key is the command name. The value is a list of package(s) that
318 contains the command.
320 Notes
321 -----
322 Assumes that if entry points are defined, the plugin environment
323 variable will not be defined for that same package.
324 """
325 commands: defaultdict[str, list[str | PluginCommand]] = defaultdict(list)
326 for pluginName in cls.getPluginList():
327 try:
328 resources = defaultdict(list, yaml.safe_load(pluginName.read()))
329 except Exception as err:
330 log.warning("Error loading commands from %s, skipping. %s", pluginName, err)
331 continue
332 if "cmd" not in resources:
333 log.warning("No commands found in %s, skipping.", pluginName)
334 continue
335 pluginCommands = {cmd: [resources["cmd"]["import"]] for cmd in resources["cmd"]["commands"]}
336 cls._mergeCommandLists(commands, defaultdict(list, pluginCommands))
338 if hasattr(cls, "entryPoint"):
339 plugins = entry_points(group=cls.entryPoint)
340 for p in plugins:
341 try:
342 func = p.load()
343 except Exception as err:
344 log.warning("Could not import plugin from entry point %s, skipping.", p)
345 log.debug(
346 "Plugin import exception: %s\nTraceback:\n%s",
347 err,
348 "".join(traceback.format_tb(err.__traceback__)),
349 )
350 continue
351 func_name = get_full_type_name(func)
352 pluginCommands = {cmd.name: [PluginCommand(cmd, func_name)] for cmd in func()}
353 cls._mergeCommandLists(commands, defaultdict(list, pluginCommands))
355 return commands
357 @cache
358 def _getCommands(self) -> defaultdict[str, list[str | PluginCommand]]:
359 """Get the commands offered by daf_butler and plugin packages.
361 Returns
362 -------
363 commands : `collections.defaultdict` \
364 [`str`, `list` [`str` | `PluginCommand` ]]
365 The key is the command name. The value is a list of package(s) that
366 contains the command.
367 """
368 return self._mergeCommandLists(self.getLocalCommands(), self._getPluginCommands())
370 @staticmethod
371 def _raiseIfDuplicateCommands(commands: defaultdict[str, list[str | PluginCommand]]) -> None:
372 """If any provided command is offered by more than one package raise an
373 exception.
375 Parameters
376 ----------
377 commands : `collections.defaultdict` \
378 [`str`, `list` [`str` | `PLuginCommand` ]]
379 The key is the command name. The value is a list of package(s) that
380 contains the command.
382 Raises
383 ------
384 click.ClickException
385 Raised if a command is offered by more than one package, with an
386 error message to be displayed to the user.
387 """
388 msg = ""
389 for command, packages in commands.items():
390 if len(packages) > 1:
391 pkg_names: list[str] = []
392 for p in packages:
393 if not isinstance(p, str):
394 p = p.source
395 pkg_names.append(p)
396 msg += f"Command '{command}' exists in packages {', '.join(pkg_names)}. "
397 if msg:
398 raise click.ClickException(message=msg + "Duplicate commands are not supported, aborting.")
401class ButlerCLI(LoaderCLI):
402 """Specialized command loader implementing the ``butler`` command."""
404 localCmdPkg = "lsst.daf.butler.cli.cmd"
406 pluginEnvVar = "DAF_BUTLER_PLUGINS"
407 entryPoint = "butler.cli"
409 @classmethod
410 def _funcNameToCmdName(cls, functionName: str) -> str:
411 # Docstring inherited from base class.
413 # The "import" command name and "butler_import" function name are
414 # defined in cli/cmd/commands.py, and if those names are changed they
415 # must be changed here as well.
416 # It is expected that there will be very few butler command names that
417 # need to be changed because of e.g. conflicts with python keywords (as
418 # is done here and in _cmdNameToFuncName for the 'import' command). If
419 # this becomes a common need then some way of doing this should be
420 # invented that is better than hard coding the function names into
421 # these conversion functions. An extension of the 'cli/resources.yaml'
422 # file (as is currently used in obs_base) might be a good way to do it.
423 if functionName == "butler_import":
424 return "import"
425 return super()._funcNameToCmdName(functionName)
427 @classmethod
428 def _cmdNameToFuncName(cls, commandName: str) -> str:
429 # Docstring inherited from base class.
430 if commandName == "import":
431 return "butler_import"
432 return super()._cmdNameToFuncName(commandName)
435class UncachedButlerCLI(ButlerCLI):
436 """ButlerCLI that can be used where caching of the commands is disabled."""
438 def _getCommands(self) -> defaultdict[str, list[str | PluginCommand]]: # type: ignore[override]
439 """Get the commands offered by daf_butler and plugin packages.
441 Returns
442 -------
443 commands : `collections.defaultdict` \
444 [`str`, `list` [`str` | `PluginCommand` ]]
445 The key is the command name. The value is a list of package(s) that
446 contains the command.
447 """
448 return self._mergeCommandLists(self.getLocalCommands(), self._getPluginCommands())
451@click.command(cls=ButlerCLI, context_settings=dict(help_option_names=["-h", "--help"]))
452@log_level_option()
453@long_log_option()
454@log_file_option()
455@log_tty_option()
456@log_label_option()
457@ClickProgressHandler.option
458def cli(log_level: str, long_log: bool, log_file: str, log_tty: bool, log_label: str, progress: bool) -> None:
459 """Butler command-line tools.
461 Log options apply to all subcommands.
462 """
463 pass
466def main() -> click.Command:
467 """Return main entry point for command-line."""
468 return cli()