Coverage for python / lsst / pipe / base / pipeline_graph / _pipeline_graph.py: 12%
745 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:20 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:20 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
# Public names defined in this module (also controls ``import *``).
__all__ = ("PipelineGraph", "compare_packages", "log_config_mismatch")
31import gzip
32import itertools
33import json
34import logging
35from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set
36from typing import TYPE_CHECKING, Any, BinaryIO, Literal, cast
38import networkx
39import networkx.algorithms.bipartite
40import networkx.algorithms.dag
42from lsst.daf.butler import (
43 Butler,
44 DataCoordinate,
45 DataId,
46 DatasetRef,
47 DatasetType,
48 DimensionGroup,
49 DimensionUniverse,
50 MissingDatasetTypeError,
51)
52from lsst.daf.butler.registry import ConflictingDefinitionError, Registry
53from lsst.resources import ResourcePath, ResourcePathExpression
54from lsst.utils.packages import Packages
56from .._dataset_handle import InMemoryDatasetHandle
57from ..automatic_connection_constants import PACKAGES_INIT_OUTPUT_NAME, PACKAGES_INIT_OUTPUT_STORAGE_CLASS
58from . import expressions
59from ._dataset_types import DatasetTypeNode
60from ._edges import Edge, ReadEdge, WriteEdge
61from ._exceptions import (
62 DuplicateOutputError,
63 EdgesChangedError,
64 InvalidExpressionError,
65 InvalidStepsError,
66 PipelineDataCycleError,
67 PipelineGraphError,
68 PipelineGraphExceptionSafetyError,
69 UnresolvedGraphError,
70)
71from ._mapping_views import DatasetTypeMappingView, TaskMappingView
72from ._nodes import NodeKey, NodeType
73from ._task_subsets import StepDefinitions, TaskSubset
74from ._tasks import TaskImportMode, TaskInitNode, TaskNode, _TaskNodeImportedData
76if TYPE_CHECKING:
77 from ..config import PipelineTaskConfig
78 from ..connections import PipelineTaskConnections
79 from ..pipeline import TaskDef
80 from ..pipelineTask import PipelineTask
# Module-level logger shared by all pipeline-graph operations in this module.
_LOG = logging.getLogger("lsst.pipe.base.pipeline_graph")
85class PipelineGraph:
86 """A graph representation of fully-configured pipeline.
88 `PipelineGraph` instances are typically constructed by calling
89 `.Pipeline.to_graph`, but in rare cases constructing and then populating an
90 empty one may be preferable.
92 Parameters
93 ----------
94 description : `str`, optional
95 String description for this pipeline.
96 universe : `lsst.daf.butler.DimensionUniverse`, optional
97 Definitions for all butler dimensions. If not provided, some
98 attributes will not be available until `resolve` is called.
99 data_id : `lsst.daf.butler.DataCoordinate` or other data ID, optional
100 Data ID that represents a constraint on all quanta generated by this
101 pipeline. This typically just holds the instrument constraint included
102 in the pipeline definition, if there was one.
103 """
105 ###########################################################################
106 #
107 # Simple Pipeline Graph Inspection Interface:
108 #
    # - for inspecting graph structure, not modifying it (except to sort and
    #   resolve);
111 #
112 # - no NodeKey objects, just string dataset type name and task label keys;
113 #
114 # - graph structure is represented as a pair of mappings, with methods to
115 # find neighbors and edges of nodes.
116 #
117 ###########################################################################
119 def __init__(
120 self,
121 *,
122 description: str = "",
123 universe: DimensionUniverse | None = None,
124 data_id: DataId | None = None,
125 ) -> None:
126 self._init_from_args(
127 xgraph=None,
128 sorted_keys=None,
129 task_subsets=None,
130 description=description,
131 universe=universe,
132 data_id=data_id,
133 step_definitions=StepDefinitions(universe),
134 )
136 def __repr__(self) -> str:
137 return f"{type(self).__name__}({self.description!r}, tasks={self.tasks!s})"
139 @property
140 def description(self) -> str:
141 """String description for this pipeline."""
142 return self._description
    @description.setter
    def description(self, value: str) -> None:
        # Docstring is on the getter; the setter just stores the new value.
        self._description = value
149 @property
150 def universe(self) -> DimensionUniverse:
151 """Definitions for all butler dimensions."""
152 if self._universe is None:
153 raise UnresolvedGraphError("Pipeline graph is not resolved.")
154 return self._universe
    @property
    def data_id(self) -> DataCoordinate:
        """Data ID that represents a constraint on all quanta generated from
        this pipeline.

        This may not be available unless the graph is resolved.
        """
        # Standardizing against self.universe (not self._universe) makes this
        # raise UnresolvedGraphError when the graph is unresolved.
        return DataCoordinate.standardize(self._raw_data_id, universe=self.universe)
165 @property
166 def tasks(self) -> TaskMappingView:
167 """A mapping view of the tasks in the graph.
169 This mapping has `str` task label keys and `TaskNode` values. Iteration
170 is topologically and deterministically ordered if and only if `sort`
171 has been called since the last modification to the graph.
172 """
173 return self._tasks
175 @property
176 def dataset_types(self) -> DatasetTypeMappingView:
177 """A mapping view of the dataset types in the graph.
179 This mapping has `str` parent dataset type name keys, but only provides
180 access to its `DatasetTypeNode` values if `resolve` has been called
181 since the last modification involving a task that uses a dataset type.
182 See `DatasetTypeMappingView` for details.
183 """
184 return self._dataset_types
186 @property
187 def task_subsets(self) -> Mapping[str, TaskSubset]:
188 """A mapping of all labeled subsets of tasks.
190 Keys are subset labels, values are sets of task labels. See
191 `TaskSubset` for more information.
193 Use `add_task_subset` to add a new subset. The subsets themselves may
194 be modified in-place.
195 """
196 return self._task_subsets
    @property
    def steps(self) -> StepDefinitions:
        """An ordered iterable of the labels of task subsets that must be
        executed separately.

        Steps are intended to be a partition of the full pipeline graph - the
        steps together include all tasks, and each task belongs to exactly one
        step - but this is only checked when the graph is resolved.

        The order of the steps is required to be consistent with the graph's
        topological ordering, and when sorting a graph with steps, that sort
        order is guaranteed to be consistent with the order of the steps. But
        there may be other orderings of the steps that are still valid for
        execution (e.g. two steps could be runnable in parallel).
        """
        return self._step_definitions
    @steps.setter
    def steps(self, labels: Iterable[str]) -> None:
        # Docstring is on the getter; assignment delegates validation and
        # storage to StepDefinitions.assign.
        self._step_definitions.assign(labels)
220 def get_task_step(self, task_label: str) -> str:
221 """Return the step that the given task belongs to.
223 Parameters
224 ----------
225 task_label : `str`
226 Label of the task to look up.
228 Returns
229 -------
230 step_label : `str`
231 Step the task belongs to.
233 Raises
234 ------
235 InvalidStepsError
236 Raised if the pipeline has no steps defined.
237 UnresolvedGraphError
238 Raised if the pipeline has been modified since the steps were
239 last verified.
240 """
241 if not self._step_definitions:
242 raise InvalidStepsError("No steps have been defined for this pipeline.")
243 elif self._step_definitions.verified:
244 return self._xgraph.nodes[NodeKey(NodeType.TASK, task_label)]["step"]
245 raise UnresolvedGraphError("Steps have not been verified since the last modification.")
    @property
    def is_fully_resolved(self) -> bool:
        """Whether all of this graph's nodes are resolved and have been
        checked for correctness.

        A fully-resolved graph is always sorted as well.
        """
        # All four conditions must hold: a dimension universe is set, step
        # definitions have been verified, the graph is sorted, and every
        # dataset type node has been resolved.
        return (
            self._universe is not None
            and self._step_definitions.verified
            and self._sorted_keys is not None
            and all(self.dataset_types.is_resolved(k) for k in self.dataset_types)
        )
    @property
    def has_been_sorted(self) -> bool:
        """Whether this graph's tasks and dataset types have been
        topologically sorted (with unspecified but deterministic tiebreakers)
        since the last modification to the graph.

        If the pipeline graph has step definitions, the sort order is
        consistent with the step order.

        This may return `False` if the graph *happens* to be sorted but `sort`
        was never called.
        """
        return self._sorted_keys is not None
    def sort(self) -> None:
        """Sort this graph's nodes topologically with deterministic (but
        unspecified) tiebreakers.

        This does nothing if the graph is already known to be sorted.
        """
        if self._sorted_keys is None:
            try:
                sorted_keys: Sequence[NodeKey] = list(networkx.lexicographical_topological_sort(self._xgraph))
            except networkx.NetworkXUnfeasible as err:  # pragma: no cover
                # Shouldn't be possible to get here, because we check for
                # cycles when adding tasks, but we guard against it anyway.
                cycle = networkx.find_cycle(self._xgraph)
                raise PipelineDataCycleError(
                    f"Cycle detected while attempting to sort graph: {cycle}."
                ) from err
            self._reorder(sorted_keys)
    def copy(self) -> PipelineGraph:
        """Return a copy of this graph that copies all mutable state."""
        xgraph = self._xgraph.copy()
        step_definitions = self._step_definitions.copy()
        # Bypass __init__ and call the shared initializer directly so the
        # already-copied mutable state can be passed in.
        result = PipelineGraph.__new__(PipelineGraph)
        result._init_from_args(
            xgraph,
            self._sorted_keys,
            task_subsets={
                # Rebuild each subset against the *copied* graph and step
                # definitions so no mutable state is shared with self.
                k: TaskSubset(xgraph, v.label, set(v._members), v.description, step_definitions)
                for k, v in self._task_subsets.items()
            },
            description=self._description,
            universe=self._universe,
            data_id=self._raw_data_id,
            step_definitions=step_definitions,
        )
        return result
313 def __copy__(self) -> PipelineGraph:
314 # Fully shallow copies are dangerous; we don't want shared mutable
315 # state to lead to broken class invariants.
316 return self.copy()
318 def __deepcopy__(self, memo: dict) -> PipelineGraph:
319 # Genuine deep copies are unnecessary, since we should only ever care
320 # that mutable state is copied.
321 return self.copy()
323 def diff_tasks(self, other: PipelineGraph) -> list[str]:
324 """Compare two pipeline graphs.
326 This only compares graph structure and task classes (including their
327 edges). It does *not* compare full configuration (which is subject to
328 spurious differences due to import-cache state), dataset type
329 resolutions, or sort state.
331 Parameters
332 ----------
333 other : `PipelineGraph`
334 Graph to compare to.
336 Returns
337 -------
338 differences : `list` [ `str` ]
339 List of string messages describing differences between the
340 pipelines. If empty, the graphs have the same tasks and
341 connections.
342 """
343 messages: list[str] = []
344 common_labels: Set[str]
345 if self.tasks.keys() != other.tasks.keys():
346 common_labels = self.tasks.keys() & other.tasks.keys()
347 messages.append(
348 f"Pipelines have different tasks: A & ~B = {list(self.tasks.keys() - common_labels)}, "
349 f"B & ~A = {list(other.tasks.keys() - common_labels)}."
350 )
351 else:
352 common_labels = self.tasks.keys()
353 for label in common_labels:
354 a = self.tasks[label]
355 b = other.tasks[label]
356 if a.task_class != b.task_class:
357 messages.append(
358 f"Task {label!r} has class {a.task_class_name} in A, but {b.task_class_name} in B."
359 )
360 messages.extend(a.diff_edges(b))
361 return messages
363 def producing_edge_of(self, dataset_type_name: str) -> WriteEdge | None:
364 """Return the `WriteEdge` that links the producing task to the named
365 dataset type.
367 Parameters
368 ----------
369 dataset_type_name : `str`
370 Dataset type name. Must not be a component.
372 Returns
373 -------
374 edge : `WriteEdge` or `None`
375 Producing edge or `None` if there isn't one in this graph.
377 Raises
378 ------
379 DuplicateOutputError
380 Raised if there are multiple tasks defined to produce this dataset
381 type. This is only possible if the graph's dataset types are not
382 resolved.
384 Notes
385 -----
386 On resolved graphs, it may be slightly more efficient to use::
388 graph.dataset_types[dataset_type_name].producing_edge
390 but this method works on graphs with unresolved dataset types as well.
391 """
392 producer: str | None = None
393 producing_edge: WriteEdge | None = None
394 for _, _, producing_edge in self._xgraph.in_edges(
395 NodeKey(NodeType.DATASET_TYPE, dataset_type_name), data="instance"
396 ):
397 assert producing_edge is not None, "Should only be None if we never loop."
398 if producer is not None:
399 raise DuplicateOutputError(
400 f"Dataset type {dataset_type_name!r} is produced by both {producing_edge.task_label!r} "
401 f"and {producer!r}."
402 )
403 return producing_edge
    def consuming_edges_of(self, dataset_type_name: str) -> list[ReadEdge]:
        """Return the `ReadEdge` objects that link the named dataset type to
        the tasks that consume it.

        Parameters
        ----------
        dataset_type_name : `str`
            Dataset type name.  Must not be a component.

        Returns
        -------
        edges : `list` [ `ReadEdge` ]
            Edges that connect this dataset type to the tasks that consume
            it.

        Notes
        -----
        On resolved graphs, it may be slightly more efficient to use::

            graph.dataset_types[dataset_type_name].consuming_edges

        but this method works on graphs with unresolved dataset types as
        well.
        """
        # Out-edges of a dataset type node point at consuming tasks; the
        # ReadEdge instance is stored in the "instance" edge attribute.
        return [
            edge
            for _, _, edge in self._xgraph.out_edges(
                NodeKey(NodeType.DATASET_TYPE, dataset_type_name), data="instance"
            )
        ]
