Coverage for python / lsst / daf / butler / delegates / arrowtable.py: 8%

135 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:36 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Support for reading Arrow tables.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ["ArrowTableDelegate"] 

33 

34from collections.abc import Mapping 

35from fnmatch import fnmatchcase 

36from typing import TYPE_CHECKING, Any 

37 

38import pyarrow as pa 

39 

40from lsst.daf.butler import DatasetProvenance, StorageClassDelegate 

41from lsst.utils.introspection import get_full_type_name 

42from lsst.utils.iteration import ensure_iterable 

43 

44if TYPE_CHECKING: 

45 import pandas 

46 

47 from lsst.daf.butler import DatasetRef 

48 

49 

class ArrowTableDelegate(StorageClassDelegate):
    """Delegate that understands ArrowTable and related storage classes."""

    def can_accept(self, inMemoryDataset: Any) -> bool:
        # Docstring inherited.
        return _checkArrowCompatibleType(inMemoryDataset) is not None

    def getComponent(self, composite: Any, componentName: str) -> Any:
        """Get a component from an Arrow table or equivalent.

        Parameters
        ----------
        composite : `~pyarrow.Table`
            Arrow table to access component.
        componentName : `str`
            Name of component to retrieve.

        Returns
        -------
        component : `object`
            The component.

        Raises
        ------
        AttributeError
            The component can not be found.
        """
        typeString = _checkArrowCompatibleType(composite)

        if typeString is None:
            raise ValueError(f"Unsupported composite type {get_full_type_name(composite)}")

        if componentName == "columns":
            if typeString == "arrow":
                return composite.schema
            elif typeString == "astropy":
                return list(composite.columns.keys())
            elif typeString == "numpy":
                return list(composite.dtype.names)
            elif typeString == "numpydict":
                return list(composite.keys())
            elif typeString == "pandas":
                import pandas

                if isinstance(composite.columns, pandas.MultiIndex):
                    return composite.columns
                else:
                    # Regular dataframe: combine data and index columns.
                    return pandas.Index(self._getAllDataframeColumns(composite))

        elif componentName == "schema":
            if typeString == "arrow":
                return composite.schema
            elif typeString == "astropy":
                from lsst.daf.butler.formatters.parquet import ArrowAstropySchema

                return ArrowAstropySchema(composite)
            elif typeString == "numpy":
                from lsst.daf.butler.formatters.parquet import ArrowNumpySchema

                return ArrowNumpySchema(composite.dtype)
            elif typeString == "numpydict":
                from lsst.daf.butler.formatters.parquet import ArrowNumpySchema, _numpy_dict_to_dtype

                dtype, _ = _numpy_dict_to_dtype(composite)
                return ArrowNumpySchema(dtype)
            elif typeString == "pandas":
                from lsst.daf.butler.formatters.parquet import DataFrameSchema

                # Zero-row slice preserves the column structure for the
                # schema without copying any data.
                return DataFrameSchema(composite.iloc[:0])
        elif componentName == "rowcount":
            if typeString == "arrow":
                # Measure the first column rather than the table itself.
                return len(composite[composite.schema.names[0]])
            elif typeString in ["astropy", "numpy", "pandas"]:
                return len(composite)
            elif typeString == "numpydict":
                # All arrays share a length; use the first one.
                return len(composite[list(composite.keys())[0]])

        raise AttributeError(
            f"Do not know how to retrieve component {componentName} from {get_full_type_name(composite)}"
        )

    def handleParameters(self, inMemoryDataset: Any, parameters: Mapping[str, Any] | None = None) -> Any:
        """Apply read parameters to an in-memory dataset.

        Parameters
        ----------
        inMemoryDataset : `object`
            Dataset to modify; one of the Arrow-compatible table types.
        parameters : `~collections.abc.Mapping`, optional
            Parameters to apply.  Only the ``columns`` parameter is
            understood here; column names may use shell-style glob
            patterns (matched with `fnmatch.fnmatchcase`).

        Returns
        -------
        dataset : `object`
            The dataset restricted to the requested columns, or the input
            dataset unchanged if no column selection was requested.

        Raises
        ------
        ValueError
            Raised if the dataset type is unsupported or if a requested
            column matches no available column.
        NotImplementedError
            Raised if a requested column name is not a string.
        """
        typeString = _checkArrowCompatibleType(inMemoryDataset)

        if typeString is None:
            raise ValueError(f"Unsupported inMemoryDataset type {get_full_type_name(inMemoryDataset)}")

        # Nothing to do unless a column subset was requested.
        if parameters is None or "columns" not in parameters:
            return inMemoryDataset

        if typeString == "arrow":
            allColumns = inMemoryDataset.schema.names
        elif typeString == "astropy":
            allColumns = inMemoryDataset.columns.keys()
        elif typeString == "numpy":
            allColumns = inMemoryDataset.dtype.names
        elif typeString == "numpydict":
            allColumns = list(inMemoryDataset.keys())
        elif typeString == "pandas":
            import pandas

            allColumns = self._getAllDataframeColumns(inMemoryDataset)

        if typeString == "pandas" and isinstance(inMemoryDataset.columns, pandas.MultiIndex):
            from ..formatters.parquet import _standardize_multi_index_columns

            # We have a multi-index dataframe which needs special
            # handling.
            readColumns = _standardize_multi_index_columns(
                inMemoryDataset.columns,
                parameters["columns"],
                stringify=False,
            )
            # Ensure uniqueness, keeping order.
            readColumns = list(dict.fromkeys(readColumns))
        else:
            # De-duplicate the request while preserving order.
            readColumnsIn = list(dict.fromkeys(ensure_iterable(parameters["columns"])))

            if typeString == "pandas":
                # Exclude index columns from the subset.
                readColumnsIn = [
                    name for name in readColumnsIn if name not in inMemoryDataset.index.names
                ]

            # Expand glob patterns against the available columns; dict
            # keys give uniqueness while preserving match order.
            readColumnsDict = {}
            for column in readColumnsIn:
                if not isinstance(column, str):
                    raise NotImplementedError(
                        f"InMemoryDataset of a {get_full_type_name(inMemoryDataset)} only "
                        "supports string column names."
                    )
                found = False
                for allColumn in allColumns:
                    if fnmatchcase(allColumn, column):
                        found = True
                        readColumnsDict[allColumn] = True
                if not found:
                    raise ValueError(f"Unrecognized column name {column!r}.")

            readColumns = list(readColumnsDict.keys())

        if typeString == "arrow":
            return inMemoryDataset.select(readColumns)
        elif typeString in ("astropy", "numpy", "pandas"):
            return inMemoryDataset[readColumns]
        elif typeString == "numpydict":
            return {column: inMemoryDataset[column] for column in readColumns}
        else:
            return inMemoryDataset

    def _getAllDataframeColumns(self, dataset: pandas.DataFrame) -> list[str]:
        """Get all columns, including index columns.

        Parameters
        ----------
        dataset : `pandas.DataFrame`
            Dataframe to inspect.

        Returns
        -------
        columns : `list` [`str`]
            List of all columns.
        """
        allColumns = list(dataset.columns)
        # An unnamed (default) index has names == [None]; only a named
        # index contributes extra columns.
        if dataset.index.names[0] is not None:
            allColumns.extend(dataset.index.names)

        return allColumns

    def add_provenance(
        self, in_memory_dataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
    ) -> Any:
        # Docstring inherited.
        return _add_arrow_provenance(in_memory_dataset, ref, provenance)

