Coverage for python / lsst / daf / butler / formatters / parquet.py: 12%

553 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:30 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

# Public API of this module: schema wrapper classes, the parquet
# formatter, and the table-format conversion functions.
__all__ = (
    "ArrowAstropySchema",
    "ArrowNumpySchema",
    "DataFrameSchema",
    "ParquetFormatter",
    "add_pandas_index_to_astropy",
    "arrow_schema_to_pandas_index",
    "arrow_to_astropy",
    "arrow_to_numpy",
    "arrow_to_numpy_dict",
    "arrow_to_pandas",
    "astropy_to_arrow",
    "astropy_to_pandas",
    "compute_row_group_size",
    "numpy_dict_to_arrow",
    "numpy_to_arrow",
    "numpy_to_astropy",
    "pandas_to_arrow",
    "pandas_to_astropy",
)

50 

51import collections.abc 

52import contextlib 

53import itertools 

54import json 

55import logging 

56import re 

57from collections.abc import Generator, Iterable, Sequence 

58from fnmatch import fnmatchcase 

59from typing import IO, TYPE_CHECKING, Any, cast 

60 

61import pyarrow as pa 

62import pyarrow.parquet as pq 

63 

64from lsst.daf.butler import DatasetProvenance, FormatterV2 

65from lsst.daf.butler.delegates.arrowtable import _add_arrow_provenance, _checkArrowCompatibleType 

66from lsst.resources import ResourcePath 

67from lsst.utils.introspection import get_full_type_name 

68from lsst.utils.iteration import ensure_iterable 

69 

log = logging.getLogger(__name__)

if TYPE_CHECKING:
    # Imports needed only for type annotations; runtime imports of these
    # packages happen lazily inside the functions that use them.
    import astropy.table as atable
    import numpy as np
    import pandas as pd

    try:
        import fsspec
        from fsspec.spec import AbstractFileSystem
    except ImportError:
        # fsspec is optional; provide placeholders so annotations still
        # resolve for type checkers when it is not installed.
        fsspec = None
        AbstractFileSystem = type

# Target number of bytes per parquet row group; used when computing the
# row group size for writes.
TARGET_ROW_GROUP_BYTES = 1_000_000_000

# Schema metadata key recording which column pandas should use as the
# index when a table is converted to a DataFrame.
ASTROPY_PANDAS_INDEX_KEY = "lsst::arrow::astropy_pandas_index"

86 

87 

@contextlib.contextmanager
def generic_open(path: str, fs: AbstractFileSystem | None) -> Generator[IO]:
    """Yield a readable binary file handle for ``path``.

    Parameters
    ----------
    path : `str`
        Path to open.
    fs : `~fsspec.spec.AbstractFileSystem` or `None`
        If not `None`, the file is opened through this fsspec filesystem;
        otherwise the local filesystem is used.
    """
    opener = open(path, "rb") if fs is None else fs.open(path)
    with opener as handle:
        yield handle

96 

97 

class ParquetFormatter(FormatterV2):
    """Interface for reading and writing Arrow Table objects to and from
    Parquet files.
    """

    default_extension = ".parq"
    can_read_from_uri = True
    can_read_from_local_file = True

    def can_accept(self, in_memory_dataset: Any) -> bool:
        # Docstring inherited.
        return _checkArrowCompatibleType(in_memory_dataset) is not None

    def read_from_uri(self, uri: ResourcePath, component: str | None = None, expected_size: int = -1) -> Any:
        # Docstring inherited from Formatter.read.
        try:
            fs, path = uri.to_fsspec()
        except ImportError:
            log.debug("fsspec not available; falling back to local file access.")
            # This signals to the formatter to use the read_from_local_file
            # code path.
            return NotImplemented

        return self._read_parquet(path=path, fs=fs, component=component, expected_size=expected_size)

    def read_from_local_file(self, path: str, component: str | None = None, expected_size: int = -1) -> Any:
        # Docstring inherited from Formatter.read.
        return self._read_parquet(path=path, component=component, expected_size=expected_size)

    def _read_parquet(
        self,
        path: str,
        fs: AbstractFileSystem | None = None,
        component: str | None = None,
        expected_size: int = -1,
    ) -> Any:
        """Read a parquet file, or a component thereof.

        Parameters
        ----------
        path : `str`
            Path to the parquet file.
        fs : `~fsspec.spec.AbstractFileSystem` or `None`, optional
            Filesystem to read through; `None` means the local filesystem.
        component : `str` or `None`, optional
            Component to read (``columns``, ``schema``, ``rowcount``) or
            `None` for the full table.
        expected_size : `int`, optional
            Expected file size in bytes; not used by this implementation.

        Returns
        -------
        result : `typing.Any`
            The arrow schema, the row count, or the arrow table, depending
            on ``component`` and the read storage class.
        """
        # Read only the schema first; for several components/storage
        # classes that is all we need.
        with generic_open(path, fs) as handle:
            schema = pq.read_schema(handle)

        # Storage classes for which the schema alone is the result.
        schema_names = ["ArrowSchema", "DataFrameSchema", "ArrowAstropySchema", "ArrowNumpySchema"]

        if component in ("columns", "schema") or self.file_descriptor.readStorageClass.name in schema_names:
            # The schema will be translated to column format
            # depending on the input type.
            return schema
        elif component == "rowcount":
            # Get the rowcount from the metadata if possible, otherwise count.
            if b"lsst::arrow::rowcount" in schema.metadata:
                return int(schema.metadata[b"lsst::arrow::rowcount"])

            # No rowcount metadata: read a single column and count rows.
            with generic_open(path, fs) as handle:
                temp_table = pq.read_table(
                    handle,
                    columns=[schema.names[0]],
                    use_threads=False,
                    use_pandas_metadata=False,
                )

            return len(temp_table[schema.names[0]])

        par_columns = None
        if self.file_descriptor.parameters:
            par_columns = self.file_descriptor.parameters.pop("columns", None)
            if par_columns:
                # Determine whether the file was written from a pandas
                # multi-index dataframe; column selection differs then.
                has_pandas_multi_index = False
                if schema.metadata and b"pandas" in schema.metadata:
                    md = json.loads(schema.metadata[b"pandas"])
                    if len(md["column_indexes"]) > 1:
                        has_pandas_multi_index = True

                if not has_pandas_multi_index:
                    # Ensure uniqueness, keeping order.
                    par_columns_in = list(dict.fromkeys(ensure_iterable(par_columns)))
                    # Columns with a "__" prefix are internal bookkeeping
                    # columns, not user data.
                    file_columns = [name for name in schema.names if not name.startswith("__")]

                    # Do case-sensitive glob-style matching, again ensuring
                    # uniqueness and ordering.
                    par_columns = {}
                    for par_column in par_columns_in:
                        found = False
                        for file_column in file_columns:
                            if fnmatchcase(file_column, par_column):
                                found = True
                                par_columns[file_column] = True
                        if not found:
                            raise ValueError(
                                f"Column {par_column} specified in parameters not available in parquet file."
                            )
                    par_columns = list(par_columns.keys())
                else:
                    par_columns = _standardize_multi_index_columns(
                        arrow_schema_to_pandas_index(schema),
                        par_columns,
                    )

            # Anything left over in parameters was not understood.
            if len(self.file_descriptor.parameters):
                raise ValueError(
                    f"Unsupported parameters {self.file_descriptor.parameters} in ArrowTable read."
                )

        metadata = schema.metadata if schema.metadata is not None else {}
        with generic_open(path, fs) as handle:
            arrow_table = pq.read_table(
                handle,
                columns=par_columns,
                use_threads=False,
                use_pandas_metadata=(b"pandas" in metadata),
            )

        return arrow_table

    def add_provenance(self, in_memory_dataset: Any, provenance: DatasetProvenance | None = None) -> Any:
        # Delegate provenance attachment to the shared arrow helper.
        return _add_arrow_provenance(in_memory_dataset, self.dataset_ref, provenance)

    def write_local_file(self, in_memory_dataset: Any, uri: ResourcePath) -> None:
        """Serialize the in memory dataset to a local parquet file.

        Parameters
        ----------
        in_memory_dataset : `typing.Any`
            The Python object to serialize.
        uri : `lsst.resources.ResourcePath`
            The location to write the local file.
        """
        if isinstance(in_memory_dataset, pa.Schema):
            # A bare schema is written as a metadata-only parquet file.
            pq.write_metadata(in_memory_dataset, uri.ospath)
            return

        type_string = _checkArrowCompatibleType(in_memory_dataset)

        if type_string is None:
            raise ValueError(
                f"Unsupported type {get_full_type_name(in_memory_dataset)} of "
                "inMemoryDataset for ParquetFormatter."
            )

        # Convert whatever we were given into an arrow table before writing.
        if type_string == "arrow":
            arrow_table = in_memory_dataset
        elif type_string == "astropy":
            arrow_table = astropy_to_arrow(in_memory_dataset)
        elif type_string == "numpy":
            arrow_table = numpy_to_arrow(in_memory_dataset)
        elif type_string == "numpydict":
            arrow_table = numpy_dict_to_arrow(in_memory_dataset)
        else:
            arrow_table = pandas_to_arrow(in_memory_dataset)

        row_group_size = compute_row_group_size(arrow_table.schema)

        pq.write_table(arrow_table, uri.ospath, row_group_size=row_group_size)

