Coverage for python / lsst / daf / butler / delegates / arrowtable.py: 8%
135 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:43 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Support for reading Arrow tables."""
30from __future__ import annotations
32__all__ = ["ArrowTableDelegate"]
34from collections.abc import Mapping
35from fnmatch import fnmatchcase
36from typing import TYPE_CHECKING, Any
38import pyarrow as pa
40from lsst.daf.butler import DatasetProvenance, StorageClassDelegate
41from lsst.utils.introspection import get_full_type_name
42from lsst.utils.iteration import ensure_iterable
44if TYPE_CHECKING:
45 import pandas
47 from lsst.daf.butler import DatasetRef
class ArrowTableDelegate(StorageClassDelegate):
    """Delegate that understands ArrowTable and related storage classes.

    Supported in-memory forms are pyarrow tables, astropy tables, numpy
    structured arrays, dicts of numpy arrays, and pandas DataFrames, as
    determined by `_checkArrowCompatibleType`.
    """

    def can_accept(self, inMemoryDataset: Any) -> bool:
        # Docstring inherited.
        return _checkArrowCompatibleType(inMemoryDataset) is not None

    def getComponent(self, composite: Any, componentName: str) -> Any:
        """Get a component from an Arrow table or equivalent.

        Parameters
        ----------
        composite : `~pyarrow.Table`
            Arrow table to access component.
        componentName : `str`
            Name of component to retrieve.  Supported components are
            ``columns``, ``schema``, and ``rowcount``.

        Returns
        -------
        component : `object`
            The component.

        Raises
        ------
        AttributeError
            The component can not be found.
        """
        typeString = _checkArrowCompatibleType(composite)

        if typeString is None:
            raise ValueError(f"Unsupported composite type {get_full_type_name(composite)}")

        if componentName == "columns":
            if typeString == "arrow":
                # For a pyarrow table the schema doubles as the column
                # description.
                return composite.schema
            elif typeString == "astropy":
                return list(composite.columns.keys())
            elif typeString == "numpy":
                return list(composite.dtype.names)
            elif typeString == "numpydict":
                return list(composite.keys())
            elif typeString == "pandas":
                import pandas

                if isinstance(composite.columns, pandas.MultiIndex):
                    return composite.columns
                else:
                    # Include any named index columns alongside the regular
                    # data columns.
                    return pandas.Index(self._getAllDataframeColumns(composite))
        elif componentName == "schema":
            if typeString == "arrow":
                return composite.schema
            elif typeString == "astropy":
                from lsst.daf.butler.formatters.parquet import ArrowAstropySchema

                return ArrowAstropySchema(composite)
            elif typeString == "numpy":
                from lsst.daf.butler.formatters.parquet import ArrowNumpySchema

                return ArrowNumpySchema(composite.dtype)
            elif typeString == "numpydict":
                from lsst.daf.butler.formatters.parquet import ArrowNumpySchema, _numpy_dict_to_dtype

                dtype, _ = _numpy_dict_to_dtype(composite)
                return ArrowNumpySchema(dtype)
            elif typeString == "pandas":
                from lsst.daf.butler.formatters.parquet import DataFrameSchema

                # A zero-row slice retains the column structure without
                # copying any data.
                return DataFrameSchema(composite.iloc[:0])
        elif componentName == "rowcount":
            if typeString == "arrow":
                # Length of the first column gives the row count.
                return len(composite[composite.schema.names[0]])
            elif typeString in ["astropy", "numpy", "pandas"]:
                return len(composite)
            elif typeString == "numpydict":
                # Use the first array's length; arrays are presumably all
                # the same length -- not validated here.
                return len(composite[list(composite.keys())[0]])

        raise AttributeError(
            f"Do not know how to retrieve component {componentName} from {get_full_type_name(composite)}"
        )

    def handleParameters(self, inMemoryDataset: Any, parameters: Mapping[str, Any] | None = None) -> Any:
        """Apply read parameters to an in-memory dataset.

        The only parameter acted on here is ``columns``, which selects a
        subset of columns.  Column names may be glob patterns (matched with
        `fnmatch.fnmatchcase`), except for multi-index dataframes which are
        handled separately.

        Parameters
        ----------
        inMemoryDataset : `object`
            The dataset to subset; must be one of the arrow-compatible
            types.
        parameters : `~collections.abc.Mapping` [`str`, `object`], optional
            Parameters to apply.  If `None` or if ``columns`` is absent the
            dataset is returned unchanged.

        Returns
        -------
        dataset : `object`
            The (possibly column-subsetted) dataset, of the same type as
            the input.

        Raises
        ------
        ValueError
            Raised if the dataset type is unsupported or a requested column
            name matches nothing.
        NotImplementedError
            Raised if a non-string column name is given for a dataset type
            that only supports string column names.
        """
        typeString = _checkArrowCompatibleType(inMemoryDataset)

        if typeString is None:
            raise ValueError(f"Unsupported inMemoryDataset type {get_full_type_name(inMemoryDataset)}")

        if parameters is None:
            return inMemoryDataset

        if "columns" in parameters:
            readColumns = list(ensure_iterable(parameters["columns"]))

            # Gather the full set of available column names for glob
            # matching below.
            if typeString == "arrow":
                allColumns = inMemoryDataset.schema.names
            elif typeString == "astropy":
                allColumns = inMemoryDataset.columns.keys()
            elif typeString == "numpy":
                allColumns = inMemoryDataset.dtype.names
            elif typeString == "numpydict":
                allColumns = list(inMemoryDataset.keys())
            elif typeString == "pandas":
                import pandas

                allColumns = self._getAllDataframeColumns(inMemoryDataset)

            # NOTE: the short-circuit on typeString keeps the ``pandas``
            # name (imported just above) from being referenced for other
            # types.
            if typeString == "pandas" and isinstance(inMemoryDataset.columns, pandas.MultiIndex):
                from ..formatters.parquet import _standardize_multi_index_columns

                # We have a multi-index dataframe which needs special
                # handling.
                readColumns = _standardize_multi_index_columns(
                    inMemoryDataset.columns,
                    parameters["columns"],
                    stringify=False,
                )
                # Ensure uniqueness, keeping order.
                readColumns = list(dict.fromkeys(readColumns))
            else:
                readColumnsIn = list(dict.fromkeys(ensure_iterable(parameters["columns"])))

                if typeString == "pandas":
                    # Exclude index columns from the subset.
                    readColumnsIn = [
                        name for name in readColumnsIn if name not in inMemoryDataset.index.names
                    ]

                # Ordered-set of matched column names (dict preserves
                # insertion order).
                readColumnsDict = {}
                for column in readColumnsIn:
                    if not isinstance(column, str):
                        raise NotImplementedError(
                            f"InMemoryDataset of a {get_full_type_name(inMemoryDataset)} only "
                            "supports string column names."
                        )
                    found = False
                    for allColumn in allColumns:
                        if fnmatchcase(allColumn, column):
                            found = True
                            readColumnsDict[allColumn] = True
                    if not found:
                        raise ValueError(f"Unrecognized column name {column!r}.")

                readColumns = list(readColumnsDict.keys())

            if typeString == "arrow":
                return inMemoryDataset.select(readColumns)
            elif typeString in ("astropy", "numpy", "pandas"):
                return inMemoryDataset[readColumns]
            elif typeString == "numpydict":
                return {column: inMemoryDataset[column] for column in readColumns}
        else:
            return inMemoryDataset

    def _getAllDataframeColumns(self, dataset: pandas.DataFrame) -> list[str]:
        """Get all columns, including index columns.

        Parameters
        ----------
        dataset : `pandas.DataFrame`
            Dataframe to inspect.

        Returns
        -------
        columns : `list` [`str`]
            List of all columns.
        """
        allColumns = list(dataset.columns)
        # An unnamed (default) index reports a single name of None; only
        # append index names when the index is actually named.
        if dataset.index.names[0] is not None:
            allColumns.extend(dataset.index.names)

        return allColumns

    def add_provenance(
        self, in_memory_dataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
    ) -> Any:
        # Docstring inherited.
        return _add_arrow_provenance(in_memory_dataset, ref, provenance)
def _add_arrow_provenance(
    in_memory_dataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
) -> Any:
    """Attach flattened provenance metadata to a dataset if supported.

    Provenance is only recorded for astropy tables, which have an
    implemented mechanism for round-tripping metadata; every other
    arrow-compatible type is returned untouched.

    Parameters
    ----------
    in_memory_dataset : `object`
        Dataset to receive provenance.
    ref : `DatasetRef`
        Reference describing this dataset.
    provenance : `DatasetProvenance`, optional
        Provenance to record; an empty `DatasetProvenance` is used when
        `None`.

    Returns
    -------
    dataset : `object`
        The input dataset (modified in place for astropy tables).
    """
    if _checkArrowCompatibleType(in_memory_dataset) != "astropy":
        return in_memory_dataset

    if provenance is None:
        provenance = DatasetProvenance()
    flat_provenance = provenance.to_flat_dict(
        ref, prefix="LSST.BUTLER", sep=".", simple_types=True, max_inputs=2000
    )

    # Drop any provenance left over from a previous write before adding
    # the new entries.
    DatasetProvenance.strip_provenance_from_flat_dict(in_memory_dataset.meta)
    in_memory_dataset.meta.update(flat_provenance)
    return in_memory_dataset
def _checkArrowCompatibleType(dataset: Any) -> str | None:
    """Check a dataset for arrow compatibility and return a type string.

    Parameters
    ----------
    dataset : `object`
        Dataset object.

    Returns
    -------
    typeString : `str` or `None`
        Type string will be ``arrow``, ``astropy``, ``numpy``,
        ``numpydict``, or ``pandas``; `None` is returned if the dataset is
        not of a supported type.
    """
    # Import lazily so that module import stays cheap.
    import numpy as np
    from astropy.table import Table as astropyTable

    if isinstance(dataset, pa.Table):
        return "arrow"
    elif isinstance(dataset, astropyTable):
        return "astropy"
    elif isinstance(dataset, np.ndarray):
        return "numpy"
    elif isinstance(dataset, dict):
        # Only a dict whose values are all numpy arrays is supported; any
        # other dict is some unrelated mapping.
        if all(isinstance(item, np.ndarray) for item in dataset.values()):
            return "numpydict"
        return None
    elif hasattr(dataset, "to_parquet"):
        # This may be a pandas DataFrame; pandas is optional, so guard the
        # import.
        try:
            import pandas
        except ImportError:
            pandas = None

        if pandas is not None and isinstance(dataset, pandas.DataFrame):
            return "pandas"

    return None