Coverage for tests / test_workflow.py: 21%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:37 +0000

1# This file is part of ctrl_bps_parsl. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Unit tests for ParslWorkflow.restart, load_dfk, and shutdown.""" 

29 

30from unittest.mock import MagicMock, patch 

31 

32import pytest 

33 

34from lsst.ctrl.bps import BpsConfig 

35from lsst.ctrl.bps.parsl.workflow import ParslWorkflow 

36 

37 

38def make_workflow(): 

39 """Return a minimal ParslWorkflow and its mock parsl_config. 

40 

41 ``get_parsl_config`` and ``SiteConfig`` are patched so that no real Parsl 

42 executors are created. The returned parsl_config mock has an empty 

43 ``executors`` list (so the ``bash_app`` loop in ``__init__`` is a no-op). 

44 """ 

45 bps_config = BpsConfig( 

46 { 

47 "submitPath": ".", 

48 "operator": "operator", 

49 "computeSite": "local", 

50 "uniqProcName": "test_run", 

51 "project": "test_project", 

52 "campaign": "test_campaign", 

53 } 

54 ) 

55 

56 mock_parsl_config = MagicMock() 

57 mock_parsl_config.executors = [] 

58 

59 mock_site = MagicMock() 

60 mock_site.get_command_prefix.return_value = "" 

61 

62 with ( 

63 patch( 

64 "lsst.ctrl.bps.parsl.workflow.get_parsl_config", 

65 return_value=mock_parsl_config, 

66 ), 

67 patch( 

68 "lsst.ctrl.bps.parsl.workflow.SiteConfig", 

69 ) as mock_site_class, 

70 ): 

71 mock_site_class.from_config.return_value = mock_site 

72 workflow = ParslWorkflow( 

73 name="test_run", 

74 config=bps_config, 

75 path=".", 

76 jobs={}, 

77 parents={}, 

78 endpoints=[], 

79 ) 

80 

81 return workflow, mock_parsl_config 

82 

83 

84# --------------------------------------------------------------------------- 

85# restart() 

86# --------------------------------------------------------------------------- 

87 

88 

89def test_restart_calls_get_last_checkpoint(): 

90 """restart() must call parsl.utils.get_last_checkpoint().""" 

91 workflow, _ = make_workflow() 

92 

93 with ( 

94 patch("parsl.utils.get_last_checkpoint", return_value="/runinfo/000/tasks.pkl") as mock_glc, 

95 patch("parsl.load"), 

96 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"), 

97 ): 

98 workflow.restart() 

99 

100 mock_glc.assert_called_once_with() 

101 

102 

103def test_restart_checkpoint_files_set_when_checkpoint_exists(): 

104 """When a checkpoint file exists, get_last_checkpoint() returns [path].""" 

105 workflow, mock_parsl_config = make_workflow() 

106 checkpoint = "/runinfo/000/tasks.pkl" 

107 

108 with ( 

109 patch("parsl.utils.get_last_checkpoint", return_value=[checkpoint]), 

110 patch("parsl.load"), 

111 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"), 

112 ): 

113 workflow.restart() 

114 

115 assert mock_parsl_config.memoizer.checkpoint_files == [checkpoint] 

116 

117 

118def test_restart_checkpoint_files_empty_when_no_checkpoint(): 

119 """When there are no checkpoint files, get_last_checkpoint() returns [].""" 

120 workflow, mock_parsl_config = make_workflow() 

121 

122 with ( 

123 patch("parsl.utils.get_last_checkpoint", return_value=[]), 

124 patch("parsl.load"), 

125 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"), 

126 ): 

127 workflow.restart() 

128 

129 assert mock_parsl_config.memoizer.checkpoint_files == [] 

130 

131 

132def test_restart_calls_parsl_load(): 

133 """restart() must call parsl.load with the workflow's parsl_config.""" 

134 workflow, mock_parsl_config = make_workflow() 

135 

