Coverage for tests / test_trivial_qg_builder.py: 17%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:59 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30import unittest 

31 

32from lsst.daf.butler import DatasetIdGenEnum, DatasetRef 

33from lsst.pipe.base.tests.mocks import DynamicConnectionConfig, InMemoryRepo 

34from lsst.pipe.base.trivial_quantum_graph_builder import TrivialQuantumGraphBuilder 

35 

36 

37class TrivialQuantumGraphBuilderTestCase(unittest.TestCase): 

38 """Tests for the TrivialQuantumGraphBuilder class.""" 

39 

40 def test_trivial_qg_builder(self) -> None: 

41 # Make a test helper with a mock task appropriate for the QG builder: 

42 # - the QG will have no branching 

43 # - while the task have different dimensions, they can be 1-1 related 

44 # (for the purposes of this test, at least). 

45 helper = InMemoryRepo("base.yaml") 

46 helper.add_task( 

47 "a", 

48 dimensions=["band", "detector"], 

49 prerequisite_inputs={ 

50 "prereq_connection": DynamicConnectionConfig( 

51 dataset_type_name="dataset_prereq0", dimensions=["detector"] 

52 ) 

53 }, 

54 ) 

55 helper.add_task( 

56 "b", 

57 dimensions=["physical_filter", "detector"], 

58 inputs={ 

59 "input_connection": DynamicConnectionConfig( 

60 dataset_type_name="dataset_auto1", dimensions=["band", "detector"] 

61 ), 

62 "extra_input_connection": DynamicConnectionConfig( 

63 dataset_type_name="dataset_extra1", dimensions=["physical_filter", "detector"] 

64 ), 

65 }, 

66 ) 

67 # Use the helper to make a quantum graph using the general-purpose 

68 # builder. This will cover all data IDs in the test dataset, which 

69 # includes 4 detectors, 3 physical_filters, and 2 bands. 

70 # This also has useful side-effects: it inserts the input datasets 

71 # and registers all dataset types. 

72 general_qg = helper.make_quantum_graph() 

73 # Make the trivial QG builder we want to test giving it only one 

74 # detector and one band (is the one that corresponds to only one 

75 # physical_filter). 

76 (a_data_id,) = [ 

77 data_id 

78 for data_id in general_qg.quanta_by_task["a"] 

79 if data_id["detector"] == 1 and data_id["band"] == "g" 

80 ] 

81 (b_data_id,) = [ 

82 data_id 

83 for data_id in general_qg.quanta_by_task["b"] 

84 if data_id["detector"] == 1 and data_id["band"] == "g" 

85 ] 

86 prereq_data_id = a_data_id.subset(["detector"]) 

87 dataset_auto0_ref = helper.butler.get_dataset(general_qg.datasets_by_type["dataset_auto0"][a_data_id]) 

88 assert dataset_auto0_ref is not None, "Input dataset should have been inserted above." 

89 dataset_prereq0_ref = helper.butler.get_dataset( 

90 general_qg.datasets_by_type["dataset_prereq0"][prereq_data_id] 

91 ) 

92 assert dataset_prereq0_ref is not None, "Input dataset should have been inserted above." 

93 trivial_builder = TrivialQuantumGraphBuilder( 

94 helper.pipeline_graph, 

95 helper.butler, 

96 data_ids={a_data_id.dimensions: a_data_id, b_data_id.dimensions: b_data_id}, 

97 input_refs={ 

98 "a": {"input_connection": [dataset_auto0_ref], "prereq_connection": [dataset_prereq0_ref]} 

99 }, 

100 dataset_id_modes={"dataset_auto2": DatasetIdGenEnum.DATAID_TYPE_RUN}, 

101 output_run="trivial_output_run", 

102 input_collections=general_qg.header.inputs, 

103 ) 

104 trivial_qg = trivial_builder.finish(attach_datastore_records=False).assemble() 

105 self.assertEqual(len(trivial_qg.quanta_by_task), 2) 

106 self.assertEqual(trivial_qg.quanta_by_task["a"].keys(), {a_data_id}) 

