Coverage for python / lsst / pipe / base / pipeline_graph / _edges.py: 28%

214 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:32 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("Edge", "ReadEdge", "WriteEdge") 

30 

31from abc import ABC, abstractmethod 

32from collections.abc import Callable, Mapping, Sequence 

33from typing import Any, ClassVar, Self 

34 

35from lsst.daf.butler import DatasetRef, DatasetType, DimensionUniverse, StorageClassFactory 

36from lsst.daf.butler.registry import MissingDatasetTypeError 

37from lsst.utils.classes import immutable 

38 

39from ..connectionTypes import BaseConnection 

40from ._exceptions import ConnectionTypeConsistencyError, IncompatibleDatasetTypeError 

41from ._nodes import NodeKey, NodeType 

42 

43 

44@immutable 

45class Edge(ABC): 

46 """Base class for edges in a pipeline graph. 

47 

48 This represents the link between a task node and an input or output dataset 

49 type. 

50 

51 Parameters 

52 ---------- 

53 task_key : `NodeKey` 

54 Key for the task node this edge is connected to. 

55 dataset_type_key : `NodeKey` 

56 Key for the dataset type node this edge is connected to. 

57 storage_class_name : `str` 

58 Name of the dataset type's storage class as seen by the task. 

59 connection_name : `str` 

60 Internal name for the connection as seen by the task. 

61 is_calibration : `bool` 

62 Whether this dataset type can be included in 

63 `~lsst.daf.butler.CollectionType.CALIBRATION` collections. 

64 raw_dimensions : `frozenset` [ `str` ] 

65 Raw dimensions from the connection definition. 

66 """ 

67 

68 def __init__( 

69 self, 

70 *, 

71 task_key: NodeKey, 

72 dataset_type_key: NodeKey, 

73 storage_class_name: str, 

74 connection_name: str, 

75 is_calibration: bool, 

76 raw_dimensions: frozenset[str], 

77 ): 

78 self.task_key = task_key 

79 self.dataset_type_key = dataset_type_key 

80 self.connection_name = connection_name 

81 self.storage_class_name = storage_class_name 

82 self.is_calibration = is_calibration 

83 self.raw_dimensions = raw_dimensions 

84 

85 INIT_TO_TASK_NAME: ClassVar[str] = "INIT" 

86 """Edge key for the special edge that connects a task init node to the 

87 task node itself (for regular edges, this would be the connection name). 

88 """ 

89 

90 task_key: NodeKey 

91 """Task part of the key for this edge in networkx graphs (`NodeKey`).""" 

92 

93 dataset_type_key: NodeKey 

94 """Task part of the key for this edge in networkx graphs (`NodeKey`).""" 

95 

96 connection_name: str 

97 """Name used by the task to refer to this dataset type (`str`).""" 

98 

99 storage_class_name: str 

100 """Storage class expected by this task (`str`). 

101 

102 If `ReadEdge.component` is not `None`, this is the component storage class, 

103 not the parent storage class. 

104 """ 

105 

106 is_calibration: bool 

107 """Whether this dataset type can be included in 

108 `~lsst.daf.butler.CollectionType.CALIBRATION` collections. 

109 """ 

110 

111 raw_dimensions: frozenset[str] 

112 """Raw dimensions in the task declaration (`frozenset` [`str`]). 

113 

114 This can only be used safely for partial comparisons: two edges with the 

115 same ``raw_dimensions`` (and the same parent dataset type name) always have 

116 the same resolved dimensions, but edges with different ``raw_dimensions`` 

117 may also have the same resolvd dimensions. 

118 """ 

119 

120 @property 

121 def is_init(self) -> bool: 

122 """Whether this dataset is read or written when the task is 

123 constructed, not when it is run. 

124 """ 

125 return self.task_key.node_type is NodeType.TASK_INIT 

126 

127 @property 

128 def task_label(self) -> str: 

129 """Label of the task (`str`).""" 

130 return str(self.task_key) 

131 

132 @property 

