Coverage for python / lsst / pipe / base / tests / pipelineStepTester.py: 0%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:19 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Utility to facilitate testing of pipelines consisting of multiple steps.""" 

29 

30__all__ = ["PipelineStepTester"] 

31 

32import dataclasses 

33import unittest 

34import warnings 

35 

36from lsst.daf.butler import Butler, DatasetType 

37from lsst.pipe.base import Pipeline, PipelineGraph 

38from lsst.resources import ResourcePath, ResourcePathExpression 

39 

40 

@dataclasses.dataclass
class PipelineStepTester:
    """Helper for verifying the input/output contract of a pipeline that may
    be organized into several step subsets.

    Walking the whole pipeline (or each named step subset in order), two sets
    are accumulated: ``pure_inputs`` and ``all_outputs``.

    ``pure_inputs`` holds every dataset type that must already exist before
    the pipeline runs, i.e. nothing in the pipeline itself produces it.

    ``all_outputs`` holds every dataset type the pipeline writes, whether it
    is an intermediate consumed by a later step or a final product.

    Both sets are compared against user-supplied expectations so a test can
    guarantee the pipeline remains runnable without a missing-data error.

    Attributes
    ----------
    filename : `lsst.resources.ResourcePathExpression`
        The full path or URI to the pipeline YAML.
    step_suffixes : `list` [`str`]
        Step-subset fragments to check, ordered as in the data reduction.
        Any leading "#" must be included.
    initial_dataset_types : `list` [`tuple`]
        Dataset types that must be pre-registered with the butler. Each
        tuple holds the type name, dimensions, storage class, and
        calibration flag, matching the `~lsst.daf.butler.DatasetType`
        constructor; no element may be omitted.
    expected_inputs : `set` [`str`]
        Dataset types the pipeline is expected to require as pure inputs.
    expected_outputs : `set` [`str`]
        Dataset types the pipeline is expected to produce.
    pipeline_patches : `dict` [`str`, `str`], optional
        Config overrides applied to the pipeline before testing. Rarely
        appropriate; reserve for fields that cannot have a file-level
        default. Keys take the form "task:subtask.field".
    """

    filename: ResourcePathExpression
    step_suffixes: list[str]
    initial_dataset_types: list[tuple[str, set[str], str, bool]]
    expected_inputs: set[str]
    expected_outputs: set[str]
    pipeline_patches: dict[str, str] = dataclasses.field(default_factory=dict)

    def register_dataset_types(self, butler: Butler) -> None:
        """Register every dataset type named in
        ``self.initial_dataset_types``.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The (test) butler in which to register the types.
        """
        for type_name, dims, storage_class, is_calib in self.initial_dataset_types:
            dataset_type = DatasetType(
                type_name,
                dims,
                storageClass=storage_class,
                isCalibration=is_calib,
                universe=butler.dimensions,
            )
            butler.registry.registerDatasetType(dataset_type)

    def run(self, butler: Butler, test_case: unittest.TestCase) -> None:
        """Check every step subset of the pipeline against the expected
        input and output sets.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The butler in which to run the test. It must be writeable;
            dataset types are registered in it as a side effect.
        test_case : `unittest.TestCase`
            The test case in which this test is run.

        Raises
        ------
        AssertionError
            Raised if the pipeline requires inputs that are not in
            ``self.expected_inputs``, or fails to produce outputs that are
            in ``self.expected_outputs``.
        """
        self.register_dataset_types(butler)

        # Accumulated across all steps, in step order.
        all_outputs: dict[str, DatasetType] = {}
        pure_inputs: dict[str, str] = {}

        base_uri = ResourcePath(self.filename, forceDirectory=False)
        for suffix in self.step_suffixes:
            # Address the step subset via a URI fragment (without the "#").
            step_graph = self.load_pipeline_graph(base_uri.replace(fragment=suffix.removeprefix("#")))
            step_graph.resolve(butler.registry)

            # Anything this step reads that no earlier step produced is a
            # pure input; remember which step (and tasks) first needed it.
            for name, _ in step_graph.iter_overall_inputs():
                if name in all_outputs:
                    continue
                tasks = step_graph.consumers_of(name)
                pure_inputs[name] = f"{suffix}: {', '.join(task.label for task in tasks)}"

            # Record everything this step produces, for later steps' benefit.
            for name, node in step_graph.dataset_types.items():
                if step_graph.producing_edge_of(name) is not None:
                    all_outputs[name] = node.dataset_type

            for node in step_graph.dataset_types.values():
                butler.registry.registerDatasetType(node.dataset_type)

        unexpected = pure_inputs.keys() - self.expected_inputs
        if unexpected:
            missing = []
            for type_name in unexpected:
                suffix = pure_inputs[type_name]
                missing.append(type_name + (f" ({suffix})" if suffix else ""))
            raise AssertionError(f"Got unexpected pure_inputs: {missing}")

        if not all_outputs.keys() >= self.expected_outputs:
            missing = list(self.expected_outputs - all_outputs.keys())
            raise AssertionError(f"Missing expected_outputs: {missing}")

    def load_pipeline_graph(self, uri: ResourcePathExpression) -> PipelineGraph:
        """Load the pipeline at ``uri``, apply ``self.pipeline_patches``,
        and return its graph.

        Patches whose task label or config field cannot be found are
        skipped with a `UserWarning` rather than failing the test.
        """
        pipeline = Pipeline.from_uri(uri)
        for fullkey, value in self.pipeline_patches.items():
            # Keys look like "task:subtask.field"; split off the task label.
            label, key = fullkey.split(":", maxsplit=1)
            try:
                pipeline.addConfigOverride(label, key, value)
            except LookupError as e:
                warnings.warn(f"{e}, skipping configuration {fullkey}={value}", UserWarning)
        return pipeline.to_graph()