Coverage for tests / test_build.py: 20%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:45 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28import contextlib 

29import os 

30import tempfile 

31import textwrap 

32import unittest 

33import unittest.mock 

34 

35import click.testing 

36 

37import lsst.utils.tests 

38from lsst.ctrl.mpexec.cli import opt, script 

39from lsst.ctrl.mpexec.cli.pipetask import cli as pipetaskCli 

40from lsst.ctrl.mpexec.cli.utils import ( 

41 PipetaskCommand, 

42 collect_pipeline_actions, 

43) 

44from lsst.ctrl.mpexec.showInfo import ShowInfo 

45from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg 

46from lsst.pipe.base import Pipeline 

47 

48 

49# I am not positive this adds value vs. `tempfile.NamedTemporaryFile`, but 

50# given the trouble we've had with temporary file/directory cleanup edge cases 

51# in CI, I'm not inclined to remove it (it was lifted from test_cmdLineFwk.py, 

52# which is going away). 

53@contextlib.contextmanager 

54def make_tmp_file(contents=None, suffix=None): 

55 """Context manager for generating temporary file name. 

56 

57 Temporary file is deleted on exiting context. 

58 

59 Parameters 

60 ---------- 

61 contents : `bytes` or `None`, optional 

62 Data to write into a file. 

63 suffix : `str` or `None`, optional 

64 Suffix to use for temporary file. 

65 

66 Yields 

67 ------ 

68 `str` 

69 Name of the temporary file. 

70 """ 

71 fd, tmpname = tempfile.mkstemp(suffix=suffix) 

72 if contents: 

73 os.write(fd, contents) 

74 os.close(fd) 

75 yield tmpname 

76 with contextlib.suppress(OSError): 

77 os.remove(tmpname) 

78 

79 

80class BuildTestCase(unittest.TestCase): 

81 """Test a few of the inputs to the build script function to test basic 

82 functionality. 

83 """ 

84 

85 @staticmethod 

86 def _make_args(*args: str) -> dict[str, object]: 

87 mock = unittest.mock.Mock() 

88 

89 @click.command(cls=PipetaskCommand) 

90 @click.pass_context 

91 @opt.pipeline_build_options() 

92 def fake_build(ctx: click.Context, **kwargs: object): 

93 kwargs = collect_pipeline_actions(ctx, **kwargs) 

94 mock(**kwargs) 

95 

96 runner = click.testing.CliRunner() 

97 result = runner.invoke(fake_build, args, catch_exceptions=False) 

98 if result.exit_code != 0: 

99 raise RuntimeError(f"Failure getting default args for 'build': {result}") 

100 mock.assert_called_once() 

101 result: dict[str, object] = mock.call_args[1] 

102 result["show"] = ShowInfo([]) 

103 return result 

104 

105 def test_make_empty_pipeline(self): 

106 """Test building a pipeline with default arguments, saving it, and 

107 reading it. 

108 """ 

109 with make_tmp_file() as tmpname: 

110 # make empty pipeline and store it in a file 

111 pgf = script.build(**self._make_args("--save-pipeline", tmpname)) 

112 self.assertIsInstance(pgf.pipeline, Pipeline) 

113 # read pipeline from a file 

114 pgf = script.build(**self._make_args("--pipeline", tmpname)) 

115 self.assertIsInstance(pgf.pipeline, Pipeline) 

116 self.assertEqual(len(pgf.pipeline), 0) 

117 

118 def test_single_task_pipeline(self): 

119 """Test building a pipeline with a single task.""" 

120 pgf = script.build(**self._make_args("-t", "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:a")) 

121 self.assertIsInstance(pgf.pipeline, Pipeline) 

122 self.assertEqual(len(pgf.pipeline), 1) 

123 

124 def test_multi_task_pipeline(self): 

125 """Test building a pipeline with multiple tasks.""" 

126 pgf = script.build( 

127 **self._make_args( 

128 "-t", 

129 "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:a", 

130 "-t", 

131 "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:b", 

132 "-t", 

133 "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:c", 

134 ) 

135 ) 

136 self.assertIsInstance(pgf.pipeline, Pipeline) 

137 self.assertEqual(len(pgf.pipeline), 3) 

138 self.assertEqual(pgf.pipeline.task_labels, {"a", "b", "c"}) 

139 

140 def test_config(self): 

141 """Test building a pipeline with a config override.""" 

142 pgf = script.build( 

143 **self._make_args( 

144 "-t", 

145 "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:a", 

146 "-c", 

147 "a:int_value=100", 

148 ) 

149 ) 

150 pipeline_graph = pgf() 

151 self.assertEqual(len(pipeline_graph.tasks), 1) 

152 self.assertEqual(next(iter(pipeline_graph.tasks.values())).config.int_value, 100) 

153 

154 def test_config_file(self): 

155 """Test building a pipeline with a config file override.""" 

156 overrides = b"config.int_value = 1000\n" 

157 with make_tmp_file(overrides) as tmpname: 

158 pgf = script.build( 

159 **self._make_args( 

160 "-t", 

161 "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask:a", 

162 "-C", 

163 f"a:{tmpname}", 

164 ) 

165 ) 

166 pipeline_graph = pgf() 

167 self.assertEqual(len(pipeline_graph.tasks), 1) 

168 self.assertEqual(next(iter(pipeline_graph.tasks.values())).config.int_value, 1000) 

