Coverage for python / lsst / ctrl / mpexec / preExecInit.py: 41%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:48 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

# Public API of this module. PreExecInitLimited is a public, documented class
# alongside the other two, so it is exported as well.
__all__ = ["PreExecInit", "PreExecInitBase", "PreExecInitLimited"]

31 

32# ------------------------------- 

33# Imports of standard modules -- 

34# ------------------------------- 

35import abc 

36import logging 

37from typing import TYPE_CHECKING 

38 

39# ----------------------------- 

40# Imports for other modules -- 

41# ----------------------------- 

42 

43if TYPE_CHECKING: 

44 from lsst.daf.butler import Butler, LimitedButler 

45 from lsst.pipe.base import QuantumGraph, TaskFactory 

46 

# Module-level logger, named after this module.
_LOG = logging.getLogger(name=__name__)

48 

49 

class PreExecInitBase(abc.ABC):
    """Butler-type-independent part of the PreExecInit implementations.

    Subclasses decide how dataset types are handled by implementing
    `initializeDatasetTypes`. The remaining initialization steps (writing
    init-outputs, task configurations and package versions) are implemented
    here in terms of ``self.butler``.

    Parameters
    ----------
    butler : `~lsst.daf.butler.LimitedButler`
        Butler that the graph's init-time datasets are written to.
    taskFactory : `lsst.pipe.base.TaskFactory`
        Ignored; accepted only for backwards compatibility.
    extendRun : `bool`
        Stored as ``self.extendRun``. It is forwarded as ``skip_existing``
        when writing init-outputs and as ``compare_existing`` when writing
        configs and package versions.
    """

    def __init__(self, butler: LimitedButler, taskFactory: TaskFactory, extendRun: bool):
        # ``taskFactory`` is intentionally not kept; see the class docstring.
        self.extendRun = extendRun
        self.butler = butler

    def initialize(
        self,
        graph: QuantumGraph,
        saveInitOutputs: bool = True,
        registerDatasetTypes: bool = False,
        saveVersions: bool = True,
    ) -> None:
        """Run every initialization step in order.

        This is a shortcut. Callers that need finer control can invoke the
        individual methods themselves.

        Parameters
        ----------
        graph : `~lsst.pipe.base.QuantumGraph`
            Graph that is about to be executed.
        saveInitOutputs : `bool`, optional
            When `True` (the default), write init-outputs and configurations,
            and also package versions unless ``saveVersions`` is `False`.
            When `False`, only the dataset-type step runs.
        registerDatasetTypes : `bool`, optional
            Forwarded to `initializeDatasetTypes`. When `True` dataset types
            are registered; otherwise they must already be registered.
        saveVersions : `bool`, optional
            When `False`, package versions are not written even if
            ``saveInitOutputs`` is `True`.
        """
        # Dataset types are always handled first (register or verify).
        self.initializeDatasetTypes(graph, registerDatasetTypes)

        if not saveInitOutputs:
            return

        # Write (or check against existing) task init data and configs.
        self.saveInitOutputs(graph)
        self.saveConfigs(graph)
        if saveVersions:
            self.savePackageVersions(graph)

    @abc.abstractmethod
    def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool = False) -> None:
        """Register or verify the dataset types used by a graph's tasks.

        Every dataset type of every task in the graph is either added to the
        registry or compared with the definition that is already there.

        Parameters
        ----------
        graph : `~lsst.pipe.base.QuantumGraph`
            Graph that is about to be executed.
        registerDatasetTypes : `bool`, optional
            When `True` dataset types are registered; otherwise they must
            already exist in the registry.

        Raises
        ------
        ValueError
            Raised if an existing dataset type differs from the one in the
            graph.
        KeyError
            Raised if ``registerDatasetTypes`` is `False` and a dataset type
            is missing from the registry.
        """
        raise NotImplementedError

    def saveInitOutputs(self, graph: QuantumGraph) -> None:
        """Write the datasets that the graph's tasks produce at construction.

        Parameters
        ----------
        graph : `~lsst.pipe.base.QuantumGraph`
            Graph that is about to be executed.

        Raises
        ------
        TypeError
            Raised if an object already in the butler has a different type
            from the new data.
        """
        _LOG.debug("Will save InitOutputs for all tasks")
        skip = self.extendRun
        graph.write_init_outputs(self.butler, skip_existing=skip)

    def saveConfigs(self, graph: QuantumGraph) -> None:
        """Write task configurations, or check them against existing ones.

        Parameters
        ----------
        graph : `~lsst.pipe.base.QuantumGraph`
            Graph that is about to be executed.

        Raises
        ------
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if a stored configuration differs from the new one, or if
            ``extendRun`` is `False` and the datasets already exist. The
            butler collection is left unchanged when this is raised.
        """
        compare = self.extendRun
        graph.write_configs(self.butler, compare_existing=compare)

    def savePackageVersions(self, graph: QuantumGraph) -> None:
        """Write the software package versions to the butler.

        Parameters
        ----------
        graph : `~lsst.pipe.base.QuantumGraph`
            Graph that is about to be executed.

        Raises
        ------
        TypeError
            Raised if an object already in the butler is incompatible with
            the new data.
        """
        compare = self.extendRun
        graph.write_packages(self.butler, compare_existing=compare)