136 with ( 

137 patch("parsl.utils.get_last_checkpoint", return_value="/runinfo/000/tasks.pkl"), 

138 patch("parsl.load") as mock_load, 

139 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"), 

140 ): 

141 workflow.restart() 

142 

143 mock_load.assert_called_once_with(mock_parsl_config) 

144 

145 

146def test_restart_does_not_call_initialize_jobs(): 

147 """ 

148 restart() must not run initialize_jobs (no pipetaskInit side-effects). 

149 """ 

150 workflow, _ = make_workflow() 

151 mock_job = MagicMock() 

152 workflow.jobs["pipetaskInit"] = mock_job 

153 

154 with ( 

155 patch("parsl.utils.get_last_checkpoint", return_value=None), 

156 patch("parsl.load"), 

157 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"), 

158 ): 

159 workflow.restart() 

160 

161 mock_job.run_local.assert_not_called() 

162 

163 

164# --------------------------------------------------------------------------- 

165# load_dfk() 

166# --------------------------------------------------------------------------- 

167 

168 

169def test_load_dfk_sets_dfk(): 

170 """After load_dfk(), workflow.dfk is the object returned by parsl.load.""" 

171 workflow, _ = make_workflow() 

172 fake_dfk = MagicMock() 

173 

174 with ( 

175 patch("parsl.load", return_value=fake_dfk), 

176 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"), 

177 ): 

178 workflow.load_dfk() 

179 

180 assert workflow.dfk is fake_dfk 

181 

182 

183def test_load_dfk_raises_if_already_started(): 

184 """load_dfk() raises RuntimeError when the workflow is already running.""" 

185 workflow, _ = make_workflow() 

186 workflow.dfk = MagicMock() 

187 

188 with pytest.raises(RuntimeError, match="already started"): 

189 workflow.load_dfk() 

190 

191 

192def test_load_dfk_calls_set_parsl_logging(): 

193 """load_dfk() forwards the BPS config to set_parsl_logging.""" 

194 workflow, _ = make_workflow() 

195 

196 with ( 

197 patch("parsl.load"), 

198 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging") as mock_logging, 

199 ): 

200 workflow.load_dfk() 

201 

202 mock_logging.assert_called_once_with(workflow.bps_config) 

203 

204 

205# --------------------------------------------------------------------------- 

206# shutdown() 

207# --------------------------------------------------------------------------- 

208 

209 

210def test_shutdown_calls_dfk_cleanup(): 

211 """shutdown() must invoke cleanup() on the active DFK.""" 

212 workflow, _ = make_workflow() 

213 fake_dfk = MagicMock() 

214 workflow.dfk = fake_dfk 

215 

216 with patch("parsl.DataFlowKernelLoader.clear"): 

217 workflow.shutdown() 

218 

219 fake_dfk.cleanup.assert_called_once_with() 

220 

221 

222def test_shutdown_clears_dfk(): 

223 """After shutdown(), workflow.dfk is None.""" 

224 workflow, _ = make_workflow() 

225 workflow.dfk = MagicMock() 

226 

227 with patch("parsl.DataFlowKernelLoader.clear"): 

228 workflow.shutdown() 

229 

230 assert workflow.dfk is None 

231 

232 

233def test_shutdown_calls_dfkl_clear(): 

234 """shutdown() must call parsl.DataFlowKernelLoader.clear().""" 

235 workflow, _ = make_workflow() 

236 workflow.dfk = MagicMock() 

237 

238 with patch("parsl.DataFlowKernelLoader.clear") as mock_clear: 

239 workflow.shutdown() 

240 

241 mock_clear.assert_called_once_with() 

242 

243 

244def test_shutdown_raises_if_not_started(): 

245 """ 

246 shutdown() raises RuntimeError when the workflow has not been started. 

247 """ 

248 workflow, _ = make_workflow() 

249 

250 with pytest.raises(RuntimeError, match="not started"): 

251 workflow.shutdown()