Coverage for python / lsst / ctrl / mpexec / cli / script / run_qbb.py: 33%

65 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:48 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30import pickle 

31import uuid 

32from collections.abc import Mapping 

33from typing import Literal 

34 

35import astropy.units as u 

36 

37import lsst.utils.timer 

38from lsst.daf.butler import ( 

39 DatasetType, 

40 DimensionConfig, 

41 DimensionUniverse, 

42 LimitedButler, 

43 Quantum, 

44 QuantumBackedButler, 

45) 

46from lsst.pipe.base import ExecutionResources, TaskFactory 

47from lsst.pipe.base.mp_graph_executor import MPGraphExecutor 

48from lsst.pipe.base.quantum_graph import PredictedQuantumGraph 

49from lsst.pipe.base.single_quantum_executor import SingleQuantumExecutor 

50from lsst.resources import ResourcePath, ResourcePathExpression 

51from lsst.utils.logging import VERBOSE, getLogger 

52from lsst.utils.threads import disable_implicit_threading 

53 

54from ..butler_factory import ButlerFactory 

55from ..utils import MP_TIMEOUT, summarize_quantum_graph 

56 

# Module-scoped logger, named after this module per LSST convention.
_LOG = getLogger(__name__)

58 

59 

def run_qbb(
    *,
    task_factory: TaskFactory | None = None,
    butler_config: ResourcePathExpression,
    qgraph: ResourcePathExpression,
    config_search_path: list[str] | None,
    qgraph_id: str | None,
    qgraph_node_id: list[str | uuid.UUID] | None,
    processes: int,
    pdb: str | None,
    profile: str | None,
    debug: bool,
    start_method: Literal["spawn", "forkserver"] | None,
    timeout: int | None,
    fail_fast: bool,
    summary: ResourcePathExpression | None,
    enable_implicit_threading: bool,
    cores_per_quantum: int,
    memory_per_quantum: str,
    raise_on_partial_outputs: bool,
    no_existing_outputs: bool,
    **kwargs: object,
) -> None:
    """Implement the command line interface ``pipetask run-qbb`` subcommand.

    Should only be called by command line tools and unit test code that tests
    this function.

    Parameters
    ----------
    task_factory : `lsst.pipe.base.TaskFactory`, optional
        A custom task factory to use.
    butler_config : `str`
        The path location of the gen3 butler/registry config file.
    qgraph : `str`
        URI location for a serialized quantum graph definition.
    config_search_path : `list` [`str`] or `None`
        Additional search paths for butler configuration.
    qgraph_id : `str` or `None`
        Quantum graph identifier, if specified must match the identifier of the
        graph loaded from a file. Ignored if graph is not loaded from a file.
    qgraph_node_id : `iterable` of `str` or `uuid.UUID`, or `None`
        Only load a specified set of nodes if graph is loaded from a file;
        nodes are identified by UUIDs (strings are parsed with `uuid.UUID`).
    processes : `int`
        The number of processes to use.
    pdb : `str` or `None`
        Debugger to launch for exceptions.
    profile : `str` or `None`
        File name to dump cProfile information to.
    debug : `bool`
        If true, enable debugging output using lsstDebug facility (imports
        debug.py).
    start_method : `str` or `None`
        Start method from `multiprocessing` module, `None` selects the best
        one for current platform.
    timeout : `int` or `None`
        Timeout for multiprocessing; maximum wall time (sec). `None` selects
        the package default (``MP_TIMEOUT``).
    fail_fast : `bool`
        If true then stop processing at first error, otherwise process as many
        tasks as possible.
    summary : `str` or `None`
        File path to store job report in JSON format.
    enable_implicit_threading : `bool`
        If `True`, do not disable implicit threading by third-party libraries.
        Implicit threading is always disabled during actual quantum execution
        if ``processes > 1``.
    cores_per_quantum : `int`
        Number of cores that can be used by each quantum.
    memory_per_quantum : `str`
        Amount of memory that each quantum can be allowed to use. Empty string
        implies no limit. The string can be either a single integer (implying
        units of MB) or a combination of number and unit.
    raise_on_partial_outputs : `bool`
        Consider partial outputs an error instead of a success.
    no_existing_outputs : `bool`
        Whether to assume that no predicted outputs for these quanta already
        exist in the output run collection.
    **kwargs : `object`
        Ignored; click commands may accept options for more than one script
        function and pass all the option kwargs to each of the script functions
        which ignore these unused kwargs.

    Notes
    -----
    NOTE(review): ``qgraph_id`` is accepted and documented above but is not
    referenced anywhere in this function body — confirm whether the identifier
    check now happens inside graph loading or is vestigial.
    """
    # Fork option still exists for compatibility but we use spawn instead.
    # "fork" is outside the declared Literal, hence the mypy ignores.
    if start_method == "fork":  # type: ignore[comparison-overlap]
        start_method = "spawn"  # type: ignore[unreachable]
        _LOG.warning("Option --start-method=fork is unsafe and no longer supported, using spawn instead.")

    if not enable_implicit_threading:
        disable_implicit_threading()

    # click passes empty tuple as default value for qgraph_node_id
    # Normalize to a set of UUIDs (strings are parsed), or None for "all".
    quantum_ids = (
        {uuid.UUID(q) if not isinstance(q, uuid.UUID) else q for q in qgraph_node_id}
        if qgraph_node_id
        else None
    )
    # Load quantum graph, timing the read at VERBOSE level.
    with lsst.utils.timer.time_this(
        _LOG,
        msg=f"Reading {str(len(quantum_ids)) if quantum_ids is not None else 'all'} quanta.",
        level=VERBOSE,
    ) as qg_read_time:
        qg = PredictedQuantumGraph.read_execution_quanta(qgraph, quantum_ids=quantum_ids)
    # Recorded metadata is forwarded to each quantum's executor below.
    job_metadata = {"qg_read_time": qg_read_time.duration, "qg_size": len(qg)}

    summarize_quantum_graph(qg)

    # Map dataset type name -> DatasetType for the QBB factory.
    dataset_types = {dtn.name: dtn.dataset_type for dtn in qg.pipeline_graph.dataset_types.values()}

    # Ensure that QBB uses shared datastore cache.
    ButlerFactory.define_datastore_cache()

    # Picklable factory: each (sub)process builds its own QuantumBackedButler.
    _butler_factory = _QBBFactory(
        butler_config=butler_config,
        dimensions=qg.pipeline_graph.universe,
        dataset_types=dataset_types,
        config_search_path=config_search_path,
    )

    # make special quantum executor
    resources = ExecutionResources(
        num_cores=cores_per_quantum, max_mem=memory_per_quantum, default_mem_units=u.MB
    )
    quantumExecutor = SingleQuantumExecutor(
        butler=None,
        task_factory=task_factory,
        enable_lsst_debug=debug,
        limited_butler_factory=_butler_factory,
        resources=resources,
        assume_no_existing_outputs=no_existing_outputs,
        skip_existing=True,
        clobber_outputs=True,
        raise_on_partial_outputs=raise_on_partial_outputs,
        job_metadata=job_metadata,
    )

    timeout = MP_TIMEOUT if timeout is None else timeout
    executor = MPGraphExecutor(
        num_proc=processes,
        timeout=timeout,
        start_method=start_method,
        quantum_executor=quantumExecutor,
        fail_fast=fail_fast,
        pdb=pdb,
    )
    try:
        with lsst.utils.timer.profile(profile, _LOG):
            executor.execute(qg)
    finally:
        # Write the execution report even when execution raised, so partial
        # runs still produce a summary file.
        if summary:
            report = executor.getReport()
            if report:
                with ResourcePath(summary).open("w") as out:
                    # Do not save fields that are not set.
                    out.write(report.model_dump_json(exclude_none=True, indent=2))

