Coverage for python / lsst / ctrl / mpexec / cli / utils.py: 32%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28 

29import collections 

30import contextlib 

31import logging 

32import re 

33from typing import Any 

34 

35import click 

36from astropy.table import Table 

37 

38from lsst.daf.butler.cli.opt import config_file_option, config_option 

39from lsst.daf.butler.cli.utils import MWCommand, MWCtxObj, split_commas 

40from lsst.pipe.base.cli.opt import instrument_option 

41from lsst.pipe.base.quantum_graph import BaseQuantumGraph 

42from lsst.utils.logging import getLogger 

43 

44from .opt import delete_option, task_option 

45 

46# Class which determines an action that needs to be performed 

47# when building pipeline, its attributes are: 

48# action: the name of the action, e.g. "new_task", "delete_task" 

49# label: task label, can be None if action does not require label 

50# value: argument value excluding task label. 

51_PipelineAction = collections.namedtuple("_PipelineAction", "action,label,value") 

52 

# Logger for this module.
_LOG = getLogger(__name__)

# Default timeout for multiprocessing, in seconds (30 days).
MP_TIMEOUT = 30 * 24 * 60 * 60

56 

57 

class _PipelineActionType:
    """Callable that parses a command-line string into a ``_PipelineAction``.

    Parameters
    ----------
    action : `str`
        Name of the action; copied into the ``action`` field of every
        returned ``_PipelineAction``.
    regex : `str`, optional
        Regular expression the argument must match. Matching uses
        `re.Pattern.match`, so it is anchored at the start of the string
        only. Named groups ``label`` and ``value``, when present, supply the
        corresponding fields of the result. Defaults to ``".*"``.
    valueType : `type`, optional
        Callable applied to the extracted value before it is stored in the
        result. Defaults to `str`.
    """

    def __init__(self, action: str, regex: str = ".*", valueType: type = str):
        self.action = action
        self.regex = re.compile(regex)
        self.valueType = valueType

    def __call__(self, value: str) -> _PipelineAction:
        """Parse ``value`` into a ``_PipelineAction``.

        Parameters
        ----------
        value : `str`
            Raw option argument from the command line.

        Returns
        -------
        action : ``_PipelineAction``
            Parsed action with ``action``, ``label`` and ``value`` fields.

        Raises
        ------
        TypeError
            Raised if ``value`` does not match the pattern.
        """
        found = self.regex.match(value)
        if found is None:
            raise TypeError(
                f"Unrecognized syntax for option {self.action!r}: {value!r} "
                f"(does not match pattern {self.regex.pattern})"
            )
        named = found.groupdict()
        # No "label" group, or an optional one that did not participate,
        # both give a label of None.
        label = named.get("label")
        # Without a "value" group the whole input string is the value.
        raw = named["value"] if "value" in named else value
        return _PipelineAction(self.action, label, self.valueType(raw))

    def __repr__(self) -> str:
        """Return a short string naming the action this instance parses."""
        return f"_PipelineActionType(action={self.action})"

99 

100 

# Parsers for the individual pipeline-building options.
# Task: "<value>[:<label>]"; the label part is optional.
_ACTION_ADD_TASK = _PipelineActionType(
    action="new_task",
    regex=r"(?P<value>[^:]+)(:(?P<label>.+))?",
)
# Delete: the whole argument is the label; the value group is always empty.
_ACTION_DELETE_TASK = _PipelineActionType(
    action="delete_task",
    regex=r"(?P<value>)(?P<label>.+)",
)
# Config: "<label>:<value>" where the value must contain an "=".
_ACTION_CONFIG = _PipelineActionType(
    action="config",
    regex=r"(?P<label>.+):(?P<value>.+=.+)",
)
# Config file: "<label>:<value>".
_ACTION_CONFIG_FILE = _PipelineActionType(
    action="configfile",
    regex=r"(?P<label>.+):(?P<value>.+)",
)
# Instrument: a value containing no ":"; there is no label group.
_ACTION_ADD_INSTRUMENT = _PipelineActionType(
    action="add_instrument",
    regex=r"(?P<value>[^:]+)",
)

106 

107 

