Coverage for python / lsst / ctrl / bps / parsl / sites / torque.py: 34%
50 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:24 +0000
1# This file is part of ctrl_bps_parsl.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28from typing import TYPE_CHECKING, Any
30from parsl.executors import HighThroughputExecutor
31from parsl.executors.base import ParslExecutor
32from parsl.launchers import MpiRunLauncher
33from parsl.providers import TorqueProvider
35from ..configuration import get_bps_config_value, get_workflow_name
36from ..site import SiteConfig
38if TYPE_CHECKING:
39 from ..job import ParslJob
41__all__ = ("Torque",)
44Kwargs = dict[str, Any]
class Torque(SiteConfig):
    """Configuration for generic Torque cluster.

    Parameters
    ----------
    *args : `~typing.Any`
        Parameters forwarded to base class constructor.
    **kwargs : `~typing.Any`
        Keyword arguments passed to base class constructor, augmented by
        the ``resource_list`` argument.

    Notes
    -----
    This can be used directly as the site configuration for a Torque cluster by
    setting the BPS config, e.g.:

    .. code-block:: yaml

        computeSite: torque
        site:
          torque:
            class: lsst.ctrl.bps.parsl.sites.Torque
            nodes: 4
            tasks_per_node: 20
            walltime: "00:59:00"  # Note: always quote walltime in YAML

    Alternatively, it can be used as a base class for Torque cluster
    configurations.

    The following BPS configuration parameters are recognised (and required
    unless there is a default mentioned here, or provided by a subclass):

    - ``queue`` (`str`): Queue for the Torque job.
    - ``nodes`` (`int`): number of nodes for each Torque job.
    - ``tasks_per_node`` (`int`): number of cores per node for each Torque
      job; defaults to 1.
    - ``walltime`` (`str`): time limit for each Torque job.
    - ``scheduler_options`` (`str`): text to prepend to the Torque submission
      script (each line usually starting with ``#PBS``).
    """

    def __init__(self, *args, **kwargs):
        # Have BPS-defined resource requests for each job passed to executor.
        kwargs["resource_list"] = ["priority"]
        super().__init__(*args, **kwargs)

    def make_executor(
        self,
        label: str,
        *,
        queue: str | None = None,
        nodes: int | None = None,
        tasks_per_node: int | None = None,
        walltime: str | None = None,
        mem_per_worker: float | None = None,
        scheduler_options: str | None = None,
        worker_init: str | None = None,
        provider_options: Kwargs | None = None,
        executor_options: Kwargs | None = None,
    ) -> ParslExecutor:
        """Return an executor for running on a Torque cluster.

        Parameters
        ----------
        label : `str`
            Label for executor.
        queue : `str`, optional
            Queue for the Torque job.
        nodes : `int`, optional
            Default number of nodes for each Torque job.
        tasks_per_node : `int`, optional
            Default number of cores per node for each Torque job.
        walltime : `str`, optional
            Default time limit for each Torque job.
        mem_per_worker : `float`, optional
            Minimum memory per worker (GB), limited by the executor.
        scheduler_options : `str`, optional
            ``#PBS`` directives to prepend to the Torque submission script.
        worker_init : `str`, optional
            Environment initiation command.
        provider_options : `dict`, optional
            Additional arguments for `parsl.providers.TorqueProvider`
            constructor.
        executor_options : `dict`, optional
            Additional arguments for `parsl.executors.HighThroughputExecutor`
            constructor.

        Returns
        -------
        executor : `parsl.executors.HighThroughputExecutor`
            Executor for Torque jobs.
        """
        # BPS config values override the keyword-argument defaults.
        nodes = get_bps_config_value(self.site, "nodes", int, nodes, required=True)
        walltime = get_bps_config_value(self.site, "walltime", str, walltime, required=True)
        queue = get_bps_config_value(self.site, "queue", str, queue)
        tasks_per_node = get_bps_config_value(self.site, "tasks_per_node", int, tasks_per_node)
        # Pass the caller-supplied default through instead of hard-coding ""
        # (previously the ``worker_init`` argument was silently discarded).
        worker_init = get_bps_config_value(self.site, "worker_init", str, worker_init)
        scheduler_options = get_bps_config_value(self.site, "scheduler_options", str, scheduler_options)

        if tasks_per_node is None:
            tasks_per_node = 1

        job_name = get_workflow_name(self.config)

        # Always append the job name (and queue, if any) as #PBS directives.
        if scheduler_options is None:
            scheduler_options = ""
        else:
            scheduler_options += "\n"
        scheduler_options += f"#PBS -N {job_name}\n"
        if queue:
            scheduler_options += f"#PBS -q {queue}\n"

        if worker_init is None:
            worker_init = ""

        launcher = PbsMpiRunLauncher(overrides=f"--map-by core:{tasks_per_node}")

        return HighThroughputExecutor(
            label,
            provider=PbsTorqueProvider(
                nodes_per_block=nodes,
                tasks_per_node=tasks_per_node,
                queue=queue,
                walltime=walltime,
                scheduler_options=scheduler_options,
                worker_init=worker_init,
                launcher=launcher,
                **(provider_options or {}),
            ),
            # One mpirun-launched worker per node; mpirun fans out the tasks.
            max_workers_per_node=1,
            mem_per_worker=mem_per_worker,
            address=self.get_address(),
            **(executor_options or {}),
        )

    def get_executors(self) -> list[ParslExecutor]:
        """Get a list of executors to be used in processing.

        Each executor should have a unique ``label``.
        """
        return [self.make_executor("torque")]

    def select_executor(self, job: "ParslJob") -> str:
        """Get the ``label`` of the executor to use to execute a job.

        Parameters
        ----------
        job : `lsst.ctrl.bps.parsl.ParslJob`
            Job to be executed.

        Returns
        -------
        label : `str`
            Label of executor to use to execute ``job``.
        """
        return "torque"
