Coverage for python / lsst / pipe / base / separable_pipeline_executor.py: 33%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:49 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28 

29from __future__ import annotations 

30 

31__all__ = [ 

32 "SeparablePipelineExecutor", 

33] 

34 

35 

36import datetime 

37import getpass 

38import logging 

39from collections.abc import Iterable 

40from typing import Any 

41 

42import lsst.resources 

43from lsst.daf.butler import Butler, DatasetRef 

44from lsst.daf.butler._rubin.temporary_for_ingest import TemporaryForIngest 

45 

46from ._quantumContext import ExecutionResources 

47from .all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder 

48from .graph import QuantumGraph 

49from .mp_graph_executor import MPGraphExecutor, MPGraphExecutorError 

50from .pipeline import Pipeline 

51from .quantum_graph import PredictedQuantumGraph 

52from .quantum_graph_builder import QuantumGraphBuilder 

53from .quantum_graph_executor import QuantumGraphExecutor 

54from .single_quantum_executor import SingleQuantumExecutor 

55from .taskFactory import TaskFactory 

56 

57_LOG = logging.getLogger(__name__) 

58 

59 

60class SeparablePipelineExecutor: 

61 """An executor that allows each step of pipeline execution to be 

62 run independently. 

63 

64 The executor can run any or all of the following steps: 

65 

66 * pre-execution initialization 

67 * pipeline building 

68 * quantum graph generation 

69 * quantum graph execution 

70 

71 Any of these steps can also be handed off to external code without 

72 compromising the remaining ones. 

73 

74 Parameters 

75 ---------- 

76 butler : `lsst.daf.butler.Butler` 

77 A Butler whose ``collections`` and ``run`` attributes contain the input 

78 and output collections to use for processing. 

79 clobber_output : `bool`, optional 

80 If set, the pipeline execution overwrites existing output files. 

81 Otherwise, any conflict between existing and new outputs is an error. 

82 skip_existing_in : `~collections.abc.Iterable` [`str`], optional 

83 If not empty, the pipeline execution searches the listed collections 

84 for existing outputs, and skips any quanta that have run to completion 

85 (or have no work to do). Otherwise, all tasks are attempted (subject to 

86 ``clobber_output``). 

87 task_factory : `.TaskFactory`, optional 

88 A custom task factory for use in pre-execution and execution. By 

89 default, a new instance of `.TaskFactory` is used. 

90 resources : `.ExecutionResources`, optional

91 The resources available to each quantum being executed. 

92 raise_on_partial_outputs : `bool`, optional 

93 If `True` raise exceptions chained by 

94 `.AnnotatedPartialOutputsError` immediately, instead of 

95 considering the partial result a success and continuing to run 

96 downstream tasks. 

97 """ 

98 

def __init__(
    self,
    butler: Butler,
    clobber_output: bool = False,
    skip_existing_in: Iterable[str] | None = None,
    task_factory: TaskFactory | None = None,
    resources: ExecutionResources | None = None,
    raise_on_partial_outputs: bool = True,
):
    """Store the execution options and validate the butler.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler whose default collections and ``run`` give the input and
        output collections.
    clobber_output : `bool`, optional
        Forwarded to quantum graph builders and the quantum executor.
    skip_existing_in : `~collections.abc.Iterable` [`str`], optional
        Collections to search for existing outputs. A single `str` is
        treated as one collection name rather than as an iterable of
        characters.
    task_factory : `.TaskFactory`, optional
        Task factory to use; a new `.TaskFactory` is created only if this
        is `None`.
    resources : `.ExecutionResources`, optional
        Resources handed to the quantum executor.
    raise_on_partial_outputs : `bool`, optional
        Forwarded to the quantum executor.

    Raises
    ------
    ValueError
        Raised if the butler has no default input collections or no
        output run.
    """
    # The executor keeps the Butler returned by ``from_config`` rather than
    # the caller's instance, carrying over its default collections and run.
    self._butler = Butler.from_config(
        butler=butler, collections=butler.collections.defaults, run=butler.run
    )
    if not self._butler.collections.defaults:
        raise ValueError("Butler must specify input collections for pipeline.")
    if not self._butler.run:
        raise ValueError("Butler must specify output run for pipeline.")

    self._clobber_output = clobber_output
    if isinstance(skip_existing_in, str):
        # A lone string is itself iterable; without this guard it would be
        # split into single-character "collection names".
        self._skip_existing_in = [skip_existing_in] if skip_existing_in else []
    else:
        self._skip_existing_in = list(skip_existing_in) if skip_existing_in else []

    # Compare against None explicitly so that a custom factory that happens
    # to be falsy is not silently replaced by the default.
    self._task_factory = task_factory if task_factory is not None else TaskFactory()
    self.resources = resources
    self.raise_on_partial_outputs = raise_on_partial_outputs