434 def producer_of(self, dataset_type_name: str) -> TaskNode | TaskInitNode | None:
435 """Return the `TaskNode` or `TaskInitNode` that writes the given
436 dataset type.
438 Parameters
439 ----------
440 dataset_type_name : `str`
441 Dataset type name. Must not be a component.
443 Returns
444 -------
445 edge : `TaskNode`, `TaskInitNode`, or `None`
446 Producing node or `None` if there isn't one in this graph.
448 Raises
449 ------
450 DuplicateOutputError
451 Raised if there are multiple tasks defined to produce this dataset
452 type. This is only possible if the graph's dataset types are not
453 resolved.
454 """
455 if (producing_edge := self.producing_edge_of(dataset_type_name)) is not None:
456 return self._xgraph.nodes[producing_edge.task_key]["instance"]
457 return None
    def consumers_of(self, dataset_type_name: str) -> list[TaskNode | TaskInitNode]:
        """Return the `TaskNode` and/or `TaskInitNode` objects that read
        the given dataset type.

        Parameters
        ----------
        dataset_type_name : `str`
            Dataset type name.  Must not be a component.

        Returns
        -------
        tasks : `list` [ `TaskNode` or `TaskInitNode` ]
            Nodes for the tasks that consume this dataset type.

        Notes
        -----
        On resolved graphs, it may be slightly more efficient to use::

            graph.dataset_types[dataset_type_name].consuming_edges

        but this method works on graphs with unresolved dataset types as
        well.
        """
        # Map each consuming edge back to the task node stored in the
        # "instance" node attribute.
        return [
            self._xgraph.nodes[consuming_edge.task_key]["instance"]
            for consuming_edge in self.consuming_edges_of(dataset_type_name)
        ]
    def inputs_of(self, task_label: str, init: bool = False) -> dict[str, DatasetTypeNode | None]:
        """Return the dataset types that are inputs to a task.

        Parameters
        ----------
        task_label : `str`
            Label for the task in the pipeline.
        init : `bool`, optional
            If `True`, return init-input dataset types instead of runtime
            (including prerequisite) inputs.

        Returns
        -------
        inputs : `dict` [ `str`, `DatasetTypeNode` or `None` ]
            Dictionary with parent dataset type name keys and either
            `DatasetTypeNode` values (if the dataset type has been resolved)
            or `None` values.

        Notes
        -----
        To get the input edges of a task or task init node (which provide
        information about storage class overrides and components) use::

            graph.tasks[task_label].iter_all_inputs()

        or

            graph.tasks[task_label].init.iter_all_inputs()

        or the various mapping attributes of the `TaskNode` and
        `TaskInitNode` class.
        """
        node: TaskNode | TaskInitNode = self.tasks[task_label] if not init else self.tasks[task_label].init
        return {
            edge.parent_dataset_type_name: self._xgraph.nodes[edge.dataset_type_key]["instance"]
            for edge in node.iter_all_inputs()
        }
    def outputs_of(
        self, task_label: str, init: bool = False, include_automatic_connections: bool = True
    ) -> dict[str, DatasetTypeNode | None]:
        """Return the dataset types that are outputs of a task.

        Parameters
        ----------
        task_label : `str`
            Label for the task in the pipeline.
        init : `bool`, optional
            If `True`, return init-output dataset types instead of runtime
            outputs.
        include_automatic_connections : `bool`, optional
            Whether to include automatic connections such as configs,
            metadata, and logs.

        Returns
        -------
        outputs : `dict` [ `str`, `DatasetTypeNode` or `None` ]
            Dictionary with parent dataset type name keys and either
            `DatasetTypeNode` values (if the dataset type has been resolved)
            or `None` values.

        Notes
        -----
        To get the output edges of a task or task init node (which provide
        information about storage class overrides and components) use::

            graph.tasks[task_label].iter_all_outputs()

        or

            graph.tasks[task_label].init.iter_all_outputs()

        or the various mapping attributes of the `TaskNode` and
        `TaskInitNode` class.
        """
        node: TaskNode | TaskInitNode = self.tasks[task_label] if not init else self.tasks[task_label].init
        # iter_all_outputs includes the automatic config/metadata/log
        # connections; node.outputs holds only the explicit ones.
        iterable = node.iter_all_outputs() if include_automatic_connections else node.outputs.values()
        return {
            edge.parent_dataset_type_name: self._xgraph.nodes[edge.dataset_type_key]["instance"]
            for edge in iterable
        }
    def resolve(
        self,
        registry: Registry | None = None,
        dimensions: DimensionUniverse | None = None,
        dataset_types: Mapping[str, DatasetType] | None = None,
        visualization_only: bool = False,
    ) -> None:
        """Resolve all dimensions and dataset types and check them for
        consistency.

        Resolving a graph also causes it to be sorted.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`, optional
            Client for the data repository to resolve against.
        dimensions : `lsst.daf.butler.DimensionUniverse`, optional
            Definitions for all dimensions.  Takes precedence over
            ``registry.dimensions`` if both are provided.  If neither is
            provided, defaults to the default dimension universe
            (``lsst.daf.butler.DimensionUniverse()``).
        dataset_types : `~collections.abc.Mapping` [ `str`, \
                `~lsst.daf.butler.DatasetType` ], optional
            Mapping of dataset types to consider registered.  Takes
            precedence over ``registry.getDatasetType()`` if both are
            provided.
        visualization_only : `bool`, optional
            Resolve the graph as well as possible even when dimensions and
            storage classes cannot really be determined.  This can include
            using the ``universe.commonSkyPix`` as the assumed dimensions of
            connections that use the "skypix" placeholder and using
            "<UNKNOWN>" as a storage class name (which will fail if the
            storage class itself is ever actually loaded).

        Notes
        -----
        The `universe` attribute is set to ``dimensions`` and used to set all
        `TaskNode.dimensions` attributes.  Dataset type nodes are resolved by
        first looking for a registry definition, then using the producing
        task's definition, then looking for consistency between all consuming
        task definitions.

        Raises
        ------
        ConnectionTypeConsistencyError
            Raised if a prerequisite input for one task appears as a
            different kind of connection in any other task.
        DuplicateOutputError
            Raised if multiple tasks have the same dataset type as an output.
        IncompatibleDatasetTypeError
            Raised if different tasks have different definitions of a dataset
            type.  Different but compatible storage classes are permitted.
        MissingDatasetTypeError
            Raised if a dataset type definition is required to exist in the
            data repository but none was found.  This should only occur for
            dataset types that are not produced by a task in the pipeline and
            are consumed with different storage classes or as components by
            tasks in the pipeline.
        """
        # Choose where "registered" dataset type definitions come from: an
        # explicit mapping, the registry, or nowhere at all.
        get_registered: Callable[[str], DatasetType | None] | None = None
        if dataset_types is not None:
            # Ruff seems confused about whether this is used below; it is!
            get_registered = dataset_types.get
        elif registry is not None:

            def get_registered(name: str) -> DatasetType | None:
                try:
                    return registry.getDatasetType(name)
                except MissingDatasetTypeError:
                    return None

        else:

            def get_registered(name: str) -> None:
                return None

        if dimensions is None:
            if registry is not None:
                dimensions = registry.dimensions
            else:
                dimensions = DimensionUniverse()
        # Check the step flow first; if it yields a sort order we reuse it
        # below instead of running a full topological sort.
        sort_keys_from_steps = self._resolve_step_flow()
        node_key: NodeKey
        # Compute all node replacements before applying any of them, so an
        # error during resolution leaves the graph unchanged.
        updates: dict[NodeKey, TaskNode | DatasetTypeNode] = {}
        for node_key, node_state in self._xgraph.nodes.items():
            match node_key.node_type:
                case NodeType.TASK:
                    task_node: TaskNode = node_state["instance"]
                    new_task_node = task_node._resolved(dimensions)
                    if new_task_node is not task_node:
                        updates[node_key] = new_task_node
                case NodeType.DATASET_TYPE:
                    dataset_type_node: DatasetTypeNode | None = node_state["instance"]
                    new_dataset_type_node = DatasetTypeNode._from_edges(
                        node_key,
                        self._xgraph,
                        get_registered,
                        dimensions,
                        previous=dataset_type_node,
                        visualization_only=visualization_only,
                    )
                    # Usage of `is` here is intentional; `_from_edges`
                    # returns `previous=dataset_type_node` if it can
                    # determine that it doesn't need to change.
                    if new_dataset_type_node is not dataset_type_node:
                        updates[node_key] = new_dataset_type_node
        try:
            for node_key, node_value in updates.items():
                self._xgraph.nodes[node_key]["instance"] = node_value
        except Exception as err:  # pragma: no cover
            # There's no known way to get here, but we want to make it
            # clear it's a big problem if we do.
            raise PipelineGraphExceptionSafetyError(
                "Error during dataset type resolution has left the graph in an inconsistent state."
            ) from err
        # If we get an error here, many graph nodes will have been resolved
        # but the steps will still be marked as unverified and unsorted.
        # That's still an acceptable state for the pipeline to be in.
        self._resolve_step_dimensions(dimensions)
        self._step_definitions._verified = True
        if sort_keys_from_steps is not None:
            self._reorder(sort_keys_from_steps)
        else:
            self.sort()
        self._universe = dimensions
700 ###########################################################################
701 #
702 # Graph Modification Interface:
703 #
704 # - methods to add, remove, and replace tasks;
705 #
706 # - methods to add and remove task subsets.
707 #
708 # These are all things that are usually done in a Pipeline before making a
709 # graph at all, but there may be cases where we want to modify the graph
710 # instead. (These are also the methods used to make a graph from a
711 # Pipeline, or make a graph from another graph.)
712 #
713 ###########################################################################
715 def add_task(
716 self,
717 label: str | None,
718 task_class: type[PipelineTask],
719 config: PipelineTaskConfig | None = None,
720 connections: PipelineTaskConnections | None = None,
721 ) -> TaskNode:
722 """Add a new task to the graph.
724 Parameters
725 ----------
726 label : `str` or `None`
727 Label for the task in the pipeline. If `None`, `Task._DefaultName`
728 is used.
729 task_class : `type` [ `PipelineTask` ]
730 Class object for the task.
731 config : `PipelineTaskConfig`, optional
732 Configuration for the task. If not provided, a default-constructed
733 instance of ``task_class.ConfigClass`` is used.
734 connections : `PipelineTaskConnections`, optional
735 Object that describes the dataset types used by the task. If not
736 provided, one will be constructed from the given configuration. If
737 provided, it is assumed that ``config`` has already been validated
738 and frozen.
740 Returns
741 -------
742 node : `TaskNode`
743 The new task node added to the graph.
745 Raises
746 ------
747 ValueError
748 Raised if configuration validation failed when constructing
749 ``connections``.
750 PipelineDataCycleError
751 Raised if the graph is cyclic after this addition.
752 RuntimeError
753 Raised if an unexpected exception (which will be chained) occurred
754 at a stage that may have left the graph in an inconsistent state.
755 Other exceptions should leave the graph unchanged.
757 Notes
758 -----
759 Checks for dataset type consistency and multiple producers do not occur
760 until `resolve` is called, since the resolution depends on both the
761 state of the data repository and all contributing tasks.
763 Adding new tasks removes any existing resolutions of all dataset types
764 it references and marks the graph as unsorted. It is most efficient
765 to add all tasks up front and only then resolve and/or sort the graph.
766 """
767 if label is None:
768 label = task_class._DefaultName
769 if config is None:
770 config = task_class.ConfigClass()
771 _LOG.debug("Adding task %s %s to the pipeline graph", label, task_class)
772 task_node = TaskNode._from_imported_data(
773 key=NodeKey(NodeType.TASK, label),
774 init_key=NodeKey(NodeType.TASK_INIT, label),
775 data=_TaskNodeImportedData.configure(label, task_class, config, connections),
776 universe=self._universe,
777 )
778 self.add_task_nodes([task_node])
779 return task_node
    def add_task_nodes(self, nodes: Iterable[TaskNode], parent: PipelineGraph | None = None) -> None:
        """Add one or more existing task nodes to the graph.

        Parameters
        ----------
        nodes : `~collections.abc.Iterable` [ `TaskNode` ]
            Iterable of task nodes to add.  If any tasks have resolved
            dimensions, they must have the same dimension universe as the
            rest of the graph.
        parent : `PipelineGraph`, optional
            If provided, another `PipelineGraph` from which these nodes were
            obtained.  Any dataset type nodes already present in ``parent``
            that are referenced by the given tasks will be used in this
            graph if they are not already present, preserving any dataset
            type resolutions present in the parent graph.  Adding nodes from
            a parent graph after the graph has its own nodes (e.g. from
            `add_task`) or nodes from a third graph may result in invalid
            dataset type resolutions.  It is safest to only use this
            argument when populating an empty graph for the first time.

        Raises
        ------
        PipelineDataCycleError
            Raised if the graph is cyclic after this addition.

        Notes
        -----
        Checks for dataset type consistency and multiple producers do not
        occur until `resolve` is called, since the resolution depends on
        both the state of the data repository and all contributing tasks.

        Adding new tasks removes any existing resolutions of all dataset
        types it references (unless ``parent is not None``) and marks the
        graph as unsorted.  It is most efficient to add all tasks up front
        and only then resolve and/or sort the graph.
        """
        # Accumulate all node and edge payloads before touching the graph,
        # so most failures occur before any modification happens.
        node_data: list[tuple[NodeKey, dict[str, Any]]] = []
        edge_data: list[tuple[NodeKey, NodeKey, str, dict[str, Any]]] = []
        for task_node in nodes:
            task_node = task_node._resolved(self._universe)
            node_data.append(
                (task_node.key, {"instance": task_node, "bipartite": task_node.key.node_type.bipartite})
            )
            node_data.append(
                (
                    task_node.init.key,
                    {"instance": task_node.init, "bipartite": task_node.init.key.node_type.bipartite},
                )
            )
            # Convert the edge objects attached to the task node to networkx.
            for read_edge in task_node.init.iter_all_inputs():
                self._append_graph_data_from_edge(node_data, edge_data, read_edge, parent=parent)
            for write_edge in task_node.init.iter_all_outputs():
                self._append_graph_data_from_edge(node_data, edge_data, write_edge, parent=parent)
            for read_edge in task_node.iter_all_inputs():
                self._append_graph_data_from_edge(node_data, edge_data, read_edge, parent=parent)
            for write_edge in task_node.iter_all_outputs():
                self._append_graph_data_from_edge(node_data, edge_data, write_edge, parent=parent)
            # Add a special edge (with no Edge instance) that connects the
            # TaskInitNode to the runtime TaskNode.
            edge_data.append((task_node.init.key, task_node.key, Edge.INIT_TO_TASK_NAME, {"instance": None}))
        if not node_data and not edge_data:
            return
        # Checks and preparation complete; time to start the actual
        # modification, during which it's hard to provide strong exception
        # safety.  Start by resetting the sort ordering, if there is one.
        self._reset()
        try:
            self._xgraph.add_nodes_from(node_data)
            self._xgraph.add_edges_from(edge_data)
            if not networkx.algorithms.dag.is_directed_acyclic_graph(self._xgraph):
                cycle = networkx.find_cycle(self._xgraph)
                raise PipelineDataCycleError(f"Cycle detected while adding tasks: {cycle}.")
        except Exception:
            # First try to roll back our changes.
            try:
                self._xgraph.remove_edges_from(edge_data)
                self._xgraph.remove_nodes_from(key for key, _ in node_data)
            except Exception as err:  # pragma: no cover
                # There's no known way to get here, but we want to make it
                # clear it's a big problem if we do.
                raise PipelineGraphExceptionSafetyError(
                    "Error while attempting to revert PipelineGraph modification has left the graph in "
                    "an inconsistent state."
                ) from err
            # Successfully rolled back; raise the original exception.
            raise