133 def parent_dataset_type_name(self) -> str: 

134 """Name of the parent dataset type (`str`). 

135 

136 All dataset type nodes in a pipeline graph are for parent dataset 

137 types; components are represented by additional `ReadEdge` state. 

138 """ 

139 return str(self.dataset_type_key) 

140 

141 @property 

142 @abstractmethod 

143 def nodes(self) -> tuple[NodeKey, NodeKey]: 

144 """The directed pair of `NodeKey` instances this edge connects. 

145 

146 This tuple is ordered in the same direction as the pipeline flow: 

147 `task_key` precedes `dataset_type_key` for writes, and the 

148 reverse is true for reads. 

149 """ 

150 raise NotImplementedError() 

151 

152 @property 

153 def key(self) -> tuple[NodeKey, NodeKey, str]: 

154 """Ordered tuple of node keys and connection name that uniquely 

155 identifies this edge in a pipeline graph (`NodeKey`, `NodeKey`, `str`). 

156 

157 The nodes are ordered in the same sense as for `nodes`. 

158 """ 

159 return self.nodes + (self.connection_name,) 

160 

161 def __repr__(self) -> str: 

162 return f"{self.nodes[0]} -> {self.nodes[1]} ({self.connection_name})" 

163 

164 @property 

165 def dataset_type_name(self) -> str: 

166 """Dataset type name seen by the task (`str`). 

167 

168 This defaults to the parent dataset type name, which is appropriate 

169 for all writes and most reads. 

170 """ 

171 return self.parent_dataset_type_name 

172 

173 def diff[S: Edge](self: S, other: S, connection_type: str = "connection") -> list[str]: 

174 """Compare this edge to another one from a possibly-different 

175 configuration of the same task label. 

176 

177 Parameters 

178 ---------- 

179 other : `Edge` 

180 Another edge of the same type to compare to. 

181 connection_type : `str` 

182 Human-readable name of the connection type of this edge (e.g. 

183 "init input", "output") for use in returned messages. 

184 

185 Returns 

186 ------- 

187 differences : `list` [ `str` ] 

188 List of string messages describing differences between ``self`` and 

189 ``other``. Will be empty if ``self == other`` or if the only 

190 difference is in the task label or connection name (which are not 

191 checked). Messages will use 'A' to refer to ``self`` and 'B' to 

192 refer to ``other``. 

193 """ 

194 result = [] 

195 if self.dataset_type_name != other.dataset_type_name: 

196 result.append( 

197 f"{connection_type.capitalize()} {self.task_label}.{self.connection_name} has dataset type " 

198 f"{self.dataset_type_name!r} in A, but {other.dataset_type_name!r} in B." 

199 ) 

200 if self.storage_class_name != other.storage_class_name: 

201 result.append( 

202 f"{connection_type.capitalize()} {self.task_label}.{self.connection_name} has storage class " 

203 f"{self.storage_class_name!r} in A, but {other.storage_class_name!r} in B." 

204 ) 

205 if self.raw_dimensions != other.raw_dimensions: 

206 result.append( 

207 f"{connection_type.capitalize()} {self.task_label}.{self.connection_name} has raw dimensions " 

208 f"{set(self.raw_dimensions)} in A, but {set(other.raw_dimensions)} in B " 

209 "(differences in raw dimensions may not lead to differences in resolved dimensions, " 

210 "but this cannot be checked without re-resolving the dataset type)." 

211 ) 

212 if self.is_calibration != other.is_calibration: 

213 result.append( 

214 f"{connection_type.capitalize()} {self.task_label}.{self.connection_name} is marked as a " 

215 f"calibration {'in A but not in B' if self.is_calibration else 'in B but not in A'}." 

216 ) 

217 return result 

218 

219 @abstractmethod 

220 def adapt_dataset_type(self, dataset_type: DatasetType) -> DatasetType: 

