Coverage for python / lsst / ctrl / mpexec / showInfo.py: 11%
210 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-05-06 08:33 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["ShowInfo"]
32import fnmatch
33import re
34import sys
35from collections import defaultdict
36from collections.abc import Mapping
37from typing import Any
39import lsst.pex.config as pexConfig
40import lsst.pex.config.history as pexConfigHistory
41from lsst.daf.butler import Butler, DatasetRef, DatasetType, NamedKeyMapping
42from lsst.daf.butler.datastore.record_data import DatastoreRecordData
43from lsst.pipe.base import PipelineGraph
44from lsst.pipe.base.pipeline_graph import visualization
45from lsst.pipe.base.quantum_graph import PredictedQuantumGraph
46from lsst.resources import ResourcePathExpression
48from . import util
49from ._pipeline_graph_factory import PipelineGraphFactory
class _FilteredStream:
    """A file-like object that filters some config fields.

    Note
    ----
    This class depends on implementation details of ``Config.saveToStream``
    methods, in particular that that method uses single call to write()
    method to save information about single config field, and that call
    combines comments string(s) for a field and field path and value.
    This class will not work reliably on the "import" strings, so imports
    should be disabled by passing ``skipImports=True`` to ``saveToStream()``.
    """

    def __init__(self, pattern: str, stream: Any = None) -> None:
        self.stream = sys.stdout if stream is None else stream
        # A trailing ":NOIGNORECASE" suffix requests a case-sensitive match;
        # otherwise the match ignores case (with a notice when the pattern
        # itself contains upper-case characters).
        case_sensitive = re.search(r"(.*):NOIGNORECASE$", pattern)
        if case_sensitive:
            self._pattern = re.compile(fnmatch.translate(case_sensitive.group(1)))
            return
        if pattern != pattern.lower():
            print(
                f'Matching "{pattern}" without regard to case (append :NOIGNORECASE to prevent this)',
                file=self.stream,
            )
        self._pattern = re.compile(fnmatch.translate(pattern), re.IGNORECASE)

    def write(self, showStr: str) -> None:
        # Only the last line of the chunk holds the field path; everything
        # before it is doc comments, and everything after "=" is the value.
        last_line = showStr.rstrip().split("\n")[-1]
        field_path = last_line.split("=")[0]
        if self._pattern.search(field_path):
            self.stream.write(showStr)
