Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _status_annotator.py: 73%
93 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:47 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = (
30 "QuantumGraphExecutionStatusAnnotator",
31 "QuantumGraphExecutionStatusOptions",
32 "QuantumProvenanceGraphStatusAnnotator",
33 "QuantumProvenanceGraphStatusOptions",
34)
36import dataclasses
37from typing import TYPE_CHECKING, Any, Literal, Protocol, overload
39import networkx
41from .._nodes import NodeKey, NodeType
43if TYPE_CHECKING:
44 from ... import quantum_provenance_graph as qpg
46# ANSI color codes.
47GREEN = "\033[32m"
48RED = "\033[31m"
49YELLOW = "\033[33m"
50CYAN = "\033[36m"
51WHITE = "\033[37m"
52GRAY = "\033[90m"
53MAGENTA = "\033[35m"
54BROWN = "\u001b[38;5;130m"
55RESET = "\033[0m"
58@dataclasses.dataclass
59class TaskStatusInfo:
60 """Holds status information for a task."""
62 expected: int
63 succeeded: int
64 failed: int
65 blocked: int
66 ready: int | None = None
67 running: int | None = None
68 wonky: int | None = None
69 unknown: int | None = None
72@dataclasses.dataclass
73class DatasetTypeStatusInfo:
74 """Holds status information for a dataset type."""
76 expected: int
77 produced: int
80@dataclasses.dataclass
81class StatusColors:
82 """Base class for holding ANSI color codes for different progress segments
83 or statuses.
84 """
86 # Base task status colors.
87 expected: str = WHITE
88 succeeded: str = GREEN
89 failed: str = RED
91 # Base dataset type status colors.
92 produced: str = GREEN
94 # Reset to default color.
95 reset: str = RESET
98@dataclasses.dataclass
99class QuantumGraphExecutionStatusColors(StatusColors):
100 """Holds ANSI color codes for different progress segments or statuses for
101 quantum graph execution reports.
103 Status colors for both task and dataset type nodes are included.
104 """
106 def __post_init__(self) -> None:
107 raise NotImplementedError("`QuantumGraphExecutionStatusColors` is not implemented yet.")
110@dataclasses.dataclass
111class QuantumProvenanceGraphStatusColors(StatusColors):
112 """Holds ANSI color codes for different progress segments or statuses for
113 quantum provenance graph reports.
115 Status colors for both task and dataset type nodes are included.
116 """
118 # Additional task status colors.
119 blocked: str = YELLOW
120 ready: str = GRAY
121 running: str = MAGENTA
122 wonky: str = CYAN
123 unknown: str = BROWN
126@dataclasses.dataclass
127class NodeStatusOptions:
128 """Base options for node status visualization.
130 Attributes
131 ----------
132 colors : `StatusColors`
133 A dataclass specifying ANSI color codes for distinct progress segments
134 or statuses.
135 display_percent : `bool`
136 Whether to show percentage of progress.
137 display_counts : `bool`
138 Whether to show numeric counts (e.g., succeeded/expected).
139 visualize : `bool`
140 If `True`, status information for task or dataset type nodes will be
141 visually indicated by segmented fills in text-based bars or flowchart
142 nodes.
143 min_bar_width : `int`
144 Minimum width of the visualized progress bar in characters. Only counts
145 the width of the bar itself, not any surrounding text. Only relevant if
146 `visualize` is `True` and it's a text-based visualization.
147 abbreviate : `bool`
148 If `True`, status labels will be abbreviated to save space. For
149 example, 'expected' will be abbreviated to 'exp' and 'blocked' to
150 'blk'.
151 """
153 colors: QuantumGraphExecutionStatusColors | QuantumProvenanceGraphStatusColors
154 display_percent: bool = True
155 display_counts: bool = True
156 visualize: bool = True
157 min_bar_width: int = 15
158 abbreviate: bool = True
160 def __post_init__(self) -> None:
161 if not (self.display_percent or self.display_counts or self.visualize):
162 raise ValueError(
163 "At least one of 'display_percent', 'display_counts', or 'visualize' must be True."
164 )
167@dataclasses.dataclass
168class QuantumGraphExecutionStatusOptions(NodeStatusOptions):
169 """Specialized status options for quantum graph execution reports."""
171 colors: QuantumGraphExecutionStatusColors = dataclasses.field(
172 default_factory=QuantumGraphExecutionStatusColors
173 )
176@dataclasses.dataclass
177class QuantumProvenanceGraphStatusOptions(NodeStatusOptions):
178 """Specialized status options for quantum provenance graph reports."""
180 colors: QuantumProvenanceGraphStatusColors = dataclasses.field(
181 default_factory=QuantumProvenanceGraphStatusColors
182 )
185class NodeStatusAnnotator(Protocol):
186 """Protocol for annotating a networkx graph with task and dataset type
187 status information.
188 """
190 @overload
191 def __call__(self, xgraph: networkx.DiGraph, dataset_types: Literal[False]) -> None: ... 191 ↛ exitline 191 didn't return from function '__call__' because
193 @overload
194 def __call__(self, xgraph: networkx.MultiDiGraph, dataset_types: Literal[True]) -> None: ... 194 ↛ exitline 194 didn't return from function '__call__' because
196 def __call__(self, xgraph: networkx.DiGraph | networkx.MultiDiGraph, dataset_types: bool) -> None: ... 196 ↛ exitline 196 didn't return from function '__call__' because
199class QuantumGraphExecutionStatusAnnotator:
200 """Annotates a networkx graph with task and dataset status information from
201 a quantum graph execution summary, implementing the StatusAnnotator
202 protocol to update the graph with status data.
204 Parameters
205 ----------
206 *args : `typing.Any`
207 Arbitrary arguments.
208 **kwargs : `typing.Any`
209 Arbitrary keyword arguments.
210 """
212 def __init__(self, *args: Any, **kwargs: Any) -> None:
213 raise NotImplementedError("`QuantumGraphExecutionStatusAnnotator` is not implemented yet.")
216class QuantumProvenanceGraphStatusAnnotator:
217 """Annotates a networkx graph with task and dataset status information from
218 a quantum provenance summary, implementing the StatusAnnotator protocol to
219 update the graph with status data.
221 Parameters
222 ----------
223 qpg_summary : `~lsst.pipe.base.quantum_provenance_graph.Summary`
224 The quantum provenance summary to use for status information.
225 """
227 def __init__(self, qpg_summary: qpg.Summary) -> None:
228 self.qpg_summary = qpg_summary
230 @overload
231 def __call__(self, xgraph: networkx.DiGraph, dataset_types: Literal[False]) -> None: ... 231 ↛ exitline 231 didn't return from function '__call__' because
233 @overload
234 def __call__(self, xgraph: networkx.MultiDiGraph, dataset_types: Literal[True]) -> None: ... 234 ↛ exitline 234 didn't return from function '__call__' because
236 def __call__(self, xgraph: networkx.DiGraph | networkx.MultiDiGraph, dataset_types: bool) -> None:
237 for task_label, task_summary in self.qpg_summary.tasks.items():
238 fields = {
239 name.replace("n_", "").replace("successful", "succeeded"): getattr(task_summary, name)
240 for name in dir(task_summary)
241 if name.startswith("n_")
242 }
243 assert sum(fields.values()) == 2 * task_summary.n_expected, f"Incosistent status counts: {fields}"
244 task_status_info = TaskStatusInfo(**fields)
246 key = NodeKey(NodeType.TASK, task_label)
247 xgraph.nodes[key]["status"] = task_status_info
249 if dataset_types:
250 for dataset_type_name, dataset_type_summary in self.qpg_summary.datasets.items():
251 expected = dataset_type_summary.n_expected
252 produced = dataset_type_summary.n_visible + dataset_type_summary.n_shadowed
253 assert produced <= expected, f"Dataset types produced ({produced}) > expected ({expected})"
254 dataset_type_status_info = DatasetTypeStatusInfo(expected=expected, produced=produced)
256 key = NodeKey(NodeType.DATASET_TYPE, dataset_type_name)
257 xgraph.nodes[key]["status"] = dataset_type_status_info