Coverage for python / lsst / pipe / base / pipeline_graph / _task_subsets.py: 36%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:59 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("StepDefinitions", "TaskSubset") 

30 

31from collections.abc import Iterable, Iterator, MutableSet 

32from contextlib import contextmanager 

33 

34import networkx 

35import networkx.algorithms.boundary 

36 

37from lsst.daf.butler import DimensionGroup, DimensionUniverse 

38 

39from ._exceptions import InvalidStepsError, PipelineGraphError, UnresolvedGraphError 

40from ._nodes import NodeKey, NodeType 

41 

42 

43class TaskSubset(MutableSet[str]): 

44 """A specialized set that represents a labeled subset of the tasks in a 

45 pipeline graph. 

46 

47 Instances of this class should never be constructed directly; they should 

48 only be accessed via the `PipelineGraph.task_subsets` attribute and created 

49 by the `PipelineGraph.add_task_subset` method. 

50 

51 Parameters 

52 ---------- 

53 parent_xgraph : `networkx.DiGraph` 

54 Parent networkx graph that this subgraph is part of. 

55 label : `str` 

56 Label associated with this subset of the pipeline. 

57 members : `set` [ `str` ] 

58 Labels of the tasks that are members of this subset. 

59 description : `str`, optional 

60 Description string associated with this labeled subset. 

61 step_definitions : `StepDefinitions` 

62 Information about special 'step' subsets that partition the pipeline. 

63 

64 Notes 

65 ----- 

66 Iteration order is arbitrary, even when the parent pipeline graph is 

67 ordered (there is no guarantee that an ordering of the tasks in the graph 

68 implies a consistent ordering of subsets). 

69 """ 

70 

71 def __init__( 

72 self, 

73 parent_xgraph: networkx.DiGraph, 

74 label: str, 

75 members: set[str], 

76 description: str, 

77 step_definitions: StepDefinitions, 

78 ): 

79 self._parent_xgraph = parent_xgraph 

80 self._label = label 

81 self._members = members 

82 self._description = description 

83 self._step_definitions = step_definitions 

84 

85 @property 

86 def label(self) -> str: 

87 """Label associated with this subset of the pipeline.""" 

88 return self._label 

89 

90 @property 

91 def description(self) -> str: 

92 """Description string associated with this labeled subset.""" 

93 return self._description 

94 

95 @description.setter 

96 def description(self, value: str) -> None: 

97 # Docstring in getter. 

98 self._description = value 

99 

100 @property 

101 def is_step(self) -> bool: 

102 """Whether this subset is a step.""" 

103 return self.label in self._step_definitions 

104 

105 @property 

106 def dimensions(self) -> DimensionGroup: 

107 """The dimensions that can be used to split up this subset's quanta 

108 into independent groups. 

109 

110 This is only available if `is_step` is `True` and only if the pipeline 

111 graph has been resolved. 

112 """ 

113 return self._step_definitions.get_dimensions(self.label) 

114 

115 @dimensions.setter 

116 def dimensions(self, dimensions: Iterable[str] | DimensionGroup) -> None: 

117 # Docstring in getter. 

118 self._step_definitions.set_dimensions(self.label, dimensions) 

119 

120 def __repr__(self) -> str: 

121 return f"{self.label}: {self.description!r}, tasks={{{', '.join(iter(self))}}}" 

122 

123 def __contains__(self, key: object) -> bool: 

124 return key in self._members 

125 

126 def __len__(self) -> int: 

127 return len(self._members) 

128 

129 def __iter__(self) -> Iterator[str]: 

130 return iter(self._members) 

131 

132 def add(self, value: str) -> None: 

133 """Add a new task to this subset. 

134 

135 Parameters 

136 ---------- 

137 value : `str` 

138 Label for the task. Must already be present in the parent pipeline 

139 graph. 

140 """ 

141 key = NodeKey(NodeType.TASK, value) 

142 if key not in self._parent_xgraph: 

143 raise PipelineGraphError(f"{value!r} is not a task in the parent pipeline.") 

144 with self._step_definitions._unverified_on_success(): 

145 self._members.add(key.name) 

