# Coverage for python/lsst/pipe/base/mp_graph_executor.py: 14% (410 statements)
# Report generated by coverage.py v7.13.5 at 2026-04-26 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["MPGraphExecutor", "MPGraphExecutorError", "MPTimeoutError"]
32import enum
33import importlib
34import logging
35import multiprocessing
36import pickle
37import signal
38import sys
39import threading
40import time
41import uuid
42from contextlib import ExitStack
43from typing import Literal, cast
45import networkx
47from lsst.daf.butler import DataCoordinate, Quantum
48from lsst.daf.butler.cli.cliLog import CliLog
49from lsst.daf.butler.logging import ButlerLogRecords
50from lsst.utils.threads import disable_implicit_threading
52from ._status import InvalidQuantumError, RepeatableQuantumError
53from ._task_metadata import TaskMetadata
54from .execution_graph_fixup import ExecutionGraphFixup
55from .graph import QuantumGraph
56from .graph_walker import GraphWalker
57from .log_on_close import LogOnClose
58from .pipeline_graph import TaskNode
59from .quantum_graph import PredictedQuantumGraph, PredictedQuantumInfo, ProvenanceQuantumGraphWriter
60from .quantum_graph_executor import QuantumExecutor, QuantumGraphExecutor
61from .quantum_reports import ExecutionStatus, QuantumReport, Report
# Module-level logger; child processes re-initialize logging state (via
# CliLog.replayConfigState in _Job._executeJob) before emitting through it.
_LOG = logging.getLogger(__name__)
class JobState(enum.Enum):
    """Lifecycle states of a single executing task (see `_Job`)."""

    PENDING = enum.auto()
    """Job has been created but its process has not been started."""

    RUNNING = enum.auto()
    """Job's subprocess is currently executing."""

    FINISHED = enum.auto()
    """Job completed successfully (zero exit status)."""

    FAILED = enum.auto()
    """Job's process exited with a non-zero status."""

    TIMED_OUT = enum.auto()
    """Job was terminated because it exceeded the execution time limit."""

    FAILED_DEP = enum.auto()
    """Job was never run because an upstream dependency failed or timed
    out."""