248 

249 

def arrow_to_pandas(arrow_table: pa.Table) -> pd.DataFrame:
    """Convert a pyarrow table to a pandas DataFrame.

    Parameters
    ----------
    arrow_table : `pyarrow.Table`
        Input arrow table to convert. If the table has ``pandas`` metadata
        in the schema it will be used in the construction of the
        ``DataFrame``.

    Returns
    -------
    dataframe : `pandas.DataFrame`
        Converted pandas dataframe.
    """
    dataframe = arrow_table.to_pandas(use_threads=False, integer_object_nulls=True)

    schema = arrow_table.schema
    metadata = {} if schema.metadata is None else schema.metadata
    index_key = ASTROPY_PANDAS_INDEX_KEY.encode()
    if index_key in metadata:
        # A pandas index was requested when the table was written; honor
        # it if the named column survived the round trip.
        index_name = metadata[index_key].decode("UTF8")
        if index_name not in schema.names:
            log.warning(
                "Index column ``%s`` not available for arrow table conversion to DataFrame",
                index_name,
            )
        else:
            dataframe.set_index(index_name, inplace=True)

    return dataframe

279 

280 

def arrow_to_astropy(arrow_table: pa.Table) -> atable.Table:
    """Convert a pyarrow table to an `astropy.table.Table`.

    Parameters
    ----------
    arrow_table : `pyarrow.Table`
        Input arrow table to convert. If the table has astropy unit
        metadata in the schema it will be used in the construction
        of the ``astropy.table.Table``.

    Returns
    -------
    table : `astropy.table.Table`
        Converted astropy table.
    """
    from astropy.table import Table

    table = Table(arrow_to_numpy_dict(arrow_table))
    _apply_astropy_metadata(table, arrow_table.schema)

    # Discard a stale pandas-index pointer if its column is not present.
    key = ASTROPY_PANDAS_INDEX_KEY
    if key in table.meta and table.meta[key] not in table.columns:
        table.meta.pop(key)

    return table

307 

308 

def arrow_to_numpy(arrow_table: pa.Table) -> np.ndarray | np.ma.MaskedArray:
    """Convert a pyarrow table to a structured numpy array.

    Parameters
    ----------
    arrow_table : `pyarrow.Table`
        Input arrow table.

    Returns
    -------
    array : `numpy.ndarray` or `numpy.ma.MaskedArray` (N,)
        Numpy array table with N rows and the same column names
        as the input arrow table. Will be masked records if any values
        in the table are null.
    """
    import numpy as np

    numpy_dict = arrow_to_numpy_dict(arrow_table)

    # Assemble the structured dtype, noting whether any column is masked.
    dtype: list[tuple] = []
    has_mask = False
    for name, col in numpy_dict.items():
        shape = col.shape
        if len(shape) <= 1:
            dtype.append((name, col.dtype))
        else:
            # Multidimensional column: record the per-row shape.
            dtype.append((name, (col.dtype, shape[1:])))
        has_mask = has_mask or isinstance(col, np.ma.MaskedArray)

    factory = np.ma.mrecords.fromarrays if has_mask else np.rec.fromarrays
    return factory(numpy_dict.values(), dtype=dtype)

344 

345 

