Coverage for python / lsst / pipe / base / graph / quantumNode.py: 63%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:47 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("BuildId", "NodeId", "QuantumNode") 

30 

31import uuid 

32from dataclasses import dataclass 

33from typing import Any, NewType 

34 

35import pydantic 

36 

37from lsst.daf.butler import ( 

38 DatasetRef, 

39 DimensionRecordsAccumulator, 

40 DimensionUniverse, 

41 Quantum, 

42 SerializedQuantum, 

43) 

44 

45from ..pipeline import TaskDef 

46from ..pipeline_graph import PipelineGraph, TaskNode 

47 

48BuildId = NewType("BuildId", str) 

49 

50 

51def _hashDsRef(ref: DatasetRef) -> int: 

52 return hash((ref.datasetType, ref.dataId)) 

53 

54 

55@dataclass(frozen=True, eq=True) 

56class NodeId: 

57 r"""Deprecated, this class is used with QuantumGraph save formats of 

58 1 and 2 when unpicking objects and must be retained until those formats 

59 are considered unloadable. 

60 

61 This represents an unique identifier of a node within an individual 

62 construction of a `QuantumGraph`. This identifier will stay constant 

63 through a pickle, and any `QuantumGraph` methods that return a new 

64 `QuantumGraph`. 

65 

66 A `NodeId` will not be the same if a new graph is built containing the same 

67 information in a `QuantumNode`, or even built from exactly the same inputs. 

68 

69 `NodeId`\ s do not play any role in deciding the equality or identity 

70 (hash) of a `QuantumNode`, and are mainly useful in debugging or working 

71 with various subsets of the same graph. 

72 

73 This interface is a convenance only, and no guarantees on long term 

74 stability are made. New implementations might change the `NodeId`, or 

75 provide more or less guarantees. 

76 """ 

77 

78 number: int 

79 """The unique position of the node within the graph assigned at graph 

80 creation. 

81 """ 

82 buildId: BuildId 

83 """Unique identifier created at the time the originating graph was created 

84 """ 

85 

86 

87@dataclass(frozen=True) 

88class QuantumNode: 

89 """Class representing a node in the quantum graph. 

90 

91 The ``quantum`` attribute represents the data that is to be processed at 

92 this node. 

93 """ 

94 

95 quantum: Quantum 

96 """The unit of data that is to be processed by this graph node""" 

97 taskDef: TaskDef 

98 """Definition of the task that will process the `Quantum` associated with 

99 this node. 

100 """ 

101 nodeId: uuid.UUID 

102 """The unique position of the node within the graph assigned at graph 

103 creation. 

104 """ 

105 

106 @property 

107 def task_node(self) -> TaskNode: 

108 """Return the node object that represents this task in a pipeline 

109 graph. 

110 """ 

111 pipeline_graph = PipelineGraph() 

112 return pipeline_graph.add_task( 

113 self.taskDef.label, 

114 self.taskDef.taskClass, 

115 self.taskDef.config, 

116 connections=self.taskDef.connections, 

117 ) 

118 

119 __slots__ = ("quantum", "taskDef", "nodeId", "_precomputedHash") 

120 

121 def __post_init__(self) -> None: 

122 # use setattr here to preserve the frozenness of the QuantumNode 

123 self._precomputedHash: int 

124 object.__setattr__(self, "_precomputedHash", hash((self.taskDef.label, self.quantum))) 

125 

126 def __eq__(self, other: object) -> bool: 

127 if not isinstance(other, QuantumNode): 

128 return False 

129 if self.quantum != other.quantum: 

130 return False 

131 return self.taskDef == other.taskDef 

132 

133 def __hash__(self) -> int: 

134 """For graphs it is useful to have a more robust hash than provided 

135 by the default quantum id based hashing 

136 """ 

137 return self._precomputedHash 

138 

139 def __repr__(self) -> str: 

140 """Make more human readable string representation.""" 

141 return ( 

142 f"{self.__class__.__name__}(quantum={self.quantum}, taskDef={self.taskDef}, nodeId={self.nodeId})" 

143 ) 

144 

145 def to_simple(self, accumulator: DimensionRecordsAccumulator | None = None) -> SerializedQuantumNode: 

146 return SerializedQuantumNode( 

147 quantum=self.quantum.to_simple(accumulator=accumulator), 

148 taskLabel=self.taskDef.label, 

149 nodeId=self.nodeId, 

150 ) 

151 

152 @classmethod 

153 def from_simple( 

154 cls, 

155 simple: SerializedQuantumNode, 

156 taskDefMap: dict[str, TaskDef], 

157 universe: DimensionUniverse, 

158 ) -> QuantumNode: 

159 return QuantumNode( 

160 quantum=Quantum.from_simple(simple.quantum, universe), 

161 taskDef=taskDefMap[simple.taskLabel], 

162 nodeId=simple.nodeId, 

163 ) 

164 

165 def _replace_quantum(self, quantum: Quantum) -> None: 

166 """Replace Quantum instance in this node. 

167 

168 Parameters 

169 ---------- 

170 quantum : `Quantum` 

171 New Quantum instance for this node. 

172 

173 Raises 

174 ------ 

175 ValueError 

176 Raised if the hash of the new quantum is different from the hash of 

177 the existing quantum. 

178 

179 Notes 

180 ----- 

181 This class is immutable and hashable, so this method checks that new 

182 quantum does not invalidate its current hash. This method is supposed 

183 to used only by `QuantumGraph` class as its implementation detail, 

184 so it is made "underscore-protected". 

185 """ 

186 if hash(quantum) != hash(self.quantum): 

187 raise ValueError( 

188 f"Hash of the new quantum {quantum} does not match hash of existing quantum {self.quantum}" 

189 ) 

190 object.__setattr__(self, "quantum", quantum) 

191 

192 

193_fields_set = {"quantum", "taskLabel", "nodeId"} 

194 

195 

196class SerializedQuantumNode(pydantic.BaseModel): 

197 """Model representing a `QuantumNode` in serializable form.""" 

198 

199 quantum: SerializedQuantum 

200 taskLabel: str 

201 nodeId: uuid.UUID 

202 

203 @classmethod 

204 def direct(cls, *, quantum: dict[str, Any], taskLabel: str, nodeId: str) -> SerializedQuantumNode: 

205 node = cls.model_construct( 

206 __fields_set=_fields_set, 

207 quantum=SerializedQuantum.direct(**quantum), 

208 taskLabel=taskLabel, 

209 nodeId=uuid.UUID(nodeId), 

210 ) 

211 

212 return node