Coverage for python / lsst / pipe / base / graph / quantumNode.py: 63%
62 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ("BuildId", "NodeId", "QuantumNode")
31import uuid
32from dataclasses import dataclass
33from typing import Any, NewType
35import pydantic
37from lsst.daf.butler import (
38 DatasetRef,
39 DimensionRecordsAccumulator,
40 DimensionUniverse,
41 Quantum,
42 SerializedQuantum,
43)
45from ..pipeline import TaskDef
46from ..pipeline_graph import PipelineGraph, TaskNode
# Distinct static type for graph build identifiers; `NewType` leaves the value
# a plain `str` at runtime.
BuildId = NewType("BuildId", str)
def _hashDsRef(ref: DatasetRef) -> int:
    """Return a hash built from the dataset type and data ID of ``ref``.

    Parameters
    ----------
    ref : `DatasetRef`
        Dataset reference whose ``datasetType`` and ``dataId`` attributes are
        hashed together.

    Returns
    -------
    hash : `int`
        Hash of the ``(datasetType, dataId)`` pair.
    """
    identity = (ref.datasetType, ref.dataId)
    return hash(identity)
@dataclass(frozen=True)
class NodeId:
    r"""Identifier of a node within one individual build of a `QuantumGraph`.

    Deprecated: this class is used when unpickling objects stored in
    `QuantumGraph` save formats 1 and 2, and must be retained until those
    formats are considered unloadable.

    The identifier stays constant through a pickle and through any
    `QuantumGraph` method that returns a new `QuantumGraph`. It will not be
    the same if a new graph is built, even when that graph contains the same
    information in a `QuantumNode` or is built from exactly the same inputs.

    `NodeId`\ s play no role in deciding the equality or identity (hash) of a
    `QuantumNode`; they are mainly useful for debugging or for working with
    various subsets of the same graph.

    This interface is a convenience only, and no guarantees of long term
    stability are made. New implementations might change the `NodeId`, or
    provide more or fewer guarantees.
    """

    number: int
    """Unique position of the node within the graph, assigned when the graph
    is created.
    """

    buildId: BuildId
    """Unique identifier created when the originating graph was created."""
