Coverage for tests / test_build.py: 20%
128 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:45 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28import contextlib
29import os
30import tempfile
31import textwrap
32import unittest
33import unittest.mock
35import click.testing
37import lsst.utils.tests
38from lsst.ctrl.mpexec.cli import opt, script
39from lsst.ctrl.mpexec.cli.pipetask import cli as pipetaskCli
40from lsst.ctrl.mpexec.cli.utils import (
41 PipetaskCommand,
42 collect_pipeline_actions,
43)
44from lsst.ctrl.mpexec.showInfo import ShowInfo
45from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg
46from lsst.pipe.base import Pipeline
49# I am not positive this adds value vs. `tempfile.NamedTemporaryFile`, but
50# given the trouble we've had with temporary file/directory cleanup edge cases
51# in CI, I'm not inclined to remove it (it was lifted from test_cmdLineFwk.py,
52# which is going away).
@contextlib.contextmanager
def make_tmp_file(contents=None, suffix=None):
    """Context manager for generating temporary file name.

    Temporary file is deleted on exiting context, including when the body of
    the ``with`` statement raises an exception.

    Parameters
    ----------
    contents : `bytes` or `None`, optional
        Data to write into a file.
    suffix : `str` or `None`, optional
        Suffix to use for temporary file.

    Yields
    ------
    `str`
        Name of the temporary file.
    """
    fd, tmpname = tempfile.mkstemp(suffix=suffix)
    try:
        # Wrapping the descriptor guarantees it is closed even if the write
        # fails, and the buffered writer handles short writes for us.
        with os.fdopen(fd, "wb") as stream:
            if contents:
                stream.write(contents)
        yield tmpname
    finally:
        # Best-effort cleanup: the caller may already have removed the file.
        with contextlib.suppress(OSError):
            os.remove(tmpname)
