# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Module defining connection types to be used within a
`.PipelineTaskConnections` class.
"""

__all__ = ["BaseConnection", "InitInput", "InitOutput", "Input", "Output", "PrerequisiteInput"]

import dataclasses
from collections.abc import Callable, Iterable, Sequence
from typing import ClassVar

from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, Registry
from lsst.utils.introspection import find_outside_stacklevel


@dataclasses.dataclass(frozen=True)
class BaseConnection:
    """Base class used for declaring `PipelineTask` connections."""

    name: str
    """The name used to identify the dataset type."""

    storageClass: str
    """The storage class used when (un)persisting the dataset type."""

    doc: str = ""
    """Documentation for this connection."""

    multiple: bool = False
    """Indicates whether this connection should expect to contain multiple
    objects of the given dataset type.

    Tasks with more than one connection with ``multiple=True`` with the same
    dimensions may want to implement `.PipelineTaskConnections.adjustQuantum`
    to ensure those datasets are consistent (i.e. zip-iterable) in
    `PipelineTask.runQuantum()` and to notify the execution system as early as
    possible of outputs that will not be produced because the corresponding
    input is missing.
    """

    deprecated: str | None = dataclasses.field(default=None, kw_only=True)
    """A description of why this connection is deprecated, including the
    version after which it may be removed.

    If not `None`, the string is appended to the docstring for this
    connection and the corresponding config Field.
    """

    _connection_type_set: ClassVar[str]
    _deprecation_context: str = ""

    def __post_init__(self):
        if self.deprecated and not self._deprecation_context:
            info = {}
            _ = find_outside_stacklevel("lsst.pipe.base", "dataclasses", stack_info=info)
            object.__setattr__(self, "_deprecation_context", f"{info['filename']}:{info['lineno']}")

    def __get__(self, inst, klass):
        """Descriptor access method.

        This method turns a connection into a descriptor.  When a connection
        is added to a connections class, it is a class-level variable.  This
        method makes accessing the connection on an instance of the owning
        connections class return a result specialized for that instance.  For
        connections this specifically means that names specified in a config
        instance are visible instead of the connection's default names, and
        that removed connections are not accessible on the instance.
        """
        # If inst is None, this is being accessed by the class and not an
        # instance; return this connection itself.
        if inst is None:
            return self
        # Attempt to return the configured connection object from the
        # connections instance's allConnections mapping.
        try:
            return inst.allConnections[self.varName]
        except KeyError:
            raise AttributeError(
                f"Connection {self.varName!r} of {klass.__name__} has been removed."
            ) from None
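
# Illustrative sketch (not part of the original module): how connections are
# declared and how the descriptor behaves.  ``ExampleConnections``,
# ``ExampleConfig``, and the dataset type name "calexp" are assumptions for
# this example, not names defined in this file.
#
#     from lsst.pipe.base import PipelineTaskConfig, PipelineTaskConnections
#
#     class ExampleConnections(PipelineTaskConnections, dimensions=("visit", "detector")):
#         calexp = Input(
#             name="calexp",
#             storageClass="ExposureF",
#             doc="Calibrated exposure to process.",
#             dimensions=("visit", "detector"),
#         )
#
#     class ExampleConfig(PipelineTaskConfig, pipelineConnections=ExampleConnections):
#         pass
#
#     ExampleConnections.calexp  # class access returns the Input itself
#     config = ExampleConfig()
#     config.connections.calexp = "calexp_recalibrated"
#     connections = ExampleConnections(config=config)
#     connections.calexp.name  # instance access sees the configured name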


@dataclasses.dataclass(frozen=True)
class DimensionedConnection(BaseConnection):
    """Class used for declaring PipelineTask connections that include
    dimensions.
    """

    dimensions: Iterable[str] = ()
    """The keys of the butler data coordinates for this dataset type."""

    isCalibration: bool = False
    """`True` if this dataset type may be included in
    `~lsst.daf.butler.CollectionType.CALIBRATION` collections to associate it
    with a validity range, `False` (default) otherwise.
    """

    def __post_init__(self):
        super().__post_init__()
        if isinstance(self.dimensions, str):
            raise TypeError(
                "Dimensions must be iterable of dimensions, got str, possibly omitted trailing comma"
            )
        if not isinstance(self.dimensions, Iterable):
            raise TypeError("Dimensions must be iterable of dimensions")
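
# Illustrative sketch (assumption, not from this module): the string check in
# ``__post_init__`` above catches a common typo, a one-element tuple written
# without its trailing comma.  The dataset type name is hypothetical.
#
#     DimensionedConnection(name="coadd", storageClass="ExposureF", dimensions=("tract"))
#     # TypeError: dimensions is the str "tract", not a tuple
#
#     DimensionedConnection(name="coadd", storageClass="ExposureF", dimensions=("tract",))
#     # OK: a one-element tuple of dimension names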


@dataclasses.dataclass(frozen=True)
class BaseInput(DimensionedConnection):
    """Class used for declaring PipelineTask input connections.

    Raises
    ------
    TypeError
        Raised if ``minimum`` is greater than one but ``multiple=False``.
    NotImplementedError
        Raised if ``minimum`` is zero for a regular `Input` connection; this
        is not currently supported by our QuantumGraph generation algorithm.
    """

    deferLoad: bool = False
    """Whether this dataset type will be loaded as a
    `lsst.daf.butler.DeferredDatasetHandle`.  PipelineTasks can use this
    object to load the object at a later time.
    """

    minimum: int = 1
    """Minimum number of datasets required for this connection, per quantum.

    This is checked in the base implementation of
    `.PipelineTaskConnections.adjustQuantum`, which raises `NoWorkFound` if
    the minimum is not met for `Input` connections (causing the quantum to be
    pruned, skipped, or never created, depending on the context) and
    `FileNotFoundError` for `PrerequisiteInput` connections (causing
    QuantumGraph generation to fail).  `PipelineTask` implementations may
    provide custom `~.PipelineTaskConnections.adjustQuantum` implementations
    for more fine-grained or configuration-driven constraints, as long as
    they are compatible with this minimum.
    """

    def __post_init__(self) -> None:
        super().__post_init__()
        if self.minimum > 1 and not self.multiple:
            raise TypeError(f"Cannot set minimum={self.minimum} if multiple=False.")
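
# Illustrative sketch (assumption, not from this module): ``minimum`` greater
# than one only makes sense for a ``multiple=True`` connection, which the
# check in ``__post_init__`` above enforces.  The dataset type name and
# dimensions are hypothetical.
#
#     BaseInput(
#         name="warp",
#         storageClass="ExposureF",
#         dimensions=("tract", "patch", "visit"),
#         multiple=True,
#         minimum=2,  # each quantum needs at least two warps
#     )
#
#     BaseInput(name="warp", storageClass="ExposureF", minimum=2)
#     # TypeError: Cannot set minimum=2 if multiple=False.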


@dataclasses.dataclass(frozen=True)
class Input(BaseInput):
    """Class used for declaring PipelineTask input connections.

    Raises
    ------
    TypeError
        Raised if ``minimum`` is greater than one but ``multiple=False``.
    NotImplementedError
        Raised if ``minimum`` is zero for a regular `Input` connection; this
        is not currently supported by our QuantumGraph generation algorithm.
    """

    deferGraphConstraint: bool = False
    """If `True`, do not include this dataset type's existence in the initial
    query that starts the QuantumGraph generation process.

    This can be used to make QuantumGraph generation faster by avoiding
    redundant datasets, and in certain cases it can (along with careful
    attention to which tasks are included in the same QuantumGraph) be used
    to work around the QuantumGraph generation algorithm's inflexible
    handling of spatial overlaps.  This option has no effect when the
    connection is not an overall input of the pipeline (or subset thereof)
    for which a graph is being created, and it never affects the ordering of
    quanta.
    """

    deferBinding: bool = False
    """If `True`, the dataset will not be automatically included in the
    pipeline graph (``deferGraphConstraint=True`` is implied).

    A custom `~.quantum_graph_builder.QuantumGraphBuilder` is required to
    bind it and add a corresponding edge to the pipeline graph.  This option
    allows the same dataset type to be used as both an input and an output
    of a quantum.
    """

    _connection_type_set: ClassVar[str] = "inputs"
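
# Illustrative sketch (assumption, not from this module): an overall-input
# connection whose existence should be resolved in a follow-up query rather
# than constraining the initial QuantumGraph query.  The dataset type name
# and storage class are hypothetical.
#
#     skyCorr = Input(
#         name="skyCorr",
#         storageClass="Background",
#         doc="Sky correction background model to subtract.",
#         dimensions=("visit", "detector"),
#         deferGraphConstraint=True,
#     )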


@dataclasses.dataclass(frozen=True)
class PrerequisiteInput(BaseInput):
    """Class used for declaring PipelineTask prerequisite connections.

    Raises
    ------
    TypeError
        Raised if ``minimum`` is greater than one but ``multiple=False``.

    Notes
    -----
    Prerequisite inputs are used for datasets that must exist in the data
    repository before a pipeline including this task is run; they cannot be
    produced by another task in the same pipeline.

    In exchange for this limitation, they have a number of advantages
    relative to regular `Input` connections:

    - The query used to find them during `QuantumGraph` generation can be
      fully customized by providing a ``lookupFunction``.
    - Failed searches for prerequisites during `QuantumGraph` generation will
      usually generate more helpful diagnostics than those for regular
      `Input` connections.
    - The default query for prerequisite inputs relates the quantum
      dimensions directly to the dimensions of its dataset type, without
      being constrained by any of the other dimensions in the pipeline.
      This allows them to be used for temporal calibration lookups (which
      regular `Input` connections cannot do at present) and to work around
      `QuantumGraph` generation limitations involving cases where naive
      spatial overlap relationships between dimensions are not desired
      (e.g. a task that wants all detectors in each visit for which the
      visit overlaps a tract, not just those where that detector+visit
      combination overlaps the tract).
    - Prerequisite inputs may be optional (regular inputs are never
      optional).
    """

    lookupFunction: (
        Callable[[DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]] | None
    ) = None
    """An optional callable that looks up prerequisite inputs using the
    dataset type, registry, quantum data ID, and input collections passed to
    it.

    If no function is specified, the default temporal/spatial lookup will be
    used.
    """

    _connection_type_set: ClassVar[str] = "prerequisiteInputs"
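
# Illustrative sketch (assumption, not from this module): a custom lookup
# matching the ``lookupFunction`` signature declared above.  The registry
# query and all names shown are plausible examples, not behavior defined in
# this file.
#
#     def _lookupRefCats(
#         datasetType: DatasetType,
#         registry: Registry,
#         quantumDataId: DataCoordinate,
#         collections: Sequence[str],
#     ) -> Iterable[DatasetRef]:
#         # Find all shards of the reference catalog in the input
#         # collections, constrained by the quantum data ID.
#         return registry.queryDatasets(
#             datasetType, collections=collections, dataId=quantumDataId
#         )
#
#     refCat = PrerequisiteInput(
#         name="gaia_dr3",
#         storageClass="SimpleCatalog",
#         dimensions=("htm7",),
#         multiple=True,
#         lookupFunction=_lookupRefCats,
#     )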


@dataclasses.dataclass(frozen=True)
class Output(DimensionedConnection):
    """Connection for an output dataset."""

    _connection_type_set: ClassVar[str] = "outputs"


@dataclasses.dataclass(frozen=True)
class InitInput(BaseConnection):
    """Connection for an initInput dataset."""

    _connection_type_set: ClassVar[str] = "initInputs"


@dataclasses.dataclass(frozen=True)
class InitOutput(BaseConnection):
    """Connection for an initOutput dataset."""

    _connection_type_set: ClassVar[str] = "initOutputs"