Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _formatting.py: 13%

168 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:47 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("GetNodeText", "format_dimensions", "format_task_class", "get_node_symbol") 

30 

31import itertools 

32import re 

33import textwrap 

34from collections.abc import Iterator 

35 

36import networkx 

37import networkx.algorithms.community 

38from wcwidth import wcswidth # type: ignore 

39 

40from lsst.daf.butler import DimensionGroup 

41 

42from .._nodes import NodeKey, NodeType 

43from ._merge import MergedNodeKey 

44from ._options import NodeAttributeOptions 

45from ._status_annotator import DatasetTypeStatusInfo, NodeStatusOptions, StatusColors, TaskStatusInfo 

46 

# Union of a single-node key and a merged-node key; functions in this module
# accept either form so merged display nodes can be handled uniformly.
DisplayNodeKey = NodeKey | MergedNodeKey
"""Type alias for graph keys that may be original task, task init, or dataset
type keys, or a merge of several keys for display purposes.
"""

51 

52 

def strip_ansi(s: str) -> str:
    """Remove ANSI escape codes from a string, so that `wcswidth()` measures
    the real visible width of the string.

    Parameters
    ----------
    s : `str`
        String to strip of ANSI escape codes.

    Returns
    -------
    stripped : `str`
        String with ANSI escape codes removed.
    """
    # CSI sequences: ESC '[' parameter bytes, intermediate bytes, final byte.
    # `re` caches compiled patterns, so calling `re.sub` directly is fine here.
    return re.sub(r"\x1B\[[0-?]*[ -/]*[@-~]", "", s)

70 

71 

def render_segment(f: float) -> str:
    """Convert a float into a string of blocks, rounding to the nearest whole
    number of columns.

    Parameters
    ----------
    f : `float`
        Number of columns to fill.

    Returns
    -------
    blocks : `str`
        String of blocks filling the specified number of columns.
    """
    # Built-in round() (banker's rounding) picks the column count, then the
    # full-block character is repeated to fill exactly that many cells.
    n_columns = round(f)
    return n_columns * "█"

87 

88 

def get_node_symbol(node: DisplayNodeKey, x: int | None = None) -> str:
    """Return a single-character symbol for a particular node type.

    Parameters
    ----------
    node : `DisplayNodeKey`
        Named tuple used as the node key.
    x : `str`, optional
        Ignored; may be passed for compatibility with the `Printer` class's
        ``get_symbol`` callback.

    Returns
    -------
    symbol : `str`
        A single-character symbol.
    """
    # Dispatch tables keyed by node type; plain keys use solid glyphs while
    # merged keys use visually "striped" or "dotted" variants.
    plain_symbols = {
        NodeType.TASK: "■",
        NodeType.DATASET_TYPE: "○",
        NodeType.TASK_INIT: "▣",
    }
    merged_symbols = {
        NodeType.TASK: "▥",
        NodeType.DATASET_TYPE: "◍",
        NodeType.TASK_INIT: "▤",
    }
    # Check NodeKey first, mirroring the order the original match statement
    # tried its class patterns in.
    if isinstance(node, NodeKey):
        symbol = plain_symbols.get(node.node_type)
    elif isinstance(node, MergedNodeKey):
        symbol = merged_symbols.get(node.node_type)
    else:
        symbol = None
    if symbol is None:
        raise ValueError(f"Unexpected node key: {node} of type {type(node)}.")
    return symbol

119 

120 

