Coverage for python / lsst / daf / butler / direct_query_driver / _driver.py: 14%
457 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("DirectQueryDriver",)
32import dataclasses
33import itertools
34import logging
35import sys
36import uuid
37from collections import defaultdict
38from collections.abc import Iterable, Iterator, Mapping, Set
39from contextlib import ExitStack
40from typing import TYPE_CHECKING, Any, TypeVar, cast, overload
42import sqlalchemy
44from .. import ddl
45from .._collection_type import CollectionType
46from .._dataset_type import DatasetType
47from .._exceptions import InvalidQueryError
48from ..dimensions import DataCoordinate, DataIdValue, DimensionElement, DimensionGroup, DimensionUniverse
49from ..dimensions.record_cache import DimensionRecordCache
50from ..queries import tree as qt
51from ..queries.driver import (
52 DataCoordinateResultPage,
53 DatasetRefResultPage,
54 DimensionRecordResultPage,
55 GeneralResultPage,
56 QueryDriver,
57 ResultPage,
58)
59from ..queries.predicate_constraints_summary import PredicateConstraintsSummary
60from ..queries.result_specs import (
61 DataCoordinateResultSpec,
62 DatasetRefResultSpec,
63 DimensionRecordResultSpec,
64 GeneralResultSpec,
65 ResultSpec,
66)
67from ..registry import CollectionSummary, NoDefaultCollectionError
68from ..registry.interfaces import ChainedCollectionRecord, CollectionRecord
69from ..registry.managers import RegistryManagerInstances
70from ..registry.wildcards import CollectionWildcard
71from ._postprocessing import Postprocessing
72from ._query_analysis import (
73 QueryCollectionAnalysis,
74 QueryFindFirstAnalysis,
75 QueryJoinsAnalysis,
76 QueryTreeAnalysis,
77 ResolvedDatasetSearch,
78)
79from ._query_builder import QueryBuilder, SingleSelectQueryBuilder, UnionQueryBuilder
80from ._result_page_converter import (
81 DataCoordinateResultPageConverter,
82 DatasetRefResultPageConverter,
83 DimensionRecordResultPageConverter,
84 GeneralResultPageConverter,
85 ResultPageConverter,
86 ResultPageConverterContext,
87)
88from ._sql_builders import SqlJoinsBuilder, SqlSelectBuilder, make_table_spec
89from ._sql_column_visitor import SqlColumnVisitor
91if TYPE_CHECKING:
92 from ..registry.interfaces import Database
# Module-level logger for this driver.
_LOG = logging.getLogger(__name__)

# Identifier for a dataset search: either a concrete dataset type name or the
# special ``qt.AnyDatasetType`` sentinel used for union-over-dataset-types
# queries.
_T = TypeVar("_T", bound=str | qt.AnyDatasetType)
class DirectQueryDriver(QueryDriver):
    """The `QueryDriver` implementation for `DirectButler`.

    Instances must be used as context managers: `execute`, `materialize`, and
    `upload_data_coordinates` raise `RuntimeError` if called before the
    context has been entered.

    Parameters
    ----------
    db : `Database`
        Abstraction for the SQL database.
    universe : `DimensionUniverse`
        Definitions of all dimensions.
    managers : `RegistryManagerInstances`
        Struct of registry manager objects.
    dimension_record_cache : `DimensionRecordCache`
        Cache of dimension records for infrequently-changing, commonly-used
        dimensions.
    default_collections : `~collections.abc.Sequence` [ `str` ]
        Default collection search path.
    default_data_id : `DataCoordinate`
        Default governor dimension values.
    raw_page_size : `int`, optional
        Number of database rows to fetch for each result page.  The actual
        number of rows in a page may be smaller due to postprocessing.
    constant_rows_limit : `int`, optional
        Maximum number of uploaded rows to include in queries via
        `Database.constant_rows`; above this limit a temporary table is used
        instead.
    postprocessing_filter_factor : `int`, optional
        The number of database rows we expect to have to fetch to yield a
        single output row for queries that involve postprocessing.  This is
        purely a performance tuning parameter that attempts to balance between
        fetching too much and requiring multiple fetches; the true value is
        highly dependent on the actual query.
    """
133 def __init__(
134 self,
135 db: Database,
136 universe: DimensionUniverse,
137 managers: RegistryManagerInstances,
138 dimension_record_cache: DimensionRecordCache,
139 default_collections: Iterable[str],
140 default_data_id: DataCoordinate,
141 # Increasing raw_page_size increases memory usage for queries on
142 # Butler server, so if you increase this you may need to increase the
143 # memory allocation for the server in Phalanx as well.
144 raw_page_size: int = 2000,
145 constant_rows_limit: int = 1000,
146 postprocessing_filter_factor: int = 10,
147 ):
148 self.db = db
149 self.managers = managers
150 self._dimension_record_cache = dimension_record_cache
151 self._universe = universe
152 self._default_collections = tuple(default_collections)
153 self._default_data_id = default_data_id
154 self._materializations: dict[qt.MaterializationKey, _MaterializationState] = {}
155 self._upload_tables: dict[qt.DataCoordinateUploadKey, sqlalchemy.FromClause] = {}
156 self._exit_stack: ExitStack | None = None
157 self._raw_page_size = raw_page_size
158 self._postprocessing_filter_factor = postprocessing_filter_factor
159 self._constant_rows_limit = min(constant_rows_limit, db.get_constant_rows_max())
160 self._cursors: set[_Cursor] = set()
162 def __enter__(self) -> None:
163 self._exit_stack = ExitStack()
164 # It might be nice to defer opening a transaction here until first use
165 # to reduce the time spent in transactions. But it's worth noting that
166 # this is the default low-level behavior of the Python SQLite driver,
167 # and it makes it incredibly prone to deadlocks. We might be okay
168 # here, because Query doesn't do true write operations - just temp
169 # table writes - but I'm not confident that's enough to make delayed
170 # transaction starts safe against deadlocks, and it'd be more
171 # complicated to implement anyway.
172 #
173 # We start a transaction rather than just opening a connection to make
174 # temp table and cursors work with pg_bouncer transaction affinity.
175 self._exit_stack.enter_context(self.db.transaction(for_temp_tables=True))
177 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
178 assert self._exit_stack is not None
179 self._materializations.clear()
180 self._upload_tables.clear()
181 # Transfer open cursors' close methods to exit stack, this will help
182 # with the cleanup in case a cursor raises an exceptions on close.
183 for cursor in self._cursors:
184 self._exit_stack.push(cursor.close)
185 self._exit_stack.__exit__(exc_type, exc_value, traceback)
186 self._cursors = set()
187 self._exit_stack = None
189 @property
190 def universe(self) -> DimensionUniverse:
191 return self._universe
    @overload
    def execute(
        self, result_spec: DataCoordinateResultSpec, tree: qt.QueryTree
    ) -> Iterator[DataCoordinateResultPage]: ...

    @overload
    def execute(
        self, result_spec: DimensionRecordResultSpec, tree: qt.QueryTree
    ) -> Iterator[DimensionRecordResultPage]: ...

    @overload
    def execute(
        self, result_spec: DatasetRefResultSpec, tree: qt.QueryTree
    ) -> Iterator[DatasetRefResultPage]: ...

    @overload
    def execute(self, result_spec: GeneralResultSpec, tree: qt.QueryTree) -> Iterator[GeneralResultPage]: ...

    def execute(self, result_spec: ResultSpec, tree: qt.QueryTree) -> Iterator[ResultPage]:
        # Docstring inherited.
        if self._exit_stack is None:
            raise RuntimeError("QueryDriver context must be entered before queries can be executed.")
        # Build the SQL query (analysis + application; see build_query).
        builder = self.build_query(
            tree,
            final_columns=result_spec.get_result_columns(),
            order_by=result_spec.order_by,
            find_first_dataset=result_spec.find_first_dataset,
            allow_duplicate_overlaps=result_spec.allow_duplicate_overlaps,
        )
        sql_select, sql_columns = builder.finish_select()
        if result_spec.order_by:
            visitor = SqlColumnVisitor(sql_columns, self)
            sql_select = sql_select.order_by(*[visitor.expect_scalar(term) for term in result_spec.order_by])
        if result_spec.limit is not None:
            if builder.postprocessing:
                # Postprocessing can drop rows after the SQL runs, so a
                # SQL-side LIMIT would cut off too early; apply the limit
                # during postprocessing instead.
                builder.postprocessing.limit = result_spec.limit
            else:
                sql_select = sql_select.limit(result_spec.limit)
        if builder.postprocessing.limit is not None:
            # We might want to fetch many fewer rows than the default page
            # size if we have to implement limit in postprocessing.
            raw_page_size = min(
                self._postprocessing_filter_factor * builder.postprocessing.limit,
                self._raw_page_size,
            )
        else:
            raw_page_size = self._raw_page_size
        # Execute the query by initializing a _Cursor object that manages the
        # lifetime of the result.
        cursor = _Cursor(
            self.db,
            sql_select,
            postprocessing=builder.postprocessing,
            raw_page_size=raw_page_size,
            page_converter=self._create_result_page_converter(result_spec, builder.final_columns),
        )
        # Since this function isn't a context manager and the caller could
        # stop iterating before we retrieve all the results, we have to track
        # open cursors to ensure we can close them as part of higher-level
        # cleanup (see __exit__).
        self._cursors.add(cursor)
        # Return the iterator as a separate function, so that all the code
        # above runs immediately instead of later when we first read from the
        # iterator.  This ensures that any exceptions that are triggered
        # during set-up for this query occur immediately.
        return self._read_results(cursor)