class _Job:
    """Class representing a job running single task.

    Parameters
    ----------
    quantum_id : `uuid.UUID`
        ID of the quantum this job executes.
    quantum : `lsst.daf.butler.Quantum`
        Description of the inputs and outputs.
    task_node : `.pipeline_graph.TaskNode`
        Description of the task and configuration.
    """

    def __init__(self, quantum_id: uuid.UUID, quantum: Quantum, task_node: TaskNode):
        self.quantum_id = quantum_id
        self.quantum = quantum
        self.task_node = task_node
        # Subprocess executing the quantum; None before start() and after
        # cleanup().
        self.process: multiprocessing.process.BaseProcess | None = None
        self._state = JobState.PENDING
        # Wall-clock time (time.time()) at process start; used for timeouts.
        self.started: float = 0.0
        # Receiving end of the pipe the child uses to send its QuantumReport.
        self._rcv_conn: multiprocessing.connection.Connection | None = None
        # Set by stop(); distinguishes deliberate kills (timeouts) from other
        # failures.
        self._terminated = False

    @property
    def state(self) -> JobState:
        """Job processing state (JobState)."""
        return self._state

    @property
    def terminated(self) -> bool:
        """Return `True` if job was killed by stop() method and negative exit
        code is returned from child process (`bool`).
        """
        if self._terminated:
            assert self.process is not None, "Process must be started"
            if self.process.exitcode is not None:
                return self.process.exitcode < 0
        return False

    def start(
        self,
        quantumExecutor: QuantumExecutor,
        startMethod: Literal["spawn"] | Literal["forkserver"],
        fail_fast: bool,
    ) -> None:
        """Start process which runs the task.

        Parameters
        ----------
        quantumExecutor : `QuantumExecutor`
            Executor for single quantum.
        startMethod : `str`, optional
            Start method from `multiprocessing` module.
        fail_fast : `bool`, optional
            If `True` then kill subprocess on RepeatableQuantumError.
        """
        # Unpickling of quantum has to happen after butler/executor, also we
        # want to setup logging before unpickling anything that can generate
        # messages, this is why things are pickled manually here.
        qe_pickle = pickle.dumps(quantumExecutor)
        task_node_pickle = pickle.dumps(self.task_node)
        quantum_pickle = pickle.dumps(self.quantum)
        self._rcv_conn, snd_conn = multiprocessing.Pipe(False)
        logConfigState = CliLog.configState

        mp_ctx = multiprocessing.get_context(startMethod)
        self.process = mp_ctx.Process(  # type: ignore[attr-defined]
            target=_Job._executeJob,
            args=(
                qe_pickle,
                task_node_pickle,
                quantum_pickle,
                self.quantum_id,
                logConfigState,
                snd_conn,
                fail_fast,
            ),
            name=f"task-{self.quantum.dataId}",
        )
        # mypy is getting confused by multiprocessing.
        assert self.process is not None
        self.process.start()
        self.started = time.time()
        self._state = JobState.RUNNING

    @staticmethod
    def _executeJob(
        quantumExecutor_pickle: bytes,
        task_node_pickle: bytes,
        quantum_pickle: bytes,
        quantum_id: uuid.UUID,
        logConfigState: list,
        snd_conn: multiprocessing.connection.Connection,
        fail_fast: bool,
    ) -> None:
        """Execute a job with arguments.

        Parameters
        ----------
        quantumExecutor_pickle : `bytes`
            Executor for single quantum, pickled.
        task_node_pickle : `bytes`
            Task definition structure, pickled.
        quantum_pickle : `bytes`
            Quantum for this task execution in pickled form.
        quantum_id : `uuid.UUID`
            Unique ID for the quantum.
        logConfigState : `list`
            Logging state from parent process.
        snd_conn : `multiprocessing.Connection`
            Connection to send job report to parent process.
        fail_fast : `bool`
            If `True` then kill subprocess on RepeatableQuantumError.
        """
        # This terrible hack is a workaround for Python threading bug:
        # https://github.com/python/cpython/issues/102512. Should be removed
        # when fix for that bug is deployed. Inspired by
        # https://github.com/QubesOS/qubes-core-admin-client/pull/236/files.
        thread = threading.current_thread()
        if isinstance(thread, threading._DummyThread):
            if getattr(thread, "_tstate_lock", "") is None:
                thread._set_tstate_lock()  # type: ignore[attr-defined]

        if logConfigState and not CliLog.configState:
            # means that we are in a new spawned Python process and we have to
            # re-initialize logging
            CliLog.replayConfigState(logConfigState)

        quantumExecutor: QuantumExecutor = pickle.loads(quantumExecutor_pickle)
        task_node: TaskNode = pickle.loads(task_node_pickle)
        quantum = pickle.loads(quantum_pickle)
        report: QuantumReport | None = None
        # Catch a few known failure modes and stop the process immediately,
        # with exception-specific exit code.
        try:
            _, report = quantumExecutor.execute(task_node, quantum, quantum_id=quantum_id)
        except RepeatableQuantumError as exc:
            report = QuantumReport.from_exception(
                quantumId=quantum_id,
                exception=exc,
                dataId=quantum.dataId,
                taskLabel=task_node.label,
                exitCode=exc.EXIT_CODE if fail_fast else None,
            )
            if fail_fast:
                _LOG.warning("Caught repeatable quantum error for %s (%s):", task_node.label, quantum.dataId)
                _LOG.warning(exc, exc_info=True)
                sys.exit(exc.EXIT_CODE)
            else:
                raise
        except InvalidQuantumError as exc:
            # Fixed: the format string previously had a third "%s" placeholder
            # with only two arguments, which broke the lazy %-formatting of
            # this log call; the exception itself is logged on the next line.
            _LOG.fatal("Invalid quantum error for %s (%s):", task_node.label, quantum.dataId)
            _LOG.fatal(exc, exc_info=True)
            report = QuantumReport.from_exception(
                quantumId=quantum_id,
                exception=exc,
                dataId=quantum.dataId,
                taskLabel=task_node.label,
                exitCode=exc.EXIT_CODE,
            )
            sys.exit(exc.EXIT_CODE)
        except Exception as exc:
            _LOG.debug("exception from task %s dataId %s: %s", task_node.label, quantum.dataId, exc)
            report = QuantumReport.from_exception(
                quantumId=quantum_id,
                exception=exc,
                dataId=quantum.dataId,
                taskLabel=task_node.label,
            )
            raise
        finally:
            if report is not None:
                # If sending fails we do not want this new exception to be
                # exposed.
                try:
                    _LOG.debug("sending report for task %s dataId %s", task_node.label, quantum.dataId)
                    snd_conn.send(report)
                except Exception:
                    pass

    def stop(self) -> None:
        """Stop the process."""
        assert self.process is not None, "Process must be started"
        self.process.terminate()
        # give it 1 second to finish or KILL
        for _ in range(10):
            time.sleep(0.1)
            if not self.process.is_alive():
                break
        else:
            _LOG.debug("Killing process %s", self.process.name)
            self.process.kill()
        self._terminated = True

    def cleanup(self) -> None:
        """Release processes resources, has to be called for each finished
        process.
        """
        if self.process and not self.process.is_alive():
            self.process.close()
            self.process = None
            self._rcv_conn = None

    def report(self) -> QuantumReport:
        """Return task report, should be called after process finishes and
        before cleanup().
        """
        assert self.process is not None, "Process must be started"
        assert self._rcv_conn is not None, "Process must be started"
        try:
            report = self._rcv_conn.recv()
            report.exitCode = self.process.exitcode
        except Exception:
            # Likely due to the process killed, but there may be other reasons.
            # Exit code should not be None, this is to keep mypy happy.
            exitcode = self.process.exitcode if self.process.exitcode is not None else -1
            assert self.quantum.dataId is not None, "Quantum DataId cannot be None"
            report = QuantumReport.from_exit_code(
                quantumId=self.quantum_id,
                exitCode=exitcode,
                dataId=self.quantum.dataId,
                taskLabel=self.task_node.label,
            )
        if self.terminated:
            # Means it was killed, assume it's due to timeout
            report.status = ExecutionStatus.TIMEOUT
        return report

    def failMessage(self) -> str:
        """Return a message describing task failure."""
        assert self.process is not None, "Process must be started"
        assert self.process.exitcode is not None, "Process has to finish"
        exitcode = self.process.exitcode
        if exitcode < 0:
            # Negative exit code means it is killed by signal
            signum = -exitcode
            msg = f"Task {self} failed, killed by signal {signum}"
            # Just in case this is some very odd signal, expect ValueError
            try:
                strsignal = signal.strsignal(signum)
                msg = f"{msg} ({strsignal})"
            except ValueError:
                pass
        elif exitcode > 0:
            msg = f"Task {self} failed, exit code={exitcode}"
        else:
            msg = ""
        return msg

    def __str__(self) -> str:
        return f"<{self.task_node.label} dataId={self.quantum.dataId}>"