221 """Transform the graph's definition of a dataset type (parent, with the 

222 registry or producer's storage class) to the one seen by this task. 

223 

224 Parameters 

225 ---------- 

226 dataset_type : `~lsst.daf.butler.DatasetType` 

227 Graph's definition of dataset type. 

228 

229 Returns 

230 ------- 

231 out_dataset_type : `~lsst.daf.butler.DatasetType` 

232 Dataset type seen by this task. 

233 """ 

234 raise NotImplementedError() 

235 

236 @abstractmethod 

237 def adapt_dataset_ref(self, ref: DatasetRef) -> DatasetRef: 

238 """Transform the graph's definition of a dataset reference (parent 

239 dataset type, with the registry or producer's storage class) to the one 

240 seen by this task. 

241 

242 Parameters 

243 ---------- 

244 ref : `~lsst.daf.butler.DatasetRef` 

245 Graph's definition of the dataset reference. 

246 

247 Returns 

248 ------- 

249 out_dataset_ref : `~lsst.daf.butler.DatasetRef` 

250 Dataset reference seen by this task. 

251 """ 

252 raise NotImplementedError() 

253 

254 def _to_xgraph_state(self) -> dict[str, Any]: 

255 """Convert this edges's attributes into a dictionary suitable for use 

256 in exported networkx graphs. 

257 """ 

258 return { 

259 "connection_name": self.connection_name, 

260 "parent_dataset_type_name": self.parent_dataset_type_name, 

261 "storage_class_name": self.storage_class_name, 

262 "is_init": bool, 

263 } 

264 

265 @classmethod 

266 def _unreduce(cls, kwargs: dict[str, Any]) -> Self: 

267 """Unpickle an `Edge` instance.""" 

268 return cls(**kwargs) 

269 

270 def __reduce__(self) -> tuple[Callable[[dict[str, Any]], Edge], tuple[dict[str, Any]]]: 

271 return ( 

272 self._unreduce, 

273 ( 

274 dict( 

275 task_key=self.task_key, 

276 dataset_type_key=self.dataset_type_key, 

277 storage_class_name=self.storage_class_name, 

278 connection_name=self.connection_name, 

279 is_calibration=self.is_calibration, 

280 raw_dimensions=self.raw_dimensions, 

281 ), 

282 ), 

283 ) 

284 

285 

