Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _status_annotator.py: 73%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:44 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ( 

30 "QuantumGraphExecutionStatusAnnotator", 

31 "QuantumGraphExecutionStatusOptions", 

32 "QuantumProvenanceGraphStatusAnnotator", 

33 "QuantumProvenanceGraphStatusOptions", 

34) 

35 

36import dataclasses 

37from typing import TYPE_CHECKING, Any, Literal, Protocol, overload 

38 

39import networkx 

40 

41from .._nodes import NodeKey, NodeType 

42 

43if TYPE_CHECKING: 

44 from ... import quantum_provenance_graph as qpg 

45 

# ANSI escape sequences used to colorize status output.  Each value is the
# ESC character followed by a Select Graphic Rendition (SGR) parameter; the
# basic foreground colors are listed in ascending SGR-code order.
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
# "Bright black" foreground, which most terminals render as gray.
GRAY = "\033[90m"
# Entry 130 of the 256-color palette ("\u001b" is the same ESC character as
# "\033", just spelled as a Unicode escape).
BROWN = "\u001b[38;5;130m"
# Switches all attributes back to the terminal defaults.
RESET = "\033[0m"

56 

57 

@dataclasses.dataclass
class TaskStatusInfo:
    """Per-task status counters stored on a task node.

    The first four counters are required; the remaining ones are optional
    and default to `None` when the report flavor does not provide them.
    """

    # Counters that every caller must supply.
    expected: int
    succeeded: int
    failed: int
    blocked: int

    # Counters that may be left unset.
    ready: int | None = None
    running: int | None = None
    wonky: int | None = None
    unknown: int | None = None

70 

71 

@dataclasses.dataclass
class DatasetTypeStatusInfo:
    """Status counters stored on a dataset type node.

    Both counters are required: ``expected`` first, then ``produced``.
    """

    expected: int
    produced: int

78 

79 

@dataclasses.dataclass
class StatusColors:
    """Common ANSI color assignments shared by the status report flavors.

    Each attribute holds the escape sequence used to paint one progress
    segment or status.  Subclasses extend this set with further statuses.
    """

    # Colors applied to the base task statuses.
    expected: str = WHITE
    succeeded: str = GREEN
    failed: str = RED

    # Color applied to the base dataset type status.
    produced: str = GREEN

    # Sequence that switches output back to the default color.
    reset: str = RESET

96 

97 

@dataclasses.dataclass
class QuantumGraphExecutionStatusColors(StatusColors):
    """ANSI color assignments for quantum graph execution reports.

    Intended to cover the statuses of both task and dataset type nodes.
    This class is currently a placeholder: creating an instance always
    raises `NotImplementedError`.
    """

    def __post_init__(self) -> None:
        # Runs right after the dataclass-generated ``__init__``, so every
        # attempt to instantiate this class fails here.
        message = "`QuantumGraphExecutionStatusColors` is not implemented yet."
        raise NotImplementedError(message)

108 

109 

@dataclasses.dataclass
class QuantumProvenanceGraphStatusColors(StatusColors):
    """ANSI color assignments for quantum provenance graph reports.

    Extends the base colors with the task statuses that only appear in
    provenance reports, so that statuses of both task and dataset type nodes
    are covered.
    """

    # Task statuses beyond the ones defined by the base class.
    blocked: str = YELLOW
    ready: str = GRAY
    running: str = MAGENTA
    wonky: str = CYAN
    unknown: str = BROWN

124 

125 

@dataclasses.dataclass
class NodeStatusOptions:
    """Common settings that control how node status is rendered.

    Attributes
    ----------
    colors : `QuantumGraphExecutionStatusColors` or \
            `QuantumProvenanceGraphStatusColors`
        ANSI color codes to use for the distinct progress segments or
        statuses.
    display_percent : `bool`
        Show progress as a percentage.
    display_counts : `bool`
        Show numeric counts (e.g., succeeded/expected).
    visualize : `bool`
        If `True`, indicate the status of task or dataset type nodes
        visually, through segmented fills in text-based bars or flowchart
        nodes.
    min_bar_width : `int`
        Smallest allowed width, in characters, of the visualized progress
        bar.  Surrounding text is not counted.  Only relevant for text-based
        visualizations when `visualize` is `True`.
    abbreviate : `bool`
        If `True`, shorten status labels to save space, e.g. 'expected'
        becomes 'exp' and 'blocked' becomes 'blk'.

    Raises
    ------
    ValueError
        Raised if `display_percent`, `display_counts` and `visualize` are all
        false, since nothing would then be shown.
    """

    colors: QuantumGraphExecutionStatusColors | QuantumProvenanceGraphStatusColors
    display_percent: bool = True
    display_counts: bool = True
    visualize: bool = True
    min_bar_width: int = 15
    abbreviate: bool = True

    def __post_init__(self) -> None:
        # At least one way of presenting the status must remain enabled.
        presentation_flags = (self.display_percent, self.display_counts, self.visualize)
        if any(presentation_flags):
            return
        raise ValueError("At least one of 'display_percent', 'display_counts', or 'visualize' must be True.")

165 

166 