107 self.assertEqual(trivial_qg.quanta_by_task["b"].keys(), {b_data_id}) 

108 self.assertEqual(trivial_qg.datasets_by_type["dataset_prereq0"].keys(), {prereq_data_id}) 

109 self.assertEqual( 

110 trivial_qg.datasets_by_type["dataset_prereq0"][prereq_data_id], 

111 general_qg.datasets_by_type["dataset_prereq0"][prereq_data_id], 

112 ) 

113 self.assertEqual(trivial_qg.datasets_by_type["dataset_auto0"].keys(), {a_data_id}) 

114 self.assertEqual( 

115 trivial_qg.datasets_by_type["dataset_auto0"][a_data_id], 

116 general_qg.datasets_by_type["dataset_auto0"][a_data_id], 

117 ) 

118 self.assertEqual(trivial_qg.datasets_by_type["dataset_extra1"].keys(), {b_data_id}) 

119 self.assertEqual( 

120 trivial_qg.datasets_by_type["dataset_extra1"][b_data_id], 

121 general_qg.datasets_by_type["dataset_extra1"][b_data_id], 

122 ) 

123 self.assertEqual(trivial_qg.datasets_by_type["dataset_auto1"].keys(), {a_data_id}) 

124 self.assertNotEqual( 

125 trivial_qg.datasets_by_type["dataset_auto1"][a_data_id], 

126 general_qg.datasets_by_type["dataset_auto1"][a_data_id], 

127 ) 

128 self.assertEqual(trivial_qg.datasets_by_type["dataset_auto2"].keys(), {b_data_id}) 

129 self.assertNotEqual( 

130 trivial_qg.datasets_by_type["dataset_auto2"][b_data_id], 

131 general_qg.datasets_by_type["dataset_auto2"][b_data_id], 

132 ) 

133 self.assertEqual( 

134 trivial_qg.datasets_by_type["dataset_auto2"][b_data_id], 

135 DatasetRef( 

136 helper.pipeline_graph.dataset_types["dataset_auto2"].dataset_type, 

137 b_data_id, 

138 run="trivial_output_run", 

139 id_generation_mode=DatasetIdGenEnum.DATAID_TYPE_RUN, 

140 ).id, 

141 ) 

142 qo_xg = trivial_qg.quantum_only_xgraph 

143 self.assertEqual(len(qo_xg.nodes), 2) 

144 self.assertEqual(len(qo_xg.edges), 1) 

145 bp_xg = trivial_qg.bipartite_xgraph 

146 self.assertEqual( 

147 set(bp_xg.predecessors(trivial_qg.quanta_by_task["a"][a_data_id])), 

148 set(trivial_qg.datasets_by_type["dataset_auto0"].values()) 

149 | set(trivial_qg.datasets_by_type["dataset_prereq0"].values()), 

150 ) 

151 self.assertEqual( 

152 set(bp_xg.successors(trivial_qg.quanta_by_task["a"][a_data_id])), 

153 set(trivial_qg.datasets_by_type["dataset_auto1"].values()) 

154 | set(trivial_qg.datasets_by_type["a_metadata"].values()) 

155 | set(trivial_qg.datasets_by_type["a_log"].values()), 

156 ) 

157 self.assertEqual( 

158 set(bp_xg.predecessors(trivial_qg.quanta_by_task["b"][b_data_id])), 

159 set(trivial_qg.datasets_by_type["dataset_auto1"].values()) 

160 | set(trivial_qg.datasets_by_type["dataset_extra1"].values()), 

161 ) 

162 self.assertEqual( 

163 set(bp_xg.successors(trivial_qg.quanta_by_task["b"][b_data_id])), 

164 set(trivial_qg.datasets_by_type["dataset_auto2"].values()) 

165 | set(trivial_qg.datasets_by_type["b_metadata"].values()) 

166 | set(trivial_qg.datasets_by_type["b_log"].values()), 

167 ) 

168 

169 

170if __name__ == "__main__": 

171 unittest.main()