Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _formatting.py: 13%
168 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:32 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ("GetNodeText", "format_dimensions", "format_task_class", "get_node_symbol")
31import itertools
32import re
33import textwrap
34from collections.abc import Iterator
36import networkx
37import networkx.algorithms.community
38from wcwidth import wcswidth # type: ignore
40from lsst.daf.butler import DimensionGroup
42from .._nodes import NodeKey, NodeType
43from ._merge import MergedNodeKey
44from ._options import NodeAttributeOptions
45from ._status_annotator import DatasetTypeStatusInfo, NodeStatusOptions, StatusColors, TaskStatusInfo
47DisplayNodeKey = NodeKey | MergedNodeKey
48"""Type alias for graph keys that may be original task, task init, or dataset
49type keys, or a merge of several keys for display purposes.
50"""
53def strip_ansi(s: str) -> str:
54 """Remove ANSI escape codes from a string, so that `wcswidth()` measures
55 the real visible width of the string.
57 Parameters
58 ----------
59 s : `str`
60 String to strip of ANSI escape codes.
62 Returns
63 -------
64 stripped : `str`
65 String with ANSI escape codes removed.
66 """
67 # ANSI escape sequence remover
68 ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
69 return ansi_escape.sub("", s)
72def render_segment(f: float) -> str:
73 """Convert a float into a string of blocks, rounding to the nearest whole
74 number of columns.
76 Parameters
77 ----------
78 f : `float`
79 Number of columns to fill.
81 Returns
82 -------
83 blocks : `str`
84 String of blocks filling the specified number of columns.
85 """
86 return "█" * round(f)
89def get_node_symbol(node: DisplayNodeKey, x: int | None = None) -> str:
90 """Return a single-character symbol for a particular node type.
92 Parameters
93 ----------
94 node : `DisplayNodeKey`
95 Named tuple used as the node key.
96 x : `str`, optional
97 Ignored; may be passed for compatibility with the `Printer` class's
98 ``get_symbol`` callback.
100 Returns
101 -------
102 symbol : `str`
103 A single-character symbol.
104 """
105 match node:
106 case NodeKey(node_type=NodeType.TASK):
107 return "■"
108 case NodeKey(node_type=NodeType.DATASET_TYPE):
109 return "○"
110 case NodeKey(node_type=NodeType.TASK_INIT):
111 return "▣"
112 case MergedNodeKey(node_type=NodeType.TASK):
113 return "▥"
114 case MergedNodeKey(node_type=NodeType.DATASET_TYPE):
115 return "◍"
116 case MergedNodeKey(node_type=NodeType.TASK_INIT):
117 return "▤"
118 raise ValueError(f"Unexpected node key: {node} of type {type(node)}.")
121class GetNodeText:
122 """A callback for the `Printer` class's `get_text` callback that
123 prints detailed information about a node and can defer long entries to
124 a footnote.
126 Parameters
127 ----------
128 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
129 NetworkX export of a `.PipelineGraph` that is being displayed.
130 options : `NodeAttributeOptions`
131 Options for how much information to display.
132 width : `int` or `None`
133 Number of display columns that node text can occupy. `None` for
134 unlimited.
135 """
137 def __init__(
138 self,
139 xgraph: networkx.DiGraph | networkx.MultiDiGraph,
140 options: NodeAttributeOptions,
141 width: int | None,
142 ):
143 self.xgraph = xgraph
144 self.options = options
145 self.width = width
146 self.deferred: list[tuple[str, tuple[str, str], list[str]]] = []
148 def __call__(self, node: DisplayNodeKey, x: int, style: tuple[str, str]) -> str:
149 """Return a line of text describing a node.
151 Parameters
152 ----------
153 node : `DisplayNodeKey`
154 Named tuple used as the node key.
155 x : `int`
156 Ignored; may be passed for compatibility with the `Printer` class's
157 ``get_text`` callback.
158 style : `tuple` [`str`, `str`]
159 Tuple of ANSI color codes for overflow markers.
160 """
161 state = self.xgraph.nodes[node]
162 has_status = "status" in state
164 # Build description.
165 description = self._build_description(node, state)
167 # Possibly build progress bar to append to description.
168 progress_portion = ""
169 if has_status:
170 progress_portion = self.format_node_status(description, state["status"])
172 # Stitch together the final line, handling overflow if needed.
173 final_line = self._stitch_node_text(description, progress_portion, style)
174 return final_line
176 def _build_description(self, node: DisplayNodeKey, state: dict) -> str:
177 """Build the node description, possibly with additional details.
179 Parameters
180 ----------
181 node : `DisplayNodeKey`
182 Named tuple used as the node key.
183 state : `dict`
184 Node attributes.
186 Returns
187 -------
188 description : `str`
189 The node description.
190 """
191 terms = [f"{node}:" if self.options.has_details(node.node_type) else str(node)]
192 # Optionally append dimension info.
193 if self.options.dimensions and node.node_type != NodeType.TASK_INIT:
194 terms.append(self.format_dimensions(state["dimensions"]))
196 # Optionally append task class name.
197 if self.options.task_classes and (
198 node.node_type is NodeType.TASK or node.node_type is NodeType.TASK_INIT
199 ):
200 terms.append(self.format_task_class(state["task_class_name"]))
202 # Optionally append storage class name.
203 if self.options.storage_classes and node.node_type is NodeType.DATASET_TYPE:
204 terms.append(state["storage_class_name"])
206 description = " ".join(terms)
208 return description
210 def _stitch_node_text(self, description: str, progress_portion: str, style: tuple[str, str]) -> str:
211 """Make the final line of node text to display given the description
212 and possibly a progress portion, and handle overflow.
214 It measures the total width of the description and progress portion,
215 and if it exceeds the screen width, it truncates the description and
216 appends a footnote.
218 Parameters
219 ----------
220 description : `str`
221 The node description.
222 progress_portion : `str`
223 The progress portion of the node.
224 style : `tuple` [`str`, `str`]
225 Tuple of ANSI color codes for overflow markers.
227 Returns
228 -------
229 final_line : `str`
230 The final line of text to display.
231 """
232 final_line = f"{description}{progress_portion}" if progress_portion else description
233 total_len = wcswidth(strip_ansi(final_line))
235 if self.width and total_len > self.width:
236 overflow_index = f"[{len(self.deferred) + 1}]"
237 overflow_marker = f"...{style[0]}{overflow_index}{style[1]}"
239 avail_desc_width = (
240 self.width - wcswidth(strip_ansi(progress_portion)) - wcswidth(strip_ansi(overflow_marker))
241 )
242 if avail_desc_width < 0:
243 avail_desc_width = 0
245 truncated_desc = description[:avail_desc_width] + overflow_marker
246 self.deferred.append((overflow_index, style, [description]))
248 return f"{truncated_desc}{progress_portion}"
249 else:
250 return final_line
252 def format_node_status(self, description: str, status: TaskStatusInfo | DatasetTypeStatusInfo) -> str:
253 """Format the status of a task node.
255 Parameters
256 ----------
257 description : `str`
258 The node description.
259 status : `TaskStatusInfo` or `DatasetTypeStatusInfo`
260 Holds status information for a task or dataset type.
262 Returns
263 -------
264 formatted : `str`
265 The formatted status string.
266 """
267 if not isinstance(self.options.status, NodeStatusOptions):
268 raise ValueError(f"Invalid node status options: {self.options.status!r}.")
270 return format_node_status(
271 description,
272 self.options.status,
273 status,
274 self.width,
275 )
277 def format_dimensions(self, dimensions: DimensionGroup) -> str:
278 """Format the dimensions of a task or dataset type node.
280 Parameters
281 ----------
282 dimensions : `~lsst.daf.butler.DimensionGroup`
283 The dimensions to be formatted.
285 Returns
286 -------
287 formatted : `str`
288 The formatted dimension string.
289 """
290 return format_dimensions(self.options, dimensions)
292 def format_task_class(self, task_class_name: str) -> str:
293 """Format the type object for a task or task init node.
295 Parameters
296 ----------
297 task_class_name : `str`
298 The name of the task class.
300 Returns
301 -------
302 formatted : `str`
303 The formatted string.
304 """
305 return format_task_class(self.options, task_class_name)
307 def format_deferrals(self, width: int | None) -> Iterator[str]:
308 """Iterate over all descriptions that were truncated earlier and
309 replace with footnote placeholders.
311 Parameters
312 ----------
313 width : `int` or `None`
314 Number of columns to wrap descriptions at.
316 Returns
317 -------
318 deferrals : `collections.abc.Iterator` [ `str` ]
319 Lines of deferred text, already wrapped.
320 """
321 indent = " "
322 for index, style, terms in self.deferred:
323 yield f"{style[0]}{index}{style[1]}"
324 for term in terms:
325 if width:
326 yield from textwrap.wrap(term, width, initial_indent=indent, subsequent_indent=indent)
327 else:
328 yield term
331def format_dimensions(options: NodeAttributeOptions, dimensions: DimensionGroup) -> str:
332 """Format the dimensions of a task or dataset type node.
334 Parameters
335 ----------
336 options : `NodeAttributeOptions`
337 Options for how much information to display.
338 dimensions : `~lsst.daf.butler.DimensionGroup`
339 The dimensions to be formatted.
341 Returns
342 -------
343 formatted : `str`
344 The formatted dimension string.
345 """
346 match options.dimensions:
347 case "full":
348 return str(dimensions.names)
349 case "concise":
350 redundant: set[str] = set()
351 for a, b in itertools.permutations(dimensions.required, 2):
352 if a in dimensions.universe[b].required:
353 redundant.add(a)
354 kept = [d for d in dimensions.required if d not in redundant]
355 assert dimensions.universe.conform(kept) == dimensions
356 return f"{{{', '.join(kept)}}}"
357 case False:
358 return ""
359 raise ValueError(f"Invalid display option for dimensions: {options.dimensions!r}.")
362def format_task_class(options: NodeAttributeOptions, task_class_name: str) -> str:
363 """Format the type object for a task or task init node.
365 Parameters
366 ----------
367 options : `NodeAttributeOptions`
368 Options for how much information to display.
369 task_class_name : `str`
370 The name of the task class.
372 Returns
373 -------
374 formatted : `str`
375 The formatted string.
376 """
377 match options.task_classes:
378 case "full":
379 return task_class_name
380 case "concise":
381 return task_class_name.split(".")[-1]
382 case False:
383 return ""
384 raise ValueError(f"Invalid display option for task_classes: {options.task_classes!r}.")
387def _build_progress_bar(
388 description: str,
389 prefix: str,
390 suffix: str,
391 segments: list[tuple[str, float]],
392 width: int | None,
393 colors: StatusColors,
394 min_bar_width: int,
395) -> str:
396 """Shared helper that constructs a multi-segment progress bar.
398 Parameters
399 ----------
400 description : `str`
401 Main node description (used for measuring available space).
402 prefix : `str`
403 Text before the bar (e.g. ' · 42%▕').
404 suffix : `str`
405 Text after the bar (e.g. '▏exp: 107 ...').
406 segments : `list` of `tuple` [`str`, `float`]
407 Each tuple is (color_code, fractionOfBarWidth). We'll create these
408 segments in sequence.
409 width : `int` or None
410 Overall maximum line width. None for unlimited. We'll still respect the
411 minimum bar width.
412 colors : `StatusColors`
413 An instance containing .reset and color fields.
414 min_bar_width : `int`
415 Minimum number of display columns for the bar.
417 Returns
418 -------
419 formatted : str
420 The assembled prefix + bar + suffix, sized with respect to the width.
421 """
422 used_len = wcswidth(strip_ansi(prefix)) + wcswidth(strip_ansi(suffix))
423 if width is not None:
424 bar_width = max(width - wcswidth(strip_ansi(description)) - used_len, min_bar_width)
425 else:
426 bar_width = min_bar_width
428 # Build bar from fractional segments.
429 bar_str = ""
430 total_cols = 0
431 for ansi_color, fraction in segments:
432 cols = round(bar_width * fraction)
433 bar_str += f"{ansi_color}{render_segment(cols)}{colors.reset}"
434 total_cols += cols
436 # Pad the bar to the full width.
437 if total_cols < bar_width:
438 bar_str += " " * (bar_width - total_cols)
440 return prefix + bar_str + suffix
443def format_node_status(
444 description: str,
445 status_options: NodeStatusOptions,
446 status: TaskStatusInfo | DatasetTypeStatusInfo,
447 width: int | None,
448) -> str:
449 """Build a progress bar for a task or dataset type node.
451 Parameters
452 ----------
453 description : `str`
454 Node description for measuring leftover columns.
455 status_options : `NodeStatusOptions`
456 Options for node status visualization.
457 status : `TaskStatusInfo` or `DatasetTypeStatusInfo`
458 Holds status information for a task or dataset type.
459 width : `int` or None
460 Overall width limit (None => unlimited).
462 Returns
463 -------
464 formatted : str
465 The final prefix + bar + suffix line.
466 """
467 import dataclasses
469 status_abbreviations = {
470 "expected": "exp",
471 "succeeded": "suc",
472 "failed": "fail",
473 "blocked": "blk",
474 "ready": "rdy",
475 "running": "run",
476 "wonky": "wnk",
477 "unknown": "unk",
478 "produced": "prd",
479 }
481 colors = status_options.colors
482 expected = status.expected
483 done = status.succeeded if isinstance(status, TaskStatusInfo) else status.produced
484 full_success = done == expected
485 status_lookup = dataclasses.asdict(status)
487 percent = 100.0 * done / expected if expected else 0.0
488 total = float(expected) if expected else 1.0
489 prefix = ""
491 if status_options.display_percent or status_options.display_counts:
492 if not status_options.visualize or (status_options.visualize and status_options.display_percent):
493 full_success_color = colors.succeeded if isinstance(status, TaskStatusInfo) else colors.produced
494 color_code = full_success_color if full_success else colors.failed
495 prefix += f"{color_code} ▶ {colors.reset}"
496 if status_options.display_percent:
497 pct = round(percent)
498 if pct == 100 and not full_success:
499 pct == 99 # Avoid showing 100% if not fully successful.
500 prefix += f"{pct}%"
501 if not status_options.visualize and status_options.display_percent and status_options.display_counts:
502 prefix += " | "
504 if status_options.visualize:
505 prefix += "▕"
507 suffix_parts = []
508 segments = []
510 for key, value in status_lookup.items():
511 if value is not None:
512 color_code = getattr(colors, key)
513 if status_options.display_counts:
514 label = status_abbreviations[key] if status_options.abbreviate else key
515 suffix_parts.append(f"{label}:{color_code}{value}{colors.reset}")
516 if key != "expected":
517 # Build a progress bar segment.
518 segments.append((color_code, value / total))
520 # Produce suffix from the parts.
521 suffix = "▏" if status_options.visualize else ""
522 suffix += " | ".join(suffix_parts)
524 if status_options.visualize:
525 return _build_progress_bar(
526 description, prefix, suffix, segments, width, colors, status_options.min_bar_width
527 )
528 else:
529 return f"{prefix}{suffix}"