169 

170 def test_build_show(self): 

171 """Test the --show option on the build subcommand.""" 

172 # This pipeline YAML snippet is formatted exactly the way 

173 # '--show pipeline' writes it back out. 

174 pipeline_data = textwrap.dedent(""" 

175 description: single-task test pipeline 

176 tasks: 

177 t: 

178 class: lsst.pipe.base.tests.mocks.DynamicTestPipelineTask 

179 config: 

180 - python: |- 

181 from lsst.pipe.base.tests.mocks import DynamicConnectionConfig 

182 config.inputs["a"] = DynamicConnectionConfig(dataset_type_name="d1") 

183 config.outputs["b"] = DynamicConnectionConfig(dataset_type_name="d2") 

184 int_value: 100 

185 subsets: 

186 test_subset: 

187 subset: 

188 - t 

189 """).strip() 

190 runner = LogCliRunner() 

191 with make_tmp_file(pipeline_data.encode()) as pipeline_file: 

192 result = runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", "pipeline"]) 

193 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

194 self.assertIn(pipeline_data, result.output) 

195 

196 result = runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", "config"]) 

197 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

198 self.assertIn("### Configuration for task `t'", result.output) 

199 self.assertIn("config.int_value=100", result.output) 

200 self.assertIn("config.inputs['a'].dataset_type_name='d1'", result.output) 

201 

202 result = runner.invoke( 

203 pipetaskCli, ["build", "-p", pipeline_file, "--show", "history=t::int_value"] 

204 ) 

205 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

206 # history will contain machine-specific paths, TBD how to verify 

207 

208 result = runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", "pipeline-graph"]) 

209 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

210 self.assertIn( 

211 textwrap.dedent(""" 

212 ○ d1: {} _mock_StructuredDataDict 

213 

214 ■ t: {} 

215 

216 ○ d2: {} _mock_StructuredDataDict 

217 """).strip(), 

218 result.output, 

219 ) 

220 

221 result = runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", "task-graph"]) 

222 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

223 self.assertIn("■ t: {}", result.output) 

224 

225 # Trying to show the pipeline with --select-tasks should fail, 

226 # because --select-tasks acts on the PipelineGraph and hence 

227 # wouldn't affect the YAML and that'd be confusing. 

228 result = runner.invoke( 

229 pipetaskCli, 

230 [ 

231 "build", 

232 "-p", 

233 pipeline_file, 

234 "--show", 

235 "pipeline", 

236 "--select-tasks", 

237 ">=task", 

238 ], 

239 ) 

240 self.assertEqual(result.exit_code, 1) 

241 

242 result = runner.invoke( 

243 pipetaskCli, ["build", "-p", pipeline_file, "--show", "config=t::int_Value:NOIGNORECASE"] 

244 ) 

245 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

246 self.assertEqual( 

247 "### Configuration for task `t'", result.output.strip() 

248 ) # No match for the field. 

249 

250 result = runner.invoke( 

251 pipetaskCli, ["build", "-p", pipeline_file, "--show", "config=t::int_Value"] 

252 ) 

253 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

254 self.assertIn("### Configuration for task `t'", result.output) # Matches... 

255 self.assertIn("config.int_value=100", result.output) 

256 self.assertIn("NOIGNORECASE", result.output) # ...but warns and recommends NOIGNORECASE 

257 

258 result = runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", "dump-config=b"]) 

259 self.assertNotEqual(result.exit_code, 0) 

260 self.assertIn("Pipeline has no tasks named b", result.output) 

261 

262 result = runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", "history"]) 

263 self.assertNotEqual(result.exit_code, 0) 

264 self.assertIn("Please provide a value", result.output) 

265 

266 result = runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", "history=b::param"]) 

267 self.assertNotEqual(result.exit_code, 0) 

268 self.assertIn("Pipeline has no tasks named b", result.output) 

269 

270 result = runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", "subsets"]) 

271 self.assertEqual(result.exit_code, 0) 

272 self.assertIn("test_subset", result.output) 

273 

274 result = runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", "tasks"]) 

275 self.assertEqual(result.exit_code, 0) 

276 self.assertIn( 

277 "### Subtasks for task `lsst.pipe.base.tests.mocks.DynamicTestPipelineTask'", result.output 

278 ) 

279 

280 result = runner.invoke(pipetaskCli, ["build", "-p", pipeline_file, "--show", "inputs"]) 

281 self.assertEqual(result.exit_code, 0) 

282 self.assertIn("d1", result.output) 

283 

284 def test_show_unrecognized(self): 

285 """Test that ShowInfo raises when given unrecognized commands.""" 

286 with self.assertRaises(ValueError): 

287 ShowInfo(["unrecognized", "config"]) 

288 

289 def test_missing_option(self): 

290 """Test that the build script fails if options are missing.""" 

291 

292 @click.command() 

293 @opt.pipeline_build_options() 

294 def cli(**kwargs): 

295 script.build(**kwargs) 

296 

297 runner = click.testing.CliRunner() 

298 result = runner.invoke(cli) 

299 # The cli call should fail, because script.build takes more options 

300 # than are defined by pipeline_build_options. 

301 self.assertNotEqual(result.exit_code, 0) 

302 

303 

304if __name__ == "__main__": 

305 lsst.utils.tests.init() 

306 unittest.main()