122 

def pre_execute_qgraph(
    self,
    graph: QuantumGraph | PredictedQuantumGraph,
    register_dataset_types: bool = False,
    save_init_outputs: bool = True,
    save_versions: bool = True,
) -> None:
    """Run pre-execution initialization.

    This method will be deprecated after DM-38041, to be replaced with a
    method that takes either a `.Pipeline` or a
    resolved `.pipeline_graph.PipelineGraph` instead of a `.QuantumGraph`.

    Parameters
    ----------
    graph : `.QuantumGraph` or `.quantum_graph.PredictedQuantumGraph`
        The quantum graph defining the pipeline and datasets to
        be initialized.
    register_dataset_types : `bool`, optional
        If `True`, register all output dataset types from the pipeline
        represented by ``graph``. The packages dataset type is included
        only when ``save_versions`` is also `True`.
    save_init_outputs : `bool`, optional
        If `True`, write init-output datasets and task configs through this
        object's butler. Existing init-outputs are skipped only when the
        butler's output run is one of the ``skip_existing_in`` collections.
    save_versions : `bool`, optional
        If `True`, save a package versions dataset.
    """
    butler = self._butler
    if register_dataset_types:
        graph.pipeline_graph.register_dataset_types(butler, include_packages=save_versions)
    if save_init_outputs:
        # Only tolerate pre-existing init-outputs when the output run itself
        # was listed as a place to look for existing results.
        skip_existing = butler.run in self._skip_existing_in
        graph.write_init_outputs(butler, skip_existing=skip_existing)
        graph.write_configs(butler)
    if save_versions:
        graph.write_packages(butler)

156 

def make_pipeline(self, pipeline_uri: str | lsst.resources.ResourcePath) -> Pipeline:
    """Load a pipeline from the definition found at a URI.

    Parameters
    ----------
    pipeline_uri : `str` or `lsst.resources.ResourcePath`
        URI to a file containing a pipeline definition. A URI fragment may
        be used to specify a subset of the pipeline, as described in
        :ref:`pipeline-running-intro`.

    Returns
    -------
    pipeline : `.Pipeline`
        The pipeline produced by `.Pipeline.from_uri`.
    """
    loaded = Pipeline.from_uri(pipeline_uri)
    return loaded

173 

def make_quantum_graph_builder(
    self,
    pipeline: Pipeline,
    where: str = "",
    *,
    builder_class: type[QuantumGraphBuilder] = AllDimensionsQuantumGraphBuilder,
    **kwargs: Any,
) -> QuantumGraphBuilder:
    """Initialize a quantum graph builder from a pipeline and input
    datasets.

    Parameters
    ----------
    pipeline : `.Pipeline`
        The pipeline for which to generate a quantum graph.
    where : `str`, optional
        A data ID query that constrains the quanta generated. Must not be
        provided if a custom ``builder_class`` is given and that class does
        not accept ``where`` as a construction argument.
    builder_class : `type` [ \
            `.quantum_graph_builder.QuantumGraphBuilder` ], optional
        Quantum graph builder implementation to instantiate.
    **kwargs
        Additional keyword arguments are forwarded to ``builder_class``
        when a quantum graph builder instance is constructed. All
        arguments accepted by the
        `~.quantum_graph_builder.QuantumGraphBuilder` base
        class are provided automatically (from explicit arguments to this
        method and executor attributes) and do not need to be included
        as keyword arguments.

    Returns
    -------
    builder : `.quantum_graph_builder.QuantumGraphBuilder`
        A quantum graph builder.
    """
    if where:
        # Only pass 'where' if it's actually provided, since some
        # QuantumGraphBuilder subclasses may not accept it.
        kwargs["where"] = where
    return builder_class(
        pipeline.to_graph(),
        self._butler,
        skip_existing_in=self._skip_existing_in,
        clobber=self._clobber_output,
        **kwargs,
    )

