Coverage for python / lsst / pipe / base / exec_fixup_data_id.py: 41%
26 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28__all__ = ["ExecutionGraphFixup"]
30import itertools
31import uuid
32from collections import defaultdict
33from collections.abc import Mapping, Sequence
35import networkx as nx
37from lsst.daf.butler import DataCoordinate, DataIdValue
39from .execution_graph_fixup import ExecutionGraphFixup
40from .graph import QuantumGraph
43class ExecFixupDataId(ExecutionGraphFixup):
44 """Implementation of ExecutionGraphFixup for ordering of tasks based
45 on DataId values.
47 This class is a trivial implementation mostly useful as an example,
48 though it can be used to make actual fixup instances by defining
49 a method that instantiates it, e.g.::
51 # lsst/ap/verify/ci_fixup.py
53 from lsst.pipe.base.exec_fixup_data_id import ExecFixupDataId
56 def assoc_fixup():
57 return ExecFixupDataId(
58 taskLabel="ap_assoc", dimensions=("visit", "detector")
59 )
61 and then executing pipetask::
63 pipetask run --graph-fixup=lsst.ap.verify.ci_fixup.assoc_fixup ...
65 This will add new dependencies between quanta executed by the task with
66 label "ap_assoc". Quanta with higher visit number will depend on quanta
67 with lower visit number and their execution will wait until lower visit
68 number finishes.
70 Parameters
71 ----------
72 taskLabel : `str`
73 The label of the task for which to add dependencies.
74 dimensions : `str` or sequence [`str`]
75 One or more dimension names, quanta execution will be ordered
76 according to values of these dimensions.
77 reverse : `bool`, optional
78 If `False` (default) then quanta with higher values of dimensions
79 will be executed after quanta with lower values, otherwise the order
80 is reversed.
81 """
83 def __init__(self, taskLabel: str, dimensions: str | Sequence[str], reverse: bool = False):
84 self.taskLabel = taskLabel
85 self.dimensions = dimensions
86 self.reverse = reverse
87 if isinstance(self.dimensions, str):
88 self.dimensions = (self.dimensions,)
89 else:
90 self.dimensions = tuple(self.dimensions)
92 def fixupQuanta(self, graph: QuantumGraph) -> QuantumGraph:
93 raise NotImplementedError()
95 def fixup_graph(
96 self, xgraph: nx.DiGraph, quanta_by_task: Mapping[str, Mapping[DataCoordinate, uuid.UUID]]
97 ) -> None:
98 quanta_by_sort_key: defaultdict[tuple[DataIdValue, ...], list[uuid.UUID]] = defaultdict(list)
99 for data_id, quantum_id in quanta_by_task[self.taskLabel].items():
100 key = tuple(data_id[dim] for dim in self.dimensions)
101 quanta_by_sort_key[key].append(quantum_id)
102 sorted_keys = sorted(quanta_by_sort_key.keys(), reverse=self.reverse)
103 for prev_key, key in itertools.pairwise(sorted_keys):
104 xgraph.add_edges_from(itertools.product(quanta_by_sort_key[prev_key], quanta_by_sort_key[key]))