Coverage for python / lsst / ctrl / mpexec / cli / script / run.py: 15%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:00 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30from collections.abc import Iterable 

31from typing import TYPE_CHECKING, Literal 

32 

33import astropy.units as u 

34 

35import lsst.utils.timer 

36from lsst.pipe.base import ExecutionResources, TaskFactory 

37from lsst.pipe.base.mp_graph_executor import MPGraphExecutor 

38from lsst.pipe.base.quantum_graph import PredictedQuantumGraph 

39from lsst.pipe.base.single_quantum_executor import SingleQuantumExecutor 

40from lsst.resources import ResourcePath, ResourcePathExpression 

41from lsst.utils.doImport import doImportType 

42from lsst.utils.iteration import ensure_iterable 

43from lsst.utils.logging import getLogger 

44from lsst.utils.threads import disable_implicit_threading 

45 

46from ..butler_factory import ButlerFactory 

47from ..utils import MP_TIMEOUT 

48 

49if TYPE_CHECKING: 

50 from lsst.pipe.base.execution_graph_fixup import ExecutionGraphFixup 

51 

52_LOG = getLogger(__name__) 

53 

54 

55def run( 

56 qg: PredictedQuantumGraph, 

57 *, 

58 task_factory: TaskFactory | None = None, 

59 pdb: str | None, 

60 graph_fixup: str, 

61 init_only: bool, 

62 no_versions: bool, 

63 processes: int, 

64 start_method: Literal["spawn", "forkserver"] | None, 

65 profile: str, 

66 register_dataset_types: bool, 

67 skip_init_writes: bool, 

68 timeout: int | None, 

69 butler_config: ResourcePathExpression, 

70 input: Iterable[str] | str, 

71 output: str | None, 

72 output_run: str | None, 

73 extend_run: bool, 

74 replace_run: bool, 

75 prune_replaced: str | None, 

76 data_query: str | None, 

77 skip_existing_in: Iterable[str] | None, 

78 skip_existing: bool, 

79 debug: bool, 

80 fail_fast: bool, 

81 clobber_outputs: bool, 

82 summary: ResourcePathExpression | None, 

83 enable_implicit_threading: bool, 

84 cores_per_quantum: int, 

85 memory_per_quantum: str | None, 

86 rebase: bool, 

87 raise_on_partial_outputs: bool, 

88 **kwargs: object, 

89) -> None: 

