# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = (
    "PredictedDatasetInfo",
    "PredictedDatasetModel",
    "PredictedInitQuantaModel",
    "PredictedQuantumDatasetsModel",
    "PredictedQuantumGraph",
    "PredictedQuantumGraphComponents",
    "PredictedQuantumGraphReader",
    "PredictedQuantumInfo",
    "PredictedThinGraphModel",
    "PredictedThinQuantumModel",
)

import dataclasses
import itertools
import logging
import sys
import uuid
import warnings
from collections import defaultdict
from collections.abc import Iterable, Iterator, Mapping, Sequence
from contextlib import AbstractContextManager, contextmanager
from typing import TYPE_CHECKING, Any, cast

import networkx
import networkx.algorithms.bipartite
import pydantic
import zstandard

from lsst.daf.butler import (
    Config,
    DataCoordinate,
    DataIdValue,
    DatasetRef,
    DatasetType,
    DimensionDataAttacher,
    DimensionDataExtractor,
    DimensionGroup,
    DimensionRecordSetDeserializer,
    DimensionUniverse,
    LimitedButler,
    Quantum,
    QuantumBackedButler,
    SerializableDimensionData,
)
from lsst.daf.butler._rubin import generate_uuidv7
from lsst.daf.butler.datastore.record_data import DatastoreRecordData, SerializedDatastoreRecordData
from lsst.daf.butler.registry import ConflictingDefinitionError
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.packages import Packages

from .. import automatic_connection_constants as acc
from ..pipeline import TaskDef
from ..pipeline_graph import (
    PipelineGraph,
    TaskImportMode,
    TaskInitNode,
    TaskNode,
    compare_packages,
    log_config_mismatch,
)
from ._common import (
    FORMAT_VERSION,
    BaseQuantumGraph,
    BaseQuantumGraphReader,
    BaseQuantumGraphWriter,
    ConnectionName,
    DataCoordinateValues,
    DatasetInfo,
    DatasetTypeName,
    DatastoreName,
    HeaderModel,
    IncompleteQuantumGraphError,
    QuantumIndex,
    QuantumInfo,
    TaskLabel,
)
from ._multiblock import DEFAULT_PAGE_SIZE, AddressRow, MultiblockReader, MultiblockWriter

if TYPE_CHECKING:
    from ..config import PipelineTaskConfig
    from ..graph import QgraphSummary, QuantumGraph

# Sphinx needs imports for type annotations of base class members.
if "sphinx" in sys.modules:
    import zipfile  # noqa: F401

    from ._multiblock import AddressReader, Decompressor  # noqa: F401


_LOG = logging.getLogger(__name__)


class _PredictedThinQuantumModelV0(pydantic.BaseModel):
    """Data model for a quantum data ID and internal integer ID in a predicted
    quantum graph.
    """

    quantum_index: QuantumIndex
    """Internal integer ID for this quantum."""

    data_coordinate: DataCoordinateValues = pydantic.Field(default_factory=list)
    """Full (required and implied) data coordinate values for this quantum."""


class PredictedThinQuantumModel(pydantic.BaseModel):
    """Data model for a quantum data ID and UUID in a predicted
    quantum graph.
    """

    quantum_id: uuid.UUID
    """Universally unique ID for this quantum."""

    data_coordinate: DataCoordinateValues = pydantic.Field(default_factory=list)
    """Full (required and implied) data coordinate values for this quantum."""

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)


class _PredictedThinGraphModelV0(pydantic.BaseModel):
    """Data model for the predicted quantum graph component that maps each
    task label to the data IDs and internal integer IDs of its quanta.
    """

    quanta: dict[TaskLabel, list[_PredictedThinQuantumModelV0]] = pydantic.Field(default_factory=dict)
    """Minimal descriptions of all quanta, grouped by task label."""

    edges: list[tuple[QuantumIndex, QuantumIndex]] = pydantic.Field(default_factory=list)
    """Pairs of (predecessor, successor) internal integer quantum IDs."""

    def _upgraded(self, address_rows: Mapping[uuid.UUID, AddressRow]) -> PredictedThinGraphModel:
        """Convert to the v1+ model."""
        uuid_by_index = {v.index: k for k, v in address_rows.items()}
        return PredictedThinGraphModel.model_construct(
            quanta={
                task_label: [
                    PredictedThinQuantumModel.model_construct(
                        quantum_id=uuid_by_index[q.quantum_index], data_coordinate=q.data_coordinate
                    )
                    for q in quanta
                ]
                for task_label, quanta in self.quanta.items()
            },
            edges=[(uuid_by_index[index1], uuid_by_index[index2]) for index1, index2 in self.edges],
        )


class PredictedThinGraphModel(pydantic.BaseModel):
    """Data model for the predicted quantum graph component that maps each
    task label to the data IDs and UUIDs of its quanta.
    """

    quanta: dict[TaskLabel, list[PredictedThinQuantumModel]] = pydantic.Field(default_factory=dict)
    """Minimal descriptions of all quanta, grouped by task label."""

    edges: list[tuple[uuid.UUID, uuid.UUID]] = pydantic.Field(default_factory=list)
    """Pairs of (predecessor, successor) quantum IDs."""

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)


class PredictedDatasetModel(pydantic.BaseModel):
    """Data model for the datasets in a predicted quantum graph file."""

    dataset_id: uuid.UUID
    """Universally unique ID for the dataset."""

    dataset_type_name: DatasetTypeName
    """Name of the type of this dataset.

    This is always a parent dataset type name, not a component.

    Note that full dataset type definitions are stored in the pipeline graph.
    """

    data_coordinate: DataCoordinateValues = pydantic.Field(default_factory=list)
    """The full values (required and implied) of this dataset's data ID."""

    run: str
    """This dataset's RUN collection name."""

    @classmethod
    def from_dataset_ref(cls, ref: DatasetRef) -> PredictedDatasetModel:
        """Construct from a butler `~lsst.daf.butler.DatasetRef`.

        Parameters
        ----------
        ref : `lsst.daf.butler.DatasetRef`
            Dataset reference.

        Returns
        -------
        model : `PredictedDatasetModel`
            Model for the dataset.
        """
        dataset_type_name, _ = DatasetType.splitDatasetTypeName(ref.datasetType.name)
        return cls.model_construct(
            dataset_id=ref.id,
            dataset_type_name=dataset_type_name,
            data_coordinate=list(ref.dataId.full_values),
            run=ref.run,
        )

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)


class PredictedQuantumDatasetsModel(pydantic.BaseModel):
    """Data model for a description of a single predicted quantum that
    includes its inputs and outputs.
    """

    quantum_id: uuid.UUID
    """Universally unique ID for the quantum."""

    task_label: TaskLabel
    """Label of the task.

    Note that task label definitions are stored in the pipeline graph.
    """

    data_coordinate: DataCoordinateValues = pydantic.Field(default_factory=list)
    """The full values (required and implied) of this quantum's data ID."""

    inputs: dict[ConnectionName, list[PredictedDatasetModel]] = pydantic.Field(default_factory=dict)
    """The input datasets to this quantum, grouped by connection name."""

    outputs: dict[ConnectionName, list[PredictedDatasetModel]] = pydantic.Field(default_factory=dict)
    """The datasets output by this quantum, grouped by connection name."""

    datastore_records: dict[DatastoreName, SerializedDatastoreRecordData] = pydantic.Field(
        default_factory=dict
    )
    """Datastore records for inputs to this quantum that are already present in
    the data repository.
    """

    def iter_input_dataset_ids(self) -> Iterator[uuid.UUID]:
        """Return an iterator over the UUIDs of all datasets consumed by this
        quantum.

        Returns
        -------
        iter : `~collections.abc.Iterator` [ `uuid.UUID` ]
            Iterator over dataset IDs.
        """
        for datasets in self.inputs.values():
            for dataset in datasets:
                yield dataset.dataset_id

    def iter_output_dataset_ids(self) -> Iterator[uuid.UUID]:
        """Return an iterator over the UUIDs of all datasets produced by this
        quantum.

        Returns
        -------
        iter : `~collections.abc.Iterator` [ `uuid.UUID` ]
            Iterator over dataset IDs.
        """
        for datasets in self.outputs.values():
            for dataset in datasets:
                yield dataset.dataset_id

    def iter_dataset_ids(self) -> Iterator[uuid.UUID]:
        """Return an iterator over the UUIDs of all datasets referenced by this
        quantum.

        Returns
        -------
        iter : `~collections.abc.Iterator` [ `uuid.UUID` ]
            Iterator over dataset IDs.
        """
        yield from self.iter_input_dataset_ids()
        yield from self.iter_output_dataset_ids()

    def deserialize_datastore_records(self) -> dict[DatastoreName, DatastoreRecordData]:
        """Deserialize the mapping of datastore records."""
        return {
            datastore_name: DatastoreRecordData.from_simple(serialized_records)
            for datastore_name, serialized_records in self.datastore_records.items()
        }

    @classmethod
    def from_execution_quantum(
        cls, task_node: TaskNode, quantum: Quantum, quantum_id: uuid.UUID
    ) -> PredictedQuantumDatasetsModel:
        """Construct from an `lsst.daf.butler.Quantum` instance.

        Parameters
        ----------
        task_node : `.pipeline_graph.TaskNode`
            Task node from the pipeline graph.
        quantum : `lsst.daf.butler.Quantum`
            Quantum object.
        quantum_id : `uuid.UUID`
            ID for this quantum.

        Returns
        -------
        model : `PredictedQuantumDatasetsModel`
            Model for this quantum.
        """
        result: PredictedQuantumDatasetsModel = cls.model_construct(
            quantum_id=quantum_id,
            task_label=task_node.label,
            data_coordinate=list(cast(DataCoordinate, quantum.dataId).full_values),
        )
        for read_edge in task_node.iter_all_inputs():
            refs = sorted(quantum.inputs[read_edge.dataset_type_name], key=lambda ref: ref.dataId)
            result.inputs[read_edge.connection_name] = [
                PredictedDatasetModel.from_dataset_ref(ref) for ref in refs
            ]
        for write_edge in task_node.iter_all_outputs():
            refs = sorted(quantum.outputs[write_edge.dataset_type_name], key=lambda ref: ref.dataId)
            result.outputs[write_edge.connection_name] = [
                PredictedDatasetModel.from_dataset_ref(ref) for ref in refs
            ]
        result.datastore_records = {
            store_name: records.to_simple() for store_name, records in quantum.datastore_records.items()
        }
        return result

    @classmethod
    def from_old_quantum_graph_init(
        cls, task_init_node: TaskInitNode, old_quantum_graph: QuantumGraph
    ) -> PredictedQuantumDatasetsModel:
        """Construct from the init-input and init-output dataset types of a
        task in an old `QuantumGraph` instance.

        Parameters
        ----------
        task_init_node : `.pipeline_graph.TaskInitNode`
            Task init node from the pipeline graph.
        old_quantum_graph : `QuantumGraph`
            Quantum graph.

        Returns
        -------
        model : `PredictedQuantumDatasetsModel`
            Model for this "init" quantum.
        """
        task_def = old_quantum_graph.findTaskDefByLabel(task_init_node.label)
        assert task_def is not None
        init_input_refs = {
            ref.datasetType.name: ref for ref in (old_quantum_graph.initInputRefs(task_def) or [])
        }
        init_output_refs = {
            ref.datasetType.name: ref for ref in (old_quantum_graph.initOutputRefs(task_def) or [])
        }
        init_input_ids = {ref.id for ref in init_input_refs.values()}
        result: PredictedQuantumDatasetsModel = cls.model_construct(
            quantum_id=generate_uuidv7(), task_label=task_init_node.label
        )
        for read_edge in task_init_node.iter_all_inputs():
            ref = init_input_refs[read_edge.dataset_type_name]
            result.inputs[read_edge.connection_name] = [PredictedDatasetModel.from_dataset_ref(ref)]
        for write_edge in task_init_node.iter_all_outputs():
            ref = init_output_refs[write_edge.dataset_type_name]
            result.outputs[write_edge.connection_name] = [PredictedDatasetModel.from_dataset_ref(ref)]
        datastore_records: dict[str, DatastoreRecordData] = {}
        for quantum in old_quantum_graph.get_task_quanta(task_init_node.label).values():
            for store_name, records in quantum.datastore_records.items():
                subset = records.subset(init_input_ids)
                if subset is not None:
                    datastore_records.setdefault(store_name, DatastoreRecordData()).update(subset)
            break  # All quanta have the same init-inputs, so we only need one.
        result.datastore_records = {
            store_name: records.to_simple() for store_name, records in datastore_records.items()
        }
        return result

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)


class PredictedInitQuantaModel(pydantic.RootModel):
    """Data model for the init-inputs and init-outputs of a predicted quantum
    graph.
    """

    root: list[PredictedQuantumDatasetsModel] = pydantic.Field(default_factory=list)
    """List of special "init" quanta: one for each task, and another for global
    init-outputs.
    """

    def update_from_old_quantum_graph(self, old_quantum_graph: QuantumGraph) -> None:
        """Update this model in-place by extracting from an old `QuantumGraph`
        instance.

        Parameters
        ----------
        old_quantum_graph : `QuantumGraph`
            Quantum graph.
        """
        global_init_quantum = PredictedQuantumDatasetsModel.model_construct(
            quantum_id=generate_uuidv7(), task_label=""
        )
        for ref in old_quantum_graph.globalInitOutputRefs():
            global_init_quantum.outputs[ref.datasetType.name] = [PredictedDatasetModel.from_dataset_ref(ref)]
        self.root.append(global_init_quantum)
        for task_node in old_quantum_graph.pipeline_graph.tasks.values():
            self.root.append(
                PredictedQuantumDatasetsModel.from_old_quantum_graph_init(task_node.init, old_quantum_graph)
            )

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)


class PredictedQuantumInfo(QuantumInfo):
    """A typed dictionary that annotates the attributes of the NetworkX graph
    node data for a predicted quantum.

    Since NetworkX types are not generic over their node mapping type, this has
    to be used explicitly, e.g.::

        node_data: PredictedQuantumInfo = xgraph.nodes[quantum_id]

    where ``xgraph`` can be either `PredictedQuantumGraph.quantum_only_xgraph`
    or `PredictedQuantumGraph.bipartite_xgraph`.
    """

    quantum: Quantum
    """Quantum object that can be passed directly to an executor.

    This attribute is only present if
    `PredictedQuantumGraph.build_execution_quanta` has been run on this node's
    quantum ID already.
    """


class PredictedDatasetInfo(DatasetInfo):
    """A typed dictionary that annotates the attributes of the NetworkX graph
    node data for a dataset.

    Since NetworkX types are not generic over their node mapping type, this has
    to be used explicitly, e.g.::

        node_data: PredictedDatasetInfo = xgraph.nodes[dataset_id]

    where ``xgraph`` is from the `PredictedQuantumGraph.bipartite_xgraph`
    property.
    """


class PredictedQuantumGraph(BaseQuantumGraph):
    """A directed acyclic graph that predicts a processing run and supports it
    during execution.

    Parameters
    ----------
    components : `PredictedQuantumGraphComponents`
        A struct of components used to construct the graph.

    Notes
    -----
    Iteration over a `PredictedQuantumGraph` yields loaded quantum IDs in
    deterministic topological order (but the tiebreaker is unspecified). The
    `len` of a `PredictedQuantumGraph` is the number of loaded non-init quanta,
    i.e. the same as the number of quanta iterated over.
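
    Examples
    --------
    A minimal sketch of iterating over the loaded quanta; ``qg`` is assumed
    to be a `PredictedQuantumGraph` whose ``thin_graph`` component was
    loaded::

        for quantum_id in qg:
            info = qg.quantum_only_xgraph.nodes[quantum_id]
            print(quantum_id, info["task_label"], info["data_id"])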
702 """

    def __init__(self, components: PredictedQuantumGraphComponents):
        if components.header.graph_type != "predicted":
            raise TypeError(f"Header is for a {components.header.graph_type!r} graph, not 'predicted'.")
        super().__init__(components.header, components.pipeline_graph)
        self._quantum_only_xgraph = networkx.DiGraph()
        self._bipartite_xgraph = networkx.DiGraph()
        self._quanta_by_task_label: dict[str, dict[DataCoordinate, uuid.UUID]] = {
            task_label: {} for task_label in self.pipeline_graph.tasks.keys()
        }
        self._datasets_by_type: dict[str, dict[DataCoordinate, uuid.UUID]] = {
            dataset_type_name: {} for dataset_type_name in self.pipeline_graph.dataset_types.keys()
        }
        self._datasets_by_type[self.pipeline_graph.packages_dataset_type.name] = {}
        self._dimension_data = components.dimension_data
        self._add_init_quanta(components.init_quanta)
        self._quantum_datasets: dict[uuid.UUID, PredictedQuantumDatasetsModel] = {}
        self._expanded_data_ids: dict[DataCoordinate, DataCoordinate] = {}
        self._add_thin_graph(components.thin_graph)
        for quantum_datasets in components.quantum_datasets.values():
            self._add_quantum_datasets(quantum_datasets)
        if not components.thin_graph.edges:
            # If we loaded the thin_graph, we've already populated this graph.
            self._quantum_only_xgraph.update(
                networkx.algorithms.bipartite.projected_graph(
                    networkx.DiGraph(self._bipartite_xgraph),
                    self._quantum_only_xgraph.nodes.keys(),
                )
            )
        if _LOG.isEnabledFor(logging.DEBUG):
            for quantum_id in self:
                _LOG.debug(
                    "%s: %s @ %s",
                    quantum_id,
                    self._quantum_only_xgraph.nodes[quantum_id]["task_label"],
                    self._quantum_only_xgraph.nodes[quantum_id]["data_id"].required,
                )

    def _add_init_quanta(self, component: PredictedInitQuantaModel) -> None:
        self._init_quanta = {q.task_label: q for q in component.root}
        empty_data_id = DataCoordinate.make_empty(self.pipeline_graph.universe)
        for quantum_datasets in self._init_quanta.values():
            for init_datasets in itertools.chain(
                quantum_datasets.inputs.values(), quantum_datasets.outputs.values()
            ):
                for init_dataset in init_datasets:
                    self._datasets_by_type[init_dataset.dataset_type_name][empty_data_id] = (
                        init_dataset.dataset_id
                    )
            _LOG.debug(
                "%s: %s @ init",
                quantum_datasets.quantum_id,
                quantum_datasets.task_label,
            )

    def _add_thin_graph(self, component: PredictedThinGraphModel) -> None:
        self._quantum_only_xgraph.add_edges_from(component.edges)
        for task_label, thin_quanta_for_task in component.quanta.items():
            for thin_quantum in thin_quanta_for_task:
                self._add_quantum(thin_quantum.quantum_id, task_label, thin_quantum.data_coordinate)

    def _add_quantum_datasets(self, quantum_datasets: PredictedQuantumDatasetsModel) -> None:
        self._quantum_datasets[quantum_datasets.quantum_id] = quantum_datasets
        self._add_quantum(
            quantum_datasets.quantum_id, quantum_datasets.task_label, quantum_datasets.data_coordinate
        )
        task_node = self.pipeline_graph.tasks[quantum_datasets.task_label]
        for connection_name, input_datasets in quantum_datasets.inputs.items():
            pipeline_edge = task_node.get_input_edge(connection_name)
            for input_dataset in input_datasets:
                self._add_dataset(input_dataset)
                self._bipartite_xgraph.add_edge(
                    input_dataset.dataset_id,
                    quantum_datasets.quantum_id,
                    key=connection_name,
                    is_read=True,
                )
                # There might be multiple input connections for the same
                # dataset type.
                self._bipartite_xgraph.edges[
                    input_dataset.dataset_id, quantum_datasets.quantum_id
                ].setdefault("pipeline_edges", []).append(pipeline_edge)
        for connection_name, output_datasets in quantum_datasets.outputs.items():
            pipeline_edges = [task_node.get_output_edge(connection_name)]
            for output_dataset in output_datasets:
                self._add_dataset(output_dataset)
                self._bipartite_xgraph.add_edge(
                    quantum_datasets.quantum_id,
                    output_dataset.dataset_id,
                    key=connection_name,
                    is_read=False,
                    pipeline_edges=pipeline_edges,
                )

    def _add_quantum(
        self, quantum_id: uuid.UUID, task_label: str, data_coordinate_values: Sequence[DataIdValue]
    ) -> None:
        task_node = self.pipeline_graph.tasks[task_label]
        self._quantum_only_xgraph.add_node(quantum_id, task_label=task_label, pipeline_node=task_node)
        self._bipartite_xgraph.add_node(quantum_id, task_label=task_label, pipeline_node=task_node)
        data_coordinate_values = tuple(data_coordinate_values)
        dimensions = self.pipeline_graph.tasks[task_label].dimensions
        data_id = DataCoordinate.from_full_values(dimensions, tuple(data_coordinate_values))
        self._quantum_only_xgraph.nodes[quantum_id].setdefault("data_id", data_id)
        self._bipartite_xgraph.nodes[quantum_id].setdefault("data_id", data_id)
        self._quanta_by_task_label[task_label][data_id] = quantum_id

    def _add_dataset(self, model: PredictedDatasetModel) -> None:
        dataset_type_node = self.pipeline_graph.dataset_types[model.dataset_type_name]
        data_id = DataCoordinate.from_full_values(dataset_type_node.dimensions, tuple(model.data_coordinate))
        self._bipartite_xgraph.add_node(
            model.dataset_id,
            dataset_type_name=dataset_type_node.name,
            pipeline_node=dataset_type_node,
            run=model.run,
        )
        self._bipartite_xgraph.nodes[model.dataset_id].setdefault("data_id", data_id)
        self._datasets_by_type[model.dataset_type_name][data_id] = model.dataset_id

    @classmethod
    def open(
        cls,
        uri: ResourcePathExpression,
        page_size: int = DEFAULT_PAGE_SIZE,
        import_mode: TaskImportMode = TaskImportMode.ASSUME_CONSISTENT_EDGES,
    ) -> AbstractContextManager[PredictedQuantumGraphReader]:
        """Open a quantum graph and return a reader to load from it.

        Parameters
        ----------
        uri : convertible to `lsst.resources.ResourcePath`
            URI to open. Should have a ``.qg`` extension.
        page_size : `int`, optional
            Approximate number of bytes to read at once from address files.
            Note that this does not set a page size for *all* reads, but it
            does affect the smallest, most numerous reads.
        import_mode : `.pipeline_graph.TaskImportMode`, optional
            How to handle importing the task classes referenced in the pipeline
            graph.

        Returns
        -------
        reader : `contextlib.AbstractContextManager` [ \
                `PredictedQuantumGraphReader` ]
            A context manager that returns the reader when entered.
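
        Examples
        --------
        A minimal sketch; ``"graph.qg"`` is a placeholder path, and the body
        would call the reader's methods to load selected components::

            with PredictedQuantumGraph.open("graph.qg") as reader:
                ...  # load components via the reader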
848 """
849 return PredictedQuantumGraphReader.open(uri, page_size=page_size, import_mode=import_mode)

    @classmethod
    def read_execution_quanta(
        cls,
        uri: ResourcePathExpression,
        quantum_ids: Iterable[uuid.UUID] | None = None,
        page_size: int = DEFAULT_PAGE_SIZE,
    ) -> PredictedQuantumGraph:
        """Read one or more executable quanta from a quantum graph file.

        Parameters
        ----------
        uri : convertible to `lsst.resources.ResourcePath`
            URI to open. Should have a ``.qg`` extension for new quantum graph
            files, or ``.qgraph`` for the old format.
        quantum_ids : `~collections.abc.Iterable` [ `uuid.UUID` ], optional
            Iterable of quantum IDs to load. If not provided, all quanta will
            be loaded. The UUIDs of special init quanta will be ignored.
        page_size : `int`, optional
            Approximate number of bytes to read at once from address files.
            Note that this does not set a page size for *all* reads, but it
            does affect the smallest, most numerous reads.

        Returns
        -------
        quantum_graph : `PredictedQuantumGraph`
            A quantum graph that can build execution quanta for all of the
            given IDs.
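
        Examples
        --------
        A minimal sketch; ``"graph.qg"`` and ``some_quantum_id`` are
        placeholders::

            qg = PredictedQuantumGraph.read_execution_quanta(
                "graph.qg", quantum_ids=[some_quantum_id]
            )
            quanta = qg.build_execution_quanta()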
878 """
879 return PredictedQuantumGraphComponents.read_execution_quanta(
880 uri,
881 quantum_ids,
882 page_size=page_size,
883 ).assemble()

    @classmethod
    def make_empty(
        cls,
        universe: DimensionUniverse,
        *,
        output_run: str,
        inputs: Iterable[str] = (),
        output: str | None = None,
        add_packages: bool = True,
    ) -> PredictedQuantumGraph:
        """Make an empty quantum graph with no tasks.

        Parameters
        ----------
        universe : `lsst.daf.butler.DimensionUniverse`
            Definitions for all butler dimensions.
        output_run : `str`
            Output run collection.
        inputs : `~collections.abc.Iterable` [`str`], optional
            Iterable of input collection names.
        output : `str` or `None`, optional
            Output chained collection.
        add_packages : `bool`, optional
            Whether to add the special init quantum that writes the 'packages'
            dataset. The default (`True`) is consistent with
            `~..quantum_graph_builder.QuantumGraphBuilder` behavior when there
            are no regular quanta generated.

        Returns
        -------
        quantum_graph : `PredictedQuantumGraph`
            An empty quantum graph.
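
        Examples
        --------
        A minimal sketch; the collection names are placeholders::

            qg = PredictedQuantumGraph.make_empty(
                universe, output_run="u/someone/run", inputs=["some-inputs"]
            )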
917 """
918 return cls(
919 PredictedQuantumGraphComponents.make_empty(
920 universe,
921 output_run=output_run,
922 inputs=inputs,
923 output=output,
924 add_packages=add_packages,
925 )
926 )

    @property
    def quanta_by_task(self) -> Mapping[str, Mapping[DataCoordinate, uuid.UUID]]:
        """A nested mapping of all quanta, keyed first by task name and then by
        data ID.

        Notes
        -----
        This is populated by the ``thin_graph`` component (all quanta are
        added) and the ``quantum_datasets`` component (only loaded quanta are
        added). All tasks in the pipeline graph are included, even if none of
        their quanta were loaded (i.e. nested mappings may be empty).

        The returned object may be an internal dictionary; as the type
        annotation indicates, it should not be modified in place.
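
        Examples
        --------
        Looking up the quantum ID for one task's quantum; ``"isr"`` and
        ``data_id`` are placeholders::

            quantum_id = qg.quanta_by_task["isr"][data_id]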
942 """
943 return self._quanta_by_task_label

    @property
    def datasets_by_type(self) -> Mapping[str, Mapping[DataCoordinate, uuid.UUID]]:
        """A nested mapping of all datasets, keyed first by dataset type name
        and then by data ID.

        Notes
        -----
        This is populated only by the ``quantum_datasets`` and ``init_quanta``
        components, and only datasets referenced by loaded quanta are present.
        All dataset types in the pipeline graph are included, even if none of
        their datasets were loaded (i.e. nested mappings may be empty).

        The returned object may be an internal dictionary; as the type
        annotation indicates, it should not be modified in place.
        """
        return self._datasets_by_type

    @property
    def quantum_only_xgraph(self) -> networkx.DiGraph:
        """A directed acyclic graph with quanta as nodes and datasets elided.

        Notes
        -----
        Node keys are quantum UUIDs, and are populated by the ``thin_graph``
        component (all nodes and edges) and ``quantum_datasets`` component
        (only those that were loaded).

        Node state dictionaries are described by the
        `PredictedQuantumInfo` type.

        The returned object is a read-only view of an internal one.
        """
        return self._quantum_only_xgraph.copy(as_view=True)

    @property
    def bipartite_xgraph(self) -> networkx.DiGraph:
        """A directed acyclic graph with quantum and dataset nodes.

        This graph never includes init-input and init-output datasets.

        Notes
        -----
        Node keys are quantum or dataset UUIDs. Nodes for quanta are present
        if the ``thin_graph`` component is loaded (all nodes) or if the
        ``quantum_datasets`` component is loaded (just loaded quanta). Edges
        and dataset nodes are only present for quanta whose
        ``quantum_datasets`` were loaded.

        Node state dictionaries are described by the
        `PredictedQuantumInfo` and `PredictedDatasetInfo` types.

        The returned object is a read-only view of an internal one.
        """
        return self._bipartite_xgraph.copy(as_view=True)

    @property
    def dimension_data(self) -> DimensionDataAttacher | None:
        """All dimension records needed to expand the data IDs in the graph.

        This may be `None` if the dimension data was not loaded. If all
        execution quanta have been built, all records are guaranteed to have
        been deserialized and the ``records`` attribute is complete. In other
        cases some records may still only be present in the ``deserializers``
        attribute.
        """
        return self._dimension_data

    def __iter__(self) -> Iterator[uuid.UUID]:
        for quanta_for_task in self.quanta_by_task.values():
            for data_id in sorted(quanta_for_task.keys()):
                yield quanta_for_task[data_id]

    def __len__(self) -> int:
        return len(self._quantum_only_xgraph)

    def get_init_inputs(self, task_label: str) -> dict[ConnectionName, DatasetRef]:
        """Return the init-input datasets for the given task.

        Parameters
        ----------
        task_label : `str`
            Label of the task.

        Returns
        -------
        init_inputs : `dict` [ `str`, `lsst.daf.butler.DatasetRef` ]
            Dataset references for init-input datasets, keyed by connection
            name. Dataset type storage classes match the task connection
            declarations, not necessarily the data repository, and the dataset
            types may be components.
        """
        if self._init_quanta is None:
            raise IncompleteQuantumGraphError("The init_quanta component was not loaded.")
        task_init_node = self.pipeline_graph.tasks[task_label].init
        return {
            connection_name: task_init_node.inputs[connection_name].adapt_dataset_ref(
                self._make_init_ref(datasets[0])
            )
            for connection_name, datasets in self._init_quanta[task_label].inputs.items()
        }

    def get_init_outputs(self, task_label: str) -> dict[ConnectionName, DatasetRef]:
        """Return the init-output datasets for the given task.

        Parameters
        ----------
        task_label : `str`
            Label of the task. ``""`` may be used to get global init-outputs.

        Returns
        -------
        init_outputs : `dict` [ `str`, `lsst.daf.butler.DatasetRef` ]
            Dataset references for init-output datasets, keyed by connection
            name. Dataset type storage classes match the task connection
            declarations, not necessarily the data repository.
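
        Examples
        --------
        Fetching the ref for a task's config init-output; ``"isr"`` is a
        placeholder task label::

            config_ref = qg.get_init_outputs("isr")[
                acc.CONFIG_INIT_OUTPUT_CONNECTION_NAME
            ]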
1060 """
1061 if self._init_quanta is None:
1062 raise IncompleteQuantumGraphError("The init_quanta component was not loaded.")
1063 if not task_label:
1064 (datasets,) = self._init_quanta[""].outputs.values()
1065 return {
1066 acc.PACKAGES_INIT_OUTPUT_NAME: DatasetRef(
1067 self.pipeline_graph.packages_dataset_type,
1068 DataCoordinate.make_empty(self.pipeline_graph.universe),
1069 run=datasets[0].run,
1070 id=datasets[0].dataset_id,
1071 conform=False,
1072 )
1073 }
1074 task_init_node = self.pipeline_graph.tasks[task_label].init
1075 result: dict[ConnectionName, DatasetRef] = {}
1076 for connection_name, datasets in self._init_quanta[task_label].outputs.items():
1077 if connection_name == acc.CONFIG_INIT_OUTPUT_CONNECTION_NAME:
1078 edge = task_init_node.config_output
1079 else:
1080 edge = task_init_node.outputs[connection_name]
1081 result[connection_name] = edge.adapt_dataset_ref(self._make_init_ref(datasets[0]))
1082 return result

    def _make_init_ref(self, dataset: PredictedDatasetModel) -> DatasetRef:
        dataset_type = self.pipeline_graph.dataset_types[dataset.dataset_type_name].dataset_type
        return DatasetRef(
            dataset_type,
            DataCoordinate.make_empty(self.pipeline_graph.universe),
            run=dataset.run,
            id=dataset.dataset_id,
            conform=False,
        )

    def build_execution_quanta(
        self,
        quantum_ids: Iterable[uuid.UUID] | None = None,
        task_label: str | None = None,
    ) -> dict[uuid.UUID, Quantum]:
        """Build `lsst.daf.butler.Quantum` objects suitable for executing
        tasks.

        In addition to returning the quantum objects directly, this also causes
        the `quantum_only_xgraph` and `bipartite_xgraph` graphs to include a
        ``quantum`` attribute for the affected quanta.

        Parameters
        ----------
        quantum_ids : `~collections.abc.Iterable` [ `uuid.UUID` ], optional
            IDs of all quanta to return. If not provided, all quanta for the
            given task label (if given) or graph are returned.
        task_label : `str`, optional
            Task label whose quanta should be generated. Ignored if
            ``quantum_ids`` is not `None`.

        Returns
        -------
        quanta : `dict` [ `uuid.UUID`, `lsst.daf.butler.Quantum` ]
            Mapping of quanta, keyed by UUID. All dataset types are adapted to
            the task's storage class declarations and inputs may be components.
            All data IDs have dimension records attached.
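
        Examples
        --------
        A minimal sketch; ``"isr"`` is a placeholder task label, and the
        ``init_quanta``, ``quantum_datasets``, and ``dimension_data``
        components must all have been loaded::

            for quantum_id, quantum in qg.build_execution_quanta(
                task_label="isr"
            ).items():
                print(quantum_id, quantum.dataId)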
1121 """
1122 if not self._init_quanta:
1123 raise IncompleteQuantumGraphError(
1124 "Cannot build execution quanta without loading the ``init_quanta`` component."
1125 )
1126 if quantum_ids is None:
1127 if task_label is not None:
1128 quantum_ids = self._quanta_by_task_label[task_label].values()
1129 else:
1130 quantum_ids = self._quantum_only_xgraph.nodes.keys()
1131 else:
1132 # Guard against single-pass iterators.
1133 quantum_ids = list(quantum_ids)
1134 del task_label # make sure we don't accidentally use this.
1135 result: dict[uuid.UUID, Quantum] = {}
1136 self._expand_execution_quantum_data_ids(quantum_ids)
1137 task_init_datastore_records: dict[TaskLabel, dict[DatastoreName, DatastoreRecordData]] = {}
1138 for quantum_id in quantum_ids:
1139 quantum_node_dict: PredictedQuantumInfo = self._quantum_only_xgraph.nodes[quantum_id]
1140 if "quantum" in quantum_node_dict:
1141 result[quantum_id] = quantum_node_dict["quantum"]
1142 continue
1143 # We've declare the info dict keys to all be required because that
1144 # saves a lot of casting, but the reality is that they can either
1145 # be fully populated or totally unpopulated. But that makes mypy
1146 # think the check above always succeeds.
1147 try: # type:ignore [unreachable]
1148 quantum_datasets = self._quantum_datasets[quantum_id]
1149 except KeyError:
1150 raise IncompleteQuantumGraphError(
1151 f"Full quantum information for {quantum_id} was not loaded."
1152 ) from None
1153 task_node = self.pipeline_graph.tasks[quantum_datasets.task_label]
1154 quantum_data_id = self._expanded_data_ids[self._bipartite_xgraph.nodes[quantum_id]["data_id"]]
1155 inputs = self._build_execution_quantum_refs(task_node, quantum_datasets.inputs)
1156 outputs = self._build_execution_quantum_refs(task_node, quantum_datasets.outputs)
1157 if task_node.label not in task_init_datastore_records:
1158 task_init_datastore_records[task_node.label] = self._init_quanta[
1159 task_node.label
1160 ].deserialize_datastore_records()
1161 quantum = Quantum(
1162 taskName=task_node.task_class_name,
1163 taskClass=task_node.task_class,
1164 dataId=quantum_data_id,
1165 initInputs={
1166 ref.datasetType: ref for ref in self.get_init_inputs(quantum_datasets.task_label).values()
1167 },
1168 inputs=inputs,
1169 outputs=outputs,
1170 datastore_records=DatastoreRecordData.merge_mappings(
1171 quantum_datasets.deserialize_datastore_records(),
1172 task_init_datastore_records[task_node.label],
1173 ),
1174 )
1175 self._quantum_only_xgraph.nodes[quantum_id]["quantum"] = quantum
1176 self._bipartite_xgraph.nodes[quantum_id]["quantum"] = quantum
1177 result[quantum_id] = quantum
1178 return result

    def _expand_execution_quantum_data_ids(self, quantum_ids: Iterable[uuid.UUID]) -> None:
        if self._dimension_data is None:
            raise IncompleteQuantumGraphError(
                "Cannot build execution quanta without loading the ``dimension_data`` component."
            )
        data_ids_to_expand: dict[DimensionGroup, set[DataCoordinate]] = defaultdict(set)
        for quantum_id in quantum_ids:
            data_id: DataCoordinate = self._bipartite_xgraph.nodes[quantum_id]["data_id"]
            if data_id.hasRecords():
                self._expanded_data_ids[data_id] = data_id
            else:
                data_ids_to_expand[data_id.dimensions].add(data_id)
            for dataset_id in itertools.chain(
                self._bipartite_xgraph.predecessors(quantum_id),
                self._bipartite_xgraph.successors(quantum_id),
            ):
                data_id = self._bipartite_xgraph.nodes[dataset_id]["data_id"]
                if data_id.hasRecords():
                    self._expanded_data_ids[data_id] = data_id
                else:
                    data_ids_to_expand[data_id.dimensions].add(data_id)
        for dimensions, data_ids_for_dimensions in data_ids_to_expand.items():
            self._expanded_data_ids.update(
                (d, d) for d in self._dimension_data.attach(dimensions, data_ids_for_dimensions)
            )

    def _build_execution_quantum_refs(
        self, task_node: TaskNode, model_mapping: dict[ConnectionName, list[PredictedDatasetModel]]
    ) -> dict[DatasetType, list[DatasetRef]]:
        results: dict[DatasetType, list[DatasetRef]] = {}
        for connection_name, datasets in model_mapping.items():
            edge = task_node.get_edge(connection_name)
            dataset_type = edge.adapt_dataset_type(
                self.pipeline_graph.dataset_types[edge.parent_dataset_type_name].dataset_type
            )
            results[dataset_type] = [self._make_general_ref(dataset_type, d.dataset_id) for d in datasets]
        return results

    def _make_general_ref(self, dataset_type: DatasetType, dataset_id: uuid.UUID) -> DatasetRef:
        node_state = self._bipartite_xgraph.nodes[dataset_id]
        data_id = self._expanded_data_ids[node_state["data_id"]]
        return DatasetRef(dataset_type, data_id, run=node_state["run"], id=dataset_id)

    def make_init_qbb(
        self,
        butler_config: Config | ResourcePathExpression,
        *,
        config_search_paths: Iterable[str] | None = None,
    ) -> QuantumBackedButler:
        """Construct a quantum-backed butler suitable for reading and writing
        init-input and init-output datasets, respectively.

        This only requires the ``init_quanta`` component to have been loaded.

        Parameters
        ----------
        butler_config : `~lsst.daf.butler.Config` or \
                `~lsst.resources.ResourcePathExpression`
            A butler repository root, configuration filename, or configuration
            instance.
        config_search_paths : `~collections.abc.Iterable` [ `str` ], optional
            Additional search paths for butler configuration.

        Returns
        -------
        qbb : `~lsst.daf.butler.QuantumBackedButler`
            A limited butler that can ``get`` init-input datasets and ``put``
            init-output datasets.
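
        Examples
        --------
        A minimal sketch; ``"/repo/butler.yaml"`` is a placeholder path.
        Because `~lsst.daf.butler.QuantumBackedButler` is a
        `~lsst.daf.butler.LimitedButler`, the result can be passed to
        `init_output_run`::

            qbb = qg.make_init_qbb("/repo/butler.yaml")
            qg.init_output_run(qbb)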
1248 """
1249 # Collect all init input/output dataset IDs.
1250 predicted_inputs: set[uuid.UUID] = set()
1251 predicted_outputs: set[uuid.UUID] = set()
1252 datastore_record_maps: list[dict[DatastoreName, DatastoreRecordData]] = []
1253 for init_quantum_datasets in self._init_quanta.values():
1254 predicted_inputs.update(
1255 d.dataset_id for d in itertools.chain.from_iterable(init_quantum_datasets.inputs.values())
1256 )
1257 predicted_outputs.update(
1258 d.dataset_id for d in itertools.chain.from_iterable(init_quantum_datasets.outputs.values())
1259 )
1260 datastore_record_maps.append(
1261 {
1262 datastore_name: DatastoreRecordData.from_simple(serialized_records)
1263 for datastore_name, serialized_records in init_quantum_datasets.datastore_records.items()
1264 }
1265 )
1266 # Remove intermediates from inputs.
1267 predicted_inputs -= predicted_outputs
1268 dataset_types = {d.name: d.dataset_type for d in self.pipeline_graph.dataset_types.values()}
1269 # Make butler from everything.
1270 return QuantumBackedButler.from_predicted(
1271 config=butler_config,
1272 predicted_inputs=predicted_inputs,
1273 predicted_outputs=predicted_outputs,
1274 dimensions=self.pipeline_graph.universe,
1275 datastore_records=DatastoreRecordData.merge_mappings(*datastore_record_maps),
1276 search_paths=list(config_search_paths) if config_search_paths is not None else None,
1277 dataset_types=dataset_types,
1278 )

    def write_init_outputs(self, butler: LimitedButler, skip_existing: bool = True) -> None:
        """Write the init-output datasets for all tasks in the quantum graph.

        This only requires the ``init_quanta`` component to have been loaded.

        Parameters
        ----------
        butler : `lsst.daf.butler.LimitedButler`
            A limited butler data repository client.
        skip_existing : `bool`, optional
            If `True` (default) ignore init-outputs that already exist. If
            `False`, raise.

        Raises
        ------
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if an init-output dataset already exists and
            ``skip_existing=False``.
        """
        # Extract init-input and init-output refs from the QG.
        input_refs: dict[str, DatasetRef] = {}
        output_refs: dict[str, DatasetRef] = {}
        for task_node in self.pipeline_graph.tasks.values():
            if task_node.label not in self._init_quanta:
                continue
            input_refs.update(
                {ref.datasetType.name: ref for ref in self.get_init_inputs(task_node.label).values()}
            )
            output_refs.update(
                {
                    ref.datasetType.name: ref
                    for ref in self.get_init_outputs(task_node.label).values()
                    if ref.datasetType.name != task_node.init.config_output.dataset_type_name
                }
            )
        for ref, is_stored in butler.stored_many(output_refs.values()).items():
            if is_stored:
                if not skip_existing:
                    raise ConflictingDefinitionError(f"Init-output dataset {ref} already exists.")
                # We'll `put` whatever's left in output_refs at the end.
                del output_refs[ref.datasetType.name]
        # Instantiate tasks, reading overall init-inputs and gathering
        # init-output in-memory objects.
        init_outputs: list[tuple[Any, DatasetType]] = []
        self.pipeline_graph.instantiate_tasks(
            get_init_input=lambda dataset_type: butler.get(
                input_refs[dataset_type.name].overrideStorageClass(dataset_type.storageClass)
            ),
            init_outputs=init_outputs,
            # A task can be in the pipeline graph without having an init
            # quantum if it doesn't have any regular quanta either (e.g. they
            # were all skipped), and the _init_quanta has a "" entry for global
            # init-outputs that we don't want to pass here.
            labels=self.pipeline_graph.tasks.keys() & self._init_quanta.keys(),
        )
        # Write init-outputs that weren't already present.
        for obj, dataset_type in init_outputs:
            if new_ref := output_refs.get(dataset_type.name):
                assert new_ref.datasetType.storageClass_name == dataset_type.storageClass_name, (
                    "QG init refs should use task connection storage classes."
                )
                butler.put(obj, new_ref)

    def write_configs(self, butler: LimitedButler, compare_existing: bool = True) -> None:
        """Write the config datasets for all tasks in the quantum graph.

        Parameters
        ----------
        butler : `lsst.daf.butler.LimitedButler`
            A limited butler data repository client.
        compare_existing : `bool`, optional
            If `True` check configs that already exist for consistency. If
            `False`, always raise if configs already exist.

        Raises
        ------
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if a config dataset already exists and
            ``compare_existing=False``, or if the existing config is not
            consistent with the config in the quantum graph.
        """
        to_put: list[tuple[PipelineTaskConfig, DatasetRef]] = []
        for task_node in self.pipeline_graph.tasks.values():
            if task_node.label not in self._init_quanta:
                continue
            dataset_type_name = task_node.init.config_output.dataset_type_name
            ref = self.get_init_outputs(task_node.label)[acc.CONFIG_INIT_OUTPUT_CONNECTION_NAME]
            try:
                old_config = butler.get(ref)
            except (LookupError, FileNotFoundError):
                old_config = None
            if old_config is not None:
                if not compare_existing:
                    raise ConflictingDefinitionError(f"Config dataset {ref} already exists.")
                if not task_node.config.compare(old_config, shortcut=False, output=log_config_mismatch):
                    raise ConflictingDefinitionError(
                        f"Config does not match existing task config {dataset_type_name!r} in "
                        "butler; task configurations must be consistent within the same run collection."
                    )
            else:
                to_put.append((task_node.config, ref))
        # We do writes at the end to minimize the mess we leave behind when we
        # raise an exception.
        for config, ref in to_put:
            butler.put(config, ref)

    def write_packages(self, butler: LimitedButler, compare_existing: bool = True) -> None:
        """Write the 'packages' dataset for the currently-active software
        versions.

        Parameters
        ----------
        butler : `lsst.daf.butler.LimitedButler`
            A limited butler data repository client.
        compare_existing : `bool`, optional
            If `True` check packages that already exist for consistency. If
            `False`, always raise if the packages dataset already exists.

        Raises
        ------
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if the packages dataset already exists and is not consistent
            with the current packages.
        """
        new_packages = Packages.fromSystem()
        (ref,) = self.get_init_outputs("").values()
        try:
            packages = butler.get(ref)
        except (LookupError, FileNotFoundError):
            packages = None
        if packages is not None:
            if not compare_existing:
                raise ConflictingDefinitionError(f"Packages dataset {ref} already exists.")
            if compare_packages(packages, new_packages):
                # Have to remove the existing dataset first; butler has no
                # replace option.
                butler.pruneDatasets([ref], unstore=True, purge=True)
                butler.put(packages, ref)
        else:
            butler.put(new_packages, ref)

    def init_output_run(self, butler: LimitedButler, existing: bool = True) -> None:
        """Initialize a new output RUN collection by writing init-output
        datasets (including configs and packages).

        Parameters
        ----------
        butler : `lsst.daf.butler.LimitedButler`
            A limited butler data repository client.
        existing : `bool`, optional
            If `True` check or ignore outputs that already exist. If
            `False`, always raise if an output dataset already exists.

        Raises
        ------
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if there are existing init output datasets, and either
            ``existing=False`` or their contents are not compatible with this
            graph.
        """
        self.write_configs(butler, compare_existing=existing)
        self.write_packages(butler, compare_existing=existing)
        self.write_init_outputs(butler, skip_existing=existing)

    @classmethod
    def from_old_quantum_graph(cls, old_quantum_graph: QuantumGraph) -> PredictedQuantumGraph:
        """Construct from an old `QuantumGraph` instance.

        Parameters
        ----------
        old_quantum_graph : `QuantumGraph`
            Quantum graph to transform.

        Returns
        -------
        predicted_quantum_graph : `PredictedQuantumGraph`
            A new predicted quantum graph.
        """
        return PredictedQuantumGraphComponents.from_old_quantum_graph(old_quantum_graph).assemble()
1460 def to_old_quantum_graph(self) -> QuantumGraph:
1461 """Transform into an old `QuantumGraph` instance.
1463 Returns
1464 -------
1465 old_quantum_graph : `QuantumGraph`
1466 Old quantum graph.
1468 Notes
1469 -----
1470 This can only be called on graphs that have loaded all quantum
1471 datasets, init datasets, and dimension records.
1472 """
1473 from ..graph import QuantumGraph
1475 quanta: dict[TaskDef, set[Quantum]] = {}
1476 quantum_to_quantum_id: dict[Quantum, uuid.UUID] = {}
1477 init_inputs: dict[TaskDef, list[DatasetRef]] = {}
1478 init_outputs: dict[TaskDef, list[DatasetRef]] = {}
1479 for task_def in self.pipeline_graph._iter_task_defs():
1480 if not self._quanta_by_task_label.get(task_def.label):
1481 continue
1482 quanta_for_task: set[Quantum] = set()
1483 for quantum_id, quantum in self.build_execution_quanta(task_label=task_def.label).items():
1484 quanta_for_task.add(quantum)
1485 quantum_to_quantum_id[quantum] = quantum_id
1486 quanta[task_def] = quanta_for_task
1487 init_inputs[task_def] = list(self.get_init_inputs(task_def.label).values())
1488 init_outputs[task_def] = list(self.get_init_outputs(task_def.label).values())
1489 global_init_outputs = list(self.get_init_outputs("").values())
1490 registry_dataset_types = [d.dataset_type for d in self.pipeline_graph.dataset_types.values()]
1491 result = object.__new__(QuantumGraph)
1492 result._buildGraphs(
1493 quanta,
1494 _quantumToNodeId=quantum_to_quantum_id,
1495 metadata=self.header.to_old_metadata(),
1496 universe=self.pipeline_graph.universe,
1497 initInputs=init_inputs,
1498 initOutputs=init_outputs,
1499 globalInitOutputs=global_init_outputs,
1500 registryDatasetTypes=registry_dataset_types,
1501 )
1502 return result
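# Conversion sketch (hypothetical variable names): round-tripping through the
# old in-memory type requires a fully-loaded predicted graph, per the Notes
# above:
#
#     old_qg = predicted_qg.to_old_quantum_graph()
#     again = PredictedQuantumGraph.from_old_quantum_graph(old_qg)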
1504 def _make_summary(self) -> QgraphSummary:
1505 from ..graph import QgraphSummary, QgraphTaskSummary
1507 summary = QgraphSummary(
1508 cmdLine=self.header.command or None,
1509 creationUTC=str(self.header.timestamp) if self.header.timestamp is not None else None,
1510 inputCollection=self.header.inputs or None,
1511 outputCollection=self.header.output,
1512 outputRun=self.header.output_run,
1513 )
1514 for task_label, quanta_for_task in self.quanta_by_task.items():
1515 task_summary = QgraphTaskSummary(taskLabel=task_label, numQuanta=len(quanta_for_task))
1516 task_node = self.pipeline_graph.tasks[task_label]
1517 for quantum_id in quanta_for_task.values():
1518 quantum_datasets = self._quantum_datasets[quantum_id]
1519 for connection_name, input_datasets in quantum_datasets.inputs.items():
1520 task_summary.numInputs[
1521 task_node.get_input_edge(connection_name).parent_dataset_type_name
1522 ] += len(input_datasets)
1523 for connection_name, output_datasets in quantum_datasets.outputs.items():
1524 task_summary.numOutputs[
1525 task_node.get_output_edge(connection_name).parent_dataset_type_name
1526 ] += len(output_datasets)
1527 summary.qgraphTaskSummaries[task_label] = task_summary
1528 return summary
1531@dataclasses.dataclass(kw_only=True)
1532class PredictedQuantumGraphComponents:
1533 """A helper class for building and writing predicted quantum graphs.
1535 Notes
1536 -----
1537 This class is a simple struct of model classes to allow different tools
1538 that build predicted quantum graphs to assemble them in whatever order they
1539 prefer. It does not enforce any internal invariants (e.g. the quantum and
1540 dataset counts in the header, different representations of quanta, internal
1541 ID sorting, etc.), but it does provide methods that can satisfy them.
1542 """
1544 def __post_init__(self) -> None:
1545 self.header.graph_type = "predicted"
1547 header: HeaderModel = dataclasses.field(default_factory=HeaderModel)
1548 """Basic metadata about the graph."""
1550 pipeline_graph: PipelineGraph
1551 """Description of the pipeline this graph runs, including all task label
1552 and dataset type definitions.
1554 This may include tasks that do not have any quanta (e.g. due to skipping
1555 already-executed tasks).
1557 This also includes the dimension universe used to construct the graph.
1558 """
1560 dimension_data: DimensionDataAttacher | None = None
1561 """Object that can attach dimension records to data IDs.
1562 """
1564 init_quanta: PredictedInitQuantaModel = dataclasses.field(default_factory=PredictedInitQuantaModel)
1565 """A list of special quanta that describe the init-inputs and init-outputs
1566 of the graph.
1568 Tasks that are included in the pipeline graph but do not have any quanta
1569 may or may not have an init quantum, but tasks that do have regular quanta
1570 always have an init quantum as well.
1572 When used to construct a `PredictedQuantumGraph`, this must have either
1573 zero entries or an entry for every task in the pipeline.
1574 """
1576 thin_graph: PredictedThinGraphModel = dataclasses.field(default_factory=PredictedThinGraphModel)
1577 """A lightweight quantum-quantum DAG with task labels and data IDs only.
1579 This does not include the special "init" quanta.
1580 """
1582 quantum_datasets: dict[uuid.UUID, PredictedQuantumDatasetsModel] = dataclasses.field(default_factory=dict)
1583 """The full descriptions of all quanta, including input and output
1584 datasets, keyed by UUID.
1586 When used to construct a `PredictedQuantumGraph`, this need not have all
1587 entries.
1589 This does not include special "init" quanta.
1590 """
1592 @classmethod
1593 def make_empty(
1594 cls,
1595 universe: DimensionUniverse,
1596 *,
1597 output_run: str,
1598 inputs: Iterable[str] = (),
1599 output: str | None = None,
1600 add_packages: bool = True,
1601 ) -> PredictedQuantumGraphComponents:
1602 """Make components for an empty quantum graph with no tasks.
1604 Parameters
1605 ----------
1606 universe : `lsst.daf.butler.DimensionUniverse`
1607 Definitions for all butler dimensions.
1608 output_run : `str`
1609 Output run collection.
1610 inputs : `~collections.abc.Iterable` [`str`], optional
1611 Iterable of input collection names.
1612 output : `str` or `None`, optional
1613 Output chained collection.
1614 add_packages : `bool`, optional
1615 Whether to add the special init quantum that writes the 'packages'
1616 dataset. The default (`True`) is consistent with
1617 `~..quantum_graph_builder.QuantumGraphBuilder` behavior when no
1618 regular quanta are generated.
1620 Returns
1621 -------
1622 components : `PredictedQuantumGraphComponents`
1623 Components that can be used to build or write an empty quantum
1624 graph.
1625 """
1626 components = cls(pipeline_graph=PipelineGraph(universe=universe))
1627 components.header.inputs = list(inputs)
1628 components.header.output_run = output_run
1629 components.header.output = output
1630 if add_packages:
1631 components.init_quanta.root = [
1632 PredictedQuantumDatasetsModel.model_construct(
1633 quantum_id=generate_uuidv7(),
1634 task_label="",
1635 outputs={
1636 acc.PACKAGES_INIT_OUTPUT_NAME: [
1637 PredictedDatasetModel(
1638 dataset_id=generate_uuidv7(),
1639 dataset_type_name=acc.PACKAGES_INIT_OUTPUT_NAME,
1640 data_coordinate=[],
1641 run=output_run,
1642 )
1643 ]
1644 },
1645 )
1646 ]
1647 return components
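# Minimal sketch, assuming ``universe`` is an existing `DimensionUniverse`
# and the collection names are hypothetical; note that `write` additionally
# requires `dimension_data` to be populated:
#
#     components = PredictedQuantumGraphComponents.make_empty(
#         universe, output_run="u/someone/run1", inputs=["HSC/defaults"]
#     )
#     components.set_header_counts()
#     qg = components.assemble()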
1649 def make_dataset_ref(self, predicted: PredictedDatasetModel) -> DatasetRef:
1650 """Make a `lsst.daf.butler.DatasetRef` from information in the
1651 predicted quantum graph.
1653 Parameters
1654 ----------
1655 predicted : `PredictedDatasetModel`
1656 Model for the dataset in the predicted graph.
1658 Returns
1659 -------
1660 ref : `lsst.daf.butler.DatasetRef`
1661 A dataset reference. Data ID will be expanded if and only if
1662 the dimension data has been loaded.
1663 """
1664 try:
1665 dataset_type = self.pipeline_graph.dataset_types[predicted.dataset_type_name].dataset_type
1666 except KeyError:
1667 if predicted.dataset_type_name == acc.PACKAGES_INIT_OUTPUT_NAME:
1668 dataset_type = self.pipeline_graph.packages_dataset_type
1669 else:
1670 raise
1671 data_id = DataCoordinate.from_full_values(dataset_type.dimensions, tuple(predicted.data_coordinate))
1672 if self.dimension_data is not None:
1673 (data_id,) = self.dimension_data.attach(dataset_type.dimensions, [data_id])
1674 return DatasetRef(
1675 dataset_type,
1676 data_id,
1677 run=predicted.run,
1678 id=predicted.dataset_id,
1679 )
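# Sketch: expanding one quantum's predicted inputs into `DatasetRef` objects
# (``components`` and ``quantum_id`` are assumed to exist, with the
# corresponding quantum datasets already loaded):
#
#     model = components.quantum_datasets[quantum_id]
#     input_refs = [
#         components.make_dataset_ref(predicted)
#         for predicted in itertools.chain.from_iterable(model.inputs.values())
#     ]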
1681 def set_thin_graph(self) -> None:
1682 """Populate the `thin_graph` component from the `pipeline_graph`,
1683 `quantum_datasets` components (which must be complete).
1684 """
1685 bipartite_xgraph = networkx.DiGraph()
1686 self.thin_graph.quanta = {task_label: [] for task_label in self.pipeline_graph.tasks}
1687 graph_quantum_ids: list[uuid.UUID] = []
1688 for quantum_datasets in self.quantum_datasets.values():
1689 self.thin_graph.quanta[quantum_datasets.task_label].append(
1690 PredictedThinQuantumModel.model_construct(
1691 quantum_id=quantum_datasets.quantum_id,
1692 data_coordinate=quantum_datasets.data_coordinate,
1693 )
1694 )
1695 for dataset in itertools.chain.from_iterable(quantum_datasets.inputs.values()):
1696 bipartite_xgraph.add_edge(dataset.dataset_id, quantum_datasets.quantum_id)
1697 for dataset in itertools.chain.from_iterable(quantum_datasets.outputs.values()):
1698 bipartite_xgraph.add_edge(quantum_datasets.quantum_id, dataset.dataset_id)
1699 graph_quantum_ids.append(quantum_datasets.quantum_id)
1700 quantum_only_xgraph: networkx.DiGraph = networkx.bipartite.projected_graph(
1701 bipartite_xgraph, graph_quantum_ids
1702 )
1703 self.thin_graph.edges = list(quantum_only_xgraph.edges)
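# The bipartite projection above is what collapses quantum -> dataset ->
# quantum paths into direct quantum-quantum edges; a toy illustration of the
# same networkx call:
#
#     b = networkx.DiGraph()
#     b.add_edge("q1", "d1")  # quantum q1 writes dataset d1
#     b.add_edge("d1", "q2")  # quantum q2 reads dataset d1
#     p = networkx.algorithms.bipartite.projected_graph(b, ["q1", "q2"])
#     assert list(p.edges) == [("q1", "q2")]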
1705 def set_header_counts(self) -> None:
1706 """Populate the quantum and dataset counts in the header from the
1707 `thin_graph`, `init_quanta`, and `quantum_datasets` components.
1708 """
1709 self.header.n_quanta = len(self.quantum_datasets)
1710 self.header.n_task_quanta = {
1711 task_label: len(thin_quanta) for task_label, thin_quanta in self.thin_graph.quanta.items()
1712 }
1713 all_dataset_ids: set[uuid.UUID] = set()
1714 for quantum_datasets in itertools.chain(self.init_quanta.root, self.quantum_datasets.values()):
1715 all_dataset_ids.update(quantum_datasets.iter_dataset_ids())
1716 self.header.n_datasets = len(all_dataset_ids)
1718 def update_output_run(self, output_run: str) -> None:
1719 """Update the output `~lsst.daf.butler.CollectionType.RUN` collection
1720 name in all datasets and regenerate all output dataset and quantum
1721 UUIDs.
1723 Parameters
1724 ----------
1725 output_run : `str`
1726 New output `~lsst.daf.butler.CollectionType.RUN` collection name.
1727 """
1728 uuid_map: dict[uuid.UUID, uuid.UUID] = {}
1729 # Do all outputs and then all inputs in separate passes so we don't
1730 # need to rely on topological ordering of anything.
1731 for quantum_datasets in itertools.chain(self.init_quanta.root, self.quantum_datasets.values()):
1732 new_quantum_id = generate_uuidv7()
1733 quantum_datasets.quantum_id = new_quantum_id
1734 for output_dataset in itertools.chain.from_iterable(quantum_datasets.outputs.values()):
1735 assert output_dataset.run == self.header.output_run, (
1736 f"Incorrect run {output_dataset.run} for output dataset {output_dataset.dataset_id}."
1737 )
1738 new_dataset_id = generate_uuidv7()
1739 uuid_map[output_dataset.dataset_id] = new_dataset_id
1740 output_dataset.dataset_id = new_dataset_id
1741 output_dataset.run = output_run
1742 for quantum_datasets in itertools.chain(self.init_quanta.root, self.quantum_datasets.values()):
1743 for input_dataset in itertools.chain.from_iterable(quantum_datasets.inputs.values()):
1744 if input_dataset.run == self.header.output_run:
1745 input_dataset.run = output_run
1746 input_dataset.dataset_id = uuid_map.get(
1747 input_dataset.dataset_id,
1748 # This dataset isn't necessarily an output of the graph
1749 # just because it's in the output run; the graph could
1750 # have been built with extend_run=True.
1751 input_dataset.dataset_id,
1752 )
1753 # Update the keys of the quantum_datasets dict.
1754 self.quantum_datasets = {qd.quantum_id: qd for qd in self.quantum_datasets.values()}
1755 # Since the UUIDs have changed, the thin graph needs to be rewritten.
1756 self.set_thin_graph()
1757 # Update the header last, since we use it above to get the old run.
1758 self.header.output_run = output_run
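# Sketch: retargeting an already-built graph at a new (hypothetical) RUN
# collection; output dataset and quantum UUIDs are regenerated so the two
# runs cannot share IDs:
#
#     components.update_output_run("u/someone/run2")
#     components.write("retargeted.qg")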
1760 def assemble(self) -> PredictedQuantumGraph:
1761 """Construct a `PredictedQuantumGraph` from these components."""
1762 return PredictedQuantumGraph(self)
1764 @classmethod
1765 def read_execution_quanta(
1766 cls,
1767 uri: ResourcePathExpression,
1768 quantum_ids: Iterable[uuid.UUID] | None = None,
1769 page_size: int = DEFAULT_PAGE_SIZE,
1770 ) -> PredictedQuantumGraphComponents:
1771 """Read one or more executable quanta from a quantum graph file.
1773 Parameters
1774 ----------
1775 uri : convertible to `lsst.resources.ResourcePath`
1776 URI to open. Should have a ``.qg`` extension for new quantum graph
1777 files, or ``.qgraph`` for the old format.
1778 quantum_ids : `~collections.abc.Iterable` [ `uuid.UUID` ], optional
1779 Iterable of quantum IDs to load. If not provided, all quanta will
1780 be loaded. The UUIDs of special init quanta will be ignored.
1781 page_size : `int`, optional
1782 Approximate number of bytes to read at once from address files.
1783 Note that this does not set a page size for *all* reads, but it
1784 does affect the smallest, most numerous reads.
1786 Returns
1787 -------
1788 components : `PredictedQuantumGraphComponents`
1789 Components for a quantum graph that can build execution quanta for
1790 all of the given IDs.
1791 """
1792 uri = ResourcePath(uri)
1793 if uri.getExtension() == ".qgraph":
1794 _LOG.warning(
1795 f"Reading and converting old quantum graph {uri}. "
1796 "Use the '.qg' extension to write in the new format."
1797 )
1798 from ..graph import QuantumGraph
1800 old_qg = QuantumGraph.loadUri(uri, nodes=quantum_ids)
1801 return PredictedQuantumGraphComponents.from_old_quantum_graph(old_qg)
1803 with PredictedQuantumGraph.open(uri, page_size=page_size) as reader:
1804 reader.read_execution_quanta(quantum_ids)
1805 return reader.components
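# Sketch: loading only the quanta assigned to one executor, given a
# hypothetical ``ids`` iterable of quantum UUIDs:
#
#     components = PredictedQuantumGraphComponents.read_execution_quanta(
#         "graph.qg", quantum_ids=ids
#     )
#     qg = components.assemble()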
1807 @classmethod
1808 def from_old_quantum_graph(cls, old_quantum_graph: QuantumGraph) -> PredictedQuantumGraphComponents:
1809 """Construct from an old `QuantumGraph` instance.
1811 Parameters
1812 ----------
1813 old_quantum_graph : `QuantumGraph`
1814 Quantum graph to transform.
1816 Returns
1817 -------
1818 components : `PredictedQuantumGraphComponents`
1819 Components for a new predicted quantum graph.
1820 """
1821 header = HeaderModel.from_old_quantum_graph(old_quantum_graph)
1822 result = cls(header=header, pipeline_graph=old_quantum_graph.pipeline_graph)
1823 result.init_quanta.update_from_old_quantum_graph(old_quantum_graph)
1824 dimension_data_extractor = DimensionDataExtractor.from_dimension_group(
1825 old_quantum_graph.pipeline_graph.get_all_dimensions()
1826 )
1827 for task_node in old_quantum_graph.pipeline_graph.tasks.values():
1828 task_quanta = old_quantum_graph.get_task_quanta(task_node.label)
1829 for quantum_id, quantum in task_quanta.items():
1830 result.quantum_datasets[quantum_id] = PredictedQuantumDatasetsModel.from_execution_quantum(
1831 task_node, quantum, quantum_id
1832 )
1833 dimension_data_extractor.update([cast(DataCoordinate, quantum.dataId)])
1834 for refs in itertools.chain(quantum.inputs.values(), quantum.outputs.values()):
1835 dimension_data_extractor.update(ref.dataId for ref in refs)
1836 result.dimension_data = DimensionDataAttacher(
1837 records=dimension_data_extractor.records.values(),
1838 dimensions=result.pipeline_graph.get_all_dimensions(),
1839 )
1840 result.set_thin_graph()
1841 result.set_header_counts()
1842 return result
1844 def write(
1845 self,
1846 uri: ResourcePathExpression,
1847 *,
1848 zstd_level: int = 10,
1849 zstd_dict_size: int = 32768,
1850 zstd_dict_n_inputs: int = 512,
1851 ) -> None:
1852 """Write the graph to a file.
1854 Parameters
1855 ----------
1856 uri : convertible to `lsst.resources.ResourcePath`
1857 Path to write to. Should have a ``.qg`` extension, or ``.qgraph``
1858 to force writing the old format.
1859 zstd_level : `int`, optional
1860 ZStandard compression level to use on JSON blocks.
1861 zstd_dict_size : `int`, optional
1862 Size of a ZStandard dictionary that shares compression information
1863 across components. Set to zero to disable the dictionary.
1864 Dictionary compression is automatically disabled if the number of
1865 quanta is smaller than ``zstd_dict_n_inputs``.
1866 zstd_dict_n_inputs : `int`, optional
1867 Maximum number of `PredictedQuantumDatasetsModel` JSON
1868 representations to feed the ZStandard dictionary training routine.
1870 Notes
1871 -----
1872 Only a complete predicted quantum graph with all components fully
1873 populated should be written.
1874 """
1875 if self.header.n_task_quanta != {
1876 task_label: len(quanta) for task_label, quanta in self.thin_graph.quanta.items()
1877 }:
1878 raise RuntimeError(
1879 "Cannot save graph after partial read of quanta: thin graph is inconsistent with header."
1880 )
1881 # Ensure we record the actual version we're about to write, in case
1882 # we're rewriting an old graph in a new format.
1883 self.header.version = FORMAT_VERSION
1884 uri = ResourcePath(uri)
1885 match uri.getExtension():
1886 case ".qg":
1887 pass
1888 case ".qgraph":
1889 _LOG.warning(
1890 "Converting to an old-format quantum graph.. "
1891 "Use '.qg' instead of '.qgraph' to save in the new format."
1892 )
1893 old_qg = self.assemble().to_old_quantum_graph()
1894 old_qg.saveUri(uri)
1895 return
1896 case ext:
1897 raise ValueError(
1898 f"Unsupported extension {ext!r} for quantum graph; "
1899 "expected '.qg' (or '.qgraph' to force the old format)."
1900 )
1901 cdict_data: bytes | None = None
1902 quantum_datasets_json: dict[uuid.UUID, bytes] = {}
1903 if len(self.quantum_datasets) < zstd_dict_n_inputs:
1904 # ZStandard will fail if we ask to use a compression dict without
1905 # giving it enough data, and it only helps if we have a lot of
1906 # quanta.
1907 zstd_dict_size = 0
1908 if zstd_dict_size:
1909 quantum_datasets_json = {
1910 quantum_model.quantum_id: quantum_model.model_dump_json().encode()
1911 for quantum_model in itertools.islice(self.quantum_datasets.values(), zstd_dict_n_inputs)
1912 }
1913 try:
1914 cdict_data = zstandard.train_dictionary(
1915 zstd_dict_size,
1916 list(quantum_datasets_json.values()),
1917 level=zstd_level,
1918 ).as_bytes()
1919 except zstandard.ZstdError as err:
1920 warnings.warn(f"Not using a compression dictionary: {err}.")
1921 with BaseQuantumGraphWriter.open(
1922 uri,
1923 header=self.header,
1924 pipeline_graph=self.pipeline_graph,
1925 address_filename="quanta",
1926 cdict_data=cdict_data,
1927 zstd_level=zstd_level,
1928 ) as writer:
1929 writer.write_single_model("thin_graph", self.thin_graph)
1930 if self.dimension_data is None:
1931 raise IncompleteQuantumGraphError(
1932 "Cannot save predicted quantum graph with no dimension data."
1933 )
1934 serialized_dimension_data = self.dimension_data.serialized()
1935 writer.write_single_model("dimension_data", serialized_dimension_data)
1936 del serialized_dimension_data
1937 writer.write_single_model("init_quanta", self.init_quanta)
1938 with MultiblockWriter.open_in_zip(
1939 writer.zf, "quantum_datasets", writer.int_size
1940 ) as quantum_datasets_mb:
1941 for quantum_model in self.quantum_datasets.values():
1942 if json_data := quantum_datasets_json.get(quantum_model.quantum_id):
1943 quantum_datasets_mb.write_bytes(
1944 quantum_model.quantum_id, writer.compressor.compress(json_data)
1945 )
1946 else:
1947 quantum_datasets_mb.write_model(
1948 quantum_model.quantum_id, quantum_model, writer.compressor
1949 )
1950 writer.address_writer.addresses.append(quantum_datasets_mb.addresses)
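# Sketch: the defaults are usually right; dictionary compression can also be
# disabled explicitly for small graphs (it is already skipped when there are
# fewer than ``zstd_dict_n_inputs`` quanta):
#
#     components.write("graph.qg")                    # defaults
#     components.write("graph.qg", zstd_dict_size=0)  # no shared dictionary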
1953@dataclasses.dataclass
1954class PredictedQuantumGraphReader(BaseQuantumGraphReader):
1955 """A helper class for reading predicted quantum graphs."""
1957 components: PredictedQuantumGraphComponents = dataclasses.field(init=False)
1958 """Quantum graph components populated by this reader's methods."""
1960 @classmethod
1961 @contextmanager
1962 def open(
1963 cls,
1964 uri: ResourcePathExpression,
1965 *,
1966 page_size: int = DEFAULT_PAGE_SIZE,
1967 import_mode: TaskImportMode = TaskImportMode.ASSUME_CONSISTENT_EDGES,
1968 ) -> Iterator[PredictedQuantumGraphReader]:
1969 """Construct a reader from a URI.
1971 Parameters
1972 ----------
1973 uri : convertible to `lsst.resources.ResourcePath`
1974 URI to open. Should have a ``.qg`` extension.
1975 page_size : `int`, optional
1976 Approximate number of bytes to read at once from address files.
1977 Note that this does not set a page size for *all* reads, but it
1978 does affect the smallest, most numerous reads.
1979 import_mode : `.pipeline_graph.TaskImportMode`, optional
1980 How to handle importing the task classes referenced in the pipeline
1981 graph.
1983 Returns
1984 -------
1985 reader : `contextlib.AbstractContextManager` [ \
1986 `PredictedQuantumGraphReader` ]
1987 A context manager that returns the reader when entered.
1988 """
1989 with cls._open(
1990 uri,
1991 graph_type="predicted",
1992 address_filename="quanta",
1993 page_size=page_size,
1994 import_mode=import_mode,
1995 n_addresses=1,
1996 ) as self:
1997 yield self
1999 def __post_init__(self) -> None:
2000 self.components = PredictedQuantumGraphComponents(
2001 header=self.header, pipeline_graph=self.pipeline_graph
2002 )
2004 def finish(self) -> PredictedQuantumGraph:
2005 """Construct a `PredictedQuantumGraph` instance from this reader."""
2006 return self.components.assemble()
2008 def read_all(self) -> None:
2009 """Read all components in full."""
2010 self.read_thin_graph()
2011 self.read_execution_quanta()
2013 def read_thin_graph(self) -> None:
2014 """Read the thin graph.
2016 The thin graph is a quantum-quantum DAG with just task labels and data
2017 IDs as node attributes. It always includes all regular quanta, and
2018 does not include init-input or init-output information.
2019 """
2020 if not self.components.thin_graph.quanta:
2021 if self.header.version > 0:
2022 self.components.thin_graph = self._read_single_block("thin_graph", PredictedThinGraphModel)
2023 else:
2024 self.address_reader.read_all()
2025 thin_graph_v0 = self._read_single_block("thin_graph", _PredictedThinGraphModelV0)
2026 self.components.thin_graph = thin_graph_v0._upgraded(self.address_reader.rows)
2028 def read_init_quanta(self) -> None:
2029 """Read the list of special quanta that represent init-inputs and
2030 init-outputs.
2031 """
2032 if not self.components.init_quanta.root:
2033 self.components.init_quanta = self._read_single_block("init_quanta", PredictedInitQuantaModel)
2035 def read_dimension_data(self) -> None:
2036 """Read all dimension records.
2038 Record data IDs will be immediately deserialized, while other fields
2039 will be left in serialized form until they are needed.
2040 """
2041 if self.components.dimension_data is None:
2042 serializable_dimension_data = self._read_single_block("dimension_data", SerializableDimensionData)
2043 self.components.dimension_data = DimensionDataAttacher(
2044 deserializers=[
2045 DimensionRecordSetDeserializer.from_raw(
2046 self.components.pipeline_graph.universe[element], serialized_records
2047 )
2048 for element, serialized_records in serializable_dimension_data.root.items()
2049 ],
2050 dimensions=DimensionGroup.union(
2051 *self.components.pipeline_graph.group_by_dimensions(prerequisites=True).keys(),
2052 universe=self.components.pipeline_graph.universe,
2053 ),
2054 )
2056 def read_quantum_datasets(self, quantum_ids: Iterable[uuid.UUID] | None = None) -> None:
2057 """Read information about all datasets produced and consumed by the
2058 given quantum IDs.
2060 Parameters
2061 ----------
2062 quantum_ids : `~collections.abc.Iterable` [ `uuid.UUID` ], optional
2063 Iterable of quantum IDs to load. If not provided, all quanta will
2064 be loaded. The UUIDs of special init quanta will be ignored.
2065 """
2066 quantum_datasets: PredictedQuantumDatasetsModel | None
2067 if quantum_ids is None:
2068 if len(self.components.quantum_datasets) != self.header.n_quanta:
2069 for quantum_datasets in MultiblockReader.read_all_models_in_zip(
2070 self.zf,
2071 "quantum_datasets",
2072 PredictedQuantumDatasetsModel,
2073 self.decompressor,
2074 int_size=self.components.header.int_size,
2075 page_size=self.page_size,
2076 ):
2077 self.components.quantum_datasets.setdefault(quantum_datasets.quantum_id, quantum_datasets)
2078 self.address_reader.read_all()
2079 return
2080 with MultiblockReader.open_in_zip(
2081 self.zf, "quantum_datasets", int_size=self.components.header.int_size
2082 ) as mb_reader:
2083 for quantum_id in quantum_ids:
2084 if quantum_id in self.components.quantum_datasets:
2085 continue
2086 address_row = self.address_reader.find(quantum_id)
2087 quantum_datasets = mb_reader.read_model(
2088 address_row.addresses[0], PredictedQuantumDatasetsModel, self.decompressor
2089 )
2090 if quantum_datasets is not None:
2091 self.components.quantum_datasets[address_row.key] = quantum_datasets
2092 return
2094 def read_execution_quanta(self, quantum_ids: Iterable[uuid.UUID] | None = None) -> None:
2095 """Read all information needed to execute the given quanta.
2097 Parameters
2098 ----------
2099 quantum_ids : `~collections.abc.Iterable` [ `uuid.UUID` ], optional
2100 Iterable of quantum IDs to load. If not provided, all quanta will
2101 be loaded. The UUIDs of special init quanta will be ignored.
2102 """
2103 self.read_init_quanta()
2104 self.read_dimension_data()
2105 self.read_quantum_datasets(quantum_ids)
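# Sketch: typical incremental use of the reader; only the requested
# components are read and deserialized (``ids`` is a hypothetical iterable
# of quantum UUIDs):
#
#     with PredictedQuantumGraphReader.open("graph.qg") as reader:
#         reader.read_thin_graph()           # lightweight DAG only
#         reader.read_execution_quanta(ids)  # init quanta, dimension data, quanta
#         qg = reader.finish()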