Coverage for python / lsst / ctrl / bps / pre_transform.py: 18%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:35 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Driver to execute steps outside of BPS that need to be done first 

29including running QuantumGraph generation and reading the QuantumGraph 

30into memory. 

31""" 

32 

33import logging 

34import os 

35import shlex 

36import shutil 

37import subprocess 

38from pathlib import Path 

39 

40from lsst.ctrl.bps import BpsConfig, BpsSubprocessError 

41from lsst.pipe.base import QuantumGraph 

42from lsst.pipe.base.pipeline_graph import TaskImportMode 

43from lsst.pipe.base.quantum_graph import PredictedQuantumGraph 

44from lsst.resources import ResourcePath 

45from lsst.utils import doImport 

46from lsst.utils.logging import VERBOSE 

47from lsst.utils.timer import time_this, timeMethod 

48 

# Logger for this module, named after the module's import path.
_LOG = logging.getLogger(__name__)

50 

51 

@timeMethod(logger=_LOG, logLevel=VERBOSE)
def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> tuple[str, PredictedQuantumGraph]:
    """Obtain a quantum graph from an existing file or by generating one.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration. When its ``qgraphFile`` entry is found and
        non-empty, that file is used instead of generating a new graph.
    out_prefix : `str`, optional
        Directory that receives the quantum graph file as well as the
        stdout/stderr of any command run to create or update it.
        Defaults to an empty string.

    Returns
    -------
    qgraph_filename : `str`
        Name of the file the quantum graph was read from.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        The quantum graph loaded from ``qgraph_filename``.

    Raises
    ------
    ValueError
        Raised if the quantum graph file has an extension other than
        ``.qg`` or ``.qgraph``.
    """
    has_entry, user_qgraph = config.search("qgraphFile")
    if not (has_entry and user_qgraph):
        # No pre-generated graph was supplied: run the generation command.
        _LOG.info("Creating quantum graph")
        with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed creating quantum graph"):
            qgraph_filename = create_quantum_graph(config, out_prefix)
    else:
        if out_prefix is None:
            # Work directly on the file at the location the user gave.
            qgraph_filename = user_qgraph
        else:
            # Keep a copy of the user's graph alongside the other outputs.
            _LOG.info("Copying quantum graph from '%s'", user_qgraph)
            with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed copying quantum graph"):
                qgraph_filename = os.path.join(out_prefix, os.path.basename(user_qgraph))
                shutil.copy2(user_qgraph, qgraph_filename)

        # Update the output run in the user provided quantum graph.
        if "finalJob" in config:
            update_quantum_graph(config, qgraph_filename, out_prefix)

    _LOG.info("Reading quantum graph from '%s'", qgraph_filename)
    with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed reading quantum graph"):
        qgraph_path = ResourcePath(qgraph_filename)
        extension = qgraph_path.getExtension()
        if extension == ".qg":
            # Tasks are not imported while reading this format.
            with PredictedQuantumGraph.open(qgraph_path, import_mode=TaskImportMode.DO_NOT_IMPORT) as reader:
                reader.read_thin_graph()
                qgraph = reader.finish()
        elif extension == ".qgraph":
            # Old-style file: load it, then convert to the predicted form.
            qgraph = PredictedQuantumGraph.from_old_quantum_graph(QuantumGraph.loadUri(qgraph_path))
        else:
            raise ValueError(f"Unrecognized extension for quantum graph file: {qgraph_filename}.")
    return qgraph_filename, qgraph

106 

107 

def execute(command: str, filename: str, write_buffering: int = 1) -> int:
    """Execute a command, recording its combined stdout/stderr.

    The command line is written to ``filename`` first, followed by the
    command's output, which is also sent to the module logger at INFO level
    as it arrives.

    Parameters
    ----------
    command : `str`
        String representing the command to execute. It is split with
        `shlex.split` and run without a shell.
    filename : `str`
        A file to which both stderr and stdout will be written to.
    write_buffering : `int`, optional
        Buffering policy passed to open for the stdout/stderr file.
        0 - not allowed here because writing in text mode.
        1 - line buffering (default).
        > 1 - size in bytes for a chunk buffer.

    Returns
    -------
    exit_code : `int`
        The exit code the command being executed finished with.

    Raises
    ------
    ValueError
        Raised if write_buffering is 0.
    """
    import codecs

    buffer_size = 5000
    # Decode incrementally so a multi-byte UTF-8 character split across two
    # reads is reassembled instead of raising; undecodable bytes are replaced
    # so that odd output from the command cannot abort the driver.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    with open(filename, "w", write_buffering) as fh:
        print(command, file=fh)
        print("\n", file=fh)  # Note: want a blank line

        # The context manager closes the pipe and waits for the process even
        # if writing or logging fails part-way through.
        with subprocess.Popen(
            shlex.split(command), shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        ) as process:
            assert process.stdout is not None  # for mypy
            stdout_fd = process.stdout.fileno()

            # os.read blocks until data is available and returns b"" only at
            # end-of-file, so an empty read is the sole loop exit condition.
            while chunk := os.read(stdout_fd, buffer_size):
                text = decoder.decode(chunk)
                if not text:
                    # Only part of a character arrived; wait for the rest.
                    continue
                stripped_text = text.rstrip()
                print(stripped_text, file=fh)
                _LOG.info(stripped_text)

            # Flush whatever the decoder is still holding (a truncated
            # character at the very end of the output).
            leftover = decoder.decode(b"", final=True)
            if leftover:
                print(leftover, file=fh)
                _LOG.info(leftover)

    return process.returncode

150 

151 

def create_quantum_graph(config: BpsConfig, out_prefix: str = "") -> str:
    """Generate a QuantumGraph by running the configured command.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration. Supplies ``qgraphFileTemplate`` (the graph file
        name) and ``createQuantumGraph`` (the command to run).
    out_prefix : `str`, optional
        Directory in which the QuantumGraph and the command's stdout/stderr
        (``quantumGraphGeneration.out``) are placed. Defaults to an empty
        string, i.e. paths relative to the current directory.

    Returns
    -------
    qgraph_filename : `str`
        Name of file containing generated QuantumGraph.

    Raises
    ------
    KeyError
        Raised if the command for generating the QuantumGraph was not found
        in the provided configuration.
    BpsSubprocessError
        Raised if the command for generating the QuantumGraph failed.
    """
    # Where the generated graph will live.
    qgraph_filename = os.path.join(out_prefix, config["qgraphFileTemplate"])

    # Look up the generation command, passing the target file name in as the
    # current value of ``qgraphFile``.
    found, cmd = config.search("createQuantumGraph", opt={"curvals": {"qgraphFile": qgraph_filename}})
    if not found:
        raise KeyError("command for generating QuantumGraph not found")
    _LOG.info(cmd)

    # Run it, capturing the command's output next to the graph.
    log_path = os.path.join(out_prefix, "quantumGraphGeneration.out")
    exit_code = execute(cmd, log_path)
    if exit_code == 0:
        return qgraph_filename

    raise BpsSubprocessError(
        exit_code,
        f"Generating quantum graph failed with non-zero exit code ({exit_code})\n"
        f"Check {log_path} for more details.",
    )

198 

199 

def update_quantum_graph(
    config: BpsConfig, qgraph_filename: str, out_prefix: str = "", inplace: bool = False
) -> None:
    """Run the configured command that updates an existing quantum graph.

    Parameters
    ----------
    config : `BpsConfig`
        BPS configuration. Supplies the ``updateQuantumGraph`` command.
    qgraph_filename : `str`
        Name of file containing the quantum graph that needs to be updated.
        The updated graph is written back to this name.
    out_prefix : `str`, optional
        Directory in which the update command's stdout/stderr
        (``quantumGraphUpdate.out``) is placed. Defaults to an empty string,
        i.e. the current directory.
    inplace : `bool`, optional
        If set to True, the graph file is used as both input and output of
        the update command and no backup copy is made. Defaults to False.

    Raises
    ------
    KeyError
        Raised if the command for updating the quantum graph was not found
        in the provided configuration.
    BpsSubprocessError
        Raised if the command for updating the quantum graph failed.
    """
    target = Path(qgraph_filename)

    if inplace:
        source = target
    else:
        # Copy the graph to '<stem>_orig<suffix>' and feed that copy to the
        # update command so the original content is preserved.
        _LOG.info("Backing up quantum graph from '%s'", qgraph_filename)
        source = target.parent / f"{target.stem}_orig{target.suffix}"
        with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed backing up quantum graph"):
            shutil.copy2(qgraph_filename, source)

    # Look up the update command with the input/output file names filled in.
    curvals = {"inputQgraphFile": str(source), "qgraphFile": str(target)}
    found, cmd = config.search("updateQuantumGraph", opt={"curvals": curvals})
    if not found:
        raise KeyError("command for updating quantum graph not found")
    _LOG.info(cmd)

    # Run it, capturing the command's output in out_prefix.
    log_path = os.path.join(out_prefix, "quantumGraphUpdate.out")
    exit_code = execute(cmd, log_path)
    if exit_code == 0:
        return

    raise BpsSubprocessError(
        exit_code,
        f"Updating quantum graph failed with non-zero exit code ({exit_code})\n"
        f"Check {log_path} for more details.",
    )

247 

248 

def cluster_quanta(config, qgraph, name):
    """Group quanta into clusters using the configured algorithm.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration. ``clusterAlgorithm`` names the function to
        import and call; ``validateClusteredQgraph`` (default False)
        controls whether the result is validated.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        Original full quantum graph for the run.
    name : `str`
        Name for the ClusteredQuantumGraph that will be generated.

    Returns
    -------
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        Generated ClusteredQuantumGraph.

    Raises
    ------
    RuntimeError
        If asked to validate and generated ClusteredQuantumGraph fails a test.
    """
    # Resolve the algorithm by its import path and run it.
    algorithm = doImport(config["clusterAlgorithm"])
    clustered = algorithm(config, qgraph, name)

    # Validation is opt-in.
    _found, wants_validation = config.search("validateClusteredQgraph", opt={"default": False})
    if wants_validation:
        clustered.validate()

    return clustered