146 

147 def discard(self, value: str) -> None: 

148 """Remove a task from the subset if it is present. 

149 

150 Parameters 

151 ---------- 

152 value : `str` 

153 Label for the task. Must already be present in the parent pipeline 

154 graph. 

155 """ 

156 with self._step_definitions._unverified_on_success(): 

157 self._members.discard(value) 

158 

159 @classmethod 

160 def _from_iterable[S](cls, iterable: Iterable[S]) -> set[S]: 

161 # This is the hook used by collections.abc.Set when implementing 

162 # operators that return new sets. In this case, we want those to be 

163 # regular `set` (builtin) objects, not `TaskSubset` instances. 

164 return set(iterable) 

165 

166 

class StepDefinitions:
    """A collection of the 'steps' defined in a pipeline graph.

    Steps are special task subsets that must be executed separately. They may
    also be associated with "sharding dimensions", which are the dimensions of
    data IDs that are independent within the step: splitting up a quantum graph
    along a step's sharding dimensions produces groups that can be safely
    executed independently.

    Parameters
    ----------
    universe : `lsst.daf.butler.DimensionUniverse`, optional
        Definitions for data dimensions.  If not provided, dimension names are
        stored without being conformed to a universe, and `get_dimensions`
        raises `UnresolvedGraphError`.
    dimensions_by_label : `dict` [ `str`, `frozenset` [ `str` ] ], optional
        Sharding dimensions for step subsets, as a dictionary with task subset
        labels as keys and sets of dimension names as values.  Insertion order
        is the step order.  The dictionary is adopted directly, not copied.
    verified : `bool`, optional
        Whether the step definitions have been checked since the last time
        they or some other relevant aspect of the pipeline graph was changed.

    Notes
    -----
    This class only models `collections.abc.Collection` (it is iterable, sized,
    and can be used with ``in`` tests on label names), but it also supports
    `append`, `remove`, `assign`, `clear`, and `set_dimensions` for
    modifications.

    Equality compares the step labels, their order, and their sharding
    dimensions; the dimension universe and the `verified` flag are ignored.
    """

    def __init__(
        self,
        universe: DimensionUniverse | None = None,
        dimensions_by_label: dict[str, frozenset[str]] | None = None,
        verified: bool = False,
    ):
        self._universe = universe
        self._dimensions_by_label = dimensions_by_label if dimensions_by_label is not None else {}
        self._verified = verified

    @property
    def verified(self) -> bool:
        """Whether the step definitions have been checked since the last time
        they or some other relevant aspect of the pipeline graph was changed.

        This is always `True` if there are no step definitions.
        """
        # If there are no steps, the step definitions are still verified.
        return self._verified or not self._dimensions_by_label

    def __contains__(self, label: object) -> bool:
        return label in self._dimensions_by_label

    def __iter__(self) -> Iterator[str]:
        return iter(self._dimensions_by_label)

    def __len__(self) -> int:
        return len(self._dimensions_by_label)

    def __repr__(self) -> str:
        return str(list(self._dimensions_by_label))

    def __eq__(self, other: object) -> bool:
        if isinstance(other, StepDefinitions):
            # Steps are ordered, but plain dict equality ignores insertion
            # order; compare the ordered (label, dimensions) pairs instead.
            return list(self._dimensions_by_label.items()) == list(other._dimensions_by_label.items())
        return NotImplemented

    def copy(self) -> StepDefinitions:
        """Create a new instance that does not share any mutable state with
        this one.
        """
        return StepDefinitions(
            universe=self._universe,
            dimensions_by_label=self._dimensions_by_label.copy(),
            verified=self._verified,
        )

    def append(self, label: str, dimensions: Iterable[str] | DimensionGroup = ()) -> None:
        """Append a new step.

        Parameters
        ----------
        label : `str`
            Task subset label for the new step.
        dimensions : `~collections.abc.Iterable` [ `str` ] or \
                `~lsst.daf.butler.DimensionGroup`, optional
            Dimensions that can be used to split up the step's quanta
            into independent groups.

        Notes
        -----
        If ``label`` is already a step, its dimensions are replaced and it
        keeps its existing position in the step order.
        """
        normalized = self._normalize_dimensions(dimensions)
        with self._unverified_on_success():
            self._dimensions_by_label[label] = normalized

    def remove(self, label: str) -> None:
        """Remove a named step.

        Parameters
        ----------
        label : `str`
            Task subset label to remove from the list of steps.

        Raises
        ------
        KeyError
            Raised if ``label`` is not currently a step.

        Notes
        -----
        This does not remove the task subset itself; it just "demotes" it to a
        non-step subset.
        """
        with self._unverified_on_success():
            del self._dimensions_by_label[label]

    def assign(self, labels: Iterable[str]) -> None:
        """Set all step definitions to the given labels.

        Parameters
        ----------
        labels : `~collections.abc.Iterable` [ `str` ]
            Subset labels to use as the new steps.

        Notes
        -----
        Sharding dimensions are preserved for any label that was previously a
        step.  If ``labels`` is a `StepDefinitions` instance, sharding
        dimensions from that instance will be used.
        """
        with self._unverified_on_success():
            if isinstance(labels, StepDefinitions):
                replacement = labels._dimensions_by_label.copy()
            else:
                # Build the whole new mapping before rebinding, so an
                # exception while iterating ``labels`` leaves state untouched.
                replacement = {
                    label: self._dimensions_by_label.get(label, frozenset()) for label in labels
                }
            self._dimensions_by_label = replacement

    def clear(self) -> None:
        """Remove all step definitions."""
        self.assign(())

    def get_dimensions(self, label: str) -> DimensionGroup:
        """Return the dimensions that can be used to split up a step's quanta
        into independent groups.

        Parameters
        ----------
        label : `str`
            Label for the step.

        Returns
        -------
        dimensions : `lsst.daf.butler.DimensionGroup`
            Dimensions that can be used to split up this step's quanta.

        Raises
        ------
        InvalidStepsError
            Raised if ``label`` is not a step.
        UnresolvedGraphError
            Raised if no dimension universe is available yet.
        """
        try:
            raw_dimensions = self._dimensions_by_label[label]
        except KeyError:
            raise InvalidStepsError(f"Task subset {label!r} is not a step.") from None
        if self._universe is not None:
            return self._universe.conform(raw_dimensions)
        else:
            raise UnresolvedGraphError("Step sharding dimensions have not been resolved.")

    def set_dimensions(self, label: str, dimensions: Iterable[str] | DimensionGroup) -> None:
        """Set the dimensions that can be used to split up a step's quanta
        into independent groups.

        Parameters
        ----------
        label : `str`
            Label for the step.
        dimensions : `~collections.abc.Iterable` [ `str` ] or \
                `~lsst.daf.butler.DimensionGroup`
            Dimensions that can be used to split up this step's quanta.

        Raises
        ------
        PipelineGraphError
            Raised if ``label`` is not a step.
        """
        if label not in self._dimensions_by_label:
            raise PipelineGraphError(f"Subset {label!r} is not a step.")
        normalized = self._normalize_dimensions(dimensions)
        with self._unverified_on_success():
            self._dimensions_by_label[label] = normalized

    def _normalize_dimensions(self, dimensions: Iterable[str] | DimensionGroup) -> frozenset[str]:
        """Convert dimensions to the frozenset-of-names form stored internally.

        When a universe is available the dimensions are first passed through
        `DimensionUniverse.conform`; otherwise the names are stored as given.
        """
        if self._universe is not None:
            dimensions = self._universe.conform(dimensions)
        if isinstance(dimensions, DimensionGroup):
            return frozenset(dimensions.names)
        return frozenset(dimensions)

    @contextmanager
    def _unverified_on_success(self) -> Iterator[None]:
        """Return a context manager that marks the step definitions as
        unverified if it is exited without an exception.

        This should be used only for exception-safe modifications for which an
        exception means no changes were made (and hence the verified state can
        remain unchanged as well).
        """
        # Deliberately not wrapped in try/finally: an exception propagates out
        # of the ``yield`` and skips the flag reset below.
        yield
        self._verified = False