Coverage for tests / test_single_quantum_executor.py: 15%

86 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:32 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30import os 

31import time 

32import unittest 

33 

34import lsst.pipe.base.automatic_connection_constants as acc 

35from lsst.pipe.base.resource_usage import QuantumResourceUsage 

36from lsst.pipe.base.single_quantum_executor import SingleQuantumExecutor 

37from lsst.pipe.base.tests.mocks import InMemoryRepo 

38 

39TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

40 

41 

42class SingleQuantumExecutorTestCase(unittest.TestCase): 

43 """Tests for SingleQuantumExecutor implementation.""" 

44 

45 def test_simple_execute(self) -> None: 

46 """Run execute() method in simplest setup.""" 

47 helper = InMemoryRepo("base.yaml") 

48 self.enterContext(helper) 

49 helper.add_task() 

50 qgraph = helper.make_quantum_graph_builder().build(attach_datastore_records=False) 

51 executor, butler = helper.make_single_quantum_executor() 

52 nQuanta = 1 

53 nodes = list(qgraph) 

54 self.assertEqual(len(nodes), nQuanta) 

55 node = nodes[0] 

56 t1 = time.time() 

57 executor.execute(node.task_node, node.quantum) 

58 t2 = time.time() 

59 # There must be one dataset of task's output connection 

60 self.assertEqual(len(butler.get_datasets("dataset_auto1")), 1) 

61 # Test that we can construct resource usage information from the 

62 # metadata. 

63 (md,) = butler.get_datasets(acc.METADATA_OUTPUT_TEMPLATE.format(label="task_auto1")).values() 

64 ru = QuantumResourceUsage.from_task_metadata(md) 

65 self.assertIsNotNone(ru) 

66 self.assertGreater(ru.memory, 0) 

67 self.assertGreater(ru.prep_time, 0) 

68 self.assertGreater(ru.init_time, 0) 

69 self.assertGreater(ru.run_time, 0) 

70 self.assertGreater(ru.run_time_cpu, 0) 

71 self.assertGreater(ru.total_time, 0) 

72 self.assertLess(ru.total_time, t2 - t1) 

73 

74 def test_skip_existing_execute(self) -> None: 

75 """Run execute() method twice, with skip_existing_in.""" 

76 helper = InMemoryRepo("base.yaml") 

77 self.enterContext(helper) 

78 helper.add_task() 

79 qgraph = helper.make_quantum_graph_builder().build(attach_datastore_records=False) 

80 executor, butler = helper.make_single_quantum_executor() 

81 nQuanta = 1 

82 nodes = list(qgraph) 

83 self.assertEqual(len(nodes), nQuanta) 

84 node = nodes[0] 

85 executor.execute(node.task_node, node.quantum) 

86 

87 outputs1 = butler.get_datasets("dataset_auto1") 

88 self.assertEqual(len(outputs1), 1) 

89 ref1, obj1 = outputs1.popitem() 

90 

91 # Re-run it with skip_existing, it should not run. Note that if it did 

92 # run (and called 'butler.put') that would raise an exception. 

93 executor = SingleQuantumExecutor(limited_butler_factory=butler.factory, skip_existing=True) 

94 executor.execute(node.task_node, node.quantum) 

95 

96 outputs2 = butler.get_datasets("dataset_auto1") 

97 self.assertEqual(len(outputs2), 1) 

98 ref2, obj2 = outputs2.popitem() 

99 self.assertEqual(ref1, ref2) 

100 # Objects should be the same (but not identities, because the butler 

101 # will copy them). 

102 self.assertEqual(obj1, obj2) 

103 

104 def test_clobber_outputs_execute(self) -> None: 

105 """Run execute() method twice, with clobber_outputs.""" 

106 helper = InMemoryRepo("base.yaml") 

107 self.enterContext(helper) 

108 helper.add_task() 

109 qgraph = helper.make_quantum_graph_builder().build(attach_datastore_records=False) 

110 executor, butler = helper.make_single_quantum_executor() 

111 nQuanta = 1 

112 nodes = list(qgraph) 

113 self.assertEqual(len(nodes), nQuanta) 

114 node = nodes[0] 

115 executor.execute(node.task_node, node.quantum) 

116 

117 outputs1 = butler.get_datasets("dataset_auto1") 

118 self.assertEqual(len(outputs1), 1) 

119 ref1, obj1 = outputs1.popitem() 

120 

121 # Remove the dataset ourself, and replace it with something 

122 # different so we can check later whether it got replaced. 

123 butler.pruneDatasets([ref1], disassociate=False, unstore=True, purge=False) 

124 obj1.quantum = None 

125 butler.put(obj1, ref1) 

126 

127 # Re-run it with clobber_outputs and skip_existing, it should not 

128 # clobber but should skip instead. 

129 executor = SingleQuantumExecutor( 

130 limited_butler_factory=butler.factory, skip_existing=True, clobber_outputs=True 

131 ) 

132 executor.execute(node.task_node, node.quantum) 

133 outputs2 = butler.get_datasets("dataset_auto1") 

134 self.assertEqual(len(outputs2), 1) 

135 ref2, obj2 = outputs2.popitem() 

136 self.assertEqual(ref1, ref2) 

137 self.assertEqual(obj1, obj2) 

138 

139 # Re-run it with clobber_outputs but without skip_existing_in, it 

140 # should clobber. 

141 executor = SingleQuantumExecutor(limited_butler_factory=butler.factory, clobber_outputs=True) 

142 executor.execute(node.task_node, node.quantum) 

143 outputs3 = butler.get_datasets("dataset_auto1") 

144 self.assertEqual(len(outputs3), 1) 

145 ref3, obj3 = outputs3.popitem() 

146 self.assertEqual(ref1, ref3) 

147 self.assertNotEqual(obj1, obj3) 

148 

149 

150if __name__ == "__main__": 

151 unittest.main()