class PbsTorqueProvider(TorqueProvider):
    """Torque execution provider with a fixed tasks-per-node setting.

    Parameters
    ----------
    *args : `~typing.Any`
        Parameters forwarded to base class constructor.
    tasks_per_node : `int`, optional
        Number of tasks per node.
    **kwargs : `~typing.Any`
        Keyword arguments passed to base class constructor, augmented by
        the ``resource_list`` argument.

    Notes
    -----
    As with the base class, jobs are submitted with qsub, polled with qstat
    and cancelled with qdel, using the qsub script template from the parsl
    module.

    Unlike the base class, the ``tasks_per_node`` value is fixed when the
    provider is constructed rather than being supplied on every ``submit``
    call.
    """

    def __init__(self, *args, tasks_per_node: int = 1, **kwargs):
        super().__init__(*args, **kwargs)
        # Pinned value, used in place of the per-submission argument.
        self.tasks_per_node = tasks_per_node

    def submit(self, command, tasks_per_node, job_name="parsl.torque"):
        """Submit the command onto an Local Resource Manager job.

        Parameters
        ----------
        command : `str`
            Command-line invocation to be made on the remote side.
        tasks_per_node : `int`
            Ignored: the value fixed at construction time is used instead.
        job_name : `str`:
            Name for job, must be unique.

        Returns
        -------
        response : `str` or `None`
            Identifier for the submitted job, or `None` if the provider is
            at capacity and cannot provision more.
        """
        del tasks_per_node  # deliberately ignored; see class notes
        return super().submit(
            command=command,
            tasks_per_node=self.tasks_per_node,
            job_name=job_name,
        )
class PbsMpiRunLauncher(MpiRunLauncher):
    """Worker launcher that wraps the user's command with the framework to
    launch multiple command invocations via ``mpirun``.

    Parameters
    ----------
    debug : `bool`, optional
        Enable or disable debug logging.
    bash_location : `str`, optional
        Path to the ``bash`` shell binary.
    overrides : `str`, optional
        Any override options.

    Notes
    -----
    This wrapper sets the bash env variable ``CORES`` to the number of cores
    on the machine.

    This launcher makes the following assumptions:

    - mpirun is installed and can be located in ``$PATH``
    - The provider makes available the ``$PBS_NODEFILE`` environment variable
    """

    def __init__(
        self,
        debug: bool = True,
        bash_location: str = "/bin/bash",
        overrides: str = "",
    ):
        super().__init__(debug=debug, bash_location=bash_location, overrides=overrides)

    def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> str:
        """Wrap the user's command with mpirun invocation"""
        total_workers = tasks_per_node * nodes_per_block
        debug_flag = int(self.debug)
        # Assemble the wrapper script line by line; the heredoc writes the
        # user command to cmd_$JOBNAME.sh, which mpirun then fans out.
        script = [
            "set -e",
            "export CORES=$(getconf _NPROCESSORS_ONLN)",
            f'[[ "{debug_flag}" == "1" ]] && echo "Found cores : $CORES"',
            f"WORKERCOUNT={total_workers}",
            "",
            "cat << MPIRUN_EOF > cmd_$JOBNAME.sh",
            command,
            "MPIRUN_EOF",
            "chmod u+x cmd_$JOBNAME.sh",
            "",
            f"mpirun -np $WORKERCOUNT {self.overrides} {self.bash_location} cmd_$JOBNAME.sh",
            "",
            f'[[ "{debug_flag}" == "1" ]] && echo "All workers done"',
            "",
        ]
        return "\n".join(script)