Coverage for python/lsst/pipe/base/connectionTypes.py: 79%
75 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:47 +0000
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Module defining connection types to be used within a
`.PipelineTaskConnections` class.
"""

__all__ = ["BaseConnection", "InitInput", "InitOutput", "Input", "Output", "PrerequisiteInput"]

import dataclasses
from collections.abc import Callable, Iterable, Sequence
from typing import ClassVar

from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, Registry
from lsst.utils.introspection import find_outside_stacklevel
42@dataclasses.dataclass(frozen=True)
43class BaseConnection:
44 """Base class used for declaring `PipelineTask` connections."""
46 name: str
47 """The name used to identify the dataset type."""
49 storageClass: str
50 """The storage class used when (un)/persisting the dataset type."""
52 doc: str = ""
53 """Documentation for this connection."""
55 multiple: bool = False
56 """Indicates if this connection should expect to contain multiple objects
57 of the given dataset type.
59 Tasks with more than one connection with ``multiple=True`` with the same
60 dimensions may want to implement `.PipelineTaskConnections.adjustQuantum`
61 to ensure those datasets are consistent (i.e. zip-iterable) in
62 `PipelineTask.runQuantum()` and notify the execution system as early as
63 possible of outputs that will not be produced because the corresponding
64 input is missing.
65 """
67 deprecated: str | None = dataclasses.field(default=None, kw_only=True)
68 """A description of why this connection is deprecated, including the
69 version after which it may be removed.
71 If not `None`, the string is appended to the docstring for this
72 connection and the corresponding config Field.
73 """
75 _connection_type_set: ClassVar[str]
76 _deprecation_context: str = ""
78 def __post_init__(self):
79 if self.deprecated and not self._deprecation_context: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 info = {}
81 _ = find_outside_stacklevel("lsst.pipe.base", "dataclasses", stack_info=info)
82 object.__setattr__(self, "_deprecation_context", f"{info['filename']}:{info['lineno']}")
84 def __get__(self, inst, klass):
85 """Descriptor access method.
87 This is a method used to turn a connection into a descriptor.
88 When a connection is added to a connection class, it is a class level
89 variable. This method makes accessing this connection, on the
90 instance of the connection class owning this connection, return a
91 result specialized for that instance. In the case of connections
92 this specifically means names specified in a config instance will
93 be visible instead of the default names for the connection, and that
94 removed connections will not be accessible on the instance.
95 """
96 # If inst is None, this is being accessed by the class and not an
97 # instance, return this connection itself
98 if inst is None:
99 return self
100 # Attempt to return the configured connection object from the
101 # connections instance allConnections mapping.
102 try:
103 return inst.allConnections[self.varName]
104 except KeyError:
105 raise AttributeError(
106 f"Connection {self.varName!r} of {klass.__name__} has been removed."
107 ) from None
@dataclasses.dataclass(frozen=True)
class DimensionedConnection(BaseConnection):
    """Class used for declaring PipelineTask connections that includes
    dimensions.

    Raises
    ------
    TypeError
        Raised if ``dimensions`` is a `str` or is otherwise not an iterable.
    """

    dimensions: Iterable[str] = ()
    """The keys of the butler data coordinates for this dataset type."""

    isCalibration: bool = False
    """ `True` if this dataset type may be included in
    `~lsst.daf.butler.CollectionType.CALIBRATION` collections to associate it
    with a validity range, `False` (default) otherwise."""

    def __post_init__(self) -> None:
        super().__post_init__()
        # A bare str is itself an Iterable, so check it first to give a
        # targeted message for the common "1-tuple without trailing comma"
        # mistake, e.g. dimensions=("visit") instead of ("visit",).
        if isinstance(self.dimensions, str):
            raise TypeError(
                "Dimensions must be iterable of dimensions, got str, possibly omitted trailing comma"
            )
        if not isinstance(self.dimensions, Iterable):
            raise TypeError("Dimensions must be iterable of dimensions")
@dataclasses.dataclass(frozen=True)
class BaseInput(DimensionedConnection):
    """Class used for declaring PipelineTask input connections.

    Raises
    ------
    TypeError
        Raised if ``minimum`` is greater than one but ``multiple=False``.
    NotImplementedError
        Raised if ``minimum`` is zero for a regular `Input` connection; this
        is not currently supported by our QuantumGraph generation algorithm.
    """

    deferLoad: bool = False
    """If `True`, this dataset type is handed to the task as a
    `lsst.daf.butler.DeferredDatasetHandle`, letting the `PipelineTask`
    postpone loading the object until it is actually needed.
    """

    minimum: int = 1
    """Minimum number of datasets required for this connection, per quantum.

    The base implementation of `.PipelineTaskConnections.adjustQuantum`
    enforces this: when the minimum is not met it raises `NoWorkFound` for
    `Input` connections (causing the quantum to be pruned, skipped, or never
    created, depending on the context) and `FileNotFoundError` for
    `PrerequisiteInput` connections (causing QuantumGraph generation to
    fail).  `PipelineTask` implementations may provide custom
    `~.PipelineTaskConnections.adjustQuantum` implementations for more
    fine-grained or configuration-driven constraints, as long as they are
    compatible with this minimum.
    """

    def __post_init__(self) -> None:
        super().__post_init__()
        # A per-quantum minimum above one is only meaningful when the
        # connection can actually hold more than one dataset.
        if self.multiple or self.minimum <= 1:
            return
        raise TypeError(f"Cannot set minimum={self.minimum} if multiple=False.")
@dataclasses.dataclass(frozen=True)
class Input(BaseInput):
    """Class used for declaring PipelineTask input connections.

    Raises
    ------
    TypeError
        Raised if ``minimum`` is greater than one but ``multiple=False``.
    NotImplementedError
        Raised if ``minimum`` is zero for a regular `Input` connection; this
        is not currently supported by our QuantumGraph generation algorithm.
    """

    deferGraphConstraint: bool = False
    """If `True`, this dataset type's existence is left out of the initial
    query that starts the QuantumGraph generation process.

    Deferring the constraint can make QuantumGraph generation faster by
    avoiding redundant datasets, and in certain cases it can (along with
    careful attention to which tasks are included in the same QuantumGraph)
    work around the generation algorithm's inflexible handling of spatial
    overlaps.  The option has no effect when this connection is not an
    overall input of the pipeline (or subset thereof) being graphed, and it
    never affects the ordering of quanta.
    """

    deferBinding: bool = False
    """If `True`, the dataset will not be automatically included in the
    pipeline graph (``deferGraphConstraint=True`` is implied).

    Binding it and adding the corresponding pipeline-graph edge requires a
    custom `~.quantum_graph_builder.QuantumGraphBuilder`; this makes it
    possible for the same dataset type to act as both an input and an
    output of a quantum.
    """

    _connection_type_set: ClassVar[str] = "inputs"
@dataclasses.dataclass(frozen=True)
class PrerequisiteInput(BaseInput):
    """Class used for declaring PipelineTask prerequisite connections.

    Raises
    ------
    TypeError
        Raised if ``minimum`` is greater than one but ``multiple=False``.

    Notes
    -----
    Prerequisite inputs are used for datasets that must already exist in the
    data repository before any pipeline including this connection is run;
    they can never be produced by another task in the same pipeline.

    In exchange for that limitation, they have several advantages over
    regular `Input` connections:

    - The query used to find them during `QuantumGraph` generation can be
      fully customized by providing a ``lookupFunction``.
    - Failed searches for prerequisites during `QuantumGraph` generation
      usually produce more helpful diagnostics than those for regular
      `Input` connections.
    - The default query for prerequisite inputs relates the quantum
      dimensions directly to the dimensions of its dataset type, without
      being constrained by any of the other dimensions in the pipeline.
      This allows them to be used for temporal calibration lookups (which
      regular `Input` connections cannot do at present) and to work around
      `QuantumGraph` generation limitations involving cases where naive
      spatial overlap relationships between dimensions are not desired
      (e.g. a task that wants all detectors in each visit for which the
      visit overlaps a tract, not just those where that detector+visit
      combination overlaps the tract).
    - Prerequisite inputs may be optional (regular inputs are never
      optional).
    """

    lookupFunction: (
        Callable[[DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]] | None
    ) = None
    """Optional callable used to look up this prerequisite input.

    It is invoked with the DatasetType, the registry, the quantum data ID,
    and the input collections, and returns the matching dataset references.
    When `None`, the default temporal/spatial lookup is used.
    """

    _connection_type_set: ClassVar[str] = "prerequisiteInputs"
@dataclasses.dataclass(frozen=True)
class Output(DimensionedConnection):
    """Connection used to declare an output dataset."""

    _connection_type_set: ClassVar[str] = "outputs"
@dataclasses.dataclass(frozen=True)
class InitInput(BaseConnection):
    """Connection used to declare an initInput dataset."""

    _connection_type_set: ClassVar[str] = "initInputs"
@dataclasses.dataclass(frozen=True)
class InitOutput(BaseConnection):
    """Connection used to declare an initOutput dataset."""

    _connection_type_set: ClassVar[str] = "initOutputs"