Coverage for python/lsst/ctrl/bps/parsl/sites/work_queue.py: 48%

31 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:51 +0000

1# This file is part of ctrl_bps_parsl. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28from typing import TYPE_CHECKING, Any 

29 

30from parsl.executors import WorkQueueExecutor 

31from parsl.executors.base import ParslExecutor 

32from parsl.launchers import SrunLauncher 

33from parsl.providers import LocalProvider 

34 

35try: 

36 from parsl.providers.base import ExecutionProvider 

37except ImportError: 

38 from parsl.providers.provider_base import ExecutionProvider # type: ignore 

39 

40from ..configuration import get_bps_config_value 

41from ..site import SiteConfig 

42 

43if TYPE_CHECKING: 

44 from ..job import ParslJob 

45 

# Public names exported by this module: the work_queue base site configuration
# and its LocalProvider/srun-based concrete subclass.
__all__ = ("LocalSrunWorkQueue", "WorkQueue")

47 

48 

class WorkQueue(SiteConfig):
    """Base class configuration for `parsl.executors.WorkQueueExecutor`.

    Subclasses must provide implementations for ``.get_executors``
    and ``.select_executor``. In ``.get_executors``, the site-specific
    `~parsl.providers.base.ExecutionProvider` must be defined.

    Parameters
    ----------
    *args : `~typing.Any`
        Parameters forwarded to base class constructor.
    **kwargs : `~typing.Any`
        Keyword arguments passed to base class constructor, augmented by
        the ``resource_list`` argument. Any ``resource_list`` supplied by the
        caller is replaced by the fixed list of BPS resource requests.

    Notes
    -----
    The following BPS configuration parameters are recognized, overriding the
    defaults:

    - ``port`` (`int`): The port used by work_queue. Default: ``0``, which
      lets Parsl allocate a port number automatically.
    - ``worker_options`` (`str`): Extra options to pass to work_queue workers.
      A typical option specifies the memory available per worker, e.g.,
      ``"--memory=90000"``, which sets the available memory to 90 GB.
      Default: ``""``.
    - ``wq_max_retries`` (`int`): The number of retries that work_queue
      will make in case of task failures. Set to ``None`` to have work_queue
      retry forever; set to ``1`` to have retries managed by Parsl.
      Default: ``1``.
    """

    def __init__(self, *args, **kwargs):
        # Have BPS-defined resource requests for each job passed to work_queue.
        # This deliberately overwrites any ``resource_list`` the caller passed.
        kwargs["resource_list"] = ["memory", "cores", "disk", "running_time_min", "priority"]
        super().__init__(*args, **kwargs)

    def make_executor(
        self,
        label: str,
        provider: ExecutionProvider,
        *,
        port: int = 0,
        worker_options: str = "",
        wq_max_retries: int = 1,
    ) -> ParslExecutor:
        """Return a `parsl.executors.WorkQueueExecutor`. The ``provider``
        contains the site-specific configuration.

        The keyword arguments are only defaults: values for ``port``,
        ``worker_options`` and ``wq_max_retries`` found in the BPS site
        configuration take precedence over them.

        Parameters
        ----------
        label : `str`
            Label for executor.
        provider : `parsl.providers.base.ExecutionProvider`
            Parsl execution provider, e.g., `parsl.providers.SlurmProvider`.
        port : `int`, optional
            Port used by work_queue. Default: ``0``. For a value of ``0``,
            parsl will allocate a port number automatically.
        worker_options : `str`, optional
            Extra options to pass to work_queue workers, e.g.,
            ``"--memory=90000"``. Default: ``""``.
        wq_max_retries : `int`, optional
            Number of retries for work_queue to attempt per job. Set to
            ``None`` to have it try indefinitely; set to ``1`` to have Parsl
            control the number of retries. Default: ``1``.

        Returns
        -------
        executor : `parsl.executors.base.ParslExecutor`
            The configured `parsl.executors.WorkQueueExecutor`.
        """
        # Site configuration values override the keyword-argument defaults.
        port = get_bps_config_value(self.site, "port", int, port)
        worker_options = get_bps_config_value(self.site, "worker_options", str, worker_options)
        max_retries = get_bps_config_value(self.site, "wq_max_retries", int, wq_max_retries)
        return WorkQueueExecutor(
            label=label,
            provider=provider,
            port=port,
            worker_options=worker_options,
            max_retries=max_retries,
            # NOTE(review): assumes submit host and workers share a filesystem,
            # so work_queue need not transfer files -- confirm for each site.
            shared_fs=True,
            # Resource labels come from the BPS ``resource_list`` set in
            # ``__init__`` rather than from work_queue's automatic labelling.
            autolabel=False,
        )

126 

127 

class LocalSrunWorkQueue(WorkQueue):
    """Configuration for a `parsl.executors.WorkQueueExecutor` that uses a
    `parsl.providers.LocalProvider` to manage resources.

    This can be used directly as the site configuration within a
    multi-node allocation when Slurm is available. For running on a
    single node, e.g., a laptop, a `parsl.launchers.SingleNodeLauncher` is
    used, and Slurm need not be available.

    The following BPS configuration parameters are recognized, overriding the
    defaults:

    - ``port`` (`int`): The port used by work_queue. Default: ``0``.
    - ``worker_options`` (`str`): Extra options to pass to work_queue workers.
      A typical option specifies the memory available per worker, e.g.,
      ``"--memory=90000"``, which sets the available memory to 90 GB.
      Default: ``""``.
    - ``wq_max_retries`` (`int`): The number of retries that work_queue
      will make in case of task failures. Set to ``None`` to have work_queue
      retry forever; set to ``1`` to have retries managed by Parsl. Default:
      ``1``.
    - ``nodes_per_block`` (`int`): The number of allocated nodes. When this
      is greater than ``1``, workers are started with a
      `parsl.launchers.SrunLauncher`. Default: ``1``.
    """

    def get_executors(self) -> list[ParslExecutor]:
        """Get a list of executors to be used in processing.

        Returns
        -------
        executors : `list` [`parsl.executors.base.ParslExecutor`]
            A single work_queue executor labelled ``"work_queue"`` whose
            workers are managed by a `parsl.providers.LocalProvider`.
        """
        nodes_per_block = get_bps_config_value(self.site, "nodes_per_block", int, 1)
        provider_options: dict[str, Any] = {
            "nodes_per_block": nodes_per_block,
            # No blocks are requested up front; at most one block is ever used.
            "init_blocks": 0,
            "min_blocks": 0,
            "max_blocks": 1,
            "parallelism": 0,
            "cmd_timeout": 300,
        }
        if nodes_per_block > 1:
            # Multiple nodes: start workers through srun. The override flags are
            # passed to srun unchanged.
            provider_options["launcher"] = SrunLauncher(overrides="-K0 -k --cpu-bind=none")
        # For a single node no launcher is set, so the provider's default is used.
        provider = LocalProvider(**provider_options)
        return [self.make_executor("work_queue", provider)]

    def select_executor(self, job: "ParslJob") -> str:
        """Get the ``label`` of the executor to use to execute a job.

        Every job goes to the single executor created by ``get_executors``.

        Parameters
        ----------
        job : `lsst.ctrl.bps.parsl.ParslJob`
            Job to be executed.

        Returns
        -------
        label : `str`
            Label of executor to use to execute ``job``.
        """
        # Must match the label passed to ``make_executor`` in ``get_executors``.
        return "work_queue"