216 

217 

class _QBBFactory:
    """Picklable callable that builds `~lsst.daf.butler.QuantumBackedButler`
    instances for individual quanta.

    Beyond acting as a factory, this class rebuilds the correct dimension
    universe when it is unpickled. When several pickled objects depend on the
    dimension universe, this object must be unpickled before them; the logic
    in MPGraphExecutor guarantees that SingleQuantumExecutor — and therefore
    this object — is unpickled first in the subprocess.
    """

    def __init__(
        self,
        butler_config: ResourcePathExpression,
        dimensions: DimensionUniverse,
        dataset_types: Mapping[str, DatasetType],
        config_search_path: list[str] | None,
    ):
        self.butler_config = butler_config
        self.dimensions = dimensions
        self.dataset_types = dataset_types
        # NOTE(review): stored and round-tripped through pickling, but not
        # forwarded to QuantumBackedButler.initialize below — confirm whether
        # search paths are consumed elsewhere or this attribute is vestigial.
        self.config_search_path = config_search_path

    def __call__(self, quantum: Quantum) -> LimitedButler:
        """Return freshly initialized `~lsst.daf.butler.QuantumBackedButler`.

        Factory method to create QuantumBackedButler instances.
        """
        return QuantumBackedButler.initialize(
            config=self.butler_config,
            quantum=quantum,
            dimensions=self.dimensions,
            dataset_types=self.dataset_types,
        )

    @classmethod
    def _unpickle(
        cls,
        butler_config: ResourcePathExpression,
        dimensions_config: DimensionConfig | None,
        dataset_types_pickle: bytes,
        config_search_path: list[str] | None,
    ) -> _QBBFactory:
        """Reconstruct a factory, creating the dimension universe before the
        dataset types are unpickled.
        """
        restored_universe = DimensionUniverse(dimensions_config)
        restored_types = pickle.loads(dataset_types_pickle)
        return cls(butler_config, restored_universe, restored_types, config_search_path)

    def __reduce__(self) -> tuple:
        """Pickle through `_unpickle` so the universe is restored first."""
        current = self.dimensions.dimensionConfig
        baseline = DimensionConfig()
        # Only ship the dimension configuration when it differs from the
        # default; the receiving side instantiates the default from None.
        is_default = (current["namespace"], current["version"]) == (
            baseline["namespace"],
            baseline["version"],
        )
        state = (
            self.butler_config,
            None if is_default else current,
            # Dataset types must be unpickled only after the universe exists,
            # hence the nested pickle payload rather than a plain attribute.
            pickle.dumps(self.dataset_types),
            self.config_search_path,
        )
        return (self._unpickle, state)