260 def _read_results(self, cursor: _Cursor) -> Iterator[ResultPage]:
261 """Read out all of the result pages from the database."""
262 try:
263 while (result_page := cursor.next()) is not None:
264 yield result_page
265 finally:
266 self._cursors.discard(cursor)
267 cursor.close()
269 def _create_result_page_converter(self, spec: ResultSpec, columns: qt.ColumnSet) -> ResultPageConverter:
270 context = ResultPageConverterContext(
271 db=self.db,
272 column_order=columns.get_column_order(),
273 dimension_record_cache=self._dimension_record_cache,
274 )
275 match spec:
276 case DimensionRecordResultSpec():
277 return DimensionRecordResultPageConverter(spec, context)
278 case DataCoordinateResultSpec():
279 return DataCoordinateResultPageConverter(spec, context)
280 case DatasetRefResultSpec():
281 return DatasetRefResultPageConverter(
282 spec, self.get_dataset_type(spec.dataset_type_name), context
283 )
284 case GeneralResultSpec():
285 return GeneralResultPageConverter(spec, context)
286 case _:
287 raise NotImplementedError(f"Result type '{spec.result_type}' not yet implemented")
    def materialize(
        self,
        tree: qt.QueryTree,
        dimensions: DimensionGroup,
        datasets: frozenset[str],
        allow_duplicate_overlaps: bool = False,
        key: qt.MaterializationKey | None = None,
    ) -> qt.MaterializationKey:
        # Docstring inherited.
        if self._exit_stack is None:
            raise RuntimeError("QueryDriver context must be entered before 'materialize' is called.")
        plan = self.build_query(
            tree, qt.ColumnSet(dimensions), allow_duplicate_overlaps=allow_duplicate_overlaps
        )
        # Current implementation ignores 'datasets' aside from remembering
        # them, because figuring out what to put in the temporary table for
        # them is tricky, especially if calibration collections are involved.
        # That's okay because:
        #
        # - the query whose results we materialize includes the dataset
        #   searches as constraints;
        #
        # - we still (in Query.materialize) join the dataset searches back in
        #   anyway, and given materialized data IDs the join to the dataset
        #   search is straightforward and definitely well-indexed, and not
        #   much (if at all) worse than joining back in on a materialized
        #   UUID.
        #
        sql_select, _ = plan.finish_select(return_columns=False)
        # Write the query results into a temporary table whose lifetime is
        # tied to this driver's exit stack (dropped in __exit__).
        table = self._exit_stack.enter_context(
            self.db.temporary_table(
                make_table_spec(plan.final_columns, self.db, plan.postprocessing, make_indices=True)
            )
        )
        self.db.insert(table, select=sql_select)
        if key is None:
            key = uuid.uuid4()
        # Remember the table, the dataset type names, and any postprocessing
        # the original query deferred, so later queries against this
        # materialization can pick them back up (see _analyze_query_tree).
        self._materializations[key] = _MaterializationState(table, datasets, plan.postprocessing)
        return key
328 def upload_data_coordinates(
329 self,
330 dimensions: DimensionGroup,
331 rows: Iterable[tuple[DataIdValue, ...]],
332 key: qt.DataCoordinateUploadKey | None = None,
333 ) -> qt.DataCoordinateUploadKey:
334 # Docstring inherited.
335 if self._exit_stack is None:
336 raise RuntimeError(
337 "QueryDriver context must be entered before 'upload_data_coordinates' is called."
338 )
339 columns = qt.ColumnSet(dimensions).drop_implied_dimension_keys()
340 table_spec = ddl.TableSpec(
341 [columns.get_column_spec(logical_table, field).to_sql_spec() for logical_table, field in columns]
342 )
343 dict_rows: list[dict[str, Any]]
344 if not columns:
345 table_spec.fields.add(
346 ddl.FieldSpec(
347 SqlSelectBuilder.EMPTY_COLUMNS_NAME,
348 dtype=SqlSelectBuilder.EMPTY_COLUMNS_TYPE,
349 nullable=True,
350 )
351 )
352 dict_rows = [{SqlSelectBuilder.EMPTY_COLUMNS_NAME: None}]
353 else:
354 dict_rows = [dict(zip(dimensions.required, values)) for values in rows]
355 from_clause: sqlalchemy.FromClause
356 if self.db.supports_temporary_tables and len(dict_rows) > self._constant_rows_limit:
357 from_clause = self._exit_stack.enter_context(self.db.temporary_table(table_spec))
358 self.db.insert(from_clause, *dict_rows)
359 else:
360 from_clause = self.db.constant_rows(table_spec.fields, *dict_rows)
361 if key is None:
362 key = uuid.uuid4()
363 self._upload_tables[key] = from_clause
364 return key
    def count(
        self,
        tree: qt.QueryTree,
        result_spec: ResultSpec,
        *,
        exact: bool,
        discard: bool,
    ) -> int:
        # Docstring inherited.
        columns = result_spec.get_result_columns()
        builder = self.build_query(tree, columns, find_first_dataset=result_spec.find_first_dataset)
        # If any dataset search has no collections left after filtering, the
        # query as a whole can never yield rows.
        if not all(d.collection_records for d in builder.joins_analysis.datasets.values()):
            return 0
        # NOTE(review): the comment below was truncated in the original
        # ("No need to do similar check on"); presumably it referred to the
        # union dataset searches — confirm against upstream history.
        # No need to do similar check on
        if not exact:
            # Dropping postprocessing makes this a raw SQL row count, which
            # is allowed to differ from the exact answer when exact=False.
            builder.postprocessing = Postprocessing()
        if builder.postprocessing:
            if not discard:
                raise InvalidQueryError("Cannot count query rows exactly without discarding them.")
            # Postprocessing filters rows in Python, so the only way to count
            # exactly is to stream every row through it.
            sql_select, _ = builder.finish_select(return_columns=False)
            builder.postprocessing.limit = result_spec.limit
            n = 0
            with self.db.query(sql_select.execution_options(yield_per=self._raw_page_size)) as results:
                for _ in builder.postprocessing.apply(results):
                    n += 1
            return n
        # If the query has DISTINCT, GROUP BY, or UNION [ALL], nest it in a
        # subquery so we count deduplicated rows.
        select_builder = builder.finish_nested()
        # Replace the columns of the query with just COUNT(*).
        select_builder.columns = qt.ColumnSet(self._universe.empty)
        select_builder.joins.special.clear()
        count_func: sqlalchemy.ColumnElement[int] = sqlalchemy.func.count()
        select_builder.joins.special["_ROWCOUNT"] = count_func
        # Render and run the query.
        sql_select = select_builder.select(builder.postprocessing)
        with self.db.query(sql_select) as result:
            count = cast(int, result.scalar())
        if result_spec.limit is not None:
            # The SQL count ignores the requested limit; clamp it here.
            count = min(count, result_spec.limit)
        return count
