Coverage for python / lsst / daf / butler / direct_query_driver / _result_page_converter.py: 31%
173 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30import datetime
31from abc import abstractmethod
32from collections.abc import Iterable
33from dataclasses import dataclass
34from typing import TYPE_CHECKING, Any
36import astropy.time
37import sqlalchemy
39from .._dataset_ref import DatasetRef
40from .._dataset_type import DatasetType
41from ..dimensions import (
42 DataCoordinate,
43 DimensionElement,
44 DimensionGroup,
45 DimensionRecord,
46 DimensionRecordSet,
47 SkyPixDimension,
48)
49from ..dimensions.record_cache import DimensionRecordCache
50from ..queries import tree as qt
51from ..queries.driver import (
52 DataCoordinateResultPage,
53 DatasetRefResultPage,
54 DimensionRecordResultPage,
55 GeneralResultPage,
56 ResultPage,
57)
58from ..queries.result_specs import (
59 DataCoordinateResultSpec,
60 DatasetRefResultSpec,
61 DimensionRecordResultSpec,
62 GeneralResultSpec,
63)
64from ..timespan_database_representation import TimespanDatabaseRepresentation
66if TYPE_CHECKING:
67 from ..registry.interfaces import Database
class ResultPageConverter:
    """Interface for converting raw SQL result rows to a `ResultPage`."""

    @abstractmethod
    def convert(self, raw_rows: Iterable[sqlalchemy.Row]) -> ResultPage:
        """Transform an iterable of raw SQL rows into a `ResultPage` holding
        high-level `Butler` types.

        Parameters
        ----------
        raw_rows : `~collections.abc.Iterable` [ `sqlalchemy.Row` ]
            SQLAlchemy result rows, with `Postprocessing` filters already
            applied.

        Returns
        -------
        result_page : `ResultPage`
            The converted results.
        """
        raise NotImplementedError()
@dataclass(frozen=True)
class ResultPageConverterContext:
    """Parameters used by all result page converters."""

    # Database interface; converters use it for timespan decoding
    # (`getTimespanRepresentation`) and column-name shrinking
    # (`name_shrinker`).
    db: Database
    # Order of dimension key columns in the raw result rows.
    column_order: qt.ColumnOrder
    # Client-side cache of complete record sets for "cached" dimension
    # elements.
    dimension_record_cache: DimensionRecordCache
class DimensionRecordResultPageConverter(ResultPageConverter):  # numpydoc ignore=PR01
    """Converts raw SQL rows into pages of `DimensionRecord` query results."""

    def __init__(self, spec: DimensionRecordResultSpec, ctx: ResultPageConverterContext) -> None:
        self._result_spec = spec
        # Caching is disabled here: this converter is used to (re)fetch the
        # records themselves.
        self._converter = _create_dimension_record_row_converter(spec.element, ctx, use_cache=False)

    def convert(self, raw_rows: Iterable[sqlalchemy.Row]) -> DimensionRecordResultPage:
        records = DimensionRecordSet(self._result_spec.element)
        convert_row = self._converter.convert
        for row in raw_rows:
            records.add(convert_row(row))
        return DimensionRecordResultPage(spec=self._result_spec, rows=records)
def _create_dimension_record_row_converter(
    element: DimensionElement, ctx: ResultPageConverterContext, *, use_cache: bool = True
) -> _DimensionRecordRowConverter:
    """Pick the row-converter implementation appropriate for ``element``."""
    if use_cache and element.is_cached:
        return _CachedDimensionRecordRowConverter(element, ctx.dimension_record_cache)
    if isinstance(element, SkyPixDimension):
        return _SkypixDimensionRecordRowConverter(element)
    return _NormalDimensionRecordRowConverter(element, ctx.db)
