Coverage for python / lsst / pipe / base / quantum_graph_builder.py: 16%

486 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:57 +0000

# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

27 

"""The base class for the QuantumGraph-generation algorithm and various
helper classes.
"""

31 

32from __future__ import annotations 

33 

# Public API of this module.  InitInputMissingError is defined below alongside
# the other QuantumGraphBuilderError subclasses and is part of the exception
# contract callers may need to catch, so it is exported with its siblings.
__all__ = (
    "EmptyDimensionsDatasets",
    "InitInputMissingError",
    "OutputExistsError",
    "PrerequisiteMissingError",
    "QuantumGraphBuilder",
    "QuantumGraphBuilderError",
)

41 

42import dataclasses 

43import operator 

44from abc import ABC, abstractmethod 

45from collections import defaultdict 

46from collections.abc import Iterable, Mapping, Sequence 

47from typing import TYPE_CHECKING, Any, cast, final 

48 

49from lsst.daf.butler import ( 

50 Butler, 

51 CollectionType, 

52 DataCoordinate, 

53 DatasetRef, 

54 DatasetType, 

55 DimensionDataAttacher, 

56 DimensionUniverse, 

57 NamedKeyDict, 

58 NamedKeyMapping, 

59 Quantum, 

60) 

61from lsst.daf.butler._rubin import generate_uuidv7 

62from lsst.daf.butler.datastore.record_data import DatastoreRecordData 

63from lsst.daf.butler.registry import MissingCollectionError, MissingDatasetTypeError 

64from lsst.utils.logging import LsstLogAdapter, getLogger 

65from lsst.utils.timer import timeMethod 

66 

67from . import automatic_connection_constants as acc 

68from ._status import NoWorkFound 

69from ._task_metadata import TaskMetadata 

70from .connections import AdjustQuantumHelper, QuantaAdjuster 

71from .pipeline_graph import Edge, PipelineGraph, TaskNode 

72from .prerequisite_helpers import PrerequisiteInfo, SkyPixBoundsBuilder, TimespanBuilder 

73from .quantum_graph_skeleton import ( 

74 DatasetKey, 

75 PrerequisiteDatasetKey, 

76 QuantumGraphSkeleton, 

77 QuantumKey, 

78 TaskInitKey, 

79) 

80 

81if TYPE_CHECKING: 

82 from .graph import QuantumGraph 

83 from .pipeline import TaskDef 

84 from .quantum_graph import PredictedDatasetModel, PredictedQuantumGraphComponents 

85 

86 

class QuantumGraphBuilderError(Exception):
    """Base class for all exceptions raised by `QuantumGraphBuilder`."""

91 

92 

class OutputExistsError(QuantumGraphBuilderError):
    """Exception raised when output datasets already exist."""

97 

98 

class PrerequisiteMissingError(QuantumGraphBuilderError):
    """Exception raised when a prerequisite dataset does not exist."""

103 

104 

class InitInputMissingError(QuantumGraphBuilderError):
    """Exception raised when an init-input dataset does not exist."""

109 

110 

class QuantumGraphBuilder(ABC):
    """An abstract base class for building `.QuantumGraph` objects from a
    pipeline.

    Parameters
    ----------
    pipeline_graph : `.pipeline_graph.PipelineGraph`
        Pipeline to build a `.QuantumGraph` from, as a graph.  Will be resolved
        in-place with the given butler (any existing resolution is ignored).
    butler : `lsst.daf.butler.Butler`
        Client for the data repository.  Should be read-only.
    input_collections : `~collections.abc.Sequence` [ `str` ], optional
        Collections to search for overall-input datasets.  If not provided,
        ``butler.collections`` is used (and must not be empty).
    output_run : `str`, optional
        Output `~lsst.daf.butler.CollectionType.RUN` collection.  If not
        provided, ``butler.run`` is used (and must not be `None`).
    skip_existing_in : `~collections.abc.Sequence` [ `str` ], optional
        Collections to search for outputs that already exist for the purpose of
        skipping quanta that have already been run.
    clobber : `bool`, optional
        Whether to raise if predicted outputs already exist in ``output_run``
        (not including those quanta that would be skipped because they've
        already been run).  This never actually clobbers outputs; it just
        informs the graph generation algorithm whether execution will run with
        clobbering enabled.  This is ignored if ``output_run`` does not exist.

    Notes
    -----
    Constructing a `QuantumGraphBuilder` will run queries for existing datasets
    with empty data IDs (including but not limited to init inputs and outputs),
    in addition to resolving the given pipeline graph and testing for existence
    of the ``output`` run collection.

    The `build` method splits the pipeline graph into independent subgraphs,
    then calls the abstract method `process_subgraph` on each, to allow
    concrete implementations to populate the rough graph structure (the
    `~.quantum_graph_skeleton.QuantumGraphSkeleton` class), including searching
    for existing datasets.  The `build` method then:

    - assembles `lsst.daf.butler.Quantum` instances from all data IDs in the
      skeleton;
    - looks for existing outputs found in ``skip_existing_in`` to see if any
      quanta should be skipped;
    - calls `PipelineTaskConnections.adjustQuantum` on all quanta, adjusting
      downstream quanta appropriately when preliminary predicted outputs are
      rejected (pruning nodes that will not have the inputs they need to run);
    - attaches datastore records and registry dataset types to the graph.

    In addition to implementing `process_subgraph`, derived classes are
    generally expected to add new construction keyword-only arguments to
    control the data IDs of the quantum graph, while forwarding all of the
    arguments defined in the base class to `super`.
    """

165 

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        input_collections: Sequence[str] | None = None,
        output_run: str | None = None,
        skip_existing_in: Sequence[str] = (),
        clobber: bool = False,
    ):
        """Construct the builder.  See the class docstring for parameter
        documentation.
        """
        self.log = getLogger(__name__)
        self.metadata = TaskMetadata()
        self._pipeline_graph = pipeline_graph
        # Fall back to the butler's default collections/run when the caller
        # did not specify them explicitly; both must be non-empty.
        if input_collections is None:
            input_collections = butler.collections.defaults
        if not input_collections:
            raise ValueError("No input collections provided.")
        self.input_collections = input_collections
        if output_run is None:
            output_run = butler.run
        if not output_run:
            raise ValueError("No output RUN collection provided.")
        # Clone the butler so our searches use exactly the input collections.
        self.butler = butler.clone(collections=input_collections)
        self.output_run = output_run
        self.skip_existing_in = skip_existing_in
        self.empty_data_id = DataCoordinate.make_empty(butler.dimensions)
        self.clobber = clobber
        # See whether the output run already exists.
        self.output_run_exists = False
        try:
            if self.butler.registry.getCollectionType(self.output_run) is not CollectionType.RUN:
                raise RuntimeError(f"{self.output_run!r} is not a RUN collection.")
            self.output_run_exists = True
        except MissingCollectionError:
            # If the run doesn't exist we never need to clobber.  This is not
            # an error so you can run with clobber=True the first time you
            # attempt some processing as well as all subsequent times, instead
            # of forcing the user to make the first attempt different.
            self.clobber = False
        # We need to know whether the skip_existing_in collection sequence
        # starts with the output run collection, as an optimization to avoid
        # queries later.
        try:
            skip_existing_in_flat = self.butler.collections.query(self.skip_existing_in, flatten_chains=True)
        except MissingCollectionError:
            skip_existing_in_flat = []
        if not skip_existing_in_flat:
            self.skip_existing_in = []
        if self.skip_existing_in and self.output_run_exists:
            self.skip_existing_starts_with_output_run = self.output_run == skip_existing_in_flat[0]
        else:
            self.skip_existing_starts_with_output_run = False
        # Use the registered storage class for the packages dataset if it has
        # been registered; otherwise fall back to the configured default.
        try:
            packages_storage_class = butler.get_dataset_type(acc.PACKAGES_INIT_OUTPUT_NAME).storageClass_name
        except MissingDatasetTypeError:
            packages_storage_class = acc.PACKAGES_INIT_OUTPUT_STORAGE_CLASS
        self._global_init_output_types = {
            acc.PACKAGES_INIT_OUTPUT_NAME: DatasetType(
                acc.PACKAGES_INIT_OUTPUT_NAME,
                self.universe.empty,
                packages_storage_class,
            )
        }
        # Resolve the pipeline and run the empty-data-ID queries inside a
        # caching context to avoid redundant registry round-trips.
        with self.butler.registry.caching_context():
            self._pipeline_graph.resolve(self.butler.registry)
            self.empty_dimensions_datasets = self._find_empty_dimension_datasets()
            self.prerequisite_info = {
                task_node.label: PrerequisiteInfo(task_node, self._pipeline_graph)
                for task_node in pipeline_graph.tasks.values()
            }

