Coverage for python / lsst / pipe / base / graph / _versionDeserializers.py: 26%
252 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ("DESERIALIZER_MAP",)
31import json
32import lzma
33import pickle
34import struct
35import uuid
36from abc import ABC, abstractmethod
37from collections import defaultdict
38from collections.abc import Callable
39from dataclasses import dataclass
40from types import SimpleNamespace
41from typing import TYPE_CHECKING, ClassVar
43import networkx as nx
45from lsst.daf.butler import (
46 DatasetRef,
47 DatasetType,
48 DimensionConfig,
49 DimensionUniverse,
50 Quantum,
51 SerializedDimensionRecord,
52)
53from lsst.daf.butler._rubin import generate_uuidv7
54from lsst.utils import doImportType
56from ..config import PipelineTaskConfig
57from ..pipeline import TaskDef
58from ..pipelineTask import PipelineTask
59from ._implDetails import DatasetTypeName, _DatasetTracker
60from .quantumNode import QuantumNode, SerializedQuantumNode
62if TYPE_CHECKING:
63 from .graph import QuantumGraph
class StructSizeDescriptor:
    """Descriptor reporting the byte size of a deserializer's size struct.

    Reading the attribute, either from a class or from an instance, returns
    the number of bytes described by the format string that the owning class
    returns from ``FMT_STRING``.
    """

    def __get__(self, inst: DeserializerBase | None, owner: type[DeserializerBase]) -> int:
        # Only the owning class is consulted; the instance (if any) is unused.
        fmt = owner.FMT_STRING()
        return struct.calcsize(fmt)
@dataclass
class DeserializerBase(ABC):
    """Interface shared by the deserializers of each save format version.

    Subclasses must provide ``FMT_STRING``; the remaining methods raise
    `NotImplementedError` here and are expected to be overridden.
    """

    # Byte size of the struct described by ``FMT_STRING``; a descriptor is
    # attached to every subclass by ``__init_subclass__``.
    structSize: ClassVar[StructSizeDescriptor]

    # Added to ``structSize`` by subclasses when computing ``headerSize``
    # (presumably the length of the magic-bytes preamble -- confirm at caller).
    preambleSize: int
    # Raw bytes that subclasses unpack according to ``FMT_STRING``.
    sizeBytes: bytes

    def __init_subclass__(cls) -> None:
        super().__init_subclass__()
        # give each subclass its own size descriptor
        cls.structSize = StructSizeDescriptor()

    @classmethod
    @abstractmethod
    def FMT_STRING(cls) -> str:
        """Return the `struct` format string describing ``sizeBytes``.

        Returns
        -------
        fmt : `str`
            Format string understood by the `struct` module.
        """
        raise NotImplementedError("Base class does not implement this method")

    @property
    def headerSize(self) -> int:
        """Number of bytes from the beginning of the file to the end of the
        metadata.
        """
        raise NotImplementedError("Base class does not implement this method")

    def unpackHeader(self, rawHeader: bytes) -> str | None:
        """Convert the raw header bytes of a save into a string.

        Parameters
        ----------
        rawHeader : `bytes`
            The bytes to be parsed into the header information. These are
            the bytes after the preamble and structsize number of bytes and
            before the headerSize bytes.

        Returns
        -------
        header : `str` or `None`
            The header information as a string, or `None` when the save
            format has no header string implementation (such as save format
            1 that is all pickle).
        """
        raise NotImplementedError("Base class does not implement this method")

    def readHeaderInfo(self, rawHeader: bytes) -> SimpleNamespace:
        """Parse raw bytes into the header information and the byte ranges
        of specific TaskDefs and QuantumNodes.

        Parameters
        ----------
        rawHeader : `bytes`
            The bytes to be parsed into the header information. These are
            the bytes after the preamble and structsize number of bytes and
            before the headerSize bytes.

        Returns
        -------
        info : `types.SimpleNamespace`
            The parsed header information.
        """
        raise NotImplementedError("Base class does not implement this method")

    def constructGraph(
        self,
        nodes: set[uuid.UUID],
        _readBytes: Callable[[int, int], bytes],
        universe: DimensionUniverse | None = None,
    ) -> QuantumGraph:
        """Build a graph out of the deserialized information.

        Parameters
        ----------
        nodes : `set` of `uuid.UUID`
            The nodes to include in the graph.
        _readBytes : callable
            A callable used to read bytes from the file handle. It takes
            two ints, start and stop, as the numerical bounds to read and
            returns a byte stream.
        universe : `~lsst.daf.butler.DimensionUniverse`
            The singleton of all dimensions known to the middleware registry.

        Returns
        -------
        graph : `QuantumGraph`
            The reconstructed graph.
        """
        raise NotImplementedError("Base class does not implement this method")

    def description(self) -> str:
        """Return the description of the serialized data format.

        Returns
        -------
        desc : `str`
            Description of serialized data format.
        """
        raise NotImplementedError("Base class does not implement this method")
