Coverage for python / lsst / ctrl / mpexec / showInfo.py: 11%

210 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:50 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["ShowInfo"] 

31 

32import fnmatch 

33import re 

34import sys 

35from collections import defaultdict 

36from collections.abc import Mapping 

37from typing import Any 

38 

39import lsst.pex.config as pexConfig 

40import lsst.pex.config.history as pexConfigHistory 

41from lsst.daf.butler import Butler, DatasetRef, DatasetType, NamedKeyMapping 

42from lsst.daf.butler.datastore.record_data import DatastoreRecordData 

43from lsst.pipe.base import PipelineGraph 

44from lsst.pipe.base.pipeline_graph import visualization 

45from lsst.pipe.base.quantum_graph import PredictedQuantumGraph 

46from lsst.resources import ResourcePathExpression 

47 

48from . import util 

49from ._pipeline_graph_factory import PipelineGraphFactory 

50 

51 

52class _FilteredStream: 

53 """A file-like object that filters some config fields. 

54 

55 Note 

56 ---- 

57 This class depends on implementation details of ``Config.saveToStream`` 

58 methods, in particular that that method uses single call to write() 

59 method to save information about single config field, and that call 

60 combines comments string(s) for a field and field path and value. 

61 This class will not work reliably on the "import" strings, so imports 

62 should be disabled by passing ``skipImports=True`` to ``saveToStream()``. 

63 """ 

64 

65 def __init__(self, pattern: str, stream: Any = None) -> None: 

66 if stream is None: 

67 stream = sys.stdout 

68 self.stream = stream 

69 # obey case if pattern isn't lowercase or requests NOIGNORECASE 

70 mat = re.search(r"(.*):NOIGNORECASE$", pattern) 

71 

72 if mat: 

73 pattern = mat.group(1) 

74 self._pattern = re.compile(fnmatch.translate(pattern)) 

75 else: 

76 if pattern != pattern.lower(): 

77 print( 

78 f'Matching "{pattern}" without regard to case (append :NOIGNORECASE to prevent this)', 

79 file=self.stream, 

80 ) 

81 self._pattern = re.compile(fnmatch.translate(pattern), re.IGNORECASE) 

82 

83 def write(self, showStr: str) -> None: 

84 # Strip off doc string line(s) and cut off at "=" for string matching 

85 matchStr = showStr.rstrip().split("\n")[-1].split("=")[0] 

86 if self._pattern.search(matchStr): 

87 self.stream.write(showStr) 

88 

89 

90class ShowInfo: 

91 """Show information about a pipeline or quantum graph. 

92 

93 Parameters 

94 ---------- 

95 show : `list` [`str`] 

96 A list of show commands, some of which may have additional parameters 

97 specified using an ``=``. 

98 stream : I/O stream or None 

99 The output stream to use. `None` will be treated as `sys.stdout`. 

100 

101 Raises 

102 ------ 

103 ValueError 

104 Raised if some show commands are not recognized. 

105 """ 

106 

107 pipeline_commands = { 

108 "pipeline", 

109 "config", 

110 "history", 

111 "tasks", 

112 "dump-config", 

113 "pipeline-graph", 

114 "task-graph", 

115 "subsets", 

116 "inputs", 

117 } 

118 graph_commands = {"graph", "workflow", "uri"} 

119 

120 def __init__(self, show: list[str], stream: Any = None) -> None: 

121 if stream is None: 

122 # Defer assigning sys.stdout to allow click to redefine it if 

123 # it wants. Assigning the default at class definition leads 

124 # to confusion on reassignment. 

125 stream = sys.stdout 

126 commands: dict[str, list[str]] = defaultdict(list) 

127 for value in show: 

128 command, _, args = value.partition("=") 

129 commands[command].append(args) 

130 self.commands = commands 

131 self.stream = stream 

132 self.handled: set[str] = set() 

133 

134 known = self.pipeline_commands | self.graph_commands 

135 unknown = set(commands) - known 

136 if unknown: 

137 raise ValueError( 

138 f"Unknown value(s) for show: {unknown} (choose from '{', '.join(sorted(known))}')" 

139 ) 

