Coverage for python / lsst / ctrl / mpexec / taskFactory.py: 24%
27 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 22:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 22:28 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["TaskFactory"]
26import logging
27from typing import TYPE_CHECKING, Optional
29from lsst.daf.butler import DatasetType
30from lsst.pipe.base import TaskFactory as BaseTaskFactory
32if TYPE_CHECKING:
33 from lsst.daf.butler import Butler
34 from lsst.pipe.base import PipelineTask, PipelineTaskConfig
35 from lsst.pipe.base.configOverrides import ConfigOverrides
37_LOG = logging.getLogger(__name__)
40class TaskFactory(BaseTaskFactory):
41 """Class instantiating PipelineTasks."""
43 def makeTask(
44 self,
45 taskClass: type[PipelineTask],
46 label: Optional[str],
47 config: Optional[PipelineTaskConfig],
48 overrides: Optional[ConfigOverrides],
49 butler: Optional[Butler],
50 ) -> PipelineTask:
51 """Create new PipelineTask instance from its class.
53 Parameters
54 ----------
55 taskClass : type
56 PipelineTask class.
57 label : `str` or `None`
58 The label of the new task; if `None` then use
59 ``taskClass._DefaultName``.
60 config : `pex.Config` or None
61 Configuration object, if ``None`` then use task-defined
62 configuration class to create new instance.
63 overrides : `ConfigOverrides` or None
64 Configuration overrides, this should contain all overrides to be
65 applied to a default task config, including instrument-specific,
66 obs-package specific, and possibly command-line overrides.
67 butler : `lsst.daf.butler.Butler` or None
68 Butler instance used to obtain initialization inputs for
69 PipelineTasks. If None, some PipelineTasks will not be usable
71 Returns
72 -------
73 Instance of a PipelineTask class or None on errors.
75 Raises
76 ------
77 Any exceptions that are raised by PipelineTask constructor or its
78 configuration class are propagated back to caller.
79 """
81 # configuration
82 if config is None:
83 config = taskClass.ConfigClass()
84 if overrides:
85 overrides.applyTo(config)
86 elif overrides is not None:
87 _LOG.warning(
88 "Both config and overrides are specified for task %s, overrides are ignored",
89 taskClass.__name__,
90 )
92 # if we don't have a butler, try to construct without initInputs;
93 # let PipelineTasks raise if that's impossible
94 if butler is None:
95 initInputs = None
96 else:
97 connections = config.connections.ConnectionsClass(config=config)
98 descriptorMap = {}
99 for name in connections.initInputs:
100 attribute = getattr(connections, name)
101 dsType = DatasetType(
102 attribute.name, butler.registry.dimensions.extract(set()), attribute.storageClass
103 )
104 descriptorMap[name] = dsType
105 initInputs = {k: butler.get(v) for k, v in descriptorMap.items()}
107 # Freeze the config
108 config.freeze()
110 # make task instance
111 task = taskClass(config=config, initInputs=initInputs, name=label)
113 return task