90class ShowInfo:
91 """Show information about a pipeline or quantum graph.
93 Parameters
94 ----------
95 show : `list` [`str`]
96 A list of show commands, some of which may have additional parameters
97 specified using an ``=``.
98 stream : I/O stream or None
99 The output stream to use. `None` will be treated as `sys.stdout`.
101 Raises
102 ------
103 ValueError
104 Raised if some show commands are not recognized.
105 """
107 pipeline_commands = {
108 "pipeline",
109 "config",
110 "history",
111 "tasks",
112 "dump-config",
113 "pipeline-graph",
114 "task-graph",
115 "subsets",
116 "inputs",
117 }
118 graph_commands = {"graph", "workflow", "uri"}
120 def __init__(self, show: list[str], stream: Any = None) -> None:
121 if stream is None:
122 # Defer assigning sys.stdout to allow click to redefine it if
123 # it wants. Assigning the default at class definition leads
124 # to confusion on reassignment.
125 stream = sys.stdout
126 commands: dict[str, list[str]] = defaultdict(list)
127 for value in show:
128 command, _, args = value.partition("=")
129 commands[command].append(args)
130 self.commands = commands
131 self.stream = stream
132 self.handled: set[str] = set()
134 known = self.pipeline_commands | self.graph_commands
135 unknown = set(commands) - known
136 if unknown:
137 raise ValueError(
138 f"Unknown value(s) for show: {unknown} (choose from '{', '.join(sorted(known))}')"
139 )
140 self.needs_full_qg: bool = "graph" in self.commands.keys() or "uri" in self.commands.keys()
142 @property
143 def unhandled(self) -> frozenset[str]:
144 """Return the commands that have not yet been processed."""
145 return frozenset(set(self.commands) - self.handled)
147 def show_pipeline_info(self, pipeline_graph_factory: PipelineGraphFactory) -> None:
148 """Display useful information about the pipeline.
150 Parameters
151 ----------
152 pipeline_graph_factory : `PipelineGraphFactory`
153 Factory object that holds the pipeline and can produce a pipeline
154 graph.
155 """
156 for command in self.pipeline_commands:
157 if command not in self.commands:
158 continue
159 args = self.commands[command]
160 match command:
161 case "pipeline":
162 print(pipeline_graph_factory.pipeline, file=self.stream)
163 case "config":
164 for arg in args:
165 self._showConfig(pipeline_graph_factory(visualization_only=True), arg, False)
166 case "dump-config":
167 for arg in args:
168 self._showConfig(pipeline_graph_factory(visualization_only=True), arg, True)
169 case "history":
170 for arg in args:
171 self._showConfigHistory(pipeline_graph_factory(visualization_only=True), arg)
172 case "tasks":
173 self._showTaskHierarchy(pipeline_graph_factory(visualization_only=True))
174 case "subsets":
175 print(
176 "\n".join(
177 f"{subset}:\n" + "\n".join(f" - {s}" for s in sorted(tasks))
178 for subset, tasks in dict(pipeline_graph_factory.pipeline.subsets).items()
179 ),
180 file=self.stream,
181 )
182 case "pipeline-graph":
183 visualization.show(
184 pipeline_graph_factory(visualization_only=True), self.stream, dataset_types=True
185 )
186 case "task-graph":
187 visualization.show(
188 pipeline_graph_factory(visualization_only=True), self.stream, dataset_types=False
189 )
190 case "inputs":
191 pg = pipeline_graph_factory(visualization_only=True)
192 for dataset_type_name, dataset_type_node in sorted(pg.iter_overall_inputs()):
193 assert dataset_type_node is not None, "Pipeline graph was just resolved."
194 print(
195 dataset_type_name,
196 dataset_type_node.dimensions,
197 dataset_type_node.storage_class_name,
198 )
199 case _:
200 raise RuntimeError(f"Unexpectedly tried to process command {command!r}.")
201 self.handled.add(command)
203 def show_graph_info(
204 self,
205 qg: PredictedQuantumGraph,
206 butler_config: ResourcePathExpression | None = None,
207 ) -> None:
208 """Show information associated with this graph.
210 Parameters
211 ----------
212 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
213 Quantum graph.
214 butler_config : convertible to `lsst.resources.ResourcePath`, optional
215 Path to configuration for the butler.
216 """
217 for command in self.graph_commands:
218 if command not in self.commands:
219 continue
220 match command:
221 case "graph":
222 self._showGraph(qg)
223 case "uri":
224 if butler_config is None:
225 raise ValueError("Showing URIs requires the -b option")
226 self._showUri(qg, butler_config)
227 case "workflow":
228 self._showWorkflow(qg)
229 case _:
230 raise RuntimeError(f"Unexpectedly tried to process command {command!r}.")
231 self.handled.add(command)
233 def _showConfig(self, pipeline_graph: PipelineGraph, showArgs: str, dumpFullConfig: bool) -> None:
234 """Show task configuration
236 Parameters
237 ----------
238 pipeline : `lsst.pipe.base.pipeline_graph.Pipeline`
239 Pipeline definition as a graph.
240 showArgs : `str`
241 Defines what to show
242 dumpFullConfig : `bool`
243 If true then dump complete task configuration with all imports.
244 """
245 stream: Any = self.stream
246 if dumpFullConfig:
247 # Task label can be given with this option
248 taskName = showArgs
249 else:
250 # The argument can have form [TaskLabel::][pattern:NOIGNORECASE]
251 matConfig = re.search(r"^(?:(\w+)::)?(?:config.)?(.+)?", showArgs)
252 assert matConfig is not None, "regex always matches"
253 taskName = matConfig.group(1)
254 pattern = matConfig.group(2)
255 if pattern:
256 stream = _FilteredStream(pattern, stream=stream)
258 tasks = util.filterTaskNodes(pipeline_graph, taskName)
259 if not tasks:
260 raise ValueError(f"Pipeline has no tasks named {taskName}")
262 for task_node in tasks:
263 print(f"### Configuration for task `{task_node.label}'", file=self.stream)
264 task_node.config.saveToStream(stream, root="config", skipImports=not dumpFullConfig)
266 def _showConfigHistory(self, pipeline_graph: PipelineGraph, showArgs: str) -> None:
267 """Show history for task configuration.
269 Parameters
270 ----------
271 pipeline_graph : `lsst.pipe.base.pipeline_graph.PipelineGraph`
272 Pipeline definition as a graph.
273 showArgs : `str`
274 Defines what to show
275 """
276 taskName = None
277 pattern = None
278 matHistory = re.search(r"^(?:(\w+)::)?(?:config[.])?(.+)", showArgs)
279 if matHistory:
280 taskName = matHistory.group(1)
281 pattern = matHistory.group(2)
282 if not pattern:
283 raise ValueError("Please provide a value with --show history (e.g. history=Task::param)")
285 tasks = util.filterTaskNodes(pipeline_graph, taskName)
286 if not tasks:
287 raise ValueError(f"Pipeline has no tasks named {taskName}")
289 found = False
290 for task_node in tasks:
291 config = task_node.config
293 # Look for any matches in the config hierarchy for this name
294 for nmatch, thisName in enumerate(fnmatch.filter(config.names(), pattern)):
295 if nmatch > 0:
296 print("", file=self.stream)
298 cpath, _, cname = thisName.rpartition(".")
299 try:
300 if not cpath:
301 # looking for top-level field
302 hconfig = task_node.config
303 else:
304 hconfig = eval("config." + cpath, {}, {"config": config})
305 except AttributeError:
306 print(
307 f"Error: Unable to extract attribute {cpath} from task {task_node.label}",
308 file=sys.stderr,
309 )
310 hconfig = None
312 # Sometimes we end up with a non-Config so skip those
313 if isinstance(hconfig, pexConfig.Config | pexConfig.ConfigurableInstance) and hasattr(
314 hconfig, cname
315 ):
316 print(f"### Configuration field for task `{task_node.label}'", file=self.stream)
317 print(pexConfigHistory.format(hconfig, cname), file=self.stream)
318 found = True
320 if not found:
321 raise ValueError(f"None of the tasks has field matching {pattern}")
323 def _showTaskHierarchy(self, pipeline_graph: PipelineGraph) -> None:
324 """Print task hierarchy to stdout
326 Parameters
327 ----------
328 pipeline_graph : `lsst.pipe.base.pipeline_graph.PipelineGraph`
329 Pipeline definition as a graph.
330 """
331 for task_node in pipeline_graph.tasks.values():
332 print(f"### Subtasks for task `{task_node.task_class_name}'", file=self.stream)
334 for configName, taskName in util.subTaskIter(task_node.config):
335 print(f"{configName}: {taskName}", file=self.stream)
337 def _showGraph(self, qg: PredictedQuantumGraph) -> None:
338 """Print quanta information to stdout
340 Parameters
341 ----------
342 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
343 Quantum graph.
344 """
346 def _print_refs(
347 mapping: NamedKeyMapping[DatasetType, tuple[DatasetRef, ...]],
348 datastore_records: Mapping[str, DatastoreRecordData],
349 ) -> None:
350 """Print complete information on quantum input or output refs."""
351 for key, refs in mapping.items():
352 if refs:
353 print(f" {key}:", file=self.stream)
354 for ref in refs:
355 print(f" - {ref}", file=self.stream)
356 for datastore_name, record_data in datastore_records.items():
357 if record_map := record_data.records.get(ref.id):
358 print(f" records for {datastore_name}:", file=self.stream)
359 for table_name, records in record_map.items():
360 print(f" - {table_name}:", file=self.stream)
361 for record in records:
362 print(f" - {record}:", file=self.stream)
363 else:
364 print(f" {key}: []", file=self.stream)
366 for task_label, quanta_for_task in qg.quanta_by_task.items():
367 print(f"{task_label} ({qg.pipeline_graph.tasks[task_label].task_class_name})", file=self.stream)
368 execution_quanta = qg.build_execution_quanta(task_label=task_label)
369 for data_id, quantum_id in quanta_for_task.items():
370 quantum = execution_quanta[quantum_id]
371 print(f" Quantum {quantum_id} dataId={data_id}:", file=self.stream)
372 print(" inputs:", file=self.stream)
373 _print_refs(quantum.inputs, quantum.datastore_records)
374 print(" outputs:", file=self.stream)
375 _print_refs(quantum.outputs, quantum.datastore_records)
377 def _showWorkflow(self, qg: PredictedQuantumGraph) -> None:
378 """Print quanta information and dependency to stdout
380 Parameters
381 ----------
382 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
383 Quantum graph.
384 """
385 xgraph = qg.quantum_only_xgraph
386 for child_id, child_data in xgraph.nodes.items():
387 print(f"Quantum {child_id}: {child_data['pipeline_node'].task_class_name}", file=self.stream)
388 for parent_id in xgraph.predecessors(child_id):
389 print(f"Parent Quantum {parent_id} - Child Quantum {child_id}", file=self.stream)
391 def _showUri(self, qg: PredictedQuantumGraph, butler_config: ResourcePathExpression) -> None:
392 """Print input and predicted output URIs to stdout.
394 Parameters
395 ----------
396 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
397 Quantum graph.
398 butler_config : convertible to `lsst.resources.ResourcePath`
399 Path to configuration for the butler.
400 """
402 def dumpURIs(butler: Butler, thisRef: DatasetRef) -> None:
403 primary, components = butler.getURIs(thisRef, predict=True, run="TBD")
404 if primary:
405 print(f" {primary}", file=self.stream)
406 else:
407 print(" (disassembled artifact)", file=self.stream)
408 for compName, compUri in components.items():
409 print(f" {compName}: {compUri}", file=self.stream)
411 with Butler.from_config(butler_config) as butler:
412 xgraph = qg.quantum_only_xgraph
413 execution_quanta = qg.build_execution_quanta()
414 for quantum_id, quantum_data in xgraph.nodes.items():
415 print(
416 f"Quantum {quantum_id}: {quantum_data['pipeline_node'].task_class_name}", file=self.stream
417 )
418 print(" inputs:", file=self.stream)
419 execution_quantum = execution_quanta[quantum_id]
420 for refs in execution_quantum.inputs.values():
421 for ref in refs:
422 dumpURIs(butler, ref)
423 print(" outputs:", file=self.stream)
424 for refs in execution_quantum.outputs.values():
425 for ref in refs:
426 dumpURIs(butler, ref)