408 def any(self, tree: qt.QueryTree, *, execute: bool, exact: bool) -> bool:
409 # Docstring inherited.
410 builder = self.build_query(tree, qt.ColumnSet(tree.dimensions), allow_duplicate_overlaps=True)
411 if not all(d.collection_records for d in builder.joins_analysis.datasets.values()):
412 return False
413 if not execute:
414 if exact:
415 raise InvalidQueryError("Cannot obtain exact result for 'any' without executing.")
416 return True
417 if builder.postprocessing and exact:
418 sql_select, _ = builder.finish_select(return_columns=False)
419 with self.db.query(
420 sql_select.execution_options(yield_per=self._postprocessing_filter_factor)
421 ) as result:
422 for _ in builder.postprocessing.apply(result):
423 return True
424 return False
425 sql_select, _ = builder.finish_select()
426 with self.db.query(sql_select.limit(1)) as result:
427 return result.first() is not None
429 def explain_no_results(self, tree: qt.QueryTree, execute: bool) -> Iterable[str]:
430 # Docstring inherited.
431 plan = self.build_query(tree, qt.ColumnSet(tree.dimensions), analyze_only=True)
432 if plan.joins_analysis.messages or not execute:
433 return plan.joins_analysis.messages
434 # TODO: guess at ways to split up query that might fail or succeed if
435 # run separately, execute them with LIMIT 1 and report the results.
436 return []
438 def get_dataset_type(self, name: str) -> DatasetType:
439 # Docstring inherited
440 return self.managers.datasets.get_dataset_type(name)
442 def get_default_collections(self) -> tuple[str, ...]:
443 # Docstring inherited.
444 if not self._default_collections:
445 raise NoDefaultCollectionError("No collections provided and no default collections.")
446 return self._default_collections
    def build_query(
        self,
        tree: qt.QueryTree,
        final_columns: qt.ColumnSet,
        *,
        order_by: Iterable[qt.OrderExpression] = (),
        find_first_dataset: str | qt.AnyDatasetType | None = None,
        analyze_only: bool = False,
        allow_duplicate_overlaps: bool = False,
    ) -> QueryBuilder:
        """Convert a query description into a nearly-complete builder object
        for the SQL version of that query.

        Parameters
        ----------
        tree : `.queries.tree.QueryTree`
            Description of the joins and row filters in the query.
        final_columns : `.queries.tree.ColumnSet`
            Final output columns that should be emitted by the SQL query.
        order_by : `~collections.abc.Iterable` [ \
                `.queries.tree.OrderExpression` ], optional
            Column expressions to sort by.
        find_first_dataset : `str`, ``ANY_DATASET``, or `None`, optional
            Name of a dataset type for which only one result row for each data
            ID should be returned, with the collections searched in order.
            ``ANY_DATASET`` is used to represent the search for all dataset
            types with a particular set of dimensions in ``tree.any_dataset``.
        analyze_only : `bool`, optional
            If `True`, perform the initial analysis needed to construct the
            builder, but do not call methods that build its SQL form.  This
            can be useful for obtaining diagnostic information about the query
            that would be generated.
        allow_duplicate_overlaps : `bool`, optional
            If set to `True` then query will be allowed to generate
            non-distinct rows for spatial overlaps.

        Returns
        -------
        builder : `QueryBuilder`
            An object that contains an analysis of the queries columns and
            general structure, SQLAlchemy representations of most of the
            constructed query, and a description of Python-side postprocessing
            to be performed after executing it.  Callers should generally only
            have to call `finish_select` or `finish_nested`; all ``apply``
            methods will have already been called.

        Notes
        -----
        The SQL queries produced by this driver are built in three steps:

        - First we "analyze" the entire query, fetching extra information as
          needed to identify the tables we'll join in, the columns we'll have
          at each level of what may be a nested SELECT, and the degree to
          which we'll have to use GROUP BY / DISTINCT or window functions to
          obtain just the rows we want.  This process initializes a
          `QueryBuilder` instance, but does not call any of its ``apply*``
          methods to actually build the SQLAlchemy version of the query, and
          it is all that is done when ``analyze_only=True``.
        - In the next step we call `QueryBuilder` ``apply*`` methods to mutate
          and/or replace the `QueryBuilder` object's nested `SqlSelectBuilder`
          instances to reflect that analysis, building the SQL from the inside
          out (subqueries before their parents).  This is also done by the
          `build_query` method by default.
        - The returned builder can be used by calling either `finish_select`
          (to get the full executable query) or `finish_nested` (to wrap
          that query as a subquery - if needed - in order to do related
          queries, like ``COUNT(*)`` or ``LIMIT 1`` checks).

        Within both the analysis and application steps, we further split the
        query *structure* and building process into three stages, mapping
        roughly to levels of (potential) subquery nesting:

        - In the "joins" stage, we join all the tables we need columns or
          constraints from and apply the predicate as a WHERE clause.  All
          non-calculated columns are included in the query at this stage.
        - In the optional "projection" stage, we apply a GROUP BY or DISTINCT
          to reduce the set of columns and/or rows, and in some cases add new
          calculated columns.
        - In the optional "find first" stage, we use a common table expression
          with PARTITION ON to search for datasets in a sequence of
          collections in the order those collections were provided.

        In addition, there are two different overall structures for butler
        queries, modeled as two different `QueryBuilder` subclasses.

        - `SingleSelectQueryBuilder` produces a single possibly-nested SELECT
          query structured directly according to the stages above.  This is
          used for almost all queries - specifically whenever
          `QueryTree.any_dataset` is `None`.
        - `UnionQueryBuilder` implements queries for datasets of multiple
          types with the same dimensions as a UNION ALL combination of
          SELECTs, in which each component SELECT has the
          joins/projection/find-first structure.  This cannot be implemented
          as a sequence of `SingleSelectQueryBuilder` objects, however, since
          the need for the terms to share a single `Postprocessing` object and
          column list means that they are not independent.

        The `build_query` method delegates to methods of the `QueryBuilder`
        object when "applying" to let them handle these differences.  These
        often call back to other methods on the driver (so code shared by both
        builders mostly lives in the driver class).
        """
        # Analyze the dimensions, dataset searches, and other join operands
        # that will go into the query.  This also initializes a
        # SqlSelectBuilder and Postprocessing with spatial/temporal
        # constraints potentially transformed by the dimensions manager (but
        # none of the rest of the analysis reflected in that
        # SqlSelectBuilder).
        query_tree_analysis = self._analyze_query_tree(tree, allow_duplicate_overlaps)
        # The "projection" columns differ from the final columns by not
        # omitting any dimension keys (this keeps queries for different result
        # types more similar during construction), including any columns
        # needed only by order_by terms, and including the collection key if
        # we need it for GROUP BY or DISTINCT.
        projection_columns = final_columns.copy()
        projection_columns.restore_dimension_keys()
        for term in order_by:
            term.gather_required_columns(projection_columns)
        # There are two kinds of query builders: simple SELECTS and UNIONs
        # over dataset types.
        builder: QueryBuilder
        if tree.any_dataset is not None:
            builder = UnionQueryBuilder(
                query_tree_analysis,
                union_dataset_dimensions=tree.any_dataset.dimensions,
                projection_columns=projection_columns,
                final_columns=final_columns,
                find_first_dataset=find_first_dataset,
            )
        else:
            # ANY_DATASET find-first only makes sense for union queries.
            assert find_first_dataset != qt.AnyDatasetType.ANY_DATASET
            builder = SingleSelectQueryBuilder(
                tree_analysis=query_tree_analysis,
                projection_columns=projection_columns,
                final_columns=final_columns,
                find_first_dataset=find_first_dataset,
            )
        # Finish setting up the projection part of the builder.
        builder.analyze_projection()
        # The joins-stage query also needs to include all columns needed by
        # the downstream projection query.  Note that this:
        # - never adds new dimensions to the joins stage (since those are
        #   always a superset of the projection-stage dimensions);
        # - does not affect our previous determination of
        #   needs_dataset_distinct, because any dataset fields being added to
        #   the joins stage here are already in the projection.
        builder.joins_analysis.columns.update(builder.projection_columns)
        # Set up the find-first part of the builder.
        if find_first_dataset is not None:
            builder.analyze_find_first()
        # At this point, analysis is complete, and we can proceed to making
        # the select_builder(s) reflect that analysis.
        if not analyze_only:
            builder.apply_joins(self)
            builder.apply_projection(self, order_by)
            builder.apply_find_first(self)
        return builder
    def _analyze_query_tree(self, tree: qt.QueryTree, allow_duplicate_overlaps: bool) -> QueryTreeAnalysis:
        """Analyze a `.queries.tree.QueryTree` as the first step in building
        a SQL query.

        Parameters
        ----------
        tree : `.queries.tree.QueryTree`
            Description of the joins and row filters in the query.
        allow_duplicate_overlaps : `bool`, optional
            If set to `True` then query will be allowed to generate
            non-distinct rows for spatial overlaps.

        Returns
        -------
        tree_analysis : `QueryTreeAnalysis`
            Struct containing additional information need to build the joins
            stage of a query.

        Notes
        -----
        See `build_query` for the complete picture of how SQL queries are
        constructed.  This method is the very first step for all queries.

        The fact that this method produces both an analysis and an initial
        `SqlSelectBuilder` (inside the returned `QueryTreeAnalysis`) is a
        tradeoff that lets ``DimensionRecordStorageManager.process_query_overlaps``
        (called below) pull out overlap expressions from the predicate at the
        same time it turns them into SQL table joins (in the builder).
        """
        # Fetch the records and summaries for any collections we might be
        # searching for datasets and organize them for the kind of lookups
        # we'll do later.
        collection_analysis = self._analyze_collections(tree)
        # Extract the data ID implied by the predicate; we can use the
        # governor dimensions in that to constrain the collections we search
        # for datasets later.
        predicate_constraints = PredicateConstraintsSummary(tree.predicate)
        # Use the default data ID to apply additional constraints where
        # needed.
        predicate_constraints.apply_default_data_id(
            self._default_data_id,
            tree.dimensions,
            # NOTE(review): camelCase attribute name is inconsistent with the
            # snake_case used throughout this file — confirm the attribute
            # actually declared on qt.QueryTree.
            validate_governor_constraints=tree.validateGovernorConstraints,
        )
        predicate = predicate_constraints.predicate
        # Delegate to the dimensions manager to rewrite the predicate and
        # start a SqlSelectBuilder to cover any spatial overlap joins or
        # constraints.  We'll return that SqlSelectBuilder (or copies of it)
        # at the end.
        (
            predicate,
            select_builder,
            postprocessing,
        ) = self.managers.dimensions.process_query_overlaps(
            tree.dimensions,
            predicate,
            tree.get_joined_dimension_groups(),
            collection_analysis.calibration_dataset_types,
            allow_duplicate_overlaps,
            predicate_constraints.constraint_data_id,
        )
        # Initialize the plan we'll return at the end of the method.
        joins = QueryJoinsAnalysis(predicate=predicate, columns=select_builder.columns)
        joins.messages.extend(predicate_constraints.messages)
        # Add columns required by postprocessing.
        postprocessing.gather_columns_required(joins.columns)
        # Add materializations, which can also bring in more postprocessing.
        for m_key, m_dimensions in tree.materializations.items():
            m_state = self._materializations[m_key]
            joins.materializations[m_key] = m_dimensions
            # When a query is materialized, the new tree has an empty
            # (trivially true) predicate because the original was used to make
            # the materialized rows.  But the original postprocessing isn't
            # executed when the materialization happens, so we have to include
            # it here.
            postprocessing.spatial_join_filtering.extend(m_state.postprocessing.spatial_join_filtering)
            postprocessing.spatial_where_filtering.extend(m_state.postprocessing.spatial_where_filtering)
            postprocessing.spatial_expression_filtering.extend(
                m_state.postprocessing.spatial_expression_filtering
            )
        # Add data coordinate uploads.
        joins.data_coordinate_uploads.update(tree.data_coordinate_uploads)
        # Add dataset_searches and filter out collections that don't have the
        # right dataset type or governor dimensions.  We re-resolve dataset
        # searches now that we have a constraint data ID.
        for dataset_type_name, dataset_search in tree.datasets.items():
            resolved_dataset_search = self._resolve_dataset_search(
                dataset_type_name,
                dataset_search,
                predicate_constraints.constraint_data_id,
                collection_analysis.summaries_by_dataset_type[dataset_type_name],
            )
            if resolved_dataset_search.dimensions != self.get_dataset_type(dataset_type_name).dimensions:
                # This is really for server-side defensiveness; it's hard to
                # imagine the query getting different dimensions for a dataset
                # type in two calls to the same query driver.
                raise InvalidQueryError(
                    f"Incorrect dimensions {resolved_dataset_search.dimensions} for dataset "
                    f"{dataset_type_name!r} in query "
                    f"(vs. {self.get_dataset_type(dataset_type_name).dimensions})."
                )
            joins.datasets[dataset_type_name] = resolved_dataset_search
            if not resolved_dataset_search.collection_records:
                joins.messages.append(
                    f"Search for dataset type {resolved_dataset_search.name!r} in "
                    f"{list(dataset_search.collections)} is doomed to fail."
                )
            joins.messages.extend(resolved_dataset_search.messages)
        # Process the special any_dataset search, if there is one.  This
        # entails making a modified copy of the plan for each distinct
        # post-filtering collection search path.
        if tree.any_dataset is None:
            return QueryTreeAnalysis(
                joins, union_datasets=[], initial_select_builder=select_builder, postprocessing=postprocessing
            )
        union_datasets = self._resolve_union_datasets(tree.any_dataset.dimensions, collection_analysis)
        return QueryTreeAnalysis(
            joins,
            union_datasets=union_datasets,
            initial_select_builder=select_builder,
            postprocessing=postprocessing,
        )
