Coverage for tests / test_graphBuilder.py: 20%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:47 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Tests of things related to the GraphBuilder class.""" 

29 

30import io 

31import logging 

32import unittest 

33 

34import lsst.utils.tests 

35from lsst.daf.butler import Butler, DatasetType 

36from lsst.daf.butler.registry import UserExpressionError 

37from lsst.pipe.base import PipelineGraph, QuantumGraph 

38from lsst.pipe.base.all_dimensions_quantum_graph_builder import ( 

39 AllDimensionsQuantumGraphBuilder, 

40 DatasetQueryConstraintVariant, 

41) 

42from lsst.pipe.base.tests import simpleQGraph 

43from lsst.pipe.base.tests.mocks import ( 

44 DynamicConnectionConfig, 

45 DynamicTestPipelineTask, 

46 DynamicTestPipelineTaskConfig, 

47 MockDataset, 

48 MockStorageClass, 

49) 

50from lsst.utils.tests import temporaryDirectory 

51 

52_LOG = logging.getLogger(__name__) 

53 

54 

55class GraphBuilderTestCase(unittest.TestCase): 

56 """Test graph building.""" 

57 

58 def _assertGraph(self, graph: QuantumGraph) -> None: 

59 """Check basic structure of the graph.""" 

60 for taskDef in graph.iterTaskGraph(): 

61 refs = graph.initOutputRefs(taskDef) 

62 # task has one initOutput, second ref is for config dataset 

63 self.assertEqual(len(refs), 2) 

64 

65 self.assertEqual(len(list(graph.inputQuanta)), 1) 

66 

67 # This includes only "packages" dataset for now. 

68 refs = graph.globalInitOutputRefs() 

69 self.assertEqual(len(refs), 1) 

70 

71 def testDefault(self): 

72 """Simple test to verify makeSimpleQGraph can be used to make a Quantum 

73 Graph. 

74 """ 

75 with temporaryDirectory() as root: 

76 # makeSimpleQGraph calls GraphBuilder. 

77 butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root) 

78 self.enterContext(butler) 

79 # by default makeSimpleQGraph makes a graph with 5 nodes 

80 self.assertEqual(len(qgraph), 5) 

81 self._assertGraph(qgraph) 

82 constraint = DatasetQueryConstraintVariant.OFF 

83 _, qgraph2 = simpleQGraph.makeSimpleQGraph( 

84 butler=butler, datasetQueryConstraint=constraint, callPopulateButler=False 

85 ) 

86 # When all outputs are random resolved refs, direct comparison 

87 # of graphs does not work because IDs are different. Can only 

88 # verify the number of quanta in the graph without doing something 

89 # terribly complicated. 

90 self.assertEqual(len(qgraph2), 5) 

91 constraint = DatasetQueryConstraintVariant.fromExpression("add_dataset0") 

92 _, qgraph3 = simpleQGraph.makeSimpleQGraph( 

93 butler=butler, datasetQueryConstraint=constraint, callPopulateButler=False 

94 ) 

95 self.assertEqual(len(qgraph3), 5) 

96 

97 def test_empty_qg(self): 

98 """Test that making an empty QG doesn't raise exceptions.""" 

99 config = DynamicTestPipelineTaskConfig() 

100 config.inputs["i"] = DynamicConnectionConfig( 

101 dataset_type_name="input", 

102 storage_class="StructuredDataDict", 

103 dimensions=["detector"], 

104 ) 

105 config.init_inputs["ii"] = DynamicConnectionConfig( 

106 dataset_type_name="init_input", 

107 storage_class="StructuredDataDict", 

108 ) 

109 config.outputs["o"] = DynamicConnectionConfig( 

110 dataset_type_name="output", 

111 storage_class="StructuredDataDict", 

112 dimensions=["detector"], 

113 ) 

114 config.init_outputs["io"] = DynamicConnectionConfig( 

115 dataset_type_name="init_output", 

116 storage_class="StructuredDataDict", 

117 ) 

118 pipeline_graph = PipelineGraph() 

119 pipeline_graph.add_task("a", DynamicTestPipelineTask, config) 

120 with temporaryDirectory() as repo_path: 

121 Butler.makeRepo(repo_path) 

122 butler = Butler.from_config(repo_path, writeable=True, run="test_empty_qg") 

123 self.enterContext(butler) 

124 MockStorageClass.get_or_register_mock("StructuredDataDict") 

125 butler.registry.registerDatasetType( 

126 DatasetType( 

127 "input", 

128 dimensions=butler.dimensions.conform(["detector"]), 

129 storageClass="_mock_StructuredDataDict", 

130 ) 

131 ) 

