Coverage for python / lsst / pipe / base / all_dimensions_quantum_graph_builder.py: 16%

539 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:20 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""The standard, general-purpose implementation of the QuantumGraph-generation 

29algorithm. 

30""" 

31 

32from __future__ import annotations 

33 

34__all__ = ("AllDimensionsQuantumGraphBuilder", "DatasetQueryConstraintVariant") 

35 

36import dataclasses 

37import itertools 

38from collections import defaultdict 

39from collections.abc import Callable, Iterable, Mapping 

40from typing import TYPE_CHECKING, Any, final 

41 

42import astropy.table 

43 

44from lsst.daf.butler import ( 

45 Butler, 

46 DataCoordinate, 

47 DimensionElement, 

48 DimensionGroup, 

49 DimensionRecordSet, 

50 MissingDatasetTypeError, 

51 SkyPixDimension, 

52) 

53from lsst.sphgeom import RangeSet 

54from lsst.utils.logging import LsstLogAdapter, PeriodicLogger 

55from lsst.utils.timer import timeMethod 

56 

57from ._datasetQueryConstraints import DatasetQueryConstraintVariant 

58from .quantum_graph_builder import QuantumGraphBuilder, QuantumGraphBuilderError 

59from .quantum_graph_skeleton import DatasetKey, PrerequisiteDatasetKey, QuantumGraphSkeleton, QuantumKey 

60 

61if TYPE_CHECKING: 

62 from .pipeline_graph import DatasetTypeNode, PipelineGraph, TaskNode 

63 

64 

@final
class AllDimensionsQuantumGraphBuilder(QuantumGraphBuilder):
    """An implementation of `.quantum_graph_builder.QuantumGraphBuilder` that
    uses a single large query for data IDs covering all dimensions in the
    pipeline.

    Parameters
    ----------
    pipeline_graph : `.pipeline_graph.PipelineGraph`
        Pipeline to build a `.QuantumGraph` from, as a graph.  Will be resolved
        in-place with the given butler (any existing resolution is ignored).
    butler : `lsst.daf.butler.Butler`
        Client for the data repository.  Should be read-only.
    where : `str`, optional
        Butler expression language constraint to apply to all data IDs.
    dataset_query_constraint : `DatasetQueryConstraintVariant`, optional
        Specification of which overall-input datasets should be used to
        constrain the initial data ID queries.  Not including an important
        constraint can result in catastrophically large query results that
        take too long to process, while including too many makes the query
        much more complex, increasing the chances that the database will
        choose a bad (sometimes catastrophically bad) query plan.
    bind : `~collections.abc.Mapping`, optional
        Variable substitutions for the ``where`` expression.
    data_id_tables : `~collections.abc.Iterable` [ `astropy.table.Table` ],\
            optional
        Tables of data IDs to join in as constraints.  Missing dimensions that
        are constrained by the ``where`` argument or pipeline data ID will be
        filled in automatically.
    **kwargs
        Additional keyword arguments forwarded to
        `.quantum_graph_builder.QuantumGraphBuilder`.

    Notes
    -----
    This is a general-purpose algorithm that delegates the problem of
    determining which "end" of the pipeline is more constrained (beginning by
    input collection contents vs. end by the ``where`` string) to the database
    query planner, which *usually* does a good job.

    This algorithm suffers from a serious limitation, which we refer to as the
    "tract slicing" problem from its most common variant: the ``where`` string
    and general data ID intersection rules apply to *all* data IDs in the
    graph.  For example, if a ``tract`` constraint is present in the ``where``
    string or an overall-input dataset, then it is impossible for any data ID
    that does not overlap that tract to be present anywhere in the pipeline,
    such as a ``{visit, detector}`` combination where the ``visit`` overlaps
    the ``tract`` even if the ``detector`` does not.
    """

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
        super().__init__(pipeline_graph, butler, **kwargs)
        # Callers are expected to normalize "no constraint" to an empty
        # string, not None; fail loudly in debug builds if they don't.
        assert where is not None, "'where' should be an empty str, not None"
        self.where = where
        self.dataset_query_constraint = dataset_query_constraint
        self.bind = bind
        self.data_id_tables = list(data_id_tables)

    @timeMethod
    def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
        # Docstring inherited.
        # There is some chance that the dimension query for one subgraph would
        # be the same as or a dimension-subset of another.  This is an
        # optimization opportunity we're not currently taking advantage of.
        tree = _DimensionGroupTree(subgraph)
        tree.build(self.dataset_query_constraint, self.data_id_tables, log=self.log)
        tree.pprint(printer=self.log.debug)
        self._query_for_data_ids(tree)
        # Fetch dimension records for the queryable data IDs before the
        # generated (e.g. skypix-expanded) data IDs exist; the skeleton
        # attaches them at the end.
        dimension_records = self._fetch_most_dimension_records(tree)
        tree.generate_data_ids(self.log)
        skeleton: QuantumGraphSkeleton = self._make_subgraph_skeleton(tree)
        if not skeleton.has_any_quanta:
            # QG is going to be empty; exit early not just for efficiency, but
            # also so downstream code doesn't have to guard against this case.
            return skeleton
        self._find_followup_datasets(tree, skeleton)
        all_data_id_dimensions = subgraph.get_all_dimensions()
        skeleton.attach_dimension_records(self.butler, all_data_id_dimensions, dimension_records)
        return skeleton

    def _query_for_data_ids(self, tree: _DimensionGroupTree) -> None:
        """Query for data IDs and use the result to populate the dimension
        group tree.

        Parameters
        ----------
        tree : `_DimensionGroupTree`
            Tree with dimension group branches that holds subgraph-specific
            state for this builder, to be modified in place.
        """
        # Accumulate a human-readable transcript of the query we build, so we
        # can log a reproducer if it returns no rows.
        query_cmd: list[str] = []
        with self.butler.query() as query:
            query_cmd.append("with butler.query() as query:")
            query_cmd.append(f"    query = query.join_dimensions({list(tree.queryable_dimensions.names)})")
            query = query.join_dimensions(tree.queryable_dimensions)
            if tree.dataset_constraint:
                query_cmd.append(f"    collections = {list(self.input_collections)}")
                for dataset_type_name in tree.dataset_constraint:
                    query_cmd.append(
                        f"    query = query.join_dataset_search({dataset_type_name!r}, collections)"
                    )
                    try:
                        query = query.join_dataset_search(dataset_type_name, self.input_collections)
                    except MissingDatasetTypeError:
                        raise QuantumGraphBuilderError(
                            f"No datasets for overall-input {dataset_type_name!r} found (the dataset type is "
                            "not even registered). This is probably a bug in either the pipeline definition or "
                            "the dataset constraints passed to the quantum graph builder."
                        ) from None
            query_cmd.append(
                f"    query = query.where({dict(tree.subgraph.data_id.mapping)}, "
                f"{self.where!r}, bind={self.bind!r})"
            )
            query = query.where(tree.subgraph.data_id, self.where, bind=self.bind)
            # It's important for tables to be joined in last, so data IDs from
            # pipeline and where can be used to fill in missing columns.
            for table in self.data_id_tables:
                # If this is from ctrl_mpexec's pipetask, it'll have added
                # a filename to the metadata for us.
                table_name = table.meta.get("filename", "unknown")
                query_cmd.append(f"    query = query.join_data_coordinate_table(<{table_name}>)")
                query = query.join_data_coordinate_table(table)
            self.log.verbose("Querying for data IDs via: %s", "\n".join(query_cmd))
            # Allow duplicates from common skypix overlaps to make some
            # queries run faster.
            query._allow_duplicate_overlaps = True
            # Iterate over query results, populating data IDs for datasets,
            # quanta, and edges.  We populate only the first level of the tree
            # in the first pass, so we can be done with the query results as
            # quickly as possible in case that holds a connection/cursor open.
            n_rows = 0
            progress_logger: PeriodicLogger | None = None
            for common_data_id in query.data_ids(tree.queryable_dimensions):
                if progress_logger is None:
                    # There can be a long wait between submitting the query
                    # and returning the first row, so we want to make sure we
                    # log when we get it; note that PeriodicLogger is not
                    # going to do that for us, as it waits for its interval
                    # _after_ the first log is seen.
                    self.log.info("Iterating over data ID query results.")
                    progress_logger = PeriodicLogger(self.log)
                for branch_dimensions, branch in tree.queryable_branches.items():
                    data_id = common_data_id.subset(branch_dimensions)
                    branch.data_ids.add(data_id)
                n_rows += 1
                progress_logger.log(
                    "Iterating over data ID query results: %d rows processed so far.", n_rows
                )
            if n_rows == 0:
                # A single multiline log plays better with log aggregators
                # like Loki.
                lines = ["Initial data ID query returned no rows, so QuantumGraph will be empty."]
                try:
                    lines.extend(query.explain_no_results())
                finally:
                    lines.append("To reproduce this query for debugging purposes, run:")
                    lines.append("")
                    lines.extend(query_cmd)
                    lines.append("    print(query.any())")
                    lines.append("")
                    lines.append(
                        "And then try removing various constraints until query.any() returns True."
                    )
                    # If an exception was raised, write a partial.
                    self.log.error("\n".join(lines))
                return
        self.log.verbose("Done iterating over query results: %d rows processed in total.", n_rows)
        # We now recursively populate the data IDs of the rest of the tree.
        tree.project_data_ids(self.log)

    @timeMethod
    def _make_subgraph_skeleton(self, tree: _DimensionGroupTree) -> QuantumGraphSkeleton:
        """Build a `QuantumGraphSkeleton` by processing the data IDs in the
        dimension group tree.

        Parameters
        ----------
        tree : `_DimensionGroupTree`
            Tree with dimension group branches that holds subgraph-specific
            state for this builder.

        Returns
        -------
        skeleton : `QuantumGraphSkeleton`
            Preliminary quantum graph.
        """
        skeleton = QuantumGraphSkeleton(tree.subgraph.tasks)
        # Two passes: all nodes must exist before any edges reference them.
        for branch_dimensions, branch in tree.branches_by_dimensions.items():
            self.log.verbose(
                "Adding nodes for %s %s data ID(s).",
                len(branch.data_ids),
                branch_dimensions,
            )
            branch.update_skeleton_nodes(skeleton)
        for branch_dimensions, branch in tree.branches_by_dimensions.items():
            self.log.verbose(
                "Adding edges for %s %s data ID(s).",
                len(branch.data_ids),
                branch_dimensions,
            )
            branch.update_skeleton_edges(skeleton)
        n_quanta = sum(len(skeleton.get_quanta(task_label)) for task_label in tree.subgraph.tasks)
        self.log.info(
            "Initial bipartite graph has %d quanta, %d dataset nodes, and %d edges.",
            n_quanta,
            skeleton.n_nodes - n_quanta,
            skeleton.n_edges,
        )
        return skeleton

    @timeMethod
    def _find_followup_datasets(self, tree: _DimensionGroupTree, skeleton: QuantumGraphSkeleton) -> None:
        """Populate `existing_datasets` by performing follow-up queries with
        the data IDs in the dimension group tree.

        Parameters
        ----------
        tree : `_DimensionGroupTree`
            Tree with dimension group branches that holds subgraph-specific
            state for this builder.
        skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton`
            In-progress quantum graph to modify in place.
        """
        dataset_key: DatasetKey | PrerequisiteDatasetKey
        for dimensions, branch in tree.branches_by_dimensions.items():
            if not dimensions:
                # Empty-dimension datasets were already classified by the base
                # class; just transfer them to the skeleton.
                for dataset_type_name in branch.dataset_types.keys():
                    dataset_key = DatasetKey(dataset_type_name, self.empty_data_id.required_values)
                    if ref := self.empty_dimensions_datasets.inputs.get(dataset_key):
                        skeleton.set_dataset_ref(ref, dataset_key)
                    if ref := self.empty_dimensions_datasets.outputs_for_skip.get(dataset_key):
                        skeleton.set_output_for_skip(ref)
                    if ref := self.empty_dimensions_datasets.outputs_in_the_way.get(dataset_key):
                        skeleton.set_output_in_the_way(ref)
                continue
            if not branch.dataset_types and not branch.tasks:
                continue
            if not branch.data_ids:
                continue
            # Iterate over regular input/output dataset type nodes with these
            # dimensions to find those datasets using followup queries.
            with self.butler.query() as butler_query:
                butler_query = butler_query.join_data_coordinates(branch.data_ids)
                for dataset_type_node in branch.dataset_types.values():
                    if tree.subgraph.producer_of(dataset_type_node.name) is None:
                        # Dataset type is an overall input; we always need to
                        # try to find these.
                        count = 0
                        try:
                            for ref in butler_query.datasets(dataset_type_node.name, self.input_collections):
                                skeleton.set_dataset_ref(ref)
                                count += 1
                        except MissingDatasetTypeError:
                            pass
                        self.log.verbose(
                            "Found %d overall-input dataset(s) of type %r.", count, dataset_type_node.name
                        )
                        continue
                    if self.skip_existing_in:
                        # Dataset type is an intermediate or output; need to
                        # find these if only they're from previously executed
                        # quanta that we might skip...
                        count = 0
                        try:
                            for ref in butler_query.datasets(dataset_type_node.name, self.skip_existing_in):
                                skeleton.set_output_for_skip(ref)
                                count += 1
                                if ref.run == self.output_run:
                                    skeleton.set_output_in_the_way(ref)
                        except MissingDatasetTypeError:
                            pass
                        self.log.verbose(
                            "Found %d output dataset(s) of type %r in %s.",
                            count,
                            dataset_type_node.name,
                            self.skip_existing_in,
                        )
                    if self.output_run_exists and not self.skip_existing_starts_with_output_run:
                        # ...or if they're in the way and would need to be
                        # clobbered (and we haven't already found them in the
                        # previous block).
                        count = 0
                        try:
                            for ref in butler_query.datasets(dataset_type_node.name, [self.output_run]):
                                skeleton.set_output_in_the_way(ref)
                                count += 1
                        except MissingDatasetTypeError:
                            pass
                        self.log.verbose(
                            "Found %d output dataset(s) of type %r in %s.",
                            count,
                            dataset_type_node.name,
                            self.output_run,
                        )
                # Iterate over tasks with these dimensions to perform
                # follow-up queries for prerequisite inputs, which may have
                # dimensions that were not in ``tree.all_dimensions`` and/or
                # require temporal joins to calibration validity ranges.
                for task_node in branch.tasks.values():
                    task_prerequisite_info = self.prerequisite_info[task_node.label]
                    for connection_name, finder in list(task_prerequisite_info.finders.items()):
                        if finder.lookup_function is not None:
                            self.log.verbose(
                                "Deferring prerequisite input %r of task %r to per-quantum processing "
                                "(lookup function provided).",
                                finder.dataset_type_node.name,
                                task_node.label,
                            )
                            continue
                        # We also fall back to the base class if there is a
                        # nontrivial spatial or temporal join in the lookup.
                        if finder.dataset_skypix or finder.dataset_other_spatial:
                            if task_prerequisite_info.bounds.spatial_connections:
                                self.log.verbose(
                                    "Deferring prerequisite input %r of task %r to per-quantum processing "
                                    "(for spatial-bounds-connections handling).",
                                    finder.dataset_type_node.name,
                                    task_node.label,
                                )
                                continue
                            if not task_node.dimensions.spatial:
                                self.log.verbose(
                                    "Deferring prerequisite input %r of task %r to per-quantum processing "
                                    "(dataset has spatial data IDs, but task does not).",
                                    finder.dataset_type_node.name,
                                    task_node.label,
                                )
                                continue
                        if finder.dataset_has_timespan:
                            # Fixed: this temporal branch previously tested
                            # ``bounds.spatial_connections``, contradicting the
                            # log message below and the parallel spatial branch
                            # above; it must defer on *temporal* bounds
                            # connections.
                            if task_prerequisite_info.bounds.temporal_connections:
                                self.log.verbose(
                                    "Deferring prerequisite input %r of task %r to per-quantum processing "
                                    "(for temporal-bounds-connections handling).",
                                    finder.dataset_type_node.name,
                                    task_node.label,
                                )
                                continue
                            if not task_node.dimensions.temporal:
                                self.log.verbose(
                                    "Deferring prerequisite input %r of task %r to per-quantum processing "
                                    "(dataset has temporal data IDs, but task does not).",
                                    finder.dataset_type_node.name,
                                    task_node.label,
                                )
                                continue
                        # We have a simple case where we can do a single query
                        # that joins the query we already have for the task
                        # data IDs to the datasets we're looking for.
                        count = 0
                        try:
                            query_results = list(
                                butler_query.join_dataset_search(
                                    finder.dataset_type_node.dataset_type, self.input_collections
                                )
                                .general(
                                    dimensions | finder.dataset_type_node.dataset_type.dimensions,
                                    dataset_fields={finder.dataset_type_node.name: ...},
                                    find_first=True,
                                )
                                .iter_tuples(finder.dataset_type_node.dataset_type)
                            )
                        except MissingDatasetTypeError:
                            query_results = []
                        for data_id, refs, _ in query_results:
                            ref = refs[0]
                            dataset_key = skeleton.add_prerequisite_node(ref)
                            quantum_key = QuantumKey(
                                task_node.label, data_id.subset(dimensions).required_values
                            )
                            skeleton.add_input_edge(quantum_key, dataset_key)
                            count += 1
                        # Remove this finder from the mapping so the base
                        # class knows it doesn't have to look for these
                        # prerequisites.
                        del task_prerequisite_info.finders[connection_name]
                        self.log.verbose(
                            "Added %d prerequisite input edge(s) from dataset type %r to task %r.",
                            count,
                            finder.dataset_type_node.name,
                            task_node.label,
                        )
            # Delete data ID sets we don't need anymore to save memory.
            del branch.data_ids

    @timeMethod
    def _fetch_most_dimension_records(self, tree: _DimensionGroupTree) -> list[DimensionRecordSet]:
        """Query for dimension records for all non-prerequisite data IDs (and
        possibly some prerequisite data IDs).

        Parameters
        ----------
        tree : `_DimensionGroupTree`
            Tree with dimension group branches that holds subgraph-specific
            state for this builder.

        Returns
        -------
        dimension_records : `list` [ `lsst.daf.butler.DimensionRecordSet` ]
            List of sets of dimension records.

        Notes
        -----
        Because the initial common data ID query is used to generate all
        quantum and regular input/output dataset data IDs, column subsets of
        it can also be used to fetch dimension records for those data IDs.
        """
        self.log.verbose("Performing follow-up queries for dimension records.")
        result: list[DimensionRecordSet] = []
        for branch in tree.branches_by_dimensions.values():
            if not branch.dimension_records:
                continue
            if not branch.data_ids:
                continue
            with self.butler.query() as butler_query:
                butler_query = butler_query.join_data_coordinates(branch.data_ids)
                for record_set in branch.dimension_records:
                    record_set.update(butler_query.dimension_records(record_set.element.name))
                    result.append(record_set)
        return result

488 

489 

@dataclasses.dataclass(eq=False, repr=False, slots=True)
class _DimensionGroupTwig:
    """A small side-branch of the tree of dimensions groups that tracks the
    tasks and dataset types with a particular set of dimensions that appear in
    the edges populated by its parent branch.

    See `_DimensionGroupTree` for more details.

    Notes
    -----
    Both fields default to empty sets (via factories) so a twig can be created
    lazily by ``defaultdict(_DimensionGroupTwig)`` and filled in afterwards.
    """

    parent_edge_tasks: set[str] = dataclasses.field(default_factory=set)
    """Task labels for tasks whose quanta have the dimensions of this twig and
    are endpoints of edges that have the combined dimensions of this twig's
    parent branch.
    """

    parent_edge_dataset_types: set[str] = dataclasses.field(default_factory=set)
    """Dataset type names for datasets whose quanta have the dimensions of this
    twig and are endpoints of edges that have the combined dimensions of this
    twig's parent branch.
    """

510 

511 

@dataclasses.dataclass(eq=False, repr=False, slots=True)
class _DimensionGroupBranch:
    """A node in the tree of dimension groups that are used to recursively
    process query data IDs into a quantum graph.
    """

    tasks: dict[str, TaskNode] = dataclasses.field(default_factory=dict)
    """The task nodes whose quanta have these dimensions, keyed by task label.
    """

    dataset_types: dict[str, DatasetTypeNode] = dataclasses.field(default_factory=dict)
    """The dataset type nodes whose datasets have these dimensions, keyed by
    dataset type name.
    """

    dimension_records: list[DimensionRecordSet] = dataclasses.field(default_factory=list)
    """Sets of dimension records looked up with these dimensions."""

    data_ids: set[DataCoordinate] = dataclasses.field(default_factory=set)
    """All data IDs with these dimensions seen in the QuantumGraph."""

    input_edges: list[tuple[str, str]] = dataclasses.field(default_factory=list)
    """Dataset type -> task edges that are populated by this set of dimensions.

    These are cases where `dimensions` is the union of the task and dataset
    type dimensions.
    """

    output_edges: list[tuple[str, str]] = dataclasses.field(default_factory=list)
    """Task -> dataset type edges that are populated by this set of dimensions.

    These are cases where `dimensions` is the union of the task and dataset
    type dimensions.
    """

    branches: dict[DimensionGroup, _DimensionGroupBranch] = dataclasses.field(default_factory=dict)
    """Child branches whose dimensions are strict subsets of this branch's
    dimensions, populated by projecting this branch's set of data IDs (i.e.
    remove a dimension, then deduplicate).
    """

    twigs: defaultdict[DimensionGroup, _DimensionGroupTwig] = dataclasses.field(
        default_factory=lambda: defaultdict(_DimensionGroupTwig)
    )
    """Small branches for all of the dimensions that appear on one side of any
    edge in `input_edges` or `output_edges`.
    """

    def pprint(
        self,
        dimensions: DimensionGroup,
        indent: str = "  ",
        suffix: str = "",
        printer: Callable[[str], None] = print,
    ) -> None:
        # Print this branch's dimensions, then its children with one extra
        # level of indentation each, to show the tree structure.
        printer(f"{indent}{dimensions}{suffix}")
        for branch_dimensions, branch in self.branches.items():
            branch.pprint(branch_dimensions, indent + "  ", printer=printer)

    def project_data_ids(self, log: LsstLogAdapter, log_indent: str = "  ") -> None:
        """Populate the data ID sets of child branches from the data IDs in
        this branch, recursively.

        Parameters
        ----------
        log : `lsst.utils.logging.LsstLogAdapter`
            Logger to use for status reporting.
        log_indent : `str`, optional
            Indentation to prefix the log message.  This is used when
            recursing to make the branch structure clear.
        """
        # Project (subset + set-deduplicate) into every child first, then
        # recurse so each child works from its complete data ID set.
        for data_id in self.data_ids:
            for branch_dimensions, branch in self.branches.items():
                branch.data_ids.add(data_id.subset(branch_dimensions))
        for branch_dimensions, branch in self.branches.items():
            log.verbose("%sProjecting query data ID(s) to %s.", log_indent, branch_dimensions)
            branch.project_data_ids(log, log_indent + "  ")

    def update_skeleton_nodes(self, skeleton: QuantumGraphSkeleton) -> None:
        """Process the data ID set of this branch to add quantum and dataset
        nodes to the under-construction quantum graph.

        This adds nodes only (no edges) and does not recurse; the caller is
        expected to invoke it on every branch.

        Parameters
        ----------
        skeleton : `QuantumGraphSkeleton`
            Under-construction quantum graph to modify in place.
        """
        for data_id in self.data_ids:
            for task_label in self.tasks:
                skeleton.add_quantum_node(task_label, data_id)
            for dataset_type_name in self.dataset_types:
                skeleton.add_dataset_node(dataset_type_name, data_id)

    def update_skeleton_edges(self, skeleton: QuantumGraphSkeleton) -> None:
        """Process the data ID set of this branch to add edges to the
        under-construction quantum graph.

        This assumes the endpoint nodes already exist (see
        `update_skeleton_nodes`) and does not recurse.

        Parameters
        ----------
        skeleton : `QuantumGraphSkeleton`
            Under-construction quantum graph to modify in place.
        """
        for data_id in self.data_ids:
            # For each branch data ID, compute the node keys for every task
            # and dataset type that participates in this branch's edges by
            # subsetting down to each twig's dimensions.
            quantum_keys: dict[str, QuantumKey] = {}
            dataset_keys: dict[str, DatasetKey] = {}
            for twig_dimensions, twig in self.twigs.items():
                twig_data_id = data_id.subset(twig_dimensions)
                for task_label in twig.parent_edge_tasks:
                    quantum_keys[task_label] = QuantumKey(task_label, twig_data_id.required_values)
                for dataset_type_name in twig.parent_edge_dataset_types:
                    dataset_keys[dataset_type_name] = DatasetKey(
                        dataset_type_name, twig_data_id.required_values
                    )
            for dataset_type_name, task_label in self.input_edges:
                skeleton.add_input_edge(quantum_keys[task_label], dataset_keys[dataset_type_name])
            for task_label, dataset_type_name in self.output_edges:
                skeleton.add_output_edge(quantum_keys[task_label], dataset_keys[dataset_type_name])
        if not self.dataset_types and not self.tasks:
            # Delete data IDs we don't need anymore to save memory.
            del self.data_ids

632 

633 

634@dataclasses.dataclass(eq=False, repr=False) 

635class _DimensionGroupTree: 

636 """A tree of dimension groups in which branches are subsets of their 

637 parents. 

638 

639 This class holds all of the per-subgraph state for this QG builder 

640 subclass. 

641 

642 Notes 

643 ----- 

644 The full set of dimensions referenced by any task or dataset type (except 

645 prerequisite inputs) forms the conceptual "trunk" of this tree. Each 

646 branch has a subset of the dimensions of its parent branch, and each set 

647 of dimensions appears exactly once in a tree (so there is some flexibility 

648 in where certain dimension subsets may appear; right now this is resolved 

649 somewhat arbitrarily). 

650 We do not add branches for every possible dimension subset; a branch is 

651 created for a `~lsst.daf.butler.DimensionGroup` if: 

652 

653 - if there is a task whose quanta have those dimensions; 

654 - if there is a non-prerequisite dataset type with those dimensions; 

655 - if there is an edge for which the union of the task and dataset type 

656 dimensions are those dimensions; 

657 - if there is a dimension element in any task or non-prerequisite dataset 

658 type dimensions whose `~lsst.daf.butler.DimensionElement.minimal_group` 

659 is those dimensions (allowing us to look up dimension records). 

660 

661 In addition, for any dimension group that has unqueryable dimensions (e.g. 

662 non-common skypix dimensions, like healpix), we create a branch for the 

663 subset of the group with only queryable dimensions. 

664 

665 We process the initial data query by recursing through this tree structure 

666 to populate a data ID set for each branch 

667 (`_DimensionGroupBranch.project_data_ids`), and then process those sets. 

668 This can be far faster than the non-recursive processing the QG builder 

669 used to use because the set of data IDs is smaller (sometimes dramatically 

670 smaller) as we move to smaller sets of dimensions. 

671 

672 In addition to their child branches, a branch that is used to define graph 

673 edges also has "twigs", which are a flatter set of dimension subsets for 

674 each of the tasks and dataset types that appear in that branch's edges. 

675 The same twig dimensions can appear in multiple branches, and twig 

676 dimensions can be the same as their parent branch's (but not a superset). 

677 """ 

678 

679 subgraph: PipelineGraph 

680 """Graph of this subset of the pipeline.""" 

681 

682 all_dimensions: DimensionGroup = dataclasses.field(init=False) 

683 """The union of all dimensions that appear in any task or 

684 (non-prerequisite) dataset type in this subgraph. 

685 """ 

686 

687 queryable_dimensions: DimensionGroup = dataclasses.field(init=False) 

688 """All dimensions except those that cannot be queried for directly via the 

689 butler (e.g. skypix systems other than the common one). 

690 """ 

691 

692 branches_by_dimensions: dict[DimensionGroup, _DimensionGroupBranch] = dataclasses.field(init=False) 

693 """The tasks and dataset types of this subset of the pipeline, grouped 

694 by their dimensions. 

695 """ 

696 

697 dataset_constraint: set[str] = dataclasses.field(default_factory=set) 

698 """The names of dataset types used as query constraints.""" 

699 

700 queryable_branches: dict[DimensionGroup, _DimensionGroupBranch] = dataclasses.field(default_factory=dict) 

701 """The top-level branches in the tree of dimension groups populated by the 

702 butler query. 

703 

704 Data IDs in these branches are populated from the top down, with each 

705 branch a projection ("remove dimension, then deduplicate") of its parent, 

706 starting with the query result rows. 

707 """ 

708 

709 generators: list[DataIdGenerator] = dataclasses.field(default_factory=list) 

710 """Branches for dimensions groups that are populated by algorithmically 

711 generating data IDs from those in one or more other branches. 

712 

713 These are typically variants on the theme of adding a skypix dimension to 

714 another set of dimensions by identifying the sky pixels that overlap the 

715 region of the original dimensions. 

716 """ 

717 

    def __post_init__(self) -> None:
        """Initialize derived fields from the pipeline subgraph.

        Builds one `_DimensionGroupBranch` per distinct dimension group that
        has at least one task or (non-prerequisite) dataset type, and computes
        the union of all of those dimensions.
        """
        universe = self.subgraph.universe
        # The universe is only set once the graph is resolved against a
        # butler; this class requires a resolved graph.
        assert universe is not None, "Pipeline graph is resolved."
        self.branches_by_dimensions = {
            dimensions: _DimensionGroupBranch(tasks, dataset_types)
            for dimensions, (tasks, dataset_types) in self.subgraph.group_by_dimensions().items()
        }
        self.all_dimensions = DimensionGroup.union(*self.branches_by_dimensions.keys(), universe=universe)

726 

    def build(
        self,
        requested: DatasetQueryConstraintVariant,
        data_id_tables: Iterable[astropy.table.Table],
        *,
        log: LsstLogAdapter,
    ) -> None:
        """Organize the branches into a tree.

        Parameters
        ----------
        requested : `DatasetQueryConstraintVariant`
            Query constraint specified by the user.
        data_id_tables : `~collections.abc.Iterable` [ `astropy.table.Table` ]
            Data ID tables being joined into the query.
        log : `lsst.utils.logging.LsstLogAdapter`
            Logger that supports ``verbose`` output.
        """
        universe = self.all_dimensions.universe
        self._make_dimension_record_branches()
        self._make_edge_branches()
        self._set_dataset_constraint(requested, log)
        # Work out which dimensions we can potentially query the database for.
        # We start out by dropping all skypix dimensions other than the common
        # one, and then we add them back in if a constraint dataset type or
        # data ID table provides them.
        unqueryable_skypix = universe.conform(self.all_dimensions.skypix - {universe.commonSkyPix.name})
        self.queryable_dimensions = self.all_dimensions.difference(unqueryable_skypix)
        for dataset_type_name in sorted(self.dataset_constraint):
            dataset_type_dimensions = self.subgraph.dataset_types[dataset_type_name].dimensions
            dataset_type_skypix = dataset_type_dimensions.intersection(unqueryable_skypix)
            if dataset_type_skypix:
                log.info(
                    f"Including {dataset_type_skypix} in the set of dimensions to query via "
                    f"{dataset_type_name}. If this query fails, exclude those dataset type "
                    "from the constraint or provide a data ID table for missing spatial joins."
                )
                self.queryable_dimensions = self.queryable_dimensions.union(dataset_type_dimensions)
        for data_id_table in data_id_tables:
            table_dimensions = universe.conform(data_id_table.colnames)
            # Only a table that actually provides skypix values can make
            # otherwise-unqueryable skypix dimensions queryable.
            if table_dimensions.skypix:
                self.queryable_dimensions = self.queryable_dimensions.union(table_dimensions)
        # Set up the tree to generate most data IDs by querying for them from
        # the database and then projecting to subset dimensions.
        branches_not_in_tree = set(self.branches_by_dimensions.keys())
        self._make_queryable_branch_tree(branches_not_in_tree)
        # Try to find ways to generate other data IDs directly from the
        # queryable branches.
        self._make_queryable_overlap_branch_generators(branches_not_in_tree)
        # As long as there are still branches that haven't been inserted into
        # the tree, try to add them as projections of generated branches or
        # generators on generated branches.
        while branches_not_in_tree:
            # Look for projections first, since those are more efficient, and
            # some may be available after we've added some generators.
            # We intentionally add the same branch as a projection of multiple
            # parents since (unlike queryable dimensions) there's no guarantee
            # that each parent branch's data IDs would project to the same set
            # (e.g. a visit-healpix overlap may yield different healpixels than
            # a patch-healpix overlap, even if the visits and patches overlap).
            for target_dimensions in sorted(branches_not_in_tree):
                for generator in self.generators:
                    if self._maybe_insert_projection_branch(
                        target_dimensions, generator.dimensions, generator.branch.branches
                    ):
                        branches_not_in_tree.discard(target_dimensions)
            if not self._make_general_overlap_branch_generator(branches_not_in_tree):
                break
        # After we've exhausted overlap generation, try generation via joins
        # of dimensions we can already query for or generate.
        while branches_not_in_tree:
            if not self._make_join_branch_generator(branches_not_in_tree):
                raise QuantumGraphBuilderError(f"Could not generate data IDs for {branches_not_in_tree}.")

800 

    def _set_dataset_constraint(self, requested: DatasetQueryConstraintVariant, log: LsstLogAdapter) -> None:
        """Set the dataset query constraint.

        Parameters
        ----------
        requested : `DatasetQueryConstraintVariant`
            Query constraint specified by the user.
        log : `lsst.utils.logging.LsstLogAdapter`
            Logger that supports ``verbose`` output.
        """
        # Overall inputs are candidate constraints; prerequisite inputs are
        # excluded because they are resolved per-quantum later.
        overall_inputs: dict[str, DatasetTypeNode] = {
            name: node  # type: ignore
            for name, node in self.subgraph.iter_overall_inputs()
            if not node.is_prerequisite  # type: ignore
        }
        match requested:
            case DatasetQueryConstraintVariant.ALL:
                # Default behavior: constrain on every overall input that is
                # flagged as an initial query constraint and has dimensions.
                self.dataset_constraint = {
                    name
                    for name, dataset_type_node in overall_inputs.items()
                    if (dataset_type_node.is_initial_query_constraint and dataset_type_node.dimensions)
                }
            case DatasetQueryConstraintVariant.OFF:
                # No dataset constraint at all; leave the default empty set.
                pass
            case DatasetQueryConstraintVariant.LIST:
                # User supplied an explicit list of dataset type names.
                self.dataset_constraint = set(requested)
                inputs = {
                    name for name, dataset_type_node in overall_inputs.items() if dataset_type_node.dimensions
                }
                if remainder := self.dataset_constraint.difference(inputs):
                    log.verbose(
                        "Ignoring dataset types %s in dataset query constraint that are not inputs to this "
                        "subgraph, on the assumption that they are relevant for a different subgraph.",
                        remainder,
                    )
                self.dataset_constraint.intersection_update(inputs)
                if not self.dataset_constraint:
                    raise QuantumGraphBuilderError(
                        "An explicit dataset query constraint was provided, but it does not include any "
                        f"inputs to the pipeline subset with tasks {list(self.subgraph.tasks.keys())}."
                    )
            case _:
                raise QuantumGraphBuilderError(
                    f"Unable to handle type {requested} given as dataset query constraint."
                )

846 

847 def _make_dimension_record_branches(self) -> None: 

848 """Ensure we have branches for all dimension elements we'll need to 

849 fetch dimension records for. 

850 """ 

851 for element_name in self.all_dimensions.elements: 

852 element = self.all_dimensions.universe[element_name] 

853 record_set = DimensionRecordSet(element_name, universe=self.all_dimensions.universe) 

854 if element.minimal_group in self.branches_by_dimensions: 

855 self.branches_by_dimensions[element.minimal_group].dimension_records.append(record_set) 

856 else: 

857 self.branches_by_dimensions[element.minimal_group] = _DimensionGroupBranch( 

858 dimension_records=[record_set] 

859 ) 

860 

861 def _make_edge_branches(self) -> None: 

862 """Ensure we have branches for all edges in the graph.""" 

863 

864 def update_edge_branch( 

865 task_node: TaskNode, dataset_type_node: DatasetTypeNode 

866 ) -> _DimensionGroupBranch: 

867 union_dimensions = task_node.dimensions.union(dataset_type_node.dimensions) 

868 if (branch := self.branches_by_dimensions.get(union_dimensions)) is None: 

869 branch = _DimensionGroupBranch() 

870 self.branches_by_dimensions[union_dimensions] = branch 

871 branch.twigs[dataset_type_node.dimensions].parent_edge_dataset_types.add(dataset_type_node.name) 

872 branch.twigs[task_node.dimensions].parent_edge_tasks.add(task_node.label) 

873 return branch 

874 

875 for task_node in self.subgraph.tasks.values(): 

876 for dataset_type_node in self.subgraph.inputs_of(task_node.label).values(): 

877 assert dataset_type_node is not None, "Pipeline graph is resolved." 

878 if dataset_type_node.is_prerequisite: 

879 continue 

880 branch = update_edge_branch(task_node, dataset_type_node) 

881 branch.input_edges.append((dataset_type_node.name, task_node.label)) 

882 for dataset_type_node in self.subgraph.outputs_of(task_node.label).values(): 

883 assert dataset_type_node is not None, "Pipeline graph is resolved." 

884 branch = update_edge_branch(task_node, dataset_type_node) 

885 branch.output_edges.append((task_node.label, dataset_type_node.name)) 

886 

887 def _make_queryable_branch_tree(self, branches_not_in_tree: set[DimensionGroup]) -> None: 

888 """Assemble the branches with queryable dimensions into a tree, in 

889 which each branch has a subset of the dimensions of its parent. 

890 

891 Parameters 

892 ---------- 

893 branches_not_in_tree : `set` [ `lsst.daf.butler.DimensionGroup` ] 

894 Dimensions that have not yet been inserted into the tree. Updated 

895 in place. 

896 """ 

897 for target_dimensions in sorted(branches_not_in_tree): 

898 if target_dimensions.issubset(self.queryable_dimensions): 

899 if self._maybe_insert_projection_branch( 

900 target_dimensions, self.queryable_dimensions, self.queryable_branches 

901 ): 

902 branches_not_in_tree.remove(target_dimensions) 

903 else: 

904 raise AssertionError( 

905 "Projection-branch insertion should not fail for queryable dimensions." 

906 ) 

907 

    def _maybe_insert_projection_branch(
        self,
        target_dimensions: DimensionGroup,
        candidate_dimensions: DimensionGroup,
        candidate_projection_branches: dict[DimensionGroup, _DimensionGroupBranch],
    ) -> bool:
        """Insert a branch at the appropriate location in a [sub]tree.

        Branches are inserted below the first parent branch whose dimensions
        are a superset of their own.

        Parameters
        ----------
        target_dimensions : `lsst.daf.butler.DimensionGroup`
            Dimensions of the branch to be inserted.
        candidate_dimensions : `lsst.daf.butler.DimensionGroup`
            Dimensions of the subtree the branch might be inserted under.  If
            this is not a superset of ``target_dimensions``, this method
            returns `False` and nothing is done.
        candidate_projection_branches : `dict` [ \
                `lsst.daf.butler.DimensionGroup`, `_DimensionGroupBranch` ]
            Subtree branches to be updated directly or indirectly (i.e. in a
            nested branch).

        Returns
        -------
        inserted : `bool`
            Whether the branch was actually inserted.
        """
        if candidate_dimensions >= target_dimensions:
            target_branch = self.branches_by_dimensions[target_dimensions]
            # First re-parent any existing children that fit under the new
            # branch.  Iterate over a snapshot of the keys because we delete
            # entries from the dict as we go.
            for child_dimensions in list(candidate_projection_branches.keys()):
                if self._maybe_insert_projection_branch(
                    child_dimensions, target_dimensions, target_branch.branches
                ):
                    del candidate_projection_branches[child_dimensions]
            # Next see whether the new branch itself fits under one of the
            # remaining children; if so it is inserted recursively there.
            for child_dimensions, child_branch in candidate_projection_branches.items():
                if self._maybe_insert_projection_branch(
                    target_dimensions, child_dimensions, child_branch.branches
                ):
                    return True
            # Otherwise it becomes a direct child at this level.
            candidate_projection_branches[target_dimensions] = target_branch
            return True
        return False

952 

    def _make_queryable_overlap_branch_generators(self, branches_not_in_tree: set[DimensionGroup]) -> None:
        """Add data ID generators for sets of dimensions that can only
        partially queried for, with the rest needing to be generated by
        manipulating the data IDs of the queryable subset.

        Parameters
        ----------
        branches_not_in_tree : `set` [ `lsst.daf.butler.DimensionGroup` ]
            Dimensions that have not yet been inserted into the tree.  Updated
            in place.
        """
        # Iterate over a sorted snapshot; _append_data_id_generator mutates
        # branches_not_in_tree as targets are satisfied.
        for target_dimensions in sorted(branches_not_in_tree):
            queryable_subset_dimensions = target_dimensions.intersection(self.queryable_dimensions)
            # Make sure we actually have a branch to capture the queryable
            # subset data IDs (i.e. in case we didn't already have one for some
            # dataset type or task, etc).
            if queryable_subset_dimensions not in self.branches_by_dimensions:
                # If we have to make a new queryable branch, we also have to
                # insert it into the tree so its data IDs get populated.
                self.branches_by_dimensions[queryable_subset_dimensions] = _DimensionGroupBranch()
                if not self._maybe_insert_projection_branch(
                    queryable_subset_dimensions,
                    self.queryable_dimensions,
                    self.queryable_branches,
                ):
                    raise AssertionError(
                        "Projection-branch insertion should not fail for queryable dimensions."
                    )
            if queryable_region_name := queryable_subset_dimensions.region_dimension:
                # If there is a single well-defined region for the queryable
                # subset, we can potentially generate skypix IDs from it.
                # Do the target dimensions just add a single skypix dimension
                # to the queryable subset?
                remainder_dimensions = target_dimensions - queryable_subset_dimensions
                if (remainder_skypix := get_single_skypix(remainder_dimensions)) is not None:
                    queryable_region_element = target_dimensions.universe[queryable_region_name]
                    self._append_data_id_generator(
                        queryable_subset_dimensions,
                        queryable_region_element,
                        target_dimensions,
                        remainder_skypix,
                        branches_not_in_tree,
                    )

996 

    def _append_data_id_generator(
        self,
        source_dimensions: DimensionGroup,
        source_region_element: DimensionElement,
        target_dimensions: DimensionGroup,
        remainder_skypix: SkyPixDimension,
        branches_not_in_tree: set[DimensionGroup],
    ) -> None:
        """Append an appropriate `DataIdGenerator` instance for generating
        data IDs with the given characteristics.

        Parameters
        ----------
        source_dimensions : `lsst.daf.butler.DimensionGroup`
            Dimensions whose data IDs can already populated, to use as a
            starting point.
        source_region_element : `lsst.daf.butler.DimensionElement`
            Dimension element associated with the region for the source
            dimensions.  It is guaranteed that there is exactly one such
            region.
        target_dimensions : `lsst.daf.butler.DimensionGroup`
            Dimensions of the data IDs to be generated.
        remainder_skypix : `lsst.daf.butler.SkyPixDimension`
            The single skypix dimension that is being added to
            ``source_dimensions`` to yield ``target_dimensions``.
        branches_not_in_tree : `set` [ `lsst.daf.butler.DimensionGroup` ]
            Dimensions that have not yet been inserted into the tree.  Updated
            in place.
        """
        target_branch = self.branches_by_dimensions[target_dimensions]
        # We want to do the overlap calculation without any extra dimensions
        # beyond the two spatial dimensions, which may or may not be what we
        # already have.
        overlap_dimensions = source_region_element.minimal_group | remainder_skypix.minimal_group
        generator: DataIdGenerator
        if overlap_dimensions == target_dimensions:
            if isinstance(source_region_element, SkyPixDimension):
                if source_region_element.system == remainder_skypix.system:
                    # Same skypix system: pure index arithmetic suffices.
                    if source_region_element.level > remainder_skypix.level:
                        generator = SkyPixGatherDataIdGenerator(
                            target_branch,
                            target_dimensions,
                            source_dimensions,
                            remainder_skypix,
                            source_region_element,
                        )
                    else:
                        generator = SkyPixScatterDataIdGenerator(
                            target_branch,
                            target_dimensions,
                            source_dimensions,
                            remainder_skypix,
                            source_region_element,
                        )
                else:
                    # Different skypix systems: use region envelopes.
                    generator = CrossSystemDataIdGenerator(
                        target_branch,
                        target_dimensions,
                        source_dimensions,
                        remainder_skypix,
                        source_region_element,
                    )
            else:
                # Source regions come from the database (e.g. visit, patch).
                generator = DatabaseSourceDataIdGenerator(
                    target_branch,
                    target_dimensions,
                    source_dimensions,
                    remainder_skypix,
                    source_region_element,
                )
            # We know we can populate the data IDs in remainder_skypix_branch
            # from the target branch by projection.  Even if it's already
            # populated by some other generated branch, we want to populate it
            # again in case that picks up additional sky pixels.
            target_branch.branches[remainder_skypix.minimal_group] = self.branches_by_dimensions[
                remainder_skypix.minimal_group
            ]
            branches_not_in_tree.discard(remainder_skypix.minimal_group)
        else:
            # Generate the minimal overlap data IDs first (recursively), then
            # join them back to the full source dimensions.
            if overlap_dimensions not in self.branches_by_dimensions:
                self.branches_by_dimensions[overlap_dimensions] = _DimensionGroupBranch()
                branches_not_in_tree.add(overlap_dimensions)
                self._append_data_id_generator(
                    source_region_element.minimal_group,
                    source_region_element,
                    overlap_dimensions,
                    remainder_skypix,
                    branches_not_in_tree,
                )
            generator = JoinDataIdGenerator(
                target_branch,
                target_dimensions,
                source_dimensions,
                overlap_dimensions,
            )
        self.generators.append(generator)
        branches_not_in_tree.remove(target_dimensions)

1094 

1095 def _make_general_overlap_branch_generator(self, branches_not_in_tree: set[DimensionGroup]) -> bool: 

1096 """Add data ID generators for sets of dimensions that can be generated 

1097 via skypix envelopes of other generated data IDs. 

1098 

1099 This method should be called in a loop until it returns `False` 

1100 (indicating no progress was made) or ``branches_not_in_tree`` is empty 

1101 (indicating no more work to be done). 

1102 

1103 Parameters 

1104 ---------- 

1105 branches_not_in_tree : `set` [ `lsst.daf.butler.DimensionGroup` ] 

1106 Dimensions that have not yet been inserted into the tree. Updated 

1107 in place. 

1108 

1109 Returns 

1110 ------- 

1111 appended : `bool` 

1112 Whether a new data ID generator was successfully appended. 

1113 """ 

1114 dimensions_done = sorted(self.branches_by_dimensions.keys() - branches_not_in_tree) 

1115 for source_dimensions in dimensions_done: 

1116 for target_dimensions in sorted(branches_not_in_tree): 

1117 if not source_dimensions <= target_dimensions: 

1118 continue 

1119 remainder_dimensions = target_dimensions - source_dimensions 

1120 if (remainder_skypix := get_single_skypix(remainder_dimensions)) is not None: 

1121 if source_region_name := source_dimensions.region_dimension: 

1122 # If the target dimensions are just adding a single 

1123 # skypix to the source dimensions and the source 

1124 # dimensions have a single region column, we can 

1125 # generate the skypix indices from the envelopes of 

1126 # those regions. 

1127 source_region_element = source_dimensions.universe[source_region_name] 

1128 self._append_data_id_generator( 

1129 source_dimensions, 

1130 source_region_element, 

1131 target_dimensions, 

1132 remainder_skypix, 

1133 branches_not_in_tree, 

1134 ) 

1135 return True 

1136 return not branches_not_in_tree 

1137 

1138 def _make_join_branch_generator(self, branches_not_in_tree: set[DimensionGroup]) -> bool: 

1139 """Add data ID generators for sets of dimensions that can be generated 

1140 via inner joints of other generated data IDs. 

1141 

1142 This method should be called in a loop until it returns `False` 

1143 (indicating no progress was made) or ``branches_not_in_tree`` is empty 

1144 (indicating no more work to be done). 

1145 

1146 Parameters 

1147 ---------- 

1148 branches_not_in_tree : `set` [ `lsst.daf.butler.DimensionGroup` ] 

1149 Dimensions that have not yet been inserted into the tree. Updated 

1150 in place. 

1151 

1152 Returns 

1153 ------- 

1154 appended : `bool` 

1155 Whether a new data ID generator was successfully appended. 

1156 """ 

1157 for target_dimensions in sorted(branches_not_in_tree): 

1158 dimensions_done = sorted(self.branches_by_dimensions.keys() - branches_not_in_tree) 

1159 candidates_by_common: dict[DimensionGroup, tuple[DimensionGroup, DimensionGroup]] = {} 

1160 for operand1, operand2 in itertools.combinations(dimensions_done, 2): 

1161 if operand1.union(operand2) == target_dimensions: 

1162 candidates_by_common[operand1.intersection(operand2)] = (operand1, operand2) 

1163 if candidates_by_common: 

1164 # Because DimensionGroup defines a set-like inequality 

1165 # operator, 'max' returns the set of dimensions that contains 

1166 # as many of the other sets of dimensions as possible, which is 

1167 # a reasonable guess at the most-constrained join. 

1168 operand1, operand2 = candidates_by_common[max(candidates_by_common)] 

1169 generator = JoinDataIdGenerator( 

1170 self.branches_by_dimensions[target_dimensions], 

1171 target_dimensions, 

1172 operand1, 

1173 operand2, 

1174 ) 

1175 self.generators.append(generator) 

1176 branches_not_in_tree.remove(target_dimensions) 

1177 return True 

1178 return not branches_not_in_tree 

1179 

1180 def project_data_ids(self, log: LsstLogAdapter) -> None: 

1181 """Recursively populate the data ID sets of the dimension group tree 

1182 from the data ID sets of the queryable branches. 

1183 

1184 Parameters 

1185 ---------- 

1186 log : `lsst.logging.LsstLogAdapter` 

1187 Logger to use for status reporting. 

1188 """ 

1189 for branch_dimensions, branch in self.queryable_branches.items(): 

1190 log.verbose("Projecting query data ID(s) to %s.", branch_dimensions) 

1191 branch.project_data_ids(log) 

1192 

1193 def generate_data_ids(self, log: LsstLogAdapter) -> None: 

1194 """Run all data ID generators. 

1195 

1196 This runs data ID generators and projects data IDs to their subset 

1197 dimensions. It can only be called after queryable data IDs have been 

1198 populated and dimension records fetched. 

1199 

1200 Parameters 

1201 ---------- 

1202 log : `lsst.logging.LsstLogAdapter` 

1203 Logger to use for status reporting. 

1204 """ 

1205 for generator in self.generators: 

1206 generator.run(log, self.branches_by_dimensions) 

1207 generator.branch.project_data_ids(log, log_indent=" ") 

1208 

1209 def pprint(self, printer: Callable[[str], None] = print) -> None: 

1210 """Print a human-readable representation of the dimensions tree. 

1211 

1212 Parameters 

1213 ---------- 

1214 printer : `~collections.abc.Callable`, optional 

1215 A function that takes a single string argument and prints a single 

1216 line (including a newline). Default is the built-in `print` 

1217 function. 

1218 """ 

1219 printer("Queryable:") 

1220 for branch_dimensions, branch in self.queryable_branches.items(): 

1221 branch.pprint(branch_dimensions, " ", printer=printer) 

1222 printer("Generator:") 

1223 for generator in self.generators: 

1224 generator.pprint(" ", printer=printer) 

1225 

1226 

def get_single_skypix(dimensions: DimensionGroup) -> SkyPixDimension | None:
    """Try to coerce a dimension group to a single skypix dimension.

    Parameters
    ----------
    dimensions : `lsst.daf.butler.DimensionGroup`
        Input dimensions.

    Returns
    -------
    skypix : `lsst.daf.butler.SkyPixDimension` or `None`
        A skypix dimension that is the only dimension in the given group, or
        `None` in all other cases.
    """
    if len(dimensions) != 1:
        return None
    (only_name,) = dimensions.names
    # Returns None when the lone dimension is not a skypix dimension.
    return dimensions.universe.skypix_dimensions.get(only_name)

1245 

1246 

@dataclasses.dataclass
class DataIdGenerator:
    """Base class for generators of quantum and dataset data IDs that cannot
    be obtained directly from a butler query.
    """

    branch: _DimensionGroupBranch
    """Branch of the dimensions tree that this generator populates."""

    dimensions: DimensionGroup
    """Dimensions of the data IDs generated."""

    source: DimensionGroup
    """Dimensions of another set of data IDs that this generator uses as a
    starting point.
    """

    def pprint(self, indent: str = " ", printer: Callable[[str], None] = print) -> None:
        """Print a human-readable representation of this generator.

        Parameters
        ----------
        indent : `str`
            Blank spaces to prefix the output with (useful when this is nested
            in hierarchical object being printed).
        printer : `~collections.abc.Callable`, optional
            A function that takes a single string argument and prints a single
            line (including a newline).  Default is the built-in `print`
            function.
        """
        suffix = f" <- {self.source} ({self.__class__.__name__})"
        self.branch.pprint(self.dimensions, indent, suffix, printer=printer)

    def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None:
        """Run the generator, populating its branch's data IDs.

        Parameters
        ----------
        log : `lsst.log.LsstLogAdapter`
            Logger with a ``verbose`` method as well as the built-in ones.
        branches : `~collections.abc.Mapping`
            Mapping of other dimension branches, keyed by their dimensions.
        """
        raise NotImplementedError()

1295 

1296 

1297@dataclasses.dataclass 

1298class DatabaseSourceDataIdGenerator(DataIdGenerator): 

1299 """A data ID generator that generates skypix indices from the envelope of 

1300 regions stored in the database. 

1301 """ 

1302 

1303 remainder_skypix: SkyPixDimension 

1304 """A single additional skypix dimension to be added to the source 

1305 dimensions. 

1306 """ 

1307 

1308 source_element: DimensionElement 

1309 """Dimension element that the database-stored regions are associated with. 

1310 """ 

1311 

1312 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None: 

1313 # Docstring inherited. 

1314 source_branch = branches[self.source] 

1315 log.verbose( 

1316 "Generating %s data IDs via %s envelope of %s %s region(s).", 

1317 self.dimensions, 

1318 self.remainder_skypix, 

1319 len(source_branch.data_ids), 

1320 self.source_element, 

1321 ) 

1322 pixelization = self.remainder_skypix.pixelization 

1323 (source_records,) = [ 

1324 record_set 

1325 for record_set in source_branch.dimension_records 

1326 if record_set.element == self.source_element 

1327 ] 

1328 for source_data_id in source_branch.data_ids: 

1329 source_record = source_records.find(source_data_id) 

1330 for begin, end in pixelization.envelope(source_record.region): 

1331 for index in range(begin, end): 

1332 target_data_id = DataCoordinate.standardize( 

1333 source_data_id, 

1334 **{self.remainder_skypix.name: index}, # type: ignore[arg-type] 

1335 ) 

1336 self.branch.data_ids.add(target_data_id) 

1337 

1338 

1339@dataclasses.dataclass 

1340class CrossSystemDataIdGenerator(DataIdGenerator): 

1341 """A data ID generator that generates skypix indices from the envelope of 

1342 skypix regions from some other system (e.g. healpix from HTM). 

1343 """ 

1344 

1345 remainder_skypix: SkyPixDimension 

1346 """A single additional skypix dimension to be added to the source 

1347 dimensions. 

1348 """ 

1349 

1350 source_skypix: SkyPixDimension 

1351 """Dimension element for the already-known skypix indices.""" 

1352 

1353 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None: 

1354 # Docstring inherited. 

1355 source_branch = branches[self.source] 

1356 log.verbose( 

1357 "Generating %s data IDs via %s envelope of %s %s region(s).", 

1358 self.dimensions, 

1359 self.remainder_skypix, 

1360 len(source_branch.data_ids), 

1361 self.source_skypix, 

1362 ) 

1363 source_pixelization = self.source_skypix.pixelization 

1364 remainder_pixelization = self.remainder_skypix.pixelization 

1365 for source_data_id in source_branch.data_ids: 

1366 source_region = source_pixelization.pixel(source_data_id[self.source_skypix.name]) 

1367 for begin, end in remainder_pixelization.envelope(source_region): 

1368 for index in range(begin, end): 

1369 target_data_id = DataCoordinate.standardize( 

1370 source_data_id, 

1371 **{self.remainder_skypix.name: index}, # type: ignore[arg-type] 

1372 ) 

1373 self.branch.data_ids.add(target_data_id) 

1374 

1375 

1376@dataclasses.dataclass 

1377class SkyPixScatterDataIdGenerator(DataIdGenerator): 

1378 """A data ID generator that generates skypix indices at a high (fine) level 

1379 from low-level (coarse) indices in the same system. 

1380 """ 

1381 

1382 remainder_skypix: SkyPixDimension 

1383 """A single additional skypix dimension to be added to the source 

1384 dimensions. 

1385 """ 

1386 

1387 source_skypix: SkyPixDimension 

1388 """Dimension element for the already-known skypix indices.""" 

1389 

1390 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None: 

1391 # Docstring inherited. 

1392 factor = 4 ** (self.remainder_skypix.level - self.source_skypix.level) 

1393 source_branch = branches[self.source] 

1394 log.verbose( 

1395 "Generating %s data IDs by scaling %s %s IDs in %s by %s.", 

1396 self.dimensions, 

1397 len(source_branch.data_ids), 

1398 self.remainder_skypix, 

1399 self.source, 

1400 factor, 

1401 ) 

1402 for source_data_id in source_branch.data_ids: 

1403 ranges = RangeSet(source_data_id[self.source_skypix.name]) 

1404 ranges.scale(factor) 

1405 for begin, end in ranges: 

1406 for index in range(begin, end): 

1407 target_data_id = DataCoordinate.standardize( 

1408 source_data_id, 

1409 **{self.remainder_skypix.name: index}, # type: ignore[arg-type] 

1410 ) 

1411 self.branch.data_ids.add(target_data_id) 

1412 

1413 

1414@dataclasses.dataclass 

1415class SkyPixGatherDataIdGenerator(DataIdGenerator): 

1416 """A data ID generator that generates skypix indices at a low (coarse) 

1417 level from high-level (fine) indices in the same system. 

1418 """ 

1419 

1420 remainder_skypix: SkyPixDimension 

1421 """A single additional skypix dimension to be added to the source 

1422 dimensions. 

1423 """ 

1424 

1425 source_skypix: SkyPixDimension 

1426 """Dimension element for the already-known skypix indices.""" 

1427 

1428 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None: 

1429 # Docstring inherited. 

1430 factor = 4 ** (self.source_skypix.level - self.remainder_skypix.level) 

1431 source_branch = branches[self.source] 

1432 log.verbose( 

1433 "Generating %s data IDs by dividing %s %s IDs in %s by %s.", 

1434 self.dimensions, 

1435 len(source_branch.data_ids), 

1436 self.remainder_skypix, 

1437 self.source, 

1438 factor, 

1439 ) 

1440 for source_data_id in source_branch.data_ids: 

1441 index = source_data_id[self.source_skypix.name] // factor 

1442 target_data_id = DataCoordinate.standardize(source_data_id, **{self.remainder_skypix.name: index}) 

1443 self.branch.data_ids.add(target_data_id) 

1444 

1445 

1446@dataclasses.dataclass 

1447class JoinDataIdGenerator(DataIdGenerator): 

1448 """A data ID that does an inner join between two already-populated 

1449 sets of data IDs. 

1450 """ 

1451 

1452 other: DimensionGroup 

1453 """Dimensions of the other data ID branches to join to those of ``source``. 

1454 """ 

1455 

1456 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None: 

1457 # Docstring inherited. 

1458 source_branch = branches[self.source] 

1459 other_branch = branches[self.other] 

1460 log.verbose( 

1461 "Generating %s data IDs by joining %s (%s) to %s (%s).", 

1462 self.dimensions, 

1463 self.source, 

1464 len(source_branch.data_ids), 

1465 self.other, 

1466 len(other_branch.data_ids), 

1467 ) 

1468 common = self.source & self.other 

1469 other_by_common: defaultdict[DataCoordinate, list[DataCoordinate]] = defaultdict(list) 

1470 for other_data_id in other_branch.data_ids: 

1471 other_by_common[other_data_id.subset(common)].append(other_data_id) 

1472 source_by_common: defaultdict[DataCoordinate, list[DataCoordinate]] = defaultdict(list) 

1473 for source_data_id in source_branch.data_ids: 

1474 source_by_common[source_data_id.subset(common)].append(source_data_id) 

1475 for common_data_id in other_by_common.keys() & source_by_common.keys(): 

1476 for other_data_id in other_by_common[common_data_id]: 

1477 for source_data_id in source_by_common[common_data_id]: 

1478 self.branch.data_ids.add(other_data_id.union(source_data_id))