class _JobList:
    """Simple list of _Job instances with few convenience methods.

    Tracks each quantum's job in one of several mutually exclusive state
    sets (``pending``/``running``/``finished``/``failed``; ``timed_out`` is
    a subset of ``failed``), driven by a `GraphWalker` over the DAG.

    Parameters
    ----------
    xgraph : `networkx.DiGraph`
        Directed acyclic graph of quantum IDs.
    """

    def __init__(self, xgraph: networkx.DiGraph):
        # One _Job per quantum; node attributes carry the actual payload.
        self.jobs = {
            quantum_id: _Job(
                quantum_id=quantum_id,
                quantum=xgraph.nodes[quantum_id]["quantum"],
                task_node=xgraph.nodes[quantum_id]["pipeline_node"],
            )
            for quantum_id in xgraph
        }
        # Walker yields batches of quanta whose dependencies are satisfied;
        # the first batch (quanta with no dependencies) seeds `pending`.
        self.walker: GraphWalker[uuid.UUID] = GraphWalker(xgraph.copy())
        self.pending = set(next(self.walker, ()))
        self.running: set[uuid.UUID] = set()
        self.finished: set[uuid.UUID] = set()
        self.failed: set[uuid.UUID] = set()
        # Subset of `failed` holding jobs killed due to timeout.
        self.timed_out: set[uuid.UUID] = set()

    def submit(
        self,
        quantumExecutor: QuantumExecutor,
        startMethod: Literal["spawn"] | Literal["forkserver"],
        fail_fast: bool = False,
    ) -> _Job:
        """Submit a pending job for execution.

        Parameters
        ----------
        quantumExecutor : `QuantumExecutor`
            Executor for single quantum.
        startMethod : `str`, optional
            Start method from `multiprocessing` module.
        fail_fast : `bool`, optional
            If `True` then kill subprocess on RepeatableQuantumError.

        Returns
        -------
        job : `_Job`
            The job that was submitted.
        """
        # Arbitrary choice among ready quanta (set.pop is unordered).
        quantum_id = self.pending.pop()
        job = self.jobs[quantum_id]
        job.start(quantumExecutor, startMethod, fail_fast=fail_fast)
        self.running.add(job.quantum_id)
        return job

    def setJobState(self, job: _Job, state: JobState) -> list[_Job]:
        """Update job state.

        Parameters
        ----------
        job : `_Job`
            Job to submit.
        state : `JobState`
            New job state; note that only the FINISHED, FAILED, and TIMED_OUT
            states are acceptable.

        Returns
        -------
        blocked : `list` [ `_Job` ]
            Additional jobs that have been marked as failed because this job
            was upstream of them and failed or timed out.
        """
        allowedStates = (JobState.FINISHED, JobState.FAILED, JobState.TIMED_OUT)
        assert state in allowedStates, f"State {state} not allowed here"

        # remove job from pending/running lists
        if job.state == JobState.PENDING:
            self.pending.remove(job.quantum_id)
        elif job.state == JobState.RUNNING:
            self.running.remove(job.quantum_id)

        quantum_id = job.quantum_id
        # it should not be in any of these, but just in case
        self.finished.discard(quantum_id)
        self.failed.discard(quantum_id)
        self.timed_out.discard(quantum_id)
        job._state = state
        match job.state:
            case JobState.FINISHED:
                # Success unblocks downstream quanta; pull any newly-ready
                # batch from the walker into `pending`.
                self.finished.add(quantum_id)
                self.walker.finish(quantum_id)
                self.pending.update(next(self.walker, ()))
                return []
            case JobState.FAILED:
                self.failed.add(quantum_id)
            case JobState.TIMED_OUT:
                # Timed-out jobs count as failed as well.
                self.failed.add(quantum_id)
                self.timed_out.add(quantum_id)
            case _:
                raise ValueError(f"Unexpected state value: {state}")
        # Failure path: walker.fail() yields all (transitive) descendants,
        # which can never run; mark them FAILED_DEP and report them back.
        blocked: list[_Job] = []
        for downstream_quantum_id in self.walker.fail(quantum_id):
            self.failed.add(downstream_quantum_id)
            blocked.append(self.jobs[downstream_quantum_id])
            self.jobs[downstream_quantum_id]._state = JobState.FAILED_DEP
        return blocked

    def cleanup(self) -> None:
        """Do periodic cleanup for jobs that did not finish correctly.

        If timed out jobs are killed but take too long to stop then regular
        cleanup will not work for them. Here we check all timed out jobs
        periodically and do cleanup if they managed to die by this time.
        """
        for quantum_id in self.timed_out:
            job = self.jobs[quantum_id]
            assert job.state == JobState.TIMED_OUT, "Job state should be consistent with the set it's in."
            if job.process is not None:
                # _Job.cleanup() is a no-op while the process is still alive.
                job.cleanup()