class GetNodeText:
    """A callback for the `Printer` class's `get_text` callback that
    prints detailed information about a node and can defer long entries to
    a footnote.

    Parameters
    ----------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        NetworkX export of a `.PipelineGraph` that is being displayed.
    options : `NodeAttributeOptions`
        Options for how much information to display.
    width : `int` or `None`
        Number of display columns that node text can occupy. `None` for
        unlimited.
    """

    def __init__(
        self,
        xgraph: networkx.DiGraph | networkx.MultiDiGraph,
        options: NodeAttributeOptions,
        width: int | None,
    ):
        self.xgraph = xgraph
        self.options = options
        self.width = width
        # Footnote entries for descriptions that did not fit on their line:
        # each is (footnote index label, ANSI style pair, full-text terms).
        # Appended by _stitch_node_text; rendered later by format_deferrals.
        self.deferred: list[tuple[str, tuple[str, str], list[str]]] = []

    def __call__(self, node: DisplayNodeKey, x: int, style: tuple[str, str]) -> str:
        """Return a line of text describing a node.

        Parameters
        ----------
        node : `DisplayNodeKey`
            Named tuple used as the node key.
        x : `int`
            Ignored; may be passed for compatibility with the `Printer` class's
            ``get_text`` callback.
        style : `tuple` [`str`, `str`]
            Tuple of ANSI color codes for overflow markers.
        """
        state = self.xgraph.nodes[node]
        # Only nodes annotated by the status annotator carry a "status" key.
        has_status = "status" in state

        # Build description.
        description = self._build_description(node, state)

        # Possibly build progress bar to append to description.
        progress_portion = ""
        if has_status:
            progress_portion = self.format_node_status(description, state["status"])

        # Stitch together the final line, handling overflow if needed.
        final_line = self._stitch_node_text(description, progress_portion, style)
        return final_line

    def _build_description(self, node: DisplayNodeKey, state: dict) -> str:
        """Build the node description, possibly with additional details.

        Parameters
        ----------
        node : `DisplayNodeKey`
            Named tuple used as the node key.
        state : `dict`
            Node attributes.

        Returns
        -------
        description : `str`
            The node description.
        """
        # A trailing colon is added only when detail terms will follow.
        terms = [f"{node}:" if self.options.has_details(node.node_type) else str(node)]
        # Optionally append dimension info.
        if self.options.dimensions and node.node_type != NodeType.TASK_INIT:
            terms.append(self.format_dimensions(state["dimensions"]))

        # Optionally append task class name.
        if self.options.task_classes and (
            node.node_type is NodeType.TASK or node.node_type is NodeType.TASK_INIT
        ):
            terms.append(self.format_task_class(state["task_class_name"]))

        # Optionally append storage class name.
        if self.options.storage_classes and node.node_type is NodeType.DATASET_TYPE:
            terms.append(state["storage_class_name"])

        description = " ".join(terms)

        return description

    def _stitch_node_text(self, description: str, progress_portion: str, style: tuple[str, str]) -> str:
        """Make the final line of node text to display given the description
        and possibly a progress portion, and handle overflow.

        It measures the total width of the description and progress portion,
        and if it exceeds the screen width, it truncates the description and
        appends a footnote.

        Parameters
        ----------
        description : `str`
            The node description.
        progress_portion : `str`
            The progress portion of the node.
        style : `tuple` [`str`, `str`]
            Tuple of ANSI color codes for overflow markers.

        Returns
        -------
        final_line : `str`
            The final line of text to display.
        """
        final_line = f"{description}{progress_portion}" if progress_portion else description
        # Measure visible width: strip ANSI codes first so escape sequences
        # don't count against the column budget.
        total_len = wcswidth(strip_ansi(final_line))

        if self.width and total_len > self.width:
            # Footnote labels are 1-based and assigned in encounter order.
            overflow_index = f"[{len(self.deferred) + 1}]"
            overflow_marker = f"...{style[0]}{overflow_index}{style[1]}"

            avail_desc_width = (
                self.width - wcswidth(strip_ansi(progress_portion)) - wcswidth(strip_ansi(overflow_marker))
            )
            if avail_desc_width < 0:
                avail_desc_width = 0

            # NOTE(review): this slices by code points, not display cells, so
            # wide (e.g. CJK) characters could make the truncated line exceed
            # the target width — confirm descriptions are narrow-char only.
            truncated_desc = description[:avail_desc_width] + overflow_marker
            self.deferred.append((overflow_index, style, [description]))

            return f"{truncated_desc}{progress_portion}"
        else:
            return final_line

    def format_node_status(self, description: str, status: TaskStatusInfo | DatasetTypeStatusInfo) -> str:
        """Format the status of a task node.

        Parameters
        ----------
        description : `str`
            The node description.
        status : `TaskStatusInfo` or `DatasetTypeStatusInfo`
            Holds status information for a task or dataset type.

        Returns
        -------
        formatted : `str`
            The formatted status string.
        """
        # Guard against callers annotating status without enabling the
        # corresponding display options.
        if not isinstance(self.options.status, NodeStatusOptions):
            raise ValueError(f"Invalid node status options: {self.options.status!r}.")

        # Delegate to the module-level helper of the same name.
        return format_node_status(
            description,
            self.options.status,
            status,
            self.width,
        )

    def format_dimensions(self, dimensions: DimensionGroup) -> str:
        """Format the dimensions of a task or dataset type node.

        Parameters
        ----------
        dimensions : `~lsst.daf.butler.DimensionGroup`
            The dimensions to be formatted.

        Returns
        -------
        formatted : `str`
            The formatted dimension string.
        """
        # Delegate to the module-level helper of the same name.
        return format_dimensions(self.options, dimensions)

    def format_task_class(self, task_class_name: str) -> str:
        """Format the type object for a task or task init node.

        Parameters
        ----------
        task_class_name : `str`
            The name of the task class.

        Returns
        -------
        formatted : `str`
            The formatted string.
        """
        # Delegate to the module-level helper of the same name.
        return format_task_class(self.options, task_class_name)

    def format_deferrals(self, width: int | None) -> Iterator[str]:
        """Iterate over all descriptions that were truncated earlier and
        replace with footnote placeholders.

        Parameters
        ----------
        width : `int` or `None`
            Number of columns to wrap descriptions at.

        Returns
        -------
        deferrals : `collections.abc.Iterator` [ `str` ]
            Lines of deferred text, already wrapped.
        """
        indent = "  "
        for index, style, terms in self.deferred:
            # Footnote label line, styled like the inline overflow marker.
            yield f"{style[0]}{index}{style[1]}"
            for term in terms:
                if width:
                    yield from textwrap.wrap(term, width, initial_indent=indent, subsequent_indent=indent)
                else:
                    yield term

