Coverage for python / lsst / ctrl / mpexec / preExecInit.py: 41%
41 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:45 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["PreExecInit", "PreExecInitBase"]
32# -------------------------------
33# Imports of standard modules --
34# -------------------------------
35import abc
36import logging
37from typing import TYPE_CHECKING
39# -----------------------------
40# Imports for other modules --
41# -----------------------------
43if TYPE_CHECKING:
44 from lsst.daf.butler import Butler, LimitedButler
45 from lsst.pipe.base import QuantumGraph, TaskFactory
47_LOG = logging.getLogger(__name__)
50class PreExecInitBase(abc.ABC):
51 """Common part of the implementation of PreExecInit classes that does not
52 depend on Butler type.
54 Parameters
55 ----------
56 butler : `~lsst.daf.butler.LimitedButler`
57 Butler to use.
58 taskFactory : `lsst.pipe.base.TaskFactory`
59 Ignored and accepted for backwards compatibility.
60 extendRun : `bool`
61 Whether extend run parameter is in use.
62 """
64 def __init__(self, butler: LimitedButler, taskFactory: TaskFactory, extendRun: bool):
65 self.butler = butler
66 self.extendRun = extendRun
68 def initialize(
69 self,
70 graph: QuantumGraph,
71 saveInitOutputs: bool = True,
72 registerDatasetTypes: bool = False,
73 saveVersions: bool = True,
74 ) -> None:
75 """Perform all initialization steps.
77 Convenience method to execute all initialization steps. Instead of
78 calling this method and providing all options it is also possible to
79 call methods individually.
81 Parameters
82 ----------
83 graph : `~lsst.pipe.base.QuantumGraph`
84 Execution graph.
85 saveInitOutputs : `bool`, optional
86 If ``True`` (default) then save "init outputs", configurations,
87 and package versions to butler.
88 registerDatasetTypes : `bool`, optional
89 If ``True`` then register dataset types in registry, otherwise
90 they must be already registered.
91 saveVersions : `bool`, optional
92 If ``False`` then do not save package versions even if
93 ``saveInitOutputs`` is set to ``True``.
94 """
95 # register dataset types or check consistency
96 self.initializeDatasetTypes(graph, registerDatasetTypes)
98 # Save task initialization data or check that saved data
99 # is consistent with what tasks would save
100 if saveInitOutputs:
101 self.saveInitOutputs(graph)
102 self.saveConfigs(graph)
103 if saveVersions:
104 self.savePackageVersions(graph)
106 @abc.abstractmethod
107 def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool = False) -> None:
108 """Save or check DatasetTypes output by the tasks in a graph.
110 Iterates over all DatasetTypes for all tasks in a graph and either
111 tries to add them to registry or compares them to existing ones.
113 Parameters
114 ----------
115 graph : `~lsst.pipe.base.QuantumGraph`
116 Execution graph.
117 registerDatasetTypes : `bool`, optional
118 If ``True`` then register dataset types in registry, otherwise
119 they must be already registered.
121 Raises
122 ------
123 ValueError
124 Raised if existing DatasetType is different from DatasetType
125 in a graph.
126 KeyError
127 Raised if ``registerDatasetTypes`` is ``False`` and DatasetType
128 does not exist in registry.
129 """
130 raise NotImplementedError()
132 def saveInitOutputs(self, graph: QuantumGraph) -> None:
133 """Write any datasets produced by initializing tasks in a graph.
135 Parameters
136 ----------
137 graph : `~lsst.pipe.base.QuantumGraph`
138 Execution graph.
140 Raises
141 ------
142 TypeError
143 Raised if the type of existing object in butler is different from
144 new data.
145 """
146 _LOG.debug("Will save InitOutputs for all tasks")
147 graph.write_init_outputs(self.butler, skip_existing=self.extendRun)
149 def saveConfigs(self, graph: QuantumGraph) -> None:
150 """Write configurations for pipeline tasks to butler or check that
151 existing configurations are equal to the new ones.
153 Parameters
154 ----------
155 graph : `~lsst.pipe.base.QuantumGraph`
156 Execution graph.
158 Raises
159 ------
160 lsst.daf.butler.registry.ConflictingDefinitionError
161 Raised if existing object in butler is different from new data, or
162 if ``extendRun`` is `False` and datasets already exists.
163 Content of a butler collection should not be changed if this
164 exception is raised.
165 """
166 graph.write_configs(self.butler, compare_existing=self.extendRun)
168 def savePackageVersions(self, graph: QuantumGraph) -> None:
169 """Write versions of software packages to butler.
171 Parameters
172 ----------
173 graph : `~lsst.pipe.base.QuantumGraph`
174 Execution graph.
176 Raises
177 ------
178 TypeError
179 Raised if existing object in butler is incompatible with new data.
180 """
181 graph.write_packages(self.butler, compare_existing=self.extendRun)
184class PreExecInit(PreExecInitBase):
185 """Initialization of registry for QuantumGraph execution.
187 This class encapsulates all necessary operations that have to be performed
188 on butler and registry to prepare them for QuantumGraph execution.
190 Parameters
191 ----------
192 butler : `~lsst.daf.butler.Butler`
193 Data butler instance.
194 taskFactory : `~lsst.pipe.base.TaskFactory`
195 Task factory.
196 extendRun : `bool`, optional
197 If `True` then do not try to overwrite any datasets that might exist
198 in ``butler.run``; instead compare them when appropriate/possible. If
199 `False`, then any existing conflicting dataset will cause a butler
200 exception to be raised.
201 """
203 def __init__(self, butler: Butler, taskFactory: TaskFactory, extendRun: bool = False):
204 super().__init__(butler, taskFactory, extendRun)
205 self.full_butler = butler
206 if self.extendRun and self.full_butler.run is None:
207 raise RuntimeError(
208 "Cannot perform extendRun logic unless butler is initialized "
209 "with a default output RUN collection."
210 )
212 def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool = False) -> None:
213 # docstring inherited
214 if registerDatasetTypes:
215 graph.pipeline_graph.register_dataset_types(self.full_butler)
216 else:
217 graph.pipeline_graph.check_dataset_type_registrations(self.full_butler)
220class PreExecInitLimited(PreExecInitBase):
221 """Initialization of registry for QuantumGraph execution.
223 This class works with LimitedButler and expects that all references in
224 QuantumGraph are resolved.
226 Parameters
227 ----------
228 butler : `~lsst.daf.butler.LimitedButler`
229 Limited data butler instance.
230 taskFactory : `~lsst.pipe.base.TaskFactory`
231 Task factory.
232 """
234 def __init__(self, butler: LimitedButler, taskFactory: TaskFactory):
235 super().__init__(butler, taskFactory, False)
237 def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool = False) -> None:
238 # docstring inherited
239 # With LimitedButler we never create or check dataset types.
240 pass