@dataclass(frozen=True)
class QuantumNode:
    """A single node of the quantum graph.

    Each node pairs the data to be processed (``quantum``) with the
    definition of the task that processes it (``taskDef``).

    Notes
    -----
    Equality is decided by ``quantum`` and ``taskDef``. The hash is computed
    once, at construction, from the task label and the quantum. ``nodeId``
    takes part in neither.
    """

    __slots__ = ("quantum", "taskDef", "nodeId", "_precomputedHash")

    quantum: Quantum
    """The unit of data processed at this graph node."""

    taskDef: TaskDef
    """Definition of the task that processes the `Quantum` held by this
    node.
    """

    nodeId: uuid.UUID
    """Unique identifier (a UUID) of this node, assigned at graph creation."""

    def __post_init__(self) -> None:
        # The dataclass is frozen, so ordinary attribute assignment raises;
        # go through object.__setattr__ to store the cached hash.
        self._precomputedHash: int
        cached = hash((self.taskDef.label, self.quantum))
        object.__setattr__(self, "_precomputedHash", cached)

    @property
    def task_node(self) -> TaskNode:
        """The node object representing this task in a pipeline graph.

        A new, empty `PipelineGraph` is created on every access and the task
        is added to it; the object returned by ``add_task`` is the result.
        """
        graph = PipelineGraph()
        definition = self.taskDef
        return graph.add_task(
            definition.label,
            definition.taskClass,
            definition.config,
            connections=definition.connections,
        )

    def __eq__(self, other: object) -> bool:
        # Only another QuantumNode holding the same quantum can be equal; the
        # task definitions then decide the result.
        if not isinstance(other, QuantumNode) or self.quantum != other.quantum:
            return False
        return self.taskDef == other.taskDef

    def __hash__(self) -> int:
        """Return the hash precomputed from the task label and the quantum.

        For graphs this is more robust than the default hashing based on
        the quantum id.
        """
        return self._precomputedHash

    def __repr__(self) -> str:
        """Return a more human readable string representation."""
        return f"{self.__class__.__name__}(quantum={self.quantum}, taskDef={self.taskDef}, nodeId={self.nodeId})"

    def to_simple(self, accumulator: DimensionRecordsAccumulator | None = None) -> SerializedQuantumNode:
        """Convert this node to its serializable form.

        Parameters
        ----------
        accumulator : `DimensionRecordsAccumulator` or `None`, optional
            Forwarded unchanged to ``Quantum.to_simple``.

        Returns
        -------
        simple : `SerializedQuantumNode`
            Model holding the serialized quantum, the task label and the
            node ID.
        """
        serialized_quantum = self.quantum.to_simple(accumulator=accumulator)
        return SerializedQuantumNode(
            quantum=serialized_quantum,
            taskLabel=self.taskDef.label,
            nodeId=self.nodeId,
        )

    @classmethod
    def from_simple(
        cls,
        simple: SerializedQuantumNode,
        taskDefMap: dict[str, TaskDef],
        universe: DimensionUniverse,
    ) -> QuantumNode:
        """Build a `QuantumNode` from its serializable form.

        Parameters
        ----------
        simple : `SerializedQuantumNode`
            Serialized node to convert.
        taskDefMap : `dict` [`str`, `TaskDef`]
            Task definitions keyed by task label; ``simple.taskLabel`` is
            looked up in it.
        universe : `DimensionUniverse`
            Passed to ``Quantum.from_simple`` together with
            ``simple.quantum``.

        Returns
        -------
        node : `QuantumNode`
            The reconstructed node.

        Raises
        ------
        KeyError
            Raised if ``simple.taskLabel`` is not a key of ``taskDefMap``.
        """
        restored_quantum = Quantum.from_simple(simple.quantum, universe)
        task_definition = taskDefMap[simple.taskLabel]
        return QuantumNode(
            quantum=restored_quantum,
            taskDef=task_definition,
            nodeId=simple.nodeId,
        )

    def _replace_quantum(self, quantum: Quantum) -> None:
        """Replace the `Quantum` instance held by this node.

        Parameters
        ----------
        quantum : `Quantum`
            New Quantum instance for this node.

        Raises
        ------
        ValueError
            Raised if the hash of the new quantum differs from the hash of
            the existing quantum.

        Notes
        -----
        This class is immutable and hashable, so the replacement is only
        accepted when it does not invalidate the current hash. This method
        is supposed to be used only by the `QuantumGraph` class as an
        implementation detail, hence the leading underscore.
        """
        if hash(self.quantum) != hash(quantum):
            raise ValueError(
                f"Hash of the new quantum {quantum} does not match hash of existing quantum {self.quantum}"
            )
        # Bypass the frozen-dataclass guard, as in __post_init__.
        object.__setattr__(self, "quantum", quantum)
# Names of the three fields declared on `SerializedQuantumNode`; referenced by
# `SerializedQuantumNode.direct` when it builds a model instance.
_fields_set = {"quantum", "taskLabel", "nodeId"}
class SerializedQuantumNode(pydantic.BaseModel):
    """Model representing a `QuantumNode` in serializable form."""

    # Serialized form of the node's quantum.
    quantum: SerializedQuantum
    # Label of the task definition associated with the node.
    taskLabel: str
    # Identifier of the node.
    nodeId: uuid.UUID

    @classmethod
    def direct(cls, *, quantum: dict[str, Any], taskLabel: str, nodeId: str) -> SerializedQuantumNode:
        """Build a `SerializedQuantumNode` from already-parsed values.

        The instance is created with ``model_construct``, pydantic's
        constructor for creating a model from trusted data, rather than
        through the normal initializer.

        Parameters
        ----------
        quantum : `dict` [`str`, `Any`]
            Keyword arguments forwarded to ``SerializedQuantum.direct``.
        taskLabel : `str`
            Label of the task associated with the node.
        nodeId : `str`
            String form of the node's UUID; converted with `uuid.UUID`.

        Returns
        -------
        node : `SerializedQuantumNode`
            Newly constructed model instance.

        Raises
        ------
        ValueError
            Raised by `uuid.UUID` if ``nodeId`` is not a valid UUID string.
        """
        # ``model_construct`` names this parameter ``_fields_set`` (single
        # leading underscore). Any other spelling is treated as an ordinary
        # value rather than as the set of populated fields. The module-level
        # set is shared between instances and already lists every field.
        return cls.model_construct(
            _fields_set=_fields_set,
            quantum=SerializedQuantum.direct(**quantum),
            taskLabel=taskLabel,
            nodeId=uuid.UUID(nodeId),
        )