Coverage for python/lsst/pipe/base/graph/_versionDeserializers.py: 26%

252 statements

coverage.py v7.13.5, created at 2026-04-22 08:57 +0000

# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ("DESERIALIZER_MAP",)

import json
import lzma
import pickle
import struct
import uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from types import SimpleNamespace
from typing import TYPE_CHECKING, ClassVar

import networkx as nx

from lsst.daf.butler import (
    DatasetRef,
    DatasetType,
    DimensionConfig,
    DimensionUniverse,
    Quantum,
    SerializedDimensionRecord,
)
from lsst.daf.butler._rubin import generate_uuidv7
from lsst.utils import doImportType

from ..config import PipelineTaskConfig
from ..pipeline import TaskDef
from ..pipelineTask import PipelineTask
from ._implDetails import DatasetTypeName, _DatasetTracker
from .quantumNode import QuantumNode, SerializedQuantumNode

if TYPE_CHECKING:
    from .graph import QuantumGraph


class StructSizeDescriptor:
    """A class-level property that reports the size (number of bytes)
    of whatever the struct format string is for a deserializer.
    """

    def __get__(self, inst: DeserializerBase | None, owner: type[DeserializerBase]) -> int:
        return struct.calcsize(owner.FMT_STRING())

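# Editor's illustrative sketch (not part of the original module): accessing
# structSize on a deserializer class goes through StructSizeDescriptor.__get__
# with inst=None, so no instance is required. DeserializerV1 (defined below)
# uses the ">QQ" format string: two big endian unsigned 64 bit integers.
def _sketch_struct_size_example() -> int:
    # Equivalent to struct.calcsize(">QQ"), which is 16 bytes.
    return DeserializerV1.structSize
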

@dataclass
class DeserializerBase(ABC):
    @classmethod
    @abstractmethod
    def FMT_STRING(cls) -> str:
        raise NotImplementedError("Base class does not implement this method")

    structSize: ClassVar[StructSizeDescriptor]

    preambleSize: int
    sizeBytes: bytes

    def __init_subclass__(cls) -> None:
        # attach the size descriptor
        cls.structSize = StructSizeDescriptor()
        super().__init_subclass__()

    def unpackHeader(self, rawHeader: bytes) -> str | None:
        """Transform the raw bytes corresponding to the header of a save into
        a string of the header information.

        Parameters
        ----------
        rawHeader : bytes
            The bytes that are to be parsed into the header information. These
            are the bytes after the preamble and the ``structSize`` bytes, and
            before the ``headerSize`` bytes.

        Returns
        -------
        header : `str` or `None`
            Header information as a string. Returns `None` if the save format
            has no header string implementation (such as save format 1, which
            is all pickle).
        """
        raise NotImplementedError("Base class does not implement this method")

    @property
    def headerSize(self) -> int:
        """The number of bytes from the beginning of the file to the
        end of the metadata.
        """
        raise NotImplementedError("Base class does not implement this method")

    def readHeaderInfo(self, rawHeader: bytes) -> SimpleNamespace:
        """Parse the supplied raw bytes into the header information and the
        byte ranges of specific TaskDefs and QuantumNodes.

        Parameters
        ----------
        rawHeader : bytes
            The bytes that are to be parsed into the header information. These
            are the bytes after the preamble and the ``structSize`` bytes, and
            before the ``headerSize`` bytes.
        """
        raise NotImplementedError("Base class does not implement this method")

    def constructGraph(
        self,
        nodes: set[uuid.UUID],
        _readBytes: Callable[[int, int], bytes],
        universe: DimensionUniverse | None = None,
    ) -> QuantumGraph:
        """Construct a graph from the deserialized information.

        Parameters
        ----------
        nodes : `set` of `uuid.UUID`
            The nodes to include in the graph.
        _readBytes : callable
            A callable that can be used to read bytes from the file handle.
            The callable takes two ints, start and stop, as the numerical
            bounds to read, and returns a byte stream.
        universe : `~lsst.daf.butler.DimensionUniverse`
            The singleton of all dimensions known to the middleware registry.
        """
        raise NotImplementedError("Base class does not implement this method")

    def description(self) -> str:
        """Return the description of the serialized data format.

        Returns
        -------
        desc : `str`
            Description of the serialized data format.
        """
        raise NotImplementedError("Base class does not implement this method")


Version1Description = """
The save file starts with the first few bytes corresponding to the magic bytes
in the QuantumGraph: `qgraph4\xf6\xe8\xa9`.

The next few bytes are 2 big endian unsigned 64 bit integers.

The first unsigned 64 bit integer corresponds to the number of bytes of a
python mapping of TaskDef labels to the byte ranges in the save file where the
definition can be loaded.

The second unsigned 64 bit integer corresponds to the number of bytes of a
python mapping of QuantumGraph Node number to the byte ranges in the save file
where the node can be loaded. The byte range is indexed starting after
the `header` bytes of the magic bytes, size bytes, and bytes of the two
mappings.

Each of the above mappings is pickled and then lzma compressed, so to
deserialize the bytes, lzma decompression must be performed first and the
result passed to the python pickle loader.

As stated above, each map contains byte ranges of the corresponding
datastructure. These bytes are also lzma compressed pickles, and should
be deserialized in a similar manner. The byte range is indexed starting after
the `header` bytes of the magic bytes, size bytes, and bytes of the two
mappings.

In addition to the TaskDef byte locations, the TaskDef map also contains
an additional key '__GraphBuildID'. The value associated with this is the
unique id assigned to the graph at its creation time.
"""

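# Editor's illustrative sketch (not part of the original module): loading one
# version-1 node payload given its byte range from the node map. ``readBytes``
# stands in for the ``_readBytes`` callable handed to ``constructGraph``, and
# ``headerSize`` for the deserializer property of the same name.
def _sketch_load_v1_node(
    readBytes: Callable[[int, int], bytes], byteRange: tuple[int, int], headerSize: int
) -> object:
    start, stop = byteRange
    # Ranges in the map are indexed after the header, so shift them first.
    start, stop = start + headerSize, stop + headerSize
    # Node payloads are lzma compressed pickles: decompress, then unpickle.
    return pickle.loads(lzma.decompress(readBytes(start, stop)))
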

@dataclass
class DeserializerV1(DeserializerBase):
    @classmethod
    def FMT_STRING(cls) -> str:
        return ">QQ"

    def __post_init__(self) -> None:
        self.taskDefMapSize, self.nodeMapSize = struct.unpack(self.FMT_STRING(), self.sizeBytes)

    @property
    def headerSize(self) -> int:
        return self.preambleSize + self.structSize + self.taskDefMapSize + self.nodeMapSize

    def readHeaderInfo(self, rawHeader: bytes) -> SimpleNamespace:
        returnValue = SimpleNamespace()
        returnValue.taskDefMap = pickle.loads(rawHeader[: self.taskDefMapSize])
        returnValue._buildId = returnValue.taskDefMap["__GraphBuildID"]
        returnValue.map = pickle.loads(rawHeader[self.taskDefMapSize :])
        returnValue.metadata = None
        self.returnValue = returnValue
        return returnValue

    def unpackHeader(self, rawHeader: bytes) -> str | None:
        return None

    def constructGraph(
        self,
        nodes: set[uuid.UUID],
        _readBytes: Callable[[int, int], bytes],
        universe: DimensionUniverse | None = None,
    ) -> QuantumGraph:
        # need to import here to avoid cyclic imports
        from . import QuantumGraph

        quanta: defaultdict[TaskDef, set[Quantum]] = defaultdict(set)
        quantumToNodeId: dict[Quantum, uuid.UUID] = {}
        loadedTaskDef = {}
        # loop over the nodes specified above
        for node in nodes:
            # Get the bytes to read from the map
            start, stop = self.returnValue.map[node]
            start += self.headerSize
            stop += self.headerSize

            # read the specified bytes; reading may be overloaded by
            # subclasses. The bytes are compressed, so decompress them
            dump = lzma.decompress(_readBytes(start, stop))

            # reconstruct node
            qNode = pickle.loads(dump)
            object.__setattr__(qNode, "nodeId", generate_uuidv7())

            # read the saved node name. If it has been loaded, attach it; if
            # not, read in the taskDef first, and then load it
            nodeTask = qNode.taskDef
            if nodeTask not in loadedTaskDef:
                # Get the byte ranges corresponding to this taskDef
                start, stop = self.returnValue.taskDefMap[nodeTask]
                start += self.headerSize
                stop += self.headerSize

                # load the taskDef; this method call may be overloaded by
                # subclasses. The bytes are compressed, so decompress them
                taskDef = pickle.loads(lzma.decompress(_readBytes(start, stop)))
                loadedTaskDef[nodeTask] = taskDef
            # Explicitly overload the "frozen-ness" of nodes to attach the
            # taskDef back into the un-persisted node
            object.__setattr__(qNode, "taskDef", loadedTaskDef[nodeTask])
            quanta[qNode.taskDef].add(qNode.quantum)

            # record the node for later processing
            quantumToNodeId[qNode.quantum] = qNode.nodeId

        # construct an empty new QuantumGraph object, and run the associated
        # creation method with the un-persisted data
        qGraph = object.__new__(QuantumGraph)
        qGraph._buildGraphs(
            quanta,
            _quantumToNodeId=quantumToNodeId,
            _buildId=self.returnValue._buildId,
            metadata=self.returnValue.metadata,
            universe=universe,
        )
        return qGraph

    def description(self) -> str:
        return Version1Description

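# Editor's illustrative sketch (not part of the original module): splitting the
# version-1 raw header into its two pickled maps, mirroring
# DeserializerV1.readHeaderInfo above. ``rawHeader`` is assumed to be the bytes
# between the size struct and ``headerSize``.
def _sketch_split_v1_header(rawHeader: bytes, taskDefMapSize: int) -> tuple[dict, dict]:
    # The first taskDefMapSize bytes are the pickled TaskDef map (which also
    # carries the '__GraphBuildID' key); the remainder is the pickled node map.
    taskDefMap = pickle.loads(rawHeader[:taskDefMapSize])
    nodeMap = pickle.loads(rawHeader[taskDefMapSize:])
    return taskDefMap, nodeMap
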

Version2Description = """
The save file starts with the first few bytes corresponding to the magic bytes
in the QuantumGraph: `qgraph4\xf6\xe8\xa9`.

The next few bytes are a big endian unsigned long long.

The unsigned long long corresponds to the number of bytes of a python mapping
of header information. This mapping is encoded into json and then lzma
compressed, meaning the operations must be performed in the opposite order to
deserialize.

The json encoded header mapping contains 4 fields: TaskDefs, GraphBuildID,
Nodes, and Metadata.

The `TaskDefs` key corresponds to a value which is a mapping of Task label to
task data. The task data is a mapping of key to value, where the only key is
`bytes` and it corresponds to a tuple of a byte range of the start, stop
bytes (indexed after all the header bytes).

The `GraphBuildID` key corresponds to a string that is the unique id assigned
to this graph when it was created.

The `Nodes` key is like the `TaskDefs` key except it corresponds to
QuantumNodes instead of TaskDefs. Another important difference is that JSON
formatting does not allow using numbers as keys, and this mapping is keyed by
the node number. Thus it is stored in JSON as two equal length lists, the first
being the keys, and the second the values associated with those keys.

The `Metadata` key is a mapping of strings to associated values. This metadata
may be anything that is important to be transported alongside the graph.

As stated above, each map contains byte ranges of the corresponding
datastructure. These bytes are also lzma compressed pickles, and should
be deserialized in a similar manner.
"""

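# Editor's illustrative sketch (not part of the original module): decoding the
# version-2 header described above. ``rawHeader`` is assumed to hold the
# ``mapSize`` bytes that follow the preamble and the size struct.
def _sketch_decode_v2_header(rawHeader: bytes) -> dict:
    # The header was json encoded and then lzma compressed, so reverse both.
    header = json.loads(lzma.decompress(rawHeader).decode())
    # Rebuild the node mapping, which JSON could not store keyed by number.
    header["Nodes"] = dict(header["Nodes"])
    return header
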

@dataclass
class DeserializerV2(DeserializerBase):
    @classmethod
    def FMT_STRING(cls) -> str:
        return ">Q"

    def __post_init__(self) -> None:
        (self.mapSize,) = struct.unpack(self.FMT_STRING(), self.sizeBytes)

    @property
    def headerSize(self) -> int:
        return self.preambleSize + self.structSize + self.mapSize

    def readHeaderInfo(self, rawHeader: bytes) -> SimpleNamespace:
        uncompressedHeaderMap = self.unpackHeader(rawHeader)
        if uncompressedHeaderMap is None:
            raise ValueError(
                "This error is not possible because self.unpackHeader cannot return None,"
                " but is done to satisfy type checkers"
            )
        header = json.loads(uncompressedHeaderMap)
        returnValue = SimpleNamespace()
        returnValue.taskDefMap = header["TaskDefs"]
        returnValue._buildId = header["GraphBuildID"]
        returnValue.map = dict(header["Nodes"])
        returnValue.metadata = header["Metadata"]
        self.returnValue = returnValue
        return returnValue

    def unpackHeader(self, rawHeader: bytes) -> str | None:
        return lzma.decompress(rawHeader).decode()

    def constructGraph(
        self,
        nodes: set[uuid.UUID],
        _readBytes: Callable[[int, int], bytes],
        universe: DimensionUniverse | None = None,
    ) -> QuantumGraph:
        # need to import here to avoid cyclic imports
        from . import QuantumGraph

        quanta: defaultdict[TaskDef, set[Quantum]] = defaultdict(set)
        quantumToNodeId: dict[Quantum, uuid.UUID] = {}
        loadedTaskDef = {}
        # loop over the nodes specified above
        for node in nodes:
            # Get the bytes to read from the map
            start, stop = self.returnValue.map[node]["bytes"]
            start += self.headerSize
            stop += self.headerSize

            # read the specified bytes; reading may be overloaded by
            # subclasses. The bytes are compressed, so decompress them
            dump = lzma.decompress(_readBytes(start, stop))

            # reconstruct node
            qNode = pickle.loads(dump)
            object.__setattr__(qNode, "nodeId", generate_uuidv7())

            # read the saved node name. If it has been loaded, attach it; if
            # not, read in the taskDef first, and then load it
            nodeTask = qNode.taskDef
            if nodeTask not in loadedTaskDef:
                # Get the byte ranges corresponding to this taskDef
                start, stop = self.returnValue.taskDefMap[nodeTask]["bytes"]
                start += self.headerSize
                stop += self.headerSize

                # load the taskDef; this method call may be overloaded by
                # subclasses. The bytes are compressed, so decompress them
                taskDef = pickle.loads(lzma.decompress(_readBytes(start, stop)))
                loadedTaskDef[nodeTask] = taskDef
            # Explicitly overload the "frozen-ness" of nodes to attach the
            # taskDef back into the un-persisted node
            object.__setattr__(qNode, "taskDef", loadedTaskDef[nodeTask])
            quanta[qNode.taskDef].add(qNode.quantum)

            # record the node for later processing
            quantumToNodeId[qNode.quantum] = qNode.nodeId

        # construct an empty new QuantumGraph object, and run the associated
        # creation method with the un-persisted data
        qGraph = object.__new__(QuantumGraph)
        qGraph._buildGraphs(
            quanta,
            _quantumToNodeId=quantumToNodeId,
            _buildId=self.returnValue._buildId,
            metadata=self.returnValue.metadata,
            universe=universe,
        )
        return qGraph

    def description(self) -> str:
        return Version2Description


Version3Description = """
The save file starts with the first few bytes corresponding to the magic bytes
in the QuantumGraph: `qgraph4\xf6\xe8\xa9`.

The next few bytes are a big endian unsigned long long.

The unsigned long long corresponds to the number of bytes of a mapping
of header information. This mapping is encoded into json and then lzma
compressed, meaning the operations must be performed in the opposite order to
deserialize.

The json encoded header mapping contains 5 fields: GraphBuildID, TaskDefs,
Nodes, Metadata, and DimensionRecords.

The `GraphBuildID` key corresponds to a string that is the unique id assigned
to this graph when it was created.

The `TaskDefs` key corresponds to a value which is a mapping of Task label to
task data. The task data is a mapping of key to value. The keys of this mapping
are `bytes`, `inputs`, and `outputs`.

The `TaskDefs` `bytes` key corresponds to a tuple of a byte range of the
start, stop bytes (indexed after all the header bytes). This byte range
corresponds to a lzma compressed json mapping. This mapping has keys of
`taskName`, corresponding to a fully qualified python class, `config`, a
pex_config string that is used to configure the class, and `label`, which
corresponds to a string that uniquely identifies the task within a given
execution pipeline.

The `TaskDefs` `inputs` key is associated with a list of tuples where each
tuple is a label of a task that is considered coming before a given task, and
the name of the dataset that is shared between the tasks (think node and edge
in a graph sense).

The `TaskDefs` `outputs` key is like `inputs` except the values in the list
correspond to all the output connections of a task.

The `Nodes` key is also a json mapping with keys corresponding to the UUIDs of
QuantumNodes. The value associated with each key is another mapping with
the keys `bytes`, `inputs`, and `outputs`.

The `Nodes` `bytes` key corresponds to a tuple of a byte range of the start,
stop bytes (indexed after all the header bytes). These bytes are a lzma
compressed json mapping which contains many sub elements; this mapping will be
referred to as the SerializedQuantumNode (related to the python class it
corresponds to).

SerializedQuantumNodes have 3 keys: `quantum`, corresponding to a json mapping
(described below) referred to as a SerializedQuantum, `taskLabel`, a string
which corresponds to a label in the `TaskDefs` mapping, and `nodeId`.

A SerializedQuantum has many keys: taskName, dataId, datasetTypeMapping,
initInputs, inputs, outputs, and dimensionRecords.

The `Nodes` mapping is thus like the `TaskDefs` mapping except it corresponds
to QuantumNodes instead of TaskDefs, and the keys of the mapping are string
representations of the UUIDs of the QuantumNodes.

The `Metadata` key is a mapping of strings to associated values. This metadata
may be anything that is important to be transported alongside the graph.

As stated above, each map contains byte ranges of the corresponding
datastructure. These bytes are lzma compressed json, and should be
deserialized in a similar manner.
"""

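# Editor's illustrative sketch (not part of the original module): reading one
# version-3 node payload. Unlike versions 1 and 2, the payload is a lzma
# compressed json mapping (a SerializedQuantumNode) rather than a pickle.
# ``readBytes`` and ``headerSize`` stand in for the loader's callable and
# property of the same names.
def _sketch_load_v3_node(
    readBytes: Callable[[int, int], bytes], byteRange: tuple[int, int], headerSize: int
) -> dict:
    start, stop = byteRange
    start, stop = start + headerSize, stop + headerSize
    # Decompress, then parse; the result maps onto SerializedQuantumNode.
    return json.loads(lzma.decompress(readBytes(start, stop)))
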

@dataclass
class DeserializerV3(DeserializerBase):
    @classmethod
    def FMT_STRING(cls) -> str:
        return ">Q"

    def __post_init__(self) -> None:
        self.infoSize: int
        (self.infoSize,) = struct.unpack(self.FMT_STRING(), self.sizeBytes)

    @property
    def headerSize(self) -> int:
        return self.preambleSize + self.structSize + self.infoSize

    def readHeaderInfo(self, rawHeader: bytes) -> SimpleNamespace:
        uncompressedinfoMap = self.unpackHeader(rawHeader)
        assert uncompressedinfoMap is not None  # for python typing, this variable can't be None
        infoMap = json.loads(uncompressedinfoMap)
        infoMappings = SimpleNamespace()
        infoMappings.taskDefMap = infoMap["TaskDefs"]
        infoMappings._buildId = infoMap["GraphBuildID"]
        infoMappings.map = {uuid.UUID(k): v for k, v in infoMap["Nodes"]}
        infoMappings.metadata = infoMap["Metadata"]
        infoMappings.dimensionRecords = {}
        for k, v in infoMap["DimensionRecords"].items():
            infoMappings.dimensionRecords[int(k)] = SerializedDimensionRecord(**v)
        # It is important that this be a get call here, so that it supports
        # versions of saved quantum graphs that might not have a saved
        # universe, without changing the save format
        if (universeConfig := infoMap.get("universe")) is not None:
            universe = DimensionUniverse(config=DimensionConfig(universeConfig))
        else:
            universe = DimensionUniverse()
        infoMappings.universe = universe
        infoMappings.globalInitOutputRefs = []
        if (json_refs := infoMap.get("GlobalInitOutputRefs")) is not None:
            infoMappings.globalInitOutputRefs = [
                DatasetRef.from_json(json_ref, universe=universe) for json_ref in json_refs
            ]
        infoMappings.registryDatasetTypes = []
        if (json_refs := infoMap.get("RegistryDatasetTypes")) is not None:
            infoMappings.registryDatasetTypes = [
                DatasetType.from_json(json_ref, universe=universe) for json_ref in json_refs
            ]
        self.infoMappings = infoMappings
        return infoMappings

    def unpackHeader(self, rawHeader: bytes) -> str | None:
        return lzma.decompress(rawHeader).decode()

    def constructGraph(
        self,
        nodes: set[uuid.UUID],
        _readBytes: Callable[[int, int], bytes],
        universe: DimensionUniverse | None = None,
    ) -> QuantumGraph:
        # need to import here to avoid cyclic imports
        from . import QuantumGraph

        graph = nx.DiGraph()
        loadedTaskDef: dict[str, TaskDef] = {}
        container = {}
        datasetDict = _DatasetTracker(createInverse=True)
        taskToQuantumNode: defaultdict[TaskDef, set[QuantumNode]] = defaultdict(set)
        initInputRefs: dict[str, list[DatasetRef]] = {}
        initOutputRefs: dict[str, list[DatasetRef]] = {}

        if universe is not None:
            if not universe.isCompatibleWith(self.infoMappings.universe):
                saved = self.infoMappings.universe
                raise RuntimeError(
                    f"The saved dimension universe ({saved.namespace}@v{saved.version}) is not "
                    f"compatible with the supplied universe ({universe.namespace}@v{universe.version})."
                )
        else:
            universe = self.infoMappings.universe

        for node in nodes:
            start, stop = self.infoMappings.map[node]["bytes"]
            start, stop = start + self.headerSize, stop + self.headerSize
            # Read in the bytes corresponding to the node to load and
            # decompress it
            dump = json.loads(lzma.decompress(_readBytes(start, stop)))

            # Turn the json back into the pydantic model
            nodeDeserialized = SerializedQuantumNode.direct(**dump)
            del dump

            # attach the dictionary of dimension records to the pydantic
            # model; these are stored separately because they are stored over
            # and over, and this saves a lot of space and time.
            nodeDeserialized.quantum.dimensionRecords = self.infoMappings.dimensionRecords
            # get the label for the current task
            nodeTaskLabel = nodeDeserialized.taskLabel

            if nodeTaskLabel not in loadedTaskDef:
                # Get the byte ranges corresponding to this taskDef
                start, stop = self.infoMappings.taskDefMap[nodeTaskLabel]["bytes"]
                start, stop = start + self.headerSize, stop + self.headerSize

                # bytes are compressed, so decompress them
                taskDefDump = json.loads(lzma.decompress(_readBytes(start, stop)))
                taskClass: type[PipelineTask] = doImportType(taskDefDump["taskName"])
                config: PipelineTaskConfig = taskClass.ConfigClass()
                config.loadFromStream(taskDefDump["config"])
                # Rebuild TaskDef
                recreatedTaskDef = TaskDef(
                    taskName=taskDefDump["taskName"],
                    taskClass=taskClass,
                    config=config,
                    label=taskDefDump["label"],
                )
                loadedTaskDef[nodeTaskLabel] = recreatedTaskDef

                # initInputRefs and initOutputRefs are optional
                if (refs := taskDefDump.get("initInputRefs")) is not None:
                    initInputRefs[recreatedTaskDef.label] = [
                        DatasetRef.from_json(ref, universe=universe) for ref in refs
                    ]
                if (refs := taskDefDump.get("initOutputRefs")) is not None:
                    initOutputRefs[recreatedTaskDef.label] = [
                        DatasetRef.from_json(ref, universe=universe) for ref in refs
                    ]

                # rebuild the mappings that associate dataset type names with
                # TaskDefs
                for _, input in self.infoMappings.taskDefMap[nodeTaskLabel]["inputs"]:
                    datasetDict.addConsumer(DatasetTypeName(input), recreatedTaskDef)

                added = set()
                for outputConnection in self.infoMappings.taskDefMap[nodeTaskLabel]["outputs"]:
                    typeName = outputConnection[1]
                    if typeName not in added:
                        added.add(typeName)
                        datasetDict.addProducer(DatasetTypeName(typeName), recreatedTaskDef)

            # reconstitute the node, passing in the dictionaries for the
            # loaded TaskDefs and dimension records. These are used to ensure
            # that each unique record is only loaded once
            qnode = QuantumNode.from_simple(nodeDeserialized, loadedTaskDef, universe)
            container[qnode.nodeId] = qnode
            taskToQuantumNode[loadedTaskDef[nodeTaskLabel]].add(qnode)

            # recreate the relations between each node from stored info
            graph.add_node(qnode)
            for id in self.infoMappings.map[qnode.nodeId]["inputs"]:
                # uuid is stored as a string, turn it back into a uuid
                id = uuid.UUID(id)
                # if the id is not yet in the container, don't make a
                # connection; this is not an issue, because once it is, that
                # id will add the reverse connection
                if id in container:
                    graph.add_edge(container[id], qnode)
            for id in self.infoMappings.map[qnode.nodeId]["outputs"]:
                # uuid is stored as a string, turn it back into a uuid
                id = uuid.UUID(id)
                # if the id is not yet in the container, don't make a
                # connection; this is not an issue, because once it is, that
                # id will add the reverse connection
                if id in container:
                    graph.add_edge(qnode, container[id])

        newGraph = object.__new__(QuantumGraph)
        newGraph._metadata = self.infoMappings.metadata
        newGraph._buildId = self.infoMappings._buildId
        newGraph._datasetDict = datasetDict
        newGraph._nodeIdMap = container
        newGraph._count = len(nodes)
        newGraph._taskToQuantumNode = dict(taskToQuantumNode.items())
        newGraph._taskGraph = datasetDict.makeNetworkXGraph()
        newGraph._connectedQuanta = graph
        newGraph._initInputRefs = initInputRefs
        newGraph._initOutputRefs = initOutputRefs
        newGraph._globalInitOutputRefs = self.infoMappings.globalInitOutputRefs
        newGraph._registryDatasetTypes = self.infoMappings.registryDatasetTypes
        newGraph._universe = universe
        newGraph._pipeline_graph = None
        return newGraph


DESERIALIZER_MAP: dict[int, type[DeserializerBase]] = {
    1: DeserializerV1,
    2: DeserializerV2,
    3: DeserializerV3,
}
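
# Editor's illustrative sketch (not part of the original module): how a reader
# might pick and prime a deserializer once it has parsed the save-format
# version number from a file. ``version``, ``fh``, and ``preambleSize`` are
# assumptions standing in for state the real loader tracks.
def _sketch_select_deserializer(version: int, fh, preambleSize: int) -> DeserializerBase:
    deserializerClass = DESERIALIZER_MAP[version]
    # Each deserializer declares its own size struct; read exactly that many
    # bytes so __post_init__ can unpack the map size(s).
    sizeBytes = fh.read(struct.calcsize(deserializerClass.FMT_STRING()))
    return deserializerClass(preambleSize, sizeBytes)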