Coverage for python/lsst/ctrl/bps/parsl/workflow.py: 25%

106 statements

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:53 +0000

# This file is part of ctrl_bps_parsl.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

27 

28import logging 

29import os 

30import pickle 

31from collections.abc import Iterable, Mapping 

32 

33import parsl 

34import parsl.config 

35from parsl.app.app import bash_app 

36from parsl.app.bash import BashApp 

37from parsl.app.futures import Future 

38 

39from lsst.ctrl.bps import BaseWmsWorkflow, BpsConfig, GenericWorkflow, GenericWorkflowJob 

40 

41from .configuration import get_workflow_filename, set_parsl_logging 

42from .job import ParslJob, get_file_paths 

43from .site import SiteConfig 

44 

45__all__ = ("ParslWorkflow", "get_parsl_config") 

46 

47_log = logging.getLogger("lsst.ctrl.bps.parsl") 

48 

49 

50def get_parsl_config(config: BpsConfig) -> parsl.config.Config: 

51 """Construct parsl configuration from BPS configuration. 

52 

53 For details on the site configuration, see `SiteConfig`. For details on the 

54 monitor configuration, see ``get_parsl_monitor``. 

55 

56 `SiteConfig` provides an implementation of the method ``get_parsl_config`` 

57 which returns a Parsl configuration with sensible defaults. Subclasses 

58 of `SiteConfig` can overwrite that method to configure Parsl in a 

59 way specific to the site's configuration. 

60 

61 Parameters 

62 ---------- 

63 config : `lsst.ctrl.bps.BpsConfig` 

64 BPS configuration. 

65 

66 Returns 

67 ------- 

68 parsl_config : `parsl.config.Config` 

69 Parsl configuration. 

70 """ 

71 site = SiteConfig.from_config(config) 

72 return site.get_parsl_config() 

73 

74 

75class ParslWorkflow(BaseWmsWorkflow): 

76 """Parsl-based workflow object to manage execution of workflow. 

77 

78 Parameters 

79 ---------- 

80 name : `str` 

81 Unique name of workflow. 

82 config : `lsst.ctrl.bps.BpsConfig` 

83 Generic workflow config. 

84 path : `str` 

85 Path prefix for workflow output files. 

86 jobs : `dict` mapping `str` to `lsst.ctrl.bps.parsl.ParslJob` 

87 Jobs to be executed. 

88 parents : `dict` mapping `str` to iterable of `str` 

89 Dependency tree. Keywords are job names, and values are a list of job 

90 names that must be executed before the keyword job name can be 

91 executed. 

92 endpoints : iterable of `str` 

93 Endpoints of the dependency tree. These jobs (specified by name) have 

94 no children. 

95 final : `lsst.ctrl.bps.parsl.ParslJob`, optional 

96 Final job to be done, e.g., to merge the execution butler. This is done 

97 locally. 

98 """ 

99 

100 def __init__( 

101 self, 

102 name: str, 

103 config: BpsConfig, 

104 path: str, 

105 jobs: dict[str, ParslJob], 

106 parents: Mapping[str, Iterable[str]], 

107 endpoints: Iterable[str], 

108 final: ParslJob | None = None, 

109 ): 

110 super().__init__(name, config) 

111 

112 self.path = path 

113 self.bps_config = config 

114 self.parsl_config = get_parsl_config(config) 

115 self.site = SiteConfig.from_config(config) 

116 self.dfk: parsl.DataFlowKernel | None = None # type: ignore 

117 self.command_prefix = self.site.get_command_prefix() 

118 

119 # these are function decorators 

120 self.apps: dict[str, BashApp] = { 

121 ex.label: bash_app( # type: ignore 

122 executors=[ex.label], 

123 cache=True, 

124 ignore_for_cache=["stderr", "stdout", "parsl_resource_specification"], 

125 ) 

126 for ex in self.parsl_config.executors 

127 } 

128 

129 self.jobs = jobs 

130 self.parents = parents 

131 self.endpoints = endpoints 

132 self.final = final 

133 

134 def __reduce__(self): 

135 """Recipe for pickle""" 

136 return type(self), ( 

137 self.name, 

138 self.bps_config, 

139 self.path, 

140 self.jobs, 

141 self.parents, 

142 self.endpoints, 

143 self.final, 

144 ) 

145 

146 @classmethod 

147 def from_generic_workflow( 

148 cls, config: BpsConfig, generic_workflow: GenericWorkflow, out_prefix: str, service_class: str 

149 ) -> BaseWmsWorkflow: 

150 """Create a ParslWorkflow object from a BPS GenericWorkflow. 

151 

152 Parameters 

153 ---------- 

154 config : `lsst.ctrl.bps.BpsConfig` 

155 Configuration of the workflow. 

156 generic_workflow : `lsst.ctrl.bps.generic_workflow.GenericWorkflow` 

157 Generic representation of a single workflow. 

158 out_prefix : `str` 

159 Prefix for workflow output files. 

160 service_class : `str` 

161 Full module name of WMS service class that created this workflow. 

162 

163 Returns 

164 ------- 

165 self : `ParslWorkflow` 

166 Constructed workflow. 

167 """ 

168 # Generate list of jobs 

169 jobs: dict[str, ParslJob] = {} 

170 for job_name in generic_workflow: 

171 generic_job = generic_workflow.get_job(job_name) 

172 assert generic_job.name not in jobs 

173 jobs[job_name] = ParslJob(generic_job, config, get_file_paths(generic_workflow, job_name)) 

174 

175 parents = {name: set(generic_workflow.predecessors(name)) for name in jobs} 

176 endpoints = [name for name in jobs if generic_workflow.out_degree(name) == 0] 