class ReadEdge(Edge):
    """Representation of an input connection (including init-inputs and
    prerequisites) in a pipeline graph.

    Parameters
    ----------
    dataset_type_key : `NodeKey`
        Key for the dataset type node this edge is connected to.  This should
        hold the parent dataset type name for component dataset types.
    task_key : `NodeKey`
        Key for the task node this edge is connected to.
    storage_class_name : `str`
        Name of the dataset type's storage class as seen by the task.
    connection_name : `str`
        Internal name for the connection as seen by the task.
    is_calibration : `bool`
        Whether this dataset type can be included in
        `~lsst.daf.butler.CollectionType.CALIBRATION` collections.
    raw_dimensions : `frozenset` [ `str` ]
        Raw dimensions from the connection definition.
    is_prerequisite : `bool`
        Whether this dataset must be present in the data repository prior to
        `QuantumGraph` generation.
    component : `str` or `None`
        Component of the dataset type requested by the task.
    defer_query_constraint : `bool`
        If `True`, by default do not include this dataset type's existence as a
        constraint on the initial data ID query in QuantumGraph generation.

    Notes
    -----
    When included in an exported `networkx` graph (e.g.
    `PipelineGraph.make_xgraph`), read edges set the following edge attributes:

    - ``parent_dataset_type_name``
    - ``storage_class_name``
    - ``is_init``
    - ``component``
    - ``is_prerequisite``

    As with `ReadEdge` instance attributes, these descriptions of dataset types
    are those specific to a task, and may differ from the graph's resolved
    dataset type or (if `PipelineGraph.resolve` has not been called) there may
    not even be a consistent definition of the dataset type.
    """

    def __init__(
        self,
        dataset_type_key: NodeKey,
        task_key: NodeKey,
        *,
        storage_class_name: str,
        connection_name: str,
        is_calibration: bool,
        raw_dimensions: frozenset[str],
        is_prerequisite: bool,
        component: str | None,
        defer_query_constraint: bool,
    ):
        super().__init__(
            task_key=task_key,
            dataset_type_key=dataset_type_key,
            storage_class_name=storage_class_name,
            connection_name=connection_name,
            raw_dimensions=raw_dimensions,
            is_calibration=is_calibration,
        )
        self.is_prerequisite = is_prerequisite
        self.component = component
        self.defer_query_constraint = defer_query_constraint

    component: str | None
    """Component to add to `parent_dataset_type_name` to form the dataset type
    name seen by this task (`str` or `None`).
    """

    is_prerequisite: bool
    """Whether this dataset must be present in the data repository prior to
    `QuantumGraph` generation.
    """

    defer_query_constraint: bool
    """If `True`, by default do not include this dataset type's existence as a
    constraint on the initial data ID query in QuantumGraph generation.

    This can be `True` either because the connection class had
    ``deferQueryConstraint=True`` or because it had ``minimum=0``.
    """

    @property
    def nodes(self) -> tuple[NodeKey, NodeKey]:
        # Docstring inherited.
        # Reads flow from the dataset type into the task.
        return (self.dataset_type_key, self.task_key)

    @property
    def dataset_type_name(self) -> str:
        """Complete dataset type name, as seen by the task (`str`)."""
        if self.component is not None:
            return f"{self.parent_dataset_type_name}.{self.component}"
        return self.parent_dataset_type_name

    def diff(self: ReadEdge, other: ReadEdge, connection_type: str = "connection") -> list[str]:
        # Docstring inherited.
        # Extend the base-class comparison with the one read-specific flag
        # that can differ between configurations.
        result = super().diff(other, connection_type)
        if self.defer_query_constraint != other.defer_query_constraint:
            result.append(
                f"{connection_type.capitalize()} {self.connection_name!r} is marked as a deferred query "
                f"constraint {'in A but not in B' if self.defer_query_constraint else 'in B but not in A'}."
            )
        return result

    def adapt_dataset_type(self, dataset_type: DatasetType) -> DatasetType:
        # Docstring inherited.
        if self.component is not None:
            dataset_type = dataset_type.makeComponentDatasetType(self.component)
        if self.storage_class_name != dataset_type.storageClass_name:
            return dataset_type.overrideStorageClass(self.storage_class_name)
        return dataset_type

    def adapt_dataset_ref(self, ref: DatasetRef) -> DatasetRef:
        # Docstring inherited.
        if self.component is not None:
            ref = ref.makeComponentRef(self.component)
        if self.storage_class_name != ref.datasetType.storageClass_name:
            return ref.overrideStorageClass(self.storage_class_name)
        return ref

    @classmethod
    def _from_connection_map(
        cls,
        task_key: NodeKey,
        connection_name: str,
        connection_map: Mapping[str, BaseConnection],
        is_prerequisite: bool = False,
    ) -> ReadEdge:
        """Construct a `ReadEdge` instance from a `.BaseConnection` object.

        Parameters
        ----------
        task_key : `NodeKey`
            Key for the associated task node or task init node.
        connection_name : `str`
            Internal name for the connection as seen by the task.
        connection_map : Mapping [ `str`, `.BaseConnection` ]
            Mapping of post-configuration object to draw dataset type
            information from, keyed by connection name.
        is_prerequisite : `bool`, optional
            Whether this dataset must be present in the data repository prior
            to `QuantumGraph` generation.

        Returns
        -------
        edge : `ReadEdge`
            New edge instance.
        """
        connection = connection_map[connection_name]
        # A connection name like "a.b" refers to component "b" of parent
        # dataset type "a"; the graph node is keyed on the parent.
        parent_dataset_type_name, component = DatasetType.splitDatasetTypeName(connection.name)
        return cls(
            dataset_type_key=NodeKey(NodeType.DATASET_TYPE, parent_dataset_type_name),
            task_key=task_key,
            component=component,
            storage_class_name=connection.storageClass,
            # InitInput connections don't have .isCalibration.
            is_calibration=getattr(connection, "isCalibration", False),
            is_prerequisite=is_prerequisite,
            connection_name=connection_name,
            # InitInput connections don't have a .dimensions because they
            # always have empty dimensions.
            raw_dimensions=frozenset(getattr(connection, "dimensions", frozenset())),
            # PrerequisiteInput and InitInput connections don't have a
            # .deferGraphConstraint, because they never constrain the initial
            # data ID query.
            defer_query_constraint=(
                getattr(connection, "deferGraphConstraint", False) or getattr(connection, "minimum", 1) == 0
            ),
        )

    def _resolve_dataset_type(
        self,
        *,
        current: DatasetType | None,
        is_initial_query_constraint: bool,
        is_prerequisite: bool | None,
        universe: DimensionUniverse,
        producer: str | None,
        consumers: Sequence[str],
        is_registered: bool,
        visualization_only: bool,
    ) -> tuple[DatasetType, bool, bool]:
        """Participate in the construction of the `DatasetTypeNode` object
        associated with this edge.

        Parameters
        ----------
        current : `lsst.daf.butler.DatasetType` or `None`
            The current graph-wide `~lsst.daf.butler.DatasetType`, or `None`.
            This will always be the registry's definition of the parent dataset
            type, if one exists.  If not, it will be the dataset type
            definition from the task in the graph that writes it, if there is
            one.  If there is no such task, this will be `None`.
        is_initial_query_constraint : `bool`
            Whether this dataset type is currently marked as a constraint on
            the initial data ID query in QuantumGraph generation.
        is_prerequisite : `bool` or `None`
            Whether this dataset type is marked as a prerequisite input in all
            edges processed so far.  `None` if this is the first edge.
        universe : `lsst.daf.butler.DimensionUniverse`
            Object that holds all dimension definitions.
        producer : `str` or `None`
            The label of the task that produces this dataset type in the
            pipeline, or `None` if it is an overall input.
        consumers : `~collections.abc.Sequence` [ `str` ]
            Labels for other consuming tasks that have already participated in
            this dataset type's resolution.
        is_registered : `bool`
            Whether a registration for this dataset type was found in the
            data repository.
        visualization_only : `bool`
            Resolve the graph as well as possible even when dimensions and
            storage classes cannot really be determined.  This can include
            using the ``universe.commonSkyPix`` as the assumed dimensions of
            connections that use the "skypix" placeholder and using "<UNKNOWN>"
            as a storage class name (which will fail if the storage class
            itself is ever actually loaded).

        Returns
        -------
        dataset_type : `~lsst.daf.butler.DatasetType`
            The updated graph-wide dataset type.  If ``current`` was provided,
            this must be equal to it.
        is_initial_query_constraint : `bool`
            If `True`, this dataset type should be included as a constraint in
            the initial data ID query during QuantumGraph generation; this
            requires that ``is_initial_query_constraint`` also be `True` on
            input.
        is_prerequisite : `bool`
            Whether this dataset type is marked as a prerequisite input in this
            task and all other edges processed so far.

        Raises
        ------
        MissingDatasetTypeError
            Raised if ``current is None`` and this edge cannot define one on
            its own.
        IncompatibleDatasetTypeError
            Raised if ``current is not None`` and this edge's definition is not
            compatible with it.
        ConnectionTypeConsistencyError
            Raised if a prerequisite input for one task appears as a different
            kind of connection in any other task.
        """
        # Resolve dimensions first.  "skypix" is a placeholder that can only
        # be pinned down by an existing (registry) definition, or guessed as
        # universe.commonSkyPix when only visualizing.
        if "skypix" in self.raw_dimensions:
            if current is None:
                if visualization_only:
                    dimensions = universe.conform(
                        [d if d != "skypix" else universe.commonSkyPix.name for d in self.raw_dimensions]
                    )
                else:
                    raise MissingDatasetTypeError(
                        f"DatasetType '{self.dataset_type_name}' referenced by "
                        f"{self.task_label!r} uses 'skypix' as a dimension "
                        f"placeholder, but has not been registered with the data repository. "
                        f"Note that reference catalog names are now used as the dataset "
                        f"type name instead of 'ref_cat'."
                    )
            else:
                # Check that the non-skypix dimensions agree with the current
                # definition before adopting its dimensions wholesale.
                rest1 = set(universe.conform(self.raw_dimensions - {"skypix"}).names)
                rest2 = current.dimensions.names - current.dimensions.skypix
                if rest1 != rest2:
                    raise IncompatibleDatasetTypeError(
                        f"Non-skypix dimensions for dataset type {self.dataset_type_name} declared in "
                        f"connections ({rest1}) are inconsistent with those in "
                        f"registry's version of this dataset ({rest2})."
                    )
                dimensions = current.dimensions
        else:
            dimensions = universe.conform(self.raw_dimensions)
        # A single edge that defers the query constraint turns it off for the
        # whole dataset type.
        is_initial_query_constraint = is_initial_query_constraint and not self.defer_query_constraint
        # Enforce that all consumers agree on prerequisite-ness.
        if is_prerequisite is None:
            is_prerequisite = self.is_prerequisite
        elif is_prerequisite and not self.is_prerequisite:
            raise ConnectionTypeConsistencyError(
                f"Dataset type {self.parent_dataset_type_name!r} is a prerequisite input to {consumers}, "
                f"but it is not a prerequisite to {self.task_label!r}."
            )
        elif not is_prerequisite and self.is_prerequisite:
            if producer is not None:
                raise ConnectionTypeConsistencyError(
                    f"Dataset type {self.parent_dataset_type_name!r} is a prerequisite input to "
                    f"{self.task_label}, but it is produced by {producer!r}."
                )
            else:
                raise ConnectionTypeConsistencyError(
                    f"Dataset type {self.parent_dataset_type_name!r} is a prerequisite input to "
                    f"{self.task_label}, but it is a regular input to {consumers!r}."
                )

        def report_current_origin() -> str:
            # Describe where ``current`` came from, for error messages.
            if is_registered:
                return "data repository"
            elif producer is not None:
                return f"producing task {producer!r}"
            else:
                return f"consuming task(s) {consumers!r}"

        if self.component is not None:
            # Component reads need the parent definition (for its storage
            # class) to be known already; we cannot invent one here.
            if current is None:
                if visualization_only:
                    current = DatasetType(
                        self.parent_dataset_type_name,
                        dimensions,
                        storageClass="<UNKNOWN>",
                        isCalibration=self.is_calibration,
                    )
                else:
                    raise MissingDatasetTypeError(
                        f"Dataset type {self.parent_dataset_type_name!r} is not registered and not produced "
                        f"by this pipeline, but it is used by task {self.task_label!r}, via component "
                        f"{self.component!r}. This pipeline cannot be resolved until the parent dataset "
                        "type is registered."
                    )
            else:
                try:
                    all_current_components = current.storageClass.allComponents()
                except (KeyError, ImportError):
                    # Storage class (or the code it needs) could not be
                    # loaded; tolerate that only when visualizing.
                    if visualization_only:
                        current = DatasetType(
                            self.parent_dataset_type_name,
                            dimensions,
                            storageClass="<UNKNOWN>",
                            isCalibration=self.is_calibration,
                        )
                        return current, is_initial_query_constraint, is_prerequisite
                    raise
                if self.component not in all_current_components:
                    raise IncompatibleDatasetTypeError(
                        f"Dataset type {self.parent_dataset_type_name!r} has storage class "
                        f"{current.storageClass_name!r} (from {report_current_origin()}), "
                        f"which does not include component {self.component!r} "
                        f"as requested by task {self.task_label!r}."
                    )
                # Note that we can't actually make a fully-correct DatasetType
                # for the component the task wants, because we don't have the
                # parent storage class.
                current_component = all_current_components[self.component]

                if (
                    not visualization_only
                    and current_component.name != self.storage_class_name
                    and not StorageClassFactory()
                    .getStorageClass(self.storage_class_name)
                    .can_convert(current_component)
                ):
                    raise IncompatibleDatasetTypeError(
                        f"Dataset type '{self.parent_dataset_type_name}.{self.component}' has storage class "
                        f"{all_current_components[self.component].name!r} "
                        f"(from {report_current_origin()}), which cannot be converted to "
                        f"{self.storage_class_name!r}, as requested by task {self.task_label!r}."
                    )
            return current, is_initial_query_constraint, is_prerequisite
        else:
            dataset_type = DatasetType(
                self.parent_dataset_type_name,
                dimensions,
                storageClass=self.storage_class_name,
                isCalibration=self.is_calibration,
            )
            if current is not None:
                if not is_registered and producer is None:
                    # Current definition comes from another consumer; we
                    # require the dataset types to be exactly equal (not just
                    # compatible), since neither connection should take
                    # precedence.
                    if dataset_type != current:
                        if visualization_only and dataset_type.dimensions == current.dimensions:
                            # Make a visualization-only ambiguous storage class
                            # "name".
                            all_storage_classes = set(current.storageClass_name.split("/"))
                            all_storage_classes.update(dataset_type.storageClass_name.split("/"))
                            current = DatasetType(
                                current.name,
                                current.dimensions,
                                "/".join(sorted(all_storage_classes)),
                            )
                        else:
                            raise MissingDatasetTypeError(
                                f"Definitions differ for input dataset type "
                                f"{self.parent_dataset_type_name!r}; task {self.task_label!r} has "
                                f"{dataset_type}, but the definition from {report_current_origin()} is "
                                f"{current}. If the storage classes are compatible but different, "
                                "registering the dataset type in the data repository in advance will avoid "
                                "this error."
                            )
                elif not visualization_only and not dataset_type.is_compatible_with(current):
                    raise IncompatibleDatasetTypeError(
                        f"Incompatible definition for input dataset type {self.parent_dataset_type_name!r}; "
                        f"task {self.task_label!r} has {dataset_type}, but the definition "
                        f"from {report_current_origin()} is {current}."
                    )
                return current, is_initial_query_constraint, is_prerequisite
            else:
                return dataset_type, is_initial_query_constraint, is_prerequisite

    def _to_xgraph_state(self) -> dict[str, Any]:
        # Docstring inherited.
        result = super()._to_xgraph_state()
        result["component"] = self.component
        result["is_prerequisite"] = self.is_prerequisite
        return result

    def __reduce__(self) -> tuple[Callable[[dict[str, Any]], Edge], tuple[dict[str, Any]]]:
        # Pickle support: rebuild via Edge._unreduce with this subclass's
        # full keyword-argument set.
        return (
            self._unreduce,
            (
                dict(
                    dataset_type_key=self.dataset_type_key,
                    task_key=self.task_key,
                    storage_class_name=self.storage_class_name,
                    connection_name=self.connection_name,
                    is_calibration=self.is_calibration,
                    raw_dimensions=self.raw_dimensions,
                    is_prerequisite=self.is_prerequisite,
                    component=self.component,
                    defer_query_constraint=self.defer_query_constraint,
                ),
            ),
        )