# Human readable description of the version 1 save format; this is the text
# returned by `DeserializerV1.description`.
Version1Description = """
The save file starts with the first few bytes corresponding to the magic bytes
in the QuantumGraph: `qgraph4\xf6\xe8\xa9`.

The next few bytes are 2 big endian unsigned 64 bit integers.

The first unsigned 64 bit integer corresponds to the number of bytes of a
python mapping of TaskDef labels to the byte ranges in the save file where the
definition can be loaded.

The second unsigned 64 bit integer corresponds to the number of bytes of a
python mapping of QuantumGraph Node number to the byte ranges in the save file
where the node can be loaded. The byte range is indexed starting after
the `header` bytes of the magic bytes, size bytes, and bytes of the two
mappings.

Each of the above mappings are pickled and then lzma compressed, so to
deserialize the bytes, first lzma decompression must be performed and the
results passed to python pickle loader.

As stated above, each map contains byte ranges of the corresponding
datastructure. These bytes are also lzma compressed pickles, and should
be deserialized in a similar manner. The byte range is indexed starting after
the `header` bytes of the magic bytes, size bytes, and bytes of the two
mappings.

In addition to the TaskDef byte locations, the TaskDef map also contains
an additional key '__GraphBuildID'. The value associated with this is the
unique id assigned to the graph at its creation time.
"""
@dataclass
class DeserializerV1(DeserializerBase):
    """Deserializer for save format version 1, which is entirely pickle
    based.
    """

    @classmethod
    def FMT_STRING(cls) -> str:
        """Return the `struct` format of the size bytes: two big endian
        unsigned 64 bit integers.
        """
        return ">QQ"

    def __post_init__(self) -> None:
        # sizes, in bytes, of the TaskDef map and of the node map
        sizes = struct.unpack(self.FMT_STRING(), self.sizeBytes)
        self.taskDefMapSize, self.nodeMapSize = sizes

    @property
    def headerSize(self) -> int:
        """Number of bytes occupied by the preamble, the size struct and the
        two maps.
        """
        fixedPart = self.preambleSize + self.structSize
        return fixedPart + self.taskDefMapSize + self.nodeMapSize

    def readHeaderInfo(self, rawHeader: bytes) -> SimpleNamespace:
        """Unpickle the TaskDef map and the node map from the raw header.

        Parameters
        ----------
        rawHeader : `bytes`
            Header bytes; the first ``taskDefMapSize`` bytes hold the TaskDef
            map and the remainder holds the node map.

        Returns
        -------
        info : `types.SimpleNamespace`
            Namespace with ``taskDefMap``, ``_buildId``, ``map`` and
            ``metadata`` (always `None` for this version). It is also kept
            on the instance as ``returnValue``.
        """
        # NOTE(review): the format description says these maps are lzma
        # compressed, but no decompression is performed here -- confirm
        # against the writer.
        boundary = self.taskDefMapSize
        taskDefMap = pickle.loads(rawHeader[:boundary])
        info = SimpleNamespace(
            taskDefMap=taskDefMap,
            _buildId=taskDefMap["__GraphBuildID"],
            map=pickle.loads(rawHeader[boundary:]),
            metadata=None,
        )
        self.returnValue = info
        return info

    def unpackHeader(self, rawHeader: bytes) -> str | None:
        """Return `None`; this format has no string form of the header."""
        return None

    def constructGraph(
        self,
        nodes: set[uuid.UUID],
        _readBytes: Callable[[int, int], bytes],
        universe: DimensionUniverse | None = None,
    ) -> QuantumGraph:
        """Rebuild a `QuantumGraph` from the requested nodes.

        ``readHeaderInfo`` must have been called first, as the byte ranges
        are looked up in the namespace it stores on the instance.

        Parameters
        ----------
        nodes : `set`
            Keys of the node map identifying the nodes to load.
        _readBytes : callable
            Takes start and stop byte offsets and returns the bytes between
            them.
        universe : `~lsst.daf.butler.DimensionUniverse`, optional
            Forwarded to the graph construction.

        Returns
        -------
        graph : `QuantumGraph`
            Graph containing the requested nodes.
        """
        # need to import here to avoid cyclic imports
        from . import QuantumGraph

        info = self.returnValue
        # stored byte ranges are relative to the end of the header
        offset = self.headerSize

        def _loadPickle(byteRange):
            # NOTE(review): unpickling is only safe for trusted save files.
            begin, end = byteRange
            begin += offset
            end += offset
            return pickle.loads(lzma.decompress(_readBytes(begin, end)))

        quanta: defaultdict[TaskDef, set[Quantum]] = defaultdict(set)
        quantumToNodeId: dict[Quantum, uuid.UUID] = {}
        taskDefCache = {}
        for nodeKey in nodes:
            qNode = _loadPickle(info.map[nodeKey])
            # nodes are frozen, so bypass the normal attribute setting to
            # hand the node a newly generated id
            object.__setattr__(qNode, "nodeId", generate_uuidv7())

            # the persisted node only carries a reference to its task; load
            # the real TaskDef the first time each one is seen
            taskKey = qNode.taskDef
            if taskKey not in taskDefCache:
                taskDefCache[taskKey] = _loadPickle(info.taskDefMap[taskKey])
            object.__setattr__(qNode, "taskDef", taskDefCache[taskKey])

            quanta[qNode.taskDef].add(qNode.quantum)
            quantumToNodeId[qNode.quantum] = qNode.nodeId

        # create an uninitialized graph and populate it through the
        # creation method, using the un-persisted data
        qGraph = object.__new__(QuantumGraph)
        qGraph._buildGraphs(
            quanta,
            _quantumToNodeId=quantumToNodeId,
            _buildId=info._buildId,
            metadata=info.metadata,
            universe=universe,
        )
        return qGraph

    def description(self) -> str:
        """Return the description of the version 1 save format."""
        return Version1Description