869 def reconfigure_tasks(
870 self,
871 *args: tuple[str, PipelineTaskConfig],
872 check_edges_unchanged: bool = False,
873 assume_edges_unchanged: bool = False,
874 **kwargs: PipelineTaskConfig,
875 ) -> None:
876 """Update the configuration for one or more tasks.
878 Parameters
879 ----------
880 *args : `tuple` [ `str`, `.PipelineTaskConfig` ]
881 Positional arguments are each a 2-tuple of task label and new
882 config object. Note that the same arguments may also be passed as
883 ``**kwargs``, which is usually more readable, but task labels in
884 ``*args`` are not required to be valid Python identifiers.
885 check_edges_unchanged : `bool`, optional
886 If `True`, require the edges (connections) of the modified tasks to
887 remain unchanged after the configuration updates, and verify that
888 this is the case.
889 assume_edges_unchanged : `bool`, optional
890 If `True`, the caller declares that the edges (connections) of the
891 modified tasks will remain unchanged after the configuration
892 updates, and that it is unnecessary to check this.
893 **kwargs : `.PipelineTaskConfig`
894 New config objects or overrides to apply to copies of the current
895 config objects, with task labels as the keywords.
897 Returns
898 -------
899 None
901 Raises
902 ------
903 ValueError
904 Raised if ``assume_edges_unchanged`` and ``check_edges_unchanged``
905 are both `True`, or if the same task appears twice.
906 EdgesChangedError
907 Raised if ``check_edges_unchanged=True`` and the edges of a task do
908 change.
910 Notes
911 -----
912 If reconfiguring a task causes its edges to change, any dataset type
913 nodes connected to that task (not just those whose edges have changed!)
914 will be unresolved.
915 """
916 new_configs: dict[str, PipelineTaskConfig] = {}
917 for task_label, config_update in itertools.chain(args, kwargs.items()):
918 if new_configs.setdefault(task_label, config_update) is not config_update:
919 raise ValueError(f"Config for {task_label!r} provided more than once.")
920 updates = {
921 task_label: self.tasks[task_label]._reconfigured(config, rebuild=not assume_edges_unchanged)
922 for task_label, config in new_configs.items()
923 }
924 self._replace_task_nodes(
925 updates,
926 check_edges_unchanged=check_edges_unchanged,
927 assume_edges_unchanged=assume_edges_unchanged,
928 message_header=(
929 "Unexpected change in edges for task {task_label!r} from original config (A) to "
930 "new configs (B):"
931 ),
932 )
    def remove_tasks(
        self, labels: Iterable[str], drop_from_subsets: bool = True
    ) -> list[tuple[TaskNode, set[str]]]:
        """Remove one or more tasks from the graph.

        Parameters
        ----------
        labels : `~collections.abc.Iterable` [ `str` ]
            Iterable of the labels of the tasks to remove.
        drop_from_subsets : `bool`, optional
            If `True`, drop each removed task from any subset in which it
            currently appears.  If `False`, raise `PipelineGraphError` if any
            such subsets exist.

        Returns
        -------
        nodes_and_subsets : `list` [ `tuple` [ `TaskNode`, `set` [ `str` ] ] ]
            List of nodes removed and the labels of task subsets that
            referenced them.

        Raises
        ------
        PipelineGraphError
            Raised if ``drop_from_subsets`` is `False` and the task is still
            part of one or more subsets.

        Notes
        -----
        Removing a task will cause dataset nodes with no other referencing
        tasks to be removed.  Any other dataset type nodes referenced by a
        removed task will be reset to an "unresolved" state.
        """
        # Per-task (TaskNode, referencing-subset-labels) pairs, returned to
        # the caller so the removal can be reported or undone at a higher
        # level.
        task_nodes_and_subsets: list[tuple[TaskNode, set[str]]] = []
        # Dataset type nodes adjacent to any removed task; each will later be
        # either removed (orphaned) or unresolved (still used elsewhere).
        dataset_types: set[NodeKey] = set()
        # Task and task-init node keys to delete from the internal graph.
        nodes_to_remove: set[NodeKey] = set()
        for label in labels:
            task_node: TaskNode = self._xgraph.nodes[NodeKey(NodeType.TASK, label)]["instance"]
            # Find task subsets that reference this task.
            referencing_subsets = {
                subset_label
                for subset_label, task_subset in self.task_subsets.items()
                if label in task_subset
            }
            if not drop_from_subsets and referencing_subsets:
                raise PipelineGraphError(
                    f"Task {label!r} is still referenced by subset(s) {referencing_subsets}."
                )
            task_nodes_and_subsets.append((task_node, referencing_subsets))
            # Find dataset types referenced by this task, on both the runtime
            # node and the init node, in both edge directions.
            dataset_types.update(self._xgraph.predecessors(task_node.key))
            dataset_types.update(self._xgraph.successors(task_node.key))
            dataset_types.update(self._xgraph.predecessors(task_node.init.key))
            dataset_types.update(self._xgraph.successors(task_node.init.key))
            # Since there's an edge between the task and its init node, we'll
            # have added those two nodes here, too, and we don't want that.
            dataset_types.remove(task_node.init.key)
            dataset_types.remove(task_node.key)
            # Mark the task node and its init node for removal from the graph.
            nodes_to_remove.add(task_node.key)
            nodes_to_remove.add(task_node.init.key)
        # Process the referenced datasets to see which ones are orphaned and
        # need to be removed vs. just unresolved.
        nodes_to_unresolve: list[NodeKey] = []
        for dataset_type_key in dataset_types:
            # Tasks still touching this dataset type after the removal; if
            # none remain, the dataset type node itself is orphaned.
            related_tasks = set()
            related_tasks.update(self._xgraph.predecessors(dataset_type_key))
            related_tasks.update(self._xgraph.successors(dataset_type_key))
            related_tasks.difference_update(nodes_to_remove)
            if not related_tasks:
                nodes_to_remove.add(dataset_type_key)
            else:
                nodes_to_unresolve.append(dataset_type_key)
        # Checks and preparation complete; time to start the actual
        # modification, during which it's hard to provide strong exception
        # safety.  Start by resetting the sort ordering.
        self._reset()
        try:
            for dataset_type_key in nodes_to_unresolve:
                self._xgraph.nodes[dataset_type_key]["instance"] = None
            for task_node, referencing_subsets in task_nodes_and_subsets:
                for subset_label in referencing_subsets:
                    self._task_subsets[subset_label].remove(task_node.label)
            self._xgraph.remove_nodes_from(nodes_to_remove)
        except Exception as err:  # pragma: no cover
            # There's no known way to get here, but we want to make it
            # clear it's a big problem if we do.
            raise PipelineGraphExceptionSafetyError(
                "Error during task removal has left the graph in an inconsistent state."
            ) from err
        return task_nodes_and_subsets