221 

222 

def _add_arrow_provenance(
    in_memory_dataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
) -> Any:
    # Record provenance as a flat dict in the table metadata. For now
    # only Astropy tables get this treatment, since they have an
    # implemented mechanism for round tripping metadata.
    if _checkArrowCompatibleType(in_memory_dataset) != "astropy":
        return in_memory_dataset

    if provenance is None:
        provenance = DatasetProvenance()
    flat_provenance = provenance.to_flat_dict(
        ref, prefix="LSST.BUTLER", sep=".", simple_types=True, max_inputs=2000
    )

    # Remove any stale provenance entries before adding the new ones.
    DatasetProvenance.strip_provenance_from_flat_dict(in_memory_dataset.meta)
    in_memory_dataset.meta.update(flat_provenance)
    return in_memory_dataset

241 

242 

def _checkArrowCompatibleType(dataset: Any) -> str | None:
    """Check a dataset for arrow compatibility and return its type string.

    Parameters
    ----------
    dataset : `object`
        Dataset object.

    Returns
    -------
    typeString : `str` or `None`
        One of ``"arrow"``, ``"astropy"``, ``"numpy"``, ``"numpydict"``,
        or ``"pandas"``; `None` if the dataset is not a supported type.
    """
    import numpy as np
    from astropy.table import Table as astropyTable

    if isinstance(dataset, pa.Table):
        return "arrow"
    elif isinstance(dataset, astropyTable):
        return "astropy"
    elif isinstance(dataset, np.ndarray):
        return "numpy"
    elif isinstance(dataset, dict):
        # Only a dict whose values are all numpy arrays is supported;
        # anything else is some other sort of dictionary.
        if all(isinstance(item, np.ndarray) for item in dataset.values()):
            return "numpydict"
        return None
    elif hasattr(dataset, "to_parquet"):
        # This may be a pandas DataFrame; pandas is an optional import.
        try:
            import pandas
        except ImportError:
            pandas = None

        if pandas is not None and isinstance(dataset, pandas.DataFrame):
            return "pandas"

    return None

281 

282 return None