140 self.needs_full_qg: bool = "graph" in self.commands.keys() or "uri" in self.commands.keys() 

141 

142 @property 

143 def unhandled(self) -> frozenset[str]: 

144 """Return the commands that have not yet been processed.""" 

145 return frozenset(set(self.commands) - self.handled) 

146 

147 def show_pipeline_info(self, pipeline_graph_factory: PipelineGraphFactory) -> None: 

148 """Display useful information about the pipeline. 

149 

150 Parameters 

151 ---------- 

152 pipeline_graph_factory : `PipelineGraphFactory` 

153 Factory object that holds the pipeline and can produce a pipeline 

154 graph. 

155 """ 

156 for command in self.pipeline_commands: 

157 if command not in self.commands: 

158 continue 

159 args = self.commands[command] 

160 match command: 

161 case "pipeline": 

162 print(pipeline_graph_factory.pipeline, file=self.stream) 

163 case "config": 

164 for arg in args: 

165 self._showConfig(pipeline_graph_factory(visualization_only=True), arg, False) 

166 case "dump-config": 

167 for arg in args: 

168 self._showConfig(pipeline_graph_factory(visualization_only=True), arg, True) 

169 case "history": 

170 for arg in args: 

171 self._showConfigHistory(pipeline_graph_factory(visualization_only=True), arg) 

172 case "tasks": 

173 self._showTaskHierarchy(pipeline_graph_factory(visualization_only=True)) 

174 case "subsets": 

175 print( 

176 "\n".join( 

177 f"{subset}:\n" + "\n".join(f" - {s}" for s in sorted(tasks)) 

178 for subset, tasks in dict(pipeline_graph_factory.pipeline.subsets).items() 

179 ), 

180 file=self.stream, 

181 ) 

182 case "pipeline-graph": 

183 visualization.show( 

184 pipeline_graph_factory(visualization_only=True), self.stream, dataset_types=True 

185 ) 

186 case "task-graph": 

187 visualization.show( 

188 pipeline_graph_factory(visualization_only=True), self.stream, dataset_types=False 

189 ) 

190 case "inputs": 

191 pg = pipeline_graph_factory(visualization_only=True) 

192 for dataset_type_name, dataset_type_node in sorted(pg.iter_overall_inputs()): 

193 assert dataset_type_node is not None, "Pipeline graph was just resolved." 

194 print( 

195 dataset_type_name, 

196 dataset_type_node.dimensions, 

197 dataset_type_node.storage_class_name, 

198 ) 

199 case _: 

200 raise RuntimeError(f"Unexpectedly tried to process command {command!r}.") 

201 self.handled.add(command) 

202 

203 def show_graph_info( 

204 self, 

205 qg: PredictedQuantumGraph, 

206 butler_config: ResourcePathExpression | None = None, 

207 ) -> None: 

208 """Show information associated with this graph. 

209 

210 Parameters 

211 ---------- 

212 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` 

213 Quantum graph. 

214 butler_config : convertible to `lsst.resources.ResourcePath`, optional 

215 Path to configuration for the butler. 

216 """ 

217 for command in self.graph_commands: 

218 if command not in self.commands: 

219 continue 

220 match command: 

221 case "graph": 

222 self._showGraph(qg) 

223 case "uri": 

224 if butler_config is None: 

225 raise ValueError("Showing URIs requires the -b option") 

226 self._showUri(qg, butler_config) 

227 case "workflow": 

228 self._showWorkflow(qg) 

229 case _: 

230 raise RuntimeError(f"Unexpectedly tried to process command {command!r}.") 

231 self.handled.add(command) 

232 

233 def _showConfig(self, pipeline_graph: PipelineGraph, showArgs: str, dumpFullConfig: bool) -> None: 

234 """Show task configuration 

235 

236 Parameters 

237 ---------- 

238 pipeline : `lsst.pipe.base.pipeline_graph.Pipeline` 

239 Pipeline definition as a graph. 

240 showArgs : `str` 

241 Defines what to show 

242 dumpFullConfig : `bool` 

243 If true then dump complete task configuration with all imports. 

244 """ 