@dataclasses.dataclass
class QuantumGraphExecutionStatusOptions(NodeStatusOptions):
    """Status options tailored to quantum graph execution reports.

    When ``colors`` is not given, a new `QuantumGraphExecutionStatusColors`
    is built for each instance.
    """

    # NOTE: the default factory instantiates a color class whose
    # ``__post_init__`` currently raises `NotImplementedError`.
    colors: QuantumGraphExecutionStatusColors = dataclasses.field(
        default_factory=QuantumGraphExecutionStatusColors,
    )

174 

175 

@dataclasses.dataclass
class QuantumProvenanceGraphStatusOptions(NodeStatusOptions):
    """Status options tailored to quantum provenance graph reports.

    When ``colors`` is not given, a new `QuantumProvenanceGraphStatusColors`
    is built for each instance.
    """

    # A factory is used so that instances never share one colors object.
    colors: QuantumProvenanceGraphStatusColors = dataclasses.field(
        default_factory=QuantumProvenanceGraphStatusColors,
    )

183 

184 

class NodeStatusAnnotator(Protocol):
    """Structural type for callables that attach task and dataset type status
    information to a networkx graph.

    The overloads tie the graph flavor to the ``dataset_types`` flag: a
    `networkx.DiGraph` goes with `False` and a `networkx.MultiDiGraph` goes
    with `True`.
    """

    @overload
    def __call__(self, xgraph: networkx.DiGraph, dataset_types: Literal[False]) -> None: ...

    @overload
    def __call__(self, xgraph: networkx.MultiDiGraph, dataset_types: Literal[True]) -> None: ...

    def __call__(self, xgraph: networkx.DiGraph | networkx.MultiDiGraph, dataset_types: bool) -> None:
        """Annotate ``xgraph`` in place with status information.

        Parameters
        ----------
        xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
            Graph to annotate.
        dataset_types : `bool`
            Flag selecting the graph flavor; presumably whether dataset type
            nodes are present and should be annotated as well.
        """
        ...

197 

198 

class QuantumGraphExecutionStatusAnnotator:
    """Annotator meant to add task and dataset status information from a
    quantum graph execution summary to a networkx graph, following the
    `NodeStatusAnnotator` protocol.

    This class is currently a placeholder: constructing it always raises
    `NotImplementedError`.

    Parameters
    ----------
    *args : `typing.Any`
        Arbitrary arguments.
    **kwargs : `typing.Any`
        Arbitrary keyword arguments.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Any arguments are accepted and ignored until this is implemented.
        message = "`QuantumGraphExecutionStatusAnnotator` is not implemented yet."
        raise NotImplementedError(message)

214 

215 

class QuantumProvenanceGraphStatusAnnotator:
    """Annotates a networkx graph with task and dataset type status
    information from a quantum provenance summary.

    Instances are callables with the signature described by the
    `NodeStatusAnnotator` protocol.

    Parameters
    ----------
    qpg_summary : `~lsst.pipe.base.quantum_provenance_graph.Summary`
        The quantum provenance summary to use for status information.
    """

    def __init__(self, qpg_summary: qpg.Summary) -> None:
        self.qpg_summary = qpg_summary

    @overload
    def __call__(self, xgraph: networkx.DiGraph, dataset_types: Literal[False]) -> None: ...

    @overload
    def __call__(self, xgraph: networkx.MultiDiGraph, dataset_types: Literal[True]) -> None: ...

    def __call__(self, xgraph: networkx.DiGraph | networkx.MultiDiGraph, dataset_types: bool) -> None:
        """Store status objects under the ``"status"`` key of graph nodes.

        Parameters
        ----------
        xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
            Graph to annotate in place.  Every task in the summary (and
            every dataset type, if ``dataset_types`` is true) is looked up
            as an existing node by its `NodeKey`.
        dataset_types : `bool`
            If true, dataset type nodes are annotated in addition to task
            nodes.

        Raises
        ------
        AssertionError
            Raised if a task's counters do not add up, or if more datasets
            were produced than expected for a dataset type.
        """
        prefix = "n_"
        for task_label, task_summary in self.qpg_summary.tasks.items():
            # Collect every ``n_*`` counter of the task summary, keyed by the
            # matching `TaskStatusInfo` field name.  Only the leading prefix
            # is stripped, so an "n_" appearing later in a name is preserved.
            fields = {
                name[len(prefix) :].replace("successful", "succeeded"): getattr(task_summary, name)
                for name in dir(task_summary)
                if name.startswith(prefix)
            }
            # ``fields`` holds the expected count itself plus the per-status
            # counts, so a consistent summary totals twice the expected count.
            assert sum(fields.values()) == 2 * task_summary.n_expected, (
                f"Inconsistent status counts: {fields}"
            )
            task_status_info = TaskStatusInfo(**fields)

            key = NodeKey(NodeType.TASK, task_label)
            xgraph.nodes[key]["status"] = task_status_info

        if dataset_types:
            for dataset_type_name, dataset_type_summary in self.qpg_summary.datasets.items():
                expected = dataset_type_summary.n_expected
                # Visible and shadowed datasets both count as produced.
                produced = dataset_type_summary.n_visible + dataset_type_summary.n_shadowed
                assert produced <= expected, f"Dataset types produced ({produced}) > expected ({expected})"
                dataset_type_status_info = DatasetTypeStatusInfo(expected=expected, produced=produced)

                key = NodeKey(NodeType.DATASET_TYPE, dataset_type_name)
                xgraph.nodes[key]["status"] = dataset_type_status_info