Coverage for python / lsst / pipe / base / pipeline_graph / _task_subsets.py: 36%
117 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:20 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:20 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ("StepDefinitions", "TaskSubset")
31from collections.abc import Iterable, Iterator, MutableSet
32from contextlib import contextmanager
34import networkx
35import networkx.algorithms.boundary
37from lsst.daf.butler import DimensionGroup, DimensionUniverse
39from ._exceptions import InvalidStepsError, PipelineGraphError, UnresolvedGraphError
40from ._nodes import NodeKey, NodeType
43class TaskSubset(MutableSet[str]):
44 """A specialized set that represents a labeled subset of the tasks in a
45 pipeline graph.
47 Instances of this class should never be constructed directly; they should
48 only be accessed via the `PipelineGraph.task_subsets` attribute and created
49 by the `PipelineGraph.add_task_subset` method.
51 Parameters
52 ----------
53 parent_xgraph : `networkx.DiGraph`
54 Parent networkx graph that this subgraph is part of.
55 label : `str`
56 Label associated with this subset of the pipeline.
57 members : `set` [ `str` ]
58 Labels of the tasks that are members of this subset.
59 description : `str`, optional
60 Description string associated with this labeled subset.
61 step_definitions : `StepDefinitions`
62 Information about special 'step' subsets that partition the pipeline.
64 Notes
65 -----
66 Iteration order is arbitrary, even when the parent pipeline graph is
67 ordered (there is no guarantee that an ordering of the tasks in the graph
68 implies a consistent ordering of subsets).
69 """
71 def __init__(
72 self,
73 parent_xgraph: networkx.DiGraph,
74 label: str,
75 members: set[str],
76 description: str,
77 step_definitions: StepDefinitions,
78 ):
79 self._parent_xgraph = parent_xgraph
80 self._label = label
81 self._members = members
82 self._description = description
83 self._step_definitions = step_definitions
85 @property
86 def label(self) -> str:
87 """Label associated with this subset of the pipeline."""
88 return self._label
90 @property
91 def description(self) -> str:
92 """Description string associated with this labeled subset."""
93 return self._description
95 @description.setter
96 def description(self, value: str) -> None:
97 # Docstring in getter.
98 self._description = value
100 @property
101 def is_step(self) -> bool:
102 """Whether this subset is a step."""
103 return self.label in self._step_definitions
105 @property
106 def dimensions(self) -> DimensionGroup:
107 """The dimensions that can be used to split up this subset's quanta
108 into independent groups.
110 This is only available if `is_step` is `True` and only if the pipeline
111 graph has been resolved.
112 """
113 return self._step_definitions.get_dimensions(self.label)
115 @dimensions.setter
116 def dimensions(self, dimensions: Iterable[str] | DimensionGroup) -> None:
117 # Docstring in getter.
118 self._step_definitions.set_dimensions(self.label, dimensions)
120 def __repr__(self) -> str:
121 return f"{self.label}: {self.description!r}, tasks={{{', '.join(iter(self))}}}"
123 def __contains__(self, key: object) -> bool:
124 return key in self._members
126 def __len__(self) -> int:
127 return len(self._members)
129 def __iter__(self) -> Iterator[str]:
130 return iter(self._members)
132 def add(self, value: str) -> None:
133 """Add a new task to this subset.
135 Parameters
136 ----------
137 value : `str`
138 Label for the task. Must already be present in the parent pipeline
139 graph.
140 """
141 key = NodeKey(NodeType.TASK, value)
142 if key not in self._parent_xgraph:
143 raise PipelineGraphError(f"{value!r} is not a task in the parent pipeline.")
144 with self._step_definitions._unverified_on_success():
145 self._members.add(key.name)
147 def discard(self, value: str) -> None:
148 """Remove a task from the subset if it is present.
150 Parameters
151 ----------
152 value : `str`
153 Label for the task. Must already be present in the parent pipeline
154 graph.
155 """
156 with self._step_definitions._unverified_on_success():
157 self._members.discard(value)
159 @classmethod
160 def _from_iterable[S](cls, iterable: Iterable[S]) -> set[S]:
161 # This is the hook used by collections.abc.Set when implementing
162 # operators that return new sets. In this case, we want those to be
163 # regular `set` (builtin) objects, not `TaskSubset` instances.
164 return set(iterable)
class StepDefinitions:
    """A collection of the 'steps' defined in a pipeline graph.

    Steps are special task subsets that must be executed separately. They may
    also be associated with "sharding dimensions", which are the dimensions of
    data IDs that are independent within the step: splitting up a quantum graph
    along a step's sharding dimensions produces groups that can be safely
    executed independently.

    Parameters
    ----------
    universe : `lsst.daf.butler.DimensionUniverse`, optional
        Definitions for data dimensions.  If `None`, dimension names are
        stored without being conformed to a universe, and `get_dimensions`
        raises `UnresolvedGraphError`.
    dimensions_by_label : `dict` [ `str`, `frozenset` [ `str` ] ], optional
        Sharding dimensions for step subsets, as a dictionary with task subset
        labels as keys and sets of dimension names as values.  Iteration
        yields labels in this dictionary's insertion order.  The dictionary is
        used directly, not copied.
    verified : `bool`, optional
        Whether the step definitions have been checked since the last time
        they or some other relevant aspect of the pipeline graph was changed.

    Notes
    -----
    This class only models `collections.abc.Collection` (it is iterable, sized,
    and can be used with ``in`` tests on label names), but it also supports
    `append`, `remove`, `assign`, `clear`, and `set_dimensions` for
    modifications.

    Equality comparisons consider only the step labels and their sharding
    dimensions, not the universe or the verified state.
    """

    def __init__(
        self,
        universe: DimensionUniverse | None = None,
        dimensions_by_label: dict[str, frozenset[str]] | None = None,
        verified: bool = False,
    ):
        self._universe = universe
        self._dimensions_by_label = dimensions_by_label if dimensions_by_label is not None else {}
        self._verified = verified

    @property
    def verified(self) -> bool:
        """Whether the step definitions have been checked since the last time
        they or some other relevant aspect of the pipeline graph was changed.

        This is always `True` if there are no step definitions.
        """
        # If there are no steps, the step definitions are still verified.
        return self._verified or not self._dimensions_by_label

    def __contains__(self, label: object) -> bool:
        return label in self._dimensions_by_label

    def __iter__(self) -> Iterator[str]:
        return iter(self._dimensions_by_label)

    def __len__(self) -> int:
        return len(self._dimensions_by_label)

    def __repr__(self) -> str:
        return str(list(self._dimensions_by_label))

    # Defining __eq__ without __hash__ leaves instances unhashable, which is
    # appropriate for a mutable container.
    def __eq__(self, other: object) -> bool:
        if isinstance(other, StepDefinitions):
            return self._dimensions_by_label == other._dimensions_by_label
        return NotImplemented

    def copy(self) -> StepDefinitions:
        """Create a new instance that does not share any mutable state with
        this one.
        """
        return StepDefinitions(
            universe=self._universe,
            dimensions_by_label=self._dimensions_by_label.copy(),
            verified=self._verified,
        )

    def append(self, label: str, dimensions: Iterable[str] | DimensionGroup = ()) -> None:
        """Append a new step.

        Parameters
        ----------
        label : `str`
            Task subset label for the new step.
        dimensions : `~collections.abc.Iterable` [ `str` ] or \
                `~lsst.daf.butler.DimensionGroup`, optional
            Dimensions that can be used to split up the step's quanta
            into independent groups.

        Notes
        -----
        If ``label`` is already a step, its sharding dimensions are replaced
        and its position among the steps is unchanged.
        """
        normalized = self._conform_dimensions(dimensions)
        with self._unverified_on_success():
            self._dimensions_by_label[label] = normalized

    def remove(self, label: str) -> None:
        """Remove a named step.

        Parameters
        ----------
        label : `str`
            Task subset label to remove from the list of steps.

        Raises
        ------
        KeyError
            Raised if ``label`` is not a step.  The verified state is
            unchanged in this case.

        Notes
        -----
        This does not remove the task subset itself; it just "demotes" it to a
        non-step subset.
        """
        with self._unverified_on_success():
            del self._dimensions_by_label[label]

    def assign(self, labels: Iterable[str]) -> None:
        """Set all step definitions to the given labels.

        Parameters
        ----------
        labels : `~collections.abc.Iterable` [ `str` ]
            Subset labels to use as the new steps.

        Notes
        -----
        Sharding dimensions are preserved for any label that was previously a
        step; new labels get empty sharding dimensions.  If ``labels`` is a
        `StepDefinitions` instance, sharding dimensions from that instance will
        be used.
        """
        if isinstance(labels, StepDefinitions):
            with self._unverified_on_success():
                self._dimensions_by_label = labels._dimensions_by_label.copy()
        else:
            with self._unverified_on_success():
                # The comprehension reads the old mapping before it is
                # replaced, so passing an iterable over ``self`` is safe.
                self._dimensions_by_label = {
                    label: self._dimensions_by_label.get(label, frozenset()) for label in labels
                }

    def clear(self) -> None:
        """Remove all step definitions."""
        self.assign(())

    def get_dimensions(self, label: str) -> DimensionGroup:
        """Return the dimensions that can be used to split up a step's quanta
        into independent groups.

        Parameters
        ----------
        label : `str`
            Label for the step.

        Returns
        -------
        dimensions : `lsst.daf.butler.DimensionGroup`
            Dimensions that can be used to split up this step's quanta.

        Raises
        ------
        InvalidStepsError
            Raised if ``label`` is not a step.
        UnresolvedGraphError
            Raised if no dimension universe is available.
        """
        try:
            raw_dimensions = self._dimensions_by_label[label]
        except KeyError:
            raise InvalidStepsError(f"Task subset {label!r} is not a step.") from None
        if self._universe is not None:
            return self._universe.conform(raw_dimensions)
        else:
            raise UnresolvedGraphError("Step sharding dimensions have not been resolved.")

    def set_dimensions(self, label: str, dimensions: Iterable[str] | DimensionGroup) -> None:
        """Set the dimensions that can be used to split up a step's quanta
        into independent groups.

        Parameters
        ----------
        label : `str`
            Label for the step.
        dimensions : `~collections.abc.Iterable` [ `str` ] or \
                `~lsst.daf.butler.DimensionGroup`
            Dimensions that can be used to split up this step's quanta.

        Raises
        ------
        PipelineGraphError
            Raised if ``label`` is not a step.
        """
        # NOTE(review): get_dimensions raises InvalidStepsError for the same
        # condition; consider unifying if InvalidStepsError subclasses
        # PipelineGraphError.
        if label not in self._dimensions_by_label:
            raise PipelineGraphError(f"Subset {label!r} is not a step.")
        normalized = self._conform_dimensions(dimensions)
        with self._unverified_on_success():
            self._dimensions_by_label[label] = normalized

    def _conform_dimensions(self, dimensions: Iterable[str] | DimensionGroup) -> frozenset[str]:
        """Normalize dimensions to a `frozenset` of dimension names.

        If a universe is available the dimensions are first conformed to it;
        otherwise they are only converted to a set of names.
        """
        if self._universe is not None:
            dimensions = self._universe.conform(dimensions)
        if isinstance(dimensions, DimensionGroup):
            return frozenset(dimensions.names)
        return frozenset(dimensions)

    @contextmanager
    def _unverified_on_success(self) -> Iterator[None]:
        """Return a context manager that marks the step definitions as
        unverified if it is exited without an exception.

        This should be used only for exception-safe modifications for which an
        exception means no changes were made (and hence the verified state can
        remain unchanged as well).
        """
        yield
        # Only reached when the ``with`` body did not raise.
        self._verified = False