329 

330 

def format_dimensions(options: NodeAttributeOptions, dimensions: DimensionGroup) -> str:
    """Format the dimensions of a task or dataset type node.

    Parameters
    ----------
    options : `NodeAttributeOptions`
        Options for how much information to display.
    dimensions : `~lsst.daf.butler.DimensionGroup`
        The dimensions to be formatted.

    Returns
    -------
    formatted : `str`
        The formatted dimension string.
    """
    display_mode = options.dimensions
    if display_mode == "full":
        return str(dimensions.names)
    if display_mode == "concise":
        # Drop any required dimension that is already implied by another
        # required dimension's own requirements.
        redundant = {
            a
            for a, b in itertools.permutations(dimensions.required, 2)
            if a in dimensions.universe[b].required
        }
        kept = [name for name in dimensions.required if name not in redundant]
        # The concise set must still round-trip to the same dimension group.
        assert dimensions.universe.conform(kept) == dimensions
        return "{" + ", ".join(kept) + "}"
    if display_mode is False:
        return ""
    raise ValueError(f"Invalid display option for dimensions: {options.dimensions!r}.")

360 

361 

def format_task_class(options: NodeAttributeOptions, task_class_name: str) -> str:
    """Format the type object for a task or task init node.

    Parameters
    ----------
    options : `NodeAttributeOptions`
        Options for how much information to display.
    task_class_name : `str`
        The name of the task class.

    Returns
    -------
    formatted : `str`
        The formatted string.
    """
    display_mode = options.task_classes
    if display_mode == "full":
        # Fully-qualified name, exactly as stored on the node.
        return task_class_name
    if display_mode == "concise":
        # Keep only the unqualified class name after the last dot (the whole
        # string when there is no dot).
        _, _, unqualified = task_class_name.rpartition(".")
        return unqualified
    if display_mode is False:
        return ""
    raise ValueError(f"Invalid display option for task_classes: {options.task_classes!r}.")

385 

386 

def _build_progress_bar(
    description: str,
    prefix: str,
    suffix: str,
    segments: list[tuple[str, float]],
    width: int | None,
    colors: StatusColors,
    min_bar_width: int,
) -> str:
    """Shared helper that constructs a multi-segment progress bar.

    Parameters
    ----------
    description : `str`
        Main node description (used for measuring available space).
    prefix : `str`
        Text before the bar (e.g. ' · 42%▕').
    suffix : `str`
        Text after the bar (e.g. '▏exp: 107 ...').
    segments : `list` of `tuple` [`str`, `float`]
        Each tuple is (color_code, fractionOfBarWidth). We'll create these
        segments in sequence.
    width : `int` or None
        Overall maximum line width. None for unlimited. We'll still respect the
        minimum bar width.
    colors : `StatusColors`
        An instance containing .reset and color fields.
    min_bar_width : `int`
        Minimum number of display columns for the bar.

    Returns
    -------
    formatted : str
        The assembled prefix + bar + suffix, sized with respect to the width.
    """
    # Visible columns consumed by the surrounding text (ANSI codes excluded).
    decoration_width = wcswidth(strip_ansi(prefix)) + wcswidth(strip_ansi(suffix))
    if width is None:
        bar_width = min_bar_width
    else:
        # Fill whatever remains after the description and decorations, but
        # never shrink below the configured minimum.
        bar_width = max(width - wcswidth(strip_ansi(description)) - decoration_width, min_bar_width)

    # Render each colored segment, tracking how many columns were filled.
    pieces: list[str] = []
    filled_columns = 0
    for segment_color, fraction in segments:
        columns = round(bar_width * fraction)
        pieces.append(f"{segment_color}{render_segment(columns)}{colors.reset}")
        filled_columns += columns

    # Pad with spaces so the bar always occupies its full width.
    pieces.append(" " * max(bar_width - filled_columns, 0))

    return prefix + "".join(pieces) + suffix