1025 def add_task_subset(self, subset_label: str, task_labels: Iterable[str], description: str = "") -> None:
1026 """Add a label for a set of tasks that are already in the pipeline.
1028 Parameters
1029 ----------
1030 subset_label : `str`
1031 Label for this set of tasks.
1032 task_labels : `~collections.abc.Iterable` [ `str` ]
1033 Labels of the tasks to include in the set. All must already be
1034 included in the graph.
1035 description : `str`, optional
1036 String description to associate with this label.
1037 """
1038 subset = TaskSubset(self._xgraph, subset_label, set(task_labels), description, self._step_definitions)
1039 self._task_subsets[subset_label] = subset
1041 def remove_task_subset(self, subset_label: str) -> None:
1042 """Remove a labeled set of tasks.
1044 Parameters
1045 ----------
1046 subset_label : `str`
1047 Label for this set of tasks.
1049 Notes
1050 -----
1051 If this subset is a step, it is also removed from the step definitions.
1052 """
1053 try:
1054 self._step_definitions.remove(subset_label)
1055 except KeyError:
1056 pass
1057 del self._task_subsets[subset_label]
1059 ###########################################################################
1060 #
1061 # NetworkX Export Interface:
1062 #
1063 # - methods to export the PipelineGraph's content (or various subsets
1064 # thereof) as NetworkX objects.
1065 #
1066 # These are particularly useful when writing tools to visualize the graph,
1067 # while providing options for which aspects of the graph (tasks, dataset
1068 # types, or both) to include, since all exported graphs have similar
1069 # attributes regardless of their structure.
1070 #
1071 ###########################################################################
1073 def make_xgraph(self) -> networkx.MultiDiGraph:
1074 """Export a networkx representation of the full pipeline graph,
1075 including both init and runtime edges.
1077 Returns
1078 -------
1079 xgraph : `networkx.MultiDiGraph`
1080 Directed acyclic graph with parallel edges.
1082 Notes
1083 -----
1084 The returned graph uses `NodeKey` instances for nodes. Parallel edges
1085 represent the same dataset type appearing in multiple connections for
1086 the same task, and are hence rare. The connection name is used as the
1087 edge key to disambiguate those parallel edges.
1089 Almost all edges connect dataset type nodes to task or task init nodes
1090 or vice versa, but there is also a special edge that connects each task
1091 init node to its runtime node. The existence of these edges makes the
1092 graph not quite bipartite, though its init-only and runtime-only
1093 subgraphs are bipartite.
1095 See `TaskNode`, `TaskInitNode`, `DatasetTypeNode`, `ReadEdge`, and
1096 `WriteEdge` for the descriptive node and edge attributes added.
1097 """
1098 return self._transform_xgraph_state(self._xgraph.copy(), skip_edges=False)
1100 def make_bipartite_xgraph(self, init: bool = False) -> networkx.MultiDiGraph:
1101 """Return a bipartite networkx representation of just the runtime or
1102 init-time pipeline graph.
1104 Parameters
1105 ----------
1106 init : `bool`, optional
1107 If `True` (`False` is default) return the graph of task
1108 initialization nodes and init input/output dataset types, instead
1109 of the graph of runtime task nodes and regular
1110 input/output/prerequisite dataset types.
1112 Returns
1113 -------
1114 xgraph : `networkx.MultiDiGraph`
1115 Directed acyclic graph with parallel edges.
1117 Notes
1118 -----
1119 The returned graph uses `NodeKey` instances for nodes. Parallel edges
1120 represent the same dataset type appearing in multiple connections for
1121 the same task, and are hence rare. The connection name is used as the
1122 edge key to disambiguate those parallel edges.
1124 This graph is bipartite because each dataset type node only has edges
1125 that connect it to a task [init] node, and vice versa.
1127 See `TaskNode`, `TaskInitNode`, `DatasetTypeNode`, `ReadEdge`, and
1128 `WriteEdge` for the descriptive node and edge attributes added.
1129 """
1130 return self._transform_xgraph_state(
1131 self._make_bipartite_xgraph_internal(init).copy(), skip_edges=False
1132 )
1134 def make_task_xgraph(self, init: bool = False) -> networkx.DiGraph:
1135 """Return a networkx representation of just the tasks in the pipeline.
1137 Parameters
1138 ----------
1139 init : `bool`, optional
1140 If `True` (`False` is default) return the graph of task
1141 initialization nodes, instead of the graph of runtime task nodes.
1143 Returns
1144 -------
1145 xgraph : `networkx.DiGraph`
1146 Directed acyclic graph with no parallel edges.
1148 Notes
1149 -----
1150 The returned graph uses `NodeKey` instances for nodes. The dataset
1151 types that link these tasks are not represented at all; edges have no
1152 attributes, and there are no parallel edges.
1154 See `TaskNode` and `TaskInitNode` for the descriptive node and
1155 attributes added.
1156 """
1157 return self._transform_xgraph_state(self._make_task_xgraph_internal(init), skip_edges=True)
1159 def make_dataset_type_xgraph(self, init: bool = False) -> networkx.DiGraph:
1160 """Return a networkx representation of just the dataset types in the
1161 pipeline.
1163 Parameters
1164 ----------
1165 init : `bool`, optional
1166 If `True` (`False` is default) return the graph of init input and
1167 output dataset types, instead of the graph of runtime (input,
1168 output, prerequisite input) dataset types.
1170 Returns
1171 -------
1172 xgraph : `networkx.DiGraph`
1173 Directed acyclic graph with no parallel edges.
1175 Notes
1176 -----
1177 The returned graph uses `NodeKey` instances for nodes. The tasks that
1178 link these tasks are not represented at all; edges have no attributes,
1179 and there are no parallel edges.
1181 See `DatasetTypeNode` for the descriptive node and attributes added.
1182 """
1183 bipartite_xgraph = self._make_bipartite_xgraph_internal(init)
1184 dataset_type_keys = [
1185 key
1186 for key, bipartite in bipartite_xgraph.nodes(data="bipartite")
1187 if bipartite == NodeType.DATASET_TYPE.bipartite
1188 ]
1189 return self._transform_xgraph_state(
1190 networkx.algorithms.bipartite.projected_graph(
1191 networkx.DiGraph(bipartite_xgraph), dataset_type_keys
1192 ),
1193 skip_edges=True,
1194 )
1196 ###########################################################################
1197 #
1198 # Expression-based Selection Interface.
1199 #
1200 ###########################################################################
1202 def select_tasks(self, expression: str) -> set[str]:
1203 """Return the tasks that match an expression.
1205 Parameters
1206 ----------
1207 expression : `str`
1208 String expression to evaluate. See
1209 :ref:`pipeline-graph-subset-expressions`.
1211 Returns
1212 -------
1213 task_labels : `set` [ `str` ]
1214 Set of matching task labels.
1215 """
1216 task_xgraph = self._make_task_xgraph_internal(init=False)
1217 expr_tree = expressions.parse(expression)
1218 matching_task_keys = self._select_expression(expr_tree, task_xgraph)
1219 return {key.name for key in matching_task_keys}
1221 def select(self, expression: str) -> PipelineGraph:
1222 """Return a new pipeline graph with the tasks that match an expression.
1224 Parameters
1225 ----------
1226 expression : `str`
1227 String expression to evaluate. See
1228 :ref:`pipeline-graph-subset-expressions`.
1230 Returns
1231 -------
1232 new_graph : `PipelineGraph`
1233 New pipeline graph with just the matching tasks.
1235 Notes
1236 -----
1237 All resolved dataset type nodes will be preserved.
1239 If `has_been_sorted`, the new graph will be sorted as well.
1241 Task subsets will not be included in the returned graph.
1242 """
1243 selected_tasks = self.select_tasks(expression)
1244 new_pipeline_graph = PipelineGraph(universe=self._universe, data_id=self._raw_data_id)
1245 new_pipeline_graph.add_task_nodes(
1246 [self.tasks[task_label] for task_label in selected_tasks], parent=self
1247 )
1248 if self.has_been_sorted:
1249 new_pipeline_graph.sort()
1250 return new_pipeline_graph
1252 ###########################################################################
1253 #
1254 # Serialization Interface.
1255 #
1256 # Serialization of PipelineGraphs is currently experimental and may not be
1257 # retained in the future. All serialization methods are
1258 # underscore-prefixed to ensure nobody mistakes them for a stable interface
1259 # (let a lone a stable file format).
1260 #
1261 ###########################################################################
1263 @classmethod
1264 def _read_stream(
1265 cls, stream: BinaryIO, import_mode: TaskImportMode = TaskImportMode.REQUIRE_CONSISTENT_EDGES
1266 ) -> PipelineGraph:
1267 """Read a serialized `PipelineGraph` from a file-like object.
1269 Parameters
1270 ----------
1271 stream : `BinaryIO`
1272 File-like object opened for binary reading, containing
1273 gzip-compressed JSON.
1274 import_mode : `TaskImportMode`, optional
1275 Whether to import tasks, and how to reconcile any differences
1276 between the imported task's connections and the those that were
1277 persisted with the graph. Default is to check that they are the
1278 same.
1280 Returns
1281 -------
1282 graph : `PipelineGraph`
1283 Deserialized pipeline graph.
1285 Raises
1286 ------
1287 PipelineGraphReadError
1288 Raised if the serialized `PipelineGraph` is not self-consistent.
1289 EdgesChangedError
1290 Raised if ``import_mode`` is
1291 `TaskImportMode.REQUIRED_CONSISTENT_EDGES` and the edges of a task
1292 did change after import and reconfiguration.
1294 Notes
1295 -----
1296 `PipelineGraph` serialization is currently experimental and may be
1297 removed or significantly changed in the future, with no deprecation
1298 period.
1299 """
1300 from .io import SerializedPipelineGraph
1302 with gzip.open(stream, "rb") as uncompressed_stream:
1303 data = json.load(uncompressed_stream)
1304 serialized_graph = SerializedPipelineGraph.model_validate(data)
1305 return serialized_graph.deserialize(import_mode)
1307 @classmethod
1308 def _read_uri(
1309 cls,
1310 uri: ResourcePathExpression,
1311 import_mode: TaskImportMode = TaskImportMode.REQUIRE_CONSISTENT_EDGES,
1312 ) -> PipelineGraph:
1313 """Read a serialized `PipelineGraph` from a file at a URI.
1315 Parameters
1316 ----------
1317 uri : convertible to `lsst.resources.ResourcePath`
1318 URI to a gzip-compressed JSON file containing a serialized pipeline
1319 graph.
1320 import_mode : `TaskImportMode`, optional
1321 Whether to import tasks, and how to reconcile any differences
1322 between the imported task's connections and the those that were
1323 persisted with the graph. Default is to check that they are the
1324 same.
1326 Returns
1327 -------
1328 graph : `PipelineGraph`
1329 Deserialized pipeline graph.
1331 Raises
1332 ------
1333 PipelineGraphReadError
1334 Raised if the serialized `PipelineGraph` is not self-consistent.
1335 EdgesChangedError
1336 Raised if ``import_mode`` is
1337 `TaskImportMode.REQUIRED_CONSISTENT_EDGES` and the edges of a task
1338 did change after import and reconfiguration.
1340 Notes
1341 -----
1342 `PipelineGraph` serialization is currently experimental and may be
1343 removed or significantly changed in the future, with no deprecation
1344 period.
1345 """
1346 uri = ResourcePath(uri)
1347 with uri.open("rb") as stream:
1348 return cls._read_stream(cast(BinaryIO, stream), import_mode=import_mode)
1350 def _write_stream(self, stream: BinaryIO) -> None:
1351 """Write the pipeline to a file-like object.
1353 Parameters
1354 ----------
1355 stream
1356 File-like object opened for binary writing.
1358 Notes
1359 -----
1360 `PipelineGraph` serialization is currently experimental and may be
1361 removed or significantly changed in the future, with no deprecation
1362 period.
1364 The file format is gzipped JSON, and is intended to be human-readable,
1365 but it should not be considered a stable public interface for outside
1366 code, which should always use `PipelineGraph` methods (or at least the
1367 `io.SerializedPipelineGraph` class) to read these files.
1368 """
1369 from .io import SerializedPipelineGraph
1371 with gzip.open(stream, mode="wb") as compressed_stream:
1372 compressed_stream.write(
1373 SerializedPipelineGraph.serialize(self).model_dump_json(exclude_defaults=True).encode("utf-8")
1374 )
1376 def _write_uri(self, uri: ResourcePathExpression) -> None:
1377 """Write the pipeline to a file given a URI.
1379 Parameters
1380 ----------
1381 uri : convertible to `lsst.resources.ResourcePath`
1382 URI to write to . May have ``.json.gz`` or no extension (which
1383 will cause a ``.json.gz`` extension to be added).
1385 Notes
1386 -----
1387 `PipelineGraph` serialization is currently experimental and may be
1388 removed or significantly changed in the future, with no deprecation
1389 period.
1391 The file format is gzipped JSON, and is intended to be human-readable,
1392 but it should not be considered a stable public interface for outside
1393 code, which should always use `PipelineGraph` methods (or at least the
1394 `io.SerializedPipelineGraph` class) to read these files.
1395 """
1396 uri = ResourcePath(uri)
1397 extension = uri.getExtension()
1398 if not extension:
1399 uri = uri.updatedExtension(".json.gz")
1400 elif extension != ".json.gz":
1401 raise ValueError("Expanded pipeline files should always have a .json.gz extension.")
1402 with uri.open(mode="wb") as stream:
1403 self._write_stream(cast(BinaryIO, stream))
1405 def _import_and_configure(
1406 self, import_mode: TaskImportMode = TaskImportMode.REQUIRE_CONSISTENT_EDGES
1407 ) -> None:
1408 """Import the `PipelineTask` classes referenced by all task nodes and
1409 update those nodes accordingly.
1411 Parameters
1412 ----------
1413 import_mode : `TaskImportMode`, optional
1414 Whether to import tasks, and how to reconcile any differences
1415 between the imported task's connections and the those that were
1416 persisted with the graph. Default is to check that they are the
1417 same. This method does nothing if this is
1418 `TaskImportMode.DO_NOT_IMPORT`.
1420 Raises
1421 ------
1422 EdgesChangedError
1423 Raised if ``import_mode`` is
1424 `TaskImportMode.REQUIRED_CONSISTENT_EDGES` and the edges of a task
1425 did change after import and reconfiguration.
1427 Notes
1428 -----
1429 This method shouldn't need to be called unless the graph was
1430 deserialized without importing and configuring immediately, which is
1431 not the default behavior (but it can greatly speed up deserialization).
1432 If all tasks have already been imported this does nothing.
1434 Importing and configuring a task can change its
1435 `~TaskNode.task_class_name` or `~TaskClass.get_config_str` output,
1436 usually because the software used to read a serialized graph is newer
1437 than the software used to write it (e.g. a new config option has been
1438 added, or the task was moved to a new module with a forwarding alias
1439 left behind). These changes are allowed by
1440 `TaskImportMode.REQUIRE_CONSISTENT_EDGES`.
1442 If importing and configuring a task causes its edges to change, any
1443 dataset type nodes linked to those edges will be reset to the
1444 unresolved state.
1445 """
1446 if import_mode is TaskImportMode.DO_NOT_IMPORT:
1447 return
1448 rebuild = (
1449 import_mode is TaskImportMode.REQUIRE_CONSISTENT_EDGES
1450 or import_mode is TaskImportMode.OVERRIDE_EDGES
1451 )
1452 updates: dict[str, TaskNode] = {}
1453 node_key: NodeKey
1454 for node_key, node_state in self._xgraph.nodes.items():
1455 if node_key.node_type is NodeType.TASK:
1456 task_node: TaskNode = node_state["instance"]
1457 new_task_node = task_node._imported_and_configured(rebuild)
1458 if new_task_node is not task_node:
1459 updates[task_node.label] = new_task_node
1460 self._replace_task_nodes(
1461 updates,
1462 check_edges_unchanged=(import_mode is TaskImportMode.REQUIRE_CONSISTENT_EDGES),
1463 assume_edges_unchanged=(import_mode is TaskImportMode.ASSUME_CONSISTENT_EDGES),
1464 message_header=(
1465 "In task with label {task_label!r}, persisted edges (A)"
1466 "differ from imported and configured edges (B):"
1467 ),
1468 )
1470 ###########################################################################
1471 #
1472 # Advanced PipelineGraph Inspection Interface:
1473 #
1474 # - methods to iterate over all nodes and edges, utilizing NodeKeys;
1475 #
1476 # - methods to find overall inputs and group nodes by their dimensions,
1477 # which are important operations for QuantumGraph generation.
1478 #
1479 ###########################################################################
1481 def iter_edges(self, init: bool = False) -> Iterator[Edge]:
1482 """Iterate over edges in the graph.
1484 Parameters
1485 ----------
1486 init : `bool`, optional
1487 If `True` (`False` is default) iterate over the edges between task
1488 initialization node and init input/output dataset types, instead of
1489 the runtime task nodes and regular input/output/prerequisite
1490 dataset types.
1492 Returns
1493 -------
1494 edges : `~collections.abc.Iterator` [ `Edge` ]
1495 A lazy iterator over `Edge` (`WriteEdge` or `ReadEdge`) instances.
1497 Notes
1498 -----
1499 This method always returns _either_ init edges or runtime edges, never
1500 both. The full (internal) graph that contains both also includes a
1501 special edge that connects each task init node to its runtime node;
1502 that is also never returned by this method, since it is never a part of
1503 the init-only or runtime-only subgraphs.
1504 """
1505 edge: Edge
1506 for _, _, edge in self._xgraph.edges(data="instance"):
1507 if edge is not None and edge.is_init == init:
1508 yield edge
1510 def iter_nodes(
1511 self,
1512 ) -> Iterator[
1513 tuple[Literal[NodeType.TASK_INIT], str, TaskInitNode]
1514 | tuple[Literal[NodeType.TASK], str, TaskInitNode]
1515 | tuple[Literal[NodeType.DATASET_TYPE], str, DatasetTypeNode | None]
1516 ]:
1517 """Iterate over nodes in the graph.
1519 Returns
1520 -------
1521 nodes : `~collections.abc.Iterator` [ `tuple` ]
1522 A lazy iterator over all of the nodes in the graph. Each yielded
1523 element is a tuple of:
1525 - the node type enum value (`NodeType`);
1526 - the string name for the node (task label or parent dataset type
1527 name);
1528 - the node value (`TaskNode`, `TaskInitNode`, `DatasetTypeNode`,
1529 or `None` for dataset type nodes that have not been resolved).
1530 """
1531 key: NodeKey
1532 if self._sorted_keys is not None:
1533 for key in self._sorted_keys:
1534 yield key.node_type, key.name, self._xgraph.nodes[key]["instance"] # type: ignore
1535 else:
1536 for key, node in self._xgraph.nodes(data="instance"):
1537 yield key.node_type, key.name, node # type: ignore
1539 def iter_overall_inputs(self) -> Iterator[tuple[str, DatasetTypeNode | None]]:
1540 """Iterate over all of the dataset types that are consumed but not
1541 produced by the graph.
1543 Returns
1544 -------
1545 dataset_types : `~collections.abc.Iterator` [ `tuple` ]
1546 A lazy iterator over the overall-input dataset types (including
1547 overall init inputs and prerequisites). Each yielded element is a
1548 tuple of:
1550 - the parent dataset type name;
1551 - the resolved `DatasetTypeNode`, or `None` if the dataset type has
1552 not been resolved.
1553 """
1554 for generation in networkx.algorithms.dag.topological_generations(self._xgraph):
1555 key: NodeKey
1556 for key in generation:
1557 # While we expect all tasks to have at least one input and
1558 # hence never appear in the first topological generation, that
1559 # is not true of task init nodes.
1560 if key.node_type is NodeType.DATASET_TYPE:
1561 yield key.name, self._xgraph.nodes[key]["instance"]
1562 return
1564 def group_by_dimensions(
1565 self, prerequisites: bool = False
1566 ) -> dict[DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]]:
1567 """Group this graph's tasks and dataset types by their dimensions.
1569 Parameters
1570 ----------
1571 prerequisites : `bool`, optional
1572 If `True`, include prerequisite dataset types as well as regular
1573 input and output datasets (including intermediates).
1575 Returns
1576 -------
1577 groups : `dict` [ `~lsst.daf.butler.DimensionGroup`, `tuple` ]
1578 A dictionary of groups keyed by `~lsst.daf.butler.DimensionGroup`,
1579 in which each value is a tuple of:
1581 - a `dict` of `TaskNode` instances, keyed by task label
1582 - a `dict` of `DatasetTypeNode` instances, keyed by
1583 dataset type name.
1585 that have those dimensions.
1587 Notes
1588 -----
1589 Init inputs and outputs are always included, but always have empty
1590 dimensions and are hence are all grouped together.
1591 """
1592 result: dict[DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]] = {}
1593 next_new_value: tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]] = ({}, {})
1594 for task_label, task_node in self.tasks.items():
1595 if task_node.dimensions is None:
1596 raise UnresolvedGraphError(f"Task with label {task_label!r} has not been resolved.")
1597 if (group := result.setdefault(task_node.dimensions, next_new_value)) is next_new_value:
1598 next_new_value = ({}, {}) # make new lists for next time
1599 group[0][task_node.label] = task_node
1600 for dataset_type_name, dataset_type_node in self.dataset_types.items():
1601 if dataset_type_node is None:
1602 raise UnresolvedGraphError(f"Dataset type {dataset_type_name!r} has not been resolved.")
1603 if not dataset_type_node.is_prerequisite or prerequisites:
1604 if (
1605 group := result.setdefault(dataset_type_node.dataset_type.dimensions, next_new_value)
1606 ) is next_new_value:
1607 next_new_value = ({}, {}) # make new lists for next time
1608 group[1][dataset_type_node.name] = dataset_type_node
1609 return result
1611 def get_all_dimensions(self, prerequisites: bool = True) -> DimensionGroup:
1612 """Return all dimensions used in this graph's tasks and dataset types.
1614 Parameters
1615 ----------
1616 prerequisites : `bool`, optional
1617 If `False`, do not include the dimensions that are only used by
1618 prerequisite input dataset types.
1620 Returns
1621 -------
1622 dimensions : `~lsst.daf.butler.DimensionGroup`.
1623 All dimensions in this pipeline.
1624 """
1625 return DimensionGroup.union(
1626 *self.group_by_dimensions(prerequisites=prerequisites).keys(),
1627 universe=self.universe,
1628 )
1630 def split_independent(self) -> Iterable[PipelineGraph]:
1631 """Iterate over independent subgraphs that together comprise this
1632 pipeline graph.
1634 Returns
1635 -------
1636 subgraphs : `~collections.abc.Iterable` [ `PipelineGraph` ]
1637 An iterable over component subgraphs that could be run
1638 independently (they have only overall inputs in common). May be a
1639 lazy iterator.
1641 Notes
1642 -----
1643 All resolved dataset type nodes will be preserved.
1645 If there is only one component, ``self`` may be returned as the only
1646 element in the iterable.
1648 If `has_been_sorted`, all subgraphs will be sorted as well.
1650 Task subsets will not be included in the returned graphs.
1651 """
1652 # Having an overall input in common isn't enough to make subgraphs
1653 # dependent on each other, so we want to look for connected component
1654 # subgraphs of the task-only projected graph.
1655 bipartite_xgraph = self._make_bipartite_xgraph_internal(init=False)
1656 task_keys = {
1657 key
1658 for key, bipartite in bipartite_xgraph.nodes(data="bipartite")
1659 if bipartite == NodeType.TASK.bipartite
1660 }
1661 task_xgraph = networkx.algorithms.bipartite.projected_graph(
1662 networkx.DiGraph(bipartite_xgraph), task_keys
1663 )
1664 # "Weakly" connected means connected in only one direction, which is
1665 # the only kind of "connected" a DAG can ever be.
1666 for component_task_keys in networkx.algorithms.weakly_connected_components(task_xgraph):
1667 if component_task_keys == task_keys:
1668 yield self
1669 return
1670 else:
1671 component_subgraph = PipelineGraph(universe=self._universe, data_id=self._raw_data_id)
1672 component_subgraph.add_task_nodes(
1673 [self._xgraph.nodes[key]["instance"] for key in component_task_keys], parent=self
1674 )
1675 if self.has_been_sorted:
1676 component_subgraph.sort()
1677 yield component_subgraph
1679 ###########################################################################
1680 #
1681 # Data repository/collection initialization
1682 #
1683 ###########################################################################
1685 @property
1686 def packages_dataset_type(self) -> DatasetType:
1687 """The special "packages" dataset type that records software versions.
1689 This is not associated with a task and hence is
1690 not considered part of the pipeline graph in other respects, but it
1691 does get written with other provenance datasets.
1692 """
1693 if self._universe is None:
1694 raise UnresolvedGraphError(
1695 "PipelineGraph must be resolved in order to get the packages dataset type."
1696 )
1697 return DatasetType(
1698 PACKAGES_INIT_OUTPUT_NAME, self._universe.empty, PACKAGES_INIT_OUTPUT_STORAGE_CLASS
1699 )
1701 def register_dataset_types(
1702 self,
1703 butler: Butler,
1704 include_packages: bool = True,
1705 *,
1706 include_inputs: bool = True,
1707 include_configs: bool = True,
1708 include_logs: bool = True,
1709 ) -> None:
1710 """Register all dataset types in a data repository.
1712 Parameters
1713 ----------
1714 butler : `~lsst.daf.butler.Butler`
1715 Data repository client.
1716 include_packages : `bool`, optional
1717 Whether to include the special "packages" dataset type that records
1718 software versions (this is not associated with a task and hence is
1719 not considered part of the pipeline graph in other respects, but it
1720 does get written with other provenance datasets).
1721 include_inputs : `bool`, optional
1722 Whether to register overall-input dataset types as well as outputs.
1723 include_configs : `bool`, optional
1724 Whether to register task config dataset types.
1725 include_logs : `bool`, optional
1726 Whether to register task log dataset types.
1727 """
1728 dataset_types = {
1729 node.name: node.dataset_type
1730 for node in self.dataset_types.values()
1731 if include_inputs or self.producer_of(node.name) is not None
1732 }
1733 if include_packages:
1734 dataset_types[self.packages_dataset_type.name] = self.packages_dataset_type
1735 if not include_configs:
1736 for task_node in self.tasks.values():
1737 del dataset_types[task_node.init.config_output.dataset_type_name]
1738 if not include_logs:
1739 for task_node in self.tasks.values():
1740 if task_node.log_output is not None:
1741 del dataset_types[task_node.log_output.dataset_type_name]
1742 for dataset_type in dataset_types.values():
1743 butler.registry.registerDatasetType(dataset_type)
1745 def check_dataset_type_registrations(self, butler: Butler, include_packages: bool = True) -> None:
1746 """Check that dataset type registrations in a data repository match
1747 the definitions in this pipeline graph.
1749 Parameters
1750 ----------
1751 butler : `~lsst.daf.butler.Butler`
1752 Data repository client.
1753 include_packages : `bool`, optional
1754 Whether to include the special "packages" dataset type that records
1755 software versions (this is not associated with a task and hence is
1756 not considered part of the pipeline graph in other respects, but it
1757 does get written with other provenance datasets).
1759 Returns
1760 -------
1761 None
1763 Raises
1764 ------
1765 lsst.daf.butler.MissingDatasetTypeError
1766 Raised if one or more non-optional-input or output dataset types in
1767 the pipeline is not registered at all.
1768 lsst.daf.butler.ConflictingDefinitionError
1769 Raised if the definition in the data repository is not identical
1770 to the definition in the pipeline graph.
1772 Notes
1773 -----
1774 Note that dataset type definitions that are storage-class-conversion
1775 compatible but not identical are not permitted by these checks, because
1776 the expectation is that these differences are handled by `resolve`,
1777 which makes the pipeline graph use the data repository definitions.
1778 This method is intended to check that none of those definitions have
1779 changed.
1780 """
1781 dataset_types = [node.dataset_type for node in self.dataset_types.values()]
1782 if include_packages:
1783 dataset_types.append(self.packages_dataset_type)
1784 missing_dataset_types: list[str] = []
1785 for dataset_type in dataset_types:
1786 try:
1787 expected = butler.registry.getDatasetType(dataset_type.name)
1788 except MissingDatasetTypeError:
1789 expected = None
1790 if expected is None:
1791 # The user probably forgot to register dataset types
1792 # at least once (which should be an error),
1793 # but we could also get here if this is an optional input for
1794 # which no datasets were found in this repo (not an error).
1795 if (
1796 not (
1797 self.producer_of(dataset_type.name) is None
1798 and all(
1799 self.tasks[input_edge.task_label].is_optional(input_edge.connection_name)
1800 for input_edge in self.consuming_edges_of(dataset_type.name)
1801 )
1802 )
1803 or dataset_type.name == PACKAGES_INIT_OUTPUT_NAME
1804 ):
1805 missing_dataset_types.append(dataset_type.name)
1806 elif expected != dataset_type:
1807 raise ConflictingDefinitionError(
1808 f"DatasetType definition in registry has changed since the pipeline graph was resolved: "
1809 f"{dataset_type} (graph) != {expected} (registry)."
1810 )
1811 if missing_dataset_types:
1812 plural = "s" if len(missing_dataset_types) != 1 else ""
1813 raise MissingDatasetTypeError(
1814 f"Missing dataset type definition{plural}: {', '.join(missing_dataset_types)}. "
1815 "Dataset types have to be registered in advance (on the command-line, either via "
1816 "`butler register-dataset-type` or the `--register-dataset-types` option to `pipetask run`."
1817 )
    def instantiate_tasks(
        self,
        get_init_input: Callable[[DatasetType], Any] | None = None,
        init_outputs: list[tuple[Any, DatasetType]] | None = None,
        labels: Iterable[str] | None = None,
    ) -> list[PipelineTask]:
        """Instantiate all tasks in the pipeline.

        Parameters
        ----------
        get_init_input : `~collections.abc.Callable`, optional
            Callable that accepts a single `~lsst.daf.butler.DatasetType`
            parameter and returns the init-input dataset associated with that
            dataset type. Must respect the storage class embedded in the type.
            This is optional if the pipeline does not have any overall init
            inputs. When a full butler is available,
            `lsst.daf.butler.Butler.get` can be used directly here.
        init_outputs : `list`, optional
            A list of ``(obj, dataset type)`` init-output dataset pairs, to be
            appended to in-place. Both the object and the dataset type will
            correspond to the storage class of the output connection, which
            may not be the same as the storage class on the graph's dataset
            type node.
        labels : `~collections.abc.Iterable` [ `str` ], optional
            The labels of tasks to instantiate. If not provided, all tasks in
            the graph will be instantiated.

        Returns
        -------
        tasks : `list`
            Constructed `PipelineTask` instances.

        Raises
        ------
        UnresolvedGraphError
            Raised if the pipeline graph is not fully resolved.
        ValueError
            Raised if a task requires an init-input that is not produced by
            an earlier task and ``get_init_input`` was not provided.
        """
        if not self.is_fully_resolved:
            raise UnresolvedGraphError("Pipeline graph must be fully resolved before instantiating tasks.")
        empty_data_id = DataCoordinate.make_empty(self.universe)
        labels = set(labels) if labels is not None else self.tasks.keys()
        # In-memory handles (keyed by dataset type name) for init datasets
        # already produced or fetched, so downstream tasks can consume them
        # without another `get_init_input` call.
        handles: dict[str, InMemoryDatasetHandle] = {}
        tasks: list[PipelineTask] = []
        for task_node in self.tasks.values():
            if task_node.label not in labels:
                continue
            # Gather this task's init-inputs, preferring cached handles over
            # the external callback.
            task_init_inputs: dict[str, Any] = {}
            for read_edge in task_node.init.inputs.values():
                if (handle := handles.get(read_edge.dataset_type_name)) is not None:
                    obj = handle.get(storageClass=read_edge.storage_class_name)
                elif (
                    read_edge.component is not None
                    and (parent_handle := handles.get(read_edge.parent_dataset_type_name)) is not None
                ):
                    # Component read: extract the component from the cached
                    # parent dataset.
                    obj = parent_handle.get(
                        storageClass=read_edge.storage_class_name, component=read_edge.component
                    )
                else:
                    dataset_type_node = self.dataset_types[read_edge.parent_dataset_type_name]
                    if get_init_input is None:
                        raise ValueError(
                            f"Task {task_node.label!r} requires init-input "
                            f"{read_edge.dataset_type_name} but no 'get_init_input' callback was provided."
                        )
                    obj = get_init_input(read_edge.adapt_dataset_type(dataset_type_node.dataset_type))
                    n_consumers = len(self.consumers_of(dataset_type_node.name))
                    if (
                        n_consumers > 1
                        and read_edge.component is None
                        and read_edge.storage_class_name == dataset_type_node.storage_class_name
                    ):
                        # Caching what we just got is safe in general only
                        # if there was no storage class conversion, since
                        # a->b and a->c does not imply b->c.
                        handles[read_edge.dataset_type_name] = InMemoryDatasetHandle(
                            obj,
                            storageClass=dataset_type_node.storage_class,
                            dataId=empty_data_id,
                            copy=True,
                        )
                task_init_inputs[read_edge.connection_name] = obj
            task = task_node.task_class(
                config=task_node.config, initInputs=task_init_inputs, name=task_node.label
            )
            tasks.append(task)
            # Harvest this task's init-outputs so later tasks (and the
            # caller, via `init_outputs`) can see them.
            for write_edge in task_node.init.outputs.values():
                dataset_type_node = self.dataset_types[write_edge.parent_dataset_type_name]
                obj = getattr(task, write_edge.connection_name)
                # We don't immediately coerce obj to the dataset_type_node
                # storage class (which should be the repo storage class, if
                # there is one) when appending to `init_outputs` because a
                # formatter might be able to do a better job of that later;
                # instead we pair it with a dataset type that's consistent with
                # the in-memory type. We do coerce when populating `handles`,
                # though, because going through the dataset_type_node storage
                # class is the conversion path we checked when we resolved the
                # pipeline graph.
                if init_outputs is not None:
                    init_outputs.append((obj, write_edge.adapt_dataset_type(dataset_type_node.dataset_type)))
                n_consumers = len(self.consumers_of(dataset_type_node.name))
                if n_consumers > 0:
                    handles[dataset_type_node.name] = InMemoryDatasetHandle(
                        dataset_type_node.storage_class.coerce_type(obj),
                        dataId=empty_data_id,
                        storageClass=dataset_type_node.storage_class,
                        # Only copy when more than one task will read it.
                        copy=(n_consumers > 1),
                    )
        return tasks
