Coverage for tests/test_execution_storage_class_conversion.py: 24% of 113 statements.
Generated by coverage.py v7.13.5, created at 2026-05-06 08:32 +0000.
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28from __future__ import annotations
30import os
31import shutil
32import tempfile
33import unittest
35import lsst.daf.butler
36import lsst.utils.tests
37from lsst.pipe.base import PipelineGraph
38from lsst.pipe.base.pipeline_graph import IncompatibleDatasetTypeError
39from lsst.pipe.base.simple_pipeline_executor import SimplePipelineExecutor
40from lsst.pipe.base.tests.mocks import (
41 DynamicConnectionConfig,
42 DynamicTestPipelineTask,
43 DynamicTestPipelineTaskConfig,
44 MockStorageClass,
45 get_mock_name,
46)
# Absolute path to the directory containing this test file; used to locate
# the repo configuration under "config/".
TESTDIR = os.path.dirname(os.path.abspath(__file__))
class TestExecutionStorageClassConversion(lsst.utils.tests.TestCase):
    """Test storage class conversions during execution.

    Task connection declarations should always define which storage class they
    see, while data repository registrations should always define what is
    stored.

    This test uses mock storage classes for intermediate and output datasets,
    which let us load the dataset to see what storage class the task saw when
    it was running. These storage class names need to be wrapped in
    get_mock_name calls to get what the butler actually sees. Overall input
    datasets are not declared with mock datasets, so we can `put` them directly
    in test code.
    """

    def setUp(self):
        self.path = tempfile.mkdtemp()
        # standalone parameter forces the returned config to also include
        # the information from the search paths.
        config = lsst.daf.butler.Butler.makeRepo(
            self.path, standalone=True, searchPaths=[os.path.join(TESTDIR, "config")]
        )
        self.butler = SimplePipelineExecutor.prep_butler(config, [], "fake")
        self.enterContext(self.butler)
        # The overall-input dataset type uses a regular (non-mock) storage
        # class so we can `put` a plain dict below.
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "input",
                dimensions=self.butler.dimensions.empty,
                storageClass="StructuredDataDict",
            )
        )
        self.butler.put({"zero": 0}, "input")
        # Make sure the mock storage classes the test tasks will declare
        # actually exist.
        MockStorageClass.get_or_register_mock("StructuredDataDict")
        MockStorageClass.get_or_register_mock("TaskMetadataLike")

    def tearDown(self):
        # ignore_errors guards against partially-created repos on failed setUp.
        shutil.rmtree(self.path, ignore_errors=True)

    def _make_config(
        self,
        input_storage_class="StructuredDataDict",
        output_storage_class="StructuredDataDict",
        input_name="input",
        output_name="output",
    ):
        """Create configuration for a test task with a single input and single
        output of the given storage classes and dataset type names.

        Parameters
        ----------
        input_storage_class : `str`, optional
            Storage class the task's input connection declares.
        output_storage_class : `str`, optional
            Storage class the task's output connection declares.
        input_name : `str`, optional
            Dataset type name for the input connection.
        output_name : `str`, optional
            Dataset type name for the output connection.

        Returns
        -------
        config : `DynamicTestPipelineTaskConfig`
            Configuration for a dynamic test task.
        """
        config = DynamicTestPipelineTaskConfig()
        config.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name=input_name,
            storage_class=input_storage_class,
            # Since the overall input is special, we only use a mock storage
            # class for it when there's a storage class conversion.
            mock_storage_class=(input_name != "input" or (input_storage_class != "StructuredDataDict")),
        )
        config.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name=output_name, storage_class=output_storage_class
        )
        return config

    def _make_executor(
        self,
        a_i_storage_class="StructuredDataDict",
        a_o_storage_class="StructuredDataDict",
        b_i_storage_class="StructuredDataDict",
        b_o_storage_class="StructuredDataDict",
    ):
        """Configure a SimplePipelineExecutor with tasks with the given
        storage classes as inputs and outputs.

        This sets up a simple pipeline with two tasks ('a' and 'b') where the
        second task's only input is the first task's only output.

        Parameters
        ----------
        a_i_storage_class : `str`, optional
            Storage class declared by task 'a' for its input.
        a_o_storage_class : `str`, optional
            Storage class declared by task 'a' for its output.
        b_i_storage_class : `str`, optional
            Storage class declared by task 'b' for its input.
        b_o_storage_class : `str`, optional
            Storage class declared by task 'b' for its output.

        Returns
        -------
        executor : `SimplePipelineExecutor`
            Executor for the two-task pipeline.
        """
        config_a = self._make_config(a_i_storage_class, a_o_storage_class, output_name="intermediate")
        config_b = self._make_config(b_i_storage_class, b_o_storage_class, input_name="intermediate")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler)
        return executor

    def _assert_datasets(
        self,
        a_i_storage_class="StructuredDataDict",
        a_o_storage_class="StructuredDataDict",
        b_i_storage_class="StructuredDataDict",
        b_o_storage_class="StructuredDataDict",
        stored_intermediate_storage_class="StructuredDataDict",
        stored_output_storage_class="StructuredDataDict",
        butler: lsst.daf.butler.Butler | None = None,
    ):
        """Check that a butler repository's contents are consistent with
        running a pipeline created by _make_executor.

        Parameters
        ----------
        a_i_storage_class, a_o_storage_class : `str`, optional
            Storage classes task 'a' declared for its input and output.
        b_i_storage_class, b_o_storage_class : `str`, optional
            Storage classes task 'b' declared for its input and output.
        stored_intermediate_storage_class : `str`, optional
            Storage class the 'intermediate' dataset type is registered with.
        stored_output_storage_class : `str`, optional
            Storage class the 'output' dataset type is registered with.
        butler : `lsst.daf.butler.Butler`, optional
            Butler to inspect; defaults to ``self.butler``.
        """
        if butler is None:
            butler = self.butler
        # Read input and output datasets from butler, inspect their storage
        # classes directly.
        stored_intermediate = butler.get("intermediate")
        stored_output = butler.get("output")
        self.assertEqual(
            butler.get_dataset_type("intermediate").storageClass_name,
            get_mock_name(stored_intermediate_storage_class),
        )
        self.assertEqual(stored_output.storage_class, get_mock_name(stored_output_storage_class))
        self.assertEqual(
            butler.get_dataset_type("output").storageClass_name,
            get_mock_name(stored_output_storage_class),
        )
        # Since we didn't tell the butler to convert storage classes on read,
        # they'll remember their last conversion (on write).
        if a_o_storage_class != stored_intermediate_storage_class:
            self.assertEqual(
                stored_intermediate.converted_from.storage_class,
                get_mock_name(a_o_storage_class),
            )
        else:
            self.assertIsNone(stored_intermediate.converted_from)
        if b_o_storage_class != stored_output_storage_class:
            self.assertEqual(
                stored_output.converted_from.storage_class,
                get_mock_name(b_o_storage_class),
            )
        else:
            self.assertIsNone(stored_output.converted_from)
        # Extract the inputs as seen by the tasks from those stored outputs.
        quantum_a = stored_intermediate.quantum
        quantum_b = stored_output.quantum
        b_input = quantum_b.inputs["i"][0]
        a_input = quantum_a.inputs["i"][0]
        if a_i_storage_class == "StructuredDataDict":
            # Fixed: the original passed ``None`` as the ``msg`` argument of
            # assertIsNone, which had no effect; a single argument is correct.
            self.assertIsNone(a_input.converted_from)
        else:
            # The overall input was a plain dict before conversion to the
            # mocked storage class the task declared.
            self.assertEqual(a_input.original_type, "dict")
        self.assertEqual(b_input.storage_class, get_mock_name(b_i_storage_class))

    def test_no_conversions(self):
        """Test execution with no storage class conversions as a baseline."""
        executor = self._make_executor()
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets()

    def test_intermediate_registration_differs(self):
        """Test execution where an intermediate is registered to be different
        from both the producing and consuming task.
        """
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "intermediate",
                dimensions=self.butler.dimensions.empty,
                storageClass=get_mock_name("TaskMetadataLike"),
            )
        )
        executor = self._make_executor()
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets(stored_intermediate_storage_class="TaskMetadataLike")

    def test_intermediate_producer_differs(self):
        """Test execution where an intermediate is registered to be consistent
        with the consumer but different from its producer.
        """
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "intermediate",
                dimensions=self.butler.dimensions.empty,
                storageClass=get_mock_name("TaskMetadataLike"),
            )
        )
        executor = self._make_executor(b_i_storage_class="TaskMetadataLike")
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets(
            stored_intermediate_storage_class="TaskMetadataLike", b_i_storage_class="TaskMetadataLike"
        )

    def test_intermediate_consumer_differs(self):
        """Test execution where an intermediate is registered to be consistent
        with its producer but different from its consumer.
        """
        executor = self._make_executor(a_o_storage_class="TaskMetadataLike")
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets(
            stored_intermediate_storage_class="TaskMetadataLike", a_o_storage_class="TaskMetadataLike"
        )

    def test_output_differs(self):
        """Test execution where an overall output is registered to be
        different from the producing task.
        """
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "output",
                dimensions=self.butler.dimensions.empty,
                storageClass=get_mock_name("TaskMetadataLike"),
            )
        )
        executor = self._make_executor()
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets(stored_output_storage_class="TaskMetadataLike")

    def test_input_differs(self):
        """Test execution where an overall input's storage class is different
        from the consuming task.
        """
        executor = self._make_executor(a_i_storage_class="TaskMetadataLike")
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets(a_i_storage_class="TaskMetadataLike")

    def test_input_differs_use_local_butler(self):
        """Test execution where an overall input's storage class is different
        from the consuming task, and we use a local butler.
        """
        executor = self._make_executor(a_i_storage_class="TaskMetadataLike")
        with tempfile.TemporaryDirectory() as tempdir:
            root = os.path.join(tempdir, "butler_root")
            local_butler = executor.use_local_butler(root)
            self.enterContext(local_butler)
            quanta = executor.run(register_dataset_types=True, save_versions=False)
            self.assertEqual(len(quanta), 2)
            self._assert_datasets(a_i_storage_class="TaskMetadataLike", butler=executor.butler)

    def test_incompatible(self):
        """Test that we cannot make a QG if the registry and pipeline have
        incompatible storage classes for a dataset type.
        """
        # Incompatible output dataset type.
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "output",
                dimensions=self.butler.dimensions.empty,
                storageClass="StructuredDataList",
            )
        )
        with self.assertRaisesRegex(
            IncompatibleDatasetTypeError, "Incompatible definition.*StructuredDataDict.*StructuredDataList.*"
        ):
            self._make_executor()

    def test_registry_changed(self):
        """Run pipeline, but change registry dataset types between making the
        QG and executing it.

        This only fails with full-butler execution; we don't have a way to
        prevent it with QBB.
        """
        executor = self._make_executor()
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "output",
                dimensions=self.butler.dimensions.empty,
                storageClass="TaskMetadataLike",  # even compatible is not okay
            )
        )
        with self.assertRaisesRegex(
            lsst.daf.butler.registry.ConflictingDefinitionError,
            ".*_mock_StructuredDataDict.*is inconsistent with.*TaskMetadataLike.*",
        ):
            executor.run(register_dataset_types=True, save_versions=False)
class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Run the standard lsst.utils checks for leaked file descriptors."""
def setup_module(module):
    """Initialize the LSST test utilities when run under pytest.

    Parameters
    ----------
    module : `~types.ModuleType`
        Module being set up (unused; required by the pytest hook signature).
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Support running this file directly with `python`, outside of pytest.
    lsst.utils.tests.init()
    unittest.main()