236 

    log: LsstLogAdapter
    """Logger to use for all quantum-graph generation messages.

    General and per-task status messages should be logged at `~logging.INFO`
    level or higher, per-dataset-type status messages should be logged at
    `~lsst.utils.logging.VERBOSE` or higher, and per-data-ID status messages
    should be logged at `logging.DEBUG` or higher.
    """

    metadata: TaskMetadata
    """Metadata to store in the QuantumGraph.

    The `TaskMetadata` class is used here primarily in order to enable
    resource-usage collection with the `lsst.utils.timer.timeMethod` decorator.
    """

    butler: Butler
    """Client for the data repository.

    Should be read-only.
    """

    input_collections: Sequence[str]
    """Collections to search for overall-input datasets.
    """

    output_run: str
    """Output `~lsst.daf.butler.CollectionType.RUN` collection.
    """

    skip_existing_in: Sequence[str]
    """Collections to search for outputs that already exist for the purpose
    of skipping quanta that have already been run.
    """

    clobber: bool
    """Whether to raise if predicted outputs already exist in ``output_run``

    This never actually clobbers outputs; it just informs the graph generation
    algorithm whether execution will run with clobbering enabled.  This is
    always `False` if `output_run_exists` is `False`.
    """

    empty_data_id: DataCoordinate
    """An empty data ID in the data repository's dimension universe.
    """

    output_run_exists: bool
    """Whether the output run exists in the data repository already.
    """

    skip_existing_starts_with_output_run: bool
    """Whether the `skip_existing_in` sequence begins with `output_run`.

    If this is true, any dataset found in `output_run` can be used to
    short-circuit queries in `skip_existing_in`.
    """

    empty_dimensions_datasets: EmptyDimensionsDatasets
    """Struct holding datasets with empty dimensions that have already been
    found in the data repository.
    """

    prerequisite_info: Mapping[str, PrerequisiteInfo]
    """Helper objects for finding prerequisite inputs, organized by task label.

    Subclasses that find prerequisites should remove the
    covered `~prerequisite_helpers.PrerequisiteFinder` objects from this
    attribute.
    """

307 

308 @property 

309 def universe(self) -> DimensionUniverse: 

310 """Definitions of all data dimensions.""" 

311 return self.butler.dimensions 

312 

313 @final 

314 @timeMethod 

315 def build( 

316 self, metadata: Mapping[str, Any] | None = None, attach_datastore_records: bool = True 

317 ) -> QuantumGraph: 

318 """Build the quantum graph, returning an old `QuantumGraph` instance. 

319 

320 Parameters 

321 ---------- 

322 metadata : `~collections.abc.Mapping`, optional 

323 Flexible metadata to add to the quantum graph. 

324 attach_datastore_records : `bool`, optional 

325 Whether to include datastore records in the graph. Required for 

326 `lsst.daf.butler.QuantumBackedButler` execution. 

327 

328 Returns 

329 ------- 

330 quantum_graph : `.QuantumGraph` 

331 DAG describing processing to be performed. 

332 

333 Notes 

334 ----- 

335 External code is expected to construct a `QuantumGraphBuilder` and then 

336 call this method exactly once. See class documentation for details on 

337 what it does. 

338 """ 

339 skeleton = self._build_skeleton(attach_datastore_records=attach_datastore_records) 

340 if metadata is None: 

341 metadata = { 

342 "input": list(self.input_collections), 

343 "output_run": self.output_run, 

344 } 

345 return self._construct_quantum_graph(skeleton, metadata) 

346 

347 def finish( 

348 self, 

349 output: str | None = None, 

350 metadata: Mapping[str, Any] | None = None, 

351 attach_datastore_records: bool = True, 

352 ) -> PredictedQuantumGraphComponents: 

353 """Return quantum graph components that can be used to save or 

354 construct a `PredictedQuantumGraph` instance. 

355 

356 Parameters 

357 ---------- 

358 output : `str` or `None`, optional 

359 Output `~lsst.daf.butler.CollectionType.CHAINED` collection that 

360 combines the input and output collections. 

361 metadata : `~collections.abc.Mapping`, optional 

362 Mapping of JSON-friendly metadata. Collection information, the 

363 current user, and the current timestamp are automatically 

364 included. 

365 attach_datastore_records : `bool`, optional 

366 Whether to include datastore records for overall inputs for 

367 `~lsst.daf.butler.QuantumBackedButler`. 

368 

369 Returns 

370 ------- 

371 components : `.quantum_graph.PredictedQuantumGraphComponents` 

372 Components that can be used to construct a graph object and/or save 

373 it to disk. 

374 """ 

375 skeleton = self._build_skeleton(attach_datastore_records=attach_datastore_records) 

376 return self._construct_components(skeleton, output=output, metadata=metadata) 

377 

    def _build_skeleton(self, attach_datastore_records: bool = True) -> QuantumGraphSkeleton:
        """Build a complete skeleton for the quantum graph.

        Parameters
        ----------
        attach_datastore_records : `bool`, optional
            Whether to include datastore records in the graph.  Required for
            `lsst.daf.butler.QuantumBackedButler` execution.

        Returns
        -------
        quantum_graph_skeleton : `QuantumGraphSkeleton`
            DAG describing processing to be performed.
        """
        # Run everything inside a caching context so repeated registry
        # lookups (collections, dataset types) hit the cache.
        with self.butler.registry.caching_context():
            full_skeleton = QuantumGraphSkeleton(self._pipeline_graph.tasks)
            # Delegate the rough structure of each independent subgraph to the
            # concrete subclass, then merge the results.
            subgraphs = list(self._pipeline_graph.split_independent())
            for i, subgraph in enumerate(subgraphs):
                self.log.info(
                    "Processing pipeline subgraph %d of %d with %d task(s).",
                    i + 1,
                    len(subgraphs),
                    len(subgraph.tasks),
                )
                self.log.verbose("Subgraph tasks: [%s]", ", ".join(label for label in subgraph.tasks))
                subgraph_skeleton = self.process_subgraph(subgraph)
                full_skeleton.update(subgraph_skeleton)
            # Loop over tasks to apply skip-existing logic and add missing
            # prerequisites.  The pipeline graph must be topologically sorted,
            # so a quantum is only processed after any quantum that provides
            # its inputs has been processed.
            skipped_quanta: dict[str, list[QuantumKey]] = {}
            for task_node in self._pipeline_graph.tasks.values():
                skipped_quanta[task_node.label] = self._resolve_task_quanta(task_node, full_skeleton)
            # Add any dimension records not handled by the subclass, and
            # aggregate any that were added directly to data IDs.
            full_skeleton.attach_dimension_records(self.butler, self._pipeline_graph.get_all_dimensions())
            # Loop over tasks again to run the adjust hooks.
            for task_node in self._pipeline_graph.tasks.values():
                self._adjust_task_quanta(task_node, full_skeleton, skipped_quanta[task_node.label])
            # Add global init-outputs to the skeleton.
            for dataset_type in self._global_init_output_types.values():
                dataset_key = full_skeleton.add_dataset_node(
                    dataset_type.name, self.empty_data_id, is_global_init_output=True
                )
                # Reuse a dataset already present in the output run if there
                # is one; otherwise predict a new one.
                ref = self.empty_dimensions_datasets.outputs_in_the_way.get(dataset_key)
                if ref is None:
                    ref = DatasetRef(dataset_type, self.empty_data_id, run=self.output_run)
                full_skeleton.set_dataset_ref(ref, dataset_key)
            # Remove dataset nodes with no edges that are not global init
            # outputs, which are generally overall-inputs whose original quanta
            # end up skipped or with no work to do (we can't remove these along
            # with the quanta because no quantum knows if its the only
            # consumer).
            full_skeleton.remove_orphan_datasets()
            if attach_datastore_records:
                self._attach_datastore_records(full_skeleton)
            return full_skeleton

