Coverage for python / lsst / ctrl / bps / pre_transform.py: 18%
93 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:22 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Driver to execute steps outside of BPS that need to be done first
29including running QuantumGraph generation and reading the QuantumGraph
30into memory.
31"""
33import logging
34import os
35import shlex
36import shutil
37import subprocess
38from pathlib import Path
40from lsst.ctrl.bps import BpsConfig, BpsSubprocessError
41from lsst.pipe.base import QuantumGraph
42from lsst.pipe.base.pipeline_graph import TaskImportMode
43from lsst.pipe.base.quantum_graph import PredictedQuantumGraph
44from lsst.resources import ResourcePath
45from lsst.utils import doImport
46from lsst.utils.logging import VERBOSE
47from lsst.utils.timer import time_this, timeMethod
49_LOG = logging.getLogger(__name__)
52@timeMethod(logger=_LOG, logLevel=VERBOSE)
53def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> tuple[str, PredictedQuantumGraph]:
54 """Read a quantum graph from a file or create one from scratch.
56 Parameters
57 ----------
58 config : `lsst.ctrl.bps.BpsConfig`
59 Configuration values for BPS. In particular, looking for qgraphFile.
60 out_prefix : `str`, optional
61 Output path for the QuantumGraph and stdout/stderr from generating
62 the QuantumGraph. Default value is empty string.
64 Returns
65 -------
66 qgraph_filename : `str`
67 Name of file containing QuantumGraph that was read into qgraph.
68 qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
69 A quantum graph read in from pre-generated file or one that is the
70 result of running code that generates it.
71 """
72 # Check to see if user provided pre-generated QuantumGraph.
73 found, input_qgraph_filename = config.search("qgraphFile")
74 if found and input_qgraph_filename:
75 if out_prefix is not None:
76 # Save a copy of the QuantumGraph file in out_prefix.
77 _LOG.info("Copying quantum graph from '%s'", input_qgraph_filename)
78 with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed copying quantum graph"):
79 qgraph_filename = os.path.join(out_prefix, os.path.basename(input_qgraph_filename))
80 shutil.copy2(input_qgraph_filename, qgraph_filename)
81 else:
82 # Use QuantumGraph file in original given location.
83 qgraph_filename = input_qgraph_filename
85 # Update the output run in the user provided quantum graph.
86 if "finalJob" in config:
87 update_quantum_graph(config, qgraph_filename, out_prefix)
88 else:
89 # Run command to create the QuantumGraph.
90 _LOG.info("Creating quantum graph")
91 with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed creating quantum graph"):
92 qgraph_filename = create_quantum_graph(config, out_prefix)
94 _LOG.info("Reading quantum graph from '%s'", qgraph_filename)
95 with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed reading quantum graph"):
96 qgraph_path = ResourcePath(qgraph_filename)
97 if qgraph_path.getExtension() == ".qg":
98 with PredictedQuantumGraph.open(qgraph_path, import_mode=TaskImportMode.DO_NOT_IMPORT) as reader:
99 reader.read_thin_graph()
100 qgraph = reader.finish()
101 elif qgraph_path.getExtension() == ".qgraph":
102 qgraph = PredictedQuantumGraph.from_old_quantum_graph(QuantumGraph.loadUri(qgraph_path))
103 else:
104 raise ValueError(f"Unrecognized extension for quantum graph file: {qgraph_filename}.")
105 return qgraph_filename, qgraph
108def execute(command: str, filename: str, write_buffering: int = 1) -> int:
109 """Execute a command.
111 Parameters
112 ----------
113 command : `str`
114 String representing the command to execute.
115 filename : `str`
116 A file to which both stderr and stdout will be written to.
117 write_buffering : `int`, optional
118 Buffering policy passed to open for the stdout/stderr file.
119 0 - not allowed here because writing in text mode.
120 1 - line buffering (default).
121 > 1 - size in bytes for a chunk buffer.
123 Returns
124 -------
125 exit_code : `int`
126 The exit code the command being executed finished with.
128 Raises
129 ------
130 ValueError
131 Raised if write_buffering is 0.
132 """
133 buffer_size = 5000
134 with open(filename, "w", write_buffering) as fh:
135 print(command, file=fh)
136 print("\n", file=fh) # Note: want a blank line
137 process = subprocess.Popen(
138 shlex.split(command), shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
139 )
140 assert process.stdout is not None # for mypy
141 buffer = os.read(process.stdout.fileno(), buffer_size).decode()
142 while process.poll is None or buffer:
143 stripped_buffer = buffer.rstrip()
144 print(stripped_buffer, file=fh)
145 _LOG.info(stripped_buffer)
146 buffer = os.read(process.stdout.fileno(), buffer_size).decode()
147 process.stdout.close()
148 process.wait()
149 return process.returncode
152def create_quantum_graph(config: BpsConfig, out_prefix: str = "") -> str:
153 """Create QuantumGraph from pipeline definition.
155 Parameters
156 ----------
157 config : `lsst.ctrl.bps.BpsConfig`
158 BPS configuration.
159 out_prefix : `str`, optional
160 Path in which to output QuantumGraph as well as the stdout/stderr
161 from generating the QuantumGraph. Defaults to empty string so
162 code will write the QuantumGraph and stdout/stderr to the current
163 directory.
165 Returns
166 -------
167 qgraph_filename : `str`
168 Name of file containing generated QuantumGraph.
170 Raises
171 ------
172 KeyError
173 Raised if the command for generating the QuantumGraph was not found
174 in the provided configuration.
175 BpsSubprocessError
176 Raised if the command for generating the QuantumGraph failed.
177 """
178 # Create name of file to store QuantumGraph.
179 qgraph_filename = os.path.join(out_prefix, config["qgraphFileTemplate"])
181 # Get QuantumGraph generation command.
182 search_opt = {"curvals": {"qgraphFile": qgraph_filename}}
183 found, cmd = config.search("createQuantumGraph", opt=search_opt)
184 if not found:
185 raise KeyError("command for generating QuantumGraph not found")
186 _LOG.info(cmd)
188 # Run QuantumGraph generation.
189 out = os.path.join(out_prefix, "quantumGraphGeneration.out")
190 status = execute(cmd, out)
191 if status != 0:
192 raise BpsSubprocessError(
193 status,
194 f"Generating quantum graph failed with non-zero exit code ({status})\n"
195 f"Check {out} for more details.",
196 )
197 return qgraph_filename
200def update_quantum_graph(
201 config: BpsConfig, qgraph_filename: str, out_prefix: str = "", inplace: bool = False
202) -> None:
203 """Update output run in an existing quantum graph.
205 Parameters
206 ----------
207 config : `BpsConfig`
208 BPS configuration.
209 qgraph_filename : `str`
210 Name of file containing the quantum graph that needs to be updated.
211 out_prefix : `str`, optional
212 Path in which to output QuantumGraph as well as the stdout/stderr
213 from generating the QuantumGraph. Defaults to empty string so
214 code will write the QuantumGraph and stdout/stderr to the current
215 directory.
216 inplace : `bool`, optional
217 If set to True, all updates of the graph will be done in place without
218 creating a backup copy. Defaults to False.
219 """
220 src_qgraph = Path(qgraph_filename)
221 dest_qgraph = Path(qgraph_filename)
223 # If requested, create a backup copy of the quantum graph by adding
224 # '_orig' suffix to its stem (the filename without the extension).
225 if not inplace:
226 _LOG.info("Backing up quantum graph from '%s'", qgraph_filename)
227 src_qgraph = src_qgraph.parent / f"{src_qgraph.stem}_orig{src_qgraph.suffix}"
228 with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed backing up quantum graph"):
229 shutil.copy2(qgraph_filename, src_qgraph)
231 # Get the command for updating the quantum graph.
232 search_opt = {"curvals": {"inputQgraphFile": str(src_qgraph), "qgraphFile": str(dest_qgraph)}}
233 found, cmd = config.search("updateQuantumGraph", opt=search_opt)
234 if not found:
235 raise KeyError("command for updating quantum graph not found")
236 _LOG.info(cmd)
238 # Run the command to update the quantum graph.
239 out = os.path.join(out_prefix, "quantumGraphUpdate.out")
240 status = execute(cmd, out)
241 if status != 0:
242 raise BpsSubprocessError(
243 status,
244 f"Updating quantum graph failed with non-zero exit code ({status})\n"
245 f"Check {out} for more details.",
246 )
249def cluster_quanta(config, qgraph, name):
250 """Call specified function to group quanta into clusters to be run
251 together.
253 Parameters
254 ----------
255 config : `lsst.ctrl.bps.BpsConfig`
256 BPS configuration.
257 qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
258 Original full quantum graph for the run.
259 name : `str`
260 Name for the ClusteredQuantumGraph that will be generated.
262 Returns
263 -------
264 cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
265 Generated ClusteredQuantumGraph.
267 Raises
268 ------
269 RuntimeError
270 If asked to validate and generated ClusteredQuantumGraph fails a test.
271 """
272 cluster_func = doImport(config["clusterAlgorithm"])
273 cqgraph = cluster_func(config, qgraph, name)
274 _, validate = config.search("validateClusteredQgraph", opt={"default": False})
275 if validate:
276 cqgraph.validate()
277 return cqgraph