245 stream: Any = self.stream 

246 if dumpFullConfig: 

247 # Task label can be given with this option 

248 taskName = showArgs 

249 else: 

250 # The argument can have form [TaskLabel::][pattern:NOIGNORECASE] 

251 matConfig = re.search(r"^(?:(\w+)::)?(?:config.)?(.+)?", showArgs) 

252 assert matConfig is not None, "regex always matches" 

253 taskName = matConfig.group(1) 

254 pattern = matConfig.group(2) 

255 if pattern: 

256 stream = _FilteredStream(pattern, stream=stream) 

257 

258 tasks = util.filterTaskNodes(pipeline_graph, taskName) 

259 if not tasks: 

260 raise ValueError(f"Pipeline has no tasks named {taskName}") 

261 

262 for task_node in tasks: 

263 print(f"### Configuration for task `{task_node.label}'", file=self.stream) 

264 task_node.config.saveToStream(stream, root="config", skipImports=not dumpFullConfig) 

265 

266 def _showConfigHistory(self, pipeline_graph: PipelineGraph, showArgs: str) -> None: 

267 """Show history for task configuration. 

268 

269 Parameters 

270 ---------- 

271 pipeline_graph : `lsst.pipe.base.pipeline_graph.PipelineGraph` 

272 Pipeline definition as a graph. 

273 showArgs : `str` 

274 Defines what to show 

275 """ 

276 taskName = None 

277 pattern = None 

278 matHistory = re.search(r"^(?:(\w+)::)?(?:config[.])?(.+)", showArgs) 

279 if matHistory: 

280 taskName = matHistory.group(1) 

281 pattern = matHistory.group(2) 

282 if not pattern: 

283 raise ValueError("Please provide a value with --show history (e.g. history=Task::param)") 

284 

285 tasks = util.filterTaskNodes(pipeline_graph, taskName) 

286 if not tasks: 

287 raise ValueError(f"Pipeline has no tasks named {taskName}") 

288 

289 found = False 

290 for task_node in tasks: 

291 config = task_node.config 

292 

293 # Look for any matches in the config hierarchy for this name 

294 for nmatch, thisName in enumerate(fnmatch.filter(config.names(), pattern)): 

295 if nmatch > 0: 

296 print("", file=self.stream) 

297 

298 cpath, _, cname = thisName.rpartition(".") 

299 try: 

300 if not cpath: 

301 # looking for top-level field 

302 hconfig = task_node.config 

303 else: 

304 hconfig = eval("config." + cpath, {}, {"config": config}) 

305 except AttributeError: 

306 print( 

307 f"Error: Unable to extract attribute {cpath} from task {task_node.label}", 

308 file=sys.stderr, 

309 ) 

310 hconfig = None 

311 

312 # Sometimes we end up with a non-Config so skip those 

313 if isinstance(hconfig, pexConfig.Config | pexConfig.ConfigurableInstance) and hasattr( 

314 hconfig, cname 

315 ): 

316 print(f"### Configuration field for task `{task_node.label}'", file=self.stream) 

317 print(pexConfigHistory.format(hconfig, cname), file=self.stream) 

318 found = True 

319 

320 if not found: 

321 raise ValueError(f"None of the tasks has field matching {pattern}") 

322 

323 def _showTaskHierarchy(self, pipeline_graph: PipelineGraph) -> None: 

324 """Print task hierarchy to stdout 

325 

326 Parameters 

327 ---------- 

328 pipeline_graph : `lsst.pipe.base.pipeline_graph.PipelineGraph` 

329 Pipeline definition as a graph. 

330 """ 

331 for task_node in pipeline_graph.tasks.values(): 

332 print(f"### Subtasks for task `{task_node.task_class_name}'", file=self.stream) 

333 

334 for configName, taskName in util.subTaskIter(task_node.config): 

335 print(f"{configName}: {taskName}", file=self.stream) 

336 

337 def _showGraph(self, qg: PredictedQuantumGraph) -> None: 