90 """Implement the command line interface `pipetask run` subcommand. 

91 

92 Should only be called by command line tools and unit test code that test 

93 this function. 

94 

95 Parameters 

96 ---------- 

97 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` 

98 A quantum graph generated by a previous subcommand. 

99 task_factory : `lsst.pipe.base.TaskFactory`, optional 

100 A custom task factory to use. 

101 pdb : `str`, optional 

102 Debugger to import and use (via the ``post_mortem`` function) in the 

103 event of an exception. 

104 graph_fixup : `str` 

105 The name of the class or factory method which makes an instance used 

106 for execution graph fixup. 

107 init_only : `bool` 

108 If true, do not actually run; just register dataset types and/or save 

109 init outputs. 

110 no_versions : `bool` 

111 If true, do not save or check package versions. 

112 processes : `int` 

113 The number of processes to use. 

114 start_method : `str` or `None` 

115 Start method from `multiprocessing` module, `None` selects the best 

116 one for current platform. 

117 profile : `str` 

118 File name to dump cProfile information to. 

119 register_dataset_types : `bool` 

120 If true, register DatasetTypes that do not already exist in the 

121 Registry. 

122 skip_init_writes : `bool` 

123 If true, do not write collection-wide 'init output' datasets (e.g. 

124 schemas). 

125 timeout : `int` 

126 Timeout for multiprocessing; maximum wall time (sec). 

127 butler_config : convertible to `lsst.resources.ResourcePath` 

128 Path to butler repository configuration. 

129 input : `~collections.abc.Iterable` [ `str` ] or `None` 

130 List of names of the input collection(s). 

131 output : `str` or `None` 

132 Name of the output CHAINED collection. This may either be an existing 

133 CHAINED collection to use as both input and output (if `input` is 

134 `None`), or a new CHAINED collection created to include all inputs 

135 (if `input` is not `None`). In both cases, the collection's children 

136 will start with an output RUN collection that directly holds all new 

137 datasets (see `output_run`). 

138 output_run : `str` or `None` 

139 Name of the new output RUN collection. If not provided then `output` 

140 must be provided and a new RUN collection will be created by appending 

141 a timestamp to the value passed with `output`. If this collection 

142 already exists then `extend_run` must be passed. 

143 extend_run : `bool` 

144 Instead of creating a new RUN collection, insert datasets into either 

145 the one given by `output_run` (if provided) or the first child 

146 collection of `output` (which must be of type RUN). 

147 replace_run : `bool` 

148 Before creating a new RUN collection in an existing CHAINED collection, 

149 remove the first child collection (which must be of type RUN). This can 

150 be used to repeatedly write to the same (parent) collection during 

151 development, but it does not delete the datasets associated with the 

152 replaced run unless `prune-replaced` is also True. Requires `output`, 

153 and `extend_run` must be `None`. 

154 prune_replaced : `str` or `None` 

155 If not `None`, delete the datasets in the collection replaced by 

156 `replace_run`, either just from the datastore ("unstore") or by 

157 removing them and the RUN completely ("purge"). Requires 

158 ``replace_run`` to be `True`. 

159 data_query : `str` 

160 User query selection expression. 

161 skip_existing_in : `~collections.abc.Iterable` [ `str` ] or `None` 

162 Accepts list of collections, if all Quantum outputs already exist in 

163 the specified list of collections then that Quantum will be excluded 

164 from the QuantumGraph. 

165 skip_existing : `bool` 

166 Appends output RUN collection to the ``skip_existing_in`` list. 

167 debug : `bool` 

168 If true, enable debugging output using lsstDebug facility (imports 

169 debug.py). 

170 fail_fast : `bool` 

171 If true then stop processing at first error, otherwise process as many 

172 tasks as possible. 

173 clobber_outputs : `bool` 

174 Remove outputs from previous execution of the same quantum before new 

175 execution. Only applies to failed quanta if skip_existing is also 

176 given. 

177 summary : `str` 

178 File path to store job report in JSON format. 

179 enable_implicit_threading : `bool`, optional 

180 If `True`, do not disable implicit threading by third-party libraries. 

181 Implicit threading is always disabled during actual quantum execution 

182 if ``processes > 1``. 

183 cores_per_quantum : `int` 

184 Number of cores that can be used by each quantum. 

185 memory_per_quantum : `str` 

186 Amount of memory that each quantum can be allowed to use. Empty string 

187 implies no limit. The string can be either a single integer (implying 

188 units of MB) or a combination of number and unit. 

189 rebase : `bool` 

190 If `True` then reset output collection chain if it is inconsistent with 

191 the ``inputs``. 

192 raise_on_partial_outputs : `bool` 

193 Consider partial outputs an error instead of a success. 

194 **kwargs : `object` 

195 Ignored; click commands may accept options for more than one script 

196 function and pass all the option kwargs to each of the script functions 

197 which ignore these unused kwargs. 

198 """ 

199 # Fork option still exists for compatibility but we use spawn instead. 

200 if start_method == "fork": # type: ignore[comparison-overlap] 

201 start_method = "spawn" # type: ignore[unreachable] 

202 _LOG.warning("Option --start-method=fork is unsafe and no longer supported, using spawn instead.") 

203 

204 if not enable_implicit_threading: 

205 disable_implicit_threading() 

206 

207 skip_existing_in = tuple(skip_existing_in) if skip_existing_in is not None else () 

208 if data_query is None: 

209 data_query = "" 

210 inputs = list(ensure_iterable(input)) if input else [] 

211 del input 

212 enable_lsst_debug = debug 

213 del debug 

214 

215 if not output_run: 

216 # If we have no output run specified, use the one from the graph rather 

217 # than letting a new timestamped run be created. 

218 output_run = qg.header.output_run 

219 else: 

220 # Check that output run defined on command line is consistent with 

221 # quantum graph. 

222 if qg.header.output_run != output_run: 

223 raise ValueError( 

224 f"Output run defined on command line ({output_run}) has to be " 

225 f"identical to graph metadata ({qg.header.output_run}). " 

226 "To update graph metadata run `pipetask update-graph-run` command." 

227 ) 

228 

229 # Make sure that --extend-run always enables --skip-existing, 

230 # clobbering should be disabled if --extend-run is not specified. 

231 if extend_run: 

232 skip_existing = True 

233 else: 

234 clobber_outputs = False 

235 

