Coverage for tests / test_qgraph.py: 21%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:48 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Unit tests for ctrl_mpexec CLI qgraph subcommand.""" 

29 

30import os 

31import textwrap 

32import unittest 

33 

34import click.testing 

35 

36from lsst.ctrl.mpexec.cli import opt, script 

37from lsst.ctrl.mpexec.cli.pipetask import cli as pipetask_cli 

38from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg 

39from lsst.pipe.base.quantum_reports import Report 

40from lsst.pipe.base.tests.mocks import DirectButlerRepo 

41 

42 

43class QgraphTest(unittest.TestCase): 

44 """Tests for the "pipetask qgraph" command. 

45 

46 Note that significant coverage for the qgraph command is also provided by 

47 test_run.py, since those tests generally build a quantum graph and then run 

48 it. 

49 """ 

50 

51 def test_qgraph_summary(self): 

52 """Test reading a saved graph and writing a summary.""" 

53 with DirectButlerRepo.make_temporary() as (helper, root): 

54 helper.add_task() 

55 helper.add_task() 

56 qgc = helper.make_quantum_graph_builder().finish(attach_datastore_records=False) 

57 graph_uri = os.path.join(root, "graph.qg") 

58 qgc.write(graph_uri) 

59 test_filename = os.path.join(root, "summary.json") 

60 runner = LogCliRunner() 

61 result = runner.invoke( 

62 pipetask_cli, 

63 ["qgraph", "--butler-config", root, "--qgraph", graph_uri, "--summary", test_filename], 

64 input="no", 

65 ) 

66 # Check that we can read from the command line 

67 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

68 # Check that we can open and read the file produced by make_reports 

69 with open(test_filename) as f: 

70 summary = Report.model_validate_json(f.read()) 

71 self.assertEqual(summary.qgraphSummary.outputRun, "output_run") 

72 self.assertEqual(len(summary.qgraphSummary.qgraphTaskSummaries), 2) 

73 

74 def test_partial_read_counts(self): 

75 """Test reading only one node from a saved graph and check that the 

76 printed task/quantum counts reflect only that node. 

77 """ 

78 with DirectButlerRepo.make_temporary() as (helper, root): 

79 helper.add_task() 

80 helper.add_task() 

81 qgc = helper.make_quantum_graph_builder().finish(attach_datastore_records=False) 

82 quantum_id = next(iter(qgc.quantum_datasets.keys())) 

83 graph_uri = os.path.join(root, "graph.qg") 

84 qgc.write(graph_uri) 

85 log_filename = os.path.join(root, "qgraph.log") 

86 runner = LogCliRunner() 

87 result = runner.invoke( 

88 pipetask_cli, 

89 [ 

90 "--log-level", 

91 "lsst.ctrl.mpexec.cli.utils=INFO", 

92 "--log-file", 

93 log_filename, 

94 "qgraph", 

95 "--butler-config", 

96 root, 

97 "--qgraph", 

98 graph_uri, 

99 "--qgraph-node-id", 

100 str(quantum_id), 

101 ], 

102 input="no", 

103 ) 

104 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

105 with open(log_filename) as log_file: 

106 log = log_file.read() 

107 self.assertIn("1 quantum for 1 task", log) 

108 

109 def test_qgraph_show(self): 

110 with DirectButlerRepo.make_temporary() as (helper, root): 

111 helper.add_task() 

112 helper.add_task() 

113 qgc = helper.make_quantum_graph_builder().finish(attach_datastore_records=False) 

114 graph_uri = os.path.join(root, "graph.qg") 

115 qgc.write(graph_uri) 

116 runner = LogCliRunner() 

117 result = runner.invoke( 

118 pipetask_cli, 

119 ["qgraph", "--butler-config", root, "--qgraph", graph_uri, "--show", "graph"], 

120 input="no", 

121 ) 

122 self.assertIn( 

123 "task_auto1 (lsst.pipe.base.tests.mocks.DynamicTestPipelineTask)", 

124 result.output, 

125 ) 

126 self.assertIn( 

127 "task_auto2 (lsst.pipe.base.tests.mocks.DynamicTestPipelineTask)", 

128 result.output, 

129 ) 

130 self.assertIn("DatasetType('dataset_auto0', {}, _mock_StructuredDataDict)", result.output) 

131 self.assertIn("DatasetType('dataset_auto1', {}, _mock_StructuredDataDict)", result.output) 

132 self.assertIn("DatasetType('dataset_auto2', {}, _mock_StructuredDataDict)", result.output) 

133 result = runner.invoke( 

134 pipetask_cli, 

135 ["qgraph", "--butler-config", root, "--qgraph", graph_uri, "--show", "workflow"], 

136 input="no", 

137 ) 

138 id1, id2 = qgc.quantum_datasets.keys() 

139 self.assertEqual( 

140 result.output.strip(), 

141 textwrap.dedent(f""" 

142 Quantum {id1}: lsst.pipe.base.tests.mocks.DynamicTestPipelineTask 

143 Quantum {id2}: lsst.pipe.base.tests.mocks.DynamicTestPipelineTask 

144 Parent Quantum {id1} - Child Quantum {id2} 

145 """).strip(), 

146 ) 

147 result = runner.invoke( 

148 pipetask_cli, 

149 ["qgraph", "--butler-config", root, "--qgraph", graph_uri, "--show", "uri"], 

150 input="no", 

151 ) 

152 self.assertIn( 

153 f"Quantum {id1}: lsst.pipe.base.tests.mocks.DynamicTestPipelineTask", 

154 result.output, 

155 ) 

156 self.assertIn( 

157 f"Quantum {id2}: lsst.pipe.base.tests.mocks.DynamicTestPipelineTask", 

158 result.output, 

159 ) 

160 

161 def test_missing_option(self): 

162 """Test that if options for the qgraph script are missing that it 

163 fails. 

164 """ 

165 

166 @click.command() 

167 @opt.qgraph_options() 

168 def cli(**kwargs): 

169 script.qgraph(**kwargs) 

170 

171 runner = click.testing.CliRunner() 

172 result = runner.invoke(cli) 

173 # The cli call should fail, because qgraph takes more options than are 

174 # defined by qgraph_options. 

175 self.assertNotEqual(result.exit_code, 0) 

176 

177 

178if __name__ == "__main__": 

179 unittest.main()