Coverage for python/lsst/pipe/base/pipelineTask.py: 78%

18 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Define `PipelineTask` class and related methods.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ["PipelineTask"] # Classes in this module 

33 

34from collections.abc import Callable 

35from typing import TYPE_CHECKING, Any, ClassVar 

36 

37from .connections import InputQuantizedConnection, OutputQuantizedConnection 

38from .task import Task 

39 

40if TYPE_CHECKING: 

41 import logging 

42 

43 from lsst.utils.logging import LsstLogAdapter 

44 

45 from ._quantumContext import QuantumContext 

46 from .config import PipelineTaskConfig 

47 from .struct import Struct 

48 

49 

50class PipelineTask(Task): 

51 """Base class for all pipeline tasks. 

52 

53 This is an abstract base class for PipelineTasks which represents an 

54 algorithm executed by framework(s) on data which comes from data butler, 

55 resulting data is also stored in a data butler. 

56 

57 PipelineTask inherits from a `~lsst.pipe.base.Task` and uses the same 

58 configuration mechanism based on `lsst.pex.config`. `PipelineTask` 

59 classes also have a `PipelineTaskConnections` class associated with their 

60 config which defines all of the IO a `PipelineTask` will need to do. 

61 PipelineTask sub-class typically implements `run()` method which receives 

62 Python-domain data objects and returns `lsst.pipe.base.Struct` object with 

63 resulting data. `run()` method is not supposed to perform any I/O, it 

64 operates entirely on in-memory objects. `runQuantum()` is the method (can 

65 be re-implemented in sub-class) where all necessary I/O is performed, it 

66 reads all input data from data butler into memory, calls `run()` method 

67 with that data, examines returned `Struct` object and saves some or all of 

68 that data back to data butler. `runQuantum()` method receives a 

69 `QuantumContext` instance to facilitate I/O, a `InputQuantizedConnection` 

70 instance which defines all input `lsst.daf.butler.DatasetRef`, and a 

71 `OutputQuantizedConnection` instance which defines all the output 

72 `lsst.daf.butler.DatasetRef` for a single invocation of PipelineTask. 

73 

74 Subclasses must be constructable with exactly the arguments taken by the 

75 PipelineTask base class constructor, but may support other signatures as 

76 well. 

77 

78 Parameters 

79 ---------- 

80 config : `~lsst.pex.config.Config`, optional 

81 Configuration for this task (an instance of ``self.ConfigClass``, 

82 which is a task-specific subclass of `PipelineTaskConfig`). 

83 If not specified then it defaults to ``self.ConfigClass()``. 

84 log : `logging.Logger`, optional 

85 Logger instance whose name is used as a log name prefix, or ``None`` 

86 for no prefix. 

87 initInputs : `dict`, optional 

88 A dictionary of objects needed to construct this PipelineTask, with 

89 keys matching the keys of the dictionary returned by 

90 `getInitInputDatasetTypes` and values equivalent to what would be 

91 obtained by calling `~lsst.daf.butler.Butler.get` with those 

92 DatasetTypes and no data IDs. While it is optional for the base class, 

93 subclasses are permitted to require this argument. 

94 **kwargs : `~typing.Any` 

95 Arbitrary parameters, passed to base class constructor. 

96 """ 

97 

98 ConfigClass: ClassVar[type[PipelineTaskConfig]] 

99 

100 canMultiprocess: ClassVar[bool] = True 

101 """Whether this task can be run by an executor that uses subprocesses for 

102 parallelism. 

103 """ 

104 

105 def __init__( 

106 self, 

107 *, 

108 config: PipelineTaskConfig | None = None, 

109 log: logging.Logger | LsstLogAdapter | None = None, 

110 initInputs: dict[str, Any] | None = None, 

111 **kwargs: Any, 

112 ): 

113 super().__init__(config=config, log=log, **kwargs) 

114 

115 run: Callable[..., Struct] # The 'run' method for subclasses will have a different signature 

116 

117 def run(self, **kwargs: Any) -> Struct: # type: ignore 

118 """Run task algorithm on in-memory data. 

119 

120 This method should be implemented in a subclass. This method will 

121 receive keyword-only arguments whose names will be the same as names of 

122 connection fields describing input dataset types. Argument values will 

123 be data objects retrieved from data butler. If a dataset type is 

124 configured with ``multiple`` field set to ``True`` then the argument 

125 value will be a list of objects, otherwise it will be a single object. 

126 

127 If the task needs to know its input or output DataIds then it also has 

128 to override the `runQuantum` method. 

129 

130 This method should return a `Struct` whose attributes share the same 

131 name as the connection fields describing output dataset types. 

132 

133 Parameters 

134 ---------- 

135 **kwargs : `~typing.Any` 

136 Arbitrary parameters accepted by subclasses. 

137 

138 Returns 

139 ------- 

140 struct : `Struct` 

141 Struct with attribute names corresponding to output connection 

142 fields. 

143 

144 Examples 

145 -------- 

146 Typical implementation of this method may look like: 

147 

148 .. code-block:: python 

149 

150 def run(self, *, input, calib): 

151 # "input", "calib", and "output" are the names of the 

152 # connection fields. 

153 

154 # Assuming that input/calib datasets are `scalar` they are 

155 # simple objects, do something with inputs and calibs, produce 

156 # output image. 

157 image = self.makeImage(input, calib) 

158 

159 # If output dataset is `scalar` then return object, not list 

160 return Struct(output=image) 

161 """ 

162 raise NotImplementedError("run() is not implemented") 

163 

164 def runQuantum( 

165 self, 

166 butlerQC: QuantumContext, 

167 inputRefs: InputQuantizedConnection, 

168 outputRefs: OutputQuantizedConnection, 

169 ) -> None: 

170 """Do butler IO and transform to provide in memory 

171 objects for tasks `~Task.run` method. 

172 

173 Parameters 

174 ---------- 

175 butlerQC : `QuantumContext` 

176 A butler which is specialized to operate in the context of a 

177 `lsst.daf.butler.Quantum`. 

178 inputRefs : `InputQuantizedConnection` 

179 Datastructure whose attribute names are the names that identify 

180 connections defined in corresponding `PipelineTaskConnections` 

181 class. The values of these attributes are the 

182 `lsst.daf.butler.DatasetRef` objects associated with the defined 

183 input/prerequisite connections. 

184 outputRefs : `OutputQuantizedConnection` 

185 Datastructure whose attribute names are the names that identify 

186 connections defined in corresponding `PipelineTaskConnections` 

187 class. The values of these attributes are the 

188 `lsst.daf.butler.DatasetRef` objects associated with the defined 

189 output connections. 

190 """ 

191 inputs = butlerQC.get(inputRefs) 

192 outputs = self.run(**inputs) 

193 butlerQC.put(outputs, outputRefs)