Coverage for python / lsst / pipe / base / quantum_graph_skeleton.py: 34%

206 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:57 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""An under-construction version of QuantumGraph and various helper 

29classes. 

30""" 

31 

32from __future__ import annotations 

33 

34__all__ = ( 

35 "DatasetKey", 

36 "PrerequisiteDatasetKey", 

37 "QuantumGraphSkeleton", 

38 "QuantumKey", 

39 "TaskInitKey", 

40) 

41 

42import dataclasses 

43from collections import defaultdict 

44from collections.abc import Iterable, Iterator, MutableMapping, Set 

45from typing import TYPE_CHECKING, Any, ClassVar, Literal 

46 

47import networkx 

48 

49from lsst.daf.butler import ( 

50 Butler, 

51 DataCoordinate, 

52 DataIdValue, 

53 DatasetRef, 

54 DimensionDataAttacher, 

55 DimensionDataExtractor, 

56 DimensionGroup, 

57 DimensionRecordSet, 

58) 

59from lsst.utils.logging import getLogger 

60 

61if TYPE_CHECKING: 

62 pass 

63 

64_LOG = getLogger(__name__) 

65 

66 

67@dataclasses.dataclass(slots=True, eq=True, frozen=True) 

68class QuantumKey: 

69 """Identifier type for quantum keys in a `QuantumGraphSkeleton`.""" 

70 

71 task_label: str 

72 """Label of the task in the pipeline.""" 

73 

74 data_id_values: tuple[DataIdValue, ...] 

75 """Data ID values of the quantum. 

76 

77 Note that keys are fixed given `task_label`, so using only the values here 

78 speeds up comparisons. 

79 """ 

80 

81 is_task: ClassVar[Literal[True]] = True 

82 """Whether this node represents a quantum or task initialization rather 

83 than a dataset (always `True`). 

84 """ 

85 

86 

87@dataclasses.dataclass(slots=True, eq=True, frozen=True) 

88class TaskInitKey: 

89 """Identifier type for task init keys in a `QuantumGraphSkeleton`.""" 

90 

91 task_label: str 

92 """Label of the task in the pipeline.""" 

93 

94 is_task: ClassVar[Literal[True]] = True 

95 """Whether this node represents a quantum or task initialization rather 

96 than a dataset (always `True`). 

97 """ 

98 

99 

100@dataclasses.dataclass(slots=True, eq=True, frozen=True) 

101class DatasetKey: 

102 """Identifier type for dataset keys in a `QuantumGraphSkeleton`.""" 

103 

104 parent_dataset_type_name: str 

105 """Name of the dataset type (never a component).""" 

106 

107 data_id_values: tuple[DataIdValue, ...] 

108 """Data ID values of the dataset. 

109 

110 Note that keys are fixed given `parent_dataset_type_name`, so using only 

111 the values here speeds up comparisons. 

112 """ 

113 

114 is_task: ClassVar[Literal[False]] = False 

115 """Whether this node represents a quantum or task initialization rather 

116 than a dataset (always `False`). 

117 """ 

118 

119 is_prerequisite: ClassVar[Literal[False]] = False 

120 

121 

122@dataclasses.dataclass(slots=True, eq=True, frozen=True) 

123class PrerequisiteDatasetKey: 

124 """Identifier type for prerequisite dataset keys in a 

125 `QuantumGraphSkeleton`. 

126 

127 Unlike regular datasets, prerequisites are not actually required to come 

128 from a find-first search of `input_collections`, so we don't want to 

129 assume that the same data ID implies the same dataset. Happily we also 

130 don't need to search for them by data ID in the graph, so we can use the 

131 dataset ID (UUID) instead. 

132 """ 

133 

134 parent_dataset_type_name: str 

135 """Name of the dataset type (never a component).""" 

136 

137 dataset_id_bytes: bytes 

138 """Dataset ID (UUID) as raw bytes.""" 

139 

140 is_task: ClassVar[Literal[False]] = False 

141 """Whether this node represents a quantum or task initialization rather 