class BuildTestCase(unittest.TestCase):
    """Test a few of the inputs to the build script function to test basic
    functionality.
    """

    @staticmethod
    def _make_args(*args: str) -> dict[str, object]:
        """Turn command-line arguments into keyword arguments for
        ``script.build``.

        A throwaway click command decorated with the pipeline build options
        is invoked with ``args``; the keyword arguments it receives (after
        `collect_pipeline_actions`) are captured with a mock.

        Parameters
        ----------
        *args : `str`
            Command-line arguments to parse.

        Returns
        -------
        kwargs : `dict` [`str`, `object`]
            Captured keyword arguments, with ``show`` set to an empty
            `ShowInfo`.

        Raises
        ------
        RuntimeError
            Raised if the throwaway command exits with a non-zero code.
        """
        mock = unittest.mock.Mock()

        @click.command(cls=PipetaskCommand)
        @click.pass_context
        @opt.pipeline_build_options()
        def fake_build(ctx: click.Context, **kwargs: object):
            kwargs = collect_pipeline_actions(ctx, **kwargs)
            mock(**kwargs)

        runner = click.testing.CliRunner()
        result = runner.invoke(fake_build, args, catch_exceptions=False)
        if result.exit_code != 0:
            raise RuntimeError(f"Failure getting default args for 'build': {result}")
        mock.assert_called_once()
        # Use a distinct name for the captured keywords rather than rebinding
        # ``result`` (a click Result) to an unrelated type.
        build_kwargs: dict[str, object] = mock.call_args[1]
        build_kwargs["show"] = ShowInfo([])
        return build_kwargs

    def test_make_empty_pipeline(self):
        """Test building a pipeline with default arguments, saving it, and
        reading it.
        """
        with make_tmp_file() as tmpname:
            # make empty pipeline and store it in a file
            pgf = script.build(**self._make_args("--save-pipeline", tmpname))
            self.assertIsInstance(pgf.pipeline, Pipeline)
            # read pipeline from a file
            pgf = script.build(**self._make_args("--pipeline", tmpname))
            self.assertIsInstance(pgf.pipeline, Pipeline)
            self.assertEqual(len(pgf.pipeline), 0)

    def test_single_task_pipeline(self):
        """Test building a pipeline with a single task."""
        pgf = script.build(**self._make_args("-t", "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:a"))
        self.assertIsInstance(pgf.pipeline, Pipeline)
        self.assertEqual(len(pgf.pipeline), 1)

    def test_multi_task_pipeline(self):
        """Test building a pipeline with multiple tasks."""
        pgf = script.build(
            **self._make_args(
                "-t",
                "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:a",
                "-t",
                "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:b",
                "-t",
                "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:c",
            )
        )
        self.assertIsInstance(pgf.pipeline, Pipeline)
        self.assertEqual(len(pgf.pipeline), 3)
        self.assertEqual(pgf.pipeline.task_labels, {"a", "b", "c"})

    def test_config(self):
        """Test building a pipeline with a config override."""
        pgf = script.build(
            **self._make_args(
                "-t",
                "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:a",
                "-c",
                "a:int_value=100",
            )
        )
        pipeline_graph = pgf()
        self.assertEqual(len(pipeline_graph.tasks), 1)
        self.assertEqual(next(iter(pipeline_graph.tasks.values())).config.int_value, 100)

    def test_config_file(self):
        """Test building a pipeline with a config file override."""
        overrides = b"config.int_value = 1000\n"
        with make_tmp_file(overrides) as tmpname:
            pgf = script.build(
                **self._make_args(
                    "-t",
                    "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:a",
                    "-C",
                    f"a:{tmpname}",
                )
            )
            pipeline_graph = pgf()
            self.assertEqual(len(pipeline_graph.tasks), 1)
            self.assertEqual(next(iter(pipeline_graph.tasks.values())).config.int_value, 1000)

    def test_build_show(self):
        """Test the --show option on the build subcommand."""
        # This pipeline YAML snippet is formatted exactly the way
        # '--show pipeline' writes it back out.
        # NOTE(review): the nesting below assumes 2-space mappings with
        # unindented sequence items -- confirm against real output.
        pipeline_data = textwrap.dedent("""
            description: single-task test pipeline
            tasks:
              t:
                class: lsst.pipe.base.tests.mocks.DynamicTestPipelineTask
                config:
                - python: |-
                    from lsst.pipe.base.tests.mocks import DynamicConnectionConfig
                    config.inputs["a"] = DynamicConnectionConfig(dataset_type_name="d1")
                    config.outputs["b"] = DynamicConnectionConfig(dataset_type_name="d2")
                  int_value: 100
            subsets:
              test_subset:
                subset:
                - t
        """).strip()
        runner = LogCliRunner()
        with make_tmp_file(pipeline_data.encode()) as pipeline_file:

            def invoke_show(*show_args: str):
                # Run ``pipetask build -p <pipeline_file> --show <show_args>``.
                return runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", *show_args])

            result = invoke_show("pipeline")
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn(pipeline_data, result.output)

            result = invoke_show("config")
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn("### Configuration for task `t'", result.output)
            self.assertIn("config.int_value=100", result.output)
            self.assertIn("config.inputs['a'].dataset_type_name='d1'", result.output)

            result = invoke_show("history=t::int_value")
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            # history will contain machine-specific paths, TBD how to verify

            # NOTE(review): the spacing between each glyph and its label must
            # match the graph printer's output exactly -- confirm.
            result = invoke_show("pipeline-graph")
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn(
                textwrap.dedent("""
                    ○ d1: {} _mock_StructuredDataDict
                    │
                    ■ t: {}
                    │
                    ○ d2: {} _mock_StructuredDataDict
                """).strip(),
                result.output,
            )

            result = invoke_show("task-graph")
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn("■ t: {}", result.output)

            # Trying to show the pipeline with --select-tasks should fail,
            # because --select-tasks acts on the PipelineGraph and hence
            # wouldn't affect the YAML and that'd be confusing.
            result = invoke_show("pipeline", "--select-tasks", ">=task")
            self.assertEqual(result.exit_code, 1, clickResultMsg(result))

            result = invoke_show("config=t::int_Value:NOIGNORECASE")
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertEqual(
                "### Configuration for task `t'", result.output.strip()
            )  # No match for the field.

            result = invoke_show("config=t::int_Value")
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn("### Configuration for task `t'", result.output)  # Matches...
            self.assertIn("config.int_value=100", result.output)
            self.assertIn("NOIGNORECASE", result.output)  # ...but warns and recommends NOIGNORECASE

            result = invoke_show("dump-config=b")
            self.assertNotEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn("Pipeline has no tasks named b", result.output)

            result = invoke_show("history")
            self.assertNotEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn("Please provide a value", result.output)

            result = invoke_show("history=b::param")
            self.assertNotEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn("Pipeline has no tasks named b", result.output)

            result = invoke_show("subsets")
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn("test_subset", result.output)

            result = invoke_show("tasks")
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn(
                "### Subtasks for task `lsst.pipe.base.tests.mocks.DynamicTestPipelineTask'", result.output
            )

            result = invoke_show("inputs")
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            self.assertIn("d1", result.output)

    def test_show_unrecognized(self):
        """Test that ShowInfo raises when given unrecognized commands."""
        with self.assertRaises(ValueError):
            ShowInfo(["unrecognized", "config"])

    def test_missing_option(self):
        """Test that the build script fails if options are missing."""

        @click.command()
        @opt.pipeline_build_options()
        def cli(**kwargs):
            script.build(**kwargs)

        runner = click.testing.CliRunner()
        result = runner.invoke(cli)
        # The cli call should fail, because script.build takes more options
        # than are defined by pipeline_build_options.
        self.assertNotEqual(result.exit_code, 0, clickResultMsg(result))
if __name__ == "__main__":
    # When executed as a script, call the LSST test-utilities initializer
    # first and then hand control to the standard unittest runner.
    lsst.utils.tests.init()
    unittest.main()