def arrow_to_numpy_dict(arrow_table: pa.Table) -> dict[str, np.ndarray]:
    """Convert a pyarrow table to a dict of numpy arrays.

    Parameters
    ----------
    arrow_table : `pyarrow.Table`
        Input arrow table.

    Returns
    -------
    numpy_dict : `dict` [`str`, `numpy.ndarray`]
        Dict with keys as the column names, values as the arrays.
        Columns containing null values are returned as
        `numpy.ma.MaskedArray`.
    """
    import numpy as np

    schema = arrow_table.schema
    metadata = schema.metadata if schema.metadata is not None else {}

    numpy_dict = {}

    for name in schema.names:
        t = schema.field(name).type

        if arrow_table[name].null_count == 0:
            # Regular non-masked column
            col = arrow_table[name].to_numpy()
        else:
            # For a masked column, we need to ask arrow to fill the null
            # values with an appropriately typed value before conversion.
            # Then we apply the mask to get a masked array of the correct type.
            null_value: Any
            match t:
                case t if t in (pa.float64(), pa.float32(), pa.float16()):
                    null_value = np.nan
                case t if t in (pa.int64(), pa.int32(), pa.int16(), pa.int8()):
                    null_value = -1
                case t if t in (pa.bool_(),):
                    null_value = True
                case t if t in (pa.string(), pa.binary()):
                    null_value = ""
                case _:
                    # This is the fallback for unsigned ints in particular.
                    null_value = 0

            col = np.ma.masked_array(
                data=arrow_table[name].fill_null(null_value).to_numpy(),
                mask=arrow_table[name].is_null().to_numpy(),
                fill_value=null_value,
            )

        if t in (pa.string(), pa.binary()):
            # Convert object strings to a fixed-width numpy string dtype
            # derived from the schema metadata (or the data itself).
            col = col.astype(_arrow_string_to_numpy_dtype(schema, name, col))
        elif isinstance(t, pa.FixedSizeListType):
            # Fixed-size list columns encode multidimensional data.
            if len(col) > 0:
                col = np.stack(col)
            else:
                # this is an empty column, and needs to be coerced to type.
                col = col.astype(t.value_type.to_pandas_dtype())

            # Recover the original per-row shape from the metadata.
            shape = _multidim_shape_from_metadata(metadata, t.list_size, name)
            col = col.reshape((len(arrow_table), *shape))

        numpy_dict[name] = col

    return numpy_dict

411 

412 

def _numpy_dict_to_numpy(numpy_dict: dict[str, np.ndarray]) -> np.ndarray:
    """Convert a dict of numpy arrays to a structured numpy array.

    Parameters
    ----------
    numpy_dict : `dict` [`str`, `numpy.ndarray`]
        Dict with keys as the column names, values as the arrays.

    Returns
    -------
    array : `numpy.ndarray` (N,)
        Numpy array table with N rows and columns names from the dict keys.
    """
    # Round-trip through arrow, which handles dtype construction.
    arrow_table = numpy_dict_to_arrow(numpy_dict)
    return arrow_to_numpy(arrow_table)

427 

428 

def _numpy_to_numpy_dict(np_array: np.ndarray) -> dict[str, np.ndarray]:
    """Convert a structured numpy array to a dict of numpy arrays.

    Parameters
    ----------
    np_array : `numpy.ndarray`
        Input numpy array with multiple fields.

    Returns
    -------
    numpy_dict : `dict` [`str`, `numpy.ndarray`]
        Dict with keys as the column names, values as the arrays.
    """
    # Round-trip through arrow, which handles the per-column split.
    arrow_table = numpy_to_arrow(np_array)
    return arrow_to_numpy_dict(arrow_table)

443 

444 

def numpy_to_arrow(np_array: np.ndarray) -> pa.Table:
    """Convert a numpy array table to an arrow table.

    Parameters
    ----------
    np_array : `numpy.ndarray`
        Input numpy array with multiple fields.

    Returns
    -------
    arrow_table : `pyarrow.Table`
        Converted arrow table.
    """
    dtype = np_array.dtype
    nrows = len(np_array)

    # Record the row count plus per-column string-length and
    # multidimensional-shape metadata.
    md = {b"lsst::arrow::rowcount": str(nrows)}
    for name in dtype.names or ():
        _append_numpy_string_metadata(md, name, dtype[name])
        _append_numpy_multidim_metadata(md, name, dtype[name])

    schema = pa.schema(_numpy_dtype_to_arrow_types(dtype), metadata=md)

    arrays = _numpy_style_arrays_to_arrow_arrays(
        dtype,
        nrows,
        np_array,
        schema,
    )

    return pa.Table.from_arrays(arrays, schema=schema)

483 

484 

def numpy_dict_to_arrow(numpy_dict: dict[str, np.ndarray]) -> pa.Table:
    """Convert a dict of numpy arrays to an arrow table.

    Parameters
    ----------
    numpy_dict : `dict` [`str`, `numpy.ndarray`]
        Dict with keys as the column names, values as the arrays.

    Returns
    -------
    arrow_table : `pyarrow.Table`
        Converted arrow table.

    Raises
    ------
    ValueError
        Raised if columns in ``numpy_dict`` have unequal numbers of
        rows.
    """
    # The helper validates that all columns share the same row count.
    dtype, rowcount = _numpy_dict_to_dtype(numpy_dict)

    # Record the row count plus per-column string-length and
    # multidimensional-shape metadata.
    md = {b"lsst::arrow::rowcount": str(rowcount)}
    for name in dtype.names or ():
        _append_numpy_string_metadata(md, name, dtype[name])
        _append_numpy_multidim_metadata(md, name, dtype[name])

    schema = pa.schema(_numpy_dtype_to_arrow_types(dtype), metadata=md)

    arrays = _numpy_style_arrays_to_arrow_arrays(
        dtype,
        rowcount,
        numpy_dict,
        schema,
    )

    return pa.Table.from_arrays(arrays, schema=schema)

527 

528 

def astropy_to_arrow(astropy_table: atable.Table) -> pa.Table:
    """Convert an astropy table to an arrow table.

    Parameters
    ----------
    astropy_table : `astropy.table.Table`
        Input astropy table.

    Returns
    -------
    arrow_table : `pyarrow.Table`
        Converted arrow table.
    """
    from astropy.table import meta

    type_list = _numpy_dtype_to_arrow_types(astropy_table.dtype)

    md = {}
    md[b"lsst::arrow::rowcount"] = str(len(astropy_table))

    # Carry over a requested pandas index so a later DataFrame
    # conversion can restore it.
    if (key := ASTROPY_PANDAS_INDEX_KEY) in astropy_table.meta:
        md[key.encode()] = astropy_table.meta[key]

    # Per-column string-length and multidimensional-shape metadata.
    for name in astropy_table.dtype.names:
        _append_numpy_string_metadata(md, name, astropy_table.dtype[name])
        _append_numpy_multidim_metadata(md, name, astropy_table.dtype[name])

    # Serialize astropy table metadata as YAML so it can be restored
    # when reading back.
    meta_yaml = meta.get_yaml_from_table(astropy_table)
    meta_yaml_str = "\n".join(meta_yaml)
    md[b"table_meta_yaml"] = meta_yaml_str

    # Convert type list to fields with metadata.
    fields = []
    for name, pa_type in type_list:
        field_metadata = {}
        if description := astropy_table[name].description:
            field_metadata["description"] = description
        if unit := astropy_table[name].unit:
            field_metadata["unit"] = str(unit)
        fields.append(
            pa.field(
                name,
                pa_type,
                metadata=field_metadata,
            )
        )

    schema = pa.schema(fields, metadata=md)

    arrays = _numpy_style_arrays_to_arrow_arrays(
        astropy_table.dtype,
        len(astropy_table),
        astropy_table,
        schema,
    )

    arrow_table = pa.Table.from_arrays(arrays, schema=schema)

    return arrow_table