727 def _resolve_union_datasets(
728 self, dimensions: DimensionGroup, collection_analysis: QueryCollectionAnalysis
729 ) -> list[ResolvedDatasetSearch[list[str]]]:
730 """Resolve searches for union datasets.
732 Parameters
733 ----------
734 dimensions : `DimensionGroup`
735 Dimensions of the union dataset types in this query.
736 collection_analysis : `CollectionAnalysis`
737 Information about the collections appearing in this collection.
739 Returns
740 -------
741 searches : `list` [ `ResolvedDatasetSearch` ]
742 Resolved dataset searches for all union dataset types with these
743 dimensions. Each item in the list groups dataset types with the
744 same collection search path.
745 """
746 # Gather the filtered collection search path for each union dataset
747 # type.
748 collections_by_dataset_type = defaultdict[str, list[str]](list)
749 for collection_record, collection_summary in collection_analysis.summaries_by_dataset_type[
750 qt.ANY_DATASET
751 ]:
752 for dataset_type in collection_summary.dataset_types:
753 if dataset_type.dimensions == dimensions:
754 collections_by_dataset_type[dataset_type.name].append(collection_record.name)
755 # Reverse the lookup order on the mapping we just made to group
756 # dataset types by their collection search path. Each such group
757 # yields a term in a union query builder
758 dataset_searches_by_collections: dict[tuple[str, ...], ResolvedDatasetSearch[list[str]]] = {}
759 for dataset_type_name, collection_path in collections_by_dataset_type.items():
760 key = tuple(collection_path)
761 if (resolved_search := dataset_searches_by_collections.get(key)) is None:
762 resolved_search = ResolvedDatasetSearch[list[str]](
763 [],
764 dimensions=dimensions,
765 collection_records=[
766 collection_analysis.collection_records[collection_name]
767 for collection_name in collection_path
768 ],
769 messages=[],
770 )
771 resolved_search.is_calibration_search = any(
772 r.type is CollectionType.CALIBRATION for r in resolved_search.collection_records
773 )
774 dataset_searches_by_collections[key] = resolved_search
775 resolved_search.name.append(dataset_type_name)
776 return list(dataset_searches_by_collections.values())
778 def apply_initial_query_joins(
779 self,
780 select_builder: SqlSelectBuilder,
781 joins_analysis: QueryJoinsAnalysis,
782 union_dataset_dimensions: DimensionGroup | None,
783 ) -> None:
784 """Apply most of the "joins" stage of query construction to a single
785 SELECT.
787 This method is expected to be invoked by `QueryBuilder.apply_joins`
788 implementations. It handles all tables and subqueries in the FROM
789 clause, except:
791 - the `QueryTree.any_dataset` search (handled by `UnionQueryBuilder`
792 directly);
793 - joins of dimension tables needed only for their keys (handled by
794 `apply_missing_dimension_joins`).
796 Parameters
797 ----------
798 select_builder : `SqlSelectBuilder`
799 Low-level SQL builder for a single SELECT term, modified in place.
800 joins_analysis : `QueryJoinsAnalysis`
801 Information about the joins stage of query construction.
802 union_dataset_dimensions : `DimensionGroup` or `None`
803 Dimensions of the union dataset types, or `None` if this is not
804 a union dataset query.
805 """
806 # Process data coordinate upload joins.
807 for upload_key, upload_dimensions in joins_analysis.data_coordinate_uploads.items():
808 select_builder.joins.join(
809 SqlJoinsBuilder(db=self.db, from_clause=self._upload_tables[upload_key]).extract_dimensions(
810 upload_dimensions.required
811 )
812 )
813 # Process materialization joins. We maintain a set of dataset types
814 # that were included in a materialization; searches for these datasets
815 # can be dropped if they are only present to provide a constraint on
816 # data IDs, since that's already embedded in a materialization.
817 materialized_datasets: set[str] = set()
818 for materialization_key, materialization_dimensions in joins_analysis.materializations.items():
819 materialized_datasets.update(
820 self._join_materialization(
821 select_builder.joins, materialization_key, materialization_dimensions
822 )
823 )
824 # Process dataset joins (not including any union dataset). Datasets
825 # searches included in materialization can be skipped unless we need
826 # something from their tables.
827 materialized_datasets = set()
828 for m_state in self._materializations.values():
829 materialized_datasets.update(m_state.datasets)
830 for dataset_type_name, dataset_search in joins_analysis.datasets.items():
831 if (
832 dataset_type_name not in materialized_datasets
833 or dataset_type_name in select_builder.columns.dataset_fields
834 ):
835 self.join_dataset_search(
836 select_builder.joins,
837 dataset_search,
838 joins_analysis.columns.dataset_fields[dataset_search.name],
839 )
840 # Join in dimension element tables that we know we need relationships
841 # or columns from.
842 for element in joins_analysis.iter_mandatory(union_dataset_dimensions):
843 select_builder.joins.join(
844 self.managers.dimensions.make_joins_builder(
845 element, joins_analysis.columns.dimension_fields[element.name]
846 )
847 )
849 def apply_missing_dimension_joins(
850 self, select_builder: SqlSelectBuilder, joins_analysis: QueryJoinsAnalysis
851 ) -> None:
852 """Apply dimension-table joins to a single SQL SELECT builder to ensure
853 the full set of desired dimension keys is preset.
855 This method is expected to be invoked by `QueryBuilder.apply_joins`
856 implementations.
858 Parameters
859 ----------
860 select_builder : `SqlSelectBuilder`
861 Low-level SQL builder for a single SELECT term, modified in place.
862 joins_analysis : `QueryJoinsAnalysis`
863 Information about the joins stage of query construction.
864 """
865 # See if any dimension keys are still missing, and if so join in
866 # their tables. Note that we know there are no fields needed from
867 # these.
868 while not (select_builder.joins.dimension_keys.keys() >= joins_analysis.columns.dimensions.names):
869 # Look for opportunities to join in multiple dimensions via
870 # single table, to reduce the total number of tables joined in.
871 missing_dimension_names = (
872 joins_analysis.columns.dimensions.names - select_builder.joins.dimension_keys.keys()
873 )
874 best = self._universe[
875 max(
876 missing_dimension_names,
877 key=lambda name: len(self._universe[name].dimensions.names & missing_dimension_names),
878 )
879 ]
880 to_join = self.managers.dimensions.make_joins_builder(best, frozenset())
881 select_builder.joins.join(to_join)
882 # Add the WHERE clause to the builder.
883 select_builder.joins.where(
884 joins_analysis.predicate.visit(SqlColumnVisitor(select_builder.joins, self))
885 )
887 def project_spatial_join_filtering(
888 self,
889 columns: qt.ColumnSet,
890 postprocessing: Postprocessing,
891 select_builders: Iterable[SqlSelectBuilder],
892 ) -> None:
893 """Transform spatial join postprocessing into expressions that can be
894 OR'd together via an aggregate function in a GROUP BY.
896 This only affects spatial join constraints involving region columns
897 whose dimensions are being projected away.
899 Parameters
900 ----------
901 columns : `.queries.tree.ColumnSet`
902 Columns that will be included in the final query.
903 postprocessing : `Postprocessing`
904 Object that describes post-query processing; modified in place.
905 select_builders : `~collections.abc.Iterable` [ `SqlSelectBuilder` ]
906 SQL Builder objects to be modified in place.
907 """
908 kept: list[tuple[DimensionElement, DimensionElement]] = []
909 for a, b in postprocessing.spatial_join_filtering:
910 if a.name not in columns.dimensions.elements or b.name not in columns.dimensions.elements:
911 expr_name = f"_{a}_OVERLAPS_{b}"
912 postprocessing.spatial_expression_filtering.append(expr_name)
913 for select_builder in select_builders:
914 expr = sqlalchemy.cast(
915 sqlalchemy.cast(
916 select_builder.joins.fields[a.name]["region"], type_=sqlalchemy.String
917 )
918 + sqlalchemy.literal("&", type_=sqlalchemy.String)
919 + sqlalchemy.cast(
920 select_builder.joins.fields[b.name]["region"], type_=sqlalchemy.String
921 ),
922 type_=sqlalchemy.LargeBinary,
923 )
924 select_builder.joins.special[expr_name] = expr
925 else:
926 kept.append((a, b))
927 postprocessing.spatial_join_filtering = kept
    def apply_query_projection(
        self,
        select_builder: SqlSelectBuilder,
        postprocessing: Postprocessing,
        *,
        join_datasets: Mapping[str, ResolvedDatasetSearch[str]],
        union_datasets: ResolvedDatasetSearch[list[str]] | None,
        projection_columns: qt.ColumnSet,
        needs_dimension_distinct: bool,
        needs_dataset_distinct: bool,
        needs_validity_match_count: bool,
        find_first_dataset: str | qt.AnyDatasetType | None,
        order_by: Iterable[qt.OrderExpression],
    ) -> None:
        """Apply the "projection" stage of query construction to a single
        SQL SELECT builder.

        This method is expected to be invoked by
        `QueryBuilder.apply_projection` implementations.

        Parameters
        ----------
        select_builder : `SqlSelectBuilder`
            Low-level SQL builder for a single SELECT term, modified in place.
        postprocessing : `Postprocessing`
            Description of query processing that happens in Python after the
            query is executed by the database.
        join_datasets : `~collections.abc.Mapping` [ `str`, \
                `ResolvedDatasetSearch` [ `str` ] ]
            Information about regular (non-union) dataset searches joined into
            the query.
        union_datasets : `ResolvedDatasetSearch` [ `list` [ `str` ] ] or `None`
            Information about a search for dataset of multiple types with the
            same dimensions and the same post-filtering collection search path.
        projection_columns : `.queries.tree.ColumnSet`
            Columns to include in this projection stage of the query.
        needs_dimension_distinct : `bool`
            Whether this query needs a GROUP BY or DISTINCT to filter out rows
            where the only differences are dimension values that are not being
            returned to the user.
        needs_dataset_distinct : `bool`
            Whether this query needs a GROUP BY or DISTINCT to filter out rows
            that correspond to entries for different collections.
        needs_validity_match_count : `bool`
            Whether this query needs a COUNT column to track the number of
            datasets for each data ID and dataset type.  If this is `False` but
            ``postprocessing.check_validity_match_count`` is `True`, a dummy
            count column that is just "1" should be added, because the check
            is needed only for some other SELECT term in a UNION ALL.
        find_first_dataset : `str` or ``ANY_DATASET`` or `None`
            Name of the dataset type that will need a find-first stage.
            ``ANY_DATASET`` is used when the union datasets need a find-first
            search, while `None` is used to represent both ``find_first=False``
            and the case when ``find_first=True`` but only one collection has
            survived filtering.
        order_by : `~collections.abc.Iterable` [ \
                `.queries.tree.OrderExpression` ]
            Order by clause associated with the query.
        """
        select_builder.columns = projection_columns
        if not needs_dimension_distinct and not needs_dataset_distinct and not needs_validity_match_count:
            if postprocessing.check_validity_match_count:
                select_builder.joins.special[postprocessing.VALIDITY_MATCH_COUNT] = sqlalchemy.literal(1)
            # Rows are already unique; nothing else to do in this method.
            return
        # This method generates either a SELECT DISTINCT [ON] or a SELECT with
        # GROUP BY. We'll work out which as we go.
        have_aggregates: bool = False
        # Dimension key columns form at least most of our GROUP BY or DISTINCT
        # ON clause.
        unique_keys: list[sqlalchemy.ColumnElement[Any]] = [
            select_builder.joins.dimension_keys[k][0]
            for k in projection_columns.dimensions.data_coordinate_keys
        ]

        # Many of our fields derive their uniqueness from the unique_key
        # fields: if rows are unique over the 'unique_key' fields, then they're
        # automatically unique over these 'derived_fields'.  We just remember
        # these as pairs of (logical_table, field) for now.
        derived_fields: list[tuple[str | qt.AnyDatasetType, str]] = []

        # There are two reasons we might need an aggregate function:
        # - to make sure temporal constraints and joins have resulted in at
        #   most one validity range match for each data ID and collection,
        #   when we're doing a find-first query.
        # - to compute the unions of regions we need for postprocessing, when
        #   the data IDs for those regions are not wholly included in the
        #   results (i.e. we need to postprocess on
        #   visit_detector_region.region, but the output rows don't have
        #   detector, just visit - so we pack the overlap expression into a
        #   blob via an aggregate function and interpret it later).
        if postprocessing.check_validity_match_count:
            if needs_validity_match_count:
                select_builder.joins.special[postprocessing.VALIDITY_MATCH_COUNT] = (
                    sqlalchemy.func.count().label(postprocessing.VALIDITY_MATCH_COUNT)
                )
                have_aggregates = True
            else:
                # Some other SELECT term in a UNION ALL needs the check, so
                # emit a constant 1 to keep the column lists aligned.
                select_builder.joins.special[postprocessing.VALIDITY_MATCH_COUNT] = sqlalchemy.literal(1)

        for element in postprocessing.iter_missing(projection_columns):
            if element.name in projection_columns.dimensions.elements:
                # The region associated with dimension keys returned by the
                # query are derived fields, since there is only one region
                # associated with each dimension key value.
                derived_fields.append((element.name, "region"))
            else:
                # If there's a projection and we're doing postprocessing, we
                # might be collapsing the dimensions of the postprocessing
                # regions.  When that happens, we want to apply an aggregate
                # function to them that computes the union of the regions that
                # are grouped together.  Note that this should only happen for
                # constraints that involve a "given", external-to-the-database
                # region (postprocessing.spatial_where_filtering); join
                # constraints that need aggregates should have already been
                # transformed in advance.
                select_builder.joins.fields[element.name]["region"] = ddl.Base64Region.union_aggregate(
                    select_builder.joins.fields[element.name]["region"]
                )
                have_aggregates = True
        # Postprocessing spatial join constraints where at least one region's
        # dimensions are being projected away will have already been turned
        # into the kind of expression that sphgeom.Region.decodeOverlapsBase64
        # processes.  We can just apply an aggregate function to these.  Note
        # that we don't do this to other constraints in order to minimize
        # duplicate fetches of the same region blob.
        for expr_name in postprocessing.spatial_expression_filtering:
            select_builder.joins.special[expr_name] = sqlalchemy.cast(
                sqlalchemy.func.aggregate_strings(select_builder.joins.special[expr_name], "|"),
                type_=sqlalchemy.LargeBinary,
            )
            have_aggregates = True

        # All dimension record fields are derived fields.
        for element_name, fields_for_element in projection_columns.dimension_fields.items():
            for element_field in fields_for_element:
                derived_fields.append((element_name, element_field))
        # Some dataset fields are derived fields and some are unique keys, and
        # it depends on the kinds of collection(s) we're searching and whether
        # it's a find-first query.
        for dataset_type, fields_for_dataset in projection_columns.dataset_fields.items():
            is_find_first = dataset_type == find_first_dataset
            dataset_search: ResolvedDatasetSearch[Any]
            if dataset_type is qt.ANY_DATASET:
                assert union_datasets is not None
                dataset_search = union_datasets
            else:
                dataset_search = join_datasets[dataset_type]
            for dataset_field in fields_for_dataset:
                if dataset_field == "collection_key":
                    # If the collection_key field is present, it's needed for
                    # uniqueness if we're looking in more than one collection.
                    # If not, it's a derived field.
                    if len(dataset_search.collection_records) > 1:
                        unique_keys.append(select_builder.joins.fields[dataset_type]["collection_key"])
                    else:
                        derived_fields.append((dataset_type, "collection_key"))
                elif dataset_field == "timespan" and dataset_search.is_calibration_search:
                    # The timespan is also a unique key...
                    if is_find_first:
                        # ...unless we're doing a find-first search on this
                        # dataset, in which case we need to use ANY_VALUE on
                        # the timespan and check that _VALIDITY_MATCH_COUNT
                        # (added earlier) is one, indicating that there was
                        # indeed only one timespan for each data ID in each
                        # collection that survived the base query's WHERE
                        # clauses and JOINs.
                        if not self.db.has_any_aggregate:
                            raise NotImplementedError(
                                f"Cannot generate query that returns timespan for {dataset_type!r} after a "
                                "find-first search, because this database does not support the ANY_VALUE "
                                "aggregate function (or equivalent)."
                            )
                        select_builder.joins.timespans[dataset_type] = select_builder.joins.timespans[
                            dataset_type
                        ].apply_any_aggregate(self.db.apply_any_aggregate)
                    else:
                        unique_keys.extend(select_builder.joins.timespans[dataset_type].flatten())
                elif (
                    dataset_field == "dataset_id"
                    and len(dataset_search.collection_records) > 1
                    and not is_find_first
                ):
                    # If we have more than one collection in the search, we can
                    # find multiple dataset IDs with the same data ID, in
                    # different collections.
                    # In a non-find-first search, we have to make dataset ID a
                    # unique key to prevent de-duplication for rows with the
                    # same data ID but different dataset IDs.
                    # We don't do this for a find-first search because the
                    # window function will take care of it.
                    unique_keys.append(select_builder.joins.fields[dataset_type]["dataset_id"])
                else:
                    # Other dataset fields derive their uniqueness from key
                    # fields.
                    derived_fields.append((dataset_type, dataset_field))
        if not have_aggregates and not derived_fields:
            # SELECT DISTINCT is sufficient.
            select_builder.distinct = True
        # With DISTINCT ON, Postgres requires that the leftmost parts of the
        # ORDER BY match the DISTINCT ON expressions.  It's somewhat tricky to
        # enforce that, so instead we just don't use DISTINCT ON if ORDER BY is
        # present.  There may be an optimization opportunity by relaxing this
        # restriction.
        elif not have_aggregates and self.db.has_distinct_on and len(list(order_by)) == 0:
            # SELECT DISTINCT ON is sufficient and supported by this database.
            select_builder.distinct = tuple(unique_keys)
        else:
            # GROUP BY is the only option.
            if derived_fields:
                if self.db.has_any_aggregate:
                    # Wrap each derived field in ANY_VALUE (or equivalent) so
                    # it does not have to appear in the GROUP BY.
                    for logical_table, field in derived_fields:
                        if field == "timespan":
                            select_builder.joins.timespans[logical_table] = select_builder.joins.timespans[
                                logical_table
                            ].apply_any_aggregate(self.db.apply_any_aggregate)
                        else:
                            select_builder.joins.fields[logical_table][field] = self.db.apply_any_aggregate(
                                select_builder.joins.fields[logical_table][field]
                            )
                else:
                    _LOG.debug(
                        "Adding %d fields to GROUP BY because this database backend does not support the "
                        "ANY_VALUE aggregate function (or equivalent). This may result in a poor query "
                        "plan. Materializing the query first sometimes avoids this problem. This warning "
                        "can be ignored unless query performance is a problem.",
                        len(derived_fields),
                    )
                    for logical_table, field in derived_fields:
                        if field == "timespan":
                            unique_keys.extend(select_builder.joins.timespans[logical_table].flatten())
                        else:
                            unique_keys.append(select_builder.joins.fields[logical_table][field])
            select_builder.group_by = tuple(unique_keys)
