Coverage for python / lsst / pipe / base / script / transfer_from_graph.py: 17%
47 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:18 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28__all__ = ["transfer_from_graph"]
30import math
32from lsst.daf.butler import Butler, CollectionType, MissingCollectionError, QuantumBackedButler
33from lsst.pipe.base import QuantumGraph
34from lsst.utils.iteration import chunk_iterable
35from lsst.utils.logging import getLogger
37from .utils import filter_by_dataset_type_glob, filter_by_existence
39_LOG = getLogger(__name__)
def transfer_from_graph(
    graph: str,
    dest: str,
    register_dataset_types: bool,
    transfer_dimensions: bool,
    update_output_chain: bool,
    dry_run: bool,
    dataset_type: tuple[str, ...],
) -> int:
    """Transfer output datasets from quantum graph to dest.

    Parameters
    ----------
    graph : `str`
        URI string of the quantum graph.
    dest : `str`
        URI string of the destination Butler repo.
    register_dataset_types : `bool`
        Indicate whether missing dataset types should be registered.
    transfer_dimensions : `bool`
        Indicate whether dimensions should be transferred along with datasets.
        It can be more efficient to disable this if it is known that all
        dimensions exist.
    update_output_chain : `bool`
        If quantum graph metadata includes output run name and output
        collection which is a chain, update the chain definition to include
        run name as the first collection in the chain.
    dry_run : `bool`
        Run the transfer without updating the destination butler.
    dataset_type : `tuple` of `str`
        Dataset type names. An empty tuple implies all dataset types.
        Can include globs.

    Returns
    -------
    count : `int`
        Actual count of transferred datasets.
    """
    # Read whole graph into memory.
    qgraph = QuantumGraph.loadUri(graph)

    output_refs, _ = qgraph.get_refs(include_outputs=True, include_init_outputs=True, conform_outputs=True)

    # Get data repository dataset type definitions from the QuantumGraph.
    dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()}

    # Filter the refs based on requested dataset types.
    filtered_refs = filter_by_dataset_type_glob(output_refs, dataset_type)
    _LOG.verbose("After filtering by dataset_type, number of datasets to transfer: %d", len(filtered_refs))

    # Make QBB, its config is the same as output Butler.
    with (
        QuantumBackedButler.from_predicted(
            config=dest,
            predicted_inputs=[ref.id for ref in output_refs],
            predicted_outputs=[],
            dimensions=qgraph.universe,
            datastore_records={},
            dataset_types=dataset_types,
        ) as qbb,
        Butler.from_config(dest, writeable=True) as dest_butler,
    ):
        # For faster restarts, filter out those the destination already knows.
        filtered_refs = filter_by_existence(dest_butler, filtered_refs)

        # Transfer in chunks so each transfer_from call stays bounded.
        chunk_size = 50_000
        n_chunks = math.ceil(len(filtered_refs) / chunk_size)
        count = 0
        for chunk_num, chunk in enumerate(chunk_iterable(filtered_refs, chunk_size=chunk_size), start=1):
            if n_chunks > 1:
                _LOG.verbose("Transferring %d datasets in chunk %d/%d", len(chunk), chunk_num, n_chunks)
            transferred = dest_butler.transfer_from(
                qbb,
                chunk,
                transfer="auto",
                register_dataset_types=register_dataset_types,
                transfer_dimensions=transfer_dimensions,
                dry_run=dry_run,
            )
            count += len(transferred)

        # If asked to do so, update output chain definition.
        if update_output_chain and (metadata := qgraph.metadata) is not None:
            # These metadata keys are defined in CmdLineFwk.
            output_run = metadata.get("output_run")
            output = metadata.get("output")
            # Named to avoid shadowing the ``input`` builtin.
            input_collections = metadata.get("input")
            if output_run is not None and output is not None:
                _update_chain(dest_butler, output, output_run, input_collections)

    return count
def _update_chain(butler: Butler, output_chain: str, output_run: str, inputs: list[str] | None) -> None:
    """Update chain definition if it exists to include run as the first item
    in a chain. If it does not exist then create it to include all inputs and
    output.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler where to update the collection chain.
    output_chain : `str`
        Name of the output CHAINED collection.
    output_run : `str`
        Name of the output RUN collection.
    inputs : `list` [`str`] | None
        All the input collections to be included in the chain if the
        chain is created.
    """
    # Nothing to do unless the output RUN collection already exists.
    try:
        butler.collections.get_info(output_run)
    except MissingCollectionError:
        _LOG.verbose(
            "Output RUN collection (%s) does not exist. Skipping updating the output chain.",
            output_run,
        )
        return

    # Ensure the chain collection exists before calling prepend_chain.
    newly_registered = butler.collections.register(output_chain, CollectionType.CHAINED)
    if newly_registered:
        _LOG.verbose("Registered chain collection: %s", output_chain)
        if inputs:
            # Any input chains must be flattened first.
            flat_inputs = butler.collections.query(inputs, flatten_chains=True)

            # Add input collections to the freshly registered chain. Using
            # extend instead of prepend in case of a race where another
            # execution adds a run before the inputs land in the chain.
            butler.collections.extend_chain(output_chain, flat_inputs)

    _LOG.verbose(
        "Prepending output chain collection (%s) with output RUN collection (%s)", output_chain, output_run
    )
    butler.collections.prepend_chain(output_chain, output_run)