441 

442 

def format_node_status(
    description: str,
    status_options: NodeStatusOptions,
    status: TaskStatusInfo | DatasetTypeStatusInfo,
    width: int | None,
) -> str:
    """Build a progress bar for a task or dataset type node.

    Parameters
    ----------
    description : `str`
        Node description for measuring leftover columns.
    status_options : `NodeStatusOptions`
        Options for node status visualization.
    status : `TaskStatusInfo` or `DatasetTypeStatusInfo`
        Holds status information for a task or dataset type.
    width : `int` or None
        Overall width limit (None => unlimited).

    Returns
    -------
    formatted : str
        The final prefix + bar + suffix line.
    """
    # Local import keeps this helper's only extra dependency out of module
    # scope (matching the original structure).
    import dataclasses

    # Short labels used when status_options.abbreviate is set.
    status_abbreviations = {
        "expected": "exp",
        "succeeded": "suc",
        "failed": "fail",
        "blocked": "blk",
        "ready": "rdy",
        "running": "run",
        "wonky": "wnk",
        "unknown": "unk",
        "produced": "prd",
    }

    colors = status_options.colors
    expected = status.expected
    # Tasks count "succeeded" quanta; dataset types count "produced" datasets.
    done = status.succeeded if isinstance(status, TaskStatusInfo) else status.produced
    full_success = done == expected
    # Field name -> count mapping; None values mean "not applicable".
    status_lookup = dataclasses.asdict(status)

    # Guard against division by zero when nothing is expected.
    percent = 100.0 * done / expected if expected else 0.0
    total = float(expected) if expected else 1.0
    prefix = ""

    if status_options.display_percent or status_options.display_counts:
        if not status_options.visualize or (status_options.visualize and status_options.display_percent):
            full_success_color = colors.succeeded if isinstance(status, TaskStatusInfo) else colors.produced
            color_code = full_success_color if full_success else colors.failed
            # NOTE(review): this appends a color code immediately followed by
            # reset, so the following percent text is NOT colored — confirm
            # whether the reset was meant to come after the percent.
            prefix += f"{color_code}{colors.reset}"
        if status_options.display_percent:
            pct = round(percent)
            if pct == 100 and not full_success:
                # Avoid showing 100% if not fully successful.
                # Bug fix: was `pct == 99`, a no-op comparison that let a
                # rounded-up 100% be displayed despite failures.
                pct = 99
            prefix += f"{pct}%"
        if not status_options.visualize and status_options.display_percent and status_options.display_counts:
            prefix += " | "

    if status_options.visualize:
        # Left edge of the bar.
        prefix += "▕"

    suffix_parts = []
    segments = []

    for key, value in status_lookup.items():
        if value is not None:
            color_code = getattr(colors, key)
            if status_options.display_counts:
                label = status_abbreviations[key] if status_options.abbreviate else key
                suffix_parts.append(f"{label}:{color_code}{value}{colors.reset}")
            if key != "expected":
                # Build a progress bar segment; "expected" is the total, not
                # a bar segment of its own.
                segments.append((color_code, value / total))

    # Produce suffix from the parts ("▏" closes the bar's right edge).
    suffix = "▏" if status_options.visualize else ""
    suffix += " | ".join(suffix_parts)

    if status_options.visualize:
        return _build_progress_bar(
            description, prefix, suffix, segments, width, colors, status_options.min_bar_width
        )
    else:
        return f"{prefix}{suffix}"