Coverage for tests / test_cliCmdPurge.py: 25%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Unit tests for ctrl_mpexec CLI purge subcommand.""" 

29 

30import os 

31import unittest 

32 

33from lsst.ctrl.mpexec.cli.pipetask import cli as pipetask_cli 

34from lsst.daf.butler.cli.butler import cli as butler_cli 

35from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg 

36from lsst.daf.butler.tests.utils import MetricTestRepo, makeTestTempDir, removeTestTempDir 

37 

38TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

39 

40 

41class PurgeTest(unittest.TestCase): 

42 """Test executing "pipetask purge" commands.""" 

43 

44 def setUp(self): 

45 self.runner = LogCliRunner() 

46 

47 # this creates a repo with collections: 

48 # Name Type 

49 # ---------- ------ 

50 # ingest TAGGED 

51 # ingest/run RUN 

52 self.root = makeTestTempDir(TESTDIR) 

53 self.testRepo = MetricTestRepo( 

54 self.root, 

55 configFile=os.path.join(TESTDIR, "config/metricTestRepoButler.yaml"), 

56 ) 

57 self.enterContext(self.testRepo.butler) 

58 

59 def tearDown(self): 

60 removeTestTempDir(self.root) 

61 

62 def test_singleChain_yesNo(self): 

63 """Test removing a chain with one child, and the yes/no 

64 confirmation. 

65 """ 

66 # add the collection ingest/run to a CHAINED collection called "in" 

67 result = self.runner.invoke(butler_cli, ["collection-chain", self.root, "in", "ingest/run"]) 

68 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

69 

70 # purge the CHAINED collection called "in", but say "no", check for 

71 # expected outputs. 

72 result = self.runner.invoke(pipetask_cli, ["purge", "-b", self.root, "in"], input="no") 

73 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

74 self.assertIn("Will remove:\n runs: ingest/run\n chains: in\n others: \n", result.output) 

75 self.assertIn("Aborted.", result.output) 

76 

77 # purge the CHAINED collection called "in", and say "yes", check for 

78 # expected outputs. 

79 result = self.runner.invoke(pipetask_cli, ["purge", "-b", self.root, "in"], input="yes") 

80 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

81 self.assertIn("Will remove:\n runs: ingest/run\n chains: in\n others: \n", result.output) 

82 self.assertIn("Done.", result.output) 

83 

84 def test_granparentChain_noConfirm(self): 

85 """Test removing a chain with children and grandchildren, and the 

86 --no-confirm option. 

87 """ 

88 # add the collection ingest/run to a CHAINED collection called "ing" 

89 result = self.runner.invoke(butler_cli, ["collection-chain", self.root, "ing", "ingest/run"]) 

90 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

91 

92 # add the CHAINED collectin "ing" a CHAINED collection called "in" 

93 result = self.runner.invoke(butler_cli, ["collection-chain", self.root, "in", "ing"]) 

94 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

95 

96 # purge the CHAINED collection called "in" with --no-confirm and check 

97 # for expected outputs. 

98 result = self.runner.invoke( 

99 pipetask_cli, 

100 ["purge", "-b", self.root, "in", "--recursive", "--no-confirm"], 

101 ) 

102 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

103 self.assertIn("Removed:\n runs: ingest/run\n chains: in, ing\n others: \n", result.output) 

104 

105 def test_topParentWithParent(self): 

106 """Test that purging a chain with a parent fails.""" 

107 # add the collection ingest/run to a CHAINED collection called "ing" 

108 result = self.runner.invoke(butler_cli, ["collection-chain", self.root, "ing", "ingest/run"]) 

109 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

110 

111 # add the CHAINED collectin "ing" a CHAINED collection called "in" 

112 result = self.runner.invoke(butler_cli, ["collection-chain", self.root, "in", "ing"]) 

113 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

114 

115 # purge the CHAINED collection called "ing" and check for expected 

116 # outputs. 

117 result = self.runner.invoke( 

118 pipetask_cli, 

119 ["purge", "-b", self.root, "ing"], 

120 ) 

121 self.assertEqual(result.exit_code, 1, clickResultMsg(result)) 

122 self.assertIn( 

123 'The passed-in collection "ing" must not be contained in other collections but ' 

124 'is contained in collection(s) "in".', 

125 result.output, 

126 ) 

127 

128 def test_childWithMultipleParents(self): 

129 """Test that a child chain with multiple parents fails.""" 

130 # add the collection ingest/run to a CHAINED collection called "ing" 

131 result = self.runner.invoke(butler_cli, ["collection-chain", self.root, "ing", "ingest/run"]) 

132 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

133 

134 # add the collectin ingest/run to a CHAINED collection called "foo" 

135 result = self.runner.invoke(butler_cli, ["collection-chain", self.root, "foo", "ingest/run"]) 

136 self.assertEqual(result.exit_code, 0, clickResultMsg(result)) 

137 

138 # purge the CHAINED collection called "ing" and check for expected 

139 # outputs. 

140 result = self.runner.invoke( 

141 pipetask_cli, 

142 ["purge", "-b", self.root, "ing"], 

143 ) 

144 self.assertEqual(result.exit_code, 1, clickResultMsg(result)) 

145 self.assertIn( 

146 'Collection "ingest/run" is in multiple chained collections:', 

147 result.output, 

148 ) 

149 self.assertIn('"foo"', result.output) 

150 self.assertIn('"ing"', result.output) 

151 

152 def test_notFound_notChained(self): 

153 """Test for failure when the top level collection is not found, 

154 and when a top level connection is not a CHAINED collection. 

155 """ 

156 # Test purging a collection that does not exist. 

157 result = self.runner.invoke( 

158 pipetask_cli, 

159 ["purge", "-b", self.root, "notACollection"], 

160 ) 

161 self.assertEqual(result.exit_code, 1, clickResultMsg(result)) 

162 self.assertIn( 

163 'The passed-in collection "notACollection" was not found.', 

164 result.output, 

165 ) 

166 

167 # Test purging a collection that is not a CHAINED collection. 

168 result = self.runner.invoke( 

169 pipetask_cli, 

170 ["purge", "-b", self.root, "ingest/run"], 

171 ) 

172 self.assertEqual(result.exit_code, 1, clickResultMsg(result)) 

173 self.assertIn( 

174 'The passed-in collection must be a CHAINED collection; "ingest/run" is a RUN collection.', 

175 result.output, 

176 ) 

177 

178 

179if __name__ == "__main__": 

180 unittest.main()