588 

589 

def astropy_to_pandas(astropy_table: atable.Table, index: str | None = None) -> pd.DataFrame:
    """Convert an astropy table to a pandas dataframe via arrow.

    By going via arrow we avoid pandas masked column bugs (e.g.
    https://github.com/pandas-dev/pandas/issues/58173)

    Parameters
    ----------
    astropy_table : `astropy.table.Table`
        Input astropy table.
    index : `str`, optional
        Name of column to set as index. Ignored if the table's metadata
        already names an index column.

    Returns
    -------
    dataframe : `pandas.DataFrame`
        Output pandas dataframe.

    Raises
    ------
    RuntimeError
        Raised if ``index`` is truthy but not a string (when no index is
        recorded in the table metadata).
    """
    index_requested = False
    if (key := ASTROPY_PANDAS_INDEX_KEY) in astropy_table.meta:
        # The table metadata names an index column; prefer that over the
        # ``index`` argument.
        _index = astropy_table.meta[key]
        if _index not in astropy_table.columns:
            log.warning(
                "Index column ``%s`` not available for astropy table conversion to DataFrame",
                _index,
            )
            _index = None
    else:
        index_requested = True
        _index = index

    dataframe = arrow_to_pandas(astropy_to_arrow(astropy_table))

    # Set the index if we have a valid index name, and either the
    # index was requested in the call to the function or the dataframe
    # was not previously indexed with the call to arrow_to_pandas.
    if isinstance(_index, str) and (index_requested or dataframe.index.name is None):
        dataframe.set_index(_index, inplace=True)
    elif _index and index_requested:
        raise RuntimeError("index must be a string or None.")

    return dataframe

632 

633 

def add_pandas_index_to_astropy(astropy_table: atable.Table, index: str) -> None:
    """Add special metadata to an astropy table to indicate a pandas index.

    Parameters
    ----------
    astropy_table : `astropy.table.Table`
        Input astropy table.
    index : `str`
        Name of column for pandas to set as index, if read as DataFrame.

    Raises
    ------
    ValueError
        Raised if ``index`` is not a column of ``astropy_table``.
    """
    if index not in astropy_table.columns:
        # The previous code passed logging-style "%s" arguments to
        # ValueError, so the column name was never interpolated into the
        # message; format it directly instead.
        raise ValueError(f"Column ``{index}`` not in astropy table columns to use as pandas index.")
    astropy_table.meta[ASTROPY_PANDAS_INDEX_KEY] = index

647 

648 

def _astropy_to_numpy_dict(astropy_table: atable.Table) -> dict[str, np.ndarray]:
    """Convert an astropy table to a dict of numpy arrays.

    Note: the previous docstring incorrectly described this as converting
    to an arrow table; arrow is only the intermediate representation.

    Parameters
    ----------
    astropy_table : `astropy.table.Table`
        Input astropy table.

    Returns
    -------
    numpy_dict : `dict` [`str`, `numpy.ndarray`]
        Dict with keys as the column names, values as the arrays.
    """
    return arrow_to_numpy_dict(astropy_to_arrow(astropy_table))

663 

664 

def pandas_to_arrow(dataframe: pd.DataFrame, default_length: int = 10) -> pa.Table:
    """Convert a pandas dataframe to an arrow table.

    Parameters
    ----------
    dataframe : `pandas.DataFrame`
        Input pandas dataframe.
    default_length : `int`, optional
        Default string length when not in metadata or can be inferred
        from column.

    Returns
    -------
    arrow_table : `pyarrow.Table`
        Converted arrow table.
    """
    try:
        arrow_table = pa.Table.from_pandas(dataframe)
    except pa.ArrowInvalid as e:
        msg = "; ".join(e.args)
        msg += "; This is usually because the column is mixed type or has uneven length rows."
        e.add_note(msg)
        raise

    # Update the metadata
    md = arrow_table.schema.metadata

    md[b"lsst::arrow::rowcount"] = str(arrow_table.num_rows)

    # We loop through the arrow table columns because the datatypes have
    # been checked and converted from pandas objects.
    for name in arrow_table.column_names:
        column = arrow_table[name]
        if name.startswith("__") or column.type != pa.string():
            continue
        if len(column) == 0:
            strlen = default_length
        else:
            strlen = max(len(row.as_py()) for row in column if row.is_valid)
        md[f"lsst::arrow::len::{name}".encode()] = str(strlen)

    return arrow_table.replace_schema_metadata(md)

707 

708 

def pandas_to_astropy(dataframe: pd.DataFrame) -> atable.Table:
    """Convert a pandas dataframe to an astropy table, preserving indexes.

    Parameters
    ----------
    dataframe : `pandas.DataFrame`
        Input pandas dataframe.

    Returns
    -------
    astropy_table : `astropy.table.Table`
        Converted astropy table.
    """
    import pandas as pd

    if isinstance(dataframe.columns, pd.MultiIndex):
        raise ValueError("Cannot convert a multi-index dataframe to an astropy table.")

    # Arrow is the intermediate representation for the conversion.
    arrow_table = pandas_to_arrow(dataframe)
    return arrow_to_astropy(arrow_table)

728 

729 

def _pandas_to_numpy_dict(dataframe: pd.DataFrame) -> dict[str, np.ndarray]:
    """Convert a pandas dataframe to a dict of numpy arrays.

    Parameters
    ----------
    dataframe : `pandas.DataFrame`
        Input pandas dataframe.

    Returns
    -------
    numpy_dict : `dict` [`str`, `numpy.ndarray`]
        Dict with keys as the column names, values as the arrays.
    """
    # Arrow is the intermediate representation for the conversion.
    arrow_table = pandas_to_arrow(dataframe)
    return arrow_to_numpy_dict(arrow_table)

744 

745 

def numpy_to_astropy(np_array: np.ndarray) -> atable.Table:
    """Convert a numpy table to an astropy table.

    Parameters
    ----------
    np_array : `numpy.ndarray`
        Input numpy array with multiple fields.

    Returns
    -------
    astropy_table : `astropy.table.Table`
        Converted astropy table.
    """
    from astropy.table import Table

    # copy=False shares the input array's memory instead of duplicating it.
    astropy_table = Table(data=np_array, copy=False)
    return astropy_table

762 

763 