142 than a dataset (always `False`). 

143 """ 

144 

145 is_prerequisite: ClassVar[Literal[True]] = True 

146 

147 

# Union of every node-identifier type that can appear in a
# `QuantumGraphSkeleton`'s internal graph.
type Key = QuantumKey | TaskInitKey | DatasetKey | PrerequisiteDatasetKey

149 

150 

class QuantumGraphSkeleton:
    """An under-construction quantum graph.

    QuantumGraphSkeleton is intended for use inside `QuantumGraphBuilder` and
    its subclasses.

    Parameters
    ----------
    task_labels : `~collections.abc.Iterable` [ `str` ]
        The labels of all tasks whose quanta may be included in the graph, in
        topological order.

    Notes
    -----
    QuantumGraphSkeleton models a bipartite version of the quantum graph, in
    which both quanta and datasets are represented as nodes and each type of
    node only has edges to the other type.

    Square-bracket (`getitem`) indexing returns a mutable mapping of a node's
    flexible attributes.

    The details of the `QuantumGraphSkeleton` API (e.g. which operations
    operate on multiple nodes vs. a single node) are set by what's actually
    needed by current quantum graph generation algorithms.  New variants can
    be added as needed, but adding all operations that *might* be useful for
    some future algorithm seems premature.
    """

    def __init__(self, task_labels: Iterable[str]):
        # Maps task label -> (task-init node, set of that task's quanta).
        self._tasks: dict[str, tuple[TaskInitKey, set[QuantumKey]]] = {}
        self._xgraph: networkx.DiGraph = networkx.DiGraph()
        self._global_init_outputs: set[DatasetKey] = set()
        # Maps dimension element name -> records gathered for that element.
        self._dimension_data: dict[str, DimensionRecordSet] = {}
        for task_label in task_labels:
            task_init_key = TaskInitKey(task_label)
            self._tasks[task_label] = (task_init_key, set())
            self._xgraph.add_node(task_init_key)

    def __contains__(self, key: Key) -> bool:
        return key in self._xgraph.nodes

    def __getitem__(self, key: Key) -> MutableMapping[str, Any]:
        return self._xgraph.nodes[key]

    def __iter__(self) -> Iterator[Key]:
        return iter(self._xgraph.nodes)

    @property
    def n_nodes(self) -> int:
        """The total number of nodes of all types."""
        return len(self._xgraph.nodes)

    @property
    def n_edges(self) -> int:
        """The total number of edges."""
        return len(self._xgraph.edges)

    @property
    def has_any_quanta(self) -> bool:
        """Test whether this graph has any quanta."""
        # Stop at the first quantum rather than counting them all.
        for _ in self.iter_all_quanta():
            return True
        return False

    def has_task(self, task_label: str) -> bool:
        """Test whether the given task is in this skeleton.

        Tasks are only added to the skeleton at initialization, but may be
        removed by `remove_task` if they end up having no quanta.

        Parameters
        ----------
        task_label : `str`
            Task to check for.

        Returns
        -------
        has : `bool`
            `True` if the task is in this skeleton.
        """
        return task_label in self._tasks

    def get_task_init_node(self, task_label: str) -> TaskInitKey:
        """Return the graph node that represents a task's initialization.

        Parameters
        ----------
        task_label : `str`
            The task label to use.

        Returns
        -------
        node : `TaskInitKey`
            The graph node representing this task's initialization.
        """
        return self._tasks[task_label][0]

    def get_quanta(self, task_label: str) -> Set[QuantumKey]:
        """Return the quanta for the given task label.

        Parameters
        ----------
        task_label : `str`
            Label for the task.

        Returns
        -------
        quanta : `~collections.abc.Set` [ `QuantumKey` ]
            A set-like object with the identifiers of all quanta for the given
            task.  *The skeleton object's set of quanta must not be modified
            while iterating over this container; make a copy if mutation
            during iteration is necessary*.
        """
        return self._tasks[task_label][1]

    @property
    def global_init_outputs(self) -> Set[DatasetKey]:
        """The set of dataset nodes that are not associated with any task."""
        return self._global_init_outputs

    def iter_all_quanta(self) -> Iterator[QuantumKey]:
        """Iterate over all quanta from any task, in topological (but
        otherwise unspecified) order.
        """
        # self._tasks is in topological task order, so iterating tasks in
        # insertion order yields quanta in topological order.
        for _, quanta in self._tasks.values():
            yield from quanta

    def iter_outputs_of(self, quantum_key: QuantumKey | TaskInitKey) -> Iterator[DatasetKey]:
        """Iterate over the datasets produced by the given quantum.

        Parameters
        ----------
        quantum_key : `QuantumKey` or `TaskInitKey`
            Quantum to iterate over.

        Returns
        -------
        datasets : `~collections.abc.Iterator` of `DatasetKey`
            Datasets produced by the given quantum.
        """
        return self._xgraph.successors(quantum_key)

    def iter_inputs_of(
        self, quantum_key: QuantumKey | TaskInitKey
    ) -> Iterator[DatasetKey | PrerequisiteDatasetKey]:
        """Iterate over the datasets consumed by the given quantum.

        Parameters
        ----------
        quantum_key : `QuantumKey` or `TaskInitKey`
            Quantum to iterate over.

        Returns
        -------
        datasets : `~collections.abc.Iterator` of `DatasetKey` \
                or `PrerequisiteDatasetKey`
            Datasets consumed by the given quantum.
        """
        return self._xgraph.predecessors(quantum_key)

    def update(self, other: QuantumGraphSkeleton) -> None:
        """Copy all nodes from ``other`` to ``self``.

        Parameters
        ----------
        other : `QuantumGraphSkeleton`
            Source of nodes.  The tasks in ``other`` must be a subset of the
            tasks in ``self`` (this method is expected to be used to populate
            a skeleton for a full graph from independent-subgraph skeletons).
        """
        for task_label, (_, quanta) in other._tasks.items():
            self._tasks[task_label][1].update(quanta)
        self._xgraph.update(other._xgraph)
        # Merge dimension records element-by-element, creating empty sets for
        # elements we haven't seen before.
        for record_set in other._dimension_data.values():
            self._dimension_data.setdefault(
                record_set.element.name, DimensionRecordSet(record_set.element)
            ).update(record_set)

    def add_quantum_node(self, task_label: str, data_id: DataCoordinate, **attrs: Any) -> QuantumKey:
        """Add a new node representing a quantum.

        Parameters
        ----------
        task_label : `str`
            Name of task.
        data_id : `~lsst.daf.butler.DataCoordinate`
            The data ID of the quantum.
        **attrs : `~typing.Any`
            Additional attributes.

        Returns
        -------
        key : `QuantumKey`
            Identifier for the new (or existing) node.
        """
        key = QuantumKey(task_label, data_id.required_values)
        self._xgraph.add_node(key, data_id=data_id, **attrs)
        self._tasks[key.task_label][1].add(key)
        return key

    def add_dataset_node(
        self,
        parent_dataset_type_name: str,
        data_id: DataCoordinate,
        is_global_init_output: bool = False,
        **attrs: Any,
    ) -> DatasetKey:
        """Add a new node representing a dataset.

        Parameters
        ----------
        parent_dataset_type_name : `str`
            Name of the parent dataset type.
        data_id : `~lsst.daf.butler.DataCoordinate`
            The dataset data ID.
        is_global_init_output : `bool`, optional
            Whether this dataset is a global init output.
        **attrs : `~typing.Any`
            Additional attributes for the node.

        Returns
        -------
        key : `DatasetKey`
            Identifier for the new (or existing) node.
        """
        key = DatasetKey(parent_dataset_type_name, data_id.required_values)
        self._xgraph.add_node(key, data_id=data_id, **attrs)
        if is_global_init_output:
            assert isinstance(key, DatasetKey), str(key)
            self._global_init_outputs.add(key)
        return key

    def add_prerequisite_node(
        self,
        ref: DatasetRef,
        **attrs: Any,
    ) -> PrerequisiteDatasetKey:
        """Add a new node representing a prerequisite input dataset.

        Parameters
        ----------
        ref : `~lsst.daf.butler.DatasetRef`
            The dataset ref of the prerequisite.
        **attrs : `~typing.Any`
            Additional attributes for the node.

        Returns
        -------
        key : `PrerequisiteDatasetKey`
            Identifier for the new (or existing) node.
        """
        key = PrerequisiteDatasetKey(ref.datasetType.name, ref.id.bytes)
        self._xgraph.add_node(key, data_id=ref.dataId, ref=ref, **attrs)
        return key

    def remove_quantum_node(self, key: QuantumKey, remove_outputs: bool) -> None:
        """Remove a node representing a quantum.

        Parameters
        ----------
        key : `QuantumKey`
            Identifier for the node.
        remove_outputs : `bool`
            If `True`, also remove all dataset nodes produced by this quantum.
            If `False`, any such dataset nodes will become overall inputs.
        """
        _, quanta = self._tasks[key.task_label]
        quanta.remove(key)
        if remove_outputs:
            # Materialize the successor list before mutating the graph.
            to_remove = list(self._xgraph.successors(key))
            to_remove.append(key)
            self._xgraph.remove_nodes_from(to_remove)
        else:
            self._xgraph.remove_node(key)

    def remove_dataset_nodes(self, keys: Iterable[DatasetKey | PrerequisiteDatasetKey]) -> None:
        """Remove nodes representing datasets.

        Parameters
        ----------
        keys : `~collections.abc.Iterable` of `DatasetKey`\
                or `PrerequisiteDatasetKey`
            Nodes to remove.
        """
        self._xgraph.remove_nodes_from(keys)

    def remove_task(self, task_label: str) -> None:
        """Fully remove a task from the skeleton.

        All init-output datasets and quanta for the task must already have
        been removed.

        Parameters
        ----------
        task_label : `str`
            Name of task to remove.
        """
        task_init_key, quanta = self._tasks.pop(task_label)
        assert not quanta, "Cannot remove task unless all quanta have already been removed."
        assert not list(self._xgraph.successors(task_init_key))
        self._xgraph.remove_node(task_init_key)

    def add_input_edges(
        self,
        task_key: QuantumKey | TaskInitKey,
        dataset_keys: Iterable[DatasetKey | PrerequisiteDatasetKey],
    ) -> None:
        """Add edges connecting datasets to a quantum that consumes them.

        Parameters
        ----------
        task_key : `QuantumKey` or `TaskInitKey`
            Quantum to connect.
        dataset_keys : `~collections.abc.Iterable` of `DatasetKey`\
                or `PrerequisiteDatasetKey`
            Datasets to join to the quantum.

        Notes
        -----
        This must only be called if the task node has already been added.
        Use `add_input_edge` if this cannot be assumed.

        Dataset nodes that are not already present will be created.
        """
        assert task_key in self._xgraph, str(task_key)
        self._xgraph.add_edges_from((dataset_key, task_key) for dataset_key in dataset_keys)

    def remove_input_edges(
        self,
        task_key: QuantumKey | TaskInitKey,
        dataset_keys: Iterable[DatasetKey | PrerequisiteDatasetKey],
    ) -> None:
        """Remove edges connecting datasets to a quantum that consumes them.

        Parameters
        ----------
        task_key : `QuantumKey` or `TaskInitKey`
            Quantum to disconnect.
        dataset_keys : `~collections.abc.Iterable` of `DatasetKey`\
                or `PrerequisiteDatasetKey`
            Datasets to remove from the quantum.
        """
        self._xgraph.remove_edges_from((dataset_key, task_key) for dataset_key in dataset_keys)

    def add_input_edge(
        self,
        task_key: QuantumKey | TaskInitKey,
        dataset_key: DatasetKey | PrerequisiteDatasetKey,
        ignore_unrecognized_quanta: bool = False,
    ) -> bool:
        """Add an edge connecting a dataset to a quantum that consumes it.

        Parameters
        ----------
        task_key : `QuantumKey` or `TaskInitKey`
            Identifier for the quantum node.
        dataset_key : `DatasetKey` or `PrerequisiteDatasetKey`
            Identifier for the dataset node.
        ignore_unrecognized_quanta : `bool`, optional
            If `True`, do nothing (and return `False`) if the quantum node is
            not already present.  If `False` (default), the quantum node is
            assumed to already be present.

        Returns
        -------
        added : `bool`
            `True` if an edge was actually added, `False` if the quantum was
            not recognized and the edge was not added as a result.

        Notes
        -----
        Dataset nodes that are not already present will be created.
        """
        if ignore_unrecognized_quanta and task_key not in self._xgraph:
            return False
        self._xgraph.add_edge(dataset_key, task_key)
        return True

    def add_output_edge(self, task_key: QuantumKey | TaskInitKey, dataset_key: DatasetKey) -> None:
        """Add an edge connecting a dataset to the quantum that produces it.

        Parameters
        ----------
        task_key : `QuantumKey` or `TaskInitKey`
            Identifier for the quantum node.  Must identify a node already
            present in the graph.
        dataset_key : `DatasetKey`
            Identifier for the dataset node.  Must identify a node already
            present in the graph.
        """
        assert task_key in self._xgraph, str(task_key)
        assert dataset_key in self._xgraph, str(dataset_key)
        self._xgraph.add_edge(task_key, dataset_key)

    def remove_output_edge(self, dataset_key: DatasetKey) -> None:
        """Remove the edge connecting a dataset to the quantum that produces
        it.

        Parameters
        ----------
        dataset_key : `DatasetKey`
            Identifier for the dataset node.  Must identify a node already
            present in the graph.
        """
        # Check membership before querying predecessors; the original order
        # made this assertion unreachable when the node was missing.
        assert dataset_key in self._xgraph, str(dataset_key)
        # A dataset node has exactly one producer, so unpack it.
        (task_key,) = self._xgraph.predecessors(dataset_key)
        self._xgraph.remove_edge(task_key, dataset_key)

    def remove_orphan_datasets(self) -> None:
        """Remove any dataset nodes that do not have any edges."""
        # Materialize the isolates before mutating the graph.
        for orphan in list(networkx.isolates(self._xgraph)):
            if not orphan.is_task and orphan not in self._global_init_outputs:
                self._xgraph.remove_node(orphan)

    def extract_overall_inputs(self) -> dict[DatasetKey | PrerequisiteDatasetKey, DatasetRef]:
        """Find overall input datasets.

        Returns
        -------
        datasets : `dict` [ `DatasetKey` or `PrerequisiteDatasetKey`, \
                `~lsst.daf.butler.DatasetRef` ]
            Overall-input datasets, including prerequisites and init-inputs.
        """
        result = {}
        # Only the first topological generation can hold overall inputs
        # (nodes with no predecessors), hence the break after one iteration.
        for generation in networkx.algorithms.topological_generations(self._xgraph):
            for dataset_key in generation:
                if dataset_key.is_task:
                    continue
                if (ref := self.get_dataset_ref(dataset_key)) is None:
                    raise AssertionError(
                        f"Logic bug in QG generation: dataset {dataset_key} was never resolved."
                    )
                result[dataset_key] = ref
            break
        return result

    def set_dataset_ref(
        self, ref: DatasetRef, key: DatasetKey | PrerequisiteDatasetKey | None = None
    ) -> None:
        """Associate a dataset node with a `~lsst.daf.butler.DatasetRef`
        instance.

        Parameters
        ----------
        ref : `~lsst.daf.butler.DatasetRef`
            `~lsst.daf.butler.DatasetRef` to associate with the node.
        key : `DatasetKey` or `PrerequisiteDatasetKey`, optional
            Identifier for the graph node.  If not provided, a `DatasetKey`
            is constructed from the dataset type name and data ID of ``ref``.
        """
        if key is None:
            key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
        self._xgraph.nodes[key]["ref"] = ref

    def set_output_for_skip(self, ref: DatasetRef) -> None:
        """Associate a dataset node with a `~lsst.daf.butler.DatasetRef` that
        represents an existing output in a collection where such outputs can
        cause a quantum to be skipped.

        Parameters
        ----------
        ref : `~lsst.daf.butler.DatasetRef`
            `~lsst.daf.butler.DatasetRef` to associate with the node.
        """
        key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
        self._xgraph.nodes[key]["output_for_skip"] = ref

    def set_output_in_the_way(self, ref: DatasetRef) -> None:
        """Associate a dataset node with a `~lsst.daf.butler.DatasetRef` that
        represents an existing output in the output RUN collection.

        Parameters
        ----------
        ref : `~lsst.daf.butler.DatasetRef`
            `~lsst.daf.butler.DatasetRef` to associate with the node.
        """
        key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
        self._xgraph.nodes[key]["output_in_the_way"] = ref

    def get_dataset_ref(self, key: DatasetKey | PrerequisiteDatasetKey) -> DatasetRef | None:
        """Return the `~lsst.daf.butler.DatasetRef` associated with the given
        node.

        This does not return "output for skip" and "output in the way"
        datasets.

        Parameters
        ----------
        key : `DatasetKey` or `PrerequisiteDatasetKey`
            Identifier for the graph node.

        Returns
        -------
        ref : `~lsst.daf.butler.DatasetRef` or `None`
            Dataset reference associated with the node.
        """
        return self._xgraph.nodes[key].get("ref")

    def get_output_for_skip(self, key: DatasetKey) -> DatasetRef | None:
        """Return the `~lsst.daf.butler.DatasetRef` associated with the given
        node in a collection where it could lead to a quantum being skipped.

        Parameters
        ----------
        key : `DatasetKey`
            Identifier for the graph node.

        Returns
        -------
        ref : `~lsst.daf.butler.DatasetRef` or `None`
            Dataset reference associated with the node.
        """
        return self._xgraph.nodes[key].get("output_for_skip")

    def get_output_in_the_way(self, key: DatasetKey) -> DatasetRef | None:
        """Return the `~lsst.daf.butler.DatasetRef` associated with the given
        node in the output RUN collection.

        Parameters
        ----------
        key : `DatasetKey`
            Identifier for the graph node.

        Returns
        -------
        ref : `~lsst.daf.butler.DatasetRef` or `None`
            Dataset reference associated with the node.
        """
        return self._xgraph.nodes[key].get("output_in_the_way")

    def discard_output_in_the_way(self, key: DatasetKey) -> None:
        """Drop any `~lsst.daf.butler.DatasetRef` associated with this node in
        the output RUN collection.

        Does nothing if there is no such `~lsst.daf.butler.DatasetRef`.

        Parameters
        ----------
        key : `DatasetKey`
            Identifier for the graph node.
        """
        self._xgraph.nodes[key].pop("output_in_the_way", None)

    def set_data_id(self, key: Key, data_id: DataCoordinate) -> None:
        """Set the data ID associated with a node.

        This updates the data ID in any `~lsst.daf.butler.DatasetRef` objects
        associated with the node via `set_dataset_ref`,
        `set_output_for_skip`, or `set_output_in_the_way` as well, assuming
        it is an expanded version of the original data ID.

        Parameters
        ----------
        key : `Key`
            Identifier for the graph node.
        data_id : `~lsst.daf.butler.DataCoordinate`
            Data ID for the node.
        """
        state: MutableMapping[str, Any] = self._xgraph.nodes[key]
        state["data_id"] = data_id
        ref: DatasetRef | None
        if (ref := state.get("ref")) is not None:
            state["ref"] = ref.expanded(data_id)
        output_for_skip: DatasetRef | None
        if (output_for_skip := state.get("output_for_skip")) is not None:
            state["output_for_skip"] = output_for_skip.expanded(data_id)
        output_in_the_way: DatasetRef | None
        if (output_in_the_way := state.get("output_in_the_way")) is not None:
            state["output_in_the_way"] = output_in_the_way.expanded(data_id)

    def get_data_id(self, key: Key) -> DataCoordinate:
        """Return the full data ID for a quantum or dataset, if available.

        Parameters
        ----------
        key : `Key`
            Identifier for the graph node.

        Returns
        -------
        data_id : `~lsst.daf.butler.DataCoordinate`
            Expanded data ID for the node, if one is available.

        Raises
        ------
        KeyError
            Raised if this node does not have an expanded data ID.
        """
        return self._xgraph.nodes[key]["data_id"]

    def attach_dimension_records(
        self,
        butler: Butler,
        dimensions: DimensionGroup,
        dimension_records: Iterable[DimensionRecordSet] = (),
    ) -> None:
        """Attach dimension records to the data IDs in the skeleton.

        This both attaches records to data IDs in the skeleton and aggregates
        any existing records on data IDs, so `get_dimension_data` returns all
        dimension records used in the skeleton.  It can be called multiple
        times.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Butler to use to query for missing dimension records.
        dimensions : `lsst.daf.butler.DimensionGroup`
            Superset of all of the dimensions of all data IDs.
        dimension_records : `~collections.abc.Iterable` [ \
                `lsst.daf.butler.DimensionRecordSet` ], optional
            Iterable of sets of dimension records to attach.
        """
        for record_set in dimension_records:
            self._dimension_data.setdefault(
                record_set.element.name, DimensionRecordSet(record_set.element)
            ).update(record_set)
        # Group all nodes by data ID (and dimensions of data ID).
        data_ids_to_expand: defaultdict[DimensionGroup, defaultdict[DataCoordinate, list[Key]]] = defaultdict(
            lambda: defaultdict(list)
        )
        extractor = DimensionDataExtractor.from_dimension_group(dimensions)
        data_id: DataCoordinate | None
        for node_key in self:
            if data_id := self[node_key].get("data_id"):
                if data_id.hasRecords():
                    extractor.update([data_id])
                else:
                    data_ids_to_expand[data_id.dimensions][data_id].append(node_key)
        # Add records we extracted from data IDs that were already expanded,
        # in case other nodes want them.
        for record_set in extractor.records.values():
            self._dimension_data.setdefault(
                record_set.element.name, DimensionRecordSet(record_set.element)
            ).update(record_set)
        attacher = DimensionDataAttacher(records=self._dimension_data.values(), dimensions=dimensions)
        # Use a distinct loop-variable name; the original shadowed the
        # ``dimensions`` parameter here.
        for group, data_ids in data_ids_to_expand.items():
            with butler.query() as query:
                # Butler query will be used as-needed to get dimension records
                # (from prerequisites) we didn't fetch in advance.  These are
                # cached in the attacher so we don't look them up multiple
                # times.
                expanded_data_ids = attacher.attach(group, data_ids.keys(), query=query)
                for expanded_data_id, node_keys in zip(expanded_data_ids, data_ids.values()):
                    for node_key in node_keys:
                        self.set_data_id(node_key, expanded_data_id)
        # Hold on to any records that we had to query for or extracted.
        self._dimension_data = attacher.records

    def get_dimension_data(self) -> list[DimensionRecordSet]:
        """Return the dimension records attached to data IDs."""
        return list(self._dimension_data.values())