436 

    @abstractmethod
    def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
        """Build the rough structure for an independent subset of the
        `.QuantumGraph` and query for relevant existing datasets.

        Parameters
        ----------
        subgraph : `.pipeline_graph.PipelineGraph`
            Subset of the pipeline graph that should be processed by this call.
            This is always resolved and topologically sorted.  It should not be
            modified.

        Returns
        -------
        skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton`
            Class representing an initial quantum graph.  See
            `.quantum_graph_skeleton.QuantumGraphSkeleton` docs for details.
            After this is returned, the object may be modified in-place in
            unspecified ways.

        Notes
        -----
        The `.quantum_graph_skeleton.QuantumGraphSkeleton` should associate
        `lsst.daf.butler.DatasetRef` objects with nodes for existing datasets.
        In particular:

        - `.quantum_graph_skeleton.QuantumGraphSkeleton.set_dataset_ref` must
          be used to associate existing datasets with all overall-input dataset
          nodes in the skeleton by querying `input_collections`.  This includes
          all standard input nodes and any prerequisite nodes added by the
          method (prerequisite nodes may also be left out entirely, as the base
          class can add them later, albeit possibly less efficiently).
        - `.quantum_graph_skeleton.QuantumGraphSkeleton.set_output_for_skip`
          must be used to associate existing datasets with output dataset nodes
          by querying `skip_existing_in`.
        - `.quantum_graph_skeleton.QuantumGraphSkeleton.add_output_in_the_way`
          must be used to associated existing outputs with output dataset nodes
          by querying `output_run` if `output_run_exists` is `True`.  Note that
          the presence of such datasets is not automatically an error, even if
          `clobber` is `False`, as these may be quanta that will be skipped.

        `lsst.daf.butler.DatasetRef` objects for existing datasets with empty
        data IDs in all of the above categories may be found in the
        `empty_dimensions_datasets` attribute, as these are queried for prior
        to this call by the base class, but associating them with graph nodes
        is still this method's responsibility.

        Dataset types should never be components and should always use the
        "common" storage class definition in `pipeline_graph.DatasetTypeNode`
        (which is the data repository definition when the dataset type is
        registered).
        """
        raise NotImplementedError()

490 

    @final
    @timeMethod
    def _resolve_task_quanta(self, task_node: TaskNode, skeleton: QuantumGraphSkeleton) -> list[QuantumKey]:
        """Process the quanta for one task in a skeleton graph to skip those
        that have already completed and add missing prerequisite inputs.

        Parameters
        ----------
        task_node : `pipeline_graph.TaskNode`
            Node for this task in the pipeline graph.
        skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton`
            Preliminary quantum graph, to be modified in-place.

        Returns
        -------
        skipped_quanta : `list` [ `.quantum_skeleton_graph.QuantumKey` ]
            Keys of quanta that were already skipped because their metadata
            already exists in a ``skip_existing_in`` collection.

        Notes
        -----
        This method modifies ``skeleton`` in-place in several ways:

        - It associates a `lsst.daf.butler.DatasetRef` with all output datasets
          and drops input dataset nodes that do not have a
          `lsst.daf.butler.DatasetRef` already.  This ensures producing and
          consuming tasks start from the same `lsst.daf.butler.DatasetRef`.
        - It removes quantum nodes that are to be skipped because their outputs
          already exist in `skip_existing_in`.  It also marks their outputs
          as no longer in the way.
        - It adds prerequisite dataset nodes and edges that connect them to the
          quanta that consume them.
        """
        # Extract the helper object for the prerequisite inputs of this task,
        # and tell it to prepare to construct skypix bounds and timespans for
        # each quantum (these will automatically do nothing if nothing needs
        # those bounds).
        task_prerequisite_info = self.prerequisite_info[task_node.label]
        task_prerequisite_info.update_bounds()
        # Loop over all quanta for this task, remembering the ones we've
        # gotten rid of.
        skipped_quanta = []
        for quantum_key in skeleton.get_quanta(task_node.label):
            if self._skip_quantum_if_metadata_exists(task_node, quantum_key, skeleton):
                skipped_quanta.append(quantum_key)
                continue
            quantum_data_id = skeleton[quantum_key]["data_id"]
            skypix_bounds_builder = task_prerequisite_info.bounds.make_skypix_bounds_builder(quantum_data_id)
            timespan_builder = task_prerequisite_info.bounds.make_timespan_builder(quantum_data_id)
            self._update_quantum_for_adjust(
                quantum_key,
                skeleton,
                task_prerequisite_info,
                skypix_bounds_builder,
                timespan_builder,
            )
        # Remove skipped quanta only after the loop so iteration is not
        # disturbed; their already-existing outputs are kept.
        for skipped_quantum in skipped_quanta:
            skeleton.remove_quantum_node(skipped_quantum, remove_outputs=False)
        return skipped_quanta

550 

551 @final 

552 @timeMethod 

553 def _adjust_task_quanta( 

554 self, task_node: TaskNode, skeleton: QuantumGraphSkeleton, skipped_quanta: list[QuantumKey] 

555 ) -> None: 

556 """Process the quanta for one task in a skeleton graph by calling the 

557 ``adjust_all_quanta`` and ``adjustQuantum`` hooks. 

558 

559 Parameters 

560 ---------- 

561 task_node : `pipeline_graph.TaskNode` 

562 Node for this task in the pipeline graph. 

563 skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton` 

564 Preliminary quantum graph, to be modified in-place. 

565 skipped_quanta : `list` [ `.quantum_skeleton_graph.QuantumKey` ] 

566 Keys of quanta that were already skipped because their metadata 

567 already exists in a ``skip_existing_in`` collections. 

568 

569 Notes 

570 ----- 

571 This method modifies ``skeleton`` in-place in several ways: 

572 

573 - It adds "inputs", "outputs", and "init_inputs" attributes to the 

574 quantum nodes, holding the same `NamedValueMapping` objects needed to 

575 construct an actual `Quantum` instances. 

576 - It removes quantum nodes whose 

577 `~PipelineTaskConnections.adjustQuantum` calls raise `NoWorkFound` or 

578 predict no outputs; 

579 - It removes the nodes of output datasets that are "adjusted away". 

580 - It removes the edges of input datasets that are "adjusted away". 

581 

582 The difference between how adjusted inputs and outputs are handled 

583 reflects the fact that many quanta can share the same input, but only 

584 one produces each output. This can lead to the graph having 

585 superfluous isolated nodes after processing is complete, but these 

586 should only be removed after all the quanta from all tasks have been 

587 processed. 

588 """ 

589 # Give the task a chance to adjust all quanta together. This 

590 # operates directly on the skeleton (via a the 'adjuster', which 

591 # is just an interface adapter). 

592 adjuster = QuantaAdjuster(task_node.label, self._pipeline_graph, skeleton, self.butler) 

593 task_node.get_connections().adjust_all_quanta(adjuster) 

594 # Loop over all quanta again, remembering those we get rid of in other 

595 # ways. 

596 no_work_quanta = [] 

597 for quantum_key in skeleton.get_quanta(task_node.label): 

598 adjusted_outputs = self._adapt_quantum_outputs(task_node, quantum_key, skeleton) 

599 adjusted_inputs = self._adapt_quantum_inputs(task_node, quantum_key, skeleton) 

600 # Give the task's Connections class an opportunity to remove 

601 # some inputs, or complain if they are unacceptable. This will 

602 # raise if one of the check conditions is not met, which is the 

603 # intended behavior. 

604 helper = AdjustQuantumHelper(inputs=adjusted_inputs, outputs=adjusted_outputs) 

605 quantum_data_id = skeleton[quantum_key]["data_id"] 

606 try: 

607 helper.adjust_in_place(task_node.get_connections(), task_node.label, quantum_data_id) 

608 except NoWorkFound as err: 

609 # Do not generate this quantum; it would not produce any 

610 # outputs. Remove it and all of the outputs it might have 

611 # produced from the skeleton. 

612 try: 

