Coverage for tests / test_workflow.py: 21%
81 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:37 +0000
1# This file is part of ctrl_bps_parsl.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Unit tests for ParslWorkflow.restart, load_dfk, and shutdown."""
30from unittest.mock import MagicMock, patch
32import pytest
34from lsst.ctrl.bps import BpsConfig
35from lsst.ctrl.bps.parsl.workflow import ParslWorkflow
38def make_workflow():
39 """Return a minimal ParslWorkflow and its mock parsl_config.
41 ``get_parsl_config`` and ``SiteConfig`` are patched so that no real Parsl
42 executors are created. The returned parsl_config mock has an empty
43 ``executors`` list (so the ``bash_app`` loop in ``__init__`` is a no-op).
44 """
45 bps_config = BpsConfig(
46 {
47 "submitPath": ".",
48 "operator": "operator",
49 "computeSite": "local",
50 "uniqProcName": "test_run",
51 "project": "test_project",
52 "campaign": "test_campaign",
53 }
54 )
56 mock_parsl_config = MagicMock()
57 mock_parsl_config.executors = []
59 mock_site = MagicMock()
60 mock_site.get_command_prefix.return_value = ""
62 with (
63 patch(
64 "lsst.ctrl.bps.parsl.workflow.get_parsl_config",
65 return_value=mock_parsl_config,
66 ),
67 patch(
68 "lsst.ctrl.bps.parsl.workflow.SiteConfig",
69 ) as mock_site_class,
70 ):
71 mock_site_class.from_config.return_value = mock_site
72 workflow = ParslWorkflow(
73 name="test_run",
74 config=bps_config,
75 path=".",
76 jobs={},
77 parents={},
78 endpoints=[],
79 )
81 return workflow, mock_parsl_config
84# ---------------------------------------------------------------------------
85# restart()
86# ---------------------------------------------------------------------------
89def test_restart_calls_get_last_checkpoint():
90 """restart() must call parsl.utils.get_last_checkpoint()."""
91 workflow, _ = make_workflow()
93 with (
94 patch("parsl.utils.get_last_checkpoint", return_value="/runinfo/000/tasks.pkl") as mock_glc,
95 patch("parsl.load"),
96 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"),
97 ):
98 workflow.restart()
100 mock_glc.assert_called_once_with()
103def test_restart_checkpoint_files_set_when_checkpoint_exists():
104 """When a checkpoint file exists, get_last_checkpoint() returns [path]."""
105 workflow, mock_parsl_config = make_workflow()
106 checkpoint = "/runinfo/000/tasks.pkl"
108 with (
109 patch("parsl.utils.get_last_checkpoint", return_value=[checkpoint]),
110 patch("parsl.load"),
111 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"),
112 ):
113 workflow.restart()
115 assert mock_parsl_config.memoizer.checkpoint_files == [checkpoint]
118def test_restart_checkpoint_files_empty_when_no_checkpoint():
119 """When there are no checkpoint files, get_last_checkpoint() returns []."""
120 workflow, mock_parsl_config = make_workflow()
122 with (
123 patch("parsl.utils.get_last_checkpoint", return_value=[]),
124 patch("parsl.load"),
125 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"),
126 ):
127 workflow.restart()
129 assert mock_parsl_config.memoizer.checkpoint_files == []
132def test_restart_calls_parsl_load():
133 """restart() must call parsl.load with the workflow's parsl_config."""
134 workflow, mock_parsl_config = make_workflow()
136 with (
137 patch("parsl.utils.get_last_checkpoint", return_value="/runinfo/000/tasks.pkl"),
138 patch("parsl.load") as mock_load,
139 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"),
140 ):
141 workflow.restart()
143 mock_load.assert_called_once_with(mock_parsl_config)
146def test_restart_does_not_call_initialize_jobs():
147 """
148 restart() must not run initialize_jobs (no pipetaskInit side-effects).
149 """
150 workflow, _ = make_workflow()
151 mock_job = MagicMock()
152 workflow.jobs["pipetaskInit"] = mock_job
154 with (
155 patch("parsl.utils.get_last_checkpoint", return_value=None),
156 patch("parsl.load"),
157 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"),
158 ):
159 workflow.restart()
161 mock_job.run_local.assert_not_called()
164# ---------------------------------------------------------------------------
165# load_dfk()
166# ---------------------------------------------------------------------------
169def test_load_dfk_sets_dfk():
170 """After load_dfk(), workflow.dfk is the object returned by parsl.load."""
171 workflow, _ = make_workflow()
172 fake_dfk = MagicMock()
174 with (
175 patch("parsl.load", return_value=fake_dfk),
176 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging"),
177 ):
178 workflow.load_dfk()
180 assert workflow.dfk is fake_dfk
183def test_load_dfk_raises_if_already_started():
184 """load_dfk() raises RuntimeError when the workflow is already running."""
185 workflow, _ = make_workflow()
186 workflow.dfk = MagicMock()
188 with pytest.raises(RuntimeError, match="already started"):
189 workflow.load_dfk()
192def test_load_dfk_calls_set_parsl_logging():
193 """load_dfk() forwards the BPS config to set_parsl_logging."""
194 workflow, _ = make_workflow()
196 with (
197 patch("parsl.load"),
198 patch("lsst.ctrl.bps.parsl.workflow.set_parsl_logging") as mock_logging,
199 ):
200 workflow.load_dfk()
202 mock_logging.assert_called_once_with(workflow.bps_config)
205# ---------------------------------------------------------------------------
206# shutdown()
207# ---------------------------------------------------------------------------
210def test_shutdown_calls_dfk_cleanup():
211 """shutdown() must invoke cleanup() on the active DFK."""
212 workflow, _ = make_workflow()
213 fake_dfk = MagicMock()
214 workflow.dfk = fake_dfk
216 with patch("parsl.DataFlowKernelLoader.clear"):
217 workflow.shutdown()
219 fake_dfk.cleanup.assert_called_once_with()
222def test_shutdown_clears_dfk():
223 """After shutdown(), workflow.dfk is None."""
224 workflow, _ = make_workflow()
225 workflow.dfk = MagicMock()
227 with patch("parsl.DataFlowKernelLoader.clear"):
228 workflow.shutdown()
230 assert workflow.dfk is None
233def test_shutdown_calls_dfkl_clear():
234 """shutdown() must call parsl.DataFlowKernelLoader.clear()."""
235 workflow, _ = make_workflow()
236 workflow.dfk = MagicMock()
238 with patch("parsl.DataFlowKernelLoader.clear") as mock_clear:
239 workflow.shutdown()
241 mock_clear.assert_called_once_with()
244def test_shutdown_raises_if_not_started():
245 """
246 shutdown() raises RuntimeError when the workflow has not been started.
247 """
248 workflow, _ = make_workflow()
250 with pytest.raises(RuntimeError, match="not started"):
251 workflow.shutdown()