1923 def write_init_outputs(self, butler: Butler) -> None:
1924 """Write the init-output datasets for all tasks in the pipeline graph.
1926 Parameters
1927 ----------
1928 butler : `lsst.daf.butler.Butler`
1929 A full butler data repository client with its default run set
1930 to the collection where datasets should be written.
1932 Notes
1933 -----
1934 Datasets that already exist in the butler's output run collection will
1935 not be written.
1937 This method writes outputs with new random dataset IDs and should
1938 hence only be used when writing init-outputs prior to building a
1939 `QuantumGraph`. Use `QuantumGraph.write_init_outputs` if a quantum
1940 graph has already been built.
1941 """
1942 init_outputs: list[tuple[Any, DatasetType]] = []
1943 self.instantiate_tasks(butler.get, init_outputs)
1944 found_refs: dict[str, DatasetRef] = {}
1945 to_put: list[tuple[Any, DatasetType]] = []
1946 for obj, dataset_type in init_outputs:
1947 if (ref := butler.find_dataset(dataset_type, collections=butler.run)) is not None:
1948 found_refs[dataset_type.name] = ref
1949 else:
1950 to_put.append((obj, dataset_type))
1951 for ref, stored in butler.stored_many(found_refs.values()).items():
1952 if not stored:
1953 raise FileNotFoundError(
1954 f"Init-output dataset {ref.datasetType.name!r} was found in RUN {ref.run!r} "
1955 f"but had not actually been stored (or was stored and later deleted)."
1956 )
1957 for obj, dataset_type in to_put:
1958 butler.put(obj, dataset_type)
1960 def write_configs(self, butler: Butler) -> None:
1961 """Write the config datasets for all tasks in the pipeline graph.
1963 Parameters
1964 ----------
1965 butler : `lsst.daf.butler.Butler`
1966 A full butler data repository client with its default run set
1967 to the collection where datasets should be written.
1969 Notes
1970 -----
1971 Config datasets that already exist in the butler's output run
1972 collection will be checked for consistency.
1974 This method writes outputs with new random dataset IDs and should
1975 hence only be used when writing init-outputs prior to building a
1976 `QuantumGraph`. Use `QuantumGraph.write_configs` if a quantum graph
1977 has already been built.
1979 Raises
1980 ------
1981 lsst.daf.butler.registry.ConflictingDefinitionError
1982 Raised if a config dataset already exists and is not consistent
1983 with the config in the pipeline graph.
1984 """
1985 to_put: list[tuple[PipelineTaskConfig, str]] = []
1986 for task_node in self.tasks.values():
1987 dataset_type_name = task_node.init.config_output.dataset_type_name
1988 if (ref := butler.find_dataset(dataset_type_name, collections=butler.run)) is not None:
1989 old_config = butler.get(ref)
1990 if not task_node.config.compare(old_config, shortcut=False, output=log_config_mismatch):
1991 raise ConflictingDefinitionError(
1992 f"Config does not match existing task config {dataset_type_name!r} in "
1993 "butler; tasks configurations must be consistent within the same run collection"
1994 )
1995 else:
1996 to_put.append((task_node.config, dataset_type_name))
1997 # We do writes at the end to minimize the mess we leave behind when we
1998 # raise an exception.
1999 for config, dataset_type_name in to_put:
2000 butler.put(config, dataset_type_name)
    def write_packages(self, butler: Butler) -> None:
        """Write the 'packages' dataset for the currently-active software
        versions.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            A full butler data repository client with its default run set
            to the collection where datasets should be written.

        Notes
        -----
        If the packages dataset already exists, it will be compared to the
        versions in the current packages. New packages that weren't present
        before are not considered an inconsistency.

        This method writes outputs with new random dataset IDs and should
        hence only be used when writing init-outputs prior to building a
        `QuantumGraph`. Use `QuantumGraph.write_packages` if a quantum graph
        has already been built.

        Raises
        ------
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if the packages dataset already exists and is not
            consistent with the current packages.
        """
        new_packages = Packages.fromSystem()
        if (ref := butler.find_dataset(self.packages_dataset_type)) is not None:
            packages = butler.get(ref)
            # NOTE(review): it is `packages` (the existing dataset), not
            # `new_packages`, that gets re-written below, so `compare_packages`
            # presumably merges new entries into `packages` in place and
            # returns `True` when a rewrite is needed — confirm against
            # `compare_packages`'s definition.
            if compare_packages(packages, new_packages):
                # have to remove existing dataset first; butler has no
                # replace option.
                butler.pruneDatasets([ref], unstore=True, purge=True)
                butler.put(packages, ref)
        else:
            butler.put(new_packages, self.packages_dataset_type)
