Coverage for python/lsst/ctrl/bps/parsl/workflow.py: 25% (105 statements)
coverage.py v7.13.5, created at 2026-05-01 08:24 +0000

# This file is part of ctrl_bps_parsl.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
import os
import pickle
from collections.abc import Iterable, Mapping

import parsl
import parsl.config
from parsl.app.app import bash_app
from parsl.app.bash import BashApp
from parsl.app.futures import Future

from lsst.ctrl.bps import BaseWmsWorkflow, BpsConfig, GenericWorkflow, GenericWorkflowJob

from .configuration import get_workflow_filename, set_parsl_logging
from .job import ParslJob, get_file_paths
from .site import SiteConfig

__all__ = ("ParslWorkflow", "get_parsl_config")

_log = logging.getLogger("lsst.ctrl.bps.parsl")


def get_parsl_config(config: BpsConfig) -> parsl.config.Config:
    """Construct parsl configuration from BPS configuration.

    For details on the site configuration, see `SiteConfig`. For details on
    the monitor configuration, see ``get_parsl_monitor``.

    `SiteConfig` provides an implementation of the method
    ``get_parsl_config`` which returns a Parsl configuration with sensible
    defaults. Subclasses of `SiteConfig` can override that method to
    configure Parsl in a way specific to the site.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.

    Returns
    -------
    parsl_config : `parsl.config.Config`
        Parsl configuration.
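
    Examples
    --------
    A minimal usage sketch; the YAML file name and its contents (in
    particular the site settings read by `SiteConfig`) are illustrative:

    >>> from lsst.ctrl.bps import BpsConfig
    >>> config = BpsConfig("bps_submit.yaml")  # doctest: +SKIP
    >>> parsl_config = get_parsl_config(config)  # doctest: +SKIP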
70 """
71 site = SiteConfig.from_config(config)
72 return site.get_parsl_config()
class ParslWorkflow(BaseWmsWorkflow):
    """Parsl-based workflow object to manage execution of workflow.

    Parameters
    ----------
    name : `str`
        Unique name of workflow.
    config : `lsst.ctrl.bps.BpsConfig`
        Generic workflow config.
    path : `str`
        Path prefix for workflow output files.
    jobs : `dict` mapping `str` to `lsst.ctrl.bps.parsl.ParslJob`
        Jobs to be executed.
    parents : `dict` mapping `str` to iterable of `str`
        Dependency tree. Keys are job names, and values are the names of
        the jobs that must be executed before the key job can be executed.
    endpoints : iterable of `str`
        Endpoints of the dependency tree. These jobs (specified by name)
        have no children.
    final : `lsst.ctrl.bps.parsl.ParslJob`, optional
        Final job to be done, e.g., to merge the execution butler. This is
        done locally.
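
    Examples
    --------
    A sketch of the usual lifecycle. Instances are normally created by BPS
    via `from_generic_workflow` rather than constructed directly; the config
    objects and service class name here are illustrative:

    >>> workflow = ParslWorkflow.from_generic_workflow(
    ...     config, generic_workflow, "/path/to/submit",
    ...     "lsst.ctrl.bps.parsl.ParslService",
    ... )  # doctest: +SKIP
    >>> workflow.start()  # doctest: +SKIP
    >>> futures = workflow.run(block=True)  # doctest: +SKIP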
98 """
100 def __init__(
101 self,
102 name: str,
103 config: BpsConfig,
104 path: str,
105 jobs: dict[str, ParslJob],
106 parents: Mapping[str, Iterable[str]],
107 endpoints: Iterable[str],
108 final: ParslJob | None = None,
109 ):
110 super().__init__(name, config)
112 self.path = path
113 self.bps_config = config
114 self.parsl_config = get_parsl_config(config)
115 self.site = SiteConfig.from_config(config)
116 self.dfk: parsl.DataFlowKernel | None = None # type: ignore
117 self.command_prefix = self.site.get_command_prefix()
        # These are the bash_app decorators, one per executor; stdout,
        # stderr, and the resource specification are excluded from the
        # memoization cache key.
        self.apps: dict[str, BashApp] = {
            ex.label: bash_app(  # type: ignore
                executors=[ex.label],
                cache=True,
                ignore_for_cache=["stderr", "stdout", "parsl_resource_specification"],
            )
            for ex in self.parsl_config.executors
        }

        self.jobs = jobs
        self.parents = parents
        self.endpoints = endpoints
        self.final = final

    def __reduce__(self):
        """Recipe for pickle."""
        return type(self), (
            self.name,
            self.bps_config,
            self.path,
            self.jobs,
            self.parents,
            self.endpoints,
            self.final,
        )

    @classmethod
    def from_generic_workflow(
        cls, config: BpsConfig, generic_workflow: GenericWorkflow, out_prefix: str, service_class: str
    ) -> BaseWmsWorkflow:
        """Create a ParslWorkflow object from a BPS GenericWorkflow.

        Parameters
        ----------
        config : `lsst.ctrl.bps.BpsConfig`
            Configuration of the workflow.
        generic_workflow : `lsst.ctrl.bps.generic_workflow.GenericWorkflow`
            Generic representation of a single workflow.
        out_prefix : `str`
            Prefix for workflow output files.
        service_class : `str`
            Full module name of WMS service class that created this workflow.

        Returns
        -------
        self : `ParslWorkflow`
            Constructed workflow.
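
        Examples
        --------
        This is normally called by BPS itself during submission; a direct
        call would look like the following sketch, where the config objects
        and the service class name are illustrative:

        >>> workflow = ParslWorkflow.from_generic_workflow(
        ...     config, generic_workflow, "/path/to/submit",
        ...     "lsst.ctrl.bps.parsl.ParslService",
        ... )  # doctest: +SKIP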
167 """
168 # Generate list of jobs
169 jobs: dict[str, ParslJob] = {}
170 for job_name in generic_workflow:
171 generic_job = generic_workflow.get_job(job_name)
172 assert generic_job.name not in jobs
173 jobs[job_name] = ParslJob(generic_job, config, get_file_paths(generic_workflow, job_name))
175 parents = {name: set(generic_workflow.predecessors(name)) for name in jobs}
176 endpoints = [name for name in jobs if generic_workflow.out_degree(name) == 0]
178 # Add final job: execution butler merge
179 job = generic_workflow.get_final()
180 final: ParslJob | None = None
181 if job is not None:
182 assert isinstance(job, GenericWorkflowJob)
183 final = ParslJob(job, config, get_file_paths(generic_workflow, job.name))
185 return cls(generic_workflow.name, config, out_prefix, jobs, parents, endpoints, final)
    def write(self, out_prefix: str):
        """Write workflow state.

        This, in combination with the parsl checkpoint files, can be used
        to restart a workflow that was interrupted.

        Parameters
        ----------
        out_prefix : `str`
            Root directory to be used for WMS workflow inputs and outputs
            as well as internal WMS files.
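
        Examples
        --------
        A sketch; the prefix is illustrative and would normally be the
        submit directory chosen by BPS:

        >>> workflow.write("/path/to/submit")  # doctest: +SKIP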
198 """
199 filename = get_workflow_filename(out_prefix)
200 _log.info("Writing workflow with ID=%s", out_prefix)
201 with open(filename, "wb") as fd:
202 pickle.dump(self, fd)
    @classmethod
    def read(cls, out_prefix: str) -> "ParslWorkflow":
        """Construct from the saved workflow state.

        Parameters
        ----------
        out_prefix : `str`
            Root directory to be used for WMS workflow inputs and outputs
            as well as internal WMS files.

        Returns
        -------
        self : `ParslWorkflow`
            Constructed workflow.
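
        Examples
        --------
        A sketch of resuming an interrupted submission; the prefix is
        illustrative:

        >>> workflow = ParslWorkflow.read("/path/to/submit")  # doctest: +SKIP
        >>> workflow.restart()  # doctest: +SKIP
        >>> futures = workflow.run(block=True)  # doctest: +SKIP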
218 """
219 filename = get_workflow_filename(out_prefix)
220 with open(filename, "rb") as fd:
221 self = pickle.load(fd)
222 assert isinstance(self, cls)
223 return self
    def run(self, block: bool = True) -> list[Future | None]:
        """Run the workflow.

        Parameters
        ----------
        block : `bool`, optional
            Block returning from this method until the workflow is
            complete? If `False`, jobs may still be running when this
            returns, and it is the user's responsibility to call the
            ``finalize_jobs`` and ``shutdown`` methods when they are
            complete.

        Returns
        -------
        futures : `list` of `parsl.dataflow.futures.AppFuture`
            `parsl.dataflow.futures.AppFuture` objects linked to the
            execution of the endpoint jobs.
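
        Examples
        --------
        With ``block=False`` the caller is responsible for clean-up once
        the futures have resolved (a sketch):

        >>> futures = workflow.run(block=False)  # doctest: +SKIP
        >>> [ff.exception() for ff in futures if ff is not None]  # doctest: +SKIP
        >>> workflow.shutdown()  # doctest: +SKIP
        >>> workflow.finalize_jobs()  # doctest: +SKIP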
241 """
242 futures = [self.execute(name) for name in self.endpoints]
243 if block:
244 # Calling .exception() for each future blocks returning
245 # from this method until all the jobs have executed or
246 # raised an error. This is needed for running in a
247 # non-interactive python process that would otherwise end
248 # before the futures resolve.
249 for ff in futures:
250 if ff is not None:
251 ff.exception()
252 self.shutdown()
253 self.finalize_jobs()
254 return futures
    def execute(self, name: str) -> Future | None:
        """Execute a job.

        Parameters
        ----------
        name : `str`
            Name of job to execute.

        Returns
        -------
        future : `parsl.dataflow.futures.AppFuture` or `None`
            A `parsl.dataflow.futures.AppFuture` object linked to the
            execution of the job, or `None` if the job is being reserved
            to run locally.
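
        Examples
        --------
        Executing an endpoint job schedules it and, recursively, all of its
        ancestors (the job name here is illustrative):

        >>> future = workflow.execute("measure_visit_12345")  # doctest: +SKIP
        >>> future.exception()  # block until the job completes  # doctest: +SKIP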
269 """
270 if name in ("pipetaskInit", "mergeExecutionButler"):
271 # These get done outside of parsl
272 return None
273 job = self.jobs[name]
274 inputs = [self.execute(parent) for parent in self.parents[name]]
275 executors = self.parsl_config.executors
276 if len(executors) > 1:
277 label = self.site.select_executor(job)
278 else:
279 label = executors[0].label
280 return job.get_future(
281 self.apps[label],
282 [ff for ff in inputs if ff is not None],
283 self.command_prefix,
284 self.site.resource_list,
285 )
    def load_dfk(self):
        """Load the parsl DataFlowKernel (DFK).

        This starts parsl.
        """
        if self.dfk is not None:
            raise RuntimeError("Workflow has already started.")
        set_parsl_logging(self.bps_config)
        self.dfk = parsl.load(self.parsl_config)

    def start(self):
        """Start the workflow."""
        self.initialize_jobs()
        self.load_dfk()

    def restart(self):
        """Restart the workflow after interruption."""
        self.parsl_config.memoizer.checkpoint_files = parsl.utils.get_last_checkpoint()
        self.load_dfk()

    def shutdown(self):
        """Shut down the workflow.

        This stops parsl.
        """
        if self.dfk is None:
            raise RuntimeError("Workflow not started.")
        self.dfk.cleanup()
        self.dfk = None
        parsl.DataFlowKernelLoader.clear()

    def initialize_jobs(self):
        """Run initial jobs.

        These jobs are run locally before any other jobs are submitted to
        parsl.

        This is used to set up the butler.
        """
        job = self.jobs.get("pipetaskInit", None)
        if job is not None:
            # exist_ok guards against a pre-existing logs directory.
            os.makedirs(os.path.join(self.path, "logs"), exist_ok=True)
            job.run_local()

    def finalize_jobs(self):
        """Run final jobs.

        These jobs are run locally after all other jobs are complete.

        This is used to merge the execution butler.
        """
        if self.final is not None and not self.final.done:
            self.final.run_local()