613 _, connection_name, _ = err.args 

614 details = f"not enough datasets for connection {connection_name}." 

615 except ValueError: 

616 details = str(err) 

617 self.log.debug( 

618 "No work found for quantum %s of task %s: %s", 

619 quantum_key.data_id_values, 

620 quantum_key.task_label, 

621 details, 

622 ) 

623 no_work_quanta.append(quantum_key) 

624 continue 

625 if helper.outputs_adjusted: 

626 if not any(adjusted_refs for adjusted_refs in helper.outputs.values()): 

627 # No outputs also means we don't generate this quantum. 

628 self.log.debug( 

629 "No outputs predicted for quantum %s of task %s.", 

630 quantum_key.data_id_values, 

631 quantum_key.task_label, 

632 ) 

633 no_work_quanta.append(quantum_key) 

634 continue 

635 # Remove output nodes that were not retained by 

636 # adjustQuantum. 

637 skeleton.remove_dataset_nodes( 

638 self._find_removed(skeleton.iter_outputs_of(quantum_key), helper.outputs) 

639 ) 

640 if helper.inputs_adjusted: 

641 if not any(bool(adjusted_refs) for adjusted_refs in helper.inputs.values()): 

642 raise QuantumGraphBuilderError( 

643 f"adjustQuantum implementation for {task_node.label}@{quantum_key.data_id_values} " 

644 "returned outputs but no inputs." 

645 ) 

646 # Remove input dataset edges that were not retained by 

647 # adjustQuantum. We can't remove the input dataset nodes 

648 # because some other quantum might still want them. 

649 skeleton.remove_input_edges( 

650 quantum_key, self._find_removed(skeleton.iter_inputs_of(quantum_key), helper.inputs) 

651 ) 

652 # Save the adjusted inputs and outputs to the quantum node's 

653 # state so we don't have to regenerate those data structures 

654 # from the graph. 

655 skeleton[quantum_key]["inputs"] = helper.inputs 

656 skeleton[quantum_key]["outputs"] = helper.outputs 

657 for no_work_quantum in no_work_quanta: 

658 skeleton.remove_quantum_node(no_work_quantum, remove_outputs=True) 

659 remaining_quanta = skeleton.get_quanta(task_node.label) 

660 self._resolve_task_init(task_node, skeleton, bool(skipped_quanta)) 

661 message_terms = [] 

662 if no_work_quanta: 

663 message_terms.append(f"{len(no_work_quanta)} had no work to do") 

664 if skipped_quanta: 

665 message_terms.append(f"{len(skipped_quanta)} previously succeeded") 

666 if adjuster.n_removed: 

667 message_terms.append(f"{adjuster.n_removed} removed by adjust_all_quanta") 

668 message_parenthetical = f" ({', '.join(message_terms)})" if message_terms else "" 

669 if remaining_quanta: 

670 self.log.info( 

671 "Generated %s for task %s%s.", 

672 _quantum_or_quanta(len(remaining_quanta)), 

673 task_node.label, 

674 message_parenthetical, 

675 ) 

676 else: 

677 self.log.info( 

678 "Dropping task %s because no quanta remain%s.", task_node.label, message_parenthetical 

679 ) 

680 skeleton.remove_task(task_node.label) 

681 if len(no_work_quanta) > len(remaining_quanta): 

682 only_overall_inputs = self._get_task_inputs_if_overall_only(task_node) 

683 self.log.warning( 

684 "More than half of %s quanta had no work to do given available inputs.\n" 

685 "A query constraint on one of %s may yield a much faster build.", 

686 task_node.label, 

687 only_overall_inputs, 

688 ) 

689 

690 def _get_task_inputs_if_overall_only(self, task_node: TaskNode) -> list[str] | None: 

691 """If the given task consumes only overall-inputs, return their names. 

692 Otherwise return `None`. 

693 """ 

694 result: list[str] = [] 

695 for read_edge in task_node.inputs.values(): 

696 if self._pipeline_graph.producer_of(read_edge.parent_dataset_type_name) is None: 

697 result.append(read_edge.parent_dataset_type_name) 

698 else: 

699 return None 

700 return result 

701 

    def _skip_quantum_if_metadata_exists(
        self, task_node: TaskNode, quantum_key: QuantumKey, skeleton: QuantumGraphSkeleton
    ) -> bool:
        """Identify and drop quanta that should be skipped because their
        metadata datasets already exist.

        Parameters
        ----------
        task_node : `pipeline_graph.TaskNode`
            Node for this task in the pipeline graph.
        quantum_key : `QuantumKey`
            Identifier for this quantum in the graph.
        skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton`
            Preliminary quantum graph, to be modified in-place.

        Returns
        -------
        skipped : `bool`
            `True` if the quantum is being skipped and has been removed from
            the graph, `False` otherwise.

        Notes
        -----
        If the metadata dataset for this quantum exists in the
        `skip_existing_in` collections, the quantum will be skipped.  This
        causes the quantum node to be removed from the graph.  Dataset nodes
        that were previously the outputs of this quantum will be associated
        with `lsst.daf.butler.DatasetRef` objects that were found in
        ``skip_existing_in``, or will be removed if there is no such dataset
        there.  Any output dataset in `output_run` will be removed from the
        "output in the way" category.
        """
        # The metadata dataset shares the quantum's data ID, so its key can be
        # built directly from the quantum key's data ID values.
        metadata_dataset_key = DatasetKey(
            task_node.metadata_output.parent_dataset_type_name, quantum_key.data_id_values
        )
        if skeleton.get_output_for_skip(metadata_dataset_key):
            # This quantum's metadata is already present in the
            # skip_existing_in collections; we'll skip it.  But the presence of
            # the metadata dataset doesn't guarantee that all of the other
            # outputs we predicted are present; we have to check.
            for output_dataset_key in list(skeleton.iter_outputs_of(quantum_key)):
                # If this dataset was "in the way" (i.e. already in the
                # output run), it isn't anymore.
                skeleton.discard_output_in_the_way(output_dataset_key)
                if (output_ref := skeleton.get_output_for_skip(output_dataset_key)) is not None:
                    # Populate the skeleton graph's node attributes
                    # with the existing DatasetRef, just like a
                    # predicted output of a non-skipped quantum.
                    skeleton.set_dataset_ref(output_ref, output_dataset_key)
                else:
                    # Remove this dataset from the skeleton graph,
                    # because the quantum that would have produced it
                    # is being skipped and it doesn't already exist.
                    skeleton.remove_dataset_nodes([output_dataset_key])
            # Removing the quantum node from the graph will happen outside this
            # function.
            return True
        return False