182 

183 

class PreExecInit(PreExecInitBase):
    """Initialization of registry for QuantumGraph execution.

    This class encapsulates all necessary operations that have to be performed
    on butler and registry to prepare them for QuantumGraph execution.

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler`
        Data butler instance. It is also used (as ``self.full_butler``) to
        register or check dataset types.
    taskFactory : `~lsst.pipe.base.TaskFactory`
        Ignored; accepted only for backwards compatibility.
    extendRun : `bool`, optional
        If `True` then do not try to overwrite any datasets that might exist
        in ``butler.run``; instead compare them when appropriate/possible. If
        `False`, then any existing conflicting dataset will cause a butler
        exception to be raised.

    Raises
    ------
    RuntimeError
        Raised if ``extendRun`` is `True` but ``butler.run`` is `None`.
    """

    def __init__(self, butler: Butler, taskFactory: TaskFactory, extendRun: bool = False):
        super().__init__(butler, taskFactory, extendRun)
        # The base class only annotates ``self.butler`` as a LimitedButler;
        # keep a separately named reference to the full Butler for the
        # dataset-type operations in initializeDatasetTypes.
        self.full_butler = butler
        # The extendRun logic compares against datasets in ``butler.run``, so
        # a default output RUN collection is required.
        if self.extendRun and self.full_butler.run is None:
            raise RuntimeError(
                "Cannot perform extendRun logic unless butler is initialized "
                "with a default output RUN collection."
            )

    def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool = False) -> None:
        # docstring inherited
        if registerDatasetTypes:
            # Add the pipeline's dataset types to the registry.
            graph.pipeline_graph.register_dataset_types(self.full_butler)
        else:
            # Only verify that existing registrations match the pipeline.
            graph.pipeline_graph.check_dataset_type_registrations(self.full_butler)

218 

219 

class PreExecInitLimited(PreExecInitBase):
    """Initialization of registry for QuantumGraph execution.

    This class works with LimitedButler and expects that all references in
    QuantumGraph are resolved.

    Parameters
    ----------
    butler : `~lsst.daf.butler.LimitedButler`
        Limited data butler instance.
    taskFactory : `~lsst.pipe.base.TaskFactory`
        Ignored; accepted only for backwards compatibility.

    Notes
    -----
    ``extendRun`` is always `False` for this class. As a result, the saving
    methods inherited from `PreExecInitBase` are called with
    ``skip_existing=False`` and ``compare_existing=False``. Dataset types are
    neither registered nor checked.
    """

    def __init__(self, butler: LimitedButler, taskFactory: TaskFactory):
        # extendRun is not supported with a LimitedButler; always pass False.
        super().__init__(butler, taskFactory, False)

    def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool = False) -> None:
        # docstring inherited
        # With LimitedButler we never create or check dataset types, so
        # ``registerDatasetTypes`` is ignored.
        pass