Coverage for python/lsst/pipe/base/mp_graph_executor.py: 14%

410 statements

coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
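"""Multiprocess execution of quantum graphs (`MPGraphExecutor`)."""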

from __future__ import annotations

__all__ = ["MPGraphExecutor", "MPGraphExecutorError", "MPTimeoutError"]

import enum
import importlib
import logging
import multiprocessing
import pickle
import signal
import sys
import threading
import time
import uuid
from contextlib import ExitStack
from typing import Literal, cast

import networkx

from lsst.daf.butler import DataCoordinate, Quantum
from lsst.daf.butler.cli.cliLog import CliLog
from lsst.daf.butler.logging import ButlerLogRecords
from lsst.utils.threads import disable_implicit_threading

from ._status import InvalidQuantumError, RepeatableQuantumError
from ._task_metadata import TaskMetadata
from .execution_graph_fixup import ExecutionGraphFixup
from .graph import QuantumGraph
from .graph_walker import GraphWalker
from .log_on_close import LogOnClose
from .pipeline_graph import TaskNode
from .quantum_graph import PredictedQuantumGraph, PredictedQuantumInfo, ProvenanceQuantumGraphWriter
from .quantum_graph_executor import QuantumExecutor, QuantumGraphExecutor
from .quantum_reports import ExecutionStatus, QuantumReport, Report

_LOG = logging.getLogger(__name__)


class JobState(enum.Enum):
    """Possible states for an executing task."""

    PENDING = enum.auto()
    """The job has not started yet."""

    RUNNING = enum.auto()
    """The job is currently executing."""

    FINISHED = enum.auto()
    """The job finished successfully."""

    FAILED = enum.auto()
    """The job execution failed (process returned non-zero status)."""

    TIMED_OUT = enum.auto()
    """The job was killed because its execution time was too long."""

    FAILED_DEP = enum.auto()
    """One of the dependencies of this job failed or timed out."""


class _Job:
    """Class representing a job running a single task.

    Parameters
    ----------
    quantum_id : `uuid.UUID`
        ID of the quantum this job executes.
    quantum : `lsst.daf.butler.Quantum`
        Description of the inputs and outputs.
    task_node : `.pipeline_graph.TaskNode`
        Description of the task and configuration.
    """

    def __init__(self, quantum_id: uuid.UUID, quantum: Quantum, task_node: TaskNode):
        self.quantum_id = quantum_id
        self.quantum = quantum
        self.task_node = task_node
        self.process: multiprocessing.process.BaseProcess | None = None
        self._state = JobState.PENDING
        self.started: float = 0.0
        self._rcv_conn: multiprocessing.connection.Connection | None = None
        self._terminated = False

    @property
    def state(self) -> JobState:
        """Job processing state (`JobState`)."""
        return self._state

    @property
    def terminated(self) -> bool:
        """Return `True` if the job was killed by the `stop` method and the
        child process returned a negative exit code (`bool`).
        """
        if self._terminated:
            assert self.process is not None, "Process must be started"
            if self.process.exitcode is not None:
                return self.process.exitcode < 0
        return False

    def start(
        self,
        quantumExecutor: QuantumExecutor,
        startMethod: Literal["spawn"] | Literal["forkserver"],
        fail_fast: bool,
    ) -> None:
        """Start process which runs the task.

        Parameters
        ----------
        quantumExecutor : `QuantumExecutor`
            Executor for single quantum.
        startMethod : `str`
            Start method from `multiprocessing` module.
        fail_fast : `bool`
            If `True` then kill subprocess on RepeatableQuantumError.
        """
        # Unpickling of the quantum has to happen after the butler/executor,
        # and we want to set up logging before unpickling anything that can
        # generate messages; this is why things are pickled manually here.
        qe_pickle = pickle.dumps(quantumExecutor)
        task_node_pickle = pickle.dumps(self.task_node)
        quantum_pickle = pickle.dumps(self.quantum)
        self._rcv_conn, snd_conn = multiprocessing.Pipe(False)
        logConfigState = CliLog.configState

        mp_ctx = multiprocessing.get_context(startMethod)
        self.process = mp_ctx.Process(  # type: ignore[attr-defined]
            target=_Job._executeJob,
            args=(
                qe_pickle,
                task_node_pickle,
                quantum_pickle,
                self.quantum_id,
                logConfigState,
                snd_conn,
                fail_fast,
            ),
            name=f"task-{self.quantum.dataId}",
        )
        # mypy is getting confused by multiprocessing.
        assert self.process is not None
        self.process.start()
        self.started = time.time()
        self._state = JobState.RUNNING

    @staticmethod
    def _executeJob(
        quantumExecutor_pickle: bytes,
        task_node_pickle: bytes,
        quantum_pickle: bytes,
        quantum_id: uuid.UUID,
        logConfigState: list,
        snd_conn: multiprocessing.connection.Connection,
        fail_fast: bool,
    ) -> None:
        """Execute a job with arguments.

        Parameters
        ----------
        quantumExecutor_pickle : `bytes`
            Executor for single quantum, pickled.
        task_node_pickle : `bytes`
            Task definition structure, pickled.
        quantum_pickle : `bytes`
            Quantum for this task execution in pickled form.
        quantum_id : `uuid.UUID`
            Unique ID for the quantum.
        logConfigState : `list`
            Logging state from parent process.
        snd_conn : `multiprocessing.connection.Connection`
            Connection to send job report to parent process.
        fail_fast : `bool`
            If `True` then kill subprocess on RepeatableQuantumError.
        """
        # This terrible hack is a workaround for Python threading bug:
        # https://github.com/python/cpython/issues/102512. Should be removed
        # when fix for that bug is deployed. Inspired by
        # https://github.com/QubesOS/qubes-core-admin-client/pull/236/files.
        thread = threading.current_thread()
        if isinstance(thread, threading._DummyThread):
            if getattr(thread, "_tstate_lock", "") is None:
                thread._set_tstate_lock()  # type: ignore[attr-defined]

        if logConfigState and not CliLog.configState:
            # This means we are in a newly spawned Python process and have to
            # re-initialize logging.
            CliLog.replayConfigState(logConfigState)

        quantumExecutor: QuantumExecutor = pickle.loads(quantumExecutor_pickle)
        task_node: TaskNode = pickle.loads(task_node_pickle)
        quantum = pickle.loads(quantum_pickle)
        report: QuantumReport | None = None
        # Catch a few known failure modes and stop the process immediately,
        # with exception-specific exit code.
        try:
            _, report = quantumExecutor.execute(task_node, quantum, quantum_id=quantum_id)
        except RepeatableQuantumError as exc:
            report = QuantumReport.from_exception(
                quantumId=quantum_id,
                exception=exc,
                dataId=quantum.dataId,
                taskLabel=task_node.label,
                exitCode=exc.EXIT_CODE if fail_fast else None,
            )
            if fail_fast:
                _LOG.warning("Caught repeatable quantum error for %s (%s):", task_node.label, quantum.dataId)
                _LOG.warning(exc, exc_info=True)
                sys.exit(exc.EXIT_CODE)
            else:
                raise
        except InvalidQuantumError as exc:
            _LOG.fatal("Invalid quantum error for %s (%s):", task_node.label, quantum.dataId)
            _LOG.fatal(exc, exc_info=True)
            report = QuantumReport.from_exception(
                quantumId=quantum_id,
                exception=exc,
                dataId=quantum.dataId,
                taskLabel=task_node.label,
                exitCode=exc.EXIT_CODE,
            )
            sys.exit(exc.EXIT_CODE)
        except Exception as exc:
            _LOG.debug("exception from task %s dataId %s: %s", task_node.label, quantum.dataId, exc)
            report = QuantumReport.from_exception(
                quantumId=quantum_id,
                exception=exc,
                dataId=quantum.dataId,
                taskLabel=task_node.label,
            )
            raise
        finally:
            if report is not None:
                # If sending fails we do not want this new exception to be
                # exposed.
                try:
                    _LOG.debug("sending report for task %s dataId %s", task_node.label, quantum.dataId)
                    snd_conn.send(report)
                except Exception:
                    pass

    def stop(self) -> None:
        """Stop the process."""
        assert self.process is not None, "Process must be started"
        self.process.terminate()
        # Give it one second to finish, then kill it outright.
        for _ in range(10):
            time.sleep(0.1)
            if not self.process.is_alive():
                break
        else:
            _LOG.debug("Killing process %s", self.process.name)
            self.process.kill()
        self._terminated = True

    def cleanup(self) -> None:
        """Release process resources; this has to be called for each finished
        process.
        """
        if self.process and not self.process.is_alive():
            self.process.close()
            self.process = None
            self._rcv_conn = None

    def report(self) -> QuantumReport:
        """Return the task report; this should be called after the process
        finishes and before `cleanup()`.
        """
        assert self.process is not None, "Process must be started"
        assert self._rcv_conn is not None, "Process must be started"
        try:
            report = self._rcv_conn.recv()
            report.exitCode = self.process.exitcode
        except Exception:
            # Likely because the process was killed, but there may be other
            # reasons. Exit code should not be None; this is to keep mypy
            # happy.
            exitcode = self.process.exitcode if self.process.exitcode is not None else -1
            assert self.quantum.dataId is not None, "Quantum DataId cannot be None"
            report = QuantumReport.from_exit_code(
                quantumId=self.quantum_id,
                exitCode=exitcode,
                dataId=self.quantum.dataId,
                taskLabel=self.task_node.label,
            )
        if self.terminated:
            # Means it was killed; assume it's due to timeout.
            report.status = ExecutionStatus.TIMEOUT
        return report

    def failMessage(self) -> str:
        """Return a message describing task failure."""
        assert self.process is not None, "Process must be started"
        assert self.process.exitcode is not None, "Process has to finish"
        exitcode = self.process.exitcode
        if exitcode < 0:
            # Negative exit code means it was killed by a signal.
            signum = -exitcode
            msg = f"Task {self} failed, killed by signal {signum}"
            # Just in case this is some very odd signal, expect ValueError.
            try:
                strsignal = signal.strsignal(signum)
                msg = f"{msg} ({strsignal})"
            except ValueError:
                pass
        elif exitcode > 0:
            msg = f"Task {self} failed, exit code={exitcode}"
        else:
            msg = ""
        return msg

    def __str__(self) -> str:
        return f"<{self.task_node.label} dataId={self.quantum.dataId}>"


class _JobList:
    """Simple list of _Job instances with a few convenience methods.

    Parameters
    ----------
    xgraph : `networkx.DiGraph`
        Directed acyclic graph of quantum IDs.
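
    Notes
    -----
    Scheduling is driven by `.graph_walker.GraphWalker`: iterating the walker
    yields batches of quantum IDs whose dependencies have all finished, and
    job outcomes are reported back through its ``finish`` and ``fail``
    methods (``fail`` returns the downstream quanta that become blocked).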

348 """ 

349 

350 def __init__(self, xgraph: networkx.DiGraph): 

351 self.jobs = { 

352 quantum_id: _Job( 

353 quantum_id=quantum_id, 

354 quantum=xgraph.nodes[quantum_id]["quantum"], 

355 task_node=xgraph.nodes[quantum_id]["pipeline_node"], 

356 ) 

357 for quantum_id in xgraph 

358 } 

359 self.walker: GraphWalker[uuid.UUID] = GraphWalker(xgraph.copy()) 

360 self.pending = set(next(self.walker, ())) 

361 self.running: set[uuid.UUID] = set() 

362 self.finished: set[uuid.UUID] = set() 

363 self.failed: set[uuid.UUID] = set() 

364 self.timed_out: set[uuid.UUID] = set() 

365 

    def submit(
        self,
        quantumExecutor: QuantumExecutor,
        startMethod: Literal["spawn"] | Literal["forkserver"],
        fail_fast: bool = False,
    ) -> _Job:
        """Submit a pending job for execution.

        Parameters
        ----------
        quantumExecutor : `QuantumExecutor`
            Executor for single quantum.
        startMethod : `str`
            Start method from `multiprocessing` module.
        fail_fast : `bool`, optional
            If `True` then kill subprocess on RepeatableQuantumError.

        Returns
        -------
        job : `_Job`
            The job that was submitted.
        """
        quantum_id = self.pending.pop()
        job = self.jobs[quantum_id]
        job.start(quantumExecutor, startMethod, fail_fast=fail_fast)
        self.running.add(job.quantum_id)
        return job

    def setJobState(self, job: _Job, state: JobState) -> list[_Job]:
        """Update job state.

        Parameters
        ----------
        job : `_Job`
            Job to update.
        state : `JobState`
            New job state; note that only the FINISHED, FAILED, and TIMED_OUT
            states are acceptable.

        Returns
        -------
        blocked : `list` [ `_Job` ]
            Additional jobs that have been marked as failed because this job
            was upstream of them and failed or timed out.
        """
        allowedStates = (JobState.FINISHED, JobState.FAILED, JobState.TIMED_OUT)
        assert state in allowedStates, f"State {state} not allowed here"

        # Remove the job from the pending/running sets.
        if job.state == JobState.PENDING:
            self.pending.remove(job.quantum_id)
        elif job.state == JobState.RUNNING:
            self.running.remove(job.quantum_id)

        quantum_id = job.quantum_id
        # It should not be in any of these, but just in case.
        self.finished.discard(quantum_id)
        self.failed.discard(quantum_id)
        self.timed_out.discard(quantum_id)
        job._state = state
        match job.state:
            case JobState.FINISHED:
                self.finished.add(quantum_id)
                self.walker.finish(quantum_id)
                self.pending.update(next(self.walker, ()))
                return []
            case JobState.FAILED:
                self.failed.add(quantum_id)
            case JobState.TIMED_OUT:
                self.failed.add(quantum_id)
                self.timed_out.add(quantum_id)
            case _:
                raise ValueError(f"Unexpected state value: {state}")
        blocked: list[_Job] = []
        for downstream_quantum_id in self.walker.fail(quantum_id):
            self.failed.add(downstream_quantum_id)
            blocked.append(self.jobs[downstream_quantum_id])
            self.jobs[downstream_quantum_id]._state = JobState.FAILED_DEP
        return blocked

    def cleanup(self) -> None:
        """Do periodic cleanup for jobs that did not finish correctly.

        If timed-out jobs are killed but take too long to stop, the regular
        cleanup will not work for them. Here we check all timed-out jobs
        periodically and clean up those that have managed to die by this
        time.
        """
        for quantum_id in self.timed_out:
            job = self.jobs[quantum_id]
            assert job.state == JobState.TIMED_OUT, "Job state should be consistent with the set it's in."
            if job.process is not None:
                job.cleanup()


class MPGraphExecutorError(Exception):
    """Exception class for errors raised by MPGraphExecutor."""

    pass


class MPTimeoutError(MPGraphExecutorError):
    """Exception raised when task execution times out."""

    pass


class MPGraphExecutor(QuantumGraphExecutor):
    """Implementation of QuantumGraphExecutor using same-host multiprocess
    execution of Quanta.

    Parameters
    ----------
    num_proc : `int`
        Number of processes to use for executing tasks.
    timeout : `float`
        Time in seconds to wait for tasks to finish.
    quantum_executor : `.quantum_graph_executor.QuantumExecutor`
        Executor for single quantum. For multiprocess-style execution when
        ``num_proc`` is greater than one this instance must support pickle.
    start_method : `str`, optional
        Start method from `multiprocessing` module, `None` selects the best
        one for current platform.
    fail_fast : `bool`, optional
        If set to ``True`` then stop processing on first error from any task.
    pdb : `str`, optional
        Debugger to import and use (via the ``post_mortem`` function) in the
        event of an exception.
    execution_graph_fixup : `.execution_graph_fixup.ExecutionGraphFixup`, \
            optional
        Instance used for modification of execution graph.
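
    Examples
    --------
    A minimal usage sketch; here ``executor`` stands for any picklable
    `.quantum_graph_executor.QuantumExecutor` implementation and ``qgraph``
    for a quantum graph loaded elsewhere::

        mp_exec = MPGraphExecutor(
            num_proc=4,
            timeout=3600.0,
            quantum_executor=executor,
        )
        mp_exec.execute(qgraph)
        report = mp_exec.getReport()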

496 """ 

497 

    def __init__(
        self,
        *,
        num_proc: int,
        timeout: float,
        quantum_executor: QuantumExecutor,
        start_method: Literal["spawn"] | Literal["forkserver"] | None = None,
        fail_fast: bool = False,
        pdb: str | None = None,
        execution_graph_fixup: ExecutionGraphFixup | None = None,
    ):
        self._num_proc = num_proc
        self._timeout = timeout
        self._quantum_executor = quantum_executor
        self._fail_fast = fail_fast
        self._pdb = pdb
        self._execution_graph_fixup = execution_graph_fixup
        self._report: Report | None = None

        # We set the default start method to "spawn" for all platforms.
        if start_method is None:
            start_method = "spawn"
        self._start_method = start_method
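        # (Note that "fork" is never an option here: the annotation on
        # ``start_method`` only admits "spawn" and "forkserver".)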

    def execute(
        self, graph: QuantumGraph | PredictedQuantumGraph, *, provenance_graph_file: str | None = None
    ) -> None:
        # Docstring inherited from QuantumGraphExecutor.execute
        old_graph: QuantumGraph | None = None
        if isinstance(graph, QuantumGraph):
            old_graph = graph
            new_graph = PredictedQuantumGraph.from_old_quantum_graph(old_graph)
        else:
            new_graph = graph
        xgraph = self._make_xgraph(new_graph, old_graph)
        self._report = Report(qgraphSummary=new_graph._make_summary())
        err: MPGraphExecutorError | None = None
        with ExitStack() as exit_stack:
            provenance_writer: ProvenanceQuantumGraphWriter | None = None
            if provenance_graph_file is not None:
                if self._num_proc > 1:
                    raise NotImplementedError(
                        "Provenance writing is not implemented for multiprocess execution."
                    )
                provenance_writer = ProvenanceQuantumGraphWriter(
                    provenance_graph_file,
                    exit_stack=exit_stack,
                    log_on_close=LogOnClose(_LOG.log),
                    predicted=new_graph,
                )
            try:
                if self._num_proc > 1:
                    self._execute_quanta_mp(xgraph, self._report)
                else:
                    self._execute_quanta_in_process(xgraph, self._report, provenance_writer)
            except MPGraphExecutorError as exc:
                self._report.set_exception(exc)
                err = exc
                # Defer re-raising this exception only to let provenance
                # writes finish as the ExitStack closes. The original
                # traceback for this exception isn't useful anyway.
            except Exception as exc:
                self._report.set_exception(exc)
                raise
            if provenance_writer is not None:
                provenance_writer.write_overall_inputs()
                provenance_writer.write_packages()
                provenance_writer.write_init_outputs(assume_existence=True)
        if err is not None:
            raise err

    def _make_xgraph(
        self, new_graph: PredictedQuantumGraph, old_graph: QuantumGraph | None
    ) -> networkx.DiGraph:
        """Obtain a networkx DAG from a quantum graph, applying any fixup and
        adding `lsst.daf.butler.Quantum` and `~.pipeline_graph.TaskNode`
        attributes.

        Parameters
        ----------
        new_graph : `.quantum_graph.PredictedQuantumGraph`
            New quantum graph object.
        old_graph : `.QuantumGraph` or `None`
            Equivalent old quantum graph object.

        Returns
        -------
        xgraph : `networkx.DiGraph`
            NetworkX DAG with quantum IDs as node keys.

        Raises
        ------
        MPGraphExecutorError
            Raised if execution graph cannot be ordered after modification,
            i.e. it has dependency cycles.
        """
        new_graph.build_execution_quanta()
        xgraph = new_graph.quantum_only_xgraph.copy()
        if self._execution_graph_fixup:
            try:
                self._execution_graph_fixup.fixup_graph(xgraph, new_graph.quanta_by_task)
            except NotImplementedError:
                # Backwards compatibility.
                if old_graph is None:
                    old_graph = new_graph.to_old_quantum_graph()
                old_graph = self._execution_graph_fixup.fixupQuanta(old_graph)
                # Adding all of the edges from old_graph is overkill, but the
                # only option we really have to make sure we add any new ones.
                xgraph.update([(a.nodeId, b.nodeId) for a, b in old_graph.graph.edges])
            if networkx.dag.has_cycle(xgraph):
                raise MPGraphExecutorError("Updated execution graph has dependency cycle.")
        return xgraph

    def _execute_quanta_in_process(
        self, xgraph: networkx.DiGraph, report: Report, provenance_writer: ProvenanceQuantumGraphWriter | None
    ) -> None:
        """Execute all Quanta in the current process.

        Parameters
        ----------
        xgraph : `networkx.DiGraph`
            DAG to execute. Should have quantum IDs for nodes and ``quantum``
            (`lsst.daf.butler.Quantum`) and ``pipeline_node``
            (`lsst.pipe.base.pipeline_graph.TaskNode`) attributes in addition
            to those provided by
            `.quantum_graph.PredictedQuantumGraph.quantum_only_xgraph`.
        report : `Report`
            Object for reporting execution status.
        provenance_writer : `.quantum_graph.ProvenanceQuantumGraphWriter` or \
                `None`
            Object for recording provenance.
        """
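        # Within each batch of unblocked quanta below, execution order is made
        # deterministic by sorting on task label and then on data ID values.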

        def tiebreaker_sort_key(quantum_id: uuid.UUID) -> tuple:
            node_state = xgraph.nodes[quantum_id]
            return (node_state["task_label"],) + node_state["data_id"].required_values

        success_count, failed_count, total_count = 0, 0, len(xgraph.nodes)
        walker = GraphWalker[uuid.UUID](xgraph.copy())
        for unblocked_quanta in walker:
            for quantum_id in sorted(unblocked_quanta, key=tiebreaker_sort_key):
                node_state: PredictedQuantumInfo = xgraph.nodes[quantum_id]
                data_id = node_state["data_id"]
                task_node = node_state["pipeline_node"]
                quantum = node_state["quantum"]

                _LOG.debug("Executing %s (%s@%s)", quantum_id, task_node.label, data_id)
                fail_exit_code: int | None = None
                task_metadata: TaskMetadata | None = None
                task_logs = ButlerLogRecords([])
                try:
                    # For some exception types we want to exit immediately
                    # with exception-specific exit code, but we still want to
                    # start debugger before exiting if debugging is enabled.
                    try:
                        execution_result = self._quantum_executor.execute(
                            task_node, quantum, quantum_id=quantum_id, log_records=task_logs
                        )
                        if execution_result.report:
                            report.quantaReports.append(execution_result.report)
                        task_metadata = execution_result.task_metadata
                        success_count += 1
                        walker.finish(quantum_id)
                    except RepeatableQuantumError as exc:
                        if self._fail_fast:
                            _LOG.warning(
                                "Caught repeatable quantum error for %s (%s@%s):",
                                quantum_id,
                                task_node.label,
                                data_id,
                            )
                            _LOG.warning(exc, exc_info=True)
                            fail_exit_code = exc.EXIT_CODE
                        raise
                    except InvalidQuantumError as exc:
                        _LOG.fatal(
                            "Invalid quantum error for %s (%s@%s):", quantum_id, task_node.label, data_id
                        )
                        _LOG.fatal(exc, exc_info=True)
                        fail_exit_code = exc.EXIT_CODE
                        raise
                except Exception as exc:
                    quantum_report = QuantumReport.from_exception(
                        exception=exc,
                        dataId=data_id,
                        taskLabel=task_node.label,
                    )
                    report.quantaReports.append(quantum_report)

                    if self._pdb and sys.stdin.isatty() and sys.stdout.isatty():
                        _LOG.error(
                            "%s (%s@%s) failed; dropping into pdb.",
                            quantum_id,
                            task_node.label,
                            data_id,
                            exc_info=exc,
                        )
                        try:
                            pdb = importlib.import_module(self._pdb)
                        except ImportError as imp_exc:
                            raise MPGraphExecutorError(
                                f"Unable to import specified debugger module ({self._pdb}): {imp_exc}"
                            ) from exc
                        if not hasattr(pdb, "post_mortem"):
                            raise MPGraphExecutorError(
                                f"Specified debugger module ({self._pdb}) can't debug with post_mortem",
                            ) from exc
                        pdb.post_mortem(exc.__traceback__)

                    report.status = ExecutionStatus.FAILURE
                    failed_count += 1

                    # If the exception specified an exit code then just exit
                    # with that code; otherwise crash if the fail-fast option
                    # is enabled.
                    if fail_exit_code is not None:
                        sys.exit(fail_exit_code)
                    if self._fail_fast:
                        raise MPGraphExecutorError(
                            f"Quantum {quantum_id} ({task_node.label}@{data_id}) failed."
                        ) from exc
                    else:
                        _LOG.error(
                            "%s (%s@%s) failed; processing will continue for remaining tasks.",
                            quantum_id,
                            task_node.label,
                            data_id,
                            exc_info=exc,
                        )

                    for downstream_quantum_id in walker.fail(quantum_id):
                        downstream_node_state = xgraph.nodes[downstream_quantum_id]
                        failed_quantum_report = QuantumReport(
                            status=ExecutionStatus.SKIPPED,
                            dataId=downstream_node_state["data_id"],
                            taskLabel=downstream_node_state["task_label"],
                        )
                        report.quantaReports.append(failed_quantum_report)
                        if provenance_writer is not None:
                            provenance_writer.write_blocked_quantum_provenance(downstream_quantum_id)
                        _LOG.error(
                            "Upstream job failed for task %s (%s@%s), skipping this quantum.",
                            downstream_quantum_id,
                            downstream_node_state["task_label"],
                            downstream_node_state["data_id"],
                        )
                        failed_count += 1

                if provenance_writer is not None:
                    provenance_writer.write_quantum_provenance(
                        quantum_id, metadata=task_metadata, logs=task_logs
                    )

                _LOG.info(
                    "Executed %d quanta successfully, %d failed and %d remain out of total %d quanta.",
                    success_count,
                    failed_count,
                    total_count - success_count - failed_count,
                    total_count,
                )

        # Raise an exception if there were any failures.
        if failed_count:
            raise MPGraphExecutorError("One or more tasks failed during execution.")

    def _execute_quanta_mp(self, xgraph: networkx.DiGraph, report: Report) -> None:
        """Execute all Quanta in separate processes.

        Parameters
        ----------
        xgraph : `networkx.DiGraph`
            DAG to execute. Should have quantum IDs for nodes and ``quantum``
            (`lsst.daf.butler.Quantum`) and ``pipeline_node``
            (`lsst.pipe.base.pipeline_graph.TaskNode`) attributes in addition
            to those provided by
            `.quantum_graph.PredictedQuantumGraph.quantum_only_xgraph`.
        report : `Report`
            Object for reporting execution status.
        """
        disable_implicit_threading()  # To prevent thread contention

        _LOG.debug("Using %r for multiprocessing start method", self._start_method)

        # Re-pack the input quantum data into a job list.
        jobs = _JobList(xgraph)

        # Check that all tasks can run in a sub-process.
        for job in jobs.jobs.values():
            if not job.task_node.task_class.canMultiprocess:
                raise MPGraphExecutorError(
                    f"Task {job.task_node.label!r} does not support multiprocessing; use single process"
                )

        finishedCount, failedCount = 0, 0
        while jobs.pending or jobs.running:
            _LOG.debug("#pendingJobs: %s", len(jobs.pending))
            _LOG.debug("#runningJobs: %s", len(jobs.running))

            # See if any jobs have finished.
            for quantum_id in list(jobs.running):  # iterate over a copy so we can remove.
                job = jobs.jobs[quantum_id]
                assert job.process is not None, "Process cannot be None"
                blocked: list[_Job] = []
                if not job.process.is_alive():
                    _LOG.debug("finished: %s", job)
                    # finished
                    exitcode = job.process.exitcode
                    quantum_report = job.report()
                    report.quantaReports.append(quantum_report)
                    if exitcode == 0:
                        jobs.setJobState(job, JobState.FINISHED)
                        job.cleanup()
                        _LOG.debug("success: %s took %.3f seconds", job, time.time() - job.started)
                    else:
                        if job.terminated:
                            # Was killed due to timeout.
                            if report.status == ExecutionStatus.SUCCESS:
                                # Do not override global FAILURE status.
                                report.status = ExecutionStatus.TIMEOUT
                            message = f"Timeout ({self._timeout} sec) for task {job}, task is killed"
                            blocked = jobs.setJobState(job, JobState.TIMED_OUT)
                        else:
                            report.status = ExecutionStatus.FAILURE
                            # failMessage() has to be called before cleanup().
                            message = job.failMessage()
                            blocked = jobs.setJobState(job, JobState.FAILED)

                        job.cleanup()
                        _LOG.debug("failed: %s", job)
                        if self._fail_fast or exitcode == InvalidQuantumError.EXIT_CODE:
                            # Stop all running jobs.
                            for stop_quantum_id in jobs.running:
                                stop_job = jobs.jobs[stop_quantum_id]
                                if stop_job is not job:
                                    stop_job.stop()
                            if job.state is JobState.TIMED_OUT:
                                raise MPTimeoutError(f"Timeout ({self._timeout} sec) for task {job}.")
                            else:
                                raise MPGraphExecutorError(message)
                        else:
                            _LOG.error("%s; processing will continue for remaining tasks.", message)
                else:
                    # Check for timeout.
                    now = time.time()
                    if now - job.started > self._timeout:
                        # Try to kill it, and there is a chance that it
                        # finishes successfully before it gets killed. Exit
                        # status is handled by the code above on next
                        # iteration.
                        _LOG.debug("Terminating job %s due to timeout", job)
                        job.stop()

                for downstream_job in blocked:
                    quantum_report = QuantumReport(
                        quantumId=downstream_job.quantum_id,
                        status=ExecutionStatus.SKIPPED,
                        dataId=cast(DataCoordinate, downstream_job.quantum.dataId),
                        taskLabel=downstream_job.task_node.label,
                    )
                    report.quantaReports.append(quantum_report)
                    _LOG.error("Upstream job failed for task %s, skipping this task.", downstream_job)

            # See if we can start more jobs.
            while len(jobs.running) < self._num_proc and jobs.pending:
                job = jobs.submit(self._quantum_executor, self._start_method)
                _LOG.debug("Submitted %s", job)

            # Do cleanup for timed out jobs if necessary.
            jobs.cleanup()

            # Print progress message if something changed.
            newFinished, newFailed = len(jobs.finished), len(jobs.failed)
            if (finishedCount, failedCount) != (newFinished, newFailed):
                finishedCount, failedCount = newFinished, newFailed
                totalCount = len(jobs.jobs)
                _LOG.info(
                    "Executed %d quanta successfully, %d failed and %d remain out of total %d quanta.",
                    finishedCount,
                    failedCount,
                    totalCount - finishedCount - failedCount,
                    totalCount,
                )

            # Here we want to wait until one of the running jobs completes,
            # but multiprocessing does not provide an API for that; for now
            # just sleep a little bit and go back to the loop.
            if jobs.running:
                time.sleep(0.1)
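            # (An alternative to this polling would be to block on the child
            # process sentinels via multiprocessing.connection.wait, at the
            # cost of restructuring the loop.)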

        if jobs.failed:
            # Print the list of failed jobs.
            _LOG.error("Failed jobs:")
            for quantum_id in jobs.failed:
                job = jobs.jobs[quantum_id]
                _LOG.error(" - %s: %s", job.state.name, job)

            # If any job failed raise an exception; raise the more specific
            # MPTimeoutError only when every failure was a timeout.
            if jobs.failed == jobs.timed_out:
                raise MPTimeoutError("One or more tasks timed out during execution.")
            else:
                raise MPGraphExecutorError("One or more tasks failed or timed out during execution.")

    def getReport(self) -> Report:
        # Docstring inherited from base class.
        if self._report is None:
            raise RuntimeError("getReport() called before execute()")
        return self._report