236 # Make butler instance. QuantumGraph should have an output run defined, 

237 # but we ignore it here and let command line decide actual output run. 

238 with ButlerFactory.make_write_butler( 

239 butler_config, 

240 qg.pipeline_graph, 

241 output=output, 

242 output_run=output_run, 

243 inputs=inputs, 

244 extend_run=extend_run, 

245 rebase=rebase, 

246 replace_run=replace_run, 

247 prune_replaced=prune_replaced, 

248 ) as butler: 

249 assert butler.run is not None, "Guaranteed by make_write_butler." 

250 if skip_existing: 

251 skip_existing_in += (butler.run,) 

252 

253 # Enable lsstDebug debugging. Note that this is done once in the 

254 # main process before PreExecInit and it is also repeated before 

255 # running each task in SingleQuantumExecutor (which may not be 

256 # needed if `multiprocessing` always uses fork start method). 

257 if enable_lsst_debug: 

258 try: 

259 _LOG.debug("Will try to import debug.py") 

260 import debug # type: ignore # noqa: F401 

261 except ImportError: 

262 _LOG.warning("No 'debug' module found.") 

263 

264 # Save all InitOutputs, configs, etc. 

265 if register_dataset_types: 

266 qg.pipeline_graph.register_dataset_types(butler, include_packages=not no_versions) 

267 if not skip_init_writes: 

268 qg.write_init_outputs(butler, skip_existing=skip_existing) 

269 qg.write_configs(butler, compare_existing=extend_run) 

270 if not no_versions: 

271 qg.write_packages(butler, compare_existing=extend_run) 

272 

273 if init_only: 

274 return 

275 

276 if task_factory is None: 

277 task_factory = TaskFactory() 

278 resources = ExecutionResources( 

279 num_cores=cores_per_quantum, max_mem=memory_per_quantum, default_mem_units=u.MB 

280 ) 

281 quantum_executor = SingleQuantumExecutor( 

282 butler=butler, 

283 task_factory=task_factory, 

284 skip_existing_in=skip_existing_in, 

285 clobber_outputs=clobber_outputs, 

286 enable_lsst_debug=enable_lsst_debug, 

287 resources=resources, 

288 raise_on_partial_outputs=raise_on_partial_outputs, 

289 ) 

290 

291 if timeout is None: 

292 timeout = MP_TIMEOUT 

293 executor = MPGraphExecutor( 

294 num_proc=processes, 

295 timeout=timeout, 

296 start_method=start_method, 

297 quantum_executor=quantum_executor, 

298 fail_fast=fail_fast, 

299 pdb=pdb, 

300 execution_graph_fixup=_import_graph_fixup(graph_fixup), 

301 ) 

302 # Have to reset connection pool to avoid sharing connections with 

303 # forked processes. 

304 butler.registry.resetConnectionPool() 

305 try: 

306 with lsst.utils.timer.profile(profile, _LOG): 

307 executor.execute(qg) 

308 finally: 

309 if summary: 

310 report = executor.getReport() 

311 if report: 

312 with ResourcePath(summary).open("w") as out: 

313 # Do not save fields that are not set. 

314 out.write(report.model_dump_json(exclude_none=True, indent=2)) 

315 

316 

317def _import_graph_fixup(graph_fixup: str) -> ExecutionGraphFixup | None: 

318 """Import/instantiate graph fixup object. 

319 

320 Parameters 

321 ---------- 

322 graph_fixup : `str` 

323 Graph fixup command-line argument. 

324 

325 Returns 

326 ------- 

327 fixup : `ExecutionGraphFixup` or `None` 

328 Object that imposes additional ordering constraints on the graph. 

329 

330 Raises 

331 ------ 

332 ValueError 

333 Raised if import fails, method call raises exception, or returned 

334 instance has unexpected type. 

335 """ 

336 from lsst.pipe.base.execution_graph_fixup import ExecutionGraphFixup 

337 

338 if graph_fixup: 

339 try: 

340 factory = doImportType(graph_fixup) 

341 except Exception as exc: 

342 raise ValueError("Failed to import graph fixup class/method") from exc 

343 try: 

344 fixup = factory() 

345 except Exception as exc: 

346 raise ValueError("Failed to make instance of graph fixup") from exc 

347 if not isinstance(fixup, ExecutionGraphFixup): 

348 raise ValueError("Graph fixup is not an instance of ExecutionGraphFixup class") 

349 return fixup 

350 return None