def arrow_schema_to_pandas_index(schema: pa.Schema) -> pd.Index | pd.MultiIndex:
    """Convert an arrow schema to a pandas index/multiindex.

    Parameters
    ----------
    schema : `pyarrow.Schema`
        Input pyarrow schema.

    Returns
    -------
    index : `pandas.Index` or `pandas.MultiIndex`
        Converted pandas index.
    """
    import pandas as pd

    # schema.metadata is None when no metadata has been set; guard the
    # membership test the same way the other metadata accesses in this
    # module do, instead of raising TypeError.
    metadata = schema.metadata if schema.metadata is not None else {}

    if b"pandas" in metadata:
        md = json.loads(metadata[b"pandas"])
        indexes = md["column_indexes"]
        len_indexes = len(indexes)
    else:
        indexes = []
        len_indexes = 0

    if len_indexes <= 1:
        # Simple (or no) index: the data columns form a flat index.
        # Columns with a "__" prefix are internal bookkeeping columns.
        return pd.Index(name for name in schema.names if not name.startswith("__"))
    else:
        # Multi-index: reconstruct the column-name tuples that pyarrow
        # flattened to strings on write.
        raw_columns = _split_multi_index_column_names(len(indexes), schema.names)
        return pd.MultiIndex.from_tuples(raw_columns, names=[f["name"] for f in indexes])

791 

792 

def arrow_schema_to_column_list(schema: pa.Schema) -> list[str]:
    """Convert an arrow schema to a list of string column names.

    Parameters
    ----------
    schema : `pyarrow.Schema`
        Input pyarrow schema.

    Returns
    -------
    column_list : `list` [`str`]
        Converted list of column names.
    """
    return [*schema.names]

807 

808 

809class DataFrameSchema: 

810 """Wrapper class for a schema for a pandas DataFrame. 

811 

812 Parameters 

813 ---------- 

814 dataframe : `pandas.DataFrame` 

815 Dataframe to turn into a schema. 

816 """ 

817 

818 def __init__(self, dataframe: pd.DataFrame) -> None: 

819 self._schema = dataframe.loc[[False] * len(dataframe)] 

820 

821 @classmethod 

822 def from_arrow(cls, schema: pa.Schema) -> DataFrameSchema: 

823 """Convert an arrow schema into a `DataFrameSchema`. 

824 

825 Parameters 

826 ---------- 

827 schema : `pyarrow.Schema` 

828 The pyarrow schema to convert. 

829 

830 Returns 

831 ------- 

832 dataframe_schema : `DataFrameSchema` 

833 Converted dataframe schema. 

834 """ 

835 empty_table = pa.Table.from_pylist([] * len(schema.names), schema=schema) 

836 

837 return cls(empty_table.to_pandas()) 

838 

839 def to_arrow_schema(self) -> pa.Schema: 

840 """Convert to an arrow schema. 

841 

842 Returns 

843 ------- 

844 arrow_schema : `pyarrow.Schema` 

845 Converted pyarrow schema. 

846 """ 

847 arrow_table = pa.Table.from_pandas(self._schema) 

848 

849 return arrow_table.schema 

850 

851 def to_arrow_numpy_schema(self) -> ArrowNumpySchema: 

852 """Convert to an `ArrowNumpySchema`. 

853 

854 Returns 

855 ------- 

856 arrow_numpy_schema : `ArrowNumpySchema` 

857 Converted arrow numpy schema. 

858 """ 

859 return ArrowNumpySchema.from_arrow(self.to_arrow_schema()) 

860 

861 def to_arrow_astropy_schema(self) -> ArrowAstropySchema: 

862 """Convert to an ArrowAstropySchema. 

863 

864 Returns 

865 ------- 

866 arrow_astropy_schema : `ArrowAstropySchema` 

867 Converted arrow astropy schema. 

868 """ 

869 return ArrowAstropySchema.from_arrow(self.to_arrow_schema()) 

870 

871 @property 

872 def schema(self) -> np.dtype: 

873 return self._schema 

874 

875 def __repr__(self) -> str: 

876 return repr(self._schema) 

877 

878 def __eq__(self, other: object) -> bool: 

879 if not isinstance(other, DataFrameSchema): 

880 return NotImplemented 

881 

882 return self._schema.equals(other._schema) 

883 

884 

class ArrowAstropySchema:
    """Wrapper class for a schema for an astropy table.

    Parameters
    ----------
    astropy_table : `astropy.table.Table`
        Input astropy table.
    """

    def __init__(self, astropy_table: atable.Table) -> None:
        # A zero-row slice keeps the column names, dtypes, units, and
        # descriptions without retaining any data.
        self._schema = astropy_table[:0]

    @classmethod
    def from_arrow(cls, schema: pa.Schema) -> ArrowAstropySchema:
        """Convert an arrow schema into a ArrowAstropySchema.

        Parameters
        ----------
        schema : `pyarrow.Schema`
            Input pyarrow schema.

        Returns
        -------
        astropy_schema : `ArrowAstropySchema`
            Converted arrow astropy schema.
        """
        import numpy as np
        from astropy.table import Table

        dtype = _schema_to_dtype_list(schema)

        # Build a zero-row table with the right dtype, then attach the
        # astropy metadata carried in the arrow schema.
        data = np.zeros(0, dtype=dtype)
        astropy_table = Table(data=data)

        _apply_astropy_metadata(astropy_table, schema)

        return cls(astropy_table)

    def to_arrow_schema(self) -> pa.Schema:
        """Convert to an arrow schema.

        Returns
        -------
        arrow_schema : `pyarrow.Schema`
            Converted pyarrow schema.
        """
        return astropy_to_arrow(self._schema).schema

    def to_dataframe_schema(self) -> DataFrameSchema:
        """Convert to a DataFrameSchema.

        Returns
        -------
        dataframe_schema : `DataFrameSchema`
            Converted dataframe schema.
        """
        return DataFrameSchema.from_arrow(astropy_to_arrow(self._schema).schema)

    def to_arrow_numpy_schema(self) -> ArrowNumpySchema:
        """Convert to an `ArrowNumpySchema`.

        Returns
        -------
        arrow_numpy_schema : `ArrowNumpySchema`
            Converted arrow numpy schema.
        """
        return ArrowNumpySchema.from_arrow(astropy_to_arrow(self._schema).schema)

    @property
    def schema(self) -> atable.Table:
        # The stored zero-row astropy table carries all schema information.
        return self._schema

    def __repr__(self) -> str:
        return repr(self._schema)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ArrowAstropySchema):
            return NotImplemented

        # If this comparison passes then the two tables have the
        # same column names.
        if self._schema.dtype != other._schema.dtype:
            return False

        # Column-level metadata must also match for equality.
        for name in self._schema.columns:
            if not self._schema[name].unit == other._schema[name].unit:
                return False
            if not self._schema[name].description == other._schema[name].description:
                return False
            if not self._schema[name].format == other._schema[name].format:
                return False

        return True

978 

979 

