Coverage for python / lsst / pipe / base / pipeline.py: 21%
285 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:19 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Module defining Pipeline class and related methods."""
30from __future__ import annotations
32__all__ = ["LabelSpecifier", "Pipeline", "TaskDef"]
34import copy
35import logging
36import re
37import urllib.parse
39# -------------------------------
40# Imports of standard modules --
41# -------------------------------
42from collections.abc import Callable, Set
43from dataclasses import dataclass
44from types import MappingProxyType
45from typing import TYPE_CHECKING, cast
47# -----------------------------
48# Imports for other modules --
49# -----------------------------
50from lsst.daf.butler import DataCoordinate, DimensionUniverse, Registry
51from lsst.resources import ResourcePath, ResourcePathExpression
52from lsst.utils import doImportType
53from lsst.utils.introspection import get_full_type_name
55from . import automatic_connection_constants as acc
56from . import pipeline_graph, pipelineIR
57from ._instrument import Instrument as Instrument
58from .config import PipelineTaskConfig
59from .connections import PipelineTaskConnections
60from .pipelineTask import PipelineTask
62if TYPE_CHECKING: # Imports needed only for type annotations; may be circular.
63 from lsst.pex.config import Config
65# ----------------------------------
66# Local non-exported definitions --
67# ----------------------------------
69_LOG = logging.getLogger(__name__)
71# ------------------------
72# Exported definitions --
73# ------------------------
@dataclass
class LabelSpecifier:
    """A structure to specify a subset of labels to load.

    This structure may contain a set of labels to be used in subsetting a
    pipeline, or a beginning and end point. Beginning or end may be empty, in
    which case the range will be a half open interval. Unlike python iteration
    bounds, end bounds are *INCLUDED*.

    There are multiple potential definitions of range-based slicing for graphs
    that are not a simple linear sequence. The definition used here is the
    intersection of the tasks downstream of ``begin`` and the tasks upstream of
    ``end``, i.e. tasks with no dependency relationship to a bounding task are
    not included.
    """

    labels: set[str] | None = None
    begin: str | None = None
    end: str | None = None

    def __post_init__(self) -> None:
        # An explicit label set and a begin/end range are mutually exclusive
        # ways to specify the subset; reject a mix of the two.  Note that
        # empty-string bounds are treated as "unset" (truthiness check), to
        # match the half-open-interval convention documented above.
        has_range = bool(self.begin) or bool(self.end)
        if has_range and self.labels is not None:
            raise ValueError(
                "This struct can only be initialized with a labels set or a begin (and/or) end specifier"
            )
class TaskDef:
    """TaskDef is a collection of information about task needed by Pipeline.

    The information includes task name, configuration object and optional
    task class. This class is just a collection of attributes and it exposes
    all of them so that attributes could potentially be modified in place
    (e.g. if configuration needs extra overrides).

    Parameters
    ----------
    taskName : `str`, optional
        The fully-qualified `PipelineTask` class name. If not provided,
        ``taskClass`` must be.
    config : `lsst.pipe.base.config.PipelineTaskConfig`, optional
        Instance of the configuration class corresponding to this task class,
        usually with all overrides applied. This config will be frozen. If
        not provided, ``taskClass`` must be provided and
        ``taskClass.ConfigClass()`` will be used.
    taskClass : `type`, optional
        `PipelineTask` class object; if provided and ``taskName`` is as well,
        the caller guarantees that they are consistent. If not provided,
        ``taskName`` is used to import the type.
    label : `str`, optional
        Task label, usually a short string unique in a pipeline. If not
        provided, ``taskClass`` must be, and ``taskClass._DefaultName`` will
        be used.
    connections : `PipelineTaskConnections`, optional
        Object that describes the dataset types used by the task. If not
        provided, one will be constructed from the given configuration. If
        provided, it is assumed that ``config`` has already been validated
        and frozen.
    """

    def __init__(
        self,
        taskName: str | None = None,
        config: PipelineTaskConfig | None = None,
        taskClass: type[PipelineTask] | None = None,
        label: str | None = None,
        connections: PipelineTaskConnections | None = None,
    ):
        # Resolve taskName and taskClass from each other: at least one must
        # be given, and each can be derived from the other (name via
        # introspection, class via import).
        if taskName is None:
            if taskClass is None:
                raise ValueError("At least one of `taskName` and `taskClass` must be provided.")
            taskName = get_full_type_name(taskClass)
        elif taskClass is None:
            taskClass = doImportType(taskName)
        # Default the config and label from the (now possibly imported)
        # task class; each requires taskClass to have been resolved above.
        if config is None:
            if taskClass is None:
                raise ValueError("`taskClass` must be provided if `config` is not.")
            config = taskClass.ConfigClass()
        if label is None:
            if taskClass is None:
                raise ValueError("`taskClass` must be provided if `label` is not.")
            label = taskClass._DefaultName
        self.taskName = taskName
        if connections is None:
            # If we don't have connections yet, assume the config hasn't been
            # validated yet.
            try:
                config.validate()
            except Exception:
                # Log which task failed before re-raising so the failure can
                # be attributed in a multi-task pipeline.
                _LOG.error("Configuration validation failed for task %s (%s)", label, taskName)
                raise
            config.freeze()
            connections = config.connections.ConnectionsClass(config=config)
        self.config = config
        self.taskClass = taskClass
        self.label = label
        self.connections = connections

    @property
    def configDatasetName(self) -> str:
        """Name of a dataset type for configuration of this task (`str`)."""
        return acc.CONFIG_INIT_OUTPUT_TEMPLATE.format(label=self.label)

    @property
    def metadataDatasetName(self) -> str:
        """Name of a dataset type for metadata of this task (`str`)."""
        return self.makeMetadataDatasetName(self.label)

    @classmethod
    def makeMetadataDatasetName(cls, label: str) -> str:
        """Construct the name of the dataset type for metadata for a task.

        Parameters
        ----------
        label : `str`
            Label for the task within its pipeline.

        Returns
        -------
        name : `str`
            Name of the task's metadata dataset type.
        """
        return acc.METADATA_OUTPUT_TEMPLATE.format(label=label)

    @property
    def logOutputDatasetName(self) -> str | None:
        """Name of a dataset type for log output from this task, `None` if
        logs are not to be saved (`str`).
        """
        if self.config.saveLogOutput:
            return acc.LOG_OUTPUT_TEMPLATE.format(label=self.label)
        else:
            return None

    def __str__(self) -> str:
        rep = "TaskDef(" + self.taskName
        if self.label:
            rep += ", label=" + self.label
        rep += ")"
        return rep

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, TaskDef):
            return False
        # This does not consider equality of configs when determining equality
        # as config equality is a difficult thing to define. Should be updated
        # after DM-27847
        return self.taskClass == other.taskClass and self.label == other.label

    def __hash__(self) -> int:
        # Hash must be consistent with __eq__, which compares only the task
        # class and label (configs are deliberately excluded).
        return hash((self.taskClass, self.label))

    @classmethod
    def _unreduce(cls, taskName: str, config: PipelineTaskConfig, label: str) -> TaskDef:
        """Unpickle pickle. Custom callable for unpickling.

        All arguments are forwarded directly to the constructor; this
        trampoline is only needed because ``__reduce__`` callables can't be
        called with keyword arguments.
        """
        return cls(taskName=taskName, config=config, label=label)

    def __reduce__(self) -> tuple[Callable[[str, PipelineTaskConfig, str], TaskDef], tuple[str, Config, str]]:
        # Only taskName/config/label are pickled; taskClass and connections
        # are re-derived by the constructor on unpickling.
        return (self._unreduce, (self.taskName, self.config, self.label))
class Pipeline:
    """A `Pipeline` is a representation of a series of tasks to run, and the
    configuration for those tasks.

    Parameters
    ----------
    description : `str`
        A description of that this pipeline does.
    """

    # Re-exported here so callers can use ``Pipeline.PipelineSubsetCtrl``
    # without importing pipelineIR directly.
    PipelineSubsetCtrl = pipelineIR.PipelineSubsetCtrl

    def __init__(self, description: str):
        pipeline_dict = {"description": description, "tasks": {}}
        self._pipelineIR = pipelineIR.PipelineIR(pipeline_dict)

    @classmethod
    def fromFile(cls, filename: str) -> Pipeline:
        """Load a pipeline defined in a pipeline yaml file.

        Parameters
        ----------
        filename : `str`
            A path that points to a pipeline defined in yaml format. This
            filename may also supply additional labels to be used in
            subsetting the loaded Pipeline. These labels are separated from
            the path by a ``#``, and may be specified as a comma separated
            list, or a range denoted as beginning..end. Beginning or end may
            be empty, in which case the range will be a half open interval.
            Unlike python iteration bounds, end bounds are *INCLUDED*. Note
            that range based selection is not well defined for pipelines that
            are not linear in nature, and correct behavior is not guaranteed,
            or may vary from run to run.

        Returns
        -------
        pipeline: `Pipeline`
            The pipeline loaded from specified location with appropriate (if
            any) subsetting.

        Notes
        -----
        This method attempts to prune any contracts that contain labels which
        are not in the declared subset of labels. This pruning is done using a
        string based matching due to the nature of contracts and may prune more
        than it should.
        """
        return cls.from_uri(filename)

    @classmethod
    def from_uri(cls, uri: ResourcePathExpression) -> Pipeline:
        """Load a pipeline defined in a pipeline yaml file at a location
        specified by a URI.

        Parameters
        ----------
        uri : convertible to `~lsst.resources.ResourcePath`
            If a string is supplied this should be a URI path that points to a
            pipeline defined in yaml format, either as a direct path to the
            yaml file, or as a directory containing a ``pipeline.yaml`` file
            the form used by `write_to_uri` with ``expand=True``). This uri may
            also supply additional labels to be used in subsetting the loaded
            `Pipeline`. These labels are separated from the path by a ``#``,
            and may be specified as a comma separated list, or a range denoted
            as beginning..end. Beginning or end may be empty, in which case the
            range will be a half open interval. Unlike python iteration bounds,
            end bounds are *INCLUDED*. Note that range based selection is not
            well defined for pipelines that are not linear in nature, and
            correct behavior is not guaranteed, or may vary from run to run.
            The same specifiers can be used with a
            `~lsst.resources.ResourcePath` object, by being the sole contents
            in the fragments attribute.

        Returns
        -------
        pipeline : `Pipeline`
            The pipeline loaded from specified location with appropriate (if
            any) subsetting.

        Notes
        -----
        This method attempts to prune any contracts that contain labels which
        are not in the declared subset of labels. This pruning is done using a
        string based matching due to the nature of contracts and may prune more
        than it should.
        """
        # Split up the uri and any labels that were supplied
        uri, label_specifier = cls._parse_file_specifier(uri)
        pipeline: Pipeline = cls.fromIR(pipelineIR.PipelineIR.from_uri(uri))

        # If there are labels supplied, only keep those
        if label_specifier is not None:
            pipeline = pipeline.subsetFromLabels(label_specifier)
        return pipeline

    def subsetFromLabels(
        self,
        labelSpecifier: LabelSpecifier,
        subsetCtrl: pipelineIR.PipelineSubsetCtrl = PipelineSubsetCtrl.DROP,
    ) -> Pipeline:
        """Subset a pipeline to contain only labels specified in
        ``labelSpecifier``.

        Parameters
        ----------
        labelSpecifier : `labelSpecifier`
            Object containing labels that describes how to subset a pipeline.
        subsetCtrl : `PipelineSubsetCtrl`
            Control object which decides how subsets with missing labels are
            handled. Setting to `PipelineSubsetCtrl.DROP` (the default) will
            cause any subsets that have labels which are not in the set of all
            task labels to be dropped. Setting to `PipelineSubsetCtrl.EDIT`
            will cause the subset to instead be edited to remove the
            nonexistent label.

        Returns
        -------
        pipeline : `Pipeline`
            A new pipeline object that is a subset of the old pipeline.

        Raises
        ------
        ValueError
            Raised if there is an issue with specified labels

        Notes
        -----
        This method attempts to prune any contracts that contain labels which
        are not in the declared subset of labels. This pruning is done using a
        string based matching due to the nature of contracts and may prune more
        than it should.
        """
        # Labels supplied as a set
        if labelSpecifier.labels:
            labelSet = labelSpecifier.labels
        # Labels supplied as a range, first create a list of all the labels
        # in the pipeline sorted according to task dependency. Then only
        # keep labels that lie between the supplied bounds
        else:
            # Create a copy of the pipeline to use when assessing the label
            # ordering. Use a dict for fast searching while preserving order.
            # Remove contracts so they do not fail in the expansion step. This
            # is needed because a user may only configure the tasks they intend
            # to run, which may cause some contracts to fail if they will later
            # be dropped
            pipeline = copy.deepcopy(self)
            pipeline._pipelineIR.contracts = []
            graph = pipeline.to_graph()

            # Verify the bounds are in the labels
            if labelSpecifier.begin is not None and labelSpecifier.begin not in graph.tasks:
                raise ValueError(
                    f"Beginning of range subset, {labelSpecifier.begin}, not found in pipeline definition"
                )
            if labelSpecifier.end is not None and labelSpecifier.end not in graph.tasks:
                raise ValueError(
                    f"End of range subset, {labelSpecifier.end}, not found in pipeline definition"
                )

            labelSet = set(graph.tasks.between(labelSpecifier.begin, labelSpecifier.end))
        return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet, subsetCtrl))

    @staticmethod
    def _parse_file_specifier(uri: ResourcePathExpression) -> tuple[ResourcePath, LabelSpecifier | None]:
        """Split apart a uri and any possible label subsets"""
        if isinstance(uri, str):
            # This is to support legacy pipelines during transition
            uri, num_replace = re.subn("[:](?!\\/\\/)", "#", uri)
            if num_replace:
                raise ValueError(
                    f"The pipeline file {uri} seems to use the legacy :"
                    " to separate labels, please use # instead."
                )
            if uri.count("#") > 1:
                raise ValueError("Only one set of labels is allowed when specifying a pipeline to load")
            # Everything else can be converted directly to ResourcePath.
            uri = ResourcePath(uri)
        label_subset = uri.fragment or None

        specifier: LabelSpecifier | None
        if label_subset is not None:
            label_subset = urllib.parse.unquote(label_subset)
            args: dict[str, set[str] | str | None]
            # labels supplied as a list
            if "," in label_subset:
                if ".." in label_subset:
                    raise ValueError(
                        "Can only specify a list of labels or a range when loading a Pipeline, not both."
                    )
                args = {"labels": set(label_subset.split(","))}
            # labels supplied as a range
            elif ".." in label_subset:
                # Try to de-structure the labelSubset, this will fail if more
                # than one range is specified
                begin, end, *rest = label_subset.split("..")
                if rest:
                    raise ValueError("Only one range can be specified when loading a pipeline")
                args = {"begin": begin if begin else None, "end": end if end else None}
            # Assume anything else is a single label
            else:
                args = {"labels": {label_subset}}

            # MyPy doesn't like how cavalier kwarg construction is with types.
            specifier = LabelSpecifier(**args)  # type: ignore
        else:
            specifier = None

        return uri, specifier

    @classmethod
    def fromString(cls, pipeline_string: str) -> Pipeline:
        """Create a pipeline from string formatted as a pipeline document.

        Parameters
        ----------
        pipeline_string : `str`
            A string that is formatted according like a pipeline document.

        Returns
        -------
        pipeline: `Pipeline`
            The new pipeline.
        """
        pipeline = cls.fromIR(pipelineIR.PipelineIR.from_string(pipeline_string))
        return pipeline

    @classmethod
    def fromIR(cls, deserialized_pipeline: pipelineIR.PipelineIR) -> Pipeline:
        """Create a pipeline from an already created `PipelineIR` object.

        Parameters
        ----------
        deserialized_pipeline : `PipelineIR`
            An already created pipeline intermediate representation object.

        Returns
        -------
        pipeline: `Pipeline`
            The new pipeline.
        """
        # Bypass __init__ so the provided IR is adopted as-is rather than
        # building a fresh empty one.
        pipeline = cls.__new__(cls)
        pipeline._pipelineIR = deserialized_pipeline
        return pipeline

    @classmethod
    def fromPipeline(cls, pipeline: Pipeline) -> Pipeline:
        """Create a new pipeline by copying an already existing `Pipeline`.

        Parameters
        ----------
        pipeline : `Pipeline`
            An already created pipeline intermediate representation object.

        Returns
        -------
        pipeline: `Pipeline`
            The new pipeline.
        """
        return cls.fromIR(copy.deepcopy(pipeline._pipelineIR))

    def __str__(self) -> str:
        return str(self._pipelineIR)

    def mergePipeline(self, pipeline: Pipeline) -> None:
        """Merge another in-memory `Pipeline` object into this one.

        This merges another pipeline into this object, as if it were declared
        in the import block of the yaml definition of this pipeline. This
        modifies this pipeline in place.

        Parameters
        ----------
        pipeline : `Pipeline`
            The `Pipeline` object that is to be merged into this object.
        """
        self._pipelineIR.merge_pipelines((pipeline._pipelineIR,))

    def addLabelToSubset(self, subset: str, label: str) -> None:
        """Add a task label to the specified subset.

        Parameters
        ----------
        subset : `str`
            The labeled subset to modify.
        label : `str`
            The task label to add to the specified subset.

        Raises
        ------
        ValueError
            Raised if the specified subset does not exist within the pipeline.
            Raised if the specified label does not exist within the pipeline.
        """
        if label not in self._pipelineIR.tasks:
            raise ValueError(f"Label {label} does not appear within the pipeline")
        if subset not in self._pipelineIR.labeled_subsets:
            raise ValueError(f"Subset {subset} does not appear within the pipeline")
        self._pipelineIR.labeled_subsets[subset].subset.add(label)

    def removeLabelFromSubset(self, subset: str, label: str) -> None:
        """Remove a task label from the specified subset.

        Parameters
        ----------
        subset : `str`
            The labeled subset to modify.
        label : `str`
            The task label to remove from the specified subset.

        Raises
        ------
        ValueError
            Raised if the specified subset does not exist in the pipeline.
            Raised if the specified label does not exist within the specified
            subset.
        """
        if subset not in self._pipelineIR.labeled_subsets:
            raise ValueError(f"Subset {subset} does not appear within the pipeline")
        if label not in self._pipelineIR.labeled_subsets[subset].subset:
            raise ValueError(f"Label {label} does not appear within the pipeline")
        self._pipelineIR.labeled_subsets[subset].subset.remove(label)

    def findSubsetsWithLabel(self, label: str) -> set[str]:
        """Find any subsets which may contain the specified label.

        This function returns the name of subsets which return the specified
        label. May return an empty set if there are no subsets, or no subsets
        containing the specified label.

        Parameters
        ----------
        label : `str`
            The task label to use in membership check.

        Returns
        -------
        subsets : `set` of `str`
            Returns a set (possibly empty) of subsets names which contain the
            specified label.

        Raises
        ------
        ValueError
            Raised if the specified label does not exist within this pipeline.
        """
        if label not in self._pipelineIR.tasks:
            raise ValueError(f"Label {label} does not appear within the pipeline")
        return {subset.label for subset in self._pipelineIR.labeled_subsets.values() if label in subset.subset}

    @property
    def task_labels(self) -> Set[str]:
        """Labels of all tasks in the pipelines.

        For simple pipelines with no imports, iteration over this set will
        match the order in which tasks are defined in the pipeline file. In
        all other cases the order is unspecified but deterministic. It is not
        dependency-ordered (use ``to_graph().tasks.keys()`` for that).
        """
        return self._pipelineIR.tasks.keys()

    @property
    def subsets(self) -> MappingProxyType[str, set]:
        """Returns a `types.MappingProxyType` where the keys are the labels of
        labeled subsets in the `Pipeline` and the values are the set of task
        labels contained within that subset.
        """
        return MappingProxyType(
            {label: subsetIr.subset for label, subsetIr in self._pipelineIR.labeled_subsets.items()}
        )

    def addLabeledSubset(self, label: str, description: str, taskLabels: set[str]) -> None:
        """Add a new labeled subset to the `Pipeline`.

        Parameters
        ----------
        label : `str`
            The label to assign to the subset.
        description : `str`
            A description of what the subset is for.
        taskLabels : `set` [`str`]
            The set of task labels to be associated with the labeled subset.

        Raises
        ------
        ValueError
            Raised if label already exists in the `Pipeline`.
            Raised if a task label is not found within the `Pipeline`.
        """
        if label in self._pipelineIR.labeled_subsets.keys():
            raise ValueError(f"Subset label {label} is already found within the Pipeline")
        if extra := (taskLabels - self._pipelineIR.tasks.keys()):
            raise ValueError(f"Task labels {extra} were not found within the Pipeline")
        self._pipelineIR.labeled_subsets[label] = pipelineIR.LabeledSubset(label, taskLabels, description)

    def removeLabeledSubset(self, label: str) -> None:
        """Remove a labeled subset from the `Pipeline`.

        Parameters
        ----------
        label : `str`
            The label of the subset to remove from the `Pipeline`.

        Raises
        ------
        ValueError
            Raised if the label is not found within the `Pipeline`.
        """
        if label not in self._pipelineIR.labeled_subsets.keys():
            raise ValueError(f"Subset label {label} was not found in the pipeline")
        self._pipelineIR.labeled_subsets.pop(label)

    def addInstrument(self, instrument: Instrument | str) -> None:
        """Add an instrument to the pipeline, or replace an instrument that is
        already defined.

        Parameters
        ----------
        instrument : `~lsst.daf.butler.instrument.Instrument` or `str`
            Either a derived class object of a `lsst.daf.butler.instrument` or
            a string corresponding to a fully qualified
            `lsst.daf.butler.instrument` name.
        """
        if not isinstance(instrument, str):
            # TODO: assume that this is a subclass of Instrument, no type
            # checking
            instrument = get_full_type_name(instrument)
        self._pipelineIR.instrument = instrument

    def getInstrument(self) -> str | None:
        """Get the instrument from the pipeline.

        Returns
        -------
        instrument : `str`, or None
            The fully qualified name of a `lsst.obs.base.Instrument` subclass,
            name, or None if the pipeline does not have an instrument.
        """
        return self._pipelineIR.instrument

    def get_data_id(self, universe: DimensionUniverse) -> DataCoordinate:
        """Return a data ID with all dimension constraints embedded in the
        pipeline.

        Parameters
        ----------
        universe : `lsst.daf.butler.DimensionUniverse`
            Object that defines all dimensions.

        Returns
        -------
        data_id : `lsst.daf.butler.DataCoordinate`
            Data ID with all dimension constraints embedded in the
            pipeline.
        """
        instrument_class_name = self._pipelineIR.instrument
        if instrument_class_name is not None:
            instrument_class = cast(Instrument, doImportType(instrument_class_name))
            if instrument_class is not None:
                return DataCoordinate.standardize(instrument=instrument_class.getName(), universe=universe)
        return DataCoordinate.make_empty(universe)

    def addTask(self, task: type[PipelineTask] | str, label: str) -> None:
        """Add a new task to the pipeline, or replace a task that is already
        associated with the supplied label.

        Parameters
        ----------
        task : `PipelineTask` or `str`
            Either a derived class object of a `PipelineTask` or a string
            corresponding to a fully qualified `PipelineTask` name.
        label : `str`
            A label that is used to identify the `PipelineTask` being added.
        """
        if isinstance(task, str):
            taskName = task
        elif issubclass(task, PipelineTask):
            taskName = get_full_type_name(task)
        else:
            raise ValueError(
                "task must be either a child class of PipelineTask or a string containing"
                " a fully qualified name to one"
            )
        if not label:
            # in some cases (with command line-generated pipeline) tasks can
            # be defined without label which is not acceptable, use task
            # _DefaultName in that case
            if isinstance(task, str):
                task_class = cast(PipelineTask, doImportType(task))
                label = task_class._DefaultName
            else:
                # ``task`` is already the class object, so use its default
                # name directly.  (Previously this branch was missing and the
                # task would be stored under an empty label.)
                label = task._DefaultName
        self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName)

    def removeTask(self, label: str) -> None:
        """Remove a task from the pipeline.

        Parameters
        ----------
        label : `str`
            The label used to identify the task that is to be removed.

        Raises
        ------
        KeyError
            If no task with that label exists in the pipeline.
        """
        self._pipelineIR.tasks.pop(label)

    def addConfigOverride(self, label: str, key: str, value: object) -> None:
        """Apply single config override.

        Parameters
        ----------
        label : `str`
            Label of the task.
        key : `str`
            Fully-qualified field name.
        value : object
            Value to be given to a field.
        """
        self._addConfigImpl(label, pipelineIR.ConfigIR(rest={key: value}))

    def addConfigFile(self, label: str, filename: str) -> None:
        """Add overrides from a specified file.

        Parameters
        ----------
        label : `str`
            The label used to identify the task associated with config to
            modify.
        filename : `str`
            Path to the override file.
        """
        self._addConfigImpl(label, pipelineIR.ConfigIR(file=[filename]))

    def addConfigPython(self, label: str, pythonString: str) -> None:
        """Add Overrides by running a snippet of python code against a config.

        Parameters
        ----------
        label : `str`
            The label used to identity the task associated with config to
            modify.
        pythonString : `str`
            A string which is valid python code to be executed. This is done
            with config as the only local accessible value.
        """
        self._addConfigImpl(label, pipelineIR.ConfigIR(python=pythonString))

    def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR) -> None:
        # The reserved "parameters" label targets the pipeline's parameters
        # section rather than any task; only plain key/value overrides are
        # allowed there.
        if label == "parameters":
            self._pipelineIR.parameters.mapping.update(newConfig.rest)
            if newConfig.file:
                raise ValueError("Setting parameters section with config file is not supported")
            if newConfig.python:
                raise ValueError("Setting parameters section using python block is unsupported")
            return
        if label not in self._pipelineIR.tasks:
            raise LookupError(f"There are no tasks labeled '{label}' in the pipeline")
        self._pipelineIR.tasks[label].add_or_update_config(newConfig)

    def write_to_uri(self, uri: ResourcePathExpression) -> None:
        """Write the pipeline to a file or directory.

        Parameters
        ----------
        uri : convertible to `~lsst.resources.ResourcePath`
            URI to write to; may have any scheme with
            `~lsst.resources.ResourcePath` write support or no scheme for a
            local file/directory. Should have a ``.yaml`` extension.
        """
        self._pipelineIR.write_to_uri(uri)

    def to_graph(
        self, registry: Registry | None = None, visualization_only: bool = False
    ) -> pipeline_graph.PipelineGraph:
        """Construct a pipeline graph from this pipeline.

        Constructing a graph applies all configuration overrides, freezes all
        configuration, checks all contracts, and checks for dataset type
        consistency between tasks (as much as possible without access to a data
        repository). It cannot be reversed.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`, optional
            Data repository client. If provided, the graph's dataset types
            and dimensions will be resolved (see `PipelineGraph.resolve`).
        visualization_only : `bool`, optional
            Resolve the graph as well as possible even when dimensions and
            storage classes cannot really be determined. This can include
            using the ``universe.commonSkyPix`` as the assumed dimensions of
            connections that use the "skypix" placeholder and using "<UNKNOWN>"
            as a storage class name (which will fail if the storage class
            itself is ever actually loaded).

        Returns
        -------
        graph : `pipeline_graph.PipelineGraph`
            Representation of the pipeline as a graph.
        """
        instrument_class_name = self._pipelineIR.instrument
        data_id = {}
        if instrument_class_name is not None:
            instrument_class: type[Instrument] = doImportType(instrument_class_name)
            if instrument_class is not None:
                data_id["instrument"] = instrument_class.getName()
        graph = pipeline_graph.PipelineGraph(data_id=data_id)
        graph.description = self._pipelineIR.description
        for label in self._pipelineIR.tasks:
            self._add_task_to_graph(label, graph)
        if self._pipelineIR.contracts is not None:
            label_to_config = {x.label: x.config for x in graph.tasks.values()}
            for contract in self._pipelineIR.contracts:
                # execute this in its own line so it can raise a good error
                # message if there was problems with the eval.
                # NOTE: contracts are arbitrary Python expressions evaluated
                # with eval(); pipeline definitions must be trusted input.
                success = eval(contract.contract, None, label_to_config)
                if not success:
                    extra_info = f": {contract.msg}" if contract.msg is not None else ""
                    raise pipelineIR.ContractError(
                        f"Contract(s) '{contract.contract}' were not satisfied{extra_info}"
                    )
        for label, subset in self._pipelineIR.labeled_subsets.items():
            graph.add_task_subset(
                label, subset.subset, subset.description if subset.description is not None else ""
            )
        for step_ir in self._pipelineIR.steps:
            graph.steps.append(step_ir.label, step_ir.dimensions)
        if registry is not None or visualization_only:
            graph.resolve(registry=registry, visualization_only=visualization_only)
        else:
            graph.sort()
        return graph

    def _add_task_to_graph(self, label: str, graph: pipeline_graph.PipelineGraph) -> None:
        """Add a single task from this pipeline to a pipeline graph that is
        under construction.

        Parameters
        ----------
        label : `str`
            Label for the task to be added.
        graph : `pipeline_graph.PipelineGraph`
            Graph to add the task to.
        """
        if (taskIR := self._pipelineIR.tasks.get(label)) is None:
            raise NameError(f"Label {label} does not appear in this pipeline")
        taskClass: type[PipelineTask] = doImportType(taskIR.klass)
        config = taskClass.ConfigClass()
        instrument: Instrument | None = None
        if (instrumentName := self._pipelineIR.instrument) is not None:
            instrument_cls: type = doImportType(instrumentName)
            instrument = instrument_cls()
        # Apply instrument, file, and pipeline-level overrides before the
        # graph freezes the configuration.
        config.applyConfigOverrides(
            instrument,
            getattr(taskClass, "_DefaultName", ""),
            taskIR.config,
            self._pipelineIR.parameters,
            label,
        )
        graph.add_task(label, taskClass, config)

    def __len__(self) -> int:
        return len(self._pipelineIR.tasks)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Pipeline):
            return False
        elif self._pipelineIR == other._pipelineIR:
            # Shortcut: if the IR is the same, the expanded pipeline must be
            # the same as well. But the converse is not true.
            return True
        else:
            # Compare as much as we can (task classes and their edges).
            if self.to_graph().diff_tasks(other.to_graph()):
                return False
            # After DM-27847, we should compare configuration here.
            raise NotImplementedError(
                "Pipelines cannot be compared because config instances cannot be compared; see DM-27847."
            )