222 

def make_quantum_graph(
    self,
    pipeline: Pipeline,
    where: str = "",
    *,
    builder_class: type[QuantumGraphBuilder] = AllDimensionsQuantumGraphBuilder,
    attach_datastore_records: bool = False,
    **kwargs: Any,
) -> QuantumGraph:
    """Build a quantum graph from a pipeline and input datasets.

    This returns an instance of the old `.QuantumGraph` class. Use
    `build_quantum_graph` to construct a
    `.quantum_graph.PredictedQuantumGraph`.

    Parameters
    ----------
    pipeline : `.Pipeline`
        The pipeline for which to generate a quantum graph.
    where : `str`, optional
        A data ID query that constrains the quanta generated. Must not be
        provided if a custom ``builder_class`` is given and that class does
        not accept ``where`` as a construction argument.
    builder_class : `type` [ \
            `.quantum_graph_builder.QuantumGraphBuilder` ], optional
        Quantum graph builder implementation to instantiate.
    attach_datastore_records : `bool`, optional
        Whether to attach datastore records. These are currently used only
        by `lsst.daf.butler.QuantumBackedButler`, which is not used by
        `SeparablePipelineExecutor` for execution.
    **kwargs
        Additional keyword arguments are forwarded to ``builder_class``
        when a quantum graph builder instance is constructed. All
        arguments accepted by the
        `~.quantum_graph_builder.QuantumGraphBuilder` base
        class are provided automatically (from explicit arguments to this
        method and executor attributes) and do not need to be included
        as keyword arguments.

    Returns
    -------
    graph : `.QuantumGraph`
        The quantum graph for ``pipeline`` as run on the datasets
        identified by ``where``.

    Notes
    -----
    This method does no special handling of empty quantum graphs. If
    needed, clients can use `len` to test if the returned graph is empty.
    """
    # Metadata recorded with the graph: the collections and query used,
    # plus who built it and when.
    metadata = {
        "input": self._butler.collections.defaults,
        "output_run": self._butler.run,
        "skip_existing_in": self._skip_existing_in,
        "skip_existing": bool(self._skip_existing_in),
        "data_query": where,
        "user": getpass.getuser(),
        "time": str(datetime.datetime.now()),
    }
    qg_builder = self.make_quantum_graph_builder(pipeline, where, builder_class=builder_class, **kwargs)
    graph = qg_builder.build(metadata=metadata, attach_datastore_records=attach_datastore_records)
    _LOG.info(
        "QuantumGraph contains %d quanta for %d tasks, graph ID: %r",
        len(graph),
        len(graph.taskGraph),
        graph.graphID,
    )
    return graph

292 

def build_quantum_graph(
    self,
    pipeline: Pipeline,
    where: str = "",
    *,
    builder_class: type[QuantumGraphBuilder] = AllDimensionsQuantumGraphBuilder,
    attach_datastore_records: bool = False,
    **kwargs: Any,
) -> PredictedQuantumGraph:
    """Build a quantum graph from a pipeline and input datasets.

    This returns an instance of the new
    `.quantum_graph.PredictedQuantumGraph` class. Use `make_quantum_graph`
    to construct a `.QuantumGraph`.

    Parameters
    ----------
    pipeline : `.Pipeline`
        The pipeline for which to generate a quantum graph.
    where : `str`, optional
        A data ID query that constrains the quanta generated. Must not be
        provided if a custom ``builder_class`` is given and that class does
        not accept ``where`` as a construction argument.
    builder_class : `type` [ \
            `.quantum_graph_builder.QuantumGraphBuilder` ], optional
        Quantum graph builder implementation to instantiate.
    attach_datastore_records : `bool`, optional
        Whether to attach datastore records. These are currently used only
        by `lsst.daf.butler.QuantumBackedButler`, which is not used by
        `SeparablePipelineExecutor` for execution.
    **kwargs
        Additional keyword arguments are forwarded to ``builder_class``
        when a quantum graph builder instance is constructed. All
        arguments accepted by the
        `~.quantum_graph_builder.QuantumGraphBuilder` base
        class are provided automatically (from explicit arguments to this
        method and executor attributes) and do not need to be included
        as keyword arguments.

    Returns
    -------
    graph : `.quantum_graph.PredictedQuantumGraph`
        The quantum graph for ``pipeline`` as run on the datasets
        identified by ``where``.

    Notes
    -----
    This method does no special handling of empty quantum graphs. If
    needed, clients can use `len` to test if the returned graph is empty.
    """
    metadata = {
        "skip_existing_in": self._skip_existing_in,
        "skip_existing": bool(self._skip_existing_in),
        "data_query": where,
    }
    qg_builder = self.make_quantum_graph_builder(pipeline, where, builder_class=builder_class, **kwargs)
    # ``finish`` yields an intermediate object; ``assemble`` turns it into
    # the graph that is returned.
    graph = qg_builder.finish(
        metadata=metadata, attach_datastore_records=attach_datastore_records
    ).assemble()
    _LOG.info(
        "PredictedQuantumGraph contains %d quanta for %d tasks.",
        len(graph),
        len(graph.quanta_by_task),
    )
    return graph