760 

    @final
    def _update_quantum_for_adjust(
        self,
        quantum_key: QuantumKey,
        skeleton: QuantumGraphSkeleton,
        task_prerequisite_info: PrerequisiteInfo,
        skypix_bounds_builder: SkyPixBoundsBuilder,
        timespan_builder: TimespanBuilder,
    ) -> None:
        """Update the quantum node in the skeleton by finding remaining
        prerequisite inputs and dropping regular inputs that we now know will
        not be produced.

        Parameters
        ----------
        quantum_key : `QuantumKey`
            Identifier for this quantum in the graph.
        skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton`
            Preliminary quantum graph, to be modified in-place.
        task_prerequisite_info : `~prerequisite_helpers.PrerequisiteInfo`
            Information about the prerequisite inputs to this task.
        skypix_bounds_builder : `~prerequisite_helpers.SkyPixBoundsBuilder`
            An object that accumulates the appropriate spatial bounds for a
            quantum.
        timespan_builder : `~prerequisite_helpers.TimespanBuilder`
            An object that accumulates the appropriate timespan for a quantum.

        Notes
        -----
        For each output, this first looks for a dataset already present in the
        `output_run` (i.e. "in the way" in the skeleton); if it finds
        something and `clobber` is `True`, it uses that ref (it's not ideal
        that both the original dataset and its replacement will have the same
        UUID, but we don't have space in the quantum graph for two UUIDs, and
        we need the datastore records of the original there).  If `clobber` is
        `False`, `OutputExistsError` is raised.  If there is no output already
        present, a new one with a random UUID is generated.  In all cases the
        dataset node in the skeleton is associated with a
        `lsst.daf.butler.DatasetRef`.

        Input dataset nodes with no resolved ref are removed (they will never
        be produced), and any prerequisites not already handled by
        ``process_subgraph`` are queried for and added to the skeleton.
        """
        dataset_key: DatasetKey | PrerequisiteDatasetKey
        # Resolve every predicted output to a DatasetRef, feeding each one to
        # the spatial-bounds and timespan accumulators along the way.
        for dataset_key in skeleton.iter_outputs_of(quantum_key):
            dataset_data_id = skeleton.get_data_id(dataset_key)
            dataset_type_node = self._pipeline_graph.dataset_types[dataset_key.parent_dataset_type_name]
            if (ref := skeleton.get_output_in_the_way(dataset_key)) is None:
                # Nothing in the way; mint a new ref in the output run.
                ref = DatasetRef(dataset_type_node.dataset_type, dataset_data_id, run=self.output_run)
            elif not self.clobber:
                # We intentionally raise here, before running adjustQuantum,
                # because it'd be weird if we left an old potential output of a
                # task sitting there in the output collection, just because the
                # task happened to not actually produce it.
                raise OutputExistsError(
                    f"Potential output dataset {ref} already exists in the output run "
                    f"{self.output_run}, but clobbering outputs was not expected to be necessary."
                )
            skypix_bounds_builder.handle_dataset(dataset_key.parent_dataset_type_name, dataset_data_id)
            timespan_builder.handle_dataset(dataset_key.parent_dataset_type_name, dataset_data_id)
            skeleton.set_dataset_ref(ref, dataset_key)
        quantum_data_id = skeleton.get_data_id(quantum_key)
        # Process inputs already present in the skeleton - this should include
        # all regular inputs (including intermediates) and may include some
        # prerequisites.  Iterate over a copy because we may remove nodes.
        for dataset_key in list(skeleton.iter_inputs_of(quantum_key)):
            if (ref := skeleton.get_dataset_ref(dataset_key)) is None:
                # If the dataset ref hasn't been set either as an existing
                # input or as an output of an already-processed upstream
                # quantum, it's not going to be produced; remove it.
                skeleton.remove_dataset_nodes([dataset_key])
                continue
            skypix_bounds_builder.handle_dataset(dataset_key.parent_dataset_type_name, ref.dataId)
            timespan_builder.handle_dataset(dataset_key.parent_dataset_type_name, ref.dataId)
        # Query for any prerequisites not handled by process_subgraph.  Note
        # that these were not already in the skeleton graph, so we add them
        # now.
        skypix_bounds = skypix_bounds_builder.finish()
        timespan = timespan_builder.finish()
        for finder in task_prerequisite_info.finders.values():
            dataset_keys = []
            for ref in finder.find(
                self.butler, self.input_collections, quantum_data_id, skypix_bounds, timespan
            ):
                dataset_key = skeleton.add_prerequisite_node(ref)
                dataset_keys.append(dataset_key)
            skeleton.add_input_edges(quantum_key, dataset_keys)

844 

845 @final 

846 def _adapt_quantum_outputs( 

847 self, 

848 task_node: TaskNode, 

849 quantum_key: QuantumKey, 

850 skeleton: QuantumGraphSkeleton, 

851 ) -> NamedKeyDict[DatasetType, list[DatasetRef]]: 

852 """Adapt outputs for a preliminary quantum and put them into the form 

853 used by `~lsst.daf.butler.Quantum` and 

854 `~PipelineTaskConnections.adjustQuantum`. 

855 

856 Parameters 

857 ---------- 

858 task_node : `pipeline_graph.TaskNode` 

859 Node for this task in the pipeline graph. 

860 quantum_key : `QuantumKey` 

861 Identifier for this quantum in the graph. 

862 skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton` 

863 Preliminary quantum graph, to be modified in-place. 

864 

865 Returns 

866 ------- 

867 outputs : `~lsst.daf.butler.NamedKeyDict` [ \ 

868 `~lsst.daf.butler.DatasetType`, `list` [ \ 

869 `~lsst.daf.butler.DatasetRef` ] ] 

870 All outputs to the task, using the storage class and components 

871 defined by the task's own connections. 

872 """ 

873 outputs_by_type: dict[str, list[DatasetRef]] = {} 

874 dataset_key: DatasetKey 

875 for dataset_key in skeleton.iter_outputs_of(quantum_key): 

876 ref = skeleton.get_dataset_ref(dataset_key) 

877 assert ref is not None, "Should have been added (or the node removed) in a previous pass." 

878 outputs_by_type.setdefault(dataset_key.parent_dataset_type_name, []).append(ref) 

879 adapted_outputs: NamedKeyDict[DatasetType, list[DatasetRef]] = NamedKeyDict() 

880 for write_edge in task_node.iter_all_outputs(): 

881 dataset_type_node = self._pipeline_graph.dataset_types[write_edge.parent_dataset_type_name] 

882 edge_dataset_type = write_edge.adapt_dataset_type(dataset_type_node.dataset_type) 

883 adapted_outputs[edge_dataset_type] = [ 

884 write_edge.adapt_dataset_ref(ref) 

885 for ref in sorted(outputs_by_type.get(write_edge.parent_dataset_type_name, [])) 

886 ] 

887 return adapted_outputs 

888 

889 @final 

890 def _adapt_quantum_inputs( 

891 self, 

892 task_node: TaskNode, 

893 quantum_key: QuantumKey, 

894 skeleton: QuantumGraphSkeleton, 

895 ) -> NamedKeyDict[DatasetType, list[DatasetRef]]: 

896 """Adapt input datasets for a preliminary quantum into the form used by 

897 `~lsst.daf.butler.Quantum` and 

898 `~PipelineTaskConnections.adjustQuantum`. 

899 

900 Parameters 

901 ---------- 

902 task_node : `pipeline_graph.TaskNode` 

903 Node for this task in the pipeline graph. 

904 quantum_key : `QuantumKey` 

905 Identifier for this quantum in the graph. 

906 skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton` 

907 Preliminary quantum graph, to be modified in-place. 

908 

909 Returns 

910 ------- 

911 inputs : `~lsst.daf.butler.NamedKeyDict` [ \ 

912 `~lsst.daf.butler.DatasetType`, `list` [ \ 

913 `~lsst.daf.butler.DatasetRef` ] ] 

914 All regular and prerequisite inputs to the task, using the storage 

915 class and components defined by the task's own connections. 

916 

917 Notes 

918 ----- 

919 This method trims input dataset nodes that are not already associated 

920 with a `lsst.daf.butler.DatasetRef`, and queries for prerequisite input 

921 nodes that do not exist. 

922 """ 

923 inputs_by_type: dict[str, set[DatasetRef]] = {} 

924 dataset_key: DatasetKey | PrerequisiteDatasetKey 

925 for dataset_key in list(skeleton.iter_inputs_of(quantum_key)): 

926 ref = skeleton.get_dataset_ref(dataset_key) 

927 assert ref is not None, "Should have been added (or the node removed) in a previous pass." 

928 inputs_by_type.setdefault(dataset_key.parent_dataset_type_name, set()).add(ref) 

929 adapted_inputs: NamedKeyDict[DatasetType, list[DatasetRef]] = NamedKeyDict() 

930 for read_edge in task_node.iter_all_inputs(): 

931 dataset_type_node = self._pipeline_graph.dataset_types[read_edge.parent_dataset_type_name] 

932 edge_dataset_type = read_edge.adapt_dataset_type(dataset_type_node.dataset_type) 

933 if (current_dataset_type := adapted_inputs.keys().get(edge_dataset_type.name)) is None: 

934 adapted_inputs[edge_dataset_type] = [ 

935 read_edge.adapt_dataset_ref(ref) 

936 for ref in sorted(inputs_by_type.get(read_edge.parent_dataset_type_name, frozenset())) 

937 ] 

938 elif current_dataset_type != edge_dataset_type: 

939 raise NotImplementedError( 

940 f"Task {task_node.label!r} has {edge_dataset_type.name!r} as an input via " 

941 "two different connections, with two different storage class overrides. " 

942 "This is not yet supported due to limitations in the Quantum data structure." 

943 ) 