126class _DimensionRecordRowConverter:
127 """Interface definition for helper objects that convert a result row into a
128 DimensionRecord instance.
129 """
131 @abstractmethod
132 def convert(self, row: sqlalchemy.Row) -> DimensionRecord:
133 raise NotImplementedError()
class _NormalDimensionRecordRowConverter(_DimensionRecordRowConverter):
    """Helper for converting a result row into a DimensionRecord instance for
    typical dimensions (all non-skypix dimensions).
    """

    def __init__(self, element: DimensionElement, db: Database) -> None:
        self._db = db
        self._element = element
        self._record_cls = self._element.RecordClass

        # Pairs of (record attribute name, qualified column name); kept as a
        # list of tuples because we only ever iterate over the items.
        pairs = list(zip(element.schema.dimensions.names, element.dimensions.names))
        pairs.extend(
            (field, qt.ColumnSet.get_qualified_name(element.name, field))
            for field in element.schema.remainder.names
            if field != "timespan"
        )
        self._column_map = pairs

        # Timespans need special decoding via the database representation.
        self._timespan_qualified_name: str | None = (
            qt.ColumnSet.get_qualified_name(element.name, "timespan") if element.temporal else None
        )

    def convert(self, row: sqlalchemy.Row) -> DimensionRecord:
        mapping = row._mapping
        values = {attr: mapping[column] for attr, column in self._column_map}
        if self._timespan_qualified_name is not None:
            values["timespan"] = self._db.getTimespanRepresentation().extract(
                mapping, name=self._timespan_qualified_name
            )
        return self._record_cls(**values)
class _SkypixDimensionRecordRowConverter(_DimensionRecordRowConverter):
    """Helper for converting a result row into a DimensionRecord instance for
    skypix dimensions.
    """

    def __init__(self, element: SkyPixDimension):
        self._pixelization = element.pixelization
        self._id_qualified_name = qt.ColumnSet.get_qualified_name(element.name, None)
        self._record_cls = element.RecordClass

    def convert(self, row: sqlalchemy.Row) -> DimensionRecord:
        # Only the pixel ID comes from the row; the region is computed
        # client-side from the pixelization.
        ident = row._mapping[self._id_qualified_name]
        region = self._pixelization.pixel(ident)
        return self._record_cls(id=ident, region=region)
class _CachedDimensionRecordRowConverter(_DimensionRecordRowConverter):
    """Helper for converting a result row into a DimensionRecord instance for
    "cached" dimensions.  These are dimensions with few records that are used
    by many/all dataset types, so a complete cache of their records is kept
    client-side instead of being re-fetched from the DB constantly.
    """

    def __init__(self, element: DimensionElement, cache: DimensionRecordCache) -> None:
        self._element = element
        self._cache = cache
        self._key_columns = [qt.ColumnSet.get_qualified_name(name, None) for name in element.required.names]

    def convert(self, row: sqlalchemy.Row) -> DimensionRecord:
        # Pull the required-dimension key values from the row and look the
        # record up in the client-side cache.
        mapping = row._mapping
        key_values = tuple(mapping[column] for column in self._key_columns)
        return self._cache[self._element.name].find_with_required_values(key_values)
class DataCoordinateResultPageConverter(ResultPageConverter):  # numpydoc ignore=PR01
    """Converts raw SQL result iterables into a page of `DataCoordinate`
    query results.
    """

    def __init__(
        self,
        spec: DataCoordinateResultSpec,
        ctx: ResultPageConverterContext,
    ) -> None:
        self._spec = spec
        self._converter = _DataCoordinateRowConverter(
            spec.dimensions, ctx, include_dimension_records=spec.include_dimension_records
        )

    def convert(
        self,
        raw_rows: Iterable[sqlalchemy.Row],
    ) -> DataCoordinateResultPage:
        data_ids = [self._converter.convert(row) for row in raw_rows]
        return DataCoordinateResultPage(spec=self._spec, rows=data_ids)