class ArrowNumpySchema:
    """Wrapper class for a schema for a numpy ndarray.

    Parameters
    ----------
    numpy_dtype : `numpy.dtype`
        Numpy dtype to convert.
    """

    def __init__(self, numpy_dtype: np.dtype) -> None:
        self._dtype = numpy_dtype

    @classmethod
    def from_arrow(cls, schema: pa.Schema) -> ArrowNumpySchema:
        """Convert an arrow schema into an `ArrowNumpySchema`.

        Parameters
        ----------
        schema : `pyarrow.Schema`
            Pyarrow schema to convert.

        Returns
        -------
        numpy_schema : `ArrowNumpySchema`
            Converted arrow numpy schema.
        """
        import numpy as np

        return cls(np.dtype(_schema_to_dtype_list(schema)))

    def _empty_arrow_schema(self) -> pa.Schema:
        # Round-trip a zero-row array through the arrow converter to get
        # an arrow schema equivalent to the stored numpy dtype.
        import numpy as np

        return numpy_to_arrow(np.zeros(0, dtype=self._dtype)).schema

    def to_arrow_astropy_schema(self) -> ArrowAstropySchema:
        """Convert to an `ArrowAstropySchema`.

        Returns
        -------
        astropy_schema : `ArrowAstropySchema`
            Converted arrow astropy schema.
        """
        return ArrowAstropySchema.from_arrow(self._empty_arrow_schema())

    def to_dataframe_schema(self) -> DataFrameSchema:
        """Convert to a `DataFrameSchema`.

        Returns
        -------
        dataframe_schema : `DataFrameSchema`
            Converted dataframe schema.
        """
        return DataFrameSchema.from_arrow(self._empty_arrow_schema())

    def to_arrow_schema(self) -> pa.Schema:
        """Convert to a `pyarrow.Schema`.

        Returns
        -------
        arrow_schema : `pyarrow.Schema`
            Converted pyarrow schema.
        """
        return self._empty_arrow_schema()

    @property
    def schema(self) -> np.dtype:
        """The numpy dtype that backs this schema wrapper."""
        return self._dtype

    def __repr__(self) -> str:
        return repr(self._dtype)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ArrowNumpySchema):
            return NotImplemented
        # numpy dtype comparison returns a plain bool.
        return self._dtype == other._dtype

1063 

1064 

1065def _split_multi_index_column_names(n: int, names: Iterable[str]) -> list[Sequence[str]]: 

1066 """Split a string that represents a multi-index column. 

1067 

1068 PyArrow maps Pandas' multi-index column names (which are tuples in Python) 

1069 to flat strings on disk. This routine exists to reconstruct the original 

1070 tuple. 

1071 

1072 Parameters 

1073 ---------- 

1074 n : `int` 

1075 Number of levels in the `pandas.MultiIndex` that is being 

1076 reconstructed. 

1077 names : `~collections.abc.Iterable` [`str`] 

1078 Strings to be split. 

1079 

1080 Returns 

1081 ------- 

1082 column_names : `list` [`tuple` [`str`]] 

1083 A list of multi-index column name tuples. 

1084 """ 

1085 column_names: list[Sequence[str]] = [] 

1086 

1087 pattern = re.compile(r"\({}\)".format(", ".join(["'(.*)'"] * n))) 

1088 for name in names: 

1089 m = re.search(pattern, name) 

1090 if m is not None: 

1091 column_names.append(m.groups()) 

1092 

1093 return column_names 

1094 

1095 

1096def _standardize_multi_index_columns( 

1097 pd_index: pd.MultiIndex, 

1098 columns: Any, 

1099 stringify: bool = True, 

1100) -> list[str | Sequence[Any]]: 

1101 """Transform a dictionary/iterable index from a multi-index column list 

1102 into a string directly understandable by PyArrow. 

1103 

1104 Parameters 

1105 ---------- 

1106 pd_index : `pandas.MultiIndex` 

1107 Pandas multi-index. 

1108 columns : `list` [`tuple`] or `dict` [`str`, `str` or `list` [`str`]] 

1109 Columns to standardize. 

1110 stringify : `bool`, optional 

1111 Should the column names be stringified? 

1112 

1113 Returns 

1114 ------- 

1115 names : `list` [`str`] 

1116 Stringified representation of a multi-index column name. 

1117 """ 

1118 index_level_names = tuple(pd_index.names) 

1119 

1120 names: list[str | Sequence[Any]] = [] 

1121 

1122 if isinstance(columns, list): 

1123 for requested in columns: 

1124 if not isinstance(requested, tuple): 

1125 raise ValueError( 

1126 "Columns parameter for multi-index data frame must be a dictionary or list of tuples. " 

1127 f"Instead got a {get_full_type_name(requested)}." 

1128 ) 

1129 if stringify: 

1130 names.append(str(requested)) 

1131 else: 

1132 names.append(requested) 

1133 else: 

1134 if not isinstance(columns, collections.abc.Mapping): 

1135 raise ValueError( 

1136 "Columns parameter for multi-index data frame must be a dictionary or list of tuples. " 

1137 f"Instead got a {get_full_type_name(columns)}." 

1138 ) 

1139 if not set(index_level_names).issuperset(columns.keys()): 

1140 raise ValueError( 

1141 f"Cannot use dict with keys {set(columns.keys())} to select columns from {index_level_names}." 

1142 ) 

1143 factors = [ 

1144 ensure_iterable(columns.get(level, pd_index.levels[i])) 

1145 for i, level in enumerate(index_level_names) 

1146 ] 

1147 for requested in itertools.product(*factors): 

1148 for i, value in enumerate(requested): 

1149 if value not in pd_index.levels[i]: 

1150 raise ValueError(f"Unrecognized value {value!r} for index {index_level_names[i]!r}.") 

1151 if stringify: 

1152 names.append(str(requested)) 

1153 else: 

1154 names.append(requested) 

1155 

1156 return names 

1157 

1158 

def _apply_astropy_metadata(astropy_table: atable.Table, arrow_schema: pa.Schema) -> None:
    """Apply any astropy metadata from the schema metadata.

    Parameters
    ----------
    astropy_table : `astropy.table.Table`
        Table to apply metadata; modified in place.
    arrow_schema : `pyarrow.Schema`
        Arrow schema with metadata.
    """
    from astropy.table import meta

    metadata = arrow_schema.metadata if arrow_schema.metadata is not None else {}

    # Check if we have a special astropy metadata header yaml.
    meta_yaml = metadata.get(b"table_meta_yaml", None)
    if meta_yaml:
        # A full astropy header was serialized with the table; restore
        # per-column attributes and table-level metadata from it.
        meta_hdr = meta.get_header_from_yaml(meta_yaml.decode("UTF8").split("\n"))

        header_cols = {x["name"]: x for x in meta_hdr["datatype"]}
        for col in astropy_table.columns.values():
            header_col = header_cols[col.name]
            for attr in ("description", "format", "unit", "meta"):
                if attr in header_col:
                    setattr(col, attr, header_col[attr])

        if "meta" in meta_hdr:
            astropy_table.meta.update(meta_hdr["meta"])
        return

    # No astropy header yaml; fall back to per-field arrow metadata.
    for name in arrow_schema.names:
        field_metadata = arrow_schema.field(name).metadata
        if field_metadata is None:
            continue
        description = field_metadata.get(b"description", b"").decode("UTF-8")
        if description != "":
            astropy_table[name].description = description
        unit = field_metadata.get(b"unit", b"").decode("UTF-8")
        if unit != "":
            astropy_table[name].unit = unit

