Coverage for python / lsst / pipe / base / all_dimensions_quantum_graph_builder.py: 16%
539 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""The standard, general-purpose implementation of the QuantumGraph-generation
29algorithm.
30"""
32from __future__ import annotations
34__all__ = ("AllDimensionsQuantumGraphBuilder", "DatasetQueryConstraintVariant")
36import dataclasses
37import itertools
38from collections import defaultdict
39from collections.abc import Callable, Iterable, Mapping
40from typing import TYPE_CHECKING, Any, final
42import astropy.table
44from lsst.daf.butler import (
45 Butler,
46 DataCoordinate,
47 DimensionElement,
48 DimensionGroup,
49 DimensionRecordSet,
50 MissingDatasetTypeError,
51 SkyPixDimension,
52)
53from lsst.sphgeom import RangeSet
54from lsst.utils.logging import LsstLogAdapter, PeriodicLogger
55from lsst.utils.timer import timeMethod
57from ._datasetQueryConstraints import DatasetQueryConstraintVariant
58from .quantum_graph_builder import QuantumGraphBuilder, QuantumGraphBuilderError
59from .quantum_graph_skeleton import DatasetKey, PrerequisiteDatasetKey, QuantumGraphSkeleton, QuantumKey
61if TYPE_CHECKING:
62 from .pipeline_graph import DatasetTypeNode, PipelineGraph, TaskNode
65@final
66class AllDimensionsQuantumGraphBuilder(QuantumGraphBuilder):
67 """An implementation of `.quantum_graph_builder.QuantumGraphBuilder` that
68 uses a single large query for data IDs covering all dimensions in the
69 pipeline.
71 Parameters
72 ----------
73 pipeline_graph : `.pipeline_graph.PipelineGraph`
74 Pipeline to build a `.QuantumGraph` from, as a graph. Will be resolved
75 in-place with the given butler (any existing resolution is ignored).
76 butler : `lsst.daf.butler.Butler`
77 Client for the data repository. Should be read-only.
78 where : `str`, optional
79 Butler expression language constraint to apply to all data IDs.
80 dataset_query_constraint : `DatasetQueryConstraintVariant`, optional
81 Specification of which overall-input datasets should be used to
82 constrain the initial data ID queries. Not including an important
83 constraint can result in catastrophically large query results that take
84 too long to process, while including too many makes the query much more
85 complex, increasing the chances that the database will choose a bad
86 (sometimes catastrophically bad) query plan.
87 bind : `~collections.abc.Mapping`, optional
88 Variable substitutions for the ``where`` expression.
89 data_id_tables : `~collections.abc.Iterable` [ `astropy.table.Table` ],\
90 optional
91 Tables of data IDs to join in as constraints. Missing dimensions that
92 are constrained by the ``where`` argument or pipeline data ID will be
93 filled in automatically.
94 **kwargs
95 Additional keyword arguments forwarded to
96 `.quantum_graph_builder.QuantumGraphBuilder`.
98 Notes
99 -----
100 This is a general-purpose algorithm that delegates the problem of
101 determining which "end" of the pipeline is more constrained (beginning by
102 input collection contents vs. end by the ``where`` string) to the database
103 query planner, which *usually* does a good job.
105 This algorithm suffers from a serious limitation, which we refer to as the
106 "tract slicing" problem from its most common variant: the ``where`` string
107 and general data ID intersection rules apply to *all* data IDs in the
108 graph. For example, if a ``tract`` constraint is present in the ``where``
109 string or an overall-input dataset, then it is impossible for any data ID
110 that does not overlap that tract to be present anywhere in the pipeline,
111 such as a ``{visit, detector}`` combination where the ``visit`` overlaps
112 the ``tract`` even if the ``detector`` does not.
113 """
115 def __init__(
116 self,
117 pipeline_graph: PipelineGraph,
118 butler: Butler,
119 *,
120 where: str = "",
121 dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
122 bind: Mapping[str, Any] | None = None,
123 data_id_tables: Iterable[astropy.table.Table] = (),
124 **kwargs: Any,
125 ):
126 super().__init__(pipeline_graph, butler, **kwargs)
127 assert where is not None, "'where' should be an empty str, not None"
128 self.where = where
129 self.dataset_query_constraint = dataset_query_constraint
130 self.bind = bind
131 self.data_id_tables = list(data_id_tables)
133 @timeMethod
134 def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
135 # Docstring inherited.
136 # There is some chance that the dimension query for one subgraph would
137 # be the same as or a dimension-subset of another. This is an
138 # optimization opportunity we're not currently taking advantage of.
139 tree = _DimensionGroupTree(subgraph)
140 tree.build(self.dataset_query_constraint, self.data_id_tables, log=self.log)
141 tree.pprint(printer=self.log.debug)
142 self._query_for_data_ids(tree)
143 dimension_records = self._fetch_most_dimension_records(tree)
144 tree.generate_data_ids(self.log)
145 skeleton: QuantumGraphSkeleton = self._make_subgraph_skeleton(tree)
146 if not skeleton.has_any_quanta:
147 # QG is going to be empty; exit early not just for efficiency, but
148 # also so downstream code doesn't have to guard against this case.
149 return skeleton
150 self._find_followup_datasets(tree, skeleton)
151 all_data_id_dimensions = subgraph.get_all_dimensions()
152 skeleton.attach_dimension_records(self.butler, all_data_id_dimensions, dimension_records)
153 return skeleton
155 def _query_for_data_ids(self, tree: _DimensionGroupTree) -> None:
156 """Query for data IDs and use the result to populate the dimension
157 group tree.
159 Parameters
160 ----------
161 tree : `_DimensionGroupTree`
162 Tree with dimension group branches that holds subgraph-specific
163 state for this builder, to be modified in place.
164 """
165 query_cmd: list[str] = []
166 with self.butler.query() as query:
167 query_cmd.append("with butler.query() as query:")
168 query_cmd.append(f" query = query.join_dimensions({list(tree.queryable_dimensions.names)})")
169 query = query.join_dimensions(tree.queryable_dimensions)
170 if tree.dataset_constraint:
171 query_cmd.append(f" collections = {list(self.input_collections)}")
172 for dataset_type_name in tree.dataset_constraint:
173 query_cmd.append(f" query = query.join_dataset_search({dataset_type_name!r}, collections)")
174 try:
175 query = query.join_dataset_search(dataset_type_name, self.input_collections)
176 except MissingDatasetTypeError:
177 raise QuantumGraphBuilderError(
178 f"No datasets for overall-input {dataset_type_name!r} found (the dataset type is "
179 "not even registered). This is probably a bug in either the pipeline definition or "
180 "the dataset constraints passed to the quantum graph builder."
181 ) from None
182 query_cmd.append(
183 f" query = query.where({dict(tree.subgraph.data_id.mapping)}, "
184 f"{self.where!r}, bind={self.bind!r})"
185 )
186 query = query.where(tree.subgraph.data_id, self.where, bind=self.bind)
187 # It's important for tables to be joined in last, so data IDs from
188 # pipeline and where can be used to fill in missing columns.
189 for table in self.data_id_tables:
190 # If this is from ctrl_mpexec's pipetask, it'll have added
191 # a filename to the metadata for us.
192 table_name = table.meta.get("filename", "unknown")
193 query_cmd.append(f" query = query.join_data_coordinate_table(<{table_name}>)")
194 query = query.join_data_coordinate_table(table)
195 self.log.verbose("Querying for data IDs via: %s", "\n".join(query_cmd))
196 # Allow duplicates from common skypix overlaps to make some queries
197 # run faster.
198 query._allow_duplicate_overlaps = True
199 # Iterate over query results, populating data IDs for datasets,
200 # quanta, and edges. We populate only the first level of the tree
201 # in the first pass, so we can be done with the query results as
202 # quickly as possible in case that holds a connection/cursor open.
203 n_rows = 0
204 progress_logger: PeriodicLogger | None = None
205 for common_data_id in query.data_ids(tree.queryable_dimensions):
206 if progress_logger is None:
207 # There can be a long wait between submitting the query and
208 # returning the first row, so we want to make sure we log
209 # when we get it; note that PeriodicLogger is not going to
210 # do that for us, as it waits for its interval _after_ the
211 # first log is seen.
212 self.log.info("Iterating over data ID query results.")
213 progress_logger = PeriodicLogger(self.log)
214 for branch_dimensions, branch in tree.queryable_branches.items():
215 data_id = common_data_id.subset(branch_dimensions)
216 branch.data_ids.add(data_id)
217 n_rows += 1
218 progress_logger.log("Iterating over data ID query results: %d rows processed so far.", n_rows)
219 if n_rows == 0:
220 # A single multiline log plays better with log aggregators like
221 # Loki.
222 lines = ["Initial data ID query returned no rows, so QuantumGraph will be empty."]
223 try:
224 lines.extend(query.explain_no_results())
225 finally:
226 lines.append("To reproduce this query for debugging purposes, run:")
227 lines.append("")
228 lines.extend(query_cmd)
229 lines.append(" print(query.any())")
230 lines.append("")
231 lines.append("And then try removing various constraints until query.any() returns True.")
232 # If an exception was raised, write a partial.
233 self.log.error("\n".join(lines))
234 return
235 self.log.verbose("Done iterating over query results: %d rows processed in total.", n_rows)
236 # We now recursively populate the data IDs of the rest of the tree.
237 tree.project_data_ids(self.log)
239 @timeMethod
240 def _make_subgraph_skeleton(self, tree: _DimensionGroupTree) -> QuantumGraphSkeleton:
241 """Build a `QuantumGraphSkeleton` by processing the data IDs in the
242 dimension group tree.
244 Parameters
245 ----------
246 tree : `_DimensionGroupTree`
247 Tree with dimension group branches that holds subgraph-specific
248 state for this builder.
250 Returns
251 -------
252 skeleton : `QuantumGraphSkeleton`
253 Preliminary quantum graph.
254 """
255 skeleton = QuantumGraphSkeleton(tree.subgraph.tasks)
256 for branch_dimensions, branch in tree.branches_by_dimensions.items():
257 self.log.verbose(
258 "Adding nodes for %s %s data ID(s).",
259 len(branch.data_ids),
260 branch_dimensions,
261 )
262 branch.update_skeleton_nodes(skeleton)
263 for branch_dimensions, branch in tree.branches_by_dimensions.items():
264 self.log.verbose(
265 "Adding edges for %s %s data ID(s).",
266 len(branch.data_ids),
267 branch_dimensions,
268 )
269 branch.update_skeleton_edges(skeleton)
270 n_quanta = sum(len(skeleton.get_quanta(task_label)) for task_label in tree.subgraph.tasks)
271 self.log.info(
272 "Initial bipartite graph has %d quanta, %d dataset nodes, and %d edges.",
273 n_quanta,
274 skeleton.n_nodes - n_quanta,
275 skeleton.n_edges,
276 )
277 return skeleton
279 @timeMethod
280 def _find_followup_datasets(self, tree: _DimensionGroupTree, skeleton: QuantumGraphSkeleton) -> None:
281 """Populate `existing_datasets` by performing follow-up queries with
282 the data IDs in the dimension group tree.
284 Parameters
285 ----------
286 tree : `_DimensionGroupTree`
287 Tree with dimension group branches that holds subgraph-specific
288 state for this builder.
289 skeleton : `.quantum_graph_skeleton.QuantumGraphSkeleton`
290 In-progress quantum graph to modify in place.
291 """
292 dataset_key: DatasetKey | PrerequisiteDatasetKey
293 for dimensions, branch in tree.branches_by_dimensions.items():
294 if not dimensions:
295 for dataset_type_name in branch.dataset_types.keys():
296 dataset_key = DatasetKey(dataset_type_name, self.empty_data_id.required_values)
297 if ref := self.empty_dimensions_datasets.inputs.get(dataset_key):
298 skeleton.set_dataset_ref(ref, dataset_key)
299 if ref := self.empty_dimensions_datasets.outputs_for_skip.get(dataset_key):
300 skeleton.set_output_for_skip(ref)
301 if ref := self.empty_dimensions_datasets.outputs_in_the_way.get(dataset_key):
302 skeleton.set_output_in_the_way(ref)
303 continue
304 if not branch.dataset_types and not branch.tasks:
305 continue
306 if not branch.data_ids:
307 continue
308 # Iterate over regular input/output dataset type nodes with these
309 # dimensions to find those datasets using followup queries.
310 with self.butler.query() as butler_query:
311 butler_query = butler_query.join_data_coordinates(branch.data_ids)
312 for dataset_type_node in branch.dataset_types.values():
313 if tree.subgraph.producer_of(dataset_type_node.name) is None:
314 # Dataset type is an overall input; we always need to
315 # try to find these.
316 count = 0
317 try:
318 for ref in butler_query.datasets(dataset_type_node.name, self.input_collections):
319 skeleton.set_dataset_ref(ref)
320 count += 1
321 except MissingDatasetTypeError:
322 pass
323 self.log.verbose(
324 "Found %d overall-input dataset(s) of type %r.", count, dataset_type_node.name
325 )
326 continue
327 if self.skip_existing_in:
328 # Dataset type is an intermediate or output; need to
329 # find these if only they're from previously executed
330 # quanta that we might skip...
331 count = 0
332 try:
333 for ref in butler_query.datasets(dataset_type_node.name, self.skip_existing_in):
334 skeleton.set_output_for_skip(ref)
335 count += 1
336 if ref.run == self.output_run:
337 skeleton.set_output_in_the_way(ref)
338 except MissingDatasetTypeError:
339 pass
340 self.log.verbose(
341 "Found %d output dataset(s) of type %r in %s.",
342 count,
343 dataset_type_node.name,
344 self.skip_existing_in,
345 )
346 if self.output_run_exists and not self.skip_existing_starts_with_output_run:
347 # ...or if they're in the way and would need to be
348 # clobbered (and we haven't already found them in the
349 # previous block).
350 count = 0
351 try:
352 for ref in butler_query.datasets(dataset_type_node.name, [self.output_run]):
353 skeleton.set_output_in_the_way(ref)
354 count += 1
355 except MissingDatasetTypeError:
356 pass
357 self.log.verbose(
358 "Found %d output dataset(s) of type %r in %s.",
359 count,
360 dataset_type_node.name,
361 self.output_run,
362 )
363 # Iterate over tasks with these dimensions to perform follow-up
364 # queries for prerequisite inputs, which may have dimensions
365 # that were not in ``tree.all_dimensions`` and/or require
366 # temporal joins to calibration validity ranges.
367 for task_node in branch.tasks.values():
368 task_prerequisite_info = self.prerequisite_info[task_node.label]
369 for connection_name, finder in list(task_prerequisite_info.finders.items()):
370 if finder.lookup_function is not None:
371 self.log.verbose(
372 "Deferring prerequisite input %r of task %r to per-quantum processing "
373 "(lookup function provided).",
374 finder.dataset_type_node.name,
375 task_node.label,
376 )
377 continue
378 # We also fall back to the base class if there is a
379 # nontrivial spatial or temporal join in the lookup.
380 if finder.dataset_skypix or finder.dataset_other_spatial:
381 if task_prerequisite_info.bounds.spatial_connections:
382 self.log.verbose(
383 "Deferring prerequisite input %r of task %r to per-quantum processing "
384 "(for spatial-bounds-connections handling).",
385 finder.dataset_type_node.name,
386 task_node.label,
387 )
388 continue
389 if not task_node.dimensions.spatial:
390 self.log.verbose(
391 "Deferring prerequisite input %r of task %r to per-quantum processing "
392 "(dataset has spatial data IDs, but task does not).",
393 finder.dataset_type_node.name,
394 task_node.label,
395 )
396 continue
397 if finder.dataset_has_timespan:
398 if task_prerequisite_info.bounds.spatial_connections:
399 self.log.verbose(
400 "Deferring prerequisite input %r of task %r to per-quantum processing "
401 "(for temporal-bounds-connections handling).",
402 finder.dataset_type_node.name,
403 task_node.label,
404 )
405 continue
406 if not task_node.dimensions.temporal:
407 self.log.verbose(
408 "Deferring prerequisite input %r of task %r to per-quantum processing "
409 "(dataset has temporal data IDs, but task does not).",
410 finder.dataset_type_node.name,
411 task_node.label,
412 )
413 continue
414 # We have a simple case where we can do a single query
415 # that joins the query we already have for the task
416 # data IDs to the datasets we're looking for.
417 count = 0
418 try:
419 query_results = list(
420 butler_query.join_dataset_search(
421 finder.dataset_type_node.dataset_type, self.input_collections
422 )
423 .general(
424 dimensions | finder.dataset_type_node.dataset_type.dimensions,
425 dataset_fields={finder.dataset_type_node.name: ...},
426 find_first=True,
427 )
428 .iter_tuples(finder.dataset_type_node.dataset_type)
429 )
430 except MissingDatasetTypeError:
431 query_results = []
432 for data_id, refs, _ in query_results:
433 ref = refs[0]
434 dataset_key = skeleton.add_prerequisite_node(ref)
435 quantum_key = QuantumKey(
436 task_node.label, data_id.subset(dimensions).required_values
437 )
438 skeleton.add_input_edge(quantum_key, dataset_key)
439 count += 1
440 # Remove this finder from the mapping so the base class
441 # knows it doesn't have to look for these
442 # prerequisites.
443 del task_prerequisite_info.finders[connection_name]
444 self.log.verbose(
445 "Added %d prerequisite input edge(s) from dataset type %r to task %r.",
446 count,
447 finder.dataset_type_node.name,
448 task_node.label,
449 )
450 # Delete data ID sets we don't need anymore to save memory.
451 del branch.data_ids
453 @timeMethod
454 def _fetch_most_dimension_records(self, tree: _DimensionGroupTree) -> list[DimensionRecordSet]:
455 """Query for dimension records for all non-prerequisite data IDs (and
456 possibly some prerequisite data IDs).
458 Parameters
459 ----------
460 tree : `_DimensionGroupTree`
461 Tree with dimension group branches that holds subgraph-specific
462 state for this builder.
464 Returns
465 -------
466 dimension_records : `list` [ `lsst.daf.butler.DimensionRecordSet` ]
467 List of sets of dimension records.
469 Notes
470 -----
471 Because the initial common data ID query is used to generate all
472 quantum and regular input/output dataset data IDs, column subsets of it
473 can also be used to fetch dimension records for those data IDs.
474 """
475 self.log.verbose("Performing follow-up queries for dimension records.")
476 result: list[DimensionRecordSet] = []
477 for branch in tree.branches_by_dimensions.values():
478 if not branch.dimension_records:
479 continue
480 if not branch.data_ids:
481 continue
482 with self.butler.query() as butler_query:
483 butler_query = butler_query.join_data_coordinates(branch.data_ids)
484 for record_set in branch.dimension_records:
485 record_set.update(butler_query.dimension_records(record_set.element.name))
486 result.append(record_set)
487 return result
490@dataclasses.dataclass(eq=False, repr=False, slots=True)
491class _DimensionGroupTwig:
492 """A small side-branch of the tree of dimensions groups that tracks the
493 tasks and dataset types with a particular set of dimensions that appear in
494 the edges populated by its parent branch.
496 See `_DimensionGroupTree` for more details.
497 """
499 parent_edge_tasks: set[str] = dataclasses.field(default_factory=set)
500 """Task labels for tasks whose quanta have the dimensions of this twig and
501 are endpoints of edges that have the combined dimensions of this twig's
502 parent branch.
503 """
505 parent_edge_dataset_types: set[str] = dataclasses.field(default_factory=set)
506 """Dataset type names for datasets whose quanta have the dimensions of this
507 twig and are endpoints of edges that have the combined dimensions of this
508 twig's parent branch.
509 """
512@dataclasses.dataclass(eq=False, repr=False, slots=True)
513class _DimensionGroupBranch:
514 """A node in the tree of dimension groups that are used to recursively
515 process query data IDs into a quantum graph.
516 """
518 tasks: dict[str, TaskNode] = dataclasses.field(default_factory=dict)
519 """The task nodes whose quanta have these dimensions, keyed by task label.
520 """
522 dataset_types: dict[str, DatasetTypeNode] = dataclasses.field(default_factory=dict)
523 """The dataset type nodes whose datasets have these dimensions, keyed by
524 dataset type name.
525 """
527 dimension_records: list[DimensionRecordSet] = dataclasses.field(default_factory=list)
528 """Sets of dimension records looked up with these dimensions."""
530 data_ids: set[DataCoordinate] = dataclasses.field(default_factory=set)
531 """All data IDs with these dimensions seen in the QuantumGraph."""
533 input_edges: list[tuple[str, str]] = dataclasses.field(default_factory=list)
534 """Dataset type -> task edges that are populated by this set of dimensions.
536 These are cases where `dimensions` is the union of the task and dataset
537 type dimensions.
538 """
540 output_edges: list[tuple[str, str]] = dataclasses.field(default_factory=list)
541 """Task -> dataset type edges that are populated by this set of dimensions.
543 These are cases where `dimensions` is the union of the task and dataset
544 type dimensions.
545 """
547 branches: dict[DimensionGroup, _DimensionGroupBranch] = dataclasses.field(default_factory=dict)
548 """Child branches whose dimensions are strict subsets of this branch's
549 dimensions, populated by projecting this branch's set of data IDs (i.e.
550 remove a dimension, then deduplicate).
551 """
553 twigs: defaultdict[DimensionGroup, _DimensionGroupTwig] = dataclasses.field(
554 default_factory=lambda: defaultdict(_DimensionGroupTwig)
555 )
556 """Small branches for all of the dimensions that appear on one side of any
557 edge in `input_edges` or `output_edges`.
558 """
560 def pprint(
561 self,
562 dimensions: DimensionGroup,
563 indent: str = " ",
564 suffix: str = "",
565 printer: Callable[[str], None] = print,
566 ) -> None:
567 printer(f"{indent}{dimensions}{suffix}")
568 for branch_dimensions, branch in self.branches.items():
569 branch.pprint(branch_dimensions, indent + " ", printer=printer)
571 def project_data_ids(self, log: LsstLogAdapter, log_indent: str = " ") -> None:
572 """Populate the data ID sets of child branches from the data IDs in
573 this branch, recursively.
575 Parameters
576 ----------
577 log : `lsst.utils.logging.LsstLogAdapter`
578 Logger to use for status reporting.
579 log_indent : `str`, optional
580 Indentation to prefix the log message. This is used when recursing
581 to make the branch structure clear.
582 """
583 for data_id in self.data_ids:
584 for branch_dimensions, branch in self.branches.items():
585 branch.data_ids.add(data_id.subset(branch_dimensions))
586 for branch_dimensions, branch in self.branches.items():
587 log.verbose("%sProjecting query data ID(s) to %s.", log_indent, branch_dimensions)
588 branch.project_data_ids(log, log_indent + " ")
590 def update_skeleton_nodes(self, skeleton: QuantumGraphSkeleton) -> None:
591 """Process the data ID sets of this branch and its children recursively
592 to add nodes and edges to the under-construction quantum graph.
594 Parameters
595 ----------
596 skeleton : `QuantumGraphSkeleton`
597 Under-construction quantum graph to modify in place.
598 """
599 for data_id in self.data_ids:
600 for task_label in self.tasks:
601 skeleton.add_quantum_node(task_label, data_id)
602 for dataset_type_name in self.dataset_types:
603 skeleton.add_dataset_node(dataset_type_name, data_id)
605 def update_skeleton_edges(self, skeleton: QuantumGraphSkeleton) -> None:
606 """Process the data ID sets of this branch and its children recursively
607 to add nodes and edges to the under-construction quantum graph.
609 Parameters
610 ----------
611 skeleton : `QuantumGraphSkeleton`
612 Under-construction quantum graph to modify in place.
613 """
614 for data_id in self.data_ids:
615 quantum_keys: dict[str, QuantumKey] = {}
616 dataset_keys: dict[str, DatasetKey] = {}
617 for twig_dimensions, twig in self.twigs.items():
618 twig_data_id = data_id.subset(twig_dimensions)
619 for task_label in twig.parent_edge_tasks:
620 quantum_keys[task_label] = QuantumKey(task_label, twig_data_id.required_values)
621 for dataset_type_name in twig.parent_edge_dataset_types:
622 dataset_keys[dataset_type_name] = DatasetKey(
623 dataset_type_name, twig_data_id.required_values
624 )
625 for dataset_type_name, task_label in self.input_edges:
626 skeleton.add_input_edge(quantum_keys[task_label], dataset_keys[dataset_type_name])
627 for task_label, dataset_type_name in self.output_edges:
628 skeleton.add_output_edge(quantum_keys[task_label], dataset_keys[dataset_type_name])
629 if not self.dataset_types and not self.tasks:
630 # Delete data IDs we don't need anymore to save memory.
631 del self.data_ids
634@dataclasses.dataclass(eq=False, repr=False)
635class _DimensionGroupTree:
636 """A tree of dimension groups in which branches are subsets of their
637 parents.
639 This class holds all of the per-subgraph state for this QG builder
640 subclass.
642 Notes
643 -----
644 The full set of dimensions referenced by any task or dataset type (except
645 prerequisite inputs) forms the conceptual "trunk" of this tree. Each
646 branch has a subset of the dimensions of its parent branch, and each set
647 of dimensions appears exactly once in a tree (so there is some flexibility
648 in where certain dimension subsets may appear; right now this is resolved
649 somewhat arbitrarily).
650 We do not add branches for every possible dimension subset; a branch is
651 created for a `~lsst.daf.butler.DimensionGroup` if:
653 - if there is a task whose quanta have those dimensions;
654 - if there is a non-prerequisite dataset type with those dimensions;
655 - if there is an edge for which the union of the task and dataset type
656 dimensions are those dimensions;
657 - if there is a dimension element in any task or non-prerequisite dataset
658 type dimensions whose `~lsst.daf.butler.DimensionElement.minimal_group`
659 is those dimensions (allowing us to look up dimension records).
661 In addition, for any dimension group that has unqueryable dimensions (e.g.
662 non-common skypix dimensions, like healpix), we create a branch for the
663 subset of the group with only queryable dimensions.
665 We process the initial data query by recursing through this tree structure
666 to populate a data ID set for each branch
667 (`_DimensionGroupBranch.project_data_ids`), and then process those sets.
668 This can be far faster than the non-recursive processing the QG builder
669 used to use because the set of data IDs is smaller (sometimes dramatically
670 smaller) as we move to smaller sets of dimensions.
672 In addition to their child branches, a branch that is used to define graph
673 edges also has "twigs", which are a flatter set of dimension subsets for
674 each of the tasks and dataset types that appear in that branch's edges.
675 The same twig dimensions can appear in multiple branches, and twig
676 dimensions can be the same as their parent branch's (but not a superset).
677 """
679 subgraph: PipelineGraph
680 """Graph of this subset of the pipeline."""
682 all_dimensions: DimensionGroup = dataclasses.field(init=False)
683 """The union of all dimensions that appear in any task or
684 (non-prerequisite) dataset type in this subgraph.
685 """
687 queryable_dimensions: DimensionGroup = dataclasses.field(init=False)
688 """All dimensions except those that cannot be queried for directly via the
689 butler (e.g. skypix systems other than the common one).
690 """
692 branches_by_dimensions: dict[DimensionGroup, _DimensionGroupBranch] = dataclasses.field(init=False)
693 """The tasks and dataset types of this subset of the pipeline, grouped
694 by their dimensions.
695 """
697 dataset_constraint: set[str] = dataclasses.field(default_factory=set)
698 """The names of dataset types used as query constraints."""
700 queryable_branches: dict[DimensionGroup, _DimensionGroupBranch] = dataclasses.field(default_factory=dict)
701 """The top-level branches in the tree of dimension groups populated by the
702 butler query.
704 Data IDs in these branches are populated from the top down, with each
705 branch a projection ("remove dimension, then deduplicate") of its parent,
706 starting with the query result rows.
707 """
709 generators: list[DataIdGenerator] = dataclasses.field(default_factory=list)
710 """Branches for dimensions groups that are populated by algorithmically
711 generating data IDs from those in one or more other branches.
713 These are typically variants on the theme of adding a skypix dimension to
714 another set of dimensions by identifying the sky pixels that overlap the
715 region of the original dimensions.
716 """
718 def __post_init__(self) -> None:
719 universe = self.subgraph.universe
720 assert universe is not None, "Pipeline graph is resolved."
721 self.branches_by_dimensions = {
722 dimensions: _DimensionGroupBranch(tasks, dataset_types)
723 for dimensions, (tasks, dataset_types) in self.subgraph.group_by_dimensions().items()
724 }
725 self.all_dimensions = DimensionGroup.union(*self.branches_by_dimensions.keys(), universe=universe)
727 def build(
728 self,
729 requested: DatasetQueryConstraintVariant,
730 data_id_tables: Iterable[astropy.table.Table],
731 *,
732 log: LsstLogAdapter,
733 ) -> None:
734 """Organize the branches into a tree.
736 Parameters
737 ----------
738 requested : `DatasetQueryConstraintVariant`
739 Query constraint specified by the user.
740 data_id_tables : `~collections.abc.Iterable` [ `astropy.table.Table` ]
741 Data ID tables being joined into the query.
742 log : `lsst.utils.logging.LsstLogAdapter`
743 Logger that supports ``verbose`` output.
744 """
745 universe = self.all_dimensions.universe
746 self._make_dimension_record_branches()
747 self._make_edge_branches()
748 self._set_dataset_constraint(requested, log)
749 # Work out which dimensions we can potentially query the database for.
750 # We start out by dropping all skypix dimensions other than the common
751 # one, and then we add them back in if a constraint dataset type or
752 # data ID table provides them.
753 unqueryable_skypix = universe.conform(self.all_dimensions.skypix - {universe.commonSkyPix.name})
754 self.queryable_dimensions = self.all_dimensions.difference(unqueryable_skypix)
755 for dataset_type_name in sorted(self.dataset_constraint):
756 dataset_type_dimensions = self.subgraph.dataset_types[dataset_type_name].dimensions
757 dataset_type_skypix = dataset_type_dimensions.intersection(unqueryable_skypix)
758 if dataset_type_skypix:
759 log.info(
760 f"Including {dataset_type_skypix} in the set of dimensions to query via "
761 f"{dataset_type_name}. If this query fails, exclude those dataset type "
762 "from the constraint or provide a data ID table for missing spatial joins."
763 )
764 self.queryable_dimensions = self.queryable_dimensions.union(dataset_type_dimensions)
765 for data_id_table in data_id_tables:
766 table_dimensions = universe.conform(data_id_table.colnames)
767 if table_dimensions.skypix:
768 self.queryable_dimensions = self.queryable_dimensions.union(table_dimensions)
769 # Set up the tree to generate most data IDs by querying for them from
770 # the database and then projecting to subset dimensions.
771 branches_not_in_tree = set(self.branches_by_dimensions.keys())
772 self._make_queryable_branch_tree(branches_not_in_tree)
773 # Try to find ways to generate other data IDs directly from the
774 # queryable branches.
775 self._make_queryable_overlap_branch_generators(branches_not_in_tree)
776 # As long as there are still branches that haven't been inserted into
777 # the tree, try to add them as projections of generated branches or
778 # generators on generated branches.
779 while branches_not_in_tree:
780 # Look for projections first, since those are more efficient, and
781 # some may be available after we've added some generators.
782 # We intentionally add the same branch as a projection of multiple
783 # parents since (unlike queryable dimensions) there's no guarantee
784 # that each parent branch's data IDs would project to the same set
785 # (e.g. a visit-healpix overlap may yield different healpixels than
786 # a patch-healpix overlap, even if the visits and patches overlap).
787 for target_dimensions in sorted(branches_not_in_tree):
788 for generator in self.generators:
789 if self._maybe_insert_projection_branch(
790 target_dimensions, generator.dimensions, generator.branch.branches
791 ):
792 branches_not_in_tree.discard(target_dimensions)
793 if not self._make_general_overlap_branch_generator(branches_not_in_tree):
794 break
795 # After we've exhausted overlap generation, try generation via joins
796 # of dimensions we can already query for or generate.
797 while branches_not_in_tree:
798 if not self._make_join_branch_generator(branches_not_in_tree):
799 raise QuantumGraphBuilderError(f"Could not generate data IDs for {branches_not_in_tree}.")
801 def _set_dataset_constraint(self, requested: DatasetQueryConstraintVariant, log: LsstLogAdapter) -> None:
802 """Set the dataset query constraint.
804 Parameters
805 ----------
806 requested : `DatasetQueryConstraintVariant`
807 Query constraint specified by the user.
808 log : `lsst.utils.logging.LsstLogAdapter`
809 Logger that supports ``verbose`` output.
810 """
811 overall_inputs: dict[str, DatasetTypeNode] = {
812 name: node # type: ignore
813 for name, node in self.subgraph.iter_overall_inputs()
814 if not node.is_prerequisite # type: ignore
815 }
816 match requested:
817 case DatasetQueryConstraintVariant.ALL:
818 self.dataset_constraint = {
819 name
820 for name, dataset_type_node in overall_inputs.items()
821 if (dataset_type_node.is_initial_query_constraint and dataset_type_node.dimensions)
822 }
823 case DatasetQueryConstraintVariant.OFF:
824 pass
825 case DatasetQueryConstraintVariant.LIST:
826 self.dataset_constraint = set(requested)
827 inputs = {
828 name for name, dataset_type_node in overall_inputs.items() if dataset_type_node.dimensions
829 }
830 if remainder := self.dataset_constraint.difference(inputs):
831 log.verbose(
832 "Ignoring dataset types %s in dataset query constraint that are not inputs to this "
833 "subgraph, on the assumption that they are relevant for a different subgraph.",
834 remainder,
835 )
836 self.dataset_constraint.intersection_update(inputs)
837 if not self.dataset_constraint:
838 raise QuantumGraphBuilderError(
839 "An explicit dataset query constraint was provided, but it does not include any "
840 f"inputs to the pipeline subset with tasks {list(self.subgraph.tasks.keys())}."
841 )
842 case _:
843 raise QuantumGraphBuilderError(
844 f"Unable to handle type {requested} given as dataset query constraint."
845 )
847 def _make_dimension_record_branches(self) -> None:
848 """Ensure we have branches for all dimension elements we'll need to
849 fetch dimension records for.
850 """
851 for element_name in self.all_dimensions.elements:
852 element = self.all_dimensions.universe[element_name]
853 record_set = DimensionRecordSet(element_name, universe=self.all_dimensions.universe)
854 if element.minimal_group in self.branches_by_dimensions:
855 self.branches_by_dimensions[element.minimal_group].dimension_records.append(record_set)
856 else:
857 self.branches_by_dimensions[element.minimal_group] = _DimensionGroupBranch(
858 dimension_records=[record_set]
859 )
861 def _make_edge_branches(self) -> None:
862 """Ensure we have branches for all edges in the graph."""
864 def update_edge_branch(
865 task_node: TaskNode, dataset_type_node: DatasetTypeNode
866 ) -> _DimensionGroupBranch:
867 union_dimensions = task_node.dimensions.union(dataset_type_node.dimensions)
868 if (branch := self.branches_by_dimensions.get(union_dimensions)) is None:
869 branch = _DimensionGroupBranch()
870 self.branches_by_dimensions[union_dimensions] = branch
871 branch.twigs[dataset_type_node.dimensions].parent_edge_dataset_types.add(dataset_type_node.name)
872 branch.twigs[task_node.dimensions].parent_edge_tasks.add(task_node.label)
873 return branch
875 for task_node in self.subgraph.tasks.values():
876 for dataset_type_node in self.subgraph.inputs_of(task_node.label).values():
877 assert dataset_type_node is not None, "Pipeline graph is resolved."
878 if dataset_type_node.is_prerequisite:
879 continue
880 branch = update_edge_branch(task_node, dataset_type_node)
881 branch.input_edges.append((dataset_type_node.name, task_node.label))
882 for dataset_type_node in self.subgraph.outputs_of(task_node.label).values():
883 assert dataset_type_node is not None, "Pipeline graph is resolved."
884 branch = update_edge_branch(task_node, dataset_type_node)
885 branch.output_edges.append((task_node.label, dataset_type_node.name))
887 def _make_queryable_branch_tree(self, branches_not_in_tree: set[DimensionGroup]) -> None:
888 """Assemble the branches with queryable dimensions into a tree, in
889 which each branch has a subset of the dimensions of its parent.
891 Parameters
892 ----------
893 branches_not_in_tree : `set` [ `lsst.daf.butler.DimensionGroup` ]
894 Dimensions that have not yet been inserted into the tree. Updated
895 in place.
896 """
897 for target_dimensions in sorted(branches_not_in_tree):
898 if target_dimensions.issubset(self.queryable_dimensions):
899 if self._maybe_insert_projection_branch(
900 target_dimensions, self.queryable_dimensions, self.queryable_branches
901 ):
902 branches_not_in_tree.remove(target_dimensions)
903 else:
904 raise AssertionError(
905 "Projection-branch insertion should not fail for queryable dimensions."
906 )
908 def _maybe_insert_projection_branch(
909 self,
910 target_dimensions: DimensionGroup,
911 candidate_dimensions: DimensionGroup,
912 candidate_projection_branches: dict[DimensionGroup, _DimensionGroupBranch],
913 ) -> bool:
914 """Insert a branch at the appropriate location in a [sub]tree.
916 Branches are inserted below the first parent branch whose dimensions
917 are a superset of their own.
919 Parameters
920 ----------
921 target_dimensions : `lsst.daf.butler.DimensionGroup`
922 Dimensions of the branch to be inserted.
923 candidate_dimensions : `lsst.daf.butler.DimensionGroup`
924 Dimensions of the subtree the branch might be inserted under. If
925 this is not a superset of ``target_dimensions``, this method
926 returns `False` and nothing is done.
927 candidate_projection_branches : `dict` [ \
928 `lsst.daf.butler.DimensionGroup`, `_DimensionGroupBranch` ]
929 Subtree branches to be updated directly or indirectly (i.e. in a
930 nested branch).
932 Returns
933 -------
934 inserted : `bool`
935 Whether the branch was actually inserted.
936 """
937 if candidate_dimensions >= target_dimensions:
938 target_branch = self.branches_by_dimensions[target_dimensions]
939 for child_dimensions in list(candidate_projection_branches.keys()):
940 if self._maybe_insert_projection_branch(
941 child_dimensions, target_dimensions, target_branch.branches
942 ):
943 del candidate_projection_branches[child_dimensions]
944 for child_dimensions, child_branch in candidate_projection_branches.items():
945 if self._maybe_insert_projection_branch(
946 target_dimensions, child_dimensions, child_branch.branches
947 ):
948 return True
949 candidate_projection_branches[target_dimensions] = target_branch
950 return True
951 return False
953 def _make_queryable_overlap_branch_generators(self, branches_not_in_tree: set[DimensionGroup]) -> None:
954 """Add data ID generators for sets of dimensions that can only
955 partially queried for, with the rest needing to be generated by
956 manipulating the data IDs of the queryable subset.
958 Parameters
959 ----------
960 branches_not_in_tree : `set` [ `lsst.daf.butler.DimensionGroup` ]
961 Dimensions that have not yet been inserted into the tree. Updated
962 in place.
963 """
964 for target_dimensions in sorted(branches_not_in_tree):
965 queryable_subset_dimensions = target_dimensions.intersection(self.queryable_dimensions)
966 # Make sure we actually have a branch to capture the queryable
967 # subset data IDs (i.e. in case we didn't already have one for some
968 # dataset type or task, etc).
969 if queryable_subset_dimensions not in self.branches_by_dimensions:
970 # If we have to make a new queryable branch, we also have to
971 # insert it into the tree so its data IDs get populated.
972 self.branches_by_dimensions[queryable_subset_dimensions] = _DimensionGroupBranch()
973 if not self._maybe_insert_projection_branch(
974 queryable_subset_dimensions,
975 self.queryable_dimensions,
976 self.queryable_branches,
977 ):
978 raise AssertionError(
979 "Projection-branch insertion should not fail for queryable dimensions."
980 )
981 if queryable_region_name := queryable_subset_dimensions.region_dimension:
982 # If there is a single well-defined region for the queryable
983 # subset, we can potentially generate skypix IDs from it.
984 # Do the target dimensions just add a single skypix dimension
985 # to the queryable subset?
986 remainder_dimensions = target_dimensions - queryable_subset_dimensions
987 if (remainder_skypix := get_single_skypix(remainder_dimensions)) is not None:
988 queryable_region_element = target_dimensions.universe[queryable_region_name]
989 self._append_data_id_generator(
990 queryable_subset_dimensions,
991 queryable_region_element,
992 target_dimensions,
993 remainder_skypix,
994 branches_not_in_tree,
995 )
997 def _append_data_id_generator(
998 self,
999 source_dimensions: DimensionGroup,
1000 source_region_element: DimensionElement,
1001 target_dimensions: DimensionGroup,
1002 remainder_skypix: SkyPixDimension,
1003 branches_not_in_tree: set[DimensionGroup],
1004 ) -> None:
1005 """Append an appropriate `DataIdGenerator` instance for generating
1006 data IDs with the given characteristics.
1008 Parameters
1009 ----------
1010 source_dimensions : `lsst.daf.butler.DimensionGroup`
1011 Dimensions whose data IDs can already populated, to use as a
1012 starting point.
1013 source_region_element : `lsst.daf.butler.DimensionElement`
1014 Dimension element associated with the region for the source
1015 dimensions. It is guaranteed that there is exactly one such
1016 region.
1017 target_dimensions : `lsst.daf.butler.DimensionGroup`
1018 Dimensions of the data IDs to be generated.
1019 remainder_skypix : `lsst.daf.butler.SkyPixDimension`
1020 The single skypix dimension that is being added to
1021 ``source_dimensions`` to yield ``target_dimensions``.
1022 branches_not_in_tree : `set` [ `lsst.daf.butler.DimensionGroup` ]
1023 Dimensions that have not yet been inserted into the tree. Updated
1024 in place.
1025 """
1026 target_branch = self.branches_by_dimensions[target_dimensions]
1027 # We want to do the overlap calculation without any extra dimensions
1028 # beyond the two spatial dimensions, which may or may not be what we
1029 # already have.
1030 overlap_dimensions = source_region_element.minimal_group | remainder_skypix.minimal_group
1031 generator: DataIdGenerator
1032 if overlap_dimensions == target_dimensions:
1033 if isinstance(source_region_element, SkyPixDimension):
1034 if source_region_element.system == remainder_skypix.system:
1035 if source_region_element.level > remainder_skypix.level:
1036 generator = SkyPixGatherDataIdGenerator(
1037 target_branch,
1038 target_dimensions,
1039 source_dimensions,
1040 remainder_skypix,
1041 source_region_element,
1042 )
1043 else:
1044 generator = SkyPixScatterDataIdGenerator(
1045 target_branch,
1046 target_dimensions,
1047 source_dimensions,
1048 remainder_skypix,
1049 source_region_element,
1050 )
1051 else:
1052 generator = CrossSystemDataIdGenerator(
1053 target_branch,
1054 target_dimensions,
1055 source_dimensions,
1056 remainder_skypix,
1057 source_region_element,
1058 )
1059 else:
1060 generator = DatabaseSourceDataIdGenerator(
1061 target_branch,
1062 target_dimensions,
1063 source_dimensions,
1064 remainder_skypix,
1065 source_region_element,
1066 )
1067 # We know we can populate the data IDs in remainder_skypix_branch
1068 # from the target branch by projection. Even if it's already
1069 # populated by some other generated branch, we want to populate it
1070 # again in case that picks up additional sky pixels.
1071 target_branch.branches[remainder_skypix.minimal_group] = self.branches_by_dimensions[
1072 remainder_skypix.minimal_group
1073 ]
1074 branches_not_in_tree.discard(remainder_skypix.minimal_group)
1075 else:
1076 if overlap_dimensions not in self.branches_by_dimensions:
1077 self.branches_by_dimensions[overlap_dimensions] = _DimensionGroupBranch()
1078 branches_not_in_tree.add(overlap_dimensions)
1079 self._append_data_id_generator(
1080 source_region_element.minimal_group,
1081 source_region_element,
1082 overlap_dimensions,
1083 remainder_skypix,
1084 branches_not_in_tree,
1085 )
1086 generator = JoinDataIdGenerator(
1087 target_branch,
1088 target_dimensions,
1089 source_dimensions,
1090 overlap_dimensions,
1091 )
1092 self.generators.append(generator)
1093 branches_not_in_tree.remove(target_dimensions)
1095 def _make_general_overlap_branch_generator(self, branches_not_in_tree: set[DimensionGroup]) -> bool:
1096 """Add data ID generators for sets of dimensions that can be generated
1097 via skypix envelopes of other generated data IDs.
1099 This method should be called in a loop until it returns `False`
1100 (indicating no progress was made) or ``branches_not_in_tree`` is empty
1101 (indicating no more work to be done).
1103 Parameters
1104 ----------
1105 branches_not_in_tree : `set` [ `lsst.daf.butler.DimensionGroup` ]
1106 Dimensions that have not yet been inserted into the tree. Updated
1107 in place.
1109 Returns
1110 -------
1111 appended : `bool`
1112 Whether a new data ID generator was successfully appended.
1113 """
1114 dimensions_done = sorted(self.branches_by_dimensions.keys() - branches_not_in_tree)
1115 for source_dimensions in dimensions_done:
1116 for target_dimensions in sorted(branches_not_in_tree):
1117 if not source_dimensions <= target_dimensions:
1118 continue
1119 remainder_dimensions = target_dimensions - source_dimensions
1120 if (remainder_skypix := get_single_skypix(remainder_dimensions)) is not None:
1121 if source_region_name := source_dimensions.region_dimension:
1122 # If the target dimensions are just adding a single
1123 # skypix to the source dimensions and the source
1124 # dimensions have a single region column, we can
1125 # generate the skypix indices from the envelopes of
1126 # those regions.
1127 source_region_element = source_dimensions.universe[source_region_name]
1128 self._append_data_id_generator(
1129 source_dimensions,
1130 source_region_element,
1131 target_dimensions,
1132 remainder_skypix,
1133 branches_not_in_tree,
1134 )
1135 return True
1136 return not branches_not_in_tree
1138 def _make_join_branch_generator(self, branches_not_in_tree: set[DimensionGroup]) -> bool:
1139 """Add data ID generators for sets of dimensions that can be generated
1140 via inner joints of other generated data IDs.
1142 This method should be called in a loop until it returns `False`
1143 (indicating no progress was made) or ``branches_not_in_tree`` is empty
1144 (indicating no more work to be done).
1146 Parameters
1147 ----------
1148 branches_not_in_tree : `set` [ `lsst.daf.butler.DimensionGroup` ]
1149 Dimensions that have not yet been inserted into the tree. Updated
1150 in place.
1152 Returns
1153 -------
1154 appended : `bool`
1155 Whether a new data ID generator was successfully appended.
1156 """
1157 for target_dimensions in sorted(branches_not_in_tree):
1158 dimensions_done = sorted(self.branches_by_dimensions.keys() - branches_not_in_tree)
1159 candidates_by_common: dict[DimensionGroup, tuple[DimensionGroup, DimensionGroup]] = {}
1160 for operand1, operand2 in itertools.combinations(dimensions_done, 2):
1161 if operand1.union(operand2) == target_dimensions:
1162 candidates_by_common[operand1.intersection(operand2)] = (operand1, operand2)
1163 if candidates_by_common:
1164 # Because DimensionGroup defines a set-like inequality
1165 # operator, 'max' returns the set of dimensions that contains
1166 # as many of the other sets of dimensions as possible, which is
1167 # a reasonable guess at the most-constrained join.
1168 operand1, operand2 = candidates_by_common[max(candidates_by_common)]
1169 generator = JoinDataIdGenerator(
1170 self.branches_by_dimensions[target_dimensions],
1171 target_dimensions,
1172 operand1,
1173 operand2,
1174 )
1175 self.generators.append(generator)
1176 branches_not_in_tree.remove(target_dimensions)
1177 return True
1178 return not branches_not_in_tree
1180 def project_data_ids(self, log: LsstLogAdapter) -> None:
1181 """Recursively populate the data ID sets of the dimension group tree
1182 from the data ID sets of the queryable branches.
1184 Parameters
1185 ----------
1186 log : `lsst.logging.LsstLogAdapter`
1187 Logger to use for status reporting.
1188 """
1189 for branch_dimensions, branch in self.queryable_branches.items():
1190 log.verbose("Projecting query data ID(s) to %s.", branch_dimensions)
1191 branch.project_data_ids(log)
1193 def generate_data_ids(self, log: LsstLogAdapter) -> None:
1194 """Run all data ID generators.
1196 This runs data ID generators and projects data IDs to their subset
1197 dimensions. It can only be called after queryable data IDs have been
1198 populated and dimension records fetched.
1200 Parameters
1201 ----------
1202 log : `lsst.logging.LsstLogAdapter`
1203 Logger to use for status reporting.
1204 """
1205 for generator in self.generators:
1206 generator.run(log, self.branches_by_dimensions)
1207 generator.branch.project_data_ids(log, log_indent=" ")
1209 def pprint(self, printer: Callable[[str], None] = print) -> None:
1210 """Print a human-readable representation of the dimensions tree.
1212 Parameters
1213 ----------
1214 printer : `~collections.abc.Callable`, optional
1215 A function that takes a single string argument and prints a single
1216 line (including a newline). Default is the built-in `print`
1217 function.
1218 """
1219 printer("Queryable:")
1220 for branch_dimensions, branch in self.queryable_branches.items():
1221 branch.pprint(branch_dimensions, " ", printer=printer)
1222 printer("Generator:")
1223 for generator in self.generators:
1224 generator.pprint(" ", printer=printer)
1227def get_single_skypix(dimensions: DimensionGroup) -> SkyPixDimension | None:
1228 """Try to coerce a dimension group a single skypix dimenison.
1230 Parameters
1231 ----------
1232 dimensions : `lsst.daf.butler.DimensionGroup`
1233 Input dimensions.
1235 Returns
1236 -------
1237 skypix : `lsst.daf.butler.SkyPixDimension` or `None`
1238 A skypix dimension that is the only dimension in the given group, or
1239 `None` in all other cases.
1240 """
1241 if len(dimensions) == 1:
1242 (name,) = dimensions.names
1243 return dimensions.universe.skypix_dimensions.get(name)
1244 return None
1247@dataclasses.dataclass
1248class DataIdGenerator:
1249 """A base class for generators for quantum and dataset data IDs that cannot
1250 be directly queried for.
1251 """
1253 branch: _DimensionGroupBranch
1254 """Branch of the dimensions tree that this generator populates."""
1256 dimensions: DimensionGroup
1257 """Dimensions of the data IDs generated."""
1259 source: DimensionGroup
1260 """Dimensions of another set of data IDs that this generator uses as a
1261 starting point.
1262 """
1264 def pprint(self, indent: str = " ", printer: Callable[[str], None] = print) -> None:
1265 """Print a human-readable representation of this generator.
1267 Parameters
1268 ----------
1269 indent : `str`
1270 Blank spaces to prefix the output with (useful when this is nested
1271 in hierarchical object being printed).
1272 printer : `~collections.abc.Callable`, optional
1273 A function that takes a single string argument and prints a single
1274 line (including a newline). Default is the built-in `print`
1275 function.
1276 """
1277 self.branch.pprint(
1278 self.dimensions,
1279 indent,
1280 f" <- {self.source} ({self.__class__.__name__})",
1281 printer=printer,
1282 )
1284 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None:
1285 """Run the generator, populating its branch's data IDs.
1287 Parameters
1288 ----------
1289 log : `lsst.log.LsstLogAdapter`
1290 Logger with a ``verbose`` method as well as the built-in ones.
1291 branches : `~collections.abc.Mapping`
1292 Mapping of other dimension branches, keyed by their dimensions.
1293 """
1294 raise NotImplementedError()
1297@dataclasses.dataclass
1298class DatabaseSourceDataIdGenerator(DataIdGenerator):
1299 """A data ID generator that generates skypix indices from the envelope of
1300 regions stored in the database.
1301 """
1303 remainder_skypix: SkyPixDimension
1304 """A single additional skypix dimension to be added to the source
1305 dimensions.
1306 """
1308 source_element: DimensionElement
1309 """Dimension element that the database-stored regions are associated with.
1310 """
1312 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None:
1313 # Docstring inherited.
1314 source_branch = branches[self.source]
1315 log.verbose(
1316 "Generating %s data IDs via %s envelope of %s %s region(s).",
1317 self.dimensions,
1318 self.remainder_skypix,
1319 len(source_branch.data_ids),
1320 self.source_element,
1321 )
1322 pixelization = self.remainder_skypix.pixelization
1323 (source_records,) = [
1324 record_set
1325 for record_set in source_branch.dimension_records
1326 if record_set.element == self.source_element
1327 ]
1328 for source_data_id in source_branch.data_ids:
1329 source_record = source_records.find(source_data_id)
1330 for begin, end in pixelization.envelope(source_record.region):
1331 for index in range(begin, end):
1332 target_data_id = DataCoordinate.standardize(
1333 source_data_id,
1334 **{self.remainder_skypix.name: index}, # type: ignore[arg-type]
1335 )
1336 self.branch.data_ids.add(target_data_id)
1339@dataclasses.dataclass
1340class CrossSystemDataIdGenerator(DataIdGenerator):
1341 """A data ID generator that generates skypix indices from the envelope of
1342 skypix regions from some other system (e.g. healpix from HTM).
1343 """
1345 remainder_skypix: SkyPixDimension
1346 """A single additional skypix dimension to be added to the source
1347 dimensions.
1348 """
1350 source_skypix: SkyPixDimension
1351 """Dimension element for the already-known skypix indices."""
1353 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None:
1354 # Docstring inherited.
1355 source_branch = branches[self.source]
1356 log.verbose(
1357 "Generating %s data IDs via %s envelope of %s %s region(s).",
1358 self.dimensions,
1359 self.remainder_skypix,
1360 len(source_branch.data_ids),
1361 self.source_skypix,
1362 )
1363 source_pixelization = self.source_skypix.pixelization
1364 remainder_pixelization = self.remainder_skypix.pixelization
1365 for source_data_id in source_branch.data_ids:
1366 source_region = source_pixelization.pixel(source_data_id[self.source_skypix.name])
1367 for begin, end in remainder_pixelization.envelope(source_region):
1368 for index in range(begin, end):
1369 target_data_id = DataCoordinate.standardize(
1370 source_data_id,
1371 **{self.remainder_skypix.name: index}, # type: ignore[arg-type]
1372 )
1373 self.branch.data_ids.add(target_data_id)
1376@dataclasses.dataclass
1377class SkyPixScatterDataIdGenerator(DataIdGenerator):
1378 """A data ID generator that generates skypix indices at a high (fine) level
1379 from low-level (coarse) indices in the same system.
1380 """
1382 remainder_skypix: SkyPixDimension
1383 """A single additional skypix dimension to be added to the source
1384 dimensions.
1385 """
1387 source_skypix: SkyPixDimension
1388 """Dimension element for the already-known skypix indices."""
1390 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None:
1391 # Docstring inherited.
1392 factor = 4 ** (self.remainder_skypix.level - self.source_skypix.level)
1393 source_branch = branches[self.source]
1394 log.verbose(
1395 "Generating %s data IDs by scaling %s %s IDs in %s by %s.",
1396 self.dimensions,
1397 len(source_branch.data_ids),
1398 self.remainder_skypix,
1399 self.source,
1400 factor,
1401 )
1402 for source_data_id in source_branch.data_ids:
1403 ranges = RangeSet(source_data_id[self.source_skypix.name])
1404 ranges.scale(factor)
1405 for begin, end in ranges:
1406 for index in range(begin, end):
1407 target_data_id = DataCoordinate.standardize(
1408 source_data_id,
1409 **{self.remainder_skypix.name: index}, # type: ignore[arg-type]
1410 )
1411 self.branch.data_ids.add(target_data_id)
1414@dataclasses.dataclass
1415class SkyPixGatherDataIdGenerator(DataIdGenerator):
1416 """A data ID generator that generates skypix indices at a low (coarse)
1417 level from high-level (fine) indices in the same system.
1418 """
1420 remainder_skypix: SkyPixDimension
1421 """A single additional skypix dimension to be added to the source
1422 dimensions.
1423 """
1425 source_skypix: SkyPixDimension
1426 """Dimension element for the already-known skypix indices."""
1428 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None:
1429 # Docstring inherited.
1430 factor = 4 ** (self.source_skypix.level - self.remainder_skypix.level)
1431 source_branch = branches[self.source]
1432 log.verbose(
1433 "Generating %s data IDs by dividing %s %s IDs in %s by %s.",
1434 self.dimensions,
1435 len(source_branch.data_ids),
1436 self.remainder_skypix,
1437 self.source,
1438 factor,
1439 )
1440 for source_data_id in source_branch.data_ids:
1441 index = source_data_id[self.source_skypix.name] // factor
1442 target_data_id = DataCoordinate.standardize(source_data_id, **{self.remainder_skypix.name: index})
1443 self.branch.data_ids.add(target_data_id)
1446@dataclasses.dataclass
1447class JoinDataIdGenerator(DataIdGenerator):
1448 """A data ID that does an inner join between two already-populated
1449 sets of data IDs.
1450 """
1452 other: DimensionGroup
1453 """Dimensions of the other data ID branches to join to those of ``source``.
1454 """
1456 def run(self, log: LsstLogAdapter, branches: Mapping[DimensionGroup, _DimensionGroupBranch]) -> None:
1457 # Docstring inherited.
1458 source_branch = branches[self.source]
1459 other_branch = branches[self.other]
1460 log.verbose(
1461 "Generating %s data IDs by joining %s (%s) to %s (%s).",
1462 self.dimensions,
1463 self.source,
1464 len(source_branch.data_ids),
1465 self.other,
1466 len(other_branch.data_ids),
1467 )
1468 common = self.source & self.other
1469 other_by_common: defaultdict[DataCoordinate, list[DataCoordinate]] = defaultdict(list)
1470 for other_data_id in other_branch.data_ids:
1471 other_by_common[other_data_id.subset(common)].append(other_data_id)
1472 source_by_common: defaultdict[DataCoordinate, list[DataCoordinate]] = defaultdict(list)
1473 for source_data_id in source_branch.data_ids:
1474 source_by_common[source_data_id.subset(common)].append(source_data_id)
1475 for common_data_id in other_by_common.keys() & source_by_common.keys():
1476 for other_data_id in other_by_common[common_data_id]:
1477 for source_data_id in source_by_common[common_data_id]:
1478 self.branch.data_ids.add(other_data_id.union(source_data_id))