Coverage for tests / test_blocking_limited_butler.py: 26%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:20 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Tests for execution butler.""" 

29 

30import logging 

31import os 

32import unittest 

33 

34import lsst.utils.tests 

35from lsst.daf.butler import DataCoordinate, DatasetRef 

36from lsst.pipe.base.blocking_limited_butler import _LOG, BlockingLimitedButler 

37from lsst.pipe.base.tests.mocks import InMemoryRepo 

38 

39TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

40 

41 

42class BlockingLimitedButlerTestCase(unittest.TestCase): 

43 """Unit tests for BlockingLimitedButler""" 

44 

45 def test_no_block_nonexistent(self): 

46 """Test checking/getting with no dataset and blocking disabled.""" 

47 helper = InMemoryRepo("base.yaml") 

48 helper.add_task() 

49 helper.pipeline_graph.resolve(helper.butler.registry) 

50 ref = DatasetRef( 

51 helper.pipeline_graph.dataset_types["dataset_auto0"].dataset_type, 

52 DataCoordinate.make_empty(helper.butler.dimensions), 

53 run="input_run", 

54 ) 

55 helper.pipeline_graph.register_dataset_types(helper.butler) 

56 in_memory_butler = helper.make_limited_butler() 

57 blocking_butler = BlockingLimitedButler(in_memory_butler, timeouts={}) 

58 with self.assertNoLogs(_LOG, level=logging.INFO): 

59 self.assertFalse(blocking_butler.stored_many([ref])[ref]) 

60 with self.assertRaises(FileNotFoundError): 

61 blocking_butler.get(ref) 

62 

63 def test_timeout_nonexistent(self): 

64 """Test checking/getting with no dataset, leading to a timeout.""" 

65 helper = InMemoryRepo("base.yaml") 

66 helper.add_task() 

67 helper.pipeline_graph.resolve(helper.butler.registry) 

68 ref = DatasetRef( 

69 helper.pipeline_graph.dataset_types["dataset_auto0"].dataset_type, 

70 DataCoordinate.make_empty(helper.butler.dimensions), 

71 run="input_run", 

72 ) 

73 helper.pipeline_graph.register_dataset_types(helper.butler) 

74 in_memory_butler = helper.make_limited_butler() 

75 blocking_butler = BlockingLimitedButler(in_memory_butler, timeouts={"dataset_auto0": 0.1}) 

76 with self.assertLogs(_LOG, level=logging.INFO) as cm: 

77 self.assertFalse(blocking_butler.stored_many([ref])[ref]) 

78 self.assertIn("not immediately available", cm.output[0]) 

79 with self.assertLogs(_LOG, level=logging.INFO) as cm: 

80 with self.assertRaises(FileNotFoundError): 

81 blocking_butler.get(ref) 

82 self.assertIn("not immediately available", cm.output[0]) 

83 

84 def test_no_waiting_if_exists(self): 

85 """Test checking/getting with dataset present immediately, so no 

86 waiting should be necessary. 

87 """ 

88 helper = InMemoryRepo("base.yaml") 

89 helper.add_task() 

90 (ref,) = helper.insert_datasets("dataset_auto0") 

91 helper.pipeline_graph.register_dataset_types(helper.butler) 

92 in_memory_butler = helper.make_limited_butler() 

93 blocking_butler = BlockingLimitedButler(in_memory_butler, timeouts={}) 

94 with self.assertNoLogs(_LOG, level=logging.INFO): 

95 self.assertTrue(blocking_butler.stored_many([ref])[ref]) 

96 self.assertIsNotNone(blocking_butler.get(ref)) 

97 

98 

99if __name__ == "__main__": 

100 lsst.utils.tests.init() 

101 unittest.main()