1203 

1204 

def _arrow_string_to_numpy_dtype(
    schema: pa.Schema, name: str, numpy_column: np.ndarray | None = None, default_length: int = 10
) -> str:
    """Get the numpy dtype string associated with an arrow column.

    Parameters
    ----------
    schema : `pyarrow.Schema`
        Arrow table schema.
    name : `str`
        Column name.
    numpy_column : `numpy.ndarray`, optional
        Column to determine numpy string dtype.
    default_length : `int`, optional
        Default string length when not in metadata or can be inferred
        from column.

    Returns
    -------
    dtype_str : `str`
        Numpy dtype string.
    """
    # Special-case for string and binary columns.
    metadata = schema.metadata if schema.metadata is not None else {}
    encoded = f"lsst::arrow::len::{name}".encode("UTF-8")

    if encoded in metadata:
        # Explicit string/bytes length recorded in the schema metadata.
        strlen = int(schema.metadata[encoded])
    elif numpy_column is not None and len(numpy_column) > 0:
        # Infer the width from the longest non-empty value in the column;
        # an all-empty column yields a zero-width dtype.
        strlen = max((len(row) for row in numpy_column if row), default=0)
    else:
        strlen = default_length

    if schema.field(name).type == pa.string():
        return f"U{strlen}"
    return f"|S{strlen}"

1241 

1242 

1243def _append_numpy_string_metadata(metadata: dict[bytes, str], name: str, dtype: np.dtype) -> None: 

1244 """Append numpy string length keys to arrow metadata. 

1245 

1246 All column types are handled, but the metadata is only modified for 

1247 string and byte columns. 

1248 

1249 Parameters 

1250 ---------- 

1251 metadata : `dict` [`bytes`, `str`] 

1252 Metadata dictionary; modified in place. 

1253 name : `str` 

1254 Column name. 

1255 dtype : `np.dtype` 

1256 Numpy dtype. 

1257 """ 

1258 import numpy as np 

1259 

1260 if dtype.type is np.str_: 

