Coverage for python / lsst / ctrl / mpexec / cli / utils.py: 32%
82 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:50 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:50 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
29import collections
30import contextlib
31import logging
32import re
33from typing import Any
35import click
36from astropy.table import Table
38from lsst.daf.butler.cli.opt import config_file_option, config_option
39from lsst.daf.butler.cli.utils import MWCommand, MWCtxObj, split_commas
40from lsst.pipe.base.cli.opt import instrument_option
41from lsst.pipe.base.quantum_graph import BaseQuantumGraph
42from lsst.utils.logging import getLogger
44from .opt import delete_option, task_option
# Record describing one step to perform while building a pipeline.
# Fields:
#   action: name of the action, e.g. "new_task" or "delete_task"
#   label:  task label; may be None when the action does not need a label
#   value:  the argument value with the task label removed
_PipelineAction = collections.namedtuple(
    "_PipelineAction",
    ["action", "label", "value"],
)
# Module-level logger, named after this module.
_LOG = getLogger(__name__)

# Default timeout for multiprocessing, in seconds (30 days).
MP_TIMEOUT = 30 * 24 * 3600
class _PipelineActionType:
    """Callable converter from an option string to a ``_PipelineAction``.

    Parameters
    ----------
    action : `str`
        Name of the action; stored as the ``action`` attribute of every
        instance returned by this converter.
    regex : `str`, optional
        Regular expression applied to the argument value. It may define the
        named groups 'label' and 'value', which populate the corresponding
        attributes of the returned instance.
    valueType : `type`, optional
        Callable applied to the extracted value before it is stored in the
        returned instance.
    """

    def __init__(self, action: str, regex: str = ".*", valueType: type = str):
        self.action = action
        self.regex = re.compile(regex)
        self.valueType = valueType

    def __call__(self, value: str) -> _PipelineAction:
        parsed = self.regex.match(value)
        if parsed is None:
            message = (
                f"Unrecognized syntax for option {self.action!r}: {value!r} "
                f"(does not match pattern {self.regex.pattern})"
            )
            raise TypeError(message)

        # Names of the groups that the pattern itself declares.
        declared_groups = self.regex.groupindex

        # Without a "label" group in the pattern there is no label.
        label = parsed.group("label") if "label" in declared_groups else None

        # Without a "value" group in the pattern the whole string is used.
        raw_value = parsed.group("value") if "value" in declared_groups else value

        return _PipelineAction(self.action, label, self.valueType(raw_value))

    def __repr__(self) -> str:
        """Return string representation of this class."""
        return "_PipelineActionType(action={})".format(self.action)
# Converters from option values to pipeline actions, one per option kind.
# Patterns are applied with ``re.match`` (anchored at the start only).
#   new_task:       "value" optionally followed by ":label"
#   delete_task:    whole string is the label; value is always empty
#   config:         "label:key=value"
#   configfile:     "label:value"
#   add_instrument: leading run of characters other than ':'
_ACTION_ADD_TASK = _PipelineActionType(
    action="new_task",
    regex="(?P<value>[^:]+)(:(?P<label>.+))?",
)
_ACTION_DELETE_TASK = _PipelineActionType(
    action="delete_task",
    regex="(?P<value>)(?P<label>.+)",
)
_ACTION_CONFIG = _PipelineActionType(
    action="config",
    regex="(?P<label>.+):(?P<value>.+=.+)",
)
_ACTION_CONFIG_FILE = _PipelineActionType(
    action="configfile",
    regex="(?P<label>.+):(?P<value>.+)",
)
_ACTION_ADD_INSTRUMENT = _PipelineActionType(
    action="add_instrument",
    regex="(?P<value>[^:]+)",
)
def make_pipeline_actions(
    args: list[str],
    taskFlags: list[str] = task_option.opts(),
    deleteFlags: list[str] = delete_option.opts(),
    configFlags: list[str] = config_option.opts(),
    configFileFlags: list[str] = config_file_option.opts(),
    instrumentFlags: list[str] = instrument_option.opts(),
) -> list[_PipelineAction]:
    """Build the ordered list of pipeline actions found in command-line
    arguments.

    Parameters
    ----------
    args : `list` [`str`]
        The arguments, option flags, and option values in the order they were
        passed in on the command line.
    taskFlags : `list` [`str`], optional
        The option flags to use to recognize a task action, by default
        task_option.opts().
    deleteFlags : `list` [`str`], optional
        The option flags to use to recognize a delete action, by default
        delete_option.opts().
    configFlags : `list` [`str`], optional
        The option flags to use to recognize a config action, by default
        config_option.opts().
    configFileFlags : `list` [`str`], optional
        The option flags to use to recognize a config-file action, by default
        config_file_option.opts().
    instrumentFlags : `list` [`str`], optional
        The option flags to use to recognize an instrument action, by default
        instrument_option.opts().

    Returns
    -------
    pipelineActions : `list` [`_PipelineAction`]
        A list of pipeline actions constructed from their arguments in args,
        in the order they appeared in args.
    """
    # (flags, converter, value is comma-separated) in priority order: the
    # first entry whose flags contain the argument wins.
    handlers = (
        (taskFlags, _ACTION_ADD_TASK, False),
        (deleteFlags, _ACTION_DELETE_TASK, False),
        (configFlags, _ACTION_CONFIG, False),
        # --config-file allows multiple comma-separated values.
        (configFileFlags, _ACTION_CONFIG_FILE, True),
        (instrumentFlags, _ACTION_ADD_INSTRUMENT, False),
    )

    found: list[_PipelineAction] = []
    # Walk adjacent (argument, next argument) pairs: when the first item is a
    # recognized flag, the second one is its value. The last argument can
    # therefore only ever be a value.
    for flag, operand in zip(args, args[1:]):
        for flags, converter, comma_separated in handlers:
            if flag not in flags:
                continue
            if comma_separated:
                pieces = split_commas(None, None, operand)
            else:
                pieces = (operand,)
            for piece in pieces:
                found.append(converter(piece))
            break
    return found