# Human readable description of the version 2 save format; this is the text
# returned by `DeserializerV2.description`.
Version2Description = """
The save file starts with the first few bytes corresponding to the magic bytes
in the QuantumGraph: `qgraph4\xf6\xe8\xa9`.

The next few bytes are a big endian unsigned long long.

The unsigned long long corresponds to the number of bytes of a python mapping
of header information. This mapping is encoded into json and then lzma
compressed, meaning the operations must be performed in the opposite order to
deserialize.

The json encoded header mapping contains 4 fields: TaskDefs, GraphBuildID,
Nodes, and Metadata.

The `TaskDefs` key corresponds to a value which is a mapping of Task label to
task data. The task data is a mapping of key to value, where the only key is
`bytes` and it corresponds to a tuple of a byte range of the start, stop
bytes (indexed after all the header bytes)

The `GraphBuildID` corresponds with a string that is the unique id assigned to
this graph when it was created.

The `Nodes` key is like the `TaskDefs` key except it corresponds to
QuantumNodes instead of TaskDefs. Another important difference is that JSON
formatting does not allow using numbers as keys, and this mapping is keyed by
the node number. Thus it is stored in JSON as a list of two element lists, each
holding a key followed by the value associated with that key.

The `Metadata` key is a mapping of strings to associated values. This metadata
may be anything that is important to be transported alongside the graph.

As stated above, each map contains byte ranges of the corresponding
datastructure. These bytes are also lzma compressed pickles, and should
be deserialized in a similar manner.
"""
@dataclass
class DeserializerV2(DeserializerBase):
    """Deserializer for save format version 2: a JSON header followed by
    pickled TaskDefs and nodes.
    """

    @classmethod
    def FMT_STRING(cls) -> str:
        """Return the `struct` format of the size bytes: one big endian
        unsigned 64 bit integer.
        """
        return ">Q"

    def __post_init__(self) -> None:
        # size, in bytes, of the compressed header mapping
        unpacked = struct.unpack(self.FMT_STRING(), self.sizeBytes)
        (self.mapSize,) = unpacked

    @property
    def headerSize(self) -> int:
        """Number of bytes occupied by the preamble, the size struct and the
        header mapping.
        """
        fixedPart = self.preambleSize + self.structSize
        return fixedPart + self.mapSize

    def readHeaderInfo(self, rawHeader: bytes) -> SimpleNamespace:
        """Decode the compressed JSON header into a namespace.

        Parameters
        ----------
        rawHeader : `bytes`
            The lzma compressed, JSON encoded header.

        Returns
        -------
        info : `types.SimpleNamespace`
            Namespace with ``taskDefMap``, ``_buildId``, ``map`` and
            ``metadata``. It is also kept on the instance as ``returnValue``.

        Raises
        ------
        ValueError
            Raised if ``unpackHeader`` returns `None`; this exists only to
            satisfy type checkers.
        """
        headerText = self.unpackHeader(rawHeader)
        if headerText is None:
            raise ValueError(
                "This error is not possible because self.unpackHeader cannot return None,"
                " but is done to satisfy type checkers"
            )
        header = json.loads(headerText)
        info = SimpleNamespace(
            taskDefMap=header["TaskDefs"],
            _buildId=header["GraphBuildID"],
            # nodes are serialized as key/value pairs; restore the mapping
            map=dict(header["Nodes"]),
            metadata=header["Metadata"],
        )
        self.returnValue = info
        return info

    def unpackHeader(self, rawHeader: bytes) -> str | None:
        """Decompress the raw header and decode it to a string."""
        decompressed = lzma.decompress(rawHeader)
        return decompressed.decode()

    def constructGraph(
        self,
        nodes: set[uuid.UUID],
        _readBytes: Callable[[int, int], bytes],
        universe: DimensionUniverse | None = None,
    ) -> QuantumGraph:
        """Rebuild a `QuantumGraph` from the requested nodes.

        ``readHeaderInfo`` must have been called first, as the byte ranges
        are looked up in the namespace it stores on the instance.

        Parameters
        ----------
        nodes : `set`
            Keys of the node map identifying the nodes to load.
        _readBytes : callable
            Takes start and stop byte offsets and returns the bytes between
            them.
        universe : `~lsst.daf.butler.DimensionUniverse`, optional
            Forwarded to the graph construction.

        Returns
        -------
        graph : `QuantumGraph`
            Graph containing the requested nodes.
        """
        # need to import here to avoid cyclic imports
        from . import QuantumGraph

        info = self.returnValue
        # stored byte ranges are relative to the end of the header
        offset = self.headerSize

        def _loadPickle(byteRange):
            # NOTE(review): unpickling is only safe for trusted save files.
            begin, end = byteRange
            begin += offset
            end += offset
            return pickle.loads(lzma.decompress(_readBytes(begin, end)))

        quanta: defaultdict[TaskDef, set[Quantum]] = defaultdict(set)
        quantumToNodeId: dict[Quantum, uuid.UUID] = {}
        taskDefCache = {}
        for nodeKey in nodes:
            qNode = _loadPickle(info.map[nodeKey]["bytes"])
            # nodes are frozen, so bypass the normal attribute setting to
            # hand the node a newly generated id
            object.__setattr__(qNode, "nodeId", generate_uuidv7())

            # the persisted node only carries a reference to its task; load
            # the real TaskDef the first time each one is seen
            taskKey = qNode.taskDef
            if taskKey not in taskDefCache:
                taskDefCache[taskKey] = _loadPickle(info.taskDefMap[taskKey]["bytes"])
            object.__setattr__(qNode, "taskDef", taskDefCache[taskKey])

            quanta[qNode.taskDef].add(qNode.quantum)
            quantumToNodeId[qNode.quantum] = qNode.nodeId

        # create an uninitialized graph and populate it through the
        # creation method, using the un-persisted data
        qGraph = object.__new__(QuantumGraph)
        qGraph._buildGraphs(
            quanta,
            _quantumToNodeId=quantumToNodeId,
            _buildId=info._buildId,
            metadata=info.metadata,
            universe=universe,
        )
        return qGraph

    def description(self) -> str:
        """Return the description of the version 2 save format."""
        return Version2Description