944 # If neither the `if` nor the `elif` above match, it means 

945 # multiple input connections have exactly the same dataset 

946 # type, and hence nothing to do after the first one. 

947 return adapted_inputs 

948 

    @final
    def _resolve_task_init(
        self, task_node: TaskNode, skeleton: QuantumGraphSkeleton, has_skipped_quanta: bool
    ) -> None:
        """Add init-input and init-output dataset nodes and edges for a task
        to the skeleton.

        Parameters
        ----------
        task_node : `pipeline_graph.TaskNode`
            Pipeline graph description of the task.
        skeleton : `QuantumGraphSkeleton`
            In-progress quantum graph data structure to update in-place.
        has_skipped_quanta : `bool`
            Whether any of this task's quanta were skipped because they had
            already succeeded.

        Raises
        ------
        InitInputMissingError
            Raised if an overall init-input is not present in the input
            collections, or if an init-output of a fully-skipped task is not
            present in the skip-existing-in collections.
        """
        quanta = skeleton.get_quanta(task_node.label)
        task_init_key = TaskInitKey(task_node.label)
        if quanta:
            adapted_inputs: NamedKeyDict[DatasetType, DatasetRef] = NamedKeyDict()
            # Process init-inputs.
            input_keys: list[DatasetKey] = []
            for read_edge in task_node.init.iter_all_inputs():
                dataset_key = skeleton.add_dataset_node(
                    read_edge.parent_dataset_type_name, self.empty_data_id
                )
                skeleton.add_input_edge(task_init_key, dataset_key)
                if (ref := skeleton.get_dataset_ref(dataset_key)) is None:
                    # Not an intermediate produced by an upstream task's init;
                    # must be an overall init-input found in the input
                    # collections.
                    try:
                        ref = self.empty_dimensions_datasets.inputs[dataset_key]
                    except KeyError:
                        raise InitInputMissingError(
                            f"Overall init-input dataset {read_edge.parent_dataset_type_name!r} "
                            f"needed by task {task_node.label!r} not found in input collection(s) "
                            f"{self.input_collections}."
                        ) from None
                    skeleton.set_dataset_ref(ref, dataset_key)
                for quantum_key in skeleton.get_quanta(task_node.label):
                    skeleton.add_input_edge(quantum_key, dataset_key)
                input_keys.append(dataset_key)
                adapted_ref = read_edge.adapt_dataset_ref(ref)
                adapted_inputs[adapted_ref.datasetType] = adapted_ref
            # Save the quantum-adapted init inputs to each quantum, and add
            # skeleton edges connecting the init inputs to each quantum.
            for quantum_key in skeleton.get_quanta(task_node.label):
                skeleton[quantum_key]["init_inputs"] = adapted_inputs
            # Process init-outputs.
            adapted_outputs: NamedKeyDict[DatasetType, DatasetRef] = NamedKeyDict()
            for write_edge in task_node.init.iter_all_outputs():
                dataset_key = skeleton.add_dataset_node(
                    write_edge.parent_dataset_type_name, self.empty_data_id
                )
                # Reuse the ref of an "in the way" dataset if there is one
                # (keeping its UUID and datastore records); otherwise mint a
                # fresh ref in the output run.
                if (ref := self.empty_dimensions_datasets.outputs_in_the_way.get(dataset_key)) is None:
                    ref = DatasetRef(
                        self._pipeline_graph.dataset_types[write_edge.parent_dataset_type_name].dataset_type,
                        self.empty_data_id,
                        run=self.output_run,
                    )
                skeleton.set_dataset_ref(ref, dataset_key)
                skeleton.add_output_edge(task_init_key, dataset_key)
                adapted_ref = write_edge.adapt_dataset_ref(ref)
                adapted_outputs[adapted_ref.datasetType] = adapted_ref
            skeleton[task_init_key]["inputs"] = adapted_inputs
            skeleton[task_init_key]["outputs"] = adapted_outputs
        elif has_skipped_quanta:
            # No quanta remain for this task, but at least one quantum was
            # skipped because its outputs were present in the skip_existing_in
            # collections.  This means all init outputs should be present in
            # the skip_existing_in collections, too, and we need to put those
            # refs in the graph.
            for write_edge in task_node.init.iter_all_outputs():
                dataset_key = skeleton.add_dataset_node(
                    write_edge.parent_dataset_type_name, self.empty_data_id
                )
                if (ref := self.empty_dimensions_datasets.outputs_for_skip.get(dataset_key)) is None:
                    raise InitInputMissingError(
                        f"Init-output dataset {write_edge.parent_dataset_type_name!r} of skipped task "
                        f"{task_node.label!r} not found in skip-existing-in collection(s) "
                        f"{self.skip_existing_in}."
                    ) from None
                skeleton.set_dataset_ref(ref, dataset_key)
                # If this dataset was "in the way" (i.e. already in the output
                # run), it isn't anymore.
                skeleton.discard_output_in_the_way(dataset_key)
        # Remaining (implicit else) case: no quanta remain in this task, but
        # none were skipped; this means they all got pruned because of
        # NoWorkFound conditions.  This dooms all downstream quanta to the
        # same fate, so we don't bother doing anything with the task's
        # init-outputs, since nothing is going to consume them.

1039 

    @final
    @timeMethod
    def _find_empty_dimension_datasets(self) -> EmptyDimensionsDatasets:
        """Query for all dataset types with no dimensions and return the
        results as an `EmptyDimensionsDatasets` struct.

        This includes but is not limited to init inputs and init outputs.

        Returns
        -------
        datasets : `EmptyDimensionsDatasets`
            Struct holding the overall inputs, the outputs found in the
            skip-existing collections, and the outputs already present ("in
            the way") in the output run.
        """
        inputs: dict[DatasetKey | PrerequisiteDatasetKey, DatasetRef] = {}
        outputs_for_skip: dict[DatasetKey, DatasetRef] = {}
        outputs_in_the_way: dict[DatasetKey, DatasetRef] = {}
        # group_by_dimensions maps each dimension group to (task nodes,
        # dataset type nodes); we only want the dimensionless dataset types.
        _, dataset_type_nodes = self._pipeline_graph.group_by_dimensions().get(self.universe.empty, ({}, {}))
        dataset_types = [node.dataset_type for node in dataset_type_nodes.values()]
        dataset_types.extend(self._global_init_output_types.values())
        for dataset_type in dataset_types:
            key = DatasetKey(dataset_type.name, self.empty_data_id.required_values)
            if (
                self._pipeline_graph.producer_of(dataset_type.name) is None
                and dataset_type.name not in self._global_init_output_types
            ):
                # Dataset type is an overall input; we always need to try to
                # find these.
                try:
                    ref = self.butler.find_dataset(dataset_type.name, collections=self.input_collections)
                except MissingDatasetTypeError:
                    # Dataset type not registered at all; treat as not found.
                    ref = None
                if ref is not None:
                    inputs[key] = ref
            elif self.skip_existing_in:
                # Dataset type is an intermediate or output; need to find
                # these only if they're from previously executed quanta that
                # we might skip...
                try:
                    ref = self.butler.find_dataset(dataset_type.name, collections=self.skip_existing_in)
                except MissingDatasetTypeError:
                    ref = None
                if ref is not None:
                    outputs_for_skip[key] = ref
                    if ref.run == self.output_run:
                        outputs_in_the_way[key] = ref
            if self.output_run_exists and not self.skip_existing_starts_with_output_run:
                # ...or if they're in the way and would need to be clobbered
                # (and we haven't already found them in the previous block).
                try:
                    ref = self.butler.find_dataset(dataset_type.name, collections=[self.output_run])
                except MissingDatasetTypeError:
                    ref = None
                if ref is not None:
                    outputs_in_the_way[key] = ref
        return EmptyDimensionsDatasets(
            inputs=inputs, outputs_for_skip=outputs_for_skip, outputs_in_the_way=outputs_in_the_way
        )

1092 

1093 @final 

1094 @timeMethod 

1095 def _attach_datastore_records(self, skeleton: QuantumGraphSkeleton) -> None: 

