Coverage for python / lsst / ctrl / bps / parsl / sites / work_queue.py: 48%
31 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:24 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:24 +0000
1# This file is part of ctrl_bps_parsl.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28from typing import TYPE_CHECKING, Any
30from parsl.executors import WorkQueueExecutor
31from parsl.executors.base import ParslExecutor
32from parsl.launchers import SrunLauncher
33from parsl.providers import LocalProvider
35try:
36 from parsl.providers.base import ExecutionProvider
37except ImportError:
38 from parsl.providers.provider_base import ExecutionProvider # type: ignore
40from ..configuration import get_bps_config_value
41from ..site import SiteConfig
43if TYPE_CHECKING:
44 from ..job import ParslJob
46__all__ = ("LocalSrunWorkQueue", "WorkQueue")
class WorkQueue(SiteConfig):
    """Base class configuration for `parsl.executors.WorkQueueExecutor`.

    Subclasses must provide implementations for ``.get_executors``
    and ``.select_executor``. In ``.get_executors``, the site-specific
    `~parsl.providers.base.ExecutionProvider` must be defined.

    Parameters
    ----------
    *args : `~typing.Any`
        Parameters forwarded to base class constructor.
    **kwargs : `~typing.Any`
        Keyword arguments passed to base class constructor. The
        ``resource_list`` entry is always set by this class, replacing any
        value supplied by the caller.

    Notes
    -----
    The following BPS configuration parameters are recognized, overriding the
    defaults:

    - ``port`` (`int`): The port used by work_queue. Default: ``0``, in
      which case parsl will allocate a port number automatically.
    - ``worker_options`` (`str`): Extra options to pass to work_queue
      workers. A typical option specifies the memory available per worker,
      e.g., ``"--memory=90000"``, which sets the available memory to 90 GB.
      Default: ``""``
    - ``wq_max_retries`` (`int`): The number of retries that work_queue
      will make in case of task failures. Set to ``None`` to have work_queue
      retry forever; set to ``1`` to have retries managed by Parsl.
      Default: ``1``
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Have BPS-defined resource requests for each job passed to work_queue.
        kwargs["resource_list"] = ["memory", "cores", "disk", "running_time_min", "priority"]
        super().__init__(*args, **kwargs)

    def make_executor(
        self,
        label: str,
        provider: ExecutionProvider,
        *,
        port: int = 0,
        worker_options: str = "",
        wq_max_retries: int = 1,
    ) -> ParslExecutor:
        """Return a `parsl.executors.WorkQueueExecutor`.

        The ``provider`` contains the site-specific configuration.

        Parameters
        ----------
        label : `str`
            Label for executor.
        provider : `parsl.providers.base.ExecutionProvider`
            Parsl execution provider, e.g., `parsl.providers.SlurmProvider`.
        port : `int`, optional
            Port used by work_queue. Default: ``0``. For a value of ``0``,
            parsl will allocate a port number automatically.
        worker_options : `str`, optional
            Extra options to pass to work_queue workers, e.g.,
            ``"--memory=90000"``. Default: ``""``.
        wq_max_retries : `int`, optional
            Number of retries for work_queue to attempt per job. Set to
            ``None`` to have it try indefinitely; set to ``1`` to have Parsl
            control the number of retries. Default: ``1``.

        Returns
        -------
        executor : `parsl.executors.WorkQueueExecutor`
            Executor built with the given ``label`` and ``provider``, and
            with ``shared_fs=True`` and ``autolabel=False``.

        Notes
        -----
        Each of ``port``, ``worker_options`` and ``wq_max_retries`` is looked
        up in the BPS site configuration (under the keys ``port``,
        ``worker_options`` and ``wq_max_retries``); the keyword argument is
        handed to the lookup as the default value.
        """
        # The keyword arguments are passed as the default for each
        # site-configuration lookup.
        port = get_bps_config_value(self.site, "port", int, port)
        worker_options = get_bps_config_value(self.site, "worker_options", str, worker_options)
        max_retries = get_bps_config_value(self.site, "wq_max_retries", int, wq_max_retries)
        return WorkQueueExecutor(
            label=label,
            provider=provider,
            port=port,
            worker_options=worker_options,
            max_retries=max_retries,
            shared_fs=True,
            autolabel=False,
        )
class LocalSrunWorkQueue(WorkQueue):
    """Site configuration pairing a `parsl.executors.WorkQueueExecutor`
    with a `parsl.providers.LocalProvider` for resource management.

    Use this directly as the site configuration from inside a multi-node
    allocation where Slurm is available. When running on a single node,
    e.g., a laptop, a `parsl.launchers.SingleNodeLauncher` is used and
    Slurm is not required.

    The following BPS configuration parameters are recognized, overriding the
    defaults:

    - ``port`` (`int`): The port used by work_queue. Default: ``0``.
    - ``worker_options`` (`str`): Extra options to pass to work_queue
      workers. A typical option specifies the memory available per worker,
      e.g., ``"--memory=90000"``, which sets the available memory to 90 GB.
      Default: ``""``
    - ``wq_max_retries`` (`int`): The number of retries that work_queue
      will make in case of task failures. Set to ``None`` to have work_queue
      retry forever; set to ``1`` to have retries managed by Parsl. Default:
      ``1``.
    - ``nodes_per_block`` (`int`): The number of allocated nodes.
      Default: ``1``.
    """

    def get_executors(self) -> list[ParslExecutor]:
        """Build the executors to be used in processing.

        Returns
        -------
        executors : `list` [`parsl.executors.base.ParslExecutor`]
            A single-element list holding the executor labelled
            ``"work_queue"``, backed by a `parsl.providers.LocalProvider`.
        """
        node_count = get_bps_config_value(self.site, "nodes_per_block", int, 1)
        settings: dict[str, Any] = dict(
            nodes_per_block=node_count,
            init_blocks=0,
            min_blocks=0,
            max_blocks=1,
            parallelism=0,
            cmd_timeout=300,
        )
        # A launcher is only passed explicitly when more than one node is
        # requested; otherwise the provider is built without one.
        if node_count > 1:
            settings["launcher"] = SrunLauncher(overrides="-K0 -k --cpu-bind=none")
        return [self.make_executor("work_queue", LocalProvider(**settings))]

    def select_executor(self, job: "ParslJob") -> str:
        """Return the ``label`` of the executor that should run a job.

        Parameters
        ----------
        job : `lsst.ctrl.bps.parsl.ParslJob`
            Job to be executed. It is not inspected: every job is sent to
            the same executor.

        Returns
        -------
        label : `str`
            Label of executor to use to execute ``job``; always
            ``"work_queue"``.
        """
        return "work_queue"