Coverage for tests / test_blocking_limited_butler.py: 26%
46 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:47 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Tests for execution butler."""
30import logging
31import os
32import unittest
34import lsst.utils.tests
35from lsst.daf.butler import DataCoordinate, DatasetRef
36from lsst.pipe.base.blocking_limited_butler import _LOG, BlockingLimitedButler
37from lsst.pipe.base.tests.mocks import InMemoryRepo
39TESTDIR = os.path.abspath(os.path.dirname(__file__))
42class BlockingLimitedButlerTestCase(unittest.TestCase):
43 """Unit tests for BlockingLimitedButler"""
45 def test_no_block_nonexistent(self):
46 """Test checking/getting with no dataset and blocking disabled."""
47 helper = InMemoryRepo("base.yaml")
48 helper.add_task()
49 helper.pipeline_graph.resolve(helper.butler.registry)
50 ref = DatasetRef(
51 helper.pipeline_graph.dataset_types["dataset_auto0"].dataset_type,
52 DataCoordinate.make_empty(helper.butler.dimensions),
53 run="input_run",
54 )
55 helper.pipeline_graph.register_dataset_types(helper.butler)
56 in_memory_butler = helper.make_limited_butler()
57 blocking_butler = BlockingLimitedButler(in_memory_butler, timeouts={})
58 with self.assertNoLogs(_LOG, level=logging.INFO):
59 self.assertFalse(blocking_butler.stored_many([ref])[ref])
60 with self.assertRaises(FileNotFoundError):
61 blocking_butler.get(ref)
63 def test_timeout_nonexistent(self):
64 """Test checking/getting with no dataset, leading to a timeout."""
65 helper = InMemoryRepo("base.yaml")
66 helper.add_task()
67 helper.pipeline_graph.resolve(helper.butler.registry)
68 ref = DatasetRef(
69 helper.pipeline_graph.dataset_types["dataset_auto0"].dataset_type,
70 DataCoordinate.make_empty(helper.butler.dimensions),
71 run="input_run",
72 )
73 helper.pipeline_graph.register_dataset_types(helper.butler)
74 in_memory_butler = helper.make_limited_butler()
75 blocking_butler = BlockingLimitedButler(in_memory_butler, timeouts={"dataset_auto0": 0.1})
76 with self.assertLogs(_LOG, level=logging.INFO) as cm:
77 self.assertFalse(blocking_butler.stored_many([ref])[ref])
78 self.assertIn("not immediately available", cm.output[0])
79 with self.assertLogs(_LOG, level=logging.INFO) as cm:
80 with self.assertRaises(FileNotFoundError):
81 blocking_butler.get(ref)
82 self.assertIn("not immediately available", cm.output[0])
84 def test_no_waiting_if_exists(self):
85 """Test checking/getting with dataset present immediately, so no
86 waiting should be necessary.
87 """
88 helper = InMemoryRepo("base.yaml")
89 helper.add_task()
90 (ref,) = helper.insert_datasets("dataset_auto0")
91 helper.pipeline_graph.register_dataset_types(helper.butler)
92 in_memory_butler = helper.make_limited_butler()
93 blocking_butler = BlockingLimitedButler(in_memory_butler, timeouts={})
94 with self.assertNoLogs(_LOG, level=logging.INFO):
95 self.assertTrue(blocking_butler.stored_many([ref])[ref])
96 self.assertIsNotNone(blocking_butler.get(ref))
99if __name__ == "__main__":
100 lsst.utils.tests.init()
101 unittest.main()