def collect_pipeline_actions(ctx: click.Context, **kwargs: Any) -> dict[str, Any]:
    """Extract pipeline building options, replace them with PipelineActions,
    return updated `kwargs`.

    Parameters
    ----------
    ctx : `click.Context`
        Click context to extract actions from.
    **kwargs : `object`
        Keyword arguments to start from.

    Returns
    -------
    kwargs : `dict`
        Updated keyword arguments: the individual pipeline-action options
        are removed and a ``pipeline_actions`` entry holding the ordered
        list of actions is added.

    Raises
    ------
    KeyError
        Raised if any of the pipeline-action options (task, delete, config,
        config_file, instrument) is missing from ``kwargs``.

    Notes
    -----
    The pipeline actions (task, delete, config, config_file, and instrument)
    must be handled in the order they appear on the command line, but the CLI
    specification gives them all different option names. So, instead of using
    the individual action options as they appear in kwargs (because
    invocation order can't be known), we capture the CLI arguments by
    overriding `click.Command.parse_args` and save them in the Context's
    `obj` parameter. We use `make_pipeline_actions` to create a list of
    pipeline actions from the CLI arguments and pass that list to the script
    function using the `pipeline_actions` kwarg name, and remove the action
    options from kwargs.
    """
    # The option objects are already imported at module level, so no
    # function-local imports are needed here.
    action_option_names = (
        task_option.name(),
        delete_option.name(),
        config_option.name(),
        config_file_option.name(),
        instrument_option.name(),
    )
    # Drop the per-option values: their relative order on the command line
    # cannot be recovered from kwargs.
    for option_name in action_option_names:
        kwargs.pop(option_name)

    # make_pipeline_actions returns a fresh list, so it is stored directly
    # rather than copied element by element.
    kwargs["pipeline_actions"] = make_pipeline_actions(MWCtxObj.getFrom(ctx).args)
    return kwargs
class PipetaskCommand(MWCommand):
    """Command subclass with pipetask-command specific overrides.

    The only override defined here is the ``extra_epilog`` class attribute.
    """

    # Text pointing users at the top-level ``pipetask`` help.
    extra_epilog = "See 'pipetask --help' for more options."
def summarize_quantum_graph(qg: BaseQuantumGraph) -> int:
    """Log a per-task summary of the quanta in the graph and return the total.

    This only reports quanta that were actually loaded.

    Parameters
    ----------
    qg : `lsst.pipe.base.quantum_graph.BaseQuantumGraph`
        Quantum graph object.

    Returns
    -------
    n_quanta : `int`
        The number of quanta in the graph.
    """
    # Count quanta per task label, leaving out tasks that have none.
    counts_by_task = {}
    for label, task_quanta in qg.quanta_by_task.items():
        if task_quanta:
            counts_by_task[label] = len(task_quanta)

    total = sum(counts_by_task.values())
    if total == 0:
        _LOG.info("QuantumGraph contains no quanta.")
        return total

    # Skip building the table when the message would be discarded anyway.
    if not _LOG.isEnabledFor(logging.INFO):
        return total

    summary_table = Table(
        {
            "Quanta": tuple(counts_by_task.values()),
            "Tasks": tuple(counts_by_task),
        }
    )
    task_count = len(counts_by_task)
    _LOG.info(
        "QuantumGraph contains %d %s for %d task%s\n%s",
        total,
        "quantum" if total == 1 else "quanta",
        task_count,
        "" if task_count == 1 else "s",
        "\n".join(summary_table.pformat()),
    )
    return total