132 init_input_dataset_type = DatasetType( 

133 "init_input", 

134 dimensions=butler.dimensions.empty, 

135 storageClass="_mock_StructuredDataDict", 

136 ) 

137 butler.registry.registerDatasetType(init_input_dataset_type) 

138 # Init-input initially exists, but input does not (hence empty QG). 

139 butler.put( 

140 MockDataset( 

141 dataset_id=None, 

142 dataset_type=init_input_dataset_type.to_simple(), 

143 data_id={}, 

144 run=butler.run, 

145 ), 

146 "init_input", 

147 ) 

148 # Attempt to make QG; should just be empty, with no exceptions. 

149 self.assertFalse(AllDimensionsQuantumGraphBuilder(pipeline_graph, butler).build()) 

150 # Initialize the output run, try again, with same expected result. 

151 pipeline_graph.register_dataset_types(butler) 

152 pipeline_graph.init_output_run(butler) 

153 self.assertFalse(AllDimensionsQuantumGraphBuilder(pipeline_graph, butler).build()) 

154 

155 # Inconsistent governor dimensions are no longer an error, so this test 

156 # fails with the new query system. We should probably check instead that 

157 # logging includes an explanation for the empty QG, but it might not 

158 # because explain_no_results isn't good enough yet. 

159 @unittest.expectedFailure 

160 def testAddInstrumentMismatch(self): 

161 """Verify that a RuntimeError is raised if the instrument in the user 

162 query does not match the instrument in the pipeline. 

163 """ 

164 with temporaryDirectory() as root: 

165 pipeline = simpleQGraph.makeSimplePipeline( 

166 nQuanta=5, instrument="lsst.pipe.base.tests.simpleQGraph.SimpleInstrument" 

167 ) 

168 with self.assertRaises(UserExpressionError): 

169 butler, _ = simpleQGraph.makeSimpleQGraph( 

170 root=root, pipeline=pipeline, userQuery="instrument = 'foo'" 

171 ) 

172 self.enterContext(butler) 

173 

174 def testUserQueryBind(self): 

175 """Verify that bind values work for user query.""" 

176 pipeline = simpleQGraph.makeSimplePipeline( 

177 nQuanta=5, instrument="lsst.pipe.base.tests.simpleQGraph.SimpleInstrument" 

178 ) 

179 instr = simpleQGraph.SimpleInstrument.getName() 

180 # With a literal in the user query 

181 with temporaryDirectory() as root: 

182 butler, _ = simpleQGraph.makeSimpleQGraph( 

183 root=root, pipeline=pipeline, userQuery=f"instrument = '{instr}'" 

184 ) 

185 self.enterContext(butler) 

186 # With a bind for the user query 

187 with temporaryDirectory() as root: 

188 butler, _ = simpleQGraph.makeSimpleQGraph( 

189 root=root, pipeline=pipeline, userQuery="instrument = :instr", bind={"instr": instr} 

190 ) 

191 self.enterContext(butler) 

192 

193 def test_datastore_records(self): 

194 """Test for generating datastore records.""" 

195 with temporaryDirectory() as root: 

196 # need FileDatastore for this tests 

197 butler, qgraph1 = simpleQGraph.makeSimpleQGraph( 

198 root=root, inMemory=False, makeDatastoreRecords=True 

199 ) 

200 self.enterContext(butler) 

201 

202 # save and reload 

203 buffer = io.BytesIO() 

204 qgraph1.save(buffer) 

205 buffer.seek(0) 

206 qgraph2 = QuantumGraph.load(buffer, universe=butler.dimensions) 

207 del buffer 

208 

209 for qgraph in (qgraph1, qgraph2): 

210 self.assertEqual(len(qgraph), 5) 

211 for i, qnode in enumerate(qgraph): 

212 quantum = qnode.quantum 

213 self.assertIsNotNone(quantum.datastore_records) 

214 # only the first quantum has a pre-existing input 

215 if i == 0: 

216 datastore_name = "FileDatastore@<butlerRoot>" 

217 self.assertEqual(set(quantum.datastore_records.keys()), {datastore_name}) 

218 records_data = quantum.datastore_records[datastore_name] 

219 records = dict(records_data.records) 

220 self.assertEqual(len(records), 1) 

221 _, records = records.popitem() 

222 records = records["file_datastore_records"] 

223 self.assertEqual( 

224 [record.path for record in records], 

225 ["test/add_dataset0/add_dataset0_INSTR_det0_test.pickle"], 

226 ) 

227 else: 

228 self.assertEqual(quantum.datastore_records, {}) 

229 

230 

231if __name__ == "__main__": 

232 lsst.utils.tests.init() 

233 unittest.main()