Coverage for python / lsst / pipe / base / tests / pipelineStepTester.py: 0%
50 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:49 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Utility to facilitate testing of pipelines consisting of multiple steps."""
30__all__ = ["PipelineStepTester"]
32import dataclasses
33import unittest
34import warnings
36from lsst.daf.butler import Butler, DatasetType
37from lsst.pipe.base import Pipeline, PipelineGraph
38from lsst.resources import ResourcePath, ResourcePathExpression
@dataclasses.dataclass
class PipelineStepTester:
    """Utility class which facilitates testing of pipelines, optionally
    consisting of multiple steps.

    Two sets will be constructed by looping over the entire pipeline or all
    named subsets within the pipeline: `pure_inputs` and `all_outputs`.

    The `pure_inputs` set consists of all inputs which must be constructed and
    provided as an input into the pipeline (i.e., they will not be generated
    by the named pipeline).

    The `all_outputs` set consists of all dataset types which are generated by
    the named pipeline, either as intermediates or as final outputs.

    These sets will be checked against user-supplied sets to ensure that the
    named pipeline may still be run without raising a missing data error.

    Attributes
    ----------
    filename : `lsst.resources.ResourcePathExpression`
        The full path or URI to the pipeline YAML.
    step_suffixes : `list` [`str`]
        A list, in the order of data reduction, of the step subsets to check.
        Must include any initial "#".
    initial_dataset_types : `list` [`tuple`]
        Dataset types which require initial registry by the butler. Each
        element must be a tuple of the type name, dimensions, storage class,
        and calibration flag, as described in the constructor for
        `~lsst.daf.butler.DatasetType`. All tuple elements are required.
    expected_inputs : `set` [`str`]
        Dataset types expected as an input into the pipeline.
    expected_outputs : `set` [`str`]
        Dataset types expected to be produced as an output by the pipeline.
    pipeline_patches : `dict` [`str`, `str`], optional
        Any config overrides to be applied to the pipeline before testing it.
        This is rarely appropriate, and should be reserved for fields that
        cannot have a file-level default. The key must have the form
        "task:subtask.field".
    """

    filename: ResourcePathExpression
    step_suffixes: list[str]
    initial_dataset_types: list[tuple[str, set[str], str, bool]]
    expected_inputs: set[str]
    expected_outputs: set[str]
    pipeline_patches: dict[str, str] = dataclasses.field(default_factory=dict)

    def register_dataset_types(self, butler: Butler) -> None:
        """Register any dataset types passed to the class constructor.

        The types registered are those specified in
        ``self.initial_dataset_types``.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The (test) butler in which to register the types.
        """
        for name, dimensions, storageClass, isCalibration in self.initial_dataset_types:
            butler.registry.registerDatasetType(
                DatasetType(
                    name,
                    dimensions,
                    storageClass=storageClass,
                    isCalibration=isCalibration,
                    universe=butler.dimensions,
                )
            )

    def run(self, butler: Butler, test_case: unittest.TestCase) -> None:
        """Run the test on all pipelines passed to the class constructor.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The butler in which to run the test. The butler must be writeable,
            and its state will be modified as part of the test.
        test_case : `unittest.TestCase`
            The test case in which this test is run.

        Raises
        ------
        AssertionError
            Raised if the pipeline requires inputs that are not in
            ``self.expected_inputs``, or fails to produce outputs that are in
            ``self.expected_outputs``.
        """
        self.register_dataset_types(butler)

        all_outputs: dict[str, DatasetType] = {}
        pure_inputs: dict[str, str] = {}

        file_uri = ResourcePath(self.filename, forceDirectory=False)
        for suffix in self.step_suffixes:
            # Each step is addressed as a URI fragment naming a labeled
            # subset of the full pipeline.
            step_uri = file_uri.replace(fragment=suffix.removeprefix("#"))
            step_graph = self.load_pipeline_graph(step_uri)
            step_graph.resolve(butler.registry)

            # Anything this step consumes that no earlier step produced must
            # be provided externally; remember which step/tasks wanted it so
            # the failure message below is actionable.
            for name, _ in step_graph.iter_overall_inputs():
                if name not in all_outputs:
                    tasks = step_graph.consumers_of(name)
                    pure_inputs[name] = f"{suffix}: {', '.join(task.label for task in tasks)}"
            all_outputs.update(
                {
                    name: node.dataset_type
                    for name, node in step_graph.dataset_types.items()
                    if step_graph.producing_edge_of(name) is not None
                }
            )

            # Register all of this step's dataset types so later steps
            # resolve consistently against them.
            for node in step_graph.dataset_types.values():
                butler.registry.registerDatasetType(node.dataset_type)

        unexpected = pure_inputs.keys() - self.expected_inputs
        if unexpected:
            missing = [
                name + (f" ({pure_inputs[name]})" if pure_inputs[name] else "")
                for name in unexpected
            ]
            raise AssertionError(f"Got unexpected pure_inputs: {missing}")

        if not all_outputs.keys() >= self.expected_outputs:
            missing = list(self.expected_outputs - all_outputs.keys())
            raise AssertionError(f"Missing expected_outputs: {missing}")

    def load_pipeline_graph(self, uri: ResourcePathExpression) -> PipelineGraph:
        """Load a pipeline and convert it to a graph, applying patches.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePathExpression`
            URI of the pipeline to load, including any label-subset fragment.

        Returns
        -------
        graph : `lsst.pipe.base.PipelineGraph`
            Graph form of the pipeline, with ``self.pipeline_patches``
            applied.  Patches whose task label is not present in this
            pipeline (subset) are skipped with a `UserWarning` rather than
            raising, since a patch may target a task in a different step.
        """
        pipeline = Pipeline.from_uri(uri)
        for fullkey, value in self.pipeline_patches.items():
            # Keys have the form "task:subtask.field"; split only on the
            # first colon so the config field path may itself contain colons.
            label, key = fullkey.split(":", maxsplit=1)
            try:
                pipeline.addConfigOverride(label, key, value)
            except LookupError as e:
                warnings.warn(f"{e}, skipping configuration {fullkey}={value}", UserWarning)
        return pipeline.to_graph()