# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("ColumnOrder", "ColumnSet", "ResultColumn")

from collections.abc import Iterable, Iterator, Mapping, Sequence, Set
from typing import NamedTuple, cast

from ... import column_spec
from ...dimensions import DataIdValue, DimensionGroup
from ...nonempty_mapping import NonemptyMapping
from ._base import ANY_DATASET, AnyDatasetFieldName, AnyDatasetType


class ColumnSet:
    """A set-like hierarchical container for the columns in a query.

    Parameters
    ----------
    dimensions : `DimensionGroup`
        The dimensions that bound the set of columns, and by default specify
        the set of dimension key columns present.

    Notes
    -----
    This class does not inherit from `collections.abc.Set` because that brings
    in a lot of requirements we don't need (particularly interoperability with
    other set-like objects).

    This class is iterable over tuples of ``(logical_table, field)``, where
    ``logical_table`` is a dimension element name or dataset type name, and
    ``field`` is a column associated with one of those, or `None` for dimension
    key columns. Iteration order is guaranteed to be deterministic and to
    start with all included dimension keys in
    `DimensionGroup.data_coordinate_keys`.
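
    As an illustrative sketch (the dimension and dataset type names here are
    placeholders, and a real `DimensionGroup` must come from a
    `DimensionUniverse`), iteration yields dimension keys first, then any
    dimension record fields, then any dataset fields::

        columns = ColumnSet(dimensions)
        columns.dimension_fields["visit"].add("name")
        columns.dataset_fields["raw"].add("dataset_id")
        for logical_table, field in columns:
            ...  # ("visit", None), ..., ("visit", "name"), ("raw", "dataset_id")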
62 """
64 def __init__(self, dimensions: DimensionGroup) -> None:
65 self._dimensions = dimensions
66 self._removed_dimension_keys: set[str] = set()
67 self._dimension_fields: dict[str, set[str]] = {name: set() for name in dimensions.elements}
68 self._dataset_fields = NonemptyMapping[str | AnyDatasetType, set[AnyDatasetFieldName]](set)
70 @property
71 def dimensions(self) -> DimensionGroup:
72 """The dimensions that bound all columns in the set."""
73 return self._dimensions
75 @property
76 def dimension_fields(self) -> Mapping[str, set[str]]:
77 """Dimension record fields included in the set, grouped by dimension
78 element name.
80 The keys of this mapping are always ``self.dimensions.elements``, and
81 nested sets may be empty.
82 """
83 return self._dimension_fields
85 @property
86 def dataset_fields(self) -> NonemptyMapping[str | AnyDatasetType, set[AnyDatasetFieldName]]:
87 """Dataset fields included in the set, grouped by dataset type name.
89 The keys of this mapping are just those that actually have nonempty
90 nested sets.
91 """
92 return self._dataset_fields
94 def __bool__(self) -> bool:
95 return bool(self._dimensions) or any(self._dataset_fields.values())
97 def __eq__(self, other: object) -> bool:
98 if not isinstance(other, ColumnSet):
99 return False
100 return (
101 self._dimensions == other._dimensions
102 and self._removed_dimension_keys == other._removed_dimension_keys
103 and self._dimension_fields == other._dimension_fields
104 and self._dataset_fields == other._dataset_fields
105 )
107 def __str__(self) -> str:
108 return f"{{{', '.join(self.get_qualified_name(k, v) for k, v in self)}}}"
110 def issubset(self, other: ColumnSet) -> bool:
111 """Test whether all columns in this set are also in another.
113 Parameters
114 ----------
115 other : `ColumnSet`
116 Set of columns to compare to.
118 Returns
119 -------
120 issubset : `bool`
121 Whether all columns in ``self`` are also in ``other``.
122 """
123 return (
124 (self._get_dimension_keys() <= other._get_dimension_keys())
125 and all(
126 fields.issubset(other._dimension_fields.get(element_name, frozenset()))
127 for element_name, fields in self._dimension_fields.items()
128 )
129 and all(
130 fields.issubset(other._dataset_fields.get(dataset_type, frozenset()))
131 for dataset_type, fields in self._dataset_fields.items()
132 )
133 )

    def issuperset(self, other: ColumnSet) -> bool:
        """Test whether all columns in another set are also in this one.

        Parameters
        ----------
        other : `ColumnSet`
            Set of columns to compare to.

        Returns
        -------
        issuperset : `bool`
            Whether all columns in ``other`` are also in ``self``.
        """
        return other.issubset(self)

    def isdisjoint(self, other: ColumnSet) -> bool:
        """Test whether there are no columns in both this set and another.

        Parameters
        ----------
        other : `ColumnSet`
            Set of columns to compare to.

        Returns
        -------
        isdisjoint : `bool`
            Whether there are no columns in both ``self`` and ``other``.
        """
        return (
            self._get_dimension_keys().isdisjoint(other._get_dimension_keys())
            and all(
                fields.isdisjoint(other._dimension_fields.get(element, frozenset()))
                for element, fields in self._dimension_fields.items()
            )
            and all(
                fields.isdisjoint(other._dataset_fields.get(dataset_type, frozenset()))
                for dataset_type, fields in self._dataset_fields.items()
            )
        )

    def copy(self) -> ColumnSet:
        """Return a copy of this set.

        Returns
        -------
        copy : `ColumnSet`
            New column set that can be modified without changing the original.
        """
        result = ColumnSet(self._dimensions)
        # Carry over any dropped dimension keys so the copy compares equal to
        # the original (__eq__ includes _removed_dimension_keys).
        result._removed_dimension_keys.update(self._removed_dimension_keys)
        for element_name, element_fields in self._dimension_fields.items():
            result._dimension_fields[element_name].update(element_fields)
        for dataset_type, dataset_fields in self._dataset_fields.items():
            result._dataset_fields[dataset_type].update(dataset_fields)
        return result

    def update_dimensions(self, dimensions: DimensionGroup) -> None:
        """Add new dimensions to the set.

        Parameters
        ----------
        dimensions : `DimensionGroup`
            Dimensions to be included.
        """
        if not dimensions.issubset(self._dimensions):
            self._dimensions = dimensions.union(self._dimensions)
            self._dimension_fields = {
                name: self._dimension_fields.get(name, set()) for name in self._dimensions.elements
            }
        self._removed_dimension_keys.intersection_update(dimensions.names)

    def update(self, other: ColumnSet) -> None:
        """Add columns from another set to this one.

        Parameters
        ----------
        other : `ColumnSet`
            Column set whose columns should be included in this one.
        """
        self.update_dimensions(other.dimensions)
        self._removed_dimension_keys.intersection_update(other._removed_dimension_keys)
        for element_name, element_fields in other._dimension_fields.items():
            self._dimension_fields[element_name].update(element_fields)
        for dataset_type, dataset_fields in other._dataset_fields.items():
            self._dataset_fields[dataset_type].update(dataset_fields)

    def drop_dimension_keys(self, names: Iterable[str]) -> ColumnSet:
        """Remove the given dimension key columns from the set.

        Parameters
        ----------
        names : `~collections.abc.Iterable` [ `str` ]
            Names of the dimensions to remove.

        Returns
        -------
        self : `ColumnSet`
            This column set, modified in place.
        """
        self._removed_dimension_keys.update(names)
        return self

    def drop_implied_dimension_keys(self) -> ColumnSet:
        """Remove dimension key columns that are implied by others.

        Returns
        -------
        self : `ColumnSet`
            This column set, modified in place.
        """
        return self.drop_dimension_keys(self._dimensions.implied)

    def restore_dimension_keys(self) -> None:
        """Restore all removed dimension key columns."""
        self._removed_dimension_keys.clear()

    def __iter__(self) -> Iterator[ResultColumn]:
        yield from self.get_column_order().columns()

    def get_column_order(self) -> ColumnOrder:
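        """Return the order in which these columns will appear in result
        rows.

        Returns
        -------
        order : `ColumnOrder`
            Object mapping each column in this set to its position in a
            result row: dimension keys first, then dimension record fields,
            then dataset fields.
        """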
        dimension_names: list[ResultColumn] = []
        for dimension_name in self._dimensions.data_coordinate_keys:
            if dimension_name not in self._removed_dimension_keys:
                dimension_names.append(ResultColumn(dimension_name, None))

        # We iterate over DimensionElements and their DimensionRecord columns
        # in order to make sure that's predictable. We might want to extract
        # these query results positionally in some contexts.
        dimension_elements: list[ResultColumn] = []
        for element_name in self._dimensions.elements:
            element = self._dimensions.universe[element_name]
            fields_for_element = self._dimension_fields[element_name]
            for spec in element.schema.remainder:
                if spec.name in fields_for_element:
                    dimension_elements.append(ResultColumn(element_name, spec.name))

        # We sort dataset types and their fields lexicographically just to
        # keep our queries from having any dependence on set-iteration order.
        dataset_fields: list[ResultColumn] = []
        for dataset_type in sorted(self._dataset_fields, key=str):  # transform ANY_DATASET to str for sort
            for field in sorted(self._dataset_fields[dataset_type]):
                dataset_fields.append(ResultColumn(dataset_type, field))

        return ColumnOrder(dimension_names, dimension_elements, dataset_fields)

    def is_timespan(self, logical_table: AnyDatasetType | str, field: str | None) -> bool:
        """Test whether the given column is a timespan.

        Parameters
        ----------
        logical_table : `str` or ``ANY_DATASET``
            Name of the dimension element or dataset type the column belongs
            to. ``ANY_DATASET`` is used to represent any dataset type.
        field : `str` or `None`
            Column within the logical table, or `None` for dimension key
            columns.

        Returns
        -------
        is_timespan : `bool`
            Whether this column is a timespan.
        """
        return field == "timespan"

    @staticmethod
    def get_qualified_name(logical_table: AnyDatasetType | str, field: str | None) -> str:
        """Return the string that should be used to fully identify a column.

        Parameters
        ----------
        logical_table : `str` or ``ANY_DATASET``
            Name of the dimension element or dataset type the column belongs
            to. ``ANY_DATASET`` is used to represent any dataset type.
        field : `str` or `None`
            Column within the logical table, or `None` for dimension key
            columns.

        Returns
        -------
        name : `str`
            Fully-qualified name.
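
        Examples
        --------
        Dimension key columns are identified by the logical table name alone,
        while fields are joined to it with a colon (the names below are
        illustrative placeholders):

        >>> ColumnSet.get_qualified_name("visit", None)
        'visit'
        >>> ColumnSet.get_qualified_name("visit", "timespan")
        'visit:timespan'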
315 """
316 return str(logical_table) if field is None else f"{logical_table}:{field}"
318 def get_column_spec(
319 self, logical_table: AnyDatasetType | str, field: str | None
320 ) -> column_spec.ColumnSpec:
321 """Return a complete description of a column.
323 Parameters
324 ----------
325 logical_table : `str` or ``ANY_DATASET``
326 Name of the dimension element or dataset type the column belongs
327 to. ``ANY_DATASET`` is used to represent any dataset type.
328 field : `str` or `None`
329 Column within the logical table, or `None` for dimension key
330 columns.
332 Returns
333 -------
334 spec : `.column_spec.ColumnSpec`
335 Description of the column.
336 """
337 qualified_name = self.get_qualified_name(logical_table, field)
338 if field is None:
339 assert logical_table is not ANY_DATASET
340 return self._dimensions.universe.dimensions[logical_table].primary_key.model_copy(
341 update=dict(name=qualified_name)
342 )
343 if logical_table in self._dimension_fields:
344 assert logical_table is not ANY_DATASET
345 return (
346 self._dimensions.universe[logical_table]
347 .schema.all[field]
348 .model_copy(update=dict(name=qualified_name))
349 )
350 match field:
351 case "dataset_id":
352 return column_spec.UUIDColumnSpec.model_construct(name=qualified_name, nullable=False)
353 case "ingest_date":
354 return column_spec.DateTimeColumnSpec.model_construct(name=qualified_name)
355 case "run":
356 return column_spec.StringColumnSpec.model_construct(
357 name=qualified_name, nullable=False, length=column_spec.COLLECTION_NAME_MAX_LENGTH
358 )
359 case "collection":
360 return column_spec.StringColumnSpec.model_construct(
361 name=qualified_name, nullable=False, length=column_spec.COLLECTION_NAME_MAX_LENGTH
362 )
363 case "timespan":
364 return column_spec.TimespanColumnSpec.model_construct(name=qualified_name, nullable=True)
365 raise AssertionError(f"Unrecognized column identifiers: {logical_table}, {field}.")
367 def _get_dimension_keys(self) -> Set[str]:
368 if not self._removed_dimension_keys:
369 return self._dimensions.names
370 else:
371 return self._dimensions.names - self._removed_dimension_keys


class ResultColumn(NamedTuple):
    """Defines a column that can be output from a query."""

    logical_table: AnyDatasetType | str
    """Dimension element name or dataset type name."""

    field: str | None
    """Column associated with the dimension element or dataset type, or `None`
    if it is a dimension key column."""

    def __str__(self) -> str:
        return str(self.logical_table) if self.field is None else f"{self.logical_table}.{self.field}"


class ColumnOrder:
    """Defines the position of columns within a result row and provides helper
    methods for accessing subsets of columns in a row.

    Parameters
    ----------
    dimension_keys : `~collections.abc.Iterable` [ `ResultColumn` ]
        Columns corresponding to dimension primary keys.
    dimension_elements : `~collections.abc.Iterable` [ `ResultColumn` ]
        Columns corresponding to DimensionElements and their DimensionRecord
        columns.
    dataset_fields : `~collections.abc.Iterable` [ `ResultColumn` ]
        Columns corresponding to dataset types and their fields.
    """

    def __init__(
        self,
        dimension_keys: Iterable[ResultColumn],
        dimension_elements: Iterable[ResultColumn],
        dataset_fields: Iterable[ResultColumn],
    ):
        self._dimension_keys = tuple(dimension_keys)
        self._dimension_elements = tuple(dimension_elements)
        self._dataset_fields = tuple(dataset_fields)

    def columns(self) -> Iterator[ResultColumn]:
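        """Iterate over all columns in result-row order."""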
        # When editing this method, take care to update the other methods on
        # this object to correspond to the new order.
        yield from self._dimension_keys
        yield from self._dimension_elements
        yield from self._dataset_fields

    @property
    def dimension_key_names(self) -> list[str]:
        """Return the names of the dimension key columns included in result
        rows, in the order they appear in the row.
        """
        return [cast(str, column.logical_table) for column in self._dimension_keys]

    def extract_dimension_key_columns(self, row: Sequence[DataIdValue]) -> Sequence[DataIdValue]:
        """Given a full result row, return just the dimension key columns.

        Parameters
        ----------
        row : `~collections.abc.Sequence` [ `DataIdValue` ]
            A row output by the SQL query associated with these columns.
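
        Returns
        -------
        keys : `~collections.abc.Sequence` [ `DataIdValue` ]
            The leading entries of ``row``, corresponding to
            `dimension_key_names` in the same order.

        Examples
        --------
        A minimal sketch with placeholder column names (not drawn from a real
        data repository):

        >>> order = ColumnOrder(
        ...     [ResultColumn("visit", None), ResultColumn("detector", None)],
        ...     [],
        ...     [],
        ... )
        >>> order.extract_dimension_key_columns((42, 7, "other-field"))
        (42, 7)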
434 """
435 return row[: len(self._dimension_keys)]