Coverage for python / lsst / pipe / base / pipeline_graph / _dataset_types.py: 43%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("DatasetTypeNode",) 

30 

31import dataclasses 

32from collections.abc import Callable, Collection 

33from typing import TYPE_CHECKING, Any 

34 

35import networkx 

36 

37from lsst.daf.butler import DatasetRef, DatasetType, DimensionGroup, DimensionUniverse, StorageClass 

38 

39from ._exceptions import DuplicateOutputError 

40from ._nodes import NodeKey, NodeType 

41 

42if TYPE_CHECKING: 

43 from ._edges import ReadEdge, WriteEdge 

44 

45 

@dataclasses.dataclass(frozen=True, eq=False)
class DatasetTypeNode:
    """A node in a pipeline graph that represents a resolved dataset type.

    Notes
    -----
    A dataset type node represents a common definition of the dataset type
    across the entire graph - it is never a component, and the storage class is
    the registry dataset type's storage class or (if there isn't one) the one
    defined by the producing task.

    Dataset type nodes are intentionally not equality comparable, since there
    are many different (and useful) ways to compare these objects with no clear
    winner as the most obvious behavior.
    """

    dataset_type: DatasetType
    """Common definition of this dataset type for the graph
    (`~lsst.daf.butler.DatasetType`).
    """

    is_initial_query_constraint: bool
    """Whether this dataset should be included as a constraint in the initial
    query for data IDs in QuantumGraph generation (`bool`).

    This is only `True` for dataset types that are overall regular inputs, and
    only if none of those input connections had ``deferQueryConstraint=True``.
    """

    is_prerequisite: bool
    """Whether this dataset type is a prerequisite input that must exist in
    the Registry before graph creation (`bool`).
    """

    producing_edge: WriteEdge | None
    """The edge to the task that produces this dataset type
    (`lsst.pipe.base.pipeline_graph.WriteEdge` or `None`)."""

    consuming_edges: Collection[ReadEdge]
    """The edges to tasks that consume this dataset type
    (collection [`lsst.pipe.base.pipeline_graph.ReadEdge`])."""

    @classmethod
    def _from_edges(
        cls,
        key: NodeKey,
        xgraph: networkx.MultiDiGraph,
        get_registered: Callable[[str], DatasetType | None],
        dimensions: DimensionUniverse,
        previous: DatasetTypeNode | None,
        visualization_only: bool = False,
    ) -> DatasetTypeNode:
        """Construct a dataset type node from its edges.

        Parameters
        ----------
        key : `NodeKey`
            Named tuple that holds the dataset type and serves as the node
            object in the internal networkx graph.
        xgraph : `networkx.MultiDiGraph`
            The internal networkx graph.
        get_registered : `~collections.abc.Callable` or `None`
            Callable that takes a dataset type name and returns the
            `~lsst.daf.butler.DatasetType` registered in the data repository,
            or `None` if it is not registered.
        dimensions : `lsst.daf.butler.DimensionUniverse`
            Definitions of all dimensions.
        previous : `DatasetTypeNode` or `None`
            Previous node for this dataset type.
        visualization_only : `bool`, optional
            Resolve the graph as well as possible even when dimensions and
            storage classes cannot really be determined.  This can include
            using the ``universe.commonSkyPix`` as the assumed dimensions of
            connections that use the "skypix" placeholder and using "<UNKNOWN>"
            as a storage class name (which will fail if the storage class
            itself is ever actually loaded).

        Returns
        -------
        node : `DatasetTypeNode`
            Node consistent with all edges pointing to it and the data
            repository.

        Raises
        ------
        DuplicateOutputError
            Raised if more than one task declares this dataset type as an
            output.
        """
        # Start from the data repository's definition (if any); edges refine
        # it from there.
        dataset_type = get_registered(key.name) if get_registered is not None else None
        is_registered = dataset_type is not None
        if previous is not None and previous.dataset_type == dataset_type:
            # This node was already resolved (with exactly the same edges
            # contributing, since we clear resolutions when edges are added or
            # removed).  The only thing that might have changed was the
            # definition in the registry, and it didn't.
            return previous
        is_initial_query_constraint = True
        is_prerequisite: bool | None = None
        producer: str | None = None
        producing_edge: WriteEdge | None = None
        # Iterate over the incoming edges to this node, which represent the
        # output connections of tasks that write this dataset type; these take
        # precedence over the inputs in determining the graph-wide dataset type
        # definition (and hence which storage class we register when using the
        # graph to register dataset types).  There should only be one such
        # connection, but we won't necessarily have checked that rule until
        # here.  As a result there can be at most one iteration of this loop.
        for _, _, producing_edge in xgraph.in_edges(key, data="instance"):
            assert producing_edge is not None, "Should only be None if we never loop."
            if producer is not None:
                raise DuplicateOutputError(
                    f"Dataset type {key.name!r} is produced by both {producing_edge.task_label!r} "
                    f"and {producer!r}."
                )
            producer = producing_edge.task_label
            dataset_type = producing_edge._resolve_dataset_type(dataset_type, universe=dimensions)
            # A produced dataset cannot be a prerequisite, and produced
            # datasets never constrain the initial data ID query.
            is_prerequisite = False
            is_initial_query_constraint = False
        consuming_edge: ReadEdge
        consumers: list[str] = []
        consuming_edges = list(
            consuming_edge for _, _, consuming_edge in xgraph.out_edges(key, data="instance")
        )
        # Put edges that are not component datasets before any edges that are.
        consuming_edges.sort(key=lambda consuming_edge: consuming_edge.component is not None)
        for consuming_edge in consuming_edges:
            # Each consumer refines the running definition and the two flags;
            # is_prerequisite may flip from None to a definite value here.
            dataset_type, is_initial_query_constraint, is_prerequisite = consuming_edge._resolve_dataset_type(
                current=dataset_type,
                universe=dimensions,
                is_initial_query_constraint=is_initial_query_constraint,
                is_prerequisite=is_prerequisite,
                is_registered=is_registered,
                producer=producer,
                consumers=consumers,
                visualization_only=visualization_only,
            )
            consumers.append(consuming_edge.task_label)
        assert dataset_type is not None, "Graph structure guarantees at least one edge."
        assert is_prerequisite is not None, "Having at least one edge guarantees is_prerequisite is known."
        return DatasetTypeNode(
            dataset_type=dataset_type,
            is_initial_query_constraint=is_initial_query_constraint,
            is_prerequisite=is_prerequisite,
            producing_edge=producing_edge,
            consuming_edges=tuple(consuming_edges),
        )

    @property
    def name(self) -> str:
        """Name of the dataset type (`str`).

        This is always the parent dataset type, never that of a component.
        """
        return self.dataset_type.name

    @property
    def key(self) -> NodeKey:
        """Key that identifies this dataset type in internal and exported
        networkx graphs (`~lsst.pipe.base.pipeline_graph.NodeKey`).
        """
        return NodeKey(NodeType.DATASET_TYPE, self.dataset_type.name)

    @property
    def dimensions(self) -> DimensionGroup:
        """Dimensions of the dataset type
        (`~lsst.daf.butler.DimensionGroup`).
        """
        return self.dataset_type.dimensions

    @property
    def storage_class_name(self) -> str:
        """String name of the storage class for this dataset type (`str`)."""
        return self.dataset_type.storageClass_name

    @property
    def storage_class(self) -> StorageClass:
        """Storage class for this dataset type
        (`~lsst.daf.butler.StorageClass`).
        """
        return self.dataset_type.storageClass

    @property
    def is_calibration(self) -> bool:
        """Whether this dataset type can be included in
        `~lsst.daf.butler.CollectionType.CALIBRATION` collections (`bool`).
        """
        return self.dataset_type.isCalibration()

    def __repr__(self) -> str:
        return f"{self.name} ({self.storage_class_name}, {self.dimensions})"

    def generalize_ref(self, ref: DatasetRef) -> DatasetRef:
        """Convert a `~lsst.daf.butler.DatasetRef` with the dataset type
        associated with some task to one with the common dataset type defined
        by this node.

        Parameters
        ----------
        ref : `lsst.daf.butler.DatasetRef`
            Reference whose dataset type is convertible to this node's, either
            because it is a component with the node's dataset type as its
            parent, or because it has a compatible storage class.

        Returns
        -------
        ref : `lsst.daf.butler.DatasetRef`
            Reference with exactly this node's dataset type.
        """
        if ref.isComponent():
            # This node always represents the parent composite, never a
            # component, so strip the component off first.
            ref = ref.makeCompositeRef()
        if ref.datasetType.storageClass_name != self.dataset_type.storageClass_name:
            return ref.overrideStorageClass(self.dataset_type.storageClass_name)
        return ref

    def _to_xgraph_state(self) -> dict[str, Any]:
        """Convert this node's attributes into a dictionary suitable for use
        in exported networkx graphs.
        """
        return {
            "dataset_type": self.dataset_type,
            "is_initial_query_constraint": self.is_initial_query_constraint,
            "is_prerequisite": self.is_prerequisite,
            "dimensions": self.dimensions,
            "storage_class_name": self.dataset_type.storageClass_name,
            # Bipartite partition index distinguishes dataset-type nodes from
            # task nodes in exported bipartite graphs.
            "bipartite": NodeType.DATASET_TYPE.bipartite,
        }