338 """Print quanta information to stdout 

339 

340 Parameters 

341 ---------- 

342 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` 

343 Quantum graph. 

344 """ 

345 

346 def _print_refs( 

347 mapping: NamedKeyMapping[DatasetType, tuple[DatasetRef, ...]], 

348 datastore_records: Mapping[str, DatastoreRecordData], 

349 ) -> None: 

350 """Print complete information on quantum input or output refs.""" 

351 for key, refs in mapping.items(): 

352 if refs: 

353 print(f" {key}:", file=self.stream) 

354 for ref in refs: 

355 print(f" - {ref}", file=self.stream) 

356 for datastore_name, record_data in datastore_records.items(): 

357 if record_map := record_data.records.get(ref.id): 

358 print(f" records for {datastore_name}:", file=self.stream) 

359 for table_name, records in record_map.items(): 

360 print(f" - {table_name}:", file=self.stream) 

361 for record in records: 

362 print(f" - {record}:", file=self.stream) 

363 else: 

364 print(f" {key}: []", file=self.stream) 

365 

366 for task_label, quanta_for_task in qg.quanta_by_task.items(): 

367 print(f"{task_label} ({qg.pipeline_graph.tasks[task_label].task_class_name})", file=self.stream) 

368 execution_quanta = qg.build_execution_quanta(task_label=task_label) 

369 for data_id, quantum_id in quanta_for_task.items(): 

370 quantum = execution_quanta[quantum_id] 

371 print(f" Quantum {quantum_id} dataId={data_id}:", file=self.stream) 

372 print(" inputs:", file=self.stream) 

373 _print_refs(quantum.inputs, quantum.datastore_records) 

374 print(" outputs:", file=self.stream) 

375 _print_refs(quantum.outputs, quantum.datastore_records) 

376 

377 def _showWorkflow(self, qg: PredictedQuantumGraph) -> None: 

378 """Print quanta information and dependency to stdout 

379 

380 Parameters 

381 ---------- 

382 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` 

383 Quantum graph. 

384 """ 

385 xgraph = qg.quantum_only_xgraph 

386 for child_id, child_data in xgraph.nodes.items(): 

387 print(f"Quantum {child_id}: {child_data['pipeline_node'].task_class_name}", file=self.stream) 

388 for parent_id in xgraph.predecessors(child_id): 

389 print(f"Parent Quantum {parent_id} - Child Quantum {child_id}", file=self.stream) 

390 

391 def _showUri(self, qg: PredictedQuantumGraph, butler_config: ResourcePathExpression) -> None: 

392 """Print input and predicted output URIs to stdout. 

393 

394 Parameters 

395 ---------- 

396 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` 

397 Quantum graph. 

398 butler_config : convertible to `lsst.resources.ResourcePath` 

399 Path to configuration for the butler. 

400 """ 

401 

402 def dumpURIs(butler: Butler, thisRef: DatasetRef) -> None: 

403 primary, components = butler.getURIs(thisRef, predict=True, run="TBD") 

404 if primary: 

405 print(f" {primary}", file=self.stream) 

406 else: 

407 print(" (disassembled artifact)", file=self.stream) 

408 for compName, compUri in components.items(): 

409 print(f" {compName}: {compUri}", file=self.stream) 

410 

411 with Butler.from_config(butler_config) as butler: 

412 xgraph = qg.quantum_only_xgraph 

413 execution_quanta = qg.build_execution_quanta() 

414 for quantum_id, quantum_data in xgraph.nodes.items(): 

415 print( 

416 f"Quantum {quantum_id}: {quantum_data['pipeline_node'].task_class_name}", file=self.stream 

417 ) 

418 print(" inputs:", file=self.stream) 

419 execution_quantum = execution_quanta[quantum_id] 

420 for refs in execution_quantum.inputs.values(): 

421 for ref in refs: 

422 dumpURIs(butler, ref) 

423 print(" outputs:", file=self.stream) 

424 for refs in execution_quantum.outputs.values(): 

425 for ref in refs: 

426 dumpURIs(butler, ref)