# Human readable description of the version 3 save format, as read by
# `DeserializerV3`.
Version3Description = """
The save file starts with the first few bytes corresponding to the magic bytes
in the QuantumGraph: `qgraph4\xf6\xe8\xa9`.

The next few bytes are a big endian unsigned long long.

The unsigned long long corresponds to the number of bytes of a mapping
of header information. This mapping is encoded into json and then lzma
compressed, meaning the operations must be performed in the opposite order to
deserialize.

The json encoded header mapping contains 5 fields: GraphBuildID, TaskDefs,
Nodes, Metadata, and DimensionRecords. It may additionally contain the
optional fields universe, GlobalInitOutputRefs, and RegistryDatasetTypes.

The `GraphBuildID` key corresponds with a string that is the unique id assigned
to this graph when it was created.

The `TaskDefs` key corresponds to a value which is a mapping of Task label to
task data. The task data is a mapping of key to value. The keys of this mapping
are `bytes`, `inputs`, and `outputs`.

The `TaskDefs` `bytes` key corresponds to a tuple of a byte range of the
start, stop bytes (indexed after all the header bytes). This byte range
corresponds to a lzma compressed json mapping. This mapping has keys of
`taskName`, corresponding to a fully qualified python class, `config` a
pex_config string that is used to configure the class, and `label` which
corresponds to a string that uniquely identifies the task within a given
execution pipeline.

The `TaskDefs` `inputs` key is associated with a list of tuples where each
tuple is a label of a task that is considered coming before a given task, and
the name of the dataset that is shared between the tasks (think node and edge
in a graph sense).

The `TaskDefs` `outputs` key is like inputs except the values in a list
correspond to all the output connections of a task.

The `Nodes` key corresponds to a list of pairs. The first element of each pair
is the string representation of the UUID of a QuantumNode. The second element
is another mapping with the keys `bytes`, `inputs`, and `outputs`.

`Nodes` key `bytes` corresponds to a tuple of a byte range of the start, stop
bytes (indexed after all the header bytes). These bytes are a lzma compressed
json mapping which contains many sub elements, this mapping will be referred to
as the SerializedQuantumNode (related to the python class it corresponds to).

SerializedQuantumNodes have 3 keys, `quantum` corresponding to a json mapping
(described below) referred to as a SerializedQuantum, `taskLabel` a string
which corresponds to a label in the `TaskDefs` mapping, and `nodeId`.

A SerializedQuantum has many keys; taskName, dataId, datasetTypeMapping,
initInputs, inputs, outputs, dimensionRecords.

The `Nodes` `inputs` and `outputs` keys are like the corresponding `TaskDefs`
keys except they refer to QuantumNodes instead of TaskDefs, and their values
are string representations of the UUIDs of the QuantumNodes.

The `Metadata` key is a mapping of strings to associated values. This metadata
may be anything that is important to be transported alongside the graph.

As stated above, each map contains byte ranges of the corresponding
datastructure. These bytes are lzma compressed json, so to deserialize them
lzma decompression must be performed first and the result passed to a json
decoder.
"""
@dataclass
class DeserializerV3(DeserializerBase):
    """Deserializer for save format version 3.

    The header is an lzma compressed JSON document, and both TaskDefs and
    quantum nodes are stored as lzma compressed JSON byte ranges located
    after the header.
    """

    @classmethod
    def FMT_STRING(cls) -> str:
        """Return the `struct` format of the size bytes: one big endian
        unsigned 64 bit integer.
        """
        return ">Q"

    def __post_init__(self) -> None:
        # size, in bytes, of the compressed header mapping
        self.infoSize: int
        (self.infoSize,) = struct.unpack(self.FMT_STRING(), self.sizeBytes)

    @property
    def headerSize(self) -> int:
        """Number of bytes occupied by the preamble, the size struct and the
        header mapping.
        """
        return self.preambleSize + self.structSize + self.infoSize

    def readHeaderInfo(self, rawHeader: bytes) -> SimpleNamespace:
        """Decode the compressed JSON header into a namespace.

        Parameters
        ----------
        rawHeader : `bytes`
            The lzma compressed, JSON encoded header.

        Returns
        -------
        infoMappings : `types.SimpleNamespace`
            Namespace with ``taskDefMap``, ``_buildId``, ``map`` (keyed by
            `uuid.UUID`), ``metadata``, ``dimensionRecords``, ``universe``,
            ``globalInitOutputRefs`` and ``registryDatasetTypes``. It is
            also kept on the instance as ``infoMappings``.
        """
        uncompressedinfoMap = self.unpackHeader(rawHeader)
        assert uncompressedinfoMap is not None  # for python typing, this variant can't be None
        infoMap = json.loads(uncompressedinfoMap)
        infoMappings = SimpleNamespace()
        infoMappings.taskDefMap = infoMap["TaskDefs"]
        infoMappings._buildId = infoMap["GraphBuildID"]
        # nodes are stored as (uuid string, data) pairs
        infoMappings.map = {uuid.UUID(k): v for k, v in infoMap["Nodes"]}
        infoMappings.metadata = infoMap["Metadata"]
        infoMappings.dimensionRecords = {
            int(k): SerializedDimensionRecord(**v) for k, v in infoMap["DimensionRecords"].items()
        }
        # This is important to be a get call here, so that it supports versions
        # of saved quantum graph that might not have a saved universe without
        # changing save format
        if (universeConfig := infoMap.get("universe")) is not None:
            universe = DimensionUniverse(config=DimensionConfig(universeConfig))
        else:
            universe = DimensionUniverse()
        infoMappings.universe = universe
        # the remaining keys are optional as well and default to empty lists
        infoMappings.globalInitOutputRefs = []
        if (json_refs := infoMap.get("GlobalInitOutputRefs")) is not None:
            infoMappings.globalInitOutputRefs = [
                DatasetRef.from_json(json_ref, universe=universe) for json_ref in json_refs
            ]
        infoMappings.registryDatasetTypes = []
        if (json_refs := infoMap.get("RegistryDatasetTypes")) is not None:
            infoMappings.registryDatasetTypes = [
                DatasetType.from_json(json_ref, universe=universe) for json_ref in json_refs
            ]
        self.infoMappings = infoMappings
        return infoMappings

    def unpackHeader(self, rawHeader: bytes) -> str | None:
        """Decompress the raw header and decode it to a string."""
        return lzma.decompress(rawHeader).decode()

    def constructGraph(
        self,
        nodes: set[uuid.UUID],
        _readBytes: Callable[[int, int], bytes],
        universe: DimensionUniverse | None = None,
    ) -> QuantumGraph:
        """Rebuild a `QuantumGraph` from the requested nodes.

        ``readHeaderInfo`` must have been called first, as the byte ranges
        and node relations are looked up in the namespace it stores on the
        instance.

        Parameters
        ----------
        nodes : `set` of `uuid.UUID`
            The nodes to include in the graph.
        _readBytes : callable
            Takes start and stop byte offsets and returns the bytes between
            them.
        universe : `~lsst.daf.butler.DimensionUniverse`, optional
            Universe to use for the graph. When `None` the universe read
            from the header is used.

        Returns
        -------
        graph : `QuantumGraph`
            Graph containing the requested nodes.

        Raises
        ------
        RuntimeError
            Raised if the supplied universe is not compatible with the one
            read from the header.
        """
        # need to import here to avoid cyclic imports
        from . import QuantumGraph

        info = self.infoMappings
        # stored byte ranges are relative to the end of the header
        offset = self.headerSize

        graph = nx.DiGraph()
        loadedTaskDef: dict[str, TaskDef] = {}
        container = {}
        datasetDict = _DatasetTracker(createInverse=True)
        taskToQuantumNode: defaultdict[TaskDef, set[QuantumNode]] = defaultdict(set)
        initInputRefs: dict[str, list[DatasetRef]] = {}
        initOutputRefs: dict[str, list[DatasetRef]] = {}

        if universe is not None:
            if not universe.isCompatibleWith(info.universe):
                saved = info.universe
                raise RuntimeError(
                    f"The saved dimension universe ({saved.namespace}@v{saved.version}) is not "
                    f"compatible with the supplied universe ({universe.namespace}@v{universe.version})."
                )
        else:
            universe = info.universe

        for node in nodes:
            start, stop = info.map[node]["bytes"]
            # Read in the bytes corresponding to the node to load and
            # decompress it
            dump = json.loads(lzma.decompress(_readBytes(start + offset, stop + offset)))

            # Turn the json back into the pydantic model
            nodeDeserialized = SerializedQuantumNode.direct(**dump)
            del dump

            # attach the dictionary of dimension records to the pydantic model
            # these are stored separately because they are stored over and
            # over and this saves a lot of space and time.
            nodeDeserialized.quantum.dimensionRecords = info.dimensionRecords
            # get the label for the current task
            nodeTaskLabel = nodeDeserialized.taskLabel

            if nodeTaskLabel not in loadedTaskDef:
                taskInfo = info.taskDefMap[nodeTaskLabel]
                # Get the byte ranges corresponding to this taskDef
                start, stop = taskInfo["bytes"]

                # bytes are compressed, so decompress them
                taskDefDump = json.loads(lzma.decompress(_readBytes(start + offset, stop + offset)))
                taskClass: type[PipelineTask] = doImportType(taskDefDump["taskName"])
                config: PipelineTaskConfig = taskClass.ConfigClass()
                config.loadFromStream(taskDefDump["config"])
                # Rebuild TaskDef
                recreatedTaskDef = TaskDef(
                    taskName=taskDefDump["taskName"],
                    taskClass=taskClass,
                    config=config,
                    label=taskDefDump["label"],
                )
                loadedTaskDef[nodeTaskLabel] = recreatedTaskDef

                # initInputRefs and initOutputRefs are optional
                if (refs := taskDefDump.get("initInputRefs")) is not None:
                    initInputRefs[recreatedTaskDef.label] = [
                        DatasetRef.from_json(ref, universe=universe) for ref in refs
                    ]
                if (refs := taskDefDump.get("initOutputRefs")) is not None:
                    initOutputRefs[recreatedTaskDef.label] = [
                        DatasetRef.from_json(ref, universe=universe) for ref in refs
                    ]

                # rebuild the mappings that associate dataset type names with
                # TaskDefs
                for _, inputTypeName in taskInfo["inputs"]:
                    datasetDict.addConsumer(DatasetTypeName(inputTypeName), recreatedTaskDef)

                # register each output dataset type only once per task
                added = set()
                for outputConnection in taskInfo["outputs"]:
                    typeName = outputConnection[1]
                    if typeName not in added:
                        added.add(typeName)
                        datasetDict.addProducer(DatasetTypeName(typeName), recreatedTaskDef)

            # reconstitute the node, passing in the dictionaries for the
            # loaded TaskDefs and dimension records. These are used to ensure
            # that each unique record is only loaded once
            qnode = QuantumNode.from_simple(nodeDeserialized, loadedTaskDef, universe)
            container[qnode.nodeId] = qnode
            taskToQuantumNode[loadedTaskDef[nodeTaskLabel]].add(qnode)

            # recreate the relations between each node from stored info
            graph.add_node(qnode)
            relations = info.map[qnode.nodeId]
            for rawId in relations["inputs"]:
                # uuid is stored as a string, turn it back into a uuid
                parentId = uuid.UUID(rawId)
                # if the id is not yet in the container, dont make a connection
                # this is not an issue, because once it is, that id will add
                # the reverse connection
                if parentId in container:
                    graph.add_edge(container[parentId], qnode)
            for rawId in relations["outputs"]:
                # uuid is stored as a string, turn it back into a uuid
                childId = uuid.UUID(rawId)
                # if the id is not yet in the container, dont make a connection
                # this is not an issue, because once it is, that id will add
                # the reverse connection
                if childId in container:
                    graph.add_edge(qnode, container[childId])

        # create an uninitialized graph and fill in its state directly
        newGraph = object.__new__(QuantumGraph)
        newGraph._metadata = info.metadata
        newGraph._buildId = info._buildId
        newGraph._datasetDict = datasetDict
        newGraph._nodeIdMap = container
        newGraph._count = len(nodes)
        newGraph._taskToQuantumNode = dict(taskToQuantumNode.items())
        newGraph._taskGraph = datasetDict.makeNetworkXGraph()
        newGraph._connectedQuanta = graph
        newGraph._initInputRefs = initInputRefs
        newGraph._initOutputRefs = initOutputRefs
        newGraph._globalInitOutputRefs = info.globalInitOutputRefs
        newGraph._registryDatasetTypes = info.registryDatasetTypes
        newGraph._universe = universe
        newGraph._pipeline_graph = None
        return newGraph

    def description(self) -> str:
        """Return the description of the version 3 save format.

        Returns
        -------
        desc : `str`
            Description of serialized data format.
        """
        return Version3Description
# Registry of deserializer classes keyed by save format version number.
# Versions are consecutive and start at 1, so the key is derived from the
# position of each class in the tuple below.
DESERIALIZER_MAP: dict[int, type[DeserializerBase]] = dict(
    enumerate(
        (
            DeserializerV1,
            DeserializerV2,
            DeserializerV3,
        ),
        start=1,
    )
)