Coverage for python / lsst / daf / butler / queries / result_specs.py: 39%
141 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:36 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
# NOTE(review): `GeneralResultSpec`, `ResultSpec`, and `SerializedResultSpec`
# are defined below but not listed here — presumably intentional (they may be
# internal to the queries subpackage), but confirm before relying on it.
__all__ = (
    "DataCoordinateResultSpec",
    "DatasetRefResultSpec",
    "DimensionRecordResultSpec",
    "ResultSpecBase",
)
37from abc import ABC, abstractmethod
38from collections.abc import Mapping
39from typing import Annotated, Literal, TypeAlias, cast
41import pydantic
43from .._exceptions import InvalidQueryError
44from ..dimensions import DimensionElement, DimensionGroup, DimensionUniverse
45from ..pydantic_utils import DeferredValidation
46from .tree import AnyDatasetFieldName, AnyDatasetType, ColumnSet, OrderExpression, QueryTree
class ResultSpecBase(pydantic.BaseModel, ABC):
    """Base class for all query-result specification objects.

    A result specification is a struct that is combined with a `QueryTree` to
    represent a serializable query-results object.
    """

    result_type: str
    """String literal that corresponds to a concrete derived type."""

    order_by: tuple[OrderExpression, ...] = ()
    """Expressions to sort the rows by."""

    limit: int | None = None
    """Maximum number of rows to return, or `None` for no bound."""

    allow_duplicate_overlaps: bool = False
    """If set to True the queries are allowed to return duplicate rows for
    spatial overlaps.
    """

    def validate_tree(self, tree: QueryTree) -> None:
        """Check that this result object is consistent with a query tree.

        Parameters
        ----------
        tree : `QueryTree`
            Query tree that defines the joins and row-filtering that these
            results will come from.

        Raises
        ------
        InvalidQueryError
            Raised if the result dimensions are not a subset of the tree's,
            if a referenced dataset type is not joined into the tree, or if
            an order-by term references unavailable columns or datasets.
        """
        # Every concrete subclass provides `dimensions`; narrow to the union
        # type so the checks below can use it.
        spec = cast(ResultSpec, self)
        if not spec.dimensions <= tree.dimensions:
            raise InvalidQueryError(
                f"Query result specification has dimensions {spec.dimensions} that are not a subset of the "
                f"query's dimensions {tree.dimensions}."
            )
        columns = spec.get_result_columns()
        assert columns.dimensions == spec.dimensions, "enforced by ResultSpec implementations"
        # Any dataset whose fields appear in the results must be joined in.
        for name in columns.dataset_fields:
            if name not in tree.datasets:
                raise InvalidQueryError(f"Dataset {name!r} is not available from this query.")
        # Collect every column the sort expressions need and vet those, too.
        sort_columns = ColumnSet(spec.dimensions)
        for expression in spec.order_by:
            expression.gather_required_columns(sort_columns)
        if not (sort_columns.dimensions <= spec.dimensions):
            allowed_columns = spec.dimensions.names
            invalid_columns = sort_columns.dimensions.names - allowed_columns
            raise InvalidQueryError(
                "Order-by expression may not reference columns that are not in the result dimensions.\n"
                f"Invalid columns: {invalid_columns}\n"
                f"Available columns: {allowed_columns}"
            )
        for name in sort_columns.dataset_fields:
            if name not in tree.datasets:
                raise InvalidQueryError(
                    f"Dataset type {name!r} in order-by expression is not part of the query."
                )

    @property
    def find_first_dataset(self) -> str | AnyDatasetType | None:
        """The dataset type for which find-first resolution is required, if
        any.
        """
        return None

    @abstractmethod
    def get_result_columns(self) -> ColumnSet:
        """Return the columns included in the actual result rows.

        This does not necessarily include all columns required by the
        `order_by` terms that are also a part of this spec.
        """
        raise NotImplementedError()
class DataCoordinateResultSpec(ResultSpecBase):
    """Specification for a query that yields `DataCoordinate` objects."""

    result_type: Literal["data_coordinate"] = "data_coordinate"

    dimensions: DimensionGroup
    """The dimensions of the data IDs returned by this query."""

    include_dimension_records: bool = False
    """Whether the returned data IDs include dimension records."""

    def get_result_columns(self) -> ColumnSet:
        # Docstring inherited.
        columns = ColumnSet(self.dimensions)
        if self.include_dimension_records:
            # Expanded data IDs also need the dimension-record fields.
            _add_dimension_records_to_column_set(self.dimensions, columns)
        return columns
class DimensionRecordResultSpec(ResultSpecBase):
    """Specification for a query that yields `DimensionRecord` objects."""

    result_type: Literal["dimension_record"] = "dimension_record"

    element: DimensionElement
    """The name and definition of the dimension records returned by this query.
    """

    @property
    def dimensions(self) -> DimensionGroup:
        """The dimensions that are required or implied (directly or indirectly)
        by this dimension element.
        """
        return self.element.minimal_group

    def get_result_columns(self) -> ColumnSet:
        # Docstring inherited.
        element = self.element
        columns = ColumnSet(element.minimal_group)
        # Skypix dimensions get no record fields here — presumably their
        # records are computable from the pixel ID rather than stored.
        if element not in self.dimensions.universe.skypix_dimensions:
            columns.dimension_fields[element.name].update(element.schema.remainder.names)
        # Keep only the element's own key dimensions; drop the rest of the
        # minimal group's keys from the result rows.
        columns.drop_dimension_keys(element.minimal_group.names - element.dimensions.names)
        return columns
