Coverage for python/lsst/daf/butler/dimensions/_record_table.py: 30%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

# Explicit public API for this module; only the container class is exported.
__all__ = ("DimensionRecordTable",)

31 

32from collections.abc import Iterable, Iterator 

33from typing import TYPE_CHECKING, Any, final, overload 

34 

35import pyarrow as pa 

36import pyarrow.compute as pc 

37 

38from lsst.utils.iteration import chunk_iterable 

39 

40if TYPE_CHECKING: 

41 from ._elements import DimensionElement 

42 from ._records import DimensionRecord 

43 from ._universe import DimensionUniverse 

44 

45 

@final
class DimensionRecordTable:
    """A table-like container for `DimensionRecord` objects.

    Parameters
    ----------
    element : `DimensionElement` or `str`, optional
        The dimension element that defines the records held by this table. If
        not a `DimensionElement` instance, ``universe`` must be provided. If
        not provided, ``table`` must have an "element" entry in its metadata
        (as is the case for tables returned by the `to_arrow` method).
    records : `~collections.abc.Iterable` [ `DimensionRecord` ], optional
        Dimension records to add to the table.
    universe : `DimensionUniverse`, optional
        Object that defines all dimensions. Ignored if ``element`` is a
        `DimensionElement` instance.
    table : `pyarrow.Table`
        Arrow table to copy columns from. Must have schema returned by
        `make_arrow_schema` for this element. This argument is primarily
        intended to serve as the way to reconstruct a `DimensionRecordTable`
        that has been serialized to an Arrow-supported file or IPC format.
    batch_size : `int`, optional
        How many elements of ``records`` should be processed at a time, with
        each batch yielding a `pyarrow.RecordBatch` in the created table.
        Smaller values will reduce peak memory usage for large iterables.
        Ignored if ``records`` is empty.

    Notes
    -----
    `DimensionRecordTable` should generally have a smaller memory footprint
    than `DimensionRecordSet` if its rows are unique, and it provides fast
    column-oriented access and Arrow interoperability that `DimensionRecordSet`
    lacks entirely. In other respects `DimensionRecordSet` is more
    featureful and simpler to use efficiently.
    """

    def __init__(
        self,
        element: DimensionElement | str | None = None,
        records: Iterable[DimensionRecord] = (),
        universe: DimensionUniverse | None = None,
        table: pa.Table | None = None,
        batch_size: int | None = None,
    ):
        if element is None:
            # ``table.schema.metadata`` is None (not an empty mapping) when a
            # table has no metadata at all, so guard against that explicitly
            # to raise the intended, informative TypeError instead of
            # "argument of type 'NoneType' is not iterable".
            if (
                table is not None
                and table.schema.metadata is not None
                and b"element" in table.schema.metadata
            ):
                element = table.schema.metadata[b"element"].decode()
            else:
                raise TypeError("If 'element' is not provided it must be present in 'table.schema.metadata'.")
        if isinstance(element, str):
            if universe is None:
                raise TypeError("'universe' must be provided if 'element' is not a DimensionElement.")
            element = universe[element]
        else:
            universe = element.universe
        self._element = element
        self._converters = element.schema.to_arrow()
        arrow_schema = pa.schema(
            [converter.field for converter in self._converters],
            {
                b"element": element.name.encode(),
                # Since the Arrow table might be saved to a file on its own, we
                # include the dimension universe's identifiers in its metadata.
                b"namespace": element.universe.namespace.encode(),
                b"version": str(element.universe.version).encode(),
            },
        )
        # Precomputed compute-expression fields for the required dimension
        # keys; cached here so later lookups don't rebuild them.
        self._required_value_fields = [pc.field(name) for name in self._element.schema.required.names]
        if batch_size is None:
            # Single batch: process all records at once.
            batches = [self._make_batch(records, arrow_schema)]
        else:
            # Chunked processing keeps peak memory bounded for big iterables.
            batches = [
                self._make_batch(record_chunk, arrow_schema)
                for record_chunk in chunk_iterable(records, chunk_size=batch_size)
            ]
        if table is not None:
            batches.extend(table.to_batches())
        self._table: pa.Table = pa.Table.from_batches(batches, arrow_schema)

    @classmethod
    def make_arrow_schema(cls, element: DimensionElement) -> pa.Schema:
        """Return the Arrow schema of the table returned by `to_arrow` with the
        given dimension element.

        Parameters
        ----------
        element : `DimensionElement`
            Dimension element that defines the schema.

        Returns
        -------
        schema : `pyarrow.Schema`
            Arrow schema.
        """
        return pa.schema([converter.field for converter in element.schema.to_arrow()])

    @property
    def element(self) -> DimensionElement:
        """The dimension element that defines the records of this table."""
        return self._element

    def __len__(self) -> int:
        return self._table.num_rows

    def __iter__(self) -> Iterator[DimensionRecord]:
        for i in range(self._table.num_rows):
            yield self._get_record_at(self._table, i)

    @overload
    def __getitem__(self, index: int) -> DimensionRecord: ...

    @overload
    def __getitem__(self, index: slice) -> DimensionRecordTable: ...

    def __getitem__(self, index: int | slice) -> DimensionRecord | DimensionRecordTable:
        if isinstance(index, slice):
            # Bypass __init__ since we can share the element, converters, and
            # cached fields with the parent and just slice the Arrow table
            # (pyarrow slicing is zero-copy).
            result = object.__new__(DimensionRecordTable)
            result._element = self._element
            result._converters = self._converters
            # Copy the cached required-value fields too; omitting this would
            # leave the sliced instance missing an attribute that __init__
            # always sets, causing AttributeError on later access.
            result._required_value_fields = self._required_value_fields
            result._table = self._table[index]
            return result
        else:
            return self._get_record_at(self._table, index)

    def extend(self, records: Iterable[DimensionRecord]) -> None:
        """Add new rows to the end of the table.

        Parameters
        ----------
        records : `~collections.abc.Iterable` [ `DimensionRecord` ]
            Dimension records to add to the table.
        """
        # Arrow tables are immutable; rebuild from the existing batches plus
        # one new batch holding the added records.
        batches: list[pa.RecordBatch] = self._table.to_batches()
        batches.append(self._make_batch(records, self._table.schema))
        self._table = pa.Table.from_batches(batches, self._table.schema)

    def column(self, name: str) -> pa.ChunkedArray:
        """Return a single column from the table as an array.

        Parameters
        ----------
        name : `str`
            Name of the column. Valid options are given by
            `DimensionElement.schema.names`, and are the same as the attributes
            of the dimension records.

        Returns
        -------
        array : `pyarrow.ChunkedArray`
            An array view of the column.
        """
        return self._table.column(name)

    def to_arrow(self) -> pa.Table:
        """Return an Arrow table holding the same records."""
        return self._table

    def _make_batch(self, records: Iterable[DimensionRecord], arrow_schema: pa.Schema) -> pa.RecordBatch:
        """Make a `pyarrow.RecordBatch` from an iterable of `DimensionRecord`.

        Parameters
        ----------
        records : `~collections.abc.Iterable` [ `DimensionRecord` ]
            Records to add.
        arrow_schema : `pyarrow.Schema`
            Arrow schema for the record batch.

        Returns
        -------
        batch : `pyarrow.RecordBatch`
            Record batch holding the records.
        """
        # One accumulator list per column; converters append the per-record
        # value for their column, then finish() turns each list into an array.
        list_columns: list[list[Any]] = [list() for _ in self._converters]
        for record in records:
            for converter, column in zip(self._converters, list_columns):
                converter.append(getattr(record, converter.name), column)
        array_columns = [
            converter.finish(column) for converter, column in zip(self._converters, list_columns)
        ]
        return pa.record_batch(array_columns, schema=arrow_schema)

    def _get_record_at(self, table: pa.Table | pa.RecordBatch, index: int) -> DimensionRecord:
        """Construct a `DimensionRecord` from a row in the table.

        Parameters
        ----------
        table : `pyarrow.Table` or `pyarrow.RecordBatch`
            Table or record batch to get values from.
        index : `int`
            Index of the row to extract.

        Returns
        -------
        record : `DimensionRecord`
            Dimension record representing a table row.
        """
        # Columns are positionally aligned with the element schema's field
        # names, so enumerate gives the (column index, attribute name) pairs.
        return self._element.RecordClass(
            **{k: table.column(j)[index].as_py() for j, k in enumerate(self._element.schema.all.names)}
        )