class DatasetRefResultPageConverter(ResultPageConverter):  # numpydoc ignore=PR01
    """Convert raw SQL result iterables into pages of `DatasetRef` query
    results.
    """

    def __init__(
        self,
        spec: DatasetRefResultSpec,
        dataset_type: DatasetType,
        ctx: ResultPageConverterContext,
    ) -> None:
        self._spec = spec
        self._dataset_type = dataset_type
        self._data_coordinate_converter = _DataCoordinateRowConverter(
            spec.dimensions, ctx, include_dimension_records=spec.include_dimension_records
        )
        self._column_order = ctx.column_order
        self._name_shrinker = ctx.db.name_shrinker

    def convert(
        self,
        raw_rows: Iterable[sqlalchemy.Row],
    ) -> DatasetRefResultPage:
        # Column names may have been shrunk to fit database identifier length
        # limits, so apply the same shrinking when looking them up.
        shrink = self._name_shrinker.shrink
        run_column = shrink(qt.ColumnSet.get_qualified_name(self._spec.dataset_type_name, "run"))
        dataset_id_column = shrink(
            qt.ColumnSet.get_qualified_name(self._spec.dataset_type_name, "dataset_id")
        )
        to_data_id = self._data_coordinate_converter.convert
        refs = []
        for row in raw_rows:
            mapping = row._mapping
            refs.append(
                DatasetRef(
                    datasetType=self._dataset_type,
                    dataId=to_data_id(row),
                    run=mapping[run_column],
                    id=mapping[dataset_id_column],
                    conform=False,
                )
            )
        return DatasetRefResultPage(spec=self._spec, rows=refs)
class _DataCoordinateRowConverter:
    """Helper for converting a raw SQL result row into a DataCoordinate
    instance.
    """

    def __init__(
        self,
        dimensions: DimensionGroup,
        ctx: ResultPageConverterContext,
        include_dimension_records: bool,
    ):
        assert list(dimensions.data_coordinate_keys) == ctx.column_order.dimension_key_names, (
            "Dimension keys in result row should be in same order as those specified by the result spec"
        )

        self._dimensions = dimensions
        self._column_order = ctx.column_order
        # Only populated when the caller asked for expanded data IDs.
        self._dimension_record_converter = (
            _DimensionGroupRecordRowConverter(dimensions, ctx) if include_dimension_records else None
        )

    def convert(self, row: sqlalchemy.Row) -> DataCoordinate:
        key_values = tuple(self._column_order.extract_dimension_key_columns(row))
        data_id = DataCoordinate.from_full_values(self._dimensions, key_values)
        if self._dimension_record_converter is not None:
            return data_id.expanded(self._dimension_record_converter.convert(row))
        return data_id