1096 """Add datastore records for all overall inputs to a preliminary 

1097 quantum graph. 

1098 

1099 Parameters 

1100 ---------- 

1101 skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton` 

1102 Preliminary quantum graph to update in place. 

1103 

1104 Notes 

1105 ----- 

1106 On return, all quantum nodes in the skeleton graph will have a 

1107 "datastore_records" attribute that is a mapping from datastore name 

1108 to `lsst.daf.butler.DatastoreRecordData`, as used by 

1109 `lsst.daf.butler.Quantum`. 

1110 """ 

1111 self.log.info("Fetching and attaching datastore records for all overall inputs.") 

1112 overall_inputs = skeleton.extract_overall_inputs() 

1113 exported_records = self.butler._datastore.export_records(overall_inputs.values()) 

1114 for task_label in self._pipeline_graph.tasks: 

1115 if not skeleton.has_task(task_label): 

1116 continue 

1117 self.log.verbose("Fetching and attaching datastore records for task %s.", task_label) 

1118 task_init_key = skeleton.get_task_init_node(task_label) 

1119 init_input_ids = { 

1120 ref.id 

1121 for dataset_key in skeleton.iter_inputs_of(task_init_key) 

1122 if (ref := overall_inputs.get(dataset_key)) is not None 

1123 } 

1124 init_records = {} 

1125 if init_input_ids: 

1126 for datastore_name, records in exported_records.items(): 

1127 matching_records = records.subset(init_input_ids) 

1128 if matching_records is not None: 

1129 init_records[datastore_name] = matching_records 

1130 skeleton[task_init_key]["datastore_records"] = init_records 

1131 for quantum_key in skeleton.get_quanta(task_label): 

1132 quantum_records = {} 

1133 input_ids = { 

1134 ref.id 

1135 for dataset_key in skeleton.iter_inputs_of(quantum_key) 

1136 if (ref := overall_inputs.get(dataset_key)) is not None 

1137 } 

1138 if input_ids: 

1139 for datastore_name, records in exported_records.items(): 

1140 matching_records = records.subset(input_ids) 

1141 if matching_records is not None: 

1142 quantum_records[datastore_name] = matching_records 

1143 skeleton[quantum_key]["datastore_records"] = quantum_records 

1144 

    @final
    @timeMethod
    def _construct_quantum_graph(
        self, skeleton: QuantumGraphSkeleton, metadata: Mapping[str, Any]
    ) -> QuantumGraph:
        """Construct a `.QuantumGraph` object from the contents of a
        fully-processed `.quantum_graph_skeleton.QuantumGraphSkeleton`.

        Parameters
        ----------
        skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton`
            Preliminary quantum graph.  Must have "init_inputs", "inputs", and
            "outputs" attributes on all quantum nodes, as added by
            `_resolve_task_quanta`, as well as a "datastore_records" attribute
            as added by `_attach_datastore_records`.
        metadata : `~collections.abc.Mapping`
            Flexible metadata to add to the graph.

        Returns
        -------
        quantum_graph : `.QuantumGraph`
            DAG describing processing to be performed.
        """
        # Local import; presumably deferred to avoid a circular import with
        # the graph module - TODO confirm.
        from .graph import QuantumGraph

        self.log.info("Transforming graph skeleton into a QuantumGraph instance.")
        quanta: dict[TaskDef, set[Quantum]] = {}
        init_inputs: dict[TaskDef, Iterable[DatasetRef]] = {}
        init_outputs: dict[TaskDef, Iterable[DatasetRef]] = {}
        for task_def in self._pipeline_graph._iter_task_defs():
            if not skeleton.has_task(task_def.label):
                # Task had all of its quanta pruned or skipped.
                continue
            self.log.verbose("Transforming graph skeleton nodes for task %s.", task_def.label)
            task_node = self._pipeline_graph.tasks[task_def.label]
            task_init_key = skeleton.get_task_init_node(task_def.label)
            task_init_state = skeleton[task_init_key]
            init_datastore_records: dict[str, DatastoreRecordData] = task_init_state.get(
                "datastore_records", {}
            )
            init_inputs[task_def] = task_init_state["inputs"].values()
            init_outputs[task_def] = task_init_state["outputs"].values()
            quanta_for_task: set[Quantum] = set()
            for quantum_key in skeleton.get_quanta(task_node.label):
                quantum_state = skeleton[quantum_key]
                quantum_datastore_records: dict[str, DatastoreRecordData] = quantum_state.get(
                    "datastore_records", {}
                )
                # Each Quantum carries its own records merged with the task's
                # init-input records.
                quanta_for_task.add(
                    Quantum(
                        taskName=task_node.task_class_name,
                        taskClass=task_node.task_class,
                        dataId=quantum_state["data_id"],
                        initInputs=quantum_state["init_inputs"],
                        inputs=quantum_state["inputs"],
                        outputs=quantum_state["outputs"],
                        datastore_records=DatastoreRecordData.merge_mappings(
                            quantum_datastore_records, init_datastore_records
                        ),
                    )
                )
            quanta[task_def] = quanta_for_task

        registry_dataset_types: list[DatasetType] = [
            node.dataset_type for node in self._pipeline_graph.dataset_types.values()
        ]

        all_metadata = self.metadata.to_dict()
        all_metadata.update(metadata)
        global_init_outputs: list[DatasetRef] = []
        for dataset_key in skeleton.global_init_outputs:
            ref = skeleton.get_dataset_ref(dataset_key)
            assert ref is not None, "Global init input refs should be resolved already."
            global_init_outputs.append(ref)
        self.log.verbose("Invoking QuantumGraph class constructor.")
        result = QuantumGraph(
            quanta,
            metadata=all_metadata,
            universe=self.universe,
            initInputs=init_inputs,
            initOutputs=init_outputs,
            globalInitOutputs=global_init_outputs,
            registryDatasetTypes=registry_dataset_types,
        )
        self.log.info("Graph build complete.")
        return result

1230 

    @final
    @timeMethod
    def _construct_components(
        self,
        skeleton: QuantumGraphSkeleton,
        output: str | None,
        metadata: Mapping[str, Any] | None,
    ) -> PredictedQuantumGraphComponents:
        """Return quantum graph components from a completed skeleton.

        Parameters
        ----------
        skeleton : `quantum_graph_skeleton.QuantumGraphSkeleton`
            Temporary data structure used by the builder to represent the
            graph.
        output : `str` or `None`, optional
            Output `~lsst.daf.butler.CollectionType.CHAINED` collection that
            combines the input and output collections.
        metadata : `~collections.abc.Mapping`, optional
            Mapping of JSON-friendly metadata.  Collection information, the
            current user, and the current timestamp are automatically
            included.

        Returns
        -------
        components : `.quantum_graph.PredictedQuantumGraphComponents`
            Components that can be used to construct a graph object and/or
            save it to disk.
        """
        # Local imports; presumably deferred to avoid an import cycle with
        # the quantum_graph module - TODO confirm.
        from .quantum_graph import (
            PredictedDatasetModel,
            PredictedQuantumDatasetsModel,
            PredictedQuantumGraphComponents,
        )

        self.log.info("Transforming graph skeleton into PredictedQuantumGraph components.")
        components = PredictedQuantumGraphComponents(pipeline_graph=self._pipeline_graph)
        components.header.inputs = list(self.input_collections)
        components.header.output_run = self.output_run
        components.header.output = output
        if metadata is not None:
            components.header.metadata.update(metadata)
        components.dimension_data = DimensionDataAttacher(
            records=skeleton.get_dimension_data(),
            dimensions=self._pipeline_graph.get_all_dimensions(),
        )
        # A special "init quantum" with an empty task label holds the global
        # init-outputs; its refs were resolved earlier in the build.
        components.init_quanta.root = [
            PredictedQuantumDatasetsModel.model_construct(
                quantum_id=generate_uuidv7(),
                task_label="",
                outputs={
                    dataset_key.parent_dataset_type_name: [
                        PredictedDatasetModel.from_dataset_ref(
                            cast(DatasetRef, skeleton.get_dataset_ref(dataset_key))
                        )
                    ]
                    for dataset_key in skeleton.global_init_outputs
                },
            )
        ]
        for task_node in self._pipeline_graph.tasks.values():
            if not skeleton.has_task(task_node.label):
                # Task had all of its quanta pruned or skipped.
                continue
            self.log.verbose("Transforming graph skeleton nodes for task %s.", task_node.label)
            task_init_key = TaskInitKey(task_node.label)
            # One init quantum per task, built from the task-init node's
            # edges and any datastore records attached to it.
            init_quantum_datasets = PredictedQuantumDatasetsModel.model_construct(
                quantum_id=generate_uuidv7(),
                task_label=task_node.label,
                inputs=self._make_predicted_datasets(
                    skeleton,
                    task_node.init.iter_all_inputs(),
                    skeleton.iter_inputs_of(task_init_key),
                ),
                outputs=self._make_predicted_datasets(
                    skeleton,
                    task_node.init.iter_all_outputs(),
                    skeleton.iter_outputs_of(task_init_key),
                ),
                datastore_records={
                    datastore_name: records.to_simple()
                    for datastore_name, records in skeleton[task_init_key]
                    .get("datastore_records", {})
                    .items()
                },
            )
            components.init_quanta.root.append(init_quantum_datasets)
            # One model per runtime quantum, keyed by its generated UUID.
            for quantum_key in skeleton.get_quanta(task_node.label):
                quantum_datasets = PredictedQuantumDatasetsModel.model_construct(
                    quantum_id=generate_uuidv7(),
                    task_label=task_node.label,
                    data_coordinate=list(skeleton.get_data_id(quantum_key).full_values),
                    inputs=self._make_predicted_datasets(
                        skeleton,
                        task_node.iter_all_inputs(),
                        skeleton.iter_inputs_of(quantum_key),
                    ),
                    outputs=self._make_predicted_datasets(
                        skeleton,
                        task_node.iter_all_outputs(),
                        skeleton.iter_outputs_of(quantum_key),
                    ),
                    datastore_records={
                        datastore_name: records.to_simple()
                        for datastore_name, records in skeleton[quantum_key]
                        .get("datastore_records", {})
                        .items()
                    },
                )
                components.quantum_datasets[quantum_datasets.quantum_id] = quantum_datasets
        self.log.verbose("Building the thin summary graph.")
        components.set_thin_graph()
        components.set_header_counts()
        self.log.info("Graph build complete.")
        return components