713 

714 

class WriteEdge(Edge):
    """Representation of an output connection (including init-outputs) in a
    pipeline graph.

    Notes
    -----
    When included in an exported `networkx` graph (e.g.
    `PipelineGraph.make_xgraph`), write edges set the following edge
    attributes:

    - ``parent_dataset_type_name``
    - ``storage_class_name``
    - ``is_init``

    As with `WriteEdge` instance attributes, these descriptions of dataset
    types are those specific to a task, and may differ from the graph's
    resolved dataset type or (if `PipelineGraph.resolve` has not been called)
    there may not even be a consistent definition of the dataset type.
    """

    @property
    def nodes(self) -> tuple[NodeKey, NodeKey]:
        # Docstring inherited.
        # Writes flow from the task into the dataset type.
        return (self.task_key, self.dataset_type_key)

    def adapt_dataset_type(self, dataset_type: DatasetType) -> DatasetType:
        # Docstring inherited.
        if dataset_type.storageClass_name == self.storage_class_name:
            return dataset_type
        return dataset_type.overrideStorageClass(self.storage_class_name)

    def adapt_dataset_ref(self, ref: DatasetRef) -> DatasetRef:
        # Docstring inherited.
        if ref.datasetType.storageClass_name == self.storage_class_name:
            return ref
        return ref.overrideStorageClass(self.storage_class_name)

    @classmethod
    def _from_connection_map(
        cls,
        task_key: NodeKey,
        connection_name: str,
        connection_map: Mapping[str, BaseConnection],
    ) -> WriteEdge:
        """Construct a `WriteEdge` instance from a `.BaseConnection` object.

        Parameters
        ----------
        task_key : `NodeKey`
            Key for the associated task node or task init node.
        connection_name : `str`
            Internal name for the connection as seen by the task.
        connection_map : Mapping [ `str`, `.BaseConnection` ]
            Mapping of post-configuration object to draw dataset type
            information from, keyed by connection name.

        Returns
        -------
        edge : `WriteEdge`
            New edge instance.
        """
        connection = connection_map[connection_name]
        parent_dataset_type_name, component = DatasetType.splitDatasetTypeName(connection.name)
        # Tasks may only write parent dataset types, never components.
        if component is not None:
            raise ValueError(
                f"Illegal output component dataset {connection.name!r} in task {task_key.name!r}."
            )
        return cls(
            task_key=task_key,
            dataset_type_key=NodeKey(NodeType.DATASET_TYPE, parent_dataset_type_name),
            storage_class_name=connection.storageClass,
            connection_name=connection_name,
            # InitOutput connections don't have .isCalibration.
            is_calibration=getattr(connection, "isCalibration", False),
            # InitOutput connections don't have a .dimensions because they
            # always have empty dimensions.
            raw_dimensions=frozenset(getattr(connection, "dimensions", frozenset())),
        )

    def _resolve_dataset_type(self, current: DatasetType | None, universe: DimensionUniverse) -> DatasetType:
        """Participate in the construction of the `DatasetTypeNode` object
        associated with this edge.

        Parameters
        ----------
        current : `lsst.daf.butler.DatasetType` or `None`
            The current graph-wide `~lsst.daf.butler.DatasetType`, or `None`.
            This will always be the registry's definition of the parent dataset
            type, if one exists.
        universe : `lsst.daf.butler.DimensionUniverse`
            Object that holds all dimension definitions.

        Returns
        -------
        dataset_type : `~lsst.daf.butler.DatasetType`
            A dataset type compatible with this edge.  If ``current`` was
            provided, this must be equal to it.

        Raises
        ------
        IncompatibleDatasetTypeError
            Raised if ``current is not None`` and this edge's definition is not
            compatible with it.
        """
        try:
            conformed = universe.conform(self.raw_dimensions)
            candidate = DatasetType(
                self.parent_dataset_type_name,
                conformed,
                storageClass=self.storage_class_name,
                isCalibration=self.is_calibration,
            )
        except Exception as err:
            # Attach the connection/task context so the failure is traceable.
            err.add_note(f"In connection {self.connection_name!r} of task {self.task_label!r}.")
            raise
        if current is None:
            # No prior definition; this edge's own definition wins.
            return candidate
        if not current.is_compatible_with(candidate):
            raise IncompatibleDatasetTypeError(
                f"Incompatible definition for output dataset type {self.parent_dataset_type_name!r}: "
                f"task {self.task_label!r} has {candidate}, but data repository has {current}."
            )
        return current