2040 def init_output_run(self, butler: Butler) -> None:
2041 """Initialize a new output RUN collection by writing init-output
2042 datasets (including configs and packages).
2044 Parameters
2045 ----------
2046 butler : `lsst.daf.butler.Butler`
2047 A full butler data repository client with its default run set
2048 to the collection where datasets should be written.
2049 """
2050 self.write_configs(butler)
2051 self.write_packages(butler)
2052 self.write_init_outputs(butler)
2054 ###########################################################################
2055 #
2056 # Class- and Package-Private Methods.
2057 #
2058 ###########################################################################
2060 def _iter_task_defs(self) -> Iterator[TaskDef]:
2061 """Iterate over this pipeline as a sequence of `TaskDef` instances.
2063 Notes
2064 -----
2065 This is a package-private method intended to aid in the transition to a
2066 codebase more fully integrated with the `PipelineGraph` class, in which
2067 both `TaskDef` and `PipelineDatasetTypes` are expected to go away, and
2068 much of the functionality on the `Pipeline` class will be moved to
2069 `PipelineGraph` as well.
2071 Raises
2072 ------
2073 TaskNotImportedError
2074 Raised if `TaskNode.is_imported` is `False` for any task.
2075 """
2076 from ..pipeline import TaskDef
2078 for node in self._tasks.values():
2079 yield TaskDef(
2080 config=node.config,
2081 taskClass=node.task_class,
2082 label=node.label,
2083 connections=node.get_connections(),
2084 )
    def _init_from_args(
        self,
        xgraph: networkx.MultiDiGraph | None,
        sorted_keys: Sequence[NodeKey] | None,
        task_subsets: dict[str, TaskSubset] | None,
        description: str,
        universe: DimensionUniverse | None,
        data_id: DataId | None,
        step_definitions: StepDefinitions,
    ) -> None:
        """Initialize the graph with possibly-nontrivial arguments.

        Parameters
        ----------
        xgraph : `networkx.MultiDiGraph` or `None`
            The backing networkx graph, or `None` to create an empty one.
            This graph has `NodeKey` instances for nodes and the same
            structure as the graph exported by `make_xgraph`, but its nodes
            and edges have a single ``instance`` attribute that holds a
            `TaskNode`, `TaskInitNode`, `DatasetTypeNode` (or `None`),
            `ReadEdge`, or `WriteEdge` instance.
        sorted_keys : `~collections.abc.Sequence` [ `NodeKey` ] or `None`
            Topologically sorted sequence of node keys, or `None` if the
            graph is not sorted.
        task_subsets : `dict` [ `str`, `TaskSubset` ]
            Labeled subsets of tasks. Values must be constructed with
            ``xgraph`` as their parent graph.
        description : `str`
            String description for this pipeline.
        universe : `lsst.daf.butler.DimensionUniverse` or `None`
            Definitions of all dimensions.
        data_id : `lsst.daf.butler.DataCoordinate` or other data ID mapping.
            Data ID that represents a constraint on all quanta generated from
            this pipeline.
        step_definitions : `StepDefinitions`
            Struct holding information about steps.

        Notes
        -----
        Only empty `PipelineGraph` instances should be constructed directly
        by users, which sets the signature of ``__init__`` itself, but
        methods on `PipelineGraph` and its helper classes need to be able to
        create them with state. Those methods can call this after calling
        ``__new__`` manually, skipping ``__init__``.
        """
        self._xgraph = xgraph if xgraph is not None else networkx.MultiDiGraph()
        # Start unsorted; _reorder below (if called) sets this.
        self._sorted_keys: Sequence[NodeKey] | None = None
        self._task_subsets = task_subsets if task_subsets is not None else {}
        self._description = description
        # Mapping views share the backing graph; they must exist before
        # _reorder is called so it can update them.
        self._tasks = TaskMappingView(self._xgraph)
        self._dataset_types = DatasetTypeMappingView(self._xgraph)
        self._raw_data_id: dict[str, Any]
        if isinstance(data_id, DataCoordinate):
            # A full DataCoordinate carries its own universe; reconcile it
            # with any explicitly provided one.
            if universe is None:
                universe = data_id.universe
            else:
                assert universe is data_id.universe, "data_id.universe and given universe differ"
            self._raw_data_id = dict(data_id.required)
        elif data_id is None:
            self._raw_data_id = {}
        else:
            self._raw_data_id = dict(data_id)
        self._universe = universe
        if sorted_keys is not None:
            self._reorder(sorted_keys)
        self._step_definitions = step_definitions