class MPGraphExecutorError(Exception):
    """Base exception for errors raised by `MPGraphExecutor`."""
class MPTimeoutError(MPGraphExecutorError):
    """Exception raised when a task's execution exceeds its time limit."""
class MPGraphExecutor(QuantumGraphExecutor):
    """Implementation of QuantumGraphExecutor using same-host multiprocess
    execution of Quanta.

    Parameters
    ----------
    num_proc : `int`
        Number of processes to use for executing tasks.
    timeout : `float`
        Time in seconds to wait for tasks to finish.
    quantum_executor : `.quantum_graph_executor.QuantumExecutor`
        Executor for single quantum. For multiprocess-style execution when
        ``num_proc`` is greater than one this instance must support pickle.
    start_method : `str`, optional
        Start method from `multiprocessing` module, `None` selects the best
        one for current platform.
    fail_fast : `bool`, optional
        If set to ``True`` then stop processing on first error from any task.
    pdb : `str`, optional
        Debugger to import and use (via the ``post_mortem`` function) in the
        event of an exception.
    execution_graph_fixup : `.execution_graph_fixup.ExecutionGraphFixup`, \
        optional
        Instance used for modification of execution graph.
    """

    def __init__(
        self,
        *,
        num_proc: int,
        timeout: float,
        quantum_executor: QuantumExecutor,
        start_method: Literal["spawn"] | Literal["forkserver"] | None = None,
        fail_fast: bool = False,
        pdb: str | None = None,
        execution_graph_fixup: ExecutionGraphFixup | None = None,
    ):
        self._num_proc = num_proc
        self._timeout = timeout
        self._quantum_executor = quantum_executor
        self._fail_fast = fail_fast
        self._pdb = pdb
        self._execution_graph_fixup = execution_graph_fixup
        # Populated by execute(); getReport() raises until then.
        self._report: Report | None = None

        # We set default start method as spawn for all platforms.
        if start_method is None:
            start_method = "spawn"
        self._start_method = start_method

    def execute(
        self, graph: QuantumGraph | PredictedQuantumGraph, *, provenance_graph_file: str | None = None
    ) -> None:
        # Docstring inherited from QuantumGraphExecutor.execute
        # Accept either the old or new graph class; convert old to new and
        # remember the original for backwards-compatible fixup support.
        old_graph: QuantumGraph | None = None
        if isinstance(graph, QuantumGraph):
            old_graph = graph
            new_graph = PredictedQuantumGraph.from_old_quantum_graph(old_graph)
        else:
            new_graph = graph
        xgraph = self._make_xgraph(new_graph, old_graph)
        self._report = Report(qgraphSummary=new_graph._make_summary())
        err: MPGraphExecutorError | None = None
        with ExitStack() as exit_stack:
            provenance_writer: ProvenanceQuantumGraphWriter | None = None
            if provenance_graph_file is not None:
                # NOTE(review): the first clause of this inner condition is
                # always true inside this branch; only the num_proc check
                # matters here.
                if provenance_graph_file is not None and self._num_proc > 1:
                    raise NotImplementedError(
                        "Provenance writing is not implemented for multiprocess execution."
                    )
                provenance_writer = ProvenanceQuantumGraphWriter(
                    provenance_graph_file,
                    exit_stack=exit_stack,
                    log_on_close=LogOnClose(_LOG.log),
                    predicted=new_graph,
                )
            try:
                if self._num_proc > 1:
                    self._execute_quanta_mp(xgraph, self._report)
                else:
                    self._execute_quanta_in_process(xgraph, self._report, provenance_writer)
            except MPGraphExecutorError as exc:
                self._report.set_exception(exc)
                err = exc
                # Defer re-raising this exception only to let provenance writes
                # finish as the ExitStack closes. The original traceback for
                # this exception isn't useful anyway.
            except Exception as exc:
                self._report.set_exception(exc)
                raise
            if provenance_writer is not None:
                provenance_writer.write_overall_inputs()
                provenance_writer.write_packages()
                provenance_writer.write_init_outputs(assume_existence=True)
        # Re-raise any deferred executor error now that the ExitStack has
        # closed (and provenance output has been flushed).
        if err is not None:
            raise err

    def _make_xgraph(
        self, new_graph: PredictedQuantumGraph, old_graph: QuantumGraph | None
    ) -> networkx.DiGraph:
        """Obtain a networkx DAG from a quantum graph, applying any fixup and
        adding `lsst.daf.butler.Quantum` and `~.pipeline_graph.TaskNode`
        attributes.

        Parameters
        ----------
        new_graph : `.quantum_graph.PredictedQuantumGraph`
            New quantum graph object.
        old_graph : `.QuantumGraph` or `None`
            Equivalent old quantum graph object.

        Returns
        -------
        xgraph : `networkx.DiGraph`
            NetworkX DAG with quantum IDs as node keys.

        Raises
        ------
        MPGraphExecutorError
            Raised if execution graph cannot be ordered after modification,
            i.e. it has dependency cycles.
        """
        new_graph.build_execution_quanta()
        xgraph = new_graph.quantum_only_xgraph.copy()
        if self._execution_graph_fixup:
            try:
                self._execution_graph_fixup.fixup_graph(xgraph, new_graph.quanta_by_task)
            except NotImplementedError:
                # Backwards compatibility.
                if old_graph is None:
                    old_graph = new_graph.to_old_quantum_graph()
                old_graph = self._execution_graph_fixup.fixupQuanta(old_graph)
                # Adding all of the edges from old_graph is overkill, but the
                # only option we really have to make sure we add any new ones.
                xgraph.update([(a.nodeId, b.nodeId) for a, b in old_graph.graph.edges])
            # NOTE(review): `networkx.dag.has_cycle` exists only in recent
            # networkx releases — confirm the pinned networkx version
            # provides it.
            if networkx.dag.has_cycle(xgraph):
                raise MPGraphExecutorError("Updated execution graph has dependency cycle.")
        return xgraph

    def _execute_quanta_in_process(
        self, xgraph: networkx.DiGraph, report: Report, provenance_writer: ProvenanceQuantumGraphWriter | None
    ) -> None:
        """Execute all Quanta in current process.

        Parameters
        ----------
        xgraph : `networkx.DiGraph`
            DAG to execute. Should have quantum IDs for nodes and ``quantum``
            (`lsst.daf.butler.Quantum`) and ``pipeline_node``
            (`lsst.pipe.base.pipeline_graph.TaskNode`) attributes in addition
            to those provided by
            `.quantum_graph.PredictedQuantumGraph.quantum_only_xgraph`.
        report : `Report`
            Object for reporting execution status.
        provenance_writer : `.quantum_graph.ProvenanceQuantumGraphWriter` or \
            `None`
            Object for recording provenance.
        """

        # Deterministic ordering among equally-ready quanta: sort by task
        # label, then by the data ID's required values.
        def tiebreaker_sort_key(quantum_id: uuid.UUID) -> tuple:
            node_state = xgraph.nodes[quantum_id]
            return (node_state["task_label"],) + node_state["data_id"].required_values

        success_count, failed_count, total_count = 0, 0, len(xgraph.nodes)
        walker = GraphWalker[uuid.UUID](xgraph.copy())
        # The walker yields batches of quanta whose dependencies have all
        # finished; each batch is executed serially in sorted order.
        for unblocked_quanta in walker:
            for quantum_id in sorted(unblocked_quanta, key=tiebreaker_sort_key):
                node_state: PredictedQuantumInfo = xgraph.nodes[quantum_id]
                data_id = node_state["data_id"]
                task_node = node_state["pipeline_node"]
                quantum = node_state["quantum"]

                _LOG.debug("Executing %s (%s@%s)", quantum_id, task_node.label, data_id)
                fail_exit_code: int | None = None
                task_metadata: TaskMetadata | None = None
                task_logs = ButlerLogRecords([])
                try:
                    # For some exception types we want to exit immediately with
                    # exception-specific exit code, but we still want to start
                    # debugger before exiting if debugging is enabled.
                    try:
                        execution_result = self._quantum_executor.execute(
                            task_node, quantum, quantum_id=quantum_id, log_records=task_logs
                        )
                        if execution_result.report:
                            report.quantaReports.append(execution_result.report)
                        task_metadata = execution_result.task_metadata
                        success_count += 1
                        walker.finish(quantum_id)
                    except RepeatableQuantumError as exc:
                        if self._fail_fast:
                            _LOG.warning(
                                "Caught repeatable quantum error for %s (%s@%s):",
                                quantum_id,
                                task_node.label,
                                data_id,
                            )
                            _LOG.warning(exc, exc_info=True)
                            # Remember exit code; the outer handler below
                            # calls sys.exit with it.
                            fail_exit_code = exc.EXIT_CODE
                        raise
                    except InvalidQuantumError as exc:
                        _LOG.fatal(
                            "Invalid quantum error for %s (%s@%s):", quantum_id, task_node.label, data_id
                        )
                        _LOG.fatal(exc, exc_info=True)
                        fail_exit_code = exc.EXIT_CODE
                        raise
                except Exception as exc:
                    quantum_report = QuantumReport.from_exception(
                        exception=exc,
                        dataId=data_id,
                        taskLabel=task_node.label,
                    )
                    report.quantaReports.append(quantum_report)

                    # Only drop into the debugger on an interactive terminal.
                    if self._pdb and sys.stdin.isatty() and sys.stdout.isatty():
                        _LOG.error(
                            "%s (%s@%s) failed; dropping into pdb.",
                            quantum_id,
                            task_node.label,
                            data_id,
                            exc_info=exc,
                        )
                        try:
                            pdb = importlib.import_module(self._pdb)
                        except ImportError as imp_exc:
                            raise MPGraphExecutorError(
                                f"Unable to import specified debugger module ({self._pdb}): {imp_exc}"
                            ) from exc
                        if not hasattr(pdb, "post_mortem"):
                            raise MPGraphExecutorError(
                                f"Specified debugger module ({self._pdb}) can't debug with post_mortem",
                            ) from exc
                        pdb.post_mortem(exc.__traceback__)

                    report.status = ExecutionStatus.FAILURE
                    failed_count += 1

                    # If exception specified an exit code then just exit with
                    # that code, otherwise crash if fail-fast option is
                    # enabled.
                    if fail_exit_code is not None:
                        sys.exit(fail_exit_code)
                    if self._fail_fast:
                        raise MPGraphExecutorError(
                            f"Quantum {quantum_id} ({task_node.label}@{data_id}) failed."
                        ) from exc
                    else:
                        _LOG.error(
                            "%s (%s@%s) failed; processing will continue for remaining tasks.",
                            quantum_id,
                            task_node.label,
                            data_id,
                            exc_info=exc,
                        )

                    # Everything downstream of the failed quantum can never
                    # run; record each descendant as skipped.
                    for downstream_quantum_id in walker.fail(quantum_id):
                        downstream_node_state = xgraph.nodes[downstream_quantum_id]
                        failed_quantum_report = QuantumReport(
                            status=ExecutionStatus.SKIPPED,
                            dataId=downstream_node_state["data_id"],
                            taskLabel=downstream_node_state["task_label"],
                        )
                        report.quantaReports.append(failed_quantum_report)
                        if provenance_writer is not None:
                            provenance_writer.write_blocked_quantum_provenance(downstream_quantum_id)
                        _LOG.error(
                            "Upstream job failed for task %s (%s@%s), skipping this quantum.",
                            downstream_quantum_id,
                            downstream_node_state["task_label"],
                            downstream_node_state["data_id"],
                        )
                        failed_count += 1

                # Provenance is written for the attempted quantum regardless
                # of success or (non-fail-fast) failure.
                if provenance_writer is not None:
                    provenance_writer.write_quantum_provenance(
                        quantum_id, metadata=task_metadata, logs=task_logs
                    )

        _LOG.info(
            "Executed %d quanta successfully, %d failed and %d remain out of total %d quanta.",
            success_count,
            failed_count,
            total_count - success_count - failed_count,
            total_count,
        )

        # Raise an exception if there were any failures.
        if failed_count:
            raise MPGraphExecutorError("One or more tasks failed during execution.")

    def _execute_quanta_mp(self, xgraph: networkx.DiGraph, report: Report) -> None:
        """Execute all Quanta in separate processes.

        Parameters
        ----------
        xgraph : `networkx.DiGraph`
            DAG to execute. Should have quantum IDs for nodes and ``quantum``
            (`lsst.daf.butler.Quantum`) and ``pipeline_node``
            (`lsst.pipe.base.pipeline_graph.TaskNode`) attributes in addition
            to those provided by
            `.quantum_graph.PredictedQuantumGraph.quantum_only_xgraph`.
        report : `Report`
            Object for reporting execution status.
        """
        disable_implicit_threading()  # To prevent thread contention

        _LOG.debug("Using %r for multiprocessing start method", self._start_method)

        # re-pack input quantum data into jobs list
        jobs = _JobList(xgraph)

        # check that all tasks can run in sub-process
        for job in jobs.jobs.values():
            if not job.task_node.task_class.canMultiprocess:
                raise MPGraphExecutorError(
                    f"Task {job.task_node.label!r} does not support multiprocessing; use single process"
                )

        finishedCount, failedCount = 0, 0
        # Polling scheduler loop: reap finished processes, kill overrunning
        # ones, then top up running jobs from the pending set.
        while jobs.pending or jobs.running:
            _LOG.debug("#pendingJobs: %s", len(jobs.pending))
            _LOG.debug("#runningJobs: %s", len(jobs.running))

            # See if any jobs have finished
            for quantum_id in list(jobs.running):  # iterate over a copy so we can remove.
                job = jobs.jobs[quantum_id]
                assert job.process is not None, "Process cannot be None"
                blocked: list[_Job] = []
                if not job.process.is_alive():
                    _LOG.debug("finished: %s", job)
                    # finished
                    exitcode = job.process.exitcode
                    quantum_report = job.report()
                    report.quantaReports.append(quantum_report)
                    if exitcode == 0:
                        jobs.setJobState(job, JobState.FINISHED)
                        job.cleanup()
                        _LOG.debug("success: %s took %.3f seconds", job, time.time() - job.started)
                    else:
                        if job.terminated:
                            # Was killed due to timeout.
                            if report.status == ExecutionStatus.SUCCESS:
                                # Do not override global FAILURE status
                                report.status = ExecutionStatus.TIMEOUT
                            message = f"Timeout ({self._timeout} sec) for task {job}, task is killed"
                            blocked = jobs.setJobState(job, JobState.TIMED_OUT)
                        else:
                            report.status = ExecutionStatus.FAILURE
                            # failMessage() has to be called before cleanup()
                            message = job.failMessage()
                            blocked = jobs.setJobState(job, JobState.FAILED)

                        job.cleanup()
                        _LOG.debug("failed: %s", job)
                        if self._fail_fast or exitcode == InvalidQuantumError.EXIT_CODE:
                            # stop all running jobs
                            for stop_quantum_id in jobs.running:
                                stop_job = jobs.jobs[stop_quantum_id]
                                if stop_job is not job:
                                    stop_job.stop()
                            if job.state is JobState.TIMED_OUT:
                                raise MPTimeoutError(f"Timeout ({self._timeout} sec) for task {job}.")
                            else:
                                raise MPGraphExecutorError(message)
                        else:
                            _LOG.error("%s; processing will continue for remaining tasks.", message)
                else:
                    # check for timeout
                    now = time.time()
                    if now - job.started > self._timeout:
                        # Try to kill it, and there is a chance that it
                        # finishes successfully before it gets killed. Exit
                        # status is handled by the code above on next
                        # iteration.
                        _LOG.debug("Terminating job %s due to timeout", job)
                        job.stop()

                # Report every downstream quantum blocked by this failure.
                for downstream_job in blocked:
                    quantum_report = QuantumReport(
                        quantumId=downstream_job.quantum_id,
                        status=ExecutionStatus.SKIPPED,
                        dataId=cast(DataCoordinate, downstream_job.quantum.dataId),
                        taskLabel=downstream_job.task_node.label,
                    )
                    report.quantaReports.append(quantum_report)
                    _LOG.error("Upstream job failed for task %s, skipping this task.", downstream_job)

            # see if we can start more jobs
            while len(jobs.running) < self._num_proc and jobs.pending:
                job = jobs.submit(self._quantum_executor, self._start_method)
                _LOG.debug("Submitted %s", job)

            # Do cleanup for timed out jobs if necessary.
            jobs.cleanup()

            # Print progress message if something changed.
            newFinished, newFailed = len(jobs.finished), len(jobs.failed)
            if (finishedCount, failedCount) != (newFinished, newFailed):
                finishedCount, failedCount = newFinished, newFailed
                totalCount = len(jobs.jobs)
                _LOG.info(
                    "Executed %d quanta successfully, %d failed and %d remain out of total %d quanta.",
                    finishedCount,
                    failedCount,
                    totalCount - finishedCount - failedCount,
                    totalCount,
                )

            # Here we want to wait until one of the running jobs completes
            # but multiprocessing does not provide an API for that, for now
            # just sleep a little bit and go back to the loop.
            if jobs.running:
                time.sleep(0.1)

        if jobs.failed:
            # print list of failed jobs
            _LOG.error("Failed jobs:")
            for quantum_id in jobs.failed:
                job = jobs.jobs[quantum_id]
                _LOG.error(" - %s: %s", job.state.name, job)

            # if any job failed raise an exception
            if jobs.failed == jobs.timed_out:
                raise MPTimeoutError("One or more tasks timed out during execution.")
            else:
                raise MPGraphExecutorError("One or more tasks failed or timed out during execution.")

    def getReport(self) -> Report:
        # Docstring inherited from base class
        if self._report is None:
            raise RuntimeError("getReport() called before execute()")
        return self._report