Coverage for python / lsst / pipe / base / taskFactory.py: 29%
18 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:19 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["TaskFactory"]
32from abc import ABCMeta
33from collections.abc import Iterable
34from typing import TYPE_CHECKING, Any
36if TYPE_CHECKING:
37 from lsst.daf.butler import DatasetRef, LimitedButler
39 from .pipeline_graph import TaskNode
40 from .pipelineTask import PipelineTask
class TaskFactory(metaclass=ABCMeta):
    """Factory responsible for instantiating PipelineTask subclasses."""

    def makeTask(
        self,
        task_node: TaskNode,
        /,
        butler: LimitedButler,
        initInputRefs: Iterable[DatasetRef] | None,
    ) -> PipelineTask:
        """Build the PipelineTask described by a
        `~.pipeline_graph.TaskNode`.

        Parameters
        ----------
        task_node : `~pipeline_graph.TaskNode`
            Node that supplies the task class, its configuration, its label
            and its init-input read edges.
        butler : `lsst.daf.butler.LimitedButler`
            Butler whose ``get`` method is called to load each init-input
            dataset.  It is not used when no references are supplied.
        initInputRefs : `~collections.abc.Iterable` of \
                `~lsst.daf.butler.DatasetRef` or `None`
            Resolved dataset references for the init-inputs of this task.
            When `None` or empty, nothing is loaded and the task is
            constructed with an empty ``initInputs`` mapping.

        Returns
        -------
        task : `PipelineTask`
            Newly constructed instance of ``task_node.task_class``.

        Raises
        ------
        KeyError
            Raised if references were supplied but none of them matches the
            dataset type name of one of the task's init-input edges.
        Exception
            Anything raised by the butler, by the PipelineTask constructor
            or by its configuration class is propagated to the caller
            unchanged.
        """
        task_config = task_node.config

        # Index the supplied references by dataset type name so that each
        # init-input edge can find its reference with a single lookup.
        refs_by_name = (
            {ref.datasetType.name: ref for ref in initInputRefs} if initInputRefs is not None else {}
        )

        constructor = task_node.task_class

        # Keyed by connection name, which is what the task constructor
        # receives via ``initInputs``.
        loaded: dict[str, Any] = {}
        if refs_by_name:
            for edge in task_node.init.inputs.values():
                dataset_ref = refs_by_name[edge.dataset_type_name]
                loaded[edge.connection_name] = butler.get(dataset_ref)

        return constructor(config=task_config, initInputs=loaded, name=task_node.label)