1345 

1346 @staticmethod 

1347 def _make_predicted_datasets( 

1348 skeleton: QuantumGraphSkeleton, 

1349 edges: Iterable[Edge], 

1350 dataset_keys: Iterable[DatasetKey | PrerequisiteDatasetKey], 

1351 ) -> dict[str, list[PredictedDatasetModel]]: 

1352 """Make the predicted quantum graph model objects that represent the 

1353 datasets from an iterable of pipeline graph edges. 

1354 

1355 Parameters 

1356 ---------- 

1357 skeleton : `quantum_graph_skeleton.QuantumGraphSkeleton` 

1358 Temporary data structure used by the builder to represent the 

1359 graph. 

1360 edges : `~collections.abc.Iterable` [ `.pipeline_graph.Edge` ] 

1361 Pipeline graph edges. 

1362 dataset_keys : `~collections.abc.Iterable` [ \ 

1363 `.quantum_graph_skeleton.DatasetKey` or\ 

1364 `.quantum_graph_skeleton.PrerequisiteDatasetKey` ] 

1365 All nodes in the skeleton that correspond to any of the given 

1366 pipeline graph edges. 

1367 

1368 Returns 

1369 ------- 

1370 predicted_datasets : `dict` [ `str`, \ 

1371 `list` [ `.quantum_graph.PredictedDatasetModel` ] ] 

1372 Mapping of dataset models, keyed by connection name. 

1373 """ 

1374 from .quantum_graph import PredictedDatasetModel 

1375 

1376 connection_names_by_dataset_type: defaultdict[str, list[str]] = defaultdict(list) 

1377 result: dict[str, list[PredictedDatasetModel]] = {} 

1378 for edge in edges: 

1379 connection_names_by_dataset_type[edge.parent_dataset_type_name].append(edge.connection_name) 

1380 result[edge.connection_name] = [] 

1381 

1382 for dataset_key in dataset_keys: 

1383 connection_names = connection_names_by_dataset_type.get(dataset_key.parent_dataset_type_name) 

1384 if connection_names is None: 

1385 # Ignore if this isn't one of the connections we're processing 

1386 # (probably an init-input), which would also be predecessor to 

1387 # a quantum node, but should be handled separately. 

1388 continue 

1389 ref = skeleton.get_dataset_ref(dataset_key) 

1390 assert ref is not None, "DatasetRefs should have already been added to skeleton." 

1391 for connection_name in connection_names: 

1392 result[connection_name].append(PredictedDatasetModel.from_dataset_ref(ref)) 

1393 for refs in result.values(): 

1394 refs.sort(key=operator.attrgetter("data_coordinate")) 

1395 return result 

1396 

1397 @staticmethod 

1398 @final 

1399 def _find_removed( 

1400 original: Iterable[DatasetKey | PrerequisiteDatasetKey], 

1401 adjusted: NamedKeyMapping[DatasetType, Sequence[DatasetRef]], 

1402 ) -> set[DatasetKey | PrerequisiteDatasetKey]: 

1403 """Identify skeleton-graph dataset nodes that have been removed by 

1404 `~PipelineTaskConnections.adjustQuantum`. 

1405 

1406 Parameters 

1407 ---------- 

1408 original : `~collections.abc.Iterable` [ `DatasetKey` or \ 

1409 `PrerequisiteDatasetKey` ] 

1410 Identifiers for the dataset nodes that were the original neighbors 

1411 (inputs or outputs) of a quantum. 

1412 adjusted : `~lsst.daf.butler.NamedKeyMapping` [ \ 

1413 `~lsst.daf.butler.DatasetType`, \ 

1414 `~collections.abc.Sequence` [ `lsst.daf.butler.DatasetType` ] ] 

1415 Adjusted neighbors, in the form used by `lsst.daf.butler.Quantum`. 

1416 

1417 Returns 

1418 ------- 

1419 removed : `set` [ `DatasetKey` ] 

1420 Datasets in ``original`` that have no counterpart in ``adjusted``. 

1421 """ 

1422 result = set(original) 

1423 for dataset_type, kept_refs in adjusted.items(): 

1424 parent_dataset_type_name, _ = DatasetType.splitDatasetTypeName(dataset_type.name) 

1425 for kept_ref in kept_refs: 

1426 # We don't know if this was a DatasetKey or a 

1427 # PrerequisiteDatasetKey; just try both. 

1428 result.discard(DatasetKey(parent_dataset_type_name, kept_ref.dataId.required_values)) 

1429 result.discard(PrerequisiteDatasetKey(parent_dataset_type_name, kept_ref.id.bytes)) 

1430 return result 

1431 

1432 

@dataclasses.dataclass(eq=False, order=False)
class EmptyDimensionsDatasets:
    """Struct that holds the results of empty-dimensions dataset queries for
    `QuantumGraphBuilder`.

    Populated by `QuantumGraphBuilder._find_empty_dimension_datasets`; all
    three mappings are keyed by skeleton dataset keys.
    """

    inputs: Mapping[DatasetKey | PrerequisiteDatasetKey, DatasetRef] = dataclasses.field(default_factory=dict)
    """Overall-input datasets found in `QuantumGraphBuilder.input_collections`.

    This may include prerequisite inputs.  It does include init-inputs.
    It does not include intermediates.
    """

    outputs_for_skip: Mapping[DatasetKey, DatasetRef] = dataclasses.field(default_factory=dict)
    """Output datasets found in `QuantumGraphBuilder.skip_existing_in`.

    It is unspecified whether this contains init-outputs; there is
    no concept of skipping at the init stage, so this is not expected to
    matter.
    """

    outputs_in_the_way: Mapping[DatasetKey, DatasetRef] = dataclasses.field(default_factory=dict)
    """Output datasets found in `QuantumGraphBuilder.output_run`.

    This includes regular outputs and init-outputs.
    """

1459 

1460 

def _quantum_or_quanta(n: int) -> str:
    """Return ``n`` followed by the correctly pluralized 'quantum'/'quanta'."""
    if n == 1:
        return "1 quantum"
    return f"{n} quanta"