1164 def apply_query_find_first(
1165 self,
1166 select_builder: SqlSelectBuilder,
1167 postprocessing: Postprocessing,
1168 find_first_analysis: QueryFindFirstAnalysis,
1169 ) -> SqlSelectBuilder:
1170 """Apply the "find first" stage of query construction to a single
1171 SQL SELECT builder.
1173 This method is expected to be invoked by
1174 `QueryBuilder.apply_find_first` implementations.
1176 Parameters
1177 ----------
1178 select_builder : `SqlSelectBuilder`
1179 Low-level SQL builder for a single SELECT term, consumed on return.
1180 postprocessing : `Postprocessing`
1181 Description of query processing that happens in Python after the
1182 query is executed by the database.
1183 find_first_analysis : `QueryFindFirstAnalysis`
1184 Information about the find-first stage gathered during the analysis
1185 phase of query construction.
1187 Returns
1188 -------
1189 select_builder : `SqlSelectBuilder`
1190 Low-level SQL builder that includes the find-first logic.
1191 """
1192 # The query we're building looks like this:
1193 #
1194 # WITH {dst}_base AS (
1195 # {target}
1196 # ...
1197 # )
1198 # SELECT
1199 # {dst}_window.*,
1200 # FROM (
1201 # SELECT
1202 # {dst}_base.*,
1203 # ROW_NUMBER() OVER (
1204 # PARTITION BY {dst_base}.{dimensions}
1205 # ORDER BY {rank}
1206 # ) AS rownum
1207 # ) {dst}_window
1208 # WHERE
1209 # {dst}_window.rownum = 1;
1210 #
1211 # The outermost SELECT will be represented by the SqlSelectBuilder we
1212 # return. The SqlSelectBuilder we're given corresponds to the Common
1213 # Table Expression (CTE) at the top.
1214 #
1215 # For SQLite only, we could use a much simpler GROUP BY instead,
1216 # because it extends the standard to do exactly what we want when MIN
1217 # or MAX appears once and a column does not have an aggregate function
1218 # (https://www.sqlite.org/quirks.html). But since that doesn't work
1219 # with PostgreSQL it doesn't help us.
1220 #
1221 select_builder = select_builder.nested(cte=True, force=True, postprocessing=postprocessing)
1222 # We start by filling out the "window" SELECT statement...
1223 partition_by = [
1224 select_builder.joins.dimension_keys[d][0] for d in select_builder.columns.dimensions.required
1225 ]
1226 rank_sql_column = sqlalchemy.case(
1227 {record.key: n for n, record in enumerate(find_first_analysis.search.collection_records)},
1228 value=select_builder.joins.fields[find_first_analysis.dataset_type]["collection_key"],
1229 )
1230 if partition_by:
1231 select_builder.joins.special["_ROWNUM"] = sqlalchemy.sql.func.row_number().over(
1232 partition_by=partition_by, order_by=rank_sql_column
1233 )
1234 else:
1235 select_builder.joins.special["_ROWNUM"] = sqlalchemy.sql.func.row_number().over(
1236 order_by=rank_sql_column
1237 )
1238 # ... and then turn that into a subquery with a constraint on rownum.
1239 select_builder = select_builder.nested(force=True, postprocessing=postprocessing)
1240 # We can now add the WHERE constraint on rownum into the outer query.
1241 select_builder.joins.where(select_builder.joins.special["_ROWNUM"] == 1)
1242 # Don't propagate _ROWNUM into downstream queries.
1243 del select_builder.joins.special["_ROWNUM"]
1244 return select_builder
1246 def _analyze_collections(self, tree: qt.QueryTree) -> QueryCollectionAnalysis:
1247 """Fetch and organize information about all collections appearing in a
1248 query.
1250 Parameters
1251 ----------
1252 tree : `.queries.tree.QueryTree`
1253 Description of the joins and row filters in the query.
1255 Returns
1256 -------
1257 collection_analysis : `QueryCollectionAnalysis`
1258 Struct containing collection records and summaries, organized
1259 for later access by dataset type.
1260 """
1261 # Retrieve collection information for all collections in a tree.
1262 collection_names = set(
1263 itertools.chain.from_iterable(
1264 dataset_search.collections for dataset_search in tree.datasets.values()
1265 )
1266 )
1267 if tree.any_dataset is not None:
1268 collection_names.update(tree.any_dataset.collections)
1269 collection_records = {
1270 record.name: record
1271 for record in self.managers.collections.resolve_wildcard(
1272 CollectionWildcard.from_names(collection_names), flatten_chains=True, include_chains=True
1273 )
1274 }
1275 non_chain_records = [
1276 record for record in collection_records.values() if record.type is not CollectionType.CHAINED
1277 ]
1278 # Fetch summaries for a subset of dataset types.
1279 if tree.any_dataset is not None:
1280 summaries = self.managers.datasets.fetch_summaries(non_chain_records, dataset_types=None)
1281 else:
1282 dataset_types = [self.get_dataset_type(dataset_type_name) for dataset_type_name in tree.datasets]
1283 summaries = self.managers.datasets.fetch_summaries(non_chain_records, dataset_types)
1284 result = QueryCollectionAnalysis(collection_records=collection_records)
1285 # Do a preliminary resolution for dataset searches to identify any
1286 # calibration lookups that might participate in temporal joins.
1287 for dataset_type_name, dataset_search in tree.iter_all_dataset_searches():
1288 collection_summaries = self._filter_collections(
1289 dataset_search.collections, collection_records, summaries
1290 )
1291 result.summaries_by_dataset_type[dataset_type_name] = collection_summaries
1292 resolved_dataset_search = self._resolve_dataset_search(
1293 dataset_type_name, dataset_search, {}, collection_summaries
1294 )
1295 if resolved_dataset_search.is_calibration_search:
1296 result.calibration_dataset_types.add(dataset_type_name)
1297 return result
1299 def _filter_collections(
1300 self,
1301 collection_names: Iterable[str],
1302 records: Mapping[str, CollectionRecord],
1303 summaries: Mapping[Any, CollectionSummary],
1304 ) -> list[tuple[CollectionRecord, CollectionSummary]]:
1305 """Return a subset of collection records and summaries ordered
1306 according to the input collection list.
1308 Parameters
1309 ----------
1310 collection_names : `~collections.abc.Iterable` [`str`]
1311 List of collection names.
1312 records : `~collections.abc.Mapping` [`str`, `CollectionRecord`]
1313 Mapping of collection names to collection records, must contain
1314 records for all collections in ``collection_names`` and all their
1315 children collections.
1316 summaries : `~collections.abc.Mapping` [`typing.Any`, \
1317 `CollectionSummary`]
1318 Mapping of collection IDs to collection summaries, must contain
1319 summaries for all non-chained collections in the collection tree.
1321 Returns
1322 -------
1323 result `list` [`tuple` [`CollectionRecord`, `CollectionSummary`]]
1324 Sequence of collection records and their corresponding summaries
1325 ordered according to the order of input collections and their
1326 child collections. Does not include chained collections.
1327 """
1328 done: set[str] = set()
1330 def recurse(names: Iterable[str]) -> Iterator[tuple[CollectionRecord, CollectionSummary]]:
1331 for name in names:
1332 if name not in done:
1333 done.add(name)
1334 record = records[name]
1335 if record.type is CollectionType.CHAINED:
1336 yield from recurse(cast(ChainedCollectionRecord, record).children)
1337 else:
1338 yield record, summaries[record.key]
1340 return list(recurse(collection_names))
1342 def _resolve_dataset_search(
1343 self,
1344 dataset_type_name: _T,
1345 dataset_search: qt.DatasetSearch,
1346 constraint_data_id: Mapping[str, DataIdValue],
1347 collections: list[tuple[CollectionRecord, CollectionSummary]],
1348 ) -> ResolvedDatasetSearch[_T]:
1349 """Resolve the collections that should actually be searched for
1350 datasets of a particular type.
1352 Parameters
1353 ----------
1354 dataset_type_name : `str` or ``ANY_DATASET``
1355 Name of the dataset being searched for.
1356 dataset_search : `.queries.tree.DatasetSearch`
1357 Struct holding the dimensions and original collection search path.
1358 constraint_data_id : `~collections.abc.Mapping`
1359 Data ID mapping derived from the query predicate that may be used
1360 to filter out some collections based on their governor dimensions.
1361 collections : `list` [ `tuple` [ \
1362 `.registry.interfaces.CollectionRecord`, \
1363 `.registry.CollectionSummary` ] ]
1364 Tuples of collection record and summary.
1366 Returns
1367 -------
1368 resolved : `ResolvedDatasetSearch`
1369 Struct that extends `dataset_search`` with the dataset type name
1370 and resolved collection records.
1371 """
1372 result = ResolvedDatasetSearch(dataset_type_name, dataset_search.dimensions)
1373 if not collections:
1374 result.messages.append("No datasets can be found because collection list is empty.")
1375 for collection_record, collection_summary in collections:
1376 rejected: bool = False
1377 if (
1378 result.name is not qt.ANY_DATASET
1379 and result.name not in collection_summary.dataset_types.names
1380 ):
1381 result.messages.append(
1382 f"No datasets of type {result.name!r} in collection {collection_record.name!r}."
1383 )
1384 rejected = True
1385 for governor in (
1386 constraint_data_id.keys()
1387 & collection_summary.governors.keys()
1388 & dataset_search.dimensions.names
1389 ):
1390 if constraint_data_id[governor] not in collection_summary.governors[governor]:
1391 result.messages.append(
1392 f"No datasets with {governor}={constraint_data_id[governor]!r} "
1393 f"in collection {collection_record.name!r}."
1394 )
1395 rejected = True
1396 if not rejected:
1397 if collection_record.type is CollectionType.CALIBRATION:
1398 result.is_calibration_search = True
1399 result.collection_records.append(collection_record)
1400 return result
1402 def _join_materialization(
1403 self,
1404 joins_builder: SqlJoinsBuilder,
1405 key: qt.MaterializationKey,
1406 dimensions: DimensionGroup,
1407 ) -> frozenset[str]:
1408 """Join a materialization into an under-construction query.
1410 Parameters
1411 ----------
1412 joins_builder : `SqlJoinsBuilder`
1413 Component of a `SqlSelectBuilder` that holds the FROM and WHERE
1414 clauses. This will be modified in-place on return.
1415 key : `.queries.tree.MaterializationKey`
1416 Unique identifier created for this materialization when it was
1417 created.
1418 dimensions : `DimensionGroup`
1419 Dimensions of the materialization.
1421 Returns
1422 -------
1423 datasets : `frozenset` [ `str` ]
1424 Dataset types that were included as constraints when this
1425 materialization was created.
1426 """
1427 columns = qt.ColumnSet(dimensions)
1428 m_state = self._materializations[key]
1429 joins_builder.join(
1430 SqlJoinsBuilder(db=self.db, from_clause=m_state.table).extract_columns(
1431 columns, m_state.postprocessing
1432 )
1433 )
1434 return m_state.datasets
1436 @overload
1437 def join_dataset_search( 1437 ↛ exitline 1437 didn't return from function 'join_dataset_search' because
1438 self,
1439 joins_builder: SqlJoinsBuilder,
1440 resolved_search: ResolvedDatasetSearch[list[str]],
1441 fields: Set[qt.AnyDatasetFieldName],
1442 union_dataset_type_name: str,
1443 ) -> None: ...
1445 @overload
1446 def join_dataset_search( 1446 ↛ exitline 1446 didn't return from function 'join_dataset_search' because
1447 self,
1448 joins_builder: SqlJoinsBuilder,
1449 resolved_search: ResolvedDatasetSearch[str],
1450 fields: Set[qt.AnyDatasetFieldName],
1451 ) -> None: ...
1453 def join_dataset_search(
1454 self,
1455 joins_builder: SqlJoinsBuilder,
1456 resolved_search: ResolvedDatasetSearch[Any],
1457 fields: Set[qt.AnyDatasetFieldName],
1458 union_dataset_type_name: str | None = None,
1459 ) -> None:
1460 """Join a dataset search into an under-construction query.
1462 Parameters
1463 ----------
1464 joins_builder : `SqlJoinsBuilder`
1465 Component of a `SqlSelectBuilder` that holds the FROM and WHERE
1466 clauses. This will be modified in-place on return.
1467 resolved_search : `ResolvedDatasetSearch`
1468 Struct that describes the dataset type and collections.
1469 fields : `~collections.abc.Set` [ `str` ]
1470 Dataset fields to include.
1471 union_dataset_type_name : `str`, optional
1472 Dataset type name to use when ``resolved_search`` represents
1473 multiple union datasets.
1474 """
1475 # The asserts below will need to be dropped (and the implications
1476 # dealt with instead) if materializations start having dataset fields.
1477 if union_dataset_type_name is None:
1478 dataset_type = self.get_dataset_type(cast(str, resolved_search.name))
1479 assert dataset_type.name not in joins_builder.fields, (
1480 "Dataset fields have unexpectedly already been joined in."
1481 )
1482 assert dataset_type.name not in joins_builder.timespans, (
1483 "Dataset timespan has unexpectedly already been joined in."
1484 )
1485 else:
1486 dataset_type = self.get_dataset_type(union_dataset_type_name)
1487 assert qt.ANY_DATASET not in joins_builder.fields, (
1488 "Union dataset fields have unexpectedly already been joined in."
1489 )
1490 assert qt.ANY_DATASET not in joins_builder.timespans, (
1491 "Union dataset timespan has unexpectedly already been joined in."
1492 )
1494 joins_builder.join(
1495 self.managers.datasets.make_joins_builder(
1496 dataset_type,
1497 resolved_search.collection_records,
1498 fields,
1499 is_union=(union_dataset_type_name is not None),
1500 )
1501 )
@dataclasses.dataclass
class _MaterializationState:
    """Driver-side state for a materialized query, recording the table that
    holds its rows and the bookkeeping needed to join it back into later
    queries.
    """

    # Table holding the materialized query rows; used as the FROM clause when
    # the materialization is joined into another query.
    table: sqlalchemy.Table
    # Names of the dataset types that were included as constraints when this
    # materialization was created.
    datasets: frozenset[str]
    # Post-query filtering and checks that must still be applied when rows
    # are read back out of ``table``.
    postprocessing: Postprocessing