2153 def _make_bipartite_xgraph_internal(self, init: bool) -> networkx.MultiDiGraph:
2154 """Make a bipartite init-only or runtime-only internal subgraph.
2156 See `make_bipartite_xgraph` for parameters and return values.
2158 Notes
2159 -----
2160 This method returns a view of the `PipelineGraph` object's internal
2161 backing graph, and hence should only be called in methods that copy the
2162 result either explicitly or by running a copying algorithm before
2163 returning it to the user.
2164 """
2165 return self._xgraph.edge_subgraph([edge.key for edge in self.iter_edges(init)])
2167 def _make_task_xgraph_internal(self, init: bool) -> networkx.DiGraph:
2168 """Make a init-only or runtime-only internal task subgraph.
2170 See `make_task_xgraph` for parameters and return values.
2172 Notes
2173 -----
2174 This method returns a view of the `PipelineGraph` object's internal
2175 backing graph, and hence should only be called in methods that copy the
2176 result either explicitly or by running a copying algorithm before
2177 returning it to the user.
2178 """
2179 bipartite_xgraph = self._make_bipartite_xgraph_internal(init=init)
2180 task_keys = [
2181 key
2182 for key, bipartite in bipartite_xgraph.nodes(data="bipartite")
2183 if bipartite == NodeType.TASK.bipartite
2184 ]
2185 return networkx.algorithms.bipartite.projected_graph(networkx.DiGraph(bipartite_xgraph), task_keys)
2187 def _transform_xgraph_state[G: networkx.DiGraph | networkx.MultiDiGraph](
2188 self, xgraph: G, skip_edges: bool
2189 ) -> G:
2190 """Transform networkx graph attributes in-place from the internal
2191 "instance" attributes to the documented exported attributes.
2193 Parameters
2194 ----------
2195 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
2196 Graph whose state should be transformed.
2197 skip_edges : `bool`
2198 If `True`, do not transform edge state.
2200 Returns
2201 -------
2202 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
2203 The same object passed in, after modification.
2205 Notes
2206 -----
2207 This should be called after making a copy of the internal graph but
2208 before any projection down to just task or dataset type nodes, since
2209 it assumes stateful edges.
2210 """
2211 state: dict[str, Any]
2212 for state in xgraph.nodes.values():
2213 node_value: TaskInitNode | TaskNode | DatasetTypeNode | None = state.pop("instance")
2214 if node_value is not None:
2215 state.update(node_value._to_xgraph_state())
2216 else:
2217 # This is a dataset type node that is not resolved.
2218 state["bipartite"] = NodeType.DATASET_TYPE.bipartite
2219 if not skip_edges:
2220 for _, _, state in xgraph.edges(data=True):
2221 edge: Edge | None = state.pop("instance", None)
2222 if edge is not None:
2223 state.update(edge._to_xgraph_state())
2224 return xgraph
    def _replace_task_nodes(
        self,
        updates: Mapping[str, TaskNode],
        check_edges_unchanged: bool,
        assume_edges_unchanged: bool,
        message_header: str,
    ) -> None:
        """Replace task nodes and update edges and dataset type nodes
        accordingly.

        Parameters
        ----------
        updates : `~collections.abc.Mapping` [ `str`, `TaskNode` ]
            New task nodes with task label keys. All keys must be task labels
            that are already present in the graph.
        check_edges_unchanged : `bool`, optional
            If `True`, require the edges (connections) of the modified tasks
            to remain unchanged after importing and configuring each task,
            and verify that this is the case.
        assume_edges_unchanged : `bool`, optional
            If `True`, the caller declares that the edges (connections) of
            the modified tasks will remain unchanged importing and
            configuring each task, and that it is unnecessary to check this.
        message_header : `str`
            Template for `str.format` with a single ``task_label``
            placeholder to use as the first line in `EdgesChangedError`
            messages that show the differences between new task edges and old
            task edges. Should include the fact that the rest of the message
            will refer to the old task as "A" and the new task as "B", and
            end with a colon.

        Raises
        ------
        ValueError
            Raised if ``assume_edges_unchanged`` and ``check_edges_unchanged``
            are both `True`, or if a full config is provided for a task after
            another full config or an override has already been provided.
        EdgesChangedError
            Raised if ``check_edges_unchanged=True`` and the edges of a task
            do change.
        """
        # Tasks whose edges changed need a full remove/re-add ("deep");
        # tasks whose edges are unchanged only need their node state
        # swapped in place ("shallow").
        deep: dict[str, TaskNode] = {}
        shallow: dict[str, TaskNode] = {}
        if assume_edges_unchanged:
            if check_edges_unchanged:
                raise ValueError("Cannot simultaneously assume and check that edges have not changed.")
            shallow.update(updates)
        else:
            for task_label, new_task_node in updates.items():
                old_task_node = self.tasks[task_label]
                messages = old_task_node.diff_edges(new_task_node)
                if messages:
                    if check_edges_unchanged:
                        messages.insert(0, message_header.format(task_label=task_label))
                        raise EdgesChangedError("\n".join(messages))
                    else:
                        deep[task_label] = new_task_node
                else:
                    shallow[task_label] = new_task_node
        try:
            if deep:
                removed = self.remove_tasks(deep.keys(), drop_from_subsets=True)
                self.add_task_nodes(deep.values())
                # Restore subset membership that remove_tasks dropped.
                for replaced_task_node, referencing_subsets in removed:
                    for subset_label in referencing_subsets:
                        self._task_subsets[subset_label].add(replaced_task_node.label)
            for task_node in shallow.values():
                self._xgraph.nodes[task_node.key]["instance"] = task_node
                self._xgraph.nodes[task_node.init.key]["instance"] = task_node.init
        except PipelineGraphExceptionSafetyError:  # pragma: no cover
            raise
        except Exception as err:  # pragma: no cover
            # There's no known way to get here, but we want to make it clear
            # it's a big problem if we do.
            raise PipelineGraphExceptionSafetyError(
                "Error while replacing tasks has left the graph in an inconsistent state."
            ) from err
2303 def _append_graph_data_from_edge(
2304 self,
2305 node_data: list[tuple[NodeKey, dict[str, Any]]],
2306 edge_data: list[tuple[NodeKey, NodeKey, str, dict[str, Any]]],
2307 edge: Edge,
2308 parent: PipelineGraph | None,
2309 ) -> None:
2310 """Append networkx state dictionaries for an edge and the corresponding
2311 dataset type node.
2313 Parameters
2314 ----------
2315 node_data : `list`
2316 List of node keys and state dictionaries. A node is appended if
2317 one does not already exist for this dataset type.
2318 edge_data : `list`
2319 List of node key pairs, connection names, and state dictionaries
2320 for edges.
2321 edge : `Edge`
2322 New edge being processed.
2323 parent : `PipelineGraph` or `None`
2324 Another pipeline graph whose dataset type nodes should be used
2325 when present.
2326 """
2327 new_dataset_type_node = None
2328 if parent is not None:
2329 new_dataset_type_node = parent._xgraph.nodes[edge.dataset_type_key].get("instance")
2330 if (existing_dataset_type_state := self._xgraph.nodes.get(edge.dataset_type_key)) is not None:
2331 existing_dataset_type_state["instance"] = new_dataset_type_node
2332 else:
2333 node_data.append(
2334 (
2335 edge.dataset_type_key,
2336 {
2337 "instance": new_dataset_type_node,
2338 "bipartite": NodeType.DATASET_TYPE.bipartite,
2339 },
2340 )
2341 )
2342 edge_data.append(
2343 edge.nodes
2344 + (
2345 edge.connection_name,
2346 {"instance": edge},
2347 )
2348 )
2350 def _reorder(self, sorted_keys: Sequence[NodeKey]) -> None:
2351 """Set the order of all views of this graph from the given sorted
2352 sequence of task labels and dataset type names.
2353 """
2354 self._sorted_keys = sorted_keys
2355 self._tasks._reorder(sorted_keys)
2356 self._dataset_types._reorder(sorted_keys)
2358 def _reset(self) -> None:
2359 """Reset all views of this graph following a modification that might
2360 invalidate them.
2361 """
2362 self._sorted_keys = None
2363 self._tasks._reset()
2364 self._dataset_types._reset()
    def _resolve_step_flow(self) -> Sequence[NodeKey] | None:
        """Check that step definitions are consistent with the ordering of
        the graph's nodes and that they partition the task graph.

        Returns
        -------
        sort_keys : `~collections.abc.Sequence` [ `NodeKey` ] or `None`
            Sort order for the pipeline graph that is consistent with the
            step order, or `None` if there were no steps defined for this
            pipeline.

        Raises
        ------
        InvalidStepsError
            Raised if a step label is not a task subset, a task appears in
            more than one step, a step produces a dataset type consumed by an
            earlier step, or some task belongs to no step.
        """
        if not self._step_definitions:
            return None
        task_labels_so_far: set[str] = set()
        # Inputs we've already seen, and the tasks that wanted them:
        inputs_so_far: dict[str, set[str]] = {}
        sort_keys: list[NodeKey] = []
        keys_already_sorted: set[NodeKey] = set()
        for step_label in self.steps:
            try:
                task_subset = self.task_subsets[step_label]
            except KeyError:
                raise InvalidStepsError(f"Step {step_label!r} is not a task subset.") from None
            # Steps must partition the tasks: no task in two steps.
            if not task_labels_so_far.isdisjoint(task_subset):
                raise InvalidStepsError(
                    f"Step {step_label!r} repeats task(s) {task_labels_so_far & task_subset}."
                )
            task_labels_so_far.update(task_subset)
            # We need a temporary data structure for tracking the inputs of
            # just this step to avoid complaining about input-output
            # relationships within the step. We'll merge this into the
            # cumulative one later.
            step_inputs: dict[str, set[str]] = {}
            # We'll also gather all task, task-init and dataset type networkx
            # graph keys for the step so we can make and sort a step sub-graph.
            new_step_keys: set[NodeKey] = set()
            for task_label in task_subset:
                for input_name in self.inputs_of(task_label):
                    step_inputs.setdefault(input_name, set()).add(task_label)
                # Check that none of the outputs of the tasks in this step were
                # expected as inputs of a previous step's tasks.
                task_outputs = self.outputs_of(task_label)
                if not inputs_so_far.keys().isdisjoint(task_outputs.keys()):
                    msg: list[str] = []
                    for input_name in inputs_so_far.keys() & task_outputs.keys():
                        msg.append(f"{input_name} (used by {inputs_so_far[input_name]})")
                    raise InvalidStepsError(
                        f"Task {task_label} in step {step_label!r} produces dataset types "
                        f"[{', '.join(msg)}], but these are consumed by tasks in earlier "
                        "steps. Either steps are out of order or the graph is cyclic."
                    )
                task_init_key = NodeKey(NodeType.TASK_INIT, task_label)
                task_key = NodeKey(NodeType.TASK, task_label)
                new_step_keys.add(task_init_key)
                new_step_keys.add(task_key)
                # Include the dataset type nodes adjacent to this task so the
                # step subgraph below carries the edges needed for sorting.
                new_step_keys.update(self._xgraph.predecessors(task_init_key))
                new_step_keys.update(self._xgraph.predecessors(task_key))
                new_step_keys.update(self._xgraph.successors(task_init_key))
                new_step_keys.update(self._xgraph.successors(task_key))
                # Also record the step the task is in as a private xgraph
                # attribute so we can look up the step given a task (or,
                # indirectly, dataset type); this has to be used with care
                # because if the steps haven't been verified it can be wrong.
                self._xgraph.nodes[task_init_key]["step"] = step_label
                self._xgraph.nodes[task_key]["step"] = step_label
            # Drop step input keys that were already either inputs or outputs
            # of a previous step, since they'll have already been added to
            # sort_keys.
            new_step_keys.difference_update(keys_already_sorted)
            # Make the step subgraph, sort it, and extend the overall sort_keys
            # with result.
            step_xgraph = self._xgraph.subgraph(new_step_keys)
            # NOTE(review): `networkx.dag` depends on networkx re-exporting
            # the `algorithms.dag` module at the top level;
            # `networkx.algorithms.dag` (imported at the top of this file) is
            # the canonical path — confirm against the pinned networkx version.
            sort_keys.extend(networkx.dag.lexicographical_topological_sort(step_xgraph))
            keys_already_sorted.update(new_step_keys)
            for input_name, consuming_tasks in step_inputs.items():
                inputs_so_far.setdefault(input_name, set()).update(consuming_tasks)
        if not task_labels_so_far.issuperset(self.tasks.keys()):
            # Note that the converse issubset test effectively happens when we
            # look up inputs and outputs of each task.
            raise InvalidStepsError(f"No step contains task(s) {self.tasks.keys() - task_labels_so_far}.")
        return sort_keys
    def _resolve_step_dimensions(self, universe: DimensionUniverse) -> None:
        """Check that step sharding dimensions are consistent with task and
        output dataset dimensions.

        Parameters
        ----------
        universe : `~lsst.daf.butler.DimensionUniverse`
            Definitions for all dimensions. Will be attached to the step
            definitions by this method.

        Raises
        ------
        InvalidStepsError
            Raised if a task's dimensions, or the dimensions of one of its
            output dataset types, are not compatible with the sharding
            dimensions of the task's step.
        """
        self._step_definitions._universe = universe
        for step_label in self.steps:
            dimensions = self.steps.get_dimensions(step_label)
            for task_label in self.task_subsets[step_label]:
                task_node = self.tasks[task_label]
                # `_dimensions_compatible` is a module-level helper; see its
                # definition for the exact compatibility rule.
                if not _dimensions_compatible(dimensions, task_node.dimensions):
                    raise InvalidStepsError(
                        f"Dimensions {task_node.dimensions} of task {task_label!r} are not compatible with "
                        f"the sharding dimensions {dimensions} of step {step_label!r}."
                    )
                for dataset_type_node in self.outputs_of(task_label).values():
                    assert dataset_type_node is not None, "dataset types should be resolved first"
                    if not _dimensions_compatible(dimensions, dataset_type_node.dimensions):
                        raise InvalidStepsError(
                            f"Dimensions {dataset_type_node.dimensions} of dataset type "
                            f"{dataset_type_node.name!r} (produced by task {task_label!r}) are not "
                            f"compatible with the sharding dimensions {dimensions} of step "
                            f"{step_label!r}."
                        )
    def _select_expression(self, expr_tree: expressions.Node, task_xgraph: networkx.DiGraph) -> set[NodeKey]:
        """Select tasks from a pipeline based on a string expression.

        This is the primary implementation method for `select` and
        `select_tasks`.

        Parameters
        ----------
        expr_tree : `expressions.Node`
            Expression [sub]tree to process (recursively).
        task_xgraph : `networkx.DiGraph`
            NetworkX graph of all tasks (runtime nodes only) in the pipeline.

        Returns
        -------
        selected : `set` [ `NodeKey` ]
            Set of `NodeKey` objects for matching tasks (only; no dataset type
            or task-init nodes).
        """
        # Dispatch on the parse-tree node type; leaf identifiers resolve to
        # task keys, dataset-type keys, or task subsets, while operator nodes
        # recurse into their operands and combine the resulting sets.
        match expr_tree:
            case expressions.IdentifierNode(qualifier=qualifier, label=label):
                match self._select_identifier(qualifier, label):
                    case NodeKey(node_type=NodeType.TASK) as task_key:
                        # A bare task identifier selects exactly that task.
                        return {task_key}
                    case NodeKey(node_type=NodeType.DATASET_TYPE) as dataset_type_key:
                        # A dataset type identifier selects its producing
                        # task, if there is one.
                        # Since a dataset type can have only one producer, this
                        # yields 0- (for overall inputs) or 1-element sets.
                        for producer_key, _ in self._xgraph.in_edges(dataset_type_key):
                            if producer_key.node_type is NodeType.TASK_INIT:
                                raise InvalidExpressionError(
                                    f"Init-output dataset type {label!r} cannot be used directly in an "
                                    "expression."
                                )
                            return {producer_key}
                        return set()
                    case TaskSubset() as task_subset:
                        # A subset identifier selects all of its member tasks.
                        return {NodeKey(NodeType.TASK, label) for label in task_subset}
                    case _:  # pragma: no cover
                        raise AssertionError("Identifier type inconsistent with grammar.")
            case expressions.DirectionNode(operator=operator, start=start):
                # Ancestor ('<', '<=') or descendant ('>', '>=') search from a
                # starting identifier; a trailing '=' makes it inclusive.
                match self._select_identifier(start.qualifier, start.label):
                    case NodeKey(node_type=NodeType.TASK) as task_key:
                        if operator.startswith("<"):
                            return self._select_task_ancestors(
                                task_key, task_xgraph, inclusive=operator.endswith("=")
                            )
                        else:
                            assert operator.startswith(">"), "Guaranteed by grammar."
                            return self._select_task_descendants(
                                task_key, task_xgraph, inclusive=operator.endswith("=")
                            )
                    case NodeKey(node_type=NodeType.DATASET_TYPE) as dataset_type_key:
                        if operator.startswith("<"):
                            return self._select_dataset_type_ancestors(
                                dataset_type_key, task_xgraph, inclusive=operator.endswith("=")
                            )
                        else:
                            assert operator.startswith(">"), "Guaranteed by grammar."
                            return self._select_dataset_type_descendants(
                                dataset_type_key, task_xgraph, inclusive=operator.endswith("=")
                            )
                    case TaskSubset():
                        raise InvalidExpressionError(
                            f"Task subset identifier {start!r} cannot be used as the start of an "
                            "ancestor/descendant search."
                        )
                    case _:  # pragma: no cover
                        raise AssertionError("Unexpected parsed identifier result type.")
            case expressions.NotNode(operand=operand):
                # Complement of the operand's selection, relative to the full
                # set of runtime task nodes.
                operand_result = self._select_expression(operand, task_xgraph)
                return set(task_xgraph.nodes.keys() - operand_result)
            case expressions.UnionNode(lhs=lhs, rhs=rhs):
                lhs_result = self._select_expression(lhs, task_xgraph)
                rhs_result = self._select_expression(rhs, task_xgraph)
                return lhs_result.union(rhs_result)
            case expressions.IntersectionNode(lhs=lhs, rhs=rhs):
                lhs_result = self._select_expression(lhs, task_xgraph)
                rhs_result = self._select_expression(rhs, task_xgraph)
                return lhs_result.intersection(rhs_result)
            case _:  # pragma: no cover
                raise AssertionError("Expression parse node inconsistent with grammar.")
