Coverage for python / lsst / daf / butler / dimensions / _record_table.py: 30%
69 statements
« prev ^ index » next   coverage.py v7.13.5, created at 2026-04-22 08:55 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("DimensionRecordTable",)
32from collections.abc import Iterable, Iterator
33from typing import TYPE_CHECKING, Any, final, overload
35import pyarrow as pa
36import pyarrow.compute as pc
38from lsst.utils.iteration import chunk_iterable
40if TYPE_CHECKING:
41 from ._elements import DimensionElement
42 from ._records import DimensionRecord
43 from ._universe import DimensionUniverse
@final
class DimensionRecordTable:
    """A table-like container for `DimensionRecord` objects.

    Parameters
    ----------
    element : `DimensionElement` or `str`, optional
        The dimension element that defines the records held by this table. If
        not a `DimensionElement` instance, ``universe`` must be provided. If
        not provided, ``table`` must have an "element" entry in its metadata
        (as is the case for tables returned by the `to_arrow` method).
    records : `~collections.abc.Iterable` [ `DimensionRecord` ], optional
        Dimension records to add to the table.
    universe : `DimensionUniverse`, optional
        Object that defines all dimensions.  Ignored if ``element`` is a
        `DimensionElement` instance.
    table : `pyarrow.Table`
        Arrow table to copy columns from.  Must have schema returned by
        `make_arrow_schema` for this element.  This argument is primarily
        intended to serve as the way to reconstruct a `DimensionRecordTable`
        that has been serialized to an Arrow-supported file or IPC format.
    batch_size : `int`, optional
        How many elements of ``records`` should be processed at a time, with
        each batch yielding a `pyarrow.RecordBatch` in the created table.
        Smaller values will reduce peak memory usage for large iterables.
        Ignored if ``records`` is empty.

    Notes
    -----
    `DimensionRecordTable` should generally have a smaller memory footprint
    than `DimensionRecordSet` if its rows are unique, and it provides fast
    column-oriented access and Arrow interoperability that
    `DimensionRecordSet` lacks entirely.  In other respects
    `DimensionRecordSet` is more featureful and simpler to use efficiently.
    """

    def __init__(
        self,
        element: DimensionElement | str | None = None,
        records: Iterable[DimensionRecord] = (),
        universe: DimensionUniverse | None = None,
        table: pa.Table | None = None,
        batch_size: int | None = None,
    ):
        if element is None:
            # A pyarrow schema created without metadata reports
            # ``schema.metadata`` as `None`, so guard against that before
            # testing membership; otherwise we'd raise a confusing
            # "argument of type 'NoneType' is not iterable" instead of the
            # informative TypeError below.
            if table is not None and table.schema.metadata and b"element" in table.schema.metadata:
                element = table.schema.metadata[b"element"].decode()
            else:
                raise TypeError("If 'element' is not provided it must be present in 'table.schema.metadata'.")
        if isinstance(element, str):
            if universe is None:
                raise TypeError("'universe' must be provided if 'element' is not a DimensionElement.")
            element = universe[element]
        else:
            universe = element.universe
        self._element = element
        self._converters = element.schema.to_arrow()
        arrow_schema = pa.schema(
            [converter.field for converter in self._converters],
            {
                b"element": element.name.encode(),
                # Since the Arrow table might be saved to a file on its own,
                # we include the dimension universe's identifiers in its
                # metadata.
                b"namespace": element.universe.namespace.encode(),
                b"version": str(element.universe.version).encode(),
            },
        )
        # Precomputed compute-expression fields for the element's required
        # dimension keys.  Not consumed within this class; presumably used
        # by external query/filter code via the private attribute.
        self._required_value_fields = [pc.field(name) for name in self._element.schema.required.names]
        if batch_size is None:
            # Single batch: simplest and fastest, but peak memory scales
            # with the full size of ``records``.
            batches = [self._make_batch(records, arrow_schema)]
        else:
            batches = [
                self._make_batch(record_chunk, arrow_schema)
                for record_chunk in chunk_iterable(records, chunk_size=batch_size)
            ]
        if table is not None:
            batches.extend(table.to_batches())
        self._table: pa.Table = pa.Table.from_batches(batches, arrow_schema)

    @classmethod
    def make_arrow_schema(cls, element: DimensionElement) -> pa.Schema:
        """Return the Arrow schema of the table returned by `to_arrow` with
        the given dimension element.

        Parameters
        ----------
        element : `DimensionElement`
            Dimension element that defines the schema.

        Returns
        -------
        schema : `pyarrow.Schema`
            Arrow schema.
        """
        return pa.schema([converter.field for converter in element.schema.to_arrow()])

    @property
    def element(self) -> DimensionElement:
        """The dimension element that defines the records of this table."""
        return self._element

    def __len__(self) -> int:
        # Number of records (rows) currently held.
        return self._table.num_rows

    def __iter__(self) -> Iterator[DimensionRecord]:
        # Reconstruct one DimensionRecord per row, in table order.
        for i in range(self._table.num_rows):
            yield self._get_record_at(self._table, i)

    @overload
    def __getitem__(self, index: int) -> DimensionRecord: ...

    @overload
    def __getitem__(self, index: slice) -> DimensionRecordTable: ...

    def __getitem__(self, index: int | slice) -> DimensionRecord | DimensionRecordTable:
        if isinstance(index, slice):
            # Build the sliced view without invoking __init__ (which would
            # needlessly re-derive schema/converters); copy every attribute
            # __init__ would have set so the result is fully initialized.
            result = object.__new__(DimensionRecordTable)
            result._element = self._element
            result._converters = self._converters
            # Bug fix: this attribute was previously not copied, leaving
            # sliced tables without ``_required_value_fields``.
            result._required_value_fields = self._required_value_fields
            result._table = self._table[index]
            return result
        else:
            return self._get_record_at(self._table, index)

    def extend(self, records: Iterable[DimensionRecord]) -> None:
        """Add new rows to the end of the table.

        Parameters
        ----------
        records : `~collections.abc.Iterable` [ `DimensionRecord` ]
            Dimension records to add to the table.
        """
        # Arrow tables are immutable, so rebuild from the existing batches
        # plus one new batch holding the added records.
        batches: list[pa.RecordBatch] = self._table.to_batches()
        batches.append(self._make_batch(records, self._table.schema))
        self._table = pa.Table.from_batches(batches, self._table.schema)

    def column(self, name: str) -> pa.ChunkedArray:
        """Return a single column from the table as an array.

        Parameters
        ----------
        name : `str`
            Name of the column.  Valid options are given by
            `DimensionElement.schema.names`, and are the same as the
            attributes of the dimension records.

        Returns
        -------
        array : `pyarrow.ChunkedArray`
            An array view of the column.
        """
        return self._table.column(name)

    def to_arrow(self) -> pa.Table:
        """Return an Arrow table holding the same records."""
        return self._table

    def _make_batch(self, records: Iterable[DimensionRecord], arrow_schema: pa.Schema) -> pa.RecordBatch:
        """Make a `pyarrow.RecordBatch` from an iterable of `DimensionRecord`.

        Parameters
        ----------
        records : `~collections.abc.Iterable` [ `DimensionRecord` ]
            Records to add.
        arrow_schema : `pyarrow.Schema`
            Arrow schema for the record batch.

        Returns
        -------
        batch : `pyarrow.RecordBatch`
            Record batch holding the records.
        """
        # Accumulate plain Python lists column by column, then let each
        # converter finish them into typed Arrow arrays.
        list_columns: list[list[Any]] = [list() for _ in self._converters]
        for record in records:
            for converter, column in zip(self._converters, list_columns):
                converter.append(getattr(record, converter.name), column)
        array_columns = [
            converter.finish(column) for converter, column in zip(self._converters, list_columns)
        ]
        return pa.record_batch(array_columns, schema=arrow_schema)

    def _get_record_at(self, table: pa.Table | pa.RecordBatch, index: int) -> DimensionRecord:
        """Construct a `DimensionRecord` from a row in the table.

        Parameters
        ----------
        table : `pyarrow.Table` or `pyarrow.RecordBatch`
            Table or record batch to get values from.
        index : `int`
            Index of the row to extract.

        Returns
        -------
        record : `DimensionRecord`
            Dimension record representing a table row.
        """
        # Columns are positional, in the same order as the element schema's
        # field names; convert each Arrow scalar back to a Python value.
        return self._element.RecordClass(
            **{k: table.column(j)[index].as_py() for j, k in enumerate(self._element.schema.all.names)}
        )