1261 metadata[f"lsst::arrow::len::{name}".encode()] = str(dtype.itemsize // 4) 

1262 metadata[f"table::len::{name}".encode()] = str(dtype.itemsize // 4) 

1263 elif dtype.type is np.bytes_: 

1264 metadata[f"lsst::arrow::len::{name}".encode()] = str(dtype.itemsize) 

1265 metadata[f"table::len::{name}".encode()] = str(dtype.itemsize) 

1266 

1267 

1268def _append_numpy_multidim_metadata(metadata: dict[bytes, str], name: str, dtype: np.dtype) -> None: 

1269 """Append numpy multi-dimensional shapes to arrow metadata. 

1270 

1271 All column types are handled, but the metadata is only modified for 

1272 multi-dimensional columns. 

1273 

1274 Parameters 

1275 ---------- 

1276 metadata : `dict` [`bytes`, `str`] 

1277 Metadata dictionary; modified in place. 

1278 name : `str` 

1279 Column name. 

1280 dtype : `np.dtype` 

1281 Numpy dtype. 

1282 """ 

1283 if len(dtype.shape) > 1: 

1284 metadata[f"lsst::arrow::shape::{name}".encode()] = str(dtype.shape) 

1285 

1286 

1287def _multidim_shape_from_metadata(metadata: dict[bytes, bytes], list_size: int, name: str) -> tuple[int, ...]: 

1288 """Retrieve the shape from the metadata, if available. 

1289 

1290 Parameters 

1291 ---------- 

1292 metadata : `dict` [`bytes`, `bytes`] 

1293 Metadata dictionary. 

1294 list_size : `int` 

1295 Size of the list datatype. 

1296 name : `str` 

1297 Column name. 

1298 

1299 Returns 

1300 ------- 

1301 shape : `tuple` [`int`] 

1302 Shape associated with the column. 

1303 

1304 Raises 

1305 ------ 

1306 RuntimeError 

1307 Raised if metadata is found but has incorrect format. 

1308 """ 

1309 md_name = f"lsst::arrow::shape::{name}" 

1310 if (encoded := md_name.encode("UTF-8")) in metadata: 

1311 groups = re.search(r"\((.*)\)", metadata[encoded].decode("UTF-8")) 

1312 if groups is None: 

1313 raise RuntimeError("Illegal value found in metadata.") 

1314 shape = tuple(int(x) for x in groups[1].split(",") if x != "") 

1315 else: 

1316 shape = (list_size,) 

1317 

1318 return shape 

1319 

1320 

def _schema_to_dtype_list(schema: pa.Schema) -> list[tuple[str, tuple[Any] | str]]:
    """Convert a pyarrow schema to a numpy dtype.

    Parameters
    ----------
    schema : `pyarrow.Schema`
        Input pyarrow schema.

    Returns
    -------
    dtype_list: `list` [`tuple`]
        A list with name, type pairs.
    """
    metadata = schema.metadata if schema.metadata is not None else {}

    dtype: list[Any] = []
    for name in schema.names:
        arrow_type = schema.field(name).type
        if isinstance(arrow_type, pa.FixedSizeListType):
            # Fixed-size lists become numpy subarray columns; the shape may
            # be multi-dimensional if recorded in the schema metadata.
            shape = _multidim_shape_from_metadata(metadata, arrow_type.list_size, name)
            dtype.append((name, (arrow_type.value_type.to_pandas_dtype(), shape)))
        elif arrow_type in (pa.string(), pa.binary()):
            # String/bytes widths come from metadata or a default.
            dtype.append((name, _arrow_string_to_numpy_dtype(schema, name)))
        else:
            dtype.append((name, arrow_type.to_pandas_dtype()))

    return dtype

1348 

1349 

def _numpy_dtype_to_arrow_types(dtype: np.dtype) -> list[Any]:
    """Convert a numpy dtype to a list of arrow types.

    Parameters
    ----------
    dtype : `numpy.dtype`
        Numpy dtype to convert.

    Returns
    -------
    type_list : `list` [`object`]
        Converted list of arrow types.
    """
    from math import prod

    import numpy as np

    # An unstructured dtype has no named columns to convert.
    if dtype.names is None:
        return []

    type_list: list[Any] = []
    for name in dtype.names:
        dt = dtype[name]
        arrow_type: Any
        if len(dt.shape) > 0:
            # Sub-array columns become fixed-size lists of the flattened
            # element count.
            sub_dtype = cast(tuple[np.dtype, tuple[int, ...]], dt.subdtype)[0]
            arrow_type = pa.list_(pa.from_numpy_dtype(sub_dtype.type), prod(dt.shape))
        elif dt.type == np.datetime64:
            time_unit = "ns" if "ns" in dt.str else "us"
            # The pa.timestamp() is the correct datatype to round-trip
            # a numpy datetime64[ns] or datetime[us] array.
            arrow_type = pa.timestamp(time_unit)
        else:
            try:
                arrow_type = pa.from_numpy_dtype(dt.type)
            except pa.ArrowNotImplementedError as e:
                msg = f"Could not serialize column {name} (type {str(dt)}) to Parquet."
                if dt == np.dtype("O"):
                    msg += " This is usually because the column is mixed type or has uneven length rows."
                e.add_note(msg)
                raise
        type_list.append((name, arrow_type))

    return type_list

1396 

1397 

1398def _numpy_dict_to_dtype(numpy_dict: dict[str, np.ndarray]) -> tuple[np.dtype, int]: 

1399 """Extract equivalent table dtype from dict of numpy arrays. 

1400 

1401 Parameters 

1402 ---------- 

1403 numpy_dict : `dict` [`str`, `numpy.ndarray`] 

1404 Dict with keys as the column names, values as the arrays. 

1405 

1406 Returns 

1407 ------- 

1408 dtype : `numpy.dtype` 

1409 dtype of equivalent table. 

1410 rowcount : `int` 

1411 Number of rows in the table. 

1412 

1413 Raises 

1414 ------ 

1415 ValueError if columns in numpy_dict have unequal numbers of rows. 

1416 """ 

1417 import numpy as np 

1418 

1419 dtype_list: list[tuple] = [] 

1420 rowcount = 0 

1421 for name, col in numpy_dict.items(): 

1422 if rowcount == 0: 

1423 rowcount = len(col) 

1424 if len(col) != rowcount: 

1425 raise ValueError(f"Column {name} has a different number of rows.") 

1426 if len(col.shape) == 1: 

1427 dtype_list.append((name, col.dtype)) 

1428 else: 

1429 dtype_list.append((name, (col.dtype, col.shape[1:]))) 

1430 dtype = np.dtype(dtype_list) 

1431 

1432 return (dtype, rowcount) 

1433 

1434 

def _numpy_style_arrays_to_arrow_arrays(
    dtype: np.dtype,
    rowcount: int,
    np_style_arrays: dict[str, np.ndarray] | np.ndarray | atable.Table,
    schema: pa.Schema,
) -> list[pa.Array]:
    """Convert numpy-style arrays to arrow arrays.

    Parameters
    ----------
    dtype : `numpy.dtype`
        Numpy dtype of input table/arrays.
    rowcount : `int`
        Number of rows in input table/arrays.
    np_style_arrays : `dict` [`str`, `np.ndarray`] or `np.ndarray`
        or `astropy.table.Table`
        Arrays to convert to arrow.
    schema : `pyarrow.Schema`
        Schema of arrow table.

    Returns
    -------
    arrow_arrays : `list` [`pyarrow.Array`]
        List of converted pyarrow arrays.
    """
    import numpy as np

    arrow_arrays: list[pa.Array] = []
    # An unstructured dtype has no named columns; nothing to convert.
    if dtype.names is None:
        return arrow_arrays

    for name in dtype.names:
        dt = dtype[name]
        val: Any
        if len(dt.shape) > 0:
            # Sub-array column: flatten and split into one chunk per row so
            # each row becomes a single fixed-size-list entry.
            if rowcount > 0:
                val = np.split(np_style_arrays[name].ravel(), rowcount)
            else:
                # np.split cannot split into zero sections; use an empty list.
                val = []
        else:
            val = np_style_arrays[name]

        try:
            arrow_arrays.append(pa.array(val, type=schema.field(name).type))
        except pa.ArrowNotImplementedError as err:
            # Check if val is big-endian.
            # (Arrow cannot ingest big-endian numpy buffers directly.)
            if (np.little_endian and val.dtype.byteorder == ">") or (
                not np.little_endian and val.dtype.byteorder == "="
            ):
                # We need to convert the array to little-endian.
                # byteswap() copies, so the caller's array is untouched.
                val2 = val.byteswap()
                val2.dtype = val2.dtype.newbyteorder("<")
                arrow_arrays.append(pa.array(val2, type=schema.field(name).type))
            else:
                # This failed for some other reason so raise the exception.
                raise err

    return arrow_arrays

1493 

1494 

def compute_row_group_size(schema: pa.Schema, target_size: int = TARGET_ROW_GROUP_BYTES) -> int:
    """Compute approximate row group size for a given arrow schema.

    Given a schema, this routine will compute the number of rows in a row group
    that targets the persisted size on disk (or smaller). The exact size on
    disk depends on the compression settings and ratios; typical binary data
    tables will have around 15-20% compression with the pyarrow default
    ``snappy`` compression algorithm.

    Parameters
    ----------
    schema : `pyarrow.Schema`
        Arrow table schema.
    target_size : `int`, optional
        The target size (in bytes).

    Returns
    -------
    row_group_size : `int`
        Number of rows per row group to hit the target size.
    """
    # Accumulated (approximate) width of one row, in bits.
    bit_width = 0

    metadata = schema.metadata if schema.metadata is not None else {}

    for name in schema.names:
        t = schema.field(name).type

        if t in (pa.string(), pa.binary()):
            md_name = f"lsst::arrow::len::{name}"

            if (encoded := md_name.encode("UTF-8")) in metadata:
                # String/bytes length from header.
                strlen = int(schema.metadata[encoded])
            else:
                # We don't know the string width, so guess something.
                strlen = 10

            # Assuming UTF-8 encoding, and very few wide characters.
            t_width = 8 * strlen
        elif isinstance(t, pa.FixedSizeListType):
            # Fixed-size lists: element width times the list size.
            if t.value_type == pa.null():
                t_width = 0
            else:
                t_width = t.list_size * t.value_type.bit_width
        elif t == pa.null():
            t_width = 0
        elif isinstance(t, pa.ListType):
            if t.value_type == pa.null():
                t_width = 0
            else:
                # This is a variable length list, just choose
                # something arbitrary.
                t_width = 10 * t.value_type.bit_width
        else:
            # Fixed-width scalar type.
            t_width = t.bit_width

        bit_width += t_width

    # Insist it is at least 1 byte wide to avoid any divide-by-zero errors.
    if bit_width < 8:
        bit_width = 8

    byte_width = bit_width // 8

    return target_size // byte_width