2560 def _select_task_ancestors(
2561 self, start: NodeKey, task_xgraph: networkx.DiGraph, inclusive: bool
2562 ) -> set[NodeKey]:
2563 """Return all task-node ancestors of the given task node, as defined by
2564 the `select` expression language.
2566 Parameters
2567 ----------
2568 start : `NodeKey`
2569 A runtime task node key.
2570 task_xgraph : `networkx.DiGraph`
2571 NetworkX graph of all tasks (runtime nodes only) in the pipeline.
2572 inclusive : `bool`
2573 Whether to include the ``start`` node in the results.
2575 Returns
2576 -------
2577 selected : `set` [ `NodeKey` ]
2578 Set of `NodeKey` objects for matching tasks (only; no dataset type
2579 or task-init nodes).
2580 """
2581 result = set(networkx.dag.ancestors(task_xgraph, start))
2582 if inclusive:
2583 result.add(start)
2584 return result
2586 def _select_task_descendants(
2587 self, start: NodeKey, task_xgraph: networkx.DiGraph, inclusive: bool
2588 ) -> set[NodeKey]:
2589 """Return all task-node descendants of the given task node, as defined
2590 by the `select` expression language.
2592 Parameters
2593 ----------
2594 start : `NodeKey`
2595 A runtime task node key.
2596 task_xgraph : `networkx.DiGraph`
2597 NetworkX graph of all tasks (runtime nodes only) in the pipeline.
2598 inclusive : `bool`
2599 Whether to include the ``start`` node in the results.
2601 Returns
2602 -------
2603 selected : `set` [ `NodeKey` ]
2604 Set of `NodeKey` objects for matching tasks (only; no dataset type
2605 or task-init nodes).
2606 """
2607 result = set(networkx.dag.descendants(task_xgraph, start))
2608 if inclusive:
2609 result.add(start)
2610 return result
2612 def _select_dataset_type_ancestors(
2613 self, start: NodeKey, task_xgraph: networkx.DiGraph, inclusive: bool
2614 ) -> set[NodeKey]:
2615 """Return all task-node ancestors of the given dataset type node, as
2616 defined by the `select` expression language.
2618 Parameters
2619 ----------
2620 start : `NodeKey`
2621 A dataset type node key. May not be an init-output.
2622 task_xgraph : `networkx.DiGraph`
2623 NetworkX graph of all tasks (runtime nodes only) in the pipeline.
2624 inclusive : `bool`
2625 Whether to include the producer of the ``start`` node in the
2626 results.
2628 Returns
2629 -------
2630 selected : `set` [ `NodeKey` ]
2631 Set of `NodeKey` objects for matching tasks (only; no dataset type
2632 or task-init nodes).
2633 """
2634 result: set[NodeKey] = set()
2635 for producer_key, _ in self._xgraph.in_edges(start):
2636 if producer_key.node_type is NodeType.TASK_INIT:
2637 raise InvalidExpressionError(
2638 f"Init-output dataset type {start.name!r} cannot be used as the "
2639 "starting point for an ancestor ('<' or '<=') search."
2640 )
2641 result.update(networkx.dag.ancestors(task_xgraph, producer_key))
2642 if inclusive:
2643 result.add(producer_key)
2644 return result
2646 def _select_dataset_type_descendants(
2647 self, start: NodeKey, task_xgraph: networkx.DiGraph, inclusive: bool
2648 ) -> set[NodeKey]:
2649 """Return all task-node descendatns of the given dataset type node, as
2650 defined by the `select` expression language.
2652 Parameters
2653 ----------
2654 start : `NodeKey`
2655 A dataset type node key. May not be an init-output if
2656 ``inclusive=True``.
2657 task_xgraph : `networkx.DiGraph`
2658 NetworkX graph of all tasks (runtime nodes only) in the pipeline.
2659 inclusive : `bool`
2660 Whether to include the producer of the ``start`` node in the
2661 results.
2663 Returns
2664 -------
2665 selected : `set` [ `NodeKey` ]
2666 Set of `NodeKey` objects for matching tasks (only; no dataset type
2667 or task-init nodes).
2668 """
2669 result: set[NodeKey] = set()
2670 if inclusive:
2671 for producer_key, _ in self._xgraph.in_edges(start):
2672 if producer_key.node_type is NodeType.TASK_INIT:
2673 raise InvalidExpressionError(
2674 f"Init-output dataset type {start.name!r} cannot be used as the "
2675 "starting point for an includsive descendant ('>=') search."
2676 )
2677 result.add(producer_key)
2678 # We also include tasks that consume a dataset type as an init-input,
2679 # since that can affect their runtime behavior.
2680 consumer_keys: set[NodeKey] = {
2681 (
2682 consumer_key
2683 if consumer_key.node_type is NodeType.TASK
2684 else NodeKey(NodeType.TASK, consumer_key.name)
2685 )
2686 for _, consumer_key in self._xgraph.out_edges(start)
2687 }
2688 for consumer_key in consumer_keys:
2689 result.add(consumer_key)
2690 result.update(networkx.dag.descendants(task_xgraph, consumer_key))
2691 return result
2693 def _select_identifier(
2694 self, qualifier: Literal["T", "D", "S"] | None, label: str
2695 ) -> NodeKey | TaskSubset:
2696 """Return the node key or task subset that corresponds to a `select`
2697 expression identifier.
2699 Parameters
2700 ----------
2701 qualifier : `str` or `None`
2702 Task, dataset type, or task subset qualifier included in the
2703 identifier, if any.
2704 label : `str`
2705 Task label, dataset type name, or task subset label.
2707 Returns
2708 -------
2709 key_or_subset : `NodeKey` or `TaskSubset`
2710 A `NodeKey` for a task or dataset type, or a `TaskSubset` for a
2711 task subset.
2712 """
2713 match qualifier:
2714 case None:
2715 task_key = NodeKey(NodeType.TASK, label)
2716 dataset_type_key = NodeKey(NodeType.DATASET_TYPE, label)
2717 if task_key in self._xgraph.nodes:
2718 if dataset_type_key in self._xgraph.nodes:
2719 raise InvalidExpressionError(
2720 f"{label!r} is both a task label and a dataset type name; "
2721 "prefix with 'T:' or 'D:' (respectively) to specify which."
2722 )
2723 assert label not in self._task_subsets, "Should be prohibited at construction."
2724 return task_key
2725 elif dataset_type_key in self._xgraph.nodes:
2726 if label in self._task_subsets:
2727 raise InvalidExpressionError(
2728 f"{label!r} is both a subset label and a dataset type name; "
2729 "prefix with 'S:' or 'D:' (respectively) to specify which."
2730 )
2731 return dataset_type_key
2732 elif label in self._task_subsets:
2733 return self._task_subsets[label]
2734 else:
2735 raise InvalidExpressionError(
2736 f"{label!r} is not a task label, task subset label, or dataset type name."
2737 )
2738 case "T":
2739 task_key = NodeKey(NodeType.TASK, label)
2740 if task_key not in self._xgraph.nodes:
2741 raise InvalidExpressionError(f"Task with label {label!r} does not exist.")
2742 return task_key
2743 case "D":
2744 dataset_type_key = NodeKey(NodeType.DATASET_TYPE, label)
2745 if dataset_type_key not in self._xgraph.nodes:
2746 raise InvalidExpressionError(f"Dataset type with name {label!r} does not exist.")
2747 return dataset_type_key
2748 case "S":
2749 try:
2750 return self._task_subsets[label]
2751 except KeyError:
2752 raise InvalidExpressionError(f"Task subset with label {label!r} does not exist.")
2753 case _: # pragma: no cover
2754 raise AssertionError("Unexpected identifier qualifier in expression.")
    # Backing NetworkX graph; nodes are keyed by `NodeKey` (tasks, task-init,
    # and dataset type nodes), with edges for read/write relationships.
    _xgraph: networkx.MultiDiGraph
    # Cached ordering of node keys when the graph has been sorted, `None`
    # otherwise (presumably a topological sort — see sorting methods).
    _sorted_keys: Sequence[NodeKey] | None
    # Labeled task subsets, keyed by subset label.
    _task_subsets: dict[str, TaskSubset]
    # Step definitions used to validate step/sharding dimensions.
    _step_definitions: StepDefinitions
    # Human-readable pipeline description.
    _description: str
    # Mapping view of task nodes, keyed by task label.
    _tasks: TaskMappingView
    # Mapping view of dataset type nodes, keyed by dataset type name.
    _dataset_types: DatasetTypeMappingView
    # Raw data ID mapping (unresolved form; serialized as-is).
    _raw_data_id: dict[str, Any]
    # Dimension universe, or `None` if the graph is not resolved against one.
    _universe: DimensionUniverse | None
def log_config_mismatch(msg: str) -> None:
    """Emit a log message describing a configuration mismatch.

    Parameters
    ----------
    msg : `str`
        Log message to use.
    """
    # ``Logger.critical`` is the documented spelling of ``Logger.fatal``.
    _LOG.critical("Comparing configuration: %s", msg)
def compare_packages(packages: Packages, new_packages: Packages) -> bool:
    """Compare two versions of Packages.

    Parameters
    ----------
    packages : `Packages`
        Previously recorded package versions.  Updated in place to include
        any new packages that weren't present before.
    new_packages : `Packages`
        New set of package versions.

    Returns
    -------
    updated : `bool`
        `True` if ``packages`` was updated, `False` if not.

    Raises
    ------
    ConflictingDefinitionError
        Raised if versions are inconsistent.
    """
    mismatched = new_packages.difference(packages)
    if mismatched:
        # Report every conflicting package as "new vs old" in one message.
        details = "; ".join(f"{pkg}: {mismatched[pkg][1]} vs {mismatched[pkg][0]}" for pkg in mismatched)
        raise ConflictingDefinitionError(f"Package versions mismatch: ({details})")
    _LOG.debug("new packages are consistent with old")
    # Fold in any packages that are present only in the new set.
    extra = new_packages.extra(packages)
    if not extra:
        return False
    _LOG.debug("extra packages: %s", extra)
    packages.update(new_packages)
    return True
2815def _dimensions_compatible(dimensions: DimensionGroup, object_dimensions: DimensionGroup) -> bool:
2816 if dimensions.issubset(object_dimensions):
2817 # Easy typical case.
2818 return True
2819 # Hard case: if any sharding dimensions that are not in the object
2820 # dimensions are related to something in the object dimensions by a
2821 # many-to-many join table, this is okay too. The main use case here is to
2822 # let {exposure, [detector]} satisfy sharding dimensions of
2823 # {visit, [detector]}.
2824 universe = dimensions.universe
2825 unmatched_sharding = dimensions.required - object_dimensions.required
2826 unmatched_object = object_dimensions.required - dimensions.required
2827 unmatched_union_group = dimensions.union(object_dimensions)
2828 for element_name in unmatched_union_group.elements:
2829 if (
2830 (element := universe[element_name]).defines_relationships
2831 and not element.dimensions.isdisjoint(unmatched_sharding)
2832 and not element.dimensions.isdisjoint(unmatched_object)
2833 ):
2834 return True
2835 return False