Coverage for python / lsst / pipe / base / trivial_quantum_graph_builder.py: 16%
64 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:57 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
# ``__all__`` must be a sequence of names: a bare string would make
# ``from ... import *`` treat each individual character as a name to export.
__all__ = ("TrivialQuantumGraphBuilder",)
32from collections.abc import Mapping, Sequence
33from typing import TYPE_CHECKING, Any, final
35from lsst.daf.butler import Butler, DataCoordinate, DatasetIdGenEnum, DatasetRef, DimensionGroup
36from lsst.utils.timer import timeMethod
38from .quantum_graph_builder import QuantumGraphBuilder
39from .quantum_graph_skeleton import QuantumGraphSkeleton
41if TYPE_CHECKING:
42 from .pipeline_graph import PipelineGraph
@final
class TrivialQuantumGraphBuilder(QuantumGraphBuilder):
    """An optimized quantum-graph builder for pipelines that operate on only
    a single data ID or a closely related set of data IDs.

    Parameters
    ----------
    pipeline_graph
        Pipeline to build a quantum graph from, as a graph. Will be resolved
        in-place with the given butler (any existing resolution is ignored).
    butler
        Client for the data repository. Should be read-only.
    data_ids
        Mapping from dimension group to the data ID to use for that dimension
        group. This is intended to allow the pipeline to switch between
        effectively-equivalent dimensions (e.g. ``group``, ``visit``,
        ``exposure``).
    input_refs
        References for input datasets, keyed by task label and then connection
        name. This should include all regular overall-input datasets whose
        data IDs are not included in ``data_ids``. It may (but need not)
        include prerequisite inputs. Existing intermediate datasets should
        also be provided when they need to be clobbered or used in skip logic.
    dataset_id_modes
        Mapping from dataset type name to the ID generation mode for that
        dataset type. The default is to generate random UUIDs.
    **kwargs
        Forwarded to the base `.quantum_graph_builder.QuantumGraphBuilder`.

    Notes
    -----
    If ``dataset_id_modes`` is provided, ``clobber=True`` will be passed to
    the base builder's constructor, as this is necessary to avoid spurious
    errors about the affected datasets already existing. The only effect of
    this is to silence *other* errors about datasets in the output run
    existing unexpectedly.
    """

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        data_ids: Mapping[DimensionGroup, DataCoordinate],
        input_refs: Mapping[str, Mapping[str, Sequence[DatasetRef]]] | None = None,
        dataset_id_modes: Mapping[str, DatasetIdGenEnum] | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(pipeline_graph, butler, **kwargs)
        if dataset_id_modes:
            # Pre-generated dataset IDs imply some outputs may already exist
            # in the output run; enable clobbering so the base builder does
            # not raise about them (see class Notes).
            self.clobber = True
        # Copy so we can safely add entries without mutating the caller's
        # mapping.
        self.data_ids = dict(data_ids)
        # Always register the empty data ID so dimensionless tasks and
        # dataset types resolve without the caller having to pass it in.
        self.data_ids[self.empty_data_id.dimensions] = self.empty_data_id
        self.input_refs = input_refs or {}
        self.dataset_id_modes = dataset_id_modes or {}

    def _get_data_id(self, dimensions: DimensionGroup, context: str) -> DataCoordinate:
        """Look up the data ID registered for a dimension group.

        Parameters
        ----------
        dimensions
            Dimension group to look up in ``self.data_ids``.
        context
            Human-readable description of what needed the data ID; attached
            as a note to the `KeyError` if the lookup fails, to make the
            error actionable.

        Returns
        -------
        `~lsst.daf.butler.DataCoordinate`
            The data ID for ``dimensions``.

        Raises
        ------
        KeyError
            If no data ID was provided for ``dimensions``.
        """
        try:
            return self.data_ids[dimensions]
        except KeyError as e:
            e.add_note(context)
            raise

    @timeMethod
    def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
        # Build the skeleton directly from ``self.data_ids`` and
        # ``self.input_refs``: exactly one quantum per task, with every data
        # ID looked up locally rather than via registry queries.
        skeleton = QuantumGraphSkeleton(subgraph.tasks)
        for task_node in subgraph.tasks.values():
            quantum_key = skeleton.add_quantum_node(
                task_node.label, self._get_data_id(task_node.dimensions, context=f"task {task_node.label!r}")
            )
            input_refs_for_task = self.input_refs.get(task_node.label, {})

            for read_edge in task_node.iter_all_inputs():
                # First wire up any refs explicitly provided for this
                # connection via ``input_refs``.
                if (input_refs := input_refs_for_task.get(read_edge.connection_name)) is not None:
                    for input_ref in input_refs:
                        if read_edge.is_prerequisite:
                            prereq_key = skeleton.add_prerequisite_node(input_ref)
                            skeleton.add_input_edge(quantum_key, prereq_key)
                            self.log.info(
                                f"Added prereq {task_node.label}.{read_edge.connection_name} "
                                f"for {input_ref.dataId} from input_refs"
                            )
                        else:
                            input_key = skeleton.add_dataset_node(
                                read_edge.parent_dataset_type_name,
                                input_ref.dataId,
                                ref=input_ref,
                            )
                            skeleton.add_input_edge(quantum_key, input_key)
                            self.log.info(
                                f"Added regular input {task_node.label}.{read_edge.connection_name} "
                                f"for {input_ref.dataId} from input_refs"
                            )

                if read_edge.is_prerequisite:
                    # Prerequisite inputs are only ever supplied via
                    # ``input_refs``; there is no data-ID lookup for them.
                    continue
                # NOTE(review): regular inputs already handled via
                # ``input_refs`` above still fall through to this lookup; if
                # their dimensions are absent from ``data_ids`` this raises.
                # Confirm that is intended (``add_dataset_node`` may simply
                # deduplicate when the data ID is present).
                dataset_type_node = subgraph.dataset_types[read_edge.parent_dataset_type_name]
                data_id = self._get_data_id(
                    dataset_type_node.dimensions,
                    context=f"input {task_node.label}.{read_edge.connection_name}",
                )
                input_key = skeleton.add_dataset_node(
                    read_edge.parent_dataset_type_name,
                    data_id,
                )
                skeleton.add_input_edge(quantum_key, input_key)
                if subgraph.producer_of(read_edge.parent_dataset_type_name) is None:
                    # Overall-input dataset (no producing task in this
                    # subgraph): if no ref was provided, try to resolve it
                    # from the data repository.
                    if skeleton.get_dataset_ref(input_key) is None:
                        ref = self.butler.find_dataset(dataset_type_node.dataset_type, data_id)
                        if ref is not None:
                            skeleton.set_dataset_ref(ref)
                self.log.info(
                    f"Added regular input {task_node.label}.{read_edge.connection_name} for {data_id}"
                )

            for write_edge in task_node.iter_all_outputs():
                dataset_type_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
                data_id = self._get_data_id(
                    dataset_type_node.dimensions,
                    context=f"output {task_node.label}.{write_edge.connection_name}",
                )
                output_key = skeleton.add_dataset_node(write_edge.parent_dataset_type_name, data_id)
                skeleton.add_output_edge(quantum_key, output_key)
                self.log.info(f"Added output {task_node.label}.{write_edge.connection_name} for {data_id}")
                if mode := self.dataset_id_modes.get(write_edge.parent_dataset_type_name):
                    # Caller requested deterministic/pre-assigned IDs for
                    # this dataset type: attach a resolved ref up front and
                    # mark it as possibly already present in the output run
                    # (presumably so clobber logic can handle it — see class
                    # Notes; confirm against set_output_in_the_way docs).
                    ref = DatasetRef(
                        dataset_type_node.dataset_type,
                        data_id,
                        run=self.output_run,
                        id_generation_mode=mode,
                    )
                    skeleton.set_dataset_ref(ref)
                    skeleton.set_output_in_the_way(ref)
                    self.log.info(
                        f"Added ref for output {task_node.label}.{write_edge.connection_name} for "
                        f"{data_id} with {mode=}"
                    )

        return skeleton