Coverage for python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py: 12%


# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ("PipelineGraph", "compare_packages", "log_config_mismatch")

import gzip
import itertools
import json
import logging
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set
from typing import TYPE_CHECKING, Any, BinaryIO, Literal, cast

import networkx
import networkx.algorithms.bipartite
import networkx.algorithms.dag

from lsst.daf.butler import (
    Butler,
    DataCoordinate,
    DataId,
    DatasetRef,
    DatasetType,
    DimensionGroup,
    DimensionUniverse,
    MissingDatasetTypeError,
)
from lsst.daf.butler.registry import ConflictingDefinitionError, Registry
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.packages import Packages

from .._dataset_handle import InMemoryDatasetHandle
from ..automatic_connection_constants import PACKAGES_INIT_OUTPUT_NAME, PACKAGES_INIT_OUTPUT_STORAGE_CLASS
from . import expressions
from ._dataset_types import DatasetTypeNode
from ._edges import Edge, ReadEdge, WriteEdge
from ._exceptions import (
    DuplicateOutputError,
    EdgesChangedError,
    InvalidExpressionError,
    InvalidStepsError,
    PipelineDataCycleError,
    PipelineGraphError,
    PipelineGraphExceptionSafetyError,
    UnresolvedGraphError,
)
from ._mapping_views import DatasetTypeMappingView, TaskMappingView
from ._nodes import NodeKey, NodeType
from ._task_subsets import StepDefinitions, TaskSubset
from ._tasks import TaskImportMode, TaskInitNode, TaskNode, _TaskNodeImportedData

if TYPE_CHECKING:
    from ..config import PipelineTaskConfig
    from ..connections import PipelineTaskConnections
    from ..pipeline import TaskDef
    from ..pipelineTask import PipelineTask

_LOG = logging.getLogger("lsst.pipe.base.pipeline_graph")


class PipelineGraph:
    """A graph representation of a fully-configured pipeline.

    `PipelineGraph` instances are typically constructed by calling
    `.Pipeline.to_graph`, but in rare cases constructing and then populating an
    empty one may be preferable.

    Parameters
    ----------
    description : `str`, optional
        String description for this pipeline.
    universe : `lsst.daf.butler.DimensionUniverse`, optional
        Definitions for all butler dimensions. If not provided, some
        attributes will not be available until `resolve` is called.
    data_id : `lsst.daf.butler.DataCoordinate` or other data ID, optional
        Data ID that represents a constraint on all quanta generated by this
        pipeline. This typically just holds the instrument constraint included
        in the pipeline definition, if there was one.
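
    Examples
    --------
    A minimal sketch of direct construction (``MyTask`` stands in for any
    `PipelineTask` subclass; most code should use `.Pipeline.to_graph`
    instead)::

        graph = PipelineGraph(description="demo pipeline")
        graph.add_task("myTask", MyTask)
        graph.resolve()  # no registry: uses a default dimension universe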

    """

    ###########################################################################
    #
    # Simple Pipeline Graph Inspection Interface:
    #
    # - for inspecting graph structure, not modifying it (except to sort and
    #   resolve);
    #
    # - no NodeKey objects, just string dataset type name and task label keys;
    #
    # - graph structure is represented as a pair of mappings, with methods to
    #   find neighbors and edges of nodes.
    #
    ###########################################################################

    def __init__(
        self,
        *,
        description: str = "",
        universe: DimensionUniverse | None = None,
        data_id: DataId | None = None,
    ) -> None:
        self._init_from_args(
            xgraph=None,
            sorted_keys=None,
            task_subsets=None,
            description=description,
            universe=universe,
            data_id=data_id,
            step_definitions=StepDefinitions(universe),
        )

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.description!r}, tasks={self.tasks!s})"

    @property
    def description(self) -> str:
        """String description for this pipeline."""
        return self._description

    @description.setter
    def description(self, value: str) -> None:
        # Docstring in getter.
        self._description = value

    @property
    def universe(self) -> DimensionUniverse:
        """Definitions for all butler dimensions."""
        if self._universe is None:
            raise UnresolvedGraphError("Pipeline graph is not resolved.")
        return self._universe

    @property
    def data_id(self) -> DataCoordinate:
        """Data ID that represents a constraint on all quanta generated from
        this pipeline.

        This may not be available unless the graph is resolved.
        """
        return DataCoordinate.standardize(self._raw_data_id, universe=self.universe)

    @property
    def tasks(self) -> TaskMappingView:
        """A mapping view of the tasks in the graph.

        This mapping has `str` task label keys and `TaskNode` values. Iteration
        is topologically and deterministically ordered if and only if `sort`
        has been called since the last modification to the graph.
        """
        return self._tasks

    @property
    def dataset_types(self) -> DatasetTypeMappingView:
        """A mapping view of the dataset types in the graph.

        This mapping has `str` parent dataset type name keys, but only provides
        access to its `DatasetTypeNode` values if `resolve` has been called
        since the last modification involving a task that uses a dataset type.
        See `DatasetTypeMappingView` for details.
        """
        return self._dataset_types

    @property
    def task_subsets(self) -> Mapping[str, TaskSubset]:
        """A mapping of all labeled subsets of tasks.

        Keys are subset labels, values are sets of task labels. See
        `TaskSubset` for more information.

        Use `add_task_subset` to add a new subset. The subsets themselves may
        be modified in-place.
        """
        return self._task_subsets

    @property
    def steps(self) -> StepDefinitions:
        """An ordered iterable of the labels of task subsets that must be
        executed separately.

        Steps are intended to be a partition of the full pipeline graph - the
        steps together include all tasks, and each task belongs to exactly one
        step - but this is only checked when the graph is resolved.

        The order of the steps is required to be consistent with the graph's
        topological ordering, and when sorting a graph with steps, that sort
        order is guaranteed to be consistent with the order of the steps. But
        there may be other orderings of the steps that are still valid for
        execution (e.g. two steps could be runnable in parallel).
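
        Examples
        --------
        Declaring hypothetical subset labels as the pipeline's steps (each
        label must name an entry in `task_subsets`)::

            graph.steps = ["step1", "step2"]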

        """
        return self._step_definitions

    @steps.setter
    def steps(self, labels: Iterable[str]) -> None:
        # Docstring on getter.
        self._step_definitions.assign(labels)

    def get_task_step(self, task_label: str) -> str:
        """Return the step that the given task belongs to.

        Parameters
        ----------
        task_label : `str`
            Label of the task to look up.

        Returns
        -------
        step_label : `str`
            Step the task belongs to.

        Raises
        ------
        InvalidStepsError
            Raised if the pipeline has no steps defined.
        UnresolvedGraphError
            Raised if the pipeline has been modified since the steps were
            last verified.
        """
        if not self._step_definitions:
            raise InvalidStepsError("No steps have been defined for this pipeline.")
        elif self._step_definitions.verified:
            return self._xgraph.nodes[NodeKey(NodeType.TASK, task_label)]["step"]
        raise UnresolvedGraphError("Steps have not been verified since the last modification.")

    @property
    def is_fully_resolved(self) -> bool:
        """Whether all of this graph's nodes are resolved and all have
        been checked for correctness.

        A fully-resolved graph is always sorted as well.
        """
        return (
            self._universe is not None
            and self._step_definitions.verified
            and self._sorted_keys is not None
            and all(self.dataset_types.is_resolved(k) for k in self.dataset_types)
        )

    @property
    def has_been_sorted(self) -> bool:
        """Whether this graph's tasks and dataset types have been
        topologically sorted (with unspecified but deterministic tiebreakers)
        since the last modification to the graph.

        If the pipeline graph has step definitions, the sort order is
        consistent with the step order.

        This may return `False` if the graph *happens* to be sorted but `sort`
        was never called.
        """
        return self._sorted_keys is not None

    def sort(self) -> None:
        """Sort this graph's nodes topologically with deterministic (but
        unspecified) tiebreakers.

        This does nothing if the graph is already known to be sorted.
        """
        if self._sorted_keys is None:
            try:
                sorted_keys: Sequence[NodeKey] = list(networkx.lexicographical_topological_sort(self._xgraph))
            except networkx.NetworkXUnfeasible as err:  # pragma: no cover
                # Shouldn't be possible to get here, because we check for
                # cycles when adding tasks, but we guard against it anyway.
                cycle = networkx.find_cycle(self._xgraph)
                raise PipelineDataCycleError(
                    f"Cycle detected while attempting to sort graph: {cycle}."
                ) from err
            self._reorder(sorted_keys)

    def copy(self) -> PipelineGraph:
        """Return a copy of this graph that copies all mutable state."""
        xgraph = self._xgraph.copy()
        step_definitions = self._step_definitions.copy()
        result = PipelineGraph.__new__(PipelineGraph)
        result._init_from_args(
            xgraph,
            self._sorted_keys,
            task_subsets={
                k: TaskSubset(xgraph, v.label, set(v._members), v.description, step_definitions)
                for k, v in self._task_subsets.items()
            },
            description=self._description,
            universe=self._universe,
            data_id=self._raw_data_id,
            step_definitions=step_definitions,
        )
        return result

    def __copy__(self) -> PipelineGraph:
        # Fully shallow copies are dangerous; we don't want shared mutable
        # state to lead to broken class invariants.
        return self.copy()

    def __deepcopy__(self, memo: dict) -> PipelineGraph:
        # Genuine deep copies are unnecessary, since we should only ever care
        # that mutable state is copied.
        return self.copy()

    def diff_tasks(self, other: PipelineGraph) -> list[str]:
        """Compare two pipeline graphs.

        This only compares graph structure and task classes (including their
        edges). It does *not* compare full configuration (which is subject to
        spurious differences due to import-cache state), dataset type
        resolutions, or sort state.

        Parameters
        ----------
        other : `PipelineGraph`
            Graph to compare to.

        Returns
        -------
        differences : `list` [ `str` ]
            List of string messages describing differences between the
            pipelines. If empty, the graphs have the same tasks and
            connections.
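
        Examples
        --------
        Reporting differences between two hypothetical graphs ``graph_a`` and
        ``graph_b``; an empty result means the tasks and their connections
        match::

            for message in graph_a.diff_tasks(graph_b):
                print(message)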

        """
        messages: list[str] = []
        common_labels: Set[str]
        if self.tasks.keys() != other.tasks.keys():
            common_labels = self.tasks.keys() & other.tasks.keys()
            messages.append(
                f"Pipelines have different tasks: A & ~B = {list(self.tasks.keys() - common_labels)}, "
                f"B & ~A = {list(other.tasks.keys() - common_labels)}."
            )
        else:
            common_labels = self.tasks.keys()
        for label in common_labels:
            a = self.tasks[label]
            b = other.tasks[label]
            if a.task_class != b.task_class:
                messages.append(
                    f"Task {label!r} has class {a.task_class_name} in A, but {b.task_class_name} in B."
                )
            messages.extend(a.diff_edges(b))
        return messages

    def producing_edge_of(self, dataset_type_name: str) -> WriteEdge | None:
        """Return the `WriteEdge` that links the producing task to the named
        dataset type.

        Parameters
        ----------
        dataset_type_name : `str`
            Dataset type name. Must not be a component.

        Returns
        -------
        edge : `WriteEdge` or `None`
            Producing edge or `None` if there isn't one in this graph.

        Raises
        ------
        DuplicateOutputError
            Raised if there are multiple tasks defined to produce this dataset
            type. This is only possible if the graph's dataset types are not
            resolved.

        Notes
        -----
        On resolved graphs, it may be slightly more efficient to use::

            graph.dataset_types[dataset_type_name].producing_edge

        but this method works on graphs with unresolved dataset types as well.
        """
        producer: str | None = None
        producing_edge: WriteEdge | None = None
        for _, _, producing_edge in self._xgraph.in_edges(
            NodeKey(NodeType.DATASET_TYPE, dataset_type_name), data="instance"
        ):
            assert producing_edge is not None, "Should only be None if we never loop."
            if producer is not None:
                raise DuplicateOutputError(
                    f"Dataset type {dataset_type_name!r} is produced by both {producing_edge.task_label!r} "
                    f"and {producer!r}."
                )
            producer = producing_edge.task_label
        return producing_edge

    def consuming_edges_of(self, dataset_type_name: str) -> list[ReadEdge]:
        """Return the `ReadEdge` objects that link the named dataset type to
        the tasks that consume it.

        Parameters
        ----------
        dataset_type_name : `str`
            Dataset type name. Must not be a component.

        Returns
        -------
        edges : `list` [ `ReadEdge` ]
            Edges that connect this dataset type to the tasks that consume it.

        Notes
        -----
        On resolved graphs, it may be slightly more efficient to use::

            graph.dataset_types[dataset_type_name].consuming_edges

        but this method works on graphs with unresolved dataset types as well.
        """
        return [
            edge
            for _, _, edge in self._xgraph.out_edges(
                NodeKey(NodeType.DATASET_TYPE, dataset_type_name), data="instance"
            )
        ]

    def producer_of(self, dataset_type_name: str) -> TaskNode | TaskInitNode | None:
        """Return the `TaskNode` or `TaskInitNode` that writes the given
        dataset type.

        Parameters
        ----------
        dataset_type_name : `str`
            Dataset type name. Must not be a component.

        Returns
        -------
        node : `TaskNode`, `TaskInitNode`, or `None`
            Producing node or `None` if there isn't one in this graph.

        Raises
        ------
        DuplicateOutputError
            Raised if there are multiple tasks defined to produce this dataset
            type. This is only possible if the graph's dataset types are not
            resolved.
        """
        if (producing_edge := self.producing_edge_of(dataset_type_name)) is not None:
            return self._xgraph.nodes[producing_edge.task_key]["instance"]
        return None

    def consumers_of(self, dataset_type_name: str) -> list[TaskNode | TaskInitNode]:
        """Return the `TaskNode` and/or `TaskInitNode` objects that read
        the given dataset type.

        Parameters
        ----------
        dataset_type_name : `str`
            Dataset type name. Must not be a component.

        Returns
        -------
        nodes : `list` [ `TaskNode` or `TaskInitNode` ]
            Nodes for the tasks that consume this dataset type.

        Notes
        -----
        On resolved graphs, it may be slightly more efficient to use::

            graph.dataset_types[dataset_type_name].consuming_edges

        but this method works on graphs with unresolved dataset types as well.
        """
        return [
            self._xgraph.nodes[consuming_edge.task_key]["instance"]
            for consuming_edge in self.consuming_edges_of(dataset_type_name)
        ]

    def inputs_of(self, task_label: str, init: bool = False) -> dict[str, DatasetTypeNode | None]:
        """Return the dataset types that are inputs to a task.

        Parameters
        ----------
        task_label : `str`
            Label for the task in the pipeline.
        init : `bool`, optional
            If `True`, return init-input dataset types instead of runtime
            (including prerequisite) inputs.

        Returns
        -------
        inputs : `dict` [ `str`, `DatasetTypeNode` or `None` ]
            Dictionary with parent dataset type name keys and either
            `DatasetTypeNode` values (if the dataset type has been resolved)
            or `None` values.

        Notes
        -----
        To get the input edges of a task or task init node (which provide
        information about storage class overrides and components) use::

            graph.tasks[task_label].iter_all_inputs()

        or

            graph.tasks[task_label].init.iter_all_inputs()

        or the various mapping attributes of the `TaskNode` and `TaskInitNode`
        class.
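
        Examples
        --------
        Listing the runtime inputs of a hypothetical task labeled
        ``"calibrate"`` and whether each has been resolved::

            for name, node in graph.inputs_of("calibrate").items():
                print(name, "resolved" if node is not None else "unresolved")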

        """
        node: TaskNode | TaskInitNode = self.tasks[task_label] if not init else self.tasks[task_label].init
        return {
            edge.parent_dataset_type_name: self._xgraph.nodes[edge.dataset_type_key]["instance"]
            for edge in node.iter_all_inputs()
        }

    def outputs_of(
        self, task_label: str, init: bool = False, include_automatic_connections: bool = True
    ) -> dict[str, DatasetTypeNode | None]:
        """Return the dataset types that are outputs of a task.

        Parameters
        ----------
        task_label : `str`
            Label for the task in the pipeline.
        init : `bool`, optional
            If `True`, return init-output dataset types instead of runtime
            outputs.
        include_automatic_connections : `bool`, optional
            Whether to include automatic connections such as configs, metadata,
            and logs.

        Returns
        -------
        outputs : `dict` [ `str`, `DatasetTypeNode` or `None` ]
            Dictionary with parent dataset type name keys and either
            `DatasetTypeNode` values (if the dataset type has been resolved)
            or `None` values.

        Notes
        -----
        To get the output edges of a task or task init node (which provide
        information about storage class overrides and components) use::

            graph.tasks[task_label].iter_all_outputs()

        or

            graph.tasks[task_label].init.iter_all_outputs()

        or the various mapping attributes of the `TaskNode` and `TaskInitNode`
        class.
        """
        node: TaskNode | TaskInitNode = self.tasks[task_label] if not init else self.tasks[task_label].init
        iterable = node.iter_all_outputs() if include_automatic_connections else node.outputs.values()
        return {
            edge.parent_dataset_type_name: self._xgraph.nodes[edge.dataset_type_key]["instance"]
            for edge in iterable
        }

    def resolve(
        self,
        registry: Registry | None = None,
        dimensions: DimensionUniverse | None = None,
        dataset_types: Mapping[str, DatasetType] | None = None,
        visualization_only: bool = False,
    ) -> None:
        """Resolve all dimensions and dataset types and check them for
        consistency.

        Resolving a graph also causes it to be sorted.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`, optional
            Client for the data repository to resolve against.
        dimensions : `lsst.daf.butler.DimensionUniverse`, optional
            Definitions for all dimensions. Takes precedence over
            ``registry.dimensions`` if both are provided. If neither is
            provided, defaults to the default dimension universe
            (``lsst.daf.butler.DimensionUniverse()``).
        dataset_types : `~collections.abc.Mapping` [ `str`, \
                `~lsst.daf.butler.DatasetType` ], optional
            Mapping of dataset types to consider registered. Takes precedence
            over ``registry.getDatasetType()`` if both are provided.
        visualization_only : `bool`, optional
            Resolve the graph as well as possible even when dimensions and
            storage classes cannot really be determined. This can include
            using the ``universe.commonSkyPix`` as the assumed dimensions of
            connections that use the "skypix" placeholder and using "<UNKNOWN>"
            as a storage class name (which will fail if the storage class
            itself is ever actually loaded).

        Notes
        -----
        The `universe` attribute is set to ``dimensions`` and used to set all
        `TaskNode.dimensions` attributes. Dataset type nodes are resolved by
        first looking for a registry definition, then using the producing
        task's definition, then looking for consistency between all consuming
        task definitions.

        Raises
        ------
        ConnectionTypeConsistencyError
            Raised if a prerequisite input for one task appears as a different
            kind of connection in any other task.
        DuplicateOutputError
            Raised if multiple tasks have the same dataset type as an output.
        IncompatibleDatasetTypeError
            Raised if different tasks have different definitions of a dataset
            type. Different but compatible storage classes are permitted.
        MissingDatasetTypeError
            Raised if a dataset type definition is required to exist in the
            data repository but none was found. This should only occur for
            dataset types that are not produced by a task in the pipeline and
            are consumed with different storage classes or as components by
            tasks in the pipeline.
        EdgesChangedError
            Raised if ``check_edges_unchanged=True`` and the edges of a task do
            change after import and reconfiguration.
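
        Examples
        --------
        A sketch of resolving against an existing data repository (``repo``
        is a hypothetical butler repository path)::

            from lsst.daf.butler import Butler

            butler = Butler(repo)
            graph.resolve(registry=butler.registry)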

        """
        get_registered: Callable[[str], DatasetType | None] | None = None
        if dataset_types is not None:
            # Ruff seems confused about whether this is used below; it is!
            get_registered = dataset_types.get
        elif registry is not None:

            def get_registered(name: str) -> DatasetType | None:
                try:
                    return registry.getDatasetType(name)
                except MissingDatasetTypeError:
                    return None

        else:

            def get_registered(name: str) -> None:
                return None

        if dimensions is None:
            if registry is not None:
                dimensions = registry.dimensions
            else:
                dimensions = DimensionUniverse()

        sort_keys_from_steps = self._resolve_step_flow()

        node_key: NodeKey
        updates: dict[NodeKey, TaskNode | DatasetTypeNode] = {}
        for node_key, node_state in self._xgraph.nodes.items():
            match node_key.node_type:
                case NodeType.TASK:
                    task_node: TaskNode = node_state["instance"]
                    new_task_node = task_node._resolved(dimensions)
                    if new_task_node is not task_node:
                        updates[node_key] = new_task_node
                case NodeType.DATASET_TYPE:
                    dataset_type_node: DatasetTypeNode | None = node_state["instance"]
                    new_dataset_type_node = DatasetTypeNode._from_edges(
                        node_key,
                        self._xgraph,
                        get_registered,
                        dimensions,
                        previous=dataset_type_node,
                        visualization_only=visualization_only,
                    )
                    # Usage of ``is`` here is intentional; `_from_edges`
                    # returns `previous=dataset_type_node` if it can determine
                    # that it doesn't need to change.
                    if new_dataset_type_node is not dataset_type_node:
                        updates[node_key] = new_dataset_type_node
        try:
            for node_key, node_value in updates.items():
                self._xgraph.nodes[node_key]["instance"] = node_value
        except Exception as err:  # pragma: no cover
            # There's no known way to get here, but we want to make it
            # clear it's a big problem if we do.
            raise PipelineGraphExceptionSafetyError(
                "Error during dataset type resolution has left the graph in an inconsistent state."
            ) from err

        # If we get an error here, many graph nodes will have been resolved but
        # the steps will still be marked as unverified and unsorted. That's
        # still an acceptable state for the pipeline to be in.
        self._resolve_step_dimensions(dimensions)
        self._step_definitions._verified = True

        if sort_keys_from_steps is not None:
            self._reorder(sort_keys_from_steps)
        else:
            self.sort()
        self._universe = dimensions

    ###########################################################################
    #
    # Graph Modification Interface:
    #
    # - methods to add, remove, and replace tasks;
    #
    # - methods to add and remove task subsets.
    #
    # These are all things that are usually done in a Pipeline before making a
    # graph at all, but there may be cases where we want to modify the graph
    # instead. (These are also the methods used to make a graph from a
    # Pipeline, or make a graph from another graph.)
    #
    ###########################################################################

    def add_task(
        self,
        label: str | None,
        task_class: type[PipelineTask],
        config: PipelineTaskConfig | None = None,
        connections: PipelineTaskConnections | None = None,
    ) -> TaskNode:
        """Add a new task to the graph.

        Parameters
        ----------
        label : `str` or `None`
            Label for the task in the pipeline. If `None`, `Task._DefaultName`
            is used.
        task_class : `type` [ `PipelineTask` ]
            Class object for the task.
        config : `PipelineTaskConfig`, optional
            Configuration for the task. If not provided, a default-constructed
            instance of ``task_class.ConfigClass`` is used.
        connections : `PipelineTaskConnections`, optional
            Object that describes the dataset types used by the task. If not
            provided, one will be constructed from the given configuration. If
            provided, it is assumed that ``config`` has already been validated
            and frozen.

        Returns
        -------
        node : `TaskNode`
            The new task node added to the graph.

        Raises
        ------
        ValueError
            Raised if configuration validation failed when constructing
            ``connections``.
        PipelineDataCycleError
            Raised if the graph is cyclic after this addition.
        RuntimeError
            Raised if an unexpected exception (which will be chained) occurred
            at a stage that may have left the graph in an inconsistent state.
            Other exceptions should leave the graph unchanged.

        Notes
        -----
        Checks for dataset type consistency and multiple producers do not occur
        until `resolve` is called, since the resolution depends on both the
        state of the data repository and all contributing tasks.

        Adding new tasks removes any existing resolutions of all dataset types
        it references and marks the graph as unsorted. It is most efficient
        to add all tasks up front and only then resolve and/or sort the graph.
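
        Examples
        --------
        A sketch of adding a task with a customized config; ``MyTask`` stands
        in for any `PipelineTask` subclass and ``doSomething`` for one of its
        config fields::

            config = MyTask.ConfigClass()
            config.doSomething = True  # hypothetical config field
            node = graph.add_task("myTask", MyTask, config=config)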

        """
        if label is None:
            label = task_class._DefaultName
        if config is None:
            config = task_class.ConfigClass()
        _LOG.debug("Adding task %s %s to the pipeline graph", label, task_class)
        task_node = TaskNode._from_imported_data(
            key=NodeKey(NodeType.TASK, label),
            init_key=NodeKey(NodeType.TASK_INIT, label),
            data=_TaskNodeImportedData.configure(label, task_class, config, connections),
            universe=self._universe,
        )
        self.add_task_nodes([task_node])
        return task_node

    def add_task_nodes(self, nodes: Iterable[TaskNode], parent: PipelineGraph | None = None) -> None:
        """Add one or more existing task nodes to the graph.

        Parameters
        ----------
        nodes : `~collections.abc.Iterable` [ `TaskNode` ]
            Iterable of task nodes to add. If any tasks have resolved
            dimensions, they must have the same dimension universe as the rest
            of the graph.
        parent : `PipelineGraph`, optional
            If provided, another `PipelineGraph` from which these nodes were
            obtained. Any dataset type nodes already present in ``parent``
            that are referenced by the given tasks will be used in this graph
            if they are not already present, preserving any dataset type
            resolutions present in the parent graph. Adding nodes from a
            parent graph after the graph has its own nodes (e.g. from
            `add_task`) or nodes from a third graph may result in invalid
            dataset type resolutions. It is safest to only use this argument
            when populating an empty graph for the first time.

        Raises
        ------
        PipelineDataCycleError
            Raised if the graph is cyclic after this addition.

        Notes
        -----
        Checks for dataset type consistency and multiple producers do not occur
        until `resolve` is called, since the resolution depends on both the
        state of the data repository and all contributing tasks.

        Adding new tasks removes any existing resolutions of all dataset types
        it references (unless ``parent is not None``) and marks the graph as
        unsorted. It is most efficient to add all tasks up front and only then
        resolve and/or sort the graph.
        """
        node_data: list[tuple[NodeKey, dict[str, Any]]] = []
        edge_data: list[tuple[NodeKey, NodeKey, str, dict[str, Any]]] = []
        for task_node in nodes:
            task_node = task_node._resolved(self._universe)
            node_data.append(
                (task_node.key, {"instance": task_node, "bipartite": task_node.key.node_type.bipartite})
            )
            node_data.append(
                (
                    task_node.init.key,
                    {"instance": task_node.init, "bipartite": task_node.init.key.node_type.bipartite},
                )
            )
            # Convert the edge objects attached to the task node to networkx.
            for read_edge in task_node.init.iter_all_inputs():
                self._append_graph_data_from_edge(node_data, edge_data, read_edge, parent=parent)
            for write_edge in task_node.init.iter_all_outputs():
                self._append_graph_data_from_edge(node_data, edge_data, write_edge, parent=parent)
            for read_edge in task_node.iter_all_inputs():
                self._append_graph_data_from_edge(node_data, edge_data, read_edge, parent=parent)
            for write_edge in task_node.iter_all_outputs():
                self._append_graph_data_from_edge(node_data, edge_data, write_edge, parent=parent)
            # Add a special edge (with no Edge instance) that connects the
            # TaskInitNode to the runtime TaskNode.
            edge_data.append((task_node.init.key, task_node.key, Edge.INIT_TO_TASK_NAME, {"instance": None}))
        if not node_data and not edge_data:
            return
        # Checks and preparation complete; time to start the actual
        # modification, during which it's hard to provide strong exception
        # safety. Start by resetting the sort ordering, if there is one.
        self._reset()
        try:
            self._xgraph.add_nodes_from(node_data)
            self._xgraph.add_edges_from(edge_data)
            if not networkx.algorithms.dag.is_directed_acyclic_graph(self._xgraph):
                cycle = networkx.find_cycle(self._xgraph)
                raise PipelineDataCycleError(f"Cycle detected while adding tasks: {cycle}.")
        except Exception:
            # First try to roll back our changes.
            try:
                self._xgraph.remove_edges_from(edge_data)
                self._xgraph.remove_nodes_from(key for key, _ in node_data)
            except Exception as err:  # pragma: no cover
                # There's no known way to get here, but we want to make it
                # clear it's a big problem if we do.
                raise PipelineGraphExceptionSafetyError(
                    "Error while attempting to revert PipelineGraph modification has left the graph in "
                    "an inconsistent state."
                ) from err
            # Successfully rolled back; raise the original exception.
            raise

    def reconfigure_tasks(
        self,
        *args: tuple[str, PipelineTaskConfig],
        check_edges_unchanged: bool = False,
        assume_edges_unchanged: bool = False,
        **kwargs: PipelineTaskConfig,
    ) -> None:
        """Update the configuration for one or more tasks.

        Parameters
        ----------
        *args : `tuple` [ `str`, `.PipelineTaskConfig` ]
            Positional arguments are each a 2-tuple of task label and new
            config object. Note that the same arguments may also be passed as
            ``**kwargs``, which is usually more readable, but task labels in
            ``*args`` are not required to be valid Python identifiers.
        check_edges_unchanged : `bool`, optional
            If `True`, require the edges (connections) of the modified tasks to
            remain unchanged after the configuration updates, and verify that
            this is the case.
        assume_edges_unchanged : `bool`, optional
            If `True`, the caller declares that the edges (connections) of the
            modified tasks will remain unchanged after the configuration
            updates, and that it is unnecessary to check this.
        **kwargs : `.PipelineTaskConfig`
            New config objects or overrides to apply to copies of the current
            config objects, with task labels as the keywords.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            Raised if ``assume_edges_unchanged`` and ``check_edges_unchanged``
            are both `True`, or if the same task appears twice.
        EdgesChangedError
            Raised if ``check_edges_unchanged=True`` and the edges of a task do
            change.

        Notes
        -----
        If reconfiguring a task causes its edges to change, any dataset type
        nodes connected to that task (not just those whose edges have changed!)
        will be unresolved.
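
        Examples
        --------
        Passing new configs by keyword, with task labels as the keyword names
        (``config_a`` and ``config_b`` are hypothetical config instances)::

            graph.reconfigure_tasks(taskA=config_a, taskB=config_b)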

        """
        new_configs: dict[str, PipelineTaskConfig] = {}
        for task_label, config_update in itertools.chain(args, kwargs.items()):
            if new_configs.setdefault(task_label, config_update) is not config_update:
                raise ValueError(f"Config for {task_label!r} provided more than once.")
        updates = {
            task_label: self.tasks[task_label]._reconfigured(config, rebuild=not assume_edges_unchanged)
            for task_label, config in new_configs.items()
        }
        self._replace_task_nodes(
            updates,
            check_edges_unchanged=check_edges_unchanged,
            assume_edges_unchanged=assume_edges_unchanged,
            message_header=(
                "Unexpected change in edges for task {task_label!r} from original config (A) to "
                "new configs (B):"
            ),
        )

    def remove_tasks(
        self, labels: Iterable[str], drop_from_subsets: bool = True
    ) -> list[tuple[TaskNode, set[str]]]:
        """Remove one or more tasks from the graph.

        Parameters
        ----------
        labels : `~collections.abc.Iterable` [ `str` ]
            Iterable of the labels of the tasks to remove.
        drop_from_subsets : `bool`, optional
            If `True`, drop each removed task from any subset in which it
            currently appears. If `False`, raise `PipelineGraphError` if any
            such subsets exist.

        Returns
        -------
        nodes_and_subsets : `list` [ `tuple` [ `TaskNode`, `set` [ `str` ] ] ]
            List of nodes removed and the labels of task subsets that
            referenced them.

        Raises
        ------
        PipelineGraphError
            Raised if ``drop_from_subsets`` is `False` and the task is still
            part of one or more subsets.

        Notes
        -----
        Removing a task will cause dataset nodes with no other referencing
        tasks to be removed. Any other dataset type nodes referenced by a
        removed task will be reset to an "unresolved" state.
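
        Examples
        --------
        Removing a hypothetical task and reporting the subsets it was dropped
        from::

            for node, subset_labels in graph.remove_tasks(["myTask"]):
                print(node.label, "was in", subset_labels)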

        """
        task_nodes_and_subsets = []
        dataset_types: set[NodeKey] = set()
        nodes_to_remove = set()
        for label in labels:
            task_node: TaskNode = self._xgraph.nodes[NodeKey(NodeType.TASK, label)]["instance"]
            # Find task subsets that reference this task.
            referencing_subsets = {
                subset_label
                for subset_label, task_subset in self.task_subsets.items()
                if label in task_subset
            }
            if not drop_from_subsets and referencing_subsets:
                raise PipelineGraphError(
                    f"Task {label!r} is still referenced by subset(s) {referencing_subsets}."
                )
            task_nodes_and_subsets.append((task_node, referencing_subsets))
            # Find dataset types referenced by this task.
            dataset_types.update(self._xgraph.predecessors(task_node.key))
            dataset_types.update(self._xgraph.successors(task_node.key))
            dataset_types.update(self._xgraph.predecessors(task_node.init.key))
            dataset_types.update(self._xgraph.successors(task_node.init.key))
            # Since there's an edge between the task and its init node, we'll
            # have added those two nodes here, too, and we don't want that.
            dataset_types.remove(task_node.init.key)
            dataset_types.remove(task_node.key)
            # Mark the task node and its init node for removal from the graph.
            nodes_to_remove.add(task_node.key)
            nodes_to_remove.add(task_node.init.key)
        # Process the referenced datasets to see which ones are orphaned and
        # need to be removed vs. just unresolved.
        nodes_to_unresolve = []
        for dataset_type_key in dataset_types:
            related_tasks = set()
            related_tasks.update(self._xgraph.predecessors(dataset_type_key))
            related_tasks.update(self._xgraph.successors(dataset_type_key))
            related_tasks.difference_update(nodes_to_remove)
            if not related_tasks:
                nodes_to_remove.add(dataset_type_key)
            else:
                nodes_to_unresolve.append(dataset_type_key)
        # Checks and preparation complete; time to start the actual
        # modification, during which it's hard to provide strong exception
        # safety. Start by resetting the sort ordering.
        self._reset()
        try:
            for dataset_type_key in nodes_to_unresolve:
                self._xgraph.nodes[dataset_type_key]["instance"] = None
            for task_node, referencing_subsets in task_nodes_and_subsets:
                for subset_label in referencing_subsets:
                    self._task_subsets[subset_label].remove(task_node.label)
            self._xgraph.remove_nodes_from(nodes_to_remove)
        except Exception as err:  # pragma: no cover
            # There's no known way to get here, but we want to make it
            # clear it's a big problem if we do.
            raise PipelineGraphExceptionSafetyError(
                "Error during task removal has left the graph in an inconsistent state."
            ) from err
        return task_nodes_and_subsets

    def add_task_subset(self, subset_label: str, task_labels: Iterable[str], description: str = "") -> None:
        """Add a label for a set of tasks that are already in the pipeline.

        Parameters
        ----------
        subset_label : `str`
            Label for this set of tasks.
        task_labels : `~collections.abc.Iterable` [ `str` ]
            Labels of the tasks to include in the set. All must already be
            included in the graph.
        description : `str`, optional
            String description to associate with this label.
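
        Examples
        --------
        Grouping two hypothetical tasks under one label::

            graph.add_task_subset("step1", ["isr", "calibrate"], "early processing")
            assert "isr" in graph.task_subsets["step1"]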

        """
        subset = TaskSubset(self._xgraph, subset_label, set(task_labels), description, self._step_definitions)
        self._task_subsets[subset_label] = subset

    def remove_task_subset(self, subset_label: str) -> None:
        """Remove a labeled set of tasks.

        Parameters
        ----------
        subset_label : `str`
            Label for this set of tasks.

        Notes
        -----
        If this subset is a step, it is also removed from the step definitions.
        """
        try:
            self._step_definitions.remove(subset_label)
        except KeyError:
            pass
        del self._task_subsets[subset_label]

    ###########################################################################
    #
    # NetworkX Export Interface:
    #
    # - methods to export the PipelineGraph's content (or various subsets
    #   thereof) as NetworkX objects.
    #
    # These are particularly useful when writing tools to visualize the graph,
    # while providing options for which aspects of the graph (tasks, dataset
    # types, or both) to include, since all exported graphs have similar
    # attributes regardless of their structure.
    #
    ###########################################################################

    def make_xgraph(self) -> networkx.MultiDiGraph:
        """Export a networkx representation of the full pipeline graph,
        including both init and runtime edges.

        Returns
        -------
        xgraph : `networkx.MultiDiGraph`
            Directed acyclic graph with parallel edges.

        Notes
        -----
        The returned graph uses `NodeKey` instances for nodes. Parallel edges
        represent the same dataset type appearing in multiple connections for
        the same task, and are hence rare. The connection name is used as the
        edge key to disambiguate those parallel edges.

        Almost all edges connect dataset type nodes to task or task init nodes
        or vice versa, but there is also a special edge that connects each task
        init node to its runtime node. The existence of these edges makes the
        graph not quite bipartite, though its init-only and runtime-only
        subgraphs are bipartite.

        See `TaskNode`, `TaskInitNode`, `DatasetTypeNode`, `ReadEdge`, and
        `WriteEdge` for the descriptive node and edge attributes added.
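
        Examples
        --------
        Exporting and walking the graph with standard networkx tools::

            import networkx

            xgraph = graph.make_xgraph()
            for key in networkx.topological_sort(xgraph):
                print(key.node_type, key.name)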

        """
        return self._transform_xgraph_state(self._xgraph.copy(), skip_edges=False)

    def make_bipartite_xgraph(self, init: bool = False) -> networkx.MultiDiGraph:
        """Return a bipartite networkx representation of just the runtime or
        init-time pipeline graph.

        Parameters
        ----------
        init : `bool`, optional
            If `True` (`False` is default) return the graph of task
            initialization nodes and init input/output dataset types, instead
            of the graph of runtime task nodes and regular
            input/output/prerequisite dataset types.

        Returns
        -------
        xgraph : `networkx.MultiDiGraph`
            Directed acyclic graph with parallel edges.

        Notes
        -----
        The returned graph uses `NodeKey` instances for nodes. Parallel edges
        represent the same dataset type appearing in multiple connections for
        the same task, and are hence rare. The connection name is used as the
        edge key to disambiguate those parallel edges.

        This graph is bipartite because each dataset type node only has edges
        that connect it to a task [init] node, and vice versa.

        See `TaskNode`, `TaskInitNode`, `DatasetTypeNode`, `ReadEdge`, and
        `WriteEdge` for the descriptive node and edge attributes added.
        """
        return self._transform_xgraph_state(
            self._make_bipartite_xgraph_internal(init).copy(), skip_edges=False
        )

    def make_task_xgraph(self, init: bool = False) -> networkx.DiGraph:
        """Return a networkx representation of just the tasks in the pipeline.

        Parameters
        ----------
        init : `bool`, optional
            If `True` (`False` is default) return the graph of task
            initialization nodes, instead of the graph of runtime task nodes.

        Returns
        -------
        xgraph : `networkx.DiGraph`
            Directed acyclic graph with no parallel edges.

        Notes
        -----
        The returned graph uses `NodeKey` instances for nodes. The dataset
        types that link these tasks are not represented at all; edges have no
        attributes, and there are no parallel edges.

        See `TaskNode` and `TaskInitNode` for the descriptive node and
        attributes added.
        """
        return self._transform_xgraph_state(self._make_task_xgraph_internal(init), skip_edges=True)

    def make_dataset_type_xgraph(self, init: bool = False) -> networkx.DiGraph:
        """Return a networkx representation of just the dataset types in the
        pipeline.

        Parameters
        ----------
        init : `bool`, optional
            If `True` (`False` is default) return the graph of init input and
            output dataset types, instead of the graph of runtime (input,
            output, prerequisite input) dataset types.

        Returns
        -------
        xgraph : `networkx.DiGraph`
            Directed acyclic graph with no parallel edges.

        Notes
        -----
        The returned graph uses `NodeKey` instances for nodes. The tasks that
        link these dataset types are not represented at all; edges have no
        attributes, and there are no parallel edges.

        See `DatasetTypeNode` for the descriptive node and attributes added.
        """
        bipartite_xgraph = self._make_bipartite_xgraph_internal(init)
        dataset_type_keys = [
            key
            for key, bipartite in bipartite_xgraph.nodes(data="bipartite")
            if bipartite == NodeType.DATASET_TYPE.bipartite
        ]
        return self._transform_xgraph_state(
            networkx.algorithms.bipartite.projected_graph(
                networkx.DiGraph(bipartite_xgraph), dataset_type_keys
            ),
            skip_edges=True,
        )

    ###########################################################################
    #
    # Expression-based Selection Interface.
    #
    ###########################################################################

    def select_tasks(self, expression: str) -> set[str]:
        """Return the tasks that match an expression.

        Parameters
        ----------
        expression : `str`
            String expression to evaluate. See
            :ref:`pipeline-graph-subset-expressions`.

        Returns
        -------
        task_labels : `set` [ `str` ]
            Set of matching task labels.
        """
        task_xgraph = self._make_task_xgraph_internal(init=False)
        expr_tree = expressions.parse(expression)
        matching_task_keys = self._select_expression(expr_tree, task_xgraph)
        return {key.name for key in matching_task_keys}

    def select(self, expression: str) -> PipelineGraph:
        """Return a new pipeline graph with the tasks that match an expression.

        Parameters
        ----------
        expression : `str`
            String expression to evaluate. See
            :ref:`pipeline-graph-subset-expressions`.

        Returns
        -------
        new_graph : `PipelineGraph`
            New pipeline graph with just the matching tasks.

        Notes
        -----
        All resolved dataset type nodes will be preserved.

        If `has_been_sorted`, the new graph will be sorted as well.

        Task subsets will not be included in the returned graph.
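
        Examples
        --------
        Assuming a bare task label is a valid expression (see the linked
        documentation for the full grammar), extracting a one-task pipeline::

            sub_graph = graph.select("calibrate")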

        """
        selected_tasks = self.select_tasks(expression)
        new_pipeline_graph = PipelineGraph(universe=self._universe, data_id=self._raw_data_id)
        new_pipeline_graph.add_task_nodes(
            [self.tasks[task_label] for task_label in selected_tasks], parent=self
        )
        if self.has_been_sorted:
            new_pipeline_graph.sort()
        return new_pipeline_graph

    ###########################################################################
    #
    # Serialization Interface.
    #
    # Serialization of PipelineGraphs is currently experimental and may not be
    # retained in the future. All serialization methods are
    # underscore-prefixed to ensure nobody mistakes them for a stable interface
    # (let alone a stable file format).
    #
    ###########################################################################

    @classmethod
    def _read_stream(
        cls, stream: BinaryIO, import_mode: TaskImportMode = TaskImportMode.REQUIRE_CONSISTENT_EDGES
    ) -> PipelineGraph:
        """Read a serialized `PipelineGraph` from a file-like object.

        Parameters
        ----------
        stream : `BinaryIO`
            File-like object opened for binary reading, containing
            gzip-compressed JSON.
        import_mode : `TaskImportMode`, optional
            Whether to import tasks, and how to reconcile any differences
            between the imported task's connections and those that were
            persisted with the graph. Default is to check that they are the
            same.

        Returns
        -------
        graph : `PipelineGraph`
            Deserialized pipeline graph.

        Raises
        ------
        PipelineGraphReadError
            Raised if the serialized `PipelineGraph` is not self-consistent.
        EdgesChangedError
            Raised if ``import_mode`` is
            `TaskImportMode.REQUIRE_CONSISTENT_EDGES` and the edges of a task
            did change after import and reconfiguration.

        Notes
        -----
        `PipelineGraph` serialization is currently experimental and may be
        removed or significantly changed in the future, with no deprecation
        period.
        """
        from .io import SerializedPipelineGraph

        with gzip.open(stream, "rb") as uncompressed_stream:
            data = json.load(uncompressed_stream)
        serialized_graph = SerializedPipelineGraph.model_validate(data)
        return serialized_graph.deserialize(import_mode)

    @classmethod
    def _read_uri(
        cls,
        uri: ResourcePathExpression,
        import_mode: TaskImportMode = TaskImportMode.REQUIRE_CONSISTENT_EDGES,
    ) -> PipelineGraph:
        """Read a serialized `PipelineGraph` from a file at a URI.

        Parameters
        ----------
        uri : convertible to `lsst.resources.ResourcePath`
            URI to a gzip-compressed JSON file containing a serialized pipeline
            graph.
        import_mode : `TaskImportMode`, optional
            Whether to import tasks, and how to reconcile any differences
            between the imported task's connections and those that were
            persisted with the graph. Default is to check that they are the
            same.

        Returns
        -------
        graph : `PipelineGraph`
            Deserialized pipeline graph.

        Raises
        ------
        PipelineGraphReadError
            Raised if the serialized `PipelineGraph` is not self-consistent.
        EdgesChangedError
            Raised if ``import_mode`` is
            `TaskImportMode.REQUIRE_CONSISTENT_EDGES` and the edges of a task
            did change after import and reconfiguration.

        Notes
        -----
        `PipelineGraph` serialization is currently experimental and may be
        removed or significantly changed in the future, with no deprecation
        period.
        """
        uri = ResourcePath(uri)
        with uri.open("rb") as stream:
            return cls._read_stream(cast(BinaryIO, stream), import_mode=import_mode)

    def _write_stream(self, stream: BinaryIO) -> None:
        """Write the pipeline to a file-like object.

        Parameters
        ----------
        stream : `BinaryIO`
            File-like object opened for binary writing.

        Notes
        -----
        `PipelineGraph` serialization is currently experimental and may be
        removed or significantly changed in the future, with no deprecation
        period.

        The file format is gzipped JSON, and is intended to be human-readable,
        but it should not be considered a stable public interface for outside
        code, which should always use `PipelineGraph` methods (or at least the
        `io.SerializedPipelineGraph` class) to read these files.
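
        Examples
        --------
        An in-memory round trip (experimental, as noted above)::

            import io

            buffer = io.BytesIO()
            graph._write_stream(buffer)
            buffer.seek(0)
            same_graph = PipelineGraph._read_stream(buffer)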

1368 """ 

1369 from .io import SerializedPipelineGraph 

1370 

1371 with gzip.open(stream, mode="wb") as compressed_stream: 

1372 compressed_stream.write( 

1373 SerializedPipelineGraph.serialize(self).model_dump_json(exclude_defaults=True).encode("utf-8") 

1374 ) 

1375 

1376 def _write_uri(self, uri: ResourcePathExpression) -> None: 

1377 """Write the pipeline to a file given a URI. 

1378 

1379 Parameters 

1380 ---------- 

1381 uri : convertible to `lsst.resources.ResourcePath` 

1382 URI to write to . May have ``.json.gz`` or no extension (which 

1383 will cause a ``.json.gz`` extension to be added). 

1384 

1385 Notes 

1386 ----- 

1387 `PipelineGraph` serialization is currently experimental and may be 

1388 removed or significantly changed in the future, with no deprecation 

1389 period. 

1390 

1391 The file format is gzipped JSON, and is intended to be human-readable, 

1392 but it should not be considered a stable public interface for outside 

1393 code, which should always use `PipelineGraph` methods (or at least the 

1394 `io.SerializedPipelineGraph` class) to read these files. 

1395 """ 

1396 uri = ResourcePath(uri) 

1397 extension = uri.getExtension() 

1398 if not extension: 

1399 uri = uri.updatedExtension(".json.gz") 

1400 elif extension != ".json.gz": 

1401 raise ValueError("Expanded pipeline files should always have a .json.gz extension.") 

1402 with uri.open(mode="wb") as stream: 

1403 self._write_stream(cast(BinaryIO, stream)) 

1404 

1405 def _import_and_configure( 

1406 self, import_mode: TaskImportMode = TaskImportMode.REQUIRE_CONSISTENT_EDGES 

1407 ) -> None: 

1408 """Import the `PipelineTask` classes referenced by all task nodes and 

1409 update those nodes accordingly. 

1410 

1411 Parameters 

1412 ---------- 

1413 import_mode : `TaskImportMode`, optional 

1414 Whether to import tasks, and how to reconcile any differences 

1415 between the imported task's connections and the those that were 

1416 persisted with the graph. Default is to check that they are the 

1417 same. This method does nothing if this is 

1418 `TaskImportMode.DO_NOT_IMPORT`. 

1419 

1420 Raises 

1421 ------ 

1422 EdgesChangedError 

1423 Raised if ``import_mode`` is 

1424 `TaskImportMode.REQUIRED_CONSISTENT_EDGES` and the edges of a task 

1425 did change after import and reconfiguration. 

1426 

1427 Notes 

1428 ----- 

1429 This method shouldn't need to be called unless the graph was 

1430 deserialized without importing and configuring immediately, which is 

1431 not the default behavior (but it can greatly speed up deserialization). 

1432 If all tasks have already been imported this does nothing. 

1433 

1434 Importing and configuring a task can change its 

1435 `~TaskNode.task_class_name` or `~TaskClass.get_config_str` output, 

1436 usually because the software used to read a serialized graph is newer 

1437 than the software used to write it (e.g. a new config option has been 

1438 added, or the task was moved to a new module with a forwarding alias 

1439 left behind). These changes are allowed by 

1440 `TaskImportMode.REQUIRE_CONSISTENT_EDGES`. 

1441 

1442 If importing and configuring a task causes its edges to change, any 

1443 dataset type nodes linked to those edges will be reset to the 

1444 unresolved state. 

1445 """ 

1446 if import_mode is TaskImportMode.DO_NOT_IMPORT: 

1447 return 

1448 rebuild = ( 

1449 import_mode is TaskImportMode.REQUIRE_CONSISTENT_EDGES 

1450 or import_mode is TaskImportMode.OVERRIDE_EDGES 

1451 ) 

1452 updates: dict[str, TaskNode] = {} 

1453 node_key: NodeKey 

1454 for node_key, node_state in self._xgraph.nodes.items(): 

1455 if node_key.node_type is NodeType.TASK: 

1456 task_node: TaskNode = node_state["instance"] 

1457 new_task_node = task_node._imported_and_configured(rebuild) 

1458 if new_task_node is not task_node: 

1459 updates[task_node.label] = new_task_node 

1460 self._replace_task_nodes( 

1461 updates, 

1462 check_edges_unchanged=(import_mode is TaskImportMode.REQUIRE_CONSISTENT_EDGES), 

1463 assume_edges_unchanged=(import_mode is TaskImportMode.ASSUME_CONSISTENT_EDGES), 

1464 message_header=( 

1465 "In task with label {task_label!r}, persisted edges (A)" 

1466 "differ from imported and configured edges (B):" 

1467 ), 

1468 ) 

1469 

1470 ########################################################################### 

1471 # 

1472 # Advanced PipelineGraph Inspection Interface: 

1473 # 

1474 # - methods to iterate over all nodes and edges, utilizing NodeKeys; 

1475 # 

1476 # - methods to find overall inputs and group nodes by their dimensions, 

1477 # which are important operations for QuantumGraph generation. 

1478 # 

1479 ########################################################################### 

1480 

1481 def iter_edges(self, init: bool = False) -> Iterator[Edge]: 

1482 """Iterate over edges in the graph. 

1483 

1484 Parameters 

1485 ---------- 

1486 init : `bool`, optional 

1487 If `True` (`False` is default) iterate over the edges between task 

1488 initialization node and init input/output dataset types, instead of 

1489 the runtime task nodes and regular input/output/prerequisite 

1490 dataset types. 

1491 

1492 Returns 

1493 ------- 

1494 edges : `~collections.abc.Iterator` [ `Edge` ] 

1495 A lazy iterator over `Edge` (`WriteEdge` or `ReadEdge`) instances. 

1496 

1497 Notes 

1498 ----- 

1499 This method always returns *either* init edges or runtime edges, never 

1500 both. The full (internal) graph that contains both also includes a 

1501 special edge that connects each task init node to its runtime node; 

1502 that is also never returned by this method, since it is never a part of 

1503 the init-only or runtime-only subgraphs. 
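
Examples 
-------- 
A minimal sketch of walking the runtime edges; ``graph`` here stands 
for a hypothetical resolved `PipelineGraph` instance:: 

    # Runtime edges only; pass init=True for init-input/output edges. 
    for edge in graph.iter_edges(): 
        print(edge.task_label, edge.parent_dataset_type_name) 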

1504 """ 

1505 edge: Edge 

1506 for _, _, edge in self._xgraph.edges(data="instance"): 

1507 if edge is not None and edge.is_init == init: 

1508 yield edge 

1509 

1510 def iter_nodes( 

1511 self, 

1512 ) -> Iterator[ 

1513 tuple[Literal[NodeType.TASK_INIT], str, TaskInitNode] 

1514 | tuple[Literal[NodeType.TASK], str, TaskNode] 

1515 | tuple[Literal[NodeType.DATASET_TYPE], str, DatasetTypeNode | None] 

1516 ]: 

1517 """Iterate over nodes in the graph. 

1518 

1519 Returns 

1520 ------- 

1521 nodes : `~collections.abc.Iterator` [ `tuple` ] 

1522 A lazy iterator over all of the nodes in the graph. Each yielded 

1523 element is a tuple of: 

1524 

1525 - the node type enum value (`NodeType`); 

1526 - the string name for the node (task label or parent dataset type 

1527 name); 

1528 - the node value (`TaskNode`, `TaskInitNode`, `DatasetTypeNode`, 

1529 or `None` for dataset type nodes that have not been resolved). 
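
Examples 
-------- 
A sketch of dispatching on the node type (``graph`` is a hypothetical 
`PipelineGraph`; `NodeType` is the enum defined in this package):: 

    for node_type, name, value in graph.iter_nodes(): 
        if node_type is NodeType.TASK: 
            ...  # value is a TaskNode 
        elif node_type is NodeType.DATASET_TYPE: 
            ...  # value is a DatasetTypeNode or None 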

1530 """ 

1531 key: NodeKey 

1532 if self._sorted_keys is not None: 

1533 for key in self._sorted_keys: 

1534 yield key.node_type, key.name, self._xgraph.nodes[key]["instance"] # type: ignore 

1535 else: 

1536 for key, node in self._xgraph.nodes(data="instance"): 

1537 yield key.node_type, key.name, node # type: ignore 

1538 

1539 def iter_overall_inputs(self) -> Iterator[tuple[str, DatasetTypeNode | None]]: 

1540 """Iterate over all of the dataset types that are consumed but not 

1541 produced by the graph. 

1542 

1543 Returns 

1544 ------- 

1545 dataset_types : `~collections.abc.Iterator` [ `tuple` ] 

1546 A lazy iterator over the overall-input dataset types (including 

1547 overall init inputs and prerequisites). Each yielded element is a 

1548 tuple of: 

1549 

1550 - the parent dataset type name; 

1551 - the resolved `DatasetTypeNode`, or `None` if the dataset type has 

1552 not been resolved. 
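
Examples 
-------- 
A sketch that collects the names of all overall-input dataset types 
(``graph`` is a hypothetical `PipelineGraph`):: 

    overall_inputs = {name for name, _ in graph.iter_overall_inputs()} 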

1553 """ 

1554 for generation in networkx.algorithms.dag.topological_generations(self._xgraph): 

1555 key: NodeKey 

1556 for key in generation: 

1557 # While we expect all tasks to have at least one input and 

1558 # hence never appear in the first topological generation, that 

1559 # is not true of task init nodes. 

1560 if key.node_type is NodeType.DATASET_TYPE: 

1561 yield key.name, self._xgraph.nodes[key]["instance"] 

1562 return 

1563 

1564 def group_by_dimensions( 

1565 self, prerequisites: bool = False 

1566 ) -> dict[DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]]: 

1567 """Group this graph's tasks and dataset types by their dimensions. 

1568 

1569 Parameters 

1570 ---------- 

1571 prerequisites : `bool`, optional 

1572 If `True`, include prerequisite dataset types as well as regular 

1573 input and output datasets (including intermediates). 

1574 

1575 Returns 

1576 ------- 

1577 groups : `dict` [ `~lsst.daf.butler.DimensionGroup`, `tuple` ] 

1578 A dictionary of groups keyed by `~lsst.daf.butler.DimensionGroup`, 

1579 in which each value is a tuple of: 

1580 

1581 - a `dict` of `TaskNode` instances, keyed by task label 

1582 - a `dict` of `DatasetTypeNode` instances, keyed by 

1583 dataset type name. 

1584 

1585 that have those dimensions. 

1586 

1587 Notes 

1588 ----- 

1589 Init inputs and outputs are always included, but always have empty 

1590 dimensions and are hence all grouped together. 
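
Examples 
-------- 
A sketch that summarizes each dimension group (``graph`` is a 
hypothetical fully-resolved `PipelineGraph`):: 

    for dimensions, (tasks, dataset_types) in graph.group_by_dimensions().items(): 
        print(f"{dimensions}: {len(tasks)} tasks, {len(dataset_types)} dataset types") 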

1591 """ 

1592 result: dict[DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]] = {} 

1593 next_new_value: tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]] = ({}, {}) 

1594 for task_label, task_node in self.tasks.items(): 

1595 if task_node.dimensions is None: 

1596 raise UnresolvedGraphError(f"Task with label {task_label!r} has not been resolved.") 

1597 if (group := result.setdefault(task_node.dimensions, next_new_value)) is next_new_value: 

1598 next_new_value = ({}, {})  # make new dicts for next time 

1599 group[0][task_node.label] = task_node 

1600 for dataset_type_name, dataset_type_node in self.dataset_types.items(): 

1601 if dataset_type_node is None: 

1602 raise UnresolvedGraphError(f"Dataset type {dataset_type_name!r} has not been resolved.") 

1603 if not dataset_type_node.is_prerequisite or prerequisites: 

1604 if ( 

1605 group := result.setdefault(dataset_type_node.dataset_type.dimensions, next_new_value) 

1606 ) is next_new_value: 

1607 next_new_value = ({}, {})  # make new dicts for next time 

1608 group[1][dataset_type_node.name] = dataset_type_node 

1609 return result 

1610 

1611 def get_all_dimensions(self, prerequisites: bool = True) -> DimensionGroup: 

1612 """Return all dimensions used in this graph's tasks and dataset types. 

1613 

1614 Parameters 

1615 ---------- 

1616 prerequisites : `bool`, optional 

1617 If `False`, do not include the dimensions that are only used by 

1618 prerequisite input dataset types. 

1619 

1620 Returns 

1621 ------- 

1622 dimensions : `~lsst.daf.butler.DimensionGroup` 

1623 All dimensions in this pipeline. 

1624 """ 

1625 return DimensionGroup.union( 

1626 *self.group_by_dimensions(prerequisites=prerequisites).keys(), 

1627 universe=self.universe, 

1628 ) 

1629 

1630 def split_independent(self) -> Iterable[PipelineGraph]: 

1631 """Iterate over independent subgraphs that together comprise this 

1632 pipeline graph. 

1633 

1634 Returns 

1635 ------- 

1636 subgraphs : `~collections.abc.Iterable` [ `PipelineGraph` ] 

1637 An iterable over component subgraphs that could be run 

1638 independently (they have only overall inputs in common). May be a 

1639 lazy iterator. 

1640 

1641 Notes 

1642 ----- 

1643 All resolved dataset type nodes will be preserved. 

1644 

1645 If there is only one component, ``self`` may be returned as the only 

1646 element in the iterable. 

1647 

1648 If `has_been_sorted`, all subgraphs will be sorted as well. 

1649 

1650 Task subsets will not be included in the returned graphs. 
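
Examples 
-------- 
A sketch that inspects each independently runnable component 
(``graph`` is a hypothetical `PipelineGraph`):: 

    for component in graph.split_independent(): 
        print(sorted(component.tasks.keys())) 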

1651 """ 

1652 # Having an overall input in common isn't enough to make subgraphs 

1653 # dependent on each other, so we want to look for connected component 

1654 # subgraphs of the task-only projected graph. 

1655 bipartite_xgraph = self._make_bipartite_xgraph_internal(init=False) 

1656 task_keys = { 

1657 key 

1658 for key, bipartite in bipartite_xgraph.nodes(data="bipartite") 

1659 if bipartite == NodeType.TASK.bipartite 

1660 } 

1661 task_xgraph = networkx.algorithms.bipartite.projected_graph( 

1662 networkx.DiGraph(bipartite_xgraph), task_keys 

1663 ) 

1664 # "Weakly" connected means connected in only one direction, which is 

1665 # the only kind of "connected" a DAG can ever be. 

1666 for component_task_keys in networkx.algorithms.weakly_connected_components(task_xgraph): 

1667 if component_task_keys == task_keys: 

1668 yield self 

1669 return 

1670 else: 

1671 component_subgraph = PipelineGraph(universe=self._universe, data_id=self._raw_data_id) 

1672 component_subgraph.add_task_nodes( 

1673 [self._xgraph.nodes[key]["instance"] for key in component_task_keys], parent=self 

1674 ) 

1675 if self.has_been_sorted: 

1676 component_subgraph.sort() 

1677 yield component_subgraph 

1678 

1679 ########################################################################### 

1680 # 

1681 # Data repository/collection initialization 

1682 # 

1683 ########################################################################### 

1684 

1685 @property 

1686 def packages_dataset_type(self) -> DatasetType: 

1687 """The special "packages" dataset type that records software versions. 

1688 

1689 This is not associated with a task and hence is 

1690 not considered part of the pipeline graph in other respects, but it 

1691 does get written with other provenance datasets. 

1692 """ 

1693 if self._universe is None: 

1694 raise UnresolvedGraphError( 

1695 "PipelineGraph must be resolved in order to get the packages dataset type." 

1696 ) 

1697 return DatasetType( 

1698 PACKAGES_INIT_OUTPUT_NAME, self._universe.empty, PACKAGES_INIT_OUTPUT_STORAGE_CLASS 

1699 ) 

1700 

1701 def register_dataset_types( 

1702 self, 

1703 butler: Butler, 

1704 include_packages: bool = True, 

1705 *, 

1706 include_inputs: bool = True, 

1707 include_configs: bool = True, 

1708 include_logs: bool = True, 

1709 ) -> None: 

1710 """Register all dataset types in a data repository. 

1711 

1712 Parameters 

1713 ---------- 

1714 butler : `~lsst.daf.butler.Butler` 

1715 Data repository client. 

1716 include_packages : `bool`, optional 

1717 Whether to include the special "packages" dataset type that records 

1718 software versions (this is not associated with a task and hence is 

1719 not considered part of the pipeline graph in other respects, but it 

1720 does get written with other provenance datasets). 

1721 include_inputs : `bool`, optional 

1722 Whether to register overall-input dataset types as well as outputs. 

1723 include_configs : `bool`, optional 

1724 Whether to register task config dataset types. 

1725 include_logs : `bool`, optional 

1726 Whether to register task log dataset types. 
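
Examples 
-------- 
A sketch that registers only the dataset types this pipeline can 
produce, skipping overall inputs (``butler`` is a hypothetical 
writeable `~lsst.daf.butler.Butler` client):: 

    graph.register_dataset_types(butler, include_inputs=False) 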

1727 """ 

1728 dataset_types = { 

1729 node.name: node.dataset_type 

1730 for node in self.dataset_types.values() 

1731 if include_inputs or self.producer_of(node.name) is not None 

1732 } 

1733 if include_packages: 

1734 dataset_types[self.packages_dataset_type.name] = self.packages_dataset_type 

1735 if not include_configs: 

1736 for task_node in self.tasks.values(): 

1737 del dataset_types[task_node.init.config_output.dataset_type_name] 

1738 if not include_logs: 

1739 for task_node in self.tasks.values(): 

1740 if task_node.log_output is not None: 

1741 del dataset_types[task_node.log_output.dataset_type_name] 

1742 for dataset_type in dataset_types.values(): 

1743 butler.registry.registerDatasetType(dataset_type) 

1744 

1745 def check_dataset_type_registrations(self, butler: Butler, include_packages: bool = True) -> None: 

1746 """Check that dataset type registrations in a data repository match 

1747 the definitions in this pipeline graph. 

1748 

1749 Parameters 

1750 ---------- 

1751 butler : `~lsst.daf.butler.Butler` 

1752 Data repository client. 

1753 include_packages : `bool`, optional 

1754 Whether to include the special "packages" dataset type that records 

1755 software versions (this is not associated with a task and hence is 

1756 not considered part of the pipeline graph in other respects, but it 

1757 does get written with other provenance datasets). 

1758 

1759 Returns 

1760 ------- 

1761 None 

1762 

1763 Raises 

1764 ------ 

1765 lsst.daf.butler.MissingDatasetTypeError 

1766 Raised if one or more non-optional-input or output dataset types in 

1767 the pipeline is not registered at all. 

1768 lsst.daf.butler.ConflictingDefinitionError 

1769 Raised if the definition in the data repository is not identical 

1770 to the definition in the pipeline graph. 

1771 

1772 Notes 

1773 ----- 

1774 Note that dataset type definitions that are storage-class-conversion 

1775 compatible but not identical are not permitted by these checks, because 

1776 the expectation is that these differences are handled by `resolve`, 

1777 which makes the pipeline graph use the data repository definitions. 

1778 This method is intended to check that none of those definitions have 

1779 changed. 

1780 """ 

1781 dataset_types = [node.dataset_type for node in self.dataset_types.values()] 

1782 if include_packages: 

1783 dataset_types.append(self.packages_dataset_type) 

1784 missing_dataset_types: list[str] = [] 

1785 for dataset_type in dataset_types: 

1786 try: 

1787 expected = butler.registry.getDatasetType(dataset_type.name) 

1788 except MissingDatasetTypeError: 

1789 expected = None 

1790 if expected is None: 

1791 # The user probably forgot to register dataset types 

1792 # at least once (which should be an error), 

1793 # but we could also get here if this is an optional input for 

1794 # which no datasets were found in this repo (not an error). 

1795 if ( 

1796 not ( 

1797 self.producer_of(dataset_type.name) is None 

1798 and all( 

1799 self.tasks[input_edge.task_label].is_optional(input_edge.connection_name) 

1800 for input_edge in self.consuming_edges_of(dataset_type.name) 

1801 ) 

1802 ) 

1803 or dataset_type.name == PACKAGES_INIT_OUTPUT_NAME 

1804 ): 

1805 missing_dataset_types.append(dataset_type.name) 

1806 elif expected != dataset_type: 

1807 raise ConflictingDefinitionError( 

1808 f"DatasetType definition in registry has changed since the pipeline graph was resolved: " 

1809 f"{dataset_type} (graph) != {expected} (registry)." 

1810 ) 

1811 if missing_dataset_types: 

1812 plural = "s" if len(missing_dataset_types) != 1 else "" 

1813 raise MissingDatasetTypeError( 

1814 f"Missing dataset type definition{plural}: {', '.join(missing_dataset_types)}. " 

1815 "Dataset types have to be registered in advance (on the command-line, either via " 

1816 "`butler register-dataset-type` or the `--register-dataset-types` option to `pipetask run`." 

1817 ) 

1818 

1819 def instantiate_tasks( 

1820 self, 

1821 get_init_input: Callable[[DatasetType], Any] | None = None, 

1822 init_outputs: list[tuple[Any, DatasetType]] | None = None, 

1823 labels: Iterable[str] | None = None, 

1824 ) -> list[PipelineTask]: 

1825 """Instantiate all tasks in the pipeline. 

1826 

1827 Parameters 

1828 ---------- 

1829 get_init_input : `~collections.abc.Callable`, optional 

1830 Callable that accepts a single `~lsst.daf.butler.DatasetType` 

1831 parameter and returns the init-input dataset associated with that 

1832 dataset type. Must respect the storage class embedded in the type. 

1833 This is optional if the pipeline does not have any overall init 

1834 inputs. When a full butler is available, 

1835 `lsst.daf.butler.Butler.get` can be used directly here. 

1836 init_outputs : `list`, optional 

1837 A list of ``(obj, dataset type)`` init-output dataset pairs, to be 

1838 appended to in-place. Both the object and the dataset type will 

1839 correspond to the storage class of the output connection, which 

1840 may not be the same as the storage class on the graph's dataset 

1841 type node. 

1842 labels : `~collections.abc.Iterable` [ `str` ], optional 

1843 The labels of tasks to instantiate. If not provided, all tasks in 

1844 the graph will be instantiated. 

1845 

1846 Returns 

1847 ------- 

1848 tasks : `list` 

1849 Constructed `PipelineTask` instances. 
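
Examples 
-------- 
A sketch that mirrors how `write_init_outputs` uses this method, with 
init-inputs fetched via a full butler (``butler`` is a hypothetical 
client):: 

    init_outputs: list[tuple[Any, DatasetType]] = [] 
    tasks = graph.instantiate_tasks(butler.get, init_outputs) 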

1850 """ 

1851 if not self.is_fully_resolved: 

1852 raise UnresolvedGraphError("Pipeline graph must be fully resolved before instantiating tasks.") 

1853 empty_data_id = DataCoordinate.make_empty(self.universe) 

1854 labels = set(labels) if labels is not None else self.tasks.keys() 

1855 handles: dict[str, InMemoryDatasetHandle] = {} 

1856 tasks: list[PipelineTask] = [] 

1857 for task_node in self.tasks.values(): 

1858 if task_node.label not in labels: 

1859 continue 

1860 task_init_inputs: dict[str, Any] = {} 

1861 for read_edge in task_node.init.inputs.values(): 

1862 if (handle := handles.get(read_edge.dataset_type_name)) is not None: 

1863 obj = handle.get(storageClass=read_edge.storage_class_name) 

1864 elif ( 

1865 read_edge.component is not None 

1866 and (parent_handle := handles.get(read_edge.parent_dataset_type_name)) is not None 

1867 ): 

1868 obj = parent_handle.get( 

1869 storageClass=read_edge.storage_class_name, component=read_edge.component 

1870 ) 

1871 else: 

1872 dataset_type_node = self.dataset_types[read_edge.parent_dataset_type_name] 

1873 if get_init_input is None: 

1874 raise ValueError( 

1875 f"Task {task_node.label!r} requires init-input " 

1876 f"{read_edge.dataset_type_name} but no 'get_init_input' callback was provided." 

1877 ) 

1878 obj = get_init_input(read_edge.adapt_dataset_type(dataset_type_node.dataset_type)) 

1879 n_consumers = len(self.consumers_of(dataset_type_node.name)) 

1880 if ( 

1881 n_consumers > 1 

1882 and read_edge.component is None 

1883 and read_edge.storage_class_name == dataset_type_node.storage_class_name 

1884 ): 

1885 # Caching what we just got is safe in general only 

1886 # if there was no storage class conversion, since 

1887 # a->b and a->c do not imply b->c. 

1888 handles[read_edge.dataset_type_name] = InMemoryDatasetHandle( 

1889 obj, 

1890 storageClass=dataset_type_node.storage_class, 

1891 dataId=empty_data_id, 

1892 copy=True, 

1893 ) 

1894 task_init_inputs[read_edge.connection_name] = obj 

1895 task = task_node.task_class( 

1896 config=task_node.config, initInputs=task_init_inputs, name=task_node.label 

1897 ) 

1898 tasks.append(task) 

1899 for write_edge in task_node.init.outputs.values(): 

1900 dataset_type_node = self.dataset_types[write_edge.parent_dataset_type_name] 

1901 obj = getattr(task, write_edge.connection_name) 

1902 # We don't immediately coerce obj to the dataset_type_node 

1903 # storage class (which should be the repo storage class, if 

1904 # there is one) when appending to `init_outputs` because a 

1905 # formatter might be able to do a better job of that later; 

1906 # instead we pair it with a dataset type that's consistent with 

1907 # the in-memory type. We do coerce when populating `handles`, 

1908 # though, because going through the dataset_type_node storage 

1909 # class is the conversion path we checked when we resolved the 

1910 # pipeline graph. 

1911 if init_outputs is not None: 

1912 init_outputs.append((obj, write_edge.adapt_dataset_type(dataset_type_node.dataset_type))) 

1913 n_consumers = len(self.consumers_of(dataset_type_node.name)) 

1914 if n_consumers > 0: 

1915 handles[dataset_type_node.name] = InMemoryDatasetHandle( 

1916 dataset_type_node.storage_class.coerce_type(obj), 

1917 dataId=empty_data_id, 

1918 storageClass=dataset_type_node.storage_class, 

1919 copy=(n_consumers > 1), 

1920 ) 

1921 return tasks 

1922 

1923 def write_init_outputs(self, butler: Butler) -> None: 

1924 """Write the init-output datasets for all tasks in the pipeline graph. 

1925 

1926 Parameters 

1927 ---------- 

1928 butler : `lsst.daf.butler.Butler` 

1929 A full butler data repository client with its default run set 

1930 to the collection where datasets should be written. 

1931 

1932 Notes 

1933 ----- 

1934 Datasets that already exist in the butler's output run collection will 

1935 not be written. 

1936 

1937 This method writes outputs with new random dataset IDs and should 

1938 hence only be used when writing init-outputs prior to building a 

1939 `QuantumGraph`. Use `QuantumGraph.write_init_outputs` if a quantum 

1940 graph has already been built. 

1941 """ 

1942 init_outputs: list[tuple[Any, DatasetType]] = [] 

1943 self.instantiate_tasks(butler.get, init_outputs) 

1944 found_refs: dict[str, DatasetRef] = {} 

1945 to_put: list[tuple[Any, DatasetType]] = [] 

1946 for obj, dataset_type in init_outputs: 

1947 if (ref := butler.find_dataset(dataset_type, collections=butler.run)) is not None: 

1948 found_refs[dataset_type.name] = ref 

1949 else: 

1950 to_put.append((obj, dataset_type)) 

1951 for ref, stored in butler.stored_many(found_refs.values()).items(): 

1952 if not stored: 

1953 raise FileNotFoundError( 

1954 f"Init-output dataset {ref.datasetType.name!r} was found in RUN {ref.run!r} " 

1955 f"but had not actually been stored (or was stored and later deleted)." 

1956 ) 

1957 for obj, dataset_type in to_put: 

1958 butler.put(obj, dataset_type) 

1959 

1960 def write_configs(self, butler: Butler) -> None: 

1961 """Write the config datasets for all tasks in the pipeline graph. 

1962 

1963 Parameters 

1964 ---------- 

1965 butler : `lsst.daf.butler.Butler` 

1966 A full butler data repository client with its default run set 

1967 to the collection where datasets should be written. 

1968 

1969 Notes 

1970 ----- 

1971 Config datasets that already exist in the butler's output run 

1972 collection will be checked for consistency. 

1973 

1974 This method writes outputs with new random dataset IDs and should 

1975 hence only be used when writing init-outputs prior to building a 

1976 `QuantumGraph`. Use `QuantumGraph.write_configs` if a quantum graph 

1977 has already been built. 

1978 

1979 Raises 

1980 ------ 

1981 lsst.daf.butler.registry.ConflictingDefinitionError 

1982 Raised if a config dataset already exists and is not consistent 

1983 with the config in the pipeline graph. 

1984 """ 

1985 to_put: list[tuple[PipelineTaskConfig, str]] = [] 

1986 for task_node in self.tasks.values(): 

1987 dataset_type_name = task_node.init.config_output.dataset_type_name 

1988 if (ref := butler.find_dataset(dataset_type_name, collections=butler.run)) is not None: 

1989 old_config = butler.get(ref) 

1990 if not task_node.config.compare(old_config, shortcut=False, output=log_config_mismatch): 

1991 raise ConflictingDefinitionError( 

1992 f"Config does not match existing task config {dataset_type_name!r} in " 

1993 "butler; tasks configurations must be consistent within the same run collection" 

1994 ) 

1995 else: 

1996 to_put.append((task_node.config, dataset_type_name)) 

1997 # We do writes at the end to minimize the mess we leave behind when we 

1998 # raise an exception. 

1999 for config, dataset_type_name in to_put: 

2000 butler.put(config, dataset_type_name) 

2001 

2002 def write_packages(self, butler: Butler) -> None: 

2003 """Write the 'packages' dataset for the currently-active software 

2004 versions. 

2005 

2006 Parameters 

2007 ---------- 

2008 butler : `lsst.daf.butler.Butler` 

2009 A full butler data repository client with its default run set 

2010 to the collection where datasets should be written. 

2011 

2012 Notes 

2013 ----- 

2014 If the packages dataset already exists, it will be compared to the 

2015 versions in the current packages. New packages that weren't present 

2016 before are not considered an inconsistency. 

2017 

2018 This method writes outputs with new random dataset IDs and should 

2019 hence only be used when writing init-outputs prior to building a 

2020 `QuantumGraph`. Use `QuantumGraph.write_packages` if a quantum graph 

2021 has already been built. 

2022 

2023 Raises 

2024 ------ 

2025 lsst.daf.butler.registry.ConflictingDefinitionError 

2026 Raised if the packages dataset already exists and is not consistent 

2027 with the current packages. 

2028 """ 

2029 new_packages = Packages.fromSystem() 

2030 if (ref := butler.find_dataset(self.packages_dataset_type)) is not None: 

2031 packages = butler.get(ref) 

2032 if compare_packages(packages, new_packages): 

2033 # have to remove existing dataset first; butler has no 

2034 # replace option. 

2035 butler.pruneDatasets([ref], unstore=True, purge=True) 

2036 butler.put(packages, ref) 

2037 else: 

2038 butler.put(new_packages, self.packages_dataset_type) 

2039 

2040 def init_output_run(self, butler: Butler) -> None: 

2041 """Initialize a new output RUN collection by writing init-output 

2042 datasets (including configs and packages). 

2043 

2044 Parameters 

2045 ---------- 

2046 butler : `lsst.daf.butler.Butler` 

2047 A full butler data repository client with its default run set 

2048 to the collection where datasets should be written. 
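
Examples 
-------- 
A minimal sketch, assuming ``butler`` is a writeable client whose 
default run is the new output collection:: 

    graph.init_output_run(butler) 
    # Equivalent to calling write_configs, write_packages, and 
    # write_init_outputs, in that order. 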

2049 """ 

2050 self.write_configs(butler) 

2051 self.write_packages(butler) 

2052 self.write_init_outputs(butler) 

2053 

2054 ########################################################################### 

2055 # 

2056 # Class- and Package-Private Methods. 

2057 # 

2058 ########################################################################### 

2059 

2060 def _iter_task_defs(self) -> Iterator[TaskDef]: 

2061 """Iterate over this pipeline as a sequence of `TaskDef` instances. 

2062 

2063 Notes 

2064 ----- 

2065 This is a package-private method intended to aid in the transition to a 

2066 codebase more fully integrated with the `PipelineGraph` class, in which 

2067 both `TaskDef` and `PipelineDatasetTypes` are expected to go away, and 

2068 much of the functionality on the `Pipeline` class will be moved to 

2069 `PipelineGraph` as well. 

2070 

2071 Raises 

2072 ------ 

2073 TaskNotImportedError 

2074 Raised if `TaskNode.is_imported` is `False` for any task. 

2075 """ 

2076 from ..pipeline import TaskDef 

2077 

2078 for node in self._tasks.values(): 

2079 yield TaskDef( 

2080 config=node.config, 

2081 taskClass=node.task_class, 

2082 label=node.label, 

2083 connections=node.get_connections(), 

2084 ) 

2085 

2086 def _init_from_args( 

2087 self, 

2088 xgraph: networkx.MultiDiGraph | None, 

2089 sorted_keys: Sequence[NodeKey] | None, 

2090 task_subsets: dict[str, TaskSubset] | None, 

2091 description: str, 

2092 universe: DimensionUniverse | None, 

2093 data_id: DataId | None, 

2094 step_definitions: StepDefinitions, 

2095 ) -> None: 

2096 """Initialize the graph with possibly-nontrivial arguments. 

2097 

2098 Parameters 

2099 ---------- 

2100 xgraph : `networkx.MultiDiGraph` or `None` 

2101 The backing networkx graph, or `None` to create an empty one. 

2102 This graph has `NodeKey` instances for nodes and the same structure 

2103 as the graph exported by `make_xgraph`, but its nodes and edges 

2104 have a single ``instance`` attribute that holds a `TaskNode`, 

2105 `TaskInitNode`, `DatasetTypeNode` (or `None`), `ReadEdge`, or 

2106 `WriteEdge` instance. 

2107 sorted_keys : `~collections.abc.Sequence` [ `NodeKey` ] or `None` 

2108 Topologically sorted sequence of node keys, or `None` if the graph 

2109 is not sorted. 

2110 task_subsets : `dict` [ `str`, `TaskSubset` ] 

2111 Labeled subsets of tasks. Values must be constructed with 

2112 ``xgraph`` as their parent graph. 

2113 description : `str` 

2114 String description for this pipeline. 

2115 universe : `lsst.daf.butler.DimensionUniverse` or `None` 

2116 Definitions of all dimensions. 

2117 data_id : `lsst.daf.butler.DataCoordinate` or other data ID mapping. 

2118 Data ID that represents a constraint on all quanta generated from 

2119 this pipeline. 

2120 step_definitions : `StepDefinitions` 

2121 Struct holding information about steps. 

2122 

2123 Notes 

2124 ----- 

2125 Only empty `PipelineGraph` instances should be constructed directly by 

2126 users, which sets the signature of ``__init__`` itself, but methods on 

2127 `PipelineGraph` and its helper classes need to be able to create them 

2128 with state. Those methods can call this after calling ``__new__`` 

2129 manually, skipping ``__init__``. 

2130 """ 

2131 self._xgraph = xgraph if xgraph is not None else networkx.MultiDiGraph() 

2132 self._sorted_keys: Sequence[NodeKey] | None = None 

2133 self._task_subsets = task_subsets if task_subsets is not None else {} 

2134 self._description = description 

2135 self._tasks = TaskMappingView(self._xgraph) 

2136 self._dataset_types = DatasetTypeMappingView(self._xgraph) 

2137 self._raw_data_id: dict[str, Any] 

2138 if isinstance(data_id, DataCoordinate): 

2139 if universe is None: 

2140 universe = data_id.universe 

2141 else: 

2142 assert universe is data_id.universe, "data_id.universe and given universe differ" 

2143 self._raw_data_id = dict(data_id.required) 

2144 elif data_id is None: 

2145 self._raw_data_id = {} 

2146 else: 

2147 self._raw_data_id = dict(data_id) 

2148 self._universe = universe 

2149 if sorted_keys is not None: 

2150 self._reorder(sorted_keys) 

2151 self._step_definitions = step_definitions 

2152 

2153 def _make_bipartite_xgraph_internal(self, init: bool) -> networkx.MultiDiGraph: 

2154 """Make a bipartite init-only or runtime-only internal subgraph. 

2155 

2156 See `make_bipartite_xgraph` for parameters and return values. 

2157 

2158 Notes 

2159 ----- 

2160 This method returns a view of the `PipelineGraph` object's internal 

2161 backing graph, and hence should only be called in methods that copy the 

2162 result either explicitly or by running a copying algorithm before 

2163 returning it to the user. 

2164 """ 

2165 return self._xgraph.edge_subgraph([edge.key for edge in self.iter_edges(init)]) 

2166 

2167 def _make_task_xgraph_internal(self, init: bool) -> networkx.DiGraph: 

2168 """Make a init-only or runtime-only internal task subgraph. 

2169 

2170 See `make_task_xgraph` for parameters and return values. 

2171 

2172 Notes 

2173 ----- 

2174 This method returns a view of the `PipelineGraph` object's internal 

2175 backing graph, and hence should only be called in methods that copy the 

2176 result either explicitly or by running a copying algorithm before 

2177 returning it to the user. 

2178 """ 

2179 bipartite_xgraph = self._make_bipartite_xgraph_internal(init=init) 

2180 task_keys = [ 

2181 key 

2182 for key, bipartite in bipartite_xgraph.nodes(data="bipartite") 

2183 if bipartite == NodeType.TASK.bipartite 

2184 ] 

2185 return networkx.algorithms.bipartite.projected_graph(networkx.DiGraph(bipartite_xgraph), task_keys) 

2186 

2187 def _transform_xgraph_state[G: networkx.DiGraph | networkx.MultiDiGraph]( 

2188 self, xgraph: G, skip_edges: bool 

2189 ) -> G: 

2190 """Transform networkx graph attributes in-place from the internal 

2191 "instance" attributes to the documented exported attributes. 

2192 

2193 Parameters 

2194 ---------- 

2195 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph` 

2196 Graph whose state should be transformed. 

2197 skip_edges : `bool` 

2198 If `True`, do not transform edge state. 

2199 

2200 Returns 

2201 ------- 

2202 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph` 

2203 The same object passed in, after modification. 

2204 

2205 Notes 

2206 ----- 

2207 This should be called after making a copy of the internal graph but 

2208 before any projection down to just task or dataset type nodes, since 

2209 it assumes stateful edges. 

2210 """ 

2211 state: dict[str, Any] 

2212 for state in xgraph.nodes.values(): 

2213 node_value: TaskInitNode | TaskNode | DatasetTypeNode | None = state.pop("instance") 

2214 if node_value is not None: 

2215 state.update(node_value._to_xgraph_state()) 

2216 else: 

2217 # This is a dataset type node that is not resolved. 

2218 state["bipartite"] = NodeType.DATASET_TYPE.bipartite 

2219 if not skip_edges: 

2220 for _, _, state in xgraph.edges(data=True): 

2221 edge: Edge | None = state.pop("instance", None) 

2222 if edge is not None: 

2223 state.update(edge._to_xgraph_state()) 

2224 return xgraph 

2225 

2226 def _replace_task_nodes( 

2227 self, 

2228 updates: Mapping[str, TaskNode], 

2229 check_edges_unchanged: bool, 

2230 assume_edges_unchanged: bool, 

2231 message_header: str, 

2232 ) -> None: 

2233 """Replace task nodes and update edges and dataset type nodes 

2234 accordingly. 

2235 

2236 Parameters 

2237 ---------- 

2238 updates : `~collections.abc.Mapping` [ `str`, `TaskNode` ] 

2239 New task nodes with task label keys. All keys must be task labels 

2240 that are already present in the graph. 

2241 check_edges_unchanged : `bool`, optional 

2242 If `True`, require the edges (connections) of the modified tasks to 

2243 remain unchanged after importing and configuring each task, and 

2244 verify that this is the case. 

2245 assume_edges_unchanged : `bool`, optional 

2246 If `True`, the caller declares that the edges (connections) of the 

2247 modified tasks will remain unchanged after importing and configuring each 

2248 task, and that it is unnecessary to check this. 

2249 message_header : `str` 

2250 Template for `str.format` with a single ``task_label`` placeholder 

2251 to use as the first line in `EdgesChangedError` messages that show 

2252 the differences between new task edges and old task edges. Should 

2253 include the fact that the rest of the message will refer to the old 

2254 task as "A" and the new task as "B", and end with a colon. 

2255 

2256 Raises 

2257 ------ 

2258 ValueError 

2259 Raised if ``assume_edges_unchanged`` and ``check_edges_unchanged`` 

2260 are both `True`, or if a full config is provided for a task after 

2261 another full config or an override has already been provided. 

2262 EdgesChangedError 

2263 Raised if ``check_edges_unchanged=True`` and the edges of a task do 

2264 change. 

2265 """ 

2266 deep: dict[str, TaskNode] = {} 

2267 shallow: dict[str, TaskNode] = {} 

2268 if assume_edges_unchanged: 

2269 if check_edges_unchanged: 

2270 raise ValueError("Cannot simultaneously assume and check that edges have not changed.") 

2271 shallow.update(updates) 

2272 else: 

2273 for task_label, new_task_node in updates.items(): 

2274 old_task_node = self.tasks[task_label] 

2275 messages = old_task_node.diff_edges(new_task_node) 

2276 if messages: 

2277 if check_edges_unchanged: 

2278 messages.insert(0, message_header.format(task_label=task_label)) 

2279 raise EdgesChangedError("\n".join(messages)) 

2280 else: 

2281 deep[task_label] = new_task_node 

2282 else: 

2283 shallow[task_label] = new_task_node 

2284 try: 

2285 if deep: 

2286 removed = self.remove_tasks(deep.keys(), drop_from_subsets=True) 

2287 self.add_task_nodes(deep.values()) 

2288 for replaced_task_node, referencing_subsets in removed: 

2289 for subset_label in referencing_subsets: 

2290 self._task_subsets[subset_label].add(replaced_task_node.label) 

2291 for task_node in shallow.values(): 

2292 self._xgraph.nodes[task_node.key]["instance"] = task_node 

2293 self._xgraph.nodes[task_node.init.key]["instance"] = task_node.init 

2294 except PipelineGraphExceptionSafetyError: # pragma: no cover 

2295 raise 

2296 except Exception as err: # pragma: no cover 

2297 # There's no known way to get here, but we want to make it clear 

2298 # it's a big problem if we do. 

2299 raise PipelineGraphExceptionSafetyError( 

2300 "Error while replacing tasks has left the graph in an inconsistent state." 

2301 ) from err 

2302 

2303 def _append_graph_data_from_edge( 

2304 self, 

2305 node_data: list[tuple[NodeKey, dict[str, Any]]], 

2306 edge_data: list[tuple[NodeKey, NodeKey, str, dict[str, Any]]], 

2307 edge: Edge, 

2308 parent: PipelineGraph | None, 

2309 ) -> None: 

2310 """Append networkx state dictionaries for an edge and the corresponding 

2311 dataset type node. 

2312 

2313 Parameters 

2314 ---------- 

2315 node_data : `list` 

2316 List of node keys and state dictionaries. A node is appended if 

2317 one does not already exist for this dataset type. 

2318 edge_data : `list` 

2319 List of node key pairs, connection names, and state dictionaries 

2320 for edges. 

2321 edge : `Edge` 

2322 New edge being processed. 

2323 parent : `PipelineGraph` or `None` 

2324 Another pipeline graph whose dataset type nodes should be used 

2325 when present. 

2326 """ 

2327 new_dataset_type_node = None 

2328 if parent is not None: 

2329 new_dataset_type_node = parent._xgraph.nodes[edge.dataset_type_key].get("instance") 

2330 if (existing_dataset_type_state := self._xgraph.nodes.get(edge.dataset_type_key)) is not None: 

2331 existing_dataset_type_state["instance"] = new_dataset_type_node 

2332 else: 

2333 node_data.append( 

2334 ( 

2335 edge.dataset_type_key, 

2336 { 

2337 "instance": new_dataset_type_node, 

2338 "bipartite": NodeType.DATASET_TYPE.bipartite, 

2339 }, 

2340 ) 

2341 ) 

2342 edge_data.append( 

2343 edge.nodes 

2344 + ( 

2345 edge.connection_name, 

2346 {"instance": edge}, 

2347 ) 

2348 ) 

2349 

2350 def _reorder(self, sorted_keys: Sequence[NodeKey]) -> None: 

2351 """Set the order of all views of this graph from the given sorted 

2352 sequence of task labels and dataset type names. 

2353 """ 

2354 self._sorted_keys = sorted_keys 

2355 self._tasks._reorder(sorted_keys) 

2356 self._dataset_types._reorder(sorted_keys) 

2357 

2358 def _reset(self) -> None: 

2359 """Reset all views of this graph following a modification that might 

2360 invalidate them. 

2361 """ 

2362 self._sorted_keys = None 

2363 self._tasks._reset() 

2364 self._dataset_types._reset() 

2365 

2366 def _resolve_step_flow(self) -> Sequence[NodeKey] | None: 

2367 """Check that step definitions are consistent with the ordering of 

2368 the graph's nodes and that they partition the task graph. 

2369 

2370 Returns 

2371 ------- 

2372 sort_keys : `~collections.abc.Sequence` [ `NodeKey` ] or `None` 

2373 Sort order for the pipeline graph that is consistent with the 

2374 step order, or `None` if there were no steps defined for this 

2375 pipeline. 

2376 """ 

2377 if not self._step_definitions: 

2378 return None 

2379 task_labels_so_far: set[str] = set() 

2380 # Inputs we've already seen, and the tasks that wanted them: 

2381 inputs_so_far: dict[str, set[str]] = {} 

2382 sort_keys: list[NodeKey] = [] 

2383 keys_already_sorted: set[NodeKey] = set() 

2384 for step_label in self.steps: 

2385 try: 

2386 task_subset = self.task_subsets[step_label] 

2387 except KeyError: 

2388 raise InvalidStepsError(f"Step {step_label!r} is not a task subset.") from None 

2389 if not task_labels_so_far.isdisjoint(task_subset): 

2390 raise InvalidStepsError( 

2391 f"Step {step_label!r} repeats task(s) {task_labels_so_far & task_subset}." 

2392 ) 

2393 task_labels_so_far.update(task_subset) 

2394 # We need a temporary data structure for tracking the inputs of 

2395 # just this step to avoid complaining about input-output 

2396 # relationships within the step. We'll merge this into the 

2397 # cumulative one later. 

2398 step_inputs: dict[str, set[str]] = {} 

2399 # We'll also gather all task, task-init and dataset type networkx 

2400 # graph keys for the step so we can make and sort a step sub-graph. 

2401 new_step_keys: set[NodeKey] = set() 

2402 for task_label in task_subset: 

2403 for input_name in self.inputs_of(task_label): 

2404 step_inputs.setdefault(input_name, set()).add(task_label) 

2405 # Check that none of the outputs of the tasks in this step were 

2406 # expected as inputs of a previous step's tasks. 

2407 task_outputs = self.outputs_of(task_label) 

2408 if not inputs_so_far.keys().isdisjoint(task_outputs.keys()): 

2409 msg: list[str] = [] 

2410 for input_name in inputs_so_far.keys() & task_outputs.keys(): 

2411 msg.append(f"{input_name} (used by {inputs_so_far[input_name]})") 

2412 raise InvalidStepsError( 

2413 f"Task {task_label} in step {step_label!r} produces dataset types " 

2414 f"[{', '.join(msg)}], but these are consumed by tasks in earlier " 

2415 "steps. Either steps are out of order or the graph is cyclic." 

2416 ) 

2417 task_init_key = NodeKey(NodeType.TASK_INIT, task_label) 

2418 task_key = NodeKey(NodeType.TASK, task_label) 

2419 new_step_keys.add(task_init_key) 

2420 new_step_keys.add(task_key) 

2421 new_step_keys.update(self._xgraph.predecessors(task_init_key)) 

2422 new_step_keys.update(self._xgraph.predecessors(task_key)) 

2423 new_step_keys.update(self._xgraph.successors(task_init_key)) 

2424 new_step_keys.update(self._xgraph.successors(task_key)) 

2425 # Also record the step the task is in as a private xgraph 

2426 # attribute so we can look up the step given a task (or, 

2427 # indirectly, dataset type); this has to be used with care 

2428 # because if the steps haven't been verified it can be wrong. 

2429 self._xgraph.nodes[task_init_key]["step"] = step_label 

2430 self._xgraph.nodes[task_key]["step"] = step_label 

2431 # Drop step input keys that were already either inputs or outputs 

2432 # of a previous step, since they'll have already been added to 

2433 # sort_keys. 

2434 new_step_keys.difference_update(keys_already_sorted) 

2435 # Make the step subgraph, sort it, and extend the overall sort_keys 

2436 # with the result. 

2437 step_xgraph = self._xgraph.subgraph(new_step_keys) 

2438 sort_keys.extend(networkx.dag.lexicographical_topological_sort(step_xgraph)) 

2439 keys_already_sorted.update(new_step_keys) 

2440 for input_name, consuming_tasks in step_inputs.items(): 

2441 inputs_so_far.setdefault(input_name, set()).update(consuming_tasks) 

2442 if not task_labels_so_far.issuperset(self.tasks.keys()): 

2443 # Note that the converse issubset test effectively happens when we 

2444 # look up inputs and outputs of each task. 

2445 raise InvalidStepsError(f"No step contains task(s) {self.tasks.keys() - task_labels_so_far}.") 

2446 return sort_keys 

2447 

2448 def _resolve_step_dimensions(self, universe: DimensionUniverse) -> None: 

2449 """Check that step sharding dimensions are consistent with task and 

2450 output dataset dimensions. 

2451 

2452 Parameters 

2453 ---------- 

2454 universe : `~lsst.daf.butler.DimensionUniverse` 

2455 Definitions for all dimensions. Will be attached to the step 

2456 definitions by this method. 

2457 """ 

2458 self._step_definitions._universe = universe 

2459 for step_label in self.steps: 

2460 dimensions = self.steps.get_dimensions(step_label) 

2461 for task_label in self.task_subsets[step_label]: 

2462 task_node = self.tasks[task_label] 

2463 if not _dimensions_compatible(dimensions, task_node.dimensions): 

2464 raise InvalidStepsError( 

2465 f"Dimensions {task_node.dimensions} of task {task_label!r} are not compatible with " 

2466 f"the sharding dimensions {dimensions} of step {step_label!r}." 

2467 ) 

2468 for dataset_type_node in self.outputs_of(task_label).values(): 

2469 assert dataset_type_node is not None, "dataset types should be resolved first" 

2470 if not _dimensions_compatible(dimensions, dataset_type_node.dimensions): 

2471 raise InvalidStepsError( 

2472 f"Dimensions {dataset_type_node.dimensions} of dataset type " 

2473 f"{dataset_type_node.name!r} (produced by task {task_label!r}) are not " 

2474 f"compatible with the sharding dimensions {dimensions} of step " 

2475 f"{step_label!r}." 

2476 ) 

2477 

2478 def _select_expression(self, expr_tree: expressions.Node, task_xgraph: networkx.DiGraph) -> set[NodeKey]: 

2479 """Select tasks from a pipeline based on a string expression. 

2480 

2481 This is the primary implementation method for `select` and 

2482 `select_tasks`. 

2483 

2484 Parameters 

2485 ---------- 

2486 expr_tree : `expressions.Node` 

2487 Expression [sub]tree to process (recursively). 

2488 task_xgraph : `networkx.DiGraph` 

2489 NetworkX graph of all tasks (runtime nodes only) in the pipeline. 

2490 

2491 Returns 

2492 ------- 

2493 selected : `set` [ `NodeKey` ] 

2494 Set of `NodeKey` objects for matching tasks (only; no dataset type 

2495 or task-init nodes). 

2496 """ 

2497 match expr_tree: 

2498 case expressions.IdentifierNode(qualifier=qualifier, label=label): 

2499 match self._select_identifier(qualifier, label): 

2500 case NodeKey(node_type=NodeType.TASK) as task_key: 

2501 return {task_key} 

2502 case NodeKey(node_type=NodeType.DATASET_TYPE) as dataset_type_key: 

2503 # Since a dataset type can have only one producer, this 

2504 # yields 0- (for overall inputs) or 1-element sets. 

2505 for producer_key, _ in self._xgraph.in_edges(dataset_type_key): 

2506 if producer_key.node_type is NodeType.TASK_INIT: 

2507 raise InvalidExpressionError( 

2508 f"Init-output dataset type {label!r} cannot be used directly in an " 

2509 "expression." 

2510 ) 

2511 return {producer_key} 

2512 return set() 

2513 case TaskSubset() as task_subset: 

2514 return {NodeKey(NodeType.TASK, label) for label in task_subset} 

2515 case _: # pragma: no cover 

2516 raise AssertionError("Identifier type inconsistent with grammar.") 

2517 case expressions.DirectionNode(operator=operator, start=start): 

2518 match self._select_identifier(start.qualifier, start.label): 

2519 case NodeKey(node_type=NodeType.TASK) as task_key: 

2520 if operator.startswith("<"): 

2521 return self._select_task_ancestors( 

2522 task_key, task_xgraph, inclusive=operator.endswith("=") 

2523 ) 

2524 else: 

2525 assert operator.startswith(">"), "Guaranteed by grammar." 

2526 return self._select_task_descendants( 

2527 task_key, task_xgraph, inclusive=operator.endswith("=") 

2528 ) 

2529 case NodeKey(node_type=NodeType.DATASET_TYPE) as dataset_type_key: 

2530 if operator.startswith("<"): 

2531 return self._select_dataset_type_ancestors( 

2532 dataset_type_key, task_xgraph, inclusive=operator.endswith("=") 

2533 ) 

2534 else: 

2535 assert operator.startswith(">"), "Guaranteed by grammar." 

2536 return self._select_dataset_type_descendants( 

2537 dataset_type_key, task_xgraph, inclusive=operator.endswith("=") 

2538 ) 

2539 case TaskSubset(): 

2540 raise InvalidExpressionError( 

2541 f"Task subset identifier {start!r} cannot be used as the start of an " 

2542 "ancestor/descendant search." 

2543 ) 

2544 case _: # pragma: no cover 

2545 raise AssertionError("Unexpected parsed identifier result type.") 

2546 case expressions.NotNode(operand=operand): 

2547 operand_result = self._select_expression(operand, task_xgraph) 

2548 return set(task_xgraph.nodes.keys() - operand_result) 

2549 case expressions.UnionNode(lhs=lhs, rhs=rhs): 

2550 lhs_result = self._select_expression(lhs, task_xgraph) 

2551 rhs_result = self._select_expression(rhs, task_xgraph) 

2552 return lhs_result.union(rhs_result) 

2553 case expressions.IntersectionNode(lhs=lhs, rhs=rhs): 

2554 lhs_result = self._select_expression(lhs, task_xgraph) 

2555 rhs_result = self._select_expression(rhs, task_xgraph) 

2556 return lhs_result.intersection(rhs_result) 

2557 case _: # pragma: no cover 

2558 raise AssertionError("Expression parse node inconsistent with grammar.") 

2559 

2560 def _select_task_ancestors( 

2561 self, start: NodeKey, task_xgraph: networkx.DiGraph, inclusive: bool 

2562 ) -> set[NodeKey]: 

2563 """Return all task-node ancestors of the given task node, as defined by 

2564 the `select` expression language. 

2565 

2566 Parameters 

2567 ---------- 

2568 start : `NodeKey` 

2569 A runtime task node key. 

2570 task_xgraph : `networkx.DiGraph` 

2571 NetworkX graph of all tasks (runtime nodes only) in the pipeline. 

2572 inclusive : `bool` 

2573 Whether to include the ``start`` node in the results. 

2574 

2575 Returns 

2576 ------- 

2577 selected : `set` [ `NodeKey` ] 

2578 Set of `NodeKey` objects for matching tasks (only; no dataset type 

2579 or task-init nodes). 

2580 """ 

2581 result = set(networkx.dag.ancestors(task_xgraph, start)) 

2582 if inclusive: 

2583 result.add(start) 

2584 return result 

2585 

2586 def _select_task_descendants( 

2587 self, start: NodeKey, task_xgraph: networkx.DiGraph, inclusive: bool 

2588 ) -> set[NodeKey]: 

2589 """Return all task-node descendants of the given task node, as defined 

2590 by the `select` expression language. 

2591 

2592 Parameters 

2593 ---------- 

2594 start : `NodeKey` 

2595 A runtime task node key. 

2596 task_xgraph : `networkx.DiGraph` 

2597 NetworkX graph of all tasks (runtime nodes only) in the pipeline. 

2598 inclusive : `bool` 

2599 Whether to include the ``start`` node in the results. 

2600 

2601 Returns 

2602 ------- 

2603 selected : `set` [ `NodeKey` ] 

2604 Set of `NodeKey` objects for matching tasks (only; no dataset type 

2605 or task-init nodes). 

2606 """ 

2607 result = set(networkx.dag.descendants(task_xgraph, start)) 

2608 if inclusive: 

2609 result.add(start) 

2610 return result 

2611 

2612 def _select_dataset_type_ancestors( 

2613 self, start: NodeKey, task_xgraph: networkx.DiGraph, inclusive: bool 

2614 ) -> set[NodeKey]: 

2615 """Return all task-node ancestors of the given dataset type node, as 

2616 defined by the `select` expression language. 

2617 

2618 Parameters 

2619 ---------- 

2620 start : `NodeKey` 

2621 A dataset type node key. May not be an init-output. 

2622 task_xgraph : `networkx.DiGraph` 

2623 NetworkX graph of all tasks (runtime nodes only) in the pipeline. 

2624 inclusive : `bool` 

2625 Whether to include the producer of the ``start`` node in the 

2626 results. 

2627 

2628 Returns 

2629 ------- 

2630 selected : `set` [ `NodeKey` ] 

2631 Set of `NodeKey` objects for matching tasks (only; no dataset type 

2632 or task-init nodes). 

2633 """ 

2634 result: set[NodeKey] = set() 

2635 for producer_key, _ in self._xgraph.in_edges(start): 

2636 if producer_key.node_type is NodeType.TASK_INIT: 

2637 raise InvalidExpressionError( 

2638 f"Init-output dataset type {start.name!r} cannot be used as the " 

2639 "starting point for an ancestor ('<' or '<=') search." 

2640 ) 

2641 result.update(networkx.dag.ancestors(task_xgraph, producer_key)) 

2642 if inclusive: 

2643 result.add(producer_key) 

2644 return result 

2645 

2646 def _select_dataset_type_descendants( 

2647 self, start: NodeKey, task_xgraph: networkx.DiGraph, inclusive: bool 

2648 ) -> set[NodeKey]: 

2649 """Return all task-node descendatns of the given dataset type node, as 

2650 defined by the `select` expression language. 

2651 

2652 Parameters 

2653 ---------- 

2654 start : `NodeKey` 

2655 A dataset type node key. May not be an init-output if 

2656 ``inclusive=True``. 

2657 task_xgraph : `networkx.DiGraph` 

2658 NetworkX graph of all tasks (runtime nodes only) in the pipeline. 

2659 inclusive : `bool` 

2660 Whether to include the producer of the ``start`` node in the 

2661 results. 

2662 

2663 Returns 

2664 ------- 

2665 selected : `set` [ `NodeKey` ] 

2666 Set of `NodeKey` objects for matching tasks (only; no dataset type 

2667 or task-init nodes). 

2668 """ 

2669 result: set[NodeKey] = set() 

2670 if inclusive: 

2671 for producer_key, _ in self._xgraph.in_edges(start): 

2672 if producer_key.node_type is NodeType.TASK_INIT: 

2673 raise InvalidExpressionError( 

2674 f"Init-output dataset type {start.name!r} cannot be used as the " 

2675 "starting point for an includsive descendant ('>=') search." 

2676 ) 

2677 result.add(producer_key) 

2678 # We also include tasks that consume a dataset type as an init-input, 

2679 # since that can affect their runtime behavior. 

2680 consumer_keys: set[NodeKey] = { 

2681 ( 

2682 consumer_key 

2683 if consumer_key.node_type is NodeType.TASK 

2684 else NodeKey(NodeType.TASK, consumer_key.name) 

2685 ) 

2686 for _, consumer_key in self._xgraph.out_edges(start) 

2687 } 

2688 for consumer_key in consumer_keys: 

2689 result.add(consumer_key) 

2690 result.update(networkx.dag.descendants(task_xgraph, consumer_key)) 

2691 return result 

2692 

2693 def _select_identifier( 

2694 self, qualifier: Literal["T", "D", "S"] | None, label: str 

2695 ) -> NodeKey | TaskSubset: 

2696 """Return the node key or task subset that corresponds to a `select` 

2697 expression identifier. 

2698 

2699 Parameters 

2700 ---------- 

2701 qualifier : `str` or `None` 

2702 Task, dataset type, or task subset qualifier included in the 

2703 identifier, if any. 

2704 label : `str` 

2705 Task label, dataset type name, or task subset label. 

2706 

2707 Returns 

2708 ------- 

2709 key_or_subset : `NodeKey` or `TaskSubset` 

2710 A `NodeKey` for a task or dataset type, or a `TaskSubset` for a 

2711 task subset. 

2712 """ 

2713 match qualifier: 

2714 case None: 

2715 task_key = NodeKey(NodeType.TASK, label) 

2716 dataset_type_key = NodeKey(NodeType.DATASET_TYPE, label) 

2717 if task_key in self._xgraph.nodes: 

2718 if dataset_type_key in self._xgraph.nodes: 

2719 raise InvalidExpressionError( 

2720 f"{label!r} is both a task label and a dataset type name; " 

2721 "prefix with 'T:' or 'D:' (respectively) to specify which." 

2722 ) 

2723 assert label not in self._task_subsets, "Should be prohibited at construction." 

2724 return task_key 

2725 elif dataset_type_key in self._xgraph.nodes: 

2726 if label in self._task_subsets: 

2727 raise InvalidExpressionError( 

2728 f"{label!r} is both a subset label and a dataset type name; " 

2729 "prefix with 'S:' or 'D:' (respectively) to specify which." 

2730 ) 

2731 return dataset_type_key 

2732 elif label in self._task_subsets: 

2733 return self._task_subsets[label] 

2734 else: 

2735 raise InvalidExpressionError( 

2736 f"{label!r} is not a task label, task subset label, or dataset type name." 

2737 ) 

2738 case "T": 

2739 task_key = NodeKey(NodeType.TASK, label) 

2740 if task_key not in self._xgraph.nodes: 

2741 raise InvalidExpressionError(f"Task with label {label!r} does not exist.") 

2742 return task_key 

2743 case "D": 

2744 dataset_type_key = NodeKey(NodeType.DATASET_TYPE, label) 

2745 if dataset_type_key not in self._xgraph.nodes: 

2746 raise InvalidExpressionError(f"Dataset type with name {label!r} does not exist.") 

2747 return dataset_type_key 

2748 case "S": 

2749 try: 

2750 return self._task_subsets[label] 

2751 except KeyError: 

2752 raise InvalidExpressionError(f"Task subset with label {label!r} does not exist.") 

2753 case _: # pragma: no cover 

2754 raise AssertionError("Unexpected identifier qualifier in expression.") 

2755 

2756 _xgraph: networkx.MultiDiGraph 

2757 _sorted_keys: Sequence[NodeKey] | None 

2758 _task_subsets: dict[str, TaskSubset] 

2759 _step_definitions: StepDefinitions 

2760 _description: str 

2761 _tasks: TaskMappingView 

2762 _dataset_types: DatasetTypeMappingView 

2763 _raw_data_id: dict[str, Any] 

2764 _universe: DimensionUniverse | None 

2765 

2766 

2767def log_config_mismatch(msg: str) -> None: 

2768 """Log messages about configuration mismatch. 

2769 

2770 Parameters 

2771 ---------- 

2772 msg : `str` 

2773 Log message to use. 

2774 """ 

2775 _LOG.fatal("Comparing configuration: %s", msg) 

2776 

2777 

2778def compare_packages(packages: Packages, new_packages: Packages) -> bool: 

2779 """Compare two versions of Packages. 

2780 

2781 Parameters 

2782 ---------- 

2783 packages : `Packages` 

2784 Previously recorded package versions. Updated in place to include 

2785 any new packages that weren't present before. 

2786 new_packages : `Packages` 

2787 New set of package versions. 

2788 

2789 Returns 

2790 ------- 

2791 updated : `bool` 

2792 `True` if ``packages`` was updated, `False` if not. 

2793 

2794 Raises 

2795 ------ 

2796 ConflictingDefinitionError 

2797 Raised if versions are inconsistent. 
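
Examples 
-------- 
A sketch mirroring how `PipelineGraph.write_packages` uses this 
function (``old`` and ``new`` are hypothetical `Packages` instances):: 

    if compare_packages(old, new): 
        # ``old`` gained entries from ``new`` and should be re-persisted. 
        ... 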

2798 """ 

2799 diff = new_packages.difference(packages) 

2800 if diff: 

2801 versions_str = "; ".join(f"{pkg}: {diff[pkg][1]} vs {diff[pkg][0]}" for pkg in diff) 

2802 raise ConflictingDefinitionError(f"Package versions mismatch: ({versions_str})") 

2803 else: 

2804 _LOG.debug("new packages are consistent with old") 

2805 # Update the old set of packages in case we have more packages 

2806 # that haven't been persisted. 

2807 extra = new_packages.extra(packages) 

2808 if extra: 

2809 _LOG.debug("extra packages: %s", extra) 

2810 packages.update(new_packages) 

2811 return True 

2812 return False 

2813 

2814 

2815def _dimensions_compatible(dimensions: DimensionGroup, object_dimensions: DimensionGroup) -> bool: 

2816 if dimensions.issubset(object_dimensions): 

2817 # Easy typical case. 

2818 return True 

2819 # Hard case: if any sharding dimensions that are not in the object 

2820 # dimensions are related to something in the object dimensions by a 

2821 # many-to-many join table, this is okay too. The main use case here is to 

2822 # let {exposure, [detector]} satisfy sharding dimensions of 

2823 # {visit, [detector]}. 

2824 universe = dimensions.universe 

2825 unmatched_sharding = dimensions.required - object_dimensions.required 

2826 unmatched_object = object_dimensions.required - dimensions.required 

2827 unmatched_union_group = dimensions.union(object_dimensions) 

2828 for element_name in unmatched_union_group.elements: 

2829 if ( 

2830 (element := universe[element_name]).defines_relationships 

2831 and not element.dimensions.isdisjoint(unmatched_sharding) 

2832 and not element.dimensions.isdisjoint(unmatched_object) 

2833 ): 

2834 return True 

2835 return False