class DatasetRefResultSpec(ResultSpecBase):
    """Specification for a query that yields `DatasetRef` objects."""

    result_type: Literal["dataset_ref"] = "dataset_ref"

    dataset_type_name: str
    """The dataset type name of the datasets returned by this query."""

    dimensions: DimensionGroup
    """The dimensions of the datasets returned by this query."""

    storage_class_name: str
    """The name of the storage class of the datasets returned by this query."""

    include_dimension_records: bool = False
    """Whether the data IDs returned by this query include dimension records.
    """

    find_first: bool
    """Whether this query should resolve data ID duplicates according to the
    order of the collections to be searched.
    """

    @property
    def find_first_dataset(self) -> str | None:
        # Docstring inherited.
        if self.find_first:
            return self.dataset_type_name
        return None

    def get_result_columns(self) -> ColumnSet:
        # Docstring inherited.
        columns = ColumnSet(self.dimensions)
        # A DatasetRef always needs the dataset's ID and its RUN collection.
        columns.dataset_fields[self.dataset_type_name].update({"dataset_id", "run"})
        if self.include_dimension_records:
            # Expanded data IDs also need the dimension-record fields.
            _add_dimension_records_to_column_set(self.dimensions, columns)
        return columns
class GeneralResultSpec(ResultSpecBase):
    """Specification for a query that yields a table with
    an explicit list of columns.
    """

    result_type: Literal["general"] = "general"

    dimensions: DimensionGroup
    """The dimensions that span all fields returned by this query."""

    dimension_fields: Mapping[str, set[str]]
    """Dimension record fields included in this query."""

    dataset_fields: Mapping[str, set[AnyDatasetFieldName]]
    """Dataset fields included in this query."""

    include_dimension_records: bool = False
    """Whether to include fields for all dimension records, in addition to
    those explicitly specified in `dimension_fields`.
    """

    find_first: bool
    """Whether this query requires find-first resolution for a dataset.

    This can only be `True` if exactly one dataset type's fields are included
    in the results and the ``collection`` and ``timespan`` fields for that
    dataset are not included.
    """

    @property
    def find_first_dataset(self) -> str | None:
        # Docstring inherited.
        if not self.find_first:
            return None
        # Defensive re-check: the model validator already enforces this at
        # construction time.
        if len(self.dataset_fields) != 1:
            raise InvalidQueryError(
                "General query with find_first=True cannot have results from multiple dataset searches."
            )
        (dataset_type,) = self.dataset_fields
        return dataset_type

    def get_result_columns(self) -> ColumnSet:
        # Docstring inherited.
        columns = ColumnSet(self.dimensions)
        for element_name, element_fields in self.dimension_fields.items():
            columns.dimension_fields[element_name].update(element_fields)
        for dataset_type, fields in self.dataset_fields.items():
            columns.dataset_fields[dataset_type].update(fields)
        if self.include_dimension_records:
            # This only adds record fields for non-cached and non-skypix
            # elements, this is what we want when generating query. When
            # `include_dimension_records` is True, dimension records for cached
            # and skypix elements are added to result pages by page converter.
            _add_dimension_records_to_column_set(self.dimensions, columns)
        return columns

    @pydantic.model_validator(mode="after")
    def _validate(self) -> GeneralResultSpec:
        """Reject inconsistent field selections at construction time."""
        if self.find_first and len(self.dataset_fields) != 1:
            raise InvalidQueryError("find_first=True requires exactly one result dataset type.")
        skypix_names = self.dimensions.universe.skypix_dimensions.names
        for element_name, element_fields in self.dimension_fields.items():
            if element_name not in self.dimensions.elements:
                raise InvalidQueryError(f"Dimension element {element_name} is not in {self.dimensions}.")
            if not element_fields:
                raise InvalidQueryError(
                    f"Empty dimension element field set for {element_name!r} is not permitted."
                )
            # Non-empty fields for a skypix element are never valid: its
            # regions are computed, not stored.
            if element_name in skypix_names:
                raise InvalidQueryError(
                    f"Regions for skypix dimension {element_name!r} are not stored; compute them via "
                    f"{element_name}.pixelization.pixel(id) instead."
                )
        for dataset_type, fields in self.dataset_fields.items():
            if not fields:
                raise InvalidQueryError(f"Empty dataset field set for {dataset_type!r} is not permitted.")
        return self
# Discriminated union of all concrete result-spec models; pydantic picks the
# concrete type to validate by matching the `result_type` literal field.
ResultSpec: TypeAlias = Annotated[
    DataCoordinateResultSpec | DimensionRecordResultSpec | DatasetRefResultSpec | GeneralResultSpec,
    pydantic.Field(discriminator="result_type"),
]
class SerializedResultSpec(DeferredValidation[ResultSpec]):
    """Deferred-validation wrapper around the serialized form of a
    `ResultSpec`.
    """

    def to_result_spec(self, universe: DimensionUniverse) -> ResultSpec:
        """Validate the serialized data against *universe* and return the
        concrete result-spec object.
        """
        spec: ResultSpec = self.validated(universe=universe)
        return spec
def _add_dimension_records_to_column_set(dimensions: DimensionGroup, column_set: ColumnSet) -> None:
    """Add extra columns for generating 'expanded' data IDs that include
    dimension records.
    """
    universe = dimensions.universe
    for element_name in dimensions.elements:
        element = universe[element_name]
        # Cached elements and skypix dimensions contribute no extra columns
        # here; only the remaining elements' record fields are fetched.
        if element.is_cached or element in universe.skypix_dimensions:
            continue
        column_set.dimension_fields[element_name].update(element.schema.remainder.names)