Coverage for python / lsst / pipe / base / trivial_quantum_graph_builder.py: 16%

64 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:47 +0000

# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively.  If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

27 

28from __future__ import annotations 

29 

30__all__ = "TrivialQuantumGraphBuilder" 

31 

32from collections.abc import Mapping, Sequence 

33from typing import TYPE_CHECKING, Any, final 

34 

35from lsst.daf.butler import Butler, DataCoordinate, DatasetIdGenEnum, DatasetRef, DimensionGroup 

36from lsst.utils.timer import timeMethod 

37 

38from .quantum_graph_builder import QuantumGraphBuilder 

39from .quantum_graph_skeleton import QuantumGraphSkeleton 

40 

41if TYPE_CHECKING: 

42 from .pipeline_graph import PipelineGraph 

43 

44 

@final
class TrivialQuantumGraphBuilder(QuantumGraphBuilder):
    """An optimized quantum-graph builder for pipelines that operate on only
    a single data ID or a closely related set of data IDs.

    Parameters
    ----------
    pipeline_graph
        Pipeline to build a quantum graph from, as a graph.  Will be resolved
        in-place with the given butler (any existing resolution is ignored).
    butler
        Client for the data repository.  Should be read-only.
    data_ids
        Mapping from dimension group to the data ID to use for that dimension
        group.  This is intended to allow the pipeline to switch between
        effectively-equivalent dimensions (e.g. ``group``, ``visit``,
        ``exposure``).
    input_refs
        References for input datasets, keyed by task label and then connection
        name.  This should include all regular overall-input datasets whose
        data IDs are not included in ``data_ids``.  It may (but need not)
        include prerequisite inputs.  Existing intermediate datasets should
        also be provided when they need to be clobbered or used in skip logic.
    dataset_id_modes
        Mapping from dataset type name to the ID generation mode for that
        dataset type.  The default is to generate random UUIDs.
    **kwargs
        Forwarded to the base `.quantum_graph_builder.QuantumGraphBuilder`.

    Notes
    -----
    If ``dataset_id_modes`` is provided, ``clobber=True`` will be passed to
    the base builder's constructor, as this is necessary to avoid spurious
    errors about the affected datasets already existing.  The only effect of
    this is to silence *other* errors about datasets in the output run
    existing unexpectedly.
    """

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        data_ids: Mapping[DimensionGroup, DataCoordinate],
        input_refs: Mapping[str, Mapping[str, Sequence[DatasetRef]]] | None = None,
        dataset_id_modes: Mapping[str, DatasetIdGenEnum] | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(pipeline_graph, butler, **kwargs)
        if dataset_id_modes:
            # Non-default ID generation means the datasets may already exist
            # in the output run; enable clobbering so the base class does not
            # report those as unexpected (see Notes in the class docs).
            self.clobber = True
        self.data_ids = dict(data_ids)
        # The empty data ID is always registered, so dimensionless tasks and
        # dataset types never need an explicit entry in ``data_ids``.
        self.data_ids[self.empty_data_id.dimensions] = self.empty_data_id
        self.input_refs = input_refs or {}
        self.dataset_id_modes = dataset_id_modes or {}

    def _get_data_id(self, dimensions: DimensionGroup, context: str) -> DataCoordinate:
        """Return the data ID registered for the given dimension group.

        Parameters
        ----------
        dimensions : `~lsst.daf.butler.DimensionGroup`
            Dimension group to look up in ``self.data_ids``.
        context : `str`
            Description of the task or connection that needed this data ID;
            attached as a note to the `KeyError` on failure to aid debugging.

        Returns
        -------
        data_id : `~lsst.daf.butler.DataCoordinate`
            Data ID for ``dimensions``.
        """
        try:
            return self.data_ids[dimensions]
        except KeyError as e:
            # PEP 678 note (Python 3.11+): annotate the exception with which
            # lookup failed without changing the exception type callers see.
            e.add_note(context)
            raise

    @timeMethod
    def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
        # Docstring inherited.
        skeleton = QuantumGraphSkeleton(subgraph.tasks)
        for task_node in subgraph.tasks.values():
            # Exactly one quantum per task, at the data ID registered for the
            # task's dimensions.
            quantum_key = skeleton.add_quantum_node(
                task_node.label, self._get_data_id(task_node.dimensions, context=f"task {task_node.label!r}")
            )
            input_refs_for_task = self.input_refs.get(task_node.label, {})

            for read_edge in task_node.iter_all_inputs():
                # Attach any explicitly-provided refs for this connection
                # (covers both prerequisite and regular inputs).
                if (input_refs := input_refs_for_task.get(read_edge.connection_name)) is not None:
                    for input_ref in input_refs:
                        if read_edge.is_prerequisite:
                            prereq_key = skeleton.add_prerequisite_node(input_ref)
                            skeleton.add_input_edge(quantum_key, prereq_key)
                            # Lazy %-style args: no string interpolation when
                            # INFO logging is disabled.
                            self.log.info(
                                "Added prereq %s.%s for %s from input_refs",
                                task_node.label,
                                read_edge.connection_name,
                                input_ref.dataId,
                            )
                        else:
                            input_key = skeleton.add_dataset_node(
                                read_edge.parent_dataset_type_name,
                                input_ref.dataId,
                                ref=input_ref,
                            )
                            skeleton.add_input_edge(quantum_key, input_key)
                            self.log.info(
                                "Added regular input %s.%s for %s from input_refs",
                                task_node.label,
                                read_edge.connection_name,
                                input_ref.dataId,
                            )

                if read_edge.is_prerequisite:
                    # Prerequisites can only come from ``input_refs``; there
                    # is no registered-data-ID lookup for them.
                    continue
                dataset_type_node = subgraph.dataset_types[read_edge.parent_dataset_type_name]
                data_id = self._get_data_id(
                    dataset_type_node.dimensions,
                    context=f"input {task_node.label}.{read_edge.connection_name}",
                )
                input_key = skeleton.add_dataset_node(
                    read_edge.parent_dataset_type_name,
                    data_id,
                )
                skeleton.add_input_edge(quantum_key, input_key)
                if subgraph.producer_of(read_edge.parent_dataset_type_name) is None:
                    # Overall input (not produced by any task in this
                    # subgraph): resolve it in the butler unless a ref was
                    # already attached from ``input_refs`` above.
                    if skeleton.get_dataset_ref(input_key) is None:
                        ref = self.butler.find_dataset(dataset_type_node.dataset_type, data_id)
                        if ref is not None:
                            skeleton.set_dataset_ref(ref)
                            self.log.info(
                                "Added regular input %s.%s for %s",
                                task_node.label,
                                read_edge.connection_name,
                                data_id,
                            )

            for write_edge in task_node.iter_all_outputs():
                dataset_type_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
                data_id = self._get_data_id(
                    dataset_type_node.dimensions,
                    context=f"output {task_node.label}.{write_edge.connection_name}",
                )
                output_key = skeleton.add_dataset_node(write_edge.parent_dataset_type_name, data_id)
                skeleton.add_output_edge(quantum_key, output_key)
                self.log.info(
                    "Added output %s.%s for %s", task_node.label, write_edge.connection_name, data_id
                )
                if mode := self.dataset_id_modes.get(write_edge.parent_dataset_type_name):
                    # Non-default ID generation: predict the ref now and mark
                    # it as possibly already present in the output run.
                    ref = DatasetRef(
                        dataset_type_node.dataset_type,
                        data_id,
                        run=self.output_run,
                        id_generation_mode=mode,
                    )
                    skeleton.set_dataset_ref(ref)
                    skeleton.set_output_in_the_way(ref)
                    # %r matches the original f-string's ``{mode=}`` repr.
                    self.log.info(
                        "Added ref for output %s.%s for %s with mode=%r",
                        task_node.label,
                        write_edge.connection_name,
                        data_id,
                        mode,
                    )

        return skeleton