1511class _Cursor:
1512 """A helper class for managing paged query results and cursor lifetimes.
1514 This class holds a context manager for the SQLAlchemy cursor object but is
1515 not itself a context manager. It always cleans up (i.e. calls its `close`
1516 method) when it raises an exception or exhausts the cursor, but external
1517 code is responsible for calling `close` when the cursor is abandoned before
1518 it is exhausted, including when that happens due to an external exception.
1520 Parameters
1521 ----------
1522 db : `.registry.interface.Database`
1523 Database to run the query against.
1524 sql : `sqlalchemy.Executable`
1525 SQL query to execute.
1526 postprocessing : `Postprocessing`
1527 Post-query filtering and checks to perform.
1528 raw_page_size : `int`
1529 Maximum number of SQL result rows to return in each page, before
1530 postprocessing.
1531 page_converter : `ResultPageConverter`
1532 Object for converting raw SQL result rows into ResultPage instances.
1533 """
1535 def __init__(
1536 self,
1537 db: Database,
1538 sql: sqlalchemy.Executable,
1539 postprocessing: Postprocessing,
1540 raw_page_size: int,
1541 page_converter: ResultPageConverter,
1542 ):
1543 self._raw_page_size = raw_page_size
1544 self._postprocessing = postprocessing
1545 self._context = db.query(sql, execution_options=dict(yield_per=raw_page_size))
1546 self._page_converter = page_converter
1547 self._closed = False
1548 cursor = self._context.__enter__()
1549 try:
1550 self._iterator = cursor.partitions()
1551 except BaseException:
1552 self.close(*sys.exc_info())
1553 raise
1555 def close(self, exc_type: Any = None, exc_value: Any = None, traceback: Any = None) -> None:
1556 """Close this cursor.
1558 Parameters
1559 ----------
1560 exc_type : `type`
1561 Exception type as obtained from `sys.exc_info`, or `None` if there
1562 was no error.
1563 exc_value : `BaseException` or `None`
1564 Exception instance as obtained from `sys.exc_info`, or `None` if
1565 there was no error.
1566 traceback : `object`
1567 Traceback as obtained from `sys.exc_info`, or `None` if there was
1568 no error.
1569 """
1570 if not self._closed:
1571 self._context.__exit__(exc_type, exc_value, traceback)
1572 self._closed = True
1574 def next(self) -> ResultPage | None:
1575 """Return the next result page from this query.
1577 When there are no more results after this result page, the `next_page`
1578 attribute of the returned object is `None` and the cursor will be
1579 closed. The cursor is also closed if this method raises an exception.
1580 """
1581 if self._closed:
1582 raise RuntimeError("Cannot continue query result iteration: cursor has been closed")
1583 try:
1584 raw_page = next(self._iterator, None)
1585 if raw_page is None:
1586 self.close()
1587 return None
1589 postprocessed_rows = self._postprocessing.apply(raw_page)
1590 return self._page_converter.convert(postprocessed_rows)
1591 except BaseException:
1592 self.close(*sys.exc_info())
1593 raise