177 

178 # Add final job: execution butler merge 

179 job = generic_workflow.get_final() 

180 final: ParslJob | None = None 

181 if job is not None: 

182 assert isinstance(job, GenericWorkflowJob) 

183 final = ParslJob(job, config, get_file_paths(generic_workflow, job.name)) 

184 

185 return cls(generic_workflow.name, config, out_prefix, jobs, parents, endpoints, final) 

186 

187 def write(self, out_prefix: str): 

188 """Write workflow state. 

189 

190 This, in combination with the parsl checkpoint files, can be used to 

191 restart a workflow that was interrupted. 

192 

193 Parameters 

194 ---------- 

195 out_prefix : `str` 

196 Root directory to be used for WMS workflow inputs and outputs 

197 as well as internal WMS files. 

198 """ 

199 filename = get_workflow_filename(out_prefix) 

200 _log.info("Writing workflow with ID=%s", out_prefix) 

201 with open(filename, "wb") as fd: 

202 pickle.dump(self, fd) 

203 

204 @classmethod 

205 def read(cls, out_prefix: str) -> "ParslWorkflow": 

206 """Construct from the saved workflow state. 

207 

208 Parameters 

209 ---------- 

210 out_prefix : `str` 

211 Root directory to be used for WMS workflow inputs and outputs 

212 as well as internal WMS files. 

213 

214 Returns 

215 ------- 

216 self : `ParslWorkflow` 

217 Constructed workflow. 

218 """ 

219 filename = get_workflow_filename(out_prefix) 

220 with open(filename, "rb") as fd: 

221 self = pickle.load(fd) 

222 assert isinstance(self, cls) 

223 return self 

224 

225 def run(self, block: bool = True) -> list[Future | None]: 

226 """Run the workflow. 

227 

228 Parameters 

229 ---------- 

230 block : `bool`, optional 

231 Block returning from this method until the workflow is complete? If 

232 `False`, jobs may still be running when this returns, and it is the 

233 user's responsibility to call the ``finalize_jobs`` and 

234 ``shutdown`` methods when they are complete. 

235 

236 Returns 

237 ------- 

238 futures : `list` of `parsl.dataflow.futures.AppFuture` 

239 `parsl.dataflow.futures.AppFuture` objects linked to the execution 

240 of the endpoint jobs. 

241 """ 

242 futures = [self.execute(name) for name in self.endpoints] 

243 if block: 

244 # Calling .exception() for each future blocks returning 

245 # from this method until all the jobs have executed or 

246 # raised an error. This is needed for running in a 

247 # non-interactive python process that would otherwise end 

248 # before the futures resolve. 

249 for ff in futures: 

250 if ff is not None: 

251 ff.exception() 

252 self.shutdown() 

253 self.finalize_jobs() 

254 return futures 

255 

256 def execute(self, name: str) -> Future | None: 

257 """Execute a job. 

258 

259 Parameters 

260 ---------- 

261 name : `str` 

262 Name of job to execute. 

263 

264 Returns 

265 ------- 

266 future : `parsl.dataflow.futures.AppFuture` or `None` 

267 A `parsl.dataflow.futures.AppFuture` object linked to the execution 

268 of the job, or `None` if the job is being reserved to run locally. 

269 """ 

270 if name in ("pipetaskInit", "mergeExecutionButler"): 

271 # These get done outside of parsl 

272 return None 

273 job = self.jobs[name] 

274 inputs = [self.execute(parent) for parent in self.parents[name]] 

275 executors = self.parsl_config.executors 

276 if len(executors) > 1: 

277 label = self.site.select_executor(job) 

278 else: 

279 label = executors[0].label 

280 return job.get_future( 

281 self.apps[label], 

282 [ff for ff in inputs if ff is not None], 

283 self.command_prefix, 

284 self.site.resource_list, 

285 ) 

286 

287 def load_dfk(self): 

288 """Load data frame kernel. 

289 

290 This starts parsl. 

291 """ 

292 if self.dfk is not None: 

293 raise RuntimeError("Workflow has already started.") 

294 set_parsl_logging(self.bps_config) 

295 self.dfk = parsl.load(self.parsl_config) 

296 

297 def start(self): 

298 """Start the workflow.""" 

299 self.initialize_jobs() 

300 self.load_dfk() 

301 

302 def restart(self): 

303 """Restart the workflow after interruption.""" 

304 # The following is a workaround for a bug in 

305 # parsl.utils.get_last_checkpoint. 

306 checkpoint_files = parsl.utils.get_all_checkpoints() 

307 self.parsl_config.checkpoint_files = checkpoint_files[-1:] 

308 self.load_dfk() 

309 

310 def shutdown(self): 

311 """Shut down the workflow. 

312 

313 This stops parsl. 

314 """ 

315 if self.dfk is None: 

316 raise RuntimeError("Workflow not started.") 

317 self.dfk.cleanup() 

318 self.dfk = None 

319 parsl.DataFlowKernelLoader.clear() 

320 

321 def initialize_jobs(self): 

322 """Run initial jobs. 

323 

324 These jobs are run locally before any other jobs are submitted to 

325 parsl. 

326 

327 This is used to set up the butler. 

328 """ 

329 job = self.jobs.get("pipetaskInit", None) 

330 if job is not None: 

331 os.makedirs(os.path.join(self.path, "logs")) 

332 job.run_local() 

333 

334 def finalize_jobs(self): 

335 """Run final jobs. 

336 

337 These jobs are run locally after all other jobs are complete. 

338 

339 This is used to merge the execution butler. 

340 """ 

341 if self.final is not None and not self.final.done: 

342 self.final.run_local()