Coverage for python/lsst/pipe/base/quantum_graph/_predicted.py: 21%

614 statements  

coverage.py v7.13.5, created at 2026-04-17 08:59 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ( 

31 "PredictedDatasetInfo", 

32 "PredictedDatasetModel", 

33 "PredictedInitQuantaModel", 

34 "PredictedQuantumDatasetsModel", 

35 "PredictedQuantumGraph", 

36 "PredictedQuantumGraphComponents", 

37 "PredictedQuantumGraphReader", 

38 "PredictedQuantumInfo", 

39 "PredictedThinGraphModel", 

40 "PredictedThinQuantumModel", 

41) 

42 

43import dataclasses 

44import itertools 

45import logging 

46import sys 

47import uuid 

48import warnings 

49from collections import defaultdict 

50from collections.abc import Iterable, Iterator, Mapping, Sequence 

51from contextlib import AbstractContextManager, contextmanager 

52from typing import TYPE_CHECKING, Any, cast 

53 

54import networkx 

55import networkx.algorithms.bipartite 

56import pydantic 

57import zstandard 

58 

59from lsst.daf.butler import ( 

60 Config, 

61 DataCoordinate, 

62 DataIdValue, 

63 DatasetRef, 

64 DatasetType, 

65 DimensionDataAttacher, 

66 DimensionDataExtractor, 

67 DimensionGroup, 

68 DimensionRecordSetDeserializer, 

69 DimensionUniverse, 

70 LimitedButler, 

71 Quantum, 

72 QuantumBackedButler, 

73 SerializableDimensionData, 

74) 

75from lsst.daf.butler._rubin import generate_uuidv7 

76from lsst.daf.butler.datastore.record_data import DatastoreRecordData, SerializedDatastoreRecordData 

77from lsst.daf.butler.registry import ConflictingDefinitionError 

78from lsst.resources import ResourcePath, ResourcePathExpression 

79from lsst.utils.packages import Packages 

80 

81from .. import automatic_connection_constants as acc 

82from ..pipeline import TaskDef 

83from ..pipeline_graph import ( 

84 PipelineGraph, 

85 TaskImportMode, 

86 TaskInitNode, 

87 TaskNode, 

88 compare_packages, 

89 log_config_mismatch, 

90) 

91from ._common import ( 

92 FORMAT_VERSION, 

93 BaseQuantumGraph, 

94 BaseQuantumGraphReader, 

95 BaseQuantumGraphWriter, 

96 ConnectionName, 

97 DataCoordinateValues, 

98 DatasetInfo, 

99 DatasetTypeName, 

100 DatastoreName, 

101 HeaderModel, 

102 IncompleteQuantumGraphError, 

103 QuantumIndex, 

104 QuantumInfo, 

105 TaskLabel, 

106) 

107from ._multiblock import DEFAULT_PAGE_SIZE, AddressRow, MultiblockReader, MultiblockWriter 

108 

109if TYPE_CHECKING: 

110 from ..config import PipelineTaskConfig 

111 from ..graph import QgraphSummary, QuantumGraph 

112 

113# Sphinx needs imports for type annotations of base class members. 

114if "sphinx" in sys.modules: 

115 import zipfile # noqa: F401 

116 

117 from ._multiblock import AddressReader, Decompressor # noqa: F401 

118 

119 

120_LOG = logging.getLogger(__name__) 

121 

122 

123class _PredictedThinQuantumModelV0(pydantic.BaseModel): 

124 """Data model for a quantum data ID and internal integer ID in a predicted 

125 quantum graph. 

126 """ 

127 

128 quantum_index: QuantumIndex 

129 """Internal integer ID for this quantum.""" 

130 

131 data_coordinate: DataCoordinateValues = pydantic.Field(default_factory=list) 

132 """Full (required and implied) data coordinate values for this quantum.""" 

133 

134 

135class PredictedThinQuantumModel(pydantic.BaseModel): 

136 """Data model for a quantum data ID and UUID in a predicted 

137 quantum graph. 

138 """ 

139 

140 quantum_id: uuid.UUID 

141 """Universally unique ID for this quantum.""" 

142 

143 data_coordinate: DataCoordinateValues = pydantic.Field(default_factory=list) 

144 """Full (required and implied) data coordinate values for this quantum.""" 

145 

146 # Work around the fact that Sphinx chokes on Pydantic docstring formatting, 

147 # when we inherit those docstrings in our public classes. 

148 if "sphinx" in sys.modules and not TYPE_CHECKING: 

149 

150 def copy(self, *args: Any, **kwargs: Any) -> Any: 

151 """See `pydantic.BaseModel.copy`.""" 

152 return super().copy(*args, **kwargs) 

153 

154 def model_dump(self, *args: Any, **kwargs: Any) -> Any: 

155 """See `pydantic.BaseModel.model_dump`.""" 

156 return super().model_dump(*args, **kwargs) 

157 

158 def model_dump_json(self, *args: Any, **kwargs: Any) -> Any: 

159 """See `pydantic.BaseModel.model_dump_json`.""" 

160 return super().model_dump_json(*args, **kwargs)

161 

162 def model_copy(self, *args: Any, **kwargs: Any) -> Any: 

163 """See `pydantic.BaseModel.model_copy`.""" 

164 return super().model_copy(*args, **kwargs) 

165 

166 @classmethod 

167 def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override] 

168 """See `pydantic.BaseModel.model_construct`.""" 

169 return super().model_construct(*args, **kwargs) 

170 

171 @classmethod 

172 def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any: 

173 """See `pydantic.BaseModel.model_json_schema`.""" 

174 return super().model_json_schema(*args, **kwargs) 

175 

176 @classmethod 

177 def model_validate(cls, *args: Any, **kwargs: Any) -> Any: 

178 """See `pydantic.BaseModel.model_validate`.""" 

179 return super().model_validate(*args, **kwargs) 

180 

181 @classmethod 

182 def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any: 

183 """See `pydantic.BaseModel.model_validate_json`.""" 

184 return super().model_validate_json(*args, **kwargs) 

185 

186 @classmethod 

187 def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any: 

188 """See `pydantic.BaseModel.model_validate_strings`.""" 

189 return super().model_validate_strings(*args, **kwargs) 

190 

191 

192class _PredictedThinGraphModelV0(pydantic.BaseModel): 

193 """Data model for the predicted quantum graph component that maps each 

194 task label to the data IDs and internal integer IDs of its quanta. 

195 """ 

196 

197 quanta: dict[TaskLabel, list[_PredictedThinQuantumModelV0]] = pydantic.Field(default_factory=dict) 

198 """Minimal descriptions of all quanta, grouped by task label.""" 

199 

200 edges: list[tuple[QuantumIndex, QuantumIndex]] = pydantic.Field(default_factory=list) 

201 """Pairs of (predecessor, successor) internal integer quantum IDs.""" 

202 

203 def _upgraded(self, address_rows: Mapping[uuid.UUID, AddressRow]) -> PredictedThinGraphModel: 

204 """Convert to the v1+ model.""" 

205 uuid_by_index = {v.index: k for k, v in address_rows.items()} 

206 return PredictedThinGraphModel.model_construct( 

207 quanta={ 

208 task_label: [ 

209 PredictedThinQuantumModel.model_construct( 

210 quantum_id=uuid_by_index[q.quantum_index], data_coordinate=q.data_coordinate 

211 ) 

212 for q in quanta 

213 ] 

214 for task_label, quanta in self.quanta.items() 

215 }, 

216 edges=[(uuid_by_index[index1], uuid_by_index[index2]) for index1, index2 in self.edges], 

217 ) 

218 

219 

220class PredictedThinGraphModel(pydantic.BaseModel): 

221 """Data model for the predicted quantum graph component that maps each 

222 task label to the data IDs and UUIDs of its quanta. 

223 """ 

224 

225 quanta: dict[TaskLabel, list[PredictedThinQuantumModel]] = pydantic.Field(default_factory=dict) 

226 """Minimal descriptions of all quanta, grouped by task label.""" 

227 

228 edges: list[tuple[uuid.UUID, uuid.UUID]] = pydantic.Field(default_factory=list) 

229 """Pairs of (predecessor, successor) quantum IDs.""" 

230 
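# Illustrative sketch (not part of the module): the JSON shape this model
# serializes to, with a hypothetical task label and made-up ID values.
#
#     {
#         "quanta": {
#             "isr": [
#                 {"quantum_id": "<uuid>", "data_coordinate": [903342, 23]}
#             ]
#         },
#         "edges": [["<uuid-a>", "<uuid-b>"]]
#     }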

231 # Work around the fact that Sphinx chokes on Pydantic docstring formatting, 

232 # when we inherit those docstrings in our public classes. 

233 if "sphinx" in sys.modules and not TYPE_CHECKING: 

234 

235 def copy(self, *args: Any, **kwargs: Any) -> Any: 

236 """See `pydantic.BaseModel.copy`.""" 

237 return super().copy(*args, **kwargs) 

238 

239 def model_dump(self, *args: Any, **kwargs: Any) -> Any: 

240 """See `pydantic.BaseModel.model_dump`.""" 

241 return super().model_dump(*args, **kwargs) 

242 

243 def model_dump_json(self, *args: Any, **kwargs: Any) -> Any: 

244 """See `pydantic.BaseModel.model_dump_json`.""" 

245 return super().model_dump_json(*args, **kwargs)

246 

247 def model_copy(self, *args: Any, **kwargs: Any) -> Any: 

248 """See `pydantic.BaseModel.model_copy`.""" 

249 return super().model_copy(*args, **kwargs) 

250 

251 @classmethod 

252 def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override] 

253 """See `pydantic.BaseModel.model_construct`.""" 

254 return super().model_construct(*args, **kwargs) 

255 

256 @classmethod 

257 def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any: 

258 """See `pydantic.BaseModel.model_json_schema`.""" 

259 return super().model_json_schema(*args, **kwargs) 

260 

261 @classmethod 

262 def model_validate(cls, *args: Any, **kwargs: Any) -> Any: 

263 """See `pydantic.BaseModel.model_validate`.""" 

264 return super().model_validate(*args, **kwargs) 

265 

266 @classmethod 

267 def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any: 

268 """See `pydantic.BaseModel.model_validate_json`.""" 

269 return super().model_validate_json(*args, **kwargs) 

270 

271 @classmethod 

272 def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any: 

273 """See `pydantic.BaseModel.model_validate_strings`.""" 

274 return super().model_validate_strings(*args, **kwargs) 

275 

276 

277class PredictedDatasetModel(pydantic.BaseModel): 

278 """Data model for the datasets in a predicted quantum graph file.""" 

279 

280 dataset_id: uuid.UUID 

281 """Universally unique ID for the dataset.""" 

282 

283 dataset_type_name: DatasetTypeName 

284 """Name of the type of this dataset. 

285 

286 This is always a parent dataset type name, not a component. 

287 

288 Note that full dataset type definitions are stored in the pipeline graph. 

289 """ 

290 

291 data_coordinate: DataCoordinateValues = pydantic.Field(default_factory=list) 

292 """The full values (required and implied) of this dataset's data ID.""" 

293 

294 run: str 

295 """This dataset's RUN collection name.""" 

296 

297 @classmethod 

298 def from_dataset_ref(cls, ref: DatasetRef) -> PredictedDatasetModel: 

299 """Construct from a butler `~lsst.daf.butler.DatasetRef`. 

300 

301 Parameters 

302 ---------- 

303 ref : `lsst.daf.butler.DatasetRef` 

304 Dataset reference. 

305 

306 Returns 

307 ------- 

308 model : `PredictedDatasetModel` 

309 Model for the dataset. 

310 """ 

311 dataset_type_name, _ = DatasetType.splitDatasetTypeName(ref.datasetType.name) 

312 return cls.model_construct( 

313 dataset_id=ref.id, 

314 dataset_type_name=dataset_type_name, 

315 data_coordinate=list(ref.dataId.full_values), 

316 run=ref.run, 

317 ) 

318 
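# Usage sketch (illustrative; assumes ``ref`` is an existing
# `lsst.daf.butler.DatasetRef`): component refs collapse to their parent
# dataset type name, while the ID, data ID values, and run are copied over.
#
#     model = PredictedDatasetModel.from_dataset_ref(ref)
#     assert model.dataset_id == ref.id
#     assert model.run == ref.run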

319 # Work around the fact that Sphinx chokes on Pydantic docstring formatting, 

320 # when we inherit those docstrings in our public classes. 

321 if "sphinx" in sys.modules and not TYPE_CHECKING: 

322 

323 def copy(self, *args: Any, **kwargs: Any) -> Any: 

324 """See `pydantic.BaseModel.copy`.""" 

325 return super().copy(*args, **kwargs) 

326 

327 def model_dump(self, *args: Any, **kwargs: Any) -> Any: 

328 """See `pydantic.BaseModel.model_dump`.""" 

329 return super().model_dump(*args, **kwargs) 

330 

331 def model_dump_json(self, *args: Any, **kwargs: Any) -> Any: 

332 """See `pydantic.BaseModel.model_dump_json`.""" 

333 return super().model_dump_json(*args, **kwargs)

334 

335 def model_copy(self, *args: Any, **kwargs: Any) -> Any: 

336 """See `pydantic.BaseModel.model_copy`.""" 

337 return super().model_copy(*args, **kwargs) 

338 

339 @classmethod 

340 def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override] 

341 """See `pydantic.BaseModel.model_construct`.""" 

342 return super().model_construct(*args, **kwargs) 

343 

344 @classmethod 

345 def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any: 

346 """See `pydantic.BaseModel.model_json_schema`.""" 

347 return super().model_json_schema(*args, **kwargs) 

348 

349 @classmethod 

350 def model_validate(cls, *args: Any, **kwargs: Any) -> Any: 

351 """See `pydantic.BaseModel.model_validate`.""" 

352 return super().model_validate(*args, **kwargs) 

353 

354 @classmethod 

355 def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any: 

356 """See `pydantic.BaseModel.model_validate_json`.""" 

357 return super().model_validate_json(*args, **kwargs) 

358 

359 @classmethod 

360 def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any: 

361 """See `pydantic.BaseModel.model_validate_strings`.""" 

362 return super().model_validate_strings(*args, **kwargs) 

363 

364 

365class PredictedQuantumDatasetsModel(pydantic.BaseModel): 

366 """Data model for a description of a single predicted quantum that includes 

367 its inputs and outputs. 

368 """ 

369 

370 quantum_id: uuid.UUID 

371 """Universally unique ID for the quantum.""" 

372 

373 task_label: TaskLabel 

374 """Label of the task. 

375 

376 Note that task label definitions are stored in the pipeline graph. 

377 """ 

378 

379 data_coordinate: DataCoordinateValues = pydantic.Field(default_factory=list) 

380 """The full values (required and implied) of this quantum's data ID.""" 

381 

382 inputs: dict[ConnectionName, list[PredictedDatasetModel]] = pydantic.Field(default_factory=dict) 

383 """The input datasets to this quantum, grouped by connection name.""" 

384 

385 outputs: dict[ConnectionName, list[PredictedDatasetModel]] = pydantic.Field(default_factory=dict) 

386 """The datasets output by this quantum, grouped by connection name.""" 

387 

388 datastore_records: dict[DatastoreName, SerializedDatastoreRecordData] = pydantic.Field( 

389 default_factory=dict 

390 ) 

391 """Datastore records for inputs to this quantum that are already present in 

392 the data repository. 

393 """ 

394 

395 def iter_input_dataset_ids(self) -> Iterator[uuid.UUID]: 

396 """Return an iterator over the UUIDs of all datasets consumed by this 

397 quantum. 

398 

399 Returns 

400 ------- 

401 iter : `~collections.abc.Iterator` [ `uuid.UUID` ] 

402 Iterator over dataset IDs. 

403 """ 

404 for datasets in self.inputs.values(): 

405 for dataset in datasets: 

406 yield dataset.dataset_id 

407 

408 def iter_output_dataset_ids(self) -> Iterator[uuid.UUID]: 

409 """Return an iterator over the UUIDs of all datasets produced by this 

410 quantum. 

411 

412 Returns 

413 ------- 

414 iter : `~collections.abc.Iterator` [ `uuid.UUID` ] 

415 Iterator over dataset IDs. 

416 """ 

417 for datasets in self.outputs.values(): 

418 for dataset in datasets: 

419 yield dataset.dataset_id 

420 

421 def iter_dataset_ids(self) -> Iterator[uuid.UUID]: 

422 """Return an iterator over the UUIDs of all datasets referenced by this 

423 quantum. 

424 

425 Returns 

426 ------- 

427 iter : `~collections.abc.Iterator` [ `uuid.UUID` ] 

428 Iterator over dataset IDs. 

429 """ 

430 yield from self.iter_input_dataset_ids() 

431 yield from self.iter_output_dataset_ids() 

432 
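# Usage sketch (illustrative; ``quantum_datasets_model`` is a hypothetical
# instance): gather every dataset UUID a quantum touches, e.g. to
# pre-filter datastore records; input IDs are yielded before output IDs.
#
#     all_ids = set(quantum_datasets_model.iter_dataset_ids())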

433 def deserialize_datastore_records(self) -> dict[DatastoreName, DatastoreRecordData]: 

434 """Deserialize the mapping of datastore records.""" 

435 return { 

436 datastore_name: DatastoreRecordData.from_simple(serialized_records) 

437 for datastore_name, serialized_records in self.datastore_records.items() 

438 } 

439 

440 @classmethod 

441 def from_execution_quantum( 

442 cls, task_node: TaskNode, quantum: Quantum, quantum_id: uuid.UUID 

443 ) -> PredictedQuantumDatasetsModel: 

444 """Construct from an `lsst.daf.butler.Quantum` instance. 

445 

446 Parameters 

447 ---------- 

448 task_node : `.pipeline_graph.TaskNode` 

449 Task node from the pipeline graph. 

450 quantum : `lsst.daf.butler.Quantum`

451 Quantum object. 

452 quantum_id : `uuid.UUID` 

453 ID for this quantum. 

454 

455 Returns 

456 ------- 

457 model : `PredictedQuantumDatasetsModel`

458 Model for this quantum. 

459 """ 

460 result: PredictedQuantumDatasetsModel = cls.model_construct( 

461 quantum_id=quantum_id, 

462 task_label=task_node.label, 

463 data_coordinate=list(cast(DataCoordinate, quantum.dataId).full_values), 

464 ) 

465 for read_edge in task_node.iter_all_inputs(): 

466 refs = sorted(quantum.inputs[read_edge.dataset_type_name], key=lambda ref: ref.dataId) 

467 result.inputs[read_edge.connection_name] = [ 

468 PredictedDatasetModel.from_dataset_ref(ref) for ref in refs 

469 ] 

470 for write_edge in task_node.iter_all_outputs(): 

471 refs = sorted(quantum.outputs[write_edge.dataset_type_name], key=lambda ref: ref.dataId) 

472 result.outputs[write_edge.connection_name] = [ 

473 PredictedDatasetModel.from_dataset_ref(ref) for ref in refs 

474 ] 

475 result.datastore_records = { 

476 store_name: records.to_simple() for store_name, records in quantum.datastore_records.items() 

477 } 

478 return result 

479 

480 @classmethod 

481 def from_old_quantum_graph_init( 

482 cls, task_init_node: TaskInitNode, old_quantum_graph: QuantumGraph 

483 ) -> PredictedQuantumDatasetsModel: 

484 """Construct from the init-input and init-output dataset types of a 

485 task in an old `QuantumGraph` instance. 

486 

487 Parameters 

488 ---------- 

489 task_init_node : `.pipeline_graph.TaskInitNode`

490 Task init node from the pipeline graph. 

491 old_quantum_graph : `QuantumGraph` 

492 Quantum graph. 

493 

494 Returns 

495 ------- 

496 model : `PredictedQuantumDatasetsModel`

497 Model for this "init" quantum. 

498 """ 

499 task_def = old_quantum_graph.findTaskDefByLabel(task_init_node.label) 

500 assert task_def is not None 

501 init_input_refs = { 

502 ref.datasetType.name: ref for ref in (old_quantum_graph.initInputRefs(task_def) or []) 

503 } 

504 init_output_refs = { 

505 ref.datasetType.name: ref for ref in (old_quantum_graph.initOutputRefs(task_def) or []) 

506 } 

507 init_input_ids = {ref.id for ref in init_input_refs.values()} 

508 result: PredictedQuantumDatasetsModel = cls.model_construct( 

509 quantum_id=generate_uuidv7(), task_label=task_init_node.label 

510 ) 

511 for read_edge in task_init_node.iter_all_inputs(): 

512 ref = init_input_refs[read_edge.dataset_type_name] 

513 result.inputs[read_edge.connection_name] = [PredictedDatasetModel.from_dataset_ref(ref)] 

514 for write_edge in task_init_node.iter_all_outputs(): 

515 ref = init_output_refs[write_edge.dataset_type_name] 

516 result.outputs[write_edge.connection_name] = [PredictedDatasetModel.from_dataset_ref(ref)] 

517 datastore_records: dict[str, DatastoreRecordData] = {} 

518 for quantum in old_quantum_graph.get_task_quanta(task_init_node.label).values(): 

519 for store_name, records in quantum.datastore_records.items(): 

520 subset = records.subset(init_input_ids) 

521 if subset is not None: 

522 datastore_records.setdefault(store_name, DatastoreRecordData()).update(subset) 

523 break # All quanta have same init-inputs, so we only need one. 

524 result.datastore_records = { 

525 store_name: records.to_simple() for store_name, records in datastore_records.items() 

526 } 

527 return result 

528 

529 # Work around the fact that Sphinx chokes on Pydantic docstring formatting, 

530 # when we inherit those docstrings in our public classes. 

531 if "sphinx" in sys.modules and not TYPE_CHECKING: 

532 

533 def copy(self, *args: Any, **kwargs: Any) -> Any: 

534 """See `pydantic.BaseModel.copy`.""" 

535 return super().copy(*args, **kwargs) 

536 

537 def model_dump(self, *args: Any, **kwargs: Any) -> Any: 

538 """See `pydantic.BaseModel.model_dump`.""" 

539 return super().model_dump(*args, **kwargs) 

540 

541 def model_dump_json(self, *args: Any, **kwargs: Any) -> Any: 

542 """See `pydantic.BaseModel.model_dump_json`.""" 

543 return super().model_dump_json(*args, **kwargs)

544 

545 def model_copy(self, *args: Any, **kwargs: Any) -> Any: 

546 """See `pydantic.BaseModel.model_copy`.""" 

547 return super().model_copy(*args, **kwargs) 

548 

549 @classmethod 

550 def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override] 

551 """See `pydantic.BaseModel.model_construct`.""" 

552 return super().model_construct(*args, **kwargs) 

553 

554 @classmethod 

555 def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any: 

556 """See `pydantic.BaseModel.model_json_schema`.""" 

557 return super().model_json_schema(*args, **kwargs) 

558 

559 @classmethod 

560 def model_validate(cls, *args: Any, **kwargs: Any) -> Any: 

561 """See `pydantic.BaseModel.model_validate`.""" 

562 return super().model_validate(*args, **kwargs) 

563 

564 @classmethod 

565 def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any: 

566 """See `pydantic.BaseModel.model_validate_json`.""" 

567 return super().model_validate_json(*args, **kwargs) 

568 

569 @classmethod 

570 def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any: 

571 """See `pydantic.BaseModel.model_validate_strings`.""" 

572 return super().model_validate_strings(*args, **kwargs) 

573 

574 

575class PredictedInitQuantaModel(pydantic.RootModel): 

576 """Data model for the init-inputs and init-outputs of a predicted quantum 

577 graph. 

578 """ 

579 

580 root: list[PredictedQuantumDatasetsModel] = pydantic.Field(default_factory=list) 

581 """List of special "init" quanta: one for each task, and another for global 

582 init-outputs. 

583 """ 

584 

585 def update_from_old_quantum_graph(self, old_quantum_graph: QuantumGraph) -> None: 

586 """Update this model in-place by extracting from an old `QuantumGraph` 

587 instance. 

588 

589 Parameters 

590 ---------- 

591 old_quantum_graph : `QuantumGraph` 

592 Quantum graph. 

593 """ 

594 global_init_quantum = PredictedQuantumDatasetsModel.model_construct( 

595 quantum_id=generate_uuidv7(), task_label="" 

596 ) 

597 for ref in old_quantum_graph.globalInitOutputRefs(): 

598 global_init_quantum.outputs[ref.datasetType.name] = [PredictedDatasetModel.from_dataset_ref(ref)] 

599 self.root.append(global_init_quantum) 

600 for task_node in old_quantum_graph.pipeline_graph.tasks.values(): 

601 self.root.append( 

602 PredictedQuantumDatasetsModel.from_old_quantum_graph_init(task_node.init, old_quantum_graph) 

603 ) 

604 

605 # Work around the fact that Sphinx chokes on Pydantic docstring formatting, 

606 # when we inherit those docstrings in our public classes. 

607 if "sphinx" in sys.modules and not TYPE_CHECKING: 

608 

609 def copy(self, *args: Any, **kwargs: Any) -> Any: 

610 """See `pydantic.BaseModel.copy`.""" 

611 return super().copy(*args, **kwargs) 

612 

613 def model_dump(self, *args: Any, **kwargs: Any) -> Any: 

614 """See `pydantic.BaseModel.model_dump`.""" 

615 return super().model_dump(*args, **kwargs) 

616 

617 def model_dump_json(self, *args: Any, **kwargs: Any) -> Any: 

618 """See `pydantic.BaseModel.model_dump_json`.""" 

619 return super().model_dump_json(*args, **kwargs)

620 

621 def model_copy(self, *args: Any, **kwargs: Any) -> Any: 

622 """See `pydantic.BaseModel.model_copy`.""" 

623 return super().model_copy(*args, **kwargs) 

624 

625 @classmethod 

626 def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override] 

627 """See `pydantic.BaseModel.model_construct`.""" 

628 return super().model_construct(*args, **kwargs) 

629 

630 @classmethod 

631 def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any: 

632 """See `pydantic.BaseModel.model_json_schema`.""" 

633 return super().model_json_schema(*args, **kwargs) 

634 

635 @classmethod 

636 def model_validate(cls, *args: Any, **kwargs: Any) -> Any: 

637 """See `pydantic.BaseModel.model_validate`.""" 

638 return super().model_validate(*args, **kwargs) 

639 

640 @classmethod 

641 def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any: 

642 """See `pydantic.BaseModel.model_validate_json`.""" 

643 return super().model_validate_json(*args, **kwargs) 

644 

645 @classmethod 

646 def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any: 

647 """See `pydantic.BaseModel.model_validate_strings`.""" 

648 return super().model_validate_strings(*args, **kwargs) 

649 

650 

651class PredictedQuantumInfo(QuantumInfo): 

652 """A typed dictionary that annotates the attributes of the NetworkX graph 

653 node data for a predicted quantum. 

654 

655 Since NetworkX types are not generic over their node mapping type, this has 

656 to be used explicitly, e.g.:: 

657 

658 node_data: PredictedQuantumInfo = xgraph.nodes[quantum_id] 

659 

660 where ``xgraph`` can be either `PredictedQuantumGraph.quantum_only_xgraph` 

661 or `PredictedQuantumGraph.bipartite_xgraph`. 

662 """ 

663 

664 quantum: Quantum 

665 """Quantum object that can be passed directly to an executor. 

666 

667 This attribute is only present if 

668 `PredictedQuantumGraph.build_execution_quanta` has been run on this node's 

669 quantum ID already. 

670 """ 

671 

672 

673class PredictedDatasetInfo(DatasetInfo): 

674 """A typed dictionary that annotates the attributes of the NetworkX graph 

675 node data for a dataset. 

676 

677 Since NetworkX types are not generic over their node mapping type, this has 

678 to be used explicitly, e.g.:: 

679 

680 node_data: PredictedDatasetInfo = xgraph.nodes[dataset_ids] 

681 

682 where ``xgraph`` is from the `PredictedQuantumGraph.bipartite_xgraph` 

683 property. 

684 """ 

685 

686 

687class PredictedQuantumGraph(BaseQuantumGraph): 

688 """A directed acyclic graph that predicts a processing run and supports it 

689 during execution. 

690 

691 Parameters 

692 ---------- 

693 components : `PredictedQuantumGraphComponents` 

694 A struct of components used to construct the graph. 

695 

696 Notes 

697 ----- 

698 Iteration over a `PredictedQuantumGraph` yields loaded quantum IDs in 

699 deterministic topological order (but the tiebreaker is unspecified). The 

700 `len` of a `PredictedQuantumGraph` is the number of loaded non-init quanta, 

701 i.e. the same as the number of quanta iterated over. 

702 """ 

703 

704 def __init__(self, components: PredictedQuantumGraphComponents): 

705 if components.header.graph_type != "predicted":

706 raise TypeError(f"Header is for a {components.header.graph_type!r} graph, not 'predicted'.") 

707 super().__init__(components.header, components.pipeline_graph) 

708 self._quantum_only_xgraph = networkx.DiGraph() 

709 self._bipartite_xgraph = networkx.DiGraph() 

710 self._quanta_by_task_label: dict[str, dict[DataCoordinate, uuid.UUID]] = { 

711 task_label: {} for task_label in self.pipeline_graph.tasks.keys() 

712 } 

713 self._datasets_by_type: dict[str, dict[DataCoordinate, uuid.UUID]] = { 

714 dataset_type_name: {} for dataset_type_name in self.pipeline_graph.dataset_types.keys() 

715 } 

716 self._datasets_by_type[self.pipeline_graph.packages_dataset_type.name] = {} 

717 self._dimension_data = components.dimension_data 

718 self._add_init_quanta(components.init_quanta) 

719 self._quantum_datasets: dict[uuid.UUID, PredictedQuantumDatasetsModel] = {} 

720 self._expanded_data_ids: dict[DataCoordinate, DataCoordinate] = {} 

721 self._add_thin_graph(components.thin_graph) 

722 for quantum_datasets in components.quantum_datasets.values(): 

723 self._add_quantum_datasets(quantum_datasets) 

724 if not components.thin_graph.edges: 

725 # If we loaded the thin_graph, we've already populated this graph. 

726 self._quantum_only_xgraph.update( 

727 networkx.algorithms.bipartite.projected_graph( 

728 networkx.DiGraph(self._bipartite_xgraph), 

729 self._quantum_only_xgraph.nodes.keys(), 

730 ) 

731 ) 

732 if _LOG.isEnabledFor(logging.DEBUG): 

733 for quantum_id in self: 

734 _LOG.debug( 

735 "%s: %s @ %s", 

736 quantum_id, 

737 self._quantum_only_xgraph.nodes[quantum_id]["task_label"], 

738 self._quantum_only_xgraph.nodes[quantum_id]["data_id"].required, 

739 ) 

740 

741 def _add_init_quanta(self, component: PredictedInitQuantaModel) -> None: 

742 self._init_quanta = {q.task_label: q for q in component.root} 

743 empty_data_id = DataCoordinate.make_empty(self.pipeline_graph.universe) 

744 for quantum_datasets in self._init_quanta.values(): 

745 for init_datasets in itertools.chain( 

746 quantum_datasets.inputs.values(), quantum_datasets.outputs.values() 

747 ): 

748 for init_dataset in init_datasets: 

749 self._datasets_by_type[init_dataset.dataset_type_name][empty_data_id] = ( 

750 init_dataset.dataset_id 

751 ) 

752 _LOG.debug( 

753 "%s: %s @ init", 

754 quantum_datasets.quantum_id, 

755 quantum_datasets.task_label, 

756 ) 

757 

758 def _add_thin_graph(self, component: PredictedThinGraphModel) -> None: 

759 self._quantum_only_xgraph.add_edges_from(component.edges) 

760 for task_label, thin_quanta_for_task in component.quanta.items(): 

761 for thin_quantum in thin_quanta_for_task: 

762 self._add_quantum(thin_quantum.quantum_id, task_label, thin_quantum.data_coordinate) 

763 

764 def _add_quantum_datasets(self, quantum_datasets: PredictedQuantumDatasetsModel) -> None: 

765 self._quantum_datasets[quantum_datasets.quantum_id] = quantum_datasets 

766 self._add_quantum( 

767 quantum_datasets.quantum_id, quantum_datasets.task_label, quantum_datasets.data_coordinate 

768 ) 

769 task_node = self.pipeline_graph.tasks[quantum_datasets.task_label] 

770 for connection_name, input_datasets in quantum_datasets.inputs.items(): 

771 pipeline_edge = task_node.get_input_edge(connection_name) 

772 for input_dataset in input_datasets: 

773 self._add_dataset(input_dataset) 

774 self._bipartite_xgraph.add_edge( 

775 input_dataset.dataset_id, 

776 quantum_datasets.quantum_id, 

777 key=connection_name, 

778 is_read=True, 

779 ) 

780 # There might be multiple input connections for the same 

781 # dataset type. 

782 self._bipartite_xgraph.edges[ 

783 input_dataset.dataset_id, quantum_datasets.quantum_id 

784 ].setdefault("pipeline_edges", []).append(pipeline_edge) 

785 for connection_name, output_datasets in quantum_datasets.outputs.items(): 

786 pipeline_edges = [task_node.get_output_edge(connection_name)] 

787 for output_dataset in output_datasets: 

788 self._add_dataset(output_dataset) 

789 self._bipartite_xgraph.add_edge( 

790 quantum_datasets.quantum_id, 

791 output_dataset.dataset_id, 

792 key=connection_name, 

793 is_read=False, 

794 pipeline_edges=pipeline_edges, 

795 ) 

796 

797 def _add_quantum( 

798 self, quantum_id: uuid.UUID, task_label: str, data_coordinate_values: Sequence[DataIdValue] 

799 ) -> None: 

800 task_node = self.pipeline_graph.tasks[task_label] 

801 self._quantum_only_xgraph.add_node(quantum_id, task_label=task_label, pipeline_node=task_node) 

802 self._bipartite_xgraph.add_node(quantum_id, task_label=task_label, pipeline_node=task_node) 

803 data_coordinate_values = tuple(data_coordinate_values) 

804 dimensions = self.pipeline_graph.tasks[task_label].dimensions 

805 data_id = DataCoordinate.from_full_values(dimensions, tuple(data_coordinate_values)) 

806 self._quantum_only_xgraph.nodes[quantum_id].setdefault("data_id", data_id) 

807 self._bipartite_xgraph.nodes[quantum_id].setdefault("data_id", data_id) 

808 self._quanta_by_task_label[task_label][data_id] = quantum_id 

809 

810 def _add_dataset(self, model: PredictedDatasetModel) -> None: 

811 dataset_type_node = self.pipeline_graph.dataset_types[model.dataset_type_name] 

812 data_id = DataCoordinate.from_full_values(dataset_type_node.dimensions, tuple(model.data_coordinate)) 

813 self._bipartite_xgraph.add_node( 

814 model.dataset_id, 

815 dataset_type_name=dataset_type_node.name, 

816 pipeline_node=dataset_type_node, 

817 run=model.run, 

818 ) 

819 self._bipartite_xgraph.nodes[model.dataset_id].setdefault("data_id", data_id) 

820 self._datasets_by_type[model.dataset_type_name][data_id] = model.dataset_id 

821 

822 @classmethod 

823 def open( 

824 cls, 

825 uri: ResourcePathExpression, 

826 page_size: int = DEFAULT_PAGE_SIZE, 

827 import_mode: TaskImportMode = TaskImportMode.ASSUME_CONSISTENT_EDGES, 

828 ) -> AbstractContextManager[PredictedQuantumGraphReader]: 

829 """Open a quantum graph and return a reader to load from it. 

830 

831 Parameters 

832 ---------- 

833 uri : convertible to `lsst.resources.ResourcePath` 

834 URI to open. Should have a ``.qg`` extension. 

835 page_size : `int`, optional 

836 Approximate number of bytes to read at once from address files. 

837 Note that this does not set a page size for *all* reads, but it 

838 does affect the smallest, most numerous reads. 

839 import_mode : `.pipeline_graph.TaskImportMode`, optional 

840 How to handle importing the task classes referenced in the pipeline 

841 graph. 

842 

843 Returns 

844 ------- 

845 reader : `contextlib.AbstractContextManager` [ \ 

846 `PredictedQuantumGraphReader` ] 

847 A context manager that returns the reader when entered. 

848 """ 

849 return PredictedQuantumGraphReader.open(uri, page_size=page_size, import_mode=import_mode) 

850 
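# Usage sketch (illustrative; "graph.qg" is a hypothetical path, and the
# reader's component-loading methods live on PredictedQuantumGraphReader,
# which is not shown here):
#
#     with PredictedQuantumGraph.open("graph.qg") as reader:
#         ...  # ask the reader to load just the components you need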

851 @classmethod 

852 def read_execution_quanta( 

853 cls, 

854 uri: ResourcePathExpression, 

855 quantum_ids: Iterable[uuid.UUID] | None = None, 

856 page_size: int = DEFAULT_PAGE_SIZE, 

857 ) -> PredictedQuantumGraph: 

858 """Read one or more executable quanta from a quantum graph file. 

859 

860 Parameters 

861 ---------- 

862 uri : convertible to `lsst.resources.ResourcePath` 

863 URI to open. Should have a ``.qg`` extension for new quantum graph 

864 files, or ``.qgraph`` for the old format. 

865 quantum_ids : `~collections.abc.Iterable` [ `uuid.UUID` ], optional 

866 Iterable of quantum IDs to load. If not provided, all quanta will 

867 be loaded. The UUIDs of special init quanta will be ignored. 

868 page_size : `int`, optional 

869 Approximate number of bytes to read at once from address files. 

870 Note that this does not set a page size for *all* reads, but it 

871 does affect the smallest, most numerous reads. 

872 

873 Returns 

874 ------- 

875 quantum_graph : `PredictedQuantumGraph`

876 A quantum graph that can build execution quanta for all of the 

877 given IDs. 

878 """ 

879 return PredictedQuantumGraphComponents.read_execution_quanta( 

880 uri, 

881 quantum_ids, 

882 page_size=page_size, 

883 ).assemble() 

884 
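# Usage sketch (illustrative; "graph.qg" and the UUIDs are hypothetical):
# load only what is needed to execute two known quanta.
#
#     qg = PredictedQuantumGraph.read_execution_quanta(
#         "graph.qg", quantum_ids=[id_a, id_b]
#     )
#     quanta = qg.build_execution_quanta([id_a, id_b])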

885 @classmethod 

886 def make_empty( 

887 cls, 

888 universe: DimensionUniverse, 

889 *, 

890 output_run: str, 

891 inputs: Iterable[str] = (), 

892 output: str | None = None, 

893 add_packages: bool = True, 

894 ) -> PredictedQuantumGraph: 

895 """Make an empty quantum graph with no tasks. 

896 

897 Parameters 

898 ---------- 

899 universe : `lsst.daf.butler.DimensionUniverse` 

900 Definitions for all butler dimensions. 

901 output_run : `str` 

902 Output run collection. 

903 inputs : `~collections.abc.Iterable` [`str`], optional 

904 Iterable of input collection names. 

905 output : `str` or `None`, optional 

906 Output chained collection. 

907 add_packages : `bool`, optional 

908 Whether to add the special init quantum that writes the 'packages' 

909 dataset. The default (`True`) is consistent with 

910 `~..quantum_graph_builder.QuantumGraphBuilder` behavior when there 

911 are no regular quanta generated. 

912 

913 Returns 

914 ------- 

915 quantum_graph : `PredictedQuantumGraph` 

916 An empty quantum graph. 

917 """ 

918 return cls( 

919 PredictedQuantumGraphComponents.make_empty( 

920 universe, 

921 output_run=output_run, 

922 inputs=inputs, 

923 output=output, 

924 add_packages=add_packages, 

925 ) 

926 ) 

927 

928 @property 

929 def quanta_by_task(self) -> Mapping[str, Mapping[DataCoordinate, uuid.UUID]]: 

930 """A nested mapping of all quanta, keyed first by task name and then by 

931 data ID. 

932 

933 Notes 

934 ----- 

935 This is populated by the ``thin_graph`` component (all quanta are 

936 added) and the ``quantum_datasets`` component (only loaded quanta are

937 added). All tasks in the pipeline graph are included, even if none of 

938 their quanta were loaded (i.e. nested mappings may be empty). 

939 

940 The returned object may be an internal dictionary; as the type 

941 annotation indicates, it should not be modified in place. 

942 """ 

943 return self._quanta_by_task_label 

944 
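# Usage sketch (illustrative; ``qg`` is a loaded PredictedQuantumGraph and
# "isr" a hypothetical task label): look up a quantum UUID from its task
# label and data ID.
#
#     quantum_id = qg.quanta_by_task["isr"][data_id]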

945 @property 

946 def datasets_by_type(self) -> Mapping[str, Mapping[DataCoordinate, uuid.UUID]]: 

947 """A nested mapping of all datasets, keyed first by dataset type name 

948 and then by data ID. 

949 

950 Notes 

951 ----- 

952 This is populated only by the ``quantum_datasets`` and ``init_quanta`` 

953 components, and only datasets referenced by loaded quanta are present. 

954 All dataset types in the pipeline graph are included, even if none of 

955 their datasets were loaded (i.e. nested mappings may be empty). 

956 

957 The returned object may be an internal dictionary; as the type 

958 annotation indicates, it should not be modified in place. 

959 """ 

960 return self._datasets_by_type 

961 

962 @property 

963 def quantum_only_xgraph(self) -> networkx.DiGraph: 

964 """A directed acyclic graph with quanta as nodes and datasets elided. 

965 

966 Notes 

967 ----- 

968 Node keys are quantum UUIDs, and are populated by the ``thin_graph`` 

969 component (all nodes and edges) and ``quantum_datasets`` component 

970 (only those that were loaded). 

971 

972 Node state dictionaries are described by the 

973 `PredictedQuantumInfo` type. 

974 

975 The returned object is a read-only view of an internal one. 

976 """ 

977 return self._quantum_only_xgraph.copy(as_view=True) 

978 

979 @property 

980 def bipartite_xgraph(self) -> networkx.DiGraph:

981 """A directed acyclic graph with quantum and dataset nodes. 

982 

983 This graph never includes init-input and init-output datasets. 

984 

985 Notes 

986 ----- 

987 Node keys are quantum or dataset UUIDs. Nodes for quanta are present 

988 if the ``thin_graph`` component is loaded (all nodes) or if the 

989 ``quantum_datasets`` component is loaded (just loaded quanta). Edges 

990 and dataset nodes are only present for quanta whose 

991 ``quantum_datasets`` were loaded. 

992 

993 Node state dictionaries are described by the 

994 `PredictedQuantumInfo` and `PredictedDatasetInfo` types. 

995 

996 The returned object is a read-only view of an internal one. 

997 """ 

998 return self._bipartite_xgraph.copy(as_view=True) 

999 

1000 @property 

1001 def dimension_data(self) -> DimensionDataAttacher | None: 

1002 """All dimension records needed to expand the data IDS in the graph. 

1003 

1004 This may be `None` if the dimension data was not loaded. If all 

1005 execution quanta have been built, all records are guaranteed to have 

1006 been deserialized and the ``records`` attribute is complete. In other 

1007 cases some records may still only be present in the ``deserializers`` 

1008 attribute. 

1009 """ 

1010 return self._dimension_data 

1011 

1012 def __iter__(self) -> Iterator[uuid.UUID]: 

1013 for quanta_for_task in self.quanta_by_task.values(): 

1014 for data_id in sorted(quanta_for_task.keys()): 

1015 yield quanta_for_task[data_id] 

1016 

1017 def __len__(self) -> int: 

1018 return len(self._quantum_only_xgraph) 

1019 
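# Usage sketch (illustrative; ``qg`` is a loaded PredictedQuantumGraph):
# iterate loaded quanta in deterministic topological order and inspect
# their node attributes.
#
#     for quantum_id in qg:
#         info = qg.quantum_only_xgraph.nodes[quantum_id]
#         print(info["task_label"], info["data_id"])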

1020 def get_init_inputs(self, task_label: str) -> dict[ConnectionName, DatasetRef]: 

1021 """Return the init-input datasets for the given task. 

1022 

1023 Parameters 

1024 ---------- 

1025 task_label : `str` 

1026 Label of the task. 

1027 

1028 Returns 

1029 ------- 

1030 init_inputs : `dict` [ `str`, `lsst.daf.butler.DatasetRef` ] 

1031 Dataset references for init-input datasets, keyed by connection 

1032 name. Dataset type storage classes match the task connection

1033 declarations, not necessarily the data repository, and may be 

1034 components. 

1035 """ 

1036 if self._init_quanta is None: 

1037 raise IncompleteQuantumGraphError("The init_quanta component was not loaded.") 

1038 task_init_node = self.pipeline_graph.tasks[task_label].init 

1039 return { 

1040 connection_name: task_init_node.inputs[connection_name].adapt_dataset_ref( 

1041 self._make_init_ref(datasets[0]) 

1042 ) 

1043 for connection_name, datasets in self._init_quanta[task_label].inputs.items() 

1044 } 

1045 

1046 def get_init_outputs(self, task_label: str) -> dict[ConnectionName, DatasetRef]: 

1047 """Return the init-output datasets for the given task. 

1048 

1049 Parameters 

1050 ---------- 

1051 task_label : `str` 

1052 Label of the task. ``""`` may be used to get global init-outputs. 

1053 

1054 Returns 

1055 ------- 

1056 init_outputs : `dict` [ `str`, `lsst.daf.butler.DatasetRef` ] 

1057 Dataset references for init-output datasets, keyed by connection

1058 name. Dataset type storage classes match the task connection

1059 declarations, not necessarily the data repository. 

1060 """ 

1061 if self._init_quanta is None: 

1062 raise IncompleteQuantumGraphError("The init_quanta component was not loaded.") 

1063 if not task_label: 

1064 (datasets,) = self._init_quanta[""].outputs.values() 

1065 return { 

1066 acc.PACKAGES_INIT_OUTPUT_NAME: DatasetRef( 

1067 self.pipeline_graph.packages_dataset_type, 

1068 DataCoordinate.make_empty(self.pipeline_graph.universe), 

1069 run=datasets[0].run, 

1070 id=datasets[0].dataset_id, 

1071 conform=False, 

1072 ) 

1073 } 

1074 task_init_node = self.pipeline_graph.tasks[task_label].init 

1075 result: dict[ConnectionName, DatasetRef] = {} 

1076 for connection_name, datasets in self._init_quanta[task_label].outputs.items(): 

1077 if connection_name == acc.CONFIG_INIT_OUTPUT_CONNECTION_NAME: 

1078 edge = task_init_node.config_output 

1079 else: 

1080 edge = task_init_node.outputs[connection_name] 

1081 result[connection_name] = edge.adapt_dataset_ref(self._make_init_ref(datasets[0])) 

1082 return result 

1083 
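# Usage sketch (illustrative; "isr" is a hypothetical task label): the
# empty label selects the global init-outputs, i.e. the single 'packages'
# dataset ref.
#
#     config_ref = qg.get_init_outputs("isr")[acc.CONFIG_INIT_OUTPUT_CONNECTION_NAME]
#     (packages_ref,) = qg.get_init_outputs("").values()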

1084 def _make_init_ref(self, dataset: PredictedDatasetModel) -> DatasetRef: 

1085 dataset_type = self.pipeline_graph.dataset_types[dataset.dataset_type_name].dataset_type 

1086 return DatasetRef( 

1087 dataset_type, 

1088 DataCoordinate.make_empty(self.pipeline_graph.universe), 

1089 run=dataset.run, 

1090 id=dataset.dataset_id, 

1091 conform=False, 

1092 ) 

1093 

1094 def build_execution_quanta( 

1095 self, 

1096 quantum_ids: Iterable[uuid.UUID] | None = None, 

1097 task_label: str | None = None, 

1098 ) -> dict[uuid.UUID, Quantum]: 

1099 """Build `lsst.daf.butler.Quantum` objects suitable for executing 

1100 tasks. 

1101 

1102 In addition to returning the quantum objects directly, this also causes 

1103 the `quantum_only_xgraph` and `bipartite_xgraph` graphs to include a 

1104 ``quantum`` attribute for the affected quanta. 

1105 

1106 Parameters 

1107 ---------- 

1108 quantum_ids : `~collections.abc.Iterable` [ `uuid.UUID` ], optional 

1109 IDs of all quanta to return. If not provided, all quanta for the 

1110 given task label (if given) or graph are returned. 

1111 task_label : `str`, optional 

1112 Task label whose quanta should be generated. Ignored if 

1113 ``quantum_ids`` is not `None`. 

1114 

1115 Returns 

1116 ------- 

1117 quanta : `dict` [ `uuid.UUID`, `lsst.daf.butler.Quantum` ] 

1118 Mapping of quanta, keyed by UUID. All dataset types are adapted to 

1119 the task's storage class declarations and inputs may be components. 

1120 All data IDs have dimension records attached. 

1121 """ 

1122 if not self._init_quanta: 

1123 raise IncompleteQuantumGraphError( 

1124 "Cannot build execution quanta without loading the ``init_quanta`` component." 

1125 ) 

1126 if quantum_ids is None: 

1127 if task_label is not None: 

1128 quantum_ids = self._quanta_by_task_label[task_label].values() 

1129 else: 

1130 quantum_ids = self._quantum_only_xgraph.nodes.keys() 

1131 else: 

1132 # Guard against single-pass iterators. 

1133 quantum_ids = list(quantum_ids) 

1134 del task_label # make sure we don't accidentally use this. 

1135 result: dict[uuid.UUID, Quantum] = {} 

1136 self._expand_execution_quantum_data_ids(quantum_ids) 

1137 task_init_datastore_records: dict[TaskLabel, dict[DatastoreName, DatastoreRecordData]] = {} 

1138 for quantum_id in quantum_ids: 

1139 quantum_node_dict: PredictedQuantumInfo = self._quantum_only_xgraph.nodes[quantum_id] 

1140 if "quantum" in quantum_node_dict: 

1141 result[quantum_id] = quantum_node_dict["quantum"] 

1142 continue 

1143 # We've declared the info dict keys to all be required because that

1144 # saves a lot of casting, but the reality is that they can either 

1145 # be fully populated or totally unpopulated. But that makes mypy 

1146 # think the check above always succeeds. 

1147 try: # type:ignore [unreachable] 

1148 quantum_datasets = self._quantum_datasets[quantum_id] 

1149 except KeyError: 

1150 raise IncompleteQuantumGraphError( 

1151 f"Full quantum information for {quantum_id} was not loaded." 

1152 ) from None 

1153 task_node = self.pipeline_graph.tasks[quantum_datasets.task_label] 

1154 quantum_data_id = self._expanded_data_ids[self._bipartite_xgraph.nodes[quantum_id]["data_id"]] 

1155 inputs = self._build_execution_quantum_refs(task_node, quantum_datasets.inputs) 

1156 outputs = self._build_execution_quantum_refs(task_node, quantum_datasets.outputs) 

1157 if task_node.label not in task_init_datastore_records: 

1158 task_init_datastore_records[task_node.label] = self._init_quanta[ 

1159 task_node.label 

1160 ].deserialize_datastore_records() 

1161 quantum = Quantum( 

1162 taskName=task_node.task_class_name, 

1163 taskClass=task_node.task_class, 

1164 dataId=quantum_data_id, 

1165 initInputs={ 

1166 ref.datasetType: ref for ref in self.get_init_inputs(quantum_datasets.task_label).values() 

1167 }, 

1168 inputs=inputs, 

1169 outputs=outputs, 

1170 datastore_records=DatastoreRecordData.merge_mappings( 

1171 quantum_datasets.deserialize_datastore_records(), 

1172 task_init_datastore_records[task_node.label], 

1173 ), 

1174 ) 

1175 self._quantum_only_xgraph.nodes[quantum_id]["quantum"] = quantum 

1176 self._bipartite_xgraph.nodes[quantum_id]["quantum"] = quantum 

1177 result[quantum_id] = quantum 

1178 return result 

1179 
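# Usage sketch (illustrative; "isr" is a hypothetical task label): build
# executable quanta for one task; repeated calls return the Quantum
# objects cached in the "quantum" node attribute.
#
#     for quantum_id, quantum in qg.build_execution_quanta(task_label="isr").items():
#         ...  # hand ``quantum`` to an executor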

1180 def _expand_execution_quantum_data_ids(self, quantum_ids: Iterable[uuid.UUID]) -> None: 

1181 if self._dimension_data is None: 

1182 raise IncompleteQuantumGraphError( 

1183 "Cannot build execution quanta without loading the ``dimension_data`` component." 

1184 ) 

1185 data_ids_to_expand: dict[DimensionGroup, set[DataCoordinate]] = defaultdict(set) 

1186 for quantum_id in quantum_ids: 

1187 data_id: DataCoordinate = self._bipartite_xgraph.nodes[quantum_id]["data_id"] 

1188 if data_id.hasRecords(): 

1189 self._expanded_data_ids[data_id] = data_id 

1190 else: 

1191 data_ids_to_expand[data_id.dimensions].add(data_id) 

1192 for dataset_id in itertools.chain( 

1193 self._bipartite_xgraph.predecessors(quantum_id), 

1194 self._bipartite_xgraph.successors(quantum_id), 

1195 ): 

1196 data_id = self._bipartite_xgraph.nodes[dataset_id]["data_id"] 

1197 if data_id.hasRecords(): 

1198 self._expanded_data_ids[data_id] = data_id 

1199 else: 

1200 data_ids_to_expand[data_id.dimensions].add(data_id) 

1201 for dimensions, data_ids_for_dimensions in data_ids_to_expand.items(): 

1202 self._expanded_data_ids.update( 

1203 (d, d) for d in self._dimension_data.attach(dimensions, data_ids_for_dimensions) 

1204 ) 

1205 

1206 def _build_execution_quantum_refs( 

1207 self, task_node: TaskNode, model_mapping: dict[ConnectionName, list[PredictedDatasetModel]] 

1208 ) -> dict[DatasetType, list[DatasetRef]]: 

1209 results: dict[DatasetType, list[DatasetRef]] = {} 

1210 for connection_name, datasets in model_mapping.items(): 

1211 edge = task_node.get_edge(connection_name) 

1212 dataset_type = edge.adapt_dataset_type( 

1213 self.pipeline_graph.dataset_types[edge.parent_dataset_type_name].dataset_type 

1214 ) 

1215 results[dataset_type] = [self._make_general_ref(dataset_type, d.dataset_id) for d in datasets] 

1216 return results 

1217 

1218 def _make_general_ref(self, dataset_type: DatasetType, dataset_id: uuid.UUID) -> DatasetRef: 

1219 node_state = self._bipartite_xgraph.nodes[dataset_id] 

1220 data_id = self._expanded_data_ids[node_state["data_id"]] 

1221 return DatasetRef(dataset_type, data_id, run=node_state["run"], id=dataset_id) 

1222 

1223 def make_init_qbb( 

1224 self, 

1225 butler_config: Config | ResourcePathExpression, 

1226 *, 

1227 config_search_paths: Iterable[str] | None = None, 

1228 ) -> QuantumBackedButler: 

1229 """Construct an quantum-backed butler suitable for reading and writing 

1230 init input and init output datasets, respectively. 

1231 

1232 This only requires the ``init_quanta`` component to have been loaded. 

1233 

1234 Parameters 

1235 ---------- 

1236 butler_config : `~lsst.daf.butler.Config` or \ 

1237 `~lsst.resources.ResourcePathExpression` 

1238 A butler repository root, configuration filename, or configuration 

1239 instance. 

1240 config_search_paths : `~collections.abc.Iterable` [ `str` ], optional 

1241 Additional search paths for butler configuration. 

1242 

1243 Returns 

1244 ------- 

1245 qbb : `~lsst.daf.butler.QuantumBackedButler` 

1246 A limited butler that can ``get`` init-input datasets and ``put`` 

1247 init-output datasets. 

1248 """ 

1249 # Collect all init input/output dataset IDs. 

1250 predicted_inputs: set[uuid.UUID] = set() 

1251 predicted_outputs: set[uuid.UUID] = set() 

1252 datastore_record_maps: list[dict[DatastoreName, DatastoreRecordData]] = [] 

1253 for init_quantum_datasets in self._init_quanta.values(): 

1254 predicted_inputs.update( 

1255 d.dataset_id for d in itertools.chain.from_iterable(init_quantum_datasets.inputs.values()) 

1256 ) 

1257 predicted_outputs.update( 

1258 d.dataset_id for d in itertools.chain.from_iterable(init_quantum_datasets.outputs.values()) 

1259 ) 

1260 datastore_record_maps.append( 

1261 { 

1262 datastore_name: DatastoreRecordData.from_simple(serialized_records) 

1263 for datastore_name, serialized_records in init_quantum_datasets.datastore_records.items() 

1264 } 

1265 ) 

1266 # Remove intermediates from inputs. 

1267 predicted_inputs -= predicted_outputs 

1268 dataset_types = {d.name: d.dataset_type for d in self.pipeline_graph.dataset_types.values()} 

1269 # Make butler from everything. 

1270 return QuantumBackedButler.from_predicted( 

1271 config=butler_config, 

1272 predicted_inputs=predicted_inputs, 

1273 predicted_outputs=predicted_outputs, 

1274 dimensions=self.pipeline_graph.universe, 

1275 datastore_records=DatastoreRecordData.merge_mappings(*datastore_record_maps), 

1276 search_paths=list(config_search_paths) if config_search_paths is not None else None, 

1277 dataset_types=dataset_types, 

1278 ) 

1279 
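# Usage sketch (illustrative; the repository path is hypothetical): a QBB
# built this way can ``get`` init-inputs and ``put`` init-outputs without
# a registry connection.
#
#     qbb = qg.make_init_qbb("/repo/main/butler.yaml")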

1280 def write_init_outputs(self, butler: LimitedButler, skip_existing: bool = True) -> None: 

1281 """Write the init-output datasets for all tasks in the quantum graph. 

1282 

1283 This only requires the ``init_quanta`` component to have been loaded. 

1284 

1285 Parameters 

1286 ---------- 

1287 butler : `lsst.daf.butler.LimitedButler` 

1288 A limited butler data repository client. 

1289 skip_existing : `bool`, optional 

1290 If `True` (default) ignore init-outputs that already exist. If 

1291 `False`, raise. 

1292 

1293 Raises 

1294 ------ 

1295 lsst.daf.butler.registry.ConflictingDefinitionError 

1296 Raised if an init-output dataset already exists and 

1297 ``skip_existing=False``. 

1298 """ 

1299 # Extract init-input and init-output refs from the QG. 

1300 input_refs: dict[str, DatasetRef] = {} 

1301 output_refs: dict[str, DatasetRef] = {} 

1302 for task_node in self.pipeline_graph.tasks.values(): 

1303 if task_node.label not in self._init_quanta: 

1304 continue 

1305 input_refs.update( 

1306 {ref.datasetType.name: ref for ref in self.get_init_inputs(task_node.label).values()} 

1307 ) 

1308 output_refs.update( 

1309 { 

1310 ref.datasetType.name: ref 

1311 for ref in self.get_init_outputs(task_node.label).values() 

1312 if ref.datasetType.name != task_node.init.config_output.dataset_type_name 

1313 } 

1314 ) 

1315 for ref, is_stored in butler.stored_many(output_refs.values()).items(): 

1316 if is_stored: 

1317 if not skip_existing: 

1318 raise ConflictingDefinitionError(f"Init-output dataset {ref} already exists.") 

1319 # We'll `put` whatever's left in output_refs at the end. 

1320 del output_refs[ref.datasetType.name] 

1321 # Instantiate tasks, reading overall init-inputs and gathering 

1322 # init-output in-memory objects. 

1323 init_outputs: list[tuple[Any, DatasetType]] = [] 

1324 self.pipeline_graph.instantiate_tasks( 

1325 get_init_input=lambda dataset_type: butler.get( 

1326 input_refs[dataset_type.name].overrideStorageClass(dataset_type.storageClass) 

1327 ), 

1328 init_outputs=init_outputs, 

1329 # A task can be in the pipeline graph without having an init 

1330 # quantum if it doesn't have any regular quanta either (e.g. they 

1331 # were all skipped), and the _init_quanta has a "" entry for global 

1332 # init-outputs that we don't want to pass here. 

1333 labels=self.pipeline_graph.tasks.keys() & self._init_quanta.keys(), 

1334 ) 

1335 # Write init-outputs that weren't already present. 

1336 for obj, dataset_type in init_outputs: 

1337 if new_ref := output_refs.get(dataset_type.name): 

1338 assert new_ref.datasetType.storageClass_name == dataset_type.storageClass_name, ( 

1339 "QG init refs should use task connection storage classes." 

1340 ) 

1341 butler.put(obj, new_ref) 

1342 

1343 def write_configs(self, butler: LimitedButler, compare_existing: bool = True) -> None: 

1344 """Write the config datasets for all tasks in the quantum graph. 

1345 

1346 Parameters 

1347 ---------- 

1348 butler : `lsst.daf.butler.LimitedButler` 

1349 A limited butler data repository client. 

1350 compare_existing : `bool`, optional 

1351 If `True` check configs that already exist for consistency. If 

1352 `False`, always raise if configs already exist. 

1353 

1354 Raises 

1355 ------ 

1356 lsst.daf.butler.registry.ConflictingDefinitionError 

1357 Raised if a config dataset already exists and

1358 ``compare_existing=False``, or if the existing config is not 

1359 consistent with the config in the quantum graph. 

1360 """ 

1361 to_put: list[tuple[PipelineTaskConfig, DatasetRef]] = [] 

1362 for task_node in self.pipeline_graph.tasks.values(): 

1363 if task_node.label not in self._init_quanta: 

1364 continue 

1365 dataset_type_name = task_node.init.config_output.dataset_type_name 

1366 ref = self.get_init_outputs(task_node.label)[acc.CONFIG_INIT_OUTPUT_CONNECTION_NAME] 

1367 try: 

1368 old_config = butler.get(ref) 

1369 except (LookupError, FileNotFoundError): 

1370 old_config = None 

1371 if old_config is not None: 

1372 if not compare_existing: 

1373 raise ConflictingDefinitionError(f"Config dataset {ref} already exists.") 

1374 if not task_node.config.compare(old_config, shortcut=False, output=log_config_mismatch): 

1375 raise ConflictingDefinitionError( 

1376 f"Config does not match existing task config {dataset_type_name!r} in " 

1377 "butler; tasks configurations must be consistent within the same run collection." 

1378 ) 

1379 else: 

1380 to_put.append((task_node.config, ref)) 

1381 # We do writes at the end to minimize the mess we leave behind when we 

1382 # raise an exception. 

1383 for config, ref in to_put: 

1384 butler.put(config, ref) 

1385 

1386 def write_packages(self, butler: LimitedButler, compare_existing: bool = True) -> None: 

1387 """Write the 'packages' dataset for the currently-active software 

1388 versions. 

1389 

1390 Parameters 

1391 ---------- 

1392 butler : `lsst.daf.butler.LimitedButler` 

1393 A limited butler data repository client. 

1394 compare_existing : `bool`, optional 

 1395 If `True`, check packages that already exist for consistency. If 

1396 `False`, always raise if the packages dataset already exists. 

1397 

1398 Raises 

1399 ------ 

1400 lsst.daf.butler.registry.ConflictingDefinitionError 

1401 Raised if the packages dataset already exists and is not consistent 

1402 with the current packages. 

1403 """ 

1404 new_packages = Packages.fromSystem() 

1405 (ref,) = self.get_init_outputs("").values() 

1406 try: 

1407 packages = butler.get(ref) 

1408 except (LookupError, FileNotFoundError): 

1409 packages = None 

1410 if packages is not None: 

1411 if not compare_existing: 

1412 raise ConflictingDefinitionError(f"Packages dataset {ref} already exists.") 

1413 if compare_packages(packages, new_packages): 

 1414 # Have to remove the existing dataset first; butler has no 

1415 # replace option. 

1416 butler.pruneDatasets([ref], unstore=True, purge=True) 

1417 butler.put(packages, ref) 

1418 else: 

1419 butler.put(new_packages, ref) 

1420 

1421 def init_output_run(self, butler: LimitedButler, existing: bool = True) -> None: 

1422 """Initialize a new output RUN collection by writing init-output 

1423 datasets (including configs and packages). 

1424 

1425 Parameters 

1426 ---------- 

1427 butler : `lsst.daf.butler.LimitedButler` 

1428 A limited butler data repository client. 

1429 existing : `bool`, optional 

 1430 If `True`, check or ignore outputs that already exist. If 

1431 `False`, always raise if an output dataset already exists. 

1432 

1433 Raises 

1434 ------ 

1435 lsst.daf.butler.registry.ConflictingDefinitionError 

1436 Raised if there are existing init output datasets, and either 

1437 ``existing=False`` or their contents are not compatible with this 

1438 graph. 

1439 """ 

1440 self.write_configs(butler, compare_existing=existing) 

1441 self.write_packages(butler, compare_existing=existing) 

1442 self.write_init_outputs(butler, skip_existing=existing) 

1443 
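    # Usage sketch (editorial addition, not in the original source): a typical
    # sequence for preparing an output run before executing any quanta. The
    # file name and the ``butler`` object are illustrative assumptions; any
    # `lsst.daf.butler.LimitedButler` implementation should work.
    #
    #     with PredictedQuantumGraph.open("my_graph.qg") as reader:
    #         reader.read_all()
    #         qg = reader.finish()
    #     qg.init_output_run(butler)  # writes configs, packages, init-outputs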

1444 @classmethod 

1445 def from_old_quantum_graph(cls, old_quantum_graph: QuantumGraph) -> PredictedQuantumGraph: 

1446 """Construct from an old `QuantumGraph` instance. 

1447 

1448 Parameters 

1449 ---------- 

1450 old_quantum_graph : `QuantumGraph` 

1451 Quantum graph to transform. 

1452 

1453 Returns 

1454 ------- 

1455 predicted_quantum_graph : `PredictedQuantumGraph` 

1456 A new predicted quantum graph. 

1457 """ 

1458 return PredictedQuantumGraphComponents.from_old_quantum_graph(old_quantum_graph).assemble() 

1459 

1460 def to_old_quantum_graph(self) -> QuantumGraph: 

1461 """Transform into an old `QuantumGraph` instance. 

1462 

1463 Returns 

1464 ------- 

1465 old_quantum_graph : `QuantumGraph` 

1466 Old quantum graph. 

1467 

1468 Notes 

1469 ----- 

1470 This can only be called on graphs that have loaded all quantum 

1471 datasets, init datasets, and dimension records. 

1472 """ 

1473 from ..graph import QuantumGraph 

1474 

1475 quanta: dict[TaskDef, set[Quantum]] = {} 

1476 quantum_to_quantum_id: dict[Quantum, uuid.UUID] = {} 

1477 init_inputs: dict[TaskDef, list[DatasetRef]] = {} 

1478 init_outputs: dict[TaskDef, list[DatasetRef]] = {} 

1479 for task_def in self.pipeline_graph._iter_task_defs(): 

1480 if not self._quanta_by_task_label.get(task_def.label): 

1481 continue 

1482 quanta_for_task: set[Quantum] = set() 

1483 for quantum_id, quantum in self.build_execution_quanta(task_label=task_def.label).items(): 

1484 quanta_for_task.add(quantum) 

1485 quantum_to_quantum_id[quantum] = quantum_id 

1486 quanta[task_def] = quanta_for_task 

1487 init_inputs[task_def] = list(self.get_init_inputs(task_def.label).values()) 

1488 init_outputs[task_def] = list(self.get_init_outputs(task_def.label).values()) 

1489 global_init_outputs = list(self.get_init_outputs("").values()) 

1490 registry_dataset_types = [d.dataset_type for d in self.pipeline_graph.dataset_types.values()] 

1491 result = object.__new__(QuantumGraph) 

1492 result._buildGraphs( 

1493 quanta, 

1494 _quantumToNodeId=quantum_to_quantum_id, 

1495 metadata=self.header.to_old_metadata(), 

1496 universe=self.pipeline_graph.universe, 

1497 initInputs=init_inputs, 

1498 initOutputs=init_outputs, 

1499 globalInitOutputs=global_init_outputs, 

1500 registryDatasetTypes=registry_dataset_types, 

1501 ) 

1502 return result 

1503 
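    # Round-trip sketch (editorial addition): converting between the old
    # `QuantumGraph` class and this one. Per the Notes above, the forward
    # direction requires a graph with all components loaded.
    #
    #     new_qg = PredictedQuantumGraph.from_old_quantum_graph(old_qg)
    #     old_again = new_qg.to_old_quantum_graph()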

1504 def _make_summary(self) -> QgraphSummary: 

1505 from ..graph import QgraphSummary, QgraphTaskSummary 

1506 

1507 summary = QgraphSummary( 

1508 cmdLine=self.header.command or None, 

1509 creationUTC=str(self.header.timestamp) if self.header.timestamp is not None else None, 

1510 inputCollection=self.header.inputs or None, 

1511 outputCollection=self.header.output, 

1512 outputRun=self.header.output_run, 

1513 ) 

1514 for task_label, quanta_for_task in self.quanta_by_task.items(): 

1515 task_summary = QgraphTaskSummary(taskLabel=task_label, numQuanta=len(quanta_for_task)) 

1516 task_node = self.pipeline_graph.tasks[task_label] 

1517 for quantum_id in quanta_for_task.values(): 

1518 quantum_datasets = self._quantum_datasets[quantum_id] 

1519 for connection_name, input_datasets in quantum_datasets.inputs.items(): 

1520 task_summary.numInputs[ 

1521 task_node.get_input_edge(connection_name).parent_dataset_type_name 

1522 ] += len(input_datasets) 

1523 for connection_name, output_datasets in quantum_datasets.outputs.items(): 

1524 task_summary.numOutputs[ 

1525 task_node.get_output_edge(connection_name).parent_dataset_type_name 

1526 ] += len(output_datasets) 

1527 summary.qgraphTaskSummaries[task_label] = task_summary 

1528 return summary 

1529 

1530 

1531@dataclasses.dataclass(kw_only=True) 

1532class PredictedQuantumGraphComponents: 

1533 """A helper class for building and writing predicted quantum graphs. 

1534 

1535 Notes 

1536 ----- 

1537 This class is a simple struct of model classes to allow different tools 

1538 that build predicted quantum graphs to assemble them in whatever order they 

1539 prefer. It does not enforce any internal invariants (e.g. the quantum and 

 1540 dataset counts in the header, consistency between the different representations of quanta, internal 

1541 ID sorting, etc.), but it does provide methods that can satisfy them. 

1542 """ 

1543 

1544 def __post_init__(self) -> None: 

1545 self.header.graph_type = "predicted" 

1546 

1547 header: HeaderModel = dataclasses.field(default_factory=HeaderModel) 

1548 """Basic metadata about the graph.""" 

1549 

1550 pipeline_graph: PipelineGraph 

1551 """Description of the pipeline this graph runs, including all task label 

1552 and dataset type definitions. 

1553 

1554 This may include tasks that do not have any quanta (e.g. due to skipping 

1555 already-executed tasks). 

1556 

1557 This also includes the dimension universe used to construct the graph. 

1558 """ 

1559 

1560 dimension_data: DimensionDataAttacher | None = None 

1561 """Object that can attach dimension records to data IDs. 

1562 """ 

1563 

1564 init_quanta: PredictedInitQuantaModel = dataclasses.field(default_factory=PredictedInitQuantaModel) 

1565 """A list of special quanta that describe the init-inputs and init-outputs 

1566 of the graph. 

1567 

1568 Tasks that are included in the pipeline graph but do not have any quanta 

1569 may or may not have an init quantum, but tasks that do have regular quanta 

1570 always have an init quantum as well. 

1571 

1572 When used to construct a `PredictedQuantumGraph`, this must have either 

 1573 zero entries or an entry for every task in the pipeline. 

1574 """ 

1575 

1576 thin_graph: PredictedThinGraphModel = dataclasses.field(default_factory=PredictedThinGraphModel) 

1577 """A lightweight quantum-quantum DAG with task labels and data IDs only. 

1578 

1579 This does not include the special "init" quanta. 

1580 """ 

1581 

1582 quantum_datasets: dict[uuid.UUID, PredictedQuantumDatasetsModel] = dataclasses.field(default_factory=dict) 

1583 """The full descriptions of all quanta, including input and output 

 1584 datasets, keyed by UUID. 

1585 

1586 When used to construct a `PredictedQuantumGraph`, this need not have all 

1587 entries. 

1588 

1589 This does not include special "init" quanta. 

1590 """ 

1591 

1592 @classmethod 

1593 def make_empty( 

1594 cls, 

1595 universe: DimensionUniverse, 

1596 *, 

1597 output_run: str, 

1598 inputs: Iterable[str] = (), 

1599 output: str | None = None, 

1600 add_packages: bool = True, 

1601 ) -> PredictedQuantumGraphComponents: 

1602 """Make components for an empty quantum graph with no tasks. 

1603 

1604 Parameters 

1605 ---------- 

1606 universe : `lsst.daf.butler.DimensionUniverse` 

1607 Definitions for all butler dimensions. 

1608 output_run : `str` 

1609 Output run collection. 

1610 inputs : `~collections.abc.Iterable` [`str`], optional 

1611 Iterable of input collection names. 

1612 output : `str` or `None`, optional 

1613 Output chained collection. 

1614 add_packages : `bool`, optional 

1615 Whether to add the special init quantum that writes the 'packages' 

1616 dataset. The default (`True`) is consistent with 

1617 `~..quantum_graph_builder.QuantumGraphBuilder` behavior when there 

1618 are no regular quanta generated. 

1619 

1620 Returns 

1621 ------- 

1622 components : `PredictedQuantumGraphComponents` 

1623 Components that can be used to build or write an empty quantum 

1624 graph. 

1625 """ 

1626 components = cls(pipeline_graph=PipelineGraph(universe=universe)) 

1627 components.header.inputs = list(inputs) 

1628 components.header.output_run = output_run 

1629 components.header.output = output 

1630 if add_packages: 

1631 components.init_quanta.root = [ 

1632 PredictedQuantumDatasetsModel.model_construct( 

1633 quantum_id=generate_uuidv7(), 

1634 task_label="", 

1635 outputs={ 

1636 acc.PACKAGES_INIT_OUTPUT_NAME: [ 

1637 PredictedDatasetModel( 

1638 dataset_id=generate_uuidv7(), 

1639 dataset_type_name=acc.PACKAGES_INIT_OUTPUT_NAME, 

1640 data_coordinate=[], 

1641 run=output_run, 

1642 ) 

1643 ] 

1644 }, 

1645 ) 

1646 ] 

1647 return components 

1648 
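    # Usage sketch (editorial addition): the collection names here are
    # placeholders. An empty graph still gets a 'packages' init quantum by
    # default, so downstream tooling sees a consistent output run.
    #
    #     components = PredictedQuantumGraphComponents.make_empty(
    #         universe, output_run="u/someone/run", inputs=["HSC/defaults"]
    #     )
    #     qg = components.assemble()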

1649 def make_dataset_ref(self, predicted: PredictedDatasetModel) -> DatasetRef: 

1650 """Make a `lsst.daf.butler.DatasetRef` from information in the 

1651 predicted quantum graph. 

1652 

1653 Parameters 

1654 ---------- 

1655 predicted : `PredictedDatasetModel` 

1656 Model for the dataset in the predicted graph. 

1657 

1658 Returns 

1659 ------- 

1660 ref : `lsst.daf.butler.DatasetRef` 

1661 A dataset reference. Data ID will be expanded if and only if 

1662 the dimension data has been loaded. 

1663 """ 

1664 try: 

1665 dataset_type = self.pipeline_graph.dataset_types[predicted.dataset_type_name].dataset_type 

1666 except KeyError: 

1667 if predicted.dataset_type_name == acc.PACKAGES_INIT_OUTPUT_NAME: 

1668 dataset_type = self.pipeline_graph.packages_dataset_type 

1669 else: 

1670 raise 

1671 data_id = DataCoordinate.from_full_values(dataset_type.dimensions, tuple(predicted.data_coordinate)) 

1672 if self.dimension_data is not None: 

1673 (data_id,) = self.dimension_data.attach(dataset_type.dimensions, [data_id]) 

1674 return DatasetRef( 

1675 dataset_type, 

1676 data_id, 

1677 run=predicted.run, 

1678 id=predicted.dataset_id, 

1679 ) 

1680 
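    # Usage sketch (editorial addition): expanding one quantum's input models
    # into butler refs. ``quantum_id`` is a placeholder UUID assumed to be a
    # key of ``quantum_datasets``.
    #
    #     qd = components.quantum_datasets[quantum_id]
    #     input_refs = [
    #         components.make_dataset_ref(model)
    #         for models in qd.inputs.values()
    #         for model in models
    #     ]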

1681 def set_thin_graph(self) -> None: 

1682 """Populate the `thin_graph` component from the `pipeline_graph`, 

1683 `quantum_datasets` components (which must be complete). 

1684 """ 

1685 bipartite_xgraph = networkx.DiGraph() 

1686 self.thin_graph.quanta = {task_label: [] for task_label in self.pipeline_graph.tasks} 

1687 graph_quantum_ids: list[uuid.UUID] = [] 

1688 for quantum_datasets in self.quantum_datasets.values(): 

1689 self.thin_graph.quanta[quantum_datasets.task_label].append( 

1690 PredictedThinQuantumModel.model_construct( 

1691 quantum_id=quantum_datasets.quantum_id, 

1692 data_coordinate=quantum_datasets.data_coordinate, 

1693 ) 

1694 ) 

1695 for dataset in itertools.chain.from_iterable(quantum_datasets.inputs.values()): 

1696 bipartite_xgraph.add_edge(dataset.dataset_id, quantum_datasets.quantum_id) 

1697 for dataset in itertools.chain.from_iterable(quantum_datasets.outputs.values()): 

1698 bipartite_xgraph.add_edge(quantum_datasets.quantum_id, dataset.dataset_id) 

1699 graph_quantum_ids.append(quantum_datasets.quantum_id) 

1700 quantum_only_xgraph: networkx.DiGraph = networkx.bipartite.projected_graph( 

1701 bipartite_xgraph, graph_quantum_ids 

1702 ) 

1703 self.thin_graph.edges = list(quantum_only_xgraph.edges) 

1704 
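    # Illustration (editorial addition) of the bipartite projection used
    # above: quanta and datasets share one DiGraph, and projecting onto the
    # quantum nodes alone yields direct quantum-to-quantum edges.
    #
    #     import networkx.algorithms.bipartite
    #
    #     g = networkx.DiGraph([("q1", "d1"), ("d1", "q2")])  # q1 writes d1; q2 reads it
    #     projected = networkx.algorithms.bipartite.projected_graph(g, ["q1", "q2"])
    #     assert list(projected.edges) == [("q1", "q2")]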

1705 def set_header_counts(self) -> None: 

1706 """Populate the quantum and dataset counts in the header from the 

1707 `thin_graph`, `init_quanta`, and `quantum_datasets` components. 

1708 """ 

1709 self.header.n_quanta = len(self.quantum_datasets) 

1710 self.header.n_task_quanta = { 

1711 task_label: len(thin_quanta) for task_label, thin_quanta in self.thin_graph.quanta.items() 

1712 } 

1713 all_dataset_ids: set[uuid.UUID] = set() 

1714 for quantum_datasets in itertools.chain(self.init_quanta.root, self.quantum_datasets.values()): 

1715 all_dataset_ids.update(quantum_datasets.iter_dataset_ids()) 

1716 self.header.n_datasets = len(all_dataset_ids) 

1717 

1718 def update_output_run(self, output_run: str) -> None: 

1719 """Update the output `~lsst.daf.butler.CollectionType.RUN` collection 

1720 name in all datasets and regenerate all output dataset and quantum 

1721 UUIDs. 

1722 

1723 Parameters 

1724 ---------- 

1725 output_run : `str` 

1726 New output `~lsst.daf.butler.CollectionType.RUN` collection name. 

1727 """ 

1728 uuid_map: dict[uuid.UUID, uuid.UUID] = {} 

1729 # Do all outputs and then all inputs in separate passes so we don't 

1730 # need to rely on topological ordering of anything. 

1731 for quantum_datasets in itertools.chain(self.init_quanta.root, self.quantum_datasets.values()): 

1732 new_quantum_id = generate_uuidv7() 

1733 quantum_datasets.quantum_id = new_quantum_id 

1734 for output_dataset in itertools.chain.from_iterable(quantum_datasets.outputs.values()): 

1735 assert output_dataset.run == self.header.output_run, ( 

1736 f"Incorrect run {output_dataset.run} for output dataset {output_dataset.dataset_id}." 

1737 ) 

1738 new_dataset_id = generate_uuidv7() 

1739 uuid_map[output_dataset.dataset_id] = new_dataset_id 

1740 output_dataset.dataset_id = new_dataset_id 

1741 output_dataset.run = output_run 

1742 for quantum_datasets in itertools.chain(self.init_quanta.root, self.quantum_datasets.values()): 

1743 for input_dataset in itertools.chain.from_iterable(quantum_datasets.inputs.values()): 

1744 if input_dataset.run == self.header.output_run: 

1745 input_dataset.run = output_run 

1746 input_dataset.dataset_id = uuid_map.get( 

1747 input_dataset.dataset_id, 

 1748 # This dataset isn't necessarily an output of the graph 

1749 # just because it's in the output run; the graph could 

1750 # have been built with extend_run=True. 

1751 input_dataset.dataset_id, 

1752 ) 

1753 # Update the keys of the quantum_datasets dict. 

1754 self.quantum_datasets = {qd.quantum_id: qd for qd in self.quantum_datasets.values()} 

1755 # Since the UUIDs have changed, the thin graph needs to be rewritten. 

1756 self.set_thin_graph() 

1757 # Update the header last, since we use it above to get the old run. 

1758 self.header.output_run = output_run 

1759 
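    # Usage sketch (editorial addition): retargeting an existing graph at a
    # fresh RUN collection; the name is a placeholder. All output dataset and
    # quantum UUIDs are regenerated, so the result does not collide with the
    # original.
    #
    #     components.update_output_run("u/someone/rerun_1")
    #     qg = components.assemble()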

1760 def assemble(self) -> PredictedQuantumGraph: 

1761 """Construct a `PredictedQuantumGraph` from these components.""" 

1762 return PredictedQuantumGraph(self) 

1763 

1764 @classmethod 

1765 def read_execution_quanta( 

1766 cls, 

1767 uri: ResourcePathExpression, 

1768 quantum_ids: Iterable[uuid.UUID] | None = None, 

1769 page_size: int = DEFAULT_PAGE_SIZE, 

1770 ) -> PredictedQuantumGraphComponents: 

1771 """Read one or more executable quanta from a quantum graph file. 

1772 

1773 Parameters 

1774 ---------- 

1775 uri : convertible to `lsst.resources.ResourcePath` 

1776 URI to open. Should have a ``.qg`` extension for new quantum graph 

1777 files, or ``.qgraph`` for the old format. 

1778 quantum_ids : `~collections.abc.Iterable` [ `uuid.UUID` ], optional 

1779 Iterable of quantum IDs to load. If not provided, all quanta will 

1780 be loaded. The UUIDs of special init quanta will be ignored. 

1781 page_size : `int`, optional 

1782 Approximate number of bytes to read at once from address files. 

1783 Note that this does not set a page size for *all* reads, but it 

1784 does affect the smallest, most numerous reads. 

1785 

1786 Returns 

1787 ------- 

 1788 components : `PredictedQuantumGraphComponents` 

 1789 Components for a quantum graph that can build execution quanta for 

1790 all of the given IDs. 

1791 """ 

1792 uri = ResourcePath(uri) 

1793 if uri.getExtension() == ".qgraph": 

1794 _LOG.warning( 

1795 f"Reading and converting old quantum graph {uri}. " 

1796 "Use the '.qg' extension to write in the new format." 

1797 ) 

1798 from ..graph import QuantumGraph 

1799 

1800 old_qg = QuantumGraph.loadUri(uri, nodes=quantum_ids) 

1801 return PredictedQuantumGraphComponents.from_old_quantum_graph(old_qg) 

1802 

1803 with PredictedQuantumGraph.open(uri, page_size=page_size) as reader: 

1804 reader.read_execution_quanta(quantum_ids) 

1805 return reader.components 

1806 
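    # Usage sketch (editorial addition): loading only the quanta a worker
    # needs. ``quantum_id`` is a placeholder UUID, and the same call accepts
    # old-format '.qgraph' files via the conversion path above.
    #
    #     components = PredictedQuantumGraphComponents.read_execution_quanta(
    #         "my_graph.qg", quantum_ids=[quantum_id]
    #     )
    #     qg = components.assemble()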

1807 @classmethod 

1808 def from_old_quantum_graph(cls, old_quantum_graph: QuantumGraph) -> PredictedQuantumGraphComponents: 

1809 """Construct from an old `QuantumGraph` instance. 

1810 

1811 Parameters 

1812 ---------- 

1813 old_quantum_graph : `QuantumGraph` 

1814 Quantum graph to transform. 

1815 

1816 Returns 

1817 ------- 

1818 components : `PredictedQuantumGraphComponents` 

1819 Components for a new predicted quantum graph. 

1820 """ 

1821 header = HeaderModel.from_old_quantum_graph(old_quantum_graph) 

1822 result = cls(header=header, pipeline_graph=old_quantum_graph.pipeline_graph) 

1823 result.init_quanta.update_from_old_quantum_graph(old_quantum_graph) 

1824 dimension_data_extractor = DimensionDataExtractor.from_dimension_group( 

1825 old_quantum_graph.pipeline_graph.get_all_dimensions() 

1826 ) 

1827 for task_node in old_quantum_graph.pipeline_graph.tasks.values(): 

1828 task_quanta = old_quantum_graph.get_task_quanta(task_node.label) 

1829 for quantum_id, quantum in task_quanta.items(): 

1830 result.quantum_datasets[quantum_id] = PredictedQuantumDatasetsModel.from_execution_quantum( 

1831 task_node, quantum, quantum_id 

1832 ) 

1833 dimension_data_extractor.update([cast(DataCoordinate, quantum.dataId)]) 

1834 for refs in itertools.chain(quantum.inputs.values(), quantum.outputs.values()): 

1835 dimension_data_extractor.update(ref.dataId for ref in refs) 

1836 result.dimension_data = DimensionDataAttacher( 

1837 records=dimension_data_extractor.records.values(), 

1838 dimensions=result.pipeline_graph.get_all_dimensions(), 

1839 ) 

1840 result.set_thin_graph() 

1841 result.set_header_counts() 

1842 return result 

1843 

1844 def write( 

1845 self, 

1846 uri: ResourcePathExpression, 

1847 *, 

1848 zstd_level: int = 10, 

1849 zstd_dict_size: int = 32768, 

1850 zstd_dict_n_inputs: int = 512, 

1851 ) -> None: 

1852 """Write the graph to a file. 

1853 

1854 Parameters 

1855 ---------- 

1856 uri : convertible to `lsst.resources.ResourcePath` 

1857 Path to write to. Should have a ``.qg`` extension, or ``.qgraph`` 

1858 to force writing the old format. 

1859 zstd_level : `int`, optional 

1860 ZStandard compression level to use on JSON blocks. 

1861 zstd_dict_size : `int`, optional 

1862 Size of a ZStandard dictionary that shares compression information 

1863 across components. Set to zero to disable the dictionary. 

1864 Dictionary compression is automatically disabled if the number of 

1865 quanta is smaller than ``zstd_dict_n_inputs``. 

1866 zstd_dict_n_inputs : `int`, optional 

1867 Maximum number of `PredictedQuantumDatasetsModel` JSON 

1868 representations to feed the ZStandard dictionary training routine. 

1869 

1870 Notes 

1871 ----- 

1872 Only a complete predicted quantum graph with all components fully 

1873 populated should be written. 

1874 """ 

1875 if self.header.n_task_quanta != { 

1876 task_label: len(quanta) for task_label, quanta in self.thin_graph.quanta.items() 

1877 }: 

1878 raise RuntimeError( 

1879 "Cannot save graph after partial read of quanta: thin graph is inconsistent with header." 

1880 ) 

1881 # Ensure we record the actual version we're about to write, in case 

1882 # we're rewriting an old graph in a new format. 

1883 self.header.version = FORMAT_VERSION 

1884 uri = ResourcePath(uri) 

1885 match uri.getExtension(): 

1886 case ".qg": 

1887 pass 

1888 case ".qgraph": 

1889 _LOG.warning( 

1890 "Converting to an old-format quantum graph.. " 

1891 "Use '.qg' instead of '.qgraph' to save in the new format." 

1892 ) 

1893 old_qg = self.assemble().to_old_quantum_graph() 

1894 old_qg.saveUri(uri) 

1895 return 

1896 case ext: 

1897 raise ValueError( 

1898 f"Unsupported extension {ext!r} for quantum graph; " 

1899 "expected '.qg' (or '.qgraph' to force the old format)." 

1900 ) 

1901 cdict_data: bytes | None = None 

1902 quantum_datasets_json: dict[uuid.UUID, bytes] = {} 

1903 if len(self.quantum_datasets) < zstd_dict_n_inputs: 

1904 # ZStandard will fail if we ask to use a compression dict without 

1905 # giving it enough data, and it only helps if we have a lot of 

1906 # quanta. 

1907 zstd_dict_size = 0 

1908 if zstd_dict_size: 

1909 quantum_datasets_json = { 

1910 quantum_model.quantum_id: quantum_model.model_dump_json().encode() 

1911 for quantum_model in itertools.islice(self.quantum_datasets.values(), zstd_dict_n_inputs) 

1912 } 

1913 try: 

1914 cdict_data = zstandard.train_dictionary( 

1915 zstd_dict_size, 

1916 list(quantum_datasets_json.values()), 

1917 level=zstd_level, 

1918 ).as_bytes() 

1919 except zstandard.ZstdError as err: 

1920 warnings.warn(f"Not using a compression dictionary: {err}.") 

1921 with BaseQuantumGraphWriter.open( 

1922 uri, 

1923 header=self.header, 

1924 pipeline_graph=self.pipeline_graph, 

1925 address_filename="quanta", 

1926 cdict_data=cdict_data, 

1927 zstd_level=zstd_level, 

1928 ) as writer: 

1929 writer.write_single_model("thin_graph", self.thin_graph) 

1930 if self.dimension_data is None: 

1931 raise IncompleteQuantumGraphError( 

1932 "Cannot save predicted quantum graph with no dimension data." 

1933 ) 

1934 serialized_dimension_data = self.dimension_data.serialized() 

1935 writer.write_single_model("dimension_data", serialized_dimension_data) 

1936 del serialized_dimension_data 

1937 writer.write_single_model("init_quanta", self.init_quanta) 

1938 with MultiblockWriter.open_in_zip( 

1939 writer.zf, "quantum_datasets", writer.int_size 

1940 ) as quantum_datasets_mb: 

1941 for quantum_model in self.quantum_datasets.values(): 

1942 if json_data := quantum_datasets_json.get(quantum_model.quantum_id): 

1943 quantum_datasets_mb.write_bytes( 

1944 quantum_model.quantum_id, writer.compressor.compress(json_data) 

1945 ) 

1946 else: 

1947 quantum_datasets_mb.write_model( 

1948 quantum_model.quantum_id, quantum_model, writer.compressor 

1949 ) 

1950 writer.address_writer.addresses.append(quantum_datasets_mb.addresses) 

1951 
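    # Usage sketch (editorial addition): the helper methods above can be used
    # to satisfy the completeness requirements before writing; ``dimension_data``
    # must also be populated, per the check in `write`. The path is a
    # placeholder.
    #
    #     components.set_thin_graph()
    #     components.set_header_counts()
    #     components.write("my_graph.qg", zstd_level=10)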

1952 

1953@dataclasses.dataclass 

1954class PredictedQuantumGraphReader(BaseQuantumGraphReader): 

1955 """A helper class for reading predicted quantum graphs.""" 

1956 

1957 components: PredictedQuantumGraphComponents = dataclasses.field(init=False) 

1958 """Quantum graph components populated by this reader's methods.""" 

1959 

1960 @classmethod 

1961 @contextmanager 

1962 def open( 

1963 cls, 

1964 uri: ResourcePathExpression, 

1965 *, 

1966 page_size: int = DEFAULT_PAGE_SIZE, 

1967 import_mode: TaskImportMode = TaskImportMode.ASSUME_CONSISTENT_EDGES, 

1968 ) -> Iterator[PredictedQuantumGraphReader]: 

1969 """Construct a reader from a URI. 

1970 

1971 Parameters 

1972 ---------- 

1973 uri : convertible to `lsst.resources.ResourcePath` 

1974 URI to open. Should have a ``.qg`` extension. 

1975 page_size : `int`, optional 

1976 Approximate number of bytes to read at once from address files. 

1977 Note that this does not set a page size for *all* reads, but it 

1978 does affect the smallest, most numerous reads. 

1979 import_mode : `.pipeline_graph.TaskImportMode`, optional 

1980 How to handle importing the task classes referenced in the pipeline 

1981 graph. 

1982 

1983 Returns 

1984 ------- 

1985 reader : `contextlib.AbstractContextManager` [ \ 

1986 `PredictedQuantumGraphReader` ] 

1987 A context manager that returns the reader when entered. 

1988 """ 

1989 with cls._open( 

1990 uri, 

1991 graph_type="predicted", 

1992 address_filename="quanta", 

1993 page_size=page_size, 

1994 import_mode=import_mode, 

1995 n_addresses=1, 

1996 ) as self: 

1997 yield self 

1998 
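    # Usage sketch (editorial addition): the reader is used as a context
    # manager, and its read_* methods are incremental, so callers can load
    # only the components they need.
    #
    #     with PredictedQuantumGraphReader.open("my_graph.qg") as reader:
    #         reader.read_thin_graph()
    #         reader.read_dimension_data()
    #         qg = reader.finish()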

1999 def __post_init__(self) -> None: 

2000 self.components = PredictedQuantumGraphComponents( 

2001 header=self.header, pipeline_graph=self.pipeline_graph 

2002 ) 

2003 

2004 def finish(self) -> PredictedQuantumGraph: 

2005 """Construct a `PredictedQuantumGraph` instance from this reader.""" 

2006 return self.components.assemble() 

2007 

2008 def read_all(self) -> None: 

2009 """Read all components in full.""" 

2010 self.read_thin_graph() 

2011 self.read_execution_quanta() 

2012 

2013 def read_thin_graph(self) -> None: 

2014 """Read the thin graph. 

2015 

2016 The thin graph is a quantum-quantum DAG with just task labels and data 

2017 IDs as node attributes. It always includes all regular quanta, and 

2018 does not include init-input or init-output information. 

2019 """ 

2020 if not self.components.thin_graph.quanta: 

2021 if self.header.version > 0: 

2022 self.components.thin_graph = self._read_single_block("thin_graph", PredictedThinGraphModel) 

2023 else: 

2024 self.address_reader.read_all() 

2025 thin_graph_v0 = self._read_single_block("thin_graph", _PredictedThinGraphModelV0) 

2026 self.components.thin_graph = thin_graph_v0._upgraded(self.address_reader.rows) 

2027 

2028 def read_init_quanta(self) -> None: 

2029 """Read the list of special quanta that represent init-inputs and 

2030 init-outputs. 

2031 """ 

2032 if not self.components.init_quanta.root: 

2033 self.components.init_quanta = self._read_single_block("init_quanta", PredictedInitQuantaModel) 

2034 

2035 def read_dimension_data(self) -> None: 

2036 """Read all dimension records. 

2037 

2038 Record data IDs will be immediately deserialized, while other fields 

2039 will be left in serialized form until they are needed. 

2040 """ 

2041 if self.components.dimension_data is None: 

2042 serializable_dimension_data = self._read_single_block("dimension_data", SerializableDimensionData) 

2043 self.components.dimension_data = DimensionDataAttacher( 

2044 deserializers=[ 

2045 DimensionRecordSetDeserializer.from_raw( 

2046 self.components.pipeline_graph.universe[element], serialized_records 

2047 ) 

2048 for element, serialized_records in serializable_dimension_data.root.items() 

2049 ], 

2050 dimensions=DimensionGroup.union( 

2051 *self.components.pipeline_graph.group_by_dimensions(prerequisites=True).keys(), 

2052 universe=self.components.pipeline_graph.universe, 

2053 ), 

2054 ) 

2055 

2056 def read_quantum_datasets(self, quantum_ids: Iterable[uuid.UUID] | None = None) -> None: 

2057 """Read information about all datasets produced and consumed by the 

2058 given quantum IDs. 

2059 

2060 Parameters 

2061 ---------- 

2062 quantum_ids : `~collections.abc.Iterable` [ `uuid.UUID` ], optional 

2063 Iterable of quantum IDs to load. If not provided, all quanta will 

2064 be loaded. The UUIDs of special init quanta will be ignored. 

2065 """ 

2066 quantum_datasets: PredictedQuantumDatasetsModel | None 

2067 if quantum_ids is None: 

2068 if len(self.components.quantum_datasets) != self.header.n_quanta: 

2069 for quantum_datasets in MultiblockReader.read_all_models_in_zip( 

2070 self.zf, 

2071 "quantum_datasets", 

2072 PredictedQuantumDatasetsModel, 

2073 self.decompressor, 

2074 int_size=self.components.header.int_size, 

2075 page_size=self.page_size, 

2076 ): 

2077 self.components.quantum_datasets.setdefault(quantum_datasets.quantum_id, quantum_datasets) 

2078 self.address_reader.read_all() 

2079 return 

2080 with MultiblockReader.open_in_zip( 

2081 self.zf, "quantum_datasets", int_size=self.components.header.int_size 

2082 ) as mb_reader: 

2083 for quantum_id in quantum_ids: 

2084 if quantum_id in self.components.quantum_datasets: 

2085 continue 

2086 address_row = self.address_reader.find(quantum_id) 

2087 quantum_datasets = mb_reader.read_model( 

2088 address_row.addresses[0], PredictedQuantumDatasetsModel, self.decompressor 

2089 ) 

2090 if quantum_datasets is not None: 

2091 self.components.quantum_datasets[address_row.key] = quantum_datasets 

2092 return 

2093 

2094 def read_execution_quanta(self, quantum_ids: Iterable[uuid.UUID] | None = None) -> None: 

2095 """Read all information needed to execute the given quanta. 

2096 

2097 Parameters 

2098 ---------- 

2099 quantum_ids : `~collections.abc.Iterable` [ `uuid.UUID` ], optional 

2100 Iterable of quantum IDs to load. If not provided, all quanta will 

2101 be loaded. The UUIDs of special init quanta will be ignored. 

2102 """ 

2103 self.read_init_quanta() 

2104 self.read_dimension_data() 

2105 self.read_quantum_datasets(quantum_ids)
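
    # Usage sketch (editorial addition): `read_execution_quanta` composes the
    # three reads above, so loading a single quantum for execution is one
    # call; ``quantum_id`` is a placeholder UUID.
    #
    #     with PredictedQuantumGraphReader.open("my_graph.qg") as reader:
    #         reader.read_execution_quanta([quantum_id])
    #         components = reader.components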