Coverage for python / lsst / ctrl / mpexec / _pipeline_graph_factory.py: 18%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:00 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("PipelineGraphFactory",) 

31 

32from lsst.daf.butler import Butler 

33from lsst.pipe.base import Pipeline, PipelineGraph 

34 

35 

class PipelineGraphFactory:
    """A factory for building and caching a PipelineGraph.

    Parameters
    ----------
    pipeline : `lsst.pipe.base.Pipeline` or `None`, optional
        Pipeline definition to start from.  May be `None` only if
        ``pipeline_graph`` is provided.
    butler : `lsst.daf.butler.Butler` or `None`, optional
        Butler that can be used to resolve dataset type definitions and get
        dimension schema.
    select_tasks : `str`, optional
        String expression that filters the tasks in the pipeline graph.  It
        is applied only when the graph is built from ``pipeline``.
    pipeline_graph : `lsst.pipe.base.pipeline_graph.PipelineGraph`, optional
        Already-constructed pipeline graph.

    Raises
    ------
    TypeError
        Raised if both ``pipeline`` and ``pipeline_graph`` are `None`.
    """

    def __init__(
        self,
        pipeline: Pipeline | None = None,
        butler: Butler | None = None,
        select_tasks: str = "",
        *,
        pipeline_graph: PipelineGraph | None = None,
    ) -> None:
        if pipeline is None and pipeline_graph is None:
            raise TypeError("At least one of 'pipeline' and 'pipeline_graph' must not be `None`.")
        self._pipeline = pipeline
        # Only the registry is kept; `None` means no butler was supplied.
        self._registry = butler.registry if butler is not None else None
        self._select_tasks = select_tasks
        self._pipeline_graph: PipelineGraph | None = pipeline_graph
        # Whether `resolve` has already been invoked on the cached graph.
        self._resolved: bool = False
        # Whether the cached graph was resolved without a registry.
        self._for_visualization_only: bool = False

    def __call__(self, *, resolve: bool = True, visualization_only: bool = False) -> PipelineGraph:
        """Return the pipeline graph, building and resolving it on demand.

        Parameters
        ----------
        resolve : `bool`, optional
            If `True`, make sure the returned graph has been resolved.  The
            graph is resolved at most once; later calls reuse the result.
        visualization_only : `bool`, optional
            Forwarded to `PipelineGraph.resolve` by the call that actually
            performs the resolution.

        Returns
        -------
        pipeline_graph : `lsst.pipe.base.pipeline_graph.PipelineGraph`
            The cached pipeline graph; the same object on every call.

        Raises
        ------
        RuntimeError
            Raised if the graph was already resolved without a butler and
            this call asks for ``resolve=True`` with
            ``visualization_only=False``.
        """
        if self._pipeline_graph is None:
            # First use: build the graph from the pipeline and cache it.
            assert self._pipeline is not None, "Guaranteed at construction."
            self._pipeline_graph = self._pipeline.to_graph()
            if self._select_tasks:
                self._pipeline_graph = self._pipeline_graph.select(self._select_tasks)
        if resolve and not self._resolved:
            self._pipeline_graph.resolve(self._registry, visualization_only=visualization_only)
            self._resolved = True
            # Remember that a registry-free resolution cannot satisfy a later
            # request for a non-visualization graph.
            self._for_visualization_only = self._registry is None
        elif resolve and not visualization_only and self._for_visualization_only:
            raise RuntimeError("Cannot resolve pipeline graph without butler.")
        return self._pipeline_graph

    @property
    def pipeline(self) -> Pipeline:
        """The original pipeline definition.

        Raises
        ------
        RuntimeError
            Raised if the factory was constructed without a pipeline, or if
            a task-selection expression was given.
        """
        if self._pipeline is None:
            raise RuntimeError("Cannot obtain pipeline from pipeline graph.")
        if self._select_tasks:
            raise RuntimeError(
                "The --select-tasks option cannot be used with operations that return or display a "
                "pipeline as YAML, since it only operates on the pipeline graph."
            )
        return self._pipeline

    def __bool__(self) -> bool:
        """Return the truth value of the pipeline if there is one, and
        otherwise whether the pipeline graph has any tasks.
        """
        if self._pipeline is not None:
            return bool(self._pipeline)
        assert self._pipeline_graph is not None, "Guaranteed at construction."
        return bool(self._pipeline_graph.tasks)