306class _DimensionGroupRecordRowConverter: # numpydoc ignore=PR01
307 """Helper for pulling out all the DimensionRecords in a raw SQL result
308 row.
309 """
311 def __init__(self, dimensions: DimensionGroup, ctx: ResultPageConverterContext) -> None:
312 self._record_converters = {
313 name: _create_dimension_record_row_converter(dimensions.universe[name], ctx)
314 for name in dimensions.elements
315 }
317 def convert(self, row: sqlalchemy.Row) -> dict[str, DimensionRecord]: # numpydoc ignore=PR01
318 """Return a mapping from dimension name to dimension records for all
319 the dimensions in the database row.
320 """
321 return {name: converter.convert(row) for name, converter in self._record_converters.items()}
class GeneralResultPageConverter(ResultPageConverter):  # numpydoc ignore=PR01
    """Converts raw SQL rows into pages of `GeneralResult` query results."""

    def __init__(self, spec: GeneralResultSpec, ctx: ResultPageConverterContext) -> None:
        self.spec = spec
        # In case `spec.include_dimension_records` is True then in addition to
        # columns returned by the query we have to add columns from dimension
        # records that are not returned by the query.  These columns belong to
        # either cached or skypix dimensions.
        columns = spec.get_result_columns()
        self.converters: list[_GeneralColumnConverter] = []
        self.record_converters: dict[DimensionElement, _DimensionRecordRowConverter] = {}
        for column in columns:
            column_name = qt.ColumnSet.get_qualified_name(column.logical_table, column.field)
            converter: _GeneralColumnConverter
            if column.field == TimespanDatabaseRepresentation.NAME:
                converter = _TimespanGeneralColumnConverter(column_name, ctx.db)
            elif column.field == "ingest_date":
                converter = _TimestampGeneralColumnConverter(column_name)
            else:
                converter = _DefaultGeneralColumnConverter(column_name)
            self.converters.append(converter)

        if spec.include_dimension_records:
            # Note: `universe` is only needed on this branch (a redundant
            # earlier assignment was removed).
            universe = self.spec.dimensions.universe
            for element_name in self.spec.dimensions.elements:
                element = universe[element_name]
                if isinstance(element, SkyPixDimension):
                    self.record_converters[element] = _SkypixDimensionRecordRowConverter(element)
                elif element.is_cached:
                    self.record_converters[element] = _CachedDimensionRecordRowConverter(
                        element, ctx.dimension_record_cache
                    )

    def convert(self, raw_rows: Iterable[sqlalchemy.Row]) -> GeneralResultPage:
        """Convert raw rows into a `GeneralResultPage`, collecting dimension
        records alongside the tuple rows when the spec requests them.
        """
        rows = []
        dimension_records = None
        if self.spec.include_dimension_records:
            dimension_records = {element: DimensionRecordSet(element) for element in self.record_converters}
        for row in raw_rows:
            rows.append(tuple(cvt.convert(row) for cvt in self.converters))
            if dimension_records:
                for element, converter in self.record_converters.items():
                    dimension_records[element].add(converter.convert(row))
        return GeneralResultPage(spec=self.spec, rows=rows, dimension_records=dimension_records)
373class _GeneralColumnConverter:
374 """Interface for converting one or more columns in a result row to a single
375 column value in output row.
376 """
378 @abstractmethod
379 def convert(self, row: sqlalchemy.Row) -> Any:
380 """Convert one or more columns in the row into single value.
382 Parameters
383 ----------
384 row : `sqlalchemy.Row`
385 Row of values.
387 Returns
388 -------
389 value : `typing.Any`
390 Result of the conversion.
391 """
392 raise NotImplementedError()
class _DefaultGeneralColumnConverter(_GeneralColumnConverter):
    """Converter that passes a column value through unchanged.

    Parameters
    ----------
    name : `str`
        Column name
    """

    def __init__(self, name: str):
        self.name = name

    def convert(self, row: sqlalchemy.Row) -> Any:
        # No transformation needed; just look the column up by name.
        return row._mapping[self.name]
class _TimestampGeneralColumnConverter(_GeneralColumnConverter):
    """Converter that transforms ``datetime`` instances into astropy Time.
    Only the ``dataset.ingest_date`` column used native timestamps in the
    initial schema version; newer schema versions use our common
    nanoseconds-since-epoch representation for that column.  For both schema
    versions we want to return astropy time to clients.

    Parameters
    ----------
    name : `str`
        Column name
    """

    def __init__(self, name: str):
        self.name = name

    def convert(self, row: sqlalchemy.Row) -> Any:
        raw = row._mapping[self.name]
        if not isinstance(raw, datetime.datetime):
            # Already in the common representation; pass through.
            return raw
        # Native timestamp from the old schema: convert to TAI astropy Time.
        return astropy.time.Time(raw, scale="utc").tai
class _TimespanGeneralColumnConverter(_GeneralColumnConverter):
    """Converter that extracts a timespan from the row.

    Parameters
    ----------
    name : `str`
        Column name or prefix.
    db : `Database`
        Database instance.
    """

    def __init__(self, name: str, db: Database):
        # The database decides how timespans are encoded in result columns.
        self.timespan_class = db.getTimespanRepresentation()
        self.name = name

    def convert(self, row: sqlalchemy.Row) -> Any:
        return self.timespan_class.extract(row._mapping, self.name)