359 

def run_pipeline(
    self,
    graph: QuantumGraph | PredictedQuantumGraph,
    fail_fast: bool = False,
    graph_executor: QuantumGraphExecutor | None = None,
    num_proc: int = 1,
    *,
    provenance_dataset_ref: DatasetRef | None = None,
) -> None:
    """Run a pipeline in the form of a prepared quantum graph.

    Pre-execution initialization must have already been run;
    see `pre_execute_qgraph`.

    Parameters
    ----------
    graph : `.QuantumGraph` or `.quantum_graph.PredictedQuantumGraph`
        The pipeline and datasets to execute.
    fail_fast : `bool`, optional
        If `True`, abort all execution if any task fails when
        running with multiple processes. Only used with the default graph
        executor.
    graph_executor : `.quantum_graph_executor.QuantumGraphExecutor`,\
            optional
        A custom graph executor. If `None`, a new instance of
        `.mp_graph_executor.MPGraphExecutor` is used.
    num_proc : `int`, optional
        The number of processes that can be used to run the pipeline. The
        default value ensures that no subprocess is created. Only used with
        the default graph executor.
    provenance_dataset_ref : `lsst.daf.butler.DatasetRef`, optional
        Dataset that should be used to save provenance. Provenance is only
        supported when running in a single process (at least for the
        default quantum executor), and should not be enabled in contexts
        where a quantum might be executed more than once (i.e. retried)
        within the same `~lsst.daf.butler.CollectionType.RUN` collection.
        The caller is responsible for registering the dataset type and for
        ensuring that the dimensions of this dataset do not lead to
        uniqueness conflicts.
    """
    # Test against None rather than truthiness so that a caller-supplied
    # executor is always honoured, even if it evaluates as false.
    if graph_executor is None:
        quantum_executor = SingleQuantumExecutor(
            butler=self._butler,
            task_factory=self._task_factory,
            skip_existing_in=self._skip_existing_in,
            clobber_outputs=self._clobber_output,
            resources=self.resources,
            raise_on_partial_outputs=self.raise_on_partial_outputs,
        )
        graph_executor = MPGraphExecutor(
            num_proc=num_proc,
            timeout=2_592_000.0,  # In practice, timeout is never helpful; set to 30 days.
            quantum_executor=quantum_executor,
            fail_fast=fail_fast,
        )
        # Have to reset connection pool to avoid sharing connections with
        # forked processes.
        self._butler.registry.resetConnectionPool()

    if provenance_dataset_ref is None:
        graph_executor.execute(graph)
        return

    with TemporaryForIngest(self._butler, provenance_dataset_ref) as temporary:
        try:
            graph_executor.execute(graph, provenance_graph_file=temporary.ospath)
        except MPGraphExecutorError:
            # If the graph executor itself raised, it will have
            # finished the provenance rewrite. In other cases the
            # temporary file might be incomplete or corrupted and we
            # can't roll the dice on ingesting it.
            temporary.ingest()
            raise
        else:
            temporary.ingest()