def make_pipeline_actions(
    args: list[str],
    taskFlags: list[str] = task_option.opts(),
    deleteFlags: list[str] = delete_option.opts(),
    configFlags: list[str] = config_option.opts(),
    configFileFlags: list[str] = config_file_option.opts(),
    instrumentFlags: list[str] = instrument_option.opts(),
) -> list[_PipelineAction]:
    """Make a list of pipeline actions from a list of option flags and
    values.

    Parameters
    ----------
    args : `list` [`str`]
        The arguments, option flags, and option values in the order they were
        passed in on the command line.
    taskFlags : `list` [`str`], optional
        The option flags to use to recognize a task action, by default
        task_option.opts().
    deleteFlags : `list` [`str`], optional
        The option flags to use to recognize a delete action, by default
        delete_option.opts().
    configFlags : `list` [`str`], optional
        The option flags to use to recognize a config action, by default
        config_option.opts().
    configFileFlags : `list` [`str`], optional
        The option flags to use to recognize a config-file action, by default
        config_file_option.opts().
    instrumentFlags : `list` [`str`], optional
        The option flags to use to recognize an instrument action, by default
        instrument_option.opts().

    Returns
    -------
    pipelineActions : `list` [`_PipelineAction`]
        A list of pipeline actions constructed from their arguments in
        ``args``, in the order they appeared in ``args``.

    Raises
    ------
    TypeError
        Raised if an option value does not match the syntax expected for its
        action.

    Notes
    -----
    A flag is recognized only when it is an element of ``args`` that equals
    one of the given flags, and its value is the element that follows it. A
    flag in the last position has no value and is ignored.
    """
    pipelineActions: list[_PipelineAction] = []
    # Walk over (element, next element) pairs. When the first element of a
    # pair is a recognized flag, the second is that flag's value.
    for flag, value in zip(args, args[1:]):
        if flag in taskFlags:
            pipelineActions.append(_ACTION_ADD_TASK(value))
        elif flag in deleteFlags:
            pipelineActions.append(_ACTION_DELETE_TASK(value))
        elif flag in configFlags:
            pipelineActions.append(_ACTION_CONFIG(value))
        elif flag in configFileFlags:
            # --config-file allows multiple comma-separated values.
            configfile_args = split_commas(None, None, value)
            pipelineActions.extend(_ACTION_CONFIG_FILE(c) for c in configfile_args)
        elif flag in instrumentFlags:
            pipelineActions.append(_ACTION_ADD_INSTRUMENT(value))
    return pipelineActions

163 

164 

def collect_pipeline_actions(ctx: click.Context, **kwargs: Any) -> dict[str, Any]:
    """Extract pipeline building options, replace them with pipeline actions,
    and return the updated ``kwargs``.

    Parameters
    ----------
    ctx : `click.Context`
        Click context to extract actions from.
    **kwargs : `object`
        Keyword arguments to start from.

    Returns
    -------
    kwargs : `dict`
        Updated keyword arguments. The individual pipeline action options
        are removed, and a ``pipeline_actions`` entry is added that holds the
        ordered list of ``_PipelineAction`` built from the CLI arguments.

    Raises
    ------
    KeyError
        Raised if any of the pipeline action options is absent from
        ``kwargs``.

    Notes
    -----
    The pipeline actions (task, delete, config, config_file, and instrument)
    must be handled in the order they appear on the command line, but the CLI
    specification gives them all different option names. So, instead of using
    the individual action options as they appear in kwargs (because
    invocation order can't be known), we capture the CLI arguments by
    overriding `click.Command.parse_args` and save them in the Context's
    `obj` parameter. We use `make_pipeline_actions` to create a list of
    pipeline actions from the CLI arguments and pass that list to the script
    function using the `pipeline_actions` kwarg name, and remove the action
    options from kwargs.
    """
    # The option objects are already imported at module level, so no local
    # re-import is needed here.
    for option_name in (
        task_option.name(),
        delete_option.name(),
        config_option.name(),
        config_file_option.name(),
        instrument_option.name(),
    ):
        kwargs.pop(option_name)

    # make_pipeline_actions returns a new list, so it can be stored directly
    # without copying.
    kwargs["pipeline_actions"] = make_pipeline_actions(MWCtxObj.getFrom(ctx).args)
    return kwargs

214 

215 

class PipetaskCommand(MWCommand):
    """`MWCommand` subclass for ``pipetask`` subcommands.

    Its only override is ``extra_epilog``. That attribute is presumably
    added by `MWCommand` to the subcommand help output (confirm in
    `MWCommand`).
    """

    # Points users at the top-level help for options the subcommand does
    # not list.
    extra_epilog = "See 'pipetask --help' for more options."

220 

221 

def summarize_quantum_graph(qg: BaseQuantumGraph) -> int:
    """Log a per-task summary of the quanta in a quantum graph.

    This only reports quanta that were actually loaded. Tasks with no quanta
    are left out of the summary table.

    Parameters
    ----------
    qg : `lsst.pipe.base.quantum_graph.BaseQuantumGraph`
        Quantum graph object.

    Returns
    -------
    n_quanta : `int`
        The number of quanta in the graph.
    """
    counts = {}
    for label, quanta in qg.quanta_by_task.items():
        if quanta:
            counts[label] = len(quanta)
    total = sum(counts.values())

    if total == 0:
        _LOG.info("QuantumGraph contains no quanta.")
        return total

    # Only build and format the table if the INFO message will be emitted.
    if not _LOG.isEnabledFor(logging.INFO):
        return total

    table = Table({"Quanta": tuple(counts.values()), "Tasks": tuple(counts)})
    formatted = "\n".join(table.pformat())
    task_count = len(counts)
    _LOG.info(
        "QuantumGraph contains %d %s for %d task%s\n%s",
        total,
        "quantum" if total == 1 else